diff --git a/prymer/offtarget/bwa.py b/prymer/offtarget/bwa.py index d84e7a8..868db22 100644 --- a/prymer/offtarget/bwa.py +++ b/prymer/offtarget/bwa.py @@ -44,9 +44,12 @@ ``` """ # noqa: E501 +import logging import os +import subprocess from dataclasses import dataclass from pathlib import Path +from threading import Thread from typing import ClassVar from typing import Optional from typing import cast @@ -56,6 +59,7 @@ from fgpyo.sam import Cigar from pysam import AlignedSegment from pysam import AlignmentHeader +from typing_extensions import override from prymer.api import coordmath from prymer.util.executable_runner import ExecutableRunner @@ -298,7 +302,7 @@ def __init__( "/dev/stdin", ] - super().__init__(command=command) + super().__init__(command=command, stderr=subprocess.PIPE) header = [] for line in self._subprocess.stdout: @@ -309,6 +313,18 @@ def __init__( self.header = AlignmentHeader.from_text("".join(header)) + # NB: ExecutableRunner will default to redirecting stderr to /dev/null. However, we would + # like to preserve stderr messages from bwa for potential debugging. To do this, we create + # a single thread to continuously read from stderr and redirect text lines to a debug + # logger. The close() method of this class will additionally join the stderr logging thread. + self._logger = logging.getLogger(self.__class__.__qualname__) + self._stderr_thread = Thread( + daemon=True, + target=self._stream_to_sink, + args=(self._subprocess.stderr, self._logger.debug), + ) + self._stderr_thread.start() + def __signal_bwa(self) -> None: """Signals BWA to process the queries.""" self._subprocess.stdin.flush() @@ -317,6 +333,19 @@ def __signal_bwa(self) -> None: self._subprocess.stdin.write("\n" * 16) self._subprocess.stdin.flush() + @override + def close(self) -> bool: + """ + Gracefully terminates the underlying subprocess if it is still running. + + Returns: + True: if the subprocess was terminated successfully + False: if the subprocess failed to terminate or was not already running + """ + safely_closed: bool = super().close() + self._stderr_thread.join() + return safely_closed + def map_one(self, query: str, id: str = "unknown") -> BwaResult: """Maps a single query to the genome and returns the result. diff --git a/prymer/util/executable_runner.py b/prymer/util/executable_runner.py index e69500c..26da723 100644 --- a/prymer/util/executable_runner.py +++ b/prymer/util/executable_runner.py @@ -14,8 +14,10 @@ from contextlib import AbstractContextManager from pathlib import Path from types import TracebackType +from typing import Callable from typing import Optional from typing import Self +from typing import TextIO class ExecutableRunner(AbstractContextManager): @@ -30,6 +32,12 @@ class ExecutableRunner(AbstractContextManager): Subclasses of [`ExecutableRunner`][prymer.util.executable_runner.ExecutableRunner] provide additional type checking of inputs and orchestrate parsing output data from specific command-line tools. + + Warning: + Users of this class must be acutely aware of deadlocks that can exist when manually + writing and reading to subprocess pipes. The Python documentation for subprocess and PIPE + has warnings to this effect as well as recommended workarounds and alternatives. + https://docs.python.org/3/library/subprocess.html """ __slots__ = ("_command", "_subprocess", "_name") @@ -40,9 +48,13 @@ class ExecutableRunner(AbstractContextManager): def __init__( self, command: list[str], + # NB: users of this class must be acutely aware of deadlocks that can exist when manually + # writing and reading to subprocess pipes. The Python documentation for subprocess and PIPE + # has warnings to this effect as well as recommended workarounds and alternatives. + # https://docs.python.org/3/library/subprocess.html stdin: int = subprocess.PIPE, stdout: int = subprocess.PIPE, - stderr: int = subprocess.PIPE, + stderr: int = subprocess.DEVNULL, ) -> None: if len(command) == 0: raise ValueError(f"Invocation must not be empty, received {command}") @@ -71,6 +83,15 @@ def __exit__( super().__exit__(exc_type, exc_value, traceback) self.close() + @staticmethod + def _stream_to_sink(stream: TextIO, sink: Callable[[str], None]) -> None: + """Redirect a text IO stream to a text sink.""" + while True: + if line := stream.readline(): + sink(line.rstrip()) + else: + break + @classmethod def validate_executable_path(cls, executable: str | Path) -> Path: """Validates user-provided path to an executable. @@ -115,8 +136,7 @@ def is_alive(self) -> bool: def close(self) -> bool: """ - Gracefully terminates the underlying subprocess if it is still - running. + Gracefully terminates the underlying subprocess if it is still running. Returns: True: if the subprocess was terminated successfully diff --git a/tests/offtarget/test_bwa.py b/tests/offtarget/test_bwa.py index 6f0cea6..24be7ec 100644 --- a/tests/offtarget/test_bwa.py +++ b/tests/offtarget/test_bwa.py @@ -1,3 +1,4 @@ +import logging import shutil from dataclasses import replace from pathlib import Path @@ -99,6 +100,25 @@ def test_map_one_uniquely_mapped(ref_fasta: Path) -> None: assert result.query == query +def test_stderr_redirected_to_logger(ref_fasta: Path, caplog: pytest.LogCaptureFixture) -> None: + """Tests that we redirect the stderr of the bwa executable to a logger..""" + caplog.set_level(logging.DEBUG) + query = Query(bases="TCTACTAAAAATACAAAAAATTAGCTGGGCATGATGGCATGCACCTGTAATCCCGCTACT", id="NA") + with BwaAlnInteractive(ref=ref_fasta, max_hits=1) as bwa: + result = bwa.map_one(query=query.bases, id=query.id) + assert result.hit_count == 1 + assert result.hits[0].refname == "chr1" + assert result.hits[0].start == 61 + assert result.hits[0].negative is False + assert f"{result.hits[0].cigar}" == "60M" + assert result.query == query + assert "[bwa_aln_core] calculate SA coordinate..." in caplog.text + assert "[bwa_aln_core] convert to sequence coordinate..." in caplog.text + assert "[bwa_aln_core] refine gapped alignments..." in caplog.text + assert "[bwa_aln_core] print alignments..." in caplog.text + assert "[bwa_aln_core] 1 sequences have been processed" in caplog.text + + def test_map_one_unmapped(ref_fasta: Path) -> None: """Tests that bwa returns an unmapped alignment. The hit count should be zero and the list of hits empty."""