diff --git a/xdelay_queue/bucket.go b/xdelay_queue/bucket.go index c6321f5e..5362a68e 100644 --- a/xdelay_queue/bucket.go +++ b/xdelay_queue/bucket.go @@ -1,8 +1,9 @@ package xdelay_queue import ( - "errors" + "fmt" "github.com/go-redis/redis" + "time" ) type BucketZ struct { @@ -18,12 +19,13 @@ func (dq *DelayQueue) addJobToBucketZ(timestamp int64, jobId string) error { } func (dq *DelayQueue) getJobFromBucketZ(key string) (*BucketZ, error) { - //zrangebyscore zrangebyscorewithscores - result, err := dq.ZRangeWithScores(key, 0, 0).Result() + //result, err := dq.ZRangeWithScores(key, 0, 0).Result() + result, err := dq.ZRangeByScore(key, redis.ZRangeBy{ + Max: fmt.Sprintf("%d", time.Now().Unix()), + Offset: 0, + Count: 1, + }).Result() if err != nil { - if errors.Is(err, redis.Nil) { - return nil, nil - } return nil, err } if len(result) == 0 { @@ -31,9 +33,8 @@ func (dq *DelayQueue) getJobFromBucketZ(key string) (*BucketZ, error) { } bz := &BucketZ{ - timeScore: result[0].Score, + jobId: result[0], } - bz.jobId = result[0].Member.(string) return bz, nil } diff --git a/xdelay_queue/delay_queue.go b/xdelay_queue/delay_queue.go index ee251996..569b5825 100644 --- a/xdelay_queue/delay_queue.go +++ b/xdelay_queue/delay_queue.go @@ -50,7 +50,6 @@ func (dq *DelayQueue) AddJob(job *Job) error { return errors.New("invalid job") } - job.Delay = job.Delay + time.Now().Unix() value, err := json.Marshal(job) if err != nil { return errors.WithStack(err) @@ -59,7 +58,7 @@ func (dq *DelayQueue) AddJob(job *Job) error { err = redis.NewScript(` redis.call("SET", KEYS[1], ARGV[1]) redis.call("ZADD", KEYS[2], ARGV[2], ARGV[3]) - `).Run(dq.Client, []string{job.Id, <- dq.bucketNameChan}, value, float64(job.Delay), job.Id).Err() + `).Run(dq.Client, []string{job.Id, <-dq.bucketNameChan}, value, float64(job.Delay), job.Id).Err() if err == nil || errors.Is(err, redis.Nil) { return nil } @@ -134,15 +133,11 @@ func (dq *DelayQueue) handleTicker(t time.Time, bucketName string) { for { bucketZ, err := dq.getJobFromBucketZ(bucketName) if err != nil { - return + continue } if bucketZ == nil { - return - } - - if int64(bucketZ.timeScore) > t.Unix() { - return + continue } job, err := dq.getJob(bucketZ.jobId)