Skip to content

Commit

Permalink
Add total latency metric (#2470)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jesse-Bakker authored Apr 2, 2024
1 parent df196a6 commit 07a9b28
Show file tree
Hide file tree
Showing 20 changed files with 180 additions and 32 deletions.
44 changes: 42 additions & 2 deletions dozer-core/src/executor/sink_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ use daggy::NodeIndex;
use dozer_tracing::{
constants::{
ConnectorEntityType, DOZER_METER_NAME, OPERATION_TYPE_LABEL, PIPELINE_LATENCY_GAUGE_NAME,
SINK_OPERATION_COUNTER_NAME, TABLE_LABEL,
SINK_OPERATION_COUNTER_NAME, TABLE_LABEL, TOTAL_LATENCY_HISTOGRAM_NAME,
},
emit_event,
opentelemetry_metrics::{Counter, Gauge},
opentelemetry_metrics::{Counter, Gauge, Histogram},
DozerMonitorContext,
};
use dozer_types::{epoch::SourceTime, log::warn};
use dozer_types::{
log::debug,
node::{NodeHandle, OpIdentifier},
Expand Down Expand Up @@ -115,13 +116,15 @@ pub struct SinkNode {

event_sender: broadcast::Sender<Event>,
metrics: SinkMetrics,
source_times: Option<Vec<SourceTime>>,
}

/// Metric instruments held by a `SinkNode` in its `metrics` field.
#[derive(Debug)]

pub struct SinkMetrics {
// NOTE(review): presumably created under `SINK_OPERATION_COUNTER_NAME` and incremented per
// sink operation; the construction and recording sites are not visible here — confirm.
sink_counter: Counter<u64>,
// NOTE(review): presumably created under `PIPELINE_LATENCY_GAUGE_NAME`; the construction
// site is only partially visible here — confirm.
latency_gauge: Gauge<f64>,
/// Histogram created under `TOTAL_LATENCY_HISTOGRAM_NAME` and described as measuring the
/// total latency between commit on source and commit on sink. Recorded values are the
/// results of `SourceTime::elapsed_millis()`, labelled with the sink's table label.
total_latency_hist: Histogram<u64>,
}

impl SinkNode {
Expand All @@ -147,6 +150,11 @@ impl SinkNode {
.with_description("Mesasures latency between commits")
.init();

let total_latency_hist = meter
.u64_histogram(TOTAL_LATENCY_HISTOGRAM_NAME)
.with_description("Measures total latency between commit on source and commit on sink")
.init();

let max_flush_interval = sink
.max_batch_duration_ms()
.map_or(DEFAULT_FLUSH_INTERVAL, Duration::from_millis);
Expand All @@ -161,6 +169,7 @@ impl SinkNode {
};

std::thread::spawn(move || scheduler.run());
let source_times = sink.supports_batching().then(Vec::new);

Self {
node_handle,
Expand All @@ -177,9 +186,11 @@ impl SinkNode {
event_sender: dag.event_hub().sender.clone(),
max_flush_interval,
ops_since_flush: 0,
source_times,
metrics: SinkMetrics {
sink_counter,
latency_gauge,
total_latency_hist,
},
}
}
Expand All @@ -200,6 +211,18 @@ impl SinkNode {
node: self.node_handle.clone(),
epoch,
});
if let Some(source_times) = self.source_times.as_mut() {
let mut labels = self.labels.attrs().clone();
labels.push(dozer_tracing::KeyValue::new(
TABLE_LABEL,
self.node_handle.id.clone(),
));
for time in source_times.drain(..) {
if let Some(elapsed) = time.elapsed_millis() {
self.metrics.total_latency_hist.record(elapsed, &labels);
}
}
}
Ok(())
}
}
Expand Down Expand Up @@ -421,6 +444,23 @@ impl ReceiverLoop for SinkNode {
}
}

if let Some(source_time) = epoch.source_time {
if let Some(source_times) = self.source_times.as_mut() {
source_times.push(source_time);
} else {
let mut labels = self.labels.attrs().clone();
labels.push(dozer_tracing::KeyValue::new(
TABLE_LABEL,
self.node_handle.id.clone(),
));
if let Some(elapsed) = source_time.elapsed_millis() {
self.metrics.total_latency_hist.record(elapsed, &labels);
} else {
warn!("Recorded total latency < 0. Source clock and system clock are out of sync.");
}
}
}

