Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adding initial batch of samtools type hinted wrapper functions
Browse files Browse the repository at this point in the history
Nathan Roach committed Dec 30, 2022
1 parent 299cc7c commit eb32cac
Showing 14 changed files with 1,214 additions and 94 deletions.
5 changes: 5 additions & 0 deletions fgpyo/pysam_dispatch/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
def pysam_fn(*args: str) -> str:
"""
Type hinted import of an example function from the pysam samtools dispatcher.
"""
pass
6 changes: 6 additions & 0 deletions fgpyo/pysam_dispatch/samtools/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from fgpyo.pysam_dispatch.samtools.dict import dict
from fgpyo.pysam_dispatch.samtools.faidx import faidx
from fgpyo.pysam_dispatch.samtools.index import index
from fgpyo.pysam_dispatch.samtools.sort import sort

__all__ = ["dict", "faidx", "index", "sort"]
73 changes: 73 additions & 0 deletions fgpyo/pysam_dispatch/samtools/dict.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""
Type-Hinted Wrapper around pysam samtools dict dispatch
-------------------------------------------------------
"""

from pathlib import Path
from typing import TYPE_CHECKING
from typing import List
from typing import Optional

if TYPE_CHECKING:
from fgpyo.pysam_dispatch import pysam_fn as _pysam_dict
else:
from pysam import dict as _pysam_dict


def dict(
input: Path,
output: Path,
no_header: bool = False,
alias: bool = False,
assembly: Optional[str] = None,
alt: Optional[Path] = None,
species: Optional[str] = None,
uri: Optional[str] = None,
) -> str:
"""
Calls the `samtools` function `dict` using the pysam samtools dispatcher.
Arguments will be formatted into the appropriate command-line call which will be invoked
using the pysam dispatcher.
Returns the stdout of the resulting call to pysam.dict()
Args:
input: Path to the FASTA files to generate a sequence dictionary for.
output: Path to the file to write output to.
no_header: If true will not print the @HD header line.
assembly: The assembly for the AS tag.
alias: Adds an AN tag with the same value as the SN tag, except that a 'chr' prefix is
removed if SN has one or added if it does not. For mitochondria (i.e., when SN is “M”
or “MT”, with or without a “chr” prefix), also adds the remaining combinations of
“chr/M/MT” to the AN tag.
alt: Add an AH tag to each sequence listed in the specified bwa-style .alt file. These
files use SAM records to represent alternate locus sequences (as named in the QNAME
field) and their mappings to the primary assembly.
species: Specify the species for the SP tag.
uri: Specify the URI for the UR tag. Defaults to the absolute path of ref.fasta.
"""

args: List[str] = ["--output", str(output)]

if no_header:
args.append("--no-header")

if alias:
args.append("--alias")

if assembly is not None:
args.extend(["--assembly", assembly])

if alt is not None:
args.extend(["--alt", str(alt)])

if species is not None:
args.extend(["--species", species])

if uri is not None:
args.extend(["--uri", uri])

args.append(str(input))

return _pysam_dict(*args)
120 changes: 120 additions & 0 deletions fgpyo/pysam_dispatch/samtools/faidx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""
Type-Hinted Wrapper around pysam samtools faidx dispatch
--------------------------------------------------------
"""

import enum
from pathlib import Path
from typing import TYPE_CHECKING
from typing import List
from typing import Optional
from typing import Tuple

if TYPE_CHECKING:
from fgpyo.pysam_dispatch import pysam_fn as _pysam_faidx
else:
from pysam import faidx as _pysam_faidx


@enum.unique
class FaidxMarkStrand(enum.Enum):
RevComp = "rc"
No = "no"
Sign = "sign"
Custom = "custom"


