From 37f40b864d6f3746aea243fcb0e7327a0c2ed5d1 Mon Sep 17 00:00:00 2001 From: Nishchal Gautam Date: Fri, 6 Sep 2024 15:59:06 +0700 Subject: [PATCH] feat: skip commit on failures --- v2/kp.go | 14 +++++++++++--- v2/kp_example_test.go | 6 +++--- v2/kp_test.go | 6 +++--- v2/middlewares/consumer/consumer.go | 8 ++++++-- v2/middlewares/deadletter/deadletter.go | 15 +++++---------- v2/middlewares/deadletter/deadletter_test.go | 17 ++++++----------- v2/middlewares/retry/retry.go | 13 ++++--------- v2/middlewares/retry/retry_test.go | 15 +++++---------- 8 files changed, 43 insertions(+), 51 deletions(-) diff --git a/v2/kp.go b/v2/kp.go index 5237ed0..9518dad 100644 --- a/v2/kp.go +++ b/v2/kp.go @@ -12,6 +12,7 @@ type Processor[MessageType any] func(ctx context.Context, item *MessageType) err type kp[MessageType any] struct { chain middleware.Processor[*MessageType, error] shouldContinue int32 + onError func(error) } func (t *kp[MessageType]) getShouldContinue() bool { @@ -27,6 +28,13 @@ func (t *kp[MessageType]) AddMiddleware(middleware middleware.Middleware[*Messag func (t *kp[MessageType]) Stop() { atomic.StoreInt32(&t.shouldContinue, 0) } +func (t *kp[MessageType]) process() { + ctx := context.Background() + err := t.chain.Process(ctx, nil) + if err != nil && t.onError != nil { + t.onError(err) + } +} func (t *kp[MessageType]) Run(processor Processor[MessageType]) error { t.chain.AddMiddleware(middleware.FinalMiddleware[*MessageType, error](func(ctx context.Context, msg *MessageType) error { @@ -34,16 +42,16 @@ func (t *kp[MessageType]) Run(processor Processor[MessageType]) error { })) for t.getShouldContinue() { - ctx := context.Background() - _ = t.chain.Process(ctx, nil) + t.process() } return nil } -func New[MessageType any]() MessageProcessor[MessageType] { +func New[MessageType any](onError func(err error)) MessageProcessor[MessageType] { return &kp[MessageType]{ chain: middleware.New[*MessageType, error](), shouldContinue: 1, + onError: onError, } } diff --git a/v2/kp_example_test.go b/v2/kp_example_test.go index 5122f52..5f6901b 100644 --- a/v2/kp_example_test.go +++ b/v2/kp_example_test.go @@ -44,7 +44,7 @@ func ExampleNew() { if err != nil { panic(err) } - processor := v2.New[kafka.Message]() + processor := v2.New[kafka.Message](nil) kafkaConsumer, err := consumer2.New([]string{"user-logged-in", "user-logged-in-rewards-processor-retry"}, kpConfig.KafkaConfig.WithDefaults()) if err != nil { panic(err) @@ -56,8 +56,8 @@ func ExampleNew() { err = processor. AddMiddleware(consumer.NewConsumerMiddleware(kafkaConsumer)). AddMiddleware(retry_count.NewRetryCountMiddleware()). - AddMiddleware(retry.NewRetryMiddleware(retryTopicProducer, func(err error) {})). - AddMiddleware(deadletter.NewDeadletterMiddleware(dltProducer, 3, func(err error) {})). + AddMiddleware(retry.NewRetryMiddleware(retryTopicProducer)). + AddMiddleware(deadletter.NewDeadletterMiddleware(dltProducer, 3)). Run(func(ctx context.Context, ev *kafka.Message) error { val, _ := serialization.Decode[UserLoggedInEvent](ev.Value) fmt.Printf("%s-%d|", val.UserID, retry_count.FromContext(ctx)) diff --git a/v2/kp_test.go b/v2/kp_test.go index 00b8d20..17127e2 100644 --- a/v2/kp_test.go +++ b/v2/kp_test.go @@ -47,7 +47,7 @@ func TestKP(t *testing.T) { p.Produce(context.Background(), MyType{Username: "username1", Count: 1}) p.Flush() assert.NoError(t, err) - kp := v2.New[kafka.Message]() + kp := v2.New[kafka.Message](nil) messageProcessCount := 0 const retryCount = 10 retryTopicProducer, err := producer.New[UserLoggedInEvent]("kp-topic-retry", config.KPConfig{KafkaConfig: kafkaCfg, SchemaRegistryConfig: schemaRegistryConfig}) @@ -69,8 +69,8 @@ func TestKP(t *testing.T) { }() err = kp.AddMiddleware(consumer.NewConsumerMiddleware(kafkaConsumer)). AddMiddleware(MyMw{}). - AddMiddleware(retry.NewRetryMiddleware(retryTopicProducer, func(err error) {})). - AddMiddleware(deadletter.NewDeadletterMiddleware(dltProducer, retryCount, func(err error) {})). + AddMiddleware(retry.NewRetryMiddleware(retryTopicProducer)). + AddMiddleware(deadletter.NewDeadletterMiddleware(dltProducer, retryCount)). Run(func(ctx context.Context, message *kafka.Message) error { fmt.Printf("%v\n", message) messageProcessCount++ diff --git a/v2/middlewares/consumer/consumer.go b/v2/middlewares/consumer/consumer.go index 029ca36..bb63194 100644 --- a/v2/middlewares/consumer/consumer.go +++ b/v2/middlewares/consumer/consumer.go @@ -22,9 +22,13 @@ func (c consumerMiddleware) Process(ctx context.Context, item *kafka.Message, ne if msg == nil { return nil } - defer c.consumer.Commit(msg) - return next(ctx, msg) + err := next(ctx, msg) + if err != nil { + return err + } + + return c.consumer.Commit(msg) } func NewConsumerMiddleware(consumer consumer.Consumer) middlewares.KPMiddleware[*kafka.Message] { diff --git a/v2/middlewares/deadletter/deadletter.go b/v2/middlewares/deadletter/deadletter.go index ccece48..7805373 100644 --- a/v2/middlewares/deadletter/deadletter.go +++ b/v2/middlewares/deadletter/deadletter.go @@ -10,9 +10,8 @@ import ( ) type deadletter struct { - producer Producer - onProduceErrors func(err error) - threshold int + producer Producer + threshold int } type Producer interface { ProduceRaw(message *kafka.Message) error @@ -26,14 +25,10 @@ func (r deadletter) Process(ctx context.Context, item *kafka.Message, next func( if retrycounter.GetCount(item) < r.threshold { return err } - err = r.producer.ProduceRaw(&kafka.Message{Value: item.Value, Key: item.Key, Headers: item.Headers, Timestamp: item.Timestamp, TimestampType: item.TimestampType, Opaque: item.Opaque}) - if err != nil { - r.onProduceErrors(err) - } - return nil + return r.producer.ProduceRaw(&kafka.Message{Value: item.Value, Key: item.Key, Headers: item.Headers, Timestamp: item.Timestamp, TimestampType: item.TimestampType, Opaque: item.Opaque}) } -func NewDeadletterMiddleware(producer Producer, threshold int, onProduceErrors func(error)) middlewares.KPMiddleware[*kafka.Message] { - return deadletter{onProduceErrors: onProduceErrors, producer: producer, threshold: threshold} +func NewDeadletterMiddleware(producer Producer, threshold int) middlewares.KPMiddleware[*kafka.Message] { + return deadletter{producer: producer, threshold: threshold} } diff --git a/v2/middlewares/deadletter/deadletter_test.go b/v2/middlewares/deadletter/deadletter_test.go index 1250dcb..ed416b7 100644 --- a/v2/middlewares/deadletter/deadletter_test.go +++ b/v2/middlewares/deadletter/deadletter_test.go @@ -36,7 +36,7 @@ func newProducer(cb func(item *kafka.Message) error) producer.Producer[any] { func TestRetry_Process(t *testing.T) { t.Run("if message processing succeeds, it returns nil without retrying", func(t *testing.T) { - middleware := deadletter.NewDeadletterMiddleware(nil, 2, nil) + middleware := deadletter.NewDeadletterMiddleware(nil, 2) assert.NotPanics(t, func() { err := middleware.Process(context.Background(), nil, func(ctx context.Context, item *kafka.Message) error { return nil @@ -48,9 +48,7 @@ func TestRetry_Process(t *testing.T) { t.Run("if the retry count is less than threshold it simply returns error", func(t *testing.T) { middleware := deadletter.NewDeadletterMiddleware(newProducer(func(item *kafka.Message) error { return nil - }), 2, func(err error) { - t.FailNow() - }) + }), 2) msg := &kafka.Message{} retrycounter.SetCount(msg, 0) err := middleware.Process(context.Background(), msg, func(ctx context.Context, item *kafka.Message) error { @@ -69,18 +67,15 @@ func TestRetry_Process(t *testing.T) { assert.NoError(t, err) }) - t.Run("if producing returns error, we get a callback", func(t *testing.T) { - called := false + t.Run("if producing returns error, it returns an error", func(t *testing.T) { middleware := deadletter.NewDeadletterMiddleware(newProducer(func(item *kafka.Message) error { return errors.New("random error") - }), 1, func(err error) { - called = true - }) + }), 1) msg := &kafka.Message{} retrycounter.SetCount(msg, 2) - middleware.Process(context.Background(), msg, func(ctx context.Context, item *kafka.Message) error { + err := middleware.Process(context.Background(), msg, func(ctx context.Context, item *kafka.Message) error { return errors.New("random error") }) - assert.True(t, called) + assert.Error(t, err) }) } diff --git a/v2/middlewares/retry/retry.go b/v2/middlewares/retry/retry.go index 1041d97..d6999d4 100644 --- a/v2/middlewares/retry/retry.go +++ b/v2/middlewares/retry/retry.go @@ -10,8 +10,7 @@ import ( ) type retry struct { - producer Producer - onProduceErrors func(err error) + producer Producer } type Producer interface { @@ -24,14 +23,10 @@ func (r retry) Process(ctx context.Context, item *kafka.Message, next func(ctx c return nil } retrycounter.SetCount(item, retrycounter.GetCount(item)+1) - err = r.producer.ProduceRaw(&kafka.Message{Value: item.Value, Key: item.Key, Headers: item.Headers, Timestamp: item.Timestamp, TimestampType: item.TimestampType, Opaque: item.Opaque}) - if err != nil { - r.onProduceErrors(err) - } - return nil + return r.producer.ProduceRaw(&kafka.Message{Value: item.Value, Key: item.Key, Headers: item.Headers, Timestamp: item.Timestamp, TimestampType: item.TimestampType, Opaque: item.Opaque}) } -func NewRetryMiddleware(producer Producer, onProduceErrors func(error)) middlewares.KPMiddleware[*kafka.Message] { - return retry{onProduceErrors: onProduceErrors, producer: producer} +func NewRetryMiddleware(producer Producer) middlewares.KPMiddleware[*kafka.Message] { + return retry{producer: producer} } diff --git a/v2/middlewares/retry/retry_test.go b/v2/middlewares/retry/retry_test.go index e2a0072..ba28b8b 100644 --- a/v2/middlewares/retry/retry_test.go +++ b/v2/middlewares/retry/retry_test.go @@ -35,7 +35,7 @@ func newProducer(cb func(item *kafka.Message) error) producer.Producer[any] { func TestRetry_Process(t *testing.T) { t.Run("if message processing succeeds, it returns nil without retrying", func(t *testing.T) { - middleware := retry.NewRetryMiddleware(nil, nil) + middleware := retry.NewRetryMiddleware(nil) assert.NotPanics(t, func() { err := middleware.Process(context.Background(), nil, func(ctx context.Context, item *kafka.Message) error { return nil @@ -47,9 +47,7 @@ func TestRetry_Process(t *testing.T) { t.Run("producing message works and increments the retry count", func(t *testing.T) { middleware := retry.NewRetryMiddleware(newProducer(func(item *kafka.Message) error { return nil - }), func(err error) { - t.FailNow() - }) + })) msg := &kafka.Message{} middleware.Process(context.Background(), msg, func(ctx context.Context, item *kafka.Message) error { assert.Equal(t, 0, retrycounter.GetCount(item)) @@ -66,17 +64,14 @@ func TestRetry_Process(t *testing.T) { }) t.Run("if producing fails while retrying, we get a callback", func(t *testing.T) { - called := false middleware := retry.NewRetryMiddleware(newProducer(func(item *kafka.Message) error { return errors.New("some error") - }), func(err error) { - called = true - }) + })) msg := &kafka.Message{} - middleware.Process(context.Background(), msg, func(ctx context.Context, item *kafka.Message) error { + err := middleware.Process(context.Background(), msg, func(ctx context.Context, item *kafka.Message) error { assert.Equal(t, 0, retrycounter.GetCount(item)) return errors.New("random error") }) - assert.True(t, called) + assert.Error(t, err) }) }