From f6341341cf463d88a7d3b0b181a3b032578b68ad Mon Sep 17 00:00:00 2001 From: clintval Date: Fri, 24 Nov 2023 11:24:48 -0800 Subject: [PATCH] Add a context manager for zipping FASTX files --- docs/api.rst | 7 ++ fgpyo/fastx/__init__.py | 107 +++++++++++++++++++ fgpyo/fastx/tests/__init__.py | 0 fgpyo/fastx/tests/test_fastx_zipped.py | 141 +++++++++++++++++++++++++ 4 files changed, 255 insertions(+) create mode 100644 fgpyo/fastx/__init__.py create mode 100644 fgpyo/fastx/tests/__init__.py create mode 100644 fgpyo/fastx/tests/test_fastx_zipped.py diff --git a/docs/api.rst b/docs/api.rst index 63da3ba7..c3e54ec4 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -24,6 +24,13 @@ FASTA files .. automodule:: fgpyo.fasta.builder :members: + +FASTX files +=========== + +.. automodule:: fgpyo.fastx + :members: + Metric files ============ diff --git a/fgpyo/fastx/__init__.py b/fgpyo/fastx/__init__.py new file mode 100644 index 00000000..56e23a75 --- /dev/null +++ b/fgpyo/fastx/__init__.py @@ -0,0 +1,107 @@ +""" +Tools for working with FASTA or FASTQ data +------------------------------------------ + +Examples of Zipping FASTX Files +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Zipping a series of FASTA/FASTQ files in a single stream of data is a common task in bioinformatics +and be achieved with the :class:`~fgpyo.fastx.FastxZipped` context manager and iterator. The context +manager facilitates opening of all input FASTA/FASTQ files and and for every iteration will return +a tuple of the next :class:`~pysam.FastxRecord` records. An exception will be raised if any of the +input files are truncated or if record names are not equivalent and in sync. + +Importantly, this context manager is optimized for fast streaming read-only usage and any previous +records saved while advancing the iterator will not be correct as the underlying pointer in memory +will refer to the most recent record, and not any past records. To save past records during +iteration use the builtin `copy()` or `deepcopy()` methods in the Python standard library. + +.. code-block:: python + + >>> from fgpyo.fastx import FastxZipped + >>> with FastxZipped("r1.fq", "r2.fq") as zipped: + ... for (r1, r2) in zipped: + ... print(f"{r1.name}: {r1.sequence}, {r2.name}: {r2.sequence}") + seq1: AAAA, seq1: CCCC + seq2: GGGG, seq2: TTTT + +""" +from contextlib import AbstractContextManager +from pathlib import Path +from types import TracebackType +from typing import cast +from typing import Iterator +from typing import Optional +from typing import Union +from typing import Tuple +from typing import Type + +from pysam import FastxFile, FastxRecord + + +class FastxZipped(AbstractContextManager, Iterator[Tuple[FastxRecord, ...]]): + """A context manager that will lazily zip over any number of FASTA/FASTQ files. + + Attributes: + paths: Paths to the FASTX files to zip over. + + """ + + def __init__(self, *paths: Union[Path, str]) -> None: + """Insantiate a :class:`FastxZipped` context manager and iterator. + + Args: + paths: Paths to the FASTX files to zip over. + + """ + if len(paths) <= 0: + raise ValueError(f"Must provide at least one FASTX to {self.__class__.__name__}") + self.paths: Tuple[Union[Path, str], ...] = paths + self._fastx: Tuple[FastxFile, ...] = tuple() + + def __enter__(self) -> "FastxZipped": + """Enter the :class:`FastxZipped` context manager by opening all FASTX files.""" + self._fastx = tuple(FastxFile(str(path), persist=False) for path in self.paths) + return self + + def _next_record_safe(self, fastx: FastxFile) -> Optional[FastxRecord]: + """Return the next record from a FASTX file or None if there are none left.""" + try: + return next(fastx) + except StopIteration: + return None + + def __next__(self) -> Tuple[FastxRecord, ...]: + """Return the next set of FASTX records from the zipped FASTX files. + + Warning: + Do not rely on previous values from :func:`~fgpyo.fastx.FastxZipped.next()` + as this iterator will not persist the string value stored in memory from + previous returns. This iterator is fast and meant for streaming read-only + use as a result of this design decision. + + """ + records = tuple(self._next_record_safe(handle) for handle in self._fastx) + if all(record is None for record in records): + raise StopIteration + elif any(record is None for record in records): + raise ValueError("One or more of the FASTX files is truncated!") + else: + records = cast(Tuple[FastxRecord, ...], records) + record_names = {record.name for record in records} + if len(record_names) != 1: + raise ValueError(f"FASTX record names do not all match: {record_names}") + return records + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> Optional[bool]: + """Exit the :class:`FastxZipped` context manager by closing all FASTX files.""" + for fastx in self._fastx: + fastx.close() + if exc_type is not None: + raise exc_type(exc_val).with_traceback(exc_tb) + return None diff --git a/fgpyo/fastx/tests/__init__.py b/fgpyo/fastx/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/fgpyo/fastx/tests/test_fastx_zipped.py b/fgpyo/fastx/tests/test_fastx_zipped.py new file mode 100644 index 00000000..7dccb7e7 --- /dev/null +++ b/fgpyo/fastx/tests/test_fastx_zipped.py @@ -0,0 +1,141 @@ +import pytest + +from fgpyo.fastx import FastxZipped + +# TODO: Remove all the assert boilerplate when FastxRecord can be used for equality testing: +# https://github.com/pysam-developers/pysam/issues/1243 + + +def test_fastx_zipped_requires_at_least_one_fastx() -> None: + """Test that :class:`FastxZipped` requires at least one FASTX path to be instantiated.""" + with pytest.raises(ValueError, match=r"Must provide at least one FASTX"): + FastxZipped() + + +def test_fastx_zipped_iterates_one_empty_fastx(tmp_path) -> None: + """Test that :class:`FastxZipped` can iterate over one empty FASTX file.""" + input = tmp_path / "input" + input.mkdir() + fastx = input / "input.fastx" + fastx.write_text("") + + with FastxZipped(fastx) as handle: + assert len(list(handle)) == 0 + + +def test_fastx_zipped_iterates_over_a_single_fasta(tmp_path) -> None: + """Test that :class:`FastxZipped` can iterate over a single FASTA file.""" + input = tmp_path / "input" + input.mkdir() + fasta = input / "input.fasta" + fasta.write_text(">seq1\nACGT\n>seq2\nTGCA\n") + + context_manager = FastxZipped(fasta) + with context_manager as handle: + (record1,) = next(handle) + assert record1.name == "seq1" + assert record1.sequence == "ACGT" + (record2,) = next(handle) + assert record2.name == "seq2" + assert record2.sequence == "TGCA" + + assert all(fastx.closed for fastx in context_manager._fastx) + + +def test_fastx_zipped_iterates_over_a_single_fastq(tmp_path) -> None: + """Test that :class:`FastxZipped` can iterate over a single FASTQ file.""" + input = tmp_path / "input" + input.mkdir() + fasta = input / "input.fasta" + fasta.write_text("@seq1\tcomment1\nACGT\n+\nFFFF\n" + "@seq2\tcomment2\nTGCA\n+\n!!!!\n") + + context_manager = FastxZipped(fasta) + with context_manager as handle: + (record1,) = next(handle) + assert record1.name == "seq1" + assert record1.sequence == "ACGT" + assert record1.comment == "comment1" + assert record1.quality == "FFFF" + (record2,) = next(handle) + assert record2.name == "seq2" + assert record2.sequence == "TGCA" + assert record2.comment == "comment2" + assert record2.quality == "!!!!" + + assert all(fastx.closed for fastx in context_manager._fastx) + + +def tests_fastx_zipped_raises_exception_on_truncated_fastx(tmp_path) -> None: + """Test that :class:`FastxZipped` raises an exception on truncated FASTX files.""" + input = tmp_path / "input" + input.mkdir() + fasta1 = input / "input1.fasta" + fasta2 = input / "input2.fasta" + fasta1.write_text(">seq1\nAAAA\n") + fasta2.write_text(">seq1\nCCCC\n>seq2\nGGGG\n") + + context_manager = FastxZipped(fasta1, fasta2) + with context_manager as handle: + (record1, record2) = next(handle) + assert record1.name == "seq1" + assert record1.sequence == "AAAA" + assert record2.name == "seq1" + assert record2.sequence == "CCCC" + with pytest.raises(ValueError, match=r"One or more of the FASTX files is truncated!"): + next(handle) + + assert all(fastx.closed for fastx in context_manager._fastx) + + context_manager = FastxZipped(fasta2, fasta1) + with context_manager as handle: + (record1, record2) = next(handle) + assert record1.name == "seq1" + assert record1.sequence == "CCCC" + assert record2.name == "seq1" + assert record2.sequence == "AAAA" + with pytest.raises(ValueError, match=r"One or more of the FASTX files is truncated!"): + next(handle) + + assert all(fastx.closed for fastx in context_manager._fastx) + + +def tests_fastx_zipped_can_iterate_over_multiple_fastx_files(tmp_path) -> None: + """Test that :class:`FastxZipped` can iterate over multiple FASTX files.""" + input = tmp_path / "input" + input.mkdir() + fasta1 = input / "input1.fasta" + fasta2 = input / "input2.fasta" + fasta1.write_text(">seq1\nAAAA\n>seq2\nCCCC\n") + fasta2.write_text(">seq1\nGGGG\n>seq2\nTTTT\n") + + context_manager = FastxZipped(fasta1, fasta2) + with context_manager as handle: + (record1, record2) = next(handle) + assert record1.name == "seq1" + assert record1.sequence == "AAAA" + assert record2.name == "seq1" + assert record2.sequence == "GGGG" + (record1, record2) = next(handle) + assert record1.name == "seq2" + assert record1.sequence == "CCCC" + assert record2.name == "seq2" + assert record2.sequence == "TTTT" + + assert all(fastx.closed for fastx in context_manager._fastx) + + +def tests_fastx_zipped_raises_exception_on_mismatched_sequence_names(tmp_path) -> None: + """Test that :class:`FastxZipped` raises an exception on mismatched sequence names.""" + input = tmp_path / "input" + input.mkdir() + fasta1 = input / "input1.fasta" + fasta2 = input / "input2.fasta" + fasta1.write_text(">seq1\nAAAA\n") + fasta2.write_text(">seq2\nCCCC\n") + + context_manager = FastxZipped(fasta1, fasta2) + with context_manager as handle: + with pytest.raises(ValueError, match=r"FASTX record names do not all match"): + next(handle) + + assert all(fastx.closed for fastx in context_manager._fastx)