Skip to content

Commit

Permalink
feat: add primer3 logic
Browse files Browse the repository at this point in the history
  • Loading branch information
emmcauley committed Sep 26, 2024
1 parent f3887f4 commit 6b57128
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 89 deletions.
167 changes: 96 additions & 71 deletions prymer/primer3/primer3.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
```
The `design_primers()` method on `Primer3` is used to design the primers given a
The `design_oligos()` method on `Primer3` is used to design the primers given a
[`Primer3Input`][prymer.primer3.primer3_input.Primer3Input]. The latter includes all the
parameters and target region.
Expand All @@ -63,7 +63,7 @@
primer_and_amplicon_params=params, \
task=DesignLeftPrimersTask(), \
)
>>> left_result = designer.design_primers(design_input=design_input)
>>> left_result = designer.design_oligos(design_input=design_input)
```
Expand Down Expand Up @@ -152,6 +152,7 @@
from prymer.primer3.primer3_task import DesignLeftPrimersTask
from prymer.primer3.primer3_task import DesignPrimerPairsTask
from prymer.primer3.primer3_task import DesignRightPrimersTask
from prymer.primer3.primer3_task import PickHybProbeOnly
from prymer.util.executable_runner import ExecutableRunner


Expand All @@ -170,22 +171,22 @@ class Primer3Failure(Metric["Primer3Failure"]):
count: int


PrimerLikeType = TypeVar("PrimerLikeType", bound=OligoLike)
"""Type variable for a `Primer3Result`, which must implement `PrimerLike`"""
OligoLikeType = TypeVar("OligoLikeType", bound=OligoLike)
"""Type variable for a `Primer3Result`, which must implement `OligoLike`"""


@dataclass(init=True, slots=True, frozen=True)
class Primer3Result(Generic[PrimerLikeType]):
class Primer3Result(Generic[OligoLikeType]):
"""Encapsulates Primer3 design results (both valid designs and failures).
Attributes:
filtered_designs: filtered and ordered (by objective function score) list of primer
pairs or single primers that were returned by Primer3
pairs or single oligos that were returned by Primer3
failures: ordered list of Primer3Failures detailing design failure reasons and corresponding
count
"""

filtered_designs: list[PrimerLikeType]
filtered_designs: list[OligoLikeType]
failures: list[Primer3Failure]

def as_primer_result(self) -> "Primer3Result[Oligo]":
Expand All @@ -197,7 +198,7 @@ def as_primer_result(self) -> "Primer3Result[Oligo]":
def as_primer_pair_result(self) -> "Primer3Result[PrimerPair]":
"""Returns this Primer3Result assuming the design results are of type `PrimerPair`."""
if len(self.filtered_designs) > 0 and not isinstance(self.filtered_designs[0], PrimerPair):
raise ValueError("Cannot call `as_primer_pair_result` on `Primer` results")
raise ValueError("Cannot call `as_primer_pair_result` on `Oligo` results")
return typing.cast(Primer3Result[PrimerPair], self)

def primers(self) -> list[Oligo]:
Expand All @@ -212,7 +213,7 @@ def primer_pairs(self) -> list[PrimerPair]:
try:
return self.as_primer_pair_result().filtered_designs
except ValueError as ex:
raise ValueError("Cannot call `primer_pairs` on `Primer` results") from ex
raise ValueError("Cannot call `primer_pairs` on `Oligo` results") from ex


class Primer3(ExecutableRunner):
Expand Down Expand Up @@ -308,13 +309,6 @@ def get_design_sequences(self, region: Span) -> tuple[str, str]:
hard_masked = "".join(soft_masked_list)
return soft_masked, hard_masked

@staticmethod
def _is_valid_primer(design_input: Primer3Input, primer_design: Oligo) -> bool:
return (
primer_design.longest_dinucleotide_run_length()
<= design_input.primer_and_amplicon_params.primer_max_dinuc_bases
)

