Skip to content

Commit

Permalink
[chore] Use generics for exporterhelper Sender (#12011)
Browse files Browse the repository at this point in the history
In the followup PR will convert batcher to be generic, then we can have
the whole baseexporter generic and remove conversions to/from
internal.Request to implementation.

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Jan 6, 2025
1 parent 57c6c15 commit 20d9955
Show file tree
Hide file tree
Showing 12 changed files with 38 additions and 38 deletions.
16 changes: 8 additions & 8 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var usePullingBasedExporterQueueBatcher = featuregate.GlobalRegistry().MustRegis
featuregate.WithRegisterDescription("if set to true, turns on the pulling-based exporter queue bathcer"),
)

type ObsrepSenderFactory = func(obsrep *ObsReport) RequestSender
type ObsrepSenderFactory = func(obsrep *ObsReport) Sender[internal.Request]

// Option apply changes to BaseExporter.
type Option func(*BaseExporter) error
Expand All @@ -53,10 +53,10 @@ type BaseExporter struct {
// Chain of senders that the exporter helper applies before passing the data to the actual exporter.
// The data is handled by each sender in the respective order starting from the queueSender.
// Most of the senders are optional, and initialized with a no-op path-through sender.
BatchSender RequestSender
QueueSender RequestSender
ObsrepSender RequestSender
RetrySender RequestSender
BatchSender Sender[internal.Request]
QueueSender Sender[internal.Request]
ObsrepSender Sender[internal.Request]
RetrySender Sender[internal.Request]
TimeoutSender *TimeoutSender // TimeoutSender is always initialized.

ConsumerOptions []consumer.Option
Expand All @@ -73,10 +73,10 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
}

be := &BaseExporter{
BatchSender: &BaseRequestSender{},
QueueSender: &BaseRequestSender{},
BatchSender: &BaseSender[internal.Request]{},
QueueSender: &BaseSender[internal.Request]{},
ObsrepSender: osf(obsReport),
RetrySender: &BaseRequestSender{},
RetrySender: &BaseSender[internal.Request]{},
TimeoutSender: &TimeoutSender{cfg: NewDefaultTimeoutConfig()},

Set: set,
Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/internal/base_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ var (
}()
)

