diff --git a/fgpyo/sam/__init__.py b/fgpyo/sam/__init__.py index c9dc8f4..fc10bad 100644 --- a/fgpyo/sam/__init__.py +++ b/fgpyo/sam/__init__.py @@ -158,6 +158,7 @@ import enum import io import sys +from array import array from itertools import chain from pathlib import Path from typing import IO @@ -177,6 +178,7 @@ from pysam import AlignedSegment from pysam import AlignmentFile as SamFile from pysam import AlignmentHeader as SamHeader +from pysam import qualitystring_to_array from typing_extensions import deprecated import fgpyo.io @@ -194,6 +196,9 @@ NO_REF_POS: int = -1 """The reference position to use to indicate no position in SAM/BAM.""" +NO_BASE_QUAL: array = qualitystring_to_array("*") +"""A base quality string to use for a SAM record with missing base qualities.""" + _IOClasses = (io.TextIOBase, io.BufferedIOBase, io.RawIOBase, io.IOBase) """The classes that should be treated as file-like classes""" @@ -668,15 +673,45 @@ def build( return PairOrientation.RF +def isize(r1: AlignedSegment, r2: Optional[AlignedSegment] = None) -> int: + """Computes the insert size for a pair of records.""" + if r2 is None: + r2_is_unmapped = r1.mate_is_unmapped + r2_reference_id = r1.next_reference_id + else: + r2_is_unmapped = r2.is_unmapped + r2_reference_id = r2.reference_id + + if r1.is_unmapped or r2_is_unmapped or r1.reference_id != r2_reference_id: + return 0 + + if r2 is None: + if not r1.has_tag("MC"): + raise ValueError('Cannot determine proper pair status without R2\'s cigar ("MC")!') + r2_cigar = Cigar.from_cigarstring(str(r1.get_tag("MC"))) + r2_is_reverse = r1.mate_is_reverse + r2_reference_start = r1.next_reference_start + r2_reference_end = r1.next_reference_start + r2_cigar.length_on_target() + else: + r2_is_reverse = r2.is_reverse + r2_reference_start = r2.reference_start + r2_reference_end = r2.reference_end + + r1_pos = r1.reference_end if r1.is_reverse else r1.reference_start + r2_pos = r2_reference_end if r2_is_reverse else r2_reference_start + return r2_pos - r1_pos + + DefaultProperlyPairedOrientations = {PairOrientation.FR} """The default orientations for properly paired reads.""" -def properly_paired( +def is_proper_pair( r1: AlignedSegment, r2: Optional[AlignedSegment] = None, max_insert_size: int = 1000, orientations: set[PairOrientation] = DefaultProperlyPairedOrientations, + isize: Callable[[AlignedSegment, AlignedSegment], int] = isize, ) -> bool: """Determines if a read pair is properly paired or not. @@ -691,6 +726,7 @@ def properly_paired( r2: The second read in the template. If undefined, mate data set upon R1 will be used. max_insert_size: The maximum insert size to consider a read pair "proper". orientations: The valid set of orientations to consider a read pair "proper". + isize: A function that takes the two alignments and calculates their isize. See: [`htsjdk.samtools.SamPairUtil.isProperPair()`](https://github.com/samtools/htsjdk/blob/c31bc92c24bc4e9552b2a913e52286edf8f8ab96/src/main/java/htsjdk/samtools/SamPairUtil.java#L106-L125) @@ -707,9 +743,7 @@ def properly_paired( and r2_is_mapped and r1.reference_id == r2_reference_id and PairOrientation.build(r1, r2) in orientations - # TODO: consider replacing with `abs(isize(r1, r2)) <= max_insert_size` - # which can only be done if isize() is modified to allow for an optional R2. - and 0 < abs(r1.template_length) <= max_insert_size + and 0 < abs(isize(r1, r2)) <= max_insert_size ) @@ -794,30 +828,39 @@ def from_read(cls, read: pysam.AlignedSegment) -> List["SupplementaryAlignment"] return [] -def isize(r1: AlignedSegment, r2: AlignedSegment) -> int: - """Computes the insert size for a pair of records.""" - if r1.is_unmapped or r2.is_unmapped or r1.reference_id != r2.reference_id: - return 0 - else: - r1_pos = r1.reference_end if r1.is_reverse else r1.reference_start - r2_pos = r2.reference_end if r2.is_reverse else r2.reference_start - return r2_pos - r1_pos +def sum_of_base_qualities(rec: AlignedSegment, min_quality_score: int = 15) -> int | None: + """Calculate the sum of base qualities score for an alignment record. + + This function is useful for calculating the "mate score" as implemented in samtools fixmate. + + Args: + rec: The alignment record to calculate the sum of base qualities from. + min_quality_score: The minimum base quality score to use for summation. + + See: + [`calc_sum_of_base_qualities()`](https://github.com/samtools/samtools/blob/4f3a7397a1f841020074c0048c503a01a52d5fa2/bam_mate.c#L227-L238) + [`MD_MIN_QUALITY`](https://github.com/samtools/samtools/blob/4f3a7397a1f841020074c0048c503a01a52d5fa2/bam_mate.c#L42) + """ + if rec.query_qualities is None or rec.query_qualities == NO_BASE_QUAL: + return None + score: int = sum(qual for qual in rec.query_qualities if qual >= min_quality_score) + return score def set_mate_info( r1: AlignedSegment, r2: AlignedSegment, - is_proper_pair: Callable[[AlignedSegment, AlignedSegment], bool] = properly_paired, + is_proper_pair: Callable[[AlignedSegment, AlignedSegment], bool] = is_proper_pair, ) -> None: """Resets mate pair information between reads in a pair. Args: r1: Read 1 (first read in the template). r2: Read 2 with the same query name as r1 (second read in the template). - is_proper_pair: A function that takes the two reads and determines proper pair status. + is_proper_pair: A function that takes the two alignments and determines proper pair status. """ if r1.query_name != r2.query_name: - raise ValueError("Cannot set mate info on reads with different query names!") + raise ValueError("Cannot set mate info on alignments with different query names!") for src, dest in [(r1, r2), (r2, r1)]: dest.next_reference_id = src.reference_id @@ -828,6 +871,9 @@ def set_mate_info( dest.set_tag("MC", src.cigarstring) dest.set_tag("MQ", src.mapping_quality) + r1.set_tag("ms", sum_of_base_qualities(r2)) + r2.set_tag("ms", sum_of_base_qualities(r1)) + template_length = isize(r1, r2) r1.template_length = template_length r2.template_length = -template_length @@ -837,10 +883,82 @@ def set_mate_info( r2.is_proper_pair = proper_pair +def set_mate_info_on_secondary( + primary: AlignedSegment, + secondary: AlignedSegment, + is_proper_pair: Callable[[AlignedSegment, AlignedSegment], bool] = is_proper_pair, +) -> None: + """Set mate info on a secondary alignment to the next read ordinal's primary alignment. + + Args: + primary: The primary alignment of the secondary's mate. + secondary: The secondary alignment to set mate information upon. + is_proper_pair: A function that takes the two alignments and determines proper pair status. + + Raises: + ValueError: If primary and secondary are of the same read ordinal. + ValueError: If the primary is marked as either secondary or supplementary. + ValueError: If the secondary is not marked as secondary. + """ + if primary.is_read1 == secondary.is_read1 or primary.is_secondary or primary.is_supplementary: + raise ValueError("Secondary mate info must be set from a primary of the next ordinal!") + if not secondary.is_secondary: + raise ValueError("Cannot set mate info on an alignment not marked as secondary!") + if primary.query_name != secondary.query_name: + raise ValueError("Cannot set mate info on alignments with different query names!") + + secondary.next_reference_id = primary.reference_id + secondary.next_reference_name = primary.reference_name + secondary.next_reference_start = primary.reference_start + secondary.mate_is_forward = primary.is_forward + secondary.mate_is_mapped = primary.is_mapped + secondary.set_tag("MC", primary.cigarstring) + secondary.set_tag("MQ", primary.mapping_quality) + secondary.set_tag("ms", sum_of_base_qualities(primary)) + + # NB: calculate isize and proper pair as if this secondary alignment was the primary alignment. + secondary.is_proper_pair = is_proper_pair(primary, secondary) + secondary.template_length = isize(primary, secondary) + + +def set_mate_info_on_supplementary(primary: AlignedSegment, supp: AlignedSegment) -> None: + """Set mate info on a supplementary alignment to the next read ordinal's primary alignment. + + Args: + primary: The primary alignment of the supplementary's mate. + supp: The supplementary alignment to set mate information upon. + + Raises: + ValueError: If primary and secondary are of the same read ordinal. + ValueError: If the primary is marked as either secondary or supplementary. + ValueError: If the secondary is not marked as secondary. + """ + if primary.is_read1 == supp.is_read1 or primary.is_secondary or primary.is_supplementary: + raise ValueError("Supplementary mate info must be set from a primary of the next ordinal!") + if not supp.is_supplementary: + raise ValueError("Cannot set mate info on an alignment not marked as supplementary!") + if primary.query_name != supp.query_name: + raise ValueError("Cannot set mate info on alignments with different query names!") + + supp.next_reference_id = primary.reference_id + supp.next_reference_name = primary.reference_name + supp.next_reference_start = primary.reference_start + supp.mate_is_forward = primary.is_forward + supp.mate_is_mapped = primary.is_mapped + supp.set_tag("MC", primary.cigarstring) + supp.set_tag("MQ", primary.mapping_quality) + supp.set_tag("ms", sum_of_base_qualities(primary)) + + # NB: for a non-secondary supplemental alignment, set the following to the same as the primary. + if not supp.is_secondary: + supp.is_proper_pair = primary.is_proper_pair + supp.template_length = -primary.template_length + + def set_as_pairs( r1: AlignedSegment, r2: AlignedSegment, - is_proper_pair: Callable[[AlignedSegment, AlignedSegment], bool] = properly_paired, + is_proper_pair: Callable[[AlignedSegment, AlignedSegment], bool] = is_proper_pair, ) -> None: """Forces the two reads to become pairs as long as they share the same query name. @@ -1118,6 +1236,10 @@ def all_recs(self) -> Iterator[AlignedSegment]: for rec in recs: yield rec + def set_mate_info(self) -> "Template": + """Reset all mate information on every record in a template.""" + return set_mate_info_for_template(self) + def write_to( self, writer: SamFile, @@ -1176,6 +1298,31 @@ def __next__(self) -> Template: return Template.build(recs, validate=False) +def set_mate_info_for_template( + template: Template, + is_proper_pair: Callable[[AlignedSegment, AlignedSegment], bool] = is_proper_pair, +) -> Template: + """Reset all mate information on every record in a template. + + Args: + template: The template of alignments to reset all mate information on. + is_proper_pair: A function that takes two alignments and determines proper pair status. + """ + if template.r1 is not None and template.r2 is not None: + set_mate_info(template.r1, template.r2, is_proper_pair=is_proper_pair) + if template.r1 is not None: + for rec in template.r2_secondaries: + set_mate_info_on_secondary(template.r1, rec, is_proper_pair=is_proper_pair) + for rec in template.r2_supplementals: + set_mate_info_on_supplementary(template.r1, rec) + if template.r2 is not None: + for rec in template.r1_secondaries: + set_mate_info_on_secondary(template.r2, rec, is_proper_pair=is_proper_pair) + for rec in template.r1_supplementals: + set_mate_info_on_supplementary(template.r2, rec) + return template + + class SamOrder(enum.Enum): """ Enumerations of possible sort orders for a SAM file. diff --git a/tests/fgpyo/sam/test_sam.py b/tests/fgpyo/sam/test_sam.py index fe1f844..31a2f75 100755 --- a/tests/fgpyo/sam/test_sam.py +++ b/tests/fgpyo/sam/test_sam.py @@ -19,7 +19,7 @@ from fgpyo.sam import CigarParsingException from fgpyo.sam import PairOrientation from fgpyo.sam import SamFileType -from fgpyo.sam import properly_paired +from fgpyo.sam import is_proper_pair from fgpyo.sam.builder import SamBuilder @@ -418,82 +418,82 @@ def test_pair_orientation_build_raises_if_it_cant_find_mate_cigar_tag() -> None: PairOrientation.build(r1) -def test_properly_paired_when_actually_proper() -> None: - """Test that properly_paired returns True when reads are properly paired.""" +def test_is_proper_pair_when_actually_proper() -> None: + """Test that is_proper_pair returns True when reads are properly paired.""" builder = SamBuilder() r1, r2 = builder.add_pair(chrom="chr1", start1=100, cigar1="115M", start2=250, cigar2="40M") - assert properly_paired(r1, r2) + assert is_proper_pair(r1, r2) r1, r2 = builder.add_pair(chrom="chr1", start1=100, cigar1="10M", start2=100, cigar2="10M") r1.is_reverse = True r2.is_reverse = False sam.set_mate_info(r1, r2) - assert properly_paired(r1, r2) + assert is_proper_pair(r1, r2) -def test_properly_paired_when_actually_proper_and_no_r2() -> None: - """Test that properly_paired returns True when reads are properly paired, but no R2.""" +def test_is_proper_pair_when_actually_proper_and_no_r2() -> None: + """Test that is_proper_pair returns True when reads are properly paired, but no R2.""" builder = SamBuilder() r1, r2 = builder.add_pair(chrom="chr1", start1=100, cigar1="115M", start2=250, cigar2="40M") - assert properly_paired(r1) + assert is_proper_pair(r1) r1, r2 = builder.add_pair(chrom="chr1", start1=100, cigar1="10M", start2=100, cigar2="10M") r1.is_reverse = True r2.is_reverse = False sam.set_mate_info(r1, r2) - assert properly_paired(r1) + assert is_proper_pair(r1) -def test_not_properly_paired_if_wrong_orientation() -> None: +def test_not_is_proper_pair_if_wrong_orientation() -> None: """Test that reads are not properly paired if they are not in the right orientation.""" builder = SamBuilder() r1, r2 = builder.add_pair(chrom="chr1", start1=100, cigar1="115M", start2=250, cigar2="40M") r1.is_forward = False r2.is_forward = True sam.set_mate_info(r1, r2) - assert not properly_paired(r1, r2) + assert not is_proper_pair(r1, r2) r1, r2 = builder.add_pair(chrom="chr1", start1=100, cigar1="115M", start2=250, cigar2="40M") r1.is_forward = True r2.is_forward = True sam.set_mate_info(r1, r2) - assert not properly_paired(r1, r2) + assert not is_proper_pair(r1, r2) r1, r2 = builder.add_pair(chrom="chr1", start1=100, cigar1="115M", start2=250, cigar2="40M") r1.is_forward = False r2.is_forward = False sam.set_mate_info(r1, r2) - assert not properly_paired(r1, r2) + assert not is_proper_pair(r1, r2) -def test_not_properly_paired_if_wrong_orientation_and_no_r2() -> None: +def test_not_is_proper_pair_if_wrong_orientation_and_no_r2() -> None: """Test reads are not properly paired if they are not in the right orientation, but no R2.""" builder = SamBuilder() r1, r2 = builder.add_pair(chrom="chr1", start1=100, cigar1="115M", start2=250, cigar2="40M") r1.is_forward = False r2.is_forward = True sam.set_mate_info(r1, r2) - assert not properly_paired(r1) + assert not is_proper_pair(r1) r1, r2 = builder.add_pair(chrom="chr1", start1=100, cigar1="115M", start2=250, cigar2="40M") r1.is_forward = True r2.is_forward = True sam.set_mate_info(r1, r2) - assert not properly_paired(r1) + assert not is_proper_pair(r1) r1, r2 = builder.add_pair(chrom="chr1", start1=100, cigar1="115M", start2=250, cigar2="40M") r1.is_forward = False r2.is_forward = False sam.set_mate_info(r1, r2) - assert not properly_paired(r1) + assert not is_proper_pair(r1) -def test_not_properly_paired_if_too_far_apart() -> None: +def test_not_is_proper_pair_if_too_far_apart() -> None: """Test that reads are not properly paired if they are too far apart.""" builder = SamBuilder() r1, r2 = builder.add_pair(chrom="chr1", start1=100, start2=100 + 1000) sam.set_mate_info(r1, r2) - assert not properly_paired(r1, r2) + assert not is_proper_pair(r1, r2) def test_isize() -> None: