diff --git a/moPepGen/cli/generate_index.py b/moPepGen/cli/generate_index.py
index 4d2f5026..6f8ee0a6 100644
--- a/moPepGen/cli/generate_index.py
+++ b/moPepGen/cli/generate_index.py
@@ -83,6 +83,9 @@ def create_gtf_copy(file:Path, output_dir:Path, symlink:bool=True) -> Path:
     if file.suffix.lower() == '.gz':
         if symlink:
             symlink = False
+            logger(
+                "--gtf-symlink was suppressed because compressed GTF file was received. "
+            )
     elif file.suffix.lower() != '.gtf':
         raise ValueError(f"Cannot handle gtf file {file}")
 
diff --git a/moPepGen/gtf/GTFPointer.py b/moPepGen/gtf/GTFPointer.py
index 29ab895a..502a102a 100644
--- a/moPepGen/gtf/GTFPointer.py
+++ b/moPepGen/gtf/GTFPointer.py
@@ -8,6 +8,7 @@
     TranscriptAnnotationModel,
     GTF_FEATURE_TYPES
 )
+from moPepGen.gtf.GTFSourceInferrer import GTFSourceInferrer
 
 GENE_DICT_CACHE_SIZE = 10
 
@@ -104,8 +105,7 @@ def to_line(self) -> str:
 def iterate_pointer(handle:IO, source:str=None) -> Iterable[Union[GenePointer, TranscriptPointer]]:
     """ Iterate over a GTF file and yield pointers. """
     if not source:
-        count = 0
-        inferred = {}
+        inferrer = GTFSourceInferrer()
 
     cur_gene_id:str = None
     cur_tx_id:str = None
@@ -124,18 +124,7 @@ def iterate_pointer(handle:IO, source:str=None) -> Iterable[Union[GenePointer, T
         record = GtfIO.line_to_seq_feature(line)
 
         if not source:
-            if count > 100:
-                inferred = sorted(inferred.items(), key=lambda x: x[1])
-                source = inferred[-1][0]
-                record.source = source
-            else:
-                count += 1
-                record.infer_annotation_source()
-                inferred_source = record.source
-                if inferred_source not in inferred:
-                    inferred[inferred_source] = 1
-                else:
-                    inferred[inferred_source] += 1
+            record.source = inferrer.infer(record)
         else:
             record.source = source
 
diff --git a/moPepGen/gtf/GTFSourceInferrer.py b/moPepGen/gtf/GTFSourceInferrer.py
new file mode 100644
index 00000000..4c3c1aa3
--- /dev/null
+++ b/moPepGen/gtf/GTFSourceInferrer.py
@@ -0,0 +1,52 @@
+""" Infer GTF source (e.g. GENCODE/ENSEMBL) """
+from __future__ import annotations
+from typing import Dict, TYPE_CHECKING
+
+
+if TYPE_CHECKING:
+    from moPepGen.gtf.GTFSeqFeature import GTFSeqFeature
+
+class GTFSourceInferrer():
+    """ Infer GTF source (e.g. GENCODE/ENSEMBL).
+
+    Records are inferred one by one and tallied until `count` exceeds
+    `max_iter`; after that the most frequent source is returned for every
+    record without inspecting it.
+
+    Attributes:
+        max_iter (int): Sampling limit that `count` must exceed before the
+            consensus source is returned.
+        data (Dict[str,int]): Number of sampled records per inferred source.
+        count (int): Number of records sampled so far.
+        source (str): Most frequent source among the sampled records, or
+            None if no record has been sampled yet.
+    """
+    def __init__(self):
+        """ Constructor """
+        self.max_iter = 100
+        self.data:Dict[str,int] = {}
+        self.count = 0
+        self.source:str = None
+
+    def infer(self, record:GTFSeqFeature) -> str:
+        """ Infer the source of a GTF record.
+
+        Args:
+            record (GTFSeqFeature): The record to inspect. While sampling,
+                its `infer_annotation_source()` is called and its `source`
+                attribute is then read.
+
+        Returns:
+            str: The source read from this record while sampling, or the
+                consensus source once sampling has finished.
+        """
+        if self.count > self.max_iter:
+            return self.source
+        self.count += 1
+        record.infer_annotation_source()
+        source = record.source
+        self.data[source] = self.data.get(source, 0) + 1
+        # Refresh the consensus on every sampled record so that `self.source`
+        # is set even when the GTF ends before sampling finishes.
+        self.source = sorted(self.data.items(), key=lambda x:x[1])[-1][0]
+        return source
diff --git a/moPepGen/gtf/GenomicAnnotation.py b/moPepGen/gtf/GenomicAnnotation.py
index bf2142ae..adbde542 100644
--- a/moPepGen/gtf/GenomicAnnotation.py
+++ b/moPepGen/gtf/GenomicAnnotation.py
@@ -10,6 +10,7 @@
 from .TranscriptAnnotationModel import TranscriptAnnotationModel, GTF_FEATURE_TYPES
 from .GeneAnnotationModel import GeneAnnotationModel
 from .GTFSeqFeature import GTFSeqFeature
+from .GTFSourceInferrer import GTFSourceInferrer
 
 
 if TYPE_CHECKING:
@@ -113,25 +114,14 @@ def dump_gtf(self, handle:Union[str, IO], biotype:List[str]=None, source:str=Non
         """
         record:GTFSeqFeature
         if not source:
-            count = 0
-            inferred = {}
+            inferrer = GTFSourceInferrer()
+
         for record in GtfIO.parse(handle):
             if biotype is not None and record.biotype not in biotype:
                 continue
 
             if not source:
-                if count > 100:
-                    inferred = sorted(inferred.items(), key=lambda x: x[1])
-                    source = inferred[-1][0]
-                    record.source = source
-                else:
-                    count += 1
-                    record.infer_annotation_source()
-                    inferred_source = record.source
-                    if inferred_source not in inferred:
-                        inferred[inferred_source] = 1
-                    else:
-                        inferred[inferred_source] += 1
+                record.source = inferrer.infer(record)
             else:
                 record.source = source
 
@@ -142,11 +132,11 @@ def dump_gtf(self, handle:Union[str, IO], biotype:List[str]=None, source:str=Non
 
             self.add_transcript_record(record)
 
         if not source:
-            inferred = sorted(inferred.items(), key=lambda x: x[1])
-            source = inferred[-1][0]
+            source = inferrer.source
+        # Record the source in use, whether it was supplied or inferred.
         self.source = source
 
         for transcript_model in self.transcripts.values():
             transcript_model.sort_records()
 