-
Notifications
You must be signed in to change notification settings - Fork 13
/
consumer.go
162 lines (137 loc) · 3.92 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package beanstalk
import (
"context"
"fmt"
"sync"
"time"
)
// Consumer maintains a pool of connections and allows workers to reserve jobs
// on those connections.
//
// A Consumer is created with NewConsumer and driven by Receive, which starts
// the connection goroutines and the worker goroutines.
type Consumer struct {
// uris are the server URIs that Receive opens connections for.
uris []string
// tubes are the tubes watched on every connection (see watchTubes).
tubes []string
// config is the normalized configuration stored by NewConsumer.
config Config
// reserveC carries reserve requests from workers to connections. Each
// request is the channel on which the requesting worker expects its job.
reserveC chan chan *Job
// wg tracks the worker goroutines started by Receive.
wg sync.WaitGroup
}
// NewConsumer builds a Consumer for the given server URIs and tubes.
//
// Unless config.IgnoreURIValidation is set, the URIs are checked with
// ValidURIs first and any validation error is returned as-is. The config is
// normalized before it is stored, and the reserve request channel is buffered
// to the normalized NumGoroutines.
func NewConsumer(uris, tubes []string, config Config) (*Consumer, error) {
	if !config.IgnoreURIValidation {
		err := ValidURIs(uris)
		if err != nil {
			return nil, err
		}
	}

	normalized := config.normalize()

	consumer := &Consumer{
		uris:   uris,
		tubes:  tubes,
		config: normalized,
	}
	// One buffer slot per worker goroutine.
	consumer.reserveC = make(chan chan *Job, normalized.NumGoroutines)

	return consumer, nil
}
// Receive reserves jobs and hands each one to fn.
//
// It starts one connection goroutine per URI returned by multiply and
// config.NumGoroutines worker goroutines, then blocks until every worker has
// returned. Only the workers are tracked by the WaitGroup; the connection
// goroutines are not waited on here.
func (consumer *Consumer) Receive(ctx context.Context, fn func(ctx context.Context, job *Job)) {
	// Every connection uses the same handler: watch the tubes once the
	// connection is set up, then serve reserve requests on it.
	handler := connHandler{
		setup:  consumer.watchTubes,
		handle: consumer.reserveJobs,
	}
	for _, uri := range multiply(consumer.uris, consumer.config.Multiply) {
		go maintainConn(ctx, uri, consumer.config, handler)
	}

	// runWorker is the body of a single worker goroutine.
	runWorker := func() {
		defer consumer.wg.Done()
		consumer.worker(ctx, fn)
	}
	for started := 0; started < consumer.config.NumGoroutines; started++ {
		consumer.wg.Add(1)
		go runWorker()
	}

	// Block until all workers are done.
	consumer.wg.Wait()
}
// worker repeatedly requests a job and runs fn on it until ctx is cancelled.
//
// Each iteration queues this worker's reply channel on reserveC and then waits
// for either a job on that channel or cancellation of ctx. fn is called with
// context.Background() rather than ctx, and Release is called on the job once
// fn has returned.
func (consumer *Consumer) worker(ctx context.Context, fn func(ctx context.Context, job *Job)) {
	// process runs fn for one job; the deferred Release fires when fn returns.
	// NOTE(review): presumably Release is a no-op for a job that fn already
	// finalized — confirm against Job.Release.
	process := func(reserved *Job) {
		defer reserved.Release(context.Background())
		fn(context.Background(), reserved)
	}

	replyC := make(chan *Job)
	for {
		// Ask a connection to reserve a job for this worker.
		consumer.reserveC <- replyC

		select {
		case <-ctx.Done():
			// Cancelled while waiting for a job.
			return
		case reserved := <-replyC:
			process(reserved)
		}
	}
}
// watchTubes makes conn watch every tube configured on the consumer.
//
// With no tubes configured it does nothing. Otherwise each tube is watched in
// order, and the "default" tube is ignored afterwards unless it is one of the
// configured tubes. The first failing Watch or Ignore call aborts the setup
// and its error is returned wrapped.
func (consumer *Consumer) watchTubes(ctx context.Context, conn *Conn) error {
	const defaultTube = "default"

	tubes := consumer.tubes
	if len(tubes) == 0 {
		return nil
	}

	for i := range tubes {
		err := conn.Watch(ctx, tubes[i])
		if err != nil {
			return fmt.Errorf("error watching tube: %s: %w", tubes[i], err)
		}
	}

	// The default tube was asked for explicitly, so keep watching it.
	if includes(tubes, defaultTube) {
		return nil
	}

	if err := conn.Ignore(ctx, defaultTube); err != nil {
		return fmt.Errorf("error ignoring default tube: %w", err)
	}
	return nil
}
// reserveJobs serves reserve requests from workers on a single connection.
//
// It takes a request (a worker's reply channel) from reserveC, tries to
// reserve a job on conn and, if one was reserved, sends it to that worker. If
// no job was reserved the request is put back on reserveC so that any
// connection can pick it up again, and the loop pauses for
// config.ReserveTimeout before retrying.
//
// It returns the error from reserveWithTimeout if the reserve failed, or nil
// once ctx is cancelled. Before returning nil it waits for all workers to
// finish, because jobs reserved on this connection may still be in progress
// and returning closes the connection.
func (consumer *Consumer) reserveJobs(ctx context.Context, conn *Conn) error {
	// err only ever holds the result of the last reserve attempt. The deferred
	// function below inspects it, so nothing else may be assigned to it.
	var err error

	// If err is nil, then the context was cancelled. However, reserved jobs on
	// this connection might still need to finish up, so wait for the workers
	// to finish before returning (and thus closing) the connection.
	defer func() {
		if err == nil {
			consumer.wg.Wait()
		}
	}()

	var job *Job
	var jobC chan *Job

	for {
		// Wait for a reserve request from a worker to come in.
		select {
		case jobC = <-consumer.reserveC:
		case <-ctx.Done():
			return nil
		}

		// Attempt to reserve a job.
		if job, err = conn.reserveWithTimeout(ctx, 0); job != nil {
			select {
			// Return the job to the worker and wait for another reserve request.
			case jobC <- job:
				continue

			// Release the job and stop if the context was cancelled.
			case <-ctx.Done():
				// Keep the release error in its own variable: storing it in
				// err would make the deferred function skip wg.Wait() even
				// though this path returns nil.
				if releaseErr := job.Release(context.Background()); releaseErr != nil {
					consumer.config.ErrorFunc(releaseErr, "Unable to release job after context was cancelled")
				}
				return nil
			}
		}

		// Put the reserve request back and return if an error occurred.
		consumer.reserveC <- jobC
		if err != nil {
			return err
		}

		// The watched tubes are empty, so wait a bit before reserving again.
		select {
		case <-time.After(consumer.config.ReserveTimeout):
		case <-ctx.Done():
			return nil
		}
	}
}