diff --git a/dozer-cli/src/cli/helper.rs b/dozer-cli/src/cli/helper.rs
index 510d1e75de..a01ec7ce48 100644
--- a/dozer-cli/src/cli/helper.rs
+++ b/dozer-cli/src/cli/helper.rs
@@ -103,7 +103,7 @@ async fn load_config(
     ignore_pipe: bool,
 ) -> Result<(Config, Vec<String>), CliError> {
     let read_stdin = atty::isnt(Stream::Stdin) && !ignore_pipe;
-    let first_config_path = config_url_or_paths.get(0);
+    let first_config_path = config_url_or_paths.first();
     match first_config_path {
         None => Err(ConfigurationFilePathNotProvided),
         Some(path) => {
diff --git a/dozer-cli/src/pipeline/connector_source.rs b/dozer-cli/src/pipeline/connector_source.rs
index 7b2c9d0656..5fd3a3194d 100644
--- a/dozer-cli/src/pipeline/connector_source.rs
+++ b/dozer-cli/src/pipeline/connector_source.rs
@@ -262,12 +262,12 @@ impl Source for ConnectorSource {
             .iter()
             .zip(&self.ports)
             .map(|(table, port)| {
-                let checkpoint = last_checkpoint.get(port).copied().flatten();
+                let state = last_checkpoint.get(port).cloned().flatten();
                 TableToIngest {
                     schema: table.schema.clone(),
                     name: table.name.clone(),
                     column_names: table.column_names.clone(),
-                    checkpoint,
+                    state,
                 }
             })
             .collect::<Vec<_>>();
diff --git a/dozer-core/src/builder_dag.rs b/dozer-core/src/builder_dag.rs
index 0ef79afa33..a4d1d82b19 100644
--- a/dozer-core/src/builder_dag.rs
+++ b/dozer-core/src/builder_dag.rs
@@ -81,7 +81,7 @@ impl BuilderDag {
                 last_checkpoint_by_name
                     .as_mut()
                     .and_then(|last_checkpoint| {
-                        last_checkpoint.remove(&port_name).flatten()
+                        last_checkpoint.remove(&port_name).flatten().cloned()
                     }),
             );
         }
diff --git a/dozer-core/src/checkpoint/mod.rs b/dozer-core/src/checkpoint/mod.rs
index 49033b9ce9..40e570da30 100644
--- a/dozer-core/src/checkpoint/mod.rs
+++ b/dozer-core/src/checkpoint/mod.rs
@@ -12,7 +12,7 @@ use dozer_types::{
     bincode,
     log::{error, info},
     models::app_config::{DataStorage, RecordStore},
-    node::{NodeHandle, OpIdentifier, SourceStates, TableState},
+    node::{NodeHandle, RestartableState, SourceStates, TableState},
     parking_lot::Mutex,
     tonic::codegen::tokio_stream::StreamExt,
     types::Field,
@@ -114,7 +114,7 @@ impl OptionCheckpoint {
     pub fn get_source_state(
         &self,
         node_handle: &NodeHandle,
-    ) -> Result<Option<HashMap<String, Option<OpIdentifier>>>, ExecutionError> {
+    ) -> Result<Option<HashMap<String, Option<&RestartableState>>>, ExecutionError> {
         let Some(checkpoint) = self.checkpoint.as_ref() else {
             return Ok(None);
         };
@@ -124,7 +124,7 @@
         let mut result = HashMap::new();
         for (table_name, state) in source_state {
-            let id = match state {
+            let state = match state {
                 TableState::NotStarted => None,
                 TableState::NonRestartable => {
                     return Err(ExecutionError::SourceCannotRestart {
@@ -132,9 +132,9 @@
                         table_name: table_name.clone(),
                     });
                 }
-                TableState::Restartable(id) => Some(*id),
+                TableState::Restartable(state) => Some(state),
             };
-            result.insert(table_name.clone(), id);
+            result.insert(table_name.clone(), state);
         }
         Ok(Some(result))
     }
diff --git a/dozer-core/src/forwarder.rs b/dozer-core/src/forwarder.rs
index 200787b9c2..26c8b5c6e3 100644
--- a/dozer-core/src/forwarder.rs
+++ b/dozer-core/src/forwarder.rs
@@ -222,12 +222,12 @@ impl SourceChannelManager {
         request_termination: bool,
     ) -> Result<bool, ExecutionError> {
         match message {
-            IngestionMessage::OperationEvent { op, id, .. } => {
+            IngestionMessage::OperationEvent { op, state, .. } => {
                 let port_name = self.port_names[&port].clone();
                 self.current_op_ids.insert(
                     port_name,
-                    if let Some(id) = id {
-                        TableState::Restartable(id)
+                    if let Some(state) = state {
+                        TableState::Restartable(state)
                     } else {
                         TableState::NonRestartable
                     },
diff --git a/dozer-core/src/node.rs b/dozer-core/src/node.rs
index 0a5ad4af05..90b494d288 100644
--- a/dozer-core/src/node.rs
+++ b/dozer-core/src/node.rs
@@ -5,7 +5,7 @@ use dozer_recordstore::{ProcessorRecordStore, ProcessorRecordStoreDeserializer};
 use dozer_log::storage::{Object, Queue};
 use dozer_types::errors::internal::BoxedError;
-use dozer_types::node::OpIdentifier;
+use dozer_types::node::RestartableState;
 use dozer_types::serde::{Deserialize, Serialize};
 use dozer_types::tonic::async_trait;
 use dozer_types::types::Schema;
@@ -54,7 +54,7 @@ pub trait SourceFactory: Send + Sync + Debug {
     ) -> Result<Box<dyn Source>, BoxedError>;
 }
 
-pub type SourceState = HashMap<PortHandle, Option<OpIdentifier>>;
+pub type SourceState = HashMap<PortHandle, Option<RestartableState>>;
 
 pub trait Source: Send + Sync + Debug {
     fn start(
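For context, the `SourceState` passed to `Source::start` maps each output port to that table's last persisted state, if any. A minimal sketch of how a source implementation might interpret an entry (the big-endian `u64` payload is one connector-specific convention, mirrored from the test sources further down, not a requirement of the API):

use dozer_types::node::RestartableState;

// Sketch: interpret one entry of `SourceState`. `None` means no state was
// persisted for this table, so ingestion starts from scratch.
fn resume_offset(state: Option<&RestartableState>) -> u64 {
    match state {
        // This example assumes the connector stored a big-endian u64 offset.
        Some(state) => {
            let bytes: [u8; 8] = state.0.as_slice().try_into().expect("unexpected state length");
            u64::from_be_bytes(bytes) + 1 // resume after the last ingested record
        }
        None => 0,
    }
}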
diff --git a/dozer-core/src/tests/dag_base_errors.rs b/dozer-core/src/tests/dag_base_errors.rs
index 1498214071..d57db2bcfa 100644
--- a/dozer-core/src/tests/dag_base_errors.rs
+++ b/dozer-core/src/tests/dag_base_errors.rs
@@ -16,7 +16,7 @@ use dozer_log::tokio;
 use dozer_recordstore::{ProcessorRecordStore, ProcessorRecordStoreDeserializer};
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::models::ingestion_types::IngestionMessage;
-use dozer_types::node::{NodeHandle, OpIdentifier};
+use dozer_types::node::NodeHandle;
 use dozer_types::tonic::async_trait;
 use dozer_types::types::{
     Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
@@ -378,7 +378,7 @@ impl Source for ErrGeneratorSource {
                     Field::String(format!("value_{n}")),
                 ]),
             },
-            id: Some(OpIdentifier::new(n, 0)),
+            state: Some(n.to_be_bytes().to_vec().into()),
         },
         GENERATOR_SOURCE_OUTPUT_PORT,
     )?;
diff --git a/dozer-core/src/tests/sources.rs b/dozer-core/src/tests/sources.rs
index 70ab205649..449ef66507 100644
--- a/dozer-core/src/tests/sources.rs
+++ b/dozer-core/src/tests/sources.rs
@@ -3,7 +3,6 @@ use crate::node::{OutputPortDef, OutputPortType, PortHandle, Source, SourceFacto
 use crate::DEFAULT_PORT_HANDLE;
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::models::ingestion_types::IngestionMessage;
-use dozer_types::node::OpIdentifier;
 use dozer_types::types::{
     Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
 };
@@ -96,15 +95,14 @@ impl Source for GeneratorSource {
         fw: &mut dyn SourceChannelForwarder,
         last_checkpoint: SourceState,
     ) -> Result<(), BoxedError> {
-        let start = last_checkpoint
-            .values()
-            .copied()
-            .next()
-            .flatten()
-            .unwrap_or(OpIdentifier::new(0, 0))
-            .txid;
+        let state = last_checkpoint.values().next().and_then(|state| {
+            state
+                .as_ref()
+                .map(|state| u64::from_be_bytes(state.0.as_slice().try_into().unwrap()))
+        });
+        let start = state.map(|state| state + 1).unwrap_or(0);
 
-        for n in start + 1..(start + self.count + 1) {
+        for n in start..(start + self.count) {
             fw.send(
                 IngestionMessage::OperationEvent {
                     table_index: 0,
@@ -114,7 +112,7 @@ impl Source for GeneratorSource {
                         Field::String(format!("value_{n}")),
                     ]),
                 },
-                id: Some(OpIdentifier::new(n, 0)),
+                state: Some(n.to_be_bytes().to_vec().into()),
             },
             GENERATOR_SOURCE_OUTPUT_PORT,
         )?;
@@ -237,7 +235,7 @@ impl Source for DualPortGeneratorSource {
                         Field::String(format!("value_{n}")),
                     ]),
                 },
-                id: Some(OpIdentifier::new(n, 0)),
+                state: Some(n.to_be_bytes().to_vec().into()),
             },
             DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_1,
         )?;
@@ -250,7 +248,7 @@
                         Field::String(format!("value_{n}")),
                     ]),
                 },
-                id: Some(OpIdentifier::new(n, 0)),
+                state: Some(n.to_be_bytes().to_vec().into()),
             },
             DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_2,
         )?;
diff --git a/dozer-ingestion/connector/src/ingestor.rs b/dozer-ingestion/connector/src/ingestor.rs
index a01088d16f..140fd240cb 100644
--- a/dozer-ingestion/connector/src/ingestor.rs
+++ b/dozer-ingestion/connector/src/ingestor.rs
@@ -96,7 +96,7 @@ mod tests {
             .handle_message(IngestionMessage::OperationEvent {
                 table_index: 0,
                 op: operation.clone(),
-                id: None,
+                state: None,
             })
             .await
             .unwrap();
@@ -104,7 +104,7 @@ mod tests {
             .handle_message(IngestionMessage::OperationEvent {
                 table_index: 0,
                 op: operation2.clone(),
-                id: None,
+                state: None,
             })
             .await
             .unwrap();
@@ -121,7 +121,7 @@ mod tests {
             IngestionMessage::OperationEvent {
                 table_index: 0,
                 op,
-                id: None
+                state: None
             },
             msg
         );
diff --git a/dozer-ingestion/connector/src/lib.rs b/dozer-ingestion/connector/src/lib.rs
index a1015e847b..71256f6e9b 100644
--- a/dozer-ingestion/connector/src/lib.rs
+++ b/dozer-ingestion/connector/src/lib.rs
@@ -1,7 +1,7 @@
 use std::fmt::Debug;
 
 use dozer_types::errors::internal::BoxedError;
-use dozer_types::node::OpIdentifier;
+use dozer_types::node::RestartableState;
 use dozer_types::serde;
 use dozer_types::serde::{Deserialize, Serialize};
 pub use dozer_types::tonic::async_trait;
@@ -146,8 +146,8 @@ pub struct TableToIngest {
     pub name: String,
     /// The column names to be mapped.
     pub column_names: Vec<String>,
-    /// The checkpoint to start after.
-    pub checkpoint: Option<OpIdentifier>,
+    /// The state to restart after.
+    pub state: Option<RestartableState>,
 }
 
 impl TableToIngest {
@@ -156,7 +156,7 @@ impl TableToIngest {
             schema: table_info.schema,
             name: table_info.name,
             column_names: table_info.column_names,
-            checkpoint: None,
+            state: None,
         }
     }
 }
diff --git a/dozer-ingestion/deltalake/src/reader.rs b/dozer-ingestion/deltalake/src/reader.rs
index 7a9f8d6dc8..3e25e91b1e 100644
--- a/dozer-ingestion/deltalake/src/reader.rs
+++ b/dozer-ingestion/deltalake/src/reader.rs
@@ -40,7 +40,7 @@ impl DeltaLakeReader {
         table: &TableToIngest,
         ingestor: &Ingestor,
     ) -> Result<(), BoxedError> {
-        assert!(table.checkpoint.is_none());
+        assert!(table.state.is_none());
 
         let table_path = table_path(&self.config, &table.name)?;
         let ctx = SessionContext::new();
@@ -75,7 +75,7 @@ impl DeltaLakeReader {
                         lifetime: None,
                     },
                 },
-                id: None,
+                state: None,
             })
             .await
             .unwrap();
diff --git a/dozer-ingestion/dozer/src/connector.rs b/dozer-ingestion/dozer/src/connector.rs
index 7a4678c68a..299bde40e5 100644
--- a/dozer-ingestion/dozer/src/connector.rs
+++ b/dozer-ingestion/dozer/src/connector.rs
@@ -12,7 +12,7 @@ use dozer_ingestion_connector::{
         default_buffer_size, default_log_batch_size, default_timeout, IngestionMessage,
         NestedDozerConfig, NestedDozerLogOptions,
     },
-    node::OpIdentifier,
+    node::RestartableState,
     serde_json,
     tonic::{async_trait, transport::Channel},
     types::{FieldType, Operation, Record, Schema},
@@ -208,10 +208,11 @@ async fn read_table(
     reader_builder: LogReaderBuilder,
     sender: Sender<IngestionMessage>,
 ) -> Result<(), NestedDozerConnectorError> {
-    let starting_point = table_info
-        .checkpoint
-        .map(|checkpoint| checkpoint.seq_in_tx + 1)
-        .unwrap_or(0);
+    let state = table_info
+        .state
+        .map(|state| decode_state(&state))
+        .transpose()?;
+    let starting_point = state.map(|pos| pos + 1).unwrap_or(0);
     let mut reader = reader_builder.build(starting_point);
     let schema = reader.schema.schema.clone();
     let map = SchemaMapper::new(schema, &table_info.column_names)?;
@@ -243,12 +244,26 @@ async fn read_table(
             .send(IngestionMessage::OperationEvent {
                 table_index,
                 op,
-                id: Some(OpIdentifier::new(0, op_and_pos.pos)),
+                state: Some(encode_state(op_and_pos.pos)),
             })
             .await;
     }
 }
 
+fn encode_state(pos: u64) -> RestartableState {
+    pos.to_be_bytes().to_vec().into()
+}
+
+fn decode_state(state: &RestartableState) -> Result<u64, NestedDozerConnectorError> {
+    Ok(u64::from_be_bytes(
+        state
+            .0
+            .as_slice()
+            .try_into()
+            .map_err(|_| NestedDozerConnectorError::CorruptedState)?,
+    ))
+}
+
 struct SchemaMapper {
     source_schema: Schema,
     fields: Vec,
diff --git a/dozer-ingestion/dozer/src/lib.rs b/dozer-ingestion/dozer/src/lib.rs
index 5620508c91..14a6fe6834 100644
--- a/dozer-ingestion/dozer/src/lib.rs
+++ b/dozer-ingestion/dozer/src/lib.rs
@@ -8,6 +8,9 @@ use dozer_log::errors::{ReaderBuilderError, ReaderError};
 
 #[derive(Error, Debug)]
 enum NestedDozerConnectorError {
+    #[error("Failed to parse checkpoint state")]
+    CorruptedState,
+
     #[error("Failed to connect to upstream dozer at {0}: {1:?}")]
     ConnectionError(String, #[source] dozer_types::tonic::transport::Error),
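Since each connector now owns its state format, a round-trip property test is a cheap guard against encode/decode drift. A hypothetical test for the nested Dozer connector's helpers above (a sketch, not part of this change):

#[test]
fn state_round_trips() {
    // `encode_state`/`decode_state` are the private helpers defined above.
    for pos in [0u64, 1, 42, u64::MAX] {
        let state = encode_state(pos);
        assert_eq!(state.0.len(), 8); // a single big-endian u64
        assert_eq!(decode_state(&state).unwrap(), pos);
    }
}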
diff --git a/dozer-ingestion/ethereum/src/log/connector.rs b/dozer-ingestion/ethereum/src/log/connector.rs
index 46b04c00ca..b23445ab02 100644
--- a/dozer-ingestion/ethereum/src/log/connector.rs
+++ b/dozer-ingestion/ethereum/src/log/connector.rs
@@ -71,7 +71,7 @@ impl EthLogConnector {
             .map(|t| vec![H256::from_str(t).unwrap()])
             .collect();
         builder.topics(
-            topics.get(0).cloned(),
+            topics.first().cloned(),
             topics.get(1).cloned(),
             topics.get(2).cloned(),
             topics.get(3).cloned(),
diff --git a/dozer-ingestion/ethereum/src/log/helper.rs b/dozer-ingestion/ethereum/src/log/helper.rs
index c4ae561cb2..7501c70f8f 100644
--- a/dozer-ingestion/ethereum/src/log/helper.rs
+++ b/dozer-ingestion/ethereum/src/log/helper.rs
@@ -75,7 +75,7 @@ pub fn decode_event(
     // Topics 0, 1, 2 should be name, buyer, seller in most cases
     let name = log
         .topics
-        .get(0)
+        .first()
         .expect("name is expected")
         .to_owned()
         .to_string();
diff --git a/dozer-ingestion/ethereum/src/log/sender.rs b/dozer-ingestion/ethereum/src/log/sender.rs
index 2e28270d69..b2de12777b 100644
--- a/dozer-ingestion/ethereum/src/log/sender.rs
+++ b/dozer-ingestion/ethereum/src/log/sender.rs
@@ -222,7 +222,7 @@ async fn process_log(details: Arc>, msg: Log) {
         .handle_message(IngestionMessage::OperationEvent {
             table_index,
             op,
-            id: None,
+            state: None,
         })
         .await
         .is_err()
@@ -245,7 +245,7 @@ async fn process_log(details: Arc>, msg: Log) {
            .handle_message(IngestionMessage::OperationEvent {
                 table_index,
                 op,
-                id: None,
+                state: None,
             })
             .await;
     } else {
diff --git a/dozer-ingestion/ethereum/src/trace/connector.rs b/dozer-ingestion/ethereum/src/trace/connector.rs
index 63fd7c7b4e..9ca8525788 100644
--- a/dozer-ingestion/ethereum/src/trace/connector.rs
+++ b/dozer-ingestion/ethereum/src/trace/connector.rs
@@ -155,7 +155,7 @@ pub async fn run(
         .handle_message(IngestionMessage::OperationEvent {
             table_index: 0, // We have only one table
             op,
-            id: None,
+            state: None,
         })
         .await
         .is_err()
diff --git a/dozer-ingestion/grpc/src/adapter/arrow.rs b/dozer-ingestion/grpc/src/adapter/arrow.rs
index 72ff83f150..a42f6b7301 100644
--- a/dozer-ingestion/grpc/src/adapter/arrow.rs
+++ b/dozer-ingestion/grpc/src/adapter/arrow.rs
@@ -115,7 +115,7 @@ pub async fn handle_message(
         .handle_message(IngestionMessage::OperationEvent {
             table_index,
             op,
-            id: None,
+            state: None,
         })
         .await
         .is_err()
diff --git a/dozer-ingestion/grpc/src/adapter/default.rs b/dozer-ingestion/grpc/src/adapter/default.rs
index 1614ffb79b..8bbc1092b5 100644
--- a/dozer-ingestion/grpc/src/adapter/default.rs
+++ b/dozer-ingestion/grpc/src/adapter/default.rs
@@ -88,7 +88,7 @@ pub async fn handle_message(
         .handle_message(IngestionMessage::OperationEvent {
             table_index,
             op,
-            id: None,
+            state: None,
         })
         .await;
     Ok(())
diff --git a/dozer-ingestion/javascript/src/js_extension/mod.rs b/dozer-ingestion/javascript/src/js_extension/mod.rs
index 18ee1a9ee6..e9bd5f5535 100644
--- a/dozer-ingestion/javascript/src/js_extension/mod.rs
+++ b/dozer-ingestion/javascript/src/js_extension/mod.rs
@@ -120,7 +120,7 @@ async fn send(ingestor: Ingestor, val: JsMessage) -> Result<(), Error> {
             IngestionMessage::OperationEvent {
                 table_index: 0,
                 op,
-                id: None,
+                state: None,
             }
         }
     };
diff --git a/dozer-ingestion/kafka/src/debezium/mapper.rs b/dozer-ingestion/kafka/src/debezium/mapper.rs
index 6826d9460b..1825bcc868 100644
--- a/dozer-ingestion/kafka/src/debezium/mapper.rs
+++ b/dozer-ingestion/kafka/src/debezium/mapper.rs
@@ -383,7 +383,7 @@ mod tests {
         fields_map.insert("weight".to_string(), weight_struct);
 
         let fields = convert_value_to_schema(value, &schema, &fields_map).unwrap();
-        assert_eq!(*fields.get(0).unwrap(), Field::from(1));
+        assert_eq!(*fields.first().unwrap(), Field::from(1));
         assert_eq!(*fields.get(1).unwrap(), Field::from("Product".to_string()));
         assert_eq!(
             *fields.get(2).unwrap(),
@@ -440,7 +440,7 @@ mod tests {
         fields_map.insert("name".to_string(), name_struct);
 
         let fields = convert_value_to_schema(value, &schema, &fields_map).unwrap();
-        assert_eq!(*fields.get(0).unwrap(), Field::from(1));
+        assert_eq!(*fields.first().unwrap(), Field::from(1));
         assert_eq!(*fields.get(1).unwrap(), Field::Null);
     }
 }
diff --git a/dozer-ingestion/kafka/src/debezium/schema_registry.rs b/dozer-ingestion/kafka/src/debezium/schema_registry.rs
index ab5bfbe362..9f4f154869 100644
--- a/dozer-ingestion/kafka/src/debezium/schema_registry.rs
+++ b/dozer-ingestion/kafka/src/debezium/schema_registry.rs
@@ -97,7 +97,7 @@ impl SchemaRegistry {
         let sr_settings = SrSettings::new(schema_registry_url);
         match table_names {
             None => Ok(vec![]),
-            Some(tables) => match tables.get(0) {
+            Some(tables) => match tables.first() {
                 None => Ok(vec![]),
                 Some(table) => {
                     let key_result =
diff --git a/dozer-ingestion/kafka/src/debezium/stream_consumer.rs b/dozer-ingestion/kafka/src/debezium/stream_consumer.rs
index ba907de845..1429436910 100644
--- a/dozer-ingestion/kafka/src/debezium/stream_consumer.rs
+++ b/dozer-ingestion/kafka/src/debezium/stream_consumer.rs
@@ -94,7 +94,7 @@ impl StreamConsumer for DebeziumStreamConsumer {
         let topics: Vec<&str> = tables
             .iter()
             .map(|t| {
-                assert!(t.checkpoint.is_none());
+                assert!(t.state.is_none());
                 t.name.as_str()
             })
             .collect();
@@ -154,7 +154,7 @@ impl StreamConsumer for DebeziumStreamConsumer {
                             lifetime: None,
                         },
                     },
-                    id: None,
+                    state: None,
                 })
                 .await
                 .is_err()
@@ -176,7 +176,7 @@ impl StreamConsumer for DebeziumStreamConsumer {
                             lifetime: None,
                         },
                     },
-                    id: None,
+                    state: None,
                })
                .await
                .is_err()
@@ -198,7 +198,7 @@ impl StreamConsumer for DebeziumStreamConsumer {
                             lifetime: None,
                         },
                     },
-                    id: None,
+                    state: None,
                 })
                 .await
                 .is_err()
diff --git a/dozer-ingestion/kafka/src/stream_consumer_basic.rs b/dozer-ingestion/kafka/src/stream_consumer_basic.rs
index 34c28a5ddf..f72f8db69d 100644
--- a/dozer-ingestion/kafka/src/stream_consumer_basic.rs
+++ b/dozer-ingestion/kafka/src/stream_consumer_basic.rs
@@ -88,7 +88,7 @@ impl StreamConsumer for StreamConsumerBasic {
         let mut schemas = HashMap::new();
         for (table_index, table) in tables.into_iter().enumerate() {
-            assert!(table.checkpoint.is_none());
+            assert!(table.state.is_none());
 
             let schema = if let Some(url) = schema_registry_url {
                 SchemaRegistryBasic::get_single_schema(&table.name, url).await?
@@ -158,7 +158,7 @@ impl StreamConsumer for StreamConsumerBasic {
                                 lifetime: None,
                             },
                         },
-                        id: None,
+                        state: None,
                     })
                     .await
                     .is_err()
diff --git a/dozer-ingestion/mongodb/src/lib.rs b/dozer-ingestion/mongodb/src/lib.rs
index f734f0eb85..f2441f5632 100644
--- a/dozer-ingestion/mongodb/src/lib.rs
+++ b/dozer-ingestion/mongodb/src/lib.rs
@@ -626,7 +626,7 @@ impl Connector for MongodbConnector {
                     .handle_message(IngestionMessage::OperationEvent {
                         table_index,
                         op,
-                        id: None,
+                        state: None,
                     })
                     .await
                     .is_err()
@@ -673,7 +673,7 @@ impl Connector for MongodbConnector {
                         .handle_message(IngestionMessage::OperationEvent {
                             table_index,
                             op,
-                            id: None,
+                            state: None,
                         })
                         .await
                         .is_err()
diff --git a/dozer-ingestion/mysql/src/binlog.rs b/dozer-ingestion/mysql/src/binlog.rs
index b8831fe404..1141b7241f 100644
--- a/dozer-ingestion/mysql/src/binlog.rs
+++ b/dozer-ingestion/mysql/src/binlog.rs
@@ -459,7 +459,7 @@ impl BinlogIngestor<'_, '_, '_> {
                 .handle_message(IngestionMessage::OperationEvent {
                     table_index: table.def.table_index,
                     op: op?,
-                    id: None,
+                    state: None,
                 })
                 .await
                 .is_err()
@@ -1016,6 +1016,39 @@ impl<'a> BinlogRowsEvent<'a> {
     }
 }
 
+trait ByteSliceExt {
+    fn trim_start(&self) -> &[u8];
+    fn starts_with_case_insensitive(&self, prefix: &[u8]) -> bool;
+}
+
+impl ByteSliceExt for [u8] {
+    fn trim_start(&self) -> &[u8] {
+        for i in 0..self.len() {
+            if !self[i].is_ascii_whitespace() {
+                return &self[i..];
+            }
+        }
+        &[]
+    }
+
+    fn starts_with_case_insensitive(&self, prefix: &[u8]) -> bool {
+        if self.len() < prefix.len() {
+            false
+        } else {
+            self[..prefix.len()].eq_ignore_ascii_case(prefix)
+        }
+    }
+}
+
+fn object_name_to_string(object_name: &sqlparser::ast::ObjectName) -> String {
+    object_name
+        .0
+        .iter()
+        .map(|ident| ident.value.as_str())
+        .collect::<Vec<_>>()
+        .join(".")
+}
+
 #[cfg(test)]
 mod tests {
 
@@ -1186,36 +1219,3 @@ mod tests {
         );
     }
 }
-
-trait ByteSliceExt {
-    fn trim_start(&self) -> &[u8];
-    fn starts_with_case_insensitive(&self, prefix: &[u8]) -> bool;
-}
-
-impl ByteSliceExt for [u8] {
-    fn trim_start(&self) -> &[u8] {
-        for i in 0..self.len() {
-            if !self[i].is_ascii_whitespace() {
-                return &self[i..];
-            }
-        }
-        &[]
-    }
-
-    fn starts_with_case_insensitive(&self, prefix: &[u8]) -> bool {
-        if self.len() < prefix.len() {
-            false
-        } else {
-            self[..prefix.len()].eq_ignore_ascii_case(prefix)
-        }
-    }
-}
-
-fn object_name_to_string(object_name: &sqlparser::ast::ObjectName) -> String {
-    object_name
-        .0
-        .iter()
-        .map(|ident| ident.value.as_str())
-        .collect::<Vec<_>>()
-        .join(".")
-}
diff --git a/dozer-ingestion/mysql/src/connector.rs b/dozer-ingestion/mysql/src/connector.rs
index 8324788a39..d33a92e12c 100644
--- a/dozer-ingestion/mysql/src/connector.rs
+++ b/dozer-ingestion/mysql/src/connector.rs
@@ -196,7 +196,7 @@ impl Connector for MySQLConnector {
         let table_infos = tables
             .into_iter()
             .map(|table| {
-                assert!(table.checkpoint.is_none());
+                assert!(table.state.is_none());
                 TableInfo {
                     schema: table.schema,
                     name: table.name,
@@ -318,7 +318,7 @@ impl MySQLConnector {
                     .handle_message(IngestionMessage::OperationEvent {
                         table_index,
                         op,
-                        id: None,
+                        state: None,
                     })
                     .await
                    .is_err()
@@ -531,7 +531,7 @@ mod tests {
                     Field::Float(1.0.into()),
                 ]),
            },
-            id: None,
+            state: None,
        },
        IngestionMessage::OperationEvent {
            table_index: 0,
@@ -542,7 +542,7 @@
                    Field::Float(2.0.into()),
                ]),
            },
-            id: None,
+            state: None,
        },
        IngestionMessage::OperationEvent {
            table_index: 0,
@@ -553,7 +553,7 @@
                    Field::Float(3.0.into()),
                ]),
            },
-            id: None,
+            state: None,
        },
        IngestionMessage::SnapshottingDone,
    ];
@@ -619,7 +619,7 @@
            op: Insert {
                new: Record::new(vec![Field::Int(4), Field::Float(4.0.into())]),
            },
-            id: None,
+            state: None,
        },
        IngestionMessage::SnapshottingDone,
        IngestionMessage::SnapshottingStarted,
@@ -628,7 +628,7 @@
            op: Insert {
                new: Record::new(vec![Field::Int(1), Field::Json(true.into())]),
            },
-            id: None,
+            state: None,
        },
        IngestionMessage::SnapshottingDone,
    ];
@@ -648,7 +648,7 @@ mod tests {
                old: Record::new(vec![Field::Int(4), Field::Float(4.0.into())]),
                new: Record::new(vec![Field::Int(4), Field::Float(5.0.into())]),
            },
-            id: None,
+            state: None,
        },
        IngestionMessage::SnapshottingDone,
    ];
@@ -667,7 +667,7 @@ mod tests {
            op: Delete {
                old: Record::new(vec![Field::Int(4), Field::Float(5.0.into())]),
            },
-            id: None,
+            state: None,
        },
        IngestionMessage::SnapshottingDone,
    ];
diff --git a/dozer-ingestion/object-store/src/connector.rs b/dozer-ingestion/object-store/src/connector.rs
index cdd1ce1092..f51512df0e 100644
--- a/dozer-ingestion/object-store/src/connector.rs
+++ b/dozer-ingestion/object-store/src/connector.rs
@@ -132,7 +132,7 @@ impl Connector for ObjectStoreConnector {
         let mut handles = vec![];
         for (table_index, table_info) in tables.iter().enumerate() {
-            assert!(table_info.checkpoint.is_none());
+            assert!(table_info.state.is_none());
             let table_info = TableInfo {
                 schema: table_info.schema.clone(),
                 name: table_info.name.clone(),
diff --git a/dozer-ingestion/object-store/src/table_reader.rs b/dozer-ingestion/object-store/src/table_reader.rs
index 846a9c9b5f..d5fa1a94a7 100644
--- a/dozer-ingestion/object-store/src/table_reader.rs
+++ b/dozer-ingestion/object-store/src/table_reader.rs
@@ -114,7 +114,7 @@ pub async fn read(
         .send(Ok(Some(IngestionMessage::OperationEvent {
             table_index,
             op: evt,
-            id: None,
+            state: None,
         })))
         .await
         .is_err()
diff --git a/dozer-ingestion/object-store/src/tests/local_storage_tests.rs b/dozer-ingestion/object-store/src/tests/local_storage_tests.rs
index ca749f9f64..d5e5088511 100644
--- a/dozer-ingestion/object-store/src/tests/local_storage_tests.rs
+++ b/dozer-ingestion/object-store/src/tests/local_storage_tests.rs
@@ -23,10 +23,10 @@ async fn test_get_schema_of_parquet() {
     let connector = ObjectStoreConnector::new(local_storage);
 
     let (_, schemas) = connector.list_all_schemas().await.unwrap();
-    let schema = schemas.get(0).unwrap();
+    let schema = schemas.first().unwrap();
     let fields = schema.schema.fields.clone();
 
-    assert_eq!(fields.get(0).unwrap().typ, FieldType::Int);
+    assert_eq!(fields.first().unwrap().typ, FieldType::Int);
     assert_eq!(fields.get(1).unwrap().typ, FieldType::Boolean);
     assert_eq!(fields.get(2).unwrap().typ, FieldType::Int);
     assert_eq!(fields.get(3).unwrap().typ, FieldType::Int);
@@ -45,10 +45,10 @@ async fn test_get_schema_of_csv() {
     let connector = ObjectStoreConnector::new(local_storage);
 
     let (_, schemas) = connector.list_all_schemas().await.unwrap();
-    let schema = schemas.get(0).unwrap();
+    let schema = schemas.first().unwrap();
     let fields = schema.schema.fields.clone();
 
-    assert_eq!(fields.get(0).unwrap().typ, FieldType::Int);
+    assert_eq!(fields.first().unwrap().typ, FieldType::Int);
     assert_eq!(fields.get(1).unwrap().typ, FieldType::String);
     assert_eq!(fields.get(2).unwrap().typ, FieldType::String);
     assert_eq!(fields.get(3).unwrap().typ, FieldType::Int);
@@ -213,7 +213,7 @@ fn test_csv_read() {
         test_type_conversion!(values, 7, Field::String(_));
         test_type_conversion!(values, 8, Field::String(_));
 
-        if let Field::Int(id) = values.get(0).unwrap() {
+        if let Field::Int(id) = values.first().unwrap() {
             if *id == 2 || *id == 12 {
                 test_type_conversion!(values, 9, Field::Float(_));
             } else {
@@ -269,7 +269,7 @@ fn test_csv_read_marker() {
         test_type_conversion!(values, 7, Field::String(_));
         test_type_conversion!(values, 8, Field::String(_));
 
-        if let Field::Int(id) = values.get(0).unwrap() {
+        if let Field::Int(id) = values.first().unwrap() {
             if *id == 2 || *id == 12 {
                 test_type_conversion!(values, 9, Field::Float(_));
             } else {
@@ -325,7 +325,7 @@ fn test_csv_read_only_one_marker() {
         test_type_conversion!(values, 7, Field::String(_));
         test_type_conversion!(values, 8, Field::String(_));
 
-        if let Field::Int(id) = values.get(0).unwrap() {
+        if let Field::Int(id) = values.first().unwrap() {
             if *id == 2 || *id == 12 {
                 test_type_conversion!(values, 9, Field::Float(_));
             } else {
diff --git a/dozer-ingestion/postgres/src/connector.rs b/dozer-ingestion/postgres/src/connector.rs
index ccac425f81..8270a7fc2c 100644
--- a/dozer-ingestion/postgres/src/connector.rs
+++ b/dozer-ingestion/postgres/src/connector.rs
@@ -186,7 +186,7 @@ impl Connector for PostgresConnector {
         let tables = tables
             .into_iter()
             .map(|table| {
-                assert!(table.checkpoint.is_none());
+                assert!(table.state.is_none());
                 ListOrFilterColumns {
                     schema: table.schema,
                     name: table.name,
diff --git a/dozer-ingestion/postgres/src/replication_slot_helper.rs b/dozer-ingestion/postgres/src/replication_slot_helper.rs
index 0b57d59a30..4c04d4f736 100644
--- a/dozer-ingestion/postgres/src/replication_slot_helper.rs
+++ b/dozer-ingestion/postgres/src/replication_slot_helper.rs
@@ -57,7 +57,7 @@ impl ReplicationSlotHelper {
             .map_err(PostgresConnectorError::FetchReplicationSlotError)?;
 
         Ok(matches!(
-            slot_query_row.get(0),
+            slot_query_row.first(),
             Some(SimpleQueryMessage::Row(_))
         ))
     }
@@ -75,7 +75,7 @@ impl ReplicationSlotHelper {
             .await
             .map_err(PostgresConnectorError::FetchReplicationSlotError)?;
 
-        let column_index = if let Some(SimpleQueryMessage::Row(row)) = slots.get(0) {
+        let column_index = if let Some(SimpleQueryMessage::Row(row)) = slots.first() {
             row.columns().iter().position(|c| c.name() == "slot_name")
         } else {
             None
diff --git a/dozer-ingestion/postgres/src/replicator.rs b/dozer-ingestion/postgres/src/replicator.rs
index 27b5bbbf61..b0a183ec3d 100644
--- a/dozer-ingestion/postgres/src/replicator.rs
+++ b/dozer-ingestion/postgres/src/replicator.rs
@@ -2,11 +2,12 @@ use dozer_ingestion_connector::dozer_types::bytes;
 use dozer_ingestion_connector::dozer_types::chrono::{TimeZone, Utc};
 use dozer_ingestion_connector::dozer_types::log::{error, info};
 use dozer_ingestion_connector::dozer_types::models::ingestion_types::IngestionMessage;
-use dozer_ingestion_connector::dozer_types::node::OpIdentifier;
+use dozer_ingestion_connector::dozer_types::node::RestartableState;
 use dozer_ingestion_connector::futures::StreamExt;
 use dozer_ingestion_connector::Ingestor;
 use postgres_protocol::message::backend::ReplicationMessage::*;
 use postgres_protocol::message::backend::{LogicalReplicationMessage, ReplicationMessage};
+use postgres_protocol::Lsn;
 use postgres_types::PgLsn;
 use tokio_postgres::Error;
 
@@ -30,9 +31,9 @@ pub struct CDCHandler<'a> {
     pub slot_name: String,
 
     pub start_lsn: PgLsn,
-    pub begin_lsn: u64,
-    pub offset_lsn: u64,
-    pub last_commit_lsn: u64,
+    pub begin_lsn: Lsn,
+    pub offset_lsn: Lsn,
+    pub last_commit_lsn: Lsn,
 
     pub offset: u64,
     pub seq_no: u64,
@@ -59,8 +60,8 @@ impl<'a> CDCHandler<'a> {
             publication_name = self.publication_name
         );
 
-        self.offset_lsn = u64::from(lsn);
-        self.last_commit_lsn = u64::from(lsn);
+        self.offset_lsn = Lsn::from(lsn);
+        self.last_commit_lsn = Lsn::from(lsn);
 
         let mut stream =
             LogicalReplicationStream::new(client, self.slot_name.clone(), lsn, options)
@@ -116,8 +117,8 @@ impl<'a> CDCHandler<'a> {
         let message = mapper.handle_message(body)?;
 
         match message {
-            Some(MappedReplicationMessage::Commit(commit)) => {
-                self.last_commit_lsn = commit.txid;
+            Some(MappedReplicationMessage::Commit(lsn)) => {
+                self.last_commit_lsn = lsn;
             }
             Some(MappedReplicationMessage::Begin) => {
                 self.begin_lsn = lsn;
@@ -131,7 +132,7 @@ impl<'a> CDCHandler<'a> {
                     .handle_message(IngestionMessage::OperationEvent {
                         table_index,
                         op,
-                        id: Some(OpIdentifier::new(self.begin_lsn, self.seq_no)),
+                        state: Some(encode_state(self.begin_lsn, self.seq_no)),
                     })
                    .await
                    .is_err()
@@ -155,6 +156,13 @@ impl<'a> CDCHandler<'a> {
     }
 }
 
+fn encode_state(lsn: Lsn, seq_no: u64) -> RestartableState {
+    let mut state = vec![];
+    state.extend_from_slice(&lsn.to_be_bytes());
+    state.extend_from_slice(&seq_no.to_be_bytes());
+    state.into()
+}
+
 pub struct LogicalReplicationStream {
     client: Client,
     slot_name: String,
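This change adds only the encoder; restart handling elsewhere needs the inverse. A sketch of a matching decoder under the same layout assumption (8 bytes of big-endian LSN followed by 8 bytes of sequence number; `decode_state` here is illustrative, not code from this change):

fn decode_state(state: &RestartableState) -> Option<(Lsn, u64)> {
    let bytes = state.0.as_slice();
    if bytes.len() != 16 {
        return None; // corrupted or foreign state
    }
    let lsn = Lsn::from_be_bytes(bytes[0..8].try_into().ok()?);
    let seq_no = u64::from_be_bytes(bytes[8..16].try_into().ok()?);
    Some((lsn, seq_no))
}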
diff --git a/dozer-ingestion/postgres/src/schema/sorter.rs b/dozer-ingestion/postgres/src/schema/sorter.rs
index 8f8aec97e8..a910463a2b 100644
--- a/dozer-ingestion/postgres/src/schema/sorter.rs
+++ b/dozer-ingestion/postgres/src/schema/sorter.rs
@@ -127,11 +127,11 @@ mod tests {
         ];
 
         let result = sort_fields(&postgres_table, &expected_order).unwrap();
-        assert_eq!(result.get(0).unwrap().0.name, "first field");
+        assert_eq!(result.first().unwrap().0.name, "first field");
         assert_eq!(result.get(1).unwrap().0.name, "second field");
         assert_eq!(result.get(2).unwrap().0.name, "third field");
 
-        assert!(result.get(0).unwrap().1);
+        assert!(result.first().unwrap().1);
         assert!(!result.get(1).unwrap().1);
         assert!(!result.get(2).unwrap().1);
     }
@@ -153,28 +153,28 @@ mod tests {
         let result = sort_schemas(expected_table_order, &mapped_tables).unwrap();
 
         assert_eq!(
-            result.get(0).unwrap().1.fields().get(0).unwrap().name,
+            result.first().unwrap().1.fields().first().unwrap().name,
             postgres_table.get_field(0).unwrap().name
         );
         assert_eq!(
-            result.get(0).unwrap().1.fields().get(1).unwrap().name,
+            result.first().unwrap().1.fields().get(1).unwrap().name,
             postgres_table.get_field(1).unwrap().name
         );
         assert_eq!(
-            result.get(0).unwrap().1.fields().get(2).unwrap().name,
+            result.first().unwrap().1.fields().get(2).unwrap().name,
             postgres_table.get_field(2).unwrap().name
         );
         assert_eq!(
-            result.get(0).unwrap().1.is_index_field(0),
+            result.first().unwrap().1.is_index_field(0),
             postgres_table.is_index_field(0)
         );
         assert_eq!(
-            result.get(0).unwrap().1.is_index_field(1),
+            result.first().unwrap().1.is_index_field(1),
             postgres_table.is_index_field(1)
         );
         assert_eq!(
-            result.get(0).unwrap().1.is_index_field(2),
+            result.first().unwrap().1.is_index_field(2),
             postgres_table.is_index_field(2)
         );
     }
@@ -197,10 +197,10 @@ mod tests {
         let result = sort_schemas(expected_table_order, &mapped_tables).unwrap();
         assert_eq!(
-            &result.get(0).unwrap().1.fields().get(0).unwrap().name,
-            columns_order.get(0).unwrap()
+            &result.first().unwrap().1.fields().first().unwrap().name,
+            columns_order.first().unwrap()
         );
-        assert_eq!(result.get(0).unwrap().1.fields().len(), 1);
+        assert_eq!(result.first().unwrap().1.fields().len(), 1);
     }
@@ -225,18 +225,18 @@ mod tests {
         let result = sort_schemas(expected_table_order, &mapped_tables).unwrap();
         assert_eq!(
-            &result.get(0).unwrap().1.fields().get(0).unwrap().name,
-            columns_order.get(0).unwrap()
+            &result.first().unwrap().1.fields().first().unwrap().name,
+            columns_order.first().unwrap()
         );
         assert_eq!(
-            &result.get(0).unwrap().1.fields().get(1).unwrap().name,
+            &result.first().unwrap().1.fields().get(1).unwrap().name,
             columns_order.get(1).unwrap()
         );
         assert_eq!(
-            &result.get(0).unwrap().1.fields().get(2).unwrap().name,
+            &result.first().unwrap().1.fields().get(2).unwrap().name,
             columns_order.get(2).unwrap()
         );
-        assert_eq!(result.get(0).unwrap().1.fields().len(), 3);
+        assert_eq!(result.first().unwrap().1.fields().len(), 3);
     }
 
     #[test]
@@ -277,20 +277,20 @@ mod tests {
         ];
 
         let result = sort_schemas(expected_table_order, &mapped_tables).unwrap();
-        let first_table_after_sort = result.get(0).unwrap();
+        let first_table_after_sort = result.first().unwrap();
         let second_table_after_sort = result.get(1).unwrap();
 
         assert_eq!(
             first_table_after_sort.0 .1,
-            expected_table_order.get(0).unwrap().name
+            expected_table_order.first().unwrap().name
         );
         assert_eq!(
             second_table_after_sort.0 .1,
             expected_table_order.get(1).unwrap().name
         );
         assert_eq!(
-            &first_table_after_sort.1.fields().get(0).unwrap().name,
-            columns_order_1.get(0).unwrap()
+            &first_table_after_sort.1.fields().first().unwrap().name,
+            columns_order_1.first().unwrap()
         );
         assert_eq!(
             &first_table_after_sort.1.fields().get(1).unwrap().name,
             columns_order_1.get(1).unwrap()
         );
@@ -301,8 +301,8 @@
             columns_order_1.get(2).unwrap()
         );
         assert_eq!(
-            &second_table_after_sort.1.fields().get(0).unwrap().name,
-            columns_order_2.get(0).unwrap()
+            &second_table_after_sort.1.fields().first().unwrap().name,
+            columns_order_2.first().unwrap()
         );
         assert_eq!(
             &second_table_after_sort.1.fields().get(1).unwrap().name,
diff --git a/dozer-ingestion/postgres/src/schema/tests.rs b/dozer-ingestion/postgres/src/schema/tests.rs
index a17bf03b1f..5819748449 100644
--- a/dozer-ingestion/postgres/src/schema/tests.rs
+++ b/dozer-ingestion/postgres/src/schema/tests.rs
@@ -38,7 +38,7 @@ async fn test_connector_get_tables() {
     let schema_helper = SchemaHelper::new(client.postgres_config.clone(), None);
     let result = schema_helper.get_tables(None).await.unwrap();
 
-    let table = result.get(0).unwrap();
+    let table = result.first().unwrap();
     assert_eq!(table_name, table.name);
     assert!(assert_vec_eq(
         &[
@@ -76,7 +76,7 @@ async fn test_connector_get_schema_with_selected_columns() {
     };
     let result = schema_helper.get_tables(Some(&[table_info])).await.unwrap();
 
-    let table = result.get(0).unwrap();
+    let table = result.first().unwrap();
     assert_eq!(table_name, table.name);
     assert!(assert_vec_eq(
         &["name".to_string(), "id".to_string()],
@@ -109,7 +109,7 @@ async fn test_connector_get_schema_without_selected_columns() {
     };
     let result = schema_helper.get_tables(Some(&[table_info])).await.unwrap();
 
-    let table = result.get(0).unwrap();
+    let table = result.first().unwrap();
     assert_eq!(table_name, table.name.clone());
     assert!(assert_vec_eq(
         &[
diff --git a/dozer-ingestion/postgres/src/snapshotter.rs b/dozer-ingestion/postgres/src/snapshotter.rs
index 76ff91135d..d06775aa2b 100644
--- a/dozer-ingestion/postgres/src/snapshotter.rs
+++ b/dozer-ingestion/postgres/src/snapshotter.rs
@@ -136,7 +136,7 @@ impl<'a> PostgresSnapshotter<'a> {
                     .handle_message(IngestionMessage::OperationEvent {
                         table_index,
                         op: evt,
-                        id: None,
+                        state: None,
                    })
                    .await
                    .is_err()
diff --git a/dozer-ingestion/postgres/src/xlog_mapper.rs b/dozer-ingestion/postgres/src/xlog_mapper.rs
index 08b0303e6d..5996705270 100644
--- a/dozer-ingestion/postgres/src/xlog_mapper.rs
+++ b/dozer-ingestion/postgres/src/xlog_mapper.rs
@@ -1,13 +1,11 @@
-use dozer_ingestion_connector::dozer_types::{
-    node::OpIdentifier,
-    types::{Field, Operation, Record},
-};
+use dozer_ingestion_connector::dozer_types::types::{Field, Operation, Record};
 use postgres_protocol::message::backend::LogicalReplicationMessage::{
     Begin, Commit, Delete, Insert, Relation, Update,
 };
 use postgres_protocol::message::backend::{
     LogicalReplicationMessage, RelationBody, ReplicaIdentity, TupleData, UpdateBody, XLogDataBody,
 };
+use postgres_protocol::Lsn;
 use postgres_types::Type;
 use std::collections::hash_map::Entry;
 use std::collections::HashMap;
@@ -34,7 +32,7 @@ pub struct TableColumn {
 #[derive(Debug, Clone)]
 pub enum MappedReplicationMessage {
     Begin,
-    Commit(OpIdentifier),
+    Commit(Lsn),
     Operation { table_index: usize, op: Operation },
 }
@@ -63,10 +61,7 @@ impl XlogMapper {
                 self.ingest_schema(relation)?;
             }
             Commit(commit) => {
-                return Ok(Some(MappedReplicationMessage::Commit(OpIdentifier::new(
-                    commit.end_lsn(),
-                    0,
-                ))));
+                return Ok(Some(MappedReplicationMessage::Commit(commit.end_lsn())));
             }
             Begin(_begin) => {
                 return Ok(Some(MappedReplicationMessage::Begin));
diff --git a/dozer-ingestion/snowflake/src/connector/snowflake.rs b/dozer-ingestion/snowflake/src/connector/snowflake.rs
index dde8c76d28..4cda88d0b1 100644
--- a/dozer-ingestion/snowflake/src/connector/snowflake.rs
+++ b/dozer-ingestion/snowflake/src/connector/snowflake.rs
@@ -4,7 +4,6 @@ use dozer_ingestion_connector::{
         errors::internal::BoxedError,
         log::{info, warn},
         models::ingestion_types::{default_snowflake_poll_interval, SnowflakeConfig},
-        node::OpIdentifier,
         types::FieldType,
     },
     tokio, Connector, Ingestor, SourceSchema, SourceSchemaResult, TableIdentifier, TableInfo,
@@ -13,7 +12,9 @@ use dozer_ingestion_connector::{
 use odbc::create_environment_v3;
 
 use crate::{
-    connection::client::Client, schema_helper::SchemaHelper, stream_consumer::StreamConsumer,
+    connection::client::Client,
+    schema_helper::SchemaHelper,
+    stream_consumer::{decode_state, StreamConsumer},
     SnowflakeError, SnowflakeStreamError,
 };
@@ -151,16 +152,17 @@ fn run(
     for (idx, table) in tables.iter().enumerate() {
         // We only check stream status on first iteration
         if iteration == 0 {
-            match table.checkpoint {
-                None | Some(OpIdentifier { txid: 0, .. }) => {
+            let state = table.state.as_ref().map(decode_state).transpose()?;
+            match state {
+                None | Some((0, _)) => {
                     info!("[{}][{}] Creating new stream", name, table.name);
                     StreamConsumer::drop_stream(&stream_client, &table.name)?;
                     StreamConsumer::create_stream(&stream_client, &table.name)?;
                 }
-                Some(OpIdentifier { txid, seq_in_tx }) => {
+                Some((iteration, index)) => {
                     info!(
                         "[{}][{}] Continuing ingestion from {}/{}",
-                        name, table.name, txid, seq_in_tx
+                        name, table.name, iteration, index
                     );
                     if let Ok(false) =
                         StreamConsumer::is_stream_created(&stream_client, &table.name)
diff --git a/dozer-ingestion/snowflake/src/lib.rs b/dozer-ingestion/snowflake/src/lib.rs
index 38f38124c8..4a449e05c0 100644
--- a/dozer-ingestion/snowflake/src/lib.rs
+++ b/dozer-ingestion/snowflake/src/lib.rs
@@ -17,6 +17,9 @@ mod tests;
 
 #[derive(Error, Debug)]
 pub enum SnowflakeError {
+    #[error("Failed to parse checkpoint state")]
+    CorruptedState,
+
     #[error("Snowflake query error")]
     QueryError(#[source] Box),
diff --git a/dozer-ingestion/snowflake/src/stream_consumer.rs b/dozer-ingestion/snowflake/src/stream_consumer.rs
index b1b4ed4dfd..3272c1ee26 100644
--- a/dozer-ingestion/snowflake/src/stream_consumer.rs
+++ b/dozer-ingestion/snowflake/src/stream_consumer.rs
@@ -1,7 +1,7 @@
 use dozer_ingestion_connector::{
     dozer_types::{
         models::ingestion_types::IngestionMessage,
-        node::OpIdentifier,
+        node::RestartableState,
         types::{Field, Operation, Record},
     },
     Ingestor,
@@ -126,7 +126,7 @@ impl StreamConsumer {
                 .blocking_handle_message(IngestionMessage::OperationEvent {
                     table_index,
                     op,
-                    id: Some(OpIdentifier::new(iteration, idx as u64)),
+                    state: Some(encode_state(iteration, idx as u64)),
                 })
                 .is_err()
             {
@@ -141,3 +141,20 @@ impl StreamConsumer {
         client.exec(&query)
     }
 }
+
+fn encode_state(iteration: u64, index: u64) -> RestartableState {
+    let mut state = vec![];
+    state.extend_from_slice(&iteration.to_be_bytes());
+    state.extend_from_slice(&index.to_be_bytes());
+    state.into()
+}
+
+pub fn decode_state(state: &RestartableState) -> Result<(u64, u64), SnowflakeError> {
+    if state.0.len() != 16 {
+        return Err(SnowflakeError::CorruptedState);
+    }
+    let state = state.0.as_slice();
+    let iteration = u64::from_be_bytes(state[0..8].try_into().unwrap());
+    let index = u64::from_be_bytes(state[8..16].try_into().unwrap());
+    Ok((iteration, index))
+}
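A hypothetical check of the error path above (a sketch, not part of this change): any state whose length is not exactly 16 bytes is rejected as corrupted instead of being silently misread.

#[test]
fn short_state_is_rejected() {
    let state: RestartableState = vec![0u8; 8].into(); // half the expected length
    assert!(matches!(
        decode_state(&state),
        Err(SnowflakeError::CorruptedState)
    ));
    // A well-formed 16-byte state decodes back into (iteration, index).
    assert_eq!(decode_state(&encode_state(1, 2)).unwrap(), (1, 2));
}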
diff --git a/dozer-ingestion/tests/test_suite/basic.rs b/dozer-ingestion/tests/test_suite/basic.rs
index 2fd1430e19..e7bfc3e4ac 100644
--- a/dozer-ingestion/tests/test_suite/basic.rs
+++ b/dozer-ingestion/tests/test_suite/basic.rs
@@ -40,21 +40,13 @@ pub async fn run_test_suite_basic_data_ready(runtime:
     let (mut iterator, abort_handle) = spawn_connector(runtime, connector, tables);
 
     // Loop over messages until timeout.
-    let mut last_identifier = None;
     let mut num_operations = 0;
     while let Some(message) = iterator.next_timeout(Duration::from_secs(1)).await {
         // Check message identifier.
         if let IngestionMessage::OperationEvent {
-            table_index,
-            op,
-            id,
+            table_index, op, ..
         } = &message
         {
-            if let Some((last_id, id)) = last_identifier.zip(*id) {
-                assert!(id > last_id);
-            }
-            last_identifier = *id;
-
             num_operations += 1;
             // Check record schema consistency.
             match op {
@@ -147,22 +139,12 @@ pub async fn run_test_suite_basic_insert_only(runtim
     let mut record_iter = records.iter();
 
-    let mut last_identifier = None;
     while let Some(message) = iterator.next_timeout(Duration::from_secs(1)).await {
         // Filter out non-operation events.
-        let IngestionMessage::OperationEvent {
-            op: operation, id, ..
-        } = message
-        else {
+        let IngestionMessage::OperationEvent { op: operation, .. } = message else {
             continue;
         };
 
-        // Identifier must be increasing.
-        if let Some((last_id, id)) = last_identifier.zip(id) {
-            assert!(id > last_id);
-        }
-        last_identifier = id;
-
         // Operation must be insert.
         let Operation::Insert { new: actual_record } = operation else {
             panic!("Expected an insert event, but got {:?}", operation);
@@ -230,23 +212,13 @@ pub async fn run_test_suite_basic_cud(runtime: Arc
     let (mut iterator, abort_handle) = spawn_connector(runtime, connector, tables);
 
     // Check data schema consistency.
-    let mut last_identifier = None;
     let mut records = Records::new(actual_primary_index.clone());
     while let Some(message) = iterator.next_timeout(Duration::from_secs(1)).await {
         // Filter out non-operation events.
-        let IngestionMessage::OperationEvent {
-            op: operation, id, ..
-        } = message
-        else {
+        let IngestionMessage::OperationEvent { op: operation, .. } = message else {
            continue;
        };
 
-        // Identifier must be increasing.
-        if let Some((last_id, id)) = last_identifier.zip(id) {
-            assert!(id > last_id);
-        }
-        last_identifier = id;
-
         // Record must match schema.
         match operation {
             Operation::Insert { new } => {
diff --git a/dozer-sql/expression/src/execution.rs b/dozer-sql/expression/src/execution.rs
index 1c91ecd1f7..db8d79ac26 100644
--- a/dozer-sql/expression/src/execution.rs
+++ b/dozer-sql/expression/src/execution.rs
@@ -451,7 +451,7 @@ impl Expression {
                 results,
                 else_result: _,
             } => {
-                let typ = results.get(0).unwrap().get_type(schema)?;
+                let typ = results.first().unwrap().get_type(schema)?;
                 Ok(ExpressionType::new(
                     typ.return_type,
                     true,
diff --git a/dozer-sql/jsonpath/src/path/json.rs b/dozer-sql/jsonpath/src/path/json.rs
index 5ba5acdd84..f805f7525c 100644
--- a/dozer-sql/jsonpath/src/path/json.rs
+++ b/dozer-sql/jsonpath/src/path/json.rs
@@ -5,7 +5,7 @@ use regex::Regex;
 
 /// The method expects to get a number on the right side and array or string or object on the left
 /// where the number of characters, elements or fields will be compared respectively.
 pub fn size(left: Vec<&JsonValue>, right: Vec<&JsonValue>) -> bool {
-    let Some(n) = right.get(0).and_then(|v| v.to_usize()) else {
+    let Some(n) = right.first().and_then(|v| v.to_usize()) else {
         return false;
     };
     left.iter().all(|el| match el.destructure_ref() {
@@ -26,7 +26,7 @@ pub fn sub_set_of(left: Vec<&JsonValue>, right: Vec<&JsonValue>) -> bool {
     }
 
     if let Some(elems) = left.first().and_then(|e| (*e).as_array()) {
-        if let Some(right_elems) = right.get(0).and_then(|v| v.as_array()) {
+        if let Some(right_elems) = right.first().and_then(|v| v.as_array()) {
             if right_elems.is_empty() {
                 return false;
             }
@@ -58,7 +58,7 @@ pub fn any_of(left: Vec<&JsonValue>, right: Vec<&JsonValue>) -> bool {
         return false;
     }
 
-    let Some(elems) = right.get(0).and_then(|v| v.as_array()) else {
+    let Some(elems) = right.first().and_then(|v| v.as_array()) else {
         return false;
     };
     if elems.is_empty() {
@@ -91,7 +91,7 @@ pub fn regex(left: Vec<&JsonValue>, right: Vec<&JsonValue>) -> bool {
         return false;
     }
 
-    let Some(str) = right.get(0).and_then(|v| v.as_string()) else {
+    let Some(str) = right.first().and_then(|v| v.as_string()) else {
         return false;
     };
     if let Ok(regex) = Regex::new(str) {
@@ -112,7 +112,7 @@ pub fn inside(left: Vec<&JsonValue>, right: Vec<&JsonValue>) -> bool {
         return false;
     }
 
-    let Some(first) = right.get(0) else {
+    let Some(first) = right.first() else {
         return false;
     };
     if let Some(elems) = first.as_array() {
@@ -136,8 +136,8 @@ pub fn inside(left: Vec<&JsonValue>, right: Vec<&JsonValue>) -> bool {
 /// ensure the number on the left side is less the number on the right side
 pub fn less(left: Vec<&JsonValue>, right: Vec<&JsonValue>) -> bool {
     if left.len() == 1 && right.len() == 1 {
-        let left_no = left.get(0).and_then(|v| v.as_number());
-        let right_no = right.get(0).and_then(|v| v.as_number());
+        let left_no = left.first().and_then(|v| v.as_number());
+        let right_no = right.first().and_then(|v| v.as_number());
         match (left_no, right_no) {
             (Some(l), Some(r)) => l < r,
             _ => false,
diff --git a/dozer-sql/jsonpath/src/path/top.rs b/dozer-sql/jsonpath/src/path/top.rs
index 1d0c7b471b..4cd5611fcd 100644
--- a/dozer-sql/jsonpath/src/path/top.rs
+++ b/dozer-sql/jsonpath/src/path/top.rs
@@ -102,7 +102,7 @@ impl<'a> Path<'a> for FnPath {
             }
         };
 
-        match input.get(0) {
+        match input.first() {
             Some(v) => match v {
                 NewValue(d) => take_len(d),
                 Slice(s) => take_len(s),
diff --git a/dozer-sql/src/aggregation/aggregator.rs b/dozer-sql/src/aggregation/aggregator.rs
index 2bb613dec5..bf5c4521c4 100644
--- a/dozer-sql/src/aggregation/aggregator.rs
+++ b/dozer-sql/src/aggregation/aggregator.rs
@@ -278,7 +278,7 @@ pub fn get_aggregator_type_from_aggregation_expression(
             args,
         } => Ok((
             vec![args
-                .get(0)
+                .first()
                 .ok_or_else(|| {
                     PipelineError::NotEnoughArguments(AggregateFunctionType::Sum.to_string())
                 })?
@@ -290,7 +290,7 @@
             args,
         } => Ok((
             vec![args
-                .get(0)
+                .first()
                 .ok_or_else(|| {
                     PipelineError::NotEnoughArguments(AggregateFunctionType::Min.to_string())
                 })?
@@ -302,7 +302,7 @@
             args,
         } => Ok((
             vec![args
-                .get(0)
+                .first()
                 .ok_or_else(|| {
                     PipelineError::NotEnoughArguments(
                         AggregateFunctionType::MinAppendOnly.to_string(),
                     )
                 })?
@@ -316,7 +316,7 @@
             args,
         } => Ok((
             vec![args
-                .get(0)
+                .first()
                 .ok_or_else(|| {
                     PipelineError::NotEnoughArguments(AggregateFunctionType::Max.to_string())
                 })?
@@ -328,7 +328,7 @@
             args,
         } => Ok((
             vec![args
-                .get(0)
+                .first()
                 .ok_or_else(|| {
                     PipelineError::NotEnoughArguments(
                         AggregateFunctionType::MaxAppendOnly.to_string(),
                     )
                 })?
@@ -342,7 +342,7 @@
             args,
         } => Ok((
             vec![
-                args.get(0)
+                args.first()
                     .ok_or_else(|| {
                         PipelineError::NotEnoughArguments(
                             AggregateFunctionType::MaxValue.to_string(),
                         )
                     })?
@@ -364,7 +364,7 @@
             args,
         } => Ok((
             vec![
-                args.get(0)
+                args.first()
                     .ok_or_else(|| {
                         PipelineError::NotEnoughArguments(
                             AggregateFunctionType::MinValue.to_string(),
                         )
                    })?
@@ -386,7 +386,7 @@
             args,
         } => Ok((
             vec![args
-                .get(0)
+                .first()
                 .ok_or_else(|| {
                     PipelineError::NotEnoughArguments(AggregateFunctionType::Avg.to_string())
                 })?
@@ -398,7 +398,7 @@
             args,
         } => Ok((
             vec![args
-                .get(0)
+                .first()
                 .ok_or_else(|| {
                     PipelineError::NotEnoughArguments(AggregateFunctionType::Count.to_string())
                 })?
@@ -416,7 +416,7 @@ pub fn update_val_map(
     field_map: &mut BTreeMap<Field, u64>,
     return_map: &mut BTreeMap<Field, Vec<Field>>,
 ) -> Result<(), PipelineError> {
-    let field = match fields.get(0) {
+    let field = match fields.first() {
         Some(v) => v,
         None => {
             return Err(InvalidFunctionArgument(
diff --git a/dozer-sql/src/aggregation/max_value.rs b/dozer-sql/src/aggregation/max_value.rs
index 63ff0d5965..2e39568af5 100644
--- a/dozer-sql/src/aggregation/max_value.rs
+++ b/dozer-sql/src/aggregation/max_value.rs
@@ -68,7 +68,7 @@ fn get_max_value(
     let val = calculate_err!(field_map.keys().max(), MaxValue).clone();
 
     match return_map.get(&val) {
-        Some(v) => match v.get(0) {
+        Some(v) => match v.first() {
             Some(v) => {
                 let value = v.clone();
                 Ok(value)
diff --git a/dozer-sql/src/aggregation/min_value.rs b/dozer-sql/src/aggregation/min_value.rs
index fbb415cc12..845635649e 100644
--- a/dozer-sql/src/aggregation/min_value.rs
+++ b/dozer-sql/src/aggregation/min_value.rs
@@ -68,7 +68,7 @@ fn get_min_value(
     let val = calculate_err!(field_map.keys().min(), MinValue).clone();
 
     match return_map.get(&val) {
-        Some(v) => match v.get(0) {
+        Some(v) => match v.first() {
             Some(v) => {
                 let value = v.clone();
                 Ok(value)
diff --git a/dozer-sql/src/pipeline_builder/from_builder.rs b/dozer-sql/src/pipeline_builder/from_builder.rs
index d2826b42a3..e6cfba5931 100644
--- a/dozer-sql/src/pipeline_builder/from_builder.rs
+++ b/dozer-sql/src/pipeline_builder/from_builder.rs
@@ -159,7 +159,7 @@ pub fn insert_table_operator_processor_to_pipeline(
         query_context.runtime.clone(),
     );
 
-    if let Some(table) = operator.args.get(0) {
+    if let Some(table) = operator.args.first() {
         let source_name = match table {
             TableOperatorArg::Argument(argument) => get_source_name(&operator.name, argument)?,
             TableOperatorArg::Descriptor(descriptor) => {
@@ -218,7 +218,7 @@ pub fn insert_table_operator_processor_to_pipeline(
     }
 
     let processor = WindowProcessorFactory::new(processor_name.clone(), operator.clone());
-    if let Some(table) = operator.args.get(0) {
+    if let Some(table) = operator.args.first() {
         let source_name = match table {
             TableOperatorArg::Argument(argument) => get_source_name(&operator.name, argument)?,
             TableOperatorArg::Descriptor(descriptor) => {
diff --git a/dozer-sql/src/table_operator/tests/operator_test.rs
 b/dozer-sql/src/table_operator/tests/operator_test.rs
index 62247aea7a..0d0c47c955 100644
--- a/dozer-sql/src/table_operator/tests/operator_test.rs
+++ b/dozer-sql/src/table_operator/tests/operator_test.rs
@@ -66,7 +66,7 @@ fn test_lifetime() {
         .execute(&record_store, &record, &schema)
         .unwrap();
     assert_eq!(result.len(), 1);
-    let lifetime_record = result.get(0).unwrap();
+    let lifetime_record = result.first().unwrap();
 
     let mut expected_record = record.clone();
diff --git a/dozer-sql/src/tests/builder_test.rs b/dozer-sql/src/tests/builder_test.rs
index 8b084c6c39..e8cbdce76e 100644
--- a/dozer-sql/src/tests/builder_test.rs
+++ b/dozer-sql/src/tests/builder_test.rs
@@ -16,7 +16,6 @@ use dozer_types::chrono::DateTime;
 use dozer_types::errors::internal::BoxedError;
 use dozer_types::log::debug;
 use dozer_types::models::ingestion_types::IngestionMessage;
-use dozer_types::node::OpIdentifier;
 use dozer_types::ordered_float::OrderedFloat;
 use dozer_types::types::{
     Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
@@ -112,7 +111,7 @@ impl Source for TestSource {
         fw: &mut dyn SourceChannelForwarder,
         _last_checkpoint: SourceState,
     ) -> Result<(), BoxedError> {
-        for n in 0..10 {
+        for n in 0..10u64 {
             fw.send(
                 IngestionMessage::OperationEvent {
                     table_index: 0,
@@ -126,7 +125,7 @@ impl Source for TestSource {
                         ),
                     ]),
                 },
-                id: Some(OpIdentifier::new(n, 0)),
+                state: Some(n.to_be_bytes().to_vec().into()),
             },
             DEFAULT_PORT_HANDLE,
         )
diff --git a/dozer-sql/src/tests/utils.rs b/dozer-sql/src/tests/utils.rs
index f5cade9909..8e327c0889 100644
--- a/dozer-sql/src/tests/utils.rs
+++ b/dozer-sql/src/tests/utils.rs
@@ -13,7 +13,7 @@ pub fn get_select(sql: &str) -> Result<Box<Select>, PipelineError> {
     let ast = Parser::parse_sql(&dialect, sql).unwrap();
 
-    let statement = ast.get(0).expect("First statement is missing").to_owned();
+    let statement = ast.first().expect("First statement is missing").to_owned();
 
     if let Statement::Query(query) = statement {
         Ok(get_query_select(&query))
     } else {
diff --git a/dozer-sql/src/window/tests/operator_test.rs b/dozer-sql/src/window/tests/operator_test.rs
index 426cf808f1..25b694097c 100644
--- a/dozer-sql/src/window/tests/operator_test.rs
+++ b/dozer-sql/src/window/tests/operator_test.rs
@@ -30,7 +30,7 @@ fn test_hop() {
         )
         .unwrap();
     assert_eq!(result.len(), 5);
-    let window_record = result.get(0).unwrap();
+    let window_record = result.first().unwrap();
 
     let expected_record = ProcessorRecord::appended(
         &record,
@@ -82,7 +82,7 @@ fn test_tumble() {
         )
         .unwrap();
     assert_eq!(result.len(), 1);
-    let window_record = result.get(0).unwrap();
+    let window_record = result.first().unwrap();
 
     let expected_record = ProcessorRecord::appended(
         &record,
diff --git a/dozer-storage/src/lmdb_database/dump/mod.rs b/dozer-storage/src/lmdb_database/dump/mod.rs
index df1b7aa56e..e9d60ba3d9 100644
--- a/dozer-storage/src/lmdb_database/dump/mod.rs
+++ b/dozer-storage/src/lmdb_database/dump/mod.rs
@@ -167,6 +167,28 @@ async fn restore_slice(reader: &mut (impl AsyncRead + Unpin)) -> Result,
     Ok(buf)
 }
 
+pub fn assert_database_equal<T1: Transaction, T2: Transaction>(
+    txn1: &T1,
+    db1: Database,
+    txn2: &T2,
+    db2: Database,
+) {
+    assert_eq!(
+        txn1.stat(db1).unwrap().entries(),
+        txn2.stat(db2).unwrap().entries()
+    );
+    let cursor1 = txn1.open_ro_cursor(db1).unwrap();
+    let cursor2 = txn2.open_ro_cursor(db2).unwrap();
+    let iter1 = RawIterator::new(cursor1, Bound::Unbounded, true).unwrap();
+    let iter2 = RawIterator::new(cursor2, Bound::Unbounded, true).unwrap();
+    for (result1, result2) in iter1.zip(iter2) {
+        let (key1, value1) = result1.unwrap();
+        let (key2, value2) = result2.unwrap();
+        assert_eq!(key1, key2);
+        assert_eq!(value1, value2);
+    }
+}
+
 #[cfg(test)]
 pub mod tests {
     use std::pin::pin;
@@ -292,25 +314,3 @@ pub mod tests {
         assert_database_equal(&txn, db, &txn, restore_db);
     }
 }
-
-pub fn assert_database_equal<T1: Transaction, T2: Transaction>(
-    txn1: &T1,
-    db1: Database,
-    txn2: &T2,
-    db2: Database,
-) {
-    assert_eq!(
-        txn1.stat(db1).unwrap().entries(),
-        txn2.stat(db2).unwrap().entries()
-    );
-    let cursor1 = txn1.open_ro_cursor(db1).unwrap();
-    let cursor2 = txn2.open_ro_cursor(db2).unwrap();
-    let iter1 = RawIterator::new(cursor1, Bound::Unbounded, true).unwrap();
-    let iter2 = RawIterator::new(cursor2, Bound::Unbounded, true).unwrap();
-    for (result1, result2) in iter1.zip(iter2) {
-        let (key1, value1) = result1.unwrap();
-        let (key2, value2) = result2.unwrap();
-        assert_eq!(key1, key2);
-        assert_eq!(value1, value2);
-    }
-}
diff --git a/dozer-tests/src/sql_tests/helper/pipeline.rs b/dozer-tests/src/sql_tests/helper/pipeline.rs
index 5311a5c202..2998fdf3cc 100644
--- a/dozer-tests/src/sql_tests/helper/pipeline.rs
+++ b/dozer-tests/src/sql_tests/helper/pipeline.rs
@@ -133,7 +133,7 @@ impl Source for TestSource {
                 IngestionMessage::OperationEvent {
                     table_index: 0,
                     op,
-                    id: None,
+                    state: None,
                 },
                 *port,
             )
diff --git a/dozer-types/src/models/ingestion_types.rs b/dozer-types/src/models/ingestion_types.rs
index c0216dd7d0..0ed9eb674b 100644
--- a/dozer-types/src/models/ingestion_types.rs
+++ b/dozer-types/src/models/ingestion_types.rs
@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
 
 use crate::{
     helper::{deserialize_duration_secs_f64, f64_schema, serialize_duration_secs_f64},
     models::connection::SchemaExample,
-    node::OpIdentifier,
+    node::RestartableState,
     types::Operation,
 };
 
@@ -24,8 +24,8 @@ pub enum IngestionMessage {
         table_index: usize,
         /// The CDC event.
        op: Operation,
-        /// If this connector supports restarting from a specific CDC event, it should provide an identifier.
-        id: Option<OpIdentifier>,
+        /// If this connector supports restarting from a specific CDC event, it should provide a `RestartableState`.
+        state: Option<RestartableState>,
     },
     /// A connector uses this message kind to notify Dozer that an initial snapshot of the source tables is started
     SnapshottingStarted,
diff --git a/dozer-types/src/node.rs b/dozer-types/src/node.rs
index d7cfdcf40c..5682247148 100644
--- a/dozer-types/src/node.rs
+++ b/dozer-types/src/node.rs
@@ -63,60 +63,19 @@ impl Display for NodeHandle {
     }
 }
 
 #[derive(
-    Clone,
-    Debug,
-    Copy,
-    PartialEq,
-    Eq,
-    PartialOrd,
-    Ord,
-    Hash,
-    Default,
-    Serialize,
-    Deserialize,
-    bincode::Encode,
-    bincode::Decode,
+    Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, bincode::Encode, bincode::Decode,
 )]
-/// A identifier made of two `u64`s.
-pub struct OpIdentifier {
-    /// High 64 bits of the identifier.
-    pub txid: u64,
-    /// Low 64 bits of the identifier.
-    pub seq_in_tx: u64,
-}
-
-impl OpIdentifier {
-    pub fn new(txid: u64, seq_in_tx: u64) -> Self {
-        Self { txid, seq_in_tx }
-    }
-
-    pub fn to_bytes(&self) -> [u8; 16] {
-        let mut result = [0_u8; 16];
-        result[0..8].copy_from_slice(&self.txid.to_be_bytes());
-        result[8..16].copy_from_slice(&self.seq_in_tx.to_be_bytes());
-        result
-    }
+/// A table's restartable state, stored as opaque binary data.
+pub struct RestartableState(pub Vec<u8>);
 
-    pub fn from_bytes(bytes: [u8; 16]) -> Self {
-        let txid = u64::from_be_bytes(bytes[0..8].try_into().unwrap());
-        let seq_in_tx = u64::from_be_bytes(bytes[8..16].try_into().unwrap());
-        Self::new(txid, seq_in_tx)
+impl From<Vec<u8>> for RestartableState {
+    fn from(value: Vec<u8>) -> Self {
+        Self(value)
     }
 }
 
 #[derive(
-    Debug,
-    Clone,
-    Copy,
-    Serialize,
-    Deserialize,
-    PartialEq,
-    Eq,
-    PartialOrd,
-    Ord,
-    Hash,
-    bincode::Encode,
-    bincode::Decode,
+    Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, bincode::Encode, bincode::Decode,
 )]
 /// A table's ingestion state.
 pub enum TableState {
@@ -124,8 +83,8 @@ pub enum TableState {
     NotStarted,
     /// This table has some data ingested, and it can't be restarted.
     NonRestartable,
-    /// This table has some data ingested, and it can be restarted using the given identifier.
-    Restartable(OpIdentifier),
+    /// This table has some data ingested, and it can be restarted from the given state.
+    Restartable(RestartableState),
 }
 
 /// Map from a `Source` node's handle to its tables' states.
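To summarize the new API surface (a usage sketch, not code from this change): connectors build a `RestartableState` from raw bytes in whatever format they choose, and the checkpointing layer wraps it in `TableState` when persisting per-table progress.

use dozer_types::node::{RestartableState, TableState};

fn example() {
    // Any connector-defined byte encoding works; a big-endian u64 is the
    // convention several connectors in this change settled on.
    let state: RestartableState = 7u64.to_be_bytes().to_vec().into();

    // The forwarder records restartable tables as `TableState::Restartable`,
    // and tables that emitted no state as `TableState::NonRestartable`.
    let table = TableState::Restartable(state);
    assert!(matches!(table, TableState::Restartable(_)));
}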