Skip to content

Commit

Permalink
[processor/awsentity] Add awsentity processor into EMF pipeline (#1482)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhihonl authored Jan 6, 2025
1 parent ef15aa4 commit dfd2070
Show file tree
Hide file tree
Showing 15 changed files with 80 additions and 7 deletions.
24 changes: 23 additions & 1 deletion plugins/processors/awsentity/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ func NewFactory() processor.Factory {
return processor.NewFactory(
TypeStr,
createDefaultConfig,
processor.WithMetrics(createMetricsProcessor, stability))
processor.WithMetrics(createMetricsProcessor, stability),
processor.WithLogs(createLogsProcessor, stability),
)
}

func createDefaultConfig() component.Config {
Expand All @@ -53,3 +55,23 @@ func createMetricsProcessor(
metricsProcessor.processMetrics,
processorhelper.WithCapabilities(processorCapabilities))
}

func createLogsProcessor(
ctx context.Context,
set processor.Settings,
cfg component.Config,
nextConsumer consumer.Logs,
) (processor.Logs, error) {
processorConfig, ok := cfg.(*Config)
if !ok {
return nil, errors.New("configuration parsing error")
}
logProcessor := newAwsEntityProcessor(processorConfig, set.Logger)
return processorhelper.NewLogsProcessor(
ctx,
set,
cfg,
nextConsumer,
logProcessor.processLogs,
processorhelper.WithCapabilities(processorCapabilities))
}
4 changes: 2 additions & 2 deletions plugins/processors/awsentity/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ func TestCreateProcessor(t *testing.T) {
assert.NotNil(t, mProcessor)

lProcessor, err := factory.CreateLogsProcessor(context.Background(), setting, cfg, consumertest.NewNop())
assert.Equal(t, err, component.ErrDataTypeIsNotSupported)
assert.Nil(t, lProcessor)
assert.NoError(t, err)
assert.NotNil(t, lProcessor)
}
5 changes: 5 additions & 0 deletions plugins/processors/awsentity/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/go-playground/validator/v10"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
semconv "go.opentelemetry.io/collector/semconv/v1.22.0"
"go.uber.org/zap"
Expand Down Expand Up @@ -110,6 +111,10 @@ func newAwsEntityProcessor(config *Config, logger *zap.Logger) *awsEntityProcess
}
}

func (p *awsEntityProcessor) processLogs(_ context.Context, ld plog.Logs) (plog.Logs, error) {
return ld, nil
}

func (p *awsEntityProcessor) processMetrics(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
// Get the following metric attributes from the EntityStore: PlatformType, EC2.InstanceId, EC2.AutoScalingGroup

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ extensions:
mode: ec2
region: us-east-1
processors:
awsentity/service/emf:
entity_type: Service
platform: ec2
batch/containerinsights:
metadata_cardinality_limit: 1000
send_batch_max_size: 0
Expand Down Expand Up @@ -235,6 +238,7 @@ service:
exporters:
- awscloudwatchlogs/emf_logs
processors:
- awsentity/service/emf
- batch/emf_logs
receivers:
- tcplog/emf_logs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ processors:
awsentity/resource:
entity_type: Resource
platform: ec2
awsentity/service/emf:
entity_type: Service
platform: ec2
awsentity/service/telegraf:
entity_type: Service
platform: ec2
Expand Down Expand Up @@ -273,6 +276,7 @@ service:
exporters:
- awscloudwatchlogs/emf_logs
processors:
- awsentity/service/emf
- batch/emf_logs
receivers:
- udplog/emf_logs
Expand Down
4 changes: 4 additions & 0 deletions translator/tocwconfig/sampleConfig/complete_linux_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ processors:
awsentity/resource:
entity_type: Resource
platform: ec2
awsentity/service/emf:
entity_type: Service
platform: ec2
awsentity/service/telegraf:
entity_type: Service
platform: ec2
Expand Down Expand Up @@ -380,6 +383,7 @@ service:
exporters:
- awscloudwatchlogs/emf_logs
processors:
- awsentity/service/emf
- batch/emf_logs
receivers:
- udplog/emf_logs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ processors:
awsentity/resource:
entity_type: Resource
platform: ec2
awsentity/service/emf:
entity_type: Service
platform: ec2
awsentity/service/telegraf:
entity_type: Service
platform: ec2
Expand Down Expand Up @@ -260,6 +263,7 @@ service:
exporters:
- awscloudwatchlogs/emf_logs
processors:
- awsentity/service/emf
- batch/emf_logs
receivers:
- udplog/emf_logs
Expand Down
4 changes: 4 additions & 0 deletions translator/tocwconfig/sampleConfig/config_with_env.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ extensions:
mode: ec2
region: ${ENV_REGION}
processors:
awsentity/service/emf:
entity_type: Service
platform: ec2
batch/emf_logs:
metadata_cardinality_limit: 1000
send_batch_max_size: 0
Expand Down Expand Up @@ -92,6 +95,7 @@ service:
exporters:
- awscloudwatchlogs/emf_logs
processors:
- awsentity/service/emf
- batch/emf_logs
receivers:
- tcplog/emf_logs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,9 @@ extensions:
region: us-east-1
shared_credential_file: /root/.aws/credentials
processors:
awsentity/service/emf:
entity_type: Service
platform: onPremise
batch/containerinsights:
metadata_cardinality_limit: 1000
send_batch_max_size: 0
Expand Down Expand Up @@ -499,6 +502,7 @@ service:
exporters:
- awscloudwatchlogs/emf_logs
processors:
- awsentity/service/emf
- batch/emf_logs
receivers:
- tcplog/emf_logs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,9 @@ extensions:
region: us-east-1
shared_credential_file: /root/.aws/credentials
processors:
awsentity/service/emf:
entity_type: Service
platform: onPremise
batch/containerinsights:
metadata_cardinality_limit: 1000
send_batch_max_size: 0
Expand Down Expand Up @@ -1171,6 +1174,7 @@ service:
exporters:
- awscloudwatchlogs/emf_logs
processors:
- awsentity/service/emf
- batch/emf_logs
receivers:
- tcplog/emf_logs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,9 @@ extensions:
region: us-east-1
shared_credential_file: /root/.aws/credentials
processors:
awsentity/service/emf:
entity_type: Service
platform: onPremise
batch/containerinsights:
metadata_cardinality_limit: 1000
send_batch_max_size: 0
Expand Down Expand Up @@ -588,6 +591,7 @@ service:
exporters:
- awscloudwatchlogs/emf_logs
processors:
- awsentity/service/emf
- batch/emf_logs
receivers:
- tcplog/emf_logs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ extensions:
mode: ec2
region: us-east-1
processors:
awsentity/service/emf:
entity_type: Service
platform: ec2
batch/containerinsights:
metadata_cardinality_limit: 1000
send_batch_max_size: 0
Expand Down Expand Up @@ -322,6 +325,7 @@ service:
exporters:
- awscloudwatchlogs/emf_logs
processors:
- awsentity/service/emf
- batch/emf_logs
receivers:
- tcplog/emf_logs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,9 @@ extensions:
mode: ec2
region: us-east-1
processors:
awsentity/service/emf:
entity_type: Service
platform: ec2
batch/containerinsights:
metadata_cardinality_limit: 1000
send_batch_max_size: 0
Expand Down Expand Up @@ -491,6 +494,7 @@ service:
exporters:
- awscloudwatchlogs/emf_logs
processors:
- awsentity/service/emf
- batch/emf_logs
receivers:
- tcplog/emf_logs
Expand Down
6 changes: 6 additions & 0 deletions translator/translate/otel/pipeline/emf_logs/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"

"github.com/aws/amazon-cloudwatch-agent/translator/context"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/exporter/awscloudwatchlogs"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/extension/agenthealth"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/awsentity"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/batchprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/receiver/tcplog"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/receiver/udplog"
"github.com/aws/amazon-cloudwatch-agent/translator/util/ecsutil"
)

