Skip to content

Commit

Permalink
Add methods to fix mate info on non-primaries and templates
Browse files Browse the repository at this point in the history
  • Loading branch information
clintval committed Dec 27, 2024
1 parent 491777b commit 77fd97a
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 35 deletions.
179 changes: 163 additions & 16 deletions fgpyo/sam/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@
import enum
import io
import sys
from array import array
from itertools import chain
from pathlib import Path
from typing import IO
Expand All @@ -177,6 +178,7 @@
from pysam import AlignedSegment
from pysam import AlignmentFile as SamFile
from pysam import AlignmentHeader as SamHeader
from pysam import qualitystring_to_array
from typing_extensions import deprecated

import fgpyo.io
Expand All @@ -194,6 +196,9 @@
NO_REF_POS: int = -1
"""The reference position to use to indicate no position in SAM/BAM."""

NO_BASE_QUAL: array = qualitystring_to_array("*")
"""A base quality string to use for a SAM record with missing base qualities."""

_IOClasses = (io.TextIOBase, io.BufferedIOBase, io.RawIOBase, io.IOBase)
"""The classes that should be treated as file-like classes"""

Expand Down Expand Up @@ -668,15 +673,45 @@ def build(
return PairOrientation.RF


def isize(r1: AlignedSegment, r2: Optional[AlignedSegment] = None) -> int:
"""Computes the insert size for a pair of records."""
if r2 is None:
r2_is_unmapped = r1.mate_is_unmapped
r2_reference_id = r1.next_reference_id
else:
r2_is_unmapped = r2.is_unmapped
r2_reference_id = r2.reference_id

if r1.is_unmapped or r2_is_unmapped or r1.reference_id != r2_reference_id:
return 0

if r2 is None:
if not r1.has_tag("MC"):
raise ValueError('Cannot determine proper pair status without R2\'s cigar ("MC")!')
r2_cigar = Cigar.from_cigarstring(str(r1.get_tag("MC")))
r2_is_reverse = r1.mate_is_reverse
r2_reference_start = r1.next_reference_start
r2_reference_end = r1.next_reference_start + r2_cigar.length_on_target()
else:
r2_is_reverse = r2.is_reverse
r2_reference_start = r2.reference_start
r2_reference_end = r2.reference_end

r1_pos = r1.reference_end if r1.is_reverse else r1.reference_start
r2_pos = r2_reference_end if r2_is_reverse else r2_reference_start
return r2_pos - r1_pos


DefaultProperlyPairedOrientations = {PairOrientation.FR}
"""The default orientations for properly paired reads."""


def properly_paired(
def is_proper_pair(
r1: AlignedSegment,
r2: Optional[AlignedSegment] = None,
max_insert_size: int = 1000,
orientations: set[PairOrientation] = DefaultProperlyPairedOrientations,
isize: Callable[[AlignedSegment, AlignedSegment], int] = isize,
) -> bool:
"""Determines if a read pair is properly paired or not.
Expand All @@ -691,6 +726,7 @@ def properly_paired(
r2: The second read in the template. If undefined, mate data set upon R1 will be used.
max_insert_size: The maximum insert size to consider a read pair "proper".
orientations: The valid set of orientations to consider a read pair "proper".
isize: A function that takes the two alignments and calculates their isize.
See:
[`htsjdk.samtools.SamPairUtil.isProperPair()`](https://github.com/samtools/htsjdk/blob/c31bc92c24bc4e9552b2a913e52286edf8f8ab96/src/main/java/htsjdk/samtools/SamPairUtil.java#L106-L125)
Expand All @@ -707,9 +743,7 @@ def properly_paired(
and r2_is_mapped
and r1.reference_id == r2_reference_id
and PairOrientation.build(r1, r2) in orientations
# TODO: consider replacing with `abs(isize(r1, r2)) <= max_insert_size`
# which can only be done if isize() is modified to allow for an optional R2.
and 0 < abs(r1.template_length) <= max_insert_size
and 0 < abs(isize(r1, r2)) <= max_insert_size
)


Expand Down Expand Up @@ -794,30 +828,39 @@ def from_read(cls, read: pysam.AlignedSegment) -> List["SupplementaryAlignment"]
return []


def isize(r1: AlignedSegment, r2: AlignedSegment) -> int:
"""Computes the insert size for a pair of records."""
if r1.is_unmapped or r2.is_unmapped or r1.reference_id != r2.reference_id:
return 0
else:
r1_pos = r1.reference_end if r1.is_reverse else r1.reference_start
r2_pos = r2.reference_end if r2.is_reverse else r2.reference_start
return r2_pos - r1_pos
def sum_of_base_qualities(rec: AlignedSegment, min_quality_score: int = 15) -> int | None:
"""Calculate the sum of base qualities score for an alignment record.
This function is useful for calculating the "mate score" as implemented in samtools fixmate.
Args:
rec: The alignment record to calculate the sum of base qualities from.
min_quality_score: The minimum base quality score to use for summation.
See:
[`calc_sum_of_base_qualities()`](https://github.com/samtools/samtools/blob/4f3a7397a1f841020074c0048c503a01a52d5fa2/bam_mate.c#L227-L238)
[`MD_MIN_QUALITY`](https://github.com/samtools/samtools/blob/4f3a7397a1f841020074c0048c503a01a52d5fa2/bam_mate.c#L42)
"""
if rec.query_qualities is None or rec.query_qualities == NO_BASE_QUAL:
return None
score: int = sum(qual for qual in rec.query_qualities if qual >= min_quality_score)
return score


def set_mate_info(
r1: AlignedSegment,
r2: AlignedSegment,
is_proper_pair: Callable[[AlignedSegment, AlignedSegment], bool] = properly_paired,
is_proper_pair: Callable[[AlignedSegment, AlignedSegment], bool] = is_proper_pair,
) -> None:
"""Resets mate pair information between reads in a pair.
Args:
r1: Read 1 (first read in the template).
r2: Read 2 with the same query name as r1 (second read in the template).
is_proper_pair: A function that takes the two reads and determines proper pair status.
is_proper_pair: A function that takes the two alignments and determines proper pair status.
"""
if r1.query_name != r2.query_name:
raise ValueError("Cannot set mate info on reads with different query names!")
raise ValueError("Cannot set mate info on alignments with different query names!")

for src, dest in [(r1, r2), (r2, r1)]:
dest.next_reference_id = src.reference_id
Expand All @@ -828,6 +871,9 @@ def set_mate_info(
dest.set_tag("MC", src.cigarstring)
dest.set_tag("MQ", src.mapping_quality)

r1.set_tag("ms", sum_of_base_qualities(r2))
r2.set_tag("ms", sum_of_base_qualities(r1))

template_length = isize(r1, r2)
r1.template_length = template_length
r2.template_length = -template_length
Expand All @@ -837,10 +883,82 @@ def set_mate_info(
r2.is_proper_pair = proper_pair


def set_mate_info_on_secondary(
primary: AlignedSegment,
secondary: AlignedSegment,
is_proper_pair: Callable[[AlignedSegment, AlignedSegment], bool] = is_proper_pair,
) -> None:
"""Set mate info on a secondary alignment to the next read ordinal's primary alignment.
Args:
primary: The primary alignment of the secondary's mate.
secondary: The secondary alignment to set mate information upon.
is_proper_pair: A function that takes the two alignments and determines proper pair status.
Raises:
ValueError: If primary and secondary are of the same read ordinal.
ValueError: If the primary is marked as either secondary or supplementary.
ValueError: If the secondary is not marked as secondary.
"""
if primary.is_read1 == secondary.is_read1 or primary.is_secondary or primary.is_supplementary:
raise ValueError("Secondary mate info must be set from a primary of the next ordinal!")
if not secondary.is_secondary:
raise ValueError("Cannot set mate info on an alignment not marked as secondary!")
if primary.query_name != secondary.query_name:
raise ValueError("Cannot set mate info on alignments with different query names!")

secondary.next_reference_id = primary.reference_id
secondary.next_reference_name = primary.reference_name
secondary.next_reference_start = primary.reference_start
secondary.mate_is_forward = primary.is_forward
secondary.mate_is_mapped = primary.is_mapped
secondary.set_tag("MC", primary.cigarstring)
secondary.set_tag("MQ", primary.mapping_quality)
secondary.set_tag("ms", sum_of_base_qualities(primary))

# NB: calculate isize and proper pair as if this secondary alignment was the primary alignment.
secondary.is_proper_pair = is_proper_pair(primary, secondary)
secondary.template_length = isize(primary, secondary)


def set_mate_info_on_supplementary(primary: AlignedSegment, supp: AlignedSegment) -> None:
"""Set mate info on a supplementary alignment to the next read ordinal's primary alignment.
Args:
primary: The primary alignment of the supplementary's mate.
supp: The supplementary alignment to set mate information upon.
Raises:
ValueError: If primary and secondary are of the same read ordinal.
ValueError: If the primary is marked as either secondary or supplementary.
ValueError: If the secondary is not marked as secondary.
"""
if primary.is_read1 == supp.is_read1 or primary.is_secondary or primary.is_supplementary:
raise ValueError("Supplementary mate info must be set from a primary of the next ordinal!")
if not supp.is_supplementary:
raise ValueError("Cannot set mate info on an alignment not marked as supplementary!")
if primary.query_name != supp.query_name:
raise ValueError("Cannot set mate info on alignments with different query names!")

supp.next_reference_id = primary.reference_id
supp.next_reference_name = primary.reference_name
supp.next_reference_start = primary.reference_start
supp.mate_is_forward = primary.is_forward
supp.mate_is_mapped = primary.is_mapped
supp.set_tag("MC", primary.cigarstring)
supp.set_tag("MQ", primary.mapping_quality)
supp.set_tag("ms", sum_of_base_qualities(primary))

# NB: for a non-secondary supplemental alignment, set the following to the same as the primary.
if not supp.is_secondary:
supp.is_proper_pair = primary.is_proper_pair
supp.template_length = -primary.template_length


def set_as_pairs(
r1: AlignedSegment,
r2: AlignedSegment,
is_proper_pair: Callable[[AlignedSegment, AlignedSegment], bool] = properly_paired,
is_proper_pair: Callable[[AlignedSegment, AlignedSegment], bool] = is_proper_pair,
) -> None:
"""Forces the two reads to become pairs as long as they share the same query name.
Expand Down Expand Up @@ -1118,6 +1236,10 @@ def all_recs(self) -> Iterator[AlignedSegment]:
for rec in recs:
yield rec

def set_mate_info(self) -> "Template":
"""Reset all mate information on every record in a template."""
return set_mate_info_for_template(self)

def write_to(
self,
writer: SamFile,
Expand Down Expand Up @@ -1176,6 +1298,31 @@ def __next__(self) -> Template:
return Template.build(recs, validate=False)


def set_mate_info_for_template(
template: Template,
is_proper_pair: Callable[[AlignedSegment, AlignedSegment], bool] = is_proper_pair,
) -> Template:
"""Reset all mate information on every record in a template.
Args:
template: The template of alignments to reset all mate information on.
is_proper_pair: A function that takes two alignments and determines proper pair status.
"""
if template.r1 is not None and template.r2 is not None:
set_mate_info(template.r1, template.r2, is_proper_pair=is_proper_pair)
if template.r1 is not None:
for rec in template.r2_secondaries:
set_mate_info_on_secondary(template.r1, rec, is_proper_pair=is_proper_pair)
for rec in template.r2_supplementals:
set_mate_info_on_supplementary(template.r1, rec)
if template.r2 is not None:
for rec in template.r1_secondaries:
set_mate_info_on_secondary(template.r2, rec, is_proper_pair=is_proper_pair)
for rec in template.r1_supplementals:
set_mate_info_on_supplementary(template.r2, rec)
return template


class SamOrder(enum.Enum):
"""
Enumerations of possible sort orders for a SAM file.
Expand Down
38 changes: 19 additions & 19 deletions tests/fgpyo/sam/test_sam.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from fgpyo.sam import CigarParsingException
from fgpyo.sam import PairOrientation
from fgpyo.sam import SamFileType
from fgpyo.sam import properly_paired
from fgpyo.sam import is_proper_pair
from fgpyo.sam.builder import SamBuilder


Expand Down Expand Up @@ -418,82 +418,82 @@ def test_pair_orientation_build_raises_if_it_cant_find_mate_cigar_tag() -> None:
PairOrientation.build(r1)


def test_properly_paired_when_actually_proper() -> None:
"""Test that properly_paired returns True when reads are properly paired."""
def test_is_proper_pair_when_actually_proper() -> None:
"""Test that is_proper_pair returns True when reads are properly paired."""
builder = SamBuilder()
r1, r2 = builder.add_pair(chrom="chr1", start1=100, cigar1="115M", start2=250, cigar2="40M")
assert properly_paired(r1, r2)
assert is_proper_pair(r1, r2)

r1, r2 = builder.add_pair(chrom="chr1", start1=100, cigar1="10M", start2=100, cigar2="10M")
r1.is_reverse = True
r2.is_reverse = False
sam.set_mate_info(r1, r2)
assert properly_paired(r1, r2)
assert is_proper_pair(r1, r2)


def test_properly_paired_when_actually_proper_and_no_r2() -> None:
"""Test that properly_paired returns True when reads are properly paired, but no R2."""
def test_is_proper_pair_when_actually_proper_and_no_r2() -> None:
"""Test that is_proper_pair returns True when reads are properly paired, but no R2."""
builder = SamBuilder()
r1, r2 = builder.add_pair(chrom="chr1", start1=100, cigar1="115M", start2=250, cigar2="40M")
assert properly_paired(r1)
assert is_proper_pair(r1)

r1, r2 = builder.add_pair(chrom="chr1", start1=100, cigar1="10M", start2=100, cigar2="10M")
r1.is_reverse = True
r2.is_reverse = False
sam.set_mate_info(r1, r2)
assert properly_paired(r1)
assert is_proper_pair(r1)


def test_not_properly_paired_if_wrong_orientation() -> None:
def test_not_is_proper_pair_if_wrong_orientation() -> None:
"""Test that reads are not properly paired if they are not in the right orientation."""
builder = SamBuilder()
r1, r2 = builder.add_pair(chrom="chr1", start1=100, cigar1="115M", start2=250, cigar2="40M")
r1.is_forward = False
r2.is_forward = True
sam.set_mate_info(r1, r2)
assert not properly_paired(r1, r2)
assert not is_proper_pair(r1, r2)

r1, r2 = builder.add_pair(chrom="chr1", start1=100, cigar1="115M", start2=250, cigar2="40M")
r1.is_forward = True
r2.is_forward = True
sam.set_mate_info(r1, r2)
assert not properly_paired(r1, r2)
assert not is_proper_pair(r1, r2)

r1, r2 = builder.add_pair(chrom="chr1", start1=100, cigar1="115M", start2=250, cigar2="40M")
r1.is_forward = False
r2.is_forward = False
sam.set_mate_info(r1, r2)
assert not properly_paired(r1, r2)
assert not is_proper_pair(r1, r2)


def test_not_properly_paired_if_wrong_orientation_and_no_r2() -> None:
def test_not_is_proper_pair_if_wrong_orientation_and_no_r2() -> None:
"""Test reads are not properly paired if they are not in the right orientation, but no R2."""
builder = SamBuilder()
r1, r2 = builder.add_pair(chrom="chr1", start1=100, cigar1="115M", start2=250, cigar2="40M")
r1.is_forward = False
r2.is_forward = True
sam.set_mate_info(r1, r2)
assert not properly_paired(r1)
assert not is_proper_pair(r1)

r1, r2 = builder.add_pair(chrom="chr1", start1=100, cigar1="115M", start2=250, cigar2="40M")
r1.is_forward = True
r2.is_forward = True
sam.set_mate_info(r1, r2)
assert not properly_paired(r1)
assert not is_proper_pair(r1)

r1, r2 = builder.add_pair(chrom="chr1", start1=100, cigar1="115M", start2=250, cigar2="40M")
r1.is_forward = False
r2.is_forward = False
sam.set_mate_info(r1, r2)
assert not properly_paired(r1)
assert not is_proper_pair(r1)


def test_not_properly_paired_if_too_far_apart() -> None:
def test_not_is_proper_pair_if_too_far_apart() -> None:
"""Test that reads are not properly paired if they are too far apart."""
builder = SamBuilder()
r1, r2 = builder.add_pair(chrom="chr1", start1=100, start2=100 + 1000)
sam.set_mate_info(r1, r2)
assert not properly_paired(r1, r2)
assert not is_proper_pair(r1, r2)


def test_isize() -> None:
Expand Down

0 comments on commit 77fd97a

Please sign in to comment.