def faidx(
input: Path,
output: Optional[Path] = None,
length: int = 60,
regions: Optional[List[str]] = None,
region_file: Optional[Path] = None,
fai_idx: Optional[Path] = None,
gzi_idx: Optional[Path] = None,
continue_if_non_existent: bool = False,
fastq: bool = False,
reverse_complement: bool = False,
mark_strand: FaidxMarkStrand = FaidxMarkStrand.RevComp,
custom_mark_strand: Optional[Tuple[str, str]] = None,
) -> str:
"""
Calls the `samtools` function `faidx` using the pysam samtools dispatcher.
Arguments will be formatted into the appropriate command-line call which will be invoked
using the pysam dispatcher.
Returns the stdout of the resulting call to pysam.faidx()
Args:
input: Path to the FAIDX files to index / read from.
output: Path to the file to write FASTA output.
length: Length of FASTA sequence line.
regions: regions to extract from the FASTA file in samtools region format
(<chrom>:<1-based start>-<1-based end>)
region_file: Path to file containing regions to extract from the FASTA file in samtools
region format (<chrom>:<1-based start>-<1-based end>). 1 region descriptor per line.
fai_idx: Read/Write to specified FAI index file.
gzi_idx: Read/Write to specified compressed file index (used with .gz files).
continue_if_non_existent: If true continue qorking if a non-existent region is requested.
fastq: Read FASTQ files and output extracted sequences in FASTQ format. Same as using
samtools fqidx.
reverse_complement: Output the sequence as the reverse complement. When this option is
used, “/rc” will be appended to the sequence names. To turn this off or change the
string appended, use the mark_strand parameter.
mark_strand: Append strand indicator to sequence name. Type to append can be one of:
FaidxMarkStrand.RevComp - Append '/rc' when writing the reverse complement. This is the
default.
FaidxMarkStrand.No - Do not append anything.
FaidxMarkStrand.Sign - Append '(+)' for forward strand or '(-)' for reverse
complement. This matches the output of “bedtools getfasta -s”.
FaidxMarkStrand.Custom - custom,<custom_mark_strand[0]>,<custom_mark_strand[1]>
Append string <pos> to names when writing the forward strand and <neg> when writing the
reverse strand. Spaces are preserved, so it is possible to move the indicator into the
comment part of the description line by including a leading space in the strings
<custom_mark_strand[0]> and <custom_mark_strand[1]>.
custom_mark_strand: The custom strand indicators to use in in the Custom MarkStrand
setting. The first value of the tuple will be used as the positive strand indicator,
the second value will be used as the negative strand indicator.
"""

mark_strand_str: str
if mark_strand == FaidxMarkStrand.Custom:
assert custom_mark_strand is not None, (
"Cannot use custom mark strand without providing the custom strand indicators to "
+ "`custom_mark_string`"
)
mark_strand_str = f"{mark_strand.value},{custom_mark_strand[0]},{custom_mark_strand[1]}"
else:
mark_strand_str = mark_strand.value

args: List[str] = ["--length", str(length), "--mark-strand", mark_strand_str]

if output is not None:
args.extend(["--output", str(output)])

if continue_if_non_existent:
args.append("--continue")
if reverse_complement:
args.append("--reverse-complement")
if fastq:
args.append("--fastq")

if fai_idx is not None:
args.extend(["--fai-idx", str(fai_idx)])

if gzi_idx is not None:
args.extend(["--gzi-idx", str(gzi_idx)])

if region_file is not None:
args.extend(["--region-file", str(region_file)])

args.append(str(input))

if regions is not None:
args.extend(regions)

