diff --git a/docs/api.rst b/docs/api.rst index fa0e833a..dc93a9e4 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -33,6 +33,13 @@ FASTA files .. automodule:: fgpyo.fasta.builder :members: + +FASTX files +=========== + +.. automodule:: fgpyo.fastx + :members: + Metric files ============ diff --git a/fgpyo/fastx/__init__.py b/fgpyo/fastx/__init__.py new file mode 100644 index 00000000..13ab9745 --- /dev/null +++ b/fgpyo/fastx/__init__.py @@ -0,0 +1,99 @@ +""" +Zipping FASTX Files +~~~~~~~~~~~~~~~~~~~ + +Zipping a set of FASTA/FASTQ files into a single stream of data is a common task in bioinformatics +and can be achieved with the :class:`~fgpyo.fastx.FastxZipped` context manager. The context manager +facilitates opening of all input FASTA/FASTQ files and closing them after iteration is complete. +For every iteration of :class:`~fgpyo.fastx.FastxZipped`, a tuple of the next FASTX records are +returned (of type :class:`~pysam.FastxRecord`). An exception will be raised if any of the input +files are malformed or truncated and if record names are not equivalent and in sync. + +Importantly, this context manager is optimized for fast streaming read-only usage and, by default, +any previous records saved while advancing the iterator will not be correct as the underlying +pointer in memory will refer to the most recent record only, and not any past records. To preserve +the state of all previously iterated records, set the parameter ``persist`` to :py:obj:`True`. + +.. code-block:: python + + >>> from fgpyo.fastx import FastxZipped + >>> with FastxZipped("r1.fq", "r2.fq", persist=False) as zipped: + ... for (r1, r2) in zipped: + ... print(f"{r1.name}: {r1.sequence}, {r2.name}: {r2.sequence}") + seq1: AAAA, seq1: CCCC + seq2: GGGG, seq2: TTTT + +""" +from contextlib import AbstractContextManager +from pathlib import Path +from types import TracebackType +from typing import cast +from typing import Iterator +from typing import Optional +from typing import Union +from typing import Set +from typing import Tuple +from typing import Type + +from pysam import FastxFile, FastxRecord + + +class FastxZipped(AbstractContextManager, Iterator[Tuple[FastxRecord, ...]]): + """A context manager that will lazily zip over any number of FASTA/FASTQ files. + + Args: + paths: Paths to the FASTX files to zip over. + persist: Whether to persit the state of previous records during iteration. + + """ + + def __init__(self, *paths: Union[Path, str], persist: bool = False) -> None: + """Insantiate a :class:`FastxZipped` context manager and iterator.""" + if len(paths) <= 0: + raise ValueError(f"Must provide at least one FASTX to {self.__class__.__name__}") + self._persist: bool = persist + self._paths: Tuple[Union[Path, str], ...] = paths + self._fastx = tuple(FastxFile(str(path), persist=self._persist) for path in self._paths) + + @staticmethod + def _name_minus_ordinal(name: str) -> str: + """Return the name of the FASTX record minus its ordinal suffix (e.g. "/1" or "/2").""" + return name[: len(name) - 2] if len(name) >= 2 and name[-2] == "/" else name + + def __next__(self) -> Tuple[FastxRecord, ...]: + """Return the next set of FASTX records from the zipped FASTX files.""" + records = tuple(next(handle, None) for handle in self._fastx) + if all(record is None for record in records): + raise StopIteration + elif any(record is None for record in records): + sequence_name: str = [record.name for record in records if record is not None][0] + raise ValueError( + "One or more of the FASTX files is truncated for sequence " + + f"{self._name_minus_ordinal(sequence_name)}:\n\t" + + "\n\t".join( + str(self._paths[i]) for i, record in enumerate(records) if record is None + ) + ) + else: + records = cast(Tuple[FastxRecord, ...], records) + record_names: Set[str] = {self._name_minus_ordinal(record.name) for record in records} + if len(record_names) != 1: + raise ValueError(f"FASTX record names do not all match: {record_names}") + return records + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> Optional[bool]: + """Exit the :class:`FastxZipped` context manager by closing all FASTX files.""" + self.close() + if exc_type is not None: + raise exc_type(exc_val).with_traceback(exc_tb) + return None + + def close(self) -> None: + """Close the :class:`FastxZipped` context manager by closing all FASTX files.""" + for fastx in self._fastx: + fastx.close() diff --git a/fgpyo/fastx/tests/__init__.py b/fgpyo/fastx/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/fgpyo/fastx/tests/test_fastx_zipped.py b/fgpyo/fastx/tests/test_fastx_zipped.py new file mode 100644 index 00000000..95305065 --- /dev/null +++ b/fgpyo/fastx/tests/test_fastx_zipped.py @@ -0,0 +1,228 @@ +import gzip + +from pathlib import Path + +import pytest + +from fgpyo.fastx import FastxZipped + +# TODO: Remove all the assert boilerplate when FastxRecord can be used for equality testing: +# https://github.com/pysam-developers/pysam/issues/1243 + + +def test_fastx_zipped_requires_at_least_one_fastx() -> None: + """Test that :class:`FastxZipped` requires at least one FASTX path to be instantiated.""" + with pytest.raises(ValueError, match=r"Must provide at least one FASTX"): + FastxZipped() + + +def test_fastx_zipped_iterates_one_empty_fastx(tmp_path: Path) -> None: + """Test that :class:`FastxZipped` can iterate over one empty FASTX file.""" + input = tmp_path / "input" + input.mkdir() + fastx = input / "input.fastx" + fastx.write_text("") + + with FastxZipped(fastx) as handle: + assert len(list(handle)) == 0 + + +def test_fastx_zipped_iterates_over_a_single_fasta(tmp_path: Path) -> None: + """Test that :class:`FastxZipped` can iterate over a single FASTA file.""" + input = tmp_path / "input" + input.mkdir() + fasta = input / "input.fasta" + fasta.write_text(">seq1\nACGT\n>seq2\nTGCA\n") + + context_manager = FastxZipped(fasta) + with context_manager as handle: + (record1,) = next(handle) + assert record1.name == "seq1" + assert record1.sequence == "ACGT" + (record2,) = next(handle) + assert record2.name == "seq2" + assert record2.sequence == "TGCA" + + assert all(fastx.closed for fastx in context_manager._fastx) + + +def test_fastx_zipped_iterates_over_a_single_fasta_gzipped(tmp_path: Path) -> None: + """Test that :class:`FastxZipped` can iterate over a single gzipped FASTA file.""" + input = tmp_path / "input" + input.mkdir() + fasta = input / "input.fasta.gz" + + with gzip.open(fasta, "wt") as handle: + handle.write(">seq1\nACGT\n>seq2\nTGCA\n") + + context_manager = FastxZipped(fasta) + with context_manager as handle: + (record1,) = next(handle) + assert record1.name == "seq1" + assert record1.sequence == "ACGT" + (record2,) = next(handle) + assert record2.name == "seq2" + assert record2.sequence == "TGCA" + + assert all(fastx.closed for fastx in context_manager._fastx) + + +def test_fastx_zipped_iterates_over_a_single_fastq(tmp_path: Path) -> None: + """Test that :class:`FastxZipped` can iterate over a single FASTQ file.""" + input = tmp_path / "input" + input.mkdir() + fastq = input / "input.fastq" + fastq.write_text("@seq1\tcomment1\nACGT\n+\nFFFF\n" + "@seq2\tcomment2\nTGCA\n+\n!!!!\n") + + context_manager = FastxZipped(fastq) + with context_manager as handle: + (record1,) = next(handle) + assert record1.name == "seq1" + assert record1.sequence == "ACGT" + assert record1.comment == "comment1" + assert record1.quality == "FFFF" + (record2,) = next(handle) + assert record2.name == "seq2" + assert record2.sequence == "TGCA" + assert record2.comment == "comment2" + assert record2.quality == "!!!!" + + assert all(fastx.closed for fastx in context_manager._fastx) + + +def tests_fastx_zipped_raises_exception_on_truncated_fastx(tmp_path: Path) -> None: + """Test that :class:`FastxZipped` raises an exception on truncated FASTX files.""" + input = tmp_path / "input" + input.mkdir() + fasta1 = input / "input1.fasta" + fasta2 = input / "input2.fasta" + fasta1.write_text(">seq1\nAAAA\n") + fasta2.write_text(">seq1\nCCCC\n>seq2\nGGGG\n") + + context_manager = FastxZipped(fasta1, fasta2) + with context_manager as handle: + (record1, record2) = next(handle) + assert record1.name == "seq1" + assert record1.sequence == "AAAA" + assert record2.name == "seq1" + assert record2.sequence == "CCCC" + with pytest.raises(ValueError, match=r"One or more of the FASTX files is truncated.*"): + next(handle) + + assert all(fastx.closed for fastx in context_manager._fastx) + + context_manager = FastxZipped(fasta2, fasta1) + with context_manager as handle: + (record1, record2) = next(handle) + assert record1.name == "seq1" + assert record1.sequence == "CCCC" + assert record2.name == "seq1" + assert record2.sequence == "AAAA" + with pytest.raises(ValueError, match=r"One or more of the FASTX files is truncated.*"): + next(handle) + + assert all(fastx.closed for fastx in context_manager._fastx) + + +def tests_fastx_zipped_can_iterate_over_multiple_fastx_files(tmp_path: Path) -> None: + """Test that :class:`FastxZipped` can iterate over multiple FASTX files.""" + input = tmp_path / "input" + input.mkdir() + fasta1 = input / "input1.fasta" + fasta2 = input / "input2.fasta" + fasta1.write_text(">seq1\nAAAA\n>seq2\nCCCC\n") + fasta2.write_text(">seq1\nGGGG\n>seq2\nTTTT\n") + + context_manager = FastxZipped(fasta1, fasta2) + with context_manager as handle: + (record1, record2) = next(handle) + assert record1.name == "seq1" + assert record1.sequence == "AAAA" + assert record2.name == "seq1" + assert record2.sequence == "GGGG" + (record1, record2) = next(handle) + assert record1.name == "seq2" + assert record1.sequence == "CCCC" + assert record2.name == "seq2" + assert record2.sequence == "TTTT" + + assert all(fastx.closed for fastx in context_manager._fastx) + + +def tests_fastx_zipped_raises_exception_on_mismatched_sequence_names(tmp_path: Path) -> None: + """Test that :class:`FastxZipped` raises an exception on mismatched sequence names.""" + input = tmp_path / "input" + input.mkdir() + fasta1 = input / "input1.fasta" + fasta2 = input / "input2.fasta" + fasta1.write_text(">seq1\nAAAA\n") + fasta2.write_text(">seq2\nCCCC\n") + + context_manager = FastxZipped(fasta1, fasta2) + with context_manager as handle: + with pytest.raises(ValueError, match=r"FASTX record names do not all match"): + next(handle) + + assert all(fastx.closed for fastx in context_manager._fastx) + + +def tests_fastx_zipped_handles_sequence_names_with_suffixes(tmp_path: Path) -> None: + """Test that :class:`FastxZipped` does not use sequence name suffixes in equality tests.""" + input = tmp_path / "input" + input.mkdir() + fasta1 = input / "input1.fasta" + fasta2 = input / "input2.fasta" + fasta1.write_text(">seq1/1\nAAAA\n") + fasta2.write_text(">seq1/2\nCCCC\n") + + context_manager = FastxZipped(fasta1, fasta2) + with context_manager as handle: + (record1, record2) = next(handle) + assert record1.name == "seq1/1" + assert record1.sequence == "AAAA" + assert record2.name == "seq1/2" + assert record2.sequence == "CCCC" + + assert all(fastx.closed for fastx in context_manager._fastx) + + +def tests_fastx_zipped__name_minus_ordinal_works_with_r1_and_r2_ordinals() -> None: + """Test that :class:`FastxZipped._name_minus_ordinal` works with none, R1, and R2 ordinals.""" + assert FastxZipped._name_minus_ordinal("") == "" + assert FastxZipped._name_minus_ordinal("/1") == "" + assert FastxZipped._name_minus_ordinal("/2") == "" + assert FastxZipped._name_minus_ordinal("seq1") == "seq1" + assert FastxZipped._name_minus_ordinal("seq1/1") == "seq1" + assert FastxZipped._name_minus_ordinal("seq1/2") == "seq1" + assert FastxZipped._name_minus_ordinal("1") == "1" + assert FastxZipped._name_minus_ordinal("1/1") == "1" + assert FastxZipped._name_minus_ordinal("1/2") == "1" + + +def test_fastx_zipped_accidentally_used_as_iterator_only(tmp_path: Path) -> None: + """Test that :class:`FastxZipped` can also be used as an interator outside a context manager.""" + input = tmp_path / "input" + input.mkdir() + fasta1 = input / "input1.fasta" + fasta2 = input / "input2.fasta" + fasta1.write_text(">seq1\nAAAA\n>seq2\nCCCC\n") + fasta2.write_text(">seq1\nGGGG\n>seq2\nTTTT\n") + + zipped = FastxZipped(fasta1, fasta2) + (record1, record2) = next(zipped) + assert record1.name == "seq1" + assert record1.sequence == "AAAA" + assert record2.name == "seq1" + assert record2.sequence == "GGGG" + (record1, record2) = next(zipped) + assert record1.name == "seq2" + assert record1.sequence == "CCCC" + assert record2.name == "seq2" + assert record2.sequence == "TTTT" + + with pytest.raises(StopIteration): + next(zipped) + + assert all(not fastx.closed for fastx in zipped._fastx) + zipped.close() + assert all(fastx.closed for fastx in zipped._fastx)