From 0de6e9ad5baf37818e04d7b52252fe8d56f7a715 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 3 Sep 2024 00:43:18 -0700 Subject: [PATCH 1/4] initial commit --- opentelemetry-sdk/src/logs/record.rs | 24 ++++---- opentelemetry/src/logs/logger.rs | 6 +- opentelemetry/src/logs/noop.rs | 14 ++--- opentelemetry/src/logs/record.rs | 92 +++++++++++++++++++++++----- 4 files changed, 97 insertions(+), 39 deletions(-) diff --git a/opentelemetry-sdk/src/logs/record.rs b/opentelemetry-sdk/src/logs/record.rs index 78713598d8..5e851fb170 100644 --- a/opentelemetry-sdk/src/logs/record.rs +++ b/opentelemetry-sdk/src/logs/record.rs @@ -15,14 +15,14 @@ const PREALLOCATED_ATTRIBUTE_CAPACITY: usize = 5; /// This type uses `GrowableArray` to store key-value pairs of log attributes, where each attribute is an `Option<(Key, AnyValue)>`. /// The initial attributes are allocated in a fixed-size array of capacity `PREALLOCATED_ATTRIBUTE_CAPACITY`. /// If more attributes are added beyond this capacity, additional storage is handled by dynamically growing a vector. -pub(crate) type LogRecordAttributes = - GrowableArray, PREALLOCATED_ATTRIBUTE_CAPACITY>; +pub(crate) type LogRecordAttributes<'a> = + GrowableArray)>, PREALLOCATED_ATTRIBUTE_CAPACITY>; #[derive(Debug, Default, Clone, PartialEq)] #[non_exhaustive] /// LogRecord represents all data carried by a log record, and /// is provided to `LogExporter`s as input. -pub struct LogRecord { +pub struct LogRecord<'a> { /// Event name. Optional as not all the logging API support it. pub event_name: Option<&'static str>, @@ -45,13 +45,13 @@ pub struct LogRecord { pub severity_number: Option, /// Record body - pub body: Option, + pub body: Option>, /// Additional attributes associated with this record - pub(crate) attributes: LogRecordAttributes, + pub(crate) attributes: LogRecordAttributes<'a>, } -impl opentelemetry::logs::LogRecord for LogRecord { +impl<'a> opentelemetry::logs::LogRecord<'a> for LogRecord<'a> { fn set_event_name(&mut self, name: &'static str) { self.event_name = Some(name); } @@ -80,7 +80,7 @@ impl opentelemetry::logs::LogRecord for LogRecord { self.severity_number = Some(severity_number); } - fn set_body(&mut self, body: AnyValue) { + fn set_body(&mut self, body: AnyValue<'a>) { self.body = Some(body); } @@ -88,7 +88,7 @@ impl opentelemetry::logs::LogRecord for LogRecord { where I: IntoIterator, K: Into, - V: Into, + V: Into>, { for (key, value) in attributes.into_iter() { self.add_attribute(key, value); @@ -98,15 +98,15 @@ impl opentelemetry::logs::LogRecord for LogRecord { fn add_attribute(&mut self, key: K, value: V) where K: Into, - V: Into, + V: Into>, { self.attributes.push(Some((key.into(), value.into()))); } } -impl LogRecord { +impl<'a> LogRecord<'a> { /// Provides an iterator over the attributes. - pub fn attributes_iter(&self) -> impl Iterator { + pub fn attributes_iter(&self) -> impl Iterator)> { self.attributes.iter().filter_map(|opt| opt.as_ref()) } @@ -118,7 +118,7 @@ impl LogRecord { #[allow(dead_code)] /// Checks if the `LogRecord` contains the specified attribute. - pub(crate) fn attributes_contains(&self, key: &Key, value: &AnyValue) -> bool { + pub(crate) fn attributes_contains(&self, key: &Key, value: &AnyValue<'a>) -> bool { self.attributes .iter() .flatten() diff --git a/opentelemetry/src/logs/logger.rs b/opentelemetry/src/logs/logger.rs index fd4e18e043..737b17856b 100644 --- a/opentelemetry/src/logs/logger.rs +++ b/opentelemetry/src/logs/logger.rs @@ -9,16 +9,16 @@ use super::Severity; pub trait Logger { /// Specifies the `LogRecord` type associated with this logger. - type LogRecord: LogRecord; + type LogRecord<'a>: LogRecord<'a>; /// Creates a new log record builder. - fn create_log_record(&self) -> Self::LogRecord; + fn create_log_record<'a>(&self) -> Self::LogRecord<'a>; /// Emit a [`LogRecord`]. If there is active current thread's [`Context`], /// the logger will set the record's `TraceContext` to the active trace context, /// /// [`Context`]: crate::Context - fn emit(&self, record: Self::LogRecord); + fn emit<'a>(&self, record: Self::LogRecord<'a>); #[cfg(feature = "logs_level_enabled")] /// Check if the given log level is enabled. diff --git a/opentelemetry/src/logs/noop.rs b/opentelemetry/src/logs/noop.rs index 8c31328e5d..32f78876db 100644 --- a/opentelemetry/src/logs/noop.rs +++ b/opentelemetry/src/logs/noop.rs @@ -38,7 +38,7 @@ impl LoggerProvider for NoopLoggerProvider { /// A no-operation log record that implements the LogRecord trait. pub struct NoopLogRecord; -impl LogRecord for NoopLogRecord { +impl<'a> LogRecord<'a> for NoopLogRecord { // Implement the LogRecord trait methods with empty bodies. #[inline] fn set_event_name(&mut self, _name: &'static str) {} @@ -51,20 +51,20 @@ impl LogRecord for NoopLogRecord { #[inline] fn set_severity_number(&mut self, _number: Severity) {} #[inline] - fn set_body(&mut self, _body: AnyValue) {} + fn set_body(&mut self, _body: AnyValue<'a>) {} #[inline] fn add_attributes(&mut self, _attributes: I) where I: IntoIterator, K: Into, - V: Into, + V: Into>, { } #[inline] fn add_attribute(&mut self, _key: K, _value: V) where K: Into, - V: Into, + V: Into>, { } @@ -82,12 +82,12 @@ impl LogRecord for NoopLogRecord { pub struct NoopLogger(()); impl Logger for NoopLogger { - type LogRecord = NoopLogRecord; + type LogRecord<'a> = NoopLogRecord; - fn create_log_record(&self) -> Self::LogRecord { + fn create_log_record<'a>(&self) -> Self::LogRecord<'a> { NoopLogRecord {} } - fn emit(&self, _record: Self::LogRecord) {} + fn emit<'a>(&self, _record: Self::LogRecord<'a>) {} #[cfg(feature = "logs_level_enabled")] fn event_enabled(&self, _level: super::Severity, _target: &str) -> bool { false diff --git a/opentelemetry/src/logs/record.rs b/opentelemetry/src/logs/record.rs index 2e171ef0a1..bbe81efdc0 100644 --- a/opentelemetry/src/logs/record.rs +++ b/opentelemetry/src/logs/record.rs @@ -2,7 +2,7 @@ use crate::{Array, Key, StringValue, Value}; use std::{borrow::Cow, collections::HashMap, time::SystemTime}; /// SDK implemented trait for managing log records -pub trait LogRecord { +pub trait LogRecord<'a> { /// Sets the `event_name` of a record fn set_event_name(&mut self, name: &'static str); @@ -27,45 +27,45 @@ pub trait LogRecord { fn set_severity_number(&mut self, number: Severity); /// Sets the message body of the log. - fn set_body(&mut self, body: AnyValue); + fn set_body(&mut self, body: AnyValue<'a>); /// Adds multiple attributes. fn add_attributes(&mut self, attributes: I) where I: IntoIterator, K: Into, - V: Into; + V: Into>; /// Adds a single attribute. fn add_attribute(&mut self, key: K, value: V) where K: Into, - V: Into; + V: Into>; } /// Value types for representing arbitrary values in a log record. #[derive(Debug, Clone, PartialEq)] -pub enum AnyValue { +pub enum AnyValue<'a> { /// An integer value Int(i64), /// A double value Double(f64), /// A string value - String(StringValue), + String(Cow<'a, str>), /// A boolean value Boolean(bool), /// A byte array Bytes(Box>), /// An array of `Any` values - ListAny(Box>), + ListAny(Box>>), /// A map of string keys to `Any` values, arbitrarily nested. - Map(Box>), + Map(Box>>), } macro_rules! impl_trivial_from { ($t:ty, $variant:path) => { - impl From<$t> for AnyValue { - fn from(val: $t) -> AnyValue { + impl<'a> From<$t> for AnyValue<'a> { + fn from(val: $t) -> AnyValue<'a> { $variant(val.into()) } } @@ -84,21 +84,35 @@ impl_trivial_from!(u32, AnyValue::Int); impl_trivial_from!(f64, AnyValue::Double); impl_trivial_from!(f32, AnyValue::Double); -impl_trivial_from!(String, AnyValue::String); -impl_trivial_from!(Cow<'static, str>, AnyValue::String); -impl_trivial_from!(&'static str, AnyValue::String); -impl_trivial_from!(StringValue, AnyValue::String); impl_trivial_from!(bool, AnyValue::Boolean); -impl> FromIterator for AnyValue { +impl<'a> From> for AnyValue<'a> { + fn from(val: Cow<'a, str>) -> AnyValue<'a> { + AnyValue::String(val) + } +} + +impl<'a> From<&'a str> for AnyValue<'a> { + fn from(val: &'a str) -> AnyValue<'a> { + AnyValue::String(Cow::Borrowed(val)) + } +} + +impl From for AnyValue<'static> { + fn from(val: String) -> AnyValue<'static> { + AnyValue::String(Cow::Owned(val)) + } +} + +impl<'a, T: Into>> FromIterator for AnyValue<'a> { /// Creates an [`AnyValue::ListAny`] value from a sequence of `Into` values. fn from_iter>(iter: I) -> Self { AnyValue::ListAny(Box::new(iter.into_iter().map(Into::into).collect())) } } -impl, V: Into> FromIterator<(K, V)> for AnyValue { +impl<'a, K: Into, V: Into>> FromIterator<(K, V)> for AnyValue<'a> { /// Creates an [`AnyValue::Map`] value from a sequence of key-value pairs /// that can be converted into a `Key` and `AnyValue` respectively. fn from_iter>(iter: I) -> Self { @@ -108,7 +122,13 @@ impl, V: Into> FromIterator<(K, V)> for AnyValue { } } -impl From for AnyValue { +impl<'a> From for AnyValue<'a> { + fn from(value: StringValue) -> Self { + AnyValue::String(Cow::Owned(value.to_string())) + } +} + +impl<'a> From for AnyValue<'a> { fn from(value: Value) -> Self { match value { Value::Bool(b) => b.into(), @@ -215,3 +235,41 @@ impl Severity { } } } + +#[derive(Debug, Clone, PartialEq)] +enum StringType<'a> { + Static(&'static str), + Dynamic(Cow<'a, str>), +} + +impl<'a> StringType<'a> { + // Creates a StringType from a static string slice + fn from_static(s: &'static str) -> Self { + StringType::Static(s) + } + + // Creates a StringType from a borrowed string slice using Cow + fn from_borrowed(s: &'a str) -> Self { + StringType::Dynamic(Cow::Borrowed(s)) + } + + // Creates a StringType from an owned String using Cow + fn from_owned(s: String) -> Self { + StringType::Dynamic(Cow::Owned(s)) + } + + // Converts the StringType to a fully owned String + fn into_owned(self) -> String { + match self { + StringType::Static(s) => s.to_string(), + StringType::Dynamic(cow) => cow.into_owned(), + } + } + + // Converts the borrowed Cow variant to an owned String if it's not already owned + fn to_owned(&mut self) { + if let StringType::Dynamic(cow) = self { + *cow = Cow::Owned(cow.to_string()); + } + } +} From 39520db664f05903494a59b94472fa5072ec552c Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 3 Sep 2024 02:20:52 -0700 Subject: [PATCH 2/4] changes.. --- opentelemetry-sdk/src/export/logs/mod.rs | 8 ++-- opentelemetry-sdk/src/growable_array.rs | 6 +-- opentelemetry-sdk/src/logs/log_emitter.rs | 10 ++-- opentelemetry-sdk/src/logs/log_processor.rs | 51 +++++++++++++-------- opentelemetry-sdk/src/logs/mod.rs | 10 ---- 5 files changed, 44 insertions(+), 41 deletions(-) diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index 8056f28222..5e5a3b964d 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -23,7 +23,7 @@ use std::fmt::Debug; pub struct LogBatch<'a> { /// The data field contains a slice of tuples, where each tuple consists of a reference to /// a `LogRecord` and a reference to an `InstrumentationLibrary`. - data: &'a [(&'a LogRecord, &'a InstrumentationLibrary)], + data: &'a [(&'a LogRecord<'a>, &'a InstrumentationLibrary)], } impl<'a> LogBatch<'a> { @@ -42,12 +42,12 @@ impl<'a> LogBatch<'a> { /// Note - this is not a public function, and should not be used directly. This would be /// made private in the future. - pub fn new(data: &'a [(&'a LogRecord, &'a InstrumentationLibrary)]) -> LogBatch<'a> { + pub fn new(data: &'a [(&'a LogRecord<'a>, &'a InstrumentationLibrary)]) -> LogBatch<'a> { LogBatch { data } } } -impl LogBatch<'_> { +impl<'a> LogBatch<'a> { /// Returns an iterator over the log records and instrumentation libraries in the batch. /// /// Each item yielded by the iterator is a tuple containing references to a `LogRecord` @@ -57,7 +57,7 @@ impl LogBatch<'_> { /// /// An iterator that yields references to the `LogRecord` and `InstrumentationLibrary` in the batch. /// - pub fn iter(&self) -> impl Iterator { + pub fn iter(&self) -> impl Iterator, &InstrumentationLibrary)> { self.data .iter() .map(|(record, library)| (*record, *library)) diff --git a/opentelemetry-sdk/src/growable_array.rs b/opentelemetry-sdk/src/growable_array.rs index f174bedea2..6d9d61eb95 100644 --- a/opentelemetry-sdk/src/growable_array.rs +++ b/opentelemetry-sdk/src/growable_array.rs @@ -182,7 +182,7 @@ mod tests { use opentelemetry::logs::AnyValue; use opentelemetry::Key; - type KeyValuePair = Option<(Key, AnyValue)>; + type KeyValuePair<'a> = Option<(Key, AnyValue<'a>)>; #[test] fn test_push_and_get() { @@ -234,7 +234,7 @@ mod tests { #[test] fn test_key_value_pair_storage_growable_array() { - let mut collection = GrowableArray::::new(); + let mut collection = GrowableArray::>::new(); let key1 = Key::from("key1"); let value1 = AnyValue::String("value1".into()); @@ -267,7 +267,7 @@ mod tests { #[test] fn test_empty_attributes() { - let collection = GrowableArray::::new(); + let collection = GrowableArray::>::new(); assert_eq!(collection.len(), 0); assert_eq!(collection.get(0), None); diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 7463b19059..9e9ac284ce 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -239,14 +239,14 @@ impl Logger { } impl opentelemetry::logs::Logger for Logger { - type LogRecord = LogRecord; + type LogRecord<'a> = LogRecord<'a>; - fn create_log_record(&self) -> Self::LogRecord { + fn create_log_record<'a>(&self) -> Self::LogRecord<'a> { LogRecord::default() } /// Emit a `LogRecord`. - fn emit(&self, mut record: Self::LogRecord) { + fn emit<'a>(&self, mut record: Self::LogRecord<'a>) { let provider = self.provider(); let processors = provider.log_processors(); let trace_context = Context::map_current(|cx| { @@ -320,7 +320,7 @@ mod tests { } impl LogProcessor for ShutdownTestLogProcessor { - fn emit(&self, _data: &mut LogRecord, _library: &InstrumentationLibrary) { + fn emit<'a>(&self, _data: &mut LogRecord<'a>, _library: &InstrumentationLibrary) { self.is_shutdown .lock() .map(|is_shutdown| { @@ -550,7 +550,7 @@ mod tests { } impl LogProcessor for LazyLogProcessor { - fn emit(&self, _data: &mut LogRecord, _library: &InstrumentationLibrary) { + fn emit<'a>(&self, _data: &mut LogRecord<'a>, _library: &InstrumentationLibrary) { // nothing to do. } diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index b615acb9b8..6c95492399 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -57,7 +57,7 @@ pub trait LogProcessor: Send + Sync + Debug { /// # Parameters /// - `record`: A mutable reference to `LogData` representing the log record. /// - `instrumentation`: The instrumentation library associated with the log record. - fn emit(&self, data: &mut LogRecord, instrumentation: &InstrumentationLibrary); + fn emit<'a>(&self, data: &mut LogRecord<'a>, instrumentation: &InstrumentationLibrary); /// Force the logs lying in the cache to be exported. fn force_flush(&self) -> LogResult<()>; /// Shuts down the processor. @@ -95,7 +95,7 @@ impl SimpleLogProcessor { } impl LogProcessor for SimpleLogProcessor { - fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) { + fn emit<'a>(&self, record: &mut LogRecord<'a>, instrumentation: &InstrumentationLibrary) { // noop after shutdown if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { return; @@ -106,7 +106,7 @@ impl LogProcessor for SimpleLogProcessor { .lock() .map_err(|_| LogError::Other("simple logprocessor mutex poison".into())) .and_then(|mut exporter| { - let log_tuple = &[(record as &LogRecord, instrumentation)]; + let log_tuple = &[(record as &LogRecord<'a>, instrumentation)]; futures_executor::block_on(exporter.export(LogBatch::new(log_tuple))) }); if let Err(err) = result { @@ -153,7 +153,7 @@ impl Debug for BatchLogProcessor { } impl LogProcessor for BatchLogProcessor { - fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) { + fn emit<'a>(&self, record: &mut LogRecord<'a>, instrumentation: &InstrumentationLibrary) { let result = self.message_sender.try_send(BatchMessage::ExportLog(( record.clone(), instrumentation.clone(), @@ -300,11 +300,11 @@ impl BatchLogProcessor { } } -async fn export_with_timeout( +async fn export_with_timeout<'a, R, E>( time_out: Duration, exporter: &mut E, runtime: &R, - batch: Vec<(LogRecord, InstrumentationLibrary)>, + batch: Vec<(LogRecord<'a>, InstrumentationLibrary)>, ) -> ExportResult where R: RuntimeChannel, @@ -315,7 +315,7 @@ where } // TBD - Can we avoid this conversion as it involves heap allocation with new vector? - let log_vec: Vec<(&LogRecord, &InstrumentationLibrary)> = batch + let log_vec: Vec<(&LogRecord<'a>, &InstrumentationLibrary)> = batch .iter() .map(|log_data| (&log_data.0, &log_data.1)) .collect(); @@ -497,9 +497,9 @@ where /// Messages sent between application thread and batch log processor's work thread. #[allow(clippy::large_enum_variant)] #[derive(Debug)] -enum BatchMessage { +enum BatchMessage<'a> { /// Export logs, usually called when the log is emitted. - ExportLog((LogRecord, InstrumentationLibrary)), + ExportLog((LogRecord<'a>, InstrumentationLibrary)), /// Flush the current buffer to the backend, it can be triggered by /// pre configured interval or a call to `force_push` function. Flush(Option>), @@ -777,7 +777,7 @@ mod tests { runtime::Tokio, ); - let mut record: LogRecord = Default::default(); + let mut record: LogRecord<'_> = Default::default(); let instrumentation: InstrumentationLibrary = Default::default(); processor.emit(&mut record, &instrumentation); @@ -795,7 +795,7 @@ mod tests { .build(); let processor = SimpleLogProcessor::new(Box::new(exporter.clone())); - let mut record: LogRecord = Default::default(); + let mut record: LogRecord<'_> = Default::default(); let instrumentation: InstrumentationLibrary = Default::default(); processor.emit(&mut record, &instrumentation); @@ -813,12 +813,12 @@ mod tests { } #[derive(Debug)] - struct FirstProcessor { - pub(crate) logs: Arc>>, + struct FirstProcessor<'b> { + pub(crate) logs: Arc, InstrumentationLibrary)>>>, } - impl LogProcessor for FirstProcessor { - fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) { + impl<'b> LogProcessor for FirstProcessor<'b> { + fn emit<'a>(&self, record: &mut LogRecord<'a>, instrumentation: &InstrumentationLibrary) { // add attribute record.add_attribute( Key::from_static_str("processed_by"), @@ -844,11 +844,12 @@ mod tests { #[derive(Debug)] struct SecondProcessor { - pub(crate) logs: Arc>>, + pub(crate) logs: Arc, InstrumentationLibrary)>>>, } - impl LogProcessor for SecondProcessor { - fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) { + impl<'b> LogProcessor for SecondProcessor { + fn emit<'a>(&self, record: &mut LogRecord<'a>, instrumentation: &InstrumentationLibrary) + { assert!(record.attributes_contains( &Key::from_static_str("processed_by"), &AnyValue::String("FirstProcessor".into()) @@ -857,10 +858,22 @@ mod tests { record.body.clone().unwrap() == AnyValue::String("Updated by FirstProcessor".into()) ); + // Clone the `LogRecord` to have a `'static` version to store in the log. + let static_record: LogRecord<'static> = LogRecord { + event_name: record.event_name, + target: record.target.clone().map(|t| t.into_owned().into()), + timestamp: record.timestamp, + observed_timestamp: record.observed_timestamp, + trace_context: record.trace_context.clone(), + severity_text: record.severity_text, + severity_number: record.severity_number, + body: record.body.clone().map(AnyValue::to_owned_value), + attributes: record.attributes.clone(), // Assuming `attributes` can be cloned + }; self.logs .lock() .unwrap() - .push((record.clone(), instrumentation.clone())); + .push((static_record, instrumentation.clone())); } fn force_flush(&self) -> LogResult<()> { diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs index 59b1b9b338..1d9856a4fd 100644 --- a/opentelemetry-sdk/src/logs/mod.rs +++ b/opentelemetry-sdk/src/logs/mod.rs @@ -10,16 +10,6 @@ pub use log_processor::{ }; pub use record::{LogRecord, TraceContext}; -use opentelemetry::InstrumentationLibrary; -/// `LogData` represents a single log event without resource context. -#[derive(Clone, Debug)] -pub struct LogData { - /// Log record - pub record: LogRecord, - /// Instrumentation details for the emitter who produced this `LogEvent`. - pub instrumentation: InstrumentationLibrary, -} - #[cfg(all(test, feature = "testing"))] mod tests { use super::*; From b25f7eaae5e220c9412b0fb4f78a65d2d2b1ce85 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 3 Sep 2024 10:38:30 -0700 Subject: [PATCH 3/4] changes.. --- opentelemetry-sdk/src/growable_array.rs | 15 +++++++ opentelemetry-sdk/src/logs/log_processor.rs | 45 ++++++++++---------- opentelemetry-sdk/src/logs/record.rs | 42 +++++++++++++++++++ opentelemetry/src/logs/record.rs | 46 ++++++++++++++++++++- 4 files changed, 125 insertions(+), 23 deletions(-) diff --git a/opentelemetry-sdk/src/growable_array.rs b/opentelemetry-sdk/src/growable_array.rs index 6d9d61eb95..7b2bdf12ce 100644 --- a/opentelemetry-sdk/src/growable_array.rs +++ b/opentelemetry-sdk/src/growable_array.rs @@ -106,6 +106,21 @@ impl< .chain(self.overflow.as_ref().unwrap().iter()) } } + + /// Provides a mutable iterator over the elements. + #[allow(dead_code)] + #[inline] + pub(crate) fn iter_mut(&mut self) -> impl Iterator { + if self.overflow.is_none() || self.overflow.as_ref().unwrap().is_empty() { + self.inline.iter_mut().take(self.count).chain([].iter_mut()) // Chaining with an empty array + // so that both `if` and `else` branch return the same type + } else { + self.inline + .iter_mut() + .take(self.count) + .chain(self.overflow.as_mut().unwrap().iter_mut()) + } + } } // Implement `IntoIterator` for `GrowableArray` diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 6c95492399..5cdfbf65d8 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -25,6 +25,8 @@ use std::{ time::Duration, }; +use super::record; + /// Delay interval between two consecutive exports. const OTEL_BLRP_SCHEDULE_DELAY: &str = "OTEL_BLRP_SCHEDULE_DELAY"; /// Default delay interval between two consecutive exports. @@ -154,8 +156,9 @@ impl Debug for BatchLogProcessor { impl LogProcessor for BatchLogProcessor { fn emit<'a>(&self, record: &mut LogRecord<'a>, instrumentation: &InstrumentationLibrary) { + let record_static = record.clone().into_owned(); let result = self.message_sender.try_send(BatchMessage::ExportLog(( - record.clone(), + record_static, instrumentation.clone(), ))); @@ -497,9 +500,9 @@ where /// Messages sent between application thread and batch log processor's work thread. #[allow(clippy::large_enum_variant)] #[derive(Debug)] -enum BatchMessage<'a> { +enum BatchMessage { /// Export logs, usually called when the log is emitted. - ExportLog((LogRecord<'a>, InstrumentationLibrary)), + ExportLog((LogRecord<'static>, InstrumentationLibrary)), /// Flush the current buffer to the backend, it can be triggered by /// pre configured interval or a call to `force_push` function. Flush(Option>), @@ -813,11 +816,11 @@ mod tests { } #[derive(Debug)] - struct FirstProcessor<'b> { - pub(crate) logs: Arc, InstrumentationLibrary)>>>, + struct FirstProcessor { + pub(crate) logs: Arc, InstrumentationLibrary)>>>, } - impl<'b> LogProcessor for FirstProcessor<'b> { + impl<'b> LogProcessor for FirstProcessor { fn emit<'a>(&self, record: &mut LogRecord<'a>, instrumentation: &InstrumentationLibrary) { // add attribute record.add_attribute( @@ -826,11 +829,10 @@ mod tests { ); // update body record.body = Some("Updated by FirstProcessor".into()); - self.logs .lock() .unwrap() - .push((record.clone(), instrumentation.clone())); //clone as the LogProcessor is storing the data. + .push((record.clone().to_owned(), instrumentation.clone())); //clone as the LogProcessor is storing the data. } fn force_flush(&self) -> LogResult<()> { @@ -848,8 +850,7 @@ mod tests { } impl<'b> LogProcessor for SecondProcessor { - fn emit<'a>(&self, record: &mut LogRecord<'a>, instrumentation: &InstrumentationLibrary) - { + fn emit<'a>(&self, record: &mut LogRecord<'a>, instrumentation: &InstrumentationLibrary) { assert!(record.attributes_contains( &Key::from_static_str("processed_by"), &AnyValue::String("FirstProcessor".into()) @@ -858,18 +859,18 @@ mod tests { record.body.clone().unwrap() == AnyValue::String("Updated by FirstProcessor".into()) ); - // Clone the `LogRecord` to have a `'static` version to store in the log. - let static_record: LogRecord<'static> = LogRecord { - event_name: record.event_name, - target: record.target.clone().map(|t| t.into_owned().into()), - timestamp: record.timestamp, - observed_timestamp: record.observed_timestamp, - trace_context: record.trace_context.clone(), - severity_text: record.severity_text, - severity_number: record.severity_number, - body: record.body.clone().map(AnyValue::to_owned_value), - attributes: record.attributes.clone(), // Assuming `attributes` can be cloned - }; + // Clone the `LogRecord` to have a `'static` version to store in the log. + let static_record: LogRecord<'static> = LogRecord { + event_name: record.event_name, + target: record.target.clone().map(|t| t.into_owned().into()), + timestamp: record.timestamp, + observed_timestamp: record.observed_timestamp, + trace_context: record.trace_context.clone(), + severity_text: record.severity_text, + severity_number: record.severity_number, + body: record.body.clone().map(AnyValue::to_owned_value), + attributes: record.attributes.clone(), // Assuming `attributes` can be cloned + }; self.logs .lock() .unwrap() diff --git a/opentelemetry-sdk/src/logs/record.rs b/opentelemetry-sdk/src/logs/record.rs index 5e851fb170..7d0d171bee 100644 --- a/opentelemetry-sdk/src/logs/record.rs +++ b/opentelemetry-sdk/src/logs/record.rs @@ -110,6 +110,11 @@ impl<'a> LogRecord<'a> { self.attributes.iter().filter_map(|opt| opt.as_ref()) } + /// Provides a mutable iterator over the attributes. + pub fn attributes_iter_mut(&mut self) -> impl Iterator)> { + self.attributes.iter_mut().filter_map(|opt| opt.as_mut()) + } + #[allow(dead_code)] /// Returns the number of attributes in the `LogRecord`. pub(crate) fn attributes_len(&self) -> usize { @@ -124,6 +129,43 @@ impl<'a> LogRecord<'a> { .flatten() .any(|(k, v)| k == key && v == value) } + + pub fn make_owned(&mut self) { + if let Some(ref mut target) = self.target { + *target = Cow::Owned(target.clone().into_owned()); + } + + if let Some(ref mut body) = self.body { + body.make_owned(); // Use the `make_owned` method of `AnyValue` + } + + // Use the mutable iterator method to convert each attribute value to owned + for (_, value) in self.attributes_iter_mut() { + value.make_owned(); // Convert each attribute value to owned + } + } + + pub fn into_owned(self) -> LogRecord<'static> { + // Create a new empty GrowableArray with the same capacity settings + let mut owned_attributes: LogRecordAttributes<'static> = GrowableArray::new(); + + // Iterate over the attributes and convert each to owned + for (key, value) in self.attributes_iter() { + owned_attributes.push(Some((key.clone(), value.clone().into_owned()))); + } + + LogRecord { + event_name: self.event_name, + target: self.target.map(|t| Cow::Owned(t.into_owned())), + timestamp: self.timestamp, + observed_timestamp: self.observed_timestamp, + trace_context: self.trace_context.clone(), + severity_text: self.severity_text, + severity_number: self.severity_number, + body: self.body.map(|b| b.into_owned()), // Convert the body to an owned version + attributes: owned_attributes, // Use the newly created owned attributes + } + } } /// TraceContext stores the trace context for logs that have an associated diff --git a/opentelemetry/src/logs/record.rs b/opentelemetry/src/logs/record.rs index bbe81efdc0..af769fca05 100644 --- a/opentelemetry/src/logs/record.rs +++ b/opentelemetry/src/logs/record.rs @@ -84,7 +84,6 @@ impl_trivial_from!(u32, AnyValue::Int); impl_trivial_from!(f64, AnyValue::Double); impl_trivial_from!(f32, AnyValue::Double); - impl_trivial_from!(bool, AnyValue::Boolean); impl<'a> From> for AnyValue<'a> { @@ -145,6 +144,51 @@ impl<'a> From for AnyValue<'a> { } } +impl<'a> AnyValue<'a> { + /// Converts the `AnyValue` into an owned version in place, updating the existing instance. + pub fn make_owned(&mut self) { + match self { + AnyValue::String(ref mut s) => { + if let Cow::Borrowed(borrowed_str) = s { + // Replace the borrowed string with an owned string + *s = Cow::Owned(borrowed_str.to_string()); + } + } + AnyValue::ListAny(ref mut list) => { + // Recursively convert each item in the list to owned + for item in list.iter_mut() { + item.make_owned(); + } + } + AnyValue::Map(ref mut map) => { + // Recursively convert each value in the map to owned + for value in map.values_mut() { + value.make_owned(); + } + } + // Other variants are inherently owned and do not need to be modified + _ => {} + } + } + + /// Converts the `AnyValue` into an owned version. + pub fn into_owned(self) -> AnyValue<'static> { + match self { + AnyValue::Int(v) => AnyValue::Int(v), + AnyValue::Double(v) => AnyValue::Double(v), + AnyValue::String(s) => AnyValue::String(Cow::Owned(s.into_owned())), + AnyValue::Boolean(v) => AnyValue::Boolean(v), + AnyValue::Bytes(b) => AnyValue::Bytes(b), // Assuming this is already owned + AnyValue::ListAny(v) => { + AnyValue::ListAny(Box::new(v.into_iter().map(AnyValue::into_owned).collect())) + } + AnyValue::Map(m) => AnyValue::Map(Box::new( + m.into_iter().map(|(k, v)| (k, v.into_owned())).collect(), + )), + } + } +} + /// A normalized severity value. #[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd)] pub enum Severity { From 4af90a0ad16ef95095dc519064e550a7ce5e8062 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 5 Sep 2024 12:43:52 -0700 Subject: [PATCH 4/4] add layer changes --- opentelemetry-appender-tracing/src/layer.rs | 19 ++++++++++++++----- .../src/testing/logs/in_memory_exporter.rs | 6 +++--- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/opentelemetry-appender-tracing/src/layer.rs b/opentelemetry-appender-tracing/src/layer.rs index 776dd27502..34dc100719 100644 --- a/opentelemetry-appender-tracing/src/layer.rs +++ b/opentelemetry-appender-tracing/src/layer.rs @@ -13,7 +13,7 @@ use tracing_subscriber::Layer; const INSTRUMENTATION_LIBRARY_NAME: &str = "opentelemetry-appender-tracing"; /// Visitor to record the fields from the event record. -struct EventVisitor<'a, LR: LogRecord> { +struct EventVisitor<'a, LR: LogRecord<'a>> { log_record: &'a mut LR, } @@ -37,7 +37,7 @@ fn get_filename(filepath: &str) -> &str { filepath } -impl<'a, LR: LogRecord> EventVisitor<'a, LR> { +impl<'a, LR: LogRecord<'a>> EventVisitor<'a, LR> { fn new(log_record: &'a mut LR) -> Self { EventVisitor { log_record } } @@ -74,7 +74,8 @@ impl<'a, LR: LogRecord> EventVisitor<'a, LR> { } } -impl<'a, LR: LogRecord> tracing::field::Visit for EventVisitor<'a, LR> { +impl<'a, LR: LogRecord<'a>> tracing::field::Visit for EventVisitor<'a, LR> { + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { #[cfg(feature = "experimental_metadata_attributes")] if is_duplicated_metadata(field.name()) { @@ -88,7 +89,8 @@ impl<'a, LR: LogRecord> tracing::field::Visit for EventVisitor<'a, LR> { } } - fn record_str(&mut self, field: &tracing_core::Field, value: &str) { + fn record_str(&mut self, field: &tracing_core::Field, value: &str) + { #[cfg(feature = "experimental_metadata_attributes")] if is_duplicated_metadata(field.name()) { return; @@ -100,8 +102,15 @@ impl<'a, LR: LogRecord> tracing::field::Visit for EventVisitor<'a, LR> { //TODO: Fix heap allocation. Check if lifetime of &str can be used // to optimize sync exporter scenario. + + //let v = Cow::Borrowed(value); + { self.log_record - .add_attribute(Key::new(field.name()), AnyValue::from(value.to_owned())); + .add_attribute:: (Key::new(field.name()), value.into()); + } + + /*self.log_record + .add_attribute(Key::new(field.name()), AnyValue::from(value.to_owned()));*/ } fn record_bool(&mut self, field: &tracing_core::Field, value: bool) { diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index 958ab11fe1..f4db89c762 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -54,7 +54,7 @@ impl Default for InMemoryLogsExporter { #[derive(Debug, Clone)] pub struct OwnedLogData { /// Log record, which can be borrowed or owned. - pub record: LogRecord, + pub record: LogRecord<'static>, /// Instrumentation details for the emitter who produced this `LogEvent`. pub instrumentation: InstrumentationLibrary, } @@ -64,7 +64,7 @@ pub struct OwnedLogData { #[derive(Clone, Debug)] pub struct LogDataWithResource { /// Log record - pub record: LogRecord, + pub record: LogRecord<'static>, /// Instrumentation details for the emitter who produced this `LogData`. pub instrumentation: InstrumentationLibrary, /// Resource for the emitter who produced this `LogData`. @@ -188,7 +188,7 @@ impl LogExporter for InMemoryLogsExporter { let mut logs_guard = self.logs.lock().map_err(LogError::from)?; for (log_record, instrumentation) in batch.iter() { let owned_log = OwnedLogData { - record: (*log_record).clone(), + record: (*log_record).clone().into_owned(), instrumentation: (*instrumentation).clone(), }; logs_guard.push(owned_log);