@staticmethod
def _screen_pair_results(
design_input: Primer3Input, designed_primer_pairs: list[PrimerPair]
Expand Down Expand Up @@ -349,8 +343,8 @@ def _screen_pair_results(
valid_primer_pair_designs.append(primer_pair)
return valid_primer_pair_designs, dinuc_pair_failures

def design_primers(self, design_input: Primer3Input) -> Primer3Result: # noqa: C901
"""Designs primers or primer pairs given a target region.
def design_oligos(self, design_input: Primer3Input) -> Primer3Result: # noqa: C901
"""Designs primers, primer pairs, and/or internal probes given a target region.
Args:
design_input: encapsulates the target region, design task, specifications, and scoring
Expand All @@ -371,12 +365,18 @@ def design_primers(self, design_input: Primer3Input) -> Primer3Result: # noqa:
f"Error, trying to use a subprocess that has already been "
f"terminated, return code {self._subprocess.returncode}"
)

design_region: Span = self._create_design_region(
target_region=design_input.target,
max_amplicon_length=design_input.primer_and_amplicon_params.max_amplicon_length,
min_primer_length=design_input.primer_and_amplicon_params.min_primer_length,
)
design_region: Span
match design_input.task:
case PickHybProbeOnly():
design_region = design_input.target
case DesignRightPrimersTask() | DesignLeftPrimersTask() | DesignPrimerPairsTask():
design_region = self._create_design_region(
target_region=design_input.target,
max_amplicon_length=design_input.primer_and_amplicon_params.max_amplicon_length,
min_primer_length=design_input.primer_and_amplicon_params.min_primer_length,
)
case _ as unreachable:
assert_never(unreachable)

soft_masked, hard_masked = self.get_design_sequences(design_region)
global_primer3_params = {
Expand All @@ -389,7 +389,6 @@ def design_primers(self, design_input: Primer3Input) -> Primer3Result: # noqa:
**global_primer3_params,
**design_input.to_input_tags(design_region=design_region),
}

# Submit inputs to primer3
for tag, value in assembled_primer3_tags.items():
self._subprocess.stdin.write(f"{tag}={value}")
Expand Down Expand Up @@ -454,15 +453,16 @@ def primer3_error(message: str) -> None:
unfiltered_designs=all_pair_results,
)

