diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 9437b81..e53bd31 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -19,11 +19,11 @@ jobs: PYTHON_VERSION: ["3.11", "3.12"] steps: - uses: actions/checkout@v4 - - name: Checkout fulcrumgenomics/bwa + - name: Checkout fulcrumgenomics/bwa-aln-interactive uses: actions/checkout@v4 with: - repository: fulcrumgenomics/bwa - ref: interactive_aln + repository: fulcrumgenomics/bwa-aln-interactive + ref: main path: bwa fetch-depth: 0 diff --git a/prymer/offtarget/bwa.py b/prymer/offtarget/bwa.py index 7c4bd47..b266e2f 100644 --- a/prymer/offtarget/bwa.py +++ b/prymer/offtarget/bwa.py @@ -33,6 +33,7 @@ >>> bwa.map_all(queries=[query]) [BwaResult(query=Query(id='NA', bases='AAAAAA'), hit_count=3968, hits=[])] >>> bwa.close() +True ``` """ # noqa: E501 @@ -45,10 +46,10 @@ from typing import cast import pysam -from fgpyo import sam from fgpyo import sequence from fgpyo.sam import Cigar from pysam import AlignedSegment +from pysam import AlignmentHeader from prymer.api import coordmath from prymer.util.executable_runner import ExecutableRunner @@ -201,6 +202,7 @@ class BwaAlnInteractive(ExecutableRunner): reverse_complement: reverse complement each query sequence before alignment. include_alt_hits: if True include hits to references with names ending in _alt, otherwise do not include them. + header: the SAM alignment header. """ def __init__( @@ -284,20 +286,14 @@ def __init__( super().__init__(command=command) - # HACK ALERT - # This is a hack. By trial and error, pysam.AlignmentFile() will block reading unless - # there's at least a few records due to htslib wanting to read a few records for format - # auto-detection. Lame. So a hundred queries are sent to the aligner to align enable the - # htslib auto-detection to complete, and for us to be able to read using pysam. - num_warmup: int = 100 - for i in range(num_warmup): - query = Query(id=f"ignoreme:{i}", bases="A" * 100) - fastq_str = query.to_fastq(reverse_complement=self.reverse_complement) - self._subprocess.stdin.write(fastq_str) - self.__signal_bwa() # forces the input to be sent to the underlying process. - self._reader = sam.reader(path=self._subprocess.stdout, file_type=sam.SamFileType.SAM) - for _ in range(num_warmup): - next(self._reader) + header = [] + for line in self._subprocess.stdout: + if line.startswith("@"): + header.append(line) + if line.startswith("@PG"): + break + + self.header = AlignmentHeader.from_text("".join(header)) def __signal_bwa(self) -> None: """Signals BWA to process the queries""" @@ -334,13 +330,18 @@ def map_all(self, queries: list[Query]) -> list[BwaResult]: for query in queries: fastq_str = query.to_fastq(reverse_complement=self.reverse_complement) self._subprocess.stdin.write(fastq_str) - self.__signal_bwa() # forces the input to be sent to the underlying process. + + # Force the input to be sent to the underlying process. + self.__signal_bwa() # Read back the results results: list[BwaResult] = [] for query in queries: # get the next alignment and convert to a result - results.append(self._to_result(query=query, rec=next(self._reader))) + line: str = next(self._subprocess.stdout).strip() + assert not line.startswith("@"), f"SAM record must not start with '@'! {line}" + alignment = AlignedSegment.fromstring(line, self.header) + results.append(self._to_result(query=query, rec=alignment)) return results @@ -412,7 +413,3 @@ def to_hits(self, rec: AlignedSegment) -> list[BwaHit]: hits = [hit for hit in hits if not hit.refname.endswith("_alt")] return hits - - def close(self) -> None: - self._reader.close() - super().close() diff --git a/tests/offtarget/test_bwa.py b/tests/offtarget/test_bwa.py index daccd13..b914364 100644 --- a/tests/offtarget/test_bwa.py +++ b/tests/offtarget/test_bwa.py @@ -2,6 +2,8 @@ from dataclasses import replace from pathlib import Path from tempfile import NamedTemporaryFile +from typing import TypeAlias +from typing import cast import pytest from fgpyo.sam import Cigar @@ -12,6 +14,9 @@ from prymer.offtarget.bwa import BwaHit from prymer.offtarget.bwa import Query +SamHeaderType: TypeAlias = dict[str, dict[str, str] | list[dict[str, str]]] +"""A type alias for a SAM sequence header dictionary.""" + @pytest.mark.parametrize("bases", [None, ""]) def test_query_no_bases(bases: str | None) -> None: @@ -68,6 +73,19 @@ def test_hit_build_rc() -> None: assert hit_r.negative is True +def test_header_is_properly_constructed(ref_fasta: Path) -> None: + """Tests that bwa will return a properly constructed header.""" + with BwaAlnInteractive(ref=ref_fasta, max_hits=1) as bwa: + header: SamHeaderType = bwa.header.to_dict() + assert set(header.keys()) == {"HD", "SQ", "PG"} + assert header["HD"] == {"GO": "query", "SO": "unsorted", "VN": "1.5"} + assert header["SQ"] == [{"LN": 10001, "SN": "chr1"}] + assert len(header["PG"]) == 1 + program_group: dict[str, str] = cast(list[dict[str, str]], header["PG"])[0] + assert program_group["ID"] == "bwa" + assert program_group["PN"] == "bwa" + + def test_map_one_uniquely_mapped(ref_fasta: Path) -> None: """Tests that bwa maps one hit when a query uniquely maps.""" query = Query(bases="TCTACTAAAAATACAAAAAATTAGCTGGGCATGATGGCATGCACCTGTAATCCCGCTACT", id="NA")