diff --git a/dozer-sink-aerospike/src/denorm_dag.rs b/dozer-sink-aerospike/src/denorm_dag.rs index fde5d293fa..2f75574b47 100644 --- a/dozer-sink-aerospike/src/denorm_dag.rs +++ b/dozer-sink-aerospike/src/denorm_dag.rs @@ -1051,15 +1051,9 @@ impl DenormalizationState { mod tests { use std::ffi::CString; - use dozer_types::{ - models::sink::{ - AerospikeDenormalizations, AerospikeSet, AerospikeSinkTable, DenormColumn, DenormKey, - }, - rust_decimal::Decimal, - types::{ - Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition, - TableOperation, - }, + use dozer_types::types::{ + Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition, + TableOperation, }; use crate::{aerospike::Client, denorm_dag::DenormalizedTable}; @@ -1089,89 +1083,232 @@ mod tests { }}; } - #[test] - #[ignore] - fn test_denorm() { - let customer_schema = schema! { - "id": String PRIMARY_KEY, - "phone_number": String - }; + trait Table { + fn schema() -> Schema; + fn to_row(&self) -> Vec; + fn to_record(&self) -> Record { + Record::new(self.to_row()) + } + } + struct Customer { + id: &'static str, + phone_number: &'static str, + } - // account to many customer mapping - let account_owners_schema = schema! { - "account_id": UInt PRIMARY_KEY, - "customer_id": String, - "transaction_limit": UInt - }; + impl Table for Customer { + fn schema() -> Schema { + schema! { + "id": String PRIMARY_KEY, + "phone_number": String + } + } + fn to_row(&self) -> Vec { + vec![ + Field::String(self.id.to_owned()), + Field::String(self.phone_number.to_owned()), + ] + } + } - let transaction_schema = schema! { - "id": UInt PRIMARY_KEY, - "account_id": UInt, - "amount": Decimal - }; + struct AccountOwner { + account_id: u64, + customer_id: &'static str, + transaction_limit: Option, + } + + impl Table for AccountOwner { + fn schema() -> Schema { + schema! { + "account_id": UInt PRIMARY_KEY, + "customer_id": String, + "transaction_limit": UInt + } + } + + fn to_row(&self) -> Vec { + vec![ + Field::UInt(self.account_id), + Field::String(self.customer_id.to_owned()), + self.transaction_limit.map_or(Field::Null, Field::UInt), + ] + } + } + + struct Transaction { + id: u64, + account_id: u64, + amount: &'static str, + } + + impl Table for Transaction { + fn schema() -> Schema { + schema! { + "id": UInt PRIMARY_KEY, + "account_id": UInt, + "amount": Decimal + } + } + + fn to_row(&self) -> Vec { + vec![ + Field::UInt(self.id), + Field::UInt(self.account_id), + Field::Decimal(self.amount.try_into().unwrap()), + ] + } + } + + #[derive(Debug)] + struct DenormResult { + id: u64, + account_id: u64, + amount: &'static str, + customer_id: &'static str, + transaction_limit: Option, + phone_number: &'static str, + } + + impl Table for DenormResult { + fn schema() -> Schema { + schema! { + "id": UInt PRIMARY_KEY, + "account_id": UInt, + "amount": Decimal, + "customer_id": String PRIMARY_KEY, + "transaction_limit": UInt, + "phone_number": String + } + } + + fn to_row(&self) -> Vec { + vec![ + Field::UInt(self.id), + Field::UInt(self.account_id), + Field::Decimal(self.amount.try_into().unwrap()), + Field::String(self.customer_id.to_owned()), + self.transaction_limit.map_or(Field::Null, Field::UInt), + Field::String(self.phone_number.to_owned()), + ] + } + } + impl PartialEq> for DenormalizedTable { + fn eq(&self, other: &Vec) -> bool { + other.eq(self) + } + } + + impl PartialEq for Vec { + fn eq(&self, other: &DenormalizedTable) -> bool { + let DenormalizedTable { + bin_names, + namespace: _, + set: _, + records, + pk, + } = other; + bin_names + .iter() + .map(|name| name.to_str().unwrap()) + .eq(DenormResult::schema() + .fields + .iter() + .map(|field| field.name.as_str())) + && records + .iter() + .cloned() + .eq(self.iter().map(|rec| rec.to_row())) + && pk == &DenormResult::schema().primary_index + } + } + + fn client() -> Client { + let client = Client::new(&CString::new("localhost:3000").unwrap()).unwrap(); + let mut response = std::ptr::null_mut(); + let request = "truncate-namespace:namespace=test"; + let request = CString::new(request).unwrap(); + unsafe { + client.info(&request, &mut response).unwrap(); + } + client + } + + #[test] + #[ignore] + fn test_denorm() { let tables = vec![ ( - AerospikeSinkTable { - source_table_name: "".into(), - namespace: "test".into(), - set_name: "customers".into(), - denormalize: vec![], - write_denormalized_to: None, - primary_key: vec!["id".into()], - aggregate_by_pk: true, - }, - customer_schema, + dozer_types::serde_yaml::from_str( + r#" + source_table_name: + namespace: test + set_name: customers + primary_key: + - id + aggregate_by_pk: true + "#, + ) + .unwrap(), + Customer::schema(), ), ( - AerospikeSinkTable { - source_table_name: "".into(), - namespace: "test".into(), - set_name: "accounts".into(), - denormalize: vec![AerospikeDenormalizations { - from_namespace: "test".into(), - from_set: "customers".into(), - key: DenormKey::Simple("customer_id".into()), - columns: vec![DenormColumn::Direct("phone_number".into())], - }], - write_denormalized_to: None, - primary_key: vec!["account_id".into()], - aggregate_by_pk: false, - }, - account_owners_schema, + dozer_types::serde_yaml::from_str( + r#" + source_table_name: + namespace: test + set_name: accounts + primary_key: + - account_id + denormalize: + - from_namespace: test + from_set: customers + key: customer_id + columns: + - phone_number + "#, + ) + .unwrap(), + AccountOwner::schema(), ), ( - AerospikeSinkTable { - source_table_name: "".into(), - namespace: "test".into(), - set_name: "transactions".into(), - denormalize: vec![AerospikeDenormalizations { - from_namespace: "test".into(), - from_set: "accounts".into(), - key: DenormKey::Simple("account_id".into()), - columns: vec![DenormColumn::Direct("transaction_limit".into())], - }], - write_denormalized_to: Some(AerospikeSet { - namespace: "test".into(), - set: "transactions_denorm".into(), - primary_key: vec!["id".into(), "phone_number".into()], - }), - primary_key: vec!["id".into()], - aggregate_by_pk: false, - }, - transaction_schema, + dozer_types::serde_yaml::from_str( + r#" + source_table_name: + namespace: test + set_name: transactions + primary_key: + - id + denormalize: + - from_namespace: test + from_set: accounts + key: account_id + columns: + - customer_id + - transaction_limit + write_denormalized_to: + namespace: test + set: transactions_denorm + primary_key: + - id + - customer_id + "#, + ) + .unwrap(), + Transaction::schema(), ), ]; + let client = client(); let mut state = DenormalizationState::new(&tables).unwrap(); // Customers state .process(TableOperation { id: None, op: Operation::Insert { - new: dozer_types::types::Record::new(vec![ - Field::String("1001".into()), - Field::String("+1234567".into()), - ]), + new: Customer { + id: "1001", + phone_number: "+1234567", + } + .to_record(), }, port: 0, }) @@ -1181,52 +1318,44 @@ mod tests { .process(TableOperation { id: None, op: Operation::Insert { - new: dozer_types::types::Record::new(vec![ - Field::UInt(101), - Field::String("1001".into()), - Field::Null, - ]), + new: AccountOwner { + account_id: 101, + customer_id: "1001", + transaction_limit: None, + } + .to_record(), }, port: 1, }) .unwrap(); + state.persist(&client).unwrap(); + assert_eq!(state.perform_denorm(&client).unwrap(), vec![vec![]]); // Transactions state .process(TableOperation { id: None, op: Operation::Insert { - new: Record::new(vec![ - Field::UInt(1), - Field::UInt(101), - Field::Decimal(Decimal::new(123, 2)), - ]), + new: Transaction { + id: 1, + account_id: 101, + amount: "1.23", + } + .to_record(), }, port: 2, }) .unwrap(); - let client = Client::new(&CString::new("localhost:3000").unwrap()).unwrap(); let res = state.perform_denorm(&client).unwrap(); assert_eq!( res, - vec![DenormalizedTable { - bin_names: vec![ - CString::new("id").unwrap(), - CString::new("account_id").unwrap(), - CString::new("amount").unwrap(), - CString::new("transaction_limit").unwrap(), - CString::new("phone_number").unwrap(), - ], - records: vec![vec![ - Field::UInt(1), - Field::UInt(101), - Field::Decimal(Decimal::new(123, 2)), - Field::Null, - Field::String("+1234567".into()) - ]], - namespace: CString::new("test").unwrap(), - set: CString::new("transactions_denorm").unwrap(), - pk: vec![0, 4], - }] + vec![vec![DenormResult { + id: 1, + customer_id: "1001", + account_id: 101, + amount: "1.23", + transaction_limit: None, + phone_number: "+1234567", + }]] ); state.commit(); state.persist(&client).unwrap(); @@ -1234,11 +1363,12 @@ mod tests { .process(TableOperation { id: None, op: Operation::Insert { - new: Record::new(vec![ - Field::UInt(2), - Field::UInt(101), - Field::Decimal(Decimal::new(321, 2)), - ]), + new: Transaction { + id: 2, + account_id: 101, + amount: "3.21", + } + .to_record(), }, port: 2, }) @@ -1248,14 +1378,16 @@ mod tests { .process(TableOperation { id: None, op: Operation::Update { - old: dozer_types::types::Record::new(vec![ - Field::String("1001".into()), - Field::String("+1234567".into()), - ]), - new: dozer_types::types::Record::new(vec![ - Field::String("1001".into()), - Field::String("+7654321".into()), - ]), + old: Customer { + id: "1001", + phone_number: "+1234567", + } + .to_record(), + new: Customer { + id: "1001", + phone_number: "+7654321", + } + .to_record(), }, port: 0, }) @@ -1264,11 +1396,12 @@ mod tests { .process(TableOperation { id: None, op: Operation::Insert { - new: Record::new(vec![ - Field::UInt(3), - Field::UInt(101), - Field::Decimal(Decimal::new(123, 2)), - ]), + new: Transaction { + id: 3, + account_id: 101, + amount: "1.23", + } + .to_record(), }, port: 2, }) @@ -1277,10 +1410,11 @@ mod tests { .process(TableOperation { id: None, op: Operation::Insert { - new: dozer_types::types::Record::new(vec![ - Field::String("1001".into()), - Field::String("+2 123".into()), - ]), + new: Customer { + id: "1001", + phone_number: "+2 123", + } + .to_record(), }, port: 0, }) @@ -1289,41 +1423,33 @@ mod tests { let res = state.perform_denorm(&client).unwrap(); assert_eq!( res, - vec![DenormalizedTable { - bin_names: vec![ - CString::new("id").unwrap(), - CString::new("account_id").unwrap(), - CString::new("amount").unwrap(), - CString::new("transaction_limit").unwrap(), - CString::new("phone_number").unwrap(), - ], - records: vec![ - vec![ - Field::UInt(2), - Field::UInt(101), - Field::Decimal(Decimal::new(321, 2)), - Field::Null, - Field::String("+1234567".into()) - ], - vec![ - Field::UInt(3), - Field::UInt(101), - Field::Decimal(Decimal::new(123, 2)), - Field::Null, - Field::String("+7654321".into()) - ], - vec![ - Field::UInt(3), - Field::UInt(101), - Field::Decimal(Decimal::new(123, 2)), - Field::Null, - Field::String("+2 123".into()) - ] - ], - namespace: CString::new("test").unwrap(), - set: CString::new("transactions_denorm").unwrap(), - pk: vec![0, 4], - }] + vec![vec![ + DenormResult { + id: 2, + account_id: 101, + amount: "3.21", + customer_id: "1001", + transaction_limit: None, + phone_number: "+1234567" + }, + DenormResult { + id: 3, + account_id: 101, + amount: "1.23", + customer_id: "1001", + transaction_limit: None, + phone_number: "+7654321", + }, + DenormResult { + id: 3, + account_id: 101, + amount: "1.23", + customer_id: "1001", + transaction_limit: None, + phone_number: "+2 123", + }, + ],] ); + state.persist(&client).unwrap(); } }