Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

contrib/IBM/sarama.v1: support for consumer groups #3041

Merged
merged 5 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions contrib/IBM/sarama.v1/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package sarama

import (
"context"
"github.com/IBM/sarama"

"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
"gopkg.in/DataDog/dd-trace-go.v1/datastreams/options"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
)

type partitionConsumer struct {
sarama.PartitionConsumer
dispatcher dispatcher
}

// Messages returns the read channel for the messages that are returned by
// the broker.
func (pc *partitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
return pc.dispatcher.Messages()
}

// WrapPartitionConsumer wraps a sarama.PartitionConsumer causing each received
// message to be traced.
func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.PartitionConsumer {
cfg := new(config)
defaults(cfg)
for _, opt := range opts {
opt(cfg)
}
log.Debug("contrib/IBM/sarama: Wrapping Partition Consumer: %#v", cfg)

d := wrapDispatcher(pc, cfg)
go d.Run()

wrapped := &partitionConsumer{
PartitionConsumer: pc,
dispatcher: d,
}
return wrapped
}

type consumer struct {
sarama.Consumer
opts []Option
}

// ConsumePartition invokes Consumer.ConsumePartition and wraps the resulting
// PartitionConsumer.
func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) {
pc, err := c.Consumer.ConsumePartition(topic, partition, offset)
if err != nil {
return pc, err
}
return WrapPartitionConsumer(pc, c.opts...), nil
}

// WrapConsumer wraps a sarama.Consumer wrapping any PartitionConsumer created
// via Consumer.ConsumePartition.
func WrapConsumer(c sarama.Consumer, opts ...Option) sarama.Consumer {
return &consumer{
Consumer: c,
opts: opts,
}
}

func setConsumeCheckpoint(enabled bool, groupID string, msg *sarama.ConsumerMessage) {
if !enabled || msg == nil {
return
}
edges := []string{"direction:in", "topic:" + msg.Topic, "type:kafka"}
if groupID != "" {
edges = append(edges, "group:"+groupID)
}
carrier := NewConsumerMessageCarrier(msg)

ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), carrier), options.CheckpointParams{PayloadSize: getConsumerMsgSize(msg)}, edges...)
if !ok {
return
}
datastreams.InjectToBase64Carrier(ctx, carrier)
if groupID != "" {
// only track Kafka lag if a consumer group is set.
// since there is no ack mechanism, we consider that messages read are committed right away.
tracer.TrackKafkaCommitOffset(groupID, msg.Topic, msg.Partition, msg.Offset)
}
}

func getConsumerMsgSize(msg *sarama.ConsumerMessage) (size int64) {
for _, header := range msg.Headers {
size += int64(len(header.Key) + len(header.Value))
}
return size + int64(len(msg.Value)+len(msg.Key))
}
54 changes: 54 additions & 0 deletions contrib/IBM/sarama.v1/consumer_group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package sarama

import (
"github.com/IBM/sarama"

"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
)

type consumerGroupHandler struct {
sarama.ConsumerGroupHandler
cfg *config
}

func (h *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// Wrap claim
wd := wrapDispatcher(claim, h.cfg)
go wd.Run()
claim = &consumerGroupClaim{
ConsumerGroupClaim: claim,
dispatcher: wd,
}

return h.ConsumerGroupHandler.ConsumeClaim(session, claim)
}

// WrapConsumerGroupHandler wraps a sarama.ConsumerGroupHandler causing each received
// message to be traced.
func WrapConsumerGroupHandler(handler sarama.ConsumerGroupHandler, opts ...Option) sarama.ConsumerGroupHandler {
cfg := new(config)
defaults(cfg)
for _, opt := range opts {
opt(cfg)
}
log.Debug("contrib/IBM/sarama: Wrapping Consumer Group Handler: %#v", cfg)

return &consumerGroupHandler{
ConsumerGroupHandler: handler,
cfg: cfg,
}
}

type consumerGroupClaim struct {
sarama.ConsumerGroupClaim
dispatcher dispatcher
}

