Skip to content

Commit

Permalink
Add a context manager for zipping FASTX files
Browse files Browse the repository at this point in the history
  • Loading branch information
clintval committed Nov 24, 2023
1 parent 66b2200 commit 9fcf075
Show file tree
Hide file tree
Showing 4 changed files with 257 additions and 0 deletions.
7 changes: 7 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ FASTA files
.. automodule:: fgpyo.fasta.builder
:members:


FASTX files
===========

.. automodule:: fgpyo.fastx
:members:

Metric files
============

Expand Down
107 changes: 107 additions & 0 deletions fgpyo/fastx/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
"""
Tools for working with FASTA or FASTQ data
------------------------------------------
Examples of Zipping FASTX Files
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Zipping a series of FASTA/FASTQ files in a single stream of data is a common task in bioinformatics
and be achieved with the :class:`~fgpyo.fastx.FastxZipped` context manager and iterator. The context
manager facilitates opening of all input FASTA/FASTQ files and and for every iteration will return
a tuple of the next :class:`~pysam.FastxRecord` records. An exception will be raised if any of the
input files are truncated or if record names are not equivalent and in sync.
Importantly, this context manager is optimized for fast streaming read-only usage and any previous
records saved while advancing the iterator will not be correct as the underlying pointer in memory
will refer to the most recent record, and not any past records. To save past records during
iteration use the builtin `copy()` or `deepcopy()` methods in the Python standard library.
.. code-block:: python
>>> from fgpyo.fastx import FastxZipped
>>> with FastxZipped("r1.fq", "r2.fq") as zipped:
... for (r1, r2) in zipped:
... print(f"{r1.name}: {r1.sequence}, {r2.name}: {r2.sequence}")
seq1: AAAA, seq1: CCCC
seq2: GGGG, seq2: TTTT
"""
from contextlib import AbstractContextManager
from pathlib import Path
from types import TracebackType
from typing import cast
from typing import Iterator
from typing import Optional
from typing import Union
from typing import Tuple
from typing import Type

from pysam import FastxFile, FastxRecord


class FastxZipped(AbstractContextManager, Iterator[Tuple[FastxRecord, ...]]):
"""A context manager that will lazily zip over any number of FASTA/FASTQ files.
Attributes:
paths: Paths to the FASTX files to zip over.
"""

def __init__(self, *paths: Union[Path, str]) -> None:
"""Insantiate a :class:`FastxZipped` context manager and iterator.
Args:
paths: Paths to the FASTX files to zip over.
"""
if len(paths) <= 0:
raise ValueError(f"Must provide at least one FASTX to {self.__class__.__name__}")
self.paths: Tuple[Union[Path, str], ...] = paths
self._fastx: Tuple[FastxFile, ...] = tuple()

def __enter__(self) -> "FastxZipped":
"""Enter the :class:`FastxZipped` context manager by opening all FASTX files."""
self._fastx = tuple(FastxFile(str(path), persist=False) for path in self.paths)
return self

def _next_record_safe(self, fastx: FastxFile) -> Optional[FastxRecord]:
"""Return the next record from a FASTX file or None if there are none left."""
try:
return next(fastx)
except StopIteration:
return None

def __next__(self) -> Tuple[FastxRecord, ...]:
"""Return the next set of FASTX records from the zipped FASTX files.
Warning:
Do not rely on previous values from :func:`~fgpyo.fastx.FastxZipped.next()`
as this iterator will not persist the string value stored in memory from
previous returns. This iterator is fast and meant for streaming read-only
use as a result of this design decision.
"""
records = tuple(self._next_record_safe(handle) for handle in self._fastx)
if all(record is None for record in records):
raise StopIteration
elif any(record is None for record in records):
raise ValueError("One or more of the FASTX files is truncated!")
else:
records = cast(Tuple[FastxRecord, ...], records)
record_names = {record.name for record in records}
if len(record_names) != 1:
raise ValueError(f"FASTX record names do not all match: {record_names}")
return records

def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> Optional[bool]:
"""Exit the :class:`FastxZipped` context manager by closing all FASTX files."""
for fastx in self._fastx:
fastx.close()
if exc_type is not None:
raise exc_type(exc_val).with_traceback(exc_tb)
return None
Empty file added fgpyo/fastx/tests/__init__.py
Empty file.
143 changes: 143 additions & 0 deletions fgpyo/fastx/tests/test_fastx_zipped.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
from pathlib import Path

import pytest

from fgpyo.fastx import FastxZipped

# TODO: Remove all the assert boilerplate when FastxRecord can be used for equality testing:
# https://github.com/pysam-developers/pysam/issues/1243


def test_fastx_zipped_requires_at_least_one_fastx() -> None:
"""Test that :class:`FastxZipped` requires at least one FASTX path to be instantiated."""
with pytest.raises(ValueError, match=r"Must provide at least one FASTX"):
FastxZipped()


