Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
fix(BUX-411): distributed locking when using taskq-redis with multiple instances
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-4chain committed Dec 15, 2023
1 parent 51ca270 commit 432149f
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 77 deletions.
32 changes: 0 additions & 32 deletions taskmanager/cron.go

This file was deleted.

16 changes: 8 additions & 8 deletions taskmanager/cron_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ import (
)

func TestCronTasks(t *testing.T) {
makeClient := func() *TaskManager {
makeTaskManager := func() *TaskManager {
client, _ := NewTaskManager(
context.Background(),
)
return client.(*TaskManager)
}

t.Run("register one cron job", func(t *testing.T) {
client := makeClient()
tm := makeTaskManager()

desiredExecutions := 2

Expand All @@ -26,9 +26,9 @@ func TestCronTasks(t *testing.T) {
}
target := &mockTarget{make(chan bool, desiredExecutions)}

err := client.CronJobsInit(CronJobs{
err := tm.CronJobsInit(CronJobs{
"test": {
Period: 100 * time.Millisecond,
Period: 1 * time.Second,
Handler: func(ctx context.Context) error {
target.times <- true
return nil
Expand All @@ -50,7 +50,7 @@ func TestCronTasks(t *testing.T) {
})

t.Run("register two cron jobs", func(t *testing.T) {
client := makeClient()
tm := makeTaskManager()

desiredExecutions := 6

Expand All @@ -59,16 +59,16 @@ func TestCronTasks(t *testing.T) {
}
target := &mockTarget{make(chan int, desiredExecutions)}

err := client.CronJobsInit(CronJobs{
err := tm.CronJobsInit(CronJobs{
"test1": {
Period: 100 * time.Millisecond,
Period: 1 * time.Second,
Handler: func(ctx context.Context) error {
target.times <- 1
return nil
},
},
"test2": {
Period: 100 * time.Millisecond,
Period: 1 * time.Second,
Handler: func(ctx context.Context) error {
target.times <- 2
return nil
Expand Down
13 changes: 7 additions & 6 deletions taskmanager/taskmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/BuxOrg/bux/logging"
"github.com/newrelic/go-agent/v3/newrelic"
"github.com/robfig/cron/v3"
"github.com/rs/zerolog"
taskq "github.com/vmihailenco/taskq/v3"
)
Expand All @@ -21,7 +22,7 @@ type (

// options holds all the configuration for the client
options struct {
cronService *cronLocal // Internal cron job client
cronService *cron.Cron // Internal cron job client
debug bool // For extra logs and additional debug information
logger *zerolog.Logger // Internal logging
newRelicEnabled bool // If NewRelic is enabled (parent application)
Expand Down Expand Up @@ -63,10 +64,7 @@ func NewTaskManager(ctx context.Context, opts ...ClientOps) (Tasker, error) {
}

// Create the cron scheduler
cr := &cronLocal{}
cr.New()
cr.Start()
tm.options.cronService = cr
tm.ResetCron()

// Return the client
return tm, nil
Expand Down Expand Up @@ -100,7 +98,10 @@ func (tm *TaskManager) Close(ctx context.Context) error {

// ResetCron will reset the cron scheduler and all loaded tasks
func (tm *TaskManager) ResetCron() {
tm.options.cronService.New()
if tm.options.cronService != nil {
tm.options.cronService.Stop()
}
tm.options.cronService = cron.New()
tm.options.cronService.Start()
}

Expand Down
78 changes: 48 additions & 30 deletions taskmanager/taskq.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ func DefaultTaskQConfig(name string, opts ...TasqOps) *taskq.QueueOptions {
// TaskRunOptions are the options for running a task
type TaskRunOptions struct {
Arguments []interface{} // Arguments for the task
Delay time.Duration // Run after X delay
OnceInPeriod time.Duration // Run once in X period
RunEveryPeriod time.Duration // Cron job!
TaskName string // Name of the task
}
Expand Down Expand Up @@ -109,7 +107,8 @@ func (c *TaskManager) RegisterTask(name string, handler interface{}) (err error)
defer mutex.Unlock()

if t := taskq.Tasks.Get(name); t != nil {
return fmt.Errorf("task %s already registered", name)
// Already present in taskq's registry (taskq.Tasks) - reuse it and store it in this manager's local task map
c.options.taskq.tasks[name] = t
} else {
// Register and store the task
c.options.taskq.tasks[name] = taskq.RegisterTask(&taskq.TaskOptions{
Expand All @@ -119,46 +118,65 @@ func (c *TaskManager) RegisterTask(name string, handler interface{}) (err error)
})
}

// Debugging
c.DebugLog(fmt.Sprintf("registering task: %s...", c.options.taskq.tasks[name].Name()))
return nil
}

// RunTask will run a task using TaskQ
func (c *TaskManager) RunTask(ctx context.Context, options *TaskRunOptions) error {
// Starting the execution of the task
c.DebugLog(fmt.Sprintf(
"executing task: %s... delay: %s arguments: %s",
options.TaskName,
options.Delay.String(),
fmt.Sprintf("%+v", options.Arguments),
))
c.options.logger.Info().Msgf("executing task: %s", options.TaskName)

// Try to get the task
if _, ok := c.options.taskq.tasks[options.TaskName]; !ok {
task, ok := c.options.taskq.tasks[options.TaskName]
if !ok {
return fmt.Errorf("task %s not registered", options.TaskName)
}

// Add arguments, and delay if set
msg := c.options.taskq.tasks[options.TaskName].WithArgs(ctx, options.Arguments...)
if options.OnceInPeriod > 0 {
msg.OnceInPeriod(options.OnceInPeriod, options.Arguments...)
} else if options.Delay > 0 {
msg.SetDelay(options.Delay)
// Task message will be used to add to the queue
taskMessage := task.WithArgs(ctx, options.Arguments...)

// There are two ways to run a task:
// Option 1: Run the task immediately
if options.RunEveryPeriod == 0 {
return c.options.taskq.queue.Add(taskMessage)
}

// This is the "cron" aspect of the task
if options.RunEveryPeriod > 0 {
_, err := c.options.cronService.AddFunc(
fmt.Sprintf("@every %ds", int(options.RunEveryPeriod.Seconds())),
func() {
// todo: log the error if it occurs? Cannot pass the error back up
_ = c.options.taskq.queue.Add(msg)
},
)
return err
// Option 2: Run the task periodically using cron
// Note: The first run will be after the period has passed
return c.scheduleTaskWithCron(ctx, task, taskMessage, options.RunEveryPeriod)
}

func (c *TaskManager) scheduleTaskWithCron(ctx context.Context, task *taskq.Task, taskMessage *taskq.Message, runEveryPeriod time.Duration) error {
// When using Redis, we need to use a distributed timed lock to prevent the addition of the same task to the queue by multiple instances.
// With this approach, only one instance will add the task to the queue within a given period.
var tryLock func() bool
if c.Factory() == FactoryRedis {
key := fmt.Sprintf("taskq_cronlock_%s", task.Name())

// The runEveryPeriod must be at least 1 second; shorter periods are raised to 1 second
if runEveryPeriod < 1*time.Second {
runEveryPeriod = 1 * time.Second
}

// Lock time is the period minus 500ms to allow for some clock drift
lockTime := runEveryPeriod - 500*time.Millisecond

tryLock = func() bool {
boolCmd := c.options.taskq.config.Redis.SetNX(ctx, key, "1", lockTime)
return boolCmd.Val()
}
}

// Add to the queue
return c.options.taskq.queue.Add(msg)
// handler will be called by cron once per runEveryPeriod (scheduled in whole seconds)
handler := func() {
if tryLock != nil && !tryLock() {
return
}
_ = c.options.taskq.queue.Add(taskMessage)
}
_, err := c.options.cronService.AddFunc(
fmt.Sprintf("@every %ds", int(runEveryPeriod.Seconds())),
handler,
)
return err
}
2 changes: 1 addition & 1 deletion taskmanager/taskq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestNewTaskManager_RegisterTwice(t *testing.T) {
resultChan <- 2
return nil
})
require.Error(t, err)
require.NoError(t, err)

err = c.RunTask(ctx, &TaskRunOptions{
Arguments: []interface{}{task1Arg},
Expand Down

0 comments on commit 432149f

Please sign in to comment.