-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmigrate.go
110 lines (96 loc) · 2.17 KB
/
migrate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package goq
import (
"context"
"github.com/pfdtk/goq/connect"
"github.com/pfdtk/goq/event"
e "github.com/pfdtk/goq/internal/errors"
rdq "github.com/pfdtk/goq/internal/queue/redis"
"github.com/pfdtk/goq/logger"
"github.com/pfdtk/goq/task"
"sync"
"time"
)
// MigrateType names the kind of migration a background loop performs. It
// decides which source key getMigrateQueueKey resolves for a queue.
type MigrateType string

// Supported migration kinds.
const (
	// MigrateAck selects the queue's reserved key as the migration source.
	MigrateAck = MigrateType("ack")
	// MigrateDelay selects the queue's delayed key as the migration source.
	MigrateDelay = MigrateType("delay")
)
// migrate holds the state shared by the background migration goroutines
// started for redis-backed tasks.
type migrate struct {
	// ctx is handed to every queue Migrate call.
	ctx context.Context
	// logger receives lifecycle messages and migration errors.
	logger logger.Logger

	// tasks points at the server's task registry; redis tasks are looked up in it.
	tasks *sync.Map
	// wg points at the server's WaitGroup and tracks each migration goroutine.
	wg *sync.WaitGroup

	// interval is the delay between two migration passes of one loop.
	interval time.Duration
	// stopRun is closed by stopMigrating to tell every loop to exit.
	stopRun chan struct{}
}
// newMigrate builds a migrate bound to the given server. It shares the
// server's WaitGroup, task registry and logger, uses ctx for queue calls,
// and runs a migration pass every 5 seconds.
func newMigrate(ctx context.Context, s *Server) *migrate {
	m := new(migrate)
	m.ctx = ctx
	m.logger = s.logger
	m.tasks = &s.tasks
	m.wg = &s.wg
	m.interval = 5 * time.Second
	m.stopRun = make(chan struct{})
	return m
}
// mustStartMigrate starts two background migration loops, one for MigrateAck
// and one for MigrateDelay, for every task returned by task.GetRedisTask.
func (m *migrate) mustStartMigrate() {
	kinds := [...]MigrateType{MigrateAck, MigrateDelay}
	for _, t := range task.GetRedisTask(m.tasks) {
		// Ack first, then delay, matching the start order per task.
		for _, kind := range kinds {
			m.migrateRedisTasks(t, kind)
		}
	}
}
// stopMigrating signals every migration loop to exit by closing stopRun.
//
// Calling it again after the channel is closed is a no-op: closing an
// already-closed channel panics, so the closed state is checked first.
// NOTE(review): the check-then-close is not atomic; this guards repeated
// sequential calls, not concurrent ones.
func (m *migrate) stopMigrating() {
	select {
	case <-m.stopRun:
		// Already closed by an earlier call; nothing left to stop.
		return
	default:
	}
	m.logger.Info("stopping migration...")
	close(m.stopRun)
}
// migrateRedisTasks starts a goroutine, tracked by the shared WaitGroup, that
// runs performMigrateTask for t and cat each time the interval timer fires.
// The goroutine exits once stopRun is closed.
func (m *migrate) migrateRedisTasks(t task.Task, cat MigrateType) {
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()

		tick := time.NewTimer(m.interval)
		// Release the timer on the way out, whichever way the loop ends.
		defer tick.Stop()

		for {
			select {
			case <-tick.C:
				m.performMigrateTask(t, cat)
				// Re-arm only after the pass finishes, so the interval is
				// measured from the end of one pass to the start of the next.
				tick.Reset(m.interval)
			case <-m.stopRun:
				m.logger.Infof("migrate stopped, type=%s, name=%s", cat, t.GetName())
				return
			}
		}
	}()
}
// performMigrateTask runs one migration pass for task t: it resolves the
// source key for cat and asks the redis queue to migrate from that key to the
// task's own queue name. Returned errors and any panic raised along the way
// are passed to handleError rather than propagated.
func (m *migrate) performMigrateTask(t task.Task, cat MigrateType) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		// Turn the panic into an error so the calling loop keeps running.
		m.handleError(e.NewPanicError(r))
	}()

	queue := rdq.NewRedisQueue(connect.MustGetRedis(t.OnConnect()))
	src := m.getMigrateQueueKey(queue, t.OnQueue(), cat)
	dst := t.OnQueue()
	if err := queue.Migrate(m.ctx, src, dst); err != nil {
		m.handleError(err)
	}
}
// getMigrateQueueKey returns the source key for queue name qn: the reserved
// key for MigrateAck, the delayed key for MigrateDelay. Any other migrate
// type panics.
func (m *migrate) getMigrateQueueKey(q *rdq.Queue, qn string, cat MigrateType) string {
	if cat == MigrateAck {
		return q.GetReservedKey(qn)
	}
	if cat == MigrateDelay {
		return q.GetDelayedKey(qn)
	}
	panic("invalid migrate type")
}
// handleError reports a migration failure: it logs err and then dispatches a
// migrate-error event carrying it.
func (m *migrate) handleError(err error) {
	m.logger.Error(err)
	ev := NewMigrateErrorEvent(err)
	event.Dispatch(ev)
}