case DesignLeftPrimersTask() | DesignRightPrimersTask(): # Single primer design
all_single_results = Primer3._build_primers(
case DesignLeftPrimersTask() | DesignRightPrimersTask() | PickHybProbeOnly():
# Single primer or probe design
all_single_results: list[Oligo] = Primer3._build_oligos(
design_input=design_input,
design_results=primer3_results,
design_region=design_region,
design_task=design_input.task,
unmasked_design_seq=soft_masked,
)
return Primer3._assemble_primers(
return Primer3._assemble_single_designs(
design_input=design_input,
design_results=primer3_results,
unfiltered_designs=all_single_results,
Expand All @@ -472,48 +472,39 @@ def primer3_error(message: str) -> None:
assert_never(unreachable)

@staticmethod
def _build_primers(
def _build_oligos(
design_input: Primer3Input,
design_results: dict[str, str],
design_region: Span,
design_task: Union[DesignLeftPrimersTask, DesignRightPrimersTask],
design_task: Union[DesignLeftPrimersTask, DesignRightPrimersTask, PickHybProbeOnly],
unmasked_design_seq: str,
) -> list[Oligo]:
"""
Builds a list of left or right primers from Primer3 output.
Builds a list of single oligos from Primer3 output.
Args:
design_input: the target region, design task, specifications, and scoring penalties
design_results: design results emitted by Primer3 and captured by design_primers()
design_results: design results emitted by Primer3 and captured by design_oligos()
design_region: the padded design region
design_task: the design task
unmasked_design_seq: the reference sequence corresponding to the target region
Returns:
primers: a list of unsorted and unfiltered primer designs emitted by Primer3
oligos: a list of unsorted and unfiltered primer designs emitted by Primer3
Raises:
ValueError: if Primer3 does not return primer designs
"""
count_tag = design_input.task.count_tag

maybe_count: Optional[str] = design_results.get(count_tag)
if maybe_count is None: # no count tag was found
if "PRIMER_ERROR" in design_results:
primer_error = design_results["PRIMER_ERROR"]
raise ValueError(f"Primer3 returned an error: {primer_error}")
else:
raise ValueError(f"Primer3 did not return the count tag: {count_tag}")
count: int = int(maybe_count)

primers = []
count: int = _check_design_results(design_input, design_results)

primers: list[Oligo] = []
for idx in range(count):
key = f"PRIMER_{design_task.task_type}_{idx}"
str_position, str_length = design_results[key].split(",", maxsplit=1)
position, length = int(str_position), int(str_length) # position is 1-based

match design_task:
case DesignLeftPrimersTask():
case DesignLeftPrimersTask() | PickHybProbeOnly():
span = design_region.get_subspan(
offset=position - 1, subspan_length=length, strand=Strand.POSITIVE
)
Expand All @@ -539,46 +530,39 @@ def _build_primers(
tm=float(design_results[f"PRIMER_{design_task.task_type}_{idx}_TM"]),
penalty=float(design_results[f"PRIMER_{design_task.task_type}_{idx}_PENALTY"]),
span=span,
self_any_th=float(design_results[f"{key}_SELF_ANY_TH"]),
self_end_th=float(design_results[f"{key}_SELF_END_TH"]),
hairpin_th=float(design_results[f"{key}_HAIRPIN_TH"]),
)
)
return primers

@staticmethod
def _assemble_primers(
design_input: Primer3Input, design_results: dict[str, str], unfiltered_designs: list[Oligo]
def _assemble_single_designs(
design_input: Primer3Input,
design_results: dict[str, str],
unfiltered_designs: list[Oligo],
) -> Primer3Result:
"""Helper function to organize primer designs into valid and failed designs.
Wraps `Primer3._is_valid_primer()` and `Primer3._build_failures()` to filter out designs
with dinucleotide runs that are too long and extract additional failure reasons emitted by
Primer3.
Args:
design_input: encapsulates the target region, design task, specifications,
and scoring penalties
unfiltered_designs: list of primers emitted from Primer3
design_results: key-value pairs of results reported by Primer3
"""Screens oligo designs (primers or probes) emitted by Primer3 for acceptable dinucleotide
runs and extracts failure reasons for failed designs."""

Returns:
primer_designs: a `Primer3Result` that encapsulates valid and failed designs
"""
valid_primer_designs = [
valid_oligo_designs = [
design
for design in unfiltered_designs
if Primer3._is_valid_primer(primer_design=design, design_input=design_input)
if _has_acceptable_dinuc_run(oligo_design=design, design_input=design_input)
]
dinuc_failures = [
design
for design in unfiltered_designs
if not Primer3._is_valid_primer(primer_design=design, design_input=design_input)
if not _has_acceptable_dinuc_run(oligo_design=design, design_input=design_input)
]

failure_strings = [design_results[f"PRIMER_{design_input.task.task_type}_EXPLAIN"]]
failures = Primer3._build_failures(dinuc_failures, failure_strings)
primer_designs: Primer3Result = Primer3Result(
filtered_designs=valid_primer_designs, failures=failures
design_candidates: Primer3Result = Primer3Result(
filtered_designs=valid_oligo_designs, failures=failures
)
return primer_designs
return design_candidates

@staticmethod
def _build_primer_pairs(
Expand All @@ -592,7 +576,7 @@ def _build_primer_pairs(
Args:
design_input: the target region, design task, specifications, and scoring penalties
design_results: design results emitted by Primer3 and captured by design_primers()
design_results: design results emitted by Primer3 and captured by design_oligos()
design_region: the padded design region
unmasked_design_seq: the reference sequence corresponding to the target region
Expand All @@ -602,15 +586,15 @@ def _build_primer_pairs(
Raises:
ValueError: if Primer3 does not return the same number of left and right designs
"""
left_primers = Primer3._build_primers(
left_primers = Primer3._build_oligos(
design_input=design_input,
design_results=design_results,
design_region=design_region,
design_task=DesignLeftPrimersTask(),
unmasked_design_seq=unmasked_design_seq,
)

right_primers = Primer3._build_primers(
right_primers = Primer3._build_oligos(
design_input=design_input,
design_results=design_results,
design_region=design_region,
Expand Down Expand Up @@ -760,3 +744,44 @@ def _create_design_region(
)

return design_region


def _check_design_results(design_input: Primer3Input, design_results: dict[str, str]) -> int:
"""Checks for any additional Primer3 errors and reports out the count of emitted designs."""
count_tag = design_input.task.count_tag
maybe_count: Optional[str] = design_results.get(count_tag)
if maybe_count is None: # no count tag was found
if "PRIMER_ERROR" in design_results:
primer_error = design_results["PRIMER_ERROR"]
raise ValueError(f"Primer3 returned an error: {primer_error}")
else:
raise ValueError(f"Primer3 did not return the count tag: {count_tag}")
count: int = int(maybe_count)

return count


def _has_acceptable_dinuc_run(design_input: Primer3Input, oligo_design: Oligo) -> bool:
"""
True if the design's longest dinucleotide run is no more than the stipulated maximum.
For primer designs, the maximum is recorded in the input's
`PrimerAndAmpliconParameters.primer_max_dinuc_bases`.
For probe designs, the maximum is recorded in the input's
`ProbeParameters.probe_max_dinuc_bases`.
Args:
design_input: the Primer3Input object that wraps task-specific and design-specific params
oligo_design: the design candidate
Returns:
"""
max_dinuc_bases: int
if design_input.task.requires_primer_amplicon_params:
max_dinuc_bases = design_input.primer_and_amplicon_params.primer_max_dinuc_bases
elif design_input.task.requires_probe_params:
max_dinuc_bases = design_input.probe_params.probe_max_dinuc_bases

return oligo_design.longest_dinucleotide_run_length() <= max_dinuc_bases
Loading

0 comments on commit 6b57128

Please sign in to comment.