From 2c249887c85aad160e86e8ea2b78b81cab38d71c Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Thu, 5 Dec 2024 13:09:57 +0100 Subject: [PATCH 01/12] chore: modify LogExporter and SpanExporter interfaces to support returning failure Address review comments --- opentelemetry-otlp/CHANGELOG.md | 4 ++-- opentelemetry-otlp/src/exporter/http/logs.rs | 5 +++-- opentelemetry-otlp/src/exporter/http/trace.rs | 7 ++++--- opentelemetry-otlp/src/exporter/tonic/logs.rs | 3 ++- opentelemetry-otlp/src/exporter/tonic/trace.rs | 3 ++- opentelemetry-sdk/CHANGELOG.md | 7 +++++++ opentelemetry-sdk/src/export/logs/mod.rs | 5 ++++- opentelemetry-sdk/src/export/trace.rs | 15 +++++++++++---- opentelemetry-sdk/src/logs/log_processor.rs | 12 ++++++++---- .../src/testing/logs/in_memory_exporter.rs | 3 ++- .../src/testing/trace/in_memory_exporter.rs | 5 +++-- .../src/testing/trace/span_exporters.rs | 11 +++++++---- opentelemetry-sdk/src/trace/span_processor.rs | 16 +++++++--------- opentelemetry-stdout/CHANGELOG.md | 4 ++++ opentelemetry-stdout/src/logs/exporter.rs | 3 ++- opentelemetry-stdout/src/trace/exporter.rs | 5 +++-- 16 files changed, 71 insertions(+), 37 deletions(-) diff --git a/opentelemetry-otlp/CHANGELOG.md b/opentelemetry-otlp/CHANGELOG.md index 2d0676d5bb..8b95bd59fd 100644 --- a/opentelemetry-otlp/CHANGELOG.md +++ b/opentelemetry-otlp/CHANGELOG.md @@ -1,9 +1,9 @@ # Changelog ## vNext - - Bump msrv to 1.75.0. - +- `OtlpHttpClient.shutdown` `TonicLogsClient.shutdown`, and `TonicTracesClient.shutdown` now explicitly return a result. The + semantics of the method have not changed, but you will have a new lint encouraging you to consume these results. ## 0.27.0 diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 905fb638d0..e3dd7c8c14 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -49,8 +49,9 @@ impl LogExporter for OtlpHttpClient { Ok(()) } - fn shutdown(&mut self) { - let _ = self.client.lock().map(|mut c| c.take()); + fn shutdown(&mut self) -> LogResult<()> { + let _ = self.client.lock()?.take(); + Ok(()) } fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { diff --git a/opentelemetry-otlp/src/exporter/http/trace.rs b/opentelemetry-otlp/src/exporter/http/trace.rs index d188dc8911..056042ebe9 100644 --- a/opentelemetry-otlp/src/exporter/http/trace.rs +++ b/opentelemetry-otlp/src/exporter/http/trace.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use futures_core::future::BoxFuture; use http::{header::CONTENT_TYPE, Method}; -use opentelemetry::{otel_debug, trace::TraceError}; +use opentelemetry::trace::{TraceError, TraceResult}; use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter}; use super::OtlpHttpClient; @@ -64,8 +64,9 @@ impl SpanExporter for OtlpHttpClient { }) } - fn shutdown(&mut self) { - let _ = self.client.lock().map(|mut c| c.take()); + fn shutdown(&mut self) -> TraceResult<()> { + let _ = self.client.lock()?.take(); + Ok(()) } fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index 2737c2bd99..fa120d7677 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -89,8 +89,9 @@ impl LogExporter for TonicLogsClient { Ok(()) } - fn shutdown(&mut self) { + fn shutdown(&mut self) -> LogResult<()> { let _ = self.inner.take(); + Ok(()) } fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { diff --git a/opentelemetry-otlp/src/exporter/tonic/trace.rs b/opentelemetry-otlp/src/exporter/tonic/trace.rs index 998acafad5..01ba8c8b39 100644 --- a/opentelemetry-otlp/src/exporter/tonic/trace.rs +++ b/opentelemetry-otlp/src/exporter/tonic/trace.rs @@ -92,8 +92,9 @@ impl SpanExporter for TonicTracesClient { }) } - fn shutdown(&mut self) { + fn shutdown(&mut self) -> TraceResult<()> { let _ = self.inner.take(); + Ok(()) } fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 61991a9b75..b9fa048e2e 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -2,6 +2,11 @@ ## vNext +- If you are an exporter author, the trait functions `LogExporter.shutdown` and `TraceExporter.shutdown` must now return a result. Note that implementing shutdown is optional as the trait provides a default implementation that returns Ok(()). + +- The trait functions `LogExporter.shutdown` and `TraceExporter.shutdown` now explicitly return a result. The + semantics of the method have not changed, but you will have a new lint encouraging you to consume these results. + - *Breaking(Affects custom metric exporter authors only)* `start_time` and `time` is moved from DataPoints to aggregations (Sum, Gauge, Histogram, ExpoHistogram) see [#2377](https://github.com/open-telemetry/opentelemetry-rust/pull/2377) and [#2411](https://github.com/open-telemetry/opentelemetry-rust/pull/2411), to reduce memory. - *Breaking* `start_time` is no longer optional for `Sum` aggregation, see [#2367](https://github.com/open-telemetry/opentelemetry-rust/pull/2367), but is still optional for `Gauge` aggregation see [#2389](https://github.com/open-telemetry/opentelemetry-rust/pull/2389). @@ -14,6 +19,7 @@ [#2338](https://github.com/open-telemetry/opentelemetry-rust/pull/2338) - `ResourceDetector.detect()` no longer supports timeout option. - `opentelemetry::global::shutdown_tracer_provider()` Removed from the API, should now use `tracer_provider.shutdown()` see [#2369](https://github.com/open-telemetry/opentelemetry-rust/pull/2369) for a migration example. "Tracer provider" is cheaply cloneable, so users are encouraged to set a clone of it as the global (ex: `global::set_tracer_provider(provider.clone()))`, so that instrumentations and other components can obtain tracers from `global::tracer()`. The tracer_provider must be kept around to call shutdown on it at the end of application (ex: `tracer_provider.shutdown()`) + - *Feature*: Add `ResourceBuilder` for an easy way to create new `Resource`s - *Breaking*: Remove `Resource::{new,empty,from_detectors,new_with_defaults,from_schema_url,merge,default}` from public api. To create Resources you should only use `Resource::builder()` or `Resource::builder_empty()`. See [#2322](https://github.com/open-telemetry/opentelemetry-rust/pull/2322) for a migration guide. Example Usage: @@ -156,6 +162,7 @@ metadata, a feature introduced in version 0.1.40. [#2418](https://github.com/ope - Continue enabling one of the async runtime feature flags: `rt-tokio`, `rt-tokio-current-thread`, or `rt-async-std`. + ## 0.27.1 Released 2024-Nov-27 diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index 5e2168a7ce..dff2893ba9 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -83,7 +83,10 @@ pub trait LogExporter: Send + Sync + Debug { /// async fn export(&self, batch: LogBatch<'_>) -> LogResult<()>; /// Shuts down the exporter. - fn shutdown(&mut self) {} + fn shutdown(&mut self) -> LogResult<()> { + Ok(()) + } + #[cfg(feature = "spec_unstable_logs_enabled")] /// Chek if logs are enabled. fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { diff --git a/opentelemetry-sdk/src/export/trace.rs b/opentelemetry-sdk/src/export/trace.rs index c606d85b1a..775d53ccae 100644 --- a/opentelemetry-sdk/src/export/trace.rs +++ b/opentelemetry-sdk/src/export/trace.rs @@ -7,8 +7,13 @@ use std::borrow::Cow; use std::fmt::Debug; use std::time::SystemTime; -/// Describes the result of an export. -pub type ExportResult = Result<(), TraceError>; +/// Describes the results of other operations on the trace API. +pub type TraceResult = Result; + +/// Describes the results of an export +/// Note: This is an alias we will remove in the future, favouring +/// using TraceResult directly. +pub type ExportResult = TraceResult<()>; /// `SpanExporter` defines the interface that protocol-specific exporters must /// implement so that they can be plugged into OpenTelemetry SDK and support @@ -30,7 +35,7 @@ pub trait SpanExporter: Send + Sync + Debug { /// /// Any retry logic that is required by the exporter is the responsibility /// of the exporter. - fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult>; + fn export(&mut self, batch: Vec) -> BoxFuture<'static, TraceResult<()>>; /// Shuts down the exporter. Called when SDK is shut down. This is an /// opportunity for exporter to do any cleanup required. @@ -43,7 +48,9 @@ pub trait SpanExporter: Send + Sync + Debug { /// flush the data and the destination is unavailable). SDK authors /// can decide if they want to make the shutdown timeout /// configurable. - fn shutdown(&mut self) {} + fn shutdown(&mut self) -> TraceResult<()> { + Ok(()) + } /// This is a hint to ensure that the export of any Spans the exporter /// has received prior to the call to this function SHOULD be completed diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index cff0d44be2..f26a90418b 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -144,7 +144,7 @@ impl LogProcessor for SimpleLogProcessor { self.is_shutdown .store(true, std::sync::atomic::Ordering::Relaxed); if let Ok(mut exporter) = self.exporter.lock() { - exporter.shutdown(); + exporter.shutdown()?; Ok(()) } else { Err(LogError::MutexPoisoned("SimpleLogProcessor".into())) @@ -650,7 +650,13 @@ impl BatchLogProcessorWithAsyncRuntime { ) .await; - exporter.shutdown(); + if let Err(e) = exporter.shutdown() { + otel_warn!( + name: "BatchLogProcessor.Shutdown.Failed", + message = "failed shutting down exporter cleanly", + error = format!("{:?}", e) + ); + }; if let Err(send_error) = ch.send(result) { otel_debug!( @@ -934,8 +940,6 @@ mod tests { Ok(()) } - fn shutdown(&mut self) {} - fn set_resource(&mut self, resource: &Resource) { self.resource .lock() diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index 4ed62e90a1..76c4f8897a 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -195,10 +195,11 @@ impl LogExporter for InMemoryLogExporter { Ok(()) } - fn shutdown(&mut self) { + fn shutdown(&mut self) -> LogResult<()> { if self.should_reset_on_shutdown { self.reset(); } + Ok(()) } fn set_resource(&mut self, resource: &Resource) { diff --git a/opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs index 0ae261916a..631e2935c8 100644 --- a/opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs @@ -142,8 +142,9 @@ impl SpanExporter for InMemorySpanExporter { Box::pin(std::future::ready(Ok(()))) } - fn shutdown(&mut self) { - self.reset() + fn shutdown(&mut self) -> TraceResult<()> { + self.reset(); + Ok(()) } fn set_resource(&mut self, resource: &Resource) { diff --git a/opentelemetry-sdk/src/testing/trace/span_exporters.rs b/opentelemetry-sdk/src/testing/trace/span_exporters.rs index e9996e3fc8..f75189b98e 100644 --- a/opentelemetry-sdk/src/testing/trace/span_exporters.rs +++ b/opentelemetry-sdk/src/testing/trace/span_exporters.rs @@ -1,11 +1,11 @@ use crate::{ - export::trace::{ExportResult, SpanData, SpanExporter}, + export::trace::{ExportResult, SpanData, SpanExporter, TraceResult}, trace::{SpanEvents, SpanLinks}, }; use futures_util::future::BoxFuture; pub use opentelemetry::testing::trace::TestSpan; use opentelemetry::{ - trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState}, + trace::{SpanContext, SpanId, SpanKind, Status, TraceError, TraceFlags, TraceId, TraceState}, InstrumentationScope, }; use std::fmt::{Display, Formatter}; @@ -53,8 +53,11 @@ impl SpanExporter for TokioSpanExporter { Box::pin(std::future::ready(Ok(()))) } - fn shutdown(&mut self) { - self.tx_shutdown.send(()).unwrap(); + fn shutdown(&mut self) -> TraceResult<()> { + self.tx_shutdown + .send(()) + .map_err::(|err| TraceError::Other(Box::new(err)))?; + Ok(()) } } diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 5023ca2bc5..1a5ea75220 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -149,14 +149,7 @@ impl SpanProcessor for SimpleSpanProcessor { } fn shutdown(&self) -> TraceResult<()> { - if let Ok(mut exporter) = self.exporter.lock() { - exporter.shutdown(); - Ok(()) - } else { - Err(TraceError::Other( - "SimpleSpanProcessor mutex poison at shutdown".into(), - )) - } + self.exporter.lock()?.shutdown() } fn set_resource(&mut self, resource: &Resource) { @@ -432,7 +425,12 @@ impl BatchSpanProcessorInternal { // Stream has terminated or processor is shutdown, return to finish execution. BatchMessage::Shutdown(ch) => { self.flush(Some(ch)).await; - self.exporter.shutdown(); + if let Err(e) = self.exporter.shutdown() { + otel_warn!( + name: "SpanProcessor.Shutdown.Failed", + message = "failed shutting down exporter cleanly", + error = format!("{:?}", e)); + } return false; } // propagate the resource diff --git a/opentelemetry-stdout/CHANGELOG.md b/opentelemetry-stdout/CHANGELOG.md index 046262458d..42cc896e65 100644 --- a/opentelemetry-stdout/CHANGELOG.md +++ b/opentelemetry-stdout/CHANGELOG.md @@ -5,6 +5,10 @@ - Bump msrv to 1.75.0. - *Breaking* time fields, `StartTime` and `EndTime` is printed on aggregation (Sum, Gauge, Histogram, ExpoHistogram) with 2 tabs, previously it was on aggregation data point, with 3 tabs, see [#2377](https://github.com/open-telemetry/opentelemetry-rust/pull/2377) and [#2411](https://github.com/open-telemetry/opentelemetry-rust/pull/2411). +- `LogExporter.shutdown` and `SpanExporter.shutdown` now explicitly return a result. The + semantics of the method have not changed, but you will have a new lint encouraging you to consume these results. + + ## 0.27.0 Released 2024-Nov-11 diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index 2633f5a072..cc0314e0df 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -60,8 +60,9 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { } } - fn shutdown(&mut self) { + fn shutdown(&mut self) -> LogResult<()> { self.is_shutdown.store(true, atomic::Ordering::SeqCst); + Ok(()) } fn set_resource(&mut self, res: &opentelemetry_sdk::Resource) { diff --git a/opentelemetry-stdout/src/trace/exporter.rs b/opentelemetry-stdout/src/trace/exporter.rs index e2e0fbace9..e16bf06b29 100644 --- a/opentelemetry-stdout/src/trace/exporter.rs +++ b/opentelemetry-stdout/src/trace/exporter.rs @@ -1,7 +1,7 @@ use chrono::{DateTime, Utc}; use core::fmt; use futures_util::future::BoxFuture; -use opentelemetry::trace::TraceError; +use opentelemetry::trace::{TraceError, TraceResult}; use opentelemetry_sdk::export::{self, trace::ExportResult}; use std::sync::atomic; @@ -59,8 +59,9 @@ impl opentelemetry_sdk::export::trace::SpanExporter for SpanExporter { } } - fn shutdown(&mut self) { + fn shutdown(&mut self) -> TraceResult<()> { self.is_shutdown.store(true, atomic::Ordering::SeqCst); + Ok(()) } fn set_resource(&mut self, res: &opentelemetry_sdk::Resource) { From 5b243a6ab99be932a2fc88d2e6382c6d688b125d Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Thu, 19 Dec 2024 15:02:35 +0100 Subject: [PATCH 02/12] chore: address lints and comments --- opentelemetry-otlp/CHANGELOG.md | 3 +-- opentelemetry-otlp/src/exporter/http/trace.rs | 1 + opentelemetry-otlp/src/exporter/tonic/trace.rs | 2 +- opentelemetry-stdout/CHANGELOG.md | 3 +-- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/opentelemetry-otlp/CHANGELOG.md b/opentelemetry-otlp/CHANGELOG.md index 8b95bd59fd..778b1a31d2 100644 --- a/opentelemetry-otlp/CHANGELOG.md +++ b/opentelemetry-otlp/CHANGELOG.md @@ -2,8 +2,7 @@ ## vNext - Bump msrv to 1.75.0. -- `OtlpHttpClient.shutdown` `TonicLogsClient.shutdown`, and `TonicTracesClient.shutdown` now explicitly return a result. The - semantics of the method have not changed, but you will have a new lint encouraging you to consume these results. +- `OtlpHttpClient.shutdown` `TonicLogsClient.shutdown`, and `TonicTracesClient.shutdown` now explicitly return a result. ## 0.27.0 diff --git a/opentelemetry-otlp/src/exporter/http/trace.rs b/opentelemetry-otlp/src/exporter/http/trace.rs index 056042ebe9..f1fac61467 100644 --- a/opentelemetry-otlp/src/exporter/http/trace.rs +++ b/opentelemetry-otlp/src/exporter/http/trace.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use futures_core::future::BoxFuture; use http::{header::CONTENT_TYPE, Method}; +use opentelemetry::otel_debug; use opentelemetry::trace::{TraceError, TraceResult}; use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter}; diff --git a/opentelemetry-otlp/src/exporter/tonic/trace.rs b/opentelemetry-otlp/src/exporter/tonic/trace.rs index 01ba8c8b39..3fd03e7e87 100644 --- a/opentelemetry-otlp/src/exporter/tonic/trace.rs +++ b/opentelemetry-otlp/src/exporter/tonic/trace.rs @@ -7,7 +7,7 @@ use opentelemetry_proto::tonic::collector::trace::v1::{ }; use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter}; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; - +use opentelemetry::trace::TraceResult; use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope; use super::BoxInterceptor; diff --git a/opentelemetry-stdout/CHANGELOG.md b/opentelemetry-stdout/CHANGELOG.md index 42cc896e65..2f5fd9e220 100644 --- a/opentelemetry-stdout/CHANGELOG.md +++ b/opentelemetry-stdout/CHANGELOG.md @@ -5,8 +5,7 @@ - Bump msrv to 1.75.0. - *Breaking* time fields, `StartTime` and `EndTime` is printed on aggregation (Sum, Gauge, Histogram, ExpoHistogram) with 2 tabs, previously it was on aggregation data point, with 3 tabs, see [#2377](https://github.com/open-telemetry/opentelemetry-rust/pull/2377) and [#2411](https://github.com/open-telemetry/opentelemetry-rust/pull/2411). -- `LogExporter.shutdown` and `SpanExporter.shutdown` now explicitly return a result. The - semantics of the method have not changed, but you will have a new lint encouraging you to consume these results. +- `LogExporter.shutdown` and `SpanExporter.shutdown` now explicitly return a result ## 0.27.0 From 77d9a01472ce8ddf295619baa69b222a391e2b5a Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Thu, 19 Dec 2024 15:13:13 +0100 Subject: [PATCH 03/12] chore: address lint --- opentelemetry-otlp/src/exporter/tonic/trace.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry-otlp/src/exporter/tonic/trace.rs b/opentelemetry-otlp/src/exporter/tonic/trace.rs index 3fd03e7e87..a0fc02a38e 100644 --- a/opentelemetry-otlp/src/exporter/tonic/trace.rs +++ b/opentelemetry-otlp/src/exporter/tonic/trace.rs @@ -1,14 +1,14 @@ use core::fmt; use futures_core::future::BoxFuture; +use opentelemetry::trace::TraceResult; use opentelemetry::{otel_debug, trace::TraceError}; use opentelemetry_proto::tonic::collector::trace::v1::{ trace_service_client::TraceServiceClient, ExportTraceServiceRequest, }; +use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope; use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter}; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; -use opentelemetry::trace::TraceResult; -use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope; use super::BoxInterceptor; From dc5d9b132dbe77c45e7f98286f41d04af0d305c7 Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Thu, 19 Dec 2024 15:26:30 +0100 Subject: [PATCH 04/12] chore: Switch SpanExporter to use ShutdownResult --- opentelemetry-otlp/src/exporter/http/trace.rs | 4 +- .../src/exporter/tonic/trace.rs | 4 +- opentelemetry-sdk/src/export/trace.rs | 46 +++++++++++++++---- .../src/testing/trace/in_memory_exporter.rs | 4 +- .../src/testing/trace/span_exporters.rs | 7 +-- opentelemetry-sdk/src/trace/span_processor.rs | 5 +- opentelemetry-stdout/src/trace/exporter.rs | 6 +-- 7 files changed, 54 insertions(+), 22 deletions(-) diff --git a/opentelemetry-otlp/src/exporter/http/trace.rs b/opentelemetry-otlp/src/exporter/http/trace.rs index f1fac61467..e6aa92fac2 100644 --- a/opentelemetry-otlp/src/exporter/http/trace.rs +++ b/opentelemetry-otlp/src/exporter/http/trace.rs @@ -4,7 +4,7 @@ use futures_core::future::BoxFuture; use http::{header::CONTENT_TYPE, Method}; use opentelemetry::otel_debug; use opentelemetry::trace::{TraceError, TraceResult}; -use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter}; +use opentelemetry_sdk::export::trace::{ExportResult, ShutdownResult, SpanData, SpanExporter}; use super::OtlpHttpClient; @@ -65,7 +65,7 @@ impl SpanExporter for OtlpHttpClient { }) } - fn shutdown(&mut self) -> TraceResult<()> { + fn shutdown(&mut self) -> ShutdownResult { let _ = self.client.lock()?.take(); Ok(()) } diff --git a/opentelemetry-otlp/src/exporter/tonic/trace.rs b/opentelemetry-otlp/src/exporter/tonic/trace.rs index a0fc02a38e..a7812ff9a7 100644 --- a/opentelemetry-otlp/src/exporter/tonic/trace.rs +++ b/opentelemetry-otlp/src/exporter/tonic/trace.rs @@ -7,7 +7,7 @@ use opentelemetry_proto::tonic::collector::trace::v1::{ trace_service_client::TraceServiceClient, ExportTraceServiceRequest, }; use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope; -use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter}; +use opentelemetry_sdk::export::trace::{ExportResult, ShutdownResult, SpanData, SpanExporter}; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; use super::BoxInterceptor; @@ -92,7 +92,7 @@ impl SpanExporter for TonicTracesClient { }) } - fn shutdown(&mut self) -> TraceResult<()> { + fn shutdown(&mut self) -> ShutdownResult { let _ = self.inner.take(); Ok(()) } diff --git a/opentelemetry-sdk/src/export/trace.rs b/opentelemetry-sdk/src/export/trace.rs index 775d53ccae..206b1c4e3c 100644 --- a/opentelemetry-sdk/src/export/trace.rs +++ b/opentelemetry-sdk/src/export/trace.rs @@ -5,15 +5,15 @@ use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status, TraceError}; use opentelemetry::{InstrumentationScope, KeyValue}; use std::borrow::Cow; use std::fmt::Debug; -use std::time::SystemTime; +use std::sync::PoisonError; +use std::time::{Duration, SystemTime}; +use thiserror::Error; -/// Describes the results of other operations on the trace API. -pub type TraceResult = Result; +/// Results of an export operation +pub type ExportResult = Result<(), TraceError>; -/// Describes the results of an export -/// Note: This is an alias we will remove in the future, favouring -/// using TraceResult directly. -pub type ExportResult = TraceResult<()>; +/// Result of a shutdown operation +pub type ShutdownResult = Result<(), ShutdownError>; /// `SpanExporter` defines the interface that protocol-specific exporters must /// implement so that they can be plugged into OpenTelemetry SDK and support @@ -35,7 +35,7 @@ pub trait SpanExporter: Send + Sync + Debug { /// /// Any retry logic that is required by the exporter is the responsibility /// of the exporter. - fn export(&mut self, batch: Vec) -> BoxFuture<'static, TraceResult<()>>; + fn export(&mut self, batch: Vec) -> BoxFuture<'static, Result<(), TraceError>>; /// Shuts down the exporter. Called when SDK is shut down. This is an /// opportunity for exporter to do any cleanup required. @@ -48,7 +48,7 @@ pub trait SpanExporter: Send + Sync + Debug { /// flush the data and the destination is unavailable). SDK authors /// can decide if they want to make the shutdown timeout /// configurable. - fn shutdown(&mut self) -> TraceResult<()> { + fn shutdown(&mut self) -> ShutdownResult { Ok(()) } @@ -105,3 +105,31 @@ pub struct SpanData { /// Instrumentation scope that produced this span pub instrumentation_scope: InstrumentationScope, } + +/// Errors returned by shutdown operations in the Export API. +#[derive(Error, Debug)] +#[non_exhaustive] +pub enum ShutdownError { + /// The exporter has already been shut down. + #[error("Shutdown already performed")] + AlreadyShutdown, + + /// Shutdown timed out before completing. + #[error("Shutdown timed out after {0:?}")] + Timeout(Duration), + + /// An unexpected error occurred during shutdown. + #[error(transparent)] + Other(#[from] Box), +} + +/// Custom error wrapper for string messages. +#[derive(Error, Debug)] +#[error("{0}")] +struct CustomError(String); + +impl From> for ShutdownError { + fn from(err: PoisonError) -> Self { + ShutdownError::Other(Box::new(CustomError(err.to_string()))) + } +} diff --git a/opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs index 631e2935c8..e1954babc5 100644 --- a/opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs @@ -1,4 +1,4 @@ -use crate::export::trace::{ExportResult, SpanData, SpanExporter}; +use crate::export::trace::{ExportResult, ShutdownResult, SpanData, SpanExporter}; use crate::resource::Resource; use futures_util::future::BoxFuture; use opentelemetry::trace::{TraceError, TraceResult}; @@ -142,7 +142,7 @@ impl SpanExporter for InMemorySpanExporter { Box::pin(std::future::ready(Ok(()))) } - fn shutdown(&mut self) -> TraceResult<()> { + fn shutdown(&mut self) -> ShutdownResult { self.reset(); Ok(()) } diff --git a/opentelemetry-sdk/src/testing/trace/span_exporters.rs b/opentelemetry-sdk/src/testing/trace/span_exporters.rs index f75189b98e..eee0744ebf 100644 --- a/opentelemetry-sdk/src/testing/trace/span_exporters.rs +++ b/opentelemetry-sdk/src/testing/trace/span_exporters.rs @@ -1,5 +1,6 @@ +use crate::export::trace::{ShutdownError, ShutdownResult}; use crate::{ - export::trace::{ExportResult, SpanData, SpanExporter, TraceResult}, + export::trace::{ExportResult, SpanData, SpanExporter}, trace::{SpanEvents, SpanLinks}, }; use futures_util::future::BoxFuture; @@ -53,10 +54,10 @@ impl SpanExporter for TokioSpanExporter { Box::pin(std::future::ready(Ok(()))) } - fn shutdown(&mut self) -> TraceResult<()> { + fn shutdown(&mut self) -> ShutdownResult { self.tx_shutdown .send(()) - .map_err::(|err| TraceError::Other(Box::new(err)))?; + .map_err::(|err| ShutdownError::Other(Box::new(err)))?; Ok(()) } } diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 1a5ea75220..c252636351 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -149,7 +149,10 @@ impl SpanProcessor for SimpleSpanProcessor { } fn shutdown(&self) -> TraceResult<()> { - self.exporter.lock()?.shutdown() + self.exporter + .lock()? + .shutdown() + .map_err(|e| TraceError::Other(Box::new(e))) } fn set_resource(&mut self, resource: &Resource) { diff --git a/opentelemetry-stdout/src/trace/exporter.rs b/opentelemetry-stdout/src/trace/exporter.rs index e16bf06b29..a2e35faaa8 100644 --- a/opentelemetry-stdout/src/trace/exporter.rs +++ b/opentelemetry-stdout/src/trace/exporter.rs @@ -2,10 +2,10 @@ use chrono::{DateTime, Utc}; use core::fmt; use futures_util::future::BoxFuture; use opentelemetry::trace::{TraceError, TraceResult}; +use opentelemetry_sdk::export::trace::ShutdownResult; use opentelemetry_sdk::export::{self, trace::ExportResult}; -use std::sync::atomic; - use opentelemetry_sdk::resource::Resource; +use std::sync::atomic; /// An OpenTelemetry exporter that writes Spans to stdout on export. pub struct SpanExporter { @@ -59,7 +59,7 @@ impl opentelemetry_sdk::export::trace::SpanExporter for SpanExporter { } } - fn shutdown(&mut self) -> TraceResult<()> { + fn shutdown(&mut self) -> ShutdownResult { self.is_shutdown.store(true, atomic::Ordering::SeqCst); Ok(()) } From 1b49188086329fe58fc579bd436abb4be56a98c5 Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Thu, 19 Dec 2024 15:55:59 +0100 Subject: [PATCH 05/12] chore: change logs exporter to use ShutdownResult --- opentelemetry-otlp/src/exporter/http/logs.rs | 4 ++-- opentelemetry-otlp/src/exporter/http/trace.rs | 2 +- opentelemetry-otlp/src/exporter/tonic/logs.rs | 4 ++-- .../src/exporter/tonic/trace.rs | 1 - opentelemetry-sdk/src/export/logs/mod.rs | 7 +++++-- opentelemetry-sdk/src/logs/error.rs | 19 +++++++++++++++++++ opentelemetry-sdk/src/logs/log_processor.rs | 4 +++- opentelemetry-sdk/src/logs/mod.rs | 2 +- .../src/testing/logs/in_memory_exporter.rs | 4 ++-- .../src/testing/trace/span_exporters.rs | 2 +- opentelemetry-stdout/src/logs/exporter.rs | 4 ++-- 11 files changed, 38 insertions(+), 15 deletions(-) diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index e3dd7c8c14..7f1ba1e2b3 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use async_trait::async_trait; use http::{header::CONTENT_TYPE, Method}; use opentelemetry::otel_debug; -use opentelemetry_sdk::export::logs::{LogBatch, LogExporter}; +use opentelemetry_sdk::export::logs::{LogBatch, LogExporter, ShutdownResult}; use opentelemetry_sdk::logs::{LogError, LogResult}; use super::OtlpHttpClient; @@ -49,7 +49,7 @@ impl LogExporter for OtlpHttpClient { Ok(()) } - fn shutdown(&mut self) -> LogResult<()> { + fn shutdown(&mut self) -> ShutdownResult { let _ = self.client.lock()?.take(); Ok(()) } diff --git a/opentelemetry-otlp/src/exporter/http/trace.rs b/opentelemetry-otlp/src/exporter/http/trace.rs index e6aa92fac2..83fc6c2a6c 100644 --- a/opentelemetry-otlp/src/exporter/http/trace.rs +++ b/opentelemetry-otlp/src/exporter/http/trace.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use futures_core::future::BoxFuture; use http::{header::CONTENT_TYPE, Method}; use opentelemetry::otel_debug; -use opentelemetry::trace::{TraceError, TraceResult}; +use opentelemetry::trace::TraceError; use opentelemetry_sdk::export::trace::{ExportResult, ShutdownResult, SpanData, SpanExporter}; use super::OtlpHttpClient; diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index fa120d7677..93aaded954 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -4,7 +4,7 @@ use opentelemetry::otel_debug; use opentelemetry_proto::tonic::collector::logs::v1::{ logs_service_client::LogsServiceClient, ExportLogsServiceRequest, }; -use opentelemetry_sdk::export::logs::{LogBatch, LogExporter}; +use opentelemetry_sdk::export::logs::{LogBatch, LogExporter, ShutdownResult}; use opentelemetry_sdk::logs::{LogError, LogResult}; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; @@ -89,7 +89,7 @@ impl LogExporter for TonicLogsClient { Ok(()) } - fn shutdown(&mut self) -> LogResult<()> { + fn shutdown(&mut self) -> ShutdownResult { let _ = self.inner.take(); Ok(()) } diff --git a/opentelemetry-otlp/src/exporter/tonic/trace.rs b/opentelemetry-otlp/src/exporter/tonic/trace.rs index a7812ff9a7..0660c9a8e2 100644 --- a/opentelemetry-otlp/src/exporter/tonic/trace.rs +++ b/opentelemetry-otlp/src/exporter/tonic/trace.rs @@ -1,7 +1,6 @@ use core::fmt; use futures_core::future::BoxFuture; -use opentelemetry::trace::TraceResult; use opentelemetry::{otel_debug, trace::TraceError}; use opentelemetry_proto::tonic::collector::trace::v1::{ trace_service_client::TraceServiceClient, ExportTraceServiceRequest, diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index dff2893ba9..a26938211d 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -1,6 +1,6 @@ //! Log exporters use crate::logs::LogRecord; -use crate::logs::{LogError, LogResult}; +use crate::logs::{LogError, LogResult, ShutdownError}; use crate::Resource; use async_trait::async_trait; #[cfg(feature = "spec_unstable_logs_enabled")] @@ -83,7 +83,7 @@ pub trait LogExporter: Send + Sync + Debug { /// async fn export(&self, batch: LogBatch<'_>) -> LogResult<()>; /// Shuts down the exporter. - fn shutdown(&mut self) -> LogResult<()> { + fn shutdown(&mut self) -> ShutdownResult { Ok(()) } @@ -99,3 +99,6 @@ pub trait LogExporter: Send + Sync + Debug { /// Describes the result of an export. pub type ExportResult = Result<(), LogError>; + +/// Describes the result of a shutdown in the log SDK. +pub type ShutdownResult = Result<(), ShutdownError>; diff --git a/opentelemetry-sdk/src/logs/error.rs b/opentelemetry-sdk/src/logs/error.rs index 4f33ba6dbf..3c8e72cd2d 100644 --- a/opentelemetry-sdk/src/logs/error.rs +++ b/opentelemetry-sdk/src/logs/error.rs @@ -61,3 +61,22 @@ impl From> for LogError { #[derive(Error, Debug)] #[error("{0}")] struct Custom(String); + +/// Errors returned during shutdown +#[derive(Error, Debug)] +#[non_exhaustive] +pub enum ShutdownError { + /// Mutex lock poisoning + #[error("mutex lock poisioning for {0}")] + MutexPoisoned(String), + + /// Other errors propagated from log SDK that weren't covered above. + #[error(transparent)] + Other(#[from] Box), +} + +impl From> for ShutdownError { + fn from(err: PoisonError) -> Self { + ShutdownError::Other(err.to_string().into()) + } +} diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index f26a90418b..75d3b1b73e 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -144,7 +144,9 @@ impl LogProcessor for SimpleLogProcessor { self.is_shutdown .store(true, std::sync::atomic::Ordering::Relaxed); if let Ok(mut exporter) = self.exporter.lock() { - exporter.shutdown()?; + exporter + .shutdown() + .map_err(|e| LogError::Other(Box::new(e)))?; Ok(()) } else { Err(LogError::MutexPoisoned("SimpleLogProcessor".into())) diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs index b1341b9e96..04bb4ce4c7 100644 --- a/opentelemetry-sdk/src/logs/mod.rs +++ b/opentelemetry-sdk/src/logs/mod.rs @@ -4,7 +4,7 @@ mod log_emitter; mod log_processor; pub(crate) mod record; -pub use error::{LogError, LogResult}; +pub use error::{LogError, LogResult, ShutdownError}; pub use log_emitter::{Builder, Logger, LoggerProvider}; pub use log_processor::{ BatchConfig, BatchConfigBuilder, BatchLogProcessor, BatchLogProcessorBuilder, LogProcessor, diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index 76c4f8897a..9abaefdc8f 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -1,6 +1,6 @@ use crate::export::logs::{LogBatch, LogExporter}; -use crate::logs::LogRecord; use crate::logs::{LogError, LogResult}; +use crate::logs::{LogRecord, ShutdownError}; use crate::Resource; use async_trait::async_trait; use opentelemetry::InstrumentationScope; @@ -195,7 +195,7 @@ impl LogExporter for InMemoryLogExporter { Ok(()) } - fn shutdown(&mut self) -> LogResult<()> { + fn shutdown(&mut self) -> Result<(), ShutdownError> { if self.should_reset_on_shutdown { self.reset(); } diff --git a/opentelemetry-sdk/src/testing/trace/span_exporters.rs b/opentelemetry-sdk/src/testing/trace/span_exporters.rs index eee0744ebf..d8c8a055c6 100644 --- a/opentelemetry-sdk/src/testing/trace/span_exporters.rs +++ b/opentelemetry-sdk/src/testing/trace/span_exporters.rs @@ -6,7 +6,7 @@ use crate::{ use futures_util::future::BoxFuture; pub use opentelemetry::testing::trace::TestSpan; use opentelemetry::{ - trace::{SpanContext, SpanId, SpanKind, Status, TraceError, TraceFlags, TraceId, TraceState}, + trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState}, InstrumentationScope, }; use std::fmt::{Display, Formatter}; diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index cc0314e0df..9edeff333b 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use core::fmt; -use opentelemetry_sdk::export::logs::LogBatch; +use opentelemetry_sdk::export::logs::{LogBatch, ShutdownResult}; use opentelemetry_sdk::logs::LogResult; use opentelemetry_sdk::Resource; use std::sync::atomic; @@ -60,7 +60,7 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { } } - fn shutdown(&mut self) -> LogResult<()> { + fn shutdown(&mut self) -> ShutdownResult { self.is_shutdown.store(true, atomic::Ordering::SeqCst); Ok(()) } From a670a4ea32b55eab0c9a5efae6af78226a76b1cd Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Thu, 19 Dec 2024 16:04:19 +0100 Subject: [PATCH 06/12] lints --- examples/tracing-grpc/src/client.rs | 2 +- examples/tracing-grpc/src/server.rs | 2 +- opentelemetry-appender-log/src/lib.rs | 2 +- opentelemetry-stdout/src/trace/exporter.rs | 2 +- stress/src/throughput.rs | 6 +++--- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/examples/tracing-grpc/src/client.rs b/examples/tracing-grpc/src/client.rs index c871e9ca8d..29a9353621 100644 --- a/examples/tracing-grpc/src/client.rs +++ b/examples/tracing-grpc/src/client.rs @@ -22,7 +22,7 @@ fn init_tracer() -> sdktrace::TracerProvider { struct MetadataMap<'a>(&'a mut tonic::metadata::MetadataMap); -impl<'a> Injector for MetadataMap<'a> { +impl Injector for MetadataMap<'_> { /// Set a key and value in the MetadataMap. Does nothing if the key or value are not valid inputs fn set(&mut self, key: &str, value: String) { if let Ok(key) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes()) { diff --git a/examples/tracing-grpc/src/server.rs b/examples/tracing-grpc/src/server.rs index aadb77b6e6..24a9e09481 100644 --- a/examples/tracing-grpc/src/server.rs +++ b/examples/tracing-grpc/src/server.rs @@ -29,7 +29,7 @@ pub mod hello_world { struct MetadataMap<'a>(&'a tonic::metadata::MetadataMap); -impl<'a> Extractor for MetadataMap<'a> { +impl Extractor for MetadataMap<'_> { /// Get a value for a key from the MetadataMap. If the value can't be converted to &str, returns None fn get(&self, key: &str) -> Option<&str> { self.0.get(key).and_then(|metadata| metadata.to_str().ok()) diff --git a/opentelemetry-appender-log/src/lib.rs b/opentelemetry-appender-log/src/lib.rs index 2cc8b1e0fe..81ec10d129 100644 --- a/opentelemetry-appender-log/src/lib.rs +++ b/opentelemetry-appender-log/src/lib.rs @@ -239,7 +239,7 @@ mod any_value { pub(crate) fn serialize(value: log::kv::Value) -> Option { struct ValueVisitor(Option); - impl<'kvs> log::kv::VisitValue<'kvs> for ValueVisitor { + impl log::kv::VisitValue<'_> for ValueVisitor { fn visit_any(&mut self, value: log::kv::Value) -> Result<(), log::kv::Error> { self.0 = Some(AnyValue::String(StringValue::from(value.to_string()))); diff --git a/opentelemetry-stdout/src/trace/exporter.rs b/opentelemetry-stdout/src/trace/exporter.rs index a2e35faaa8..613c5dc7e3 100644 --- a/opentelemetry-stdout/src/trace/exporter.rs +++ b/opentelemetry-stdout/src/trace/exporter.rs @@ -1,7 +1,7 @@ use chrono::{DateTime, Utc}; use core::fmt; use futures_util::future::BoxFuture; -use opentelemetry::trace::{TraceError, TraceResult}; +use opentelemetry::trace::{TraceError}; use opentelemetry_sdk::export::trace::ShutdownResult; use opentelemetry_sdk::export::{self, trace::ExportResult}; use opentelemetry_sdk::resource::Resource; diff --git a/stress/src/throughput.rs b/stress/src/throughput.rs index 8116f904ee..131762fab7 100644 --- a/stress/src/throughput.rs +++ b/stress/src/throughput.rs @@ -140,8 +140,8 @@ struct UnsafeSlice<'a> { slice: &'a [UnsafeCell], } -unsafe impl<'a> Send for UnsafeSlice<'a> {} -unsafe impl<'a> Sync for UnsafeSlice<'a> {} +unsafe impl Send for UnsafeSlice<'_> {} +unsafe impl Sync for UnsafeSlice<'_> {} impl<'a> UnsafeSlice<'a> { fn new(slice: &'a mut [WorkerStats]) -> Self { @@ -155,7 +155,7 @@ impl<'a> UnsafeSlice<'a> { #[inline(always)] unsafe fn increment(&self, i: usize) { let value = self.slice[i].get(); - (*value).count = (*value).count + 1; + (*value).count += 1; } #[inline(always)] From 3c68efa4a5d461129514c489d49931b3f8b824e7 Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Fri, 20 Dec 2024 10:55:05 +0100 Subject: [PATCH 07/12] more error changes --- .../tests/integration_test/tests/logs.rs | 4 ++-- opentelemetry-sdk/src/export/logs/mod.rs | 8 ++++--- opentelemetry-sdk/src/export/trace.rs | 13 ++++++----- opentelemetry-sdk/src/logs/error.rs | 23 +++++++++++-------- opentelemetry-sdk/src/logs/log_emitter.rs | 4 ++-- opentelemetry-sdk/src/logs/log_processor.rs | 7 +++--- .../src/testing/logs/in_memory_exporter.rs | 23 +++++++++++-------- opentelemetry-stdout/src/trace/exporter.rs | 2 +- 8 files changed, 48 insertions(+), 36 deletions(-) diff --git a/opentelemetry-otlp/tests/integration_test/tests/logs.rs b/opentelemetry-otlp/tests/integration_test/tests/logs.rs index 39eb88a1e4..fe2399fdae 100644 --- a/opentelemetry-otlp/tests/integration_test/tests/logs.rs +++ b/opentelemetry-otlp/tests/integration_test/tests/logs.rs @@ -94,7 +94,7 @@ mod logtests { tokio::time::sleep(Duration::from_secs(10)).await; - assert_logs_results(test_utils::LOGS_FILE, "expected/logs.json"); + let _ = assert_logs_results(test_utils::LOGS_FILE, "expected/logs.json"); Ok(()) } @@ -122,7 +122,7 @@ mod logtests { } let _ = logger_provider.shutdown(); // tokio::time::sleep(Duration::from_secs(10)).await; - assert_logs_results(test_utils::LOGS_FILE, "expected/logs.json"); + let _ = assert_logs_results(test_utils::LOGS_FILE, "expected/logs.json"); Ok(()) } diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index a26938211d..4f9e078245 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -1,6 +1,6 @@ //! Log exporters use crate::logs::LogRecord; -use crate::logs::{LogError, LogResult, ShutdownError}; +use crate::logs::{LogError, ShutdownError}; use crate::Resource; use async_trait::async_trait; #[cfg(feature = "spec_unstable_logs_enabled")] @@ -81,8 +81,10 @@ pub trait LogExporter: Send + Sync + Debug { /// A `LogResult<()>`, which is a result type indicating either a successful export (with /// `Ok(())`) or an error (`Err(LogError)`) if the export operation failed. /// - async fn export(&self, batch: LogBatch<'_>) -> LogResult<()>; - /// Shuts down the exporter. + async fn export(&self, batch: LogBatch<'_>) -> ExportResult; + + /// Shuts down the exporter. This function is idempotent; calling it + /// more than once has no additional effect. fn shutdown(&mut self) -> ShutdownResult { Ok(()) } diff --git a/opentelemetry-sdk/src/export/trace.rs b/opentelemetry-sdk/src/export/trace.rs index 206b1c4e3c..7b7fbb0a95 100644 --- a/opentelemetry-sdk/src/export/trace.rs +++ b/opentelemetry-sdk/src/export/trace.rs @@ -118,18 +118,19 @@ pub enum ShutdownError { #[error("Shutdown timed out after {0:?}")] Timeout(Duration), + /// The export client failed while holding the client lock. It is not + /// possible to complete the shutdown and a retry will not help. + /// This is something that should not happen and should likely emit some diagnostic. + #[error("export client failed while holding lock; cannot retry.")] + ClientFailed(String), + /// An unexpected error occurred during shutdown. #[error(transparent)] Other(#[from] Box), } -/// Custom error wrapper for string messages. -#[derive(Error, Debug)] -#[error("{0}")] -struct CustomError(String); - impl From> for ShutdownError { fn from(err: PoisonError) -> Self { - ShutdownError::Other(Box::new(CustomError(err.to_string()))) + ShutdownError::ClientFailed(format!("Mutex poisoned during shutdown: {}", err)) } } diff --git a/opentelemetry-sdk/src/logs/error.rs b/opentelemetry-sdk/src/logs/error.rs index 3c8e72cd2d..bb9b29555f 100644 --- a/opentelemetry-sdk/src/logs/error.rs +++ b/opentelemetry-sdk/src/logs/error.rs @@ -18,14 +18,16 @@ pub enum LogError { #[error("Exporter timed out after {} seconds", .0.as_secs())] ExportTimedOut(Duration), + /// The export client failed while holding the client lock. It is not + /// possible to complete the shutdown and a retry will not help. + /// This is something that should not happen and should likely emit some diagnostic. + #[error("export client failed while holding lock; cannot retry.")] + ClientFailed(String), + /// Processor is already shutdown #[error("{0} already shutdown")] AlreadyShutdown(String), - /// Mutex lock poisoning - #[error("mutex lock poisioning for {0}")] - MutexPoisoned(String), - /// Other errors propagated from log SDK that weren't covered above. #[error(transparent)] Other(#[from] Box), @@ -54,9 +56,10 @@ impl From<&'static str> for LogError { impl From> for LogError { fn from(err: PoisonError) -> Self { - LogError::Other(err.to_string().into()) + LogError::ClientFailed(err.to_string().into()) } } + /// Wrap type for string #[derive(Error, Debug)] #[error("{0}")] @@ -66,9 +69,11 @@ struct Custom(String); #[derive(Error, Debug)] #[non_exhaustive] pub enum ShutdownError { - /// Mutex lock poisoning - #[error("mutex lock poisioning for {0}")] - MutexPoisoned(String), + /// The export client failed while holding the client lock. It is not + /// possible to complete the shutdown and a retry will not help. + /// This is something that should not happen and should likely emit some diagnostic. + #[error("export client failed while holding lock; cannot retry.")] + ClientFailed(String), /// Other errors propagated from log SDK that weren't covered above. #[error(transparent)] @@ -77,6 +82,6 @@ pub enum ShutdownError { impl From> for ShutdownError { fn from(err: PoisonError) -> Self { - ShutdownError::Other(err.to_string().into()) + ShutdownError::ClientFailed(format!("Mutex poisoned during shutdown: {}", err)) } } diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 479ca36dd2..b763bd0e60 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -141,9 +141,9 @@ impl LoggerProviderInner { // which is non-actionable by the user match err { // specific handling for mutex poisioning - LogError::MutexPoisoned(_) => { + LogError::ClientFailed(_) => { otel_debug!( - name: "LoggerProvider.Drop.ShutdownMutexPoisoned", + name: "LoggerProvider.Drop.ShutdownClientFailed", ); } _ => { diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 75d3b1b73e..d71068e5f9 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -113,14 +113,14 @@ impl LogProcessor for SimpleLogProcessor { let result = self .exporter .lock() - .map_err(|_| LogError::MutexPoisoned("SimpleLogProcessor".into())) + .map_err(|_| LogError::ClientFailed("SimpleLogProcessor".into())) .and_then(|exporter| { let log_tuple = &[(record as &LogRecord, instrumentation)]; futures_executor::block_on(exporter.export(LogBatch::new(log_tuple))) }); // Handle errors with specific static names match result { - Err(LogError::MutexPoisoned(_)) => { + Err(LogError::ClientFailed(_)) => { // logging as debug as this is not a user error otel_debug!( name: "SimpleLogProcessor.Emit.MutexPoisoning", @@ -149,7 +149,8 @@ impl LogProcessor for SimpleLogProcessor { .map_err(|e| LogError::Other(Box::new(e)))?; Ok(()) } else { - Err(LogError::MutexPoisoned("SimpleLogProcessor".into())) + // Failing to get the mutex means the export client failed whilst holding it + Err(LogError::ClientFailed("SimpleLogProcessor".into())) } } diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index 9abaefdc8f..66a1e2948d 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -1,4 +1,4 @@ -use crate::export::logs::{LogBatch, LogExporter}; +use crate::export::logs::{ExportResult, LogBatch, LogExporter}; use crate::logs::{LogError, LogResult}; use crate::logs::{LogRecord, ShutdownError}; use crate::Resource; @@ -172,19 +172,21 @@ impl InMemoryLogExporter { /// exporter.reset(); /// ``` /// - pub fn reset(&self) { - let _ = self - .logs - .lock() - .map(|mut logs_guard| logs_guard.clear()) - .map_err(LogError::from); + pub fn reset(&self) -> Result<(), LogError> { + let _ = self.logs.lock().map(|mut logs_guard| logs_guard.clear())?; + + Ok(()) } } #[async_trait] impl LogExporter for InMemoryLogExporter { - async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> { - let mut logs_guard = self.logs.lock().map_err(LogError::from)?; + async fn export(&self, batch: LogBatch<'_>) -> ExportResult { + let mut logs_guard = self + .logs + .lock() + .map_err(|e| LogError::ClientFailed(e.to_string()))?; + for (log_record, instrumentation) in batch.iter() { let owned_log = OwnedLogData { record: (*log_record).clone(), @@ -197,7 +199,8 @@ impl LogExporter for InMemoryLogExporter { fn shutdown(&mut self) -> Result<(), ShutdownError> { if self.should_reset_on_shutdown { - self.reset(); + self.reset() + .map_err(|e| ShutdownError::Other(Box::new(e)))?; } Ok(()) } diff --git a/opentelemetry-stdout/src/trace/exporter.rs b/opentelemetry-stdout/src/trace/exporter.rs index 613c5dc7e3..3458671765 100644 --- a/opentelemetry-stdout/src/trace/exporter.rs +++ b/opentelemetry-stdout/src/trace/exporter.rs @@ -1,7 +1,7 @@ use chrono::{DateTime, Utc}; use core::fmt; use futures_util::future::BoxFuture; -use opentelemetry::trace::{TraceError}; +use opentelemetry::trace::TraceError; use opentelemetry_sdk::export::trace::ShutdownResult; use opentelemetry_sdk::export::{self, trace::ExportResult}; use opentelemetry_sdk::resource::Resource; From c2586cedc4d334bba3e9a69a56d6efc392a76151 Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Fri, 20 Dec 2024 11:03:35 +0100 Subject: [PATCH 08/12] clippy --- opentelemetry-sdk/src/logs/error.rs | 2 +- opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/src/logs/error.rs b/opentelemetry-sdk/src/logs/error.rs index bb9b29555f..06b75f35a2 100644 --- a/opentelemetry-sdk/src/logs/error.rs +++ b/opentelemetry-sdk/src/logs/error.rs @@ -56,7 +56,7 @@ impl From<&'static str> for LogError { impl From> for LogError { fn from(err: PoisonError) -> Self { - LogError::ClientFailed(err.to_string().into()) + LogError::ClientFailed(err.to_string()) } } diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index 66a1e2948d..2b0f632241 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -173,7 +173,7 @@ impl InMemoryLogExporter { /// ``` /// pub fn reset(&self) -> Result<(), LogError> { - let _ = self.logs.lock().map(|mut logs_guard| logs_guard.clear())?; + self.logs.lock().map(|mut logs_guard| logs_guard.clear())?; Ok(()) } From a460184e594a88e311d83e6c37fb39f6cea995a7 Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Mon, 23 Dec 2024 09:11:14 +0100 Subject: [PATCH 09/12] Remove some stuff --- opentelemetry-sdk/src/export/trace.rs | 4 ---- opentelemetry-sdk/src/logs/error.rs | 6 ------ .../src/testing/logs/in_memory_exporter.rs | 21 +++++++++++++++---- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/opentelemetry-sdk/src/export/trace.rs b/opentelemetry-sdk/src/export/trace.rs index 7b7fbb0a95..7275533089 100644 --- a/opentelemetry-sdk/src/export/trace.rs +++ b/opentelemetry-sdk/src/export/trace.rs @@ -110,10 +110,6 @@ pub struct SpanData { #[derive(Error, Debug)] #[non_exhaustive] pub enum ShutdownError { - /// The exporter has already been shut down. - #[error("Shutdown already performed")] - AlreadyShutdown, - /// Shutdown timed out before completing. #[error("Shutdown timed out after {0:?}")] Timeout(Duration), diff --git a/opentelemetry-sdk/src/logs/error.rs b/opentelemetry-sdk/src/logs/error.rs index 06b75f35a2..6da76253a6 100644 --- a/opentelemetry-sdk/src/logs/error.rs +++ b/opentelemetry-sdk/src/logs/error.rs @@ -54,12 +54,6 @@ impl From<&'static str> for LogError { } } -impl From> for LogError { - fn from(err: PoisonError) -> Self { - LogError::ClientFailed(err.to_string()) - } -} - /// Wrap type for string #[derive(Error, Debug)] #[error("{0}")] diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index 2b0f632241..03b91ae129 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -148,8 +148,17 @@ impl InMemoryLogExporter { /// ``` /// pub fn get_emitted_logs(&self) -> LogResult> { - let logs_guard = self.logs.lock().map_err(LogError::from)?; - let resource_guard = self.resource.lock().map_err(LogError::from)?; + let logs_guard = self.logs.lock().map_err(|_| { + LogError::from( + "InMemoryLogExporter: log buffer mutex poisoned trying to get_emitted_logs", + ) + })?; + let resource_guard = self.resource.lock().map_err(|_| { + LogError::from( + "InMemoryLogExporter: resource mutex poisoned trying to get_emitted_logs", + ) + })?; + let logs: Vec = logs_guard .iter() .map(|log_data| LogDataWithResource { @@ -173,8 +182,12 @@ impl InMemoryLogExporter { /// ``` /// pub fn reset(&self) -> Result<(), LogError> { - self.logs.lock().map(|mut logs_guard| logs_guard.clear())?; - + self.logs + .lock() + .map_err(|_| { + LogError::from("InMemoryLogExporter: log buffer mutex poisoned trying to reset()") + }) + .map(|mut logs_guard| logs_guard.clear())?; Ok(()) } } From 0ad193f89c98c59b62d8493c6a54e7647ff06199 Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Tue, 24 Dec 2024 08:14:45 +0100 Subject: [PATCH 10/12] clippy+fmt --- opentelemetry-sdk/src/export/logs/mod.rs | 6 ++---- stress/src/logs.rs | 7 ++----- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index 3298a4e8c1..debdf4bc32 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -79,10 +79,8 @@ pub trait LogExporter: Send + Sync + Debug { /// A `LogResult<()>`, which is a result type indicating either a successful export (with /// `Ok(())`) or an error (`Err(LogError)`) if the export operation failed. /// - fn export( - &self, - batch: LogBatch<'_>, - ) -> impl std::future::Future + Send; + fn export(&self, batch: LogBatch<'_>) + -> impl std::future::Future + Send; /// Shuts down the exporter. This function is idempotent; calling it /// more than once has no additional effect. diff --git a/stress/src/logs.rs b/stress/src/logs.rs index 2242d48eea..ac9cad018b 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -23,11 +23,8 @@ mod throughput; struct MockLogExporter; impl LogExporter for MockLogExporter { - fn export( - &self, - _batch: LogBatch<'_>, - ) -> impl std::future::Future> + Send { - async { Ok(()) } + async fn export(&self, _batch: LogBatch<'_>) -> LogResult<()> { + Ok(()) } } From 26bbdc4c1c8edc2754ccacde4b7b7fac8010df8f Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Tue, 24 Dec 2024 08:37:13 +0100 Subject: [PATCH 11/12] Re-export ShutdownError from common location --- examples/tracing-grpc/src/server.rs | 4 +-- opentelemetry-sdk/src/error.rs | 38 ++++++++++++++++++++++-- opentelemetry-sdk/src/export/logs/mod.rs | 5 +++- opentelemetry-sdk/src/export/trace.rs | 32 +++----------------- opentelemetry-sdk/src/logs/error.rs | 26 +++------------- 5 files changed, 48 insertions(+), 57 deletions(-) diff --git a/examples/tracing-grpc/src/server.rs b/examples/tracing-grpc/src/server.rs index 44d1fa82dc..13d10804ce 100644 --- a/examples/tracing-grpc/src/server.rs +++ b/examples/tracing-grpc/src/server.rs @@ -5,9 +5,7 @@ use opentelemetry::{ propagation::Extractor, trace::{Span, SpanKind, Tracer}, }; -use opentelemetry_sdk::{ - propagation::TraceContextPropagator, trace::TracerProvider, -}; +use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::TracerProvider}; use opentelemetry_stdout::SpanExporter; use tonic::{transport::Server, Request, Response, Status}; diff --git a/opentelemetry-sdk/src/error.rs b/opentelemetry-sdk/src/error.rs index 115da17b78..c3ab3f5483 100644 --- a/opentelemetry-sdk/src/error.rs +++ b/opentelemetry-sdk/src/error.rs @@ -1,6 +1,4 @@ //! Wrapper for error from trace, logs and metrics part of open telemetry. -use std::sync::PoisonError; - #[cfg(feature = "logs")] use crate::logs::LogError; #[cfg(feature = "metrics")] @@ -8,8 +6,13 @@ use crate::metrics::MetricError; use opentelemetry::propagation::PropagationError; #[cfg(feature = "trace")] use opentelemetry::trace::TraceError; +use std::sync::PoisonError; +use std::time::Duration; +use thiserror::Error; -/// Wrapper for error from both tracing and metrics part of open telemetry. +/// Wrapper for error from both tracing and metrics part of open telemetry. This +/// gives us a common error type where we _need_ to return errors that may come +/// from various components. #[derive(thiserror::Error, Debug)] #[non_exhaustive] pub enum Error { @@ -34,6 +37,10 @@ pub enum Error { /// Error happens when injecting and extracting information using propagators. Propagation(#[from] PropagationError), + /// Failed to shutdown an exporter + #[error(transparent)] + Shutdown(#[from] ShutdownError), + #[error("{0}")] /// Other types of failures not covered by the variants above. Other(String), @@ -44,3 +51,28 @@ impl From> for Error { Error::Other(err.to_string()) } } + +/// Errors returned by shutdown operations in the Export API. +#[derive(Error, Debug)] +#[non_exhaustive] +pub enum ShutdownError { + /// Shutdown timed out before completing. + #[error("Shutdown timed out after {0:?}")] + Timeout(Duration), + + /// The export client failed while holding the client lock. It is not + /// possible to complete the shutdown and a retry will not help. + /// This is something that should not happen and should likely emit some diagnostic. + #[error("export client failed while holding lock; cannot retry.")] + ClientFailed(String), + + /// An unexpected error occurred during shutdown. + #[error(transparent)] + Other(#[from] Box), +} + +impl From> for ShutdownError { + fn from(err: PoisonError) -> Self { + ShutdownError::ClientFailed(format!("Mutex poisoned during shutdown: {}", err)) + } +} diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index debdf4bc32..2312874126 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -1,12 +1,15 @@ //! Log exporters +use crate::logs::LogError; use crate::logs::LogRecord; -use crate::logs::{LogError, ShutdownError}; use crate::Resource; #[cfg(feature = "spec_unstable_logs_enabled")] use opentelemetry::logs::Severity; use opentelemetry::InstrumentationScope; use std::fmt::Debug; +// Re-export ShutdownError +pub use crate::error::ShutdownError; + /// A batch of log records to be exported by a `LogExporter`. /// /// The `LogBatch` struct holds a collection of log records along with their associated diff --git a/opentelemetry-sdk/src/export/trace.rs b/opentelemetry-sdk/src/export/trace.rs index 7275533089..eaf357bbdd 100644 --- a/opentelemetry-sdk/src/export/trace.rs +++ b/opentelemetry-sdk/src/export/trace.rs @@ -5,9 +5,10 @@ use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status, TraceError}; use opentelemetry::{InstrumentationScope, KeyValue}; use std::borrow::Cow; use std::fmt::Debug; -use std::sync::PoisonError; -use std::time::{Duration, SystemTime}; -use thiserror::Error; +use std::time::SystemTime; + +// Re-export ShutdownError +pub use crate::error::ShutdownError; /// Results of an export operation pub type ExportResult = Result<(), TraceError>; @@ -105,28 +106,3 @@ pub struct SpanData { /// Instrumentation scope that produced this span pub instrumentation_scope: InstrumentationScope, } - -/// Errors returned by shutdown operations in the Export API. -#[derive(Error, Debug)] -#[non_exhaustive] -pub enum ShutdownError { - /// Shutdown timed out before completing. - #[error("Shutdown timed out after {0:?}")] - Timeout(Duration), - - /// The export client failed while holding the client lock. It is not - /// possible to complete the shutdown and a retry will not help. - /// This is something that should not happen and should likely emit some diagnostic. - #[error("export client failed while holding lock; cannot retry.")] - ClientFailed(String), - - /// An unexpected error occurred during shutdown. - #[error(transparent)] - Other(#[from] Box), -} - -impl From> for ShutdownError { - fn from(err: PoisonError) -> Self { - ShutdownError::ClientFailed(format!("Mutex poisoned during shutdown: {}", err)) - } -} diff --git a/opentelemetry-sdk/src/logs/error.rs b/opentelemetry-sdk/src/logs/error.rs index 6da76253a6..4669edb039 100644 --- a/opentelemetry-sdk/src/logs/error.rs +++ b/opentelemetry-sdk/src/logs/error.rs @@ -1,6 +1,9 @@ +// Re-export ShutdownError +pub use crate::error::ShutdownError; + use crate::export::ExportError; -use std::{sync::PoisonError, time::Duration}; +use std::time::Duration; use thiserror::Error; /// Describe the result of operations in log SDK. @@ -58,24 +61,3 @@ impl From<&'static str> for LogError { #[derive(Error, Debug)] #[error("{0}")] struct Custom(String); - -/// Errors returned during shutdown -#[derive(Error, Debug)] -#[non_exhaustive] -pub enum ShutdownError { - /// The export client failed while holding the client lock. It is not - /// possible to complete the shutdown and a retry will not help. - /// This is something that should not happen and should likely emit some diagnostic. - #[error("export client failed while holding lock; cannot retry.")] - ClientFailed(String), - - /// Other errors propagated from log SDK that weren't covered above. - #[error(transparent)] - Other(#[from] Box), -} - -impl From> for ShutdownError { - fn from(err: PoisonError) -> Self { - ShutdownError::ClientFailed(format!("Mutex poisoned during shutdown: {}", err)) - } -} From de49068095e2c0e7c7b209b0c6beb2642fca1dfd Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Tue, 24 Dec 2024 08:49:25 +0100 Subject: [PATCH 12/12] Catch missing errors --- .../src/logs/log_processor_with_async_runtime.rs | 13 ++++++++++--- .../src/trace/span_processor_with_async_runtime.rs | 7 ++++++- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs b/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs index e5649d886b..438312fd56 100644 --- a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs @@ -188,7 +188,12 @@ impl BatchLogProcessor { ) .await; - exporter.shutdown(); + if let Err(shutdown_error) = exporter.shutdown() { + otel_debug!( + name: "BatchLogProcessor.Shutdown.Error", + error = format!("{}", shutdown_error), + ); + } if let Err(send_error) = ch.send(result) { otel_debug!( @@ -282,7 +287,7 @@ where #[cfg(all(test, feature = "testing", feature = "logs"))] mod tests { - use crate::export::logs::{LogBatch, LogExporter}; + use crate::export::logs::{LogBatch, LogExporter, ShutdownResult}; use crate::logs::log_processor::{ OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_SCHEDULE_DELAY, @@ -325,7 +330,9 @@ mod tests { async { Ok(()) } } - fn shutdown(&mut self) {} + fn shutdown(&mut self) -> ShutdownResult { + Ok(()) + } fn set_resource(&mut self, resource: &Resource) { self.resource diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index c3c241c776..276a33f89f 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -287,7 +287,12 @@ impl BatchSpanProcessorInternal { // Stream has terminated or processor is shutdown, return to finish execution. BatchMessage::Shutdown(ch) => { self.flush(Some(ch)).await; - self.exporter.shutdown(); + if let Err(shutdown_error) = self.exporter.shutdown() { + otel_debug!( + name: "BatchSpanProcessor.ShutdownError", + msg = format!("{:?}", shutdown_error) + ); + } return false; } // propagate the resource