-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconsumer.go
132 lines (115 loc) · 4.13 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package kp
import (
"context"
"errors"
"time"
"github.com/Shopify/sarama"
backoff_policy "github.com/honestbank/backoff-policy"
)
type KPConsumer interface {
Setup(sarama.ConsumerGroupSession) error
Cleanup(sarama.ConsumerGroupSession) error
ConsumeClaim(sarama.ConsumerGroupSession, sarama.ConsumerGroupClaim) error
GetReady() chan bool
SetReady(chan bool)
Process(ctx context.Context, message *sarama.ConsumerMessage) error
}
// ConsumerStruct represents a Sarama consumer group consumer
type ConsumerStruct struct {
topic string
deadLetterTopic string
retryTopic string
ready chan bool
Processor func(ctx context.Context, key string, message string, retries int, rawMessage *sarama.ConsumerMessage) error
producer KPProducer
retries int
backoffPolicy backoff_policy.BackoffPolicy
onFailure *func(ctx context.Context, key string, message string, retries int, rawMessage *sarama.ConsumerMessage) error
}
func NewConsumer(topic string, retryTopic string, deadLetterTopic string, retries int, processor func(ctx context.Context, key string, message string, retries int, rawMessage *sarama.ConsumerMessage) error, onFailure *func(ctx context.Context, key string, message string, retries int, rawMessage *sarama.ConsumerMessage) error, producer KPProducer, backoffPolicyTime time.Duration) KPConsumer {
return &ConsumerStruct{
ready: make(chan bool),
Processor: processor,
producer: producer,
topic: topic,
deadLetterTopic: deadLetterTopic,
retries: retries,
retryTopic: retryTopic,
backoffPolicy: backoff_policy.NewExponentialBackoffPolicy(backoffPolicyTime, retries),
onFailure: onFailure,
}
}
func (Consumer *ConsumerStruct) GetReady() chan bool {
return Consumer.ready
}
func (consumer *ConsumerStruct) SetReady(ready chan bool) {
consumer.ready = ready
}
func (consumer *ConsumerStruct) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
close(consumer.ready)
return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *ConsumerStruct) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
func (consumer *ConsumerStruct) Process(ctx context.Context, message *sarama.ConsumerMessage) error {
if message == nil {
return errors.New("error while trying to consume nil message")
}
unmarshalMessage, retries, err := UnmarshalStringMessage(string(message.Value))
if err != nil {
return err
}
if retries >= consumer.retries {
err = consumer.producer.ProduceMessage(ctx, consumer.deadLetterTopic, string(message.Key), unmarshalMessage)
if consumer.onFailure != nil {
err = (*consumer.onFailure)(ctx, string(message.Key), unmarshalMessage, retries, message)
if err != nil {
return err
}
}
if err != nil {
return err
}
return nil
}
consumer.backoffPolicy.Execute(func(marker backoff_policy.Marker) {
err := consumer.Processor(ctx, string(message.Key), unmarshalMessage, retries, message)
if err != nil {
marker.MarkFailure()
if err != nil {
marshaledMessage := MarshalStringMessage(unmarshalMessage, retries+1)
err = consumer.producer.ProduceMessage(ctx, consumer.retryTopic, string(message.Key), marshaledMessage)
if err != nil {
return //need to handle the error in V2
}
}
return
}
marker.MarkSuccess()
})
return nil
}
func (consumer *ConsumerStruct) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// NOTE:
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/main/consumer_group.go#L27-L29
for {
select {
case message := <-claim.Messages():
err := consumer.Process(session.Context(), message)
if err != nil {
return err
}
session.MarkMessage(message, "")
// Should return when `session.Context()` is done.
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
// https://github.com/Shopify/sarama/issues/1192
case <-session.Context().Done():
return nil
}
}
}