Feat/dynamic queue with concurrency #973

Open · wants to merge 4 commits into master
5 changes: 5 additions & 0 deletions aggregator.go
@@ -81,6 +81,11 @@ func newAggregator(params aggregatorParams) *aggregator {
}
}

func (a *aggregator) resetState() {
a.done = make(chan struct{})
a.sema = make(chan struct{}, maxConcurrentAggregationChecks)
}
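resetState re-creates the channels that shutdown tears down (done, plus the sema that bounds concurrent aggregation checks), so the aggregator can be started again after a shutdown. A hedged sketch of the intended restart sequence; the calling code and the start entry point shown here are assumed, not part of this diff:

a.shutdown()   // stop the current aggregator goroutine
a.resetState() // fresh done/sema channels, sized by the existing maxConcurrentAggregationChecks constant
a.start(&wg)   // run the aggregator again with the re-armed state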

func (a *aggregator) shutdown() {
if a.ga == nil {
return
1 change: 1 addition & 0 deletions heartbeat.go
@@ -10,6 +10,7 @@ import (
"time"

"github.com/google/uuid"

"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/timeutil"
8 changes: 5 additions & 3 deletions internal/base/base.go
@@ -14,12 +14,13 @@ import (
"sync"
"time"

"github.com/hibiken/asynq/internal/errors"
pb "github.com/hibiken/asynq/internal/proto"
"github.com/hibiken/asynq/internal/timeutil"
"github.com/redis/go-redis/v9"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/hibiken/asynq/internal/errors"
pb "github.com/hibiken/asynq/internal/proto"
"github.com/hibiken/asynq/internal/timeutil"
)

// Version of asynq library and CLI.
@@ -722,4 +723,5 @@ type Broker interface {
PublishCancelation(id string) error

WriteResult(qname, id string, data []byte) (n int, err error)
SetQueueConcurrency(qname string, concurrency int)
}
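Because SetQueueConcurrency is now part of the Broker interface, every Broker implementation has to provide it (RDB and TestBroker do so further down in this diff). A hypothetical example, not in this PR, of a custom broker that satisfies the new method with a no-op because it does not enforce per-queue limits:

type noopConcurrencyBroker struct {
	Broker // embed an existing Broker to inherit the remaining methods
}

func (b noopConcurrencyBroker) SetQueueConcurrency(qname string, concurrency int) {
	// Intentionally empty: per-queue concurrency limits are not supported here.
}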
63 changes: 47 additions & 16 deletions internal/rdb/rdb.go
@@ -13,31 +13,47 @@ import (
"time"

"github.com/google/uuid"
"github.com/redis/go-redis/v9"
"github.com/spf13/cast"

"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/timeutil"
"github.com/redis/go-redis/v9"
"github.com/spf13/cast"
)

const statsTTL = 90 * 24 * time.Hour // 90 days

// LeaseDuration is the duration used to initially create a lease and to extend it thereafter.
const LeaseDuration = 30 * time.Second

type Option func(r *RDB)

func WithQueueConcurrency(queueConcurrency map[string]int) Option {
return func(r *RDB) {
for qname, concurrency := range queueConcurrency {
r.queueConcurrency.Store(qname, concurrency)
}
}
}

// RDB is a client interface to query and mutate task queues.
type RDB struct {
client redis.UniversalClient
clock timeutil.Clock
queuesPublished sync.Map
client redis.UniversalClient
clock timeutil.Clock
queuesPublished sync.Map
queueConcurrency sync.Map
}

// NewRDB returns a new instance of RDB.
func NewRDB(client redis.UniversalClient) *RDB {
return &RDB{
func NewRDB(client redis.UniversalClient, opts ...Option) *RDB {
r := &RDB{
client: client,
clock: timeutil.NewRealClock(),
}
for _, opt := range opts {
opt(r)
}
return r
}
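A usage sketch of the new functional option; the wrapper below is hypothetical and the limits are illustrative only:

// newRDBWithLimits is a hypothetical helper showing how the option is wired up.
func newRDBWithLimits(client redis.UniversalClient) *RDB {
	return NewRDB(client, WithQueueConcurrency(map[string]int{
		"critical": 10, // at most 10 "critical" tasks active at once
		"default":  0,  // zero or negative means unlimited (see Dequeue below)
	}))
}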

// Close closes the connection with redis server.
@@ -217,6 +233,7 @@ func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time
// --
// ARGV[1] -> initial lease expiration Unix time
// ARGV[2] -> task key prefix
// ARGV[3] -> queue concurrency
//
// Output:
// Returns nil if no processable task is found in the given queue.
@@ -225,15 +242,20 @@ func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time
// Note: dequeueCmd checks whether a queue is paused first, before
// calling RPOPLPUSH to pop a task from the queue.
var dequeueCmd = redis.NewScript(`
if redis.call("EXISTS", KEYS[2]) == 0 then
local id = redis.call("RPOPLPUSH", KEYS[1], KEYS[3])
if id then
local key = ARGV[2] .. id
redis.call("HSET", key, "state", "active")
redis.call("HDEL", key, "pending_since")
redis.call("ZADD", KEYS[4], ARGV[1], id)
return redis.call("HGET", key, "msg")
end
if redis.call("EXISTS", KEYS[2]) > 0 then
return nil
end
local count = redis.call("ZCARD", KEYS[4])
if (count >= tonumber(ARGV[3])) then
return nil
end
local id = redis.call("RPOPLPUSH", KEYS[1], KEYS[3])
if id then
local key = ARGV[2] .. id
redis.call("HSET", key, "state", "active")
redis.call("HDEL", key, "pending_since")
redis.call("ZADD", KEYS[4], ARGV[1], id)
return redis.call("HGET", key, "msg")
end
return nil`)
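The added guard treats the lease set (KEYS[4]) as the count of currently active tasks for the queue: once its cardinality reaches ARGV[3], the script returns nil and the queue behaves as if it had no processable task. A rough Go rendering of that check, for illustration only, since the real comparison runs atomically inside the script and the variable names here are assumed:

active := r.client.ZCard(context.Background(), base.LeaseKey(qname)).Val()
if active >= int64(limit) {
	continue // treat this queue as having no processable task and try the next one
}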

@@ -251,9 +273,14 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, leaseExpirationT
base.LeaseKey(qname),
}
leaseExpirationTime = r.clock.Now().Add(LeaseDuration)
queueConcurrency, ok := r.queueConcurrency.Load(qname)
if !ok || queueConcurrency.(int) <= 0 {
queueConcurrency = math.MaxInt
}
argv := []interface{}{
leaseExpirationTime.Unix(),
base.TaskKeyPrefix(qname),
queueConcurrency,
}
res, err := dequeueCmd.Run(context.Background(), r.client, keys, argv...).Result()
if err == redis.Nil {
@@ -1556,3 +1583,7 @@ func (r *RDB) WriteResult(qname, taskID string, data []byte) (int, error) {
}
return len(data), nil
}

func (r *RDB) SetQueueConcurrency(qname string, concurrency int) {
r.queueConcurrency.Store(qname, concurrency)
}
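SetQueueConcurrency makes the limit adjustable at runtime: each call simply overwrites the value stored for the queue, and the next Dequeue picks it up. A hypothetical usage sketch (queue name and numbers are illustrative):

rdb.SetQueueConcurrency("default", 2) // throttle: at most 2 active "default" tasks
// ... later ...
rdb.SetQueueConcurrency("default", 0) // zero or negative lifts the limit again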
90 changes: 89 additions & 1 deletion internal/rdb/rdb_test.go
@@ -18,11 +18,12 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"

"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
h "github.com/hibiken/asynq/internal/testutil"
"github.com/hibiken/asynq/internal/timeutil"
"github.com/redis/go-redis/v9"
)

// variables used for package testing.
@@ -384,6 +385,7 @@ func TestDequeue(t *testing.T) {
wantPending map[string][]*base.TaskMessage
wantActive map[string][]*base.TaskMessage
wantLease map[string][]base.Z
queueConcurrency map[string]int
}{
{
pending: map[string][]*base.TaskMessage{
@@ -494,6 +496,92 @@ func TestDequeue(t *testing.T) {
}
}

func TestDequeueWithQueueConcurrency(t *testing.T) {
r := setup(t)
defer r.Close()
now := time.Now()
r.SetClock(timeutil.NewSimulatedClock(now))
const taskNum = 3
msgs := make([]*base.TaskMessage, 0, taskNum)
for i := 0; i < taskNum; i++ {
msg := &base.TaskMessage{
ID: uuid.NewString(),
Type: "send_email",
Payload: h.JSON(map[string]interface{}{"subject": "hello!"}),
Queue: "default",
Timeout: 1800,
Deadline: 0,
}
msgs = append(msgs, msg)
}

tests := []struct {
name string
pending map[string][]*base.TaskMessage
qnames []string // list of queues to query
queueConcurrency map[string]int
wantMsgs []*base.TaskMessage
}{
{
name: "without queue concurrency control",
pending: map[string][]*base.TaskMessage{
"default": msgs,
},
qnames: []string{"default"},
wantMsgs: msgs,
},
{
name: "with queue concurrency control",
pending: map[string][]*base.TaskMessage{
"default": msgs,
},
qnames: []string{"default"},
queueConcurrency: map[string]int{"default": 2},
wantMsgs: msgs[:2],
},
{
name: "with queue concurrency zero",
pending: map[string][]*base.TaskMessage{
"default": msgs,
},
qnames: []string{"default"},
queueConcurrency: map[string]int{"default": 0},
wantMsgs: msgs,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllPendingQueues(t, r.client, tc.pending)
r.queueConcurrency.Range(func(key, value interface{}) bool {
r.queueConcurrency.Delete(key)
return true
})
for queue, n := range tc.queueConcurrency {
r.queueConcurrency.Store(queue, n)
}

gotMsgs := make([]*base.TaskMessage, 0, len(msgs))
for i := 0; i < len(msgs); i++ {
msg, _, err := r.Dequeue(tc.qnames...)
if errors.Is(err, errors.ErrNoProcessableTask) {
break
}
if err != nil {
t.Errorf("(*RDB).Dequeue(%v) returned error %v", tc.qnames, err)
continue
}
gotMsgs = append(gotMsgs, msg)
}
if diff := cmp.Diff(tc.wantMsgs, gotMsgs, h.SortZSetEntryOpt); diff != "" {
t.Errorf("(*RDB).Dequeue(%v) returned message %v; want %v",
tc.qnames, gotMsgs, tc.wantMsgs)
}
})
}
}

func TestDequeueError(t *testing.T) {
r := setup(t)
defer r.Close()
7 changes: 6 additions & 1 deletion internal/testbroker/testbroker.go
@@ -11,8 +11,9 @@ import (
"sync"
"time"

"github.com/hibiken/asynq/internal/base"
"github.com/redis/go-redis/v9"

"github.com/hibiken/asynq/internal/base"
)

var errRedisDown = errors.New("testutil: redis is down")
@@ -297,3 +298,7 @@ func (tb *TestBroker) ReclaimStaleAggregationSets(qname string) error {
}
return tb.real.ReclaimStaleAggregationSets(qname)
}

func (tb *TestBroker) SetQueueConcurrency(qname string, concurrency int) {
tb.real.SetQueueConcurrency(qname, concurrency)
}
18 changes: 15 additions & 3 deletions processor.go
@@ -16,12 +16,13 @@ import (
"sync"
"time"

"golang.org/x/time/rate"

"github.com/hibiken/asynq/internal/base"
asynqcontext "github.com/hibiken/asynq/internal/context"
"github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/timeutil"
"golang.org/x/time/rate"
)

type processor struct {
@@ -57,7 +58,7 @@ type processor struct {
// channel to communicate back to the long running "processor" goroutine.
// once is used to send value to the channel only once.
done chan struct{}
once sync.Once
once *sync.Once

// quit channel is closed when the shutdown of the "processor" goroutine starts.
quit chan struct{}
@@ -112,6 +113,7 @@ func newProcessor(params processorParams) *processor {
errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1),
sema: make(chan struct{}, params.concurrency),
done: make(chan struct{}),
once: &sync.Once{},
quit: make(chan struct{}),
abort: make(chan struct{}),
errHandler: params.errHandler,
@@ -139,7 +141,9 @@ func (p *processor) stop() {
func (p *processor) shutdown() {
p.stop()

time.AfterFunc(p.shutdownTimeout, func() { close(p.abort) })
go func(abort chan struct{}) {
time.AfterFunc(p.shutdownTimeout, func() { close(abort) })
}(p.abort)

p.logger.Info("Waiting for all workers to finish...")
// block until all workers have released the token
@@ -149,6 +153,14 @@
p.logger.Info("All workers have finished")
}

func (p *processor) resetState() {
p.sema = make(chan struct{}, cap(p.sema))
p.done = make(chan struct{})
p.quit = make(chan struct{})
p.abort = make(chan struct{})
p.once = &sync.Once{}
}
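resetState mirrors the aggregator change: it re-creates every channel the shutdown path closes or drains (sema, done, quit, abort) and swaps in a fresh sync.Once, which is presumably why the once field was changed to a pointer. A hedged sketch of the intended restart sequence; the calling code shown here is assumed, not part of this diff:

p.shutdown()   // stop the processor and wait for in-flight workers to release their tokens
p.resetState() // re-arm the channels and the once guard
var wg sync.WaitGroup
p.start(&wg)   // the processor goroutine can now run again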

func (p *processor) start(wg *sync.WaitGroup) {
wg.Add(1)
go func() {