-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add a context manager for zipping FASTX files (#77)
- Loading branch information
Showing
4 changed files
with
334 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
""" | ||
Zipping FASTX Files | ||
~~~~~~~~~~~~~~~~~~~ | ||
Zipping a set of FASTA/FASTQ files into a single stream of data is a common task in bioinformatics | ||
and can be achieved with the :class:`~fgpyo.fastx.FastxZipped` context manager. The context manager | ||
facilitates opening of all input FASTA/FASTQ files and closing them after iteration is complete. | ||
For every iteration of :class:`~fgpyo.fastx.FastxZipped`, a tuple of the next FASTX records are | ||
returned (of type :class:`~pysam.FastxRecord`). An exception will be raised if any of the input | ||
files are malformed or truncated and if record names are not equivalent and in sync. | ||
Importantly, this context manager is optimized for fast streaming read-only usage and, by default, | ||
any previous records saved while advancing the iterator will not be correct as the underlying | ||
pointer in memory will refer to the most recent record only, and not any past records. To preserve | ||
the state of all previously iterated records, set the parameter ``persist`` to :py:obj:`True`. | ||
.. code-block:: python | ||
>>> from fgpyo.fastx import FastxZipped | ||
>>> with FastxZipped("r1.fq", "r2.fq", persist=False) as zipped: | ||
... for (r1, r2) in zipped: | ||
... print(f"{r1.name}: {r1.sequence}, {r2.name}: {r2.sequence}") | ||
seq1: AAAA, seq1: CCCC | ||
seq2: GGGG, seq2: TTTT | ||
""" | ||
from contextlib import AbstractContextManager | ||
from pathlib import Path | ||
from types import TracebackType | ||
from typing import cast | ||
from typing import Iterator | ||
from typing import Optional | ||
from typing import Union | ||
from typing import Set | ||
from typing import Tuple | ||
from typing import Type | ||
|
||
from pysam import FastxFile, FastxRecord | ||
|
||
|
||
class FastxZipped(AbstractContextManager, Iterator[Tuple[FastxRecord, ...]]): | ||
"""A context manager that will lazily zip over any number of FASTA/FASTQ files. | ||
Args: | ||
paths: Paths to the FASTX files to zip over. | ||
persist: Whether to persit the state of previous records during iteration. | ||
""" | ||
|
||
def __init__(self, *paths: Union[Path, str], persist: bool = False) -> None: | ||
"""Insantiate a :class:`FastxZipped` context manager and iterator.""" | ||
if len(paths) <= 0: | ||
raise ValueError(f"Must provide at least one FASTX to {self.__class__.__name__}") | ||
self._persist: bool = persist | ||
self._paths: Tuple[Union[Path, str], ...] = paths | ||
self._fastx = tuple(FastxFile(str(path), persist=self._persist) for path in self._paths) | ||
|
||
@staticmethod | ||
def _name_minus_ordinal(name: str) -> str: | ||
"""Return the name of the FASTX record minus its ordinal suffix (e.g. "/1" or "/2").""" | ||
return name[: len(name) - 2] if len(name) >= 2 and name[-2] == "/" else name | ||
|
||
def __next__(self) -> Tuple[FastxRecord, ...]: | ||
"""Return the next set of FASTX records from the zipped FASTX files.""" | ||
records = tuple(next(handle, None) for handle in self._fastx) | ||
if all(record is None for record in records): | ||
raise StopIteration | ||
elif any(record is None for record in records): | ||
sequence_name: str = [record.name for record in records if record is not None][0] | ||
raise ValueError( | ||
"One or more of the FASTX files is truncated for sequence " | ||
+ f"{self._name_minus_ordinal(sequence_name)}:\n\t" | ||
+ "\n\t".join( | ||
str(self._paths[i]) for i, record in enumerate(records) if record is None | ||
) | ||
) | ||
else: | ||
records = cast(Tuple[FastxRecord, ...], records) | ||
record_names: Set[str] = {self._name_minus_ordinal(record.name) for record in records} | ||
if len(record_names) != 1: | ||
raise ValueError(f"FASTX record names do not all match: {record_names}") | ||
return records | ||
|
||
def __exit__( | ||
self, | ||
exc_type: Optional[Type[BaseException]], | ||
exc_val: Optional[BaseException], | ||
exc_tb: Optional[TracebackType], | ||
) -> Optional[bool]: | ||
"""Exit the :class:`FastxZipped` context manager by closing all FASTX files.""" | ||
self.close() | ||
if exc_type is not None: | ||
raise exc_type(exc_val).with_traceback(exc_tb) | ||
return None | ||
|
||
def close(self) -> None: | ||
"""Close the :class:`FastxZipped` context manager by closing all FASTX files.""" | ||
for fastx in self._fastx: | ||
fastx.close() |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,228 @@ | ||
import gzip | ||
|
||
from pathlib import Path | ||
|
||
import pytest | ||
|
||
from fgpyo.fastx import FastxZipped | ||
|
||
# TODO: Remove all the assert boilerplate when FastxRecord can be used for equality testing: | ||
# https://github.com/pysam-developers/pysam/issues/1243 | ||
|
||
|
||
def test_fastx_zipped_requires_at_least_one_fastx() -> None: | ||
"""Test that :class:`FastxZipped` requires at least one FASTX path to be instantiated.""" | ||
with pytest.raises(ValueError, match=r"Must provide at least one FASTX"): | ||
FastxZipped() | ||
|
||
|
||
def test_fastx_zipped_iterates_one_empty_fastx(tmp_path: Path) -> None: | ||
"""Test that :class:`FastxZipped` can iterate over one empty FASTX file.""" | ||
input = tmp_path / "input" | ||
input.mkdir() | ||
fastx = input / "input.fastx" | ||
fastx.write_text("") | ||
|
||
with FastxZipped(fastx) as handle: | ||
assert len(list(handle)) == 0 | ||
|
||
|
||
def test_fastx_zipped_iterates_over_a_single_fasta(tmp_path: Path) -> None: | ||
"""Test that :class:`FastxZipped` can iterate over a single FASTA file.""" | ||
input = tmp_path / "input" | ||
input.mkdir() | ||
fasta = input / "input.fasta" | ||
fasta.write_text(">seq1\nACGT\n>seq2\nTGCA\n") | ||
|
||
context_manager = FastxZipped(fasta) | ||
with context_manager as handle: | ||
(record1,) = next(handle) | ||
assert record1.name == "seq1" | ||
assert record1.sequence == "ACGT" | ||
(record2,) = next(handle) | ||
assert record2.name == "seq2" | ||
assert record2.sequence == "TGCA" | ||
|
||
assert all(fastx.closed for fastx in context_manager._fastx) | ||
|
||
|
||
def test_fastx_zipped_iterates_over_a_single_fasta_gzipped(tmp_path: Path) -> None: | ||
"""Test that :class:`FastxZipped` can iterate over a single gzipped FASTA file.""" | ||
input = tmp_path / "input" | ||
input.mkdir() | ||
fasta = input / "input.fasta.gz" | ||
|
||
with gzip.open(fasta, "wt") as handle: | ||
handle.write(">seq1\nACGT\n>seq2\nTGCA\n") | ||
|
||
context_manager = FastxZipped(fasta) | ||
with context_manager as handle: | ||
(record1,) = next(handle) | ||
assert record1.name == "seq1" | ||
assert record1.sequence == "ACGT" | ||
(record2,) = next(handle) | ||
assert record2.name == "seq2" | ||
assert record2.sequence == "TGCA" | ||
|
||
assert all(fastx.closed for fastx in context_manager._fastx) | ||
|
||
|
||
def test_fastx_zipped_iterates_over_a_single_fastq(tmp_path: Path) -> None: | ||
"""Test that :class:`FastxZipped` can iterate over a single FASTQ file.""" | ||
input = tmp_path / "input" | ||
input.mkdir() | ||
fastq = input / "input.fastq" | ||
fastq.write_text("@seq1\tcomment1\nACGT\n+\nFFFF\n" + "@seq2\tcomment2\nTGCA\n+\n!!!!\n") | ||
|
||
context_manager = FastxZipped(fastq) | ||
with context_manager as handle: | ||
(record1,) = next(handle) | ||
assert record1.name == "seq1" | ||
assert record1.sequence == "ACGT" | ||
assert record1.comment == "comment1" | ||
assert record1.quality == "FFFF" | ||
(record2,) = next(handle) | ||
assert record2.name == "seq2" | ||
assert record2.sequence == "TGCA" | ||
assert record2.comment == "comment2" | ||
assert record2.quality == "!!!!" | ||
|
||
assert all(fastx.closed for fastx in context_manager._fastx) | ||
|
||
|
||
def tests_fastx_zipped_raises_exception_on_truncated_fastx(tmp_path: Path) -> None: | ||
"""Test that :class:`FastxZipped` raises an exception on truncated FASTX files.""" | ||
input = tmp_path / "input" | ||
input.mkdir() | ||
fasta1 = input / "input1.fasta" | ||
fasta2 = input / "input2.fasta" | ||
fasta1.write_text(">seq1\nAAAA\n") | ||
fasta2.write_text(">seq1\nCCCC\n>seq2\nGGGG\n") | ||
|
||
context_manager = FastxZipped(fasta1, fasta2) | ||
with context_manager as handle: | ||
(record1, record2) = next(handle) | ||
assert record1.name == "seq1" | ||
assert record1.sequence == "AAAA" | ||
assert record2.name == "seq1" | ||
assert record2.sequence == "CCCC" | ||
with pytest.raises(ValueError, match=r"One or more of the FASTX files is truncated.*"): | ||
next(handle) | ||
|
||
assert all(fastx.closed for fastx in context_manager._fastx) | ||
|
||
context_manager = FastxZipped(fasta2, fasta1) | ||
with context_manager as handle: | ||
(record1, record2) = next(handle) | ||
assert record1.name == "seq1" | ||
assert record1.sequence == "CCCC" | ||
assert record2.name == "seq1" | ||
assert record2.sequence == "AAAA" | ||
with pytest.raises(ValueError, match=r"One or more of the FASTX files is truncated.*"): | ||
next(handle) | ||
|
||
assert all(fastx.closed for fastx in context_manager._fastx) | ||
|
||
|
||
def tests_fastx_zipped_can_iterate_over_multiple_fastx_files(tmp_path: Path) -> None: | ||
"""Test that :class:`FastxZipped` can iterate over multiple FASTX files.""" | ||
input = tmp_path / "input" | ||
input.mkdir() | ||
fasta1 = input / "input1.fasta" | ||
fasta2 = input / "input2.fasta" | ||
fasta1.write_text(">seq1\nAAAA\n>seq2\nCCCC\n") | ||
fasta2.write_text(">seq1\nGGGG\n>seq2\nTTTT\n") | ||
|
||
context_manager = FastxZipped(fasta1, fasta2) | ||
with context_manager as handle: | ||
(record1, record2) = next(handle) | ||
assert record1.name == "seq1" | ||
assert record1.sequence == "AAAA" | ||
assert record2.name == "seq1" | ||
assert record2.sequence == "GGGG" | ||
(record1, record2) = next(handle) | ||
assert record1.name == "seq2" | ||
assert record1.sequence == "CCCC" | ||
assert record2.name == "seq2" | ||
assert record2.sequence == "TTTT" | ||
|
||
assert all(fastx.closed for fastx in context_manager._fastx) | ||
|
||
|
||
def tests_fastx_zipped_raises_exception_on_mismatched_sequence_names(tmp_path: Path) -> None: | ||
"""Test that :class:`FastxZipped` raises an exception on mismatched sequence names.""" | ||
input = tmp_path / "input" | ||
input.mkdir() | ||
fasta1 = input / "input1.fasta" | ||
fasta2 = input / "input2.fasta" | ||
fasta1.write_text(">seq1\nAAAA\n") | ||
fasta2.write_text(">seq2\nCCCC\n") | ||
|
||
context_manager = FastxZipped(fasta1, fasta2) | ||
with context_manager as handle: | ||
with pytest.raises(ValueError, match=r"FASTX record names do not all match"): | ||
next(handle) | ||
|
||
assert all(fastx.closed for fastx in context_manager._fastx) | ||
|
||
|
||
def tests_fastx_zipped_handles_sequence_names_with_suffixes(tmp_path: Path) -> None: | ||
"""Test that :class:`FastxZipped` does not use sequence name suffixes in equality tests.""" | ||
input = tmp_path / "input" | ||
input.mkdir() | ||
fasta1 = input / "input1.fasta" | ||
fasta2 = input / "input2.fasta" | ||
fasta1.write_text(">seq1/1\nAAAA\n") | ||
fasta2.write_text(">seq1/2\nCCCC\n") | ||
|
||
context_manager = FastxZipped(fasta1, fasta2) | ||
with context_manager as handle: | ||
(record1, record2) = next(handle) | ||
assert record1.name == "seq1/1" | ||
assert record1.sequence == "AAAA" | ||
assert record2.name == "seq1/2" | ||
assert record2.sequence == "CCCC" | ||
|
||
assert all(fastx.closed for fastx in context_manager._fastx) | ||
|
||
|
||
def tests_fastx_zipped__name_minus_ordinal_works_with_r1_and_r2_ordinals() -> None: | ||
"""Test that :class:`FastxZipped._name_minus_ordinal` works with none, R1, and R2 ordinals.""" | ||
assert FastxZipped._name_minus_ordinal("") == "" | ||
assert FastxZipped._name_minus_ordinal("/1") == "" | ||
assert FastxZipped._name_minus_ordinal("/2") == "" | ||
assert FastxZipped._name_minus_ordinal("seq1") == "seq1" | ||
assert FastxZipped._name_minus_ordinal("seq1/1") == "seq1" | ||
assert FastxZipped._name_minus_ordinal("seq1/2") == "seq1" | ||
assert FastxZipped._name_minus_ordinal("1") == "1" | ||
assert FastxZipped._name_minus_ordinal("1/1") == "1" | ||
assert FastxZipped._name_minus_ordinal("1/2") == "1" | ||
|
||
|
||
def test_fastx_zipped_accidentally_used_as_iterator_only(tmp_path: Path) -> None: | ||
"""Test that :class:`FastxZipped` can also be used as an interator outside a context manager.""" | ||
input = tmp_path / "input" | ||
input.mkdir() | ||
fasta1 = input / "input1.fasta" | ||
fasta2 = input / "input2.fasta" | ||
fasta1.write_text(">seq1\nAAAA\n>seq2\nCCCC\n") | ||
fasta2.write_text(">seq1\nGGGG\n>seq2\nTTTT\n") | ||
|
||
zipped = FastxZipped(fasta1, fasta2) | ||
(record1, record2) = next(zipped) | ||
assert record1.name == "seq1" | ||
assert record1.sequence == "AAAA" | ||
assert record2.name == "seq1" | ||
assert record2.sequence == "GGGG" | ||
(record1, record2) = next(zipped) | ||
assert record1.name == "seq2" | ||
assert record1.sequence == "CCCC" | ||
assert record2.name == "seq2" | ||
assert record2.sequence == "TTTT" | ||
|
||
with pytest.raises(StopIteration): | ||
next(zipped) | ||
|
||
assert all(not fastx.closed for fastx in zipped._fastx) | ||
zipped.close() | ||
assert all(fastx.closed for fastx in zipped._fastx) |