From b76544c05aad231997f18640c243f91fed9723e1 Mon Sep 17 00:00:00 2001 From: clintval Date: Fri, 24 Nov 2023 11:24:48 -0800 Subject: [PATCH] Add a context manager for zipping FASTX files --- docs/api.rst | 7 ++ fgpyo/fastx/__init__.py | 94 ++++++++++++++++ fgpyo/fastx/tests/__init__.py | 0 fgpyo/fastx/tests/test_fastx_zipped.py | 144 +++++++++++++++++++++++++ 4 files changed, 245 insertions(+) create mode 100644 fgpyo/fastx/__init__.py create mode 100644 fgpyo/fastx/tests/__init__.py create mode 100644 fgpyo/fastx/tests/test_fastx_zipped.py diff --git a/docs/api.rst b/docs/api.rst index 63da3ba7..c3e54ec4 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -24,6 +24,13 @@ FASTA files .. automodule:: fgpyo.fasta.builder :members: + +FASTX files +=========== + +.. automodule:: fgpyo.fastx + :members: + Metric files ============ diff --git a/fgpyo/fastx/__init__.py b/fgpyo/fastx/__init__.py new file mode 100644 index 00000000..42eff051 --- /dev/null +++ b/fgpyo/fastx/__init__.py @@ -0,0 +1,94 @@ +""" +Examples of Zipping FASTX Files +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Zipping a set of FASTA/FASTQ files into a single stream of data is a common task in bioinformatics +and can be achieved with the :class:`~fgpyo.fastx.FastxZipped` context manager. The context manager +facilitates opening of all input FASTA/FASTQ files and closing them after iteration is complete. +For every iteration of :class:`~fgpyo.fastx.FastxZipped`, a tuple of the next FASTX records are +returned (of type :class:`~pysam.FastxRecord`). An exception will be raised if any of the input +files are malformed or truncated and if record names are not equivalent and in sync. + +Importantly, this context manager is optimized for fast streaming read-only usage and, by default, +any previous records saved while advancing the iterator will not be correct as the underlying +pointer in memory will refer to the most recent record only, and not any past records. To preserve +the state of all previously iterated records, set the parameter ``persist`` to :py:obj:`True`. + +.. code-block:: python + + >>> from fgpyo.fastx import FastxZipped + >>> with FastxZipped("r1.fq", "r2.fq", persist=False) as zipped: + ... for (r1, r2) in zipped: + ... print(f"{r1.name}: {r1.sequence}, {r2.name}: {r2.sequence}") + seq1: AAAA, seq1: CCCC + seq2: GGGG, seq2: TTTT + +""" +from contextlib import AbstractContextManager +from pathlib import Path +from types import TracebackType +from typing import cast +from typing import Iterator +from typing import Optional +from typing import Union +from typing import Tuple +from typing import Type + +from pysam import FastxFile, FastxRecord + + +class FastxZipped(AbstractContextManager, Iterator[Tuple[FastxRecord, ...]]): + """A context manager that will lazily zip over any number of FASTA/FASTQ files. + + Args: + paths: Paths to the FASTX files to zip over. + persist: Whether to persit the state of previous records during iteration. + + """ + + def __init__(self, *paths: Union[Path, str], persist: bool = False) -> None: + """Insantiate a :class:`FastxZipped` context manager and iterator.""" + if len(paths) <= 0: + raise ValueError(f"Must provide at least one FASTX to {self.__class__.__name__}") + self._persist: bool = persist + self._paths: Tuple[Union[Path, str], ...] = paths + self._fastx: Tuple[FastxFile, ...] = tuple() + + def __enter__(self) -> "FastxZipped": + """Enter the :class:`FastxZipped` context manager by opening all FASTX files.""" + self._fastx = tuple(FastxFile(str(path), persist=self._persist) for path in self._paths) + return self + + def _next_record_safe(self, fastx: FastxFile) -> Optional[FastxRecord]: + """Return the next record from a FASTX file or :py:obj:`None` if there are none left.""" + try: + return next(fastx) + except StopIteration: + return None + + def __next__(self) -> Tuple[FastxRecord, ...]: + """Return the next set of FASTX records from the zipped FASTX files.""" + records = tuple(self._next_record_safe(handle) for handle in self._fastx) + if all(record is None for record in records): + raise StopIteration + elif any(record is None for record in records): + raise ValueError("One or more of the FASTX files is truncated!") + else: + records = cast(Tuple[FastxRecord, ...], records) + record_names = {record.name for record in records} + if len(record_names) != 1: + raise ValueError(f"FASTX record names do not all match: {record_names}") + return records + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> Optional[bool]: + """Exit the :class:`FastxZipped` context manager by closing all FASTX files.""" + for fastx in self._fastx: + fastx.close() + if exc_type is not None: + raise exc_type(exc_val).with_traceback(exc_tb) + return None diff --git a/fgpyo/fastx/tests/__init__.py b/fgpyo/fastx/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/fgpyo/fastx/tests/test_fastx_zipped.py b/fgpyo/fastx/tests/test_fastx_zipped.py new file mode 100644 index 00000000..b9b5d6e6 --- /dev/null +++ b/fgpyo/fastx/tests/test_fastx_zipped.py @@ -0,0 +1,144 @@ +from copy import copy, deepcopy +from pathlib import Path + +import pytest + +from fgpyo.fastx import FastxZipped + +# TODO: Remove all the assert boilerplate when FastxRecord can be used for equality testing: +# https://github.com/pysam-developers/pysam/issues/1243 + + +def test_fastx_zipped_requires_at_least_one_fastx() -> None: + """Test that :class:`FastxZipped` requires at least one FASTX path to be instantiated.""" + with pytest.raises(ValueError, match=r"Must provide at least one FASTX"): + FastxZipped() + + +def test_fastx_zipped_iterates_one_empty_fastx(tmp_path: Path) -> None: + """Test that :class:`FastxZipped` can iterate over one empty FASTX file.""" + input = tmp_path / "input" + input.mkdir() + fastx = input / "input.fastx" + fastx.write_text("") + + with FastxZipped(fastx) as handle: + assert len(list(handle)) == 0 + + +def test_fastx_zipped_iterates_over_a_single_fasta(tmp_path: Path) -> None: + """Test that :class:`FastxZipped` can iterate over a single FASTA file.""" + input = tmp_path / "input" + input.mkdir() + fasta = input / "input.fasta" + fasta.write_text(">seq1\nACGT\n>seq2\nTGCA\n") + + context_manager = FastxZipped(fasta) + with context_manager as handle: + (record1,) = next(handle) + assert record1.name == "seq1" + assert record1.sequence == "ACGT" + (record2,) = next(handle) + assert record2.name == "seq2" + assert record2.sequence == "TGCA" + + assert all(fastx.closed for fastx in context_manager._fastx) + + +def test_fastx_zipped_iterates_over_a_single_fastq(tmp_path: Path) -> None: + """Test that :class:`FastxZipped` can iterate over a single FASTQ file.""" + input = tmp_path / "input" + input.mkdir() + fasta = input / "input.fasta" + fasta.write_text("@seq1\tcomment1\nACGT\n+\nFFFF\n" + "@seq2\tcomment2\nTGCA\n+\n!!!!\n") + + context_manager = FastxZipped(fasta) + with context_manager as handle: + (record1,) = next(handle) + assert record1.name == "seq1" + assert record1.sequence == "ACGT" + assert record1.comment == "comment1" + assert record1.quality == "FFFF" + (record2,) = next(handle) + assert record2.name == "seq2" + assert record2.sequence == "TGCA" + assert record2.comment == "comment2" + assert record2.quality == "!!!!" + + assert all(fastx.closed for fastx in context_manager._fastx) + + +def tests_fastx_zipped_raises_exception_on_truncated_fastx(tmp_path: Path) -> None: + """Test that :class:`FastxZipped` raises an exception on truncated FASTX files.""" + input = tmp_path / "input" + input.mkdir() + fasta1 = input / "input1.fasta" + fasta2 = input / "input2.fasta" + fasta1.write_text(">seq1\nAAAA\n") + fasta2.write_text(">seq1\nCCCC\n>seq2\nGGGG\n") + + context_manager = FastxZipped(fasta1, fasta2) + with context_manager as handle: + (record1, record2) = next(handle) + assert record1.name == "seq1" + assert record1.sequence == "AAAA" + assert record2.name == "seq1" + assert record2.sequence == "CCCC" + with pytest.raises(ValueError, match=r"One or more of the FASTX files is truncated!"): + next(handle) + + assert all(fastx.closed for fastx in context_manager._fastx) + + context_manager = FastxZipped(fasta2, fasta1) + with context_manager as handle: + (record1, record2) = next(handle) + assert record1.name == "seq1" + assert record1.sequence == "CCCC" + assert record2.name == "seq1" + assert record2.sequence == "AAAA" + with pytest.raises(ValueError, match=r"One or more of the FASTX files is truncated!"): + next(handle) + + assert all(fastx.closed for fastx in context_manager._fastx) + + +def tests_fastx_zipped_can_iterate_over_multiple_fastx_files(tmp_path: Path) -> None: + """Test that :class:`FastxZipped` can iterate over multiple FASTX files.""" + input = tmp_path / "input" + input.mkdir() + fasta1 = input / "input1.fasta" + fasta2 = input / "input2.fasta" + fasta1.write_text(">seq1\nAAAA\n>seq2\nCCCC\n") + fasta2.write_text(">seq1\nGGGG\n>seq2\nTTTT\n") + + context_manager = FastxZipped(fasta1, fasta2) + with context_manager as handle: + (record1, record2) = next(handle) + assert record1.name == "seq1" + assert record1.sequence == "AAAA" + assert record2.name == "seq1" + assert record2.sequence == "GGGG" + (record1, record2) = next(handle) + assert record1.name == "seq2" + assert record1.sequence == "CCCC" + assert record2.name == "seq2" + assert record2.sequence == "TTTT" + + assert all(fastx.closed for fastx in context_manager._fastx) + + +def tests_fastx_zipped_raises_exception_on_mismatched_sequence_names(tmp_path: Path) -> None: + """Test that :class:`FastxZipped` raises an exception on mismatched sequence names.""" + input = tmp_path / "input" + input.mkdir() + fasta1 = input / "input1.fasta" + fasta2 = input / "input2.fasta" + fasta1.write_text(">seq1\nAAAA\n") + fasta2.write_text(">seq2\nCCCC\n") + + context_manager = FastxZipped(fasta1, fasta2) + with context_manager as handle: + with pytest.raises(ValueError, match=r"FASTX record names do not all match"): + next(handle) + + assert all(fastx.closed for fastx in context_manager._fastx)