Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aerospike sink: Add transactionally consistent denormalization #2437

Merged
merged 8 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions dozer-core/src/builder_dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,21 +160,26 @@ impl BuilderDag {
.serialize_state()
.await
.map_err(ExecutionError::Source)?;
let mut checkpoint = None;
for sink in source_id_to_sinks.remove(&node.handle).unwrap_or_default() {
let sink = &mut graph[sink];
let sink_handle = &sink.handle;
let NodeKind::Sink(sink) = &mut sink.kind else {
unreachable!()
};
sink.set_source_state(&state)
.map_err(ExecutionError::Sink)?;
if let Some(sink_checkpoint) = source_op_ids.remove(sink_handle) {
checkpoint =
Some(checkpoint.unwrap_or(sink_checkpoint).min(sink_checkpoint));
}
}

let last_checkpoint = source_op_ids.remove(&node.handle);
NodeType {
handle: node.handle,
kind: NodeKind::Source {
source,
last_checkpoint,
last_checkpoint: checkpoint,
},
}
}
Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl DagExecutor {
let Some(node) = execution_dag.graph()[node_index].kind.as_ref() else {
continue;
};
match &node {
match node {
NodeKind::Source { .. } => unreachable!("We already started the source node"),
NodeKind::Processor(_) => {
let processor_node = ProcessorNode::new(&mut execution_dag, node_index).await;
Expand Down
2 changes: 1 addition & 1 deletion dozer-core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ pub trait SinkFactory: Send + Sync + Debug {
fn type_name(&self) -> String;
}

pub trait Sink: Send + Sync + Debug {
pub trait Sink: Send + Debug {
fn commit(&mut self, epoch_details: &Epoch) -> Result<(), BoxedError>;
fn process(&mut self, op: TableOperation) -> Result<(), BoxedError>;

Expand Down
6 changes: 3 additions & 3 deletions dozer-ingestion/oracle/src/connector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,17 +375,17 @@ impl Connector {
}
};

for (table_index, op) in transaction.operations {
for (seq, (table_index, op)) in transaction.operations.into_iter().enumerate() {
if ingestor
.blocking_handle_message(IngestionMessage::OperationEvent {
table_index,
op,
id: None,
id: Some(OpIdentifier::new(transaction.commit_scn, seq as u64)),
})
.is_err()
{
return;
}
};
}

if ingestor
Expand Down
3 changes: 2 additions & 1 deletion dozer-sink-aerospike/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ edition = "2021"
dozer-core = { path = "../dozer-core" }
dozer-types = { path = "../dozer-types" }
aerospike-client-sys = { path = "./aerospike-client-sys" }
crossbeam-channel = "0.5.11"
itertools = "0.12"
smallvec = "1.13.1"
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@
#include <aerospike/as_arraylist.h>
#include <aerospike/as_map.h>
#include <aerospike/as_orderedmap.h>
#include <aerospike/as_exp.h>
168 changes: 168 additions & 0 deletions dozer-sink-aerospike/aerospike-client-sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,171 @@
#![allow(non_snake_case)]

include!(concat!(env!("OUT_DIR"), "/generated.rs"));

/// Builds an expression entry table from a nested `as_exp_*(...)` description
/// and passes it to `as_exp_compile`.
///
/// The single argument is an operation name followed by its parenthesised
/// arguments, e.g. `as_exp_cmp_eq(as_exp_bin_int(ptr), as_exp_int(1))`.
/// The entries are collected into a temporary `Vec` by
/// [`as_exp_build_inner!`], and the macro evaluates to whatever
/// `as_exp_compile` returns for that table.
///
/// The expansion calls `as_exp_compile` (and the inner macro uses
/// `std::mem::zeroed`), so it has to be invoked inside an `unsafe` block.
#[macro_export]
macro_rules! as_exp_build {
    ($root:ident $root_args:tt) => {{
        let mut entries = Vec::new();
        $crate::as_exp_build_inner!(entries, $root $root_args);
        let table = entries.as_mut_ptr();
        let entry_count = entries.len() as u32;
        $crate::as_exp_compile(table, entry_count)
    }};
}

/// Appends the `as_exp_entry` records describing one expression to `$v`.
///
/// `$v` must be an expression that supports `.push(as_exp_entry)` (in
/// practice the `Vec` created by `as_exp_build!`). The second argument is an
/// operation written like a call; nested operations are expanded recursively,
/// each pushing its own entries after its parent's header entry.
///
/// Supported operations:
/// - `as_exp_bin_int(name_ptr)`: a `BIN` header, the `AS_EXP_TYPE_INT` type
///   tag as an int value, then the bin name as a raw C string pointer. The
///   pointer is stored as-is, so the string must outlive the use of the table.
/// - `as_exp_int(v)` / `as_exp_uint(v)`: a single signed / unsigned value.
/// - `as_exp_cmp_{eq,gt,ge,lt,le}(left, right)`: a comparison header followed
///   by the entries of both operands.
/// - `as_exp_and(...)` / `as_exp_or(...)`: a header, the entries of every
///   argument, then an `END_OF_VA_ARGS` terminator.
///
/// Every arm accepts an optional trailing comma. Several arms expand to
/// `std::mem::zeroed()`, so the macro must be used inside an `unsafe` block.
#[macro_export]
macro_rules! as_exp_build_inner {
    ($v:expr, as_exp_bin_int($bin_name:expr $(,)?)) => {{
        // `c_char` rather than `i8`: its signedness differs between targets.
        // NOTE(review): assumes the generated `str_val` field is
        // `*const c_char` (bindgen's default) - confirm against generated.rs.
        let bin_name: *const ::std::os::raw::c_char = $bin_name;
        $v.push($crate::as_exp_entry {
            op: $crate::as_exp_ops__AS_EXP_CODE_BIN,
            count: 3,
            sz: 0,
            prev_va_args: 0,
            v: ::std::mem::zeroed(),
        });
        $crate::as_exp_build_inner!($v, as_exp_int($crate::as_exp_type_AS_EXP_TYPE_INT as i64));
        $v.push($crate::as_exp_entry {
            op: $crate::as_exp_ops__AS_EXP_CODE_VAL_RAWSTR,
            v: $crate::as_exp_entry__bindgen_ty_1 { str_val: bin_name },
            count: 0,
            sz: 0,
            prev_va_args: 0,
        });
    }};
    ($v:expr, as_exp_int($val:expr $(,)?)) => {
        $v.push($crate::as_exp_entry {
            op: $crate::as_exp_ops__AS_EXP_CODE_VAL_INT,
            v: $crate::as_exp_entry__bindgen_ty_1 { int_val: $val },
            count: 0,
            sz: 0,
            prev_va_args: 0,
        })
    };
    ($v:expr, as_exp_uint($val:expr $(,)?)) => {
        $v.push($crate::as_exp_entry {
            op: $crate::as_exp_ops__AS_EXP_CODE_VAL_UINT,
            v: $crate::as_exp_entry__bindgen_ty_1 { uint_val: $val },
            count: 0,
            sz: 0,
            prev_va_args: 0,
        })
    };
    ($v:expr, as_exp_cmp_eq($left_name:ident $left_args:tt, $right_name:ident $right_args:tt $(,)?)) => {{
        $v.push($crate::as_exp_entry {
            op: $crate::as_exp_ops__AS_EXP_CODE_CMP_EQ,
            count: 3,
            v: ::std::mem::zeroed(),
            sz: 0,
            prev_va_args: 0,
        });
        $crate::as_exp_build_inner!($v, $left_name $left_args);
        $crate::as_exp_build_inner!($v, $right_name $right_args);
    }};
    ($v:expr, as_exp_cmp_gt($left_name:ident $left_args:tt, $right_name:ident $right_args:tt $(,)?)) => {{
        $v.push($crate::as_exp_entry {
            op: $crate::as_exp_ops__AS_EXP_CODE_CMP_GT,
            count: 3,
            v: ::std::mem::zeroed(),
            sz: 0,
            prev_va_args: 0,
        });
        $crate::as_exp_build_inner!($v, $left_name $left_args);
        $crate::as_exp_build_inner!($v, $right_name $right_args);
    }};
    ($v:expr, as_exp_cmp_ge($left_name:ident $left_args:tt, $right_name:ident $right_args:tt $(,)?)) => {{
        $v.push($crate::as_exp_entry {
            op: $crate::as_exp_ops__AS_EXP_CODE_CMP_GE,
            count: 3,
            v: ::std::mem::zeroed(),
            sz: 0,
            prev_va_args: 0,
        });
        // Forward the operands using the names bound by the matcher above;
        // unbound metavariables here would make every `as_exp_cmp_ge` use
        // fail to expand.
        $crate::as_exp_build_inner!($v, $left_name $left_args);
        $crate::as_exp_build_inner!($v, $right_name $right_args);
    }};
    ($v:expr, as_exp_cmp_lt($left_name:ident $left_args:tt, $right_name:ident $right_args:tt $(,)?)) => {{
        $v.push($crate::as_exp_entry {
            op: $crate::as_exp_ops__AS_EXP_CODE_CMP_LT,
            count: 3,
            v: ::std::mem::zeroed(),
            sz: 0,
            prev_va_args: 0,
        });
        $crate::as_exp_build_inner!($v, $left_name $left_args);
        $crate::as_exp_build_inner!($v, $right_name $right_args);
    }};
    ($v:expr, as_exp_cmp_le($left_name:ident $left_args:tt, $right_name:ident $right_args:tt $(,)?)) => {{
        $v.push($crate::as_exp_entry {
            op: $crate::as_exp_ops__AS_EXP_CODE_CMP_LE,
            count: 3,
            v: ::std::mem::zeroed(),
            sz: 0,
            prev_va_args: 0,
        });
        $crate::as_exp_build_inner!($v, $left_name $left_args);
        $crate::as_exp_build_inner!($v, $right_name $right_args);
    }};
    ($v:expr, as_exp_and($($arg_name:ident $arg_args:tt),* $(,)?)) => {{
        $v.push($crate::as_exp_entry {
            op: $crate::as_exp_ops__AS_EXP_CODE_AND,
            count: 0,
            v: ::std::mem::zeroed(),
            sz: 0,
            prev_va_args: 0,
        });
        $( $crate::as_exp_build_inner!($v, $arg_name $arg_args); )*
        // Variadic operations are closed by an explicit terminator entry.
        $v.push($crate::as_exp_entry {
            op: $crate::as_exp_ops__AS_EXP_CODE_END_OF_VA_ARGS,
            count: 0,
            v: ::std::mem::zeroed(),
            sz: 0,
            prev_va_args: 0,
        });
    }};
    ($v:expr, as_exp_or($($arg_name:ident $arg_args:tt),* $(,)?)) => {{
        $v.push($crate::as_exp_entry {
            op: $crate::as_exp_ops__AS_EXP_CODE_OR,
            count: 0,
            v: ::std::mem::zeroed(),
            sz: 0,
            prev_va_args: 0,
        });
        $( $crate::as_exp_build_inner!($v, $arg_name $arg_args); )*
        // Variadic operations are closed by an explicit terminator entry.
        $v.push($crate::as_exp_entry {
            op: $crate::as_exp_ops__AS_EXP_CODE_END_OF_VA_ARGS,
            count: 0,
            v: ::std::mem::zeroed(),
            sz: 0,
            prev_va_args: 0,
        });
    }};
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::ffi::CString;

    /// Smoke test: a nested AND of two comparisons on an integer bin must
    /// compile to a non-null expression.
    #[test]
    fn test_as_exp_build() {
        // The output of this expression was checked (manually) to match the
        // compiled expression produced by the equivalent macros of the C
        // library.
        let name = CString::new("bin_name").unwrap();
        let name_ptr = name.as_ptr();

        // `name` stays alive until the end of the function, so `name_ptr`
        // remains valid for the whole build.
        let compiled = unsafe {
            as_exp_build! {
                as_exp_and(
                    as_exp_cmp_gt(
                        as_exp_bin_int(name_ptr),
                        as_exp_int(3)
                    ),
                    as_exp_cmp_lt(
                        as_exp_bin_int(name_ptr),
                        as_exp_int(8)
                    )
                )
            }
        };

        assert!(!compiled.is_null());
        // Release the compiled expression once the check has passed.
        unsafe { as_exp_destroy(compiled) };
    }
}
Loading
Loading