From f10fc7e18c68e802b292f15327f946e3d61a5a07 Mon Sep 17 00:00:00 2001 From: Eli Treuherz Date: Tue, 19 Dec 2023 17:36:19 +0000 Subject: [PATCH] Add context to ProducerInterceptor This change adds a context.Context argument to the ProducerInterceptor interface, and passes it between the pre- and post-Send interceptor methods. Having this makes it much easier to write useful interceptors that can integrate with common tracing SDKs like OpenTelemetry, as the context is the conventional method for propagating metadata vertically through a call stack. For an example of another library using a similar convention, see: https://github.com/jackc/pgx/blob/9ab9e3c40bbb33c6f37359c87508cbc6a9830ed6/tracer.go#L10 Fixes #443 --- .../pulsartracing/producer_interceptor.go | 15 ++++++++--- pulsar/producer_interceptor.go | 15 ++++++----- pulsar/producer_partition.go | 25 +++++++++++-------- pulsar/producer_test.go | 15 ++++++----- 4 files changed, 43 insertions(+), 27 deletions(-) diff --git a/pulsar/internal/pulsartracing/producer_interceptor.go b/pulsar/internal/pulsartracing/producer_interceptor.go index 6c7728cf0a..465ee3db2d 100644 --- a/pulsar/internal/pulsartracing/producer_interceptor.go +++ b/pulsar/internal/pulsartracing/producer_interceptor.go @@ -29,13 +29,20 @@ const toPrefix = "To__" type ProducerInterceptor struct { } -func (t *ProducerInterceptor) BeforeSend(producer pulsar.Producer, message *pulsar.ProducerMessage) { +func (t *ProducerInterceptor) BeforeSend( + ctx context.Context, + producer pulsar.Producer, + message *pulsar.ProducerMessage, +) context.Context { buildAndInjectSpan(message, producer).Finish() + return ctx } -func (t *ProducerInterceptor) OnSendAcknowledgement(producer pulsar.Producer, - message *pulsar.ProducerMessage, - msgID pulsar.MessageID) { +func (t *ProducerInterceptor) OnSendAcknowledgement( + _ context.Context, + _ pulsar.Producer, + _ *pulsar.ProducerMessage, + _ pulsar.MessageID) { } func buildAndInjectSpan(message *pulsar.ProducerMessage, producer pulsar.Producer) opentracing.Span { diff --git a/pulsar/producer_interceptor.go b/pulsar/producer_interceptor.go index e18994cfcd..7358349ef5 100644 --- a/pulsar/producer_interceptor.go +++ b/pulsar/producer_interceptor.go @@ -17,27 +17,30 @@ package pulsar +import "context" + type ProducerInterceptor interface { // BeforeSend This is called before send the message to the brokers. This method is allowed to modify the // message. - BeforeSend(producer Producer, message *ProducerMessage) + BeforeSend(ctx context.Context, producer Producer, message *ProducerMessage) context.Context // OnSendAcknowledgement This method is called when the message sent to the broker has been acknowledged, // or when sending the message fails. - OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) + OnSendAcknowledgement(ctx context.Context, producer Producer, message *ProducerMessage, msgID MessageID) } type ProducerInterceptors []ProducerInterceptor -func (x ProducerInterceptors) BeforeSend(producer Producer, message *ProducerMessage) { +func (x ProducerInterceptors) BeforeSend(ctx context.Context, producer Producer, message *ProducerMessage) context.Context { for i := range x { - x[i].BeforeSend(producer, message) + ctx = x[i].BeforeSend(ctx, producer, message) } + return ctx } -func (x ProducerInterceptors) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) { +func (x ProducerInterceptors) OnSendAcknowledgement(ctx context.Context, producer Producer, message *ProducerMessage, msgID MessageID) { for i := range x { - x[i].OnSendAcknowledgement(producer, message, msgID) + x[i].OnSendAcknowledgement(ctx, producer, message, msgID) } } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 46167d0cf1..c99a669360 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -994,7 +994,7 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (Mes isDone := uAtomic.NewBool(false) doneCh := make(chan struct{}) - p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) { + ctx = p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) { if isDone.CAS(false, true) { err = e msgID = ID @@ -1194,11 +1194,11 @@ func (p *partitionProducer) internalSendAsync( msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error), flushImmediately bool, -) { +) context.Context { if err := p.validateMsg(msg); err != nil { p.log.Error(err) runCallback(callback, nil, msg, err) - return + return ctx } sr := sendRequestPool.Get().(*sendRequest) @@ -1216,26 +1216,27 @@ func (p *partitionProducer) internalSendAsync( if err := p.prepareTransaction(sr); err != nil { sr.done(nil, err) - return + return ctx } if p.getProducerState() != producerReady { sr.done(nil, errProducerClosed) - return + return ctx } - p.options.Interceptors.BeforeSend(p, msg) + ctx = p.options.Interceptors.BeforeSend(ctx, p, msg) + sr.ctx = ctx if err := p.updateSchema(sr); err != nil { p.log.Error(err) sr.done(nil, err) - return + return ctx } if err := p.updateUncompressedPayload(sr); err != nil { p.log.Error(err) sr.done(nil, err) - return + return ctx } p.updateMetaData(sr) @@ -1243,16 +1244,18 @@ func (p *partitionProducer) internalSendAsync( if err := p.updateChunkInfo(sr); err != nil { p.log.Error(err) sr.done(nil, err) - return + return ctx } if err := p.reserveResources(sr); err != nil { p.log.Error(err) sr.done(nil, err) - return + return ctx } p.dataChan <- sr + + return ctx } func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) { @@ -1497,7 +1500,7 @@ func (sr *sendRequest) done(msgID MessageID, err error) { if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 { if sr.producer.options.Interceptors != nil { - sr.producer.options.Interceptors.OnSendAcknowledgement(sr.producer, sr.msg, msgID) + sr.producer.options.Interceptors.OnSendAcknowledgement(sr.ctx, sr.producer, sr.msg, msgID) } } } diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 0f89069243..5ee6297f0d 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -1480,9 +1480,11 @@ func TestProducuerSendFailOnInvalidKey(t *testing.T) { type noopProduceInterceptor struct{} -func (noopProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) {} +func (noopProduceInterceptor) BeforeSend(ctx context.Context, _ Producer, _ *ProducerMessage) context.Context { + return ctx +} -func (noopProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) { +func (noopProduceInterceptor) OnSendAcknowledgement(_ context.Context, _ Producer, _ *ProducerMessage, _ MessageID) { } // copyPropertyIntercepotr copy all keys in message properties map and add a suffix @@ -1491,11 +1493,12 @@ type metricProduceInterceptor struct { ackn int } -func (x *metricProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) { +func (x *metricProduceInterceptor) BeforeSend(ctx context.Context, _ Producer, _ *ProducerMessage) context.Context { x.sendn++ + return ctx } -func (x *metricProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) { +func (x *metricProduceInterceptor) OnSendAcknowledgement(_ context.Context, _ Producer, _ *ProducerMessage, _ MessageID) { x.ackn++ } @@ -1720,7 +1723,7 @@ func TestMultipleSchemaOfKeyBasedBatchProducerConsumer(t *testing.T) { } producer.Flush() - //// create consumer + // create consumer consumer, err := client.Subscribe(ConsumerOptions{ Topic: topic, SubscriptionName: "my-sub2", @@ -1811,7 +1814,7 @@ func TestMultipleSchemaProducerConsumer(t *testing.T) { } producer.Flush() - //// create consumer + // create consumer consumer, err := client.Subscribe(ConsumerOptions{ Topic: topic, SubscriptionName: "my-sub2",