func (c *consumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage {
return c.dispatcher.Messages()
}
155 changes: 155 additions & 0 deletions contrib/IBM/sarama.v1/consumer_group_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package sarama

import (
"context"
"log"
"sync"
"testing"

"github.com/IBM/sarama"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
)

func TestWrapConsumerGroupHandler(t *testing.T) {
mt := mocktracer.Start()
defer mt.Stop()

cfg := sarama.NewConfig()
cfg.Version = sarama.V0_11_0_0 // first version that supports headers
cfg.Producer.Return.Successes = true
cfg.Producer.Flush.Messages = 1

cg, err := sarama.NewConsumerGroup(kafkaBrokers, testGroupID, cfg)
require.NoError(t, err)
defer func() {
assert.NoError(t, cg.Close())
}()

handler := &testConsumerGroupHandler{
T: t,
ready: make(chan bool),
rcvMessages: make(chan *sarama.ConsumerMessage, 1),
}
tracedHandler := WrapConsumerGroupHandler(handler, WithDataStreams(), WithGroupID(testGroupID))

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
if err := cg.Consume(ctx, []string{testTopic}, tracedHandler); err != nil {
assert.ErrorIs(t, err, sarama.ErrClosedConsumerGroup)
return
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
return
}
}
}()

<-handler.ready // Await till the consumer has been set up
log.Println("Sarama consumer up and running!...")

p, err := sarama.NewSyncProducer(kafkaBrokers, cfg)
require.NoError(t, err)

require.NoError(t, err)
p = WrapSyncProducer(cfg, p, WithDataStreams())

produceMsg := &sarama.ProducerMessage{
Topic: testTopic,
Value: sarama.StringEncoder("test 1"),
Metadata: "test",
}
_, _, err = p.SendMessage(produceMsg)
require.NoError(t, err)

waitForSpans(mt, 2)
cancel()
wg.Wait()

spans := mt.FinishedSpans()
require.Len(t, spans, 2)
consumeMsg := <-handler.rcvMessages

s0 := spans[0]
assert.Equal(t, "kafka", s0.Tag(ext.ServiceName))
assert.Equal(t, "queue", s0.Tag(ext.SpanType))
assert.Equal(t, "Produce Topic gotest", s0.Tag(ext.ResourceName))
assert.Equal(t, "kafka.produce", s0.OperationName())
assert.Equal(t, int32(0), s0.Tag(ext.MessagingKafkaPartition))
assert.NotNil(t, s0.Tag("offset"))
assert.Equal(t, "IBM/sarama", s0.Tag(ext.Component))
assert.Equal(t, ext.SpanKindProducer, s0.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s0.Tag(ext.MessagingSystem))

assertDSMProducerPathway(t, testTopic, produceMsg)

s1 := spans[1]
assert.Equal(t, "kafka", s1.Tag(ext.ServiceName))
assert.Equal(t, "queue", s1.Tag(ext.SpanType))
assert.Equal(t, "Consume Topic gotest", s1.Tag(ext.ResourceName))
assert.Equal(t, "kafka.consume", s1.OperationName())
assert.Equal(t, int32(0), s1.Tag(ext.MessagingKafkaPartition))
assert.NotNil(t, s1.Tag("offset"))
assert.Equal(t, "IBM/sarama", s1.Tag(ext.Component))
assert.Equal(t, ext.SpanKindConsumer, s1.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s1.Tag(ext.MessagingSystem))

assertDSMConsumerPathway(t, testTopic, testGroupID, consumeMsg, true)

assert.Equal(t, s0.SpanID(), s1.ParentID(), "spans are not parent-child")
}

type testConsumerGroupHandler struct {
*testing.T
ready chan bool
rcvMessages chan *sarama.ConsumerMessage
}

func (t *testConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
close(t.ready)
return nil
}

func (t *testConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
return nil
}

func (t *testConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for {
select {
case msg, ok := <-claim.Messages():
if !ok {
t.T.Log("message channel was closed")
return nil
}
t.T.Logf("Message claimed: value = %s, timestamp = %v, topic = %s", string(msg.Value), msg.Timestamp, msg.Topic)
session.MarkMessage(msg, "")
t.rcvMessages <- msg

// Should return when `session.Context()` is done.
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
// https://github.com/IBM/sarama/issues/1192
case <-session.Context().Done():
return nil
}
}
}
Loading
Loading