def test_fastx_zipped_iterates_one_empty_fastx(tmp_path: Path) -> None:
"""Test that :class:`FastxZipped` can iterate over one empty FASTX file."""
input = tmp_path / "input"
input.mkdir()
fastx = input / "input.fastx"
fastx.write_text("")

with FastxZipped(fastx) as handle:
assert len(list(handle)) == 0


def test_fastx_zipped_iterates_over_a_single_fasta(tmp_path: Path) -> None:
"""Test that :class:`FastxZipped` can iterate over a single FASTA file."""
input = tmp_path / "input"
input.mkdir()
fasta = input / "input.fasta"
fasta.write_text(">seq1\nACGT\n>seq2\nTGCA\n")

context_manager = FastxZipped(fasta)
with context_manager as handle:
(record1,) = next(handle)
assert record1.name == "seq1"
assert record1.sequence == "ACGT"
(record2,) = next(handle)
assert record2.name == "seq2"
assert record2.sequence == "TGCA"

assert all(fastx.closed for fastx in context_manager._fastx)


def test_fastx_zipped_iterates_over_a_single_fastq(tmp_path: Path) -> None:
"""Test that :class:`FastxZipped` can iterate over a single FASTQ file."""
input = tmp_path / "input"
input.mkdir()
fasta = input / "input.fasta"
fasta.write_text("@seq1\tcomment1\nACGT\n+\nFFFF\n" + "@seq2\tcomment2\nTGCA\n+\n!!!!\n")

context_manager = FastxZipped(fasta)
with context_manager as handle:
(record1,) = next(handle)
assert record1.name == "seq1"
assert record1.sequence == "ACGT"
assert record1.comment == "comment1"
assert record1.quality == "FFFF"
(record2,) = next(handle)
assert record2.name == "seq2"
assert record2.sequence == "TGCA"
assert record2.comment == "comment2"
assert record2.quality == "!!!!"

assert all(fastx.closed for fastx in context_manager._fastx)


def tests_fastx_zipped_raises_exception_on_truncated_fastx(tmp_path: Path) -> None:
"""Test that :class:`FastxZipped` raises an exception on truncated FASTX files."""
input = tmp_path / "input"
input.mkdir()
fasta1 = input / "input1.fasta"
fasta2 = input / "input2.fasta"
fasta1.write_text(">seq1\nAAAA\n")
fasta2.write_text(">seq1\nCCCC\n>seq2\nGGGG\n")

context_manager = FastxZipped(fasta1, fasta2)
with context_manager as handle:
(record1, record2) = next(handle)
assert record1.name == "seq1"
assert record1.sequence == "AAAA"
assert record2.name == "seq1"
assert record2.sequence == "CCCC"
with pytest.raises(ValueError, match=r"One or more of the FASTX files is truncated!"):
next(handle)

assert all(fastx.closed for fastx in context_manager._fastx)

context_manager = FastxZipped(fasta2, fasta1)
with context_manager as handle:
(record1, record2) = next(handle)
assert record1.name == "seq1"
assert record1.sequence == "CCCC"
assert record2.name == "seq1"
assert record2.sequence == "AAAA"
with pytest.raises(ValueError, match=r"One or more of the FASTX files is truncated!"):
next(handle)

assert all(fastx.closed for fastx in context_manager._fastx)


def tests_fastx_zipped_can_iterate_over_multiple_fastx_files(tmp_path: Path) -> None:
"""Test that :class:`FastxZipped` can iterate over multiple FASTX files."""
input = tmp_path / "input"
input.mkdir()
fasta1 = input / "input1.fasta"
fasta2 = input / "input2.fasta"
fasta1.write_text(">seq1\nAAAA\n>seq2\nCCCC\n")
fasta2.write_text(">seq1\nGGGG\n>seq2\nTTTT\n")

context_manager = FastxZipped(fasta1, fasta2)
with context_manager as handle:
(record1, record2) = next(handle)
assert record1.name == "seq1"
assert record1.sequence == "AAAA"
assert record2.name == "seq1"
assert record2.sequence == "GGGG"
(record1, record2) = next(handle)
assert record1.name == "seq2"
assert record1.sequence == "CCCC"
assert record2.name == "seq2"
assert record2.sequence == "TTTT"

assert all(fastx.closed for fastx in context_manager._fastx)


def tests_fastx_zipped_raises_exception_on_mismatched_sequence_names(tmp_path: Path) -> None:
"""Test that :class:`FastxZipped` raises an exception on mismatched sequence names."""
input = tmp_path / "input"
input.mkdir()
fasta1 = input / "input1.fasta"
fasta2 = input / "input2.fasta"
fasta1.write_text(">seq1\nAAAA\n")
fasta2.write_text(">seq2\nCCCC\n")

context_manager = FastxZipped(fasta1, fasta2)
with context_manager as handle:
with pytest.raises(ValueError, match=r"FASTX record names do not all match"):
next(handle)

assert all(fastx.closed for fastx in context_manager._fastx)

0 comments on commit 9fcf075

Please sign in to comment.