Merge pull request #69 from PixelgenTechnologies/fix-collapse-counts
Fix deflated counts in edgelist after collapse
fbdtemme authored Jan 25, 2024
2 parents 36a7c2e + e637f17 commit 243ba54
Showing 4 changed files with 123 additions and 75 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

* Performance improvements and reduced bundle size in QC report.
* Fix deflated counts in the edgelist after collapse.

## [0.16.1] - 2024-01-12

2 changes: 1 addition & 1 deletion src/pixelator/cli/analysis.py
@@ -70,7 +70,7 @@
)
@click.option(
"--polarization-n-permutations",
default=None,
default=0,
required=False,
type=click.IntRange(min=0),
show_default=True,
145 changes: 90 additions & 55 deletions src/pixelator/collapse/process.py
@@ -7,19 +7,16 @@
"""
import logging
import tempfile
import typing
import warnings
from collections import Counter, defaultdict
from typing import (
Dict,
Generator,
Iterable,
Iterator,
List,
Literal,
Optional,
Set,
Tuple,
)
import warnings

import numpy as np
import numpy.typing as npt
@@ -43,8 +40,22 @@

UniqueFragment = str
UpiB = str
UniqueFragmentToUpiB = Dict[UniqueFragment, List[UpiB]]
UniqueFragmentAndCount = Tuple[str, int]
UniqueFragmentToUpiB = dict[UniqueFragment, list[UpiB]]


class CollapsedFragment(typing.NamedTuple):
"""A collapsed fragment.
:attr sequence: the consensus sequence for a list of similar fragments
:attr unique_fragments_count: the number of unique fragments that are
represented by this collapsed fragment
:attr reads_count: the number of reads that are represented
by this collapsed fragment
"""

sequence: str
unique_fragments_count: int
reads_count: int
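For orientation, a minimal usage sketch of the new NamedTuple with made-up values (assumed import path based on the file location in this diff, not code from the repository):

from pixelator.collapse.process import CollapsedFragment

fragment = CollapsedFragment(
    sequence="ACGTACGTACGT",     # consensus UMI+UPIA sequence of the cluster
    unique_fragments_count=3,    # three distinct fragments were collapsed
    reads_count=17,              # total reads across those three fragments
)
sequence, n_fragments, n_reads = fragment  # unpacks like a regular tuple
# every unique fragment carries at least one read
assert fragment.reads_count >= fragment.unique_fragments_count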


def build_annoytree(data: npt.NDArray[np.uint8], n_trees: int = 10) -> AnnoyIndex:
@@ -76,7 +87,7 @@ def build_annoytree(data: npt.NDArray[np.uint8], n_trees: int = 10) -> AnnoyInde
return tree


def build_binary_data(seqs: List[str]) -> npt.NDArray[np.uint8]:
def build_binary_data(seqs: list[str]) -> npt.NDArray[np.uint8]:
"""Build a binary matrix from a list of sequences using two-bit encoding.
Convert a list of DNA sequences (str) to binary sequences
@@ -100,41 +111,59 @@ def build_binary_data(seqs: list[str]) -> npt.NDArray[np.uint8]:
return data


def get_representative_sequence_for_component(
components: List[Set[UniqueFragment]], counts: Dict[UniqueFragment, int]
) -> Generator[Tuple[UniqueFragment, int], None, None]:
def get_collapsed_fragments_for_component(
components: list[set[UniqueFragment]], counts: dict[UniqueFragment, int]
) -> Generator[CollapsedFragment, None, None]:
"""Take the representative sequence from a component based on its counts.
Given a list of components (i.e. sequences that presumably belong to the
same original molecule), and the counts of occurrences for each of these,
return an iterator of tuples with the representative sequence for that
component and the number of reads collapsed for that umi.
return an iterator of objects with the representative sequence for that
component, the total number of reads associated with the cluster, and
the number of collapsed fragments.
The representative molecule is selected as the one with the most associated
upibs, and if there is a tie, the lexicographically smallest sequence is picked
to make the results reproducible between runs.
:param components: a list of components as produced from `get_connected_components`
:param counts: a dictionary of the counts of each unique fragment
:yields: the representative sequence and the size of the component as a tuple
:rtype: Generator[Tuple[UniqueFragment, int], None, None]
:rtype: Generator[CollapsedFragment, None, None]
:yields CollapsedFragment: a collapsed fragment
"""
for component in components:
counts_and_sequences = defaultdict(list)
for seq in component:
counts_and_sequences[counts[seq]].append(seq)
max_count = max(counts_and_sequences.keys())
yield (
sorted(counts_and_sequences[max_count])[0],
len(component),
)
# get the most common sequence in the component and use
# this as the 'consensus sequence'

# get an iterator over the fragments
fragment_it = iter(component)

# Initialize variables for finding the representative with the first fragment
# of a component
representative_frag = next(fragment_it)
representative_frag_count = counts[representative_frag]
total_reads = representative_frag_count

# Loop over the other fragments to find the representative
for fragment in fragment_it:
fragment_read_counts = counts[fragment]
total_reads += fragment_read_counts
if fragment_read_counts > representative_frag_count or (
fragment_read_counts == representative_frag_count
and fragment < representative_frag
):
representative_frag = fragment
representative_frag_count = fragment_read_counts

collapsed_fragments = len(component)
yield CollapsedFragment(representative_frag, collapsed_fragments, total_reads)
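As an illustration of the selection loop above, a toy component with made-up sequences and read counts (assumed import path): the two most abundant fragments tie on reads, so the lexicographically smaller one becomes the representative.

from pixelator.collapse.process import get_collapsed_fragments_for_component

components = [{"AAAA", "AAAT", "AAAC"}]     # one connected component
counts = {"AAAA": 5, "AAAT": 5, "AAAC": 2}  # reads per unique fragment

result = list(get_collapsed_fragments_for_component(components, counts))
# result == [CollapsedFragment(sequence="AAAA",
#                              unique_fragments_count=3,
#                              reads_count=12)]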


# This code snippet has been obtained from:
# https://github.com/CGATOxford/umi-tools
def get_connected_components(
graph: Dict[UniqueFragment, List[UniqueFragment]], counts: Dict[UniqueFragment, int]
) -> List[Set[UniqueFragment]]:
graph: dict[UniqueFragment, list[UniqueFragment]], counts: dict[UniqueFragment, int]
) -> list[set[UniqueFragment]]:
"""Get connected components from a graph of sequences.
Find the connected components from the sequence graph (represented by an adjacency
Expand All @@ -150,7 +179,7 @@ def get_connected_components(
:param counts: a dictionary of counts (copies) for each sequence in `graph`
:returns: a list of sets of sequences where each set represents a
connected component
:rtype: List[Set[UniqueFragment]]
:rtype: list[set[UniqueFragment]]
"""
found = set()
components = []
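For context, a small sketch of the adjacency-list input this function expects and the grouping it produces, with hypothetical sequences (assumed import path; the order of the returned list may differ, since the traversal itself is elided from this hunk):

from pixelator.collapse.process import get_connected_components

# "AAAA" and "AAAT" were linked by the neighbour search; "CCCC" stands alone.
graph = {"AAAA": ["AAAT"], "AAAT": ["AAAA"], "CCCC": []}
counts = {"AAAA": 5, "AAAT": 1, "CCCC": 2}

components = get_connected_components(graph, counts)
# -> [{"AAAA", "AAAT"}, {"CCCC"}]  (as sets; list order may vary)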
@@ -163,10 +192,10 @@ def identify_fragments_to_collapse(


def identify_fragments_to_collapse(
seqs: List[UniqueFragment],
seqs: list[UniqueFragment],
min_dist: int,
max_neighbours: int,
) -> Dict[UniqueFragment, List[UniqueFragment]]:
) -> dict[UniqueFragment, list[UniqueFragment]]:
"""Identify fragments to collapse by approximate nearest neighbourhood search.
Tries to identify all sequences in the given list of sequences
@@ -190,9 +219,9 @@ def identify_fragments_to_collapse(
:param max_neighbours: the number of neighbours to use in the Annoy index
:returns: a dictionary with fragments as keys and a list of their adjoining
fragments as values
:rtype: Dict[UniqueFragment, List[UniqueFragment]]
:rtype: dict[UniqueFragment, list[UniqueFragment]]
"""
logger.debug("Computing adjancency sequences from %i elements", len(seqs))
logger.debug("Computing adjacency sequences from %i elements", len(seqs))

# use a nearest neighbours tree to reduce the search space
# TODO this approach requires memory (n_ele * len(seq) * 2)
Expand All @@ -206,7 +235,7 @@ def identify_fragments_to_collapse(

# iterate the neighbours of each sequence to obtain
# the adjacency list (dictionary of lists)
adj_list = {seq: [] for seq in seqs} # type: Dict[str, List[str]]
adj_list: dict[str, list[str]] = {seq: [] for seq in seqs}
for i, seq1 in enumerate(seqs):
if i % 100000 == 0:
logger.debug("Processed 100,000 reads...")
@@ -244,27 +273,26 @@

def collapse_sequences_unique(
seq_dict: UniqueFragmentToUpiB,
) -> Generator[UniqueFragmentAndCount, None, None]:
) -> Generator[CollapsedFragment, None, None]:
"""Get all fragments.
Let each key in `seq_dict` represent its own sequence. This is equivalent
to not collapsing the sequences.
:param seq_dict: the fragment to upib dict
:yield UniqueFragmentAndCount: a unique fragment and the number of reads collapsed
:rtype: Generator[UniqueFragmentAndCount, None, None]
:yield: a CollapsedFragment object
"""
logger.debug("Picking all unique sequences (i.e. no collapsing is carried out)")

for seq in seq_dict.keys():
yield (seq, len(seq_dict[seq]))
yield CollapsedFragment(seq, 1, len(seq_dict[seq]))
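A small sketch of the "unique" mode with made-up data (assumed import path): every fragment becomes its own cluster, so unique_fragments_count is always 1 and reads_count is simply the number of associated UPIBs.

from pixelator.collapse.process import collapse_sequences_unique

seq_dict = {
    "AAAACCCC": ["TTTT", "TTTT", "TTTG"],  # fragment observed in 3 reads
    "GGGGCCCC": ["TTTT"],                  # fragment observed in 1 read
}
list(collapse_sequences_unique(seq_dict))
# -> [CollapsedFragment("AAAACCCC", 1, 3), CollapsedFragment("GGGGCCCC", 1, 1)]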


def collapse_sequences_adjacency(
seq_dict: UniqueFragmentToUpiB,
max_neighbours: int,
min_dist: int,
) -> Iterator[UniqueFragmentAndCount]:
) -> Iterator[CollapsedFragment]:
"""Collapse sequences based on their adjacency.
Tries to identify all fragments that represent the same underlying
@@ -285,7 +313,7 @@ def collapse_sequences_adjacency(
:param min_dist: the hamming distance threshold (i.e. the mismatches
between two sequences)
:returns: An iterator of the collapsed molecules and their original counts
:rtype: Iterator[UniqueFragmentAndCount]
:rtype: Iterator[CollapsedFragment]
"""
logger.debug("Collapsing %i sequences", len(seq_dict))

@@ -296,7 +324,7 @@ def collapse_sequences_adjacency(
seqs=seqs, max_neighbours=max_neighbours, min_dist=min_dist
)
full_components = get_connected_components(adj_list, counts)
components = get_representative_sequence_for_component(full_components, counts)
components = get_collapsed_fragments_for_component(full_components, counts)
return components


@@ -396,7 +424,7 @@ def filter_by_minimum_upib_count(


def create_edgelist(
clustered_reads: Iterable[UniqueFragmentAndCount],
clustered_reads: Iterable[CollapsedFragment],
unique_reads: UniqueFragmentToUpiB,
umia_start: Optional[int],
umia_end: Optional[int],
@@ -455,27 +483,34 @@
# iterate each collapsed umi+upia to get the upib
# with the highest count and store in a list
def data():
for cluster_representative_fragment, fragment_count in clustered_reads:
for (
cluster_representative_fragment,
unique_fragment_count,
read_count,
) in clustered_reads:
# get all the upis from the sequence in the cluster
upis = unique_reads[cluster_representative_fragment]
upibs = unique_reads[cluster_representative_fragment]

# take the most abundant umi-upi
umi_upia = cluster_representative_fragment
umi = umi_upia[:umi_size]
upia = umi_upia[umi_size:]
# The cluster_representative_fragment is the concatenation of UMI + UPIA
umi = cluster_representative_fragment[:umi_size]
upia = cluster_representative_fragment[umi_size:]

# take the most common upib from the list
unique_upis = Counter(upis)
upib, _ = unique_upis.most_common(1)[0]

# count (number of molecules) is the number
# of upis in the umi-upia cluster
count = len(upis)
umi_unique_count = fragment_count
upi_unique_count = len(unique_upis)
# this assumes that all other upibs are sequencing errors
# (the upib with the highest count is the correct one)
unique_upibs = Counter(upibs)
upib, _ = unique_upibs.most_common(1)[0]
associated_upibs_count = len(unique_upibs)

# update data array
yield (upia, upib, umi, count, umi_unique_count, upi_unique_count)
yield (
upia,
upib,
umi,
read_count,
unique_fragment_count,
associated_upibs_count,
)

# create an edge list (pd.DataFrame) with the collapsed sequences
df = pd.DataFrame(
@@ -631,7 +666,7 @@ def collapse_fastq(
max_neighbours=max_neighbours, # type: ignore
min_dist=mismatches, # type: ignore
)
if algorithm == "unique":
elif algorithm == "unique":
clustered_reads = collapse_sequences_unique(
seq_dict=unique_reads,
)
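The hunks above change the per-edge read count from the number of UPIBs of the representative fragment alone to the reads_count carried by CollapsedFragment, which covers every fragment in the component. A toy walk-through with made-up reads (assumed import path) illustrates the difference:

from pixelator.collapse.process import get_collapsed_fragments_for_component

unique_reads = {
    "AAAA": ["TTTT", "TTTT"],  # representative fragment, 2 reads
    "AAAT": ["TTTT"],          # 1 read, collapsed into "AAAA"
    "AAAC": ["TTTG"],          # 1 read, collapsed into "AAAA"
}
counts = {frag: len(upibs) for frag, upibs in unique_reads.items()}
components = [set(unique_reads)]

collapsed = next(get_collapsed_fragments_for_component(components, counts))
assert collapsed.sequence == "AAAA"
assert collapsed.unique_fragments_count == 3
assert collapsed.reads_count == 4  # the pre-fix count would have been 2,
                                   # i.e. len(unique_reads["AAAA"]) only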