Skip to content

Commit

Permalink
[BugFix] Set EKS.Cluster and K8s.Cluster without modifying global ent…
Browse files Browse the repository at this point in the history
…ity attribute map to prevent concurrent map writes (#1434)
  • Loading branch information
lisguo authored Nov 20, 2024
1 parent ed0c1ae commit 1b8ad16
Show file tree
Hide file tree
Showing 4 changed files with 307 additions and 295 deletions.
39 changes: 1 addition & 38 deletions plugins/outputs/cloudwatch/convert_otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func ConvertOtelMetric(m pmetric.Metric, entity cloudwatch.Entity) []*aggregatio
func ConvertOtelMetrics(m pmetric.Metrics) []*aggregationDatum {
datums := make([]*aggregationDatum, 0, m.DataPointCount())
for i := 0; i < m.ResourceMetrics().Len(); i++ {
entity := fetchEntityFields(m.ResourceMetrics().At(i).Resource().Attributes())
entity := entityattributes.CreateCloudWatchEntityFromAttributes(m.ResourceMetrics().At(i).Resource().Attributes())
scopeMetrics := m.ResourceMetrics().At(i).ScopeMetrics()
for j := 0; j < scopeMetrics.Len(); j++ {
metrics := scopeMetrics.At(j).Metrics()
Expand All @@ -184,40 +184,3 @@ func ConvertOtelMetrics(m pmetric.Metrics) []*aggregationDatum {
}
return datums
}

func fetchEntityFields(resourceAttributes pcommon.Map) cloudwatch.Entity {
keyAttributesMap := map[string]*string{}
attributeMap := map[string]*string{}
platformType := ""
if platformTypeValue, ok := resourceAttributes.Get(entityattributes.AttributeEntityPlatformType); ok {
platformType = platformTypeValue.Str()
}
processEntityAttributes(entityattributes.GetKeyAttributeEntityShortNameMap(), keyAttributesMap, resourceAttributes)
processEntityAttributes(entityattributes.GetAttributeEntityShortNameMap(platformType), attributeMap, resourceAttributes)
removeEntityFields(resourceAttributes)
if _, ok := keyAttributesMap[entityattributes.AwsAccountId]; !ok {
return cloudwatch.Entity{}
}
return cloudwatch.Entity{
KeyAttributes: keyAttributesMap,
Attributes: attributeMap,
}
}

// processEntityAttributes fetches the fields with entity prefix and creates an entity to be sent at the PutMetricData call.
func processEntityAttributes(entityMap map[string]string, targetMap map[string]*string, mutableResourceAttributes pcommon.Map) {
for entityField, shortName := range entityMap {
if val, ok := mutableResourceAttributes.Get(entityField); ok {
if strVal := val.Str(); strVal != "" {
targetMap[shortName] = aws.String(strVal)
}
}
}
}

// removeEntityFields so that it is not tagged as a dimension, and reduces the size of the PMD payload.
func removeEntityFields(mutableResourceAttributes pcommon.Map) {
mutableResourceAttributes.RemoveIf(func(s string, _ pcommon.Value) bool {
return strings.HasPrefix(s, entityattributes.AWSEntityPrefix)
})
}
250 changes: 0 additions & 250 deletions plugins/outputs/cloudwatch/convert_otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,256 +242,6 @@ func TestConvertOtelMetrics_Entity(t *testing.T) {

}

func TestProcessAndRemoveEntityAttributes(t *testing.T) {
testCases := []struct {
name string
resourceAttributes map[string]any
wantedAttributes map[string]*string
leftoverAttributes map[string]any
}{
{
name: "key_attributes",
resourceAttributes: map[string]any{
entityattributes.AttributeEntityServiceName: "my-service",
entityattributes.AttributeEntityDeploymentEnvironment: "my-environment",
},
wantedAttributes: map[string]*string{
entityattributes.ServiceName: aws.String("my-service"),
entityattributes.DeploymentEnvironment: aws.String("my-environment"),
},
leftoverAttributes: make(map[string]any),
},
{
name: "non-key_attributes",
resourceAttributes: map[string]any{
entityattributes.AttributeEntityCluster: "my-cluster",
entityattributes.AttributeEntityNamespace: "my-namespace",
entityattributes.AttributeEntityNode: "my-node",
entityattributes.AttributeEntityWorkload: "my-workload",
entityattributes.AttributeEntityPlatformType: "AWS::EKS",
},
wantedAttributes: map[string]*string{
entityattributes.EksCluster: aws.String("my-cluster"),
entityattributes.NamespaceField: aws.String("my-namespace"),
entityattributes.Node: aws.String("my-node"),
entityattributes.Workload: aws.String("my-workload"),
entityattributes.Platform: aws.String("AWS::EKS"),
},
leftoverAttributes: make(map[string]any),
},
{
name: "key_and_non_key_attributes",
resourceAttributes: map[string]any{
entityattributes.AttributeEntityServiceName: "my-service",
entityattributes.AttributeEntityDeploymentEnvironment: "my-environment",
entityattributes.AttributeEntityCluster: "my-cluster",
entityattributes.AttributeEntityNamespace: "my-namespace",
entityattributes.AttributeEntityNode: "my-node",
entityattributes.AttributeEntityWorkload: "my-workload",
entityattributes.AttributeEntityPlatformType: "K8s",
},
wantedAttributes: map[string]*string{
entityattributes.ServiceName: aws.String("my-service"),
entityattributes.DeploymentEnvironment: aws.String("my-environment"),
entityattributes.K8sCluster: aws.String("my-cluster"),
entityattributes.NamespaceField: aws.String("my-namespace"),
entityattributes.Node: aws.String("my-node"),
entityattributes.Workload: aws.String("my-workload"),
entityattributes.Platform: aws.String("K8s"),
},
leftoverAttributes: make(map[string]any),
},
{
name: "key_and_non_key_attributes_plus_extras",
resourceAttributes: map[string]any{
"extra_attribute": "extra_value",
entityattributes.AttributeEntityServiceName: "my-service",
entityattributes.AttributeEntityDeploymentEnvironment: "my-environment",
entityattributes.AttributeEntityCluster: "my-cluster",
entityattributes.AttributeEntityNamespace: "my-namespace",
entityattributes.AttributeEntityNode: "my-node",
entityattributes.AttributeEntityWorkload: "my-workload",
entityattributes.AttributeEntityPlatformType: "K8s",
},
wantedAttributes: map[string]*string{
entityattributes.ServiceName: aws.String("my-service"),
entityattributes.DeploymentEnvironment: aws.String("my-environment"),
entityattributes.K8sCluster: aws.String("my-cluster"),
entityattributes.NamespaceField: aws.String("my-namespace"),
entityattributes.Node: aws.String("my-node"),
entityattributes.Workload: aws.String("my-workload"),
entityattributes.Platform: aws.String("K8s"),
},
leftoverAttributes: map[string]any{
"extra_attribute": "extra_value",
},
},
{
name: "key_and_non_key_attributes_plus_unsupported_entity_field",
resourceAttributes: map[string]any{
entityattributes.AWSEntityPrefix + "not.real.values": "unsupported",
entityattributes.AttributeEntityServiceName: "my-service",
entityattributes.AttributeEntityDeploymentEnvironment: "my-environment",
entityattributes.AttributeEntityCluster: "my-cluster",
entityattributes.AttributeEntityNamespace: "my-namespace",
entityattributes.AttributeEntityNode: "my-node",
entityattributes.AttributeEntityWorkload: "my-workload",
entityattributes.AttributeEntityPlatformType: "AWS::EKS",
},
wantedAttributes: map[string]*string{
entityattributes.ServiceName: aws.String("my-service"),
entityattributes.DeploymentEnvironment: aws.String("my-environment"),
entityattributes.EksCluster: aws.String("my-cluster"),
entityattributes.NamespaceField: aws.String("my-namespace"),
entityattributes.Node: aws.String("my-node"),
entityattributes.Workload: aws.String("my-workload"),
entityattributes.Platform: aws.String("AWS::EKS"),
},
leftoverAttributes: map[string]any{},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
attrs := pcommon.NewMap()
err := attrs.FromRaw(tc.resourceAttributes)

// resetting fields for current test case
entityAttrMap := []map[string]string{entityattributes.GetKeyAttributeEntityShortNameMap()}
platformType := ""
if platformTypeValue, ok := attrs.Get(entityattributes.AttributeEntityPlatformType); ok {
platformType = platformTypeValue.Str()
}
if platformType != "" {
delete(entityattributes.GetAttributeEntityShortNameMap(platformType), entityattributes.AttributeEntityCluster)
entityAttrMap = append(entityAttrMap, entityattributes.GetAttributeEntityShortNameMap(platformType))
}
assert.Nil(t, err)
targetMap := make(map[string]*string)
for _, entityMap := range entityAttrMap {
processEntityAttributes(entityMap, targetMap, attrs)
}
removeEntityFields(attrs)
assert.Equal(t, tc.leftoverAttributes, attrs.AsRaw())
assert.Equal(t, tc.wantedAttributes, targetMap)
})
}
}

func TestFetchEntityFields_WithoutAccountID(t *testing.T) {
resourceMetrics := pmetric.NewResourceMetrics()
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityType, "Service")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityDeploymentEnvironment, "my-environment")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityServiceName, "my-service")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityNode, "my-node")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityCluster, "my-cluster")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityNamespace, "my-namespace")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityWorkload, "my-workload")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityPlatformType, "AWS::EKS")
assert.Equal(t, 8, resourceMetrics.Resource().Attributes().Len())

