diff --git a/v2/middlewares/deadletter/deadletter.go b/v2/middlewares/deadletter/deadletter.go index ccece48..580760f 100644 --- a/v2/middlewares/deadletter/deadletter.go +++ b/v2/middlewares/deadletter/deadletter.go @@ -4,11 +4,17 @@ import ( "context" "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/honestbank/kp/v2/internal/retrycounter" "github.com/honestbank/kp/v2/middlewares" ) +var deadletterProduceCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "kp_deadletter_produce_total", +}, []string{"topic"}) + type deadletter struct { producer Producer onProduceErrors func(err error) @@ -16,6 +22,7 @@ type deadletter struct { } type Producer interface { ProduceRaw(message *kafka.Message) error + GetTopic() string } func (r deadletter) Process(ctx context.Context, item *kafka.Message, next func(ctx context.Context, item *kafka.Message) error) error { @@ -26,6 +33,7 @@ func (r deadletter) Process(ctx context.Context, item *kafka.Message, next func( if retrycounter.GetCount(item) < r.threshold { return err } + deadletterProduceCounter.With(prometheus.Labels{"topic": r.producer.GetTopic()}).Inc() err = r.producer.ProduceRaw(&kafka.Message{Value: item.Value, Key: item.Key, Headers: item.Headers, Timestamp: item.Timestamp, TimestampType: item.TimestampType, Opaque: item.Opaque}) if err != nil { r.onProduceErrors(err) diff --git a/v2/middlewares/deadletter/deadletter_test.go b/v2/middlewares/deadletter/deadletter_test.go index 1250dcb..6ff60ad 100644 --- a/v2/middlewares/deadletter/deadletter_test.go +++ b/v2/middlewares/deadletter/deadletter_test.go @@ -3,8 +3,13 @@ package deadletter_test import ( "context" "errors" + "io" + "net/http/httptest" "testing" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/stretchr/testify/require" + "github.com/honestbank/kp/v2/internal/retrycounter" "github.com/confluentinc/confluent-kafka-go/v2/kafka" @@ -16,6 +21,7 @@ import ( type producerMock struct { produceRaw func(message *kafka.Message) error + topic string } func (r producerMock) Flush() error { @@ -30,8 +36,12 @@ func (r producerMock) ProduceRaw(message *kafka.Message) error { return r.produceRaw(message) } -func newProducer(cb func(item *kafka.Message) error) producer.Producer[any] { - return producerMock{produceRaw: cb} +func (r producerMock) GetTopic() string { + return r.topic +} + +func newProducer(topic string, cb func(item *kafka.Message) error) producer.Producer[any] { + return producerMock{produceRaw: cb, topic: topic} } func TestRetry_Process(t *testing.T) { @@ -46,7 +56,7 @@ func TestRetry_Process(t *testing.T) { }) t.Run("if the retry count is less than threshold it simply returns error", func(t *testing.T) { - middleware := deadletter.NewDeadletterMiddleware(newProducer(func(item *kafka.Message) error { + middleware := deadletter.NewDeadletterMiddleware(newProducer("dlq-success-case", func(item *kafka.Message) error { return nil }), 2, func(err error) { t.FailNow() @@ -67,11 +77,15 @@ func TestRetry_Process(t *testing.T) { return errors.New("random error") }) assert.NoError(t, err) + + metrics, err := scrapeMetrics() + require.NoError(t, err) + assert.Contains(t, metrics, `kp_deadletter_produce_total{topic="dlq-success-case"} 1`, "should report metric on successful DLQ produce") }) t.Run("if producing returns error, we get a callback", func(t *testing.T) { called := false - middleware := deadletter.NewDeadletterMiddleware(newProducer(func(item *kafka.Message) error { + middleware := deadletter.NewDeadletterMiddleware(newProducer("dlq-failure-case", func(item *kafka.Message) error { return errors.New("random error") }), 1, func(err error) { called = true @@ -82,5 +96,19 @@ func TestRetry_Process(t *testing.T) { return errors.New("random error") }) assert.True(t, called) + metrics, err := scrapeMetrics() + require.NoError(t, err) + assert.Contains(t, metrics, `kp_deadletter_produce_total{topic="dlq-failure-case"} 1`, "should report metric on failed DLQ produce") }) } + +func scrapeMetrics() (string, error) { + recorder := httptest.NewRecorder() + promhttp.Handler().ServeHTTP(recorder, httptest.NewRequest("GET", "/metrics", nil)) + body, err := io.ReadAll(recorder.Result().Body) + if err != nil { + return "", err + } + + return string(body), nil +} diff --git a/v2/producer/producer.go b/v2/producer/producer.go index 56081da..f6272e2 100644 --- a/v2/producer/producer.go +++ b/v2/producer/producer.go @@ -42,6 +42,10 @@ func (p producer[BodyType]) Flush() error { return nil } +func (p producer[BodyType]) GetTopic() string { + return p.topic +} + func (p producer[BodyType]) ProduceRaw(message *kafka.Message) error { return p.k.ProduceRaw(message) } diff --git a/v2/producer/types.go b/v2/producer/types.go index 8b59d7a..42f85f8 100644 --- a/v2/producer/types.go +++ b/v2/producer/types.go @@ -10,9 +10,11 @@ type Producer[BodyType any] interface { Flush() error Produce(context context.Context, message BodyType) error ProduceRaw(message *kafka.Message) error + GetTopic() string } type UntypedProducer interface { ProduceRaw(message *kafka.Message) error + GetTopic() string Flush() error } diff --git a/v2/producer/untyped.go b/v2/producer/untyped.go index a694587..73c2cbe 100644 --- a/v2/producer/untyped.go +++ b/v2/producer/untyped.go @@ -26,6 +26,10 @@ func (u untypedProducer) Flush() error { return nil } +func (u untypedProducer) GetTopic() string { + return u.topic +} + func NewUntyped(topic string, cfg config.Kafka) (UntypedProducer, error) { p, err := kafka.NewProducer(config.GetKafkaConfig(cfg)) if err != nil {