Skip to content

Commit

Permalink
test: move test to metrics crate
Browse files Browse the repository at this point in the history
  • Loading branch information
lennartkloock committed Jan 15, 2025
1 parent ca399ca commit 2050a51
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 193 deletions.
194 changes: 1 addition & 193 deletions crates/bootstrap/telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,7 @@ mod tests {
use std::sync::Arc;

use bytes::Bytes;
use opentelemetry::{Key, KeyValue, Value};
use opentelemetry_sdk::metrics::data::{ResourceMetrics, Sum};
use opentelemetry_sdk::metrics::reader::MetricReader;
use opentelemetry_sdk::metrics::{ManualReader, ManualReaderBuilder, SdkMeterProvider};
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::metrics::SdkMeterProvider;
use scuffle_bootstrap::{GlobalWithoutConfig, Service};

use crate::{TelemetryConfig, TelemetrySvc};
Expand Down Expand Up @@ -512,192 +508,4 @@ mod tests {

task_handle.await.unwrap().unwrap();
}

#[tokio::test]
async fn opentelemetry() {
#[derive(Debug, Clone)]
struct TestReader(Arc<ManualReader>);

impl TestReader {
fn new() -> Self {
Self(Arc::new(ManualReaderBuilder::new().build()))
}

fn read(&self) -> ResourceMetrics {
let mut metrics = ResourceMetrics {
resource: Resource::empty(),
scope_metrics: vec![],
};

self.0.collect(&mut metrics).expect("collect");

metrics
}
}

impl opentelemetry_sdk::metrics::reader::MetricReader for TestReader {
fn register_pipeline(&self, pipeline: std::sync::Weak<opentelemetry_sdk::metrics::Pipeline>) {
self.0.register_pipeline(pipeline)
}

fn collect(
&self,
rm: &mut opentelemetry_sdk::metrics::data::ResourceMetrics,
) -> opentelemetry_sdk::metrics::MetricResult<()> {
self.0.collect(rm)
}

fn force_flush(&self) -> opentelemetry_sdk::metrics::MetricResult<()> {
self.0.force_flush()
}

fn shutdown(&self) -> opentelemetry_sdk::metrics::MetricResult<()> {
self.0.shutdown()
}

fn temporality(
&self,
kind: opentelemetry_sdk::metrics::InstrumentKind,
) -> opentelemetry_sdk::metrics::Temporality {
self.0.temporality(kind)
}
}

struct TestGlobal {
open_telemetry: crate::opentelemetry::OpenTelemetry,
reader: TestReader,
}

impl GlobalWithoutConfig for TestGlobal {
async fn init() -> anyhow::Result<Arc<Self>> {
let reader = TestReader::new();
let provider = SdkMeterProvider::builder()
.with_resource(Resource::new_with_defaults(vec![KeyValue::new(
"service.name",
"test_service",
)]))
.with_reader(reader.clone())
.build();
opentelemetry::global::set_meter_provider(provider);

Ok(Arc::new(TestGlobal {
open_telemetry: crate::opentelemetry::OpenTelemetry::new(),
reader,
}))
}
}

impl TelemetryConfig for TestGlobal {
fn opentelemetry(&self) -> Option<&crate::opentelemetry::OpenTelemetry> {
Some(&self.open_telemetry)
}
}

#[scuffle_metrics::metrics]
mod example {
use scuffle_metrics::{CounterU64, MetricEnum};

#[derive(MetricEnum)]
pub enum Kind {
Http,
Grpc,
}

#[metrics(unit = "requests")]
pub fn request(kind: Kind) -> CounterU64;
}

let global = <TestGlobal as GlobalWithoutConfig>::init().await.unwrap();
let task_handle = tokio::spawn(TelemetrySvc.run(Arc::clone(&global), scuffle_context::Context::global()));

let metrics = global.reader.read();

assert!(!metrics.resource.is_empty());
assert_eq!(
metrics.resource.get(Key::from_static_str("service.name")),
Some(Value::from("test_service"))
);
assert_eq!(
metrics.resource.get(Key::from_static_str("telemetry.sdk.name")),
Some(Value::from("opentelemetry"))
);
assert_eq!(
metrics.resource.get(Key::from_static_str("telemetry.sdk.version")),
Some(Value::from("0.27.1"))
);
assert_eq!(
metrics.resource.get(Key::from_static_str("telemetry.sdk.language")),
Some(Value::from("rust"))
);

assert!(metrics.scope_metrics.is_empty());

example::request(example::Kind::Http).incr();

let metrics = global.reader.read();

assert_eq!(metrics.scope_metrics.len(), 1);
assert_eq!(metrics.scope_metrics[0].scope.name(), "scuffle-bootstrap-telemetry");
assert_eq!(metrics.scope_metrics[0].scope.version(), Some("0.0.3"));
assert_eq!(metrics.scope_metrics[0].metrics.len(), 1);
assert_eq!(metrics.scope_metrics[0].metrics[0].name, "example_request");
assert_eq!(metrics.scope_metrics[0].metrics[0].description, "");
assert_eq!(metrics.scope_metrics[0].metrics[0].unit, "requests");
let sum: &Sum<u64> = metrics.scope_metrics[0].metrics[0]
.data
.as_any()
.downcast_ref()
.expect("wrong data type");
assert_eq!(sum.temporality, opentelemetry_sdk::metrics::Temporality::Cumulative);
assert_eq!(sum.is_monotonic, true);
assert_eq!(sum.data_points.len(), 1);
assert_eq!(sum.data_points[0].value, 1);
assert_eq!(sum.data_points[0].attributes.len(), 1);
assert_eq!(sum.data_points[0].attributes[0].key, Key::from_static_str("kind"));
assert_eq!(sum.data_points[0].attributes[0].value, Value::from("Http"));

example::request(example::Kind::Http).incr();

let metrics = global.reader.read();

assert_eq!(metrics.scope_metrics.len(), 1);
assert_eq!(metrics.scope_metrics[0].metrics.len(), 1);
let sum: &Sum<u64> = metrics.scope_metrics[0].metrics[0]
.data
.as_any()
.downcast_ref()
.expect("wrong data type");
assert_eq!(sum.data_points.len(), 1);
assert_eq!(sum.data_points[0].value, 2);
assert_eq!(sum.data_points[0].attributes.len(), 1);
assert_eq!(sum.data_points[0].attributes[0].key, Key::from_static_str("kind"));
assert_eq!(sum.data_points[0].attributes[0].value, Value::from("Http"));

example::request(example::Kind::Grpc).incr();

let metrics = global.reader.read();

assert_eq!(metrics.scope_metrics.len(), 1);
assert_eq!(metrics.scope_metrics[0].metrics.len(), 1);
let sum: &Sum<u64> = metrics.scope_metrics[0].metrics[0]
.data
.as_any()
.downcast_ref()
.expect("wrong data type");
assert_eq!(sum.data_points.len(), 2);
let grpc = sum
.data_points
.iter()
.find(|dp| {
dp.attributes.len() == 1
&& dp.attributes[0].key == Key::from_static_str("kind")
&& dp.attributes[0].value == Value::from("Grpc")
})
.expect("grpc data point not found");
assert_eq!(grpc.value, 1);

scuffle_context::Handler::global().shutdown().await;

task_handle.await.unwrap().unwrap();
}
}
170 changes: 170 additions & 0 deletions crates/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ pub use scuffle_metrics_derive::{metrics, MetricEnum};
#[cfg(test)]
#[cfg_attr(all(test, coverage_nightly), coverage(off))]
mod tests {
use std::sync::Arc;

use opentelemetry::{Key, KeyValue, Value};
use opentelemetry_sdk::metrics::data::{ResourceMetrics, Sum};
use opentelemetry_sdk::metrics::reader::MetricReader;
use opentelemetry_sdk::metrics::{ManualReader, ManualReaderBuilder, SdkMeterProvider};
use opentelemetry_sdk::Resource;

#[test]
fn derive_enum() {
insta::assert_snapshot!(postcompile::compile! {
Expand Down Expand Up @@ -93,4 +101,166 @@ mod tests {
// }
// });
// }

#[test]
fn opentelemetry() {
#[derive(Debug, Clone)]
struct TestReader(Arc<ManualReader>);

impl TestReader {
fn new() -> Self {
Self(Arc::new(ManualReaderBuilder::new().build()))
}

fn read(&self) -> ResourceMetrics {
let mut metrics = ResourceMetrics {
resource: Resource::empty(),
scope_metrics: vec![],
};

self.0.collect(&mut metrics).expect("collect");

metrics
}
}

impl opentelemetry_sdk::metrics::reader::MetricReader for TestReader {
fn register_pipeline(&self, pipeline: std::sync::Weak<opentelemetry_sdk::metrics::Pipeline>) {
self.0.register_pipeline(pipeline)
}

fn collect(
&self,
rm: &mut opentelemetry_sdk::metrics::data::ResourceMetrics,
) -> opentelemetry_sdk::metrics::MetricResult<()> {
self.0.collect(rm)
}

fn force_flush(&self) -> opentelemetry_sdk::metrics::MetricResult<()> {
self.0.force_flush()
}

fn shutdown(&self) -> opentelemetry_sdk::metrics::MetricResult<()> {
self.0.shutdown()
}

fn temporality(
&self,
kind: opentelemetry_sdk::metrics::InstrumentKind,
) -> opentelemetry_sdk::metrics::Temporality {
self.0.temporality(kind)
}
}

#[crate::metrics(crate_path = "crate")]
mod example {
use crate::{CounterU64, MetricEnum};

#[derive(MetricEnum)]
#[metrics(crate_path = "crate")]
pub enum Kind {
Http,
Grpc,
}

#[metrics(unit = "requests")]
pub fn request(kind: Kind) -> CounterU64;
}

let reader = TestReader::new();
let provider = SdkMeterProvider::builder()
.with_resource(Resource::new_with_defaults(vec![KeyValue::new(
"service.name",
"test_service",
)]))
.with_reader(reader.clone())
.build();
opentelemetry::global::set_meter_provider(provider);

let metrics = reader.read();

assert!(!metrics.resource.is_empty());
assert_eq!(
metrics.resource.get(Key::from_static_str("service.name")),
Some(Value::from("test_service"))
);
assert_eq!(
metrics.resource.get(Key::from_static_str("telemetry.sdk.name")),
Some(Value::from("opentelemetry"))
);
assert_eq!(
metrics.resource.get(Key::from_static_str("telemetry.sdk.version")),
Some(Value::from("0.27.1"))
);
assert_eq!(
metrics.resource.get(Key::from_static_str("telemetry.sdk.language")),
Some(Value::from("rust"))
);

assert!(metrics.scope_metrics.is_empty());

example::request(example::Kind::Http).incr();

let metrics = reader.read();

assert_eq!(metrics.scope_metrics.len(), 1);
assert_eq!(metrics.scope_metrics[0].scope.name(), "scuffle-metrics");
assert!(metrics.scope_metrics[0].scope.version().is_some());
assert_eq!(metrics.scope_metrics[0].metrics.len(), 1);
assert_eq!(metrics.scope_metrics[0].metrics[0].name, "example_request");
assert_eq!(metrics.scope_metrics[0].metrics[0].description, "");
assert_eq!(metrics.scope_metrics[0].metrics[0].unit, "requests");
let sum: &Sum<u64> = metrics.scope_metrics[0].metrics[0]
.data
.as_any()
.downcast_ref()
.expect("wrong data type");
assert_eq!(sum.temporality, opentelemetry_sdk::metrics::Temporality::Cumulative);
assert_eq!(sum.is_monotonic, true);
assert_eq!(sum.data_points.len(), 1);
assert_eq!(sum.data_points[0].value, 1);
assert_eq!(sum.data_points[0].attributes.len(), 1);
assert_eq!(sum.data_points[0].attributes[0].key, Key::from_static_str("kind"));
assert_eq!(sum.data_points[0].attributes[0].value, Value::from("Http"));

example::request(example::Kind::Http).incr();

let metrics = reader.read();

assert_eq!(metrics.scope_metrics.len(), 1);
assert_eq!(metrics.scope_metrics[0].metrics.len(), 1);
let sum: &Sum<u64> = metrics.scope_metrics[0].metrics[0]
.data
.as_any()
.downcast_ref()
.expect("wrong data type");
assert_eq!(sum.data_points.len(), 1);
assert_eq!(sum.data_points[0].value, 2);
assert_eq!(sum.data_points[0].attributes.len(), 1);
assert_eq!(sum.data_points[0].attributes[0].key, Key::from_static_str("kind"));
assert_eq!(sum.data_points[0].attributes[0].value, Value::from("Http"));

example::request(example::Kind::Grpc).incr();

let metrics = reader.read();

assert_eq!(metrics.scope_metrics.len(), 1);
assert_eq!(metrics.scope_metrics[0].metrics.len(), 1);
let sum: &Sum<u64> = metrics.scope_metrics[0].metrics[0]
.data
.as_any()
.downcast_ref()
.expect("wrong data type");
assert_eq!(sum.data_points.len(), 2);
let grpc = sum
.data_points
.iter()
.find(|dp| {
dp.attributes.len() == 1
&& dp.attributes[0].key == Key::from_static_str("kind")
&& dp.attributes[0].value == Value::from("Grpc")
})
.expect("grpc data point not found");
assert_eq!(grpc.value, 1);
}
}

0 comments on commit 2050a51

Please sign in to comment.