From 07a9b28182a6e2c9362fc145fb2e9a8c6437131e Mon Sep 17 00:00:00 2001 From: Jesse Date: Tue, 2 Apr 2024 15:53:06 +0200 Subject: [PATCH] Add total latency metric (#2470) --- dozer-core/src/executor/sink_node.rs | 44 +++++++++++++- dozer-core/src/executor/source_node/mod.rs | 7 ++- dozer-core/src/node.rs | 6 ++ dozer-core/src/tests/dag_base_errors.rs | 1 + dozer-core/src/tests/sources.rs | 2 + dozer-ingestion/aerospike/src/connector.rs | 53 +++++++++++------ dozer-ingestion/grpc/src/adapter/arrow.rs | 1 + dozer-ingestion/grpc/src/adapter/default.rs | 1 + .../javascript/src/js_extension/mod.rs | 1 + dozer-ingestion/mysql/src/binlog.rs | 1 + dozer-ingestion/mysql/src/connector.rs | 15 ++++- dozer-ingestion/object-store/src/connector.rs | 5 +- dozer-ingestion/oracle/src/connector/mod.rs | 5 ++ dozer-ingestion/postgres/src/replicator.rs | 1 + .../snowflake/src/stream_consumer.rs | 1 + dozer-sink-aerospike/src/lib.rs | 4 ++ dozer-sink-oracle/src/lib.rs | 4 ++ dozer-tracing/src/constants.rs | 1 + dozer-types/src/epoch.rs | 57 +++++++++++++++++-- dozer-types/src/models/ingestion_types.rs | 2 + 20 files changed, 180 insertions(+), 32 deletions(-) diff --git a/dozer-core/src/executor/sink_node.rs b/dozer-core/src/executor/sink_node.rs index f35ac3b1eb..79fa4fb38c 100644 --- a/dozer-core/src/executor/sink_node.rs +++ b/dozer-core/src/executor/sink_node.rs @@ -3,12 +3,13 @@ use daggy::NodeIndex; use dozer_tracing::{ constants::{ ConnectorEntityType, DOZER_METER_NAME, OPERATION_TYPE_LABEL, PIPELINE_LATENCY_GAUGE_NAME, - SINK_OPERATION_COUNTER_NAME, TABLE_LABEL, + SINK_OPERATION_COUNTER_NAME, TABLE_LABEL, TOTAL_LATENCY_HISTOGRAM_NAME, }, emit_event, - opentelemetry_metrics::{Counter, Gauge}, + opentelemetry_metrics::{Counter, Gauge, Histogram}, DozerMonitorContext, }; +use dozer_types::{epoch::SourceTime, log::warn}; use dozer_types::{ log::debug, node::{NodeHandle, OpIdentifier}, @@ -115,6 +116,7 @@ pub struct SinkNode { event_sender: broadcast::Sender, metrics: SinkMetrics, + source_times: Option>, } #[derive(Debug)] @@ -122,6 +124,7 @@ pub struct SinkNode { pub struct SinkMetrics { sink_counter: Counter, latency_gauge: Gauge, + total_latency_hist: Histogram, } impl SinkNode { @@ -147,6 +150,11 @@ impl SinkNode { .with_description("Mesasures latency between commits") .init(); + let total_latency_hist = meter + .u64_histogram(TOTAL_LATENCY_HISTOGRAM_NAME) + .with_description("Measures total latency between commit on source and commit on sink") + .init(); + let max_flush_interval = sink .max_batch_duration_ms() .map_or(DEFAULT_FLUSH_INTERVAL, Duration::from_millis); @@ -161,6 +169,7 @@ impl SinkNode { }; std::thread::spawn(move || scheduler.run()); + let source_times = sink.supports_batching().then(Vec::new); Self { node_handle, @@ -177,9 +186,11 @@ impl SinkNode { event_sender: dag.event_hub().sender.clone(), max_flush_interval, ops_since_flush: 0, + source_times, metrics: SinkMetrics { sink_counter, latency_gauge, + total_latency_hist, }, } } @@ -200,6 +211,18 @@ impl SinkNode { node: self.node_handle.clone(), epoch, }); + if let Some(source_times) = self.source_times.as_mut() { + let mut labels = self.labels.attrs().clone(); + labels.push(dozer_tracing::KeyValue::new( + TABLE_LABEL, + self.node_handle.id.clone(), + )); + for time in source_times.drain(..) { + if let Some(elapsed) = time.elapsed_millis() { + self.metrics.total_latency_hist.record(elapsed, &labels); + } + } + } Ok(()) } } @@ -421,6 +444,23 @@ impl ReceiverLoop for SinkNode { } } + if let Some(source_time) = epoch.source_time { + if let Some(source_times) = self.source_times.as_mut() { + source_times.push(source_time); + } else { + let mut labels = self.labels.attrs().clone(); + labels.push(dozer_tracing::KeyValue::new( + TABLE_LABEL, + self.node_handle.id.clone(), + )); + if let Some(elapsed) = source_time.elapsed_millis() { + self.metrics.total_latency_hist.record(elapsed, &labels); + } else { + warn!("Recorded total latency < 0. Source clock and system clock are out of sync."); + } + } + } + if self .sink .preferred_batch_size() diff --git a/dozer-core/src/executor/source_node/mod.rs b/dozer-core/src/executor/source_node/mod.rs index 3087a10314..0fa913c004 100644 --- a/dozer-core/src/executor/source_node/mod.rs +++ b/dozer-core/src/executor/source_node/mod.rs @@ -98,7 +98,7 @@ impl Node for SourceNode { .send_op(TableOperation { op, id, port })?; } IngestionMessage::TransactionInfo(info) => match info { - TransactionInfo::Commit { id } => { + TransactionInfo::Commit { id, source_time } => { if let Some(id) = id { source.state = SourceState::Restartable(id); } else { @@ -116,8 +116,11 @@ impl Node for SourceNode { }) .collect(), ); - let epoch = + let mut epoch = Epoch::new(self.epoch_id, source_states, SystemTime::now()); + if let Some(st) = source_time { + epoch = epoch.with_source_time(st); + } send_to_all_nodes( &self.sources, ExecutorOperation::Commit { epoch }, diff --git a/dozer-core/src/node.rs b/dozer-core/src/node.rs index c011e9ef2b..d9eb136b10 100644 --- a/dozer-core/src/node.rs +++ b/dozer-core/src/node.rs @@ -138,4 +138,10 @@ pub trait Sink: Send + Debug { fn flush_batch(&mut self) -> Result<(), BoxedError> { Ok(()) } + + /// If this returns `true`, [Sink::commit] is assumed to not commit the committed + /// transaction to the sink's remote + fn supports_batching(&self) -> bool { + false + } } diff --git a/dozer-core/src/tests/dag_base_errors.rs b/dozer-core/src/tests/dag_base_errors.rs index 3907fa3a39..e8a2bed55a 100644 --- a/dozer-core/src/tests/dag_base_errors.rs +++ b/dozer-core/src/tests/dag_base_errors.rs @@ -360,6 +360,7 @@ impl Source for ErrGeneratorSource { GENERATOR_SOURCE_OUTPUT_PORT, IngestionMessage::TransactionInfo(TransactionInfo::Commit { id: Some(OpIdentifier::new(0, n)), + source_time: None, }), )) .await?; diff --git a/dozer-core/src/tests/sources.rs b/dozer-core/src/tests/sources.rs index 43ce615adb..896b1cf782 100644 --- a/dozer-core/src/tests/sources.rs +++ b/dozer-core/src/tests/sources.rs @@ -128,6 +128,7 @@ impl Source for GeneratorSource { GENERATOR_SOURCE_OUTPUT_PORT, IngestionMessage::TransactionInfo(TransactionInfo::Commit { id: Some(OpIdentifier::new(0, n)), + source_time: None, }), )) .await?; @@ -283,6 +284,7 @@ impl Source for DualPortGeneratorSource { DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_1, IngestionMessage::TransactionInfo(TransactionInfo::Commit { id: Some(OpIdentifier::new(0, n)), + source_time: None, }), )) .await?; diff --git a/dozer-ingestion/aerospike/src/connector.rs b/dozer-ingestion/aerospike/src/connector.rs index a6246638da..ad6b728e43 100644 --- a/dozer-ingestion/aerospike/src/connector.rs +++ b/dozer-ingestion/aerospike/src/connector.rs @@ -1,3 +1,4 @@ +use dozer_ingestion_connector::dozer_types::epoch::SourceTime; use dozer_ingestion_connector::dozer_types::errors::internal::BoxedError; use dozer_ingestion_connector::dozer_types::errors::types::DeserializationError; use dozer_ingestion_connector::dozer_types::event::Event; @@ -227,6 +228,7 @@ impl AerospikeConnector { #[derive(Debug)] struct PendingMessage { + source_time: SourceTime, messages: Vec, sender: oneshot::Sender<()>, } @@ -260,6 +262,7 @@ async fn ingestor_loop( let _ = ingestor .handle_message(IngestionMessage::TransactionInfo(TransactionInfo::Commit { id: Some(OpIdentifier::new(0, operation_id)), + source_time: Some(message.source_time), })) .await; @@ -352,7 +355,8 @@ async fn event_request_handler( return HttpResponse::Ok().finish(); } - let message = map_record(event, &state.tables_index_map).await; + let source_time = SourceTime::new(event.lut, 1); + let message = map_record(event, &state.tables_index_map); trace!(target: "aerospike_http_server", "Mapped message {:?}", message); match message { @@ -360,6 +364,7 @@ async fn event_request_handler( Ok(Some(message)) => { let (sender, receiver) = oneshot::channel::<()>(); if let Err(e) = state.sender.send(PendingMessage { + source_time, messages: vec![message], sender, }) { @@ -385,30 +390,42 @@ async fn batch_event_request_handler( let events = json.into_inner(); let state = data.into_inner(); - let mut messages = vec![]; debug!(target: "aerospike_http_server", "Aerospike events count {:?}", events.len()); trace!(target: "aerospike_http_server", "Aerospike events {:?}", events); - for event in events { - match map_record(event, &state.tables_index_map).await { - Ok(None) => {} - Ok(Some(message)) => messages.push(message), - Err(e) => return map_error(e), - } - } + let mut min_lut = u64::MAX; + let messages = match events + .into_iter() + .filter_map(|e| { + let lut = e.lut; + let msg = map_record(e, &state.tables_index_map).transpose()?; + min_lut = min_lut.min(lut); + Some(msg) + }) + .collect::, AerospikeConnectorError>>() + { + Ok(msgs) => msgs, + Err(e) => return map_error(e), + }; debug!(target: "aerospike_http_server", "Mapped {:?} messages", messages.len()); trace!(target: "aerospike_http_server", "Mapped messages {:?}", messages); - let (sender, receiver) = oneshot::channel::<()>(); - if let Err(e) = state.sender.send(PendingMessage { messages, sender }) { - error!("Ingestor is down: {:?}", e); - return HttpResponse::InternalServerError().finish(); - } + if !messages.is_empty() { + let (sender, receiver) = oneshot::channel::<()>(); + if let Err(e) = state.sender.send(PendingMessage { + messages, + sender, + source_time: SourceTime::new(min_lut, 1), + }) { + error!("Ingestor is down: {:?}", e); + return HttpResponse::InternalServerError().finish(); + } - if let Err(e) = receiver.await { - error!("Pipeline event processor is down: {:?}", e); - return HttpResponse::InternalServerError().finish(); + if let Err(e) = receiver.await { + error!("Pipeline event processor is down: {:?}", e); + return HttpResponse::InternalServerError().finish(); + } } HttpResponse::Ok().finish() @@ -673,7 +690,7 @@ impl Connector for AerospikeConnector { } } -async fn map_record( +fn map_record( event: AerospikeEvent, tables_map: &HashMap, ) -> Result, AerospikeConnectorError> { diff --git a/dozer-ingestion/grpc/src/adapter/arrow.rs b/dozer-ingestion/grpc/src/adapter/arrow.rs index b3184dfedd..3e8b258eb6 100644 --- a/dozer-ingestion/grpc/src/adapter/arrow.rs +++ b/dozer-ingestion/grpc/src/adapter/arrow.rs @@ -127,6 +127,7 @@ pub async fn handle_message( if ingestor .handle_message(IngestionMessage::TransactionInfo(TransactionInfo::Commit { id: None, + source_time: None, })) .await .is_err() diff --git a/dozer-ingestion/grpc/src/adapter/default.rs b/dozer-ingestion/grpc/src/adapter/default.rs index c6069ad2ef..09de2ec905 100644 --- a/dozer-ingestion/grpc/src/adapter/default.rs +++ b/dozer-ingestion/grpc/src/adapter/default.rs @@ -94,6 +94,7 @@ pub async fn handle_message( let _ = ingestor .handle_message(IngestionMessage::TransactionInfo(TransactionInfo::Commit { id: None, + source_time: None, })) .await; Ok(()) diff --git a/dozer-ingestion/javascript/src/js_extension/mod.rs b/dozer-ingestion/javascript/src/js_extension/mod.rs index 658d8aca86..57c52fd959 100644 --- a/dozer-ingestion/javascript/src/js_extension/mod.rs +++ b/dozer-ingestion/javascript/src/js_extension/mod.rs @@ -121,6 +121,7 @@ async fn send(ingestor: Ingestor, val: JsMessage) -> Result<(), anyhow::Error> { let _ = ingestor .handle_message(IngestionMessage::TransactionInfo(TransactionInfo::Commit { id: None, + source_time: None, })) .await; Ok(()) diff --git a/dozer-ingestion/mysql/src/binlog.rs b/dozer-ingestion/mysql/src/binlog.rs index ccca293913..6ff486dcca 100644 --- a/dozer-ingestion/mysql/src/binlog.rs +++ b/dozer-ingestion/mysql/src/binlog.rs @@ -440,6 +440,7 @@ impl BinlogIngestor<'_, '_, '_> { .handle_message(IngestionMessage::TransactionInfo( TransactionInfo::Commit { id: Some(encode_state(&transaction_pos)), + source_time: None, }, )) .await diff --git a/dozer-ingestion/mysql/src/connector.rs b/dozer-ingestion/mysql/src/connector.rs index 89c1d5737d..44c99decc4 100644 --- a/dozer-ingestion/mysql/src/connector.rs +++ b/dozer-ingestion/mysql/src/connector.rs @@ -539,7 +539,10 @@ mod tests { IngestionMessage::OperationEvent { id, .. } => { *id = None; } - IngestionMessage::TransactionInfo(TransactionInfo::Commit { id }) => { + IngestionMessage::TransactionInfo(TransactionInfo::Commit { + id, + source_time: None, + }) => { *id = None; } _ => {} @@ -736,7 +739,10 @@ mod tests { }, id: None, }, - IngestionMessage::TransactionInfo(TransactionInfo::Commit { id: None }), + IngestionMessage::TransactionInfo(TransactionInfo::Commit { + id: None, + source_time: None, + }), ]; check_ingestion_messages(&mut iterator, expected_ingestion_messages).await; @@ -754,7 +760,10 @@ mod tests { }, id: None, }, - IngestionMessage::TransactionInfo(TransactionInfo::Commit { id: None }), + IngestionMessage::TransactionInfo(TransactionInfo::Commit { + id: None, + source_time: None, + }), ]; check_ingestion_messages(&mut iterator, expected_ingestion_messages).await; diff --git a/dozer-ingestion/object-store/src/connector.rs b/dozer-ingestion/object-store/src/connector.rs index 9a40083a83..20d7e44601 100644 --- a/dozer-ingestion/object-store/src/connector.rs +++ b/dozer-ingestion/object-store/src/connector.rs @@ -188,7 +188,10 @@ impl Connector for ObjectStoreConnector { .unwrap(); sender .send(Ok(Some(IngestionMessage::TransactionInfo( - TransactionInfo::Commit { id: None }, + TransactionInfo::Commit { + id: None, + source_time: None, + }, )))) .await .unwrap(); diff --git a/dozer-ingestion/oracle/src/connector/mod.rs b/dozer-ingestion/oracle/src/connector/mod.rs index 4b8f874a81..70ea0b617f 100644 --- a/dozer-ingestion/oracle/src/connector/mod.rs +++ b/dozer-ingestion/oracle/src/connector/mod.rs @@ -8,6 +8,7 @@ use std::{ use dozer_ingestion_connector::{ dozer_types::{ chrono, + epoch::SourceTime, log::{debug, error}, models::ingestion_types::{IngestionMessage, OracleReplicator, TransactionInfo}, node::OpIdentifier, @@ -393,6 +394,10 @@ impl Connector { .blocking_handle_message(IngestionMessage::TransactionInfo( TransactionInfo::Commit { id: Some(OpIdentifier::new(transaction.commit_scn, 0)), + source_time: Some(SourceTime::from_chrono( + &transaction.commit_timestamp, + 1000, + )), }, )) .is_err() diff --git a/dozer-ingestion/postgres/src/replicator.rs b/dozer-ingestion/postgres/src/replicator.rs index 7e0c5d26ee..6a472f5039 100644 --- a/dozer-ingestion/postgres/src/replicator.rs +++ b/dozer-ingestion/postgres/src/replicator.rs @@ -123,6 +123,7 @@ impl<'a> CDCHandler<'a> { .handle_message(IngestionMessage::TransactionInfo( TransactionInfo::Commit { id: Some(OpIdentifier::new(self.begin_lsn, 0)), + source_time: None, }, )) .await diff --git a/dozer-ingestion/snowflake/src/stream_consumer.rs b/dozer-ingestion/snowflake/src/stream_consumer.rs index 3ae6812ffa..811ca6d96d 100644 --- a/dozer-ingestion/snowflake/src/stream_consumer.rs +++ b/dozer-ingestion/snowflake/src/stream_consumer.rs @@ -137,6 +137,7 @@ impl StreamConsumer { .blocking_handle_message(IngestionMessage::TransactionInfo( TransactionInfo::Commit { id: Some(OpIdentifier::new(iteration, idx as u64)), + source_time: None, }, )) .is_err() diff --git a/dozer-sink-aerospike/src/lib.rs b/dozer-sink-aerospike/src/lib.rs index 6f367010e1..b48a7ef3e1 100644 --- a/dozer-sink-aerospike/src/lib.rs +++ b/dozer-sink-aerospike/src/lib.rs @@ -468,6 +468,10 @@ impl AerospikeSinkWorker { } impl Sink for AerospikeSink { + fn supports_batching(&self) -> bool { + true + } + fn flush_batch(&mut self) -> Result<(), BoxedError> { self.replication_worker.flush_batch()?; Ok(()) diff --git a/dozer-sink-oracle/src/lib.rs b/dozer-sink-oracle/src/lib.rs index 088b9b7c92..cebed94038 100644 --- a/dozer-sink-oracle/src/lib.rs +++ b/dozer-sink-oracle/src/lib.rs @@ -713,6 +713,10 @@ impl Sink for OracleSink { Ok(()) } + fn supports_batching(&self) -> bool { + true + } + fn flush_batch(&mut self) -> Result<(), BoxedError> { self.exec_batch()?; if let Some(txid) = self.latest_txid { diff --git a/dozer-tracing/src/constants.rs b/dozer-tracing/src/constants.rs index 7040892f18..0b572a372f 100644 --- a/dozer-tracing/src/constants.rs +++ b/dozer-tracing/src/constants.rs @@ -4,6 +4,7 @@ pub const DOZER_METER_NAME: &str = "dozer.meter"; // Metrics pub const SINK_OPERATION_COUNTER_NAME: &str = "sink_operation"; pub const PIPELINE_LATENCY_GAUGE_NAME: &str = "pipeline_latency"; +pub const TOTAL_LATENCY_HISTOGRAM_NAME: &str = "total_latency"; pub const SOURCE_OPERATION_COUNTER_NAME: &str = "source_operation"; diff --git a/dozer-types/src/epoch.rs b/dozer-types/src/epoch.rs index 33d215fbd9..f2fc2da674 100644 --- a/dozer-types/src/epoch.rs +++ b/dozer-types/src/epoch.rs @@ -1,4 +1,9 @@ -use std::{sync::Arc, time::SystemTime}; +use std::{ + sync::Arc, + time::{SystemTime, UNIX_EPOCH}, +}; + +use chrono::{DateTime, NaiveDateTime}; use crate::node::SourceStates; @@ -8,10 +13,51 @@ pub struct EpochCommonInfo { pub source_states: Arc, } +#[derive(Copy, Clone, Debug, PartialEq)] +pub struct SourceTime { + millis_since_epoch: u64, + accuracy: u64, +} + +impl SourceTime { + pub fn elapsed_millis(&self) -> Option { + let now_duration = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("System time is before 1970-01-01 00:00"); + let rounded_now_millis: u64 = now_duration + .as_millis() + .next_multiple_of(self.accuracy.into()) + .try_into() + // Won't overflow a u64 in the coming 500 million years + .unwrap(); + rounded_now_millis.checked_sub(self.millis_since_epoch) + } + + pub fn from_chrono(date: &DateTime, accuracy: u64) -> Self { + Self { + millis_since_epoch: date + .naive_utc() + .signed_duration_since(NaiveDateTime::UNIX_EPOCH) + .num_milliseconds() + .try_into() + .expect("Only source times after 1970-01-01 00:00 are supported"), + accuracy, + } + } + + pub fn new(millis_since_epoch: u64, accuracy: u64) -> Self { + Self { + millis_since_epoch, + accuracy, + } + } +} + #[derive(Clone, Debug)] pub struct Epoch { pub common_info: EpochCommonInfo, pub decision_instant: SystemTime, + pub source_time: Option, } impl Epoch { @@ -19,13 +65,12 @@ impl Epoch { Self { common_info: EpochCommonInfo { id, source_states }, decision_instant, + source_time: None, } } - pub fn from(common_info: EpochCommonInfo, decision_instant: SystemTime) -> Self { - Self { - common_info, - decision_instant, - } + pub fn with_source_time(mut self, source_time: SourceTime) -> Self { + self.source_time = Some(source_time); + self } } diff --git a/dozer-types/src/models/ingestion_types.rs b/dozer-types/src/models/ingestion_types.rs index 2dc1d6008a..24832547e3 100644 --- a/dozer-types/src/models/ingestion_types.rs +++ b/dozer-types/src/models/ingestion_types.rs @@ -5,6 +5,7 @@ use std::{fmt::Debug, time::Duration}; use serde::{Deserialize, Serialize}; use crate::{ + epoch::SourceTime, helper::{deserialize_duration_secs_f64, f64_schema, serialize_duration_secs_f64}, models::connection::SchemaExample, node::OpIdentifier, @@ -35,6 +36,7 @@ pub enum TransactionInfo { Commit { /// If this connector supports restarting from after this commit, it should provide a `OpIdentifier`. id: Option, + source_time: Option, }, /// A connector uses this message kind to notify Dozer that a initial snapshot of the source tables is started SnapshottingStarted,