Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: skip commit on failures #99

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions v2/kp.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type Processor[MessageType any] func(ctx context.Context, item *MessageType) err
type kp[MessageType any] struct {
chain middleware.Processor[*MessageType, error]
shouldContinue int32
onError func(error)
}

func (t *kp[MessageType]) getShouldContinue() bool {
Expand All @@ -27,23 +28,30 @@ func (t *kp[MessageType]) AddMiddleware(middleware middleware.Middleware[*Messag
func (t *kp[MessageType]) Stop() {
atomic.StoreInt32(&t.shouldContinue, 0)
}
func (t *kp[MessageType]) process() {
ctx := context.Background()
err := t.chain.Process(ctx, nil)
if err != nil && t.onError != nil {
t.onError(err)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need check if onError is nil, func type in go is nullable

}
}

func (t *kp[MessageType]) Run(processor Processor[MessageType]) error {
t.chain.AddMiddleware(middleware.FinalMiddleware[*MessageType, error](func(ctx context.Context, msg *MessageType) error {
return processor(ctx, msg)
}))

for t.getShouldContinue() {
ctx := context.Background()
_ = t.chain.Process(ctx, nil)
t.process()
}

return nil
}

func New[MessageType any]() MessageProcessor[MessageType] {
func New[MessageType any](onError func(err error)) MessageProcessor[MessageType] {
return &kp[MessageType]{
chain: middleware.New[*MessageType, error](),
shouldContinue: 1,
onError: onError,
}
}
6 changes: 3 additions & 3 deletions v2/kp_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func ExampleNew() {
if err != nil {
panic(err)
}
processor := v2.New[kafka.Message]()
processor := v2.New[kafka.Message](nil)
kafkaConsumer, err := consumer2.New([]string{"user-logged-in", "user-logged-in-rewards-processor-retry"}, kpConfig.KafkaConfig.WithDefaults())
if err != nil {
panic(err)
Expand All @@ -56,8 +56,8 @@ func ExampleNew() {
err = processor.
AddMiddleware(consumer.NewConsumerMiddleware(kafkaConsumer)).
AddMiddleware(retry_count.NewRetryCountMiddleware()).
AddMiddleware(retry.NewRetryMiddleware(retryTopicProducer, func(err error) {})).
AddMiddleware(deadletter.NewDeadletterMiddleware(dltProducer, 3, func(err error) {})).
AddMiddleware(retry.NewRetryMiddleware(retryTopicProducer)).
AddMiddleware(deadletter.NewDeadletterMiddleware(dltProducer, 3)).
Run(func(ctx context.Context, ev *kafka.Message) error {
val, _ := serialization.Decode[UserLoggedInEvent](ev.Value)
fmt.Printf("%s-%d|", val.UserID, retry_count.FromContext(ctx))
Expand Down
6 changes: 3 additions & 3 deletions v2/kp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestKP(t *testing.T) {
p.Produce(context.Background(), MyType{Username: "username1", Count: 1})
p.Flush()
assert.NoError(t, err)
kp := v2.New[kafka.Message]()
kp := v2.New[kafka.Message](nil)
messageProcessCount := 0
const retryCount = 10
retryTopicProducer, err := producer.New[UserLoggedInEvent]("kp-topic-retry", config.KPConfig{KafkaConfig: kafkaCfg, SchemaRegistryConfig: schemaRegistryConfig})
Expand All @@ -69,8 +69,8 @@ func TestKP(t *testing.T) {
}()
err = kp.AddMiddleware(consumer.NewConsumerMiddleware(kafkaConsumer)).
AddMiddleware(MyMw{}).
AddMiddleware(retry.NewRetryMiddleware(retryTopicProducer, func(err error) {})).
AddMiddleware(deadletter.NewDeadletterMiddleware(dltProducer, retryCount, func(err error) {})).
AddMiddleware(retry.NewRetryMiddleware(retryTopicProducer)).
AddMiddleware(deadletter.NewDeadletterMiddleware(dltProducer, retryCount)).
Run(func(ctx context.Context, message *kafka.Message) error {
fmt.Printf("%v\n", message)
messageProcessCount++
Expand Down
8 changes: 6 additions & 2 deletions v2/middlewares/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@ func (c consumerMiddleware) Process(ctx context.Context, item *kafka.Message, ne
if msg == nil {
return nil
}
defer c.consumer.Commit(msg)

return next(ctx, msg)
err := next(ctx, msg)
if err != nil {
return err
}

return c.consumer.Commit(msg)
}

func NewConsumerMiddleware(consumer consumer.Consumer) middlewares.KPMiddleware[*kafka.Message] {
Expand Down
15 changes: 5 additions & 10 deletions v2/middlewares/deadletter/deadletter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ import (
)

type deadletter struct {
producer Producer
onProduceErrors func(err error)
threshold int
producer Producer
threshold int
}
type Producer interface {
ProduceRaw(message *kafka.Message) error
Expand All @@ -26,14 +25,10 @@ func (r deadletter) Process(ctx context.Context, item *kafka.Message, next func(
if retrycounter.GetCount(item) < r.threshold {
return err
}
err = r.producer.ProduceRaw(&kafka.Message{Value: item.Value, Key: item.Key, Headers: item.Headers, Timestamp: item.Timestamp, TimestampType: item.TimestampType, Opaque: item.Opaque})
if err != nil {
r.onProduceErrors(err)
}

return nil
return r.producer.ProduceRaw(&kafka.Message{Value: item.Value, Key: item.Key, Headers: item.Headers, Timestamp: item.Timestamp, TimestampType: item.TimestampType, Opaque: item.Opaque})
}

func NewDeadletterMiddleware(producer Producer, threshold int, onProduceErrors func(error)) middlewares.KPMiddleware[*kafka.Message] {
return deadletter{onProduceErrors: onProduceErrors, producer: producer, threshold: threshold}
func NewDeadletterMiddleware(producer Producer, threshold int) middlewares.KPMiddleware[*kafka.Message] {
return deadletter{producer: producer, threshold: threshold}
}
17 changes: 6 additions & 11 deletions v2/middlewares/deadletter/deadletter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func newProducer(cb func(item *kafka.Message) error) producer.Producer[any] {

func TestRetry_Process(t *testing.T) {
t.Run("if message processing succeeds, it returns nil without retrying", func(t *testing.T) {
middleware := deadletter.NewDeadletterMiddleware(nil, 2, nil)
middleware := deadletter.NewDeadletterMiddleware(nil, 2)
assert.NotPanics(t, func() {
err := middleware.Process(context.Background(), nil, func(ctx context.Context, item *kafka.Message) error {
return nil
Expand All @@ -48,9 +48,7 @@ func TestRetry_Process(t *testing.T) {
t.Run("if the retry count is less than threshold it simply returns error", func(t *testing.T) {
middleware := deadletter.NewDeadletterMiddleware(newProducer(func(item *kafka.Message) error {
return nil
}), 2, func(err error) {
t.FailNow()
})
}), 2)
msg := &kafka.Message{}
retrycounter.SetCount(msg, 0)
err := middleware.Process(context.Background(), msg, func(ctx context.Context, item *kafka.Message) error {
Expand All @@ -69,18 +67,15 @@ func TestRetry_Process(t *testing.T) {
assert.NoError(t, err)
})

t.Run("if producing returns error, we get a callback", func(t *testing.T) {
called := false
t.Run("if producing returns error, it returns an error", func(t *testing.T) {
middleware := deadletter.NewDeadletterMiddleware(newProducer(func(item *kafka.Message) error {
return errors.New("random error")
}), 1, func(err error) {
called = true
})
}), 1)
msg := &kafka.Message{}
retrycounter.SetCount(msg, 2)
middleware.Process(context.Background(), msg, func(ctx context.Context, item *kafka.Message) error {
err := middleware.Process(context.Background(), msg, func(ctx context.Context, item *kafka.Message) error {
return errors.New("random error")
})
assert.True(t, called)
assert.Error(t, err)
})
}
13 changes: 4 additions & 9 deletions v2/middlewares/retry/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ import (
)

type retry struct {
producer Producer
onProduceErrors func(err error)
producer Producer
}

type Producer interface {
Expand All @@ -24,14 +23,10 @@ func (r retry) Process(ctx context.Context, item *kafka.Message, next func(ctx c
return nil
}
retrycounter.SetCount(item, retrycounter.GetCount(item)+1)
err = r.producer.ProduceRaw(&kafka.Message{Value: item.Value, Key: item.Key, Headers: item.Headers, Timestamp: item.Timestamp, TimestampType: item.TimestampType, Opaque: item.Opaque})
if err != nil {
r.onProduceErrors(err)
}

return nil
return r.producer.ProduceRaw(&kafka.Message{Value: item.Value, Key: item.Key, Headers: item.Headers, Timestamp: item.Timestamp, TimestampType: item.TimestampType, Opaque: item.Opaque})
}

func NewRetryMiddleware(producer Producer, onProduceErrors func(error)) middlewares.KPMiddleware[*kafka.Message] {
return retry{onProduceErrors: onProduceErrors, producer: producer}
func NewRetryMiddleware(producer Producer) middlewares.KPMiddleware[*kafka.Message] {
return retry{producer: producer}
}
15 changes: 5 additions & 10 deletions v2/middlewares/retry/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func newProducer(cb func(item *kafka.Message) error) producer.Producer[any] {

func TestRetry_Process(t *testing.T) {
t.Run("if message processing succeeds, it returns nil without retrying", func(t *testing.T) {
middleware := retry.NewRetryMiddleware(nil, nil)
middleware := retry.NewRetryMiddleware(nil)
assert.NotPanics(t, func() {
err := middleware.Process(context.Background(), nil, func(ctx context.Context, item *kafka.Message) error {
return nil
Expand All @@ -47,9 +47,7 @@ func TestRetry_Process(t *testing.T) {
t.Run("producing message works and increments the retry count", func(t *testing.T) {
middleware := retry.NewRetryMiddleware(newProducer(func(item *kafka.Message) error {
return nil
}), func(err error) {
t.FailNow()
})
}))
msg := &kafka.Message{}
middleware.Process(context.Background(), msg, func(ctx context.Context, item *kafka.Message) error {
assert.Equal(t, 0, retrycounter.GetCount(item))
Expand All @@ -66,17 +64,14 @@ func TestRetry_Process(t *testing.T) {
})

t.Run("if producing fails while retrying, we get a callback", func(t *testing.T) {
called := false
middleware := retry.NewRetryMiddleware(newProducer(func(item *kafka.Message) error {
return errors.New("some error")
}), func(err error) {
called = true
})
}))
msg := &kafka.Message{}
middleware.Process(context.Background(), msg, func(ctx context.Context, item *kafka.Message) error {
err := middleware.Process(context.Background(), msg, func(ctx context.Context, item *kafka.Message) error {
assert.Equal(t, 0, retrycounter.GetCount(item))
return errors.New("random error")
})
assert.True(t, called)
assert.Error(t, err)
})
}
Loading