From 35806346e10d0c899fcf43dbfdf18ea11372980e Mon Sep 17 00:00:00 2001 From: dimkouv Date: Wed, 26 Jan 2022 14:59:05 +0200 Subject: [PATCH] [Queues] Add retry mechanism + Add nullable package (#126) * add logic for message retries * add nullable package * rename misc to pkg * use int32 instead of int * add to queue the updated message for keeping track of retries * add declare with config queue method * reject message on final retry * add msg retries in consumer config * update MaxRetries comment * refactor nullable package + include more data types --- mq/consumer.go | 35 ++++++++++++++++++-- mq/mq.go | 11 +++++++ mq/options.go | 5 +++ mq/queue.go | 33 ++++++++++++++++++- pkg/nullable/primitives.go | 66 ++++++++++++++++++++++++++++++++++++++ pkg/nullable/time.go | 7 ++++ 6 files changed, 153 insertions(+), 4 deletions(-) create mode 100644 pkg/nullable/primitives.go create mode 100644 pkg/nullable/time.go diff --git a/mq/consumer.go b/mq/consumer.go index 87bed52..b8613cb 100644 --- a/mq/consumer.go +++ b/mq/consumer.go @@ -5,10 +5,14 @@ import ( "fmt" "time" + "github.com/trustwallet/go-libs/pkg/nullable" + log "github.com/sirupsen/logrus" "github.com/streadway/amqp" ) +const headerRemainingRetries = "x-remaining-retries" + type consumer struct { client *Client @@ -73,14 +77,25 @@ func (c *consumer) consume(ctx context.Context) { err := c.fn(msg.Body) if err != nil { log.Error(err) + } - if c.options.RetryOnError { - time.Sleep(c.options.RetryDelay) + if err != nil && c.options.RetryOnError { + time.Sleep(c.options.RetryDelay) + remainingRetries := c.getRemainingRetries(msg) + switch { + case remainingRetries > 0: + if err := c.queue.PublishWithConfig(msg.Body, PublishConfig{ + MaxRetries: nullable.Int(int(remainingRetries - 1)), + }); err != nil { + log.Error(err) + } + case remainingRetries == 0: + break + default: if err := msg.Reject(true); err != nil { log.Error(err) } - continue } } @@ -108,3 +123,17 @@ func (c *consumer) messageChannel() (<-chan amqp.Delivery, error) { return messageChannel, nil } + +func (c *consumer) getRemainingRetries(delivery amqp.Delivery) int32 { + remainingRetriesRaw, exists := delivery.Headers[headerRemainingRetries] + if !exists { + return int32(c.options.MaxRetries) + } + + remainingRetries, ok := remainingRetriesRaw.(int32) + if !ok { + return int32(c.options.MaxRetries) + } + + return remainingRetries +} diff --git a/mq/mq.go b/mq/mq.go index 3f49e30..93ee3b4 100755 --- a/mq/mq.go +++ b/mq/mq.go @@ -202,10 +202,21 @@ func (c *Client) reconnect() error { } func publish(amqpChan *amqp.Channel, exchange ExchangeName, key ExchangeKey, body []byte) error { + return publishWithConfig(amqpChan, exchange, key, body, PublishConfig{}) +} + +func publishWithConfig(amqpChan *amqp.Channel, exchange ExchangeName, key ExchangeKey, body []byte, cfg PublishConfig) error { + headers := map[string]interface{}{} + + if cfg.MaxRetries != nil { + headers[headerRemainingRetries] = *cfg.MaxRetries + } + return amqpChan.Publish(string(exchange), string(key), false, false, amqp.Publishing{ DeliveryMode: amqp.Persistent, ContentType: "text/plain", Body: body, + Headers: headers, }) } diff --git a/mq/options.go b/mq/options.go index d66bcdf..82d77af 100644 --- a/mq/options.go +++ b/mq/options.go @@ -8,6 +8,10 @@ type ConsumerOptions struct { Workers int RetryOnError bool RetryDelay time.Duration + + // MaxRetries specifies the default number of retries for consuming a message. + // A negative value is equal to infinite retries. + MaxRetries int } func DefaultConsumerOptions(workers int) ConsumerOptions { @@ -15,6 +19,7 @@ func DefaultConsumerOptions(workers int) ConsumerOptions { Workers: workers, RetryOnError: true, RetryDelay: time.Second, + MaxRetries: -1, } } diff --git a/mq/queue.go b/mq/queue.go index 4d8a889..206593c 100644 --- a/mq/queue.go +++ b/mq/queue.go @@ -7,7 +7,9 @@ type queue struct { type Queue interface { Declare() error + DeclareWithConfig(cfg DeclareConfig) error Publish(body []byte) error + PublishWithConfig(body []byte, cfg PublishConfig) error Name() QueueName } @@ -16,10 +18,39 @@ func (q *queue) Name() QueueName { } func (q *queue) Declare() error { - _, err := q.client.amqpChan.QueueDeclare(string(q.name), true, false, false, false, nil) + return q.DeclareWithConfig(DeclareConfig{Durable: true}) +} + +func (q *queue) DeclareWithConfig(cfg DeclareConfig) error { + _, err := q.client.amqpChan.QueueDeclare( + string(q.name), + cfg.Durable, + cfg.AutoDelete, + cfg.Exclusive, + cfg.NoWait, + cfg.Args, + ) return err } func (q *queue) Publish(body []byte) error { return publish(q.client.amqpChan, "", ExchangeKey(q.name), body) } + +func (q *queue) PublishWithConfig(body []byte, cfg PublishConfig) error { + return publishWithConfig(q.client.amqpChan, "", ExchangeKey(q.name), body, cfg) +} + +type DeclareConfig struct { + Durable bool + AutoDelete bool + Exclusive bool + NoWait bool + Args map[string]interface{} +} + +type PublishConfig struct { + // MaxRetries defines the maximum number of retries after processing failures. + // Overrides the value of consumer's config. + MaxRetries *int +} diff --git a/pkg/nullable/primitives.go b/pkg/nullable/primitives.go new file mode 100644 index 0000000..0b7f8b7 --- /dev/null +++ b/pkg/nullable/primitives.go @@ -0,0 +1,66 @@ +package nullable + +import ( + "fmt" +) + +func String(s string) *string { + return &s +} + +func Stringf(s string, args ...interface{}) *string { + s = fmt.Sprintf(s, args...) + return &s +} + +func Int(i int) *int { + return &i +} + +func Int8(i int8) *int8 { + return &i +} + +func Int16(i int16) *int16 { + return &i +} + +func Int32(i int32) *int32 { + return &i +} + +func Int64(i int64) *int64 { + return &i +} + +func Uint(i uint) *uint { + return &i +} + +func Uint8(i uint8) *uint8 { + return &i +} + +func Uint16(i uint16) *uint16 { + return &i +} + +func Uint32(i uint32) *uint32 { + return &i +} + +func Uint64(i uint64) *uint64 { + return &i +} + +func Float32(f float32) *float32 { + return &f +} + +func Float64(f float64) *float64 { + return &f +} + +func Bool(b bool) *bool { + return &b +} diff --git a/pkg/nullable/time.go b/pkg/nullable/time.go new file mode 100644 index 0000000..f5043fe --- /dev/null +++ b/pkg/nullable/time.go @@ -0,0 +1,7 @@ +package nullable + +import "time" + +func Time(t time.Time) *time.Time { + return &t +}