From d3a3aa66a8b0630aee54746578958fb6c596df84 Mon Sep 17 00:00:00 2001
From: clintval <valentine.clint@gmail.com>
Date: Fri, 24 Nov 2023 11:24:48 -0800
Subject: [PATCH] Add a context manager for zipping FASTX files

---
 docs/api.rst                           |   7 ++
 fgpyo/fastx/__init__.py                |  97 +++++++++++++++++
 fgpyo/fastx/tests/__init__.py          |   0
 fgpyo/fastx/tests/test_fastx_zipped.py | 144 +++++++++++++++++++++++++
 4 files changed, 248 insertions(+)
 create mode 100644 fgpyo/fastx/__init__.py
 create mode 100644 fgpyo/fastx/tests/__init__.py
 create mode 100644 fgpyo/fastx/tests/test_fastx_zipped.py

diff --git a/docs/api.rst b/docs/api.rst
index 63da3ba7..c3e54ec4 100644
--- a/docs/api.rst
+++ b/docs/api.rst
@@ -24,6 +24,13 @@ FASTA files
 .. automodule:: fgpyo.fasta.builder
    :members:
 
+
+FASTX files
+===========
+
+.. automodule:: fgpyo.fastx
+   :members:
+
 Metric files
 ============
 
diff --git a/fgpyo/fastx/__init__.py b/fgpyo/fastx/__init__.py
new file mode 100644
index 00000000..576d5a52
--- /dev/null
+++ b/fgpyo/fastx/__init__.py
@@ -0,0 +1,97 @@
+"""
+Tools for working with FASTA or FASTQ data
+------------------------------------------
+
+Examples of Zipping FASTX Files
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Zipping a series of FASTA/FASTQ files into a single stream of data is a common task in
+bioinformatics and can be achieved with the :class:`~fgpyo.fastx.FastxZipped` context manager. The
+context manager facilitates opening of all input FASTA/FASTQ files and closing them after iteration
+is complete. For every iteration of :class:`~fgpyo.fastx.FastxZipped`, a tuple of the next
+:class:`~pysam.FastxRecord` records is returned. An exception will be raised if any of the input
+files are malformed or truncated and if record names are not equivalent and in sync.
+
+Importantly, this context manager is optimized for fast streaming read-only usage and, by default,
+any previous records saved while advancing the iterator will not be correct as the underlying
+pointer in memory will refer to the most recent record only, and not any past records. To preserve
+the state of all previously iterated records, set the parameter ``persist`` to :py:obj:`True`.
+
+.. code-block:: python
+
+   >>> from fgpyo.fastx import FastxZipped
+   >>> with FastxZipped("r1.fq", "r2.fq") as zipped:
+   ...    for (r1, r2) in zipped:
+   ...         print(f"{r1.name}: {r1.sequence}, {r2.name}: {r2.sequence}")
+   seq1: AAAA, seq1: CCCC
+   seq2: GGGG, seq2: TTTT
+
+"""
+from contextlib import AbstractContextManager
+from pathlib import Path
+from types import TracebackType
+from typing import cast
+from typing import Iterator
+from typing import Optional
+from typing import Union
+from typing import Tuple
+from typing import Type
+
+from pysam import FastxFile, FastxRecord
+
+
+class FastxZipped(AbstractContextManager, Iterator[Tuple[FastxRecord, ...]]):
+    """A context manager that will lazily zip over any number of FASTA/FASTQ files."""
+
+    def __init__(self, *paths: Union[Path, str], persist: bool = False) -> None:
+        """Insantiate a :class:`FastxZipped` context manager and iterator.
+
+        Args:
+            paths: Paths to the FASTX files to zip over.
+            persist: Whether to allow previous records to be saved while advancing the iterator.
+
+        """
+        if len(paths) <= 0:
+            raise ValueError(f"Must provide at least one FASTX to {self.__class__.__name__}")
+        self._persist: bool = persist
+        self._paths: Tuple[Union[Path, str], ...] = paths
+        self._fastx: Tuple[FastxFile, ...] = tuple()
+
+    def __enter__(self) -> "FastxZipped":
+        """Enter the :class:`FastxZipped` context manager by opening all FASTX files."""
+        self._fastx = tuple(FastxFile(str(path), persist=self._persist) for path in self._paths)
+        return self
+
+    def _next_record_safe(self, fastx: FastxFile) -> Optional[FastxRecord]:
+        """Return the next record from a FASTX file or :py:obj:`None` if there are none left."""
+        try:
+            return next(fastx)
+        except StopIteration:
+            return None
+
+    def __next__(self) -> Tuple[FastxRecord, ...]:
+        """Return the next set of FASTX records from the zipped FASTX files."""
+        records = tuple(self._next_record_safe(handle) for handle in self._fastx)
+        if all(record is None for record in records):
+            raise StopIteration
+        elif any(record is None for record in records):
+            raise ValueError("One or more of the FASTX files is truncated!")
+        else:
+            records = cast(Tuple[FastxRecord, ...], records)
+            record_names = {record.name for record in records}
+            if len(record_names) != 1:
+                raise ValueError(f"FASTX record names do not all match: {record_names}")
+            return records
+
+    def __exit__(
+        self,
+        exc_type: Optional[Type[BaseException]],
+        exc_val: Optional[BaseException],
+        exc_tb: Optional[TracebackType],
+    ) -> Optional[bool]:
+        """Exit the :class:`FastxZipped` context manager by closing all FASTX files."""
+        for fastx in self._fastx:
+            fastx.close()
+        if exc_type is not None:
+            raise exc_type(exc_val).with_traceback(exc_tb)
+        return None
diff --git a/fgpyo/fastx/tests/__init__.py b/fgpyo/fastx/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/fgpyo/fastx/tests/test_fastx_zipped.py b/fgpyo/fastx/tests/test_fastx_zipped.py
new file mode 100644
index 00000000..b9b5d6e6
--- /dev/null
+++ b/fgpyo/fastx/tests/test_fastx_zipped.py
@@ -0,0 +1,144 @@
+from copy import copy, deepcopy
+from pathlib import Path
+
+import pytest
+
+from fgpyo.fastx import FastxZipped
+
+# TODO: Remove all the assert boilerplate when FastxRecord can be used for equality testing:
+#       https://github.com/pysam-developers/pysam/issues/1243
+
+
+def test_fastx_zipped_requires_at_least_one_fastx() -> None:
+    """Test that :class:`FastxZipped` requires at least one FASTX path to be instantiated."""
+    with pytest.raises(ValueError, match=r"Must provide at least one FASTX"):
+        FastxZipped()
+
+
+def test_fastx_zipped_iterates_one_empty_fastx(tmp_path: Path) -> None:
+    """Test that :class:`FastxZipped` can iterate over one empty FASTX file."""
+    input = tmp_path / "input"
+    input.mkdir()
+    fastx = input / "input.fastx"
+    fastx.write_text("")
+
+    with FastxZipped(fastx) as handle:
+        assert len(list(handle)) == 0
+
+
+def test_fastx_zipped_iterates_over_a_single_fasta(tmp_path: Path) -> None:
+    """Test that :class:`FastxZipped` can iterate over a single FASTA file."""
+    input = tmp_path / "input"
+    input.mkdir()
+    fasta = input / "input.fasta"
+    fasta.write_text(">seq1\nACGT\n>seq2\nTGCA\n")
+
+    context_manager = FastxZipped(fasta)
+    with context_manager as handle:
+        (record1,) = next(handle)
+        assert record1.name == "seq1"
+        assert record1.sequence == "ACGT"
+        (record2,) = next(handle)
+        assert record2.name == "seq2"
+        assert record2.sequence == "TGCA"
+
+    assert all(fastx.closed for fastx in context_manager._fastx)
+
+
+def test_fastx_zipped_iterates_over_a_single_fastq(tmp_path: Path) -> None:
+    """Test that :class:`FastxZipped` can iterate over a single FASTQ file."""
+    input = tmp_path / "input"
+    input.mkdir()
+    fasta = input / "input.fasta"
+    fasta.write_text("@seq1\tcomment1\nACGT\n+\nFFFF\n" + "@seq2\tcomment2\nTGCA\n+\n!!!!\n")
+
+    context_manager = FastxZipped(fasta)
+    with context_manager as handle:
+        (record1,) = next(handle)
+        assert record1.name == "seq1"
+        assert record1.sequence == "ACGT"
+        assert record1.comment == "comment1"
+        assert record1.quality == "FFFF"
+        (record2,) = next(handle)
+        assert record2.name == "seq2"
+        assert record2.sequence == "TGCA"
+        assert record2.comment == "comment2"
+        assert record2.quality == "!!!!"
+
+    assert all(fastx.closed for fastx in context_manager._fastx)
+
+
+def tests_fastx_zipped_raises_exception_on_truncated_fastx(tmp_path: Path) -> None:
+    """Test that :class:`FastxZipped` raises an exception on truncated FASTX files."""
+    input = tmp_path / "input"
+    input.mkdir()
+    fasta1 = input / "input1.fasta"
+    fasta2 = input / "input2.fasta"
+    fasta1.write_text(">seq1\nAAAA\n")
+    fasta2.write_text(">seq1\nCCCC\n>seq2\nGGGG\n")
+
+    context_manager = FastxZipped(fasta1, fasta2)
+    with context_manager as handle:
+        (record1, record2) = next(handle)
+        assert record1.name == "seq1"
+        assert record1.sequence == "AAAA"
+        assert record2.name == "seq1"
+        assert record2.sequence == "CCCC"
+        with pytest.raises(ValueError, match=r"One or more of the FASTX files is truncated!"):
+            next(handle)
+
+    assert all(fastx.closed for fastx in context_manager._fastx)
+
+    context_manager = FastxZipped(fasta2, fasta1)
+    with context_manager as handle:
+        (record1, record2) = next(handle)
+        assert record1.name == "seq1"
+        assert record1.sequence == "CCCC"
+        assert record2.name == "seq1"
+        assert record2.sequence == "AAAA"
+        with pytest.raises(ValueError, match=r"One or more of the FASTX files is truncated!"):
+            next(handle)
+
+    assert all(fastx.closed for fastx in context_manager._fastx)
+
+
+def tests_fastx_zipped_can_iterate_over_multiple_fastx_files(tmp_path: Path) -> None:
+    """Test that :class:`FastxZipped` can iterate over multiple FASTX files."""
+    input = tmp_path / "input"
+    input.mkdir()
+    fasta1 = input / "input1.fasta"
+    fasta2 = input / "input2.fasta"
+    fasta1.write_text(">seq1\nAAAA\n>seq2\nCCCC\n")
+    fasta2.write_text(">seq1\nGGGG\n>seq2\nTTTT\n")
+
+    context_manager = FastxZipped(fasta1, fasta2)
+    with context_manager as handle:
+        (record1, record2) = next(handle)
+        assert record1.name == "seq1"
+        assert record1.sequence == "AAAA"
+        assert record2.name == "seq1"
+        assert record2.sequence == "GGGG"
+        (record1, record2) = next(handle)
+        assert record1.name == "seq2"
+        assert record1.sequence == "CCCC"
+        assert record2.name == "seq2"
+        assert record2.sequence == "TTTT"
+
+    assert all(fastx.closed for fastx in context_manager._fastx)
+
+
+def tests_fastx_zipped_raises_exception_on_mismatched_sequence_names(tmp_path: Path) -> None:
+    """Test that :class:`FastxZipped` raises an exception on mismatched sequence names."""
+    input = tmp_path / "input"
+    input.mkdir()
+    fasta1 = input / "input1.fasta"
+    fasta2 = input / "input2.fasta"
+    fasta1.write_text(">seq1\nAAAA\n")
+    fasta2.write_text(">seq2\nCCCC\n")
+
+    context_manager = FastxZipped(fasta1, fasta2)
+    with context_manager as handle:
+        with pytest.raises(ValueError, match=r"FASTX record names do not all match"):
+            next(handle)
+
+    assert all(fastx.closed for fastx in context_manager._fastx)