expectedEntity := cloudwatch.Entity{
KeyAttributes: nil,
Attributes: nil,
}
entity := fetchEntityFields(resourceMetrics.Resource().Attributes())
assert.Equal(t, 0, resourceMetrics.Resource().Attributes().Len())
assert.Equal(t, expectedEntity, entity)
}

func TestFetchEntityFields_WithAccountID(t *testing.T) {
resourceMetrics := pmetric.NewResourceMetrics()
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityType, "Service")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityDeploymentEnvironment, "my-environment")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityServiceName, "my-service")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityNode, "my-node")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityCluster, "my-cluster")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityNamespace, "my-namespace")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityWorkload, "my-workload")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityPlatformType, "AWS::EKS")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityAwsAccountId, "123456789")
assert.Equal(t, 9, resourceMetrics.Resource().Attributes().Len())

expectedEntity := cloudwatch.Entity{
KeyAttributes: map[string]*string{
entityattributes.EntityType: aws.String(entityattributes.Service),
entityattributes.ServiceName: aws.String("my-service"),
entityattributes.DeploymentEnvironment: aws.String("my-environment"),
entityattributes.AwsAccountId: aws.String("123456789"),
},
Attributes: map[string]*string{
entityattributes.Node: aws.String("my-node"),
entityattributes.EksCluster: aws.String("my-cluster"),
entityattributes.NamespaceField: aws.String("my-namespace"),
entityattributes.Workload: aws.String("my-workload"),
entityattributes.Platform: aws.String("AWS::EKS"),
},
}
entity := fetchEntityFields(resourceMetrics.Resource().Attributes())
assert.Equal(t, 0, resourceMetrics.Resource().Attributes().Len())
assert.Equal(t, expectedEntity, entity)
}

