From 653a52e7d8cd818ae2a597b2e1b970f0eca89de8 Mon Sep 17 00:00:00 2001 From: VG Date: Thu, 21 Mar 2024 00:39:02 +0800 Subject: [PATCH] chore: fix minor metrics gathering (#2462) --- dozer-core/src/executor/sink_node.rs | 39 +++++++++++++++++++--------- dozer-tracing/src/constants.rs | 1 - 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/dozer-core/src/executor/sink_node.rs b/dozer-core/src/executor/sink_node.rs index 76b355cf51..f35ac3b1eb 100644 --- a/dozer-core/src/executor/sink_node.rs +++ b/dozer-core/src/executor/sink_node.rs @@ -2,8 +2,8 @@ use crossbeam::channel::{Receiver, Sender, TryRecvError}; use daggy::NodeIndex; use dozer_tracing::{ constants::{ - ConnectorEntityType, DOZER_METER_NAME, ENDPOINT_LABEL, OPERATION_TYPE_LABEL, - PIPELINE_LATENCY_GAUGE_NAME, SINK_OPERATION_COUNTER_NAME, TABLE_LABEL, + ConnectorEntityType, DOZER_METER_NAME, OPERATION_TYPE_LABEL, PIPELINE_LATENCY_GAUGE_NAME, + SINK_OPERATION_COUNTER_NAME, TABLE_LABEL, }, emit_event, opentelemetry_metrics::{Counter, Gauge}, @@ -12,6 +12,7 @@ use dozer_tracing::{ use dozer_types::{ log::debug, node::{NodeHandle, OpIdentifier}, + tracing::error, types::{Operation, TableOperation}, }; use std::{ @@ -404,16 +405,20 @@ impl ReceiverLoop for SinkNode { } self.last_op_if_commit = Some(epoch.clone()); - if let Ok(duration) = epoch.decision_instant.elapsed() { - let mut labels = self.labels.attrs().clone(); - labels.push(dozer_tracing::KeyValue::new( - ENDPOINT_LABEL, - self.node_handle.id.clone(), - )); - - self.metrics - .latency_gauge - .record(duration.as_secs_f64(), &labels); + match epoch.decision_instant.elapsed() { + Ok(duration) => { + let mut labels = self.labels.attrs().clone(); + labels.push(dozer_tracing::KeyValue::new( + TABLE_LABEL, + self.node_handle.id.clone(), + )); + self.metrics + .latency_gauge + .record(duration.as_secs_f64(), &labels); + } + Err(e) => { + error!("error recording pipeline_latench {:?}", e); + } } if self @@ -448,6 +453,16 @@ impl ReceiverLoop for SinkNode { if let Err(e) = self.sink.on_source_snapshotting_done(connection_name, id) { self.error_manager.report(e); } + + // record 0 as pipeline latency when snapshotting is done + // this is to initialize the value for metrics + let mut labels = self.labels.attrs().clone(); + labels.push(dozer_tracing::KeyValue::new( + TABLE_LABEL, + self.node_handle.id.clone(), + )); + self.metrics.latency_gauge.record(0_f64, &labels); + Ok(()) } } diff --git a/dozer-tracing/src/constants.rs b/dozer-tracing/src/constants.rs index 39b019cf8f..7040892f18 100644 --- a/dozer-tracing/src/constants.rs +++ b/dozer-tracing/src/constants.rs @@ -9,7 +9,6 @@ pub const SOURCE_OPERATION_COUNTER_NAME: &str = "source_operation"; // Labels pub const OPERATION_TYPE_LABEL: &str = "operation_type"; -pub const ENDPOINT_LABEL: &str = "endpoint"; pub const TABLE_LABEL: &str = "table"; pub const CONNECTION_LABEL: &str = "connection";