diff --git a/prymer/__init__.py b/prymer/__init__.py index e69de29..749a57f 100644 --- a/prymer/__init__.py +++ b/prymer/__init__.py @@ -0,0 +1,7 @@ +from prymer.model import MinOptMax +from prymer.model import Oligo +from prymer.model import PrimerPair +from prymer.model import Span +from prymer.model import Strand + +__all__ = ["Strand", "Span", "Oligo", "PrimerPair", "MinOptMax"] diff --git a/prymer/api/__init__.py b/prymer/api/__init__.py index 5ce0686..74cef8f 100644 --- a/prymer/api/__init__.py +++ b/prymer/api/__init__.py @@ -1,10 +1,4 @@ -from prymer.api.minoptmax import MinOptMax -from prymer.api.oligo import Oligo from prymer.api.picking import build_primer_pairs -from prymer.api.primer_pair import PrimerPair -from prymer.api.span import BedLikeCoords -from prymer.api.span import Span -from prymer.api.span import Strand from prymer.api.variant_lookup import FileBasedVariantLookup from prymer.api.variant_lookup import SimpleVariant from prymer.api.variant_lookup import VariantLookup @@ -14,13 +8,7 @@ from prymer.api.variant_lookup import disk_based __all__ = [ - "MinOptMax", "build_primer_pairs", - "Oligo", - "PrimerPair", - "Span", - "Strand", - "BedLikeCoords", "VariantType", "SimpleVariant", "VariantLookup", diff --git a/prymer/api/coordmath.py b/prymer/api/coordmath.py index 2a85179..028e0b3 100644 --- a/prymer/api/coordmath.py +++ b/prymer/api/coordmath.py @@ -1,51 +1,7 @@ """ -# Methods for coordinate-based math and interval manipulation. - -Contains the following public methods: - -- [`require_same_refname()`][prymer.api.coordmath.require_same_refname] -- ensures that all - provided intervals have the same reference name. -- [`get_locus_string()`][prymer.api.coordmath.get_locus_string] -- returns a formatted - string for an interval (`:-`). -- [`get_closed_end()`][prymer.api.coordmath.get_closed_end] -- gets the closed end of an - interval given its start and length. - +# Methods for coordinate-based math. """ -from pybedlite.overlap_detector import Interval - - -def require_same_refname(*intervals: Interval) -> None: - """ - Require that the input intervals all have the same refname. - - Args: - intervals: one or more intervals - - Raises: - ValueError: if the intervals do not all have the same refname. - """ - - refnames = set(i.refname for i in intervals) - if len(refnames) != 1: - raise ValueError(f"`intervals` must have exactly one refname\n Found {sorted(refnames)}") - - -def get_locus_string(record: Interval) -> str: - """ - Get the locus-string for an interval. - - The output string will have the format `:-` - No conversion on coordinates is performed, so the output is 0-based, open-ended. - - Args: - record: The interval to get the locus-string for. - - Returns: - A locus-string for the interval. - """ - return f"{record.refname}:{record.start}-{record.end}" - def get_closed_end(start: int, length: int) -> int: """ diff --git a/prymer/api/minoptmax.py b/prymer/api/minoptmax.py deleted file mode 100644 index ec0b01b..0000000 --- a/prymer/api/minoptmax.py +++ /dev/null @@ -1,80 +0,0 @@ -""" -# MinOptMax Classes and Methods - -This module contains a class and class methods to hold a range of user-specific thresholds. - -Each of the three class attributes represents a minimal, optimal, and maximal value. -The three values can be either int or float values but all must be of the same type within one -MinOptMax object (for example, `min` cannot be a float while `max` is an int). - -Primer3 will use these values downstream to set an allowable range of specific parameters that -inform primer design. For example, Primer3 can constrain primer melting temperature -to be within a range of 55.0 - 65.0 Celsius based on an input -`MinOptMax(min=55.0, opt=60.0, max=65.0)`. - -## Examples of interacting with the `MinOptMax` class - -```python ->>> thresholds = MinOptMax(min=1.0, opt=2.0, max=4.0) ->>> print(thresholds) -(min:1.0, opt:2.0, max:4.0) ->>> list(thresholds) -[1.0, 2.0, 4.0] - -``` -""" - -from dataclasses import dataclass -from typing import Generic -from typing import Iterator -from typing import TypeVar - -Numeric = TypeVar("Numeric", int, float) - - -@dataclass(slots=True, frozen=True, init=True) -class MinOptMax(Generic[Numeric]): - """Stores a minimum, an optimal, and a maximum value (either all ints or all floats). - - `min` must be less than `max`. `opt` should be greater than the `min` - value and less than the `max` value. - - Attributes: - min: the minimum value - opt: the optimal value - max: the maximum value - - - Raises: - ValueError: if min > max - ValueError: if `min` is not less than `opt` and `opt` is not less than `max` - """ - - min: Numeric - opt: Numeric - max: Numeric - - def __post_init__(self) -> None: - dtype = type(self.min) - if not isinstance(self.max, dtype) or not isinstance(self.opt, dtype): - raise TypeError( - "Min, opt, and max must be the same type; " - f"received min: {dtype}, opt: {type(self.opt)}, max: {type(self.max)}" - ) - if self.min > self.max: - raise ValueError( - f"Min must be no greater than max; received min: {self.min}, max: {self.max}" - ) - if not (self.min <= self.opt <= self.max): - raise ValueError( - "Arguments must satisfy min <= opt <= max: " - f"received min: {self.min}, opt: {self.opt}, max: {self.max}" - ) - - def __iter__(self) -> Iterator[float]: - """Returns an iterator of min, opt, and max""" - return iter([self.min, self.opt, self.max]) - - def __str__(self) -> str: - """Returns a string representation of min, opt, and max""" - return f"(min:{self.min}, opt:{self.opt}, max:{self.max})" diff --git a/prymer/api/oligo.py b/prymer/api/oligo.py deleted file mode 100644 index 0cf3e0a..0000000 --- a/prymer/api/oligo.py +++ /dev/null @@ -1,225 +0,0 @@ -""" -# Oligo Class and Methods - -This module contains a class and class methods to represent an oligo (e.g., designed by Primer3). - -Oligos can represent single primer and/or internal probe designs. - -Class attributes include the base sequence, melting temperature, and the score of the oligo. The -mapping of the oligo to the genome is also stored. - -Optional attributes include naming information and a tail sequence to attach to the 5' end of the -oligo (if applicable). Optional attributes also include the thermodynamic results from Primer3. - -## Examples of interacting with the `Oligo` class - -```python ->>> from prymer.api.span import Span, Strand ->>> oligo_span = Span(refname="chr1", start=1, end=20) ->>> oligo = Oligo(tm=70.0, penalty=-123.0, span=oligo_span, bases="AGCT" * 5) ->>> oligo.longest_hp_length -1 ->>> oligo.length -20 ->>> oligo.name is None -True ->>> oligo = Oligo(tm=70.0, penalty=-123.0, span=oligo_span, bases="GACGG"*4) ->>> oligo.longest_hp_length -3 ->>> oligo.untailed_length -20 ->>> oligo.tailed_length -20 ->>> primer = oligo.with_tail(tail="GATTACA") ->>> primer.untailed_length -20 ->>> primer.tailed_length -27 ->>> primer = primer.with_name(name="fwd_primer") ->>> primer.name -'fwd_primer' - -``` - -Oligos may also be written to a file and subsequently read back in, as the `Oligo` class is an -`fgpyo` `Metric` class: - -```python ->>> from pathlib import Path ->>> left_span = Span(refname="chr1", start=1, end=20) ->>> left = Oligo(tm=70.0, penalty=-123.0, span=left_span, bases="G"*20) ->>> right_span = Span(refname="chr1", start=101, end=120) ->>> right = Oligo(tm=70.0, penalty=-123.0, span=right_span, bases="T"*20) ->>> path = Path("/tmp/path/to/primers.txt") ->>> Oligo.write(path, left, right) # doctest: +SKIP ->>> primers = Oligo.read(path) # doctest: +SKIP ->>> list(primers) # doctest: +SKIP -[ - Oligo(tm=70.0, penalty=-123.0, span=amplicon_span, bases="G"*20), - Oligo(tm=70.0, penalty=-123.0, span=amplicon_span, bases="T"*20) -] - -``` -""" - -from dataclasses import dataclass -from dataclasses import replace -from functools import cached_property -from typing import Any -from typing import Callable -from typing import Dict -from typing import Optional - -from fgpyo.fasta.sequence_dictionary import SequenceDictionary -from fgpyo.sequence import gc_content -from fgpyo.sequence import longest_dinucleotide_run_length -from fgpyo.sequence import longest_homopolymer_length -from fgpyo.util.metric import Metric - -from prymer.api.span import Span - - -@dataclass(frozen=True, init=True, kw_only=True, slots=True) -class Oligo(Metric["Oligo"]): - """Stores the properties of the designed oligo. - - Oligos can include both single primer and internal probe designs. Primer3 emits information - about the specific properties of a primer and/or probe, including the base sequence, - melting temperature (tm), and score. - - The penalty score of the design is emitted by Primer3 and controlled by the corresponding - design parameters. The penalty for a primer is set by the combination of - `PrimerAndAmpliconParameters` and `PrimerWeights`. - A probe penalty is set by `ProbeParameters` and `ProbeWeights`. - - The span of an `Oligo` object represents the mapping of the oligo - to the genome. `Oligo` objects can optionally have a `tail` sequence to prepend to the 5' end - of the primer or probe, as well as a `name` for downstream record keeping. - - `Oligo` objects can also store thermodynamic characteristics of primer and/or probe - design. These include the calculated melting temperatures of the most stable hairpin structure, - self-, and end- complementarity. These values can be emitted by Primer3. - - These thermodynamic characteristics are meant to quantify how likely it is the primer or probe - will bind to itself (e.g., instead of the target DNA). A melting temperature close to or greater - than the intended melting temperature for target binding indicates the primer or probe is - likely to form stable hairpins or dimers, leading to reduced efficiency of the reaction. - - Attributes: - bases: the base sequence of the oligo (excluding any tail) - tm: the calculated melting temperature of the oligo - span: the mapping of the primer to the genome - penalty: the penalty or score for the oligo - name: an optional name to use for the primer - tm_homodimer: calculated melting temperature that represents - the tendency of an oligo to bind to itself (self-complementarity) - tm_3p_anchored_homodimer: calculated melting temperature that represents - the tendency of a primer to bind to the 3'-END of an identical primer - tm_secondary_structure: calculated melting temperature of the oligo hairpin structure - tail: an optional tail sequence to put on the 5' end of the primer - - """ - - bases: str - tm: float - span: Span - penalty: float - name: Optional[str] = None - tm_homodimer: Optional[float] = None - tm_3p_anchored_homodimer: Optional[float] = None - tm_secondary_structure: Optional[float] = None - tail: Optional[str] = None - - def __post_init__(self) -> None: - if len(self.bases) == 0: - raise ValueError("Bases must not be an empty string.") - elif self.span.length != len(self.bases): - raise ValueError( - f"Conflicting lengths: span={self.span.length}, bases={len(self.bases)}" - ) - - @property - def id(self) -> str: - """Returns the name if there is one, otherwise the span.""" - return self.name if self.name is not None else f"{self.span}" - - @property - def length(self) -> int: - """Length of un-tailed oligo.""" - return self.span.length - - @property - def untailed_length(self) -> int: - """Length of un-tailed oligo.""" - return self.span.length - - @property - def tailed_length(self) -> int: - """Length of tailed oligo.""" - return self.span.length + (0 if self.tail is None else len(self.tail)) - - @cached_property - def longest_hp_length(self) -> int: - """Length of longest homopolymer in the oligo.""" - return longest_homopolymer_length(self.bases) - - @cached_property - def percent_gc_content(self) -> float: - """The GC of the amplicon sequence in the range 0-100.""" - return round(gc_content(self.bases) * 100, 3) - - @cached_property - def longest_dinucleotide_run_length(self) -> int: - """ - Number of bases in the longest dinucleotide run in a oligo. - - A dinucleotide run is when length two repeat-unit is repeated. For example, - TCTC (length = 4) or ACACACACAC (length = 10). If there are no such runs, returns 2 - (or 0 if there are fewer than 2 bases). - """ - return longest_dinucleotide_run_length(self.bases) - - def with_tail(self, tail: str) -> "Oligo": - """Returns a copy of the oligo with the tail sequence attached.""" - return replace(self, tail=tail) - - def with_name(self, name: str) -> "Oligo": - """Returns a copy of oligo object with the given name.""" - return replace(self, name=name) - - @property - def bases_with_tail(self) -> str: - """ - Returns the sequence of the oligo prepended by the tail. - - If `tail` is None, only return `bases`. - """ - if self.tail is None: - return self.bases - else: - return f"{self.tail}{self.bases}" - - def __str__(self) -> str: - """Returns a string representation of this oligo.""" - return f"{self.bases}\t{self.tm:.2f}\t{self.penalty:.2f}\t{self.span}" - - @classmethod - def _parsers(cls) -> Dict[type, Callable[[str], Any]]: - return { - Span: lambda value: Span.from_string(value), - } - - @staticmethod - def compare(this: "Oligo", that: "Oligo", seq_dict: SequenceDictionary) -> int: - """Compares this oligo to that oligo by their span, ordering references using the given - sequence dictionary. - - Args: - this: the first oligo - that: the second oligo - seq_dict: the sequence dictionary used to order references - - Returns: - -1 if this oligo is less than the that oligo, 0 if equal, 1 otherwise - """ - return Span.compare(this=this.span, that=that.span, seq_dict=seq_dict) diff --git a/prymer/api/picking.py b/prymer/api/picking.py index 020571e..0d9eda0 100644 --- a/prymer/api/picking.py +++ b/prymer/api/picking.py @@ -26,10 +26,10 @@ from pysam import FastaFile from prymer.api.melting import calculate_long_seq_tm -from prymer.api.minoptmax import MinOptMax -from prymer.api.oligo import Oligo -from prymer.api.primer_pair import PrimerPair -from prymer.api.span import Span +from prymer.model import MinOptMax +from prymer.model import Oligo +from prymer.model import PrimerPair +from prymer.model import Span from prymer.ntthal import NtThermoAlign from prymer.primer3 import PrimerAndAmpliconWeights diff --git a/prymer/api/primer_pair.py b/prymer/api/primer_pair.py deleted file mode 100644 index 6333af7..0000000 --- a/prymer/api/primer_pair.py +++ /dev/null @@ -1,248 +0,0 @@ -""" -# Primer Pair Class and Methods - -This module contains the [`PrimerPair`][prymer.api.primer_pair.PrimerPair] class and -class methods to represent a primer pair. The primer pair is comprised of a left and right primer -that work together to amplify an amplicon. - -Class attributes include each of the primers (represented by an -[`Oligo`][prymer.api.primer.Oligo] object), information about the expected amplicon -(positional information about how the amplicon maps to the genome, the sequence, and its melting -temperature), as well as a score of the primer pair (e.g. as emitted by Primer3). - -Optional attributes include naming information to keep track of the pair and design parameters -used to create the pairing. - -## Examples - -```python ->>> from prymer.api.span import Strand ->>> left_span = Span(refname="chr1", start=1, end=20) ->>> left_primer = Oligo(tm=70.0, penalty=-123.0, span=left_span, bases="G"*20) ->>> right_span = Span(refname="chr1", start=101, end=120, strand=Strand.NEGATIVE) ->>> right_primer = Oligo(tm=70.0, penalty=-123.0, span=right_span, bases="T"*20) ->>> primer_pair = PrimerPair( \ - left_primer=left_primer, \ - right_primer=right_primer, \ - amplicon_sequence=None, \ - amplicon_tm=70.0, \ - penalty=-123.0, \ - name="foobar" \ -) ->>> primer_pair.amplicon -Span(refname='chr1', start=1, end=120, strand=) ->>> primer_pair.span -Span(refname='chr1', start=1, end=120, strand=) - ->>> list(primer_pair) -[Oligo(bases='GGGGGGGGGGGGGGGGGGGG', tm=70.0, span=Span(refname='chr1', start=1, end=20, strand=), penalty=-123.0, name=None, tm_homodimer=None, tm_3p_anchored_homodimer=None, tm_secondary_structure=None, tail=None), Oligo(bases='TTTTTTTTTTTTTTTTTTTT', tm=70.0, span=Span(refname='chr1', start=101, end=120, strand=), penalty=-123.0, name=None, tm_homodimer=None, tm_3p_anchored_homodimer=None, tm_secondary_structure=None, tail=None)] -""" # noqa: E501 - -from dataclasses import dataclass -from dataclasses import replace -from functools import cached_property -from typing import Iterator -from typing import Optional - -from fgpyo.fasta.sequence_dictionary import SequenceDictionary -from fgpyo.sequence import gc_content - -from prymer.api.oligo import Oligo -from prymer.api.span import Span - - -@dataclass(frozen=True, init=True, kw_only=True) -class PrimerPair: - """ - Represents a pair of primers that work together to amplify an amplicon. The - coordinates of the amplicon are determined to span from the start of the left - primer through the end of the right primer. - - Attributes: - left_primer: the left primer in the pair - right_primer: the right primer in the pair - amplicon_sequence: an optional sequence of the expected amplicon - amplicon_tm: the melting temperature of the expected amplicon - penalty: the penalty value assigned by primer3 to the primer pair - name: an optional name to use for the primer pair - - Raises: - ValueError: if the chromosomes of the left and right primers are not the same - """ - - left_primer: Oligo - right_primer: Oligo - amplicon_tm: float - penalty: float - name: Optional[str] = None - amplicon_sequence: Optional[str] = None - - def __post_init__(self) -> None: - # Force generation of the amplicon span, which validates the relative positions - # of the left and right primers - amp = self.amplicon - - # If supplied, bases must be the same length as the span length - if self.bases is not None: - if len(self.bases) == 0: - raise ValueError("Bases must not be an empty string") - elif len(self.bases) != amp.length: - raise ValueError( - f"Conflicting lengths: span={amp.length}, sequence={len(self.bases)}" - ) - - @property - def id(self) -> str: - """Returns the name if there is one, otherwise the span.""" - return self.name if self.name is not None else str(self.span) - - @cached_property - def amplicon(self) -> Span: - """Returns the mapping for the amplicon""" - return self.calculate_amplicon_span(self.left_primer, self.right_primer) - - @property - def span(self) -> Span: - """Returns the mapping for the amplicon""" - return self.amplicon - - @property - def bases(self) -> Optional[str]: - """Returns the bases of the amplicon sequence""" - return self.amplicon_sequence - - @property - def length(self) -> int: - """Returns the length of the amplicon""" - return self.amplicon.length - - @cached_property - def percent_gc_content(self) -> float: - """The GC of the amplicon sequence in the range 0-100, or 0 if amplicon sequence is None.""" - if self.bases is None: - return 0.0 - else: - return round(gc_content(self.bases) * 100, 3) - - def with_tails(self, left_tail: str, right_tail: str) -> "PrimerPair": - """ - Returns a copy of the primer pair where the left and right primers are tailed. - - Args: - left_tail: The tail to add to the left primer - right_tail: The tail to add to the right primer - - Returns: - A copy of the primer pair with the tail(s) added to the primers - """ - return replace( - self, - left_primer=self.left_primer.with_tail(left_tail), - right_primer=self.right_primer.with_tail(right_tail), - ) - - def with_names(self, pp_name: str, lp_name: str, rp_name: str) -> "PrimerPair": - """ - Returns a copy of the primer pair with names assigned to the primer pair, - left primer, and right primer. - - Args: - pp_name: The optional name of the primer pair - lp_name: The optional name of the left primer - rp_name: The optional name of the right primer - - Returns: - A copy of the primer pair with the provided names assigned - """ - return replace( - self, - name=pp_name, - left_primer=self.left_primer.with_name(lp_name), - right_primer=self.right_primer.with_name(rp_name), - ) - - def __iter__(self) -> Iterator[Oligo]: - """Returns an iterator of left and right primers""" - return iter([self.left_primer, self.right_primer]) - - def __str__(self) -> str: - """Returns a string representation of the primer pair""" - sequence = self.amplicon_sequence if self.amplicon_sequence else "*" - return ( - f"{self.left_primer}\t{self.right_primer}\t{sequence}\t" - + f"{self.amplicon_tm}\t{self.penalty}" - ) - - @staticmethod - def calculate_amplicon_span(left_primer: Oligo, right_primer: Oligo) -> Span: - """ - Calculates the amplicon Span from the left and right primers. - - Args: - left_primer: the left primer for the amplicon - right_primer: the right primer for the amplicon - - Returns: - a Span starting at the first base of the left primer and ending at the last base of - the right primer - - Raises: - ValueError: If `left_primer` and `right_primer` have different reference names. - ValueError: If `left_primer` doesn't start before the right primer. - ValueError: If `right_primer` ends before `left_primer`. - """ - # Require that `left_primer` and `right_primer` both map to the same reference sequence - if left_primer.span.refname != right_primer.span.refname: - raise ValueError( - "Left and right primers are on different references. " - f"Left primer ref: {left_primer.span.refname}. " - f"Right primer ref: {right_primer.span.refname}" - ) - - # Require that the left primer starts before the right primer - if left_primer.span.start > right_primer.span.start: - raise ValueError( - "Left primer does not start before the right primer. " - f"Left primer span: {left_primer.span}, " - f"Right primer span: {right_primer.span}" - ) - - # Require that the left primer starts before the right primer - if right_primer.span.end < left_primer.span.end: - raise ValueError( - "Right primer ends before left primer ends. " - f"Left primer span: {left_primer.span}, " - f"Right primer span: {right_primer.span}" - ) - - return Span(left_primer.span.refname, left_primer.span.start, right_primer.span.end) - - @staticmethod - def compare( - this: "PrimerPair", - that: "PrimerPair", - seq_dict: SequenceDictionary, - by_amplicon: bool = True, - ) -> int: - """Compares this primer pair to that primer pair by their span, ordering references using - the given sequence dictionary. - - Args: - this: the first primer pair - that: the second primer pair - seq_dict: the sequence dictionary used to order references - by_amplicon: ture to compare using the amplicon property on a primer pair, false to - compare first using the left primer then the right primer - - Returns: - -1 if this primer pair is less than the that primer pair, 0 if equal, 1 otherwise - """ - if by_amplicon: - return Span.compare(this=this.amplicon, that=that.amplicon, seq_dict=seq_dict) - else: - retval = Oligo.compare(this=this.left_primer, that=that.left_primer, seq_dict=seq_dict) - if retval == 0: - retval = Oligo.compare( - this=this.right_primer, that=that.right_primer, seq_dict=seq_dict - ) - return retval diff --git a/prymer/api/span.py b/prymer/api/span.py deleted file mode 100644 index 2931546..0000000 --- a/prymer/api/span.py +++ /dev/null @@ -1,339 +0,0 @@ -""" -# Span Classes and Methods - -The [`Span`][prymer.api.span.Span] class and associated methods represent positional -information from a span of some sequence (like a primer or an amplicon) over a reference sequence -(e.g. genome, target, amplicon). - -Class attributes include `refname`, `start`, `end`, and `strand`. - -The [`Span`][prymer.api.span.Span] class uses 1-based, closed intervals for start and end -positions. The [`get_bedlike_coords()`][prymer.api.span.Span.get_bedlike_coords] method -may be used to conver a `Span` to zero-based open-ended coordinates. - -## Examples of interacting with the `Span` class - - -```python ->>> span_string = "chr1:1-10:+" ->>> span = Span.from_string(span_string) ->>> span -Span(refname='chr1', start=1, end=10, strand=) ->>> print(span.start) -1 ->>> # get 0-based position within span at position 10 ->>> span.get_offset(position=10) -9 ->>> # create a new subspan derived from the source map ->>> new_subspan = span.get_subspan(offset=5, subspan_length=5) ->>> new_subspan -Span(refname='chr1', start=6, end=10, strand=) ->>> print(span.length) -10 - -``` - -A `Span` can be compared to another `Span` to test for overlap, return the length of overlap, and -test if one span fully contains the other: - -```python ->>> span1 = Span(refname="chr1", start=50, end=100) ->>> span2 = Span(refname="chr1", start=75, end=90) ->>> span1.overlaps(span2) -True ->>> span2.overlaps(span1) -True ->>> span1.length_of_overlap_with(span2) -16 ->>> span1.length_of_overlap_with(Span(refname="chr1", start=75, end=125)) -26 ->>> span1.overlaps(Span(refname="chr1", start=200, end=225)) -False ->>> span1.contains(Span(refname="chr1", start=75, end=125)) -False ->>> span1.contains(span2) -True - -``` - -In some cases, it's useful to have the coordinates be converted to zero-based open-ended, for -example with use with the `pysam` module when using the `fetch()` methods for `pysam.AlignmentFile`, -`pysam.FastaFile`, and `pysam.VariantFile`. - -```python ->>> span.get_bedlike_coords() -BedLikeCoords(start=0, end=10) - -``` -""" - -from dataclasses import dataclass -from dataclasses import replace -from enum import StrEnum -from enum import unique -from typing import Optional -from typing import Self -from typing import Tuple - -from fgpyo.fasta.sequence_dictionary import SequenceDictionary -from fgpyo.util.metric import Metric - -from prymer.api import coordmath - - -@unique -class Strand(StrEnum): - """Represents the strand of a span to the genome.""" - - POSITIVE = "+" - NEGATIVE = "-" - - -@dataclass(init=True, frozen=True) -class BedLikeCoords: - """Represents the coordinates (only, no refname or strand) that correspond to a - zero-based open-ended position.""" - - start: int - end: int - - def __post_init__(self) -> None: - if self.start < 0: - raise ValueError(f"Start position must be >=0; received start={self.start}") - if self.end < self.start: - raise ValueError(f"End must be >= start; received start={self.start}, end={self.end}") - - -@dataclass(init=True, frozen=True, eq=True) -class Span(Metric["Span"]): - """Represents the span of some sequence (target, primer, amplicon, etc.) to a genome. - - Attributes: - refname: name of a reference sequence or contig or chromosome - start: the 1-based start position of the Span (inclusive) - end: the 1-based end position of the Span (inclusive) - strand: the strand of the Span (POSITIVE by default) - """ - - refname: str - start: int - end: int - strand: Strand = Strand.POSITIVE - - def __post_init__(self) -> None: - if self.refname.strip() == "": - raise ValueError("Ref name must be populated") - if self.start < 1: - raise ValueError(f"Start position must be >=1; received start={self.start}") - if self.end < self.start: - raise ValueError(f"End must be >= start; received start={self.start}, end={self.end}") - - @classmethod - def from_string(cls, line: str) -> "Span": - """Creates a Span from an input string. - - The input string should be delimited by a colon (":"). If strand information is missing - after splitting the string, the strand is assumed to be positive. - - Args: - line: input string - - Returns: - Span: the Span object generated from the input string - - Raises: - ValueError: if there are not at least 2 colon-delimited fields in string - - Example: - >>> span_string = "chr1:1-10:+" - >>> Span.from_string(span_string) - Span(refname='chr1', start=1, end=10, strand=) - """ - parts = line.strip().split(":") - if len(parts) == 3: - refname, range_, strand_symbol = parts - try: - strand = Strand(strand_symbol[0]) - except ValueError as e: - raise ValueError( - "Did not find valid strand information; " f"received {strand_symbol}" - ) from e - elif len(parts) == 2: - refname, range_ = parts - strand = Strand.POSITIVE - else: - raise ValueError( - f"Could not parse line (expected 2 or 3 colon-delimited fields), received {line}" - ) - try: - start, end = map(int, range_.split("-")) - except ValueError as e: - raise ValueError( - f"Could not cast positional information to int; received {range_}" - ) from e - return Span(refname=refname, start=start, end=end, strand=strand) - - def __str__(self) -> str: - return f"{self.refname}:{self.start}-{self.end}:{self.strand}" - - @property - def length(self) -> int: - """Get the length of the span/interval.""" - return self.end - self.start + 1 - - def overlaps(self, other: "Span") -> bool: - """Returns True if the spans overlap one another, False otherwise.""" - return ( - (self.refname == other.refname) - and (self.start <= other.end) - and (self.end >= other.start) - ) - - def length_of_overlap_with(self, other: "Span") -> int: - """Returns the length of the region which overlaps the other span, or zero if there is - no overlap""" - overlap: int = 0 - if self.overlaps(other=other): - start = max(self.start, other.start) - end = min(self.end, other.end) - overlap = end - start + 1 - return overlap - - def get_offset(self, position: int) -> int: - """Returns a coordinate position that is relative to the current span. - - Args: - position: the (genomic) position to provide relative coordinates for (1-based) - - Returns: - A 0-based position relative to the Span - - Raises: - ValueError: if the provided position is outside the coordinates of the Span - - Example: - - ```python - >>> test_span = Span(refname="chr1", start=10, end=20, strand=Strand.POSITIVE) - >>> print(test_span.get_offset(11)) - 1 - - ``` - """ - if position < self.start or position > self.end: - raise ValueError(f"Position not within this span: {position}") - return position - self.start - - def get_bedlike_coords(self) -> BedLikeCoords: - """Returns the zero-based, open-ended coordinates (start and end) of the - (1-based) Span, for using in bed-like formats. - - Returns: - a BedLikeCoords instance containing the 0-based and open-ended coordinates of - the Span - - Example: - - ```python - >>> test_span = Span(refname="chr1", start=10, end=20, strand=Strand.POSITIVE) - >>> print(test_span.get_bedlike_coords()) - BedLikeCoords(start=9, end=20) - - ``` - """ - return BedLikeCoords(self.start - 1, self.end) - - def get_subspan( - self, offset: int, subspan_length: int, strand: Optional[Strand] = None - ) -> "Span": - """Returns a Span with absolute coords from an offset and a length. - The strand of the new Span will be strand if given, otherwise it will be - self.strand. - - Args: - offset: the difference between the start position of the subspan and that - of the current span (0-based) - subspan_length: the length of the new span - strand: the strand of the new span (if given) - - Returns: - a new Span with objective coords derived from input offset and length - - Raises: - ValueError: if the offset is less than 0 - ValueError: if the length of the subspan is 0 or less - ValueError: if the offset is greater than the length of the source span - - Example: - - ```python - >>> span = Span(refname="chr1", start=1000,end=2000, strand=Strand.POSITIVE) - >>> test = span.get_subspan(offset=5, subspan_length=10) - >>> print(test) - chr1:1005-1014:+ - >>> print(test.length) #length = end - start + 1 - 10 - - ``` - """ - - if offset < 0: - raise ValueError(f"Offset must be > 0, received start={offset}") - if subspan_length <= 0: - raise ValueError( - f"Length of a subspan must be positive, received length={subspan_length}" - ) - if offset >= self.length: - raise ValueError( - "Offset of a relative subspan must be < source span length, " - f"received offset={offset}, length={subspan_length}" - ) - if offset + subspan_length > self.length: - raise ValueError( - f"End of sub-span must be within source span: source start={self.start}, " - f"offset={offset}, sub-span length={subspan_length}" - ) - - absolute_start = self.start + offset - absolute_end = coordmath.get_closed_end(absolute_start, subspan_length) - strand = self.strand if strand is None else strand - return replace(self, start=absolute_start, end=absolute_end, strand=strand) - - def contains(self, comparison: Self) -> bool: - """Checks whether one span is wholly contained within another span. - Returns `True` if one span contains the other, otherwise returns `False`. - Does not use strand information (a span is considered to contain the other - if they are adjacent in position but on opposite strands).""" - return ( - self.refname == comparison.refname - and self.start <= comparison.start - and comparison.end <= self.end - ) - - def _to_tuple(self, seq_dict: SequenceDictionary) -> Tuple[int, int, int, int]: - """Returns a tuple of reference index, start position, end position, and strand - (0 forward, 1 reverse)""" - ref_index = seq_dict.by_name(self.refname).index - strand = 0 if self.strand == Strand.POSITIVE else 1 - return ref_index, self.start, self.end, strand - - @staticmethod - def compare(this: "Span", that: "Span", seq_dict: SequenceDictionary) -> int: - """Compares this span to that span, ordering references using the given sequence dictionary. - - Args: - this: the first span - that: the second span - seq_dict: the sequence dictionary used to order references - - Returns: - -1 if the first span is less than the second span, 0 if equal, 1 otherwise - """ - left_tuple = this._to_tuple(seq_dict=seq_dict) - right_tuple = that._to_tuple(seq_dict=seq_dict) - if left_tuple < right_tuple: - return -1 - elif left_tuple > right_tuple: - return 1 - else: - return 0 diff --git a/prymer/api/variant_lookup.py b/prymer/api/variant_lookup.py index a08a6a4..63f25f7 100644 --- a/prymer/api/variant_lookup.py +++ b/prymer/api/variant_lookup.py @@ -77,8 +77,8 @@ from pysam import VariantRecord from strenum import UppercaseStrEnum -from prymer.api.span import Span -from prymer.api.span import Strand +from prymer.model import Span +from prymer.model import Strand _logger = logging.getLogger(__name__) diff --git a/prymer/model.py b/prymer/model.py new file mode 100644 index 0000000..a658844 --- /dev/null +++ b/prymer/model.py @@ -0,0 +1,737 @@ +from dataclasses import dataclass +from dataclasses import replace +from enum import StrEnum +from enum import unique +from functools import cached_property +from typing import Any +from typing import Callable +from typing import Dict +from typing import Generic +from typing import Iterator +from typing import Optional +from typing import Self +from typing import Tuple +from typing import TypeVar + +from fgpyo.fasta.sequence_dictionary import SequenceDictionary +from fgpyo.sequence import gc_content +from fgpyo.sequence import longest_dinucleotide_run_length +from fgpyo.sequence import longest_homopolymer_length +from fgpyo.util.metric import Metric + +Numeric = TypeVar("Numeric", int, float) + + +@dataclass(slots=True, frozen=True, init=True) +class MinOptMax(Generic[Numeric]): + """Stores a minimum, an optimal, and a maximum value (either all ints or all floats). + + `min` must be less than `max`. `opt` should be greater than the `min` + value and less than the `max` value. + + The three values can be either int or float values but all must be of the same type within one + MinOptMax object (for example, `min` cannot be a float while `max` is an int). + + Examples of interacting with the `MinOptMax` class + + ```python + >>> thresholds = MinOptMax(min=1.0, opt=2.0, max=4.0) + >>> print(thresholds) + (min:1.0, opt:2.0, max:4.0) + >>> list(thresholds) + [1.0, 2.0, 4.0] + + ``` + + Attributes: + min: the minimum value + opt: the optimal value + max: the maximum value + + Raises: + ValueError: if min > max + ValueError: if `min` is not less than `opt` and `opt` is not less than `max` + """ + + min: Numeric + opt: Numeric + max: Numeric + + def __post_init__(self) -> None: + dtype = type(self.min) + if not isinstance(self.max, dtype) or not isinstance(self.opt, dtype): + raise TypeError( + "Min, opt, and max must be the same type; " + f"received min: {dtype}, opt: {type(self.opt)}, max: {type(self.max)}" + ) + if self.min > self.max: + raise ValueError( + f"Min must be no greater than max; received min: {self.min}, max: {self.max}" + ) + if not (self.min <= self.opt <= self.max): + raise ValueError( + "Arguments must satisfy min <= opt <= max: " + f"received min: {self.min}, opt: {self.opt}, max: {self.max}" + ) + + def __iter__(self) -> Iterator[float]: + """Returns an iterator of min, opt, and max""" + return iter([self.min, self.opt, self.max]) + + def __str__(self) -> str: + """Returns a string representation of min, opt, and max""" + return f"(min:{self.min}, opt:{self.opt}, max:{self.max})" + + +@unique +class Strand(StrEnum): + """Represents the strand of a span to the genome.""" + + POSITIVE = "+" + NEGATIVE = "-" + + +@dataclass(init=True, frozen=True, eq=True) +class Span(Metric["Span"]): + """Represents the span of some sequence (target, primer, amplicon, etc.) to a genome. + + Attributes: + refname: name of a reference sequence or contig or chromosome + start: the 1-based start position of the Span (inclusive) + end: the 1-based end position of the Span (inclusive) + strand: the strand of the Span (POSITIVE by default) + + Examples: + >>> span_string = "chr1:1-10:+" + >>> span = Span.from_string(span_string) + >>> span + Span(refname='chr1', start=1, end=10, strand=) + >>> print(span.start) + 1 + >>> # get 0-based position within span at position 10 + >>> span.get_offset(position=10) + 9 + >>> # create a new subspan derived from the source map + >>> new_subspan = span.get_subspan(offset=5, subspan_length=5) + >>> new_subspan + Span(refname='chr1', start=6, end=10, strand=) + >>> print(span.length) + 10 + """ + + refname: str + start: int + end: int + strand: Strand = Strand.POSITIVE + + def __post_init__(self) -> None: + if self.refname.strip() == "": + raise ValueError("Ref name must be populated") + if self.start < 1: + raise ValueError(f"Start position must be >=1; received start={self.start}") + if self.end < self.start: + raise ValueError(f"End must be >= start; received start={self.start}, end={self.end}") + + @classmethod + def from_string(cls, line: str) -> "Span": + """Creates a Span from an input string. + + The input string should be delimited by a colon (":"). If strand information is missing + after splitting the string, the strand is assumed to be positive. + + Args: + line: input string + + Returns: + Span: the Span object generated from the input string + + Raises: + ValueError: if there are not at least 2 colon-delimited fields in string + + Example: + >>> span_string = "chr1:1-10:+" + >>> Span.from_string(span_string) + Span(refname='chr1', start=1, end=10, strand=) + """ + parts = line.strip().split(":") + if len(parts) == 3: + refname, range_, strand_symbol = parts + try: + strand = Strand(strand_symbol[0]) + except ValueError as e: + raise ValueError( + "Did not find valid strand information; " f"received {strand_symbol}" + ) from e + elif len(parts) == 2: + refname, range_ = parts + strand = Strand.POSITIVE + else: + raise ValueError( + f"Could not parse line (expected 2 or 3 colon-delimited fields), received {line}" + ) + try: + start, end = map(int, range_.split("-")) + except ValueError as e: + raise ValueError( + f"Could not cast positional information to int; received {range_}" + ) from e + return Span(refname=refname, start=start, end=end, strand=strand) + + def __str__(self) -> str: + """Returns a string version of the span, e.g. `chr1:100-200:+`.""" + return f"{self.refname}:{self.start}-{self.end}:{self.strand}" + + @property + def length(self) -> int: + """Get the length of the span/interval.""" + return self.end - self.start + 1 + + def overlaps(self, other: "Span") -> bool: + """Returns True if the spans overlap one another, False otherwise.""" + return ( + (self.refname == other.refname) + and (self.start <= other.end) + and (self.end >= other.start) + ) + + def length_of_overlap_with(self, other: "Span") -> int: + """Returns the length of the region which overlaps the other span, or zero if there is + no overlap""" + overlap: int = 0 + if self.overlaps(other=other): + start = max(self.start, other.start) + end = min(self.end, other.end) + overlap = end - start + 1 + return overlap + + def get_offset(self, position: int) -> int: + """Returns the relative offset of a 1-based genomic coordinate contained within the span. + + Args: + position: the 1-based genomic position whose offset to calculate + + Returns: + A 0-based offset relative to the Span + + Raises: + ValueError: if the provided position is outside the coordinates of the Span + + Example: + + ```python + >>> test_span = Span(refname="chr1", start=10, end=20, strand=Strand.POSITIVE) + >>> print(test_span.get_offset(11)) + 1 + + ``` + """ + if position < self.start or position > self.end: + raise ValueError(f"Position not within this span: {position}") + return position - self.start + + def get_subspan( + self, offset: int, subspan_length: int, strand: Optional[Strand] = None + ) -> "Span": + """Returns a Span with absolute coords from an offset and a length. + The strand of the new Span will be strand if given, otherwise it will be + self.strand. + + Args: + offset: the difference between the start position of the subspan and that + of the current span (0-based) + subspan_length: the length of the new span + strand: the strand of the new span (if given) + + Returns: + a new Span with objective coords derived from input offset and length + + Raises: + ValueError: if the offset is less than 0 + ValueError: if the length of the subspan is 0 or less + ValueError: if the offset is greater than the length of the source span + + Example: + + ```python + >>> span = Span(refname="chr1", start=1000,end=2000, strand=Strand.POSITIVE) + >>> test = span.get_subspan(offset=5, subspan_length=10) + >>> print(test) + chr1:1005-1014:+ + >>> print(test.length) #length = end - start + 1 + 10 + + ``` + """ + if offset < 0: + raise ValueError(f"Offset must be > 0, received start={offset}") + if subspan_length <= 0: + raise ValueError( + f"Length of a subspan must be positive, received length={subspan_length}" + ) + if offset >= self.length: + raise ValueError( + "Offset of a relative subspan must be < source span length, " + f"received offset={offset}, length={subspan_length}" + ) + if offset + subspan_length > self.length: + raise ValueError( + f"End of sub-span must be within source span: source start={self.start}, " + f"offset={offset}, sub-span length={subspan_length}" + ) + + absolute_start = self.start + offset + absolute_end = absolute_start + subspan_length - 1 + strand = self.strand if strand is None else strand + return replace(self, start=absolute_start, end=absolute_end, strand=strand) + + def contains(self, comparison: Self) -> bool: + """Checks whether one span is wholly contained within another span. + Returns `True` if one span contains the other, otherwise returns `False`. + Does not use strand information (a span is considered to contain the other + if they are adjacent in position but on opposite strands).""" + return ( + self.refname == comparison.refname + and self.start <= comparison.start + and comparison.end <= self.end + ) + + def _to_tuple(self, seq_dict: SequenceDictionary) -> Tuple[int, int, int, int]: + """Returns a tuple of reference index, start position, end position, and strand + (0 forward, 1 reverse)""" + ref_index = seq_dict.by_name(self.refname).index + strand = 0 if self.strand == Strand.POSITIVE else 1 + return ref_index, self.start, self.end, strand + + @staticmethod + def compare(this: "Span", that: "Span", seq_dict: SequenceDictionary) -> int: + """Compares this span to that span, ordering references using the given sequence dictionary. + + Args: + this: the first span + that: the second span + seq_dict: the sequence dictionary used to order references + + Returns: + -1 if the first span is less than the second span, 0 if equal, 1 otherwise + """ + left_tuple = this._to_tuple(seq_dict=seq_dict) + right_tuple = that._to_tuple(seq_dict=seq_dict) + if left_tuple < right_tuple: + return -1 + elif left_tuple > right_tuple: + return 1 + else: + return 0 + + +@dataclass(frozen=True, init=True, kw_only=True, slots=True) +class Oligo(Metric["Oligo"]): + """Stores the properties of an oligo, which can be either a primer or internal oligo. + + The penalty score of the design is emitted by Primer3 and controlled by the corresponding + design parameters. The penalty for a primer is set by the combination of + `PrimerAndAmpliconParameters` and `PrimerWeights`. A probe penalty is set by `ProbeParameters` + and `ProbeWeights`. + + The span of an `Oligo` object represents the mapping of the oligo to the genome. `Oligo` objects + can optionally have a `tail` sequence to prepend to the 5' end of the primer or probe, as well + as a `name` for downstream record keeping. + + `Oligo` objects can also store thermodynamic characteristics of primer and/or probe + design. These include the calculated melting temperatures of the most stable hairpin structure, + self-, and end- complementarity. These values can be emitted by Primer3. + + These thermodynamic characteristics are meant to quantify how likely it is the primer or probe + will bind to itself (e.g., instead of the target DNA). A melting temperature close to or greater + than the intended melting temperature for target binding indicates the primer or probe is + likely to form stable hairpins or dimers, leading to reduced efficiency of the reaction. + + ## Examples of interacting with the `Oligo` class + + ```python + >>> from prymer import Span, Strand + >>> oligo_span = Span(refname="chr1", start=1, end=20) + >>> oligo = Oligo(tm=70.0, penalty=-123.0, span=oligo_span, bases="AGCT" * 5) + >>> oligo.longest_hp_length + 1 + >>> oligo.length + 20 + >>> oligo.name is None + True + >>> oligo = Oligo(tm=70.0, penalty=-123.0, span=oligo_span, bases="GACGG"*4) + >>> oligo.longest_hp_length + 3 + >>> oligo.untailed_length + 20 + >>> oligo.tailed_length + 20 + >>> primer = oligo.with_tail(tail="GATTACA") + >>> primer.untailed_length + 20 + >>> primer.tailed_length + 27 + >>> primer = primer.with_name(name="fwd_primer") + >>> primer.name + 'fwd_primer' + + ``` + + Oligos may also be written to a file and subsequently read back in, as the `Oligo` class is an + `fgpyo` `Metric` class: + + ```python + >>> from pathlib import Path + >>> left_span = Span(refname="chr1", start=1, end=20) + >>> left = Oligo(tm=70.0, penalty=-123.0, span=left_span, bases="G"*20) + >>> right_span = Span(refname="chr1", start=101, end=120) + >>> right = Oligo(tm=70.0, penalty=-123.0, span=right_span, bases="T"*20) + >>> path = Path("/tmp/path/to/primers.txt") + >>> Oligo.write(path, left, right) # doctest: +SKIP + >>> primers = Oligo.read(path) # doctest: +SKIP + >>> list(primers) # doctest: +SKIP + [ + Oligo(tm=70.0, penalty=-123.0, span=amplicon_span, bases="G"*20), + Oligo(tm=70.0, penalty=-123.0, span=amplicon_span, bases="T"*20) + ] + + ``` + + Attributes: + bases: the base sequence of the oligo (excluding any tail) + tm: the calculated melting temperature of the oligo + span: the mapping of the primer to the genome + penalty: the penalty or score for the oligo + name: an optional name to use for the primer + tm_homodimer: calculated melting temperature that represents + the tendency of an oligo to bind to itself (self-complementarity) + tm_3p_anchored_homodimer: calculated melting temperature that represents + the tendency of a primer to bind to the 3'-END of an identical primer + tm_secondary_structure: calculated melting temperature of the oligo hairpin structure + tail: an optional tail sequence to put on the 5' end of the primer + + """ + + bases: str + tm: float + span: Span + penalty: float + name: Optional[str] = None + tm_homodimer: Optional[float] = None + tm_3p_anchored_homodimer: Optional[float] = None + tm_secondary_structure: Optional[float] = None + tail: Optional[str] = None + + def __post_init__(self) -> None: + if len(self.bases) == 0: + raise ValueError("Bases must not be an empty string.") + elif self.span.length != len(self.bases): + raise ValueError( + f"Conflicting lengths: span={self.span.length}, bases={len(self.bases)}" + ) + + @property + def id(self) -> str: + """Returns the name if there is one, otherwise the span.""" + return self.name if self.name is not None else f"{self.span}" + + @property + def length(self) -> int: + """Length of un-tailed oligo.""" + return self.span.length + + @property + def untailed_length(self) -> int: + """Length of un-tailed oligo.""" + return self.span.length + + @property + def tailed_length(self) -> int: + """Length of tailed oligo.""" + return self.span.length + (0 if self.tail is None else len(self.tail)) + + @cached_property + def longest_hp_length(self) -> int: + """Length of longest homopolymer in the oligo.""" + return longest_homopolymer_length(self.bases) + + @cached_property + def percent_gc_content(self) -> float: + """The GC of the amplicon sequence in the range 0-100.""" + return round(gc_content(self.bases) * 100, 3) + + @cached_property + def longest_dinucleotide_run_length(self) -> int: + """ + Number of bases in the longest dinucleotide run in a oligo. + + A dinucleotide run is when length two repeat-unit is repeated. For example, + TCTC (length = 4) or ACACACACAC (length = 10). If there are no such runs, returns 2 + (or 0 if there are fewer than 2 bases). + """ + return longest_dinucleotide_run_length(self.bases) + + def with_tail(self, tail: str) -> "Oligo": + """Returns a copy of the oligo with the tail sequence attached.""" + return replace(self, tail=tail) + + def with_name(self, name: str) -> "Oligo": + """Returns a copy of oligo object with the given name.""" + return replace(self, name=name) + + @property + def bases_with_tail(self) -> str: + """ + Returns the sequence of the oligo prepended by the tail. + + If `tail` is None, only return `bases`. + """ + if self.tail is None: + return self.bases + else: + return f"{self.tail}{self.bases}" + + def __str__(self) -> str: + """Returns a string representation of this oligo.""" + return f"{self.bases}\t{self.tm:.2f}\t{self.penalty:.2f}\t{self.span}" + + @classmethod + def _parsers(cls) -> Dict[type, Callable[[str], Any]]: + return { + Span: lambda value: Span.from_string(value), + } + + @staticmethod + def compare(this: "Oligo", that: "Oligo", seq_dict: SequenceDictionary) -> int: + """Compares this oligo to that oligo by their span, ordering references using the given + sequence dictionary. + + Args: + this: the first oligo + that: the second oligo + seq_dict: the sequence dictionary used to order references + + Returns: + -1 if this oligo is less than the that oligo, 0 if equal, 1 otherwise + """ + return Span.compare(this=this.span, that=that.span, seq_dict=seq_dict) + + +@dataclass(frozen=True, init=True, kw_only=True) +class PrimerPair: + """ + Represents a pair of primers that work together to amplify an amplicon. The coordinates of the + amplicon are determined by the start of the left primer and the end of the right primer. + + Optional attributes include naming information to keep track of the pair and design parameters + used to create the pairing. + + Attributes: + left_primer: the left primer in the pair + right_primer: the right primer in the pair + amplicon_sequence: an optional sequence of the expected amplicon + amplicon_tm: the melting temperature of the expected amplicon + penalty: the penalty value assigned by primer3 to the primer pair + name: an optional name to use for the primer pair + + Raises: + ValueError: if the chromosomes of the left and right primers are not the same + + Examples: + >>> from prymer import Strand + >>> left_span = Span(refname="chr1", start=1, end=20) + >>> left_primer = Oligo(tm=70.0, penalty=-123.0, span=left_span, bases="G"*20) + >>> right_span = Span(refname="chr1", start=101, end=120, strand=Strand.NEGATIVE) + >>> right_primer = Oligo(tm=70.0, penalty=-123.0, span=right_span, bases="T"*20) + >>> primer_pair = PrimerPair( \ + left_primer=left_primer, \ + right_primer=right_primer, \ + amplicon_sequence=None, \ + amplicon_tm=70.0, \ + penalty=-123.0, \ + name="foobar" \ + ) + >>> primer_pair.amplicon + Span(refname='chr1', start=1, end=120, strand=) + >>> primer_pair.span + Span(refname='chr1', start=1, end=120, strand=) + + >>> list(primer_pair) + [Oligo(bases='GGGGGGGGGGGGGGGGGGGG', tm=70.0, span=Span(refname='chr1', start=1, end=20, strand=), penalty=-123.0, name=None, tm_homodimer=None, tm_3p_anchored_homodimer=None, tm_secondary_structure=None, tail=None), Oligo(bases='TTTTTTTTTTTTTTTTTTTT', tm=70.0, span=Span(refname='chr1', start=101, end=120, strand=), penalty=-123.0, name=None, tm_homodimer=None, tm_3p_anchored_homodimer=None, tm_secondary_structure=None, tail=None)] + """ # noqa: E501 + + left_primer: Oligo + right_primer: Oligo + amplicon_tm: float + penalty: float + name: Optional[str] = None + amplicon_sequence: Optional[str] = None + + def __post_init__(self) -> None: + # Force generation of the amplicon span, which validates the relative positions + # of the left and right primers + amp = self.amplicon + + # If supplied, bases must be the same length as the span length + if self.bases is not None: + if len(self.bases) == 0: + raise ValueError("Bases must not be an empty string") + elif len(self.bases) != amp.length: + raise ValueError( + f"Conflicting lengths: span={amp.length}, sequence={len(self.bases)}" + ) + + @property + def id(self) -> str: + """Returns the name if there is one, otherwise the span.""" + return self.name if self.name is not None else str(self.span) + + @cached_property + def amplicon(self) -> Span: + """Returns the mapping for the amplicon""" + return self.calculate_amplicon_span(self.left_primer, self.right_primer) + + @property + def span(self) -> Span: + """Returns the mapping for the amplicon""" + return self.amplicon + + @property + def bases(self) -> Optional[str]: + """Returns the bases of the amplicon sequence""" + return self.amplicon_sequence + + @property + def length(self) -> int: + """Returns the length of the amplicon""" + return self.amplicon.length + + @cached_property + def percent_gc_content(self) -> float: + """The GC of the amplicon sequence in the range 0-100, or 0 if amplicon sequence is None.""" + if self.bases is None: + return 0.0 + else: + return round(gc_content(self.bases) * 100, 3) + + def with_tails(self, left_tail: str, right_tail: str) -> "PrimerPair": + """ + Returns a copy of the primer pair where the left and right primers are tailed. + + Args: + left_tail: The tail to add to the left primer + right_tail: The tail to add to the right primer + + Returns: + A copy of the primer pair with the tail(s) added to the primers + """ + return replace( + self, + left_primer=self.left_primer.with_tail(left_tail), + right_primer=self.right_primer.with_tail(right_tail), + ) + + def with_names(self, pp_name: str, lp_name: str, rp_name: str) -> "PrimerPair": + """ + Returns a copy of the primer pair with names assigned to the primer pair, + left primer, and right primer. + + Args: + pp_name: The optional name of the primer pair + lp_name: The optional name of the left primer + rp_name: The optional name of the right primer + + Returns: + A copy of the primer pair with the provided names assigned + """ + return replace( + self, + name=pp_name, + left_primer=self.left_primer.with_name(lp_name), + right_primer=self.right_primer.with_name(rp_name), + ) + + def __iter__(self) -> Iterator[Oligo]: + """Returns an iterator of left and right primers""" + return iter([self.left_primer, self.right_primer]) + + def __str__(self) -> str: + """Returns a string representation of the primer pair""" + sequence = self.amplicon_sequence if self.amplicon_sequence else "*" + return ( + f"{self.left_primer}\t{self.right_primer}\t{sequence}\t" + + f"{self.amplicon_tm}\t{self.penalty}" + ) + + @staticmethod + def calculate_amplicon_span(left_primer: Oligo, right_primer: Oligo) -> Span: + """ + Calculates the amplicon Span from the left and right primers. + + Args: + left_primer: the left primer for the amplicon + right_primer: the right primer for the amplicon + + Returns: + a Span starting at the first base of the left primer and ending at the last base of + the right primer + + Raises: + ValueError: If `left_primer` and `right_primer` have different reference names. + ValueError: If `left_primer` doesn't start before the right primer. + ValueError: If `right_primer` ends before `left_primer`. + """ + # Require that `left_primer` and `right_primer` both map to the same reference sequence + if left_primer.span.refname != right_primer.span.refname: + raise ValueError( + "Left and right primers are on different references. " + f"Left primer ref: {left_primer.span.refname}. " + f"Right primer ref: {right_primer.span.refname}" + ) + + # Require that the left primer starts before the right primer + if left_primer.span.start > right_primer.span.start: + raise ValueError( + "Left primer does not start before the right primer. " + f"Left primer span: {left_primer.span}, " + f"Right primer span: {right_primer.span}" + ) + + # Require that the left primer starts before the right primer + if right_primer.span.end < left_primer.span.end: + raise ValueError( + "Right primer ends before left primer ends. " + f"Left primer span: {left_primer.span}, " + f"Right primer span: {right_primer.span}" + ) + + return Span(left_primer.span.refname, left_primer.span.start, right_primer.span.end) + + @staticmethod + def compare( + this: "PrimerPair", + that: "PrimerPair", + seq_dict: SequenceDictionary, + by_amplicon: bool = True, + ) -> int: + """Compares this primer pair to that primer pair by their span, ordering references using + the given sequence dictionary. + + Args: + this: the first primer pair + that: the second primer pair + seq_dict: the sequence dictionary used to order references + by_amplicon: ture to compare using the amplicon property on a primer pair, false to + compare first using the left primer then the right primer + + Returns: + -1 if this primer pair is less than the that primer pair, 0 if equal, 1 otherwise + """ + if by_amplicon: + return Span.compare(this=this.amplicon, that=that.amplicon, seq_dict=seq_dict) + else: + retval = Oligo.compare(this=this.left_primer, that=that.left_primer, seq_dict=seq_dict) + if retval == 0: + retval = Oligo.compare( + this=this.right_primer, that=that.right_primer, seq_dict=seq_dict + ) + return retval diff --git a/prymer/offtarget/offtarget_detector.py b/prymer/offtarget/offtarget_detector.py index 592eb30..44ae46f 100644 --- a/prymer/offtarget/offtarget_detector.py +++ b/prymer/offtarget/offtarget_detector.py @@ -11,7 +11,7 @@ ```python >>> from pathlib import Path ->>> from prymer.api.span import Strand +>>> from prymer import Strand >>> ref_fasta = Path("./tests/offtarget/data/miniref.fa") >>> left_primer = Oligo(bases="AAAAA", tm=37, penalty=0, span=Span("chr1", start=67, end=71)) >>> right_primer = Oligo(bases="TTTTT", tm=37, penalty=0, span=Span("chr1", start=75, end=79, strand=Strand.NEGATIVE)) @@ -89,10 +89,10 @@ from ordered_set import OrderedSet -from prymer.api.oligo import Oligo -from prymer.api.primer_pair import PrimerPair -from prymer.api.span import Span -from prymer.api.span import Strand +from prymer.model import Oligo +from prymer.model import PrimerPair +from prymer.model import Span +from prymer.model import Strand from prymer.offtarget.bwa import BWA_EXECUTABLE_NAME from prymer.offtarget.bwa import BwaAlnInteractive from prymer.offtarget.bwa import BwaHit diff --git a/prymer/primer3/primer3.py b/prymer/primer3/primer3.py index c9d60e6..6f26693 100644 --- a/prymer/primer3/primer3.py +++ b/prymer/primer3/primer3.py @@ -42,7 +42,7 @@ ```python >>> from prymer.primer3.primer3_parameters import PrimerAndAmpliconParameters ->>> from prymer.api import MinOptMax +>>> from prymer import MinOptMax >>> target = Span(refname="chr1", start=201, end=250, strand=Strand.POSITIVE) >>> params = PrimerAndAmpliconParameters( \ amplicon_sizes=MinOptMax(min=100, max=250, opt=200), \ @@ -132,12 +132,12 @@ from fgpyo.sequence import reverse_complement from fgpyo.util.metric import Metric -from prymer.api.oligo import Oligo -from prymer.api.primer_pair import PrimerPair -from prymer.api.span import Span -from prymer.api.span import Strand from prymer.api.variant_lookup import SimpleVariant from prymer.api.variant_lookup import VariantLookup +from prymer.model import Oligo +from prymer.model import PrimerPair +from prymer.model import Span +from prymer.model import Strand from prymer.primer3.primer3_failure_reason import Primer3FailureReason from prymer.primer3.primer3_input import Primer3Input from prymer.primer3.primer3_input_tag import Primer3InputTag diff --git a/prymer/primer3/primer3_input.py b/prymer/primer3/primer3_input.py index b0e6712..db07816 100644 --- a/prymer/primer3/primer3_input.py +++ b/prymer/primer3/primer3_input.py @@ -29,7 +29,7 @@ The following examples builds the `Primer3` tags for designing left primers: ```python ->>> from prymer.api import MinOptMax, Strand +>>> from prymer import MinOptMax, Strand, Span >>> from prymer.primer3 import DesignLeftPrimersTask >>> target = Span(refname="chr1", start=201, end=250, strand=Strand.POSITIVE) >>> design_region = Span(refname="chr1", start=150, end=300, strand=Strand.POSITIVE) @@ -99,7 +99,7 @@ from typing import Any from typing import Optional -from prymer.api.span import Span +from prymer.model import Span from prymer.primer3.primer3_input_tag import Primer3InputTag from prymer.primer3.primer3_parameters import PrimerAndAmpliconParameters from prymer.primer3.primer3_parameters import ProbeParameters diff --git a/prymer/primer3/primer3_parameters.py b/prymer/primer3/primer3_parameters.py index 543d807..4707964 100644 --- a/prymer/primer3/primer3_parameters.py +++ b/prymer/primer3/primer3_parameters.py @@ -69,7 +69,7 @@ class stores user input for internal probe design and maps it to the correct Pri from typing import Any from typing import Optional -from prymer.api.minoptmax import MinOptMax +from prymer.model import MinOptMax from prymer.primer3.primer3_input_tag import Primer3InputTag diff --git a/prymer/primer3/primer3_task.py b/prymer/primer3/primer3_task.py index 0de198d..3adc2db 100644 --- a/prymer/primer3/primer3_task.py +++ b/prymer/primer3/primer3_task.py @@ -40,7 +40,7 @@ Suppose we have the following design and target regions: ```python ->>> from prymer.api import Strand +>>> from prymer import Strand >>> design_region = Span(refname="chr1", start=1, end=500, strand=Strand.POSITIVE) >>> target = Span(refname="chr1", start=200, end=300, strand=Strand.POSITIVE) >>> design_region.contains(target) @@ -101,7 +101,7 @@ from strenum import UppercaseStrEnum -from prymer.api.span import Span +from prymer.model import Span from prymer.primer3.primer3_input_tag import Primer3InputTag Primer3TaskType: TypeAlias = Union[ diff --git a/tests/api/conftest.py b/tests/api/conftest.py deleted file mode 100644 index cedb92b..0000000 --- a/tests/api/conftest.py +++ /dev/null @@ -1,13 +0,0 @@ -import pytest -from fgpyo.fasta.sequence_dictionary import SequenceDictionary -from fgpyo.fasta.sequence_dictionary import SequenceMetadata - - -@pytest.fixture -def seq_dict() -> SequenceDictionary: - metadatas: list[SequenceMetadata] = [ - SequenceMetadata(name="chr1", length=1000000, index=0), - SequenceMetadata(name="chr2", length=1000000, index=1), - SequenceMetadata(name="chr3", length=1000000, index=2), - ] - return SequenceDictionary(metadatas) diff --git a/tests/api/test_coordmath.py b/tests/api/test_coordmath.py index 52648ae..0ff14e7 100644 --- a/tests/api/test_coordmath.py +++ b/tests/api/test_coordmath.py @@ -1,21 +1,7 @@ -import pytest -from pybedlite.overlap_detector import Interval +from prymer.api.coordmath import get_closed_end -from prymer.api.coordmath import get_locus_string -from prymer.api.coordmath import require_same_refname - -def test_get_locus_string() -> None: - intervals = [Interval("chr1", 1, 2), Interval("chr2", 3, 4)] - assert [get_locus_string(i) for i in intervals] == ["chr1:1-2", "chr2:3-4"] - - -def test_assert_same_refname_works() -> None: - intervals = [Interval("chr1", 1, 2), Interval("chr1", 3, 4)] - require_same_refname(*intervals) - - -def test_assert_same_refname_works_neg() -> None: - intervals = [Interval("chr1", 1, 2), Interval("chr2", 3, 4)] - with pytest.raises(ValueError): - require_same_refname(*intervals) +def test_get_closed_end() -> None: + assert get_closed_end(start=10, length=0) == 9 + assert get_closed_end(start=10, length=1) == 10 + assert get_closed_end(start=10, length=10) == 19 diff --git a/tests/api/test_picking.py b/tests/api/test_picking.py index 7d2548a..69747ec 100644 --- a/tests/api/test_picking.py +++ b/tests/api/test_picking.py @@ -7,10 +7,10 @@ from fgpyo.fasta.builder import FastaBuilder from fgpyo.sequence import reverse_complement -from prymer.api import MinOptMax -from prymer.api import Oligo -from prymer.api import PrimerPair -from prymer.api import Span +from prymer import MinOptMax +from prymer import Oligo +from prymer import PrimerPair +from prymer import Span from prymer.api import picking from prymer.api.melting import calculate_long_seq_tm from prymer.primer3 import PrimerAndAmpliconWeights diff --git a/tests/api/test_variant_lookup.py b/tests/api/test_variant_lookup.py index 164ee16..f6d1367 100644 --- a/tests/api/test_variant_lookup.py +++ b/tests/api/test_variant_lookup.py @@ -12,8 +12,8 @@ from fgpyo.vcf.builder import VcfFieldType from pysam import VariantRecord -from prymer.api.span import Span -from prymer.api.span import Strand +from prymer import Span +from prymer import Strand from prymer.api.variant_lookup import FileBasedVariantLookup from prymer.api.variant_lookup import SimpleVariant from prymer.api.variant_lookup import VariantOverlapDetector diff --git a/tests/conftest.py b/tests/conftest.py index fe11698..d2a582a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,9 +5,11 @@ from pathlib import Path import pytest +from fgpyo.fasta.sequence_dictionary import SequenceDictionary +from fgpyo.fasta.sequence_dictionary import SequenceMetadata -from prymer.api.span import Span -from prymer.api.span import Strand +from prymer import Span +from prymer import Strand @pytest.fixture @@ -24,3 +26,13 @@ def data_dir() -> Path: @pytest.fixture(scope="session") def genome_ref(data_dir: Path) -> Path: return data_dir / "miniref.fa" + + +@pytest.fixture +def seq_dict() -> SequenceDictionary: + metadatas: list[SequenceMetadata] = [ + SequenceMetadata(name="chr1", length=1000000, index=0), + SequenceMetadata(name="chr2", length=1000000, index=1), + SequenceMetadata(name="chr3", length=1000000, index=2), + ] + return SequenceDictionary(metadatas) diff --git a/tests/api/test_minoptmax.py b/tests/model/test_minoptmax.py similarity index 97% rename from tests/api/test_minoptmax.py rename to tests/model/test_minoptmax.py index f9b3e11..e5d526a 100644 --- a/tests/api/test_minoptmax.py +++ b/tests/model/test_minoptmax.py @@ -2,7 +2,7 @@ import pytest -from prymer.api.minoptmax import MinOptMax +from prymer import MinOptMax Numeric = TypeVar("Numeric", int, float) diff --git a/tests/api/test_oligo.py b/tests/model/test_oligo.py similarity index 98% rename from tests/api/test_oligo.py rename to tests/model/test_oligo.py index f3990b7..5de090d 100644 --- a/tests/api/test_oligo.py +++ b/tests/model/test_oligo.py @@ -6,9 +6,9 @@ import pytest from fgpyo.fasta.sequence_dictionary import SequenceDictionary -from prymer.api.oligo import Oligo -from prymer.api.span import Span -from prymer.api.span import Strand +from prymer import Oligo +from prymer import Span +from prymer import Strand @pytest.mark.parametrize( @@ -50,9 +50,7 @@ def test_invalid_primer_construction_raises() -> None: span=Span(refname="chr1", start=1, end=4, strand=Strand.POSITIVE), ) - with pytest.raises( - ValueError, match="Conflicting lengths" - ): + with pytest.raises(ValueError, match="Conflicting lengths"): Oligo( bases="ACGT", tm=1.0, diff --git a/tests/api/test_primer_pair.py b/tests/model/test_primer_pair.py similarity index 98% rename from tests/api/test_primer_pair.py rename to tests/model/test_primer_pair.py index 57d6f4a..c3d46a7 100644 --- a/tests/api/test_primer_pair.py +++ b/tests/model/test_primer_pair.py @@ -6,10 +6,10 @@ from fgpyo.fasta.sequence_dictionary import SequenceDictionary from fgpyo.sequence import reverse_complement -from prymer.api.oligo import Oligo -from prymer.api.primer_pair import PrimerPair -from prymer.api.span import Span -from prymer.api.span import Strand +from prymer import Oligo +from prymer import PrimerPair +from prymer import Span +from prymer import Strand @dataclass(init=True, frozen=True) diff --git a/tests/api/test_span.py b/tests/model/test_span.py similarity index 88% rename from tests/api/test_span.py rename to tests/model/test_span.py index 963cf1d..b744fe9 100644 --- a/tests/api/test_span.py +++ b/tests/model/test_span.py @@ -1,9 +1,8 @@ import pytest from fgpyo.fasta.sequence_dictionary import SequenceDictionary -from prymer.api.span import BedLikeCoords -from prymer.api.span import Span -from prymer.api.span import Strand +from prymer import Span +from prymer import Strand @pytest.mark.parametrize( @@ -20,32 +19,6 @@ def test_invalid_span(refname: str, invalid_start: int, invalid_end: int, strand Span(refname, invalid_start, invalid_end, Strand(strand)) -@pytest.mark.parametrize( - "invalid_start, invalid_end", - [ - (-1, 10), # start < 0 - (30, 20), # start > end - (21, 20), # start > end - ], -) -def test_invalid_coords(invalid_start: int, invalid_end: int) -> None: - """Test that invalid spans raise an error""" - with pytest.raises(ValueError): - BedLikeCoords(invalid_start, invalid_end) - - -@pytest.mark.parametrize( - "valid_start, valid_end", - [ - (0, 10), # start == 0 - (20, 20), # start == end - ], -) -def test_valid_coords(valid_start: int, valid_end: int) -> None: - """Test that valid spans are OK""" - BedLikeCoords(valid_start, valid_end) - - @pytest.mark.parametrize( "base_idx, expected_rel_coords", [ @@ -138,9 +111,6 @@ def test_span_from_valid_string(line: str, expected_span: Span, expected_length: span = Span.from_string(line) assert span == expected_span assert span.length == expected_length - bedlike_coords = span.get_bedlike_coords() - assert bedlike_coords.end - bedlike_coords.start == expected_length - assert bedlike_coords.end == span.end @pytest.mark.parametrize( diff --git a/tests/offtarget/test_offtarget.py b/tests/offtarget/test_offtarget.py index 7cd3ae9..99cc68a 100644 --- a/tests/offtarget/test_offtarget.py +++ b/tests/offtarget/test_offtarget.py @@ -5,10 +5,10 @@ import pytest from fgpyo.sam import Cigar -from prymer.api.oligo import Oligo -from prymer.api.primer_pair import PrimerPair -from prymer.api.span import Span -from prymer.api.span import Strand +from prymer import Oligo +from prymer import PrimerPair +from prymer import Span +from prymer import Strand from prymer.offtarget.bwa import BWA_EXECUTABLE_NAME from prymer.offtarget.bwa import BwaHit from prymer.offtarget.bwa import BwaResult diff --git a/tests/primer3/test_primer3.py b/tests/primer3/test_primer3.py index ca2b64b..8c0ce6f 100644 --- a/tests/primer3/test_primer3.py +++ b/tests/primer3/test_primer3.py @@ -6,11 +6,11 @@ import pytest from fgpyo.sequence import reverse_complement -from prymer.api.minoptmax import MinOptMax -from prymer.api.oligo import Oligo -from prymer.api.primer_pair import PrimerPair -from prymer.api.span import Span -from prymer.api.span import Strand +from prymer import MinOptMax +from prymer import Oligo +from prymer import PrimerPair +from prymer import Span +from prymer import Strand from prymer.api.variant_lookup import cached from prymer.primer3.primer3 import Primer3 from prymer.primer3.primer3 import Primer3Failure diff --git a/tests/primer3/test_primer3_input.py b/tests/primer3/test_primer3_input.py index 84b3bbf..e855e29 100644 --- a/tests/primer3/test_primer3_input.py +++ b/tests/primer3/test_primer3_input.py @@ -1,8 +1,8 @@ import pytest -from prymer.api import Span -from prymer.api import Strand -from prymer.api.minoptmax import MinOptMax +from prymer import MinOptMax +from prymer import Span +from prymer import Strand from prymer.primer3 import DesignLeftPrimersTask from prymer.primer3 import DesignPrimerPairsTask from prymer.primer3 import DesignRightPrimersTask diff --git a/tests/primer3/test_primer3_parameters.py b/tests/primer3/test_primer3_parameters.py index 62d4ece..47f397f 100644 --- a/tests/primer3/test_primer3_parameters.py +++ b/tests/primer3/test_primer3_parameters.py @@ -2,7 +2,7 @@ import pytest -from prymer.api.minoptmax import MinOptMax +from prymer import MinOptMax from prymer.primer3.primer3_input_tag import Primer3InputTag from prymer.primer3.primer3_parameters import PrimerAndAmpliconParameters from prymer.primer3.primer3_parameters import ProbeParameters diff --git a/tests/primer3/test_primer3_task.py b/tests/primer3/test_primer3_task.py index e05dd9d..eaf80b7 100644 --- a/tests/primer3/test_primer3_task.py +++ b/tests/primer3/test_primer3_task.py @@ -1,7 +1,7 @@ import pytest -from prymer.api.span import Span -from prymer.api.span import Strand +from prymer import Span +from prymer import Strand from prymer.primer3.primer3_input_tag import Primer3InputTag from prymer.primer3.primer3_task import DesignLeftPrimersTask from prymer.primer3.primer3_task import DesignPrimerPairsTask