Skip to content

Commit

Permalink
fix re-schedule bug for missed intervals
Browse files Browse the repository at this point in the history
  • Loading branch information
naueramant committed Dec 21, 2022
1 parent 475ee0e commit 6f67e52
Showing 1 changed file with 24 additions and 7 deletions.
31 changes: 24 additions & 7 deletions schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewSchedule(redisClient *redis.Client) Schedule {
func (s *ScheduleImpl) Add(ctx context.Context, task *Task) error {
// Marshal the task data

payload, err := json.Marshal(TaskData{
taskData, err := json.Marshal(TaskData{
Interval: task.Interval / time.Millisecond,
Data: task.Data,
})
Expand All @@ -53,12 +53,12 @@ func (s *ScheduleImpl) Add(ctx context.Context, task *Task) error {

script := redis.NewScript(`
local queueKey = KEYS[1]
local payloadKey = KEYS[2]
local taskDataKey = KEYS[2]
local id = ARGV[1]
local payload = ARGV[2]
local taskData = ARGV[2]
local score = ARGV[3]
redis.call("HSETNX", payloadKey, id, payload)
redis.call("HSETNX", taskDataKey, id, taskData)
redis.call("ZADD", queueKey, score, id)
return 1
Expand All @@ -72,7 +72,7 @@ func (s *ScheduleImpl) Add(ctx context.Context, task *Task) error {
s.taskDataKey(task.Kind),
},
task.ID,
payload,
taskData,
float64(nextTick),
).Err(); err != nil {
return err
Expand All @@ -86,10 +86,10 @@ func (s *ScheduleImpl) Remove(ctx context.Context, kind string, id string) error

script := redis.NewScript(`
local queueKey = KEYS[1]
local payloadKey = KEYS[2]
local taskDataKey = KEYS[2]
local id = ARGV[1]
redis.call("HDEL", payloadKey, id)
redis.call("HDEL", taskDataKey, id)
redis.call("ZREM", queueKey, id)
return 1
Expand Down Expand Up @@ -147,6 +147,8 @@ func (s *ScheduleImpl) On(ctx context.Context, kind string, handler func(ctx con
local now = tonumber(ARGV[1])
-- Pop the next task from the queue
local res = redis.call("ZPOPMIN", queueKey)
if #res == 0 then
return { -1 }
Expand All @@ -155,11 +157,15 @@ func (s *ScheduleImpl) On(ctx context.Context, kind string, handler func(ctx con
local id = res[1]
local score = tonumber(res[2])
-- If the task is scheduled for more than 1 second in the future, put it back in the queue
if score > (now + 1000) then
redis.call("ZADD", queueKey, score, id)
return { -1 }
end
-- Get the task data
local taskDataRaw = redis.call("HGET", taskDataKey, id)
if taskDataRaw == nil then
return { -1 }
Expand All @@ -170,8 +176,19 @@ func (s *ScheduleImpl) On(ctx context.Context, kind string, handler func(ctx con
return { -1 }
end
-- Schedule the next execution
local nextTick = score + taskData.Interval
-- If the next execution is in the past, schedule it for the next interval
if nextTick < now then
-- Find how many intervals have passed since the last execution
local intervals = math.floor((now - score) / taskData.Interval)
-- Schedule the next execution for the next interval
nextTick = score + (intervals * taskData.Interval) + taskData.Interval
end
redis.call("ZADD", queueKey, nextTick, id)
return {id, score, taskDataRaw}
Expand Down

0 comments on commit 6f67e52

Please sign in to comment.