Skip to content

Commit

Permalink
å¢增加增加批量添加人ä»任务和更新任务功能
Browse files Browse the repository at this point in the history
  • Loading branch information
blackroot committed Feb 24, 2022
1 parent 3dfdbae commit e453659
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 7 deletions.
2 changes: 1 addition & 1 deletion xdelay_queue/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type AppConf struct {
type DelayQueue struct {
*redis.Client
timers []*time.Ticker
bucketNameChan <-chan string
bucketNameChan <-chan *JobBucket

AppConf `mapstructure:"app_conf" json:"app_conf"`
xredis.RedisClientConfig `mapstructure:"redis_conf" json:"redis_conf"`
Expand Down
106 changes: 100 additions & 6 deletions xdelay_queue/delay_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@ func (dq *DelayQueue) AddJob(jobCore *JobCore) error {
return errors.New("invalid job")
}

jobBucket := <-dq.bucketNameChan
job := &Job{
JobCore: jobCore,
DoneTimes: 0,
JobBucket: jobBucket,
}
value, err := json.Marshal(job)
if err != nil {
Expand All @@ -66,7 +68,7 @@ func (dq *DelayQueue) AddJob(jobCore *JobCore) error {
err = redis.NewScript(`
redis.call("SET", KEYS[1], ARGV[1])
redis.call("ZADD", KEYS[2], ARGV[2], ARGV[3])
`).Run(dq.Client, []string{job.Id, <-dq.bucketNameChan}, value, float64(job.Delay), job.Id).Err()
`).Run(dq.Client, []string{job.Id, jobBucket.BucketName}, value, float64(job.Delay), job.Id).Err()
if err == nil || errors.Is(err, redis.Nil) {
return nil
}
Expand All @@ -88,10 +90,15 @@ func (dq *DelayQueue) GetJob(topics []string) (*Job, error) {
return nil, nil
}

jobBucket := <-dq.bucketNameChan
job.DoneTimes++
if job.Times <= 0 || job.DoneTimes < job.Times {
timestamp := time.Now().Unix() + job.TTR
_ = dq.addJobToBucketZ(timestamp, job.Id)
job.JobBucket = jobBucket
value, _ := json.Marshal(job)
_ = redis.NewScript(`
redis.call("SET", KEYS[1], ARGV[1])
redis.call("ZADD", KEYS[2], ARGV[2], ARGV[3])
`).Run(dq.Client, []string{job.Id, jobBucket.BucketName}, value, float64(time.Now().Unix()+job.TTR), job.Id).Err()
}
return job, nil
}
Expand All @@ -112,11 +119,94 @@ func (dq *DelayQueue) QueryJob(jobId string) (*Job, error) {
return job, err
}

func (dq *DelayQueue) UpdateJob(jobCore *JobCore) error {
if jobCore == nil {
return errors.New("invalid param")
}
job, err := dq.getJob(jobCore.Id)
if err != nil {
return errors.WithStack(err)
}
if job.DoneTimes > 0 {
return errors.New("job is doing")
}
value, err := json.Marshal(&Job{
JobCore: jobCore,
JobBucket: job.JobBucket,
DoneTimes: 0,
})
if err != nil {
return errors.WithStack(err)
}

return redis.NewScript(`
redis.call("SET", KEYS[1], ARGV[1])
redis.call("ZADD", KEYS[2], ARGV[2], ARGV[3])
`).Run(dq.Client, []string{job.Id, job.BucketName}, value, float64(jobCore.Delay), job.Id).Err()
}

func (dq *DelayQueue) BatchAddJob(jobCores []*JobCore) error {
jobCoreNum := len(jobCores)
if jobCoreNum == 0 {
return errors.New("invalid param")
}
jobKey := make([]string, 0, jobCoreNum)
jobValue := make([][]byte, jobCoreNum)
jobDelay := make([]float64, 0, jobCoreNum)
jobBucketName := make([]string, 0, jobCoreNum)
for index, jobCore := range jobCores {
if jobCore == nil || jobCore.Id == "" || jobCore.Topic == "" || jobCore.Delay < 0 || jobCore.TTR <= 0 {
return errors.New("invalid job")
}
jobBucket := <-dq.bucketNameChan
jobKey = append(jobKey, jobCore.Id)
value, err := json.Marshal(&Job{
JobCore: jobCore,
JobBucket: jobBucket,
DoneTimes: 0,
})
if err != nil {
return errors.WithStack(err)
}
jobValue[index] = value
jobDelay = append(jobDelay, float64(jobCore.Delay))
jobBucketName = append(jobBucketName, jobBucket.BucketName)
}

argv := make([]interface{}, 0, 4*jobCoreNum)
for _, value := range jobValue {
argv = append(argv, value)
}
for _, name := range jobBucketName {
argv = append(argv, name)
}

for _, delay := range jobDelay {
argv = append(argv, delay)
}

for _, key := range jobKey {
argv = append(argv, key)
}

src := `
local l = table.getn(KEYS)
for k, v in pairs(KEYS) do
redis.call("SET", v, ARGV[k])
redis.call("ZADD", ARGV[l+k], ARGV[2*l+k], ARGV[3*l+k])
`
return redis.NewScript(src).Run(dq.Client, jobKey, argv...).Err()
}

func (dq *DelayQueue) getBucketName() {
c := make(chan string)
c := make(chan *JobBucket)
go func() {
for {
c <- buildBucket(dq.BucketName, rand.Int()%dq.BucketSize)
index := rand.Int() % dq.BucketSize
c <- &JobBucket{
BucketName: buildBucket(dq.BucketName, index),
BucketIndex: index,
}
}
}()
dq.bucketNameChan = c
Expand Down Expand Up @@ -164,10 +254,14 @@ func (dq *DelayQueue) handleTicker(t time.Time, bucketName string) {
if job.Delay > t.Unix() {
//_ = dq.removeJobFromBucketZ(bucketName, bucketZ.jobId)
//_ = dq.addJobToBucketZ(job.Delay, bucketZ.jobId)
jobBucket := <-dq.bucketNameChan
job.JobBucket = jobBucket
value, _ := json.Marshal(job)
_ = redis.NewScript(`
redis.call("ZREM", KEYS[1], ARGV[1])
redis.call("ZADD", KEYS[2], ARGV[2], ARGV[3])
`).Run(dq.Client, []string{bucketName, <-dq.bucketNameChan}, bucketZ.jobId, float64(job.Delay), bucketZ.jobId).Err()
redis.call("SET", KEYS[3], ARGV[4])
`).Run(dq.Client, []string{bucketName, jobBucket.BucketName, job.Id}, bucketZ.jobId, float64(job.Delay), bucketZ.jobId, value).Err()
continue
}

Expand Down
6 changes: 6 additions & 0 deletions xdelay_queue/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import (
"github.com/pkg/errors"
)

type JobBucket struct {
BucketName string `json:"bucket_name"`
BucketIndex int `json:"bucket_index"`
}

type JobCore struct {
Topic string `json:"topic"`
Id string `json:"id"`
Expand All @@ -17,6 +22,7 @@ type JobCore struct {

type Job struct {
*JobCore
*JobBucket
DoneTimes int64 `json:"done_times"`
}

Expand Down

0 comments on commit e453659

Please sign in to comment.