Skip to content

Commit

Permalink
chore: fix minor metrics gathering (#2462)
Browse files Browse the repository at this point in the history
  • Loading branch information
v3g42 authored Mar 20, 2024
1 parent 58a2305 commit 653a52e
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 13 deletions.
39 changes: 27 additions & 12 deletions dozer-core/src/executor/sink_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use crossbeam::channel::{Receiver, Sender, TryRecvError};
use daggy::NodeIndex;
use dozer_tracing::{
constants::{
ConnectorEntityType, DOZER_METER_NAME, ENDPOINT_LABEL, OPERATION_TYPE_LABEL,
PIPELINE_LATENCY_GAUGE_NAME, SINK_OPERATION_COUNTER_NAME, TABLE_LABEL,
ConnectorEntityType, DOZER_METER_NAME, OPERATION_TYPE_LABEL, PIPELINE_LATENCY_GAUGE_NAME,
SINK_OPERATION_COUNTER_NAME, TABLE_LABEL,
},
emit_event,
opentelemetry_metrics::{Counter, Gauge},
Expand All @@ -12,6 +12,7 @@ use dozer_tracing::{
use dozer_types::{
log::debug,
node::{NodeHandle, OpIdentifier},
tracing::error,
types::{Operation, TableOperation},
};
use std::{
Expand Down Expand Up @@ -404,16 +405,20 @@ impl ReceiverLoop for SinkNode {
}
self.last_op_if_commit = Some(epoch.clone());

if let Ok(duration) = epoch.decision_instant.elapsed() {
let mut labels = self.labels.attrs().clone();
labels.push(dozer_tracing::KeyValue::new(
ENDPOINT_LABEL,
self.node_handle.id.clone(),
));

self.metrics
.latency_gauge
.record(duration.as_secs_f64(), &labels);
match epoch.decision_instant.elapsed() {
Ok(duration) => {
let mut labels = self.labels.attrs().clone();
labels.push(dozer_tracing::KeyValue::new(
TABLE_LABEL,
self.node_handle.id.clone(),
));
self.metrics
.latency_gauge
.record(duration.as_secs_f64(), &labels);
}
Err(e) => {
error!("error recording pipeline_latench {:?}", e);
}
}

if self
Expand Down Expand Up @@ -448,6 +453,16 @@ impl ReceiverLoop for SinkNode {
if let Err(e) = self.sink.on_source_snapshotting_done(connection_name, id) {
self.error_manager.report(e);
}

// record 0 as pipeline latency when snapshotting is done
// this is to initialize the value for metrics
let mut labels = self.labels.attrs().clone();
labels.push(dozer_tracing::KeyValue::new(
TABLE_LABEL,
self.node_handle.id.clone(),
));
self.metrics.latency_gauge.record(0_f64, &labels);

Ok(())
}
}
1 change: 0 additions & 1 deletion dozer-tracing/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ pub const SOURCE_OPERATION_COUNTER_NAME: &str = "source_operation";

// Labels
pub const OPERATION_TYPE_LABEL: &str = "operation_type";
pub const ENDPOINT_LABEL: &str = "endpoint";
pub const TABLE_LABEL: &str = "table";
pub const CONNECTION_LABEL: &str = "connection";

Expand Down

0 comments on commit 653a52e

Please sign in to comment.