return _pysam_faidx(*args)
70 changes: 70 additions & 0 deletions fgpyo/pysam_dispatch/samtools/index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""
Type-Hinted Wrapper around pysam samtools index dispatch
--------------------------------------------------------
"""

import enum
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Optional

if TYPE_CHECKING:
from fgpyo.pysam_dispatch import pysam_fn as _pysam_index
else:
from pysam import index as _pysam_index


@enum.unique
class SamIndexType(enum.Enum):
BAI = "-b"
CSI = "-c"


def index(
input: Path,
# See https://github.com/pysam-developers/pysam/issues/1155
# inputs: List[Path], Cant currently accept multiple
output: Optional[Path] = None,
threads: int = 0,
index_type: SamIndexType = SamIndexType.BAI,
csi_index_size: int = 14,
) -> str:
"""
Calls the `samtools` function `index` using the pysam samtools dispatcher.
Arguments will be formatted into the appropriate command-line call which will be invoked
using the pysam dispatcher.
Returns the stdout of the resulting call to pysam.index()
Args:
input: Path to the SAM/BAM/CRAM file to index.
output: Path to the file to write output. (Currently may only be used when exactly one
alignment file is being indexed.)
threads: Number of input/output compression threads to use in addition to main thread.
index_type: The type of index file to produce when indexing.
csi_index_size: Sets the minimum interval size of CSI indices to 2^INT.
"""

# assert len(inputs) >= 1, "Must provide at least one input to samtools index."

# if len(inputs) != 1:
# assert (
# output is None
# ), "Output currently can only be used if there is exactly one input file being indexed"
# args = ["-M"]
# else:
# args = []
args = []

if index_type != SamIndexType.BAI:
args.append(index_type.value)

if index_type == SamIndexType.CSI:
args.extend(["-m", str(csi_index_size)])
args.extend(["-@", str(threads)])
args.append(str(input))
if output is not None:
args.extend(["-o", str(output)])

return _pysam_index(*args)
114 changes: 114 additions & 0 deletions fgpyo/pysam_dispatch/samtools/sort.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
"""
Type-Hinted Wrapper around pysam samtools sort dispatch
-------------------------------------------------------
"""

from pathlib import Path
from typing import TYPE_CHECKING
from typing import Optional

from fgpyo.sam import SamFileType
from fgpyo.sam import SamOrder

if TYPE_CHECKING:
from fgpyo.pysam_dispatch import pysam_fn as _pysam_sort
else:
from pysam import sort as _pysam_sort


def sort(
input: Path,
output: Path,
index_output: bool = True,
sort_unmapped_reads: bool = False,
kmer_size: int = 20,
compression_level: Optional[int] = None,
memory_per_thread: str = "768MB",
sort_order: SamOrder = SamOrder.Coordinate,
sort_tag: Optional[str] = None,
output_format: SamFileType = SamFileType.BAM,
tempfile_prefix: Optional[str] = None,
threads: int = 1,
no_pg: bool = False,
) -> str:
"""
Calls the `samtools` function sort using the pysam samtools dispatcher.
Arguments will be formatted into the appropriate command-line call which will be invoked
using the pysam dispatcher.
Returns the stdout of the resulting call to pysam.sort()
Args:
input: Path to the SAM/BAM/CRAM file to sort.
output: Path to the file to write output.
index_output: If true, creates an index for the output file.
sort_unmapped_reads: If true, sort unmapped reads by their sequence minimizer, reverse
complementing where appropriate. This has the effect of collating some similar data
together, improving the compressibility of the unmapped sequence. The minimiser kmer
size is adjusted using the ``kmer_size`` option. Note data compressed in this manner
may need to be name collated prior to conversion back to fastq.
Mapped sequences are sorted by chromosome and position.
kmer_size: the kmer-size to be used if sorting unmapped reads.
compression_level: The compression level to be used in the final output file.
memory_per_thread: Approximately the maximum required memory per thread, specified either
in bytes or with a K, M, or G suffix.
To prevent sort from creating a huge number of temporary files, it enforces a minimum
value of 1M for this setting.
sort_order: The sort order to use when sorting the file.
sort_tag: The tag to use to use to sort the SAM/BAM/CRAM records. Will be sorted by
this tag first, followed by position (or name, depending on ``sort_order``
provided).
output_format: the output file format to write the results as.
By default, will try to select a format based on the ``output`` filename extension;
if no format can be deduced, bam is selected.
tempfile_prefix: The prefix to use for temporary files. Resulting files will be in
format PREFIX.nnnn.bam, or if the specified PREFIX is an existing directory, to
PREFIX/samtools.mmm.mmm.tmp.nnnn.bam, where mmm is unique to this invocation of the
sort command.
By default, any temporary files are written alongside the output file, as
out.bam.tmp.nnnn.bam, or if output is to standard output, in the current directory
as samtools.mmm.mmm.tmp.nnnn.bam.
threads: The number of threads to use when sorting. By default, operation is
single-threaded.
no_pg: If true, will not add a @PG line to the header of the output file.
"""

output_string = (
f"{output}##idx##{output}{output_format.index_extension}"
if index_output and output_format.index_extension is not None
else str(output)
)

args = ["-m", memory_per_thread, "-O", output_format._name_, "-@", str(threads)]

if sort_unmapped_reads:
args.extend(["-M", "-K", str(kmer_size)])

if compression_level is not None:
args.extend(["-I", str(compression_level)])

if sort_order == SamOrder.QueryName:
args.append("-n")
elif sort_order == SamOrder.TemplateCoordinate:
args.append("--template-coordinate")
else:
assert (
sort_order == SamOrder.Coordinate
), "Sort order to samtools sort cannot be Unknown or Unsorted"

if sort_tag is not None:
args.extend(["-t", sort_tag])

if tempfile_prefix is not None:
args.extend(["-T", tempfile_prefix])

if no_pg:
args.append("--no-PG")

args.extend(["-o", output_string, str(input)])

return _pysam_sort(*args)
127 changes: 127 additions & 0 deletions fgpyo/pysam_dispatch/samtools/tests/test_dict.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
from pathlib import Path

import pytest

import fgpyo.pysam_dispatch.samtools as samtools

EXAMPLE_FASTA: str = """\
>chr1
GATTACATTTGAGAGA
>chr2
CCCCTACCCACCC
>chr1_alt
GATTACATGAGAGA
>chr2_alt
CCCCTACCACCC
"""

EXPECTED_DICT: str = """\
@HD VN:1.0 SO:unsorted
@SQ SN:chr1 LN:16 M5:0b2bf1b29cf5338d75a8feb8c8a3784b UR:consistent_location
@SQ SN:chr2 LN:13 M5:87afe3395654b1fe1443b54490c47871 UR:consistent_location
@SQ SN:chr1_alt LN:14 M5:23ce480f016f77dfb29d3c17aa98f567 UR:consistent_location
@SQ SN:chr2_alt LN:12 M5:2ee663b21d249ebecaa9ffbdb6e0b970 UR:consistent_location
"""

HEADERLESS_DICT: str = """\
@SQ SN:chr1 LN:16 M5:0b2bf1b29cf5338d75a8feb8c8a3784b UR:consistent_location
@SQ SN:chr2 LN:13 M5:87afe3395654b1fe1443b54490c47871 UR:consistent_location
@SQ SN:chr1_alt LN:14 M5:23ce480f016f77dfb29d3c17aa98f567 UR:consistent_location
@SQ SN:chr2_alt LN:12 M5:2ee663b21d249ebecaa9ffbdb6e0b970 UR:consistent_location
"""

OTHER_TAG_DICT: str = """\
@HD VN:1.0 SO:unsorted
@SQ SN:chr1 LN:16 M5:0b2bf1b29cf5338d75a8feb8c8a3784b AN:1 UR:consistent_location AS:test1 SP:test3
@SQ SN:chr2 LN:13 M5:87afe3395654b1fe1443b54490c47871 AN:2 UR:consistent_location AS:test1 SP:test3
@SQ SN:chr1_alt LN:14 M5:23ce480f016f77dfb29d3c17aa98f567 AH:* AN:1_alt UR:consistent_location AS:test1 SP:test3
@SQ SN:chr2_alt LN:12 M5:2ee663b21d249ebecaa9ffbdb6e0b970 AH:* AN:2_alt UR:consistent_location AS:test1 SP:test3
""" # noqa: E501


@pytest.fixture
def example_fasta(tmp_path: Path) -> Path:
outfile = tmp_path / "example.fa"

with outfile.open("w") as out_fh:
out_fh.write(EXAMPLE_FASTA)

return outfile


def test_dict_produces_sequence_dict(
tmp_path: Path,
example_fasta: Path,
) -> None:
# Make sure we're producing the index
output_access = tmp_path / "example.dict"
assert not output_access.exists()
samtools.dict(input=example_fasta, output=output_access, uri="consistent_location")
assert output_access.exists()

output_contents: str
with output_access.open("r") as subset_fasta:
output_contents = subset_fasta.read()

assert output_contents == EXPECTED_DICT


def test_dict_no_header_works(
tmp_path: Path,
example_fasta: Path,
) -> None:
# Make sure we're producing the index
output_access = tmp_path / "example.dict"
assert not output_access.exists()
samtools.dict(
input=example_fasta, output=output_access, no_header=True, uri="consistent_location"
)
assert output_access.exists()

output_contents: str
with output_access.open("r") as subset_fasta:
output_contents = subset_fasta.read()

assert output_contents == HEADERLESS_DICT


ALT_FILE: str = """\
chr1_alt 0 chr1 1 30 7M2D7M * 0 0 * * NM:i:2
chr2_alt 0 chr2 1 30 6M1D6M * 0 0 * * NM:i:1
"""


@pytest.fixture
def bwa_style_alt(tmp_path: Path) -> Path:
outfile = tmp_path / "example.alt"

with outfile.open("w") as out_fh:
out_fh.write(ALT_FILE)

return outfile


def test_dict_other_tags_work(
tmp_path: Path,
example_fasta: Path,
bwa_style_alt: Path,
) -> None:
# Make sure we're producing the index
output_access = tmp_path / "example.dict"
assert not output_access.exists()
samtools.dict(
input=example_fasta,
output=output_access,
alias=True,
assembly="test1",
alt=bwa_style_alt,
species="test3",
uri="consistent_location",
)
assert output_access.exists()

output_contents: str
with output_access.open("r") as subset_fasta:
output_contents = subset_fasta.read()

assert output_contents == OTHER_TAG_DICT
387 changes: 387 additions & 0 deletions fgpyo/pysam_dispatch/samtools/tests/test_faidx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,387 @@
from pathlib import Path

import bgzip
import pytest
from pysam.utils import SamtoolsError

import fgpyo.pysam_dispatch.samtools as samtools
from fgpyo.pysam_dispatch.samtools.faidx import FaidxMarkStrand

EXAMPLE_FASTA: str = """\
>chr1
GATTACATTTGAGAGA
>chr2
CCCCTACCCACCC
"""

SUBSET_FASTA_TEMPLATE: str = """\
>chr1:1-7{mark_strand}
GATTACA
>chr2:8-13{mark_strand}
CCACCC
"""
SUBSET_FASTA: str = SUBSET_FASTA_TEMPLATE.format(mark_strand="")

WRAPPED_SUBSET_FASTA: str = """\
>chr1:1-7
GATTA
CA
>chr2:8-13
CCACC
C
"""


@pytest.fixture
def example_fasta(tmp_path: Path) -> Path:
outfile = tmp_path / "example.fa"

with outfile.open("w") as out_fh:
out_fh.write(EXAMPLE_FASTA)

return outfile


def test_faidx_produces_functional_index(
tmp_path: Path,
example_fasta: Path,
) -> None:
output_index_expected = Path(f"{example_fasta}.fai")

# Make sure we're producing the index
assert not output_index_expected.exists()
samtools.faidx(input=example_fasta)
assert output_index_expected.exists()

output_access = tmp_path / "output_subset.fa"
# Make sure the index is functional
samtools.faidx(input=example_fasta, output=output_access, regions=["chr1:1-7", "chr2:8-13"])

output_contents: str
with output_access.open("r") as subset_fasta:
output_contents = subset_fasta.read()

assert output_contents == SUBSET_FASTA


def test_faidx_fails_if_non_existent_region_requested(
tmp_path: Path,
example_fasta: Path,
) -> None:
with pytest.raises(SamtoolsError):
output_index_expected = Path(f"{example_fasta}.fai")

# Make sure we're producing the index
assert not output_index_expected.exists()
samtools.faidx(input=example_fasta)
assert output_index_expected.exists()

output_access = tmp_path / "output_subset.fa"
# Make sure the index is functional
samtools.faidx(input=example_fasta, output=output_access, regions=["chr3:1-4", "chr1:1-7"])


def test_faidx_passes_if_non_existent_region_requested_when_continue_passed(
tmp_path: Path,
example_fasta: Path,
) -> None:
output_index_expected = Path(f"{example_fasta}.fai")

assert not output_index_expected.exists()
samtools.faidx(input=example_fasta)
assert output_index_expected.exists()

output_access = tmp_path / "output_subset.fa"
samtools.faidx(
input=example_fasta,
output=output_access,
continue_if_non_existent=True,
regions=["chr3:1-4", "chr1:1-7"],
)


def test_faidx_regions_and_regions_file_result_in_same_thing(
tmp_path: Path,
example_fasta: Path,
) -> None:
output_index_expected = Path(f"{example_fasta}.fai")

# Make sure we're producing the index
assert not output_index_expected.exists()
samtools.faidx(input=example_fasta)
assert output_index_expected.exists()

output_access = tmp_path / "output_subset.fa"
# Make sure the index is functional

regions = ["chr1:1-7", "chr2:8-13"]

region_file = tmp_path / "regions.txt"
with region_file.open("w") as region_fh:
region_fh.writelines([f"{region}\n" for region in regions])

samtools.faidx(input=example_fasta, output=output_access, regions=regions)

manually_passed_output_contents: str
with output_access.open("r") as subset_fasta:
manually_passed_output_contents = subset_fasta.read()

samtools.faidx(input=example_fasta, output=output_access, region_file=region_file)

file_passed_output_contents: str
with output_access.open("r") as subset_fasta:
file_passed_output_contents = subset_fasta.read()

assert manually_passed_output_contents == file_passed_output_contents


def test_length_parameter(
tmp_path: Path,
example_fasta: Path,
) -> None:
output_index_expected = Path(f"{example_fasta}.fai")

# Make sure we're producing the index
assert not output_index_expected.exists()
samtools.faidx(
input=example_fasta,
)
assert output_index_expected.exists()

output_access = tmp_path / "output_subset.fa"
# Make sure the index is functional
samtools.faidx(
input=example_fasta,
output=output_access,
regions=["chr1:1-7", "chr2:8-13"],
length=5,
)

output_contents: str
with output_access.open() as subset_fasta:
output_contents = subset_fasta.read()

assert output_contents == WRAPPED_SUBSET_FASTA


RC_SUBSET_FASTA: str = """\
>chr1:1-7{mark_strand}
TGTAATC
>chr2:8-13{mark_strand}
GGGTGG
"""


def test_rc_parameter(
tmp_path: Path,
example_fasta: Path,
) -> None:
output_index_expected = Path(f"{example_fasta}.fai")

# Make sure we're producing the index
assert not output_index_expected.exists()
samtools.faidx(
input=example_fasta,
)
assert output_index_expected.exists()

output_access = tmp_path / "output_subset.fa"
# Make sure the index is functional
samtools.faidx(
input=example_fasta,
output=output_access,
reverse_complement=True,
regions=["chr1:1-7", "chr2:8-13"],
)

output_contents: str
with output_access.open() as subset_fasta:
output_contents = subset_fasta.read()

assert output_contents == RC_SUBSET_FASTA.format(mark_strand="/rc")


@pytest.mark.parametrize(
argnames=["mark_strand", "expected_fwd_mark_strand", "expected_rev_mark_strand"],
argvalues=[
(FaidxMarkStrand.RevComp, "", "/rc"),
(FaidxMarkStrand.No, "", ""),
(FaidxMarkStrand.Sign, "(+)", "(-)"),
(FaidxMarkStrand.Custom, "ex1a", "ex1b"),
(FaidxMarkStrand.Custom, "ex2a", "ex2b"),
],
ids=["rev comp", "no", "sign", "custom1", "custom2"],
)
def test_mark_strand_parameters(
tmp_path: Path,
example_fasta: Path,
mark_strand: FaidxMarkStrand,
expected_fwd_mark_strand: str,
expected_rev_mark_strand: str,
) -> None:
output_index_expected = Path(f"{example_fasta}.fai")

# Make sure we're producing the index
assert not output_index_expected.exists()
samtools.faidx(
input=example_fasta,
)
assert output_index_expected.exists()

output_access = tmp_path / "output_subset.fa"
# Make sure the index is functional
samtools.faidx(
input=example_fasta,
output=output_access,
reverse_complement=False,
mark_strand=mark_strand,
custom_mark_strand=(expected_fwd_mark_strand, expected_rev_mark_strand),
regions=["chr1:1-7", "chr2:8-13"],
)
output_contents: str
with output_access.open() as subset_fasta:
output_contents = subset_fasta.read()

assert output_contents == SUBSET_FASTA_TEMPLATE.format(mark_strand=expected_fwd_mark_strand)

samtools.faidx(
input=example_fasta,
output=output_access,
reverse_complement=True,
mark_strand=mark_strand,
custom_mark_strand=(expected_fwd_mark_strand, expected_rev_mark_strand),
regions=["chr1:1-7", "chr2:8-13"],
)

output_contents: str
with output_access.open() as subset_fasta:
output_contents = subset_fasta.read()

assert output_contents == RC_SUBSET_FASTA.format(mark_strand=expected_rev_mark_strand)


EXAMPLE_FASTQ: str = """\
@chr1
GATTACATTTGAGAGA
+
;;;;;;;;;;;;;;;;
@chr2
CCCCTACCCACCC
+
;;;;;;;;;;;;;
"""

SUBSET_FASTQ: str = """\
@chr1:1-7
GATTACA
+
;;;;;;;
@chr2:8-13
CCACCC
+
;;;;;;
"""


@pytest.fixture
def example_fastq(tmp_path: Path) -> Path:
outfile = tmp_path / "example.fq"

with outfile.open("w") as out_fh:
out_fh.write(EXAMPLE_FASTQ)

return outfile


def test_fastq_parameter(
tmp_path: Path,
example_fastq: Path,
) -> None:
output_index_expected = Path(f"{example_fastq}.fai")

# Make sure we're producing the index
assert not output_index_expected.exists()
samtools.faidx(
input=example_fastq,
fastq=True,
)
assert output_index_expected.exists()

output_access = tmp_path / "output_subset.fq"
# Make sure the index is functional
samtools.faidx(
input=example_fastq,
output=output_access,
regions=["chr1:1-7", "chr2:8-13"],
fastq=True,
)

output_contents: str
with output_access.open() as subset_fasta:
output_contents = subset_fasta.read()

assert output_contents == SUBSET_FASTQ


@pytest.fixture
def example_fasta_gz(tmp_path: Path) -> Path:
outfile = tmp_path / "example.fa.gz"

with outfile.open(mode="wb") as out_fh:
with bgzip.BGZipWriter(out_fh) as fh:
fh.write(bytes(EXAMPLE_FASTA, "Utf-8"))

return outfile


def test_index_outputs(
tmp_path: Path,
example_fasta: Path,
example_fasta_gz: Path,
) -> None:
example_fai = Path(f"{example_fasta}.fai")

# Make sure we're producing the index
assert not example_fai.exists()
samtools.faidx(
input=example_fasta,
fai_idx=example_fai,
)
assert example_fai.exists()

output_access = tmp_path / "output_subset.fa"
# Make sure the index is functional
samtools.faidx(
input=example_fasta,
output=output_access,
regions=["chr1:1-7", "chr2:8-13"],
fai_idx=example_fai,
)

output_contents: str
with output_access.open() as subset_fasta:
output_contents = subset_fasta.read()
assert output_contents == SUBSET_FASTA

example_gzi = Path(f"{example_fasta_gz}.gzi")
assert not example_gzi.exists()
samtools.faidx(
input=example_fasta_gz,
gzi_idx=example_gzi,
)
assert example_gzi.exists()

output_access = tmp_path / "output_subset.fa"
# Make sure the index is functional
samtools.faidx(
input=example_fasta,
output=output_access,
regions=["chr1:1-7", "chr2:8-13"],
gzi_idx=example_gzi,
)

compressed_output_contents: str
with output_access.open() as subset_fasta:
compressed_output_contents = subset_fasta.read()

assert compressed_output_contents == SUBSET_FASTA
87 changes: 87 additions & 0 deletions fgpyo/pysam_dispatch/samtools/tests/test_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from pathlib import Path

import pytest

import fgpyo.pysam_dispatch.samtools as samtools
from fgpyo.pysam_dispatch.samtools.index import SamIndexType
from fgpyo.sam import SamOrder
from fgpyo.sam.builder import SamBuilder


@pytest.mark.parametrize(
argnames=["index_type"],
argvalues=[
(SamIndexType.BAI,),
(SamIndexType.CSI,),
],
ids=["BAI", "CSI"],
)
def test_index_works_with_one_input(
tmp_path: Path,
index_type: SamIndexType,
) -> None:
builder = SamBuilder(sort_order=SamOrder.Coordinate)
builder.add_pair(name="test1", chrom="chr1", start1=4000, start2=4300)
builder.add_pair(
name="test2", chrom="chr1", start1=5000, start2=4700, strand1="-", strand2="+"
)
builder.add_pair(name="test3", chrom="chr2", start1=4000, start2=4300)
builder.add_pair(name="test4", chrom="chr5", start1=4000, start2=4300)

# At the moment sam builder doesnt support generating CRAM and SAM formats, so for now we're
# only testing on BAMs
input_file = tmp_path / "test_input.bam"
builder.to_path(input_file)

output_index_expected = Path(f"{input_file}.{index_type._name_.lower()}")

# Make sure we're producing the index
assert not output_index_expected.exists()
# samtools.index(inputs=[input_file], index_type=index_type)
samtools.index(input=input_file, index_type=index_type)
assert output_index_expected.exists()


# Can't accept multiple inputs at the moment.
# See https://github.com/pysam-developers/pysam/issues/1155
# @pytest.mark.parametrize(
# argnames=["index_type"],
# argvalues=[
# (SamIndexType.BAI,),
# (SamIndexType.CSI,),
# ],
# ids=["BAI", "CSI"],
# )
# def test_index_works_with_multiple_inputs(
# tmp_path: Path,
# index_type: SamIndexType,
# ) -> None:
# builder = SamBuilder(sort_order=SamOrder.Coordinate)
# builder.add_pair(name="test1", chrom="chr1", start1=4000, start2=4300)
# builder.add_pair(
# name="test2", chrom="chr1", start1=5000, start2=4700, strand1="-", strand2="+"
# )
# builder.add_pair(name="test3", chrom="chr2", start1=4000, start2=4300)
# builder.add_pair(name="test4", chrom="chr5", start1=4000, start2=4300)

# # At the moment sam builder doesnt support generating CRAM and SAM formats, so for now we're
# # only testing on BAMs
# input_file1 = tmp_path / "test_input1.bam"
# builder.to_path(input_file1)
# input_file2 = tmp_path / "test_input2.bam"
# builder.to_path(input_file2)

# inputs = [
# input_file1,
# input_file2,
# ]

# # Make sure we're producing the indices
# for input in inputs:
# output_index_expected = Path(f"{input}.{index_type._name_.lower()}")
# assert not output_index_expected.exists()

# samtools.index(inputs=inputs, index_type=index_type)
# for input in inputs:
# output_index_expected = Path(f"{input}.{index_type._name_.lower()}")
# assert output_index_expected.exists()
79 changes: 79 additions & 0 deletions fgpyo/pysam_dispatch/samtools/tests/test_sort.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from pathlib import Path
from typing import List
from typing import Optional

import pytest

import fgpyo.pysam_dispatch.samtools as samtools
from fgpyo import sam
from fgpyo.sam import SamFileType
from fgpyo.sam import SamOrder
from fgpyo.sam.builder import SamBuilder


@pytest.mark.parametrize(
argnames=["file_type"],
argvalues=[
(SamFileType.SAM,),
(SamFileType.BAM,),
(SamFileType.CRAM,),
],
ids=["SAM", "BAM", "CRAM"],
)
@pytest.mark.parametrize(
argnames=["index_output"],
argvalues=[
(True,),
(False,),
],
ids=["indexed", "not_indexed"],
)
@pytest.mark.parametrize(
argnames=["sort_order", "expected_name_order"],
argvalues=[
(SamOrder.Coordinate, ["test2", "test3", "test4", "test1"]),
(SamOrder.QueryName, ["test1", "test2", "test3", "test4"]),
(SamOrder.TemplateCoordinate, ["test2", "test3", "test4", "test1"]),
],
ids=["Coordinate sorting", "Query name sorting", "Template Sorted"],
)
def test_sort_types(
tmp_path: Path,
file_type: SamFileType,
index_output: bool,
sort_order: Optional[SamOrder],
expected_name_order: List[str],
) -> None:

builder = SamBuilder(sort_order=SamOrder.Unsorted)
builder.add_pair(
name="test3", chrom="chr1", start1=5000, start2=4700, strand1="-", strand2="+"
)
builder.add_pair(name="test2", chrom="chr1", start1=4000, start2=4300)
builder.add_pair(name="test1", chrom="chr5", start1=4000, start2=4300)
builder.add_pair(name="test4", chrom="chr2", start1=4000, start2=4300)

input_file = tmp_path / "test_input.bam"
output_file = tmp_path / f"test_output{file_type.extension}"

builder.to_path(input_file)

samtools.sort(
input=input_file,
output=output_file,
index_output=index_output,
sort_order=sort_order,
)
with sam.reader(output_file) as in_bam:
for name in expected_name_order:
read1 = next(in_bam)
assert (
name == read1.query_name
), "Position based read sort order did not match expectation"
read2 = next(in_bam)
assert (
name == read2.query_name
), "Position based read sort order did not match expectation"

if index_output and file_type != SamFileType.SAM:
assert Path(f"{output_file}{file_type.index_extension}")
9 changes: 5 additions & 4 deletions fgpyo/sam/__init__.py
Original file line number Diff line number Diff line change
@@ -197,13 +197,14 @@ class SamFileType(enum.Enum):
ext (str): The standard file extension for this file type.
"""

