Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Update AnyValue to avoid heap allocation for &str reference in synchronous path #2075

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions opentelemetry-appender-tracing/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tracing_subscriber::Layer;
const INSTRUMENTATION_LIBRARY_NAME: &str = "opentelemetry-appender-tracing";

/// Visitor to record the fields from the event record.
struct EventVisitor<'a, LR: LogRecord> {
struct EventVisitor<'a, LR: LogRecord<'a>> {
log_record: &'a mut LR,
}

Expand All @@ -37,7 +37,7 @@ fn get_filename(filepath: &str) -> &str {
filepath
}

impl<'a, LR: LogRecord> EventVisitor<'a, LR> {
impl<'a, LR: LogRecord<'a>> EventVisitor<'a, LR> {
fn new(log_record: &'a mut LR) -> Self {
EventVisitor { log_record }
}
Expand Down Expand Up @@ -74,7 +74,8 @@ impl<'a, LR: LogRecord> EventVisitor<'a, LR> {
}
}

impl<'a, LR: LogRecord> tracing::field::Visit for EventVisitor<'a, LR> {
impl<'a, LR: LogRecord<'a>> tracing::field::Visit for EventVisitor<'a, LR> {

fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
#[cfg(feature = "experimental_metadata_attributes")]
if is_duplicated_metadata(field.name()) {
Expand All @@ -88,7 +89,8 @@ impl<'a, LR: LogRecord> tracing::field::Visit for EventVisitor<'a, LR> {
}
}

fn record_str(&mut self, field: &tracing_core::Field, value: &str) {
fn record_str(&mut self, field: &tracing_core::Field, value: &str)
{
#[cfg(feature = "experimental_metadata_attributes")]
if is_duplicated_metadata(field.name()) {
return;
Expand All @@ -100,8 +102,15 @@ impl<'a, LR: LogRecord> tracing::field::Visit for EventVisitor<'a, LR> {

//TODO: Fix heap allocation. Check if lifetime of &str can be used
// to optimize sync exporter scenario.

//let v = Cow::Borrowed(value);
{
self.log_record
.add_attribute(Key::new(field.name()), AnyValue::from(value.to_owned()));
.add_attribute::<Key, AnyValue> (Key::new(field.name()), value.into());
}

/*self.log_record
.add_attribute(Key::new(field.name()), AnyValue::from(value.to_owned()));*/
}

fn record_bool(&mut self, field: &tracing_core::Field, value: bool) {
Expand Down
8 changes: 4 additions & 4 deletions opentelemetry-sdk/src/export/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::fmt::Debug;
pub struct LogBatch<'a> {
/// The data field contains a slice of tuples, where each tuple consists of a reference to
/// a `LogRecord` and a reference to an `InstrumentationLibrary`.
data: &'a [(&'a LogRecord, &'a InstrumentationLibrary)],
data: &'a [(&'a LogRecord<'a>, &'a InstrumentationLibrary)],
}

impl<'a> LogBatch<'a> {
Expand All @@ -42,12 +42,12 @@ impl<'a> LogBatch<'a> {
/// Note - this is not a public function, and should not be used directly. This would be
/// made private in the future.

pub fn new(data: &'a [(&'a LogRecord, &'a InstrumentationLibrary)]) -> LogBatch<'a> {
pub fn new(data: &'a [(&'a LogRecord<'a>, &'a InstrumentationLibrary)]) -> LogBatch<'a> {
LogBatch { data }
}
}

impl LogBatch<'_> {
impl<'a> LogBatch<'a> {
/// Returns an iterator over the log records and instrumentation libraries in the batch.
///
/// Each item yielded by the iterator is a tuple containing references to a `LogRecord`
Expand All @@ -57,7 +57,7 @@ impl LogBatch<'_> {
///
/// An iterator that yields references to the `LogRecord` and `InstrumentationLibrary` in the batch.
///
pub fn iter(&self) -> impl Iterator<Item = (&LogRecord, &InstrumentationLibrary)> {
pub fn iter(&self) -> impl Iterator<Item = (&LogRecord<'a>, &InstrumentationLibrary)> {
self.data
.iter()
.map(|(record, library)| (*record, *library))
Expand Down
21 changes: 18 additions & 3 deletions opentelemetry-sdk/src/growable_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,21 @@ impl<
.chain(self.overflow.as_ref().unwrap().iter())
}
}

/// Provides a mutable iterator over the elements.
#[allow(dead_code)]
#[inline]
pub(crate) fn iter_mut(&mut self) -> impl Iterator<Item = &mut T> {
if self.overflow.is_none() || self.overflow.as_ref().unwrap().is_empty() {
self.inline.iter_mut().take(self.count).chain([].iter_mut()) // Chaining with an empty array
// so that both `if` and `else` branch return the same type
} else {
self.inline
.iter_mut()
.take(self.count)
.chain(self.overflow.as_mut().unwrap().iter_mut())
}
}
}

// Implement `IntoIterator` for `GrowableArray`
Expand Down Expand Up @@ -182,7 +197,7 @@ mod tests {
use opentelemetry::logs::AnyValue;
use opentelemetry::Key;

type KeyValuePair = Option<(Key, AnyValue)>;
type KeyValuePair<'a> = Option<(Key, AnyValue<'a>)>;

#[test]
fn test_push_and_get() {
Expand Down Expand Up @@ -234,7 +249,7 @@ mod tests {

#[test]
fn test_key_value_pair_storage_growable_array() {
let mut collection = GrowableArray::<KeyValuePair>::new();
let mut collection = GrowableArray::<KeyValuePair<'_>>::new();

let key1 = Key::from("key1");
let value1 = AnyValue::String("value1".into());
Expand Down Expand Up @@ -267,7 +282,7 @@ mod tests {

#[test]
fn test_empty_attributes() {
let collection = GrowableArray::<KeyValuePair>::new();
let collection = GrowableArray::<KeyValuePair<'_>>::new();
assert_eq!(collection.len(), 0);
assert_eq!(collection.get(0), None);

Expand Down
10 changes: 5 additions & 5 deletions opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,14 @@ impl Logger {
}

impl opentelemetry::logs::Logger for Logger {
type LogRecord = LogRecord;
type LogRecord<'a> = LogRecord<'a>;

fn create_log_record(&self) -> Self::LogRecord {
fn create_log_record<'a>(&self) -> Self::LogRecord<'a> {
LogRecord::default()
}

/// Emit a `LogRecord`.
fn emit(&self, mut record: Self::LogRecord) {
fn emit<'a>(&self, mut record: Self::LogRecord<'a>) {
let provider = self.provider();
let processors = provider.log_processors();
let trace_context = Context::map_current(|cx| {
Expand Down Expand Up @@ -320,7 +320,7 @@ mod tests {
}

impl LogProcessor for ShutdownTestLogProcessor {
fn emit(&self, _data: &mut LogRecord, _library: &InstrumentationLibrary) {
fn emit<'a>(&self, _data: &mut LogRecord<'a>, _library: &InstrumentationLibrary) {
self.is_shutdown
.lock()
.map(|is_shutdown| {
Expand Down Expand Up @@ -550,7 +550,7 @@ mod tests {
}

impl LogProcessor for LazyLogProcessor {
fn emit(&self, _data: &mut LogRecord, _library: &InstrumentationLibrary) {
fn emit<'a>(&self, _data: &mut LogRecord<'a>, _library: &InstrumentationLibrary) {
// nothing to do.
}

Expand Down
54 changes: 34 additions & 20 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use std::{
time::Duration,
};

use super::record;

/// Delay interval between two consecutive exports.
const OTEL_BLRP_SCHEDULE_DELAY: &str = "OTEL_BLRP_SCHEDULE_DELAY";
/// Default delay interval between two consecutive exports.
Expand Down Expand Up @@ -57,7 +59,7 @@ pub trait LogProcessor: Send + Sync + Debug {
/// # Parameters
/// - `record`: A mutable reference to `LogData` representing the log record.
/// - `instrumentation`: The instrumentation library associated with the log record.
fn emit(&self, data: &mut LogRecord, instrumentation: &InstrumentationLibrary);
fn emit<'a>(&self, data: &mut LogRecord<'a>, instrumentation: &InstrumentationLibrary);
/// Force the logs lying in the cache to be exported.
fn force_flush(&self) -> LogResult<()>;
/// Shuts down the processor.
Expand Down Expand Up @@ -95,7 +97,7 @@ impl SimpleLogProcessor {
}

impl LogProcessor for SimpleLogProcessor {
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) {
fn emit<'a>(&self, record: &mut LogRecord<'a>, instrumentation: &InstrumentationLibrary) {
// noop after shutdown
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
return;
Expand All @@ -106,7 +108,7 @@ impl LogProcessor for SimpleLogProcessor {
.lock()
.map_err(|_| LogError::Other("simple logprocessor mutex poison".into()))
.and_then(|mut exporter| {
let log_tuple = &[(record as &LogRecord, instrumentation)];
let log_tuple = &[(record as &LogRecord<'a>, instrumentation)];
futures_executor::block_on(exporter.export(LogBatch::new(log_tuple)))
});
if let Err(err) = result {
Expand Down Expand Up @@ -153,9 +155,10 @@ impl<R: RuntimeChannel> Debug for BatchLogProcessor<R> {
}

impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) {
fn emit<'a>(&self, record: &mut LogRecord<'a>, instrumentation: &InstrumentationLibrary) {
let record_static = record.clone().into_owned();
let result = self.message_sender.try_send(BatchMessage::ExportLog((
record.clone(),
record_static,
instrumentation.clone(),
)));

Expand Down Expand Up @@ -300,11 +303,11 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
}
}

async fn export_with_timeout<R, E>(
async fn export_with_timeout<'a, R, E>(
time_out: Duration,
exporter: &mut E,
runtime: &R,
batch: Vec<(LogRecord, InstrumentationLibrary)>,
batch: Vec<(LogRecord<'a>, InstrumentationLibrary)>,
) -> ExportResult
where
R: RuntimeChannel,
Expand All @@ -315,7 +318,7 @@ where
}

// TBD - Can we avoid this conversion as it involves heap allocation with new vector?
let log_vec: Vec<(&LogRecord, &InstrumentationLibrary)> = batch
let log_vec: Vec<(&LogRecord<'a>, &InstrumentationLibrary)> = batch
.iter()
.map(|log_data| (&log_data.0, &log_data.1))
.collect();
Expand Down Expand Up @@ -499,7 +502,7 @@ where
#[derive(Debug)]
enum BatchMessage {
/// Export logs, usually called when the log is emitted.
ExportLog((LogRecord, InstrumentationLibrary)),
ExportLog((LogRecord<'static>, InstrumentationLibrary)),
/// Flush the current buffer to the backend, it can be triggered by
/// pre configured interval or a call to `force_push` function.
Flush(Option<oneshot::Sender<ExportResult>>),
Expand Down Expand Up @@ -777,7 +780,7 @@ mod tests {
runtime::Tokio,
);

let mut record: LogRecord = Default::default();
let mut record: LogRecord<'_> = Default::default();
let instrumentation: InstrumentationLibrary = Default::default();

processor.emit(&mut record, &instrumentation);
Expand All @@ -795,7 +798,7 @@ mod tests {
.build();
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));

let mut record: LogRecord = Default::default();
let mut record: LogRecord<'_> = Default::default();
let instrumentation: InstrumentationLibrary = Default::default();

processor.emit(&mut record, &instrumentation);
Expand All @@ -814,23 +817,22 @@ mod tests {

#[derive(Debug)]
struct FirstProcessor {
pub(crate) logs: Arc<Mutex<Vec<(LogRecord, InstrumentationLibrary)>>>,
pub(crate) logs: Arc<Mutex<Vec<(LogRecord<'static>, InstrumentationLibrary)>>>,
}

impl LogProcessor for FirstProcessor {
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) {
impl<'b> LogProcessor for FirstProcessor {
fn emit<'a>(&self, record: &mut LogRecord<'a>, instrumentation: &InstrumentationLibrary) {
// add attribute
record.add_attribute(
Key::from_static_str("processed_by"),
AnyValue::String("FirstProcessor".into()),
);
// update body
record.body = Some("Updated by FirstProcessor".into());

self.logs
.lock()
.unwrap()
.push((record.clone(), instrumentation.clone())); //clone as the LogProcessor is storing the data.
.push((record.clone().to_owned(), instrumentation.clone())); //clone as the LogProcessor is storing the data.
}

fn force_flush(&self) -> LogResult<()> {
Expand All @@ -844,11 +846,11 @@ mod tests {

#[derive(Debug)]
struct SecondProcessor {
pub(crate) logs: Arc<Mutex<Vec<(LogRecord, InstrumentationLibrary)>>>,
pub(crate) logs: Arc<Mutex<Vec<(LogRecord<'static>, InstrumentationLibrary)>>>,
}

impl LogProcessor for SecondProcessor {
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) {
impl<'b> LogProcessor for SecondProcessor {
fn emit<'a>(&self, record: &mut LogRecord<'a>, instrumentation: &InstrumentationLibrary) {
assert!(record.attributes_contains(
&Key::from_static_str("processed_by"),
&AnyValue::String("FirstProcessor".into())
Expand All @@ -857,10 +859,22 @@ mod tests {
record.body.clone().unwrap()
== AnyValue::String("Updated by FirstProcessor".into())
);
// Clone the `LogRecord` to have a `'static` version to store in the log.
let static_record: LogRecord<'static> = LogRecord {
event_name: record.event_name,
target: record.target.clone().map(|t| t.into_owned().into()),
timestamp: record.timestamp,
observed_timestamp: record.observed_timestamp,
trace_context: record.trace_context.clone(),
severity_text: record.severity_text,
severity_number: record.severity_number,
body: record.body.clone().map(AnyValue::to_owned_value),
attributes: record.attributes.clone(), // Assuming `attributes` can be cloned
};
self.logs
.lock()
.unwrap()
.push((record.clone(), instrumentation.clone()));
.push((static_record, instrumentation.clone()));
}

fn force_flush(&self) -> LogResult<()> {
Expand Down
10 changes: 0 additions & 10 deletions opentelemetry-sdk/src/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,6 @@ pub use log_processor::{
};
pub use record::{LogRecord, TraceContext};

use opentelemetry::InstrumentationLibrary;
/// `LogData` represents a single log event without resource context.
#[derive(Clone, Debug)]
pub struct LogData {
/// Log record
pub record: LogRecord,
/// Instrumentation details for the emitter who produced this `LogEvent`.
pub instrumentation: InstrumentationLibrary,
}

#[cfg(all(test, feature = "testing"))]
mod tests {
use super::*;
Expand Down
Loading
Loading