Skip to content

Commit

Permalink
PoC new approach to build aggregations
Browse files Browse the repository at this point in the history
  • Loading branch information
fraillt committed Jan 22, 2025
1 parent 90b0dd4 commit 86e7f72
Show file tree
Hide file tree
Showing 5 changed files with 313 additions and 8 deletions.
24 changes: 24 additions & 0 deletions opentelemetry-sdk/src/metrics/data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ pub trait Aggregation: fmt::Debug + any::Any + Send + Sync {
fn as_mut(&mut self) -> &mut dyn any::Any;
}

/// Allow to access data points of an [Aggregation].
pub(crate) trait AggregationDataPoints {
/// The type of data point in the aggregation.
type DataPoint;
/// The data points of the aggregation.
fn points(&mut self) -> &mut Vec<Self::DataPoint>;
}

/// DataPoint is a single data point in a time series.
#[derive(Debug, PartialEq)]
pub struct GaugeDataPoint<T> {
Expand Down Expand Up @@ -142,6 +150,14 @@ impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for Sum<T> {
}
}

impl<T> AggregationDataPoints for Sum<T> {
type DataPoint = SumDataPoint<T>;

fn points(&mut self) -> &mut Vec<Self::DataPoint> {
&mut self.data_points
}
}

/// Represents the histogram of all measurements of values from an instrument.
#[derive(Debug)]
pub struct Histogram<T> {
Expand Down Expand Up @@ -228,6 +244,14 @@ impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for ExponentialHistogram
}
}

impl<T> AggregationDataPoints for ExponentialHistogram<T> {
type DataPoint = ExponentialHistogramDataPoint<T>;

fn points(&mut self) -> &mut Vec<Self::DataPoint> {
&mut self.data_points
}

Check warning on line 252 in opentelemetry-sdk/src/metrics/data/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/data/mod.rs#L250-L252

Added lines #L250 - L252 were not covered by tests
}

/// A single exponential histogram data point in a time series.
#[derive(Debug, PartialEq)]
pub struct ExponentialHistogramDataPoint<T> {
Expand Down
27 changes: 24 additions & 3 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,13 @@ use opentelemetry::KeyValue;
use crate::metrics::{data::Aggregation, Temporality};

use super::{
exponential_histogram::ExpoHistogram, histogram::Histogram, last_value::LastValue,
precomputed_sum::PrecomputedSum, sum::Sum, Number,
aggregate_impl::{create_aggregation, CumulativeValueMap, DeltaValueMap},
exponential_histogram::ExpoHistogram,
histogram::Histogram,
last_value::LastValue,
precomputed_sum::PrecomputedSum,
sum::{Sum, SumNew},
Number,
};

pub(crate) const STREAM_CARDINALITY_LIMIT: usize = 2000;
Expand Down Expand Up @@ -157,7 +162,23 @@ impl<T: Number> AggregateBuilder<T> {

/// Builds a sum aggregate function input and output.
pub(crate) fn sum(&self, monotonic: bool) -> AggregateFns<T> {
Sum::new(self.temporality, self.filter.clone(), monotonic).into()
// this if statement does nothing, but it allows to preserve old code, without compile warnings
if true {
match self.temporality {
Temporality::Delta => create_aggregation(
SumNew { monotonic },
DeltaValueMap::new(()),
self.filter.clone(),
),
_ => create_aggregation(
SumNew { monotonic },
CumulativeValueMap::new(()),
self.filter.clone(),
),
}
} else {
Sum::new(self.temporality, self.filter.clone(), monotonic).into()

Check warning on line 180 in opentelemetry-sdk/src/metrics/internal/aggregate.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/aggregate.rs#L180

Added line #L180 was not covered by tests
}
}

/// Builds a histogram aggregate function input and output.
Expand Down
207 changes: 207 additions & 0 deletions opentelemetry-sdk/src/metrics/internal/aggregate_impl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
use std::{marker::PhantomData, sync::Arc};

use opentelemetry::KeyValue;

use crate::metrics::{
data::{Aggregation, AggregationDataPoints},
Temporality,
};

use super::{
aggregate::{AggregateTime, AttributeSetFilter},
AggregateFns, AggregateTimeInitiator, Aggregator, ComputeAggregation, Measure, Number,
ValueMap,
};

/// Aggregate measurements for attribute sets and collect these aggregates into data points for specific temporality
pub(crate) trait AggregateMap: Send + Sync + 'static {
const TEMPORALITY: Temporality;
type Aggr: Aggregator;

fn measure(&self, value: <Self::Aggr as Aggregator>::PreComputedValue, attributes: &[KeyValue]);

fn collect_data_points<DP, MapFn>(&self, dest: &mut Vec<DP>, map_fn: MapFn)
where
MapFn: FnMut(Vec<KeyValue>, &Self::Aggr) -> DP;
}

/// This trait provides aggregation specific functionality
pub(crate) trait AggregationImpl<T>: Send + Sync + 'static {
// an implementation that knows how to aggregate a measurement
type Aggr: Aggregator;
// an implementation that stores collected aggregation data
type AggrData: Aggregation + AggregationDataPoints;

fn precompute(&self, value: T) -> <Self::Aggr as Aggregator>::PreComputedValue;
fn new_aggregation_data(&self, temporality: Temporality, time: AggregateTime)
-> Self::AggrData;
fn reset_aggregation_data(
&self,
existing: &mut Self::AggrData,
temporality: Temporality,
time: AggregateTime,
);
fn build_create_points_fn(
&self,
) -> impl FnMut(Vec<KeyValue>, &Self::Aggr) -> <Self::AggrData as AggregationDataPoints>::DataPoint;
}

