Skip to content

Commit

Permalink
chore: fix queue function
Browse files Browse the repository at this point in the history
  • Loading branch information
bxcodec committed Aug 11, 2024
1 parent 8e0bdeb commit 1ad0c42
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 10 deletions.
2 changes: 1 addition & 1 deletion examples/rabbitmq/withretries/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,6 @@ func handler() interfaces.InboundMessageHandlerFunc {
return func(ctx context.Context, m interfaces.InboundMessage) (err error) {
fmt.Printf("Message: %+v\n", m)
// something happend, we need to requeue the message
return m.PutToBackOfQueueWithDelay(ctx, interfaces.ExponentialBackoffDelayFn)
return m.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn)
}
}
12 changes: 6 additions & 6 deletions interfaces/delayfn.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
package interfaces

type DelayFn func(retries int64) (delay int64)
type DelayFn func(currenRetries int64) (delay int64)

var (
// ExponentialBackoffDelayFn is a delay function that implements exponential backoff.
// It takes the number of retries as input and returns the delay in seconds.
ExponentialBackoffDelayFn DelayFn = func(retries int64) (delay int64) {
return 2 << (retries - 1)
ExponentialBackoffDelayFn DelayFn = func(currenRetries int64) (delay int64) {
return 2 << (currenRetries - 1)
}

// LinearDelayFn is a delay function that implements linear delay.
// It takes the number of retries as input and returns the delay in seconds.
LinearDelayFn DelayFn = func(retries int64) (delay int64) {
return retries
LinearDelayFn DelayFn = func(currenRetries int64) (delay int64) {
return currenRetries
}

// NoDelayFn is a DelayFn implementation that returns 0 delay for retries.
NoDelayFn DelayFn = func(retries int64) (delay int64) {
NoDelayFn DelayFn = func(currenRetries int64) (delay int64) {
return 0
}
DefaultDelayFn DelayFn = LinearDelayFn
Expand Down
2 changes: 1 addition & 1 deletion interfaces/inboundmessagehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ type InboundMessage struct {
// eg RabbitMQ: https://www.rabbitmq.com/docs/dlx
MoveToDeadLetterQueue func(ctx context.Context) (err error) `json:"-"`
// Requeue is used to put the message back to the tail of the queue after a delay.
PutToBackOfQueueWithDelay func(ctx context.Context, delayFn DelayFn) (err error) `json:"-"`
RetryWithDelayFn func(ctx context.Context, delayFn DelayFn) (err error) `json:"-"`
}
4 changes: 2 additions & 2 deletions internal/consumer/rabbitmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
// rabbitMQ is the subscriber handler for rabbitmq
type rabbitMQ struct {
consumerChannel *amqp.Channel
requeueChannel *amqp.Channel //if want requeue support to another queue
requeueChannel *amqp.Channel
option *consumerOpts.ConsumerOption
tagName string
msgReceiver <-chan amqp.Delivery
Expand Down Expand Up @@ -307,7 +307,7 @@ func (r *rabbitMQ) Consume(ctx context.Context,
err = receivedMsg.Nack(false, false)
return
},
PutToBackOfQueueWithDelay: r.requeueMessageWithDLQ(meta, msg, receivedMsg),
RetryWithDelayFn: r.requeueMessageWithDLQ(meta, msg, receivedMsg),
}

logrus.WithFields(logrus.Fields{
Expand Down

0 comments on commit 1ad0c42

Please sign in to comment.