diff --git a/crates/bootstrap/telemetry/src/lib.rs b/crates/bootstrap/telemetry/src/lib.rs index 69b8e7cc5..a7993b2c2 100644 --- a/crates/bootstrap/telemetry/src/lib.rs +++ b/crates/bootstrap/telemetry/src/lib.rs @@ -354,11 +354,7 @@ mod tests { use std::sync::Arc; use bytes::Bytes; - use opentelemetry::{Key, KeyValue, Value}; - use opentelemetry_sdk::metrics::data::{ResourceMetrics, Sum}; - use opentelemetry_sdk::metrics::reader::MetricReader; - use opentelemetry_sdk::metrics::{ManualReader, ManualReaderBuilder, SdkMeterProvider}; - use opentelemetry_sdk::Resource; + use opentelemetry_sdk::metrics::SdkMeterProvider; use scuffle_bootstrap::{GlobalWithoutConfig, Service}; use crate::{TelemetryConfig, TelemetrySvc}; @@ -512,192 +508,4 @@ mod tests { task_handle.await.unwrap().unwrap(); } - - #[tokio::test] - async fn opentelemetry() { - #[derive(Debug, Clone)] - struct TestReader(Arc); - - impl TestReader { - fn new() -> Self { - Self(Arc::new(ManualReaderBuilder::new().build())) - } - - fn read(&self) -> ResourceMetrics { - let mut metrics = ResourceMetrics { - resource: Resource::empty(), - scope_metrics: vec![], - }; - - self.0.collect(&mut metrics).expect("collect"); - - metrics - } - } - - impl opentelemetry_sdk::metrics::reader::MetricReader for TestReader { - fn register_pipeline(&self, pipeline: std::sync::Weak) { - self.0.register_pipeline(pipeline) - } - - fn collect( - &self, - rm: &mut opentelemetry_sdk::metrics::data::ResourceMetrics, - ) -> opentelemetry_sdk::metrics::MetricResult<()> { - self.0.collect(rm) - } - - fn force_flush(&self) -> opentelemetry_sdk::metrics::MetricResult<()> { - self.0.force_flush() - } - - fn shutdown(&self) -> opentelemetry_sdk::metrics::MetricResult<()> { - self.0.shutdown() - } - - fn temporality( - &self, - kind: opentelemetry_sdk::metrics::InstrumentKind, - ) -> opentelemetry_sdk::metrics::Temporality { - self.0.temporality(kind) - } - } - - struct TestGlobal { - open_telemetry: crate::opentelemetry::OpenTelemetry, - reader: TestReader, - } - - impl GlobalWithoutConfig for TestGlobal { - async fn init() -> anyhow::Result> { - let reader = TestReader::new(); - let provider = SdkMeterProvider::builder() - .with_resource(Resource::new_with_defaults(vec![KeyValue::new( - "service.name", - "test_service", - )])) - .with_reader(reader.clone()) - .build(); - opentelemetry::global::set_meter_provider(provider); - - Ok(Arc::new(TestGlobal { - open_telemetry: crate::opentelemetry::OpenTelemetry::new(), - reader, - })) - } - } - - impl TelemetryConfig for TestGlobal { - fn opentelemetry(&self) -> Option<&crate::opentelemetry::OpenTelemetry> { - Some(&self.open_telemetry) - } - } - - #[scuffle_metrics::metrics] - mod example { - use scuffle_metrics::{CounterU64, MetricEnum}; - - #[derive(MetricEnum)] - pub enum Kind { - Http, - Grpc, - } - - #[metrics(unit = "requests")] - pub fn request(kind: Kind) -> CounterU64; - } - - let global = ::init().await.unwrap(); - let task_handle = tokio::spawn(TelemetrySvc.run(Arc::clone(&global), scuffle_context::Context::global())); - - let metrics = global.reader.read(); - - assert!(!metrics.resource.is_empty()); - assert_eq!( - metrics.resource.get(Key::from_static_str("service.name")), - Some(Value::from("test_service")) - ); - assert_eq!( - metrics.resource.get(Key::from_static_str("telemetry.sdk.name")), - Some(Value::from("opentelemetry")) - ); - assert_eq!( - metrics.resource.get(Key::from_static_str("telemetry.sdk.version")), - Some(Value::from("0.27.1")) - ); - assert_eq!( - metrics.resource.get(Key::from_static_str("telemetry.sdk.language")), - Some(Value::from("rust")) - ); - - assert!(metrics.scope_metrics.is_empty()); - - example::request(example::Kind::Http).incr(); - - let metrics = global.reader.read(); - - assert_eq!(metrics.scope_metrics.len(), 1); - assert_eq!(metrics.scope_metrics[0].scope.name(), "scuffle-bootstrap-telemetry"); - assert_eq!(metrics.scope_metrics[0].scope.version(), Some("0.0.3")); - assert_eq!(metrics.scope_metrics[0].metrics.len(), 1); - assert_eq!(metrics.scope_metrics[0].metrics[0].name, "example_request"); - assert_eq!(metrics.scope_metrics[0].metrics[0].description, ""); - assert_eq!(metrics.scope_metrics[0].metrics[0].unit, "requests"); - let sum: &Sum = metrics.scope_metrics[0].metrics[0] - .data - .as_any() - .downcast_ref() - .expect("wrong data type"); - assert_eq!(sum.temporality, opentelemetry_sdk::metrics::Temporality::Cumulative); - assert_eq!(sum.is_monotonic, true); - assert_eq!(sum.data_points.len(), 1); - assert_eq!(sum.data_points[0].value, 1); - assert_eq!(sum.data_points[0].attributes.len(), 1); - assert_eq!(sum.data_points[0].attributes[0].key, Key::from_static_str("kind")); - assert_eq!(sum.data_points[0].attributes[0].value, Value::from("Http")); - - example::request(example::Kind::Http).incr(); - - let metrics = global.reader.read(); - - assert_eq!(metrics.scope_metrics.len(), 1); - assert_eq!(metrics.scope_metrics[0].metrics.len(), 1); - let sum: &Sum = metrics.scope_metrics[0].metrics[0] - .data - .as_any() - .downcast_ref() - .expect("wrong data type"); - assert_eq!(sum.data_points.len(), 1); - assert_eq!(sum.data_points[0].value, 2); - assert_eq!(sum.data_points[0].attributes.len(), 1); - assert_eq!(sum.data_points[0].attributes[0].key, Key::from_static_str("kind")); - assert_eq!(sum.data_points[0].attributes[0].value, Value::from("Http")); - - example::request(example::Kind::Grpc).incr(); - - let metrics = global.reader.read(); - - assert_eq!(metrics.scope_metrics.len(), 1); - assert_eq!(metrics.scope_metrics[0].metrics.len(), 1); - let sum: &Sum = metrics.scope_metrics[0].metrics[0] - .data - .as_any() - .downcast_ref() - .expect("wrong data type"); - assert_eq!(sum.data_points.len(), 2); - let grpc = sum - .data_points - .iter() - .find(|dp| { - dp.attributes.len() == 1 - && dp.attributes[0].key == Key::from_static_str("kind") - && dp.attributes[0].value == Value::from("Grpc") - }) - .expect("grpc data point not found"); - assert_eq!(grpc.value, 1); - - scuffle_context::Handler::global().shutdown().await; - - task_handle.await.unwrap().unwrap(); - } } diff --git a/crates/metrics/src/lib.rs b/crates/metrics/src/lib.rs index d6b7ba906..b2613ed18 100644 --- a/crates/metrics/src/lib.rs +++ b/crates/metrics/src/lib.rs @@ -62,6 +62,14 @@ pub use scuffle_metrics_derive::{metrics, MetricEnum}; #[cfg(test)] #[cfg_attr(all(test, coverage_nightly), coverage(off))] mod tests { + use std::sync::Arc; + + use opentelemetry::{Key, KeyValue, Value}; + use opentelemetry_sdk::metrics::data::{ResourceMetrics, Sum}; + use opentelemetry_sdk::metrics::reader::MetricReader; + use opentelemetry_sdk::metrics::{ManualReader, ManualReaderBuilder, SdkMeterProvider}; + use opentelemetry_sdk::Resource; + #[test] fn derive_enum() { insta::assert_snapshot!(postcompile::compile! { @@ -93,4 +101,166 @@ mod tests { // } // }); // } + + #[test] + fn opentelemetry() { + #[derive(Debug, Clone)] + struct TestReader(Arc); + + impl TestReader { + fn new() -> Self { + Self(Arc::new(ManualReaderBuilder::new().build())) + } + + fn read(&self) -> ResourceMetrics { + let mut metrics = ResourceMetrics { + resource: Resource::empty(), + scope_metrics: vec![], + }; + + self.0.collect(&mut metrics).expect("collect"); + + metrics + } + } + + impl opentelemetry_sdk::metrics::reader::MetricReader for TestReader { + fn register_pipeline(&self, pipeline: std::sync::Weak) { + self.0.register_pipeline(pipeline) + } + + fn collect( + &self, + rm: &mut opentelemetry_sdk::metrics::data::ResourceMetrics, + ) -> opentelemetry_sdk::metrics::MetricResult<()> { + self.0.collect(rm) + } + + fn force_flush(&self) -> opentelemetry_sdk::metrics::MetricResult<()> { + self.0.force_flush() + } + + fn shutdown(&self) -> opentelemetry_sdk::metrics::MetricResult<()> { + self.0.shutdown() + } + + fn temporality( + &self, + kind: opentelemetry_sdk::metrics::InstrumentKind, + ) -> opentelemetry_sdk::metrics::Temporality { + self.0.temporality(kind) + } + } + + #[crate::metrics(crate_path = "crate")] + mod example { + use crate::{CounterU64, MetricEnum}; + + #[derive(MetricEnum)] + #[metrics(crate_path = "crate")] + pub enum Kind { + Http, + Grpc, + } + + #[metrics(unit = "requests")] + pub fn request(kind: Kind) -> CounterU64; + } + + let reader = TestReader::new(); + let provider = SdkMeterProvider::builder() + .with_resource(Resource::new_with_defaults(vec![KeyValue::new( + "service.name", + "test_service", + )])) + .with_reader(reader.clone()) + .build(); + opentelemetry::global::set_meter_provider(provider); + + let metrics = reader.read(); + + assert!(!metrics.resource.is_empty()); + assert_eq!( + metrics.resource.get(Key::from_static_str("service.name")), + Some(Value::from("test_service")) + ); + assert_eq!( + metrics.resource.get(Key::from_static_str("telemetry.sdk.name")), + Some(Value::from("opentelemetry")) + ); + assert_eq!( + metrics.resource.get(Key::from_static_str("telemetry.sdk.version")), + Some(Value::from("0.27.1")) + ); + assert_eq!( + metrics.resource.get(Key::from_static_str("telemetry.sdk.language")), + Some(Value::from("rust")) + ); + + assert!(metrics.scope_metrics.is_empty()); + + example::request(example::Kind::Http).incr(); + + let metrics = reader.read(); + + assert_eq!(metrics.scope_metrics.len(), 1); + assert_eq!(metrics.scope_metrics[0].scope.name(), "scuffle-metrics"); + assert!(metrics.scope_metrics[0].scope.version().is_some()); + assert_eq!(metrics.scope_metrics[0].metrics.len(), 1); + assert_eq!(metrics.scope_metrics[0].metrics[0].name, "example_request"); + assert_eq!(metrics.scope_metrics[0].metrics[0].description, ""); + assert_eq!(metrics.scope_metrics[0].metrics[0].unit, "requests"); + let sum: &Sum = metrics.scope_metrics[0].metrics[0] + .data + .as_any() + .downcast_ref() + .expect("wrong data type"); + assert_eq!(sum.temporality, opentelemetry_sdk::metrics::Temporality::Cumulative); + assert_eq!(sum.is_monotonic, true); + assert_eq!(sum.data_points.len(), 1); + assert_eq!(sum.data_points[0].value, 1); + assert_eq!(sum.data_points[0].attributes.len(), 1); + assert_eq!(sum.data_points[0].attributes[0].key, Key::from_static_str("kind")); + assert_eq!(sum.data_points[0].attributes[0].value, Value::from("Http")); + + example::request(example::Kind::Http).incr(); + + let metrics = reader.read(); + + assert_eq!(metrics.scope_metrics.len(), 1); + assert_eq!(metrics.scope_metrics[0].metrics.len(), 1); + let sum: &Sum = metrics.scope_metrics[0].metrics[0] + .data + .as_any() + .downcast_ref() + .expect("wrong data type"); + assert_eq!(sum.data_points.len(), 1); + assert_eq!(sum.data_points[0].value, 2); + assert_eq!(sum.data_points[0].attributes.len(), 1); + assert_eq!(sum.data_points[0].attributes[0].key, Key::from_static_str("kind")); + assert_eq!(sum.data_points[0].attributes[0].value, Value::from("Http")); + + example::request(example::Kind::Grpc).incr(); + + let metrics = reader.read(); + + assert_eq!(metrics.scope_metrics.len(), 1); + assert_eq!(metrics.scope_metrics[0].metrics.len(), 1); + let sum: &Sum = metrics.scope_metrics[0].metrics[0] + .data + .as_any() + .downcast_ref() + .expect("wrong data type"); + assert_eq!(sum.data_points.len(), 2); + let grpc = sum + .data_points + .iter() + .find(|dp| { + dp.attributes.len() == 1 + && dp.attributes[0].key == Key::from_static_str("kind") + && dp.attributes[0].value == Value::from("Grpc") + }) + .expect("grpc data point not found"); + assert_eq!(grpc.value, 1); + } }