pub(crate) fn create_aggregation<A, AM, T>(
aggregation: A,
aggregate_map: AM,
filter: AttributeSetFilter,
) -> AggregateFns<T>
where
AM: AggregateMap,
A: AggregationImpl<T, Aggr = AM::Aggr>,
T: Number,
{
let fns = Arc::new(AggregionFnsImpl {
filter,
aggregation,
aggregate_map,
time: AggregateTimeInitiator::default(),
_marker: Default::default(),
});
AggregateFns {
collect: fns.clone(),
measure: fns,
}
}

struct AggregionFnsImpl<A, AM, T> {
filter: AttributeSetFilter,
aggregation: A,
aggregate_map: AM,
time: AggregateTimeInitiator,
_marker: PhantomData<T>,
}

impl<A, AM, T> Measure<T> for AggregionFnsImpl<A, AM, T>
where
A: AggregationImpl<T>,
AM: AggregateMap<Aggr = A::Aggr>,
T: Number,
{
fn call(&self, measurement: T, attrs: &[KeyValue]) {
self.filter.apply(attrs, |filtered_attrs| {
self.aggregate_map
.measure(self.aggregation.precompute(measurement), filtered_attrs);
});
}
}

impl<A, AM, T> ComputeAggregation for AggregionFnsImpl<A, AM, T>
where
A: AggregationImpl<T>,
AM: AggregateMap<Aggr = A::Aggr>,
T: Number,
{
fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
let time = if let Temporality::Delta = AM::TEMPORALITY {
self.time.delta()
} else {
self.time.cumulative()
};
let mut s_data = dest.and_then(|d| d.as_mut().downcast_mut::<A::AggrData>());
let mut new_agg = match s_data.as_mut() {
Some(existing) => {
self.aggregation
.reset_aggregation_data(existing, AM::TEMPORALITY, time);
None
}
None => Some(self.aggregation.new_aggregation_data(AM::TEMPORALITY, time)),
};
let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));

let mut create_point = self.aggregation.build_create_points_fn();
self.aggregate_map
.collect_data_points(s_data.points(), move |a, b| create_point(a, b));

(
s_data.points().len(),
new_agg.map(|a| Box::new(a) as Box<_>),
)
}
}

/// At the moment use [`ValueMap`] under the hood (which support both Delta and Cumulative), to implement `AggregateMap` for Delta temporality
/// Later this could be improved to support only Delta temporality
pub(crate) struct DeltaValueMap<A>(ValueMap<A>)
where
A: Aggregator;

impl<A> DeltaValueMap<A>
where
A: Aggregator,
{
pub(crate) fn new(config: A::InitConfig) -> Self {
Self(ValueMap::new(config))
}
}

impl<A> AggregateMap for DeltaValueMap<A>
where
A: Aggregator,
<A as Aggregator>::InitConfig: Send + Sync,
{
const TEMPORALITY: Temporality = Temporality::Delta;

type Aggr = A;

fn measure(
&self,
value: <Self::Aggr as Aggregator>::PreComputedValue,
attributes: &[KeyValue],
) {
self.0.measure(value, attributes);
}

fn collect_data_points<DP, MapFn>(&self, dest: &mut Vec<DP>, mut map_fn: MapFn)
where
MapFn: FnMut(Vec<KeyValue>, &Self::Aggr) -> DP,
{
self.0
.collect_and_reset(dest, |attributes, aggr| map_fn(attributes, &aggr));
}
}

