From 000934ba7b583724f689dfc78a8085c4054d345f Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Mon, 6 Jan 2025 17:15:09 +0800 Subject: [PATCH 1/2] delete telemetry_sink_build --- src/common/src/telemetry/mod.rs | 4 ++-- src/common/telemetry_event/src/lib.rs | 8 +++++++- src/stream/src/from_proto/sink.rs | 28 --------------------------- 3 files changed, 9 insertions(+), 31 deletions(-) diff --git a/src/common/src/telemetry/mod.rs b/src/common/src/telemetry/mod.rs index 52573e298b860..145bee316e004 100644 --- a/src/common/src/telemetry/mod.rs +++ b/src/common/src/telemetry/mod.rs @@ -20,8 +20,8 @@ use std::env; use risingwave_pb::telemetry::PbTelemetryClusterType; pub use risingwave_telemetry_event::{ - current_timestamp, post_telemetry_report_pb, report_event_common, request_to_telemetry_event, - TelemetryError, TelemetryResult, + current_timestamp, post_telemetry_report_pb, report_event_common, TelemetryError, + TelemetryResult, }; use serde::{Deserialize, Serialize}; use sysinfo::System; diff --git a/src/common/telemetry_event/src/lib.rs b/src/common/telemetry_event/src/lib.rs index 0be5f40e0de1c..b2a49e83c6e87 100644 --- a/src/common/telemetry_event/src/lib.rs +++ b/src/common/telemetry_event/src/lib.rs @@ -51,6 +51,12 @@ pub fn report_event_common( attributes: Option, // any json string node: String, ) { + // REMOVE ME: To find out all report_event calls + println!( + "report_event_common backtrace:\n{}", + std::backtrace::Backtrace::force_capture() + ); + let event_tracking_id: String; if let Some(tracking_id) = TELEMETRY_TRACKING_ID.get() { event_tracking_id = tracking_id.to_string(); @@ -72,7 +78,7 @@ pub fn report_event_common( ); } -pub fn request_to_telemetry_event( +fn request_to_telemetry_event( tracking_id: String, event_stage: PbTelemetryEventStage, event_name: &str, diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 6393948812d9d..4441620f62adb 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -41,32 +41,6 @@ use crate::telemetry::report_event; pub struct SinkExecutorBuilder; -fn telemetry_sink_build( - sink_id: &SinkId, - connector_name: &str, - sink_format_desc: &Option, -) { - let attr = sink_format_desc.as_ref().map(|f| { - let mut builder = jsonbb::Builder::>::new(); - builder.begin_object(); - builder.add_string("format"); - builder.add_value(jsonbb::ValueRef::String(f.format.to_string().as_str())); - builder.add_string("encode"); - builder.add_value(jsonbb::ValueRef::String(f.encode.to_string().as_str())); - builder.end_object(); - builder.finish() - }); - - report_event( - PbTelemetryEventStage::CreateStreamJob, - "sink", - sink_id.sink_id() as i64, - Some(connector_name.to_owned()), - Some(PbTelemetryDatabaseObject::Sink), - attr, - ) -} - fn resolve_pk_info( input_schema: &Schema, log_store_table: &Table, @@ -259,8 +233,6 @@ impl ExecutorBuilder for SinkExecutorBuilder { connector, sink_id.sink_id, params.executor_id ); - telemetry_sink_build(&sink_id, connector, &sink_param.format_desc); - let exec = match node.log_store_type() { // Default value is the normal in memory log store to be backward compatible with the // previously unset value From da143dd6826dc39aeefc97d33dd6c28cf414d8f7 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Mon, 6 Jan 2025 18:22:24 +0800 Subject: [PATCH 2/2] delete telemetry_source_build --- src/stream/src/from_proto/source/mod.rs | 29 ------------------- .../src/from_proto/source/trad_source.rs | 2 -- 2 files changed, 31 deletions(-) diff --git a/src/stream/src/from_proto/source/mod.rs b/src/stream/src/from_proto/source/mod.rs index b61f4ce959eb0..c89ac9cf25c16 100644 --- a/src/stream/src/from_proto/source/mod.rs +++ b/src/stream/src/from_proto/source/mod.rs @@ -22,10 +22,8 @@ pub use fs_fetch::FsFetchExecutorBuilder; use risingwave_common::catalog::TableId; use risingwave_connector::source::UPSTREAM_SOURCE_KEY; use risingwave_pb::catalog::PbStreamSourceInfo; -use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage}; use super::*; -use crate::telemetry::report_event; fn get_connector_name(with_props: &BTreeMap) -> String { with_props @@ -33,30 +31,3 @@ fn get_connector_name(with_props: &BTreeMap) -> String { .map(|s| s.to_lowercase()) .unwrap_or_default() } - -fn telemetry_source_build( - source_type: &str, // "source" or "source backfill" - source_id: &TableId, - source_info: &PbStreamSourceInfo, - with_props: &BTreeMap, -) { - let mut builder = jsonbb::Builder::>::new(); - builder.begin_object(); - builder.add_string("format"); - builder.add_value(jsonbb::ValueRef::String(source_info.format().as_str_name())); - builder.add_string("encode"); - builder.add_value(jsonbb::ValueRef::String( - source_info.row_encode().as_str_name(), - )); - builder.end_object(); - let value = builder.finish(); - - report_event( - PbTelemetryEventStage::CreateStreamJob, - source_type, - source_id.table_id as i64, - Some(get_connector_name(with_props)), - Some(PbTelemetryDatabaseObject::Source), - Some(value), - ) -} diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs index 42cf0e1c1da23..15bb9481ecb9e 100644 --- a/src/stream/src/from_proto/source/trad_source.rs +++ b/src/stream/src/from_proto/source/trad_source.rs @@ -110,8 +110,6 @@ pub fn create_source_desc_builder( }); } - telemetry_source_build(source_type, source_id, &source_info, &with_properties); - SourceDescBuilder::new( source_columns.clone(), params.env.source_metrics(),