From cd81c24fce26cde4bfa4e0d9f9204c7c2b2965b9 Mon Sep 17 00:00:00 2001
From: maclandrol
Date: Mon, 26 Mar 2018 15:58:23 -0400
Subject: [PATCH] bump version

---
 bin/coretracker                  |  53 ++++++++------
 bin/coretracker-run              | 121 +++++++++++++++++++------------
 conda_build/meta.yaml            |   3 +-
 coretracker/__init__.py          |   2 +-
 coretracker/coreutils/utils.py   |  33 ++++-----
 coretracker/settings/settings.py |   2 +-
 requirements.txt                 |   3 +-
 7 files changed, 127 insertions(+), 90 deletions(-)

diff --git a/bin/coretracker b/bin/coretracker
index 7b14a11..adad11d 100755
--- a/bin/coretracker
+++ b/bin/coretracker
@@ -29,6 +29,7 @@ import argparse
 import glob
 import logging
 import os
+import gc
 
 process = psutil.Process(os.getpid())
 ENABLE_PAR = True
@@ -66,9 +67,9 @@ def AlignmentProgramType(program):
 def memory_used():
     mem = 0
     try:
-        mem = process.get_memory_info().rss/(1024.0*1024)
+        mem = process.get_memory_info().rss / (1024.0 * 1024)
     except:
-        mem = process.memory_info().rss/(1024.0*1024)
+        mem = process.memory_info().rss / (1024.0 * 1024)
     return mem
 
 
@@ -159,34 +160,38 @@ def set_coretracker(args, settings):
     return reafinder, clf, model
 
 
-def compile_result(x, clf, cod_align, model):
+def compile_result(x, spec_data, clf, cod_align, model, glimit, fpos, settings):
     """compile result from analysis"""
-    reafinder, fitch, data = x
-    s_complete_data = utils.makehash()
-    s_complete_data['aa'][fitch.ori_aa1][fitch.dest_aa1] = data
-    s_complete_data['genome'] = reafinder.reassignment_mapper['genome']
+    fitch, s_complete_data, slist = x
+    data = utils.makehash()
+    data['genome'] = spec_data
+    for spec in slist:
+        data['aa'][fitch.ori_aa1][fitch.dest_aa1][spec] = s_complete_data[spec]
+
+    out_res = None
 
     X_data, X_labels, _ = read_from_json(
-        s_complete_data, None, use_global=reafinder.settings.USE_GLOBAL)
+        data, None, use_global=settings.USE_GLOBAL)
     # extract usefull features
     if X_data is not None and X_data.size:
         X_data, X_dataprint, selected_et = model.format_data(X_data)
         pred_prob = clf.predict_proba(X_data)
         #pred = clf.predict(X_data)
         pred = pred_prob.argmax(axis=1)
-        if sum(pred) == 0 and reafinder.settings.SKIP_EMPTY:
+        if sum(pred) == 0 and settings.SKIP_EMPTY:
             return None
         sppval, outdir, rkp, codvalid = utils.get_report(
-            fitch, reafinder, cod_align, (X_data, X_labels, pred_prob, pred))
+            fitch, s_complete_data, cod_align, (X_data, X_labels, pred_prob, pred), glimit, fpos, settings)
         utils.print_data_to_txt(os.path.join(outdir, fitch.ori_aa + "_to_" + fitch.dest_aa + "_data.txt"),
                                 selected_et, X_dataprint, X_labels, pred, pred_prob, sppval, fitch.dest_aa, valid=codvalid)
-        tmp_data = [X_labels, pred, pred_prob, codvalid]
-        del X_data
         del X_dataprint
-        del s_complete_data
-        return rkp, tmp_data
-    else:
-        return None
+        out_res = (rkp, tmp_data, data['aa'])
+
+    del fitch
+    del X_data
+    del s_complete_data
+    _ = gc.collect()
+    return out_res
 
 
 if __name__ == '__main__':
@@ -291,6 +296,10 @@ if __name__ == '__main__':
     cod_align = SeqIO.to_dict(fcodon_align)
     reafinder.set_rea_mapper()
 
+    spec_data = reafinder.reassignment_mapper['genome']
+    genelimit = reafinder.seqset.gene_limits
+    filt_position = reafinder.seqset.filt_position
+
     # The program is run at this phase
     done = False
     results = []
@@ -298,18 +307,20 @@ if __name__ == '__main__':
 
     if args.parallel > 0 and ENABLE_PAR:
         results = Parallel(n_jobs=args.parallel, verbose=1)(delayed(compile_result)(
-            x, clf, cod_align, model) for x in reafinder.run_analysis(codon_align, fcodon_align))
+            x, spec_data, clf, cod_align, model, genelimit, filt_position, setting) for x in reafinder.run_analysis(codon_align, fcodon_align))
         done = True
+
     elif args.parallel > 0:
         logging.warning(
             "Joblib requirement not found! Disabling parallelization")
 
     if not done:
         for x in reafinder.run_analysis(codon_align, fcodon_align):
-            results.append(compile_result(x, clf, cod_align, model))
+            results.append(compile_result(
+                x, spec_data, clf, cod_align, model, genelimit, filt_position, setting))
     # remove None results then unzip
     results = [r for r in results if r is not None]
-    results, ALL_PRED = zip(*results)
+    results, ALL_PRED, rjson = zip(*results)
 
     if args.valid and args.expos and results:
         rea_pos_keeper = ddict(dict)
@@ -317,9 +328,9 @@ if __name__ == '__main__':
             for cuspec, readt in r.items():
                 for k in readt.keys():
                     rea_pos_keeper[cuspec][k] = readt[k]
-        exp_outfile = os.path.join(reafinder.settings.OUTDIR, "positions.json")
+        exp_outfile = os.path.join(setting.OUTDIR, "positions.json")
         reafinder.export_position(rea_pos_keeper, exp_outfile)
-    reafinder.save_all(ALL_PRED, True, nofilter=args.nofilter)
+    reafinder.save_all(ALL_PRED, rjson, True, nofilter=args.nofilter)
 
     logging.info("\n**END (%.3f s, %.3f MB)" %
                  (abs(time.time() - start_t), memory_used()))
diff --git a/bin/coretracker-run b/bin/coretracker-run
index a4d2673..f811bc5 100755
--- a/bin/coretracker-run
+++ b/bin/coretracker-run
@@ -19,34 +19,34 @@ from coretracker.classifier import read_from_json
 from coretracker.coreutils import *
 from coretracker.settings import Settings
 from coretracker.coreutils.letterconfig import aa_letters_3to1
-
 from Bio import SeqIO
+from functools import partial
 import argparse
 import logging
 import psutil
 import time
 import os
 import yaml
+import resource
+import gc
 import sys
+import multiprocessing as mp
 
 process = psutil.Process(os.getpid())
-ENABLE_PAR = True
-CPU_COUNT = 0
+CPU_COUNT = mp.cpu_count()
+
+global spec_data
+global cod_align
+
 try:
-    from multiprocessing import cpu_count
-    CPU_COUNT = cpu_count()
-    from joblib import Parallel, delayed, dump, load
+    from joblib import dump, load
 except ImportError:
-    try:
-        from sklearn.externals.joblib import Parallel, delayed, dump, load
-    except:
-        ENABLE_PAR = False
+    from sklearn.externals.joblib import dump, load
 
 etiquette = ["fitch", "suspected", "Fisher pval", "Gene frac", "N. rea",
              "N. used", "Cod. count", "Sub. count", "G. len", "codon_lik",
              "N. mixte", "id"]  # , 'total_aa']
 
-
 def memory_used():
     # Bad practice rewriting this method at each time
     mem = 0
@@ -57,33 +57,38 @@ def memory_used():
     return mem
 
 
-def compile_result(x, clf, cod_align, model):
+def compile_result(x, spec_data, clf, cod_align, model, glimit, fpos, settings):
     """compile result from analysis"""
-    reafinder, fitch, data = x
-    s_complete_data = utils.makehash()
-    s_complete_data['aa'][fitch.ori_aa1][fitch.dest_aa1] = data
-    s_complete_data['genome'] = reafinder.reassignment_mapper['genome']
+    fitch, s_complete_data, slist = x
+    data = utils.makehash()
+    data['genome'] = spec_data
+    for spec in slist:
+        data['aa'][fitch.ori_aa1][fitch.dest_aa1][spec] = s_complete_data[spec]
+
+    out_res = None
     X_data, X_labels, _ = read_from_json(
-        s_complete_data, None, use_global=reafinder.settings.USE_GLOBAL)
+        data, None, use_global=settings.USE_GLOBAL)
     # extract usefull features
     if X_data is not None and X_data.size:
         X_data, X_dataprint, selected_et = model.format_data(X_data)
         pred_prob = clf.predict_proba(X_data)
         #pred = clf.predict(X_data)
         pred = pred_prob.argmax(axis=1)
-        if sum(pred) == 0 and reafinder.settings.SKIP_EMPTY:
+        if sum(pred) == 0 and settings.SKIP_EMPTY:
             return None
         sppval, outdir, rkp, codvalid = utils.get_report(
-            fitch, reafinder, cod_align, (X_data, X_labels, pred_prob, pred))
+            fitch, s_complete_data, cod_align, (X_data, X_labels, pred_prob, pred), glimit, fpos, settings)
         utils.print_data_to_txt(os.path.join(outdir, fitch.ori_aa + "_to_" + fitch.dest_aa + "_data.txt"),
                                 selected_et, X_dataprint, X_labels, pred, pred_prob, sppval, fitch.dest_aa, valid=codvalid)
         tmp_data = [X_labels, pred, pred_prob, codvalid]
-        del X_data
         del X_dataprint
-        del s_complete_data
-        return rkp, tmp_data
-    else:
-        return None
+        out_res =(rkp, tmp_data, data['aa'])
+
+    del fitch
+    del X_data
+    del s_complete_data
+    _ = gc.collect()
+    return out_res
 
 
 if __name__ == '__main__':
@@ -113,6 +118,9 @@ if __name__ == '__main__':
 
     parser.add_argument('--parallel', dest='parallel', nargs='?', const=CPU_COUNT, type=int, default=0,
                         help="Use Parallelization during execution for each reassignment. This does not guarantee an increase in speed. CPU count will be used if no argument is provided")
+    parser.add_argument('--memory_efficient', action="store_true",
+                        dest='mefficient', help="Memory efficient execution")
+
     parser.add_argument('--version', action='version',
                         version='coretracker-prep v.%s' % __version__)
     parser.add_argument('--debug', dest='debug', action='store_true',
@@ -123,9 +131,11 @@ if __name__ == '__main__':
 
     args = parser.parse_args()
     start_t = time.time()
+
     if args.debug:
         logging.basicConfig(level=logging.DEBUG)
     aasubset = None
+
     if args.aapair:
         try:
             with open(args.aapair) as f:
@@ -136,41 +146,59 @@ if __name__ == '__main__':
             print("Input provided with --aapair is not valid !")
             sys.exit(0)
 
-    run_instance = load(args.input)
-    run_instance.rfinder.settings.update_params(COMPUTE_POS=args.expos)
-    run_instance.rfinder.settings.update_params(VALIDATION=args.valid)
-    run_instance.rfinder.settings.update_params(IMAGE_FORMAT=args.imformat)
+    pools = None
+    n_jobs = 1
+    if args.parallel > 0:
+        pool = mp.Pool(args.parallel)
+
+    r_instance = load(args.input, mmap_mode='r+')
+    logging.debug('Instance was read in %.3f s' % (time.time() - start_t))
+    clf, model = r_instance.get_model(etiquette)
+    nofilter = r_instance.args.nofilter
+    reafinder = r_instance.rfinder
+    settings = reafinder.settings
+    settings.update_params(COMPUTE_POS=args.expos)
+    settings.update_params(VALIDATION=args.valid)
+    settings.update_params(IMAGE_FORMAT=args.imformat)
 
     if args.outdir:
         if not os.path.exists(args.outdir):
             os.makedirs(args.outdir)  # let original error handling
-        run_instance.rfinder.settings.update_params(OUTDIR=args.outdir)
+        settings.update_params(OUTDIR=args.outdir)
 
-    codon_align, fcodon_align = run_instance.rfinder.seqset.get_codon_alignment()
+    if args.mefficient:
+        clf.clf.n_jobs = 1
+        n_jobs = (args.parallel or 2)
+
+    codon_align, fcodon_align = reafinder.seqset.get_codon_alignment()
+    spec_data = reafinder.reassignment_mapper['genome']
+    genelimit = reafinder.seqset.gene_limits
+    filt_position = reafinder.seqset.filt_position
     cod_align = SeqIO.to_dict(fcodon_align)
-    clf, model = run_instance.get_model(etiquette)
-    reafinder = run_instance.rfinder
-    nofilter = run_instance.args.nofilter
-    del run_instance
 
     done = False
     results = []
     ALL_PRED = []
 
-    if args.parallel > 0 and ENABLE_PAR:
-        results = Parallel(n_jobs=args.parallel, verbose=1)(delayed(compile_result)(
-            x, clf, cod_align, model) for x in reafinder.run_analysis(codon_align, fcodon_align, aasubset))
+    if pool and args.parallel:
+        partial_func = partial(compile_result, spec_data=spec_data, clf=clf, cod_align=cod_align, model=model, glimit=genelimit,
+                               fpos=filt_position, settings=settings)
+        for res in pool.imap_unordered(partial_func, reafinder.run_analysis(codon_align, fcodon_align, aasubset), n_jobs):
+            if res is not None:
+                results.append(res)
+
+        pool.close()
+        pool.join()
         done = True
-    elif args.parallel > 0:
-        logging.warning(
-            "Joblib requirement not found! Disabling parallelization")
 
     if not done:
        for x in reafinder.run_analysis(codon_align, fcodon_align, aasubset):
-            results.append(compile_result(x, clf, cod_align, model))
-    # remove None results then unzip
-    results = [r for r in results if r is not None]
-    results, ALL_PRED = zip(*results)
+            results.append(compile_result(
+                x, spec_data, clf, cod_align, model, genelimit, filt_position, settings))
+    results = [r for r in results if r is not None]
+
+    # unzip data
+    results, ALL_PRED, rjson = zip(*results)
 
     if args.valid and args.expos and results:
         rea_pos_keeper = ddict(dict)
@@ -178,9 +206,10 @@ if __name__ == '__main__':
             for cuspec, readt in r.items():
                 for k in readt.keys():
                     rea_pos_keeper[cuspec][k] = readt[k]
-        exp_outfile = os.path.join(reafinder.settings.OUTDIR, "positions.json")
+        exp_outfile = os.path.join(settings.OUTDIR, "positions.json")
         reafinder.export_position(rea_pos_keeper, exp_outfile)
-    reafinder.save_all(ALL_PRED, True, nofilter=nofilter)
+    reafinder.save_all(ALL_PRED, rjson, True, nofilter=nofilter)
+
     logging.info("\n**END (%.3f s, %.3f MB)" %
                  (abs(time.time() - start_t), memory_used()))
 
diff --git a/conda_build/meta.yaml b/conda_build/meta.yaml
index 2b47231..f236bb1 100644
--- a/conda_build/meta.yaml
+++ b/conda_build/meta.yaml
@@ -1,4 +1,4 @@
-{% set version = "1.4.7" %}
+{% set version = "1.4.8" %}
 
 package:
   name: coretracker
@@ -52,6 +52,7 @@ requirements:
     - muscle
    - hmmer
     - mafft
+    - joblib
 
 
 test:
diff --git a/coretracker/__init__.py b/coretracker/__init__.py
index 25d27e2..9c147e0 100644
--- a/coretracker/__init__.py
+++ b/coretracker/__init__.py
@@ -12,7 +12,7 @@
 # along with this program. If not, see .
 
 __project__ = 'CoreTracker'
-__version__ = '1.4.7'
+__version__ = '1.4.8'
 __author__ = 'Emmanuel Noutahi'
 __email__ = "fmr.noutahi@umontreal.ca"
 __license__ = "GPLv3"
diff --git a/coretracker/coreutils/utils.py b/coretracker/coreutils/utils.py
index a58152b..4ce2605 100644
--- a/coretracker/coreutils/utils.py
+++ b/coretracker/coreutils/utils.py
@@ -1148,7 +1148,8 @@ def __init__(self, seqset, settings):
         self.interesting_case = []
         self.reassignment_mapper = makehash()
 
-    def update_reas(self, codon, cible_aa, speclist, codon_align, codonpos, filt_pos, genelim):
+    @classmethod
+    def update_reas(clc, codon, cible_aa, speclist, codon_align, codonpos, filt_pos, genelim):
         """Update the list of codons reassignment and position"""
         rea_position_keeper = defaultdict(dict)
         genelim = sorted(genelim, key=lambda x: x[2])
@@ -1467,10 +1468,11 @@ def _valid_state(spec, valid):
                 OUT.write("\t%s\t%s\n" % (spec_with_rea[0].ljust(
                     max_spec_len), "\t".join(spec_with_rea[1:])))
 
-    def save_all(self, predictions, savecodon=False, nofilter=False):
+    def save_all(self, predictions, rjson, savecodon=False, nofilter=False):
         """Save everything"""
         if savecodon:
             self.reassignment_mapper['codons'] = self.get_codon_usage()
+        self.reassignment_mapper['aa'] = rjson
         self.save_json()
         if self.settings.SAVE_ALIGN:
             id_filtfile = os.path.join(
@@ -1509,7 +1511,6 @@ def run_analysis(self, codon_align, fcodon_align, aasubset=None):
             slist = fitch.get_species_list(
                 self.settings.LIMIT_TO_SUSPECTED_SPECIES)
             alldata = {}
-            prediction_data = {}
             for genome in self.seqset.common_genome:
                 rec = self.seqset.prot_dict[genome]
                 leaf = fitch.tree & genome
@@ -1595,13 +1596,11 @@ def run_analysis(self, codon_align, fcodon_align, aasubset=None):
                 gdata['codons'] = {'global': gcodon_rea.get_aa_usage(
                     genome, aa2), 'filtered': fcodon_rea.get_aa_usage(genome, aa2)}
                 alldata[genome] = gdata
-                if genome in slist:
-                    prediction_data[genome] = gdata
 
             if(fitch.is_valid(self.settings.COUNT_THRESHOLD) or self.settings.SHOW_ALL):
-                self.reassignment_mapper['aa'][aa2][aa1] = alldata
-                self.interesting_case.append("%s to %s" % (aa2, aa1))
-                yield (self, fitch, prediction_data)
+                logging.debug(
+                    "- Case with predictions: %s to %s" % (aa2, aa1))
+                yield (fitch, alldata, slist)
             # free objects
             del aa_alignment
             del gcodon_rea
@@ -2016,13 +2015,11 @@ def get_suffix(x, add_label):
     return x if add_label else ""
 
 
-def get_report(fitchtree, reafinder, codon_align, prediction, output="", pie_size=45):
+def get_report(fitchtree, gdata, codon_align, prediction, genelimit, filt_position, settings, output="", pie_size=45):
     """ Render tree to a pdf"""
-    settings = reafinder.settings
     OUTDIR = purge_directory(os.path.join(settings.OUTDIR,
                                           fitchtree.ori_aa + "_to_" + fitchtree.dest_aa))
-    gdata = reafinder.reassignment_mapper['aa'][
-        fitchtree.ori_aa1][fitchtree.dest_aa1]
+
     c_rea = get_rea_genome_list(prediction[1], prediction[3])
     if not output:
         output = os.path.join(OUTDIR, "Codon_data." + settings.IMAGE_FORMAT)
@@ -2119,7 +2116,7 @@ def get_report(fitchtree, reafinder, codon_align, prediction, output="", pie_siz
     table['---'] = '-'
     table['...'] = '.'
     data_present, data_var, rkp, codvalid = codon_adjust_improve(
-        fitchtree, reafinder, codon_align, table, prediction, outdir=OUTDIR)
+        fitchtree, codon_align, table, prediction, genelimit, filt_position, settings, outdir=OUTDIR)
 
     # get report output
     rep_out = os.path.join(OUTDIR, "Report_" +
@@ -2297,15 +2294,13 @@ def violin_plot(vals, output, score, codon, cible, imformat="pdf"):
     return output, (codon, cible, score)
 
 
-def codon_adjust_improve(fitchtree, reafinder, codon_align, codontable, prediction, outdir=""):
+def codon_adjust_improve(fitchtree, codon_align, codontable, prediction, genelimit, filt_position, settings, outdir=""):
     """Get a representation of the improvement after translation"""
     cible_aa = fitchtree.dest_aa1
     X_data, X_labels, pred_prob, pred = prediction
     true_codon_set = get_codon_set_and_genome(pred, X_labels, 1)
-    settings = reafinder.settings
-    genelimit = reafinder.seqset.gene_limits
-    filt_position = reafinder.seqset.filt_position
+
     sc_meth = settings.MATRIX
     method = settings.MODE if settings.MODE in [
         'wilcoxon', 'mannwhitney', 'ttest'] else 'wilcoxon'
@@ -2339,7 +2334,7 @@ def codon_adjust_improve(fitchtree, reafinder, codon_align, codontable, predicti
         limits = limit_finder_fn(codon_pos)
         if settings.COMPUTE_POS:
             # only compute this if asked
-            rea_pos = reafinder.update_reas(
+            rea_pos = ReaGenomeFinder.update_reas(
                 codon, cible_aa, speclist, codon_align, pos, filt_position, genelimit)
             for cuspec, readt in rea_pos.items():
                 for k in readt.keys():
@@ -2377,7 +2372,7 @@ def codon_adjust_improve(fitchtree, reafinder, codon_align, codontable, predicti
 
         logging.debug('{} --> {} : {:.2e}'.format(*viout))
 
-        codvalid[codon] = (tmpvalid, viout[-1] < reafinder.confd)
+        codvalid[codon] = (tmpvalid, viout[-1] < settings.CONF)
         data_var[codon] = score_improve
 
 
diff --git a/coretracker/settings/settings.py b/coretracker/settings/settings.py
index a4888bd..ae3d6de 100644
--- a/coretracker/settings/settings.py
+++ b/coretracker/settings/settings.py
@@ -84,7 +84,7 @@ def set(self, **kwargs):
             self.SUBMAT = getattr(MatrixInfo, "blosum62")
 
         # alpha to use , default is 0.05
-        self.CONF = kwargs.get('CONF', parameters.CONF)
+        self.CONF = kwargs.get('CONF', parameters.CONF) or 0.05
         self.STARTDIST = kwargs.get('STARTDIST', parameters.STARTDIST)
         self.SHOW_ALL = kwargs.get('SHOW_ALL', parameters.SHOW_ALL)
         self.FORCED_CHI2 = kwargs.get('FORCED_CHI2', parameters.FORCED_CHI2)
diff --git a/requirements.txt b/requirements.txt
index da339a9..c9ae947 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,4 +8,5 @@ biopython>=1.65
 ete3
 WeasyPrint
 seaborn
-psutil
\ No newline at end of file
+psutil
+joblib
\ No newline at end of file
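
Note on the pattern introduced above (a sketch, not part of the patch): compile_result now receives everything it needs as explicit arguments (spec_data, clf, cod_align, model, genelimit, filt_position, settings) and returns a (rkp, tmp_data, data['aa']) tuple or None, which is what lets bin/coretracker-run replace joblib with a multiprocessing.Pool driven through functools.partial and imap_unordered, drop the None results, and unpack the rest with zip(*results) into results, ALL_PRED and rjson for save_all. The snippet below only illustrates that dispatch shape with invented stand-in names (compile_one, tasks, the dummy data); it is not CoreTracker's API.

    from functools import partial
    import multiprocessing as mp


    def compile_one(x, spec_data, settings):
        """Stand-in worker: takes its context explicitly, returns None to skip."""
        label, per_spec, slist = x
        if not slist:
            return None
        # mirror compile_result's (rkp, tmp_data, data['aa'])-shaped return value
        return ({label: slist}, [per_spec[s] for s in slist],
                {"genome": spec_data, label: per_spec})


    if __name__ == '__main__':
        tasks = [("A_to_B", {"sp1": 0.9}, ["sp1"]), ("C_to_D", {}, [])]
        worker = partial(compile_one, spec_data={"sp1": "genome stats"},
                         settings=None)
        with mp.Pool(2) as pool:
            # the chunksize argument plays the role of n_jobs in the patched script
            results = [r for r in pool.imap_unordered(worker, tasks, 1)
                       if r is not None]
        results, all_pred, rjson = zip(*results)  # same unzip step as the patch
        print(results, all_pred, rjson)

The per-reassignment dictionaries that run_analysis previously wrote into reafinder.reassignment_mapper['aa'] now travel back with each worker's result (the rjson column), which matches the new save_all(predictions, rjson, ...) signature that stores them just before save_json().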