From b3eef9344d1d5b4877614bd293cccd3a3d62cb08 Mon Sep 17 00:00:00 2001 From: Leonardo Mazzone Date: Fri, 20 Dec 2024 16:37:22 +0000 Subject: [PATCH 01/26] Prepare for supporting query benchmarks --- benchmark/pipeline.py | 138 +++++++++++++++++++++++++++++++ benchmark/schema_setup.py | 34 ++++++++ src/matchbox/common/transform.py | 16 ++-- test/pipeline.py | 51 ------------ 4 files changed, 181 insertions(+), 58 deletions(-) create mode 100644 benchmark/pipeline.py create mode 100644 benchmark/schema_setup.py delete mode 100644 test/pipeline.py diff --git a/benchmark/pipeline.py b/benchmark/pipeline.py new file mode 100644 index 00000000..6843b7c5 --- /dev/null +++ b/benchmark/pipeline.py @@ -0,0 +1,138 @@ +import logging +import time +from contextlib import contextmanager +from pathlib import Path + +import pyarrow.parquet as pq +from rich.logging import RichHandler + +from matchbox.common.hash import HASH_FUNC +from matchbox.common.transform import ( + attach_components_to_probabilities, + to_hierarchical_clusters, +) + +logging.basicConfig( + level=logging.INFO, + format="%(message)s", + handlers=[RichHandler(rich_tracebacks=True)], +) +pipeline_logger = logging.getLogger("mb_pipeline") + +ROOT = Path(__file__).parent.parent + + +@contextmanager +def timer(description: str): + start = time.time() + yield + elapsed = time.time() - start + + if elapsed >= 60: + minutes = int(elapsed // 60) + seconds = elapsed % 60 + time_str = f"{minutes} min {seconds:.1f} sec" + else: + time_str = f"{elapsed:.2f} seconds" + + pipeline_logger.info(f"{description} in {time_str}") + + +INPUT_NAME = "hierarchical_cc200k" +OUTPUT_PREFIX = "large" + +if __name__ == "__main__": + with timer("Full pipeline completed"): + with timer("Read table"): + table = pq.read_table(Path.cwd() / f"data/{INPUT_NAME}.parquet") + + pipeline_logger.info(f"Processing {len(table):,} records") + + with timer("Added components"): + cc = attach_components_to_probabilities(table) + + with timer("Built hierarchical clusters"): + hierarchy = to_hierarchical_clusters(cc) + + with timer("Created output tables"): + fake_resolution_hash = HASH_FUNC( + "ceci n'est pas un model".encode("utf-8") + ).digest() + + parents_im, children_im, thresholds = ( + hierarchy.column("parent").to_numpy(), + hierarchy.column("child").to_numpy(), + hierarchy.column("probability").to_numpy(), + ) + import numpy as np + import pyarrow as pa + from pyarrow.parquet import write_table + + im_to_pos = dict() + next_int = max(max(parents_im), 0) + parents = [] + children = [] + for pim in parents_im: + if pim >= 0: + parents.append(pim) + elif pim in im_to_pos: + parents.append(im_to_pos[pim]) + else: + im_to_pos[pim] = next_int + parents.append(next_int) + next_int += 1 + + for cim in children_im: + if cim >= 0: + children.append(cim) + elif cim in im_to_pos: + children.append(im_to_pos[cim]) + else: + im_to_pos[cim] = next_int + children.append(next_int) + next_int += 1 + + unique_clusters = np.unique(parents) + + out_clusters = pa.table( + { + "id": pa.array(unique_clusters, type=pa.uint64()), + "dataset_id": pa.array( + [None] * len(unique_clusters), type=pa.uint64() + ), + "id_in_dataset": pa.array( + [None] * len(unique_clusters), type=pa.string() + ), + } + ) + + out_contains = pa.table( + { + "parent": pa.array(parents, type=pa.uint64()), + "child": pa.array(children, type=pa.uint64()), + } + ) + + out_probabilities = pa.table( + { + "model": pa.array( + [fake_resolution_hash] * len(parents), type=pa.binary() + ), + "cluster": pa.array(parents, 
type=pa.uint64()), + "probability": pa.array(thresholds, type=pa.uint64()), + } + ) + + write_table( + out_clusters, + Path.cwd() / "data" / f"{OUTPUT_PREFIX}_ingest_clusters.parquet", + ) + + write_table( + out_contains, Path.cwd() / "data" / f"{OUTPUT_PREFIX}_contains.parquet" + ) + + write_table( + out_probabilities, + Path.cwd() / "data" / f"{OUTPUT_PREFIX}_ingest_probabilities.parquet", + ) diff --git a/benchmark/schema_setup.py b/benchmark/schema_setup.py new file mode 100644 index 00000000..4eb33256 --- /dev/null +++ b/benchmark/schema_setup.py @@ -0,0 +1,34 @@ +from textwrap import dedent + +from sqlalchemy.dialects import postgresql +from sqlalchemy.schema import CreateTable + +from matchbox.server.postgresql.orm import ( + Clusters, + Contains, + Probabilities, + ResolutionFrom, + Resolutions, + Sources, +) + +print( + dedent(""" + DROP SCHEMA mb CASCADE; + CREATE SCHEMA mb; +""") +) + +# Order matters +for table_class in ( + Resolutions, + ResolutionFrom, + Sources, + Clusters, + Contains, + Probabilities, +): + print( + str(CreateTable(table_class.__table__).compile(dialect=postgresql.dialect())) + + ";" + ) diff --git a/src/matchbox/common/transform.py b/src/matchbox/common/transform.py index f5c63023..c4004265 100644 --- a/src/matchbox/common/transform.py +++ b/src/matchbox/common/transform.py @@ -199,7 +199,10 @@ def component_to_hierarchy( Returns: Arrow Table with columns ['parent', 'child', 'probability'] """ - probs = np.sort(pc.unique(table["probability"]).to_numpy())[::-1] + ascending_probs = np.sort( + pc.unique(table["probability"]).to_numpy(zero_copy_only=False) + ) + probs = ascending_probs[::-1] djs = DisjointSet[int]() # implements connected components im = IntMap(salt=salt) # generates IDs for new clusters @@ -239,8 +242,8 @@ def component_to_hierarchy( parents, children, probs = zip(*hierarchy, strict=True) return pa.table( { - "parent": pa.array(parents, type=dtype()), - "child": pa.array(children, type=dtype()), + "parent": pa.array(parents, type=pa.int64()), + "child": pa.array(children, type=pa.int64()), "probability": pa.array(probs, type=pa.uint8()), } ) @@ -259,7 +262,6 @@ def to_hierarchical_clusters( probabilities: Arrow table with columns ['component', 'left', 'right', 'probability'] proc_func: Function to process each component - dtype: Arrow data type for parent/child columns timeout: Maximum seconds to wait for each component to process Returns: @@ -315,7 +317,7 @@ def to_hierarchical_clusters( with ProcessPoolExecutor(max_workers=n_cores) as executor: futures = [ - executor.submit(proc_func, component_table, dtype, salt) + executor.submit(proc_func, component_table, salt) for salt, component_table in enumerate(component_tables) ] @@ -340,8 +342,8 @@ def to_hierarchical_clusters( logic_logger.warning("No results to concatenate") return pa.table( { - "parent": pa.array([], type=dtype()), - "child": pa.array([], type=dtype()), + "parent": pa.array([], type=pa.int64()), + "child": pa.array([], type=pa.int64()), "probability": pa.array([], type=pa.uint8()), } ) diff --git a/test/pipeline.py b/test/pipeline.py deleted file mode 100644 index 95d8b367..00000000 --- a/test/pipeline.py +++ /dev/null @@ -1,51 +0,0 @@ -import logging -import time -from contextlib import contextmanager -from pathlib import Path - -import pyarrow.parquet as pq -from rich.logging import RichHandler - -from matchbox.common.transform import ( - attach_components_to_probabilities, - to_hierarchical_clusters, -) - -logging.basicConfig( - level=logging.INFO, - format="%(message)s", - 
handlers=[RichHandler(rich_tracebacks=True)], -) -pipeline_logger = logging.getLogger("mb_pipeline") - -ROOT = Path(__file__).parent.parent - - -@contextmanager -def timer(description: str): - start = time.time() - yield - elapsed = time.time() - start - - if elapsed >= 60: - minutes = int(elapsed // 60) - seconds = elapsed % 60 - time_str = f"{minutes} min {seconds:.1f} sec" - else: - time_str = f"{elapsed:.2f} seconds" - - pipeline_logger.info(f"{description} in {time_str}") - - -if __name__ == "__main__": - with timer("Full pipeline completed"): - with timer("Read table"): - table = pq.read_table(Path.cwd() / "data/hierarchical_cc20k.parquet") - - pipeline_logger.info(f"Processing {len(table):,} records") - - with timer("Added components"): - cc = attach_components_to_probabilities(table) - - with timer("Built hierarchical clusters"): - out = to_hierarchical_clusters(cc) From 1d7adfdb4111f3812f2b3dd06e29c7cd083c8e88 Mon Sep 17 00:00:00 2001 From: Leonardo Mazzone Date: Fri, 27 Dec 2024 12:11:03 +0000 Subject: [PATCH 02/26] Use factories to generate dedupe probabilities --- test/fixtures/factories.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/test/fixtures/factories.py b/test/fixtures/factories.py index 612f6e2b..d49995cc 100644 --- a/test/fixtures/factories.py +++ b/test/fixtures/factories.py @@ -26,7 +26,7 @@ def verify_components(table) -> dict: for left, right in zip( table["left"].to_numpy(), table["right"].to_numpy(), - strict=False, + strict=True, ) ] @@ -83,7 +83,7 @@ def _split_values_into_components( def generate_dummy_probabilities( left_values: list[int], - right_values: list[int], + right_values: list[int] | None, prob_range: tuple[float, float], num_components: int, total_rows: int, @@ -102,6 +102,8 @@ def generate_dummy_probabilities( PyArrow Table with 'left', 'right', and 'probability' columns """ # Validate inputs + if right_values is None: + right_values = left_values if len(left_values) < 2 or len(right_values) < 2: raise ValueError("Need at least 2 possible values for both left and right") if num_components > min(len(left_values), len(right_values)): @@ -167,15 +169,17 @@ def generate_dummy_probabilities( ) component_edges = base_edges + list( - zip(random_lefts, random_rights, random_probs, strict=False) + zip(random_lefts, random_rights, random_probs, strict=True) ) else: component_edges = base_edges all_edges.extend(component_edges) + # Drop self-references + all_edges = [(le, ri, pr) for le, ri, pr in all_edges if le != ri] # Convert to arrays - lefts, rights, probs = zip(*all_edges, strict=False) + lefts, rights, probs = zip(*all_edges, strict=True) # Create PyArrow arrays left_array = pa.array(lefts, type=pa.uint64()) From d6e7715215fd3d42f4a2cdf215f07b1b746903a4 Mon Sep 17 00:00:00 2001 From: Leonardo Mazzone Date: Fri, 27 Dec 2024 12:11:31 +0000 Subject: [PATCH 03/26] Correct mistake in ERM --- src/matchbox/server/postgresql/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/matchbox/server/postgresql/README.md b/src/matchbox/server/postgresql/README.md index c4183f18..72542530 100644 --- a/src/matchbox/server/postgresql/README.md +++ b/src/matchbox/server/postgresql/README.md @@ -32,7 +32,7 @@ erDiagram bigint child PK,FK } Probabilities { - bigint model PK,FK + bigint resolution PK,FK bigint cluster PK,FK float probability } From aaebbc7fcaea6fb3b9d49056aedfe41bc315fed7 Mon Sep 17 00:00:00 2001 From: Leonardo Mazzone Date: Fri, 27 Dec 2024 16:57:44 +0000 Subject: [PATCH 04/26] Fix problems for 
dedupe probability factory --- test/fixtures/factories.py | 138 +++++++++++++++++++++++++++---------- 1 file changed, 103 insertions(+), 35 deletions(-) diff --git a/test/fixtures/factories.py b/test/fixtures/factories.py index d49995cc..940354b6 100644 --- a/test/fixtures/factories.py +++ b/test/fixtures/factories.py @@ -45,21 +45,31 @@ def verify_components(table) -> dict: } -def _calculate_max_possible_edges(n_nodes: int, num_components: int) -> int: +def _calculate_max_possible_edges( + n_nodes: int, num_components: int, deduplicate: bool +) -> int: """ - Calculate the max possible number of edges given n nodes split into k components. + Calculate a conservative max number of edges given n nodes split into k components. Args: - n_nodes: Total number of nodes + n_nodes: Total number of nodes on smallest table (either left or right) num_components: Number of components to split into + deduplicate: Whether we are dealing with probabilities for deduplication Returns: Maximum possible number of edges """ - nodes_per_component = n_nodes // num_components - max_edges_per_component = ( - nodes_per_component * nodes_per_component - ) # Complete bipartite graph + # Size of smallest components we will generate. Because some components might be + # larger, the final estimate might be smaller than necessary + min_nodes_per_component = n_nodes // num_components + if deduplicate: + # Max edges in undirected graph of size n + max_edges_per_component = ( + min_nodes_per_component * (min_nodes_per_component - 1) / 2 + ) + else: + # Complete bipartite graph + max_edges_per_component = min_nodes_per_component * min_nodes_per_component return max_edges_per_component * num_components @@ -81,6 +91,53 @@ def _split_values_into_components( return np.array_split(values, num_components) +def _generate_remaining_edges( + left_values: list[int], + right_values: list[int], + len_component: int, + base_edges: set[tuple[int, int, int]], + deduplicate: bool, +) -> list[tuple[int, int, int]]: + """ + Generate remaining edges in component, recursing if necessary when generated edges + need to be discarded. + + Args: + left_values: list of integers on the left table + right_values: list of integers on the right table + len_component: total number of edges to generate + base_edges: set representing edges generated so far + deduplicate: whether probabilities are for deduplication + + Returns: + Total set of edges generated at current recursion step + """ + remaining_edges = len_component - len(base_edges) + if remaining_edges <= 0: + return base_edges + + lefts = np.random.choice(left_values, size=remaining_edges) + rights = np.random.choice(right_values, size=remaining_edges) + + new_edges = set() + for le, r in zip(lefts, rights, strict=True): + if le == r: + continue + if deduplicate: + le, r = sorted([le, r]) + new_edges.add((le, r)) + + base_edges.update(new_edges) + + return _generate_remaining_edges( + left_values, + right_values, + len_component, + base_edges, + deduplicate, + ) + + def generate_dummy_probabilities( left_values: list[int], right_values: list[int] | None, @@ -93,7 +150,8 @@ def generate_dummy_probabilities( Args: left_values: List of integers to use for left column - right_values: List of integers to use for right column + right_values: List of integers to use for right column. 
If None, assume we + are generating probabilities for deduplication prob_range: Tuple of (min_prob, max_prob) to constrain probabilities num_components: Number of distinct connected components to generate total_rows: Total number of rows to generate @@ -102,8 +160,10 @@ def generate_dummy_probabilities( PyArrow Table with 'left', 'right', and 'probability' columns """ # Validate inputs + deduplicate = False if right_values is None: right_values = left_values + deduplicate = True if len(left_values) < 2 or len(right_values) < 2: raise ValueError("Need at least 2 possible values for both left and right") if num_components > min(len(left_values), len(right_values)): @@ -112,7 +172,9 @@ def generate_dummy_probabilities( ) min_nodes = min(len(left_values), len(right_values)) - max_possible_edges = _calculate_max_possible_edges(min_nodes, num_components) + max_possible_edges = _calculate_max_possible_edges( + min_nodes, num_components, deduplicate + ) if total_rows > max_possible_edges: raise ValueError( @@ -128,7 +190,11 @@ def generate_dummy_probabilities( # Split values into completely separate groups for each component left_components = _split_values_into_components(left_values, num_components) - right_components = _split_values_into_components(right_values, num_components) + if deduplicate: + # for each left-right component pair, the right equals the left rotated by one + right_components = [np.roll(c, 1) for c in left_components] + else: + right_components = _split_values_into_components(right_values, num_components) # Calculate base number of edges per component base_edges_per_component = total_rows // num_components @@ -141,43 +207,45 @@ def generate_dummy_probabilities( comp_left_values = left_components[comp_idx] comp_right_values = right_components[comp_idx] - # Calculate edges for this component edges_in_component = base_edges_per_component - if comp_idx < remaining_edges: # Distribute remaining edges + # Distribute remaining edges, one per component + if comp_idx < remaining_edges: edges_in_component += 1 # Ensure basic connectivity within the component - base_edges = [] + base_edges = set() # Create a spanning tree-like structure for i in range(len(comp_left_values)): - base_edges.append( - ( - comp_left_values[i], - comp_right_values[i % len(comp_right_values)], - np.random.randint(prob_min, prob_max + 1), - ) - ) + left = comp_left_values[i] + right = comp_right_values[i % len(comp_right_values)] + if deduplicate: + left, right = sorted([left, right]) + + base_edges.add((left, right)) + + # Remove self-references from base edges + base_edges = {(le, ri) for le, ri in base_edges if le != ri} # Generate remaining random edges strictly within this component - remaining_edges = edges_in_component - len(base_edges) - if remaining_edges > 0: - random_lefts = np.random.choice(comp_left_values, size=remaining_edges) - random_rights = np.random.choice(comp_right_values, size=remaining_edges) - random_probs = np.random.randint( - prob_min, prob_max + 1, size=remaining_edges - ) - - component_edges = base_edges + list( - zip(random_lefts, random_rights, random_probs, strict=True) - ) - else: - component_edges = base_edges + component_edges = _generate_remaining_edges( + comp_left_values, + comp_right_values, + base_edges_per_component, + base_edges, + deduplicate, + ) + random_probs = np.random.randint( + prob_min, prob_max + 1, size=len(component_edges) + ) + + component_edges = [ + (le, ri, pr) + for (le, ri), pr in zip(component_edges, random_probs, strict=False) + ] 
all_edges.extend(component_edges) - # Drop self-references - all_edges = [(le, ri, pr) for le, ri, pr in all_edges if le != ri] # Convert to arrays lefts, rights, probs = zip(*all_edges, strict=True) From 987da86cdada55167d2aa130a950959bcd5da0ec Mon Sep 17 00:00:00 2001 From: Leonardo Mazzone Date: Fri, 27 Dec 2024 16:59:31 +0000 Subject: [PATCH 05/26] Solve problems with cluster creation --- src/matchbox/common/hash.py | 11 +++++++++++ src/matchbox/common/transform.py | 7 +++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/src/matchbox/common/hash.py b/src/matchbox/common/hash.py index 5f45d001..4101d37b 100644 --- a/src/matchbox/common/hash.py +++ b/src/matchbox/common/hash.py @@ -155,3 +155,14 @@ def index(self, *values: int) -> int: self.mapping[value_set] = salted_id return salted_id + + def has_mapping(self, *values: int) -> bool: + """ + Args: + values: the integers in the set you want to index + + Returns: + Boolean indicating whether index for values already exists + """ + value_set = frozenset(values) + return value_set in self.mapping diff --git a/src/matchbox/common/transform.py b/src/matchbox/common/transform.py index c4004265..3ca9383b 100644 --- a/src/matchbox/common/transform.py +++ b/src/matchbox/common/transform.py @@ -186,7 +186,7 @@ def get_components(self) -> list[set[T]]: def component_to_hierarchy( - table: pa.Table, dtype: pa.DataType = pa.uint64, salt: int = 1 + table: pa.Table, salt: int, dtype: pa.DataType = pa.uint64 ) -> pa.Table: """ Convert pairwise probabilities into a hierarchical representation. @@ -230,6 +230,9 @@ def component_to_hierarchy( if len(children) <= 2: continue # Skip pairs already handled by pairwise probabilities + if im.has_mapping(*children): + continue # Skip unchanged components from previous thresholds + parent = im.index(*children) prev_roots: set[int] = set() for child in children: @@ -317,7 +320,7 @@ def to_hierarchical_clusters( with ProcessPoolExecutor(max_workers=n_cores) as executor: futures = [ - executor.submit(proc_func, component_table, salt) + executor.submit(proc_func, component_table, salt=salt) for salt, component_table in enumerate(component_tables) ] From 4de9f1ad2a9035985b916da5a2c8c94e727e4031 Mon Sep 17 00:00:00 2001 From: Leonardo Mazzone Date: Mon, 30 Dec 2024 12:00:24 +0000 Subject: [PATCH 06/26] Rename schema_setup --- benchmark/schema_setup.py | 34 ----------------- .../postgresql/benchmark/init_schema.py | 37 +++++++++++++++++++ 2 files changed, 37 insertions(+), 34 deletions(-) delete mode 100644 benchmark/schema_setup.py create mode 100644 src/matchbox/server/postgresql/benchmark/init_schema.py diff --git a/benchmark/schema_setup.py b/benchmark/schema_setup.py deleted file mode 100644 index 4eb33256..00000000 --- a/benchmark/schema_setup.py +++ /dev/null @@ -1,34 +0,0 @@ -from textwrap import dedent - -from sqlalchemy.dialects import postgresql -from sqlalchemy.schema import CreateTable - -from matchbox.server.postgresql.orm import ( - Clusters, - Contains, - Probabilities, - ResolutionFrom, - Resolutions, - Sources, -) - -print( - dedent(""" - DROP SCHEMA mb CASCADE; - CREATE SCHEMA mb; -""") -) - -# Order matters -for table_class in ( - Resolutions, - ResolutionFrom, - Sources, - Clusters, - Contains, - Probabilities, -): - print( - str(CreateTable(table_class.__table__).compile(dialect=postgresql.dialect())) - + ";" - ) diff --git a/src/matchbox/server/postgresql/benchmark/init_schema.py b/src/matchbox/server/postgresql/benchmark/init_schema.py new file mode 100644 index 
00000000..fc8a8c33 --- /dev/null +++ b/src/matchbox/server/postgresql/benchmark/init_schema.py @@ -0,0 +1,37 @@ +from textwrap import dedent + +from sqlalchemy.dialects import postgresql +from sqlalchemy.schema import CreateTable + +from matchbox.server.postgresql.orm import ( + Clusters, + Contains, + Probabilities, + ResolutionFrom, + Resolutions, + Sources, +) + +if __name__ == "__main__": + print( + dedent(""" + DROP SCHEMA mb CASCADE; + CREATE SCHEMA mb; + """) + ) + + # Order matters + for table_class in ( + Resolutions, + ResolutionFrom, + Sources, + Clusters, + Contains, + Probabilities, + ): + print( + str( + CreateTable(table_class.__table__).compile(dialect=postgresql.dialect()) + ) + + ";" + ) From 67509d610c5eb492856f2fa5d33510340e2d02f4 Mon Sep 17 00:00:00 2001 From: Leonardo Mazzone Date: Mon, 30 Dec 2024 12:00:59 +0000 Subject: [PATCH 07/26] Move factories to package --- {test/fixtures => src/matchbox/common}/factories.py | 1 + 1 file changed, 1 insertion(+) rename {test/fixtures => src/matchbox/common}/factories.py (99%) diff --git a/test/fixtures/factories.py b/src/matchbox/common/factories.py similarity index 99% rename from test/fixtures/factories.py rename to src/matchbox/common/factories.py index 940354b6..dd45820d 100644 --- a/test/fixtures/factories.py +++ b/src/matchbox/common/factories.py @@ -164,6 +164,7 @@ def generate_dummy_probabilities( if right_values is None: right_values = left_values deduplicate = True + if len(left_values) < 2 or len(right_values) < 2: raise ValueError("Need at least 2 possible values for both left and right") if num_components > min(len(left_values), len(right_values)): From 09b9b47b250ea22f57e75bc93efad92e16a8ef5d Mon Sep 17 00:00:00 2001 From: Leonardo Mazzone Date: Mon, 30 Dec 2024 12:01:24 +0000 Subject: [PATCH 08/26] Move test_transform to common --- test/{client => common}/test_transform.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename test/{client => common}/test_transform.py (99%) diff --git a/test/client/test_transform.py b/test/common/test_transform.py similarity index 99% rename from test/client/test_transform.py rename to test/common/test_transform.py index 8752fd15..c1ba6f0a 100644 --- a/test/client/test_transform.py +++ b/test/common/test_transform.py @@ -9,12 +9,12 @@ import pyarrow.compute as pc import pytest +from matchbox.common.factories import generate_dummy_probabilities, verify_components from matchbox.common.transform import ( attach_components_to_probabilities, component_to_hierarchy, to_hierarchical_clusters, ) -from test.fixtures.factories import generate_dummy_probabilities, verify_components @lru_cache(maxsize=None) From 4f271e370260ae2c9b9def1f09971d6a43d2de08 Mon Sep 17 00:00:00 2001 From: Leonardo Mazzone Date: Mon, 30 Dec 2024 12:02:01 +0000 Subject: [PATCH 09/26] Move pipeline to postgres benchmark --- src/matchbox/server/postgresql/benchmark/__init__.py | 0 .../matchbox/server/postgresql/benchmark/cluster_pipeline.py | 0 2 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 src/matchbox/server/postgresql/benchmark/__init__.py rename benchmark/pipeline.py => src/matchbox/server/postgresql/benchmark/cluster_pipeline.py (100%) diff --git a/src/matchbox/server/postgresql/benchmark/__init__.py b/src/matchbox/server/postgresql/benchmark/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/benchmark/pipeline.py b/src/matchbox/server/postgresql/benchmark/cluster_pipeline.py similarity index 100% rename from benchmark/pipeline.py rename to 
src/matchbox/server/postgresql/benchmark/cluster_pipeline.py From 633448683d8e2177008c5dd1a69658724408bf39 Mon Sep 17 00:00:00 2001 From: Leonardo Mazzone Date: Mon, 30 Dec 2024 12:02:17 +0000 Subject: [PATCH 10/26] Create table generation script --- .../postgresql/benchmark/generate_tables.py | 399 ++++++++++++++++++ 1 file changed, 399 insertions(+) create mode 100644 src/matchbox/server/postgresql/benchmark/generate_tables.py diff --git a/src/matchbox/server/postgresql/benchmark/generate_tables.py b/src/matchbox/server/postgresql/benchmark/generate_tables.py new file mode 100644 index 00000000..ae779a56 --- /dev/null +++ b/src/matchbox/server/postgresql/benchmark/generate_tables.py @@ -0,0 +1,399 @@ +import json +from pathlib import Path +from typing import Iterable + +import click +import pyarrow as pa +import pyarrow.compute as pc +import pyarrow.parquet as pq + +from matchbox.common.factories import generate_dummy_probabilities +from matchbox.common.hash import HASH_FUNC +from matchbox.common.transform import ( + attach_components_to_probabilities, + to_hierarchical_clusters, +) + + +class IDCreator: + """ + A generator of incremental integer IDs from positive and negative integers. + + Positive integers will be returned as they are, while a new ID will be generated + for each negative integer. + """ + + def __init__(self, start: int): + self.id_map = dict() + self._next_int = start + + def create(self, temp_ids: list[int]) -> list[int]: + results = [] + for ti in temp_ids: + if ti >= 0: + results.append(ti) + elif ti in self.id_map: + results.append(self.id_map[ti]) + else: + self.id_map[ti] = self._next_int + results.append(self._next_int) + self._next_int += 1 + + return results + + def reset_mapping(self): + self.__init__(self._next_int) + + return self + + +def _hash_list_int(li: list[int]) -> list[bytes]: + return [HASH_FUNC(str(i).encode("utf-8")).digest() for i in li] + + +def _unique_clusters(all_parents: Iterable[int], all_probabilities: Iterable[int]): + ll = set() + clusters = [] + probabilities = [] + for parent, prob in zip(all_parents, all_probabilities, strict=True): + if parent in ll: + continue + else: + ll.add(parent) + clusters.append(parent) + probabilities.append(prob / 100) + return clusters, probabilities + + +def generate_sources() -> pa.Table: + """ + Generate sources table. + + Returns: + PyArrow sources table + """ + sources_resolution_id = [1, 2] + sources_alias = ["alias1", "alias2"] + sources_schema = ["dbt", "dbt"] + sources_table = ["companies_house", "hmrc_exporters"] + sources_id = ["company_number", "id"] + sources_indices = [ + { + "literal": ["col1", "col2", "col3"], + "alias": ["col1", "col2", "col3"], + }, + { + "literal": ["col1", "col2", "col3"], + "alias": ["col1", "col2", "col3"], + }, + ] + sources_indices = [json.dumps(si) for si in sources_indices] + return pa.table( + { + "resolution_id": pa.array(sources_resolution_id, type=pa.uint64()), + "alias": pa.array(sources_alias, type=pa.string()), + "schema": pa.array(sources_schema, type=pa.string()), + "table": pa.array(sources_table, type=pa.string()), + "id": pa.array(sources_id, type=pa.string()), + "indices": pa.array(sources_indices, type=pa.string()), + } + ) + + +def generate_resolutions() -> pa.Table: + """ + Generate resolutions table. 
+ + Returns: + PyArrow resolutions table + """ + resolutions_resolution_id = [1, 2, 3, 4, 5] + resolutions_name = ["source1", "source2", "dedupe1", "dedupe2", "link"] + resolutions_resolution_hash = [ + HASH_FUNC(rid.encode("utf-8")).digest() for rid in resolutions_name + ] + resolutions_type = ["dataset", "dataset", "model", "model", "model"] + resolutions_float = [None, None, 0.8, 0.8, 0.9] + + return pa.table( + { + "resolution_id": pa.array(resolutions_resolution_id, type=pa.uint64()), + "resolution_hash": pa.array(resolutions_resolution_hash, type=pa.binary()), + "type": pa.array(resolutions_type, type=pa.string()), + "name": pa.array(resolutions_name, type=pa.string()), + "description": pa.array(resolutions_name, type=pa.string()), + "truth": pa.array(resolutions_float, type=pa.float64()), + } + ) + + +def generate_resolution_from() -> pa.Table: + """ + Generate resolution_from table. + + Returns: + PyArrow resolution_from table + """ + # 1 and 2 are sources; 3 and 4 are dedupers; 5 is a linker + resolution_parent = [1, 1, 3, 2, 2, 4] + resolution_child = [3, 5, 5, 4, 5, 5] + resolution_level = [1, 2, 1, 2, 1, 1] + resolution_truth_cache = [None, None, 0.7, None, None, 0.7] + + return pa.table( + { + "parent": pa.array(resolution_parent, type=pa.uint64()), + "child": pa.array(resolution_child, type=pa.uint64()), + "level": pa.array(resolution_level, type=pa.uint32()), + "truth_cache": pa.array(resolution_truth_cache, type=pa.float64()), + } + ) + + +def generate_cluster_source(range_left: int, range_right: int) -> pa.Table: + """ + Generate cluster table containing rows for source rows. + + Args: + range_left: first ID to generate + range_right: last ID to generate, plus one + Returns: + PyArrow cluster table + """ + + def create_source_pk(li: list[int]) -> list[list[str]]: + return [[str(i)] for i in li] + + source = list(range(range_left, range_right)) + + return pa.table( + { + "cluster_id": pa.array(source, type=pa.uint64()), + "cluster_hash": pa.array(_hash_list_int(source), type=pa.binary()), + "dataset": pa.array([1] * len(source), type=pa.uint64()), + "source_pk": pa.array(create_source_pk(source), type=pa.list_(pa.string())), + } + ) + + +def generate_result_tables( + left_ids: Iterable[int], + right_ids: Iterable[int] | None, + resolution_id: int, + id_creator: IDCreator, + n_components: int, + n_probs: int, + prob_min: float = 0.6, + prob_max: float = 1, + prob_threshold: float = 0.7, +) -> tuple[pa.Table, pa.Table, pa.Table]: + """ + Generate probabilities, contains and clusters tables. 
+ + Args: + left_ids: list of IDs for rows to dedupe, or for left rows to link + right_ids: list of IDs for right rows to link + resolution_id: ID of resolution for this dedupe or link model + id_creator: an IDCreator instance + n_components: number of implied connected components + n_probs: total number of probability edges to be generated + prob_min: minimum value for probabilities to be generated + prob_max: maximum value for probabilities to be generated + prob_threshold: minimum value for probabilities to be kept + + Returns: + Tuple with 3 PyArrow tables, for probabolities, contains and clusters + """ + probs = generate_dummy_probabilities( + left_ids, right_ids, [prob_min, prob_max], n_components, n_probs + ) + + filtered_probs = probs.filter( + pc.greater(probs["probability"], int(prob_threshold * 100)) + ) + + clusters = to_hierarchical_clusters( + attach_components_to_probabilities(filtered_probs) + ) + + indexed_parents = id_creator.create(clusters["parent"].to_pylist()) + indexed_children = id_creator.create(clusters["child"].to_pylist()) + + final_clusters, final_probs = _unique_clusters( + indexed_parents, clusters["probability"].to_numpy() + ) + + probabilities_table = pa.table( + { + "resolution": pa.array( + [resolution_id] * len(final_clusters), type=pa.uint64() + ), + "cluster": pa.array(final_clusters, type=pa.uint64()), + "probability": pa.array(final_probs, type=pa.float64()), + } + ) + + contains_table = pa.table( + { + "parent": pa.array(indexed_parents, type=pa.uint64()), + "child": pa.array(indexed_children, type=pa.uint64()), + } + ) + + clusters_table = pa.table( + { + "cluster_id": pa.array(final_clusters, type=pa.uint64()), + "cluster_hash": pa.array(_hash_list_int(final_clusters), type=pa.binary()), + "dataset": pa.array([None] * len(final_clusters), type=pa.uint64()), + "source_pk": pa.array( + [None] * len(final_clusters), type=pa.list_(pa.string()) + ), + } + ) + + return (probabilities_table, contains_table, clusters_table) + + +def generate_all_tables( + source_len: int, + dedupe_components: int, + dedupe_len: int, + link_components: int, + link_len: int, +) -> dict[str, pa.Table]: + """ + Make all 6 backend tables. It will create two sources, one deduper for each, + and one linker from each deduper. 
+ + Args: + source_len: length of each data source + dedupe_components: number of connected components implied by each deduper + dedupe_len: probabilities generated by each deduper + link_components: number of connected components implied by each linker + link_len: probabilities generated by each linker + Returns: + A dictionary where keys are table names and values are PyArrow tables + """ + resolutions = generate_resolutions() + resolution_from = generate_resolution_from() + sources = generate_sources() + + clusters_source1 = generate_cluster_source(0, source_len) + clusters_source2 = generate_cluster_source(source_len, source_len * 2) + + id_creator = IDCreator(source_len * 2) + probabilities_dedupe1, contains_dedupe1, clusters_dedupe1 = generate_result_tables( + clusters_source1["cluster_id"].to_pylist(), + None, + 3, + id_creator, + dedupe_components, + dedupe_len, + ) + + probabilities_dedupe2, contains_dedupe2, clusters_dedupe2 = generate_result_tables( + clusters_source2["cluster_id"].to_pylist(), + None, + 4, + id_creator.reset_mapping(), + dedupe_components, + dedupe_len, + ) + + probabilities_link, contains_link, clusters_link = generate_result_tables( + contains_dedupe1["parent"].to_pylist(), + contains_dedupe2["parent"].to_pylist(), + 5, + id_creator.reset_mapping(), + link_components, + link_len, + ) + + probabilities = pa.concat_tables( + [probabilities_dedupe1, probabilities_dedupe2, probabilities_link] + ) + contains = pa.concat_tables([contains_dedupe1, contains_dedupe2, contains_link]) + clusters = pa.concat_tables( + [ + clusters_source1, + clusters_source2, + clusters_dedupe1, + clusters_dedupe2, + clusters_link, + ] + ) + + return { + "resolutions": resolutions, + "resolution_from": resolution_from, + "sources": sources, + "probabilities": probabilities, + "contains": contains, + "clusters": clusters, + } + + +@click.command() +@click.option("-s", "--settings", type=str, required=True) +@click.option("-o", "--output_dir", type=click.Path(exists=True, path_type=Path)) +def main(settings, output_dir): + PRESETS = { + "xs": { + "source_len": 10_000, + "dedupe_components": 2_500, + "dedupe_len": 10_000, + "link_components": 2_000, + "link_len": 10_000, + }, + "s": { + "source_len": 100_000, + "dedupe_components": 25_000, + "dedupe_len": 100_000, + "link_components": 20_000, + "link_len": 100_000, + }, + "m": { + "source_len": 1_000_000, + "dedupe_components": 250_000, + "dedupe_len": 1_000_000, + "link_components": 200_000, + "link_len": 1_000_000, + }, + "l": { + "source_len": 10_000_000, + "dedupe_components": 2_500_000, + "dedupe_len": 10_000_000, + "link_components": 2_000_000, + "link_len": 10_000_000, + }, + "xl": { + "source_len": 100_000_000, + "dedupe_components": 25_000_000, + "dedupe_len": 100_000_000, + "link_components": 20_000_000, + "link_len": 100_000_000, + }, + } + + if not output_dir: + output_dir = Path.cwd() / "data" / "all_tables" + if settings not in PRESETS: + raise ValueError(f"Settings {settings} are invalid") + + config = PRESETS[settings] + source_len = config["source_len"] + dedupe_components = config["dedupe_components"] + dedupe_len = config["dedupe_len"] + link_len = config["link_len"] + link_components = config["link_components"] + + all_tables = generate_all_tables( + source_len, dedupe_components, dedupe_len, link_len, link_components + ) + + for name, table in all_tables.items(): + pq.write_table(table, output_dir / f"{name}.parquet") + + +if __name__ == "__main__": + main() From 20085af96084ca99306ff3f39931c4e74726e079 Mon Sep 17 
00:00:00 2001 From: Leonardo Mazzone Date: Tue, 31 Dec 2024 00:27:50 +0000 Subject: [PATCH 11/26] Make factories more deterministic --- src/matchbox/common/factories.py | 102 +++++++------------------------ src/matchbox/common/transform.py | 14 ++--- 2 files changed, 28 insertions(+), 88 deletions(-) diff --git a/src/matchbox/common/factories.py b/src/matchbox/common/factories.py index dd45820d..e1137fc7 100644 --- a/src/matchbox/common/factories.py +++ b/src/matchbox/common/factories.py @@ -73,71 +73,6 @@ def _calculate_max_possible_edges( return max_edges_per_component * num_components -def _split_values_into_components( - values: list[int], num_components: int -) -> list[np.ndarray]: - """ - Split values into non-overlapping groups for each component. - - Args: - values: List of values to split - num_components: Number of components to create - - Returns: - List of arrays, one for each component - """ - values = np.array(values) - np.random.shuffle(values) - return np.array_split(values, num_components) - - -def _generate_remaining_edges( - left_values: list[int], - right_values: list[int], - len_component: int, - base_edges: set[tuple[int, int, int]], - deduplicate: bool, -) -> list[tuple[int, int, int]]: - """ - Generate remaining edges in component, recursing if necessary when generated edges - need to be discarded. - - Args: - left_values: list of integers on the left table - right_values: list of integers on the right table - len_component: total number of edges to generate - base_edges: set representing edges generated so far - deduplicate: whether probabilities are for deduplication - - Returns: - Total set of edges generated at current recursion step - """ - remaining_edges = len_component - len(base_edges) - if remaining_edges <= 0: - return base_edges - - lefts = np.random.choice(left_values, size=remaining_edges) - rights = np.random.choice(right_values, size=remaining_edges) - - new_edges = set() - for le, r in zip(lefts, rights, strict=True): - if le == r: - continue - if deduplicate: - le, r = sorted([le, r]) - new_edges.add((le, r)) - - base_edges.update(new_edges) - - return _generate_remaining_edges( - left_values, - right_values, - len_component, - base_edges, - deduplicate, - ) - - def generate_dummy_probabilities( left_values: list[int], right_values: list[int] | None, @@ -190,12 +125,10 @@ def generate_dummy_probabilities( prob_max = int(prob_range[1] * 100) # Split values into completely separate groups for each component - left_components = _split_values_into_components(left_values, num_components) - if deduplicate: - # for each left-right component pair, the right equals the left rotated by one - right_components = [np.roll(c, 1) for c in left_components] - else: - right_components = _split_values_into_components(right_values, num_components) + left_components = np.array_split(np.array(left_values), num_components) + right_components = np.array_split(np.array(right_values), num_components) + # For each left-right component pair, the right equals the left rotated by one + right_components = [np.roll(c, 1) for c in right_components] # Calculate base number of edges per component base_edges_per_component = total_rows // num_components @@ -213,10 +146,9 @@ def generate_dummy_probabilities( if comp_idx < remaining_edges: edges_in_component += 1 - # Ensure basic connectivity within the component + # Ensure basic connectivity within the component by creating a spanning-tree + # like structure base_edges = set() - - # Create a spanning tree-like structure for i in 
range(len(comp_left_values)): left = comp_left_values[i] right = comp_right_values[i % len(comp_right_values)] @@ -229,20 +161,28 @@ def generate_dummy_probabilities( base_edges = {(le, ri) for le, ri in base_edges if le != ri} # Generate remaining random edges strictly within this component - component_edges = _generate_remaining_edges( - comp_left_values, - comp_right_values, - base_edges_per_component, - base_edges, - deduplicate, + # TODO: this can certainly be optimised + all_possible_edges = [ + (x, y) + for x in comp_left_values + for y in comp_right_values + if x != y and (x, y) not in base_edges + ] + edges_required = edges_in_component - len(base_edges) + extra_edges_idx = np.random.choice( + len(all_possible_edges), size=edges_required, replace=False ) + extra_edges = [ + e for i, e in enumerate(all_possible_edges) if i in extra_edges_idx + ] + component_edges = list(base_edges) + extra_edges random_probs = np.random.randint( prob_min, prob_max + 1, size=len(component_edges) ) component_edges = [ (le, ri, pr) - for (le, ri), pr in zip(component_edges, random_probs, strict=False) + for (le, ri), pr in zip(component_edges, random_probs, strict=True) ] all_edges.extend(component_edges) diff --git a/src/matchbox/common/transform.py b/src/matchbox/common/transform.py index 3ca9383b..874ceccd 100644 --- a/src/matchbox/common/transform.py +++ b/src/matchbox/common/transform.py @@ -186,7 +186,7 @@ def get_components(self) -> list[set[T]]: def component_to_hierarchy( - table: pa.Table, salt: int, dtype: pa.DataType = pa.uint64 + table: pa.Table, salt: int, dtype: pa.DataType = pa.int64 ) -> pa.Table: """ Convert pairwise probabilities into a hierarchical representation. @@ -245,8 +245,8 @@ def component_to_hierarchy( parents, children, probs = zip(*hierarchy, strict=True) return pa.table( { - "parent": pa.array(parents, type=pa.int64()), - "child": pa.array(children, type=pa.int64()), + "parent": pa.array(parents, type=dtype()), + "child": pa.array(children, type=dtype()), "probability": pa.array(probs, type=pa.uint8()), } ) @@ -255,7 +255,7 @@ def component_to_hierarchy( def to_hierarchical_clusters( probabilities: pa.Table, proc_func: Callable[[pa.Table, pa.DataType], pa.Table] = component_to_hierarchy, - dtype: pa.DataType = pa.uint64, + dtype: pa.DataType = pa.int64, timeout: int = 300, ) -> pa.Table: """ @@ -320,7 +320,7 @@ def to_hierarchical_clusters( with ProcessPoolExecutor(max_workers=n_cores) as executor: futures = [ - executor.submit(proc_func, component_table, salt=salt) + executor.submit(proc_func, component_table, salt=salt, dtype=dtype) for salt, component_table in enumerate(component_tables) ] @@ -345,8 +345,8 @@ def to_hierarchical_clusters( logic_logger.warning("No results to concatenate") return pa.table( { - "parent": pa.array([], type=pa.int64()), - "child": pa.array([], type=pa.int64()), + "parent": pa.array([], type=dtype()), + "child": pa.array([], type=dtype()), "probability": pa.array([], type=pa.uint8()), } ) From af43d80ecde28fc5bd6b29f3312eccdd4cddbcd4 Mon Sep 17 00:00:00 2001 From: Leonardo Mazzone Date: Thu, 2 Jan 2025 09:02:16 +0000 Subject: [PATCH 12/26] Make tests pass --- test/common/test_transform.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/test/common/test_transform.py b/test/common/test_transform.py index c1ba6f0a..8ebdce43 100644 --- a/test/common/test_transform.py +++ b/test/common/test_transform.py @@ -10,6 +10,7 @@ import pytest from matchbox.common.factories import generate_dummy_probabilities, 
verify_components +from matchbox.common.hash import IntMap from matchbox.common.transform import ( attach_components_to_probabilities, component_to_hierarchy, @@ -178,9 +179,7 @@ def test_attach_components_to_probabilities(parameters: dict[str, Any]): def test_component_to_hierarchy( probabilities: dict[str, list[str | float]], hierarchy: set[tuple[str, str, int]] ): - with patch("matchbox.common.transform.IntMap") as MockIntMap: - instance = MockIntMap.return_value - instance.index.side_effect = _combine_strings + with patch.object(IntMap, "index", side_effect=_combine_strings): probabilities_table = ( pa.Table.from_pydict(probabilities) .cast( @@ -220,7 +219,9 @@ def test_component_to_hierarchy( .filter(pc.is_valid(pc.field("parent"))) ) - hierarchy = component_to_hierarchy(probabilities_table, pa.string).sort_by( + hierarchy = component_to_hierarchy( + probabilities_table, salt=1, dtype=pa.string + ).sort_by( [ ("probability", "descending"), ("parent", "ascending"), @@ -323,10 +324,8 @@ def test_hierarchical_clusters(input_data, expected_hierarchy): "matchbox.common.transform.ProcessPoolExecutor", lambda *args, **kwargs: parallel_pool_for_tests(timeout=30), ), - patch("matchbox.common.transform.IntMap") as MockIntMap, + patch.object(IntMap, "index", side_effect=_combine_strings), ): - instance = MockIntMap.return_value - instance.index.side_effect = _combine_strings result = to_hierarchical_clusters( probabilities, dtype=pa.string, proc_func=component_to_hierarchy ) From a4df1fb5e6e1cc69f769667297160161640e03b1 Mon Sep 17 00:00:00 2001 From: Leonardo Mazzone Date: Thu, 2 Jan 2025 11:19:36 +0000 Subject: [PATCH 13/26] Test benchmark init schema --- .../postgresql/benchmark/init_schema.py | 29 ++++++++++++++----- test/server/test_postgresql.py | 22 ++++++++++++++ 2 files changed, 43 insertions(+), 8 deletions(-) diff --git a/src/matchbox/server/postgresql/benchmark/init_schema.py b/src/matchbox/server/postgresql/benchmark/init_schema.py index fc8a8c33..b6b3a177 100644 --- a/src/matchbox/server/postgresql/benchmark/init_schema.py +++ b/src/matchbox/server/postgresql/benchmark/init_schema.py @@ -3,6 +3,7 @@ from sqlalchemy.dialects import postgresql from sqlalchemy.schema import CreateTable +from matchbox.server.postgresql.db import MBDB from matchbox.server.postgresql.orm import ( Clusters, Contains, @@ -12,14 +13,19 @@ Sources, ) -if __name__ == "__main__": - print( - dedent(""" - DROP SCHEMA mb CASCADE; - CREATE SCHEMA mb; + +def empty_schema() -> str: + schema = MBDB.MatchboxBase.metadata.schema + sql = dedent(f""" + DROP SCHEMA IF EXISTS {schema} CASCADE; + CREATE SCHEMA {schema}; """) - ) + return sql + + +def create_tables() -> str: + sql = "" # Order matters for table_class in ( Resolutions, @@ -29,9 +35,16 @@ Contains, Probabilities, ): - print( + sql += ( str( CreateTable(table_class.__table__).compile(dialect=postgresql.dialect()) ) - + ";" + + "; \n" ) + + return sql + + +if __name__ == "__main__": + print(empty_schema()) + print(create_tables()) diff --git a/test/server/test_postgresql.py b/test/server/test_postgresql.py index 77fdc4b9..2ffc9e8d 100644 --- a/test/server/test_postgresql.py +++ b/test/server/test_postgresql.py @@ -1,6 +1,7 @@ import pytest import rustworkx as rx from pandas import DataFrame +from sqlalchemy import text from matchbox.client.results import ( ClusterResults, @@ -8,6 +9,8 @@ ModelType, ProbabilityResults, ) +from matchbox.server.postgresql.benchmark.init_schema import create_tables, empty_schema +from matchbox.server.postgresql.db import MBDB from 
matchbox.server.postgresql.utils.insert import _cluster_results_to_hierarchical @@ -204,3 +207,22 @@ def test_cluster_results_to_hierarchical( if actual_relations: # Skip verification for empty case verify_hierarchy(hierarchy.itertuples(index=False)) + + +def test_benchmark_init_schema(): + schema = MBDB.MatchboxBase.metadata.schema + count_tables = text(f""" + select count(*) + from information_schema.tables + where table_schema = '{schema}'; + """) + + with MBDB.get_engine().connect() as con: + con.execute(text(empty_schema())) + n_tables = int(con.execute(count_tables).scalar()) + assert n_tables == 0 + + con.execute(text(create_tables())) + n_tables_expected = len(MBDB.MatchboxBase.metadata.tables) + n_tables = int(con.execute(count_tables).scalar()) + assert n_tables == n_tables_expected From 10e11c1166d3570f3948b9eb8118a9dd0f704f37 Mon Sep 17 00:00:00 2001 From: Leonardo Mazzone Date: Thu, 2 Jan 2025 14:59:17 +0000 Subject: [PATCH 14/26] Check min components for dummy probs --- src/matchbox/common/factories.py | 135 ++++++++++++++++++++++++------- 1 file changed, 105 insertions(+), 30 deletions(-) diff --git a/src/matchbox/common/factories.py b/src/matchbox/common/factories.py index e1137fc7..e8830640 100644 --- a/src/matchbox/common/factories.py +++ b/src/matchbox/common/factories.py @@ -1,4 +1,5 @@ from collections import Counter +from textwrap import dedent import numpy as np import pyarrow as pa @@ -45,6 +46,38 @@ def verify_components(table) -> dict: } +def _calculate_min_possible_edges( + n_nodes: int, num_components: int, deduplicate: bool +) -> int: + """ + Calculate a conservative minimum number of edges to connect all components. + + Args: + n_nodes: Number of nodes on larger table (either left or right) + num_components: Number of components to split into + deduplicate: Whether we are dealing with probabilities for deduplication + + Returns: + Minimum possible number of edges + """ + # Sizes of components for larger table. Because components in other table + # might be smaller, the final estimate might be larger than necessary + min_nodes_per_component = n_nodes // num_components + max_nodes_per_component = min_nodes_per_component + 1 + + if deduplicate: + return max_nodes_per_component - 1 + + return ( + # connect left[i] to right[i] + (min_nodes_per_component) + # connect right[i] to left[i+1] + + (min_nodes_per_component - 1) + # connect left[0] to all remaining + + (max_nodes_per_component - min_nodes_per_component) + ) + + def _calculate_max_possible_edges( n_nodes: int, num_components: int, deduplicate: bool ) -> int: @@ -59,8 +92,8 @@ def _calculate_max_possible_edges( Returns: Maximum possible number of edges """ - # Size of smallest components we will generate. Because some components might be - # larger, the final estimate might be smaller than necessary + # Size of components for smaller table we will generate. 
Because components of other + # table might be larger, the final estimate might be smaller than necessary min_nodes_per_component = n_nodes // num_components if deduplicate: # Max edges in undirected graph of size n @@ -107,17 +140,33 @@ def generate_dummy_probabilities( "Cannot have more components than minimum of left/right values" ) - min_nodes = min(len(left_values), len(right_values)) + min_nodes, max_nodes = sorted([len(left_values), len(right_values)]) + min_possible_edges = _calculate_min_possible_edges( + max_nodes, num_components, deduplicate + ) max_possible_edges = _calculate_max_possible_edges( min_nodes, num_components, deduplicate ) - + mode = "dedupe" if deduplicate else "link" + if total_rows < min_possible_edges: + raise ValueError( + dedent(f""" + Cannot generate {total_rows:,} {mode} edges with {num_components:,} + components. + Min edges is {min_possible_edges:,} for {min_nodes:,}+{max_nodes:,} nodes. + Either decrease the number of nodes, increase the number of components, + or increase the total edges requested. + """) + ) if total_rows > max_possible_edges: raise ValueError( - f"Cannot generate {total_rows:,} edges with {num_components:,} components. " - f"Max possible edges is {max_possible_edges:,} given {min_nodes:,} nodes. " - "Either increase the number of nodes, decrease the number of components, " - "or decrease the total edges requested." + dedent(f""" + Cannot generate {total_rows:,} {mode} edges with {num_components:,} + components. + Max edges is {max_possible_edges:,} for {min_nodes:,}+{max_nodes:,} nodes. + Either increase the number of nodes, decrease the number of components, + or decrease the total edges requested. + """) ) # Convert probability range to integers (60-80 for 0.60-0.80) @@ -141,6 +190,10 @@ def generate_dummy_probabilities( comp_left_values = left_components[comp_idx] comp_right_values = right_components[comp_idx] + min_comp_nodes, max_comp_nodes = sorted( + [len(comp_left_values), len(comp_right_values)] + ) + edges_in_component = base_edges_per_component # Distribute remaining edges, one per component if comp_idx < remaining_edges: @@ -149,33 +202,55 @@ def generate_dummy_probabilities( # Ensure basic connectivity within the component by creating a spanning-tree # like structure base_edges = set() - for i in range(len(comp_left_values)): - left = comp_left_values[i] - right = comp_right_values[i % len(comp_right_values)] - if deduplicate: - left, right = sorted([left, right]) - - base_edges.add((left, right)) + # For deduping (A B C) you just need (A - B) (B - C) (C - A) + # which just needs matching pairwise the data and its rotated version + # For linking (A B) and (C D E), we begin by adding (A - C) and (B - D) + for i in range(min_comp_nodes): + small_n, large_n = sorted([comp_left_values[i], comp_right_values[i]]) + base_edges.add((small_n, large_n)) + if not deduplicate: + # For linking, for example above, we now add (C - B) + for i in range(min_comp_nodes - 1): + small_n, large_n = sorted( + [comp_left_values[i + 1], comp_right_values[i]] + ) + base_edges.add((small_n, large_n)) + # For linking, for example above, we now add (A - D) + left_right_diff = max_comp_nodes - min_comp_nodes + for i in range(left_right_diff): + small_comp_half, large_comp_half = comp_left_values, comp_right_values + if len(comp_right_values) < len(comp_left_values): + small_comp_half, large_comp_half = ( + comp_right_values, + comp_left_values, + ) + + base_edges.add( + (small_comp_half[0], large_comp_half[min_comp_nodes + i]) + ) # Remove 
self-references from base edges base_edges = {(le, ri) for le, ri in base_edges if le != ri} - # Generate remaining random edges strictly within this component - # TODO: this can certainly be optimised - all_possible_edges = [ - (x, y) - for x in comp_left_values - for y in comp_right_values - if x != y and (x, y) not in base_edges - ] edges_required = edges_in_component - len(base_edges) - extra_edges_idx = np.random.choice( - len(all_possible_edges), size=edges_required, replace=False - ) - extra_edges = [ - e for i, e in enumerate(all_possible_edges) if i in extra_edges_idx - ] - component_edges = list(base_edges) + extra_edges + component_edges = list(base_edges) + + if edges_required > 0: + # Generate remaining random edges strictly within this component + # TODO: this can certainly be optimised + all_possible_edges = [ + tuple(sorted([x, y])) + for x in comp_left_values + for y in comp_right_values + if x != y and (x, y) not in base_edges + ] + extra_edges_idx = np.random.choice( + len(all_possible_edges), size=edges_required, replace=False + ) + extra_edges = [ + e for i, e in enumerate(all_possible_edges) if i in extra_edges_idx + ] + component_edges += extra_edges random_probs = np.random.randint( prob_min, prob_max + 1, size=len(component_edges) ) From e474e1a069a72d125d70503a5f4c1f899956257a Mon Sep 17 00:00:00 2001 From: Leonardo Mazzone Date: Thu, 2 Jan 2025 15:00:01 +0000 Subject: [PATCH 15/26] Test generation of benchmark tables --- test/server/test_postgresql.py | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/test/server/test_postgresql.py b/test/server/test_postgresql.py index 2ffc9e8d..9eecbae5 100644 --- a/test/server/test_postgresql.py +++ b/test/server/test_postgresql.py @@ -1,3 +1,5 @@ +from typing import Iterable + import pytest import rustworkx as rx from pandas import DataFrame @@ -9,6 +11,7 @@ ModelType, ProbabilityResults, ) +from matchbox.server.postgresql.benchmark.generate_tables import generate_all_tables from matchbox.server.postgresql.benchmark.init_schema import create_tables, empty_schema from matchbox.server.postgresql.db import MBDB from matchbox.server.postgresql.utils.insert import _cluster_results_to_hierarchical @@ -219,10 +222,40 @@ def test_benchmark_init_schema(): with MBDB.get_engine().connect() as con: con.execute(text(empty_schema())) + con.commit() n_tables = int(con.execute(count_tables).scalar()) assert n_tables == 0 con.execute(text(create_tables())) + con.commit() n_tables_expected = len(MBDB.MatchboxBase.metadata.tables) n_tables = int(con.execute(count_tables).scalar()) assert n_tables == n_tables_expected + + +def test_benchmark_generate_tables(): + schema = MBDB.MatchboxBase.metadata.schema + + def array_encode(array: Iterable[str]): + if not array: + return None + escaped_l = [f'"{s}"' for s in array] + list_rep = ", ".join(escaped_l) + return "{" + list_rep + "}" + + with MBDB.get_engine().connect() as con: + con.execute(text(empty_schema())) + con.commit() + + results = generate_all_tables(20, 5, 15, 5, 30) + + for table_name, table_arrow in results.items(): + df = table_arrow.to_pandas() + # Pandas' `to_sql` dislikes arrays + if "source_pk" in df.columns: + df["source_pk"] = df["source_pk"].apply(array_encode) + # Pandas' `to_sql` dislikes large unsigned ints + for c in df.columns: + if df[c].dtype == "uint64": + df[c] = df[c].astype("int64") + df.to_sql(table_name, con, schema) From 4cb38346b16adeaac671da96ed4c9d0f6a004195 Mon Sep 17 00:00:00 2001 From: Leonardo Mazzone Date: Thu, 2 Jan 2025 
16:00:34 +0000 Subject: [PATCH 16/26] Test hierarchical clusters edge case --- test/common/test_transform.py | 39 ++++++++++++++++++++++++++++------- 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/test/common/test_transform.py b/test/common/test_transform.py index 8ebdce43..c600b227 100644 --- a/test/common/test_transform.py +++ b/test/common/test_transform.py @@ -1,6 +1,5 @@ from concurrent.futures import ThreadPoolExecutor from contextlib import contextmanager -from functools import lru_cache from itertools import chain from typing import Any, Iterator from unittest.mock import patch @@ -18,10 +17,10 @@ ) -@lru_cache(maxsize=None) -def _combine_strings(*n: str) -> str: +def _combine_strings(self, *n: str) -> str: """ - Combine n strings into a single string. + Combine n strings into a single string, with a cache. + Meant to replace `matchbox.common.hash.IntMap.index` Args: *args: Variable number of strings to combine @@ -29,8 +28,15 @@ def _combine_strings(*n: str) -> str: Returns: A single string """ + value_set = frozenset(n) + if value_set in self.mapping: + return self.mapping[value_set] + letters = set(chain.from_iterable(n)) - return "".join(sorted(letters)) + + new_id = "".join(sorted(letters)) + self.mapping[value_set] = new_id + return new_id @contextmanager @@ -173,13 +179,32 @@ def test_attach_components_to_probabilities(parameters: dict[str, Any]): ("xy", "y", 90), }, ), + # Test case 4: A component larger than two remains unchanged + # at a successive threshold + ( + { + "left": ["x", "y", "a"], + "right": ["y", "z", "b"], + "probability": [90, 90, 85], + }, + { + ("xy", "x", 90), + ("xy", "y", 90), + ("yz", "y", 90), + ("yz", "z", 90), + ("xyz", "xy", 90), + ("xyz", "yz", 90), + ("ab", "a", 85), + ("ab", "b", 85), + }, + ), ], - ids=["equal", "asymmetric", "single"], + ids=["equal", "asymmetric", "single", "unchanged"], ) def test_component_to_hierarchy( probabilities: dict[str, list[str | float]], hierarchy: set[tuple[str, str, int]] ): - with patch.object(IntMap, "index", side_effect=_combine_strings): + with patch.object(IntMap, "index", _combine_strings): probabilities_table = ( pa.Table.from_pydict(probabilities) .cast( From 0e806febc9ef474a0b57c93577220959382b8b9c Mon Sep 17 00:00:00 2001 From: Leonardo Mazzone Date: Thu, 2 Jan 2025 16:16:07 +0000 Subject: [PATCH 17/26] Move factories tests to own file --- test/common/test_factories.py | 48 +++++++++++++++++++++++++++++++++++ test/common/test_transform.py | 41 +----------------------------- 2 files changed, 49 insertions(+), 40 deletions(-) create mode 100644 test/common/test_factories.py diff --git a/test/common/test_factories.py b/test/common/test_factories.py new file mode 100644 index 00000000..d442f92a --- /dev/null +++ b/test/common/test_factories.py @@ -0,0 +1,48 @@ +from typing import Any + +import pyarrow.compute as pc +import pytest + +from matchbox.common.factories import generate_dummy_probabilities, verify_components + + +@pytest.mark.parametrize( + ("parameters"), + [ + { + "left_range": (0, 1_000), + "right_range": (1_000, 2_000), + "prob_range": (0.6, 0.8), + "num_components": 10, + "total_rows": 100_000, + }, + ], + ids=["simple"], +) +def test_probabilities_factory(parameters: dict[str, Any]): + left_values = range(*parameters["left_range"]) + right_values = range(*parameters["right_range"]) + + probabilities = generate_dummy_probabilities( + left_values=left_values, + right_values=right_values, + prob_range=parameters["prob_range"], + num_components=parameters["num_components"], + 
total_rows=parameters["total_rows"], + ) + report = verify_components(table=probabilities) + + assert report["num_components"] == parameters["num_components"] + assert set(pc.unique(probabilities["left"]).to_pylist()) == set(left_values) + assert set(pc.unique(probabilities["right"]).to_pylist()) == set(right_values) + assert ( + pc.max(probabilities["probability"]).as_py() / 100 + <= parameters["prob_range"][1] + ) + assert ( + pc.min(probabilities["probability"]).as_py() / 100 + >= parameters["prob_range"][0] + ) + + +def test_dummy_probs_min_edges_dedupe(): ... diff --git a/test/common/test_transform.py b/test/common/test_transform.py index c600b227..ee54133e 100644 --- a/test/common/test_transform.py +++ b/test/common/test_transform.py @@ -8,7 +8,7 @@ import pyarrow.compute as pc import pytest -from matchbox.common.factories import generate_dummy_probabilities, verify_components +from matchbox.common.factories import generate_dummy_probabilities from matchbox.common.hash import IntMap from matchbox.common.transform import ( attach_components_to_probabilities, @@ -56,45 +56,6 @@ def parallel_pool_for_tests( executor.shutdown(wait=False, cancel_futures=True) -@pytest.mark.parametrize( - ("parameters"), - [ - { - "left_range": (0, 1_000), - "right_range": (1_000, 2_000), - "prob_range": (0.6, 0.8), - "num_components": 10, - "total_rows": 100_000, - }, - ], - ids=["simple"], -) -def test_probabilities_factory(parameters: dict[str, Any]): - left_values = range(*parameters["left_range"]) - right_values = range(*parameters["right_range"]) - - probabilities = generate_dummy_probabilities( - left_values=left_values, - right_values=right_values, - prob_range=parameters["prob_range"], - num_components=parameters["num_components"], - total_rows=parameters["total_rows"], - ) - report = verify_components(table=probabilities) - - assert report["num_components"] == parameters["num_components"] - assert set(pc.unique(probabilities["left"]).to_pylist()) == set(left_values) - assert set(pc.unique(probabilities["right"]).to_pylist()) == set(right_values) - assert ( - pc.max(probabilities["probability"]).as_py() / 100 - <= parameters["prob_range"][1] - ) - assert ( - pc.min(probabilities["probability"]).as_py() / 100 - >= parameters["prob_range"][0] - ) - - @pytest.mark.parametrize( ("parameters"), [ From 676ce6f30aa8a337440d57e46bf4d76130fcca88 Mon Sep 17 00:00:00 2001 From: Leonardo Mazzone Date: Fri, 3 Jan 2025 14:01:21 +0000 Subject: [PATCH 18/26] Test and fix dummy probability generation --- src/matchbox/common/factories.py | 213 ++++++++++++++++++------------- test/common/test_factories.py | 148 +++++++++++++++++++-- 2 files changed, 255 insertions(+), 106 deletions(-) diff --git a/src/matchbox/common/factories.py b/src/matchbox/common/factories.py index e8830640..242b1353 100644 --- a/src/matchbox/common/factories.py +++ b/src/matchbox/common/factories.py @@ -46,64 +46,91 @@ def verify_components(table) -> dict: } -def _calculate_min_possible_edges( - n_nodes: int, num_components: int, deduplicate: bool -) -> int: +def _min_edges_component(left: int, right: int, deduplicate: bool) -> int: """ - Calculate a conservative minimum number of edges to connect all components. + Calculate min edges for component to be connected. + Does so by assuming a spanning tree. 
Args: - n_nodes: Number of nodes on larger table (either left or right) - num_components: Number of components to split into - deduplicate: Whether we are dealing with probabilities for deduplication + left: number of nodes of component on the left + right: number of nodes of component on the right (for linking) + deduplicate: whether edges are for deduplication Returns: - Minimum possible number of edges + Minimum number of edges """ - # Sizes of components for larger table. Because components in other table - # might be smaller, the final estimate might be larger than necessary - min_nodes_per_component = n_nodes // num_components - max_nodes_per_component = min_nodes_per_component + 1 - - if deduplicate: - return max_nodes_per_component - 1 - - return ( - # connect left[i] to right[i] - (min_nodes_per_component) - # connect right[i] to left[i+1] - + (min_nodes_per_component - 1) - # connect left[0] to all remaining - + (max_nodes_per_component - min_nodes_per_component) - ) + if not deduplicate: + return left + right - 1 + + return left - 1 -def _calculate_max_possible_edges( - n_nodes: int, num_components: int, deduplicate: bool -) -> int: +def _max_edges_component(left: int, right: int, deduplicate: bool) -> int: """ - Calculate a conservative max number of edges given n nodes split into k components. + Calculate max edges for component to be avoid duplication. + Considers complete graph for deduping, and complete bipartite graph for linking. Args: - n_nodes: Total number of nodes on smallest table (either left or right) - num_components: Number of components to split into - deduplicate: Whether we are dealing with probabilities for deduplication + left: number of nodes of component on the left + right: number of nodes of component on the right (for linking) + deduplicate: whether edges are for deduplication Returns: - Maximum possible number of edges + Maximum number of edges """ - # Size of components for smaller table we will generate. Because components of other - # table might be larger, the final estimate might be smaller than necessary - min_nodes_per_component = n_nodes // num_components - if deduplicate: - # Max edges in undirected graph of size n - max_edges_per_component = ( - min_nodes_per_component * (min_nodes_per_component - 1) / 2 - ) - else: - # Complete bipartite graph - max_edges_per_component = min_nodes_per_component * min_nodes_per_component - return max_edges_per_component * num_components + if not deduplicate: + return left * right + # n*(n-1) is always divisible by 2 + return left * (left - 1) // 2 + + +def calculate_min_max_edges( + left_nodes: int, right_nodes: int, num_components: int, deduplicate: bool +) -> tuple[int, int]: + """ + Calculate min and max edges for a graph. 
+ + Args: + left_nodes: number of nodes in left source + right_nodes: number of nodes in right source + num_components: number of requested components + deduplicate: whether edges are for deduplication + + Returns: + Two-tuple representing min and max edges + """ + left_mod, right_mod = left_nodes % num_components, right_nodes % num_components + left_div, right_div = left_nodes // num_components, right_nodes // num_components + + min_mod, max_mod = sorted([left_mod, right_mod]) + + min_edges, max_edges = 0, 0 + # components where both sides have maximum nodes + min_edges += ( + _min_edges_component(left_div + 1, right_div + 1, deduplicate) * min_mod + ) + max_edges += ( + _max_edges_component(left_div + 1, right_div + 1, deduplicate) * min_mod + ) + # components where one side has maximum nodes + left_after_min_mod, right_after_min_mod = left_div + 1, right_div + if left_mod == min_mod: + left_after_min_mod, right_after_min_mod = left_div, right_div + 1 + min_edges += _min_edges_component( + left_after_min_mod, right_after_min_mod, deduplicate + ) * (max_mod - min_mod) + max_edges += _max_edges_component( + left_after_min_mod, right_after_min_mod, deduplicate + ) * (max_mod - min_mod) + # components where both side have maximum nodes + min_edges += _min_edges_component(left_div, right_div, deduplicate) * ( + num_components - max_mod + ) + max_edges += _max_edges_component(left_div, right_div, deduplicate) * ( + num_components - max_mod + ) + + return min_edges, max_edges def generate_dummy_probabilities( @@ -140,20 +167,18 @@ def generate_dummy_probabilities( "Cannot have more components than minimum of left/right values" ) - min_nodes, max_nodes = sorted([len(left_values), len(right_values)]) - min_possible_edges = _calculate_min_possible_edges( - max_nodes, num_components, deduplicate - ) - max_possible_edges = _calculate_max_possible_edges( - min_nodes, num_components, deduplicate + left_nodes, right_nodes = len(left_values), len(right_values) + min_possible_edges, max_possible_edges = calculate_min_max_edges( + left_nodes, right_nodes, num_components, deduplicate ) + mode = "dedupe" if deduplicate else "link" if total_rows < min_possible_edges: raise ValueError( dedent(f""" Cannot generate {total_rows:,} {mode} edges with {num_components:,} components. - Min edges is {min_possible_edges:,} for {min_nodes:,}+{max_nodes:,} nodes. + Min edges is {min_possible_edges:,} for nodes given. Either decrease the number of nodes, increase the number of components, or increase the total edges requested. """) @@ -163,12 +188,14 @@ def generate_dummy_probabilities( dedent(f""" Cannot generate {total_rows:,} {mode} edges with {num_components:,} components. - Max edges is {max_possible_edges:,} for {min_nodes:,}+{max_nodes:,} nodes. + Max edges is {max_possible_edges:,} for nodes given. Either increase the number of nodes, decrease the number of components, or decrease the total edges requested. 
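As a sanity check on `calculate_min_max_edges`, two of the splits exercised by the new unit tests work out as follows; a sketch assuming nodes are divided across components as evenly as possible, which is how `np.array_split` behaves:

# Dedupe: 10 nodes, 2 components -> two components of 5 nodes each
#   min = 2 * (5 - 1)      = 8    (spanning tree per component)
#   max = 2 * (5 * 4 // 2) = 20   (complete graph per component)
assert calculate_min_max_edges(10, 10, 2, True) == (8, 20)

# Link: 8 x 4 nodes, 3 components -> node pairs (3, 2), (3, 1), (2, 1)
#   min = (3 + 2 - 1) + (3 + 1 - 1) + (2 + 1 - 1) = 9
#   max = 3 * 2 + 3 * 1 + 2 * 1 = 11
assert calculate_min_max_edges(8, 4, 3, False) == (9, 11)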
""") ) + n_extra_edges = total_rows - min_possible_edges + # Convert probability range to integers (60-80 for 0.60-0.80) prob_min = int(prob_range[0] * 100) prob_max = int(prob_range[1] * 100) @@ -179,10 +206,6 @@ def generate_dummy_probabilities( # For each left-right component pair, the right equals the left rotated by one right_components = [np.roll(c, 1) for c in right_components] - # Calculate base number of edges per component - base_edges_per_component = total_rows // num_components - remaining_edges = total_rows % num_components - all_edges = [] # Generate edges for each component @@ -194,56 +217,62 @@ def generate_dummy_probabilities( [len(comp_left_values), len(comp_right_values)] ) - edges_in_component = base_edges_per_component - # Distribute remaining edges, one per component - if comp_idx < remaining_edges: - edges_in_component += 1 - # Ensure basic connectivity within the component by creating a spanning-tree - # like structure base_edges = set() # For deduping (A B C) you just need (A - B) (B - C) (C - A) - # which just needs matching pairwise the data and its rotated version - # For linking (A B) and (C D E), we begin by adding (A - C) and (B - D) - for i in range(min_comp_nodes): - small_n, large_n = sorted([comp_left_values[i], comp_right_values[i]]) - base_edges.add((small_n, large_n)) - if not deduplicate: - # For linking, for example above, we now add (C - B) + # which just needs matching pairwise the data and its rotated version. + # For deduping, `min_comp_nodes` == `max_comp_nodes` + if deduplicate: for i in range(min_comp_nodes - 1): - small_n, large_n = sorted( - [comp_left_values[i + 1], comp_right_values[i]] - ) + small_n, large_n = sorted([comp_left_values[i], comp_right_values[i]]) base_edges.add((small_n, large_n)) - # For linking, for example above, we now add (A - D) + else: + # For linking (A B) and (C D E), we begin by adding (A - C) and (B - D) + for i in range(min_comp_nodes): + base_edges.add((comp_left_values[i], comp_right_values[i])) + # we now add (C - B) + for i in range(min_comp_nodes - 1): + base_edges.add((comp_left_values[i + 1], comp_right_values[i])) + # we now add (A - D) left_right_diff = max_comp_nodes - min_comp_nodes for i in range(left_right_diff): - small_comp_half, large_comp_half = comp_left_values, comp_right_values + left_i, right_i = 0, min_comp_nodes + i if len(comp_right_values) < len(comp_left_values): - small_comp_half, large_comp_half = ( - comp_right_values, - comp_left_values, - ) - - base_edges.add( - (small_comp_half[0], large_comp_half[min_comp_nodes + i]) - ) + left_i, right_i = min_comp_nodes + i, 0 - # Remove self-references from base edges - base_edges = {(le, ri) for le, ri in base_edges if le != ri} + base_edges.add((comp_left_values[left_i], comp_right_values[right_i])) - edges_required = edges_in_component - len(base_edges) component_edges = list(base_edges) - if edges_required > 0: + if n_extra_edges > 0: # Generate remaining random edges strictly within this component # TODO: this can certainly be optimised - all_possible_edges = [ - tuple(sorted([x, y])) - for x in comp_left_values - for y in comp_right_values - if x != y and (x, y) not in base_edges - ] + if deduplicate: + all_possible_edges = list( + { + tuple(sorted([x, y])) + for x in comp_left_values + for y in comp_right_values + if x != y and tuple(sorted([x, y])) not in base_edges + } + ) + else: + all_possible_edges = list( + { + (x, y) + for x in comp_left_values + for y in comp_right_values + if x != y and (x, y) not in base_edges + } + ) + 
max_new_edges = len(all_possible_edges) + if max_new_edges >= n_extra_edges: + edges_required = n_extra_edges + n_extra_edges = 0 + else: + edges_required = max_new_edges + n_extra_edges -= max_new_edges + extra_edges_idx = np.random.choice( len(all_possible_edges), size=edges_required, replace=False ) diff --git a/test/common/test_factories.py b/test/common/test_factories.py index d442f92a..967f3851 100644 --- a/test/common/test_factories.py +++ b/test/common/test_factories.py @@ -1,40 +1,122 @@ from typing import Any +import numpy as np import pyarrow.compute as pc import pytest -from matchbox.common.factories import generate_dummy_probabilities, verify_components +from matchbox.common.factories import ( + calculate_min_max_edges, + generate_dummy_probabilities, + verify_components, +) + + +@pytest.mark.parametrize( + ("left_n", "right_n", "n_components", "true_min", "true_max"), + [ + (10, None, 2, 8, 20), + (11, None, 2, 9, 25), + (9, 9, 3, 15, 27), + (8, 4, 3, 9, 11), + (4, 8, 3, 9, 11), + (8, 8, 3, 13, 22), + ], + ids=[ + "dedupe_no_mod", + "dedup_mod", + "link_no_mod", + "link_left_mod", + "link_right_mod", + "link_same_mod", + ], +) +def test_calculate_min_max_edges( + left_n: int, right_n: int | None, n_components: int, true_min: int, true_max: int +): + deduplicate = False + if not right_n: + deduplicate = True + right_n = left_n + min_edges, max_edges = calculate_min_max_edges( + left_n, right_n, n_components, deduplicate + ) + + assert true_min == min_edges + assert true_max == max_edges @pytest.mark.parametrize( ("parameters"), [ { - "left_range": (0, 1_000), - "right_range": (1_000, 2_000), + "left_count": 1000, + "right_count": None, + "prob_range": (0.6, 0.8), + "num_components": 10, + "total_rows": calculate_min_max_edges(1000, 1000, 10, True)[0], + }, + { + "left_count": 1_000, + "right_count": None, + "prob_range": (0.6, 0.8), + "num_components": 10, + "total_rows": calculate_min_max_edges(1000, 1000, 10, True)[1], + }, + { + "left_count": 1_000, + "right_count": 1_000, + "prob_range": (0.6, 0.8), + "num_components": 10, + "total_rows": calculate_min_max_edges(1000, 1000, 10, False)[0], + }, + { + "left_count": 1_000, + "right_count": 1_000, "prob_range": (0.6, 0.8), "num_components": 10, - "total_rows": 100_000, + "total_rows": calculate_min_max_edges(1000, 1000, 10, False)[1], }, ], - ids=["simple"], + ids=["dedupe_min", "dedupe_max", "link_min", "link_max"], ) -def test_probabilities_factory(parameters: dict[str, Any]): - left_values = range(*parameters["left_range"]) - right_values = range(*parameters["right_range"]) +def test_generate_dummy_probabilities(parameters: dict[str, Any]): + len_left = parameters["left_count"] + len_right = parameters["right_count"] + if len_right: + total_len = len_left + len_right + len_right = parameters["right_count"] + rand_vals = np.random.choice(a=total_len, replace=False, size=total_len) + left_values = list(rand_vals[:len_left]) + right_values = list(rand_vals[len_left:]) + else: + rand_vals = np.random.choice(a=len_left, replace=False, size=len_left) + left_values = list(rand_vals[:len_left]) + right_values = None + + n_components = parameters["num_components"] + total_rows = parameters["total_rows"] probabilities = generate_dummy_probabilities( left_values=left_values, right_values=right_values, prob_range=parameters["prob_range"], - num_components=parameters["num_components"], - total_rows=parameters["total_rows"], + num_components=n_components, + total_rows=total_rows, ) report = verify_components(table=probabilities) + p_left 
= probabilities["left"].to_pylist() + p_right = probabilities["right"].to_pylist() + + assert report["num_components"] == n_components + + # Link + if right_values: + assert set(p_left) == set(left_values) + assert set(p_right) == set(right_values) + # Dedupe + else: + assert set(p_left) | set(p_right) == set(left_values) - assert report["num_components"] == parameters["num_components"] - assert set(pc.unique(probabilities["left"]).to_pylist()) == set(left_values) - assert set(pc.unique(probabilities["right"]).to_pylist()) == set(right_values) assert ( pc.max(probabilities["probability"]).as_py() / 100 <= parameters["prob_range"][1] @@ -44,5 +126,43 @@ def test_probabilities_factory(parameters: dict[str, Any]): >= parameters["prob_range"][0] ) + assert len(probabilities) == total_rows + + edges = zip(p_left, p_right, strict=True) + edges_set = {tuple(sorted(e)) for e in edges} + assert len(edges_set) == total_rows + + self_references = [e for e in edges if e[0] == e[1]] + assert len(self_references) == 0 + + +@pytest.mark.parametrize( + ("parameters"), + [ + { + "left_range": (0, 10_000), + "right_range": (10_000, 20_000), + "num_components": 2, + "total_rows": 1, + }, + { + "left_range": (0, 10), + "right_range": (10, 20), + "num_components": 2, + "total_rows": 8_000, + }, + ], + ids=["lower_than_min", "higher_than_max"], +) +def test_generate_dummy_probabilities_errors(parameters: dict[str, Any]): + left_values = range(*parameters["left_range"]) + right_values = range(*parameters["right_range"]) -def test_dummy_probs_min_edges_dedupe(): ... + with pytest.raises(ValueError): + generate_dummy_probabilities( + left_values=left_values, + right_values=right_values, + prob_range=(0.6, 0.8), + num_components=parameters["num_components"], + total_rows=parameters["total_rows"], + ) From 1a1fce5a73a2b4498f611a11f973086df8ac5e19 Mon Sep 17 00:00:00 2001 From: Leonardo Mazzone Date: Fri, 3 Jan 2025 14:07:45 +0000 Subject: [PATCH 19/26] Make all tests pass --- test/common/test_transform.py | 2 +- test/server/test_postgresql.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/common/test_transform.py b/test/common/test_transform.py index ee54133e..8e7ef3d9 100644 --- a/test/common/test_transform.py +++ b/test/common/test_transform.py @@ -310,7 +310,7 @@ def test_hierarchical_clusters(input_data, expected_hierarchy): "matchbox.common.transform.ProcessPoolExecutor", lambda *args, **kwargs: parallel_pool_for_tests(timeout=30), ), - patch.object(IntMap, "index", side_effect=_combine_strings), + patch.object(IntMap, "index", _combine_strings), ): result = to_hierarchical_clusters( probabilities, dtype=pa.string, proc_func=component_to_hierarchy diff --git a/test/server/test_postgresql.py b/test/server/test_postgresql.py index 9eecbae5..8e872be8 100644 --- a/test/server/test_postgresql.py +++ b/test/server/test_postgresql.py @@ -247,7 +247,7 @@ def array_encode(array: Iterable[str]): con.execute(text(empty_schema())) con.commit() - results = generate_all_tables(20, 5, 15, 5, 30) + results = generate_all_tables(20, 5, 15, 5, 100) for table_name, table_arrow in results.items(): df = table_arrow.to_pandas() From f4eaf79b79819b3e5df30cd8a66594c0d3ef23e2 Mon Sep 17 00:00:00 2001 From: Leonardo Mazzone Date: Mon, 6 Jan 2025 08:45:11 +0000 Subject: [PATCH 20/26] Slightly extend test for generating benchmark data --- src/matchbox/common/transform.py | 2 +- test/server/test_postgresql.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/matchbox/common/transform.py 
b/src/matchbox/common/transform.py index 874ceccd..1ebcc36b 100644 --- a/src/matchbox/common/transform.py +++ b/src/matchbox/common/transform.py @@ -186,7 +186,7 @@ def get_components(self) -> list[set[T]]: def component_to_hierarchy( - table: pa.Table, salt: int, dtype: pa.DataType = pa.int64 + table: pa.Table, salt: int, dtype: pa.DataType = pa.uint64 ) -> pa.Table: """ Convert pairwise probabilities into a hierarchical representation. diff --git a/test/server/test_postgresql.py b/test/server/test_postgresql.py index 8e872be8..ba436c7b 100644 --- a/test/server/test_postgresql.py +++ b/test/server/test_postgresql.py @@ -249,6 +249,8 @@ def array_encode(array: Iterable[str]): results = generate_all_tables(20, 5, 15, 5, 100) + assert len(results) == len(MBDB.MatchboxBase.metadata.tables) + for table_name, table_arrow in results.items(): df = table_arrow.to_pandas() # Pandas' `to_sql` dislikes arrays From 09a4b637c6e9c66b5a626ac4b23258212da10d06 Mon Sep 17 00:00:00 2001 From: Leonardo Mazzone <92318564+leo-mazzone@users.noreply.github.com> Date: Mon, 6 Jan 2025 10:50:38 +0000 Subject: [PATCH 21/26] Add missing type hints Co-authored-by: Will Langdale <113046391+wpfl-dbt@users.noreply.github.com> --- src/matchbox/common/factories.py | 2 +- src/matchbox/server/postgresql/benchmark/generate_tables.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/matchbox/common/factories.py b/src/matchbox/common/factories.py index 242b1353..6e295d32 100644 --- a/src/matchbox/common/factories.py +++ b/src/matchbox/common/factories.py @@ -6,7 +6,7 @@ import rustworkx as rx -def verify_components(table) -> dict: +def verify_components(table: pa.Table) -> dict: """ Fast verification of connected components using rustworkx. diff --git a/src/matchbox/server/postgresql/benchmark/generate_tables.py b/src/matchbox/server/postgresql/benchmark/generate_tables.py index ae779a56..7b3c2c4c 100644 --- a/src/matchbox/server/postgresql/benchmark/generate_tables.py +++ b/src/matchbox/server/postgresql/benchmark/generate_tables.py @@ -51,7 +51,7 @@ def _hash_list_int(li: list[int]) -> list[bytes]: return [HASH_FUNC(str(i).encode("utf-8")).digest() for i in li] -def _unique_clusters(all_parents: Iterable[int], all_probabilities: Iterable[int]): +def _unique_clusters(all_parents: Iterable[int], all_probabilities: Iterable[int]) -> tuple[list[int], list[float]]: ll = set() clusters = [] probabilities = [] From 02a3a0148bc5d5af4ad4bb69c5a69ece894e2dc7 Mon Sep 17 00:00:00 2001 From: Leonardo Mazzone Date: Mon, 6 Jan 2025 10:57:32 +0000 Subject: [PATCH 22/26] Fix typo --- .../server/postgresql/benchmark/generate_tables.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/matchbox/server/postgresql/benchmark/generate_tables.py b/src/matchbox/server/postgresql/benchmark/generate_tables.py index ae779a56..3b39645e 100644 --- a/src/matchbox/server/postgresql/benchmark/generate_tables.py +++ b/src/matchbox/server/postgresql/benchmark/generate_tables.py @@ -137,7 +137,7 @@ def generate_resolution_from() -> pa.Table: # 1 and 2 are sources; 3 and 4 are dedupers; 5 is a linker resolution_parent = [1, 1, 3, 2, 2, 4] resolution_child = [3, 5, 5, 4, 5, 5] - resolution_level = [1, 2, 1, 2, 1, 1] + resolution_level = [1, 2, 1, 1, 2, 1] resolution_truth_cache = [None, None, 0.7, None, None, 0.7] return pa.table( @@ -339,9 +339,9 @@ def generate_all_tables( def main(settings, output_dir): PRESETS = { "xs": { - "source_len": 10_000, - "dedupe_components": 2_500, - "dedupe_len": 10_000, + 
"source_len": 1_000, + "dedupe_components": 500, + "dedupe_len": 500, "link_components": 2_000, "link_len": 10_000, }, From b6a9b887cc06a877c9bad1313cd6357b449fe8da Mon Sep 17 00:00:00 2001 From: Leonardo Mazzone Date: Mon, 6 Jan 2025 15:51:11 +0000 Subject: [PATCH 23/26] Add ipywidgets --- pyproject.toml | 1 + uv.lock | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 4ed88b0d..5de023a6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ server = [ [dependency-groups] dev = [ "ipykernel>=6.29.5", + "ipywidgets>=8.1.5", "pre-commit>=3.8.0", "pytest>=8.3.3", "pytest-cov>=5.0.0", diff --git a/uv.lock b/uv.lock index 07a7cb3d..6f2472e6 100644 --- a/uv.lock +++ b/uv.lock @@ -734,6 +734,22 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a8/a2/6c725958e6f135d8e5de081e69841bb2c1d84b3fc259d02eb092b8fc203a/ipython-8.27.0-py3-none-any.whl", hash = "sha256:f68b3cb8bde357a5d7adc9598d57e22a45dfbea19eb6b98286fa3b288c9cd55c", size = 818986 }, ] +[[package]] +name = "ipywidgets" +version = "8.1.5" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "comm" }, + { name = "ipython" }, + { name = "jupyterlab-widgets" }, + { name = "traitlets" }, + { name = "widgetsnbextension" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/c7/4c/dab2a281b07596a5fc220d49827fe6c794c66f1493d7a74f1df0640f2cc5/ipywidgets-8.1.5.tar.gz", hash = "sha256:870e43b1a35656a80c18c9503bbf2d16802db1cb487eec6fab27d683381dde17", size = 116723 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/22/2d/9c0b76f2f9cc0ebede1b9371b6f317243028ed60b90705863d493bae622e/ipywidgets-8.1.5-py3-none-any.whl", hash = "sha256:3290f526f87ae6e77655555baba4f36681c555b8bdbbff430b70e52c34c86245", size = 139767 }, +] + [[package]] name = "jedi" version = "0.19.1" @@ -815,6 +831,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/c9/fb/108ecd1fe961941959ad0ee4e12ee7b8b1477247f30b1fdfd83ceaf017f0/jupyter_core-5.7.2-py3-none-any.whl", hash = "sha256:4f7315d2f6b4bcf2e3e7cb6e46772eba760ae459cd1f59d29eb57b0a01bd7409", size = 28965 }, ] +[[package]] +name = "jupyterlab-widgets" +version = "3.0.13" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/59/73/fa26bbb747a9ea4fca6b01453aa22990d52ab62dd61384f1ac0dc9d4e7ba/jupyterlab_widgets-3.0.13.tar.gz", hash = "sha256:a2966d385328c1942b683a8cd96b89b8dd82c8b8f81dda902bb2bc06d46f5bed", size = 203556 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/a9/93/858e87edc634d628e5d752ba944c2833133a28fa87bb093e6832ced36a3e/jupyterlab_widgets-3.0.13-py3-none-any.whl", hash = "sha256:e3cda2c233ce144192f1e29914ad522b2f4c40e77214b0cc97377ca3d323db54", size = 214392 }, +] + [[package]] name = "kiwisolver" version = "1.4.7" @@ -920,6 +945,7 @@ dependencies = [ { name = "connectorx" }, { name = "duckdb" }, { name = "httpx" }, + { name = "ipywidgets" }, { name = "matplotlib" }, { name = "pandas" }, { name = "psycopg2" }, @@ -963,6 +989,7 @@ requires-dist = [ { name = "duckdb", specifier = ">=1.1.1" }, { name = "fastapi", extras = ["standard"], marker = "extra == 'server'", specifier = ">=0.115.0,<0.116.0" }, { name = "httpx", specifier = ">=0.28.0" }, + { name = "ipywidgets", specifier = ">=8.1.5" }, { name = "matplotlib", specifier = ">=3.9.2" }, { name = "pandas", specifier = ">=2.2.3" }, { name = "pg-bulk-ingest", marker = "extra == 'server'", specifier = ">=0.0.54" }, @@ -1461,6 +1488,7 @@ wheels 
= [ { url = "https://files.pythonhosted.org/packages/33/39/5a9a229bb5414abeb86e33b8fc8143ab0aecce5a7f698a53e31367d30caa/psycopg2-2.9.10-cp311-cp311-win_amd64.whl", hash = "sha256:0435034157049f6846e95103bd8f5a668788dd913a7c30162ca9503fdf542cb4", size = 1163736 }, { url = "https://files.pythonhosted.org/packages/3d/16/4623fad6076448df21c1a870c93a9774ad8a7b4dd1660223b59082dd8fec/psycopg2-2.9.10-cp312-cp312-win32.whl", hash = "sha256:65a63d7ab0e067e2cdb3cf266de39663203d38d6a8ed97f5ca0cb315c73fe067", size = 1025113 }, { url = "https://files.pythonhosted.org/packages/66/de/baed128ae0fc07460d9399d82e631ea31a1f171c0c4ae18f9808ac6759e3/psycopg2-2.9.10-cp312-cp312-win_amd64.whl", hash = "sha256:4a579d6243da40a7b3182e0430493dbd55950c493d8c68f4eec0b302f6bbf20e", size = 1163951 }, + { url = "https://files.pythonhosted.org/packages/ae/49/a6cfc94a9c483b1fa401fbcb23aca7892f60c7269c5ffa2ac408364f80dc/psycopg2-2.9.10-cp313-cp313-win_amd64.whl", hash = "sha256:91fd603a2155da8d0cfcdbf8ab24a2d54bca72795b90d2a3ed2b6da8d979dee2", size = 2569060 }, ] [[package]] @@ -2346,6 +2374,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b0/0b/c7e5d11020242984d9d37990310520ed663b942333b83a033c2f20191113/websockets-14.1-py3-none-any.whl", hash = "sha256:4d4fc827a20abe6d544a119896f6b78ee13fe81cbfef416f3f2ddf09a03f0e2e", size = 156277 }, ] +[[package]] +name = "widgetsnbextension" +version = "4.0.13" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/56/fc/238c424fd7f4ebb25f8b1da9a934a3ad7c848286732ae04263661eb0fc03/widgetsnbextension-4.0.13.tar.gz", hash = "sha256:ffcb67bc9febd10234a362795f643927f4e0c05d9342c727b65d2384f8feacb6", size = 1164730 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/21/02/88b65cc394961a60c43c70517066b6b679738caf78506a5da7b88ffcb643/widgetsnbextension-4.0.13-py3-none-any.whl", hash = "sha256:74b2692e8500525cc38c2b877236ba51d34541e6385eeed5aec15a70f88a6c71", size = 2335872 }, +] + [[package]] name = "wrapt" version = "1.17.0" From c060c11bdb5798d240dc6047751ac47930dcf5f7 Mon Sep 17 00:00:00 2001 From: Leonardo Mazzone Date: Mon, 6 Jan 2025 15:53:36 +0000 Subject: [PATCH 24/26] Fix various problems with benchmark table creation --- .../postgresql/benchmark/generate_tables.py | 104 ++++++++++-------- 1 file changed, 56 insertions(+), 48 deletions(-) diff --git a/src/matchbox/server/postgresql/benchmark/generate_tables.py b/src/matchbox/server/postgresql/benchmark/generate_tables.py index 56bf36c9..f24b594e 100644 --- a/src/matchbox/server/postgresql/benchmark/generate_tables.py +++ b/src/matchbox/server/postgresql/benchmark/generate_tables.py @@ -4,7 +4,6 @@ import click import pyarrow as pa -import pyarrow.compute as pc import pyarrow.parquet as pq from matchbox.common.factories import generate_dummy_probabilities @@ -51,7 +50,9 @@ def _hash_list_int(li: list[int]) -> list[bytes]: return [HASH_FUNC(str(i).encode("utf-8")).digest() for i in li] -def _unique_clusters(all_parents: Iterable[int], all_probabilities: Iterable[int]) -> tuple[list[int], list[float]]: +def _unique_clusters( + all_parents: Iterable[int], all_probabilities: Iterable[int] +) -> tuple[list[int], list[float]]: ll = set() clusters = [] probabilities = [] @@ -185,8 +186,7 @@ def generate_result_tables( n_probs: int, prob_min: float = 0.6, prob_max: float = 1, - prob_threshold: float = 0.7, -) -> tuple[pa.Table, pa.Table, pa.Table]: +) -> tuple[list[int], pa.Table, pa.Table, pa.Table]: """ Generate probabilities, contains and clusters tables. 
@@ -199,22 +199,16 @@ def generate_result_tables( n_probs: total number of probability edges to be generated prob_min: minimum value for probabilities to be generated prob_max: maximum value for probabilities to be generated - prob_threshold: minimum value for probabilities to be kept Returns: - Tuple with 3 PyArrow tables, for probabolities, contains and clusters + Tuple with 1 list of top-level clusters and 3 PyArrow tables, for probabilities, + contains and clusters """ probs = generate_dummy_probabilities( left_ids, right_ids, [prob_min, prob_max], n_components, n_probs ) - filtered_probs = probs.filter( - pc.greater(probs["probability"], int(prob_threshold * 100)) - ) - - clusters = to_hierarchical_clusters( - attach_components_to_probabilities(filtered_probs) - ) + clusters = to_hierarchical_clusters(attach_components_to_probabilities(probs)) indexed_parents = id_creator.create(clusters["parent"].to_pylist()) indexed_children = id_creator.create(clusters["child"].to_pylist()) @@ -223,6 +217,10 @@ def generate_result_tables( indexed_parents, clusters["probability"].to_numpy() ) + set_children = set(indexed_children) + source_entries = left_ids if right_ids is None else left_ids + right_ids + top_clusters = [c for c in final_clusters + source_entries if c not in set_children] + probabilities_table = pa.table( { "resolution": pa.array( @@ -251,7 +249,7 @@ def generate_result_tables( } ) - return (probabilities_table, contains_table, clusters_table) + return (top_clusters, probabilities_table, contains_table, clusters_table) def generate_all_tables( @@ -282,27 +280,31 @@ def generate_all_tables( clusters_source2 = generate_cluster_source(source_len, source_len * 2) id_creator = IDCreator(source_len * 2) - probabilities_dedupe1, contains_dedupe1, clusters_dedupe1 = generate_result_tables( - clusters_source1["cluster_id"].to_pylist(), - None, - 3, - id_creator, - dedupe_components, - dedupe_len, + top_clusters1, probabilities_dedupe1, contains_dedupe1, clusters_dedupe1 = ( + generate_result_tables( + clusters_source1["cluster_id"].to_pylist(), + None, + 3, + id_creator, + dedupe_components, + dedupe_len, + ) ) - probabilities_dedupe2, contains_dedupe2, clusters_dedupe2 = generate_result_tables( - clusters_source2["cluster_id"].to_pylist(), - None, - 4, - id_creator.reset_mapping(), - dedupe_components, - dedupe_len, + top_clusters2, probabilities_dedupe2, contains_dedupe2, clusters_dedupe2 = ( + generate_result_tables( + clusters_source2["cluster_id"].to_pylist(), + None, + 4, + id_creator.reset_mapping(), + dedupe_components, + dedupe_len, + ) ) - probabilities_link, contains_link, clusters_link = generate_result_tables( - contains_dedupe1["parent"].to_pylist(), - contains_dedupe2["parent"].to_pylist(), + _, probabilities_link, contains_link, clusters_link = generate_result_tables( + top_clusters1, + top_clusters2, 5, id_creator.reset_mapping(), link_components, @@ -339,38 +341,38 @@ def generate_all_tables( def main(settings, output_dir): PRESETS = { "xs": { - "source_len": 1_000, - "dedupe_components": 500, - "dedupe_len": 500, - "link_components": 2_000, + "source_len": 10_000, + "dedupe_components": 8000, + "dedupe_len": 2000, + "link_components": 6000, "link_len": 10_000, }, "s": { "source_len": 100_000, - "dedupe_components": 25_000, - "dedupe_len": 100_000, - "link_components": 20_000, + "dedupe_components": 80_000, + "dedupe_len": 20_000, + "link_components": 60_000, "link_len": 100_000, }, "m": { "source_len": 1_000_000, - "dedupe_components": 250_000, - "dedupe_len": 1_000_000, 
- "link_components": 200_000, + "dedupe_components": 800_000, + "dedupe_len": 200_000, + "link_components": 600_000, "link_len": 1_000_000, }, "l": { "source_len": 10_000_000, - "dedupe_components": 2_500_000, - "dedupe_len": 10_000_000, - "link_components": 2_000_000, + "dedupe_components": 8_000_000, + "dedupe_len": 2_000_000, + "link_components": 6_000_000, "link_len": 10_000_000, }, "xl": { "source_len": 100_000_000, - "dedupe_components": 25_000_000, - "dedupe_len": 100_000_000, - "link_components": 20_000_000, + "dedupe_components": 80_000_000, + "dedupe_len": 20_000_000, + "link_components": 60_000_000, "link_len": 100_000_000, }, } @@ -388,9 +390,15 @@ def main(settings, output_dir): link_components = config["link_components"] all_tables = generate_all_tables( - source_len, dedupe_components, dedupe_len, link_len, link_components + source_len=source_len, + dedupe_components=dedupe_components, + dedupe_len=dedupe_len, + link_components=link_components, + link_len=link_len, ) + output_dir /= settings + output_dir.mkdir(parents=True, exist_ok=True) for name, table in all_tables.items(): pq.write_table(table, output_dir / f"{name}.parquet") From 9173884f5e492eab434726fe3b8dbd5214baf89a Mon Sep 17 00:00:00 2001 From: Leonardo Mazzone Date: Tue, 7 Jan 2025 08:22:06 +0000 Subject: [PATCH 25/26] Use valid params for test generate benchmark tables --- test/server/test_postgresql.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/server/test_postgresql.py b/test/server/test_postgresql.py index ba436c7b..b8c6992b 100644 --- a/test/server/test_postgresql.py +++ b/test/server/test_postgresql.py @@ -247,7 +247,7 @@ def array_encode(array: Iterable[str]): con.execute(text(empty_schema())) con.commit() - results = generate_all_tables(20, 5, 15, 5, 100) + results = generate_all_tables(20, 5, 25, 5, 25) assert len(results) == len(MBDB.MatchboxBase.metadata.tables) From 77ea3d078543662945dd133e862d4636d6966cb9 Mon Sep 17 00:00:00 2001 From: Leonardo Mazzone Date: Tue, 7 Jan 2025 08:23:58 +0000 Subject: [PATCH 26/26] Solve bugs with benchmark probability generation --- src/matchbox/common/factories.py | 35 +++++--------- src/matchbox/common/transform.py | 48 ++++++++++++++----- .../postgresql/benchmark/generate_tables.py | 2 +- test/common/test_factories.py | 25 +++++++--- uv.lock | 4 +- 5 files changed, 72 insertions(+), 42 deletions(-) diff --git a/src/matchbox/common/factories.py b/src/matchbox/common/factories.py index 6e295d32..71be3427 100644 --- a/src/matchbox/common/factories.py +++ b/src/matchbox/common/factories.py @@ -1,45 +1,33 @@ from collections import Counter from textwrap import dedent +from typing import Any import numpy as np import pyarrow as pa import rustworkx as rx +from matchbox.common.transform import graph_results -def verify_components(table: pa.Table) -> dict: + +def verify_components(all_nodes: list[Any], table: pa.Table) -> dict: """ Fast verification of connected components using rustworkx. 
Args: + all_nodes: list of identities of inputs being matched table: PyArrow table with 'left', 'right' columns Returns: dictionary containing basic component statistics """ - graph = rx.PyGraph() - - unique_nodes = set(table["left"].to_numpy()) | set(table["right"].to_numpy()) - graph.add_nodes_from(range(len(unique_nodes))) - - node_to_idx = {node: idx for idx, node in enumerate(unique_nodes)} - edges = [ - (node_to_idx[left], node_to_idx[right]) - for left, right in zip( - table["left"].to_numpy(), - table["right"].to_numpy(), - strict=True, - ) - ] - - graph.add_edges_from_no_data(edges) - + graph, _, _ = graph_results(table, all_nodes) components = rx.connected_components(graph) component_sizes = Counter(len(component) for component in components) return { "num_components": len(components), - "total_nodes": len(unique_nodes), - "total_edges": len(edges), + "total_nodes": graph.num_nodes(), + "total_edges": graph.num_edges(), "component_sizes": component_sizes, "min_component_size": min(component_sizes.keys()), "max_component_size": max(component_sizes.keys()), @@ -122,7 +110,7 @@ def calculate_min_max_edges( max_edges += _max_edges_component( left_after_min_mod, right_after_min_mod, deduplicate ) * (max_mod - min_mod) - # components where both side have maximum nodes + # components where both side have minimum nodes min_edges += _min_edges_component(left_div, right_div, deduplicate) * ( num_components - max_mod ) @@ -173,6 +161,9 @@ def generate_dummy_probabilities( ) mode = "dedupe" if deduplicate else "link" + + if total_rows == 0: + raise ValueError("At least one edge must be generated") if total_rows < min_possible_edges: raise ValueError( dedent(f""" @@ -204,7 +195,7 @@ def generate_dummy_probabilities( left_components = np.array_split(np.array(left_values), num_components) right_components = np.array_split(np.array(right_values), num_components) # For each left-right component pair, the right equals the left rotated by one - right_components = [np.roll(c, 1) for c in right_components] + right_components = [np.roll(c, -1) for c in right_components] all_edges = [] diff --git a/src/matchbox/common/transform.py b/src/matchbox/common/transform.py index 1ebcc36b..72a10e6f 100644 --- a/src/matchbox/common/transform.py +++ b/src/matchbox/common/transform.py @@ -2,7 +2,7 @@ import multiprocessing from collections import defaultdict from concurrent.futures import ProcessPoolExecutor -from typing import Callable, Generic, Hashable, TypeVar +from typing import Callable, Generic, Hashable, Iterable, TypeVar import numpy as np import pyarrow as pa @@ -92,14 +92,23 @@ def to_clusters(results: ProbabilityResults) -> ClusterResults: ) -def attach_components_to_probabilities(probabilities: pa.Table) -> pa.Table: +def graph_results( + probabilities: pa.Table, all_nodes: Iterable[int] | None = None +) -> tuple[rx.PyDiGraph, np.ndarray, np.ndarray]: """ - Takes an Arrow table of probabilities and adds a component column. + Convert probability table to graph representation. - Expects an Arrow table of column, left, right, probability. - - Returns a table with an additional column, component. + Args: + probabilities: PyArrow table with 'left', 'right' columns + all_nodes: superset of node identities figuring in probabilities table. + Used to optionally add isolated nodes to the graph. 
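A small illustration of the `all_nodes` behaviour, using hypothetical toy values: IDs that never appear in an edge still receive a graph node, so they surface as single-node components downstream.

import pyarrow as pa

probs = pa.table({"left": [10], "right": [20], "probability": [95]})
graph, left_idx, right_idx = graph_results(probs, all_nodes=[10, 20, 30])
# graph.num_nodes() == 3: nodes for 10 and 20, plus one isolated node for 30
# rx.connected_components(graph) then yields two components: {0, 1} and {2}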
+ Returns: + A tuple containing: + - Rustwork directed graph + - A list mapping the 'left' probabilities column to node indices in the graph + - A list mapping the 'right' probabilities column to node indices in the graph """ + # Create index to use in graph unique = pc.unique( pa.concat_arrays( @@ -109,8 +118,9 @@ def attach_components_to_probabilities(probabilities: pa.Table) -> pa.Table: ] ) ) - left_indices = pc.index_in(probabilities["left"], unique) - right_indices = pc.index_in(probabilities["right"], unique) + + left_indices = pc.index_in(probabilities["left"], unique).to_numpy() + right_indices = pc.index_in(probabilities["right"], unique).to_numpy() # Create and process graph n_nodes = len(unique) @@ -119,9 +129,25 @@ def attach_components_to_probabilities(probabilities: pa.Table) -> pa.Table: graph = rx.PyGraph(node_count_hint=n_nodes, edge_count_hint=n_edges) graph.add_nodes_from(range(n_nodes)) - edges = tuple(zip(left_indices.to_numpy(), right_indices.to_numpy(), strict=True)) + if all_nodes is not None: + isolated_nodes = len(set(all_nodes) - set(unique.to_pylist())) + graph.add_nodes_from(range(isolated_nodes)) + + edges = tuple(zip(left_indices, right_indices, strict=True)) graph.add_edges_from_no_data(edges) + return graph, left_indices, right_indices + + +def attach_components_to_probabilities(probabilities: pa.Table) -> pa.Table: + """ + Takes an Arrow table of probabilities and adds a component column. + + Expects an Arrow table of column, left, right, probability. + + Returns a table with an additional column, component. + """ + graph, left_indices, _ = graph_results(probabilities) components = rx.connected_components(graph) # Convert components to arrays, map back to input to join, and reattach @@ -130,10 +156,10 @@ def attach_components_to_probabilities(probabilities: pa.Table) -> pa.Table: np.arange(len(components)), [len(c) for c in components] ) - node_to_component = np.zeros(len(unique), dtype=np.int64) + node_to_component = np.zeros(graph.num_nodes(), dtype=np.int64) node_to_component[component_indices] = component_labels - edge_components = pa.array(node_to_component[left_indices.to_numpy()]) + edge_components = pa.array(node_to_component[left_indices]) return probabilities.append_column("component", edge_components).sort_by( [("component", "ascending"), ("probability", "descending")] diff --git a/src/matchbox/server/postgresql/benchmark/generate_tables.py b/src/matchbox/server/postgresql/benchmark/generate_tables.py index f24b594e..c18dadb1 100644 --- a/src/matchbox/server/postgresql/benchmark/generate_tables.py +++ b/src/matchbox/server/postgresql/benchmark/generate_tables.py @@ -217,8 +217,8 @@ def generate_result_tables( indexed_parents, clusters["probability"].to_numpy() ) - set_children = set(indexed_children) source_entries = left_ids if right_ids is None else left_ids + right_ids + set_children = set(indexed_children) top_clusters = [c for c in final_clusters + source_entries if c not in set_children] probabilities_table = pa.table( diff --git a/test/common/test_factories.py b/test/common/test_factories.py index 967f3851..e48aa3e3 100644 --- a/test/common/test_factories.py +++ b/test/common/test_factories.py @@ -48,6 +48,13 @@ def test_calculate_min_max_edges( @pytest.mark.parametrize( ("parameters"), [ + { + "left_count": 5, + "right_count": None, + "prob_range": (0.6, 0.8), + "num_components": 3, + "total_rows": 2, + }, { "left_count": 1000, "right_count": None, @@ -77,7 +84,13 @@ def test_calculate_min_max_edges( "total_rows": 
calculate_min_max_edges(1000, 1000, 10, False)[1], }, ], - ids=["dedupe_min", "dedupe_max", "link_min", "link_max"], + ids=[ + "dedupe_no_edges", + "dedupe_min", + "dedupe_max", + "link_min", + "link_max", + ], ) def test_generate_dummy_probabilities(parameters: dict[str, Any]): len_left = parameters["left_count"] @@ -103,19 +116,19 @@ def test_generate_dummy_probabilities(parameters: dict[str, Any]): num_components=n_components, total_rows=total_rows, ) - report = verify_components(table=probabilities) + report = verify_components(table=probabilities, all_nodes=rand_vals) p_left = probabilities["left"].to_pylist() p_right = probabilities["right"].to_pylist() assert report["num_components"] == n_components - # Link + # Link job if right_values: - assert set(p_left) == set(left_values) - assert set(p_right) == set(right_values) + assert set(p_left) <= set(left_values) + assert set(p_right) <= set(right_values) # Dedupe else: - assert set(p_left) | set(p_right) == set(left_values) + assert set(p_left) | set(p_right) <= set(left_values) assert ( pc.max(probabilities["probability"]).as_py() / 100 diff --git a/uv.lock b/uv.lock index 6f2472e6..a3c3f534 100644 --- a/uv.lock +++ b/uv.lock @@ -945,7 +945,6 @@ dependencies = [ { name = "connectorx" }, { name = "duckdb" }, { name = "httpx" }, - { name = "ipywidgets" }, { name = "matplotlib" }, { name = "pandas" }, { name = "psycopg2" }, @@ -970,6 +969,7 @@ server = [ dev = [ { name = "docker" }, { name = "ipykernel" }, + { name = "ipywidgets" }, { name = "pre-commit" }, { name = "pytest" }, { name = "pytest-cov" }, @@ -989,7 +989,6 @@ requires-dist = [ { name = "duckdb", specifier = ">=1.1.1" }, { name = "fastapi", extras = ["standard"], marker = "extra == 'server'", specifier = ">=0.115.0,<0.116.0" }, { name = "httpx", specifier = ">=0.28.0" }, - { name = "ipywidgets", specifier = ">=8.1.5" }, { name = "matplotlib", specifier = ">=3.9.2" }, { name = "pandas", specifier = ">=2.2.3" }, { name = "pg-bulk-ingest", marker = "extra == 'server'", specifier = ">=0.0.54" }, @@ -1009,6 +1008,7 @@ requires-dist = [ dev = [ { name = "docker", specifier = ">=7.1.0" }, { name = "ipykernel", specifier = ">=6.29.5" }, + { name = "ipywidgets", specifier = ">=8.1.5" }, { name = "pre-commit", specifier = ">=3.8.0" }, { name = "pytest", specifier = ">=8.3.3" }, { name = "pytest-cov", specifier = ">=5.0.0" },