From 2decf3d9c5a5863fbd60b8700fadaee53c3fb7db Mon Sep 17 00:00:00 2001 From: fbdtemme Date: Mon, 22 Jan 2024 17:37:49 +0100 Subject: [PATCH 1/5] feat: fix deflated edgelist counts The old implementation only added the number of exact matches with the representative fragment as the read count to the edgelist. This makes sure we use the sum of the read counts for all fragments collapsed onto the same representative fragment. --- src/pixelator/collapse/process.py | 116 ++++++++++++++++++++---------- tests/collapse/test_process.py | 50 ++++++++----- 2 files changed, 108 insertions(+), 58 deletions(-) diff --git a/src/pixelator/collapse/process.py b/src/pixelator/collapse/process.py index 3ca281ec..cba1866a 100644 --- a/src/pixelator/collapse/process.py +++ b/src/pixelator/collapse/process.py @@ -5,6 +5,7 @@ Copyright (c) 2023 Pixelgen Technologies AB. """ +import dataclasses import logging import tempfile from collections import Counter, defaultdict @@ -47,6 +48,27 @@ UniqueFragmentAndCount = Tuple[str, int] +@dataclasses.dataclass(frozen=True, slots=True, repr=True, order=True, eq=True) +class CollapsedFragment: + """A collapsed fragment. + + :attr sequence: the consensus sequence for a list of similar fragments + :attr unique_fragments_count: the number of unique fragments that are + represented by this collapsed fragment + :attr reads_count: the number of reads that are represented + by this collapsed fragment + """ + + sequence: str + unique_fragments_count: int + reads_count: int + + def __iter__(self) -> Iterable: + """Iterate over the dataclass fields.""" + # Used to support tuple unpacking + return iter(dataclasses.asdict(self).values()) + + def build_annoytree(data: npt.NDArray[np.uint8], n_trees: int = 10) -> AnnoyIndex: """Build an Annoy tree index. @@ -100,15 +122,16 @@ def build_binary_data(seqs: List[str]) -> npt.NDArray[np.uint8]: return data -def get_representative_sequence_for_component( +def get_collapsed_fragments_for_component( components: List[Set[UniqueFragment]], counts: Dict[UniqueFragment, int] -) -> Generator[Tuple[UniqueFragment, int], None, None]: +) -> Generator[CollapsedFragment, None, None]: """Take the representative sequence from a component based on its counts. Given a list of components (i.e. sequences that presumably belong to the same original molecule), and the counts of occurrences for each of these, - return an iterator of tuples with the representative sequence for that - component and the number of reads collapsed for that umi. + return an iterator of objects with the representative sequence for that + component the total number of reads associated with the cluster and + the number of collapsed fragments. The representative molecule is selected as the one with the most associated upib's, and if there is a tie the lexicographically smallest sequence is picked @@ -116,18 +139,27 @@ def get_representative_sequence_for_component( :param components: a list of components as produced from `get_connected_components` :param counts: a dictionary of the counts of each unique fragment - :yields: the representative sequence and the size of the component as a tuple - :rtype: Generator[Tuple[UniqueFragment, int], None, None] + :rtype: Generator[CollapsedFragment, None, None] + :yields CollapsedFragment: a collapsed fragment """ for component in components: - counts_and_sequences = defaultdict(list) - for seq in component: - counts_and_sequences[counts[seq]].append(seq) - max_count = max(counts_and_sequences.keys()) - yield ( - sorted(counts_and_sequences[max_count])[0], - len(component), - ) + # get the most common sequence in the component and use + # this as the 'consensus sequence' + representative_frag, representative_frag_count = ["", 0] + total_reads = 0 + for c in component: + total_reads += counts[c] + if ( + representative_frag is None + or counts[c] > representative_frag_count + or counts[c] == representative_frag_count + and c < representative_frag + ): + representative_frag = c + representative_frag_count = counts[c] + + collapsed_fragments = len(component) + yield CollapsedFragment(representative_frag, collapsed_fragments, total_reads) # This code snipped has been obtained from: @@ -192,7 +224,7 @@ def identify_fragments_to_collapse( fragments as values :rtype: Dict[UniqueFragment, List[UniqueFragment]] """ - logger.debug("Computing adjancency sequences from %i elements", len(seqs)) + logger.debug("Computing adjacency sequences from %i elements", len(seqs)) # use a nearest neighbours tree to reduce the search space # TODO this approach requires memory (n_ele * len(seq) * 2) @@ -244,27 +276,26 @@ def identify_fragments_to_collapse( def collapse_sequences_unique( seq_dict: UniqueFragmentToUpiB, -) -> Generator[UniqueFragmentAndCount, None, None]: +) -> Generator[CollapsedFragment, None, None]: """Get all fragments. Let each key in `seq_dict` represent it's own sequence. This is equivalent to not collapsing the sequences. :param seq_dict: the fragment to upib dict - :yield UniqueFragmentAndCount: a unique fragment and the number of reads collapsed - :rtype: Generator[UniqueFragmentAndCount, None, None] + :yield a CollapsedFragment object """ logger.debug("Picking all unique sequences (i.e. no collapsing is carried out)") for seq in seq_dict.keys(): - yield (seq, len(seq_dict[seq])) + yield CollapsedFragment(seq, 1, len(seq_dict[seq])) def collapse_sequences_adjacency( seq_dict: UniqueFragmentToUpiB, max_neighbours: int, min_dist: int, -) -> Iterator[UniqueFragmentAndCount]: +) -> Iterator[CollapsedFragment]: """Collapse sequences based on their adjacency. Tries to identify all fragments that represent the same underlying @@ -285,7 +316,7 @@ def collapse_sequences_adjacency( :param min_dist: the hamming distance threshold (i.e. the mismatches between two sequences) :returns: An iterator of the of collapsed molecules, and their original counts - :rtype: Iterator[UniqueFragmentAndCount] + :rtype: Iterator[CollapsedFragment] """ logger.debug("Collapsing %i sequences", len(seq_dict)) @@ -296,7 +327,7 @@ def collapse_sequences_adjacency( seqs=seqs, max_neighbours=max_neighbours, min_dist=min_dist ) full_components = get_connected_components(adj_list, counts) - components = get_representative_sequence_for_component(full_components, counts) + components = get_collapsed_fragments_for_component(full_components, counts) return components @@ -396,7 +427,7 @@ def filter_by_minimum_upib_count( def create_edgelist( - clustered_reads: Iterable[UniqueFragmentAndCount], + clustered_reads: Iterable[CollapsedFragment], unique_reads: UniqueFragmentToUpiB, umia_start: Optional[int], umia_end: Optional[int], @@ -455,27 +486,34 @@ def create_edgelist( # iterate each collapsed umi+upia to get the upib # with the highest count and store in a list def data(): - for cluster_representative_fragment, fragment_count in clustered_reads: + for ( + cluster_representative_fragment, + unique_fragment_count, + read_count, + ) in clustered_reads: # get all the upis from the sequence in the cluster - upis = unique_reads[cluster_representative_fragment] + upibs = unique_reads[cluster_representative_fragment] - # take the most abundant umi-upi - umi_upia = cluster_representative_fragment - umi = umi_upia[:umi_size] - upia = umi_upia[umi_size:] + # The cluster_representative_fragment is the concatenation of UMI + UPIA + umi = cluster_representative_fragment[:umi_size] + upia = cluster_representative_fragment[umi_size:] # take the most common upib from the list - unique_upis = Counter(upis) - upib, _ = unique_upis.most_common(1)[0] - - # count (number of molecules) is the number - # of upis in the umi-upia cluster - count = len(upis) - umi_unique_count = fragment_count - upi_unique_count = len(unique_upis) + # this assumes that all other upib's are sequencing errors + # (the upib with the highest count is the correct one) + unique_upibs = Counter(upibs) + upib, _ = unique_upibs.most_common(1)[0] + associated_upibs_count = len(unique_upibs) # update data array - yield (upia, upib, umi, count, umi_unique_count, upi_unique_count) + yield ( + upia, + upib, + umi, + read_count, + unique_fragment_count, + associated_upibs_count, + ) # create an edge list (pd.DataFrame) with the collapsed sequences df = pd.DataFrame( @@ -631,7 +669,7 @@ def collapse_fastq( max_neighbours=max_neighbours, # type: ignore min_dist=mismatches, # type: ignore ) - if algorithm == "unique": + elif algorithm == "unique": clustered_reads = collapse_sequences_unique( seq_dict=unique_reads, ) diff --git a/tests/collapse/test_process.py b/tests/collapse/test_process.py index 1267e1ac..f55e0a40 100644 --- a/tests/collapse/test_process.py +++ b/tests/collapse/test_process.py @@ -15,6 +15,7 @@ from numpy.testing import assert_array_equal from pandas.testing import assert_frame_equal from pixelator.collapse.process import ( + CollapsedFragment, build_annoytree, build_binary_data, collapse_fastq, @@ -24,7 +25,7 @@ create_fragment_to_upib_dict, filter_by_minimum_upib_count, get_connected_components, - get_representative_sequence_for_component, + get_collapsed_fragments_for_component, identify_fragments_to_collapse, write_tmp_feather_file, ) @@ -77,7 +78,11 @@ def test_create_edgelist(): "HIJNOXXX": ["EF", "LL"], "HIJNOZZZ": ["EF"], } - clustered_sequences = [("HIJNOABC", 2), ("HIJNOXXX", 1), ("HIJNOZZZ", 1)] + clustered_sequences = [ + CollapsedFragment("HIJNOABC", 2, 6), + CollapsedFragment("HIJNOXXX", 1, 2), + CollapsedFragment("HIJNOZZZ", 1, 1), + ] result = create_edgelist( clustered_reads=clustered_sequences, @@ -100,7 +105,7 @@ def test_create_edgelist(): "umi": "HIJNO", "marker": "CD4", "sequence": "AAAAA", - "count": 3, + "count": 6, "umi_unique_count": 2, "upi_unique_count": 3, }, @@ -145,14 +150,13 @@ def test_get_connected_components(): assert result == [{"A", "B", "C"}, {"D", "E"}] -def test_get_representative_sequence_for_component(): +def test_get_collapsed_fragments_for_component(): """Test get representative sequence for a component.""" counts = {"A": 10, "B": 1, "C": 1, "D": 2, "E": 6} components = [{"A", "B", "C"}, {"D", "E"}] - result = get_representative_sequence_for_component( - components=components, counts=counts - ) - assert list(result) == [("A", 3), ("E", 2)] + result = get_collapsed_fragments_for_component(components=components, counts=counts) + result_list = list(result) + assert result_list == [CollapsedFragment("A", 3, 12), CollapsedFragment("E", 2, 8)] def test_identify_fragments_to_collapse(): @@ -188,7 +192,12 @@ def test_collapse_sequences_unique(): } result = collapse_sequences_unique(fragments_to_upib) - assert list(result) == [("TATATA", 3), ("GCGCGC", 3), ("ATATAT", 2), ("GGGGGG", 1)] + assert list(result) == [ + CollapsedFragment("TATATA", 1, 3), + CollapsedFragment("GCGCGC", 1, 3), + CollapsedFragment("ATATAT", 1, 2), + CollapsedFragment("GGGGGG", 1, 1), + ] def test_collapse_sequences_adjacency_no_errors(): @@ -203,7 +212,10 @@ def test_collapse_sequences_adjacency_no_errors(): result = collapse_sequences_adjacency( fragments_to_upib, max_neighbours=10, min_dist=2 ) - assert list(result) == [(x, 1) for x in list(fragments_to_upib.keys())] + assert list(result) == [ + CollapsedFragment(key, 1, len(values)) + for key, values in fragments_to_upib.items() + ] def test_collapse_sequences_adjacency_with_errors(): @@ -229,10 +241,10 @@ def test_collapse_sequences_adjacency_with_errors(): result = sorted(list(result)) assert result == [ - ("ATATAT", 1), - ("GCGCGC", 1), - ("GGGGGG", 1), - ("TATATA", 2), + CollapsedFragment("ATATAT", 1, 2), + CollapsedFragment("GCGCGC", 1, 3), + CollapsedFragment("GGGGGG", 1, 1), + CollapsedFragment("TATATA", 2, 6), ] @@ -252,10 +264,10 @@ def test_collapse_sequences_adjacency_with_errors_picks_most_abundant(): fragments_to_upib, max_neighbours=10, min_dist=2 ) assert sorted(list(result)) == [ - ("ATATAT", 1), - ("GCGCGC", 1), - ("GGGGGG", 1), - ("TATATA", 2), # Note that since "TATATA" has more upib's + CollapsedFragment("ATATAT", 1, 2), + CollapsedFragment("GCGCGC", 1, 3), + CollapsedFragment("GGGGGG", 1, 1), + CollapsedFragment("TATATA", 2, 5), # Note that since "TATATA" has more upib's # associated it will be picked ] @@ -445,7 +457,7 @@ def mock_reads(*args, **kwargs): assert data["upia"].nunique() == 1001 assert data["upib"].nunique() == 1003 assert data["umi"].nunique() == 9580 - assert data["count"].describe()["mean"] == 9.857187370170337 + assert data["count"].describe()["mean"] == 9.897174906522642 assert data["umi_unique_count"].describe()["mean"] == 1.0378063980058163 assert data["upi_unique_count"].describe()["mean"] == 1.0250311591192356 From 651e9865adb2e5eb944102c449a10d95a0188a9d Mon Sep 17 00:00:00 2001 From: fbdtemme Date: Wed, 24 Jan 2024 10:39:24 +0100 Subject: [PATCH 2/5] use a NamedTuple instead of a dataclass --- src/pixelator/collapse/process.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/pixelator/collapse/process.py b/src/pixelator/collapse/process.py index cba1866a..b381064b 100644 --- a/src/pixelator/collapse/process.py +++ b/src/pixelator/collapse/process.py @@ -5,9 +5,9 @@ Copyright (c) 2023 Pixelgen Technologies AB. """ -import dataclasses import logging import tempfile +import typing from collections import Counter, defaultdict from typing import ( Dict, @@ -48,8 +48,7 @@ UniqueFragmentAndCount = Tuple[str, int] -@dataclasses.dataclass(frozen=True, slots=True, repr=True, order=True, eq=True) -class CollapsedFragment: +class CollapsedFragment(typing.NamedTuple): """A collapsed fragment. :attr sequence: the consensus sequence for a list of similar fragments @@ -63,11 +62,6 @@ class CollapsedFragment: unique_fragments_count: int reads_count: int - def __iter__(self) -> Iterable: - """Iterate over the dataclass fields.""" - # Used to support tuple unpacking - return iter(dataclasses.asdict(self).values()) - def build_annoytree(data: npt.NDArray[np.uint8], n_trees: int = 10) -> AnnoyIndex: """Build an Annoy tree index. From aaa9017aced6cbb25c8d5a634aad0398651a6543 Mon Sep 17 00:00:00 2001 From: fbdtemme Date: Wed, 24 Jan 2024 10:53:09 +0100 Subject: [PATCH 3/5] update CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c8a20282..51f6cab7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added * Performance improvements and reduced bundle size in QC report. +* Fix deflated counts in the edgelist after collapse. ## [0.16.1] - 2024-01-12 From dc9293238a16e99731e6f19f6619460e2d65481b Mon Sep 17 00:00:00 2001 From: fbdtemme Date: Wed, 24 Jan 2024 14:25:01 +0100 Subject: [PATCH 4/5] refactor get_collapsed_fragments_for_component a bit --- src/pixelator/collapse/process.py | 57 ++++++++++++++++--------------- 1 file changed, 30 insertions(+), 27 deletions(-) diff --git a/src/pixelator/collapse/process.py b/src/pixelator/collapse/process.py index b381064b..8b683cb4 100644 --- a/src/pixelator/collapse/process.py +++ b/src/pixelator/collapse/process.py @@ -8,19 +8,15 @@ import logging import tempfile import typing +import warnings from collections import Counter, defaultdict from typing import ( - Dict, Generator, Iterable, Iterator, - List, Literal, Optional, - Set, - Tuple, ) -import warnings import numpy as np import numpy.typing as npt @@ -44,8 +40,7 @@ UniqueFragment = str UpiB = str -UniqueFragmentToUpiB = Dict[UniqueFragment, List[UpiB]] -UniqueFragmentAndCount = Tuple[str, int] +UniqueFragmentToUpiB = dict[UniqueFragment, list[UpiB]] class CollapsedFragment(typing.NamedTuple): @@ -92,7 +87,7 @@ def build_annoytree(data: npt.NDArray[np.uint8], n_trees: int = 10) -> AnnoyInde return tree -def build_binary_data(seqs: List[str]) -> npt.NDArray[np.uint8]: +def build_binary_data(seqs: list[str]) -> npt.NDArray[np.uint8]: """Build a binary matrix from a list of sequences using two-bit encoding. Convert a list of DNA sequences (str) to binary sequences @@ -117,7 +112,7 @@ def build_binary_data(seqs: List[str]) -> npt.NDArray[np.uint8]: def get_collapsed_fragments_for_component( - components: List[Set[UniqueFragment]], counts: Dict[UniqueFragment, int] + components: list[set[UniqueFragment]], counts: dict[UniqueFragment, int] ) -> Generator[CollapsedFragment, None, None]: """Take the representative sequence from a component based on its counts. @@ -139,18 +134,26 @@ def get_collapsed_fragments_for_component( for component in components: # get the most common sequence in the component and use # this as the 'consensus sequence' - representative_frag, representative_frag_count = ["", 0] - total_reads = 0 - for c in component: - total_reads += counts[c] - if ( - representative_frag is None - or counts[c] > representative_frag_count - or counts[c] == representative_frag_count - and c < representative_frag + + # get an iterator over the fragments + fragment_it = iter(component) + + # Initialize variables for finding the representative with the first fragment + # of a component + representative_frag = next(fragment_it) + representative_frag_count = counts[representative_frag] + total_reads = representative_frag_count + + # Loop over the other fragments to find the representative + for fragment in fragment_it: + fragment_read_counts = counts[fragment] + total_reads += fragment_read_counts + if fragment_read_counts > representative_frag_count or ( + fragment_read_counts == representative_frag_count + and fragment < representative_frag ): - representative_frag = c - representative_frag_count = counts[c] + representative_frag = fragment + representative_frag_count = fragment_read_counts collapsed_fragments = len(component) yield CollapsedFragment(representative_frag, collapsed_fragments, total_reads) @@ -159,8 +162,8 @@ def get_collapsed_fragments_for_component( # This code snipped has been obtained from: # https://github.com/CGATOxford/umi-tools def get_connected_components( - graph: Dict[UniqueFragment, List[UniqueFragment]], counts: Dict[UniqueFragment, int] -) -> List[Set[UniqueFragment]]: + graph: dict[UniqueFragment, list[UniqueFragment]], counts: dict[UniqueFragment, int] +) -> list[set[UniqueFragment]]: """Get connected components from a graph of sequences. Find the connected components from the sequence graph (represented by an adjacency @@ -176,7 +179,7 @@ def get_connected_components( :param counts: a dictionary of counts (copies) for each sequence in `graph` :returns: a list of sets of sequences where each set represents a connected component - :rtype: List[Set[UniqueFragment]] + :rtype: list[set[UniqueFragment]] """ found = set() components = [] @@ -189,10 +192,10 @@ def get_connected_components( def identify_fragments_to_collapse( - seqs: List[UniqueFragment], + seqs: list[UniqueFragment], min_dist: int, max_neighbours: int, -) -> Dict[UniqueFragment, List[UniqueFragment]]: +) -> dict[UniqueFragment, list[UniqueFragment]]: """Identify fragments to collapse by approximate nearest neighbourhood search. Tries to identify all sequences in the given list of sequences @@ -216,7 +219,7 @@ def identify_fragments_to_collapse( :param max_neighbours: the number of neighbours to use in the Annoy index :returns: a dictionary with fragments as keys and a list of their adjoining fragments as values - :rtype: Dict[UniqueFragment, List[UniqueFragment]] + :rtype: dict[UniqueFragment, list[UniqueFragment]] """ logger.debug("Computing adjacency sequences from %i elements", len(seqs)) @@ -232,7 +235,7 @@ def identify_fragments_to_collapse( # iterate the neighbours of each sequence to obtain # the adjacency list (dictionary of lists) - adj_list = {seq: [] for seq in seqs} # type: Dict[str, List[str]] + adj_list: dict[str, list[str]] = {seq: [] for seq in seqs} for i, seq1 in enumerate(seqs): if i % 100000 == 0: logger.debug("Processed 100,000 reads...") From e637f1719772afff54d6e81bea65d0da1907c071 Mon Sep 17 00:00:00 2001 From: fbdtemme Date: Wed, 24 Jan 2024 18:46:40 +0100 Subject: [PATCH 5/5] initialize --polarization-n-permutations to 0 --- src/pixelator/cli/analysis.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pixelator/cli/analysis.py b/src/pixelator/cli/analysis.py index 72640425..e677086a 100644 --- a/src/pixelator/cli/analysis.py +++ b/src/pixelator/cli/analysis.py @@ -70,7 +70,7 @@ ) @click.option( "--polarization-n-permutations", - default=None, + default=0, required=False, type=click.IntRange(min=0), show_default=True,