Skip to content

Commit

Permalink
Add worker pool for log pushers (#1499)
Browse files Browse the repository at this point in the history
  • Loading branch information
jefchien authored Jan 16, 2025
1 parent 09f8b09 commit 444825e
Show file tree
Hide file tree
Showing 26 changed files with 2,061 additions and 887 deletions.
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ PKG_WITH_DATA_RACE += internal/tls
PKG_WITH_DATA_RACE += plugins/inputs/logfile
PKG_WITH_DATA_RACE += plugins/inputs/logfile/tail
PKG_WITH_DATA_RACE += plugins/outputs/cloudwatch
PKG_WITH_DATA_RACE += plugins/outputs/cloudwatchlogs
PKG_WITH_DATA_RACE += plugins/processors/awsapplicationsignals
PKG_WITH_DATA_RACE += plugins/processors/ec2tagger
PKG_WITH_DATA_RACE_PATTERN := $(shell echo '$(PKG_WITH_DATA_RACE)' | tr ' ' '|')
Expand Down
6 changes: 5 additions & 1 deletion logs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,21 @@ type LogEvent interface {
Done()
}

// LogEntityProvider is implemented by log sources that can supply a
// CloudWatch Logs Entity to attach to the events they produce.
type LogEntityProvider interface {
	// Entity returns the CloudWatch Logs entity associated with this
	// provider. NOTE(review): callers should presumably tolerate a nil
	// return — confirm against implementations.
	Entity() *cloudwatchlogs.Entity
}

// A LogSrc is a single source where log events are generated,
// e.g. a single log file.
type LogSrc interface {
	LogEntityProvider
	// SetOutput registers the callback invoked for each generated LogEvent.
	SetOutput(func(LogEvent))
	// Group returns the log group name this source targets.
	Group() string
	// Stream returns the log stream name this source targets.
	Stream() string
	// Destination identifies the configured output destination for this source.
	Destination() string
	// Description returns a human-readable description of the source.
	Description() string
	// Retention returns the log group retention setting in days.
	// NOTE(review): elsewhere in this change a negative value (-1) is used
	// to mean "no retention change" — confirm that convention applies here.
	Retention() int
	// Class returns the log group class (e.g. standard/infrequent access).
	Class() string
	// NOTE(review): Entity is redundant with the embedded LogEntityProvider
	// (duplicate method declarations are legal since Go 1.14); consider
	// removing this explicit declaration.
	Entity() *cloudwatchlogs.Entity
	// Stop shuts the source down.
	Stop()
}

Expand Down
34 changes: 34 additions & 0 deletions plugins/outputs/cloudwatchlogs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Amazon CloudWatch Logs Output Plugin

For each configured target (log group/stream), the output plugin maintains a queue of log events that it batches.
Once a batch is full or the flush interval elapses, the batch is sent to Amazon CloudWatch using the PutLogEvents API.

When concurrency is enabled, the pusher uses a shared worker pool to allow multiple concurrent sends.
```
Target #1 (Log Group/Stream) ┌──Shared Worker Pool──┐
┌──────────────────────────────────────────────────────────────────┐ │ │
│ │ │ ┌──Worker 1──┐ │
│ ┌────────Event Queue────────┐ ┌─────────Batch─────────┐ │ │ │ ┌────────┐ │ │
│ │ │ │ ┌───────────────────┐ │ │ ┌──────┼───►│ │ Sender │ │ │
│ │ ┌───┐ ┌───┐┌───┐┌───┐ │ │ │ │ │ │ │ │ │ └────────┘ │ │
AddEvent───│───►│ │ n │ ... │ 3 ││ 2 ││ 1 │ ├─────►│ │ PutLogEventsInput │ ├──┼────┤ │ └────────────┘ │
│ │ └───┘ └───┘└───┘└───┘ │ │ │ │ │ │ │ │ │
│ │ │ │ └───────────────────┘ │ │ │ │ ┌──Worker 2──┐ │
│ └───────────────────────────┘ └───────────────────────┘ │ │ │ │ ┌────────┐ │ │
│ │ ┼──────┼───►│ │ Sender │ │ │
└──────────────────────────────────────────────────────────────────┘ │ │ │ └────────┘ │ │
│ │ └────────────┘ │
│ │ │
Target #2 (Log Group/Stream) │ │ . │
┌──────────────────────────────────────────────────────────────────┐ │ │ . │
│ │ │ │ . │
│ ┌────────Event Queue────────┐ ┌─────────Batch─────────┐ │ │ │ │
│ │ │ │ ┌───────────────────┐ │ │ │ │ │
│ │ ┌───┐ ┌───┐┌───┐┌───┐ │ │ │ │ │ │ │ │ ┌──Worker n──┐ │
AddEvent───│───►│ │ n │ ... │ 3 ││ 2 ││ 1 │ ├─────►│ │ PutLogEventsInput │ ├──┼────┤ │ │ ┌────────┐ │ │
│ │ └───┘ └───┘└───┘└───┘ │ │ │ │ │ │ └──────┼───►│ │ Sender │ │ │
│ │ │ │ └───────────────────┘ │ │ │ │ └────────┘ │ │
│ └───────────────────────────┘ └───────────────────────┘ │ │ └────────────┘ │
│ │ │ │
└──────────────────────────────────────────────────────────────────┘ └──────────────────────┘
```
80 changes: 41 additions & 39 deletions plugins/outputs/cloudwatchlogs/cloudwatchlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
"go.uber.org/zap"
Expand All @@ -27,6 +26,7 @@ import (
"github.com/aws/amazon-cloudwatch-agent/internal"
"github.com/aws/amazon-cloudwatch-agent/internal/retryer"
"github.com/aws/amazon-cloudwatch-agent/logs"
"github.com/aws/amazon-cloudwatch-agent/plugins/outputs/cloudwatchlogs/internal/pusher"
"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
"github.com/aws/amazon-cloudwatch-agent/tool/util"
)
Expand All @@ -39,9 +39,6 @@ const (
LogEntryField = "value"

defaultFlushTimeout = 5 * time.Second
eventHeaderSize = 200
truncatedSuffix = "[Truncated...]"
msgSizeLimit = 256*1024 - eventHeaderSize

maxRetryTimeout = 14*24*time.Hour + 10*time.Minute
metricRetryTimeout = 2 * time.Minute
Expand Down Expand Up @@ -71,14 +68,18 @@ type CloudWatchLogs struct {

// Retention for log group
RetentionInDays int `toml:"retention_in_days"`
Concurrency int `toml:"concurrency"`

ForceFlushInterval internal.Duration `toml:"force_flush_interval"` // unit is second

Log telegraf.Logger `toml:"-"`

pusherStopChan chan struct{}
pusherWaitGroup sync.WaitGroup
cwDests map[Target]*cwDest
cwDests map[pusher.Target]*cwDest
workerPool pusher.WorkerPool
targetManager pusher.TargetManager
once sync.Once
middleware awsmiddleware.Middleware
}

Expand All @@ -93,6 +94,9 @@ func (c *CloudWatchLogs) Close() error {
for _, d := range c.cwDests {
d.Stop()
}
if c.workerPool != nil {
c.workerPool.Stop()
}

return nil
}
Expand All @@ -115,7 +119,7 @@ func (c *CloudWatchLogs) CreateDest(group, stream string, retention int, logGrou
retention = -1
}

t := Target{
t := pusher.Target{
Group: group,
Stream: stream,
Retention: retention,
Expand All @@ -124,11 +128,31 @@ func (c *CloudWatchLogs) CreateDest(group, stream string, retention int, logGrou
return c.getDest(t, logSrc)
}

func (c *CloudWatchLogs) getDest(t Target, logSrc logs.LogSrc) *cwDest {
func (c *CloudWatchLogs) getDest(t pusher.Target, logSrc logs.LogSrc) *cwDest {
if cwd, ok := c.cwDests[t]; ok {
return cwd
}

logThrottleRetryer := retryer.NewLogThrottleRetryer(c.Log)
client := c.createClient(logThrottleRetryer)
agent.UsageFlags().SetValue(agent.FlagRegionType, c.RegionType)
agent.UsageFlags().SetValue(agent.FlagMode, c.Mode)
if containerInsightsRegexp.MatchString(t.Group) {
useragent.Get().SetContainerInsightsFlag()
}
c.once.Do(func() {
if c.Concurrency > 0 {
c.workerPool = pusher.NewWorkerPool(c.Concurrency)
}
c.targetManager = pusher.NewTargetManager(c.Log, client)
})
p := pusher.NewPusher(c.Log, t, client, c.targetManager, logSrc, c.workerPool, c.ForceFlushInterval.Duration, maxRetryTimeout, c.pusherStopChan, &c.pusherWaitGroup)
cwd := &cwDest{pusher: p, retryer: logThrottleRetryer}
c.cwDests[t] = cwd
return cwd
}

func (c *CloudWatchLogs) createClient(retryer aws.RequestRetryer) *cloudwatchlogs.CloudWatchLogs {
credentialConfig := &configaws.CredentialConfig{
Region: c.Region,
AccessKey: c.AccessKey,
Expand All @@ -138,34 +162,24 @@ func (c *CloudWatchLogs) getDest(t Target, logSrc logs.LogSrc) *cwDest {
Filename: c.Filename,
Token: c.Token,
}

logThrottleRetryer := retryer.NewLogThrottleRetryer(c.Log)
client := cloudwatchlogs.New(
credentialConfig.Credentials(),
&aws.Config{
Endpoint: aws.String(c.EndpointOverride),
Retryer: logThrottleRetryer,
Retryer: retryer,
LogLevel: configaws.SDKLogLevel(),
Logger: configaws.SDKLogger{},
},
)
agent.UsageFlags().SetValue(agent.FlagRegionType, c.RegionType)
agent.UsageFlags().SetValue(agent.FlagMode, c.Mode)
if containerInsightsRegexp.MatchString(t.Group) {
useragent.Get().SetContainerInsightsFlag()
}
client.Handlers.Build.PushBackNamed(handlers.NewRequestCompressionHandler([]string{"PutLogEvents"}))
if c.middleware != nil {
if err := awsmiddleware.NewConfigurer(c.middleware.Handlers()).Configure(awsmiddleware.SDKv1(&client.Handlers)); err != nil {
c.Log.Errorf("Unable to configure middleware on cloudwatch logs client: %v", err)
} else {
c.Log.Info("Configured middleware on AWS client")
c.Log.Debug("Configured middleware on AWS client")
}
}
pusher := NewPusher(c.Region, t, client, c.ForceFlushInterval.Duration, maxRetryTimeout, c.Log, c.pusherStopChan, &c.pusherWaitGroup, logSrc)
cwd := &cwDest{pusher: pusher, retryer: logThrottleRetryer}
c.cwDests[t] = cwd
return cwd
return client
}

func (c *CloudWatchLogs) writeMetricAsStructuredLog(m telegraf.Metric) {
Expand All @@ -179,7 +193,7 @@ func (c *CloudWatchLogs) writeMetricAsStructuredLog(m telegraf.Metric) {
return
}
cwd.switchToEMF()
cwd.pusher.RetryDuration = metricRetryTimeout
cwd.pusher.Sender.SetRetryDuration(metricRetryTimeout)

e := c.getLogEventFromMetric(m)
if e == nil {
Expand All @@ -189,11 +203,11 @@ func (c *CloudWatchLogs) writeMetricAsStructuredLog(m telegraf.Metric) {
cwd.AddEvent(e)
}

func (c *CloudWatchLogs) getTargetFromMetric(m telegraf.Metric) (Target, error) {
func (c *CloudWatchLogs) getTargetFromMetric(m telegraf.Metric) (pusher.Target, error) {
tags := m.Tags()
logGroup, ok := tags[LogGroupNameTag]
if !ok {
return Target{}, fmt.Errorf("structuredlog receive a metric with name '%v' without log group name", m.Name())
return pusher.Target{}, fmt.Errorf("structuredlog receive a metric with name '%v' without log group name", m.Name())
} else {
m.RemoveTag(LogGroupNameTag)
}
Expand All @@ -205,7 +219,7 @@ func (c *CloudWatchLogs) getTargetFromMetric(m telegraf.Metric) (Target, error)
logStream = c.LogStreamName
}

return Target{logGroup, logStream, util.StandardLogGroupClass, -1}, nil
return pusher.Target{Group: logGroup, Stream: logStream, Class: util.StandardLogGroupClass, Retention: -1}, nil
}

func (c *CloudWatchLogs) getLogEventFromMetric(metric telegraf.Metric) *structuredLogEvent {
Expand Down Expand Up @@ -299,7 +313,7 @@ func (e *structuredLogEvent) Time() time.Time {
func (e *structuredLogEvent) Done() {}

type cwDest struct {
*pusher
pusher *pusher.Pusher
sync.Mutex
isEMF bool
stopped bool
Expand Down Expand Up @@ -341,25 +355,13 @@ func (cd *cwDest) switchToEMF() {
defer cd.Unlock()
if !cd.isEMF {
cd.isEMF = true
cwl, ok := cd.Service.(*cloudwatchlogs.CloudWatchLogs)
cwl, ok := cd.pusher.Service.(*cloudwatchlogs.CloudWatchLogs)
if ok {
cwl.Handlers.Build.PushBackNamed(handlers.NewCustomHeaderHandler("x-amzn-logs-format", "json/emf"))
}
}
}

func (cd *cwDest) setRetryer(r request.Retryer) {
cwl, ok := cd.Service.(*cloudwatchlogs.CloudWatchLogs)
if ok {
cwl.Retryer = r
}
}

type Target struct {
Group, Stream, Class string
Retention int
}

// Description returns a one-sentence description on the Output
func (c *CloudWatchLogs) Description() string {
return "Configuration for AWS CloudWatchLogs output."
Expand Down Expand Up @@ -398,7 +400,7 @@ func init() {
return &CloudWatchLogs{
ForceFlushInterval: internal.Duration{Duration: defaultFlushTimeout},
pusherStopChan: make(chan struct{}),
cwDests: make(map[Target]*cwDest),
cwDests: make(map[pusher.Target]*cwDest),
middleware: agenthealth.NewAgentHealth(
zap.NewNop(),
&agenthealth.Config{
Expand Down
11 changes: 7 additions & 4 deletions plugins/outputs/cloudwatchlogs/cloudwatchlogs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@ package cloudwatchlogs
import (
"testing"

"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"

"github.com/aws/amazon-cloudwatch-agent/logs"
"github.com/aws/amazon-cloudwatch-agent/plugins/outputs/cloudwatchlogs/internal/pusher"
"github.com/aws/amazon-cloudwatch-agent/tool/util"
)

// TestCreateDestination would create different destination for cloudwatchlogs endpoint based on the log group, log stream,
// and log group's retention
func TestCreateDestination(t *testing.T) {

testCases := map[string]struct {
cfgLogGroup string
cfgLogStream string
Expand Down Expand Up @@ -68,28 +69,30 @@ func TestCreateDestination(t *testing.T) {
for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
c := &CloudWatchLogs{
Log: testutil.Logger{Name: "test"},
LogGroupName: "G1",
LogStreamName: "S1",
AccessKey: "access_key",
SecretKey: "secret_key",
pusherStopChan: make(chan struct{}),
cwDests: make(map[Target]*cwDest),
cwDests: make(map[pusher.Target]*cwDest),
}
dest := c.CreateDest(testCase.cfgLogGroup, testCase.cfgLogStream, testCase.cfgLogRetention, testCase.cfgLogClass, testCase.cfgTailerSrc).(*cwDest)
require.Equal(t, testCase.expectedLogGroup, dest.pusher.Group)
require.Equal(t, testCase.expectedLogStream, dest.pusher.Stream)
require.Equal(t, testCase.expectedLogGroupRetention, dest.pusher.Retention)
require.Equal(t, testCase.expectedLogClass, dest.pusher.Class)
require.Equal(t, testCase.expectedTailerSrc, dest.pusher.logSrc)
require.Equal(t, testCase.expectedTailerSrc, dest.pusher.EntityProvider)
})
}
}

func TestDuplicateDestination(t *testing.T) {
c := &CloudWatchLogs{
Log: testutil.Logger{Name: "test"},
AccessKey: "access_key",
SecretKey: "secret_key",
cwDests: make(map[Target]*cwDest),
cwDests: make(map[pusher.Target]*cwDest),
pusherStopChan: make(chan struct{}),
}
// Given the same log group, log stream, same retention, and logClass
Expand Down
Loading

0 comments on commit 444825e

Please sign in to comment.