Skip to content

Commit

Permalink
chore: suit review from @nh13
Browse files Browse the repository at this point in the history
  • Loading branch information
clintval committed Nov 28, 2023
1 parent 4c2e82a commit 312a458
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 13 deletions.
24 changes: 13 additions & 11 deletions fgpyo/fastx/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""
Examples of Zipping FASTX Files
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Zipping FASTX Files
~~~~~~~~~~~~~~~~~~~
Zipping a set of FASTA/FASTQ files into a single stream of data is a common task in bioinformatics
and can be achieved with the :class:`~fgpyo.fastx.FastxZipped` context manager. The context manager
Expand Down Expand Up @@ -59,23 +59,25 @@ def __enter__(self) -> "FastxZipped":
self._fastx = tuple(FastxFile(str(path), persist=self._persist) for path in self._paths)
return self

def _next_record_safe(self, fastx: FastxFile) -> Optional[FastxRecord]:
"""Return the next record from a FASTX file or :py:obj:`None` if there are none left."""
try:
return next(fastx)
except StopIteration:
return None
@staticmethod
def _name_minus_ordinal(name: str) -> str:
"""Return the name of the FASTX record minus its ordinal suffix (e.g. "/1" or "/2")."""
return name[:len(name) - 2] if len(name) >= 2 and name[-2] == "/" else name

def __next__(self) -> Tuple[FastxRecord, ...]:
"""Return the next set of FASTX records from the zipped FASTX files."""
records = tuple(self._next_record_safe(handle) for handle in self._fastx)
records = tuple(next(handle, None) for handle in self._fastx)
if all(record is None for record in records):
raise StopIteration
elif any(record is None for record in records):
raise ValueError("One or more of the FASTX files is truncated!")
sequence_name: str = [record.name for record in records if record is not None][0]
raise ValueError(
f"One or more of the FASTX files is truncated for sequence {sequence_name}:\n\t"
+ "\n\t".join(str(self._paths[i]) for i, record in enumerate(records) if record is None)
)
else:
records = cast(Tuple[FastxRecord, ...], records)
record_names = {record.name for record in records}
record_names: set[str] = {self._name_minus_ordinal(record.name) for record in records}
if len(record_names) != 1:
raise ValueError(f"FASTX record names do not all match: {record_names}")
return records
Expand Down
35 changes: 33 additions & 2 deletions fgpyo/fastx/tests/test_fastx_zipped.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def tests_fastx_zipped_raises_exception_on_truncated_fastx(tmp_path: Path) -> No
assert record1.sequence == "AAAA"
assert record2.name == "seq1"
assert record2.sequence == "CCCC"
with pytest.raises(ValueError, match=r"One or more of the FASTX files is truncated!"):
with pytest.raises(ValueError, match=r"One or more of the FASTX files is truncated.*"):
next(handle)

assert all(fastx.closed for fastx in context_manager._fastx)
Expand All @@ -118,7 +118,7 @@ def tests_fastx_zipped_raises_exception_on_truncated_fastx(tmp_path: Path) -> No
assert record1.sequence == "CCCC"
assert record2.name == "seq1"
assert record2.sequence == "AAAA"
with pytest.raises(ValueError, match=r"One or more of the FASTX files is truncated!"):
with pytest.raises(ValueError, match=r"One or more of the FASTX files is truncated.*"):
next(handle)

assert all(fastx.closed for fastx in context_manager._fastx)
Expand Down Expand Up @@ -164,3 +164,34 @@ def tests_fastx_zipped_raises_exception_on_mismatched_sequence_names(tmp_path: P
next(handle)

assert all(fastx.closed for fastx in context_manager._fastx)


def tests_fastx_zipped_handles_sequence_names_with_suffixes(tmp_path: Path) -> None:
"""Test that :class:`FastxZipped` does not use sequence name suffixes in equality tests."""
input = tmp_path / "input"
input.mkdir()
fasta1 = input / "input1.fasta"
fasta2 = input / "input2.fasta"
fasta1.write_text(">seq1/1\nAAAA\n")
fasta2.write_text(">seq1/2\nCCCC\n")

context_manager = FastxZipped(fasta1, fasta2)
with context_manager as handle:
(record1, record2) = next(handle)
assert record1.name == "seq1/1"
assert record1.sequence == "AAAA"
assert record2.name == "seq1/2"
assert record2.sequence == "CCCC"

assert all(fastx.closed for fastx in context_manager._fastx)

def tests_fastx_zipped__name_minus_ordinal_works_with_r1_and_r2_ordinals() -> None:
assert FastxZipped._name_minus_ordinal("seq1") == "seq1"
assert FastxZipped._name_minus_ordinal("seq1/1") == "seq1"
assert FastxZipped._name_minus_ordinal("seq1/2") == "seq1"
assert FastxZipped._name_minus_ordinal("1") == "1"
assert FastxZipped._name_minus_ordinal("1/1") == "1"
assert FastxZipped._name_minus_ordinal("1/2") == "1"
assert FastxZipped._name_minus_ordinal("") == ""
assert FastxZipped._name_minus_ordinal("/1") == ""
assert FastxZipped._name_minus_ordinal("/2") == ""

0 comments on commit 312a458

Please sign in to comment.