From 3682a1a43640d9ef643686f6a7e83e4f87665cdc Mon Sep 17 00:00:00 2001 From: smallfish Date: Thu, 11 Nov 2021 09:58:38 +0800 Subject: [PATCH] dq redis transaction --- xdelay_queue/delay_queue.go | 35 ++++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/xdelay_queue/delay_queue.go b/xdelay_queue/delay_queue.go index d36c2d5b..82bb71a2 100644 --- a/xdelay_queue/delay_queue.go +++ b/xdelay_queue/delay_queue.go @@ -1,6 +1,7 @@ package xdelay_queue import ( + "encoding/json" "github.com/go-redis/redis" "github.com/pkg/errors" "github.com/smallfish-root/common-pkg/xredis" @@ -25,22 +26,42 @@ func GetDelayQueue(alias string) *DelayQueue { return delayQueue[alias] } +//func (dq *DelayQueue) AddJob(job *Job) error { +// if job.Id == "" || job.Topic == "" || job.Delay < 0 || job.TTR <= 0 { +// return errors.New("invalid job") +// } +// +// err := dq.addJob(job.Id, job) +// if err != nil { +// return errors.WithStack(err) +// } +// +// err = dq.addJobToBucketZ(job.Delay, job.Id) +// if err != nil { +// return errors.WithStack(err) +// } +// +// return nil +//} + +// AddJob transaction func (dq *DelayQueue) AddJob(job *Job) error { if job.Id == "" || job.Topic == "" || job.Delay < 0 || job.TTR <= 0 { return errors.New("invalid job") } - err := dq.addJob(job.Id, job) + value, err := json.Marshal(job) if err != nil { return errors.WithStack(err) } - err = dq.addJobToBucketZ(job.Delay, job.Id) - if err != nil { - return errors.WithStack(err) - } - - return nil + return redis.NewScript(` + redis.call("SET", KEYS[1], ARGV[1]) + redis.call("ZADD", KEYS[2], ARGV[2]) + `).Run(dq.Client, []string{job.Id, <- dq.bucketNameChan}, value, redis.Z{ + Score: float64(job.Delay), + Member: job.Id, + }).Err() } func (dq *DelayQueue) GetJob(topics []string) (*Job, error) {