-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathconsumer.go
300 lines (255 loc) · 9.56 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
package gosqs
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/aws/aws-sdk-go/service/sqs"
)
var maxMessages = int64(10)
// Consumer provides an interface for receiving messages through AWS SQS and SNS
type Consumer interface {
// Consume polls for new messages and if it finds one, decodes it, sends it to the handler and deletes it
//
// A message is not considered dequeued until it has been sucessfully processed and deleted. There is a 30 Second
// delay between receiving a single message and receiving the same message. This delay can be adjusted in the AWS
// console and can also be extended during operation. If a message is successfully received 4 times but not deleted,
// it will be considered unprocessable and sent to the DLQ automatically
//
// Consume uses long-polling to check and retrieve messages, if it is unable to make a connection, the aws-SDK will use its
// advanced retrying mechanism (including exponential backoff), if all of the retries fail, then we will wait 10s before
// trying again.
//
// When a new message is received, it runs in a separate go-routine that will handle the full consuming of the message, error reporting
// and deleting
Consume()
// RegisterHandler registers an event listener and an associated handler. If the event matches, the handler will
// be run
RegisterHandler(name string, h Handler, adapters ...Adapter)
// Message serves as the direct messaging capability within the consumer. A worker can send direct messages to other workers
Message(ctx context.Context, queue, event string, body interface{})
// MessageSelf serves as the self messaging capability within the consumer, a worker can send messages to itself for continued
// processing and resiliency
MessageSelf(ctx context.Context, event string, body interface{})
}
// consumer is a wrapper around sqs.SQS
type consumer struct {
sqs *sqs.SQS
handlers map[string]Handler
env string
QueueURL string
Hostname string
VisibilityTimeout int
workerPool int
workerCount int
extensionLimit int
attributes []customAttribute
logger Logger
}
// NewConsumer creates a new SQS instance and provides a configured consumer interface for
// receiving and sending messages
func NewConsumer(c Config, queueName string) (Consumer, error) {
if c.SessionProvider == nil {
c.SessionProvider = newSession
}
sess, err := c.SessionProvider(c)
if err != nil {
return nil, err
}
cons := &consumer{
sqs: sqs.New(sess),
env: c.Env,
VisibilityTimeout: 30,
workerPool: 30,
extensionLimit: 2,
}
if c.Logger != nil {
cons.logger = c.Logger
}
if c.VisibilityTimeout != 0 {
cons.VisibilityTimeout = c.VisibilityTimeout
}
if c.WorkerPool != 0 {
cons.workerPool = c.WorkerPool
}
if c.ExtensionLimit != nil {
cons.extensionLimit = *c.ExtensionLimit
}
cons.QueueURL = c.QueueURL
// custom QueueURLs can be provided for testing and mocking purposes
if cons.QueueURL == "" {
name := fmt.Sprintf("%s-%s", c.Env, queueName)
o, err := cons.sqs.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: &name})
if err != nil {
return nil, err
}
cons.QueueURL = *o.QueueUrl
}
return cons, nil
}
// Logger accesses the logging field or applies a default logger
func (c *consumer) Logger() Logger {
if c.logger == nil {
return &defaultLogger{}
}
return c.logger
}
// RegisterHandler registers an event listener and an associated handler. If the event matches, the handler will
// be run along with any included middleware
func (c *consumer) RegisterHandler(name string, h Handler, adapters ...Adapter) {
if c.handlers == nil {
c.handlers = make(map[string]Handler)
}
for i := len(adapters) - 1; i >= 0; i-- {
h = adapters[i](h)
}
c.handlers[name] = func(ctx context.Context, m Message) error {
return h(ctx, m)
}
}
var (
all = "All"
)
// Consume polls for new messages and if it finds one, decodes it, sends it to the handler and deletes it
//
// A message is not considered dequeued until it has been sucessfully processed and deleted. There is a 30 Second
// delay between receiving a single message and receiving the same message. This delay can be adjusted in the AWS
// console and can also be extended during operation. If a message is successfully received 4 times but not deleted,
// it will be considered unprocessable and sent to the DLQ automatically
//
// Consume uses long-polling to check and retrieve messages, if it is unable to make a connection, the aws-SDK will use its
// advanced retrying mechanism (including exponential backoff), if all of the retries fail, then we will wait 10s before
// trying again.
//
// When a new message is received, it runs in a separate go-routine that will handle the full consuming of the message, error reporting
// and deleting
func (c *consumer) Consume() {
jobs := make(chan *message)
for w := 1; w <= c.workerPool; w++ {
go c.worker(w, jobs)
}
for {
output, err := c.sqs.ReceiveMessage(&sqs.ReceiveMessageInput{QueueUrl: &c.QueueURL, MaxNumberOfMessages: &maxMessages, MessageAttributeNames: []*string{&all}})
if err != nil {
c.Logger().Println("%s , retrying in 10s", ErrGetMessage.Context(err).Error())
time.Sleep(10 * time.Second)
continue
}
for _, m := range output.Messages {
if _, ok := m.MessageAttributes["route"]; !ok {
//a message will be sent to the DLQ automatically after 4 tries if it is received but not deleted
c.Logger().Println(ErrNoRoute.Error())
continue
}
jobs <- newMessage(m)
}
}
}
// worker is an always-on concurrent worker that will take tasks when they are added into the messages buffer
func (c *consumer) worker(id int, messages <-chan *message) {
for m := range messages {
if err := c.run(m); err != nil {
c.Logger().Println(err.Error())
}
}
}
// run should be run within a worker
// if there is no handler for that route, then the message will be deleted and fully consumed
//
// if the handler exists, it will wait for the err channel to be processed. Once it receives feedback from the handler in the form
// of a channel, it will either log the error, or consume the message
func (c *consumer) run(m *message) error {
if h, ok := c.handlers[m.Route()]; ok {
ctx := context.Background()
go c.extend(ctx, m)
if err := h(ctx, m); err != nil {
return m.ErrorResponse(ctx, err)
}
// finish the extension channel if the message was processed successfully
m.Success(ctx)
}
//deletes message if the handler was successful or if there was no handler with that route
return c.delete(m) //MESSAGE CONSUMED
}
// MessageSelf serves as the self messaging capability within the consumer, a worker can send messages to itself for continued
// processing and resiliency
func (c *consumer) MessageSelf(ctx context.Context, event string, body interface{}) {
o, err := json.Marshal(body)
if err != nil {
log.Println(ErrMarshal.Context(err).Error(), event)
return
}
out := string(o)
sqsInput := &sqs.SendMessageInput{
MessageBody: &out,
MessageAttributes: defaultSQSAttributes(event, c.attributes...),
QueueUrl: &c.QueueURL,
}
go c.sendDirectMessage(ctx, sqsInput, event)
}
// Message serves as the direct messaging capability within the consumer. A worker can send direct messages to other workers
func (c *consumer) Message(ctx context.Context, queue, event string, body interface{}) {
name := fmt.Sprintf("%s-%s", c.env, queue)
queueResp, err := c.sqs.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: &name})
if err != nil {
log.Printf("%s, queue: %s", ErrQueueURL.Context(err).Error(), name)
return
}
o, err := json.Marshal(body)
if err != nil {
log.Println(ErrMarshal.Context(err).Error(), event)
return
}
out := string(o)
sqsInput := &sqs.SendMessageInput{
MessageBody: &out,
MessageAttributes: defaultSQSAttributes(event, c.attributes...),
QueueUrl: queueResp.QueueUrl,
}
go c.sendDirectMessage(ctx, sqsInput, event)
}
// sendDirectMessage is a helper that should be run concurrently since it will block the main thread if there is a connection issue
func (c *consumer) sendDirectMessage(ctx context.Context, input *sqs.SendMessageInput, event string) {
if _, err := c.sqs.SendMessage(input); err != nil {
log.Printf("%s, event: %s \nretrying in 10s", ErrPublish.Context(err).Error(), event)
time.Sleep(10 * time.Second)
c.sendDirectMessage(ctx, input, event)
}
}
// delete will remove a message from the queue, this is necessary to fully and successfully consume a message
func (c *consumer) delete(m *message) error {
_, err := c.sqs.DeleteMessage(&sqs.DeleteMessageInput{QueueUrl: &c.QueueURL, ReceiptHandle: m.ReceiptHandle})
if err != nil {
c.Logger().Println(ErrUnableToDelete.Context(err).Error())
return ErrUnableToDelete.Context(err)
}
return nil
}
func (c *consumer) extend(ctx context.Context, m *message) {
var count int
extension := int64(c.VisibilityTimeout)
for {
//only allow 1 extensions (Default 1m30s)
if count >= c.extensionLimit {
c.Logger().Println(ErrMessageProcessing.Error(), m.Route())
return
}
count++
// allow 10 seconds to process the extension request
time.Sleep(time.Duration(c.VisibilityTimeout-10) * time.Second)
select {
case <-m.err:
// goroutine finished
return
default:
// double the allowed processing time
extension = extension + int64(c.VisibilityTimeout)
_, err := c.sqs.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{QueueUrl: &c.QueueURL, ReceiptHandle: m.ReceiptHandle, VisibilityTimeout: &extension})
if err != nil {
c.Logger().Println(ErrUnableToExtend.Error(), err.Error())
return
}
}
}
}