diff --git a/exporter/exporterhelper/internal/base_exporter.go b/exporter/exporterhelper/internal/base_exporter.go index 149f24987ba..416d11264c0 100644 --- a/exporter/exporterhelper/internal/base_exporter.go +++ b/exporter/exporterhelper/internal/base_exporter.go @@ -32,7 +32,7 @@ var usePullingBasedExporterQueueBatcher = featuregate.GlobalRegistry().MustRegis featuregate.WithRegisterDescription("if set to true, turns on the pulling-based exporter queue bathcer"), ) -type ObsrepSenderFactory = func(obsrep *ObsReport) RequestSender +type ObsrepSenderFactory = func(obsrep *ObsReport) Sender[internal.Request] // Option apply changes to BaseExporter. type Option func(*BaseExporter) error @@ -53,10 +53,10 @@ type BaseExporter struct { // Chain of senders that the exporter helper applies before passing the data to the actual exporter. // The data is handled by each sender in the respective order starting from the queueSender. // Most of the senders are optional, and initialized with a no-op path-through sender. - BatchSender RequestSender - QueueSender RequestSender - ObsrepSender RequestSender - RetrySender RequestSender + BatchSender Sender[internal.Request] + QueueSender Sender[internal.Request] + ObsrepSender Sender[internal.Request] + RetrySender Sender[internal.Request] TimeoutSender *TimeoutSender // TimeoutSender is always initialized. ConsumerOptions []consumer.Option @@ -73,10 +73,10 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe } be := &BaseExporter{ - BatchSender: &BaseRequestSender{}, - QueueSender: &BaseRequestSender{}, + BatchSender: &BaseSender[internal.Request]{}, + QueueSender: &BaseSender[internal.Request]{}, ObsrepSender: osf(obsReport), - RetrySender: &BaseRequestSender{}, + RetrySender: &BaseSender[internal.Request]{}, TimeoutSender: &TimeoutSender{cfg: NewDefaultTimeoutConfig()}, Set: set, diff --git a/exporter/exporterhelper/internal/base_exporter_test.go b/exporter/exporterhelper/internal/base_exporter_test.go index cca72cd8272..12c56bb76a0 100644 --- a/exporter/exporterhelper/internal/base_exporter_test.go +++ b/exporter/exporterhelper/internal/base_exporter_test.go @@ -33,8 +33,8 @@ var ( }() ) -func newNoopObsrepSender(*ObsReport) RequestSender { - return &BaseRequestSender{} +func newNoopObsrepSender(*ObsReport) Sender[internal.Request] { + return &BaseSender[internal.Request]{} } func TestBaseExporter(t *testing.T) { diff --git a/exporter/exporterhelper/internal/batch_sender.go b/exporter/exporterhelper/internal/batch_sender.go index cb7e1ed6116..4cb3ace63b0 100644 --- a/exporter/exporterhelper/internal/batch_sender.go +++ b/exporter/exporterhelper/internal/batch_sender.go @@ -23,7 +23,7 @@ import ( // - cfg.FlushTimeout is elapsed since the timestamp when the previous batch was sent out. // - concurrencyLimit is reached. type BatchSender struct { - BaseRequestSender + BaseSender[internal.Request] cfg exporterbatcher.Config // concurrencyLimit is the maximum number of goroutines that can be blocked by the batcher. diff --git a/exporter/exporterhelper/internal/queue_sender.go b/exporter/exporterhelper/internal/queue_sender.go index 509c747115b..3c99cf38876 100644 --- a/exporter/exporterhelper/internal/queue_sender.go +++ b/exporter/exporterhelper/internal/queue_sender.go @@ -67,7 +67,7 @@ func (qCfg *QueueConfig) Validate() error { } type QueueSender struct { - BaseRequestSender + BaseSender[internal.Request] queue exporterqueue.Queue[internal.Request] numConsumers int traceAttribute attribute.KeyValue diff --git a/exporter/exporterhelper/internal/request_sender.go b/exporter/exporterhelper/internal/request_sender.go index 683aca40d79..8ca75f66fb6 100644 --- a/exporter/exporterhelper/internal/request_sender.go +++ b/exporter/exporterhelper/internal/request_sender.go @@ -4,30 +4,30 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" import ( - "context" // RequestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs). + "context" // Sender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs). "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter/internal" ) -type RequestSender interface { +type Sender[K any] interface { component.Component - Send(context.Context, internal.Request) error - SetNextSender(nextSender RequestSender) + Send(context.Context, K) error + SetNextSender(nextSender Sender[K]) } -type BaseRequestSender struct { +type BaseSender[K any] struct { component.StartFunc component.ShutdownFunc - NextSender RequestSender + NextSender Sender[K] } -var _ RequestSender = (*BaseRequestSender)(nil) +var _ Sender[internal.Request] = (*BaseSender[internal.Request])(nil) -func (b *BaseRequestSender) Send(ctx context.Context, req internal.Request) error { +func (b *BaseSender[K]) Send(ctx context.Context, req K) error { return b.NextSender.Send(ctx, req) } -func (b *BaseRequestSender) SetNextSender(nextSender RequestSender) { +func (b *BaseSender[K]) SetNextSender(nextSender Sender[K]) { b.NextSender = nextSender } diff --git a/exporter/exporterhelper/internal/retry_sender.go b/exporter/exporterhelper/internal/retry_sender.go index db5d96d3fa5..1af3256344c 100644 --- a/exporter/exporterhelper/internal/retry_sender.go +++ b/exporter/exporterhelper/internal/retry_sender.go @@ -44,7 +44,7 @@ func NewThrottleRetry(err error, delay time.Duration) error { } type retrySender struct { - BaseRequestSender + BaseSender[internal.Request] traceAttribute attribute.KeyValue cfg configretry.BackOffConfig stopCh chan struct{} @@ -65,7 +65,7 @@ func (rs *retrySender) Shutdown(context.Context) error { return nil } -// send implements the requestSender interface +// Send implements the requestSender interface func (rs *retrySender) Send(ctx context.Context, req internal.Request) error { // Do not use NewExponentialBackOff since it calls Reset and the code here must // call Reset after changing the InitialInterval (this saves an unnecessary call to Now). diff --git a/exporter/exporterhelper/internal/retry_sender_test.go b/exporter/exporterhelper/internal/retry_sender_test.go index 525a043ac92..470f4c62e82 100644 --- a/exporter/exporterhelper/internal/retry_sender_test.go +++ b/exporter/exporterhelper/internal/retry_sender_test.go @@ -477,13 +477,13 @@ func newMockRequest(cnt int, consumeError error) *mockRequest { } type observabilityConsumerSender struct { - BaseRequestSender + BaseSender[internal.Request] waitGroup *sync.WaitGroup sentItemsCount *atomic.Int64 droppedItemsCount *atomic.Int64 } -func newObservabilityConsumerSender(*ObsReport) RequestSender { +func newObservabilityConsumerSender(*ObsReport) Sender[internal.Request] { return &observabilityConsumerSender{ waitGroup: new(sync.WaitGroup), droppedItemsCount: &atomic.Int64{}, diff --git a/exporter/exporterhelper/internal/timeout_sender.go b/exporter/exporterhelper/internal/timeout_sender.go index 5abae1b6746..a47ddccfb8c 100644 --- a/exporter/exporterhelper/internal/timeout_sender.go +++ b/exporter/exporterhelper/internal/timeout_sender.go @@ -35,7 +35,7 @@ func NewDefaultTimeoutConfig() TimeoutConfig { // TimeoutSender is a requestSender that adds a `timeout` to every request that passes this sender. type TimeoutSender struct { - BaseRequestSender + BaseSender[internal.Request] cfg TimeoutConfig } diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index 55652dd7b4a..b1acd7fda45 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -104,7 +104,7 @@ func requestFromLogs(pusher consumer.ConsumeLogsFunc) RequestFromLogsFunc { } } -// NewLogsRequest creates new logs exporter based on custom LogsConverter and RequestSender. +// NewLogsRequest creates new logs exporter based on custom LogsConverter and Sender. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. func NewLogsRequest( @@ -148,11 +148,11 @@ func NewLogsRequest( } type logsExporterWithObservability struct { - internal.BaseRequestSender + internal.BaseSender[Request] obsrep *internal.ObsReport } -func newLogsWithObservability(obsrep *internal.ObsReport) internal.RequestSender { +func newLogsWithObservability(obsrep *internal.ObsReport) internal.Sender[Request] { return &logsExporterWithObservability{obsrep: obsrep} } diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index f84ed8c226e..6488250d247 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -104,7 +104,7 @@ func requestFromMetrics(pusher consumer.ConsumeMetricsFunc) RequestFromMetricsFu } } -// NewMetricsRequest creates a new metrics exporter based on a custom MetricsConverter and RequestSender. +// NewMetricsRequest creates a new metrics exporter based on a custom MetricsConverter and Sender. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. func NewMetricsRequest( @@ -148,11 +148,11 @@ func NewMetricsRequest( } type metricsSenderWithObservability struct { - internal.BaseRequestSender + internal.BaseSender[Request] obsrep *internal.ObsReport } -func newMetricsSenderWithObservability(obsrep *internal.ObsReport) internal.RequestSender { +func newMetricsSenderWithObservability(obsrep *internal.ObsReport) internal.Sender[Request] { return &metricsSenderWithObservability{obsrep: obsrep} } diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 2924eea1115..24a13676d5d 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -104,7 +104,7 @@ func requestFromTraces(pusher consumer.ConsumeTracesFunc) RequestFromTracesFunc } } -// NewTracesRequest creates a new traces exporter based on a custom TracesConverter and RequestSender. +// NewTracesRequest creates a new traces exporter based on a custom TracesConverter and Sender. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. func NewTracesRequest( @@ -148,11 +148,11 @@ func NewTracesRequest( } type tracesWithObservability struct { - internal.BaseRequestSender + internal.BaseSender[Request] obsrep *internal.ObsReport } -func newTracesWithObservability(obsrep *internal.ObsReport) internal.RequestSender { +func newTracesWithObservability(obsrep *internal.ObsReport) internal.Sender[Request] { return &tracesWithObservability{obsrep: obsrep} } diff --git a/exporter/exporterhelper/xexporterhelper/profiles.go b/exporter/exporterhelper/xexporterhelper/profiles.go index d045dafbb81..d9eb55b3ef7 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles.go +++ b/exporter/exporterhelper/xexporterhelper/profiles.go @@ -106,7 +106,7 @@ func requestFromProfiles(pusher xconsumer.ConsumeProfilesFunc) RequestFromProfil } } -// NewProfilesRequestExporter creates a new profiles exporter based on a custom ProfilesConverter and RequestSender. +// NewProfilesRequestExporter creates a new profiles exporter based on a custom ProfilesConverter and Sender. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. func NewProfilesRequestExporter( @@ -146,11 +146,11 @@ func NewProfilesRequestExporter( } type profilesExporterWithObservability struct { - internal.BaseRequestSender + internal.BaseSender[exporterhelper.Request] obsrep *internal.ObsReport } -func newProfilesExporterWithObservability(obsrep *internal.ObsReport) internal.RequestSender { +func newProfilesExporterWithObservability(obsrep *internal.ObsReport) internal.Sender[exporterhelper.Request] { return &profilesExporterWithObservability{obsrep: obsrep} }