Skip to content

Commit

Permalink
Add context to ProducerInterceptor
Browse files Browse the repository at this point in the history
This change adds a context.Context argument to the ProducerInterceptor
interface, and passes it between the pre- and post-Send interceptor
methods. Having this makes it much easier to write useful
interceptors that can integrate with common tracing SDKs like
OpenTelemetry, as the context is the conventional method for propagating
metadata vertically through a call stack.

For an example of another library using a similar convention, see:
https://github.com/jackc/pgx/blob/9ab9e3c40bbb33c6f37359c87508cbc6a9830ed6/tracer.go#L10

Fixes apache#443
  • Loading branch information
treuherz committed Jan 10, 2024
1 parent 062fefe commit f10fc7e
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 27 deletions.
15 changes: 11 additions & 4 deletions pulsar/internal/pulsartracing/producer_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,20 @@ const toPrefix = "To__"
type ProducerInterceptor struct {
}

func (t *ProducerInterceptor) BeforeSend(producer pulsar.Producer, message *pulsar.ProducerMessage) {
func (t *ProducerInterceptor) BeforeSend(
ctx context.Context,
producer pulsar.Producer,
message *pulsar.ProducerMessage,
) context.Context {
buildAndInjectSpan(message, producer).Finish()
return ctx
}

func (t *ProducerInterceptor) OnSendAcknowledgement(producer pulsar.Producer,
message *pulsar.ProducerMessage,
msgID pulsar.MessageID) {
func (t *ProducerInterceptor) OnSendAcknowledgement(
_ context.Context,
_ pulsar.Producer,
_ *pulsar.ProducerMessage,
_ pulsar.MessageID) {
}

func buildAndInjectSpan(message *pulsar.ProducerMessage, producer pulsar.Producer) opentracing.Span {
Expand Down
15 changes: 9 additions & 6 deletions pulsar/producer_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,30 @@

package pulsar

import "context"

type ProducerInterceptor interface {
// BeforeSend This is called before send the message to the brokers. This method is allowed to modify the
// message.
BeforeSend(producer Producer, message *ProducerMessage)
BeforeSend(ctx context.Context, producer Producer, message *ProducerMessage) context.Context

// OnSendAcknowledgement This method is called when the message sent to the broker has been acknowledged,
// or when sending the message fails.
OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID)
OnSendAcknowledgement(ctx context.Context, producer Producer, message *ProducerMessage, msgID MessageID)
}

type ProducerInterceptors []ProducerInterceptor

func (x ProducerInterceptors) BeforeSend(producer Producer, message *ProducerMessage) {
func (x ProducerInterceptors) BeforeSend(ctx context.Context, producer Producer, message *ProducerMessage) context.Context {
for i := range x {
x[i].BeforeSend(producer, message)
ctx = x[i].BeforeSend(ctx, producer, message)
}
return ctx
}

func (x ProducerInterceptors) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) {
func (x ProducerInterceptors) OnSendAcknowledgement(ctx context.Context, producer Producer, message *ProducerMessage, msgID MessageID) {
for i := range x {
x[i].OnSendAcknowledgement(producer, message, msgID)
x[i].OnSendAcknowledgement(ctx, producer, message, msgID)
}
}

Expand Down
25 changes: 14 additions & 11 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,7 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (Mes
isDone := uAtomic.NewBool(false)
doneCh := make(chan struct{})

p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
ctx = p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
if isDone.CAS(false, true) {
err = e
msgID = ID
Expand Down Expand Up @@ -1194,11 +1194,11 @@ func (p *partitionProducer) internalSendAsync(
msg *ProducerMessage,
callback func(MessageID, *ProducerMessage, error),
flushImmediately bool,
) {
) context.Context {
if err := p.validateMsg(msg); err != nil {
p.log.Error(err)
runCallback(callback, nil, msg, err)
return
return ctx
}

sr := sendRequestPool.Get().(*sendRequest)
Expand All @@ -1216,43 +1216,46 @@ func (p *partitionProducer) internalSendAsync(

if err := p.prepareTransaction(sr); err != nil {
sr.done(nil, err)
return
return ctx
}

if p.getProducerState() != producerReady {
sr.done(nil, errProducerClosed)
return
return ctx
}

p.options.Interceptors.BeforeSend(p, msg)
ctx = p.options.Interceptors.BeforeSend(ctx, p, msg)
sr.ctx = ctx

if err := p.updateSchema(sr); err != nil {
p.log.Error(err)
sr.done(nil, err)
return
return ctx
}

if err := p.updateUncompressedPayload(sr); err != nil {
p.log.Error(err)
sr.done(nil, err)
return
return ctx
}

p.updateMetaData(sr)

if err := p.updateChunkInfo(sr); err != nil {
p.log.Error(err)
sr.done(nil, err)
return
return ctx
}

if err := p.reserveResources(sr); err != nil {
p.log.Error(err)
sr.done(nil, err)
return
return ctx
}

p.dataChan <- sr

return ctx
}

func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
Expand Down Expand Up @@ -1497,7 +1500,7 @@ func (sr *sendRequest) done(msgID MessageID, err error) {

if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
if sr.producer.options.Interceptors != nil {
sr.producer.options.Interceptors.OnSendAcknowledgement(sr.producer, sr.msg, msgID)
sr.producer.options.Interceptors.OnSendAcknowledgement(sr.ctx, sr.producer, sr.msg, msgID)
}
}
}
Expand Down
15 changes: 9 additions & 6 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1480,9 +1480,11 @@ func TestProducuerSendFailOnInvalidKey(t *testing.T) {

type noopProduceInterceptor struct{}

func (noopProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) {}
func (noopProduceInterceptor) BeforeSend(ctx context.Context, _ Producer, _ *ProducerMessage) context.Context {
return ctx
}

func (noopProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) {
func (noopProduceInterceptor) OnSendAcknowledgement(_ context.Context, _ Producer, _ *ProducerMessage, _ MessageID) {
}

// copyPropertyIntercepotr copy all keys in message properties map and add a suffix
Expand All @@ -1491,11 +1493,12 @@ type metricProduceInterceptor struct {
ackn int
}

func (x *metricProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) {
func (x *metricProduceInterceptor) BeforeSend(ctx context.Context, _ Producer, _ *ProducerMessage) context.Context {
x.sendn++
return ctx
}

func (x *metricProduceInterceptor) OnSendAcknowledgement(producer Producer, message *ProducerMessage, msgID MessageID) {
func (x *metricProduceInterceptor) OnSendAcknowledgement(_ context.Context, _ Producer, _ *ProducerMessage, _ MessageID) {
x.ackn++
}

Expand Down Expand Up @@ -1720,7 +1723,7 @@ func TestMultipleSchemaOfKeyBasedBatchProducerConsumer(t *testing.T) {
}
producer.Flush()

//// create consumer
// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub2",
Expand Down Expand Up @@ -1811,7 +1814,7 @@ func TestMultipleSchemaProducerConsumer(t *testing.T) {
}
producer.Flush()

//// create consumer
// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub2",
Expand Down

0 comments on commit f10fc7e

Please sign in to comment.