Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow custom Redis-like broker #976

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions broker/broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package broker

import (
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
)

// This package exports the same types as the internal package.
// This is a temporary solution until we can move the these types out of internal.

type (
TaskMessage = base.TaskMessage
WorkerInfo = base.WorkerInfo
ServerInfo = base.ServerInfo

Broker = base.Broker

CancellationSubscription = base.CancellationSubscription

RDB = rdb.RDB
)

var (
NewRDB = rdb.NewRDB
)
9 changes: 8 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/hibiken/asynq/broker"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/rdb"
Expand Down Expand Up @@ -44,7 +45,13 @@ func NewClient(r RedisConnOpt) *Client {
// NewClientFromRedisClient returns a new instance of Client given a redis.UniversalClient
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
func NewClientFromRedisClient(c redis.UniversalClient) *Client {
return &Client{broker: rdb.NewRDB(c), sharedConnection: true}
return NewClientFromBroker(rdb.NewRDB(c))
}

// NewClientFromBroker returns a new instance of Client given a broker.
// Warning: The underlying broker will not be closed by Asynq, you are responsible for closing it.
func NewClientFromBroker(b broker.Broker) *Client {
return &Client{broker: b, sharedConnection: true}
}

type OptionType int
Expand Down
9 changes: 7 additions & 2 deletions internal/base/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/hibiken/asynq/internal/errors"
pb "github.com/hibiken/asynq/internal/proto"
"github.com/hibiken/asynq/internal/timeutil"
"github.com/redis/go-redis/v9"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -718,8 +717,14 @@ type Broker interface {
ClearServerState(host string, pid int, serverID string) error

// Cancelation related methods
CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers
SubscribeCancellation() (CancellationSubscription, error)
PublishCancelation(id string) error

WriteResult(qname, id string, data []byte) (n int, err error)
}

// CancellationSubscription is a subscription to cancellation messages.
type CancellationSubscription interface {
Channel() <-chan string // returns a channel to receive the id of tasks to be cancelled.
Close() error // closes the subscription.
}
31 changes: 28 additions & 3 deletions internal/rdb/rdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type RDB struct {
queuesPublished sync.Map
}

var _ base.Broker = &RDB{}

// NewRDB returns a new instance of RDB.
func NewRDB(client redis.UniversalClient) *RDB {
return &RDB{
Expand Down Expand Up @@ -1481,16 +1483,39 @@ func (r *RDB) ClearSchedulerEntries(schedulerID string) error {
return nil
}

// CancelationPubSub returns a pubsub for cancelation messages.
func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
// cancelationSubscription is a wrapper for redis pubsub.
type cancellationSubscription struct {
pubsub *redis.PubSub
}

func (c *cancellationSubscription) Channel() <-chan string {
channelSize := 100 // same as redis defaults
ch := make(chan string, channelSize)

go func() {
for msg := range c.pubsub.Channel(redis.WithChannelSize(channelSize)) {
ch <- msg.Payload
}
close(ch)
}()

return ch
}

func (c *cancellationSubscription) Close() error {
return c.pubsub.Close()
}

// SubscribeCancellation returns a subscription for cancelation messages.
func (r *RDB) SubscribeCancellation() (base.CancellationSubscription, error) {
var op errors.Op = "rdb.CancelationPubSub"
ctx := context.Background()
pubsub := r.client.Subscribe(ctx, base.CancelChannel)
_, err := pubsub.Receive(ctx)
if err != nil {
return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub receive error: %v", err))
}
return pubsub, nil
return &cancellationSubscription{pubsub: pubsub}, nil
}

// PublishCancelation publish cancelation message to all subscribers.
Expand Down
10 changes: 5 additions & 5 deletions internal/rdb/rdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3236,22 +3236,22 @@ func TestCancelationPubSub(t *testing.T) {
r := setup(t)
defer r.Close()

pubsub, err := r.CancelationPubSub()
sub, err := r.SubscribeCancellation()
if err != nil {
t.Fatalf("(*RDB).CancelationPubSub() returned an error: %v", err)
}

cancelCh := pubsub.Channel()
cancelCh := sub.Channel()

var (
mu sync.Mutex
received []string
)

go func() {
for msg := range cancelCh {
for id := range cancelCh {
mu.Lock()
received = append(received, msg.Payload)
received = append(received, id)
mu.Unlock()
}
}()
Expand All @@ -3265,7 +3265,7 @@ func TestCancelationPubSub(t *testing.T) {
// allow for message to reach subscribers.
time.Sleep(time.Second)

pubsub.Close()
sub.Close()

mu.Lock()
if diff := cmp.Diff(publish, received, h.SortStringSliceOpt); diff != "" {
Expand Down
5 changes: 2 additions & 3 deletions internal/testbroker/testbroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/hibiken/asynq/internal/base"
"github.com/redis/go-redis/v9"
)

var errRedisDown = errors.New("testutil: redis is down")
Expand Down Expand Up @@ -190,13 +189,13 @@ func (tb *TestBroker) ClearServerState(host string, pid int, serverID string) er
return tb.real.ClearServerState(host, pid, serverID)
}

func (tb *TestBroker) CancelationPubSub() (*redis.PubSub, error) {
func (tb *TestBroker) SubscribeCancellation() (base.CancellationSubscription, error) {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return nil, errRedisDown
}
return tb.real.CancelationPubSub()
return tb.real.SubscribeCancellation()
}

func (tb *TestBroker) PublishCancelation(id string) error {
Expand Down
27 changes: 17 additions & 10 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sync"
"time"

"github.com/hibiken/asynq/broker"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/rdb"
Expand Down Expand Up @@ -443,6 +444,13 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
// and server configuration
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
rdb := rdb.NewRDB(c)
return NewServerFromBroker(rdb, cfg)
}

// NewServerFromBroker returns a new instance of Server given a Broker and server configuration.
// Warning: The underlying broker will not be closed by Asynq, you are responsible for closing it.
func NewServerFromBroker(b broker.Broker, cfg Config) *Server {
baseCtxFn := cfg.BaseContext
if baseCtxFn == nil {
baseCtxFn = context.Background
Expand Down Expand Up @@ -504,7 +512,6 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
}
logger.SetLevel(toInternalLogLevel(loglevel))

rdb := rdb.NewRDB(c)
starting := make(chan *workerInfo)
finished := make(chan *base.TaskMessage)
syncCh := make(chan *syncRequest)
Expand All @@ -518,7 +525,7 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
})
heartbeater := newHeartbeater(heartbeaterParams{
logger: logger,
broker: rdb,
broker: b,
interval: 5 * time.Second,
concurrency: n,
queues: queues,
Expand All @@ -533,18 +540,18 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
}
forwarder := newForwarder(forwarderParams{
logger: logger,
broker: rdb,
broker: b,
queues: qnames,
interval: delayedTaskCheckInterval,
})
subscriber := newSubscriber(subscriberParams{
logger: logger,
broker: rdb,
broker: b,
cancelations: cancels,
})
processor := newProcessor(processorParams{
logger: logger,
broker: rdb,
broker: b,
retryDelayFunc: delayFunc,
taskCheckInterval: taskCheckInterval,
baseCtxFn: baseCtxFn,
Expand All @@ -561,15 +568,15 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
})
recoverer := newRecoverer(recovererParams{
logger: logger,
broker: rdb,
broker: b,
retryDelayFunc: delayFunc,
isFailureFunc: isFailureFunc,
queues: qnames,
interval: 1 * time.Minute,
})
healthchecker := newHealthChecker(healthcheckerParams{
logger: logger,
broker: rdb,
broker: b,
interval: healthcheckInterval,
healthcheckFunc: cfg.HealthCheckFunc,
})
Expand All @@ -589,14 +596,14 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
}
janitor := newJanitor(janitorParams{
logger: logger,
broker: rdb,
broker: b,
queues: qnames,
interval: janitorInterval,
batchSize: janitorBatchSize,
})
aggregator := newAggregator(aggregatorParams{
logger: logger,
broker: rdb,
broker: b,
queues: qnames,
gracePeriod: groupGracePeriod,
maxDelay: cfg.GroupMaxDelay,
Expand All @@ -605,7 +612,7 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
})
return &Server{
logger: logger,
broker: rdb,
broker: b,
sharedConnection: true,
state: srvState,
forwarder: forwarder,
Expand Down
15 changes: 7 additions & 8 deletions subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"sync"
"time"

"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
)
Expand Down Expand Up @@ -54,12 +53,12 @@ func (s *subscriber) start(wg *sync.WaitGroup) {
go func() {
defer wg.Done()
var (
pubsub *redis.PubSub
err error
sub base.CancellationSubscription
err error
)
// Try until successfully connect to Redis.
for {
pubsub, err = s.broker.CancelationPubSub()
sub, err = s.broker.SubscribeCancellation()
if err != nil {
s.logger.Errorf("cannot subscribe to cancelation channel: %v", err)
select {
Expand All @@ -72,15 +71,15 @@ func (s *subscriber) start(wg *sync.WaitGroup) {
}
break
}
cancelCh := pubsub.Channel()
cancelCh := sub.Channel()
for {
select {
case <-s.done:
pubsub.Close()
sub.Close()
s.logger.Debug("Subscriber done")
return
case msg := <-cancelCh:
cancel, ok := s.cancelations.Get(msg.Payload)
case id := <-cancelCh:
cancel, ok := s.cancelations.Get(id)
if ok {
cancel()
}
Expand Down
Loading