if self
.sink
.preferred_batch_size()
Expand Down
7 changes: 5 additions & 2 deletions dozer-core/src/executor/source_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl<F: Future + Unpin> Node for SourceNode<F> {
.send_op(TableOperation { op, id, port })?;
}
IngestionMessage::TransactionInfo(info) => match info {
TransactionInfo::Commit { id } => {
TransactionInfo::Commit { id, source_time } => {
if let Some(id) = id {
source.state = SourceState::Restartable(id);
} else {
Expand All @@ -116,8 +116,11 @@ impl<F: Future + Unpin> Node for SourceNode<F> {
})
.collect(),
);
let epoch =
let mut epoch =
Epoch::new(self.epoch_id, source_states, SystemTime::now());
if let Some(st) = source_time {
epoch = epoch.with_source_time(st);
}
send_to_all_nodes(
&self.sources,
ExecutorOperation::Commit { epoch },
Expand Down
6 changes: 6 additions & 0 deletions dozer-core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,10 @@ pub trait Sink: Send + Debug {
fn flush_batch(&mut self) -> Result<(), BoxedError> {
// Default implementation: does nothing and reports success.
Ok(())
}

/// Reports whether this sink batches committed transactions.
///
/// If this returns `true`, [Sink::commit] is assumed NOT to write the committed
/// transaction to the sink's remote.
///
/// `SinkNode` queries this once when it is constructed: for a `true` result it keeps a
/// buffer of source times and records total latency for them later; otherwise it records
/// total latency as soon as it sees an epoch carrying a source time.
///
/// The default implementation returns `false`.
fn supports_batching(&self) -> bool {
false
}
}
1 change: 1 addition & 0 deletions dozer-core/src/tests/dag_base_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ impl Source for ErrGeneratorSource {
GENERATOR_SOURCE_OUTPUT_PORT,
IngestionMessage::TransactionInfo(TransactionInfo::Commit {
id: Some(OpIdentifier::new(0, n)),
source_time: None,
}),
))
.await?;
Expand Down
2 changes: 2 additions & 0 deletions dozer-core/src/tests/sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ impl Source for GeneratorSource {
GENERATOR_SOURCE_OUTPUT_PORT,
IngestionMessage::TransactionInfo(TransactionInfo::Commit {
id: Some(OpIdentifier::new(0, n)),
source_time: None,
}),
))
.await?;
Expand Down Expand Up @@ -283,6 +284,7 @@ impl Source for DualPortGeneratorSource {
DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_1,
IngestionMessage::TransactionInfo(TransactionInfo::Commit {
id: Some(OpIdentifier::new(0, n)),
source_time: None,
}),
))
.await?;
Expand Down
53 changes: 35 additions & 18 deletions dozer-ingestion/aerospike/src/connector.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use dozer_ingestion_connector::dozer_types::epoch::SourceTime;
use dozer_ingestion_connector::dozer_types::errors::internal::BoxedError;
use dozer_ingestion_connector::dozer_types::errors::types::DeserializationError;
use dozer_ingestion_connector::dozer_types::event::Event;
Expand Down Expand Up @@ -227,6 +228,7 @@ impl AerospikeConnector {

/// A group of mapped ingestion messages that an HTTP request handler sends over
/// `state.sender`, together with the timing and acknowledgement data for that group.
#[derive(Debug)]
struct PendingMessage {
/// Source-side time for the group, built with `SourceTime::new(lut, 1)`: the event's `lut`
/// for a single event, or the smallest `lut` among the mapped events of a batch. It is
/// forwarded as `source_time` on the `TransactionInfo::Commit` emitted for the group.
source_time: SourceTime,
/// The ingestion messages produced by `map_record` for this request.
messages: Vec<IngestionMessage>,
/// Sending half of a one-shot channel; the request handler awaits the paired receiver
/// before answering the HTTP request.
// NOTE(review): presumably signalled by the ingestor loop once the messages have been
// handled — the send site is not visible here; confirm.
sender: oneshot::Sender<()>,
}
Expand Down Expand Up @@ -260,6 +262,7 @@ async fn ingestor_loop(
let _ = ingestor
.handle_message(IngestionMessage::TransactionInfo(TransactionInfo::Commit {
id: Some(OpIdentifier::new(0, operation_id)),
source_time: Some(message.source_time),
}))
.await;

Expand Down Expand Up @@ -352,14 +355,16 @@ async fn event_request_handler(
return HttpResponse::Ok().finish();
}

let message = map_record(event, &state.tables_index_map).await;
let source_time = SourceTime::new(event.lut, 1);
let message = map_record(event, &state.tables_index_map);

trace!(target: "aerospike_http_server", "Mapped message {:?}", message);
match message {
Ok(None) => HttpResponse::Ok().finish(),
Ok(Some(message)) => {
let (sender, receiver) = oneshot::channel::<()>();
if let Err(e) = state.sender.send(PendingMessage {
source_time,
messages: vec![message],
sender,
}) {
Expand All @@ -385,30 +390,42 @@ async fn batch_event_request_handler(
let events = json.into_inner();
let state = data.into_inner();

let mut messages = vec![];
debug!(target: "aerospike_http_server", "Aerospike events count {:?}", events.len());
trace!(target: "aerospike_http_server", "Aerospike events {:?}", events);

for event in events {
match map_record(event, &state.tables_index_map).await {
Ok(None) => {}
Ok(Some(message)) => messages.push(message),
Err(e) => return map_error(e),
}
}
let mut min_lut = u64::MAX;
let messages = match events
.into_iter()
.filter_map(|e| {
let lut = e.lut;
let msg = map_record(e, &state.tables_index_map).transpose()?;
min_lut = min_lut.min(lut);
Some(msg)
})
.collect::<Result<Vec<_>, AerospikeConnectorError>>()
{
Ok(msgs) => msgs,
Err(e) => return map_error(e),
};

debug!(target: "aerospike_http_server", "Mapped {:?} messages", messages.len());
trace!(target: "aerospike_http_server", "Mapped messages {:?}", messages);

let (sender, receiver) = oneshot::channel::<()>();
if let Err(e) = state.sender.send(PendingMessage { messages, sender }) {
error!("Ingestor is down: {:?}", e);
return HttpResponse::InternalServerError().finish();
}
if !messages.is_empty() {
let (sender, receiver) = oneshot::channel::<()>();
if let Err(e) = state.sender.send(PendingMessage {
messages,
sender,
source_time: SourceTime::new(min_lut, 1),
}) {
error!("Ingestor is down: {:?}", e);
return HttpResponse::InternalServerError().finish();
}

if let Err(e) = receiver.await {
error!("Pipeline event processor is down: {:?}", e);
return HttpResponse::InternalServerError().finish();
if let Err(e) = receiver.await {
error!("Pipeline event processor is down: {:?}", e);
return HttpResponse::InternalServerError().finish();
}
}

HttpResponse::Ok().finish()
Expand Down Expand Up @@ -673,7 +690,7 @@ impl Connector for AerospikeConnector {
}
}

async fn map_record(
fn map_record(
event: AerospikeEvent,
tables_map: &HashMap<String, TableIndexMap>,
) -> Result<Option<IngestionMessage>, AerospikeConnectorError> {
Expand Down
1 change: 1 addition & 0 deletions dozer-ingestion/grpc/src/adapter/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ pub async fn handle_message(
if ingestor
.handle_message(IngestionMessage::TransactionInfo(TransactionInfo::Commit {
id: None,
source_time: None,
}))
.await
.is_err()
Expand Down
1 change: 1 addition & 0 deletions dozer-ingestion/grpc/src/adapter/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ pub async fn handle_message(
let _ = ingestor
.handle_message(IngestionMessage::TransactionInfo(TransactionInfo::Commit {
id: None,
source_time: None,
}))
.await;
Ok(())
Expand Down
1 change: 1 addition & 0 deletions dozer-ingestion/javascript/src/js_extension/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ async fn send(ingestor: Ingestor, val: JsMessage) -> Result<(), anyhow::Error> {
let _ = ingestor
.handle_message(IngestionMessage::TransactionInfo(TransactionInfo::Commit {
id: None,
source_time: None,
}))
.await;
Ok(())
Expand Down
1 change: 1 addition & 0 deletions dozer-ingestion/mysql/src/binlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@ impl BinlogIngestor<'_, '_, '_> {
.handle_message(IngestionMessage::TransactionInfo(
TransactionInfo::Commit {
id: Some(encode_state(&transaction_pos)),
source_time: None,
},
))
.await
Expand Down
15 changes: 12 additions & 3 deletions dozer-ingestion/mysql/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,10 @@ mod tests {
IngestionMessage::OperationEvent { id, .. } => {
*id = None;
}
IngestionMessage::TransactionInfo(TransactionInfo::Commit { id }) => {
IngestionMessage::TransactionInfo(TransactionInfo::Commit {
id,
source_time: None,
}) => {
*id = None;
}
_ => {}
Expand Down Expand Up @@ -736,7 +739,10 @@ mod tests {
},
id: None,
},
IngestionMessage::TransactionInfo(TransactionInfo::Commit { id: None }),
IngestionMessage::TransactionInfo(TransactionInfo::Commit {
id: None,
source_time: None,
}),
];

check_ingestion_messages(&mut iterator, expected_ingestion_messages).await;
Expand All @@ -754,7 +760,10 @@ mod tests {
},
id: None,
},
IngestionMessage::TransactionInfo(TransactionInfo::Commit { id: None }),
IngestionMessage::TransactionInfo(TransactionInfo::Commit {
id: None,
source_time: None,
}),
];

check_ingestion_messages(&mut iterator, expected_ingestion_messages).await;
Expand Down
5 changes: 4 additions & 1 deletion dozer-ingestion/object-store/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,10 @@ impl<T: DozerObjectStore> Connector for ObjectStoreConnector<T> {
.unwrap();
sender
.send(Ok(Some(IngestionMessage::TransactionInfo(
TransactionInfo::Commit { id: None },
TransactionInfo::Commit {
id: None,
source_time: None,
},
))))
.await
.unwrap();
Expand Down
5 changes: 5 additions & 0 deletions dozer-ingestion/oracle/src/connector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
use dozer_ingestion_connector::{
dozer_types::{
chrono,
epoch::SourceTime,
log::{debug, error},
models::ingestion_types::{IngestionMessage, OracleReplicator, TransactionInfo},
node::OpIdentifier,
Expand Down Expand Up @@ -393,6 +394,10 @@ impl Connector {
.blocking_handle_message(IngestionMessage::TransactionInfo(
TransactionInfo::Commit {
id: Some(OpIdentifier::new(transaction.commit_scn, 0)),
source_time: Some(SourceTime::from_chrono(
&transaction.commit_timestamp,
1000,
)),
},
))
.is_err()
Expand Down
1 change: 1 addition & 0 deletions dozer-ingestion/postgres/src/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ impl<'a> CDCHandler<'a> {
.handle_message(IngestionMessage::TransactionInfo(
TransactionInfo::Commit {
id: Some(OpIdentifier::new(self.begin_lsn, 0)),
source_time: None,
},
))
.await
Expand Down
1 change: 1 addition & 0 deletions dozer-ingestion/snowflake/src/stream_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ impl StreamConsumer {
.blocking_handle_message(IngestionMessage::TransactionInfo(
TransactionInfo::Commit {
id: Some(OpIdentifier::new(iteration, idx as u64)),
source_time: None,
},
))
.is_err()
Expand Down
4 changes: 4 additions & 0 deletions dozer-sink-aerospike/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,10 @@ impl AerospikeSinkWorker {
}

impl Sink for AerospikeSink {
// Opts this sink in to batching by overriding the trait default with `true`.
// NOTE(review): per the `Sink::supports_batching` contract this means `commit` alone is
// assumed not to write the transaction to the remote; presumably the write happens in
// `flush_batch` via the replication worker — confirm.
fn supports_batching(&self) -> bool {
true
}

fn flush_batch(&mut self) -> Result<(), BoxedError> {
self.replication_worker.flush_batch()?;
Ok(())
Expand Down
Loading

0 comments on commit 07a9b28

Please sign in to comment.