Skip to content

Commit

Permalink
Add aggregation mutator
Browse files Browse the repository at this point in the history
  • Loading branch information
bjrara committed Dec 31, 2024
1 parent 6e2aff9 commit bdd661e
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package aggregation

import (
"context"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

type aggregationType int

const (
defaultAggregation aggregationType = iota
lastValueAggregation
)

// AggregationMutator is used to convert predefined ObservableUpDownCounter metrics to use LastValue aggregation. This
// is necessary for cases where metrics are instrumented as cumulative, yet reported with snapshot values.
//
// For example, metrics like DotNetGCGen0HeapSize may report values such as 1000, 2000, 1000, with cumulative temporality
// When exporters, such as the EMF exporter, detect these as cumulative, they convert the values to deltas,
// resulting in outputs like -, 1000, -1000, which misrepresent the data.
//
// Normally, this issue could be resolved by configuring a view with LastValue aggregation within the SDK.
// However, since the view feature is not fully supported in .NET, this workaround implements the required
// conversion to LastValue aggregation to ensure accurate metric reporting.
// See https://github.com/open-telemetry/opentelemetry-dotnet/issues/2618.
type AggregationMutator struct {
includes map[string]aggregationType
}

func NewAggregationMutator() AggregationMutator {
return newAggregationMutatorWithConfig(map[string]aggregationType{
"DotNetGCGen0HeapSize": lastValueAggregation,
"DotNetGCGen1HeapSize": lastValueAggregation,
"DotNetGCGen2HeapSize": lastValueAggregation,
"DotNetGCLOHHeapSize": lastValueAggregation,
"DotNetGCPOHHeapSize": lastValueAggregation,
"DotNetThreadCount": lastValueAggregation,
"DotNetThreadQueueLength": lastValueAggregation,
})
}

func newAggregationMutatorWithConfig(includes map[string]aggregationType) AggregationMutator {
return AggregationMutator{
includes,
}
}

func (t *AggregationMutator) ProcessMetrics(_ context.Context, m pmetric.Metric, _ pcommon.Map) {
aggType, exists := t.includes[m.Name()]
if !exists || aggType == defaultAggregation {
return
}
switch m.Type() {
case pmetric.MetricTypeSum:
switch aggType {
case lastValueAggregation:
m.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
default:
}
default:
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package aggregation

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

func TestAggregationMutator_ProcessMetrics(t *testing.T) {
tests := []struct {
name string
config map[string]aggregationType
metrics []pmetric.Metric
expectedTemporality map[string]pmetric.AggregationTemporality
}{
{
"testCumulativeToDelta",
map[string]aggregationType{
"test0": lastValueAggregation,
},

[]pmetric.Metric{
generateMetricWithSumAggregation("test0", pmetric.AggregationTemporalityCumulative),
},
map[string]pmetric.AggregationTemporality{
"test0": pmetric.AggregationTemporalityDelta,
},
},
{
"testNoChange",
map[string]aggregationType{
"test0": lastValueAggregation,
"test1": defaultAggregation,
},
[]pmetric.Metric{
generateMetricWithSumAggregation("test0", pmetric.AggregationTemporalityDelta),
generateMetricWithSumAggregation("test1", pmetric.AggregationTemporalityCumulative),
generateMetricWithSumAggregation("test2", pmetric.AggregationTemporalityCumulative),
},
map[string]pmetric.AggregationTemporality{
"test0": pmetric.AggregationTemporalityDelta,
"test1": pmetric.AggregationTemporalityCumulative,
"test2": pmetric.AggregationTemporalityCumulative,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t1 *testing.T) {
mutator := newAggregationMutatorWithConfig(tt.config)

for _, m := range tt.metrics {
mutator.ProcessMetrics(nil, m, pcommon.NewMap())
assert.Equal(t1, tt.expectedTemporality[m.Name()], m.Sum().AggregationTemporality())
}
})
}

mutator := NewAggregationMutator()

m := generateMetricWithSumAggregation("DotNetGCGen0HeapSize", pmetric.AggregationTemporalityCumulative)
mutator.ProcessMetrics(nil, m, pcommon.NewMap())
assert.Equal(t, pmetric.MetricTypeSum, m.Type())
assert.Equal(t, pmetric.AggregationTemporalityDelta, m.Sum().AggregationTemporality())

m.SetEmptyHistogram()
m.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
mutator.ProcessMetrics(nil, m, pcommon.NewMap())
assert.Equal(t, pmetric.MetricTypeHistogram, m.Type())
assert.Equal(t, pmetric.AggregationTemporalityCumulative, m.Histogram().AggregationTemporality())

}

func generateMetricWithSumAggregation(metricName string, temporality pmetric.AggregationTemporality) pmetric.Metric {
m := pmetric.NewMetrics().ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
m.SetName(metricName)
m.SetEmptySum()
m.Sum().SetAggregationTemporality(temporality)
return m
}
21 changes: 13 additions & 8 deletions plugins/processors/awsapplicationsignals/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"golang.org/x/text/language"

appsignalsconfig "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/config"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/internal/aggregation"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/internal/cardinalitycontrol"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/internal/normalizer"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/internal/prune"
Expand Down Expand Up @@ -44,14 +45,15 @@ type stopper interface {
}

type awsapplicationsignalsprocessor struct {
logger *zap.Logger
config *appsignalsconfig.Config
replaceActions *rules.ReplaceActions
allowlistMutators []allowListMutator
metricMutators []attributesMutator
traceMutators []attributesMutator
limiter cardinalitycontrol.Limiter
stoppers []stopper
logger *zap.Logger
config *appsignalsconfig.Config
replaceActions *rules.ReplaceActions
allowlistMutators []allowListMutator
metricMutators []attributesMutator
traceMutators []attributesMutator
limiter cardinalitycontrol.Limiter
aggregationMutator aggregation.AggregationMutator
stoppers []stopper
}

func (ap *awsapplicationsignalsprocessor) StartMetrics(ctx context.Context, _ component.Host) error {
Expand Down Expand Up @@ -81,6 +83,8 @@ func (ap *awsapplicationsignalsprocessor) StartMetrics(ctx context.Context, _ co
dropper := rules.NewDropper(ap.config.Rules)
ap.allowlistMutators = []allowListMutator{pruner, keeper, dropper}

ap.aggregationMutator = aggregation.NewAggregationMutator()

return nil
}

Expand Down Expand Up @@ -143,6 +147,7 @@ func (ap *awsapplicationsignalsprocessor) processMetrics(ctx context.Context, md
m.SetName(metricCaser.String(m.Name())) // Ensure metric name is in sentence case
}
ap.processMetricAttributes(ctx, m, resourceAttributes)
ap.aggregationMutator.ProcessMetrics(ctx, m, resourceAttributes)
}
}
}
Expand Down

0 comments on commit bdd661e

Please sign in to comment.