From 0f2977e061acb9f4107d4bbfdbb81ea8ad57b4c0 Mon Sep 17 00:00:00 2001 From: Bei Chu <914745487@qq.com> Date: Fri, 1 Mar 2024 15:53:41 +0800 Subject: [PATCH] feat: Only confirm ESP when sink is flushed (#2436) --- dozer-core/src/executor/sink_node.rs | 25 +- dozer-ingestion/aerospike/src/connector.rs | 263 ++++++++++++++------- dozer-ingestion/src/lib.rs | 2 + dozer-types/src/event.rs | 2 +- 4 files changed, 195 insertions(+), 97 deletions(-) diff --git a/dozer-core/src/executor/sink_node.rs b/dozer-core/src/executor/sink_node.rs index 84d46c7d0a..042e769c2a 100644 --- a/dozer-core/src/executor/sink_node.rs +++ b/dozer-core/src/executor/sink_node.rs @@ -98,7 +98,7 @@ pub struct SinkNode { /// The metrics labels. labels: LabelsAndProgress, - last_op_was_commit: bool, + last_op_if_commit: Option, flush_on_next_commit: bool, flush_scheduler_sender: Sender, should_flush_receiver: Receiver<()>, @@ -150,7 +150,7 @@ impl SinkNode { sink, error_manager: dag.error_manager().clone(), labels: dag.labels().clone(), - last_op_was_commit: false, + last_op_if_commit: None, flush_on_next_commit: false, flush_scheduler_sender: schedule_sender, should_flush_receiver, @@ -162,13 +162,17 @@ impl SinkNode { &self.node_handle } - fn flush(&mut self) -> Result<(), ExecutionError> { + fn flush(&mut self, epoch: Epoch) -> Result<(), ExecutionError> { if let Err(e) = self.sink.flush_batch() { self.error_manager.report(e); } self.flush_scheduler_sender .send(MAX_FLUSH_INTERVAL) .unwrap(); + let _ = self.event_sender.send(Event::SinkFlushed { + node: self.node_handle.clone(), + epoch, + }); Ok(()) } } @@ -212,8 +216,8 @@ impl ReceiverLoop for SinkNode { let mut sel = init_select(&receivers); loop { if self.should_flush_receiver.try_recv().is_ok() { - if self.last_op_was_commit { - self.flush()?; + if let Some(epoch) = self.last_op_if_commit.take() { + self.flush(epoch)?; } else { self.flush_on_next_commit = true; } @@ -267,7 +271,7 @@ impl ReceiverLoop for SinkNode { } fn on_op(&mut self, _index: usize, op: TableOperation) -> Result<(), ExecutionError> { - self.last_op_was_commit = false; + self.last_op_if_commit = None; let mut labels = self.labels.labels().clone(); labels.push("table", self.node_handle.id.clone()); const OPERATION_TYPE_LABEL: &str = "operation_type"; @@ -304,7 +308,7 @@ impl ReceiverLoop for SinkNode { if let Err(e) = self.sink.commit(&epoch) { self.error_manager.report(e); } - self.last_op_was_commit = true; + self.last_op_if_commit = Some(epoch.clone()); if let Ok(duration) = epoch.decision_instant.elapsed() { let mut labels = self.labels.labels().clone(); @@ -313,15 +317,10 @@ impl ReceiverLoop for SinkNode { } if self.flush_on_next_commit { - self.flush()?; + self.flush(epoch)?; self.flush_on_next_commit = false; } - let _ = self.event_sender.send(Event::SinkCommitted { - node: self.node_handle.clone(), - epoch, - }); - Ok(()) } diff --git a/dozer-ingestion/aerospike/src/connector.rs b/dozer-ingestion/aerospike/src/connector.rs index 08fb0eabc5..b2ca550a3c 100644 --- a/dozer-ingestion/aerospike/src/connector.rs +++ b/dozer-ingestion/aerospike/src/connector.rs @@ -5,11 +5,12 @@ use dozer_ingestion_connector::dozer_types::models::connection::AerospikeConnect use dozer_ingestion_connector::dozer_types::models::ingestion_types::{ IngestionMessage, TransactionInfo, }; -use dozer_ingestion_connector::dozer_types::node::OpIdentifier; +use dozer_ingestion_connector::dozer_types::node::{NodeHandle, OpIdentifier, SourceState}; use dozer_ingestion_connector::dozer_types::types::Operation::Insert; use dozer_ingestion_connector::dozer_types::types::{Field, FieldDefinition, FieldType, Schema}; use dozer_ingestion_connector::tokio::sync::broadcast::error::RecvError; use dozer_ingestion_connector::tokio::sync::broadcast::Receiver; +use dozer_ingestion_connector::tokio::sync::{mpsc, oneshot}; use dozer_ingestion_connector::{ async_trait, dozer_types, tokio, Connector, Ingestor, SourceSchema, SourceSchemaResult, TableIdentifier, TableInfo, @@ -45,21 +46,15 @@ pub enum AerospikeConnectorError { #[error("Cannot start server: {0}")] CannotStartServer(#[from] std::io::Error), - #[error("No set name find in key: {0:?}")] - NoSetNameFindInKey(Vec>), + #[error("Set name is none. Key: {0:?}, {1:?}, {2:?}")] + SetNameIsNone(Option, Option, Option), - #[error("Set name is none. Key: {0:?}")] - SetNameIsNone(Vec>), - - #[error("No PK in key: {0:?}")] - NoPkInKey(Vec>), + #[error("PK is none: {0:?}, {1:?}, {2:?}")] + PkIsNone(Option, String, Option), #[error("Invalid key value: {0:?}. Key is supposed to have 4 elements.")] InvalidKeyValue(Vec>), - #[error("PK is none: {0:?}")] - PkIsNone(Vec>), - #[error("Unsupported type. Bin type {bin_type:?}, field type: {field_type:?}")] UnsupportedTypeForFieldType { bin_type: String, @@ -131,35 +126,24 @@ pub struct Bin { #[derive(Debug)] pub struct AerospikeConnector { pub config: AerospikeConnection, + node_handle: NodeHandle, event_receiver: Receiver, } impl AerospikeConnector { - pub fn new(config: AerospikeConnection, event_receiver: Receiver) -> Self { + pub fn new( + config: AerospikeConnection, + node_handle: NodeHandle, + event_receiver: Receiver, + ) -> Self { Self { config, + node_handle, event_receiver, } } fn start_server(&self, server_state: ServerState) -> Result { - let mut receiver = self.event_receiver.resubscribe(); - tokio::spawn(async move { - loop { - match receiver.recv().await { - Ok(event) => { - dbg!(event); - } - Err(RecvError::Closed) => { - break; - } - Err(RecvError::Lagged(_)) => { - // do nothing - } - } - } - }); - let address = format!( "{}:{}", self.config.replication.server_address, self.config.replication.server_port @@ -178,6 +162,102 @@ impl AerospikeConnector { } } +#[derive(Debug)] +struct PendingMessage { + message: IngestionMessage, + sender: oneshot::Sender<()>, +} + +#[derive(Debug)] +struct PendingOperationId { + operation_id: u64, + sender: oneshot::Sender<()>, +} + +/// This loop assigns an operation id to each request and sends it to the ingestor. +async fn ingestor_loop( + mut message_receiver: mpsc::UnboundedReceiver, + ingestor: Ingestor, + operation_id_sender: mpsc::UnboundedSender, +) { + let mut operation_id = 0; + while let Some(message) = message_receiver.recv().await { + let pending_operation_id = PendingOperationId { + operation_id, + sender: message.sender, + }; + + // Propagate panic in the pipeline event processor loop. + operation_id_sender.send(pending_operation_id).unwrap(); + + // Ignore the error, because the server can be down. + let _ = ingestor.handle_message(message.message).await; + let _ = ingestor + .handle_message(IngestionMessage::TransactionInfo(TransactionInfo::Commit { + id: Some(OpIdentifier::new(0, operation_id)), + })) + .await; + + operation_id += 1; + } +} + +/// This loop triggers the pending operation id that's before the event's payload. +async fn pipeline_event_processor( + node_handle: NodeHandle, + mut operation_id_receiver: mpsc::UnboundedReceiver, + mut event_receiver: Receiver, +) { + let mut operation_id_from_pipeline = None; + let mut pending_operation_id: Option = None; + loop { + if operation_id_from_pipeline + < pending_operation_id + .as_ref() + .map(|operation_id| operation_id.operation_id) + { + // We have pending operation id, wait for pipeline event. + let event = match event_receiver.recv().await { + Ok(event) => event, + Err(RecvError::Closed) => { + // Pipeline is down. + return; + } + Err(RecvError::Lagged(_)) => { + // Ignore lagged events. + continue; + } + }; + if let Some(operation_id) = get_operation_id_from_event(&event, &node_handle) { + operation_id_from_pipeline = Some(operation_id); + } + } else if let Some(pending) = pending_operation_id.take() { + // This operation id is already confirmed by the pipeline. + let _ = pending.sender.send(()); + } else { + // Wait for the next operation id. + let Some(pending) = operation_id_receiver.recv().await else { + // Ingestor is down. + return; + }; + pending_operation_id = Some(pending); + } + } +} + +fn get_operation_id_from_event(event: &Event, node_handle: &NodeHandle) -> Option { + match event { + Event::SinkFlushed { epoch, .. } => epoch + .common_info + .source_states + .get(node_handle) + .and_then(|state| match state { + SourceState::Restartable(id) => Some(id.seq_in_tx), + _ => None, + }), + } +} + fn map_error(error: AerospikeConnectorError) -> HttpResponse { error!("Aerospike ingestion error: {:?}", error); HttpResponse::InternalServerError().finish() @@ -201,19 +281,22 @@ async fn event_request_handler( return HttpResponse::Ok().finish(); } - let operation_events = map_events(event, state.tables_index_map.clone()).await; + let operation_events = map_events(event, &state.tables_index_map).await; match operation_events { Ok(None) => HttpResponse::Ok().finish(), - Ok(Some(events)) => { - for event in events { - if let Err(e) = state.ingestor.handle_message(event).await { - error!("Aerospike ingestion message send error: {:?}", e); - return HttpResponse::InternalServerError().finish(); - } + Ok(Some(message)) => { + let (sender, receiver) = oneshot::channel::<()>(); + if let Err(e) = state.sender.send(PendingMessage { message, sender }) { + error!("Ingestor is down: {:?}", e); + return HttpResponse::InternalServerError().finish(); + } + if let Err(e) = receiver.await { + error!("Pipeline event processor is down: {:?}", e); + HttpResponse::InternalServerError().finish() + } else { + HttpResponse::Ok().finish() } - - HttpResponse::Ok().finish() } Err(e) => map_error(e), } @@ -228,7 +311,7 @@ struct TableIndexMap { #[derive(Clone)] struct ServerState { tables_index_map: HashMap, - ingestor: Ingestor, + sender: mpsc::UnboundedSender, } #[async_trait] @@ -426,9 +509,20 @@ impl Connector for AerospikeConnector { }) .collect(); + let (message_sender, message_receiver) = mpsc::unbounded_channel(); + let (operation_id_sender, operation_id_receiver) = mpsc::unbounded_channel(); + let ingestor = ingestor.clone(); + tokio::spawn(async move { + ingestor_loop(message_receiver, ingestor, operation_id_sender).await + }); + let node_handle = self.node_handle.clone(); + let event_receiver = self.event_receiver.resubscribe(); + tokio::spawn(async move { + pipeline_event_processor(node_handle, operation_id_receiver, event_receiver).await + }); let server_state = ServerState { tables_index_map: tables_index_map.clone(), - ingestor: ingestor.clone(), + sender: message_sender, }; let _server = self.start_server(server_state)?.await; @@ -439,61 +533,64 @@ impl Connector for AerospikeConnector { async fn map_events( event: AerospikeEvent, - tables_map: HashMap, -) -> Result>, AerospikeConnectorError> { - let key = event.key; - let [_, Some(ref set_name), _, ref pk_in_key] = key.clone()[..] else { - return Err(AerospikeConnectorError::InvalidKeyValue(key.clone())); + tables_map: &HashMap, +) -> Result, AerospikeConnectorError> { + let key: [Option; 4] = match event.key.try_into() { + Ok(key) => key, + Err(key) => return Err(AerospikeConnectorError::InvalidKeyValue(key)), + }; + let [key0, set_name, key2, pk_in_key] = key; + let Some(set_name) = set_name else { + return Err(AerospikeConnectorError::SetNameIsNone( + key0, key2, pk_in_key, + )); }; - if let Some(TableIndexMap { + let Some(TableIndexMap { columns_map, table_index, }) = tables_map.get(set_name.as_str()) - { - let mut fields = vec![Field::Null; columns_map.len()]; - if let Some((pk, _)) = columns_map.get("PK") { - if let Some(pk_in_key) = pk_in_key { - fields[*pk] = Field::String(pk_in_key.clone()); - } else { - return Err(AerospikeConnectorError::PkIsNone(key.clone())); - } + else { + return Ok(None); + }; + + let mut fields = vec![Field::Null; columns_map.len()]; + if let Some((pk, _)) = columns_map.get("PK") { + if let Some(pk_in_key) = pk_in_key { + fields[*pk] = Field::String(pk_in_key); + } else { + return Err(AerospikeConnectorError::PkIsNone(key0, set_name, key2)); } + } - if let Some((index, _)) = columns_map.get("inserted_at") { - // Create a NaiveDateTime from the timestamp - let naive = NaiveDateTime::from_timestamp_millis(event.lut as i64) - .ok_or(AerospikeConnectorError::InvalidTimestamp(event.lut as i64))?; + if let Some((index, _)) = columns_map.get("inserted_at") { + // Create a NaiveDateTime from the timestamp + let naive = NaiveDateTime::from_timestamp_millis(event.lut as i64) + .ok_or(AerospikeConnectorError::InvalidTimestamp(event.lut as i64))?; - // Create a normal DateTime from the NaiveDateTime - let datetime: DateTime = - DateTime::::from_naive_utc_and_offset(naive, Utc).fixed_offset(); + // Create a normal DateTime from the NaiveDateTime + let datetime: DateTime = + DateTime::::from_naive_utc_and_offset(naive, Utc).fixed_offset(); - fields[*index] = Field::Timestamp(datetime); - } + fields[*index] = Field::Timestamp(datetime); + } - for bin in event.bins { - if let Some((i, typ)) = columns_map.get(bin.name.as_str()) { - fields[*i] = match bin.value { - Some(value) => map_value_to_field(bin.r#type.as_str(), value, *typ)?, - None => Field::Null, - }; - } + for bin in event.bins { + if let Some((i, typ)) = columns_map.get(bin.name.as_str()) { + fields[*i] = match bin.value { + Some(value) => map_value_to_field(bin.r#type.as_str(), value, *typ)?, + None => Field::Null, + }; } - - Ok(Some(vec![ - IngestionMessage::OperationEvent { - table_index: *table_index, - op: Insert { - new: dozer_types::types::Record::new(fields), - }, - id: None, - }, - IngestionMessage::TransactionInfo(TransactionInfo::Commit { id: None }), - ])) - } else { - Ok(None) } + + Ok(Some(IngestionMessage::OperationEvent { + table_index: *table_index, + op: Insert { + new: dozer_types::types::Record::new(fields), + }, + id: None, + })) } pub(crate) fn map_value_to_field( diff --git a/dozer-ingestion/src/lib.rs b/dozer-ingestion/src/lib.rs index 364f6c9bc2..9e4cda2d0c 100644 --- a/dozer-ingestion/src/lib.rs +++ b/dozer-ingestion/src/lib.rs @@ -10,6 +10,7 @@ use dozer_ingestion_connector::dozer_types::{ connection::{Connection, ConnectionConfig}, ingestion_types::default_grpc_adapter, }, + node::NodeHandle, prettytable::Table, }; use dozer_ingestion_deltalake::DeltaLakeConnector; @@ -142,6 +143,7 @@ pub fn get_connector( ))), ConnectionConfig::Aerospike(config) => Ok(Box::new(AerospikeConnector::new( config, + NodeHandle::new(None, connection.name), event_hub.receiver, ))), ConnectionConfig::Oracle(oracle_config) => Ok(Box::new(OracleConnector::new( diff --git a/dozer-types/src/event.rs b/dozer-types/src/event.rs index 594230f4d3..e6e513ba57 100644 --- a/dozer-types/src/event.rs +++ b/dozer-types/src/event.rs @@ -4,7 +4,7 @@ use crate::{epoch::Epoch, node::NodeHandle}; #[derive(Debug, Clone)] pub enum Event { - SinkCommitted { node: NodeHandle, epoch: Epoch }, + SinkFlushed { node: NodeHandle, epoch: Epoch }, } #[derive(Debug)]