diff --git a/taskmanager/cron.go b/taskmanager/cron.go deleted file mode 100644 index 41ecdc24..00000000 --- a/taskmanager/cron.go +++ /dev/null @@ -1,32 +0,0 @@ -package taskmanager - -import "github.com/robfig/cron/v3" - -// cronLocal is the interface for the "local cron" service -type cronLocal struct { - cronService *cron.Cron -} - -// New will stop any existing cron service and start a new one -func (c *cronLocal) New() { - if c.cronService != nil { - c.cronService.Stop() - } - c.cronService = cron.New() -} - -// AddFunc will add a function to the cron service -func (c *cronLocal) AddFunc(spec string, cmd func()) (int, error) { - e, err := c.cronService.AddFunc(spec, cmd) - return int(e), err -} - -// Start will start the cron service -func (c *cronLocal) Start() { - c.cronService.Start() -} - -// Stop will stop the cron service -func (c *cronLocal) Stop() { - c.cronService.Stop() -} diff --git a/taskmanager/cron_jobs_test.go b/taskmanager/cron_jobs_test.go index 50dd9a83..9addfac6 100644 --- a/taskmanager/cron_jobs_test.go +++ b/taskmanager/cron_jobs_test.go @@ -9,7 +9,7 @@ import ( ) func TestCronTasks(t *testing.T) { - makeClient := func() *TaskManager { + makeTaskManager := func() *TaskManager { client, _ := NewTaskManager( context.Background(), ) @@ -17,7 +17,7 @@ func TestCronTasks(t *testing.T) { } t.Run("register one cron job", func(t *testing.T) { - client := makeClient() + tm := makeTaskManager() desiredExecutions := 2 @@ -26,9 +26,9 @@ func TestCronTasks(t *testing.T) { } target := &mockTarget{make(chan bool, desiredExecutions)} - err := client.CronJobsInit(CronJobs{ + err := tm.CronJobsInit(CronJobs{ "test": { - Period: 100 * time.Millisecond, + Period: 1 * time.Second, Handler: func(ctx context.Context) error { target.times <- true return nil @@ -50,7 +50,7 @@ func TestCronTasks(t *testing.T) { }) t.Run("register two cron jobs", func(t *testing.T) { - client := makeClient() + tm := makeTaskManager() desiredExecutions := 6 @@ -59,16 +59,16 @@ func TestCronTasks(t *testing.T) { } target := &mockTarget{make(chan int, desiredExecutions)} - err := client.CronJobsInit(CronJobs{ + err := tm.CronJobsInit(CronJobs{ "test1": { - Period: 100 * time.Millisecond, + Period: 1 * time.Second, Handler: func(ctx context.Context) error { target.times <- 1 return nil }, }, "test2": { - Period: 100 * time.Millisecond, + Period: 1 * time.Second, Handler: func(ctx context.Context) error { target.times <- 2 return nil diff --git a/taskmanager/taskmanager.go b/taskmanager/taskmanager.go index e00986ef..3eed8340 100644 --- a/taskmanager/taskmanager.go +++ b/taskmanager/taskmanager.go @@ -8,6 +8,7 @@ import ( "github.com/BuxOrg/bux/logging" "github.com/newrelic/go-agent/v3/newrelic" + "github.com/robfig/cron/v3" "github.com/rs/zerolog" taskq "github.com/vmihailenco/taskq/v3" ) @@ -21,7 +22,7 @@ type ( // options holds all the configuration for the client options struct { - cronService *cronLocal // Internal cron job client + cronService *cron.Cron // Internal cron job client debug bool // For extra logs and additional debug information logger *zerolog.Logger // Internal logging newRelicEnabled bool // If NewRelic is enabled (parent application) @@ -63,10 +64,7 @@ func NewTaskManager(ctx context.Context, opts ...ClientOps) (Tasker, error) { } // Create the cron scheduler - cr := &cronLocal{} - cr.New() - cr.Start() - tm.options.cronService = cr + tm.ResetCron() // Return the client return tm, nil @@ -100,7 +98,10 @@ func (tm *TaskManager) Close(ctx context.Context) error { // ResetCron will reset the cron scheduler and all loaded tasks func (tm *TaskManager) ResetCron() { - tm.options.cronService.New() + if tm.options.cronService != nil { + tm.options.cronService.Stop() + } + tm.options.cronService = cron.New() tm.options.cronService.Start() } diff --git a/taskmanager/taskq.go b/taskmanager/taskq.go index 96297bdb..0d9377e7 100644 --- a/taskmanager/taskq.go +++ b/taskmanager/taskq.go @@ -60,8 +60,6 @@ func DefaultTaskQConfig(name string, opts ...TasqOps) *taskq.QueueOptions { // TaskRunOptions are the options for running a task type TaskRunOptions struct { Arguments []interface{} // Arguments for the task - Delay time.Duration // Run after X delay - OnceInPeriod time.Duration // Run once in X period RunEveryPeriod time.Duration // Cron job! TaskName string // Name of the task } @@ -109,7 +107,8 @@ func (c *TaskManager) RegisterTask(name string, handler interface{}) (err error) defer mutex.Unlock() if t := taskq.Tasks.Get(name); t != nil { - return fmt.Errorf("task %s already registered", name) + // if already registered - register the task locally + c.options.taskq.tasks[name] = t } else { // Register and store the task c.options.taskq.tasks[name] = taskq.RegisterTask(&taskq.TaskOptions{ @@ -119,46 +118,65 @@ func (c *TaskManager) RegisterTask(name string, handler interface{}) (err error) }) } - // Debugging c.DebugLog(fmt.Sprintf("registering task: %s...", c.options.taskq.tasks[name].Name())) return nil } // RunTask will run a task using TaskQ func (c *TaskManager) RunTask(ctx context.Context, options *TaskRunOptions) error { - // Starting the execution of the task - c.DebugLog(fmt.Sprintf( - "executing task: %s... delay: %s arguments: %s", - options.TaskName, - options.Delay.String(), - fmt.Sprintf("%+v", options.Arguments), - )) + c.options.logger.Info().Msgf("executing task: %s", options.TaskName) // Try to get the task - if _, ok := c.options.taskq.tasks[options.TaskName]; !ok { + task, ok := c.options.taskq.tasks[options.TaskName] + if !ok { return fmt.Errorf("task %s not registered", options.TaskName) } - // Add arguments, and delay if set - msg := c.options.taskq.tasks[options.TaskName].WithArgs(ctx, options.Arguments...) - if options.OnceInPeriod > 0 { - msg.OnceInPeriod(options.OnceInPeriod, options.Arguments...) - } else if options.Delay > 0 { - msg.SetDelay(options.Delay) + // Task message will be used to add to the queue + taskMessage := task.WithArgs(ctx, options.Arguments...) + + // There are two ways to run a task: + // Option 1: Run the task immediately + if options.RunEveryPeriod == 0 { + return c.options.taskq.queue.Add(taskMessage) } - // This is the "cron" aspect of the task - if options.RunEveryPeriod > 0 { - _, err := c.options.cronService.AddFunc( - fmt.Sprintf("@every %ds", int(options.RunEveryPeriod.Seconds())), - func() { - // todo: log the error if it occurs? Cannot pass the error back up - _ = c.options.taskq.queue.Add(msg) - }, - ) - return err + // Option 2: Run the task periodically using cron + // Note: The first run will be after the period has passed + return c.scheduleTaskWithCron(ctx, task, taskMessage, options.RunEveryPeriod) +} + +func (c *TaskManager) scheduleTaskWithCron(ctx context.Context, task *taskq.Task, taskMessage *taskq.Message, runEveryPeriod time.Duration) error { + // When using Redis, we need to use a distributed timed lock to prevent the addition of the same task to the queue by multiple instances. + // With this approach, only one instance will add the task to the queue within a given period. + var tryLock func() bool + if c.Factory() == FactoryRedis { + key := fmt.Sprintf("taskq_cronlock_%s", task.Name()) + + // The runEveryPeriod should be greater than 1 second + if runEveryPeriod < 1*time.Second { + runEveryPeriod = 1 * time.Second + } + + // Lock time is the period minus 500ms to allow for some clock drift + lockTime := runEveryPeriod - 500*time.Millisecond + + tryLock = func() bool { + boolCmd := c.options.taskq.config.Redis.SetNX(ctx, key, "1", lockTime) + return boolCmd.Val() + } } - // Add to the queue - return c.options.taskq.queue.Add(msg) + // handler will be called by cron every runEveryPeriod seconds + handler := func() { + if tryLock != nil && !tryLock() { + return + } + _ = c.options.taskq.queue.Add(taskMessage) + } + _, err := c.options.cronService.AddFunc( + fmt.Sprintf("@every %ds", int(runEveryPeriod.Seconds())), + handler, + ) + return err } diff --git a/taskmanager/taskq_test.go b/taskmanager/taskq_test.go index df3d49a8..3431202d 100644 --- a/taskmanager/taskq_test.go +++ b/taskmanager/taskq_test.go @@ -102,7 +102,7 @@ func TestNewTaskManager_RegisterTwice(t *testing.T) { resultChan <- 2 return nil }) - require.Error(t, err) + require.NoError(t, err) err = c.RunTask(ctx, &TaskRunOptions{ Arguments: []interface{}{task1Arg},