Commit

Merge branch 'main' of github.com:aws/amazon-cloudwatch-agent
Paramadon committed Jan 17, 2025
2 parents 9e9bf43 + 444825e commit c32d8ca
Showing 26 changed files with 2,061 additions and 887 deletions.
1 change: 0 additions & 1 deletion Makefile
@@ -211,7 +211,6 @@ PKG_WITH_DATA_RACE += internal/tls
PKG_WITH_DATA_RACE += plugins/inputs/logfile
PKG_WITH_DATA_RACE += plugins/inputs/logfile/tail
PKG_WITH_DATA_RACE += plugins/outputs/cloudwatch
PKG_WITH_DATA_RACE += plugins/outputs/cloudwatchlogs
PKG_WITH_DATA_RACE += plugins/processors/awsapplicationsignals
PKG_WITH_DATA_RACE += plugins/processors/ec2tagger
PKG_WITH_DATA_RACE_PATTERN := $(shell echo '$(PKG_WITH_DATA_RACE)' | tr ' ' '|')
6 changes: 5 additions & 1 deletion logs/logs.go
@@ -30,17 +30,21 @@ type LogEvent interface {
Done()
}

type LogEntityProvider interface {
Entity() *cloudwatchlogs.Entity
}

// A LogSrc is a single source where log events are generated
// e.g. a single log file
type LogSrc interface {
LogEntityProvider
SetOutput(func(LogEvent))
Group() string
Stream() string
Destination() string
Description() string
Retention() int
Class() string
Entity() *cloudwatchlogs.Entity
Stop()
}

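The new `LogEntityProvider` interface above factors the `Entity()` accessor out of `LogSrc`, so code that only needs entity metadata can depend on the narrower contract. Below is a minimal, self-contained sketch of a source satisfying it; the `Entity` struct and `fileSrc` type are simplified stand-ins (the real method returns a `*cloudwatchlogs.Entity` from the agent's SDK package) used purely for illustration.

```go
package main

import "fmt"

// Entity is a simplified stand-in for the agent's cloudwatchlogs.Entity:
// a bag of attributes identifying the resource that emitted the logs.
type Entity struct {
	Attributes map[string]string
}

// LogEntityProvider mirrors the interface added in logs/logs.go.
type LogEntityProvider interface {
	Entity() *Entity
}

// fileSrc is a hypothetical log source (e.g. a tailed file) that
// satisfies LogEntityProvider.
type fileSrc struct {
	path string
}

func (f *fileSrc) Entity() *Entity {
	return &Entity{Attributes: map[string]string{"source.file": f.path}}
}

func main() {
	var p LogEntityProvider = &fileSrc{path: "/var/log/app.log"}
	fmt.Println(p.Entity().Attributes["source.file"])
}
```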
34 changes: 34 additions & 0 deletions plugins/outputs/cloudwatchlogs/README.md
@@ -0,0 +1,34 @@
# Amazon CloudWatch Logs Output Plugin

For each configured target (log group/stream), the output plugin maintains a queue of log events that it batches.
Once a batch is full or the flush interval elapses, the batch is sent to Amazon CloudWatch using the PutLogEvents API.

When concurrency is enabled, the pusher uses a shared worker pool to allow multiple concurrent sends.
```
Target #1 (Log Group/Stream) ┌──Shared Worker Pool──┐
┌──────────────────────────────────────────────────────────────────┐ │ │
│ │ │ ┌──Worker 1──┐ │
│ ┌────────Event Queue────────┐ ┌─────────Batch─────────┐ │ │ │ ┌────────┐ │ │
│ │ │ │ ┌───────────────────┐ │ │ ┌──────┼───►│ │ Sender │ │ │
│ │ ┌───┐ ┌───┐┌───┐┌───┐ │ │ │ │ │ │ │ │ │ └────────┘ │ │
AddEvent───│───►│ │ n │ ... │ 3 ││ 2 ││ 1 │ ├─────►│ │ PutLogEventsInput │ ├──┼────┤ │ └────────────┘ │
│ │ └───┘ └───┘└───┘└───┘ │ │ │ │ │ │ │ │ │
│ │ │ │ └───────────────────┘ │ │ │ │ ┌──Worker 2──┐ │
│ └───────────────────────────┘ └───────────────────────┘ │ │ │ │ ┌────────┐ │ │
│ │ ┼──────┼───►│ │ Sender │ │ │
└──────────────────────────────────────────────────────────────────┘ │ │ │ └────────┘ │ │
│ │ └────────────┘ │
│ │ │
Target #2 (Log Group/Stream) │ │ . │
┌──────────────────────────────────────────────────────────────────┐ │ │ . │
│ │ │ │ . │
│ ┌────────Event Queue────────┐ ┌─────────Batch─────────┐ │ │ │ │
│ │ │ │ ┌───────────────────┐ │ │ │ │ │
│ │ ┌───┐ ┌───┐┌───┐┌───┐ │ │ │ │ │ │ │ │ ┌──Worker n──┐ │
AddEvent───│───►│ │ n │ ... │ 3 ││ 2 ││ 1 │ ├─────►│ │ PutLogEventsInput │ ├──┼────┤ │ │ ┌────────┐ │ │
│ │ └───┘ └───┘└───┘└───┘ │ │ │ │ │ │ └──────┼───►│ │ Sender │ │ │
│ │ │ │ └───────────────────┘ │ │ │ │ └────────┘ │ │
│ └───────────────────────────┘ └───────────────────────┘ │ │ └────────────┘ │
│ │ │ │
└──────────────────────────────────────────────────────────────────┘ └──────────────────────┘
```
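The description above maps naturally onto a fixed set of sender goroutines fed from a task channel. The sketch below shows that pattern in isolation; the names (`batch`, `workerPool`, `Submit`) are illustrative assumptions rather than the plugin's actual `internal/pusher` types, and a real worker would call PutLogEvents instead of printing.

```go
package main

import (
	"fmt"
	"sync"
)

// batch stands in for one PutLogEventsInput destined for a single log
// group/stream pair; it is an illustrative type, not the plugin's real batch.
type batch struct {
	target string
	events []string
}

// workerPool is a minimal shared pool: a task channel drained by a fixed
// number of goroutines, mirroring the fan-in shown in the diagram above.
type workerPool struct {
	tasks chan batch
	wg    sync.WaitGroup
}

func newWorkerPool(workers int) *workerPool {
	p := &workerPool{tasks: make(chan batch, workers)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func(id int) {
			defer p.wg.Done()
			for b := range p.tasks {
				// A real sender would issue a PutLogEvents call here, with retries.
				fmt.Printf("worker %d sent %d events for %s\n", id, len(b.events), b.target)
			}
		}(i)
	}
	return p
}

// Submit hands a full batch to whichever worker becomes free first.
func (p *workerPool) Submit(b batch) { p.tasks <- b }

// Stop closes the task channel and waits for workers to drain it.
func (p *workerPool) Stop() {
	close(p.tasks)
	p.wg.Wait()
}

func main() {
	pool := newWorkerPool(2)
	for i := 0; i < 5; i++ {
		pool.Submit(batch{target: fmt.Sprintf("group/stream-%d", i%2), events: []string{"e1", "e2"}})
	}
	pool.Stop()
}
```

Closing the channel in `Stop` lets queued batches finish before shutdown completes; the plugin's own `Close` likewise stops its shared pool, as the diff below shows.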
80 changes: 41 additions & 39 deletions plugins/outputs/cloudwatchlogs/cloudwatchlogs.go
@@ -13,7 +13,6 @@ import (

"github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
"go.uber.org/zap"
@@ -27,6 +26,7 @@ import (
"github.com/aws/amazon-cloudwatch-agent/internal"
"github.com/aws/amazon-cloudwatch-agent/internal/retryer"
"github.com/aws/amazon-cloudwatch-agent/logs"
"github.com/aws/amazon-cloudwatch-agent/plugins/outputs/cloudwatchlogs/internal/pusher"
"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
"github.com/aws/amazon-cloudwatch-agent/tool/util"
)
@@ -39,9 +39,6 @@ const (
LogEntryField = "value"

defaultFlushTimeout = 5 * time.Second
eventHeaderSize = 200
truncatedSuffix = "[Truncated...]"
msgSizeLimit = 256*1024 - eventHeaderSize

maxRetryTimeout = 14*24*time.Hour + 10*time.Minute
metricRetryTimeout = 2 * time.Minute
@@ -71,14 +68,18 @@ type CloudWatchLogs struct {

// Retention for log group
RetentionInDays int `toml:"retention_in_days"`
Concurrency int `toml:"concurrency"`

ForceFlushInterval internal.Duration `toml:"force_flush_interval"` // unit is second

Log telegraf.Logger `toml:"-"`

pusherStopChan chan struct{}
pusherWaitGroup sync.WaitGroup
cwDests map[Target]*cwDest
cwDests map[pusher.Target]*cwDest
workerPool pusher.WorkerPool
targetManager pusher.TargetManager
once sync.Once
middleware awsmiddleware.Middleware
}

@@ -93,6 +94,9 @@ func (c *CloudWatchLogs) Close() error {
for _, d := range c.cwDests {
d.Stop()
}
if c.workerPool != nil {
c.workerPool.Stop()
}

return nil
}
@@ -115,7 +119,7 @@ func (c *CloudWatchLogs) CreateDest(group, stream string, retention int, logGrou
retention = -1
}

t := Target{
t := pusher.Target{
Group: group,
Stream: stream,
Retention: retention,
@@ -124,11 +128,31 @@
return c.getDest(t, logSrc)
}

func (c *CloudWatchLogs) getDest(t Target, logSrc logs.LogSrc) *cwDest {
func (c *CloudWatchLogs) getDest(t pusher.Target, logSrc logs.LogSrc) *cwDest {
if cwd, ok := c.cwDests[t]; ok {
return cwd
}

logThrottleRetryer := retryer.NewLogThrottleRetryer(c.Log)
client := c.createClient(logThrottleRetryer)
agent.UsageFlags().SetValue(agent.FlagRegionType, c.RegionType)
agent.UsageFlags().SetValue(agent.FlagMode, c.Mode)
if containerInsightsRegexp.MatchString(t.Group) {
useragent.Get().SetContainerInsightsFlag()
}
c.once.Do(func() {
if c.Concurrency > 0 {
c.workerPool = pusher.NewWorkerPool(c.Concurrency)
}
c.targetManager = pusher.NewTargetManager(c.Log, client)
})
p := pusher.NewPusher(c.Log, t, client, c.targetManager, logSrc, c.workerPool, c.ForceFlushInterval.Duration, maxRetryTimeout, c.pusherStopChan, &c.pusherWaitGroup)
cwd := &cwDest{pusher: p, retryer: logThrottleRetryer}
c.cwDests[t] = cwd
return cwd
}

func (c *CloudWatchLogs) createClient(retryer aws.RequestRetryer) *cloudwatchlogs.CloudWatchLogs {
credentialConfig := &configaws.CredentialConfig{
Region: c.Region,
AccessKey: c.AccessKey,
@@ -138,34 +162,24 @@ func (c *CloudWatchLogs) getDest(t Target, logSrc logs.LogSrc) *cwDest {
Filename: c.Filename,
Token: c.Token,
}

logThrottleRetryer := retryer.NewLogThrottleRetryer(c.Log)
client := cloudwatchlogs.New(
credentialConfig.Credentials(),
&aws.Config{
Endpoint: aws.String(c.EndpointOverride),
Retryer: logThrottleRetryer,
Retryer: retryer,
LogLevel: configaws.SDKLogLevel(),
Logger: configaws.SDKLogger{},
},
)
agent.UsageFlags().SetValue(agent.FlagRegionType, c.RegionType)
agent.UsageFlags().SetValue(agent.FlagMode, c.Mode)
if containerInsightsRegexp.MatchString(t.Group) {
useragent.Get().SetContainerInsightsFlag()
}
client.Handlers.Build.PushBackNamed(handlers.NewRequestCompressionHandler([]string{"PutLogEvents"}))
if c.middleware != nil {
if err := awsmiddleware.NewConfigurer(c.middleware.Handlers()).Configure(awsmiddleware.SDKv1(&client.Handlers)); err != nil {
c.Log.Errorf("Unable to configure middleware on cloudwatch logs client: %v", err)
} else {
c.Log.Info("Configured middleware on AWS client")
c.Log.Debug("Configured middleware on AWS client")
}
}
pusher := NewPusher(c.Region, t, client, c.ForceFlushInterval.Duration, maxRetryTimeout, c.Log, c.pusherStopChan, &c.pusherWaitGroup, logSrc)
cwd := &cwDest{pusher: pusher, retryer: logThrottleRetryer}
c.cwDests[t] = cwd
return cwd
return client
}

func (c *CloudWatchLogs) writeMetricAsStructuredLog(m telegraf.Metric) {
@@ -179,7 +193,7 @@ func (c *CloudWatchLogs) writeMetricAsStructuredLog(m telegraf.Metric) {
return
}
cwd.switchToEMF()
cwd.pusher.RetryDuration = metricRetryTimeout
cwd.pusher.Sender.SetRetryDuration(metricRetryTimeout)

e := c.getLogEventFromMetric(m)
if e == nil {
@@ -189,11 +203,11 @@ func (c *CloudWatchLogs) writeMetricAsStructuredLog(m telegraf.Metric) {
cwd.AddEvent(e)
}

func (c *CloudWatchLogs) getTargetFromMetric(m telegraf.Metric) (Target, error) {
func (c *CloudWatchLogs) getTargetFromMetric(m telegraf.Metric) (pusher.Target, error) {
tags := m.Tags()
logGroup, ok := tags[LogGroupNameTag]
if !ok {
return Target{}, fmt.Errorf("structuredlog receive a metric with name '%v' without log group name", m.Name())
return pusher.Target{}, fmt.Errorf("structuredlog receive a metric with name '%v' without log group name", m.Name())
} else {
m.RemoveTag(LogGroupNameTag)
}
@@ -205,7 +219,7 @@ func (c *CloudWatchLogs) getTargetFromMetric(m telegraf.Metric) (Target, error)
logStream = c.LogStreamName
}

return Target{logGroup, logStream, util.StandardLogGroupClass, -1}, nil
return pusher.Target{Group: logGroup, Stream: logStream, Class: util.StandardLogGroupClass, Retention: -1}, nil
}

func (c *CloudWatchLogs) getLogEventFromMetric(metric telegraf.Metric) *structuredLogEvent {
Expand Down Expand Up @@ -299,7 +313,7 @@ func (e *structuredLogEvent) Time() time.Time {
func (e *structuredLogEvent) Done() {}

type cwDest struct {
*pusher
pusher *pusher.Pusher
sync.Mutex
isEMF bool
stopped bool
@@ -341,25 +355,13 @@ func (cd *cwDest) switchToEMF() {
defer cd.Unlock()
if !cd.isEMF {
cd.isEMF = true
cwl, ok := cd.Service.(*cloudwatchlogs.CloudWatchLogs)
cwl, ok := cd.pusher.Service.(*cloudwatchlogs.CloudWatchLogs)
if ok {
cwl.Handlers.Build.PushBackNamed(handlers.NewCustomHeaderHandler("x-amzn-logs-format", "json/emf"))
}
}
}

func (cd *cwDest) setRetryer(r request.Retryer) {
cwl, ok := cd.Service.(*cloudwatchlogs.CloudWatchLogs)
if ok {
cwl.Retryer = r
}
}

type Target struct {
Group, Stream, Class string
Retention int
}

// Description returns a one-sentence description on the Output
func (c *CloudWatchLogs) Description() string {
return "Configuration for AWS CloudWatchLogs output."
@@ -398,7 +400,7 @@ func init() {
return &CloudWatchLogs{
ForceFlushInterval: internal.Duration{Duration: defaultFlushTimeout},
pusherStopChan: make(chan struct{}),
cwDests: make(map[Target]*cwDest),
cwDests: make(map[pusher.Target]*cwDest),
middleware: agenthealth.NewAgentHealth(
zap.NewNop(),
&agenthealth.Config{
11 changes: 7 additions & 4 deletions plugins/outputs/cloudwatchlogs/cloudwatchlogs_test.go
@@ -6,16 +6,17 @@ package cloudwatchlogs
import (
"testing"

"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"

"github.com/aws/amazon-cloudwatch-agent/logs"
"github.com/aws/amazon-cloudwatch-agent/plugins/outputs/cloudwatchlogs/internal/pusher"
"github.com/aws/amazon-cloudwatch-agent/tool/util"
)

// TestCreateDestination verifies that a different destination is created for the cloudwatchlogs endpoint based on the log group, log stream,
// and the log group's retention
func TestCreateDestination(t *testing.T) {

testCases := map[string]struct {
cfgLogGroup string
cfgLogStream string
@@ -68,28 +69,30 @@
for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
c := &CloudWatchLogs{
Log: testutil.Logger{Name: "test"},
LogGroupName: "G1",
LogStreamName: "S1",
AccessKey: "access_key",
SecretKey: "secret_key",
pusherStopChan: make(chan struct{}),
cwDests: make(map[Target]*cwDest),
cwDests: make(map[pusher.Target]*cwDest),
}
dest := c.CreateDest(testCase.cfgLogGroup, testCase.cfgLogStream, testCase.cfgLogRetention, testCase.cfgLogClass, testCase.cfgTailerSrc).(*cwDest)
require.Equal(t, testCase.expectedLogGroup, dest.pusher.Group)
require.Equal(t, testCase.expectedLogStream, dest.pusher.Stream)
require.Equal(t, testCase.expectedLogGroupRetention, dest.pusher.Retention)
require.Equal(t, testCase.expectedLogClass, dest.pusher.Class)
require.Equal(t, testCase.expectedTailerSrc, dest.pusher.logSrc)
require.Equal(t, testCase.expectedTailerSrc, dest.pusher.EntityProvider)
})
}
}

func TestDuplicateDestination(t *testing.T) {
c := &CloudWatchLogs{
Log: testutil.Logger{Name: "test"},
AccessKey: "access_key",
SecretKey: "secret_key",
cwDests: make(map[Target]*cwDest),
cwDests: make(map[pusher.Target]*cwDest),
pusherStopChan: make(chan struct{}),
}
// Given the same log group, log stream, same retention, and logClass