func TestFetchEntityFieldsOnK8s(t *testing.T) {
entityMap := entityattributes.GetAttributeEntityShortNameMap("")
delete(entityMap, entityattributes.AttributeEntityCluster)
resourceMetrics := pmetric.NewResourceMetrics()
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityType, "Service")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityDeploymentEnvironment, "my-environment")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityServiceName, "my-service")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityNode, "my-node")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityCluster, "my-cluster")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityNamespace, "my-namespace")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityWorkload, "my-workload")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityPlatformType, "K8s")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityAwsAccountId, "123456789")
assert.Equal(t, 9, resourceMetrics.Resource().Attributes().Len())

expectedEntity := cloudwatch.Entity{
KeyAttributes: map[string]*string{
entityattributes.EntityType: aws.String(entityattributes.Service),
entityattributes.ServiceName: aws.String("my-service"),
entityattributes.DeploymentEnvironment: aws.String("my-environment"),
entityattributes.AwsAccountId: aws.String("123456789"),
},
Attributes: map[string]*string{
entityattributes.Node: aws.String("my-node"),
entityattributes.K8sCluster: aws.String("my-cluster"),
entityattributes.NamespaceField: aws.String("my-namespace"),
entityattributes.Workload: aws.String("my-workload"),
entityattributes.Platform: aws.String("K8s"),
},
}
entity := fetchEntityFields(resourceMetrics.Resource().Attributes())
assert.Equal(t, 0, resourceMetrics.Resource().Attributes().Len())
assert.Equal(t, expectedEntity, entity)
}

