Support benchmarks #35

Merged · 27 commits · Jan 7, 2025

Commits
b3eef93
Prepare for supporting query benchmarks
leo-mazzone Dec 20, 2024
1d7adfd
Use factories to generate dedupe probabilities
leo-mazzone Dec 27, 2024
d6e7715
Correct mistake in ERM
leo-mazzone Dec 27, 2024
aaebbc7
Fix problems for dedupe probability factory
leo-mazzone Dec 27, 2024
987da86
Solve problems with cluster creation
leo-mazzone Dec 27, 2024
4de9f1a
Rename schema_setup
leo-mazzone Dec 30, 2024
67509d6
Move factories to package
leo-mazzone Dec 30, 2024
09b9b47
Move test_transform to common
leo-mazzone Dec 30, 2024
4f271e3
Move pipeline to postgres benchmark
leo-mazzone Dec 30, 2024
6334486
Create table generation script
leo-mazzone Dec 30, 2024
20085af
Make factories more deterministic
leo-mazzone Dec 31, 2024
af43d80
Make tests pass
leo-mazzone Jan 2, 2025
a4df1fb
Test benchmark init schema
leo-mazzone Jan 2, 2025
10e11c1
Check min components for dummy probs
leo-mazzone Jan 2, 2025
e474e1a
Test generation of benchmark tables
leo-mazzone Jan 2, 2025
4cb3834
Test hierarchical clusters edge case
leo-mazzone Jan 2, 2025
0e806fe
Move factories tests to own file
leo-mazzone Jan 2, 2025
676ce6f
Test and fix dummy probability generation
leo-mazzone Jan 3, 2025
1a1fce5
Make all tests pass
leo-mazzone Jan 3, 2025
f4eaf79
Slightly extend test for generating benchmark data
leo-mazzone Jan 6, 2025
09a4b63
Add missing type hints
leo-mazzone Jan 6, 2025
02a3a01
Fix typo
leo-mazzone Jan 6, 2025
0790951
Merge branch 'support_benchmarks' of github.com:uktrade/matchbox into…
leo-mazzone Jan 6, 2025
b6a9b88
Add ipywidgets
leo-mazzone Jan 6, 2025
c060c11
Fix various problems with benchmark table creation
leo-mazzone Jan 6, 2025
9173884
Use valid params for test generate benchmark tables
leo-mazzone Jan 7, 2025
77ea3d0
Solve bugs with benchmark probability generation
leo-mazzone Jan 7, 2025
304 changes: 304 additions & 0 deletions src/matchbox/common/factories.py
@@ -0,0 +1,304 @@
from collections import Counter
from textwrap import dedent

import numpy as np
import pyarrow as pa
import rustworkx as rx


def verify_components(table: pa.Table) -> dict:
"""
Fast verification of connected components using rustworkx.

Args:
table: PyArrow table with 'left', 'right' columns

Returns:
Dictionary containing basic component statistics
"""
graph = rx.PyGraph()

unique_nodes = set(table["left"].to_numpy()) | set(table["right"].to_numpy())
graph.add_nodes_from(range(len(unique_nodes)))

node_to_idx = {node: idx for idx, node in enumerate(unique_nodes)}
edges = [
(node_to_idx[left], node_to_idx[right])
for left, right in zip(
table["left"].to_numpy(),
table["right"].to_numpy(),
strict=True,
)
]

graph.add_edges_from_no_data(edges)

components = rx.connected_components(graph)
component_sizes = Counter(len(component) for component in components)

return {
"num_components": len(components),
"total_nodes": len(unique_nodes),
"total_edges": len(edges),
"component_sizes": component_sizes,
"min_component_size": min(component_sizes.keys()),
"max_component_size": max(component_sizes.keys()),
}
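
A minimal usage sketch of `verify_components` (the toy table and expected statistics below are illustrative assumptions, not taken from the PR's tests):

```python
import pyarrow as pa

from matchbox.common.factories import verify_components

# Two components: {0, 1, 2} connected by two edges, {3, 4} by one
edges = pa.table({"left": [0, 1, 3], "right": [1, 2, 4]})

stats = verify_components(edges)
assert stats["num_components"] == 2
assert stats["total_nodes"] == 5
assert stats["component_sizes"] == {3: 1, 2: 1}  # one size-3, one size-2
```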


def _min_edges_component(left: int, right: int, deduplicate: bool) -> int:
"""
Calculate the minimum number of edges for a component to be connected,
assuming a spanning tree.

Args:
left: number of nodes of component on the left
right: number of nodes of component on the right (for linking)
deduplicate: whether edges are for deduplication

Returns:
Minimum number of edges
"""
if not deduplicate:
return left + right - 1

return left - 1


def _max_edges_component(left: int, right: int, deduplicate: bool) -> int:
"""
Calculate the maximum number of edges a component can have without duplicate edges.
Considers a complete graph for deduping, and a complete bipartite graph for linking.

Args:
left: number of nodes of component on the left
right: number of nodes of component on the right (for linking)
deduplicate: whether edges are for deduplication

Returns:
Maximum number of edges
"""
if not deduplicate:
return left * right
# n*(n-1) is always divisible by 2
return left * (left - 1) // 2
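
Worked numbers for the two helpers (a hedged sketch; the calls below are illustrative, and `right` is ignored when deduplicating):

```python
# Dedupe component of 4 nodes: a spanning tree needs 3 edges;
# the complete graph K4 allows 4 * 3 / 2 = 6.
assert _min_edges_component(4, 0, deduplicate=True) == 3
assert _max_edges_component(4, 0, deduplicate=True) == 6

# Linking 2 nodes to 3 nodes: a spanning tree needs 2 + 3 - 1 = 4 edges;
# the complete bipartite graph K(2,3) allows 2 * 3 = 6.
assert _min_edges_component(2, 3, deduplicate=False) == 4
assert _max_edges_component(2, 3, deduplicate=False) == 6
```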


def calculate_min_max_edges(
left_nodes: int, right_nodes: int, num_components: int, deduplicate: bool
) -> tuple[int, int]:
"""
Calculate min and max edges for a graph.

Args:
left_nodes: number of nodes in left source
right_nodes: number of nodes in right source
num_components: number of requested components
deduplicate: whether edges are for deduplication

Returns:
Two-tuple representing min and max edges
"""
left_mod, right_mod = left_nodes % num_components, right_nodes % num_components
left_div, right_div = left_nodes // num_components, right_nodes // num_components

min_mod, max_mod = sorted([left_mod, right_mod])

min_edges, max_edges = 0, 0
# components where both sides have maximum nodes
min_edges += (
_min_edges_component(left_div + 1, right_div + 1, deduplicate) * min_mod
)
max_edges += (
_max_edges_component(left_div + 1, right_div + 1, deduplicate) * min_mod
)
# components where one side has maximum nodes
left_after_min_mod, right_after_min_mod = left_div + 1, right_div
if left_mod == min_mod:
left_after_min_mod, right_after_min_mod = left_div, right_div + 1
min_edges += _min_edges_component(
left_after_min_mod, right_after_min_mod, deduplicate
) * (max_mod - min_mod)
max_edges += _max_edges_component(
left_after_min_mod, right_after_min_mod, deduplicate
) * (max_mod - min_mod)
# components where neither side has extra nodes
min_edges += _min_edges_component(left_div, right_div, deduplicate) * (
num_components - max_mod
)
max_edges += _max_edges_component(left_div, right_div, deduplicate) * (
num_components - max_mod
)

return min_edges, max_edges
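
A worked example of the splitting logic (values assumed for illustration): the nodes on each side are divided across `num_components` groups, with `mod` groups receiving one extra node, and the per-component bounds are summed.

```python
# 5 dedupe nodes over 2 components -> np.array_split gives groups of 3 and 2.
# Min: spanning trees, (3 - 1) + (2 - 1) = 3 edges.
# Max: complete graphs, 3*2/2 + 2*1/2 = 4 edges.
assert calculate_min_max_edges(5, 5, num_components=2, deduplicate=True) == (3, 4)
```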


def generate_dummy_probabilities(
left_values: list[int],
right_values: list[int] | None,
prob_range: tuple[float, float],
num_components: int,
total_rows: int,
) -> pa.Table:
"""
Generate dummy Arrow probabilities data with guaranteed isolated components.

Args:
left_values: List of integers to use for left column
right_values: List of integers to use for right column. If None, assume we
are generating probabilities for deduplication
prob_range: Tuple of (min_prob, max_prob) to constrain probabilities
num_components: Number of distinct connected components to generate
total_rows: Total number of rows to generate

Returns:
PyArrow Table with 'left', 'right', and 'probability' columns
"""
# Validate inputs
deduplicate = False
if right_values is None:
right_values = left_values
deduplicate = True

if len(left_values) < 2 or len(right_values) < 2:
raise ValueError("Need at least 2 possible values for both left and right")
if num_components > min(len(left_values), len(right_values)):
raise ValueError(
"Cannot have more components than minimum of left/right values"
)

left_nodes, right_nodes = len(left_values), len(right_values)
min_possible_edges, max_possible_edges = calculate_min_max_edges(
left_nodes, right_nodes, num_components, deduplicate
)

mode = "dedupe" if deduplicate else "link"
if total_rows < min_possible_edges:
raise ValueError(
dedent(f"""
Cannot generate {total_rows:,} {mode} edges with {num_components:,}
components.
Min edges is {min_possible_edges:,} for the nodes given.
Either decrease the number of nodes, increase the number of components,
or increase the total edges requested.
""")
)
if total_rows > max_possible_edges:
raise ValueError(
dedent(f"""
Cannot generate {total_rows:,} {mode} edges with {num_components:,}
components.
Max edges is {max_possible_edges:,} for the nodes given.
Either increase the number of nodes, decrease the number of components,
or decrease the total edges requested.
""")
)

n_extra_edges = total_rows - min_possible_edges

# Convert probability range to integers (60-80 for 0.60-0.80)
prob_min = int(prob_range[0] * 100)
prob_max = int(prob_range[1] * 100)

# Split values into completely separate groups for each component
left_components = np.array_split(np.array(left_values), num_components)
right_components = np.array_split(np.array(right_values), num_components)
# For each left-right component pair, the right equals the left rotated by one
right_components = [np.roll(c, 1) for c in right_components]

all_edges = []

# Generate edges for each component
for comp_idx in range(num_components):
comp_left_values = left_components[comp_idx]
comp_right_values = right_components[comp_idx]

min_comp_nodes, max_comp_nodes = sorted(
[len(comp_left_values), len(comp_right_values)]
)

# Ensure basic connectivity within the component by creating a spanning tree
base_edges = set()
# For deduping (A, B, C), pairing the data with its rotated version
# gives the cycle (A - B), (B - C), (C - A); the first n-1 pairs
# form a spanning tree.
# For deduping, `min_comp_nodes` == `max_comp_nodes`
if deduplicate:
for i in range(min_comp_nodes - 1):
small_n, large_n = sorted([comp_left_values[i], comp_right_values[i]])
base_edges.add((small_n, large_n))
else:
# For linking (A B) and (C D E), we begin by adding (A - C) and (B - D)
for i in range(min_comp_nodes):
base_edges.add((comp_left_values[i], comp_right_values[i]))
# we now add (C - B)
for i in range(min_comp_nodes - 1):
base_edges.add((comp_left_values[i + 1], comp_right_values[i]))
# we now add (A - D)
left_right_diff = max_comp_nodes - min_comp_nodes
for i in range(left_right_diff):
left_i, right_i = 0, min_comp_nodes + i
if len(comp_right_values) < len(comp_left_values):
left_i, right_i = min_comp_nodes + i, 0

base_edges.add((comp_left_values[left_i], comp_right_values[right_i]))

component_edges = list(base_edges)

if n_extra_edges > 0:
# Generate remaining random edges strictly within this component
# TODO: this can certainly be optimised
if deduplicate:
all_possible_edges = list(
{
tuple(sorted([x, y]))
for x in comp_left_values
for y in comp_right_values
if x != y and tuple(sorted([x, y])) not in base_edges
}
)
else:
all_possible_edges = list(
{
(x, y)
for x in comp_left_values
for y in comp_right_values
if x != y and (x, y) not in base_edges
}
)
max_new_edges = len(all_possible_edges)
if max_new_edges >= n_extra_edges:
edges_required = n_extra_edges
n_extra_edges = 0
else:
edges_required = max_new_edges
n_extra_edges -= max_new_edges

extra_edges_idx = np.random.choice(
len(all_possible_edges), size=edges_required, replace=False
)
extra_edges = [
e for i, e in enumerate(all_possible_edges) if i in extra_edges_idx
]
component_edges += extra_edges
random_probs = np.random.randint(
prob_min, prob_max + 1, size=len(component_edges)
)

component_edges = [
(le, ri, pr)
for (le, ri), pr in zip(component_edges, random_probs, strict=True)
]

all_edges.extend(component_edges)

# Convert to arrays
lefts, rights, probs = zip(*all_edges, strict=True)

# Create PyArrow arrays
left_array = pa.array(lefts, type=pa.uint64())
right_array = pa.array(rights, type=pa.uint64())
prob_array = pa.array(probs, type=pa.uint8())

return pa.table(
[left_array, right_array, prob_array], names=["left", "right", "probability"]
)
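
Putting the factory together with the verifier, a hedged end-to-end sketch (the parameter values are assumptions for illustration, not taken from the PR's tests):

```python
import numpy as np

from matchbox.common.factories import (
    generate_dummy_probabilities,
    verify_components,
)

np.random.seed(42)  # edge sampling and probabilities use np.random

probs = generate_dummy_probabilities(
    left_values=list(range(10)),
    right_values=None,  # None switches the factory to dedupe mode
    prob_range=(0.6, 0.8),
    num_components=2,
    total_rows=12,  # must sit between the min/max edge bounds (here 8 and 20)
)

stats = verify_components(probs)
assert stats["num_components"] == 2
assert stats["total_edges"] == 12
```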
11 changes: 11 additions & 0 deletions src/matchbox/common/hash.py
@@ -155,3 +155,14 @@ def index(self, *values: int) -> int:
self.mapping[value_set] = salted_id

return salted_id

def has_mapping(self, *values: int) -> bool:
"""
Args:
values: the integers in the set you want to index

Returns:
Boolean indicating whether an index for the values already exists
"""
value_set = frozenset(values)
return value_set in self.mapping
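
A small sketch of the intended contract (assuming the class is the `IntMap` used by matchbox.common.transform, which the diff context suggests lives in this module):

```python
from matchbox.common.hash import IntMap

im = IntMap(salt=1)
parent = im.index(1, 2, 3)

assert im.has_mapping(1, 2, 3)   # the set {1, 2, 3} has been indexed
assert im.has_mapping(3, 2, 1)   # order is irrelevant: keys are frozensets
assert not im.has_mapping(1, 2)  # a subset is a different, unseen set
```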
15 changes: 10 additions & 5 deletions src/matchbox/common/transform.py
@@ -186,7 +186,7 @@ def get_components(self) -> list[set[T]]:


def component_to_hierarchy(
table: pa.Table, dtype: pa.DataType = pa.uint64, salt: int = 1
table: pa.Table, salt: int, dtype: pa.DataType = pa.uint64
) -> pa.Table:
"""
Convert pairwise probabilities into a hierarchical representation.
@@ -199,7 +199,10 @@ def component_to_hierarchy(
Returns:
Arrow Table with columns ['parent', 'child', 'probability']
"""
probs = np.sort(pc.unique(table["probability"]).to_numpy())[::-1]
ascending_probs = np.sort(
pc.unique(table["probability"]).to_numpy(zero_copy_only=False)
)
probs = ascending_probs[::-1]

djs = DisjointSet[int]() # implements connected components
im = IntMap(salt=salt) # generates IDs for new clusters
@@ -227,6 +230,9 @@
if len(children) <= 2:
continue # Skip pairs already handled by pairwise probabilities

if im.has_mapping(*children):
continue # Skip unchanged components from previous thresholds

parent = im.index(*children)
prev_roots: set[int] = set()
for child in children:
@@ -249,7 +255,7 @@
def to_hierarchical_clusters(
probabilities: pa.Table,
proc_func: Callable[[pa.Table, pa.DataType], pa.Table] = component_to_hierarchy,
dtype: pa.DataType = pa.uint64,
dtype: pa.DataType = pa.int64,
timeout: int = 300,
) -> pa.Table:
"""
@@ -259,7 +265,6 @@
probabilities: Arrow table with columns ['component', 'left', 'right',
'probability']
proc_func: Function to process each component
dtype: Arrow data type for parent/child columns
timeout: Maximum seconds to wait for each component to process

Returns:
@@ -315,7 +320,7 @@

with ProcessPoolExecutor(max_workers=n_cores) as executor:
futures = [
executor.submit(proc_func, component_table, dtype, salt)
executor.submit(proc_func, component_table, salt=salt, dtype=dtype)
for salt, component_table in enumerate(component_tables)
]

2 changes: 1 addition & 1 deletion src/matchbox/server/postgresql/README.md
@@ -32,7 +32,7 @@ erDiagram
bigint child PK,FK
}
Probabilities {
bigint model PK,FK
bigint resolution PK,FK
bigint cluster PK,FK
float probability
}