func newNoopObsrepSender(*ObsReport) RequestSender {
return &BaseRequestSender{}
func newNoopObsrepSender(*ObsReport) Sender[internal.Request] {
return &BaseSender[internal.Request]{}
}

func TestBaseExporter(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/internal/batch_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
// - cfg.FlushTimeout is elapsed since the timestamp when the previous batch was sent out.
// - concurrencyLimit is reached.
type BatchSender struct {
BaseRequestSender
BaseSender[internal.Request]
cfg exporterbatcher.Config

// concurrencyLimit is the maximum number of goroutines that can be blocked by the batcher.
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/internal/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (qCfg *QueueConfig) Validate() error {
}

type QueueSender struct {
BaseRequestSender
BaseSender[internal.Request]
queue exporterqueue.Queue[internal.Request]
numConsumers int
traceAttribute attribute.KeyValue
Expand Down
18 changes: 9 additions & 9 deletions exporter/exporterhelper/internal/request_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,30 @@
package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"

import (
"context" // RequestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
"context" // Sender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/internal"
)

type RequestSender interface {
type Sender[K any] interface {
component.Component
Send(context.Context, internal.Request) error
SetNextSender(nextSender RequestSender)
Send(context.Context, K) error
SetNextSender(nextSender Sender[K])
}

type BaseRequestSender struct {
type BaseSender[K any] struct {
component.StartFunc
component.ShutdownFunc
NextSender RequestSender
NextSender Sender[K]
}

var _ RequestSender = (*BaseRequestSender)(nil)
var _ Sender[internal.Request] = (*BaseSender[internal.Request])(nil)

func (b *BaseRequestSender) Send(ctx context.Context, req internal.Request) error {
func (b *BaseSender[K]) Send(ctx context.Context, req K) error {
return b.NextSender.Send(ctx, req)
}

func (b *BaseRequestSender) SetNextSender(nextSender RequestSender) {
func (b *BaseSender[K]) SetNextSender(nextSender Sender[K]) {
b.NextSender = nextSender
}
4 changes: 2 additions & 2 deletions exporter/exporterhelper/internal/retry_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func NewThrottleRetry(err error, delay time.Duration) error {
}

type retrySender struct {
BaseRequestSender
BaseSender[internal.Request]
traceAttribute attribute.KeyValue
cfg configretry.BackOffConfig
stopCh chan struct{}
Expand All @@ -65,7 +65,7 @@ func (rs *retrySender) Shutdown(context.Context) error {
return nil
}

// send implements the requestSender interface
// Send implements the requestSender interface
func (rs *retrySender) Send(ctx context.Context, req internal.Request) error {
// Do not use NewExponentialBackOff since it calls Reset and the code here must
// call Reset after changing the InitialInterval (this saves an unnecessary call to Now).
Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/internal/retry_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,13 +477,13 @@ func newMockRequest(cnt int, consumeError error) *mockRequest {
}

type observabilityConsumerSender struct {
BaseRequestSender
BaseSender[internal.Request]
waitGroup *sync.WaitGroup
sentItemsCount *atomic.Int64
droppedItemsCount *atomic.Int64
}

func newObservabilityConsumerSender(*ObsReport) RequestSender {
func newObservabilityConsumerSender(*ObsReport) Sender[internal.Request] {
return &observabilityConsumerSender{
waitGroup: new(sync.WaitGroup),
droppedItemsCount: &atomic.Int64{},
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/internal/timeout_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewDefaultTimeoutConfig() TimeoutConfig {

// TimeoutSender is a requestSender that adds a `timeout` to every request that passes this sender.
type TimeoutSender struct {
BaseRequestSender
BaseSender[internal.Request]
cfg TimeoutConfig
}

Expand Down
6 changes: 3 additions & 3 deletions exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func requestFromLogs(pusher consumer.ConsumeLogsFunc) RequestFromLogsFunc {
}
}

// NewLogsRequest creates new logs exporter based on custom LogsConverter and RequestSender.
// NewLogsRequest creates new logs exporter based on custom LogsConverter and Sender.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func NewLogsRequest(
Expand Down Expand Up @@ -148,11 +148,11 @@ func NewLogsRequest(
}

type logsExporterWithObservability struct {
internal.BaseRequestSender
internal.BaseSender[Request]
obsrep *internal.ObsReport
}

func newLogsWithObservability(obsrep *internal.ObsReport) internal.RequestSender {
func newLogsWithObservability(obsrep *internal.ObsReport) internal.Sender[Request] {
return &logsExporterWithObservability{obsrep: obsrep}
}

Expand Down
6 changes: 3 additions & 3 deletions exporter/exporterhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func requestFromMetrics(pusher consumer.ConsumeMetricsFunc) RequestFromMetricsFu
}
}

// NewMetricsRequest creates a new metrics exporter based on a custom MetricsConverter and RequestSender.
// NewMetricsRequest creates a new metrics exporter based on a custom MetricsConverter and Sender.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func NewMetricsRequest(
Expand Down Expand Up @@ -148,11 +148,11 @@ func NewMetricsRequest(
}

type metricsSenderWithObservability struct {
internal.BaseRequestSender
internal.BaseSender[Request]
obsrep *internal.ObsReport
}

func newMetricsSenderWithObservability(obsrep *internal.ObsReport) internal.RequestSender {
func newMetricsSenderWithObservability(obsrep *internal.ObsReport) internal.Sender[Request] {
return &metricsSenderWithObservability{obsrep: obsrep}
}

Expand Down
6 changes: 3 additions & 3 deletions exporter/exporterhelper/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func requestFromTraces(pusher consumer.ConsumeTracesFunc) RequestFromTracesFunc
}
}

// NewTracesRequest creates a new traces exporter based on a custom TracesConverter and RequestSender.
// NewTracesRequest creates a new traces exporter based on a custom TracesConverter and Sender.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func NewTracesRequest(
Expand Down Expand Up @@ -148,11 +148,11 @@ func NewTracesRequest(
}

type tracesWithObservability struct {
internal.BaseRequestSender
internal.BaseSender[Request]
obsrep *internal.ObsReport
}

func newTracesWithObservability(obsrep *internal.ObsReport) internal.RequestSender {
func newTracesWithObservability(obsrep *internal.ObsReport) internal.Sender[Request] {
return &tracesWithObservability{obsrep: obsrep}
}

Expand Down
6 changes: 3 additions & 3 deletions exporter/exporterhelper/xexporterhelper/profiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func requestFromProfiles(pusher xconsumer.ConsumeProfilesFunc) RequestFromProfil
}
}

// NewProfilesRequestExporter creates a new profiles exporter based on a custom ProfilesConverter and RequestSender.
// NewProfilesRequestExporter creates a new profiles exporter based on a custom ProfilesConverter and Sender.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func NewProfilesRequestExporter(
Expand Down Expand Up @@ -146,11 +146,11 @@ func NewProfilesRequestExporter(
}

type profilesExporterWithObservability struct {
internal.BaseRequestSender
internal.BaseSender[exporterhelper.Request]
obsrep *internal.ObsReport
}

func newProfilesExporterWithObservability(obsrep *internal.ObsReport) internal.RequestSender {
func newProfilesExporterWithObservability(obsrep *internal.ObsReport) internal.Sender[exporterhelper.Request] {
return &profilesExporterWithObservability{obsrep: obsrep}
}

Expand Down

0 comments on commit 20d9955

Please sign in to comment.