Consider holding data in sqlite table #964
Comments
Perhaps the path is not to try to adapt the core dedupe classes to such a big change, but to create a new set of classes, like DatabaseDedupe, that take a connection string and some other data about an existing database connection.
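For illustration only, a hypothetical shape such a class could take (the class name comes from the suggestion above; the parameters are made up, not an existing dedupe API):

class DatabaseDedupe:
    # Hypothetical sketch: holds connection details instead of in-memory
    # records; not an existing dedupe class.
    def __init__(self, variable_definition, connection_string,
                 data_table, blocking_table="blocking_map"):
        self.variable_definition = variable_definition
        self.connection_string = connection_string
        self.data_table = data_table
        self.blocking_table = blocking_table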
thoughts @fjsj, @NickCrews?
If I understood correctly what's being planned, here's my two cents:
I've tweaked a lot with scoring-stage optimizations. The best performance I got was using a huge EC2 machine: a single VM with several CPUs. What I did was keep all records and blocks in the DB. An orchestrator process would read the blocks from the DB and yield the pairs (only the IDs) to workers (1 per CPU). Each worker would fetch the records from the DB, score them, and save the partial result to files. Each worker got batches of pairs, not single pairs. That's not very different from the current or old code, but it was the best balance between IPC overhead and DB usage. The DB was Postgres on a separate machine in this case. The orchestrator is needed because there's a lot of skewness in block sizes, so you cannot simply "map" blocks to workers. There are papers that suggest "balancing" algorithms to deal with that, but I think the speeds they reported were lower than what I was getting (on the order of 100k scored pairs/sec, which of course depends a lot on what's being scored), so I never had time to try those. I'm not 100% sure how much this helps performance, but there are tricks that can speed up pair generation and the IPC cost of passing pairs between processes. Using numpy to represent the block as an array of record IDs and generating pairs with numpy.triu can be faster than itertools.combinations, or maybe moving that to Cython. Also, passing numpy objects between processes seems faster than passing Python objects. But all of that won't work for index predicates (which are a huge bottleneck).
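As a rough sketch of that numpy-based pair generation (the block contents below are made up, and numpy.triu_indices is used since it yields the pair indices directly):

import itertools
import numpy as np

# A "block" is just the array of record IDs that share a block key.
block = np.array([11, 42, 57, 63, 90], dtype=np.int64)

# Baseline: pure-Python pair generation.
pairs_py = list(itertools.combinations(block.tolist(), 2))

# Vectorized: the strict upper triangle enumerates every unordered pair once.
i, j = np.triu_indices(len(block), k=1)
pairs_np = np.column_stack((block[i], block[j]))  # shape (n_pairs, 2)

assert [tuple(p) for p in pairs_np] == pairs_py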
interesting! @fjsj, this would confirm that just reducing IPC would significantly improve things (passing ids instead of full record pairs). the design i was thinking about was something like this:

class ScoreDupes(object):
def __init__(self,
data_model,
classifier,
connection_string,
data_table,
block_table,
score_file_path: str,
dtype: numpy.dtype,
offset,
sql_offset,
n_processes):
self.data_model = data_model
self.classifier = classifier
self.score_file_path = score_file_path
self.dtype = dtype
self.offset = offset
        self.connection_string = connection_string
        self.data_table = data_table
        self.block_table = block_table
        self.sql_offset = sql_offset
        self.n_processes = n_processes
def __call__(self) -> None:
connection = db.connect(self.connection_string)
cursor = connection.serverside_cursor()
cursor.execute("""
select a.donor_id,
row_to_json((select d from (select a.city,
a.name,
a.zip,
a.state,
a.address) d)),
b.donor_id,
row_to_json((select d from (select b.city,
b.name,
b.zip,
b.state,
b.address) d))
from (select DISTINCT l.donor_id as east, r.donor_id as west
from blocking_map as l
INNER JOIN blocking_map as r
using (block_key)
where l.donor_id < r.donor_id) ids
INNER JOIN processed_donors a on ids.east=a.donor_id
INNER JOIN processed_donors b on ids.west=b.donor_id
                          OFFSET = ?
                          WHERE row_number() % ?""",
                       (self.sql_offset, self.n_processes))
for record_pair in cursor:
self.score(record_pair)
    def score(self, record_pairs: RecordPairs) -> None:
        # compare distances and write results to a memmap array
        ...

that way each scoring process is responsible for generating its own record pairs to work on. i think this would likely be much faster than your design (and would also eliminate the duplicate comparisons that your approach would have). the percent is a modulo, in case that wasn't clear. the idea is that if there are four processes, the first process would do the 1st, 5th, and 9th records, the second process would do the 2nd, 6th, and 10th, etc.
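A hypothetical driver for the sketch above might launch one such worker per CPU, each with a different sql_offset; it assumes data_model, classifier, connection_string, and score_dtype are already defined, and the table and file names are illustrative:

from multiprocessing import Process

# data_model, classifier, connection_string, and score_dtype are assumed to be
# defined elsewhere; table and file names here are made up for illustration.
n_processes = 4
workers = [
    ScoreDupes(data_model, classifier, connection_string,
               data_table="processed_donors", block_table="blocking_map",
               score_file_path=f"scores_{i}.dat", dtype=score_dtype,
               offset=0, sql_offset=i, n_processes=n_processes)
    for i in range(n_processes)
]
procs = [Process(target=worker) for worker in workers]
for p in procs:
    p.start()
for p in procs:
    p.join()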
The offset/modulo idea is nice. If you have a block in memory, you can pass the same object to multiple worker processes, but each worker scores only a subset of pairs from that block based on the offset it got. It's similar to what you're doing here, I guess. If we had parallel threads in Python, this would greatly reduce memory usage and IPC costs. However, the bottleneck I see in your approach here is the […]. Therefore, I think the scalable solutions are: […]
as long as the temp storage is on disk, it still might be favorable. but, if that offset/modulo strategy does not work, i can think of at least one other per-process SQL idea that might work.* in any regard, we probably need a shoot-out at this point to see what's best.

* just to record the idea for the future: there are N records and K = N * (N - 1) / 2 record pairs. if the record_ids are randomly assigned, then, in expectation, there should be the same number of blocked pairs among record pairs 0 to K/2 - 1 as among record pairs K/2 to K. assume our record_ids are integers from 0 to N and are randomly assigned. then we could partition like so:

cursor.execute("""
select a.donor_id,
row_to_json((select d from (select a.city,
a.name,
a.zip,
a.state,
a.address) d)),
b.donor_id,
row_to_json((select d from (select b.city,
b.name,
b.zip,
b.state,
b.address) d))
from (select DISTINCT l.donor_id as east, r.donor_id as west
from blocking_map as l
INNER JOIN blocking_map as r
using (block_key)
where l.donor_id < r.donor_id and l.donor_id > ? and l.donor_id < ?) ids
INNER JOIN processed_donors a on ids.east=a.donor_id
INNER JOIN processed_donors b on ids.west=b.donor_id
""", (lower, upper) such that the in expectation, the number of blocked pairs should be about equal in each segment defined by lower_a and upper_a |
In general, I agree that trying to get things out of memory and moving to a disk-based model seems like the next logical step. There are multiple ways to do this, of course. Everything you both have said makes sense. I must admit I know hardly anything about SQL and databases, so the details you talked about were mostly lost on me. A couple of thoughts. One of them: comparators could be vectorized, taking one value and an array of values instead of a pair of scalars. For example, the price comparator could look something like this:
import numpy as np

def comparator(price_1: float, price_2: np.ndarray) -> np.ndarray:
    # distance is the absolute difference of log10 prices;
    # non-positive prices get NaN
    result = np.empty(price_2.shape, dtype=np.float64)
    result[:] = np.nan
    if price_1 <= 0:
        return result
    legit = price_2 > 0
    result[legit] = np.abs(np.log10(price_1) - np.log10(price_2[legit]))
    return result

In my experience, using these vectorized functions has made feasible many algorithms that would otherwise be too slow. I think we could also allow a back door for people to write their own non-vectorized versions if they want to, and then they could keep the old behavior. Sometimes this could result in a more complex API, such as when doing cosine similarity, but it should be possible. Also, having this coexist with the current scalar-based API could be tricky.
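One way that back door might look (a sketch with made-up names, not an existing dedupe API): wrap a user's scalar comparator so it presents the vectorized interface, at the old per-pair speed.

import numpy as np

def vectorize_comparator(scalar_comparator):
    # Adapts an old-style pairwise comparator to the array-at-a-time
    # interface by looping in Python, i.e. the old behavior and speed.
    def vectorized(value_1, values_2):
        return np.array([scalar_comparator(value_1, v) for v in values_2],
                        dtype=np.float64)
    return vectorized

# usage (hypothetical): old_price_comparator compares two scalars
# price_comparator = vectorize_comparator(old_price_comparator)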
Realize that a lot of these suggestions are coming from someone who doesn't understand databases well, so I'm trying to apply the tools I'm more familiar with 😉 Hopefully this makes sense; I must admit I still don't totally understand the internals of dedupe, so I might be missing something.
I missed vectorized comparators as one of the scalable solution proposals. The Apache Arrow value proposition looks awesome; I've checked it out. But at least ~1 year ago, I found it wasn't trivial to use the Arrow API to deal with zero-copy regular Python dicts/arrays (instead of numpy or pandas objects).
i don't see any strong benefits from vectorization: the complexity still remains O(n^2) because, as in your price example, every pair still has to be compared. if the vectorization allows for much more efficient calculation on chip (which is often the case for pure numeric operations), that would be great; typically, it reduces loop overhead as well. if vectorization helped a lot, that would be fantastic, but it would only change the overall complexity by a scalar factor, though potentially a very attractive factor. that said, i don't know of ways to turn edit distances into operations that would really benefit from vectorization. would love to see examples of this!
if there is a project that offers a zero-copy dict, that would be amazing.
Yes, oops, you're totally right, it doesn't change the complexity. I was treating each "call into C code" as the atomic operation I was counting, which sometimes seems to be an OK approximation for wall time :) Just tried an experiment with the price comparator:

import numpy as np
from dedupe.variables.price import PriceType
N = 10000
N_ZEROS = N // 10
# float64 in [0, N)
prices = np.random.rand(N) * N
# Set 10% of prices to 0
to_zero = np.random.randint(0, N, size=(N_ZEROS,))
prices[to_zero] = 0
prices_list = list(prices)
def comparator(price_1: float, price_2: np.array) -> np.array:
result = np.empty(shape=price_2.shape, dtype=np.float64)
result[:] = np.nan
if price_1 <= 0:
return result
legit = price_2 > 0
result[legit] = np.abs(np.log10(price_1) - np.log10(price_2[legit]))
return result
def comp_iter():
result = []
for p1 in prices_list:
for p2 in prices_list:
result.append(PriceType.comparator(p1, p2))
return result
def comp_vec():
result = []
for p1 in prices_list:
result.extend(comparator(p1, prices))
return result
np.testing.assert_almost_equal(comp_iter(), comp_vec())
%timeit comp_iter()
%timeit comp_vec()

which yields about a 32x improvement for the vectorized version.
The other big room for improvement when using vectorized operations is that you can throw out duplicate operations: in each block, many field values repeat, so each unique comparison only needs to be computed once.

You're right, string edit distance would be tricky to parallelize, although some algorithms like SymSpell might actually be possible to modify. Probably, though, the low-hanging fruit would be to 1. only operate on unique values as above and 2. move one of the loops into Cython, which would probably help some. You could apply these two optimizations to all the comparators, since they wouldn't require different algorithms. Affine gap would probably be the next obvious thing to benchmark.

I should have added that caveat: the nested types in Arrow really do look like a PITA to use. But I think some operations, such as cosine similarity, should be possible using the pyarrow primitive operations? You're totally right, though, that code using Python dicts gets us right back to where we are. This is going to be a problem if we are ever trying to serialize dicts, lists, etc. Some other test code to compare pyarrow serialization vs pickle:

import pickle
import numpy as np
import pandas as pd
import pyarrow as pa
udt = pa.map_(pa.string(), pa.int32())
names = pd.read_csv("https://raw.githubusercontent.com/dominictarr/random-name/master/first-names.txt").squeeze().tolist()
nums = np.random.randint(0, 100, size=len(names)).tolist()
items = list(zip(names, nums))
# Pyarrow takes dicts as lists of tuples, not as dicts
dicts = []
chunk_size = 5
for i in range(0, len(items), chunk_size):
dicts.append(items[i:i + chunk_size])
print(dicts[:3])
print(len(dicts))
assert pickle.loads(pickle.dumps(dicts)) == dicts
assert pa.array(dicts, type=udt).to_pylist() == dicts
%timeit pickle.loads(pickle.dumps(dicts))
%timeit pa.array(dicts, type=udt).to_pylist()

The result: pyarrow looks way worse than just pickle here. Maybe I'm not doing that correctly, though. Of course, I think one of the biggest drawbacks to vectorizing things is that it would require overhauling a LOT of things to fit the new data model, whereas the database approach just changes how we pipe the data around (same shape as before).
i don't follow how you can throw out duplicate operations?
leaning more into databases would also open up pushing the distance functions into the database. for example, using something like sqlite's create_function facilities, we could imagine something like:

select a.donor_id,
b.donor_id,
levenshtein(a.name, b.name),
       levenshtein(a.zip, b.zip)
from (select DISTINCT l.donor_id as east, r.donor_id as west
from blocking_map as l
INNER JOIN blocking_map as r
using (block_key)
where l.donor_id < r.donor_id) ids
INNER JOIN processed_donors a on ids.east=a.donor_id
INNER JOIN processed_donors b on ids.west=b.donor_id
OFFSET = ?
WHERE row_number() % ?

and from there, it would be a small step to doing the scoring too.
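A minimal, self-contained sketch of that create_function idea; the pure-Python levenshtein is just a stand-in (in practice you'd register a C implementation), and the tiny processed_donors table is mocked up here:

import sqlite3

def levenshtein(a, b):
    # standard dynamic-programming edit distance; -1 signals a missing value
    if a is None or b is None:
        return -1
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, 1):
        cur = [i]
        for j, cb in enumerate(b, 1):
            cur.append(min(prev[j] + 1,                 # deletion
                           cur[j - 1] + 1,              # insertion
                           prev[j - 1] + (ca != cb)))   # substitution
        prev = cur
    return prev[-1]

conn = sqlite3.connect(":memory:")
conn.create_function("levenshtein", 2, levenshtein)
conn.execute("create table processed_donors (donor_id integer, name text)")
conn.executemany("insert into processed_donors values (?, ?)",
                 [(1, "john smith"), (2, "jon smith"), (3, "jane smyth")])
for row in conn.execute(
        "select a.donor_id, b.donor_id, levenshtein(a.name, b.name) "
        "from processed_donors a join processed_donors b "
        "on a.donor_id < b.donor_id"):
    print(row)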
for example, with the price comparator:

m = {inp: f(inp) for inp in unique(inp)}
result = inp.replace(m)

I actually have a function decorator that does this for pandas DataFrames and Series, which I use extensively for doing expensive calculations on data that has a lot of duplicates.
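Spelled out for the price comparator (pandas is used here for illustration; the decorator mentioned above isn't shown, and the values are made up):

import numpy as np
import pandas as pd

def price_distance(p1, p2):
    # scalar version of the price comparator
    if p1 <= 0 or p2 <= 0:
        return np.nan
    return abs(np.log10(p1) - np.log10(p2))

prices = pd.Series([9.99, 19.99, 9.99, 0.0, 19.99, 9.99])
anchor = 10.0

# Compute the comparator once per unique value, then broadcast back.
lookup = {v: price_distance(anchor, v) for v in prices.unique()}
result = prices.map(lookup)
print(result)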
That's super cool! Does this require SQL to go out to Python and then drop into the function, or can you pass in a Cython function and have SQL go straight to the C implementation?
I would like to mention Awkward Array as an alternative for holding records in memory or in the DB, and perhaps for faster serialization/deserialization/IPC: https://github.com/scikit-hep/awkward-1.0 The basic example shows a 10x gain in RAM usage. It seems to be compatible with Apache Arrow and to support zero-copy conversions to it.
Another option that seems very intriguing: vaex
Pointing out here, for inspiration, that splink does pretty much everything in SQL. It's even backend-agnostic, so it can run locally with DuckDB or distributed on Spark.
The major barrier to efficient parallelization of blocking is the inter-process communication of the records. Similarly, much of the potential benefit of parallelizing scoring is lost because of IPC too.
Within Python's multiprocessing paradigms (and our need to support the spawn start method), the only way I know of to reduce IPC is to store the data on disk and give each process access to that data store.
We are already using sqlite quite a bit in the library, so I'm contemplating loading the data dict passed to dedupe into sqlite.
Pros
Cons
Typing
In order to not radically change the API, we would need to be able to load the records from a data dictionary into sqlite tables. The problem is that right now the fields of a record can be of any type, and sqlite does not have a very rich set of native types.
We have comparators for string, integer, and float fields; sqlite has native support for those.
We also have comparators for tuples and sets and allow people to create arbitrary comparators that could take arbitrary types.
You can register arbitrary types for serialization and deserialization with Python's sqlite3 library, but this is likely to slow things down a lot.
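For example, a tuple-valued field could be round-tripped through sqlite3 with register_adapter/register_converter; the pickling in this sketch is exactly the kind of overhead being warned about (table and column names are illustrative):

import pickle
import sqlite3

# adapt tuples to bytes on the way in, convert them back on the way out
sqlite3.register_adapter(tuple, pickle.dumps)
sqlite3.register_converter("TUPLE", pickle.loads)

conn = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES)
conn.execute("create table records (record_id integer, phones TUPLE)")
conn.execute("insert into records values (?, ?)",
             (1, ("555-1234", "555-9876")))
row = conn.execute("select phones from records").fetchone()
assert row[0] == ("555-1234", "555-9876")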
API changes.
The higher level methods could remain unchanged, but a lot of the lower level methods would not really make sense if we had the data in a database. For example, you would not want to generate pairs to feed to a score method.
Those lower level methods are basic to how we now leverage databases in our database examples.
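Tying the above together, a rough sketch of loading the data dict passed to dedupe into a sqlite table so workers can read records by id instead of receiving them over IPC (field handling is simplified to strings only, as discussed under Typing; table and column names are made up):

import sqlite3

data_d = {
    "a": {"name": "Acme Corp", "city": "Chicago"},
    "b": {"name": "ACME Corporation", "city": "Chicago"},
}
fields = ["name", "city"]

conn = sqlite3.connect(":memory:")
conn.execute(
    f"create table records (record_id text primary key, "
    f"{', '.join(f + ' text' for f in fields)})"
)
conn.executemany(
    f"insert into records values (?, {', '.join('?' for _ in fields)})",
    ((rid, *(rec.get(f) for f in fields)) for rid, rec in data_d.items()),
)
# a worker process would open its own connection and fetch records by id
print(conn.execute("select * from records where record_id = ?", ("a",)).fetchone())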