-
Notifications
You must be signed in to change notification settings - Fork 0
/
gff2coverage.py
72 lines (57 loc) · 2.69 KB
/
gff2coverage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import pandas as pd
def gff2coverage(annotations, reference):
df_ann = pd.read_csv(annotations, index_col=False, sep='\t', header=None, comment='#')
df_ann.columns = ['seqname', 'source', 'feature', 'start', 'end', 'score', 'strand', 'frame', 'attribute']
df_ref = pd.read_csv(reference, index_col=False, sep='\t', header=None, comment='#')
df_ref.columns = ['seqname', 'source', 'feature', 'start', 'end', 'score', 'strand', 'frame', 'attribute']
total = {}
for k, v in df_ref.iterrows():
total[v.seqname] = v.end
overlapped = {}
for k, v in df_ann.iterrows():
overlapped.setdefault(v.seqname, []).append((v.start, v.end))
sequences = df_ref.seqname.unique().tolist()
for seqname in sequences:
if not seqname in overlapped:
continue
overlapped[seqname] = merge_overlap(overlapped[seqname])
gran_total_length = 0
gran_ref_length = 0
for seqname in sequences:
current_ref = df_ref[(df_ref.seqname == seqname)].iloc[0]
ref_len = abs(current_ref.end - current_ref.start)
gran_ref_length += ref_len
if not seqname in overlapped:
continue
current = overlapped[seqname]
total_length = 0
for (start, end) in current:
total_length += abs(end - start)
perc = total_length * 100 / ref_len
gran_total_length += total_length
print('seqname %s len: %s covered: %s percentage: %f ' % (seqname, ref_len, total_length, perc,))
gran_perc = gran_total_length * 100 / gran_ref_length
print('Total len: %s covered: %s percentage: %f ' % (gran_ref_length, gran_total_length, gran_perc,))
def merge_overlap(intervals):
sorted_by_lower_bound = sorted(intervals, key=lambda tup: tup[0])
merged = []
for higher in sorted_by_lower_bound:
if not merged:
merged.append(higher)
else:
lower = merged[-1]
# test for intersection between lower and higher:
# we know via sorting that lower[0] <= higher[0]
if higher[0] <= lower[1]:
upper_bound = max(lower[1], higher[1])
merged[-1] = (lower[0], upper_bound) # replace by merged interval
else:
merged.append(higher)
return merged
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser() # pylint: disable=invalid-name
parser.add_argument("-a", "--annotation", help="Annotation file (.gff3 format)", required=True)
parser.add_argument("-r", "--reference", help="Reference genome file (.gff3 format)", required=True)
args = parser.parse_args() # pylint: disable=invalid-name
gff2coverage(args.annotation, args.reference)