func TestFetchEntityFieldsOnEc2(t *testing.T) {
resourceMetrics := pmetric.NewResourceMetrics()
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityType, "Service")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityDeploymentEnvironment, "my-environment")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityServiceName, "my-service")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityPlatformType, "AWS::EC2")
resourceMetrics.Resource().Attributes().PutStr(entityattributes.AttributeEntityAwsAccountId, "123456789")
assert.Equal(t, 5, resourceMetrics.Resource().Attributes().Len())

expectedEntity := cloudwatch.Entity{
KeyAttributes: map[string]*string{
entityattributes.EntityType: aws.String(entityattributes.Service),
entityattributes.ServiceName: aws.String("my-service"),
entityattributes.DeploymentEnvironment: aws.String("my-environment"),
entityattributes.AwsAccountId: aws.String("123456789"),
},
Attributes: map[string]*string{
entityattributes.Platform: aws.String("AWS::EC2"),
},
}
entity := fetchEntityFields(resourceMetrics.Resource().Attributes())
assert.Equal(t, 0, resourceMetrics.Resource().Attributes().Len())
assert.Equal(t, expectedEntity, entity)
}

func TestInvalidMetric(t *testing.T) {
m := pmetric.NewMetric()
m.SetName("name")
Expand Down
56 changes: 49 additions & 7 deletions plugins/processors/awsentity/entityattributes/entityattributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@

package entityattributes

import (
"strings"

"github.com/aws/aws-sdk-go/aws"
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatch"
)

const (

// The following are the possible values for EntityType config options
Expand Down Expand Up @@ -80,16 +89,42 @@ var attributeEntityToShortNameMap = map[string]string{
AttributeEntityServiceNameSource: ServiceNameSource,
}

func GetKeyAttributeEntityShortNameMap() map[string]string {
return keyAttributeEntityToShortNameMap
func CreateCloudWatchEntityFromAttributes(resourceAttributes pcommon.Map) cloudwatch.Entity {
keyAttributesMap := map[string]*string{}
attributeMap := map[string]*string{}

// Process KeyAttributes and return empty entity if AwsAccountId is not found
processEntityAttributes(keyAttributeEntityToShortNameMap, keyAttributesMap, resourceAttributes)
if _, ok := keyAttributesMap[AwsAccountId]; !ok {
return cloudwatch.Entity{}
}

// Process Attributes and add cluster attribute if on EKS/K8s
processEntityAttributes(attributeEntityToShortNameMap, attributeMap, resourceAttributes)
if platformTypeValue, ok := resourceAttributes.Get(AttributeEntityPlatformType); ok {
platformType := clusterType(platformTypeValue.Str())
if clusterNameValue, ok := resourceAttributes.Get(AttributeEntityCluster); ok {
attributeMap[platformType] = aws.String(clusterNameValue.Str())
}
}

// Remove entity fields from attributes and return the entity
removeEntityFields(resourceAttributes)
return cloudwatch.Entity{
KeyAttributes: keyAttributesMap,
Attributes: attributeMap,
}
}

// Cluster attribute prefix could be either EKS or K8s. We set the field once at runtime.
func GetAttributeEntityShortNameMap(platformType string) map[string]string {
if _, ok := attributeEntityToShortNameMap[AttributeEntityCluster]; !ok {
attributeEntityToShortNameMap[AttributeEntityCluster] = clusterType(platformType)
// processEntityAttributes fetches the fields with entity prefix and creates an entity to be sent at the PutMetricData call.
func processEntityAttributes(entityMap map[string]string, targetMap map[string]*string, incomingResourceAttributes pcommon.Map) {
for entityField, shortName := range entityMap {
if val, ok := incomingResourceAttributes.Get(entityField); ok {
if strVal := val.Str(); strVal != "" {
targetMap[shortName] = aws.String(strVal)
}
}
}
return attributeEntityToShortNameMap
}

func clusterType(platformType string) string {
Expand All @@ -100,3 +135,10 @@ func clusterType(platformType string) string {
}
return ""
}

// removeEntityFields so that it is not tagged as a dimension, and reduces the size of the PMD payload.
func removeEntityFields(mutableResourceAttributes pcommon.Map) {
mutableResourceAttributes.RemoveIf(func(s string, _ pcommon.Value) bool {
return strings.HasPrefix(s, AWSEntityPrefix)
})
}
Loading

0 comments on commit 1b8ad16

Please sign in to comment.