From bdd661e1779c6a5235c318c5ce995929bef47147 Mon Sep 17 00:00:00 2001 From: Zhou Date: Fri, 8 Nov 2024 11:00:08 -0800 Subject: [PATCH] Add aggregation mutator --- .../aggregation/aggregation_mutator.go | 67 +++++++++++++++ .../aggregation/aggregation_mutator_test.go | 84 +++++++++++++++++++ .../awsapplicationsignals/processor.go | 21 +++-- 3 files changed, 164 insertions(+), 8 deletions(-) create mode 100644 plugins/processors/awsapplicationsignals/internal/aggregation/aggregation_mutator.go create mode 100644 plugins/processors/awsapplicationsignals/internal/aggregation/aggregation_mutator_test.go diff --git a/plugins/processors/awsapplicationsignals/internal/aggregation/aggregation_mutator.go b/plugins/processors/awsapplicationsignals/internal/aggregation/aggregation_mutator.go new file mode 100644 index 0000000000..edb1e5ff13 --- /dev/null +++ b/plugins/processors/awsapplicationsignals/internal/aggregation/aggregation_mutator.go @@ -0,0 +1,67 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package aggregation + +import ( + "context" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +type aggregationType int + +const ( + defaultAggregation aggregationType = iota + lastValueAggregation +) + +// AggregationMutator is used to convert predefined ObservableUpDownCounter metrics to use LastValue aggregation. This +// is necessary for cases where metrics are instrumented as cumulative, yet reported with snapshot values. +// +// For example, metrics like DotNetGCGen0HeapSize may report values such as 1000, 2000, 1000, with cumulative temporality +// When exporters, such as the EMF exporter, detect these as cumulative, they convert the values to deltas, +// resulting in outputs like -, 1000, -1000, which misrepresent the data. +// +// Normally, this issue could be resolved by configuring a view with LastValue aggregation within the SDK. +// However, since the view feature is not fully supported in .NET, this workaround implements the required +// conversion to LastValue aggregation to ensure accurate metric reporting. +// See https://github.com/open-telemetry/opentelemetry-dotnet/issues/2618. +type AggregationMutator struct { + includes map[string]aggregationType +} + +func NewAggregationMutator() AggregationMutator { + return newAggregationMutatorWithConfig(map[string]aggregationType{ + "DotNetGCGen0HeapSize": lastValueAggregation, + "DotNetGCGen1HeapSize": lastValueAggregation, + "DotNetGCGen2HeapSize": lastValueAggregation, + "DotNetGCLOHHeapSize": lastValueAggregation, + "DotNetGCPOHHeapSize": lastValueAggregation, + "DotNetThreadCount": lastValueAggregation, + "DotNetThreadQueueLength": lastValueAggregation, + }) +} + +func newAggregationMutatorWithConfig(includes map[string]aggregationType) AggregationMutator { + return AggregationMutator{ + includes, + } +} + +func (t *AggregationMutator) ProcessMetrics(_ context.Context, m pmetric.Metric, _ pcommon.Map) { + aggType, exists := t.includes[m.Name()] + if !exists || aggType == defaultAggregation { + return + } + switch m.Type() { + case pmetric.MetricTypeSum: + switch aggType { + case lastValueAggregation: + m.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + default: + } + default: + } +} diff --git a/plugins/processors/awsapplicationsignals/internal/aggregation/aggregation_mutator_test.go b/plugins/processors/awsapplicationsignals/internal/aggregation/aggregation_mutator_test.go new file mode 100644 index 0000000000..0bcfb30a14 --- /dev/null +++ b/plugins/processors/awsapplicationsignals/internal/aggregation/aggregation_mutator_test.go @@ -0,0 +1,84 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package aggregation + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +func TestAggregationMutator_ProcessMetrics(t *testing.T) { + tests := []struct { + name string + config map[string]aggregationType + metrics []pmetric.Metric + expectedTemporality map[string]pmetric.AggregationTemporality + }{ + { + "testCumulativeToDelta", + map[string]aggregationType{ + "test0": lastValueAggregation, + }, + + []pmetric.Metric{ + generateMetricWithSumAggregation("test0", pmetric.AggregationTemporalityCumulative), + }, + map[string]pmetric.AggregationTemporality{ + "test0": pmetric.AggregationTemporalityDelta, + }, + }, + { + "testNoChange", + map[string]aggregationType{ + "test0": lastValueAggregation, + "test1": defaultAggregation, + }, + []pmetric.Metric{ + generateMetricWithSumAggregation("test0", pmetric.AggregationTemporalityDelta), + generateMetricWithSumAggregation("test1", pmetric.AggregationTemporalityCumulative), + generateMetricWithSumAggregation("test2", pmetric.AggregationTemporalityCumulative), + }, + map[string]pmetric.AggregationTemporality{ + "test0": pmetric.AggregationTemporalityDelta, + "test1": pmetric.AggregationTemporalityCumulative, + "test2": pmetric.AggregationTemporalityCumulative, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t1 *testing.T) { + mutator := newAggregationMutatorWithConfig(tt.config) + + for _, m := range tt.metrics { + mutator.ProcessMetrics(nil, m, pcommon.NewMap()) + assert.Equal(t1, tt.expectedTemporality[m.Name()], m.Sum().AggregationTemporality()) + } + }) + } + + mutator := NewAggregationMutator() + + m := generateMetricWithSumAggregation("DotNetGCGen0HeapSize", pmetric.AggregationTemporalityCumulative) + mutator.ProcessMetrics(nil, m, pcommon.NewMap()) + assert.Equal(t, pmetric.MetricTypeSum, m.Type()) + assert.Equal(t, pmetric.AggregationTemporalityDelta, m.Sum().AggregationTemporality()) + + m.SetEmptyHistogram() + m.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + mutator.ProcessMetrics(nil, m, pcommon.NewMap()) + assert.Equal(t, pmetric.MetricTypeHistogram, m.Type()) + assert.Equal(t, pmetric.AggregationTemporalityCumulative, m.Histogram().AggregationTemporality()) + +} + +func generateMetricWithSumAggregation(metricName string, temporality pmetric.AggregationTemporality) pmetric.Metric { + m := pmetric.NewMetrics().ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + m.SetName(metricName) + m.SetEmptySum() + m.Sum().SetAggregationTemporality(temporality) + return m +} diff --git a/plugins/processors/awsapplicationsignals/processor.go b/plugins/processors/awsapplicationsignals/processor.go index 784ee9dd96..c4a17a52a9 100644 --- a/plugins/processors/awsapplicationsignals/processor.go +++ b/plugins/processors/awsapplicationsignals/processor.go @@ -16,6 +16,7 @@ import ( "golang.org/x/text/language" appsignalsconfig "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/config" + "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/internal/aggregation" "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/internal/cardinalitycontrol" "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/internal/normalizer" "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/internal/prune" @@ -44,14 +45,15 @@ type stopper interface { } type awsapplicationsignalsprocessor struct { - logger *zap.Logger - config *appsignalsconfig.Config - replaceActions *rules.ReplaceActions - allowlistMutators []allowListMutator - metricMutators []attributesMutator - traceMutators []attributesMutator - limiter cardinalitycontrol.Limiter - stoppers []stopper + logger *zap.Logger + config *appsignalsconfig.Config + replaceActions *rules.ReplaceActions + allowlistMutators []allowListMutator + metricMutators []attributesMutator + traceMutators []attributesMutator + limiter cardinalitycontrol.Limiter + aggregationMutator aggregation.AggregationMutator + stoppers []stopper } func (ap *awsapplicationsignalsprocessor) StartMetrics(ctx context.Context, _ component.Host) error { @@ -81,6 +83,8 @@ func (ap *awsapplicationsignalsprocessor) StartMetrics(ctx context.Context, _ co dropper := rules.NewDropper(ap.config.Rules) ap.allowlistMutators = []allowListMutator{pruner, keeper, dropper} + ap.aggregationMutator = aggregation.NewAggregationMutator() + return nil } @@ -143,6 +147,7 @@ func (ap *awsapplicationsignalsprocessor) processMetrics(ctx context.Context, md m.SetName(metricCaser.String(m.Name())) // Ensure metric name is in sentence case } ap.processMetricAttributes(ctx, m, resourceAttributes) + ap.aggregationMutator.ProcessMetrics(ctx, m, resourceAttributes) } } }