Skip to content

Commit

Permalink
Fetch all many-node baselines in first read batch
Browse files: browse the repository at this point in the history
  • Loading branch information
Jesse-Bakker committed Mar 14, 2024
1 parent ca8a9b2 commit 8ebf9b0
Showing 1 changed file with 37 additions and 1 deletion.
38 changes: 37 additions & 1 deletion dozer-sink-aerospike/src/denorm_dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,7 @@ struct BatchLookup {
batch_idx: usize,
version: usize,
batch_read_index: Option<usize>,
follow: bool,
}

impl DenormalizationState {
Expand Down Expand Up @@ -968,6 +969,7 @@ impl DenormalizationState {
client: &Client,
) -> Result<Vec<DenormalizedTable>, AerospikeSinkError> {
let mut lookups = Vec::new();
let mut batch = RecordBatch::new(0, None);
for (nid, _, _) in &self.base_tables {
let node = self.node(*nid);
let node_keys = node.batch.iter_dirty().map(
Expand All @@ -980,23 +982,52 @@ impl DenormalizationState {
node: *nid,
batch_idx: idx,
batch_read_index: None,
follow: true,
},
);
lookups.extend(node_keys);
}

let mut batch = RecordBatch::new(0, None);
let n_lookups = 0;
while !lookups.is_empty() {
unsafe {
client.batch_get(batch.as_mut_ptr())?;
}
let mut new_lookups = Vec::with_capacity(lookups.len());
let mut new_batch = RecordBatch::new(lookups.len().try_into().unwrap(), None);

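// Persisting needs the baseline of every many-node entry, so queue a read for
// each entry that has no baseline yet (`entry.base.is_none()`) in the first batch.
// NOTE(review): `n_lookups` is bound immutably to 0 and is not updated in the
// visible code, so this guard appears to hold on every loop iteration, not just
// the first. Confirm whether a counter increment is missing.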
if n_lookups == 0 {
for (i, node) in self.dag.node_references() {
if let CachedBatch::Many(node_batch) = node.batch {
for (batch_idx, key) in
node_batch.0.iter().enumerate().filter_map(|(i, (key, entry))| {
entry.base.is_none().then_some((i, key))
})
{
let batch_read_index = new_batch.add_read_all(
&node.as_schema.namespace,
&node.as_schema.set,
key,
)?;
new_lookups.push(BatchLookup {
node: i,
batch_idx,
version: 0,
batch_read_index: Some(batch_read_index),
follow: true,
});
}
}
}
}
for BatchLookup {
node: nid,
batch_idx,
version,
batch_read_index,
follow,
} in lookups
{
// Update the node's local batch
Expand Down Expand Up @@ -1039,6 +1070,9 @@ impl DenormalizationState {
}
}
}
if !follow {
continue;
}
let Some(values) = self.node(nid).batch.get_index(batch_idx, version) else {
continue;
};
Expand Down Expand Up @@ -1078,6 +1112,7 @@ impl DenormalizationState {
batch_idx,
version,
batch_read_index,
follow: true,
})
}
}
Expand Down Expand Up @@ -1210,6 +1245,7 @@ mod tests {
}

#[test]
#[ignore]
fn test_denorm() {
let customer_schema = schema! {
"id": String PRIMARY_KEY,
Expand Down

0 comments on commit 8ebf9b0

Please sign in to comment.