/// At the moment use [`ValueMap`] under the hood (which support both Delta and Cumulative), to implement `AggregateMap` for Cumulative temporality
/// Later this could be improved to support only Cumulative temporality
pub(crate) struct CumulativeValueMap<A>(ValueMap<A>)
where
A: Aggregator;

impl<A> CumulativeValueMap<A>
where
A: Aggregator,
{
pub(crate) fn new(config: A::InitConfig) -> Self {
Self(ValueMap::new(config))
}
}

impl<A> AggregateMap for CumulativeValueMap<A>
where
A: Aggregator,
<A as Aggregator>::InitConfig: Send + Sync,
{
const TEMPORALITY: Temporality = Temporality::Cumulative;

type Aggr = A;

fn measure(
&self,
value: <Self::Aggr as Aggregator>::PreComputedValue,
attributes: &[KeyValue],
) {
self.0.measure(value, attributes);
}

fn collect_data_points<DP, MapFn>(&self, dest: &mut Vec<DP>, map_fn: MapFn)
where
MapFn: FnMut(Vec<KeyValue>, &Self::Aggr) -> DP,
{
self.0.collect_readonly(dest, map_fn);
}
}
8 changes: 6 additions & 2 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod aggregate;
mod aggregate_impl;
mod exponential_histogram;
mod histogram;
mod last_value;
Expand All @@ -12,7 +13,10 @@ use std::ops::{Add, AddAssign, DerefMut, Sub};
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock, RwLock};

use aggregate::{is_under_cardinality_limit, STREAM_CARDINALITY_LIMIT};
use aggregate::{is_under_cardinality_limit, AggregateTimeInitiator, STREAM_CARDINALITY_LIMIT};

pub(crate) use aggregate_impl::AggregationImpl;

pub(crate) use aggregate::{AggregateBuilder, AggregateFns, ComputeAggregation, Measure};
pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
use opentelemetry::{otel_warn, KeyValue};
Expand All @@ -25,7 +29,7 @@ fn stream_overflow_attributes() -> &'static Vec<KeyValue> {
STREAM_OVERFLOW_ATTRIBUTES.get_or_init(|| vec![KeyValue::new("otel.metric.overflow", "true")])
}

pub(crate) trait Aggregator {
pub(crate) trait Aggregator: Send + Sync + 'static {
/// A static configuration that is needed in order to initialize aggregator.
/// E.g. bucket_size at creation time .
type InitConfig;
Expand Down
55 changes: 52 additions & 3 deletions opentelemetry-sdk/src/metrics/internal/sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ use crate::metrics::data::{self, Aggregation, SumDataPoint};
use crate::metrics::Temporality;
use opentelemetry::KeyValue;

use super::aggregate::{AggregateTimeInitiator, AttributeSetFilter};
use super::{Aggregator, AtomicTracker, ComputeAggregation, Measure, Number};
use super::aggregate::{AggregateTime, AggregateTimeInitiator, AttributeSetFilter};
use super::{AggregationImpl, Aggregator, AtomicTracker, ComputeAggregation, Measure, Number};
use super::{AtomicallyUpdate, ValueMap};

struct Increment<T>
pub(crate) struct Increment<T>
where
T: AtomicallyUpdate<T>,
{
Expand Down Expand Up @@ -162,3 +162,52 @@ where
}
}
}

pub(crate) struct SumNew {
pub(crate) monotonic: bool,
}

impl<T> AggregationImpl<T> for SumNew
where
T: Number,
{
type Aggr = Increment<T>;
type AggrData = data::Sum<T>;

fn precompute(&self, value: T) -> T {
value
}

fn new_aggregation_data(&self, temporality: Temporality, time: AggregateTime) -> data::Sum<T> {
data::Sum {
data_points: vec![],
start_time: time.start,
time: time.current,
temporality,
is_monotonic: self.monotonic,
}
}

fn reset_aggregation_data(
&self,
existing: &mut data::Sum<T>,
temporality: Temporality,
time: AggregateTime,
) {
existing.data_points.clear();
existing.start_time = time.start;
existing.time = time.current;
existing.temporality = temporality;
existing.is_monotonic = self.monotonic;
}

fn build_create_points_fn(
&self,
) -> impl FnMut(Vec<KeyValue>, &Increment<T>) -> SumDataPoint<T> {
|attributes, aggr| SumDataPoint {
attributes,
value: aggr.value.get_value(),
exemplars: vec![],
}
}
}

0 comments on commit 86e7f72

Please sign in to comment.