Skip to content

Commit

Permalink
test(bootstrap-telemetry): add test
Browse files Browse the repository at this point in the history
  • Loading branch information
lennartkloock committed Jan 15, 2025
1 parent d73fdce commit ca399ca
Show file tree
Hide file tree
Showing 4 changed files with 343 additions and 145 deletions.
342 changes: 199 additions & 143 deletions crates/bootstrap/telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ use scuffle_http::body::IncomingBody;
#[cfg(feature = "opentelemetry-traces")]
pub use tracing_opentelemetry;

#[cfg(feature = "opentelemetry")]
#[cfg_attr(docsrs, doc(cfg(feature = "opentelemetry")))]
pub mod opentelemetry;

/// The telemetry service.
///
/// This is supposed to be used with the `scuffle-bootstrap` crate.
Expand Down Expand Up @@ -343,154 +347,18 @@ async fn opentelemetry_flush<G: TelemetryConfig>(
}
}

#[cfg(feature = "opentelemetry")]
#[cfg_attr(docsrs, doc(cfg(feature = "opentelemetry")))]
pub mod opentelemetry {
pub use ::opentelemetry::*;

#[derive(Debug, thiserror::Error)]
pub enum OpenTelemetryError {
#[error("metrics: {0}")]
Metrics(#[from] opentelemetry_sdk::metrics::MetricError),
#[error("traces: {0}")]
Traces(#[from] opentelemetry::trace::TraceError),
#[error("logs: {0}")]
Logs(#[from] opentelemetry_sdk::logs::LogError),
}

#[derive(Debug, Default, Clone)]
pub struct OpenTelemetry {
#[cfg(feature = "opentelemetry-metrics")]
#[cfg_attr(docsrs, doc(cfg(feature = "opentelemetry-metrics")))]
metrics: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
#[cfg(feature = "opentelemetry-traces")]
#[cfg_attr(docsrs, doc(cfg(feature = "opentelemetry-traces")))]
traces: Option<opentelemetry_sdk::trace::TracerProvider>,
#[cfg(feature = "opentelemetry-logs")]
#[cfg_attr(docsrs, doc(cfg(feature = "opentelemetry-logs")))]
logs: Option<opentelemetry_sdk::logs::LoggerProvider>,
}

impl OpenTelemetry {
pub fn new() -> Self {
Self::default()
}

pub fn is_enabled(&self) -> bool {
#[cfg_attr(
not(any(
feature = "opentelemetry-metrics",
feature = "opentelemetry-traces",
feature = "opentelemetry-logs"
)),
allow(unused_mut)
)]
let mut enabled = false;
#[cfg(feature = "opentelemetry-metrics")]
{
enabled |= self.metrics.is_some();
}
#[cfg(feature = "opentelemetry-traces")]
{
enabled |= self.traces.is_some();
}
#[cfg(feature = "opentelemetry-logs")]
{
enabled |= self.logs.is_some();
}
enabled
}

#[cfg(feature = "opentelemetry-metrics")]
#[cfg_attr(docsrs, doc(cfg(feature = "opentelemetry-metrics")))]
pub fn with_metrics(self, metrics: impl Into<Option<opentelemetry_sdk::metrics::SdkMeterProvider>>) -> Self {
Self {
metrics: metrics.into(),
#[cfg(feature = "opentelemetry-traces")]
traces: self.traces,
#[cfg(feature = "opentelemetry-logs")]
logs: self.logs,
}
}

#[cfg(feature = "opentelemetry-traces")]
#[cfg_attr(docsrs, doc(cfg(feature = "opentelemetry-traces")))]
pub fn with_traces(self, traces: impl Into<Option<opentelemetry_sdk::trace::TracerProvider>>) -> Self {
Self {
traces: traces.into(),
#[cfg(feature = "opentelemetry-metrics")]
metrics: self.metrics,
#[cfg(feature = "opentelemetry-logs")]
logs: self.logs,
}
}

#[cfg(feature = "opentelemetry-logs")]
#[cfg_attr(docsrs, doc(cfg(feature = "opentelemetry-logs")))]
pub fn with_logs(self, logs: impl Into<Option<opentelemetry_sdk::logs::LoggerProvider>>) -> Self {
Self {
logs: logs.into(),
#[cfg(feature = "opentelemetry-traces")]
traces: self.traces,
#[cfg(feature = "opentelemetry-metrics")]
metrics: self.metrics,
}
}

/// Flushes all metrics, traces, and logs, warning; this blocks the
/// current thread.
pub fn flush(&self) -> Result<(), OpenTelemetryError> {
#[cfg(feature = "opentelemetry-metrics")]
if let Some(metrics) = &self.metrics {
metrics.force_flush()?;
}

#[cfg(feature = "opentelemetry-traces")]
if let Some(traces) = &self.traces {
for r in traces.force_flush() {
r?;
}
}

#[cfg(feature = "opentelemetry-logs")]
if let Some(logs) = &self.logs {
for r in logs.force_flush() {
r?;
}
}

Ok(())
}

/// Shuts down all metrics, traces, and logs.
pub fn shutdown(&self) -> Result<(), OpenTelemetryError> {
#[cfg(feature = "opentelemetry-metrics")]
if let Some(metrics) = &self.metrics {
metrics.shutdown()?;
}

#[cfg(feature = "opentelemetry-traces")]
if let Some(traces) = &self.traces {
traces.shutdown()?;
}

#[cfg(feature = "opentelemetry-logs")]
if let Some(logs) = &self.logs {
logs.shutdown()?;
}

Ok(())
}
}
}

#[cfg(test)]
#[cfg_attr(all(test, coverage_nightly), coverage(off))]
mod tests {
use std::{net::SocketAddr, sync::Arc};
use std::net::SocketAddr;
use std::sync::Arc;

use bytes::Bytes;
use opentelemetry_sdk::metrics::SdkMeterProvider;
use opentelemetry::{Key, KeyValue, Value};
use opentelemetry_sdk::metrics::data::{ResourceMetrics, Sum};
use opentelemetry_sdk::metrics::reader::MetricReader;
use opentelemetry_sdk::metrics::{ManualReader, ManualReaderBuilder, SdkMeterProvider};
use opentelemetry_sdk::Resource;
use scuffle_bootstrap::{GlobalWithoutConfig, Service};

use crate::{TelemetryConfig, TelemetrySvc};
Expand Down Expand Up @@ -644,4 +512,192 @@ mod tests {

task_handle.await.unwrap().unwrap();
}

#[tokio::test]
async fn opentelemetry() {
#[derive(Debug, Clone)]
struct TestReader(Arc<ManualReader>);

impl TestReader {
fn new() -> Self {
Self(Arc::new(ManualReaderBuilder::new().build()))
}

fn read(&self) -> ResourceMetrics {
let mut metrics = ResourceMetrics {
resource: Resource::empty(),
scope_metrics: vec![],
};

self.0.collect(&mut metrics).expect("collect");

metrics
}
}

impl opentelemetry_sdk::metrics::reader::MetricReader for TestReader {
fn register_pipeline(&self, pipeline: std::sync::Weak<opentelemetry_sdk::metrics::Pipeline>) {
self.0.register_pipeline(pipeline)
}

fn collect(
&self,
rm: &mut opentelemetry_sdk::metrics::data::ResourceMetrics,
) -> opentelemetry_sdk::metrics::MetricResult<()> {
self.0.collect(rm)
}

fn force_flush(&self) -> opentelemetry_sdk::metrics::MetricResult<()> {
self.0.force_flush()
}

fn shutdown(&self) -> opentelemetry_sdk::metrics::MetricResult<()> {
self.0.shutdown()
}

fn temporality(
&self,
kind: opentelemetry_sdk::metrics::InstrumentKind,
) -> opentelemetry_sdk::metrics::Temporality {
self.0.temporality(kind)
}
}

struct TestGlobal {
open_telemetry: crate::opentelemetry::OpenTelemetry,
reader: TestReader,
}

impl GlobalWithoutConfig for TestGlobal {
async fn init() -> anyhow::Result<Arc<Self>> {
let reader = TestReader::new();
let provider = SdkMeterProvider::builder()
.with_resource(Resource::new_with_defaults(vec![KeyValue::new(
"service.name",
"test_service",
)]))
.with_reader(reader.clone())
.build();
opentelemetry::global::set_meter_provider(provider);

Ok(Arc::new(TestGlobal {
open_telemetry: crate::opentelemetry::OpenTelemetry::new(),
reader,
}))
}
}

impl TelemetryConfig for TestGlobal {
fn opentelemetry(&self) -> Option<&crate::opentelemetry::OpenTelemetry> {
Some(&self.open_telemetry)
}
}

#[scuffle_metrics::metrics]
mod example {
use scuffle_metrics::{CounterU64, MetricEnum};

#[derive(MetricEnum)]
pub enum Kind {
Http,
Grpc,
}

#[metrics(unit = "requests")]
pub fn request(kind: Kind) -> CounterU64;
}

let global = <TestGlobal as GlobalWithoutConfig>::init().await.unwrap();
let task_handle = tokio::spawn(TelemetrySvc.run(Arc::clone(&global), scuffle_context::Context::global()));

let metrics = global.reader.read();

assert!(!metrics.resource.is_empty());
assert_eq!(
metrics.resource.get(Key::from_static_str("service.name")),
Some(Value::from("test_service"))
);
assert_eq!(
metrics.resource.get(Key::from_static_str("telemetry.sdk.name")),
Some(Value::from("opentelemetry"))
);
assert_eq!(
metrics.resource.get(Key::from_static_str("telemetry.sdk.version")),
Some(Value::from("0.27.1"))
);
assert_eq!(
metrics.resource.get(Key::from_static_str("telemetry.sdk.language")),
Some(Value::from("rust"))
);

assert!(metrics.scope_metrics.is_empty());

example::request(example::Kind::Http).incr();

let metrics = global.reader.read();

assert_eq!(metrics.scope_metrics.len(), 1);
assert_eq!(metrics.scope_metrics[0].scope.name(), "scuffle-bootstrap-telemetry");
assert_eq!(metrics.scope_metrics[0].scope.version(), Some("0.0.3"));
assert_eq!(metrics.scope_metrics[0].metrics.len(), 1);
assert_eq!(metrics.scope_metrics[0].metrics[0].name, "example_request");
assert_eq!(metrics.scope_metrics[0].metrics[0].description, "");
assert_eq!(metrics.scope_metrics[0].metrics[0].unit, "requests");
let sum: &Sum<u64> = metrics.scope_metrics[0].metrics[0]
.data
.as_any()
.downcast_ref()
.expect("wrong data type");
assert_eq!(sum.temporality, opentelemetry_sdk::metrics::Temporality::Cumulative);
assert_eq!(sum.is_monotonic, true);
assert_eq!(sum.data_points.len(), 1);
assert_eq!(sum.data_points[0].value, 1);
assert_eq!(sum.data_points[0].attributes.len(), 1);
assert_eq!(sum.data_points[0].attributes[0].key, Key::from_static_str("kind"));
assert_eq!(sum.data_points[0].attributes[0].value, Value::from("Http"));

example::request(example::Kind::Http).incr();

let metrics = global.reader.read();

assert_eq!(metrics.scope_metrics.len(), 1);
assert_eq!(metrics.scope_metrics[0].metrics.len(), 1);
let sum: &Sum<u64> = metrics.scope_metrics[0].metrics[0]
.data
.as_any()
.downcast_ref()
.expect("wrong data type");
assert_eq!(sum.data_points.len(), 1);
assert_eq!(sum.data_points[0].value, 2);
assert_eq!(sum.data_points[0].attributes.len(), 1);
assert_eq!(sum.data_points[0].attributes[0].key, Key::from_static_str("kind"));
assert_eq!(sum.data_points[0].attributes[0].value, Value::from("Http"));

example::request(example::Kind::Grpc).incr();

let metrics = global.reader.read();

assert_eq!(metrics.scope_metrics.len(), 1);
assert_eq!(metrics.scope_metrics[0].metrics.len(), 1);
let sum: &Sum<u64> = metrics.scope_metrics[0].metrics[0]
.data
.as_any()
.downcast_ref()
.expect("wrong data type");
assert_eq!(sum.data_points.len(), 2);
let grpc = sum
.data_points
.iter()
.find(|dp| {
dp.attributes.len() == 1
&& dp.attributes[0].key == Key::from_static_str("kind")
&& dp.attributes[0].value == Value::from("Grpc")
})
.expect("grpc data point not found");
assert_eq!(grpc.value, 1);

scuffle_context::Handler::global().shutdown().await;

task_handle.await.unwrap().unwrap();
}
}
Loading

0 comments on commit ca399ca

Please sign in to comment.