var (
Expand Down Expand Up @@ -54,6 +57,9 @@ func (t *translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators
agenthealth.NewTranslatorWithStatusCode(component.MustNewType("statuscode"), nil, true),
),
}
if !(context.CurrentContext().RunInContainer() && ecsutil.GetECSUtilSingleton().IsECS()) {
translators.Processors.Set(awsentity.NewTranslatorWithEntityType(awsentity.Service, "emf", false))
}
if serviceAddress, ok := common.GetString(conf, serviceAddressEMFKey); ok {
if strings.Contains(serviceAddress, common.Udp) {
translators.Receivers.Set(udplog.NewTranslatorWithName(common.PipelineNameEmfLogs))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestTranslator(t *testing.T) {
want: &want{
pipelineType: "logs/emf_logs",
receivers: []string{"tcplog/emf_logs", "udplog/emf_logs"},
processors: []string{"batch/emf_logs"},
processors: []string{"batch/emf_logs", "awsentity/service/emf"},
exporters: []string{"awscloudwatchlogs/emf_logs"},
extensions: []string{"agenthealth/logs", "agenthealth/statuscode"},
},
Expand All @@ -62,7 +62,7 @@ func TestTranslator(t *testing.T) {
want: &want{
pipelineType: "logs/emf_logs",
receivers: []string{"tcplog/emf_logs", "udplog/emf_logs"},
processors: []string{"batch/emf_logs"},
processors: []string{"batch/emf_logs", "awsentity/service/emf"},
exporters: []string{"awscloudwatchlogs/emf_logs"},
extensions: []string{"agenthealth/logs", "agenthealth/statuscode"},
},
Expand All @@ -80,7 +80,7 @@ func TestTranslator(t *testing.T) {
want: &want{
pipelineType: "logs/emf_logs",
receivers: []string{"udplog/emf_logs"},
processors: []string{"batch/emf_logs"},
processors: []string{"batch/emf_logs", "awsentity/service/emf"},
exporters: []string{"awscloudwatchlogs/emf_logs"},
extensions: []string{"agenthealth/logs", "agenthealth/statuscode"},
},
Expand All @@ -98,7 +98,7 @@ func TestTranslator(t *testing.T) {
want: &want{
pipelineType: "logs/emf_logs",
receivers: []string{"tcplog/emf_logs"},
processors: []string{"batch/emf_logs"},
processors: []string{"batch/emf_logs", "awsentity/service/emf"},
exporters: []string{"awscloudwatchlogs/emf_logs"},
extensions: []string{"agenthealth/logs", "agenthealth/statuscode"},
},
Expand Down

0 comments on commit dfd2070

Please sign in to comment.