-
Notifications
You must be signed in to change notification settings - Fork 1
/
match_multi_cb.py
114 lines (91 loc) · 3.86 KB
/
match_multi_cb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import sys
import argparse
import editdistance as ed
import pandas as pd
import tqdm
def get_options():
parser = argparse.ArgumentParser(prog='mmc.py')
parser.add_argument('-w', '--whitelist', help='Whitelist file from UMI_tools', required=True)
parser.add_argument('-b', '--barcodes', help='Precompiled barcode list', required=True)
parser.add_argument('-o', '--output', help='Output whitelist file')
parser.add_argument('-r', '--reverse_complement', help='Whitelist is in reversec complement', action='store_true')
parser.add_argument('-d', '--distance', help='Max edit distance allowed', default=1)
parser.add_argument('-E', '--exhaustive_search', help='Search for matches that are not perfect', action='store_true')
options = parser.parse_args()
return options
def reverse_complement(s):
ab = {'A':'T', 'C':'G', 'G':'C', 'T':'A', 'N':'N', ',':','}
return ''.join([ab[x] for x in s[::-1]])
def rc_df(wl_df):
# reverse complement all entries
rc_df = pd.DataFrame(wl_df.values, index=[reverse_complement(x) for x in wl_df.index], columns=wl_df.columns)
# reverse complement everything, doesn't matter the order
rc_df[1] = [reverse_complement(x) for x in wl_df[1]]
# just reverse the counts
rc_df[3] = [','.join(x.split(',')[::-1]) for x in wl_df[3]]
return rc_df
def reshuffle(new_bc, old_bc, synonyms, t_counts, syn_counts):
count_list = syn_counts.split(',')
syn_sum = sum([int(x) for x in count_list])
bc_count = t_counts - syn_sum
syn_l = synonyms.split(',')
syn_idx = syn_l.index(new_bc)
syn_l[syn_idx] = old_bc
count_list[syn_idx] = str(bc_count)
synonyms = ','.join(syn_l)
syn_counts = ','.join(count_list)
return [new_bc, synonyms, t_counts, syn_counts]
def reassign(new_bc, old_bc, synonyms, t_counts, syn_counts):
syn_sum = sum([int(x) for x in syn_counts.split(',')])
bc_count = t_counts - syn_sum
synonyms = f'{old_bc},{synonyms}'
syn_counts = f'{bc_count},{syn_counts}'
return [new_bc, synonyms, t_counts, syn_counts]
def main():
options = get_options()
if options.output:
fout = options.output
else:
fout = sys.stdout
wl_df = pd.read_table(options.whitelist, header=None, index_col=0) # from UMI_tools only...
bc_list = set([x.strip() for x in open(options.barcodes)]) # from 10x?
# convert to rc if needed
if options.reverse_complement:
wl_df = rc_df(wl_df)
# first of all we keep exact matches in the index, to reduce the checks
known = bc_list.intersection(wl_df.index)
unknown = set(wl_df.index) - bc_list
sys.stderr.write(f'{len(known)}/{len(wl_df)} perfect barcode match\n')
sys.stderr.write(f'Analyzying the remaining {len(unknown)} barcodes\n')
reassign_idx = []
reassign_val = []
for bc in tqdm.tqdm(unknown):
synonyms, t_counts, syn_counts = wl_df.loc[bc]
syn_s = set(synonyms.split(','))
candidates = list(syn_s.intersection(bc_list))
if len(candidates) > 0:
# reshuffle
_p = reshuffle(candidates[0], bc, synonyms, t_counts, syn_counts)
reassign_idx.append(_p[0])
reassign_val.append(_p[1:])
elif options.exhaustive_search:
# try harder
for bc_s in syn_s:
candidates = [x for x in bc_list if ed.eval(bc_s, x) <= options.distance] #can be any of these, actually
if len(candidates) > 0:
_p = reassign(candidates[0], bc, synonyms, t_counts, syn_counts)
reassign_idx.append(_p[0])
reassign_val.append(_p[1:])
break
if len(reassign_idx) > 0:
reassigned = pd.DataFrame(reassign_val, index=reassign_idx, columns=wl_df.columns)
out_df = pd.concat([reassigned, wl_df.loc[known]], axis=0)
else:
out_df = wl_df.loc[known]
if options.reverse_complement:
# if reverse_complement, then re-reverse otherwise BC cannot be found in
# reads
out_df = rc_df(out_df)
out_df.to_csv(fout, sep="\t", header=None)
if __name__ == '__main__':
main()