diff --git a/clipkit/files.py b/clipkit/files.py index 00e1760..b88d093 100644 --- a/clipkit/files.py +++ b/clipkit/files.py @@ -45,6 +45,7 @@ def get_alignment_and_format( raise InvalidInputFileFormat("File could not be read") + def write_debug_log_file(msa): for info in msa.generate_debug_log_info(): log_file_logger.debug(f"{str(info[0] + 1)} {info[1]} {info[2].value} {info[3]}") diff --git a/clipkit/helpers.py b/clipkit/helpers.py index f0457b3..29ed4de 100644 --- a/clipkit/helpers.py +++ b/clipkit/helpers.py @@ -45,16 +45,14 @@ def get_gap_chars(seq_type: SeqType) -> list[str]: return DEFAULT_AA_GAP_CHARS -def create_msa(alignment: MultipleSeqAlignment, gap_chars: list[str]=None) -> MSA: +def create_msa(alignment: MultipleSeqAlignment, gap_chars: list[str] = None) -> MSA: """ Create MSA class """ return MSA.from_bio_msa(alignment, gap_chars) -def write_msa( - msa: MSA, out_file_name: str, out_file_format: FileFormat -) -> None: +def write_msa(msa: MSA, out_file_name: str, out_file_format: FileFormat) -> None: """ msa is populated with sites that are kept after trimming is finished """ @@ -78,4 +76,3 @@ def write_complement(msa: MSA, out_file: str, out_file_format: FileFormat) -> No elif out_file_format.value == "phylip_sequential": SeqIO.write(output_msa, out_file, "phylip-sequential") SeqIO.write(output_msa, completmentOut, out_file_format.value) - \ No newline at end of file diff --git a/clipkit/modes.py b/clipkit/modes.py index 082d897..e4001a2 100644 --- a/clipkit/modes.py +++ b/clipkit/modes.py @@ -6,6 +6,7 @@ from Bio.Align import MultipleSeqAlignment from .msa import MSA + class TrimmingMode(Enum): gappy = "gappy" smart_gap = "smart-gap" diff --git a/clipkit/msa.py b/clipkit/msa.py index 84ac93a..a72beec 100644 --- a/clipkit/msa.py +++ b/clipkit/msa.py @@ -5,12 +5,18 @@ from typing import Union from .modes import TrimmingMode -from .site_classification import SiteClassificationType, determine_site_classification_type +from .site_classification import ( + SiteClassificationType, + determine_site_classification_type, +) from .settings import DEFAULT_AA_GAP_CHARS from .stats import TrimmingStats + class MSA: - def __init__(self, header_info, seq_records, gap_chars=DEFAULT_AA_GAP_CHARS) -> None: + def __init__( + self, header_info, seq_records, gap_chars=DEFAULT_AA_GAP_CHARS + ) -> None: self.header_info = header_info self.seq_records = seq_records self._original_length = len(self.seq_records[0]) @@ -33,13 +39,15 @@ def to_bio_msa(self) -> MultipleSeqAlignment: return self._to_bio_msa(self.sites_kept) def complement_to_bio_msa(self) -> MultipleSeqAlignment: - return self._to_bio_msa(self.sites_trimmed) + return self._to_bio_msa(self.sites_trimmed) def _to_bio_msa(self, sites) -> MultipleSeqAlignment: # NOTE: we use the description as the id to preserve the full sequence description - see issue #20 return MultipleSeqAlignment( [ - SeqRecord(Seq("".join(rec)), id=str(info["description"]), description="") + SeqRecord( + Seq("".join(rec)), id=str(info["description"]), description="" + ) for rec, info in zip(sites.tolist(), self.header_info) ] ) @@ -86,20 +94,23 @@ def stats(self) -> TrimmingStats: def is_any_entry_sequence_only_gaps(self) -> tuple[bool, Union[str, None]]: for idx, row in enumerate(self.trimmed): - if ( - np.all(row == row[0]) # all values the same - and (row[0] in self.gap_chars) + if np.all(row == row[0]) and ( # all values the same + row[0] in self.gap_chars ): return True, self.header_info[idx].get("id") - return False, None + return False, None def trim( self, mode: TrimmingMode, gap_threshold=None, ) -> np.array: - self._site_positions_to_trim = self.determine_site_positions_to_trim(mode, gap_threshold) - self._site_positions_to_keep = np.delete(np.arange(self._original_length), self._site_positions_to_trim) + self._site_positions_to_trim = self.determine_site_positions_to_trim( + mode, gap_threshold + ) + self._site_positions_to_keep = np.delete( + np.arange(self._original_length), self._site_positions_to_trim + ) @property def column_character_frequencies(self): @@ -125,7 +136,7 @@ def column_character_frequencies(self): def site_classification_types(self): if self._site_classification_types is not None: return self._site_classification_types - + site_classification_types = np.array( [ determine_site_classification_type(col_char_freq) @@ -162,14 +173,8 @@ def determine_site_positions_to_trim(self, mode, gap_threshold): col_char_freqs = self.column_character_frequencies site_classification_types = self.site_classification_types sites_to_trim = np.where( - ( - site_classification_types - == SiteClassificationType.other - ) - | ( - site_classification_types - == SiteClassificationType.singleton - ) + (site_classification_types == SiteClassificationType.other) + | (site_classification_types == SiteClassificationType.singleton) )[0] elif mode in (TrimmingMode.kpic_gappy, TrimmingMode.kpic_smart_gap): sites_to_trim_gaps_based = np.where(self.site_gappyness >= gap_threshold)[0] @@ -177,14 +182,8 @@ def determine_site_positions_to_trim(self, mode, gap_threshold): col_char_freqs = self.column_character_frequencies site_classification_types = self.site_classification_types sites_to_trim_classification_based = np.where( - ( - site_classification_types - == SiteClassificationType.other - ) - | ( - site_classification_types - == SiteClassificationType.singleton - ) + (site_classification_types == SiteClassificationType.other) + | (site_classification_types == SiteClassificationType.singleton) )[0] sites_to_trim = np.unique( @@ -195,7 +194,6 @@ def determine_site_positions_to_trim(self, mode, gap_threshold): return sites_to_trim - def generate_debug_log_info(self): """ Returns tuples of site position, keep or trim, site classification type, and gappyness @@ -207,4 +205,9 @@ def generate_debug_log_info(self): keep_or_trim_lookup[trim_idx] = "trim" for idx, gappyness in enumerate(self.site_gappyness): - yield (idx, keep_or_trim_lookup[idx], self.site_classification_types[idx], gappyness) + yield ( + idx, + keep_or_trim_lookup[idx], + self.site_classification_types[idx], + gappyness, + ) diff --git a/clipkit/site_classification.py b/clipkit/site_classification.py index c484fb1..eeb3073 100644 --- a/clipkit/site_classification.py +++ b/clipkit/site_classification.py @@ -1,11 +1,13 @@ from enum import Enum + class SiteClassificationType(Enum): parsimony_informative = "parsimony-informative" constant = "constant" singleton = "singleton" other = "other" + def determine_site_classification_type( character_counts: dict, ) -> SiteClassificationType: diff --git a/tests/integration/test_api.py b/tests/integration/test_api.py index 5667347..005e280 100644 --- a/tests/integration/test_api.py +++ b/tests/integration/test_api.py @@ -6,6 +6,7 @@ from clipkit.modes import TrimmingMode from clipkit.msa import MSA + @pytest.mark.integration class TestApiInvocation(object): def test_input_file(self): @@ -20,7 +21,7 @@ def test_input_file(self): "alignment_length": 6, "output_length": 4, "trimmed_length": 2, - "trimmed_percentage": 33.333 + "trimmed_percentage": 33.333, } assert isinstance(trim_run.version, str) assert isinstance(trim_run.trimmed, MultipleSeqAlignment) @@ -36,6 +37,6 @@ def test_raw_alignment(self): "alignment_length": 6, "output_length": 5, "trimmed_length": 1, - "trimmed_percentage": 16.667 + "trimmed_percentage": 16.667, } assert isinstance(trim_run.version, str) diff --git a/tests/unit/test_warnings.py b/tests/unit/test_warnings.py index dd2139e..0e4bca8 100644 --- a/tests/unit/test_warnings.py +++ b/tests/unit/test_warnings.py @@ -16,28 +16,26 @@ class TestWarnings(object): ( [ {"id": "1", "name": "1", "description": "1"}, - {"id": "2", "name": "2", "description": "2"} + {"id": "2", "name": "2", "description": "2"}, ], - np.array([ - ["", "", "", "", "", ""], - ["", "", "", "", "", ""] - ]), - True + np.array([["", "", "", "", "", ""], ["", "", "", "", "", ""]]), + True, ), ( [ {"id": "1", "name": "1", "description": "1"}, - {"id": "2", "name": "2", "description": "2"} + {"id": "2", "name": "2", "description": "2"}, ], - np.array([ - ["A", "-", "G", "T", "A", "T"], - ["A", "-", "G", "-", "A", "T"] - ]), - False + np.array( + [["A", "-", "G", "T", "A", "T"], ["A", "-", "G", "-", "A", "T"]] + ), + False, ), - ] + ], ) - def test_warn_all_sites_trimmed(self, mocker, header_info, seq_records, should_warn): + def test_warn_all_sites_trimmed( + self, mocker, header_info, seq_records, should_warn + ): mocked_warning = mocker.patch("clipkit.warnings.logger.warning") msa = MSA(header_info, seq_records) @@ -50,46 +48,44 @@ def test_warn_all_sites_trimmed(self, mocker, header_info, seq_records, should_w else: mocked_warning.assert_not_called() - @pytest.mark.parametrize( "header_info, seq_records, gap_only_header_id", [ ( [ {"id": "1", "name": "1", "description": "1"}, - {"id": "2", "name": "2", "description": "2"} + {"id": "2", "name": "2", "description": "2"}, ], - np.array([ - ["-", "-", "-", "-", "-", "-"], - ["A", "G", "G", "T", "A", "C"] - ]), - "1" + np.array( + [["-", "-", "-", "-", "-", "-"], ["A", "G", "G", "T", "A", "C"]] + ), + "1", ), ( [ {"id": "1", "name": "1", "description": "1"}, - {"id": "2", "name": "2", "description": "2"} + {"id": "2", "name": "2", "description": "2"}, ], - np.array([ - ["A", "G", "G", "T", "A", "C"], - ["-", "-", "-", "-", "-", "-"] - ]), - "2" + np.array( + [["A", "G", "G", "T", "A", "C"], ["-", "-", "-", "-", "-", "-"]] + ), + "2", ), ( [ {"id": "1", "name": "1", "description": "1"}, - {"id": "2", "name": "2", "description": "2"} + {"id": "2", "name": "2", "description": "2"}, ], - np.array([ - ["A", "-", "G", "T", "A", "T"], - ["A", "-", "G", "-", "A", "T"] - ]), - None + np.array( + [["A", "-", "G", "T", "A", "T"], ["A", "-", "G", "-", "A", "T"]] + ), + None, ), - ] + ], ) - def test_warn_if_entry_contains_only_gaps(self, mocker, header_info, seq_records, gap_only_header_id): + def test_warn_if_entry_contains_only_gaps( + self, mocker, header_info, seq_records, gap_only_header_id + ): mocked_warning = mocker.patch("clipkit.warnings.logger.warning") msa = MSA(header_info, seq_records)