Skip to content

Commit

Permalink
feat: add metric for dlq middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
taras-zak committed Sep 6, 2024
1 parent bda3e6c commit 09c51eb
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 4 deletions.
8 changes: 8 additions & 0 deletions v2/middlewares/deadletter/deadletter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,25 @@ import (
"context"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/honestbank/kp/v2/internal/retrycounter"
"github.com/honestbank/kp/v2/middlewares"
)

var deadletterProduceCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "kp_deadletter_produce_total",
}, []string{"topic"})

type deadletter struct {
producer Producer
onProduceErrors func(err error)
threshold int
}
type Producer interface {
ProduceRaw(message *kafka.Message) error
GetTopic() string
}

func (r deadletter) Process(ctx context.Context, item *kafka.Message, next func(ctx context.Context, item *kafka.Message) error) error {
Expand All @@ -26,6 +33,7 @@ func (r deadletter) Process(ctx context.Context, item *kafka.Message, next func(
if retrycounter.GetCount(item) < r.threshold {
return err
}
deadletterProduceCounter.With(prometheus.Labels{"topic": r.producer.GetTopic()}).Inc()
err = r.producer.ProduceRaw(&kafka.Message{Value: item.Value, Key: item.Key, Headers: item.Headers, Timestamp: item.Timestamp, TimestampType: item.TimestampType, Opaque: item.Opaque})
if err != nil {
r.onProduceErrors(err)
Expand Down
36 changes: 32 additions & 4 deletions v2/middlewares/deadletter/deadletter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,13 @@ package deadletter_test
import (
"context"
"errors"
"io"
"net/http/httptest"
"testing"

"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/stretchr/testify/require"

"github.com/honestbank/kp/v2/internal/retrycounter"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
Expand All @@ -16,6 +21,7 @@ import (

type producerMock struct {
produceRaw func(message *kafka.Message) error
topic string
}

func (r producerMock) Flush() error {
Expand All @@ -30,8 +36,12 @@ func (r producerMock) ProduceRaw(message *kafka.Message) error {
return r.produceRaw(message)
}

func newProducer(cb func(item *kafka.Message) error) producer.Producer[any] {
return producerMock{produceRaw: cb}
func (r producerMock) GetTopic() string {
return r.topic
}

func newProducer(topic string, cb func(item *kafka.Message) error) producer.Producer[any] {
return producerMock{produceRaw: cb, topic: topic}
}

func TestRetry_Process(t *testing.T) {
Expand All @@ -46,7 +56,7 @@ func TestRetry_Process(t *testing.T) {
})

t.Run("if the retry count is less than threshold it simply returns error", func(t *testing.T) {
middleware := deadletter.NewDeadletterMiddleware(newProducer(func(item *kafka.Message) error {
middleware := deadletter.NewDeadletterMiddleware(newProducer("dlq-success-case", func(item *kafka.Message) error {
return nil
}), 2, func(err error) {
t.FailNow()
Expand All @@ -67,11 +77,15 @@ func TestRetry_Process(t *testing.T) {
return errors.New("random error")
})
assert.NoError(t, err)

metrics, err := scrapeMetrics()
require.NoError(t, err)
assert.Contains(t, metrics, `kp_deadletter_produce_total{topic="dlq-success-case"} 1`, "should report metric on successful DLQ produce")
})

t.Run("if producing returns error, we get a callback", func(t *testing.T) {
called := false
middleware := deadletter.NewDeadletterMiddleware(newProducer(func(item *kafka.Message) error {
middleware := deadletter.NewDeadletterMiddleware(newProducer("dlq-failure-case", func(item *kafka.Message) error {
return errors.New("random error")
}), 1, func(err error) {
called = true
Expand All @@ -82,5 +96,19 @@ func TestRetry_Process(t *testing.T) {
return errors.New("random error")
})
assert.True(t, called)
metrics, err := scrapeMetrics()
require.NoError(t, err)
assert.Contains(t, metrics, `kp_deadletter_produce_total{topic="dlq-failure-case"} 1`, "should report metric on failed DLQ produce")
})
}

func scrapeMetrics() (string, error) {
recorder := httptest.NewRecorder()
promhttp.Handler().ServeHTTP(recorder, httptest.NewRequest("GET", "/metrics", nil))
body, err := io.ReadAll(recorder.Result().Body)
if err != nil {
return "", err
}

return string(body), nil
}
4 changes: 4 additions & 0 deletions v2/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func (p producer[BodyType]) Flush() error {
return nil
}

func (p producer[BodyType]) GetTopic() string {
return p.topic
}

func (p producer[BodyType]) ProduceRaw(message *kafka.Message) error {
return p.k.ProduceRaw(message)
}
Expand Down
2 changes: 2 additions & 0 deletions v2/producer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ type Producer[BodyType any] interface {
Flush() error
Produce(context context.Context, message BodyType) error
ProduceRaw(message *kafka.Message) error
GetTopic() string
}

type UntypedProducer interface {
ProduceRaw(message *kafka.Message) error
GetTopic() string
Flush() error
}
4 changes: 4 additions & 0 deletions v2/producer/untyped.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ func (u untypedProducer) Flush() error {
return nil
}

func (u untypedProducer) GetTopic() string {
return u.topic
}

func NewUntyped(topic string, cfg config.Kafka) (UntypedProducer, error) {
p, err := kafka.NewProducer(config.GetKafkaConfig(cfg))
if err != nil {
Expand Down

0 comments on commit 09c51eb

Please sign in to comment.