Skip to content

Commit

Permalink
TLA+ spec of raft consensus algorithm in etcd implementation and model based trace validation
Browse files Browse the repository at this point in the history

Signed-off-by: Joshua Zhang <[email protected]>
  • Loading branch information
joshuazh-x committed Jan 29, 2024
1 parent 026484c commit 1e9254f
Show file tree
Hide file tree
Showing 13 changed files with 6,314 additions and 0 deletions.
4 changes: 4 additions & 0 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ func setupNode(c *Config, peers []Peer) *node {
// Peers must not be zero length; call RestartNode in that case.
func StartNode(c *Config, peers []Peer) Node {
	started := setupNode(c, peers)
	// Invoke the init-state trace hook on the underlying raft instance
	// before the run goroutine is launched, so the hook is not concurrent
	// with the node's main loop.
	traceInitState(started.rn.raft)
	go started.run()
	return started
}
Expand All @@ -284,6 +285,7 @@ func RestartNode(c *Config) Node {
panic(err)
}
n := newNode(rn)
traceInitState(n.rn.raft)
go n.run()
return &n
}
Expand Down Expand Up @@ -440,6 +442,8 @@ func (n *node) run() {
rd = Ready{}
}
readyc = nil

traceReady(r)
case <-advancec:
n.rn.Advance(rd)
rd = Ready{}
Expand Down
26 changes: 26 additions & 0 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ type Config struct {
// This behavior will become unconditional in the future. See:
// https://github.com/etcd-io/raft/issues/83
StepDownOnRemoval bool

// TraceLogger is the raft state tracer.
TraceLogger TraceLogger
}

func (c *Config) validate() error {
Expand Down Expand Up @@ -427,6 +430,9 @@ type raft struct {
// current term. Those will be handled as fast as first log is committed in
// current term.
pendingReadIndexMessages []pb.Message

traceLogger TraceLogger
initStateTraced bool
}

func newRaft(c *Config) *raft {
Expand Down Expand Up @@ -456,6 +462,8 @@ func newRaft(c *Config) *raft {
disableProposalForwarding: c.DisableProposalForwarding,
disableConfChangeValidation: c.DisableConfChangeValidation,
stepDownOnRemoval: c.StepDownOnRemoval,
traceLogger: c.TraceLogger,
initStateTraced: false,
}

cfg, trk, err := confchange.Restore(confchange.Changer{
Expand Down Expand Up @@ -578,11 +586,13 @@ func (r *raft) send(m pb.Message) {
// we err on the side of safety and omit a `&& !m.Reject` condition
// above.
r.msgsAfterAppend = append(r.msgsAfterAppend, m)
traceSendMessage(r, &m)
} else {
if m.To == r.id {
r.logger.Panicf("message should not be self-addressed when sending %s", m.Type)
}
r.msgs = append(r.msgs, m)
traceSendMessage(r, &m)
}
}

Expand Down Expand Up @@ -753,6 +763,8 @@ func (r *raft) appliedSnap(snap *pb.Snapshot) {
// the commit index changed (in which case the caller should call
// r.bcastAppend).
func (r *raft) maybeCommit() bool {
	// Deferred so the commit trace hook runs after the commit attempt
	// below has completed, on every exit path.
	defer traceCommit(r)

	// Ask the tracker for its committed index, then offer it to the log
	// together with the current term.
	idx := r.trk.Committed()
	return r.raftLog.maybeCommit(idx, r.Term)
}
Expand Down Expand Up @@ -793,6 +805,9 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
for i := range es {
es[i].Term = r.Term
es[i].Index = li + 1 + uint64(i)
if es[i].Type == pb.EntryNormal {
traceReplicate(r)
}
}
// Track the size of this uncommitted proposal.
if !r.increaseUncommittedSize(es) {
Expand Down Expand Up @@ -868,6 +883,8 @@ func (r *raft) becomeFollower(term uint64, lead uint64) {
r.lead = lead
r.state = StateFollower
r.logger.Infof("%x became follower at term %d", r.id, r.Term)

traceBecomeFollower(r)
}

func (r *raft) becomeCandidate() {
Expand All @@ -881,6 +898,8 @@ func (r *raft) becomeCandidate() {
r.Vote = r.id
r.state = StateCandidate
r.logger.Infof("%x became candidate at term %d", r.id, r.Term)

traceBecomeCandidate(r)
}

func (r *raft) becomePreCandidate() {
Expand All @@ -904,6 +923,7 @@ func (r *raft) becomeLeader() {
if r.state == StateFollower {
panic("invalid transition [follower -> leader]")
}

r.step = stepLeader
r.reset(r.Term)
r.tick = r.tickHeartbeat
Expand All @@ -926,6 +946,7 @@ func (r *raft) becomeLeader() {
// could be expensive.
r.pendingConfIndex = r.raftLog.lastIndex()

traceBecomeLeader(r)
emptyEnt := pb.Entry{Data: nil}
if !r.appendEntry(emptyEnt) {
// This won't happen because we just called reset() above.
Expand Down Expand Up @@ -1049,6 +1070,8 @@ func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected
}

func (r *raft) Step(m pb.Message) error {
traceReceiveMessage(r, &m)

// Handle the message term, which may result in our stepping down to a follower.
switch {
case m.Term == 0:
Expand Down Expand Up @@ -1291,6 +1314,7 @@ func stepLeader(r *raft, m pb.Message) error {
m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
} else {
r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
traceChangeConfEvent(cc, r)
}
}
}
Expand Down Expand Up @@ -1914,6 +1938,8 @@ func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState {
//
// The inputs usually result from restoring a ConfState or applying a ConfChange.
func (r *raft) switchToConfig(cfg tracker.Config, trk tracker.ProgressMap) pb.ConfState {
traceConfChangeEvent(cfg, r)

r.trk.Config = cfg
r.trk.Progress = trk

Expand Down
Loading

0 comments on commit 1e9254f

Please sign in to comment.