diff --git a/.gitignore b/.gitignore index 7e54d62..f2813da 100644 --- a/.gitignore +++ b/.gitignore @@ -129,4 +129,6 @@ dmypy.json .pyre/ # VSCode configurations -.vscode/ \ No newline at end of file +.vscode/ + +.DS_Store diff --git a/.gitmodules b/.gitmodules index 0a7daea..179f373 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,4 @@ [submodule "cgranges"] path = cgranges url = https://github.com/lh3/cgranges + ignore = dirty diff --git a/pybedlite/bed_record.py b/pybedlite/bed_record.py index 517852f..cc1dd26 100644 --- a/pybedlite/bed_record.py +++ b/pybedlite/bed_record.py @@ -27,7 +27,6 @@ if TYPE_CHECKING: from pybedlite.overlap_detector import Interval - """Maximum BED fields that can be present in a well formed BED file written to specification""" MAX_BED_FIELDS: int = 12 @@ -183,6 +182,19 @@ def bed_fields(self) -> List[str]: ), ] + @property + def refname(self) -> str: + """The reference name of the interval described by the record.""" + return self.chrom + + @property + def negative(self) -> bool: + """ + True if the interval is negatively stranded, False if the interval is unstranded or + positively stranded. + """ + return self.strand is BedStrand.Negative + def as_bed_line(self, number_of_output_fields: Optional[int] = None) -> str: """ Converts a BED record to a tab delimited BED line equivalent, including up to the number of diff --git a/pybedlite/overlap_detector.py b/pybedlite/overlap_detector.py index ab08225..66a50f7 100644 --- a/pybedlite/overlap_detector.py +++ b/pybedlite/overlap_detector.py @@ -2,6 +2,25 @@ Utility Classes for Querying Overlaps with Genomic Regions ---------------------------------------------------------- +The :class:`~pybedlite.overlap_detector.OverlapDetector` class detects and returns overlaps between +a set of regions and another region on a reference. The overlap detector may contain a collection +of interval-like Python objects that have the following properties: + + * `refname` (str): The reference sequence name + * `start` (int): A 0-based start position + * `end` (int): A 0-based half-open end position + +This contract is described by the :class:`~pybedlite.overlap_detector.Span` protocol. + +Interval-like Python objects may also contain strandedness information which will be used +for sorting them in :func:`~pybedlite.overlap_detector.OverlapDetector.get_overlaps` using +the following property if it is present: + + * `negative (bool)`: True if the interval is negatively stranded, False if the interval is + unstranded or positively stranded + +This contract is described by the :class:`~pybedlite.overlap_detector.StrandedSpan` protocol. + Examples of Detecting Overlaps ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -32,21 +51,26 @@ The module contains the following public classes: - - :class:`~pybedlite.overlap_detector.Interval` -- Represents a region mapping to the genome - that is 0-based and open-ended + - :class:`~pybedlite.overlap_detector.Interval` -- Represents a region mapping to a reference + sequence that is 0-based and open-ended - :class:`~pybedlite.overlap_detector.OverlapDetector` -- Detects and returns overlaps between - a set of genomic regions and another genomic region + a set of regions and another region on a reference """ import itertools from pathlib import Path from typing import Dict +from typing import Generic +from typing import Hashable from typing import Iterable from typing import Iterator from typing import List from typing import Optional +from typing import Protocol from typing import Set from typing import Type +from typing import TypeVar +from typing import Union import attr @@ -56,15 +80,48 @@ from pybedlite.bed_source import BedSource +class Span(Hashable, Protocol): + """ + A structural type for a span on a reference sequence with zero-based open-ended coordinates. + """ + + @property + def refname(self) -> str: + """A reference sequence name.""" + + @property + def start(self) -> int: + """A 0-based start position.""" + + @property + def end(self) -> int: + """A 0-based open-ended end position.""" + + +class StrandedSpan(Span, Protocol): + """ + A structural type for a stranded span on a reference sequence with zero-based open-ended + coordinates. + """ + + @property + def negative(self) -> bool: + """ + True if the interval is negatively stranded, False if the interval is unstranded or + positively stranded. + """ + + @attr.s(frozen=True, auto_attribs=True) class Interval: - """A region mapping to the genome that is 0-based and open-ended + """A region mapping to a reference sequence that is 0-based and open-ended Attributes: refname (str): the refname (or chromosome) start (int): the 0-based start position - end (int): the 0-based end position (exclusive) - negative (bool): true if the interval is on the negative strand, false otherwise + end (int): the 0-based half-open end position + negative (bool): true if the interval is negatively stranded, False if the interval is + unstranded or positively stranded name (Optional[str]): an optional name assigned to the interval """ @@ -135,7 +192,8 @@ def from_ucsc( Construct an `Interval` from a UCSC "position"-formatted string. The "Position" format (referring to the "1-start, fully-closed" system as coordinates are - "positioned" in the browser) + "positioned" in the browser): + * Written as: chr1:127140001-127140001 * The location may optionally be followed by a parenthetically enclosed strand, e.g. chr1:127140001-127140001(+). @@ -144,6 +202,7 @@ def from_ucsc( end coordinates. * When in this format, the assumption is that the coordinate is **1-start, fully-closed.** + https://genome-blog.gi.ucsc.edu/blog/2016/12/12/the-ucsc-genome-browser-coordinate-counting-systems/ # noqa: E501 Note that when the string does not have a specified strand, the `Interval`'s negative @@ -178,55 +237,73 @@ def from_ucsc( ) from exception -class OverlapDetector(Iterable[Interval]): - """Detects and returns overlaps between a set of genomic regions and another genomic region. +SpanType = TypeVar("SpanType", bound=Union[Span, StrandedSpan]) +""" +A generic reference sequence span. This type variable is used for describing the generic type +contained within the :class:`~pybedlite.overlap_detector.OverlapDetector`. +""" + + +class OverlapDetector(Generic[SpanType], Iterable[SpanType]): + """Detects and returns overlaps between a set of regions and another region on a reference. + + The overlap detector may contain a collection of interval-like Python objects that have the + following properties: + + * `refname` (str): The reference sequence name + * `start` (int): A 0-based start position + * `end` (int): A 0-based half-open end position - Since :class:`~pybedlite.overlap_detector.Interval` objects are used both to populate the - overlap detector and to query it, the coordinate system in use is also 0-based open-ended. + Interval-like Python objects may also contain strandedness information which will be used + for sorting them in :func:`~pybedlite.overlap_detector.OverlapDetector.get_overlaps` using + the following property if it is present: + + * `negative (bool)`: True if the interval is negatively stranded, False if the interval is + unstranded or positively stranded The same interval may be added multiple times, but only a single instance will be returned - when querying for overlaps. Intervals with the same coordinates but different names are - treated as different intervals. + when querying for overlaps. This detector is the most efficient when all intervals are added ahead of time. """ - def __init__(self) -> None: + def __init__(self, intervals: Optional[Iterable[SpanType]] = None) -> None: # A mapping from the contig/chromosome name to the associated interval tree self._refname_to_tree: Dict[str, cr.cgranges] = {} # type: ignore self._refname_to_indexed: Dict[str, bool] = {} - self._refname_to_intervals: Dict[str, List[Interval]] = {} + self._refname_to_intervals: Dict[str, List[SpanType]] = {} + if intervals is not None: + self.add_all(intervals) - def __iter__(self) -> Iterator[Interval]: + def __iter__(self) -> Iterator[SpanType]: """Iterates over the intervals in the overlap detector.""" return itertools.chain(*self._refname_to_intervals.values()) - def add(self, interval: Interval) -> None: + def add(self, interval: SpanType) -> None: """Adds an interval to this detector. Args: interval: the interval to add to this detector """ - refname = interval.refname - if refname not in self._refname_to_tree: - self._refname_to_tree[refname] = cr.cgranges() # type: ignore - self._refname_to_indexed[refname] = False - self._refname_to_intervals[refname] = [] + if interval.refname not in self._refname_to_tree: + self._refname_to_tree[interval.refname] = cr.cgranges() # type: ignore + self._refname_to_indexed[interval.refname] = False + self._refname_to_intervals[interval.refname] = [] # Append the interval to the list of intervals for this tree, keeping the index # of where it was inserted - interval_idx: int = len(self._refname_to_intervals[refname]) - self._refname_to_intervals[refname].append(interval) + interval_idx: int = len(self._refname_to_intervals[interval.refname]) + self._refname_to_intervals[interval.refname].append(interval) # Add the interval to the tree - tree = self._refname_to_tree[refname] + tree = self._refname_to_tree[interval.refname] tree.add(interval.refname, interval.start, interval.end, interval_idx) # Flag this tree as needing to be indexed after adding a new interval, but defer # indexing - self._refname_to_indexed[refname] = False + self._refname_to_indexed[interval.refname] = False - def add_all(self, intervals: Iterable[Interval]) -> None: + def add_all(self, intervals: Iterable[SpanType]) -> None: """Adds one or more intervals to this detector. Args: @@ -235,7 +312,7 @@ def add_all(self, intervals: Iterable[Interval]) -> None: for interval in intervals: self.add(interval) - def overlaps_any(self, interval: Interval) -> bool: + def overlaps_any(self, interval: Span) -> bool: """Determines whether the given interval overlaps any interval in this detector. Args: @@ -258,7 +335,7 @@ def overlaps_any(self, interval: Interval) -> bool: else: return True - def get_overlaps(self, interval: Interval) -> List[Interval]: + def get_overlaps(self, interval: Span) -> List[SpanType]: """Returns any intervals in this detector that overlap the given interval. Args: @@ -266,7 +343,13 @@ def get_overlaps(self, interval: Interval) -> List[Interval]: Returns: The list of intervals in this detector that overlap the given interval, or the empty - list if no overlaps exist. The intervals will be return in ascending genomic order. + list if no overlaps exist. The intervals will be returned sorted using the following + sort keys: + + * The interval's start (ascending) + * The interval's end (ascending) + * The interval's strand, positive or negative (assumed to be positive if undefined) + * The interval's reference sequence name (lexicographically) """ tree = self._refname_to_tree.get(interval.refname) if tree is None: @@ -274,54 +357,79 @@ def get_overlaps(self, interval: Interval) -> List[Interval]: else: if not self._refname_to_indexed[interval.refname]: tree.index() - ref_intervals: List[Interval] = self._refname_to_intervals[interval.refname] + ref_intervals: List[SpanType] = self._refname_to_intervals[interval.refname] # NB: only return unique instances of intervals - intervals: Set[Interval] = { + intervals: Set[SpanType] = { ref_intervals[index] for _, _, index in tree.overlap(interval.refname, interval.start, interval.end) } return sorted( - intervals, key=lambda intv: (intv.start, intv.end, intv.negative, intv.name) + intervals, + key=lambda intv: ( + intv.start, + intv.end, + self._negative(intv), + intv.refname, + ), ) - def get_enclosing_intervals(self, interval: Interval) -> List[Interval]: + @staticmethod + def _negative(interval: Span) -> bool: + """ + True if the interval is negatively stranded, False if the interval is unstranded or + positively stranded. + """ + return getattr(interval, "negative", False) + + def get_enclosing_intervals(self, interval: Span) -> List[SpanType]: """Returns the set of intervals in this detector that wholly enclose the query interval. - i.e. query.start >= target.start and query.end <= target.end. + i.e. `query.start >= target.start` and `query.end <= target.end`. + + Args: + interval: the query interval - Args: - interval: the query interval - Returns: - The list of intervals in this detector that enclose the query interval. - The intervals will be returned in ascending genomic order. + Returns: + The list of intervals in this detector that enclose the query interval. The intervals + will be returned sorted using the following sort keys: + + * The interval's start (ascending) + * The interval's end (ascending) + * The interval's strand, positive or negative (assumed to be positive if undefined) + * The interval's reference sequence name (lexicographically) """ results = self.get_overlaps(interval) return [i for i in results if interval.start >= i.start and interval.end <= i.end] - def get_enclosed(self, interval: Interval) -> List[Interval]: + def get_enclosed(self, interval: Span) -> List[SpanType]: """Returns the set of intervals in this detector that are enclosed by the query interval. I.e. target.start >= query.start and target.end <= query.end. - Args: - interval: the query interval + Args: + interval: the query interval + + Returns: + The list of intervals in this detector that are enclosed within the query interval. + The intervals will be returned sorted using the following sort keys: - Returns: - The list of intervals in this detector that are enclosed within the query interval. - The intervals will be return in ascending genomic order. + * The interval's start (ascending) + * The interval's end (ascending) + * The interval's strand, positive or negative (assumed to be positive if undefined) + * The interval's reference sequence name (lexicographically) """ results = self.get_overlaps(interval) return [i for i in results if i.start >= interval.start and i.end <= interval.end] @classmethod - def from_bed(cls, path: Path) -> "OverlapDetector": + def from_bed(cls, path: Path) -> "OverlapDetector[BedRecord]": """Builds a :class:`~pybedlite.overlap_detector.OverlapDetector` from a BED file. Args: path: the path to the BED file Returns: An overlap detector for the regions in the BED file. """ - detector = OverlapDetector() + detector: OverlapDetector[BedRecord] = OverlapDetector() for region in BedSource(path): - detector.add(Interval.from_bedrecord(region)) + detector.add(region) return detector diff --git a/pybedlite/tests/test_overlap_detector.py b/pybedlite/tests/test_overlap_detector.py index ce27b1e..4208fe2 100644 --- a/pybedlite/tests/test_overlap_detector.py +++ b/pybedlite/tests/test_overlap_detector.py @@ -1,9 +1,14 @@ """Tests for :py:mod:`~pybedlite.overlap_detector`""" +from dataclasses import dataclass +from pathlib import Path from typing import List +from typing import Union import pytest +from typing_extensions import TypeAlias +import pybedlite as pybed from pybedlite.bed_record import BedRecord from pybedlite.bed_record import BedStrand from pybedlite.overlap_detector import Interval @@ -11,7 +16,7 @@ def run_test(targets: List[Interval], query: Interval, results: List[Interval]) -> None: - detector = OverlapDetector() + detector: OverlapDetector[Interval] = OverlapDetector() # Use add_all() to covert itself and add() detector.add_all(intervals=targets) # Test overlaps_any() @@ -115,7 +120,7 @@ def test_get_enclosing_intervals() -> None: d = Interval("1", 15, 19) e = Interval("1", 16, 20) - detector = OverlapDetector() + detector: OverlapDetector[Interval] = OverlapDetector() detector.add_all([a, b, c, d, e]) assert detector.get_enclosing_intervals(Interval("1", 10, 100)) == [a] @@ -130,7 +135,7 @@ def test_get_enclosed() -> None: c = Interval("1", 18, 19) d = Interval("1", 50, 99) - detector = OverlapDetector() + detector: OverlapDetector[Interval] = OverlapDetector() detector.add_all([a, b, c, d]) assert detector.get_enclosed(Interval("1", 1, 250)) == [a, b, c, d] @@ -147,7 +152,7 @@ def test_iterable() -> None: d = Interval("1", 15, 19) e = Interval("1", 16, 20) - detector = OverlapDetector() + detector: OverlapDetector[Interval] = OverlapDetector() detector.add_all([a]) assert list(detector) == [a] detector.add_all([a, b, c, d, e]) @@ -216,6 +221,166 @@ def test_construction_from_ucsc_with_strand(strand: str) -> None: ) def test_construction_from_ucsc_other_contigs(contig: str) -> None: """ - `Interval.from_ucsc()` should accomodate non-human, decoy, custom, and other contig names. + `Interval.from_ucsc()` should accommodate non-human, decoy, custom, and other contig names. """ assert Interval.from_ucsc(f"{contig}:101-200") == Interval(contig, 100, 200) + + +def test_that_overlap_detector_allows_generic_parameterization() -> None: + """ + Test that the overlap detector allows for generic parameterization. + """ + records = [BedRecord(chrom="chr1", start=1, end=2), BedRecord(chrom="chr1", start=4, end=5)] + detector: OverlapDetector[BedRecord] = OverlapDetector(records) + overlaps: List[BedRecord] = detector.get_overlaps(Interval("chr1", 1, 2)) + assert overlaps == [BedRecord(chrom="chr1", start=1, end=2)] + + +def test_arbitrary_interval_types() -> None: + """ + Test that an overlap detector can receive different interval-like objects and query them too. + """ + + @dataclass(eq=True, frozen=True) + class ZeroBasedOpenEndedProtocol: + refname: str + start: int + end: int + + @property + def negative(self) -> bool: + return False + + @dataclass(eq=True, frozen=True) + class OneBasedProtocol: + contig: str + one_based_start: int + end: int + + @property + def refname(self) -> str: + return self.contig + + @property + def start(self) -> int: + """A 0-based start position.""" + return self.one_based_start - 1 + + @property + def negative(self) -> bool: + """True if the interval is on the negative strand, False otherwise""" + return False + + @dataclass(eq=True, frozen=True) + class ZeroBasedUnstranded: + refname: str + zero_based_start: int + end: int + + @property + def start(self) -> int: + """A 0-based start position.""" + return self.zero_based_start + + @dataclass(eq=True, frozen=True) + class ZeroBasedStranded: + refname: str + zero_based_start: int + end: int + negative: bool + + @property + def start(self) -> int: + """A 0-based start position.""" + return self.zero_based_start + + # Create minimal features of all supported structural types + zero_based_protocol = ZeroBasedOpenEndedProtocol(refname="chr1", start=1, end=50) + one_based_protocol = OneBasedProtocol(contig="chr1", one_based_start=10, end=60) + zero_based_unstranded = ZeroBasedUnstranded(refname="chr1", zero_based_start=20, end=70) + zero_based_stranded = ZeroBasedStranded( + refname="chr1", zero_based_start=30, end=80, negative=True + ) + # Set up an overlap detector to hold all the features we care about + AllKinds: TypeAlias = Union[ + ZeroBasedOpenEndedProtocol, OneBasedProtocol, ZeroBasedUnstranded, ZeroBasedStranded + ] + features: List[AllKinds] = [ + zero_based_protocol, + one_based_protocol, + zero_based_unstranded, + zero_based_stranded, + ] + detector: OverlapDetector[AllKinds] = OverlapDetector(features) + + assert OverlapDetector._negative(zero_based_protocol) is False + assert OverlapDetector._negative(one_based_protocol) is False + assert OverlapDetector._negative(zero_based_unstranded) is False + assert OverlapDetector._negative(zero_based_stranded) is True + + # Query the overlap detector with yet another type + assert detector.get_overlaps(Interval("chr1", 0, 1)) == [] + assert detector.get_overlaps(Interval("chr1", 0, 9)) == [zero_based_protocol] + assert detector.get_overlaps(Interval("chr1", 11, 12)) == [ + zero_based_protocol, + one_based_protocol, + ] + assert detector.get_overlaps(Interval("chr1", 21, 27)) == [ + zero_based_protocol, + one_based_protocol, + zero_based_unstranded, + ] + assert detector.get_overlaps(Interval("chr1", 32, 35)) == [ + zero_based_protocol, + one_based_protocol, + zero_based_unstranded, + zero_based_stranded, + ] + assert detector.get_overlaps(Interval("chr1", 54, 55)) == [ + one_based_protocol, + zero_based_unstranded, + zero_based_stranded, + ] + assert detector.get_overlaps(Interval("chr1", 61, 62)) == [ + zero_based_unstranded, + zero_based_stranded, + ] + assert detector.get_overlaps(Interval("chr1", 78, 79)) == [zero_based_stranded] + assert detector.get_overlaps(Interval("chr1", 80, 81)) == [] + + +def test_the_overlap_detector_wont_accept_a_non_hashable_feature() -> None: + """ + Test that an overlap detector will not accept a non-hashable feature. + """ + + @dataclass # A dataclass missing both `eq` and `frozen` does not implement __hash__. + class ChromFeature: + refname: str + zero_based_start: int + end: int + + @property + def start(self) -> int: + """A 0-based start position.""" + return self.zero_based_start + + with pytest.raises(TypeError): + OverlapDetector([ChromFeature(refname="chr1", zero_based_start=0, end=30)]).get_overlaps( + Interval("chr1", 0, 30) + ) + + +def test_the_overlap_detector_can_be_built_from_a_bed_file(tmp_path: Path) -> None: + """ + Test that the overlap detector can be built from a BED file. + """ + records = [BedRecord(chrom="chr1", start=1, end=2), BedRecord(chrom="chr1", start=4, end=5)] + + bed = tmp_path / "test.bed" + with pybed.writer(bed, num_fields=3) as writer: + writer.write_all(records) + + detector: OverlapDetector[BedRecord] = OverlapDetector.from_bed(bed) + overlaps: List[BedRecord] = detector.get_overlaps(Interval("chr1", 1, 2)) + assert overlaps == [BedRecord(chrom="chr1", start=1, end=2)] diff --git a/pybedlite/tests/test_pybedlite.py b/pybedlite/tests/test_pybedlite.py index c6f9c51..789cad5 100644 --- a/pybedlite/tests/test_pybedlite.py +++ b/pybedlite/tests/test_pybedlite.py @@ -212,3 +212,15 @@ def test_bedstrand_opposite() -> None: assert BedStrand.Positive.opposite is BedStrand.Negative assert BedStrand.Negative.opposite is BedStrand.Positive + + +def test_bedrecord_refname() -> None: + """Test that the alternate property for reference sequence name is correct.""" + assert BedRecord(chrom="chr1", start=1, end=2).refname == "chr1" + + +def test_bedrecord_negative() -> None: + """Test that the negative property is set correctly.""" + assert not BedRecord(chrom="chr1", start=1, end=2, strand=None).negative + assert not BedRecord(chrom="chr1", start=1, end=2, strand=BedStrand.Positive).negative + assert BedRecord(chrom="chr1", start=1, end=2, strand=BedStrand.Negative).negative