def __init__(self, mode: str, ext: str) -> None:
def __init__(self, mode: str, ext: str, index_ext: Optional[str]) -> None:
self.mode = mode
self.extension = ext
self.index_extension = index_ext

SAM = ("", ".sam")
BAM = ("b", ".bam")
CRAM = ("c", ".cram")
SAM = ("", ".sam", None)
BAM = ("b", ".bam", ".bai")
CRAM = ("c", ".cram", ".crai")

@classmethod
def from_path(cls, path: Union[Path, str]) -> "SamFileType":
23 changes: 9 additions & 14 deletions fgpyo/sam/builder.py
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@
from pysam import AlignedSegment
from pysam import AlignmentHeader

import fgpyo.pysam_dispatch.samtools as samtools
from fgpyo import sam
from fgpyo.sam import SamOrder

@@ -544,7 +545,7 @@ def to_path(

with NamedTemporaryFile(suffix=".bam", delete=True) as fp:
file_handle: IO
if self.sort_order is SamOrder.Unsorted:
if self.sort_order in [SamOrder.Unsorted, SamOrder.Unknown]:
file_handle = path.open("w")
else:
file_handle = fp.file
@@ -555,20 +556,14 @@ def to_path(
for rec in self._records:
if pred(rec):
writer.write(rec)

default_samtools_opt_list = ["-o", str(path), fp.name]

file_handle.close()
if self.sort_order == SamOrder.QueryName:
# Ignore type hints for now until we have wrappers to use here.
pysam.sort(*(["-n"] + default_samtools_opt_list)) # type: ignore
elif self.sort_order == SamOrder.Coordinate:
# Ignore type hints for now until we have wrappers to use here.
pysam.sort(*default_samtools_opt_list) # type: ignore
if index:
# Ignore type hints for now until we have wrappers to use here.
pysam.index(str(path)) # type: ignore

if self.sort_order not in [SamOrder.Unsorted, SamOrder.Unknown]:
samtools.sort(
input=Path(fp.name),
output=path,
index_output=index,
sort_order=self.sort_order,
)
return path

def __len__(self) -> int:
207 changes: 131 additions & 76 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -40,6 +40,7 @@ pytest = ">=5.4.2"
mypy = ">=0.770"
flake8 = ">=3.8.1"
black = ">=19.10b0"
bgzip = ">=0.4.0"
pytest-cov = ">=2.8.1"
isort = ">=5.10.1"

0 comments on commit eb32cac

Please sign in to comment.