Skip to content

Commit

Permalink
Enable queuing and retries for SignalFx hec exporter (#1223)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Oct 8, 2020
1 parent 32a14d6 commit d51340b
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 71 deletions.
1 change: 1 addition & 0 deletions exporter/signalfxexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ The following configuration options can also be configured:
- `exclude_metrics`: metric names that will be excluded from sending
to Signalfx backend. If `send_compatible_metrics` or `translation_rules`
options are enabled, the exclusion will be applied on translated metrics.
- To configure queuing and retries see [here](https://github.com/open-telemetry/opentelemetry-collector/tree/master/exporter/exporterhelper#configuration)

Example:

Expand Down
10 changes: 5 additions & 5 deletions exporter/signalfxexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@ import (
"time"

"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/translation"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/splunk"
)

// Config defines configuration for SignalFx exporter.
type Config struct {
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`

// AccessToken is the authentication token provided by SignalFx.
AccessToken string `mapstructure:"access_token"`
Expand All @@ -47,10 +51,6 @@ type Config struct {
// value takes precedence over the value of Realm
APIURL string `mapstructure:"api_url"`

// Timeout is the maximum timeout for HTTP request sending trace data. The
// default value is 5 seconds.
Timeout time.Duration `mapstructure:"timeout"`

// Headers are a set of headers to be added to the HTTP request sending
// trace data. These can override pre-defined header values used by the
// exporter, eg: "User-Agent" can be set to a custom value if specified
Expand Down
31 changes: 23 additions & 8 deletions exporter/signalfxexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configtest"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/translation"
Expand Down Expand Up @@ -65,8 +66,20 @@ func TestLoadConfig(t *testing.T) {
"added-entry": "added value",
"dot.test": "test",
},
Timeout: 2 * time.Second,
AccessTokenPassthroughConfig: splunk.AccessTokenPassthroughConfig{
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: 2 * time.Second,
},
RetrySettings: exporterhelper.RetrySettings{
Enabled: true,
InitialInterval: 10 * time.Second,
MaxInterval: 1 * time.Minute,
MaxElapsedTime: 10 * time.Minute,
},
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 2,
QueueSize: 10,
}, AccessTokenPassthroughConfig: splunk.AccessTokenPassthroughConfig{
AccessTokenPassthrough: false,
},
SendCompatibleMetrics: true,
Expand Down Expand Up @@ -207,12 +220,14 @@ func TestConfig_getOptionsFromConfig(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := &Config{
ExporterSettings: tt.fields.ExporterSettings,
AccessToken: tt.fields.AccessToken,
Realm: tt.fields.Realm,
IngestURL: tt.fields.IngestURL,
APIURL: tt.fields.APIURL,
Timeout: tt.fields.Timeout,
ExporterSettings: tt.fields.ExporterSettings,
AccessToken: tt.fields.AccessToken,
Realm: tt.fields.Realm,
IngestURL: tt.fields.IngestURL,
APIURL: tt.fields.APIURL,
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: tt.fields.Timeout,
},
Headers: tt.fields.Headers,
SendCompatibleMetrics: tt.fields.SendCompatibleMetrics,
TranslationRules: tt.fields.TranslationRules,
Expand Down
35 changes: 9 additions & 26 deletions exporter/signalfxexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ import (
"sync"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/obsreport"
"go.uber.org/multierr"
"go.uber.org/zap"

Expand Down Expand Up @@ -57,7 +55,7 @@ type exporterOptions struct {
func newSignalFxExporter(
config *Config,
logger *zap.Logger,
) (component.MetricsExporter, error) {
) (*signalfxExporter, error) {
if config == nil {
return nil, errors.New("nil config")
}
Expand Down Expand Up @@ -109,7 +107,7 @@ func newSignalFxExporter(
hms = hostmetadata.NewSyncer(logger, dimClient)
}

return signalfxExporter{
return &signalfxExporter{
logger: logger,
pushMetricsData: dpClient.pushMetricsData,
pushMetadata: dimClient.PushMetadata,
Expand All @@ -123,7 +121,7 @@ func newGzipPool() sync.Pool {
}}
}

func NewEventExporter(config *Config, logger *zap.Logger) (component.LogsExporter, error) {
func newEventExporter(config *Config, logger *zap.Logger) (*signalfxExporter, error) {
if config == nil {
return nil, errors.New("nil config")
}
Expand Down Expand Up @@ -151,35 +149,21 @@ func NewEventExporter(config *Config, logger *zap.Logger) (component.LogsExporte
accessTokenPassthrough: config.AccessTokenPassthrough,
}

return signalfxExporter{
return &signalfxExporter{
logger: logger,
pushResourceLogs: eventClient.pushResourceLogs,
}, nil
}

func (se signalfxExporter) Start(context.Context, component.Host) error {
return nil
}

func (se signalfxExporter) Shutdown(context.Context) error {
return nil
}

func (se signalfxExporter) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
ctx = obsreport.StartMetricsExportOp(ctx, typeStr)
func (se *signalfxExporter) pushMetrics(ctx context.Context, md pdata.Metrics) (int, error) {
numDroppedTimeSeries, err := se.pushMetricsData(ctx, md)
if err == nil && se.hostMetadataSyncer != nil {
se.hostMetadataSyncer.Sync(md)
}
numReceivedTimeSeries, numPoints := md.MetricAndDataPointCount()

obsreport.EndMetricsExportOp(ctx, numPoints, numReceivedTimeSeries, numDroppedTimeSeries, err)
return err
return numDroppedTimeSeries, err
}

func (se signalfxExporter) ConsumeLogs(ctx context.Context, ld pdata.Logs) error {
ctx = obsreport.StartLogsExportOp(ctx, typeStr)

func (se *signalfxExporter) pushLogs(ctx context.Context, ld pdata.Logs) (int, error) {
var numDroppedRecords int
var err error
rls := ld.ResourceLogs()
Expand All @@ -189,10 +173,9 @@ func (se signalfxExporter) ConsumeLogs(ctx context.Context, ld pdata.Logs) error
err = multierr.Append(err, thisErr)
}

obsreport.EndLogsExportOp(ctx, ld.LogRecordCount(), numDroppedRecords, err)
return err
return numDroppedRecords, err
}

func (se signalfxExporter) ConsumeMetadata(metadata []*collection.MetadataUpdate) error {
func (se *signalfxExporter) ConsumeMetadata(metadata []*collection.MetadataUpdate) error {
return se.pushMetadata(metadata)
}
38 changes: 18 additions & 20 deletions exporter/signalfxexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ import (
sfxpb "github.com/signalfx/com_signalfx_metrics_protobuf/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/testutil/metricstestutil"
"go.opentelemetry.io/collector/translator/internaldata"
"go.uber.org/zap"
Expand Down Expand Up @@ -67,18 +67,18 @@ func TestNew(t *testing.T) {
{
name: "successfully create exporter",
config: &Config{
AccessToken: "someToken",
Realm: "xyz",
Timeout: 1 * time.Second,
Headers: nil,
AccessToken: "someToken",
Realm: "xyz",
TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: 1 * time.Second},
Headers: nil,
},
},
{
name: "create exporter with host metadata syncer",
config: &Config{
AccessToken: "someToken",
Realm: "xyz",
Timeout: 1 * time.Second,
TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: 1 * time.Second},
Headers: nil,
SyncHostMetadata: true,
},
Expand All @@ -95,8 +95,6 @@ func TestNew(t *testing.T) {
}
} else {
require.NotNil(t, got)
require.NoError(t, got.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, got.Shutdown(context.Background()))
}
})
}
Expand Down Expand Up @@ -449,37 +447,37 @@ func TestConsumeMetricsWithAccessTokenPassthrough(t *testing.T) {
}

func TestNewEventExporter(t *testing.T) {
got, err := NewEventExporter(nil, zap.NewNop())
got, err := newEventExporter(nil, zap.NewNop())
assert.EqualError(t, err, "nil config")
assert.Nil(t, got)

config := &Config{
AccessToken: "someToken",
IngestURL: "asdf://:%",
Timeout: 1 * time.Second,
Headers: nil,
AccessToken: "someToken",
IngestURL: "asdf://:%",
TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: 1 * time.Second},
Headers: nil,
}

got, err = NewEventExporter(config, zap.NewNop())
got, err = newEventExporter(config, zap.NewNop())
assert.NotNil(t, err)
assert.Nil(t, got)

config = &Config{
AccessToken: "someToken",
Realm: "xyz",
Timeout: 1 * time.Second,
Headers: nil,
AccessToken: "someToken",
Realm: "xyz",
TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: 1 * time.Second},
Headers: nil,
}

got, err = NewEventExporter(config, zap.NewNop())
got, err = newEventExporter(config, zap.NewNop())
assert.NoError(t, err)
require.NotNil(t, got)

// This is expected to fail.
rls := makeSampleResourceLogs()
ld := pdata.NewLogs()
ld.ResourceLogs().Append(rls)
err = got.ConsumeLogs(context.Background(), ld)
_, err = got.pushLogs(context.Background(), ld)
assert.Error(t, err)
}

Expand Down
34 changes: 26 additions & 8 deletions exporter/signalfxexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ func createDefaultConfig() configmodels.Exporter {
TypeVal: configmodels.Type(typeStr),
NameVal: typeStr,
},
Timeout: defaultHTTPTimeout,
TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: defaultHTTPTimeout},
RetrySettings: exporterhelper.CreateDefaultRetrySettings(),
QueueSettings: exporterhelper.CreateDefaultQueueSettings(),
AccessTokenPassthroughConfig: splunk.AccessTokenPassthroughConfig{
AccessTokenPassthrough: true,
},
Expand All @@ -65,21 +67,26 @@ func createMetricsExporter(
_ context.Context,
params component.ExporterCreateParams,
config configmodels.Exporter,
) (exp component.MetricsExporter, err error) {
) (component.MetricsExporter, error) {

expCfg := config.(*Config)
err = setTranslationRules(expCfg)
err := setTranslationRules(expCfg)
if err != nil {
return nil, err
}

exp, err = newSignalFxExporter(expCfg, params.Logger)

exp, err := newSignalFxExporter(expCfg, params.Logger)
if err != nil {
return nil, err
}

return exp, nil
return exporterhelper.NewMetricsExporter(
expCfg,
exp.pushMetrics,
// explicitly disable since we rely on http.Client timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(expCfg.RetrySettings),
exporterhelper.WithQueue(expCfg.QueueSettings))
}

func setTranslationRules(cfg *Config) error {
Expand Down Expand Up @@ -116,7 +123,18 @@ func createLogsExporter(
params component.ExporterCreateParams,
cfg configmodels.Exporter,
) (component.LogsExporter, error) {
oCfg := cfg.(*Config)
expCfg := cfg.(*Config)

exp, err := newEventExporter(expCfg, params.Logger)
if err != nil {
return nil, err
}

return NewEventExporter(oCfg, params.Logger)
return exporterhelper.NewLogsExporter(
expCfg,
exp.pushLogs,
// explicitly disable since we rely on http.Client timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(expCfg.RetrySettings),
exporterhelper.WithQueue(expCfg.QueueSettings))
}
9 changes: 5 additions & 4 deletions exporter/signalfxexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"

Expand Down Expand Up @@ -100,7 +101,7 @@ func TestCreateMetricsExporter_CustomConfig(t *testing.T) {
"added-entry": "added value",
"dot.test": "test",
},
Timeout: 2 * time.Second,
TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: 2 * time.Second},
}

te, err := createMetricsExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, config)
Expand All @@ -121,9 +122,9 @@ func TestFactory_CreateMetricsExporterFails(t *testing.T) {
TypeVal: configmodels.Type(typeStr),
NameVal: typeStr,
},
AccessToken: "testToken",
Realm: "lab",
Timeout: -2 * time.Second,
AccessToken: "testToken",
Realm: "lab",
TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: -2 * time.Second},
},
errorMessage: "failed to process \"signalfx\" config: cannot have a negative \"timeout\"",
},
Expand Down
9 changes: 9 additions & 0 deletions exporter/signalfxexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@ exporters:
access_token: testToken
realm: "us1"
timeout: 2s
sending_queue:
enabled: true
num_consumers: 2
queue_size: 10
retry_on_failure:
enabled: true
initial_interval: 10s
max_interval: 60s
max_elapsed_time: 10m
headers:
added-entry: "added value"
dot.test: test
Expand Down

0 comments on commit d51340b

Please sign in to comment.