Skip to content

Commit

Permalink
[Queues] Add retry mechanism + Add nullable package (#126)
Browse files Browse the repository at this point in the history
* add logic for message retries

* add nullable package

* rename misc to pkg

* use int32 instead of int

* add to queue the updated message for keeping track of retries

* add declare with config queue method

* reject message on final retry

* add msg retries in consumer config

* update MaxRetries comment

* refactor nullable package + include more data types
  • Loading branch information
dimkouv authored Jan 26, 2022
1 parent 47cf91e commit 3580634
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 4 deletions.
35 changes: 32 additions & 3 deletions mq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ import (
"fmt"
"time"

"github.com/trustwallet/go-libs/pkg/nullable"

log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
)

const headerRemainingRetries = "x-remaining-retries"

type consumer struct {
client *Client

Expand Down Expand Up @@ -73,14 +77,25 @@ func (c *consumer) consume(ctx context.Context) {
err := c.fn(msg.Body)
if err != nil {
log.Error(err)
}

if c.options.RetryOnError {
time.Sleep(c.options.RetryDelay)
if err != nil && c.options.RetryOnError {
time.Sleep(c.options.RetryDelay)
remainingRetries := c.getRemainingRetries(msg)

switch {
case remainingRetries > 0:
if err := c.queue.PublishWithConfig(msg.Body, PublishConfig{
MaxRetries: nullable.Int(int(remainingRetries - 1)),
}); err != nil {
log.Error(err)
}
case remainingRetries == 0:
break
default:
if err := msg.Reject(true); err != nil {
log.Error(err)
}

continue
}
}
Expand Down Expand Up @@ -108,3 +123,17 @@ func (c *consumer) messageChannel() (<-chan amqp.Delivery, error) {

return messageChannel, nil
}

func (c *consumer) getRemainingRetries(delivery amqp.Delivery) int32 {
remainingRetriesRaw, exists := delivery.Headers[headerRemainingRetries]
if !exists {
return int32(c.options.MaxRetries)
}

remainingRetries, ok := remainingRetriesRaw.(int32)
if !ok {
return int32(c.options.MaxRetries)
}

return remainingRetries
}
11 changes: 11 additions & 0 deletions mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,21 @@ func (c *Client) reconnect() error {
}

func publish(amqpChan *amqp.Channel, exchange ExchangeName, key ExchangeKey, body []byte) error {
return publishWithConfig(amqpChan, exchange, key, body, PublishConfig{})
}

func publishWithConfig(amqpChan *amqp.Channel, exchange ExchangeName, key ExchangeKey, body []byte, cfg PublishConfig) error {
headers := map[string]interface{}{}

if cfg.MaxRetries != nil {
headers[headerRemainingRetries] = *cfg.MaxRetries
}

return amqpChan.Publish(string(exchange), string(key), false, false, amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: body,
Headers: headers,
})
}

Expand Down
5 changes: 5 additions & 0 deletions mq/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@ type ConsumerOptions struct {
Workers int
RetryOnError bool
RetryDelay time.Duration

// MaxRetries specifies the default number of retries for consuming a message.
// A negative value is equal to infinite retries.
MaxRetries int
}

func DefaultConsumerOptions(workers int) ConsumerOptions {
return ConsumerOptions{
Workers: workers,
RetryOnError: true,
RetryDelay: time.Second,
MaxRetries: -1,
}
}

Expand Down
33 changes: 32 additions & 1 deletion mq/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ type queue struct {

type Queue interface {
Declare() error
DeclareWithConfig(cfg DeclareConfig) error
Publish(body []byte) error
PublishWithConfig(body []byte, cfg PublishConfig) error
Name() QueueName
}

Expand All @@ -16,10 +18,39 @@ func (q *queue) Name() QueueName {
}

func (q *queue) Declare() error {
_, err := q.client.amqpChan.QueueDeclare(string(q.name), true, false, false, false, nil)
return q.DeclareWithConfig(DeclareConfig{Durable: true})
}

func (q *queue) DeclareWithConfig(cfg DeclareConfig) error {
_, err := q.client.amqpChan.QueueDeclare(
string(q.name),
cfg.Durable,
cfg.AutoDelete,
cfg.Exclusive,
cfg.NoWait,
cfg.Args,
)
return err
}

func (q *queue) Publish(body []byte) error {
return publish(q.client.amqpChan, "", ExchangeKey(q.name), body)
}

func (q *queue) PublishWithConfig(body []byte, cfg PublishConfig) error {
return publishWithConfig(q.client.amqpChan, "", ExchangeKey(q.name), body, cfg)
}

type DeclareConfig struct {
Durable bool
AutoDelete bool
Exclusive bool
NoWait bool
Args map[string]interface{}
}

type PublishConfig struct {
// MaxRetries defines the maximum number of retries after processing failures.
// Overrides the value of consumer's config.
MaxRetries *int
}
66 changes: 66 additions & 0 deletions pkg/nullable/primitives.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package nullable

import (
"fmt"
)

func String(s string) *string {
return &s
}

func Stringf(s string, args ...interface{}) *string {
s = fmt.Sprintf(s, args...)
return &s
}

func Int(i int) *int {
return &i
}

func Int8(i int8) *int8 {
return &i
}

func Int16(i int16) *int16 {
return &i
}

func Int32(i int32) *int32 {
return &i
}

func Int64(i int64) *int64 {
return &i
}

func Uint(i uint) *uint {
return &i
}

func Uint8(i uint8) *uint8 {
return &i
}

func Uint16(i uint16) *uint16 {
return &i
}

func Uint32(i uint32) *uint32 {
return &i
}

func Uint64(i uint64) *uint64 {
return &i
}

func Float32(f float32) *float32 {
return &f
}

func Float64(f float64) *float64 {
return &f
}

func Bool(b bool) *bool {
return &b
}
7 changes: 7 additions & 0 deletions pkg/nullable/time.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package nullable

import "time"

func Time(t time.Time) *time.Time {
return &t
}

0 comments on commit 3580634

Please sign in to comment.