From 85ec65328ad72a472a5ca197f4e642635df11e16 Mon Sep 17 00:00:00 2001 From: clintval Date: Mon, 27 Nov 2023 17:12:05 -0800 Subject: [PATCH] chore: suit review from @nh13 --- fgpyo/fastx/__init__.py | 26 +++++++++++-------- fgpyo/fastx/tests/test_fastx_zipped.py | 36 ++++++++++++++++++++++++-- 2 files changed, 49 insertions(+), 13 deletions(-) diff --git a/fgpyo/fastx/__init__.py b/fgpyo/fastx/__init__.py index 42eff051..276199c9 100644 --- a/fgpyo/fastx/__init__.py +++ b/fgpyo/fastx/__init__.py @@ -1,6 +1,6 @@ """ -Examples of Zipping FASTX Files -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Zipping FASTX Files +~~~~~~~~~~~~~~~~~~~ Zipping a set of FASTA/FASTQ files into a single stream of data is a common task in bioinformatics and can be achieved with the :class:`~fgpyo.fastx.FastxZipped` context manager. The context manager @@ -59,23 +59,27 @@ def __enter__(self) -> "FastxZipped": self._fastx = tuple(FastxFile(str(path), persist=self._persist) for path in self._paths) return self - def _next_record_safe(self, fastx: FastxFile) -> Optional[FastxRecord]: - """Return the next record from a FASTX file or :py:obj:`None` if there are none left.""" - try: - return next(fastx) - except StopIteration: - return None + @staticmethod + def _name_minus_ordinal(name: str) -> str: + """Return the name of the FASTX record minus its ordinal suffix (e.g. "/1" or "/2").""" + return name[: len(name) - 2] if len(name) >= 2 and name[-2] == "/" else name def __next__(self) -> Tuple[FastxRecord, ...]: """Return the next set of FASTX records from the zipped FASTX files.""" - records = tuple(self._next_record_safe(handle) for handle in self._fastx) + records = tuple(next(handle, None) for handle in self._fastx) if all(record is None for record in records): raise StopIteration elif any(record is None for record in records): - raise ValueError("One or more of the FASTX files is truncated!") + sequence_name: str = [record.name for record in records if record is not None][0] + raise ValueError( + f"One or more of the FASTX files is truncated for sequence {sequence_name}:\n\t" + + "\n\t".join( + str(self._paths[i]) for i, record in enumerate(records) if record is None + ) + ) else: records = cast(Tuple[FastxRecord, ...], records) - record_names = {record.name for record in records} + record_names: set[str] = {self._name_minus_ordinal(record.name) for record in records} if len(record_names) != 1: raise ValueError(f"FASTX record names do not all match: {record_names}") return records diff --git a/fgpyo/fastx/tests/test_fastx_zipped.py b/fgpyo/fastx/tests/test_fastx_zipped.py index 5bba1565..3638cf0d 100644 --- a/fgpyo/fastx/tests/test_fastx_zipped.py +++ b/fgpyo/fastx/tests/test_fastx_zipped.py @@ -106,7 +106,7 @@ def tests_fastx_zipped_raises_exception_on_truncated_fastx(tmp_path: Path) -> No assert record1.sequence == "AAAA" assert record2.name == "seq1" assert record2.sequence == "CCCC" - with pytest.raises(ValueError, match=r"One or more of the FASTX files is truncated!"): + with pytest.raises(ValueError, match=r"One or more of the FASTX files is truncated.*"): next(handle) assert all(fastx.closed for fastx in context_manager._fastx) @@ -118,7 +118,7 @@ def tests_fastx_zipped_raises_exception_on_truncated_fastx(tmp_path: Path) -> No assert record1.sequence == "CCCC" assert record2.name == "seq1" assert record2.sequence == "AAAA" - with pytest.raises(ValueError, match=r"One or more of the FASTX files is truncated!"): + with pytest.raises(ValueError, match=r"One or more of the FASTX files is truncated.*"): next(handle) assert all(fastx.closed for fastx in context_manager._fastx) @@ -164,3 +164,35 @@ def tests_fastx_zipped_raises_exception_on_mismatched_sequence_names(tmp_path: P next(handle) assert all(fastx.closed for fastx in context_manager._fastx) + + +def tests_fastx_zipped_handles_sequence_names_with_suffixes(tmp_path: Path) -> None: + """Test that :class:`FastxZipped` does not use sequence name suffixes in equality tests.""" + input = tmp_path / "input" + input.mkdir() + fasta1 = input / "input1.fasta" + fasta2 = input / "input2.fasta" + fasta1.write_text(">seq1/1\nAAAA\n") + fasta2.write_text(">seq1/2\nCCCC\n") + + context_manager = FastxZipped(fasta1, fasta2) + with context_manager as handle: + (record1, record2) = next(handle) + assert record1.name == "seq1/1" + assert record1.sequence == "AAAA" + assert record2.name == "seq1/2" + assert record2.sequence == "CCCC" + + assert all(fastx.closed for fastx in context_manager._fastx) + + +def tests_fastx_zipped__name_minus_ordinal_works_with_r1_and_r2_ordinals() -> None: + assert FastxZipped._name_minus_ordinal("seq1") == "seq1" + assert FastxZipped._name_minus_ordinal("seq1/1") == "seq1" + assert FastxZipped._name_minus_ordinal("seq1/2") == "seq1" + assert FastxZipped._name_minus_ordinal("1") == "1" + assert FastxZipped._name_minus_ordinal("1/1") == "1" + assert FastxZipped._name_minus_ordinal("1/2") == "1" + assert FastxZipped._name_minus_ordinal("") == "" + assert FastxZipped._name_minus_ordinal("/1") == "" + assert FastxZipped._name_minus_ordinal("/2") == ""