-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsentinel.go
311 lines (269 loc) · 7.18 KB
/
sentinel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
package sentinel
import (
"fmt"
"net"
"strconv"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/spf13/viper"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// newLogger builds the sugared zap logger used by a Sentinel. It starts from
// zap's development preset and switches level names to capitalised, coloured
// output. It panics if zap rejects the configuration.
func newLogger() *zap.SugaredLogger {
	cfg := zap.NewDevelopmentConfig()
	cfg.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder

	base, buildErr := cfg.Build()
	if buildErr == nil {
		return base.Sugar()
	}
	panic(buildErr)
}
// Config is the top-level sentinel configuration. NewFromConfigFile fills it
// from a YAML file via viper; the mapstructure tags give the YAML keys.
type Config struct {
	// MyID is this sentinel's run ID; NewFromConfig generates a UUID when it
	// is empty.
	MyID  string   `mapstructure:"my_id"`
	Binds []string `mapstructure:"binds"` //TODO: understand how to bind multiple network interfaces
	// Port is not read in this file — presumably the port serveTCP listens on;
	// confirm there.
	Port string `mapstructure:"port"`
	// Masters lists the masters to monitor; Start currently requires exactly one.
	Masters []MasterMonitor `mapstructure:"masters"`
	// NOTE(review): NewFromConfig does not copy this into Sentinel.currentEpoch.
	CurrentEpoch int `mapstructure:"current_epoch"`
}
// MasterMonitor describes one master to monitor, as decoded from an entry of
// the "masters" section of the configuration.
type MasterMonitor struct {
	RunID string `mapstructure:"run_id"`
	Name  string `mapstructure:"name"`
	// Addr is the master's address in "host:port" form; Start splits it on ":".
	Addr                 string        `mapstructure:"addr"`
	Quorum               int           `mapstructure:"quorum"`
	DownAfter            time.Duration `mapstructure:"down_after"`
	FailoverTimeout      time.Duration `mapstructure:"failover_timeout"`
	ReconfigSlaveTimeout time.Duration `mapstructure:"reconfig_slave_timeout"`
	// ConfigEpoch is the epoch of the master received from hello messages.
	ConfigEpoch int `mapstructure:"config_epoch"`
	// LeaderEpoch is the last leader epoch. It has its own key: sharing
	// "config_epoch" with ConfigEpoch would make the two fields always decode
	// to the same value.
	LeaderEpoch   int            `mapstructure:"leader_epoch"`
	KnownReplicas []KnownReplica `mapstructure:"known_replicas"`
	ParallelSync  int            `mapstructure:"parallel_sync"`
}
type (
	// KnownSentinel holds the ID and address of a sentinel. It is not
	// referenced elsewhere in this file.
	KnownSentinel struct {
		ID, Addr string
	}

	// KnownReplica holds the address of a replica; MasterMonitor.KnownReplicas
	// is a list of these.
	KnownReplica struct {
		Addr string
	}
)
// Sentinel is the monitoring process. Build one with NewFromConfig or
// NewFromConfigFile, call Start to begin monitoring, and Shutdown to close its
// TCP listener.
type Sentinel struct {
	// protoMutex is held by Shutdown while it closes listener and sets tcpDone;
	// presumably serveTCP uses it for the same fields — confirm there.
	protoMutex *sync.Mutex
	tcpDone    bool
	// mu is held by Start while it inserts into masterInstances.
	mu *sync.Mutex
	// NOTE(review): quorum is never set by NewFromConfig; the per-master value
	// lives in MasterMonitor.Quorum.
	quorum          int
	conf            Config
	masterInstances map[string]*masterInstance //key by address ip:port
	// NOTE(review): NewFromConfig does not initialise this from conf.CurrentEpoch.
	currentEpoch int
	// runID is this sentinel's ID: conf.MyID, or a UUID generated by NewFromConfig.
	runID string
	// slaveFactory (currently disabled): given a preassigned slaveInstance
	// struct, assign missing fields to make it complete — for example, create
	// its client from the given address.
	// slaveFactory func(*slaveInstance) error

	// clientFactory creates a client for an address. NewFromConfig sets it to
	// newInternalClient, and Start uses it to connect to the master.
	clientFactory func(string) (InternalClient, error)
	listener      net.Listener
	logger        *zap.SugaredLogger
}
// defaultSlaveFactory completes a partially built slaveInstance by creating a
// client for sl.addr with newInternalClient and storing it in sl.client.
// If client creation fails, sl is left unmodified and the error is returned.
func defaultSlaveFactory(sl *slaveInstance) error {
	c, clientErr := newInternalClient(sl.addr)
	if clientErr == nil {
		sl.client = c
	}
	return clientErr
}
// newInternalClient returns an InternalClient for addr, built by
// newRedisClient. The error result is always nil here; the signature matches
// Sentinel.clientFactory so this function can be used as the default factory.
func newInternalClient(addr string) (InternalClient, error) {
	var c InternalClient = newRedisClient(addr)
	return c, nil
}
// NewFromConfig builds a Sentinel from conf without starting it. If
// conf.MyID is empty, a random UUID is generated and used both as the run ID
// and as the stored conf.MyID. The returned error is currently always nil.
//
// NOTE(review): quorum and currentEpoch are left at zero here.
func NewFromConfig(conf Config) (*Sentinel, error) {
	if len(conf.MyID) == 0 {
		conf.MyID = uuid.NewString()
	}
	s := &Sentinel{
		conf:            conf,
		runID:           conf.MyID,
		mu:              new(sync.Mutex),
		protoMutex:      new(sync.Mutex),
		masterInstances: make(map[string]*masterInstance),
		clientFactory:   newInternalClient,
		logger:          newLogger(),
	}
	return s, nil
}
// NewFromConfigFile reads the YAML file at filepath, decodes it into a Config
// and passes that to NewFromConfig. Errors from reading or decoding are
// returned unchanged.
//
// NOTE(review): this uses viper's package-level instance, whose state is
// shared process-wide; concurrent calls would race on it.
func NewFromConfigFile(filepath string) (*Sentinel, error) {
	viper.SetConfigType("yaml")
	viper.SetConfigFile(filepath)
	if err := viper.ReadInConfig(); err != nil {
		return nil, err
	}

	conf := Config{}
	if err := viper.Unmarshal(&conf); err != nil {
		return nil, err
	}
	return NewFromConfig(conf)
}
// Start begins monitoring the single master configured in s.conf.Masters.
//
// Steps:
//  1. Create a client for the master with s.clientFactory and fetch its INFO
//     output.
//  2. Register a masterInstance under the configured address.
//  3. Hand the INFO text to parseInfoMaster. If parseInfoMaster reports that
//     the address is not in the master role, Start fails.
//
// On success it starts serveTCP and masterRoutine in background goroutines
// and returns nil.
//
// It returns an error in these cases:
//   - the number of configured masters is not exactly one;
//   - the master address is not in "host:port" form;
//   - the client factory, INFO or parseInfoMaster fails.
//
// NOTE(review): on the parseInfoMaster error paths, the master stays
// registered in s.masterInstances.
func (s *Sentinel) Start() error {
	if len(s.conf.Masters) != 1 {
		return fmt.Errorf("only support monitoring 1 master for now")
	}
	m := s.conf.Masters[0]

	// Check the shape before indexing so that a malformed address is reported
	// as an error instead of panicking with an index out of range.
	parts := strings.Split(m.Addr, ":")
	if len(parts) != 2 {
		return fmt.Errorf("invalid address %q for master %s: expected host:port", m.Addr, m.Name)
	}
	masterIP, masterPort := parts[0], parts[1]

	cl, err := s.clientFactory(m.Addr)
	if err != nil {
		return err
	}

	// Read the master's INFO. parseInfoMaster below consumes it, presumably to
	// learn the master's role and replicas — confirm in parseInfoMaster.
	infoStr, err := cl.Info()
	if err != nil {
		return err
	}

	master := &masterInstance{
		runID:                   m.RunID,
		sentinelConf:            m,
		name:                    m.Name,
		host:                    masterIP,
		port:                    masterPort,
		configEpoch:             m.ConfigEpoch,
		mu:                      sync.Mutex{},
		client:                  cl,
		slaves:                  map[string]*slaveInstance{},
		sentinels:               map[string]*sentinelInstance{},
		state:                   masterStateUp,
		lastSuccessfulPing:      time.Now(),
		subjDownNotify:          make(chan struct{}),
		followerNewMasterNotify: make(chan struct{}, 1),
	}
	s.mu.Lock()
	s.masterInstances[m.Addr] = master
	s.mu.Unlock()

	switchedRole, err := s.parseInfoMaster(m.Addr, infoStr)
	if err != nil {
		return err
	}
	if switchedRole {
		return fmt.Errorf("reported address of master %s is not currently in master role", m.Name)
	}

	go s.serveTCP()
	go s.masterRoutine(master)
	return nil
}
// Shutdown does two things while holding protoMutex:
//   - closes the TCP listener, if one has been set;
//   - marks the TCP server as done (tcpDone).
//
// The listener's Close error is not reported. Nothing here signals
// masterRoutine directly.
func (s *Sentinel) Shutdown() {
	stopTCP := func() {
		if ln := s.listener; ln != nil {
			_ = ln.Close()
		}
		s.tcpDone = true
	}
	locked(s.protoMutex, stopTCP)
}
// sentinelInstance is this sentinel's record of a peer sentinel — presumably
// one that monitors the same master; confirm where it is created.
type sentinelInstance struct {
	runID string
	// mu presumably guards the mutable fields below — confirm at call sites.
	mu sync.Mutex
	// masterDown bool

	// client queries the peer; its only method is IsMasterDownByAddr.
	client sentinelClient
	// lastMasterDownReply is presumably when the peer last answered an
	// IsMasterDownByAddr query — confirm at use sites.
	lastMasterDownReply time.Time
	// These 2 must always change together: presumably the leader the peer
	// voted for and the epoch of that vote, as carried by
	// IsMasterDownByAddrReply.
	leaderEpoch int
	leaderID    string
	// sdown presumably records whether the peer is subjectively down.
	sdown bool
}
// masterInstanceState is the monitoring state of a master. Start initialises
// a newly registered master to masterStateUp.
type masterInstanceState int

// Master states: up, subjectively down and objectively down (names follow
// the Redis Sentinel SDOWN/ODOWN terminology). They are constants rather than
// variables so they cannot be reassigned at run time.
const (
	masterStateUp       masterInstanceState = 1
	masterStateSubjDown masterInstanceState = 2
	masterStateObjDown  masterInstanceState = 3
)
// slaveInstance is the state this sentinel keeps about one replica (slave)
// of a monitored master. defaultSlaveFactory fills in client from addr.
type slaveInstance struct {
	runID  string
	killed bool
	mu     sync.Mutex
	// NOTE(review): typed as time.Duration despite the "Since" name — confirm
	// whether it holds an elapsed duration or should be a time.Time.
	masterDownSince time.Duration
	// masterHost/masterPort/masterUp presumably describe the master as
	// reported by this replica (e.g. from its INFO output) — confirm in the
	// parser.
	masterHost string
	masterPort string
	masterUp   bool
	host       string
	port       string
	// addr is the replica's address; defaultSlaveFactory creates client from it.
	addr           string
	slavePriority  int //TODO
	replOffset     int
	reportedMaster *masterInstance
	sDown          bool
	// Field names keep the original "Sucessful" spelling; renaming would touch
	// every use site.
	lastSucessfulPingAt  time.Time
	lastSucessfulInfoAt  time.Time
	masterRoleSwitchChan chan struct{}
	// Notifies goroutines that the master is down, so that the INFO interval
	// can change from 10s to 1s as in Redis.
	masterDownNotify chan struct{}
	client           InternalClient
	// reconfigFlag presumably holds a bitmask of the reconfig* constants
	// (reconfigSent, reconfigInProgress, reconfigDone) — confirm at use sites.
	reconfigFlag int
}
// Replica reconfiguration progress flags. Each is a distinct bit (1, 2 and 4),
// so they can be combined in a bitmask — presumably slaveInstance.reconfigFlag.
const (
	reconfigSent       = 1 << 0
	reconfigInProgress = 1 << 1
	reconfigDone       = 1 << 2
)
// instanceRole is the replication role of an instance: master or slave.
type instanceRole int

// Instance roles. They are constants rather than variables so they cannot be
// reassigned at run time.
const (
	instanceRoleMaster instanceRole = 1
	instanceRoleSlave  instanceRole = 2
)
// sentinelClient is how this sentinel queries a peer sentinel. Its only
// method asks whether the peer considers the master described by the args to
// be down. The reply also carries the leader the peer voted for and that
// leader's epoch.
type sentinelClient interface {
	IsMasterDownByAddr(IsMasterDownByAddrArgs) (IsMasterDownByAddrReply, error)
}
// IsMasterDownByAddrArgs are the arguments of an is-master-down-by-addr
// request: the master's name, IP and port, the requester's current epoch, and
// SelfID (presumably the requesting sentinel's run ID).
type IsMasterDownByAddrArgs struct {
	Name         string
	IP           string
	Port         string
	CurrentEpoch int
	SelfID       string
}

// decode fills req from exactly five arguments, in order:
//   - name
//   - IP
//   - port
//   - current epoch (decimal integer)
//   - self ID
//
// It returns an error if there are not exactly five parts or the epoch does
// not parse. req is left unchanged on error.
func (req *IsMasterDownByAddrArgs) decode(parts [][]byte) error {
	if len(parts) != 5 {
		return fmt.Errorf("invalid arg: expected 5 parts, got %d", len(parts))
	}
	// Parse the epoch before touching req so that a bad value cannot leave
	// the receiver half-populated.
	epoch, err := strconv.Atoi(string(parts[3]))
	if err != nil {
		return err
	}
	*req = IsMasterDownByAddrArgs{
		Name:         string(parts[0]),
		IP:           string(parts[1]),
		Port:         string(parts[2]),
		CurrentEpoch: epoch,
		SelfID:       string(parts[4]),
	}
	return nil
}
// IsMasterDownByAddrReply is the answer to an is-master-down-by-addr request.
// It contains:
//   - whether the responding sentinel sees the master as down;
//   - the ID of the leader it voted for;
//   - that leader's epoch.
type IsMasterDownByAddrReply struct {
	MasterDown    bool
	VotedLeaderID string
	LeaderEpoch   int
}

// decode fills res from exactly three lines, in order:
//   - "1" for down (any other value means not down)
//   - the voted leader ID
//   - the leader epoch as a decimal integer
//
// It returns an error if there are not exactly three lines or the epoch does
// not parse. res is left unchanged on error.
func (res *IsMasterDownByAddrReply) decode(parts []string) error {
	if len(parts) != 3 {
		return fmt.Errorf("invalid arg: expected 3 parts, got %d", len(parts))
	}
	// Parse the epoch first so that a bad value cannot leave res partially
	// updated.
	leaderEpoch, err := strconv.Atoi(parts[2])
	if err != nil {
		return err
	}
	*res = IsMasterDownByAddrReply{
		MasterDown:    parts[0] == "1",
		VotedLeaderID: parts[1],
		LeaderEpoch:   leaderEpoch,
	}
	return nil
}

// toLines encodes res as the three lines that decode accepts, so that
// decoding the result reproduces res.
func (res *IsMasterDownByAddrReply) toLines() []string {
	down := "0"
	if res.MasterDown {
		down = "1"
	}
	return []string{
		down,
		res.VotedLeaderID,
		strconv.Itoa(res.LeaderEpoch),
	}
}
// SENTINEL_MAX_DESYNC is the untyped constant 1000. Its ALL_CAPS name
// presumably mirrors the Redis Sentinel constant of the same name. It is
// kept as is because callers elsewhere may reference it.
const SENTINEL_MAX_DESYNC = 1000