Skip to content

Commit

Permalink
log: use entryID in raftLog.maybeCommit
Browse files Browse the repository at this point in the history
Signed-off-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
pav-kv committed Feb 5, 2024
1 parent cdcc0da commit a52b6af
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 18 deletions.
12 changes: 6 additions & 6 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,12 +450,12 @@ func (l *raftLog) matchTerm(id entryID) bool {
return t == id.term
}

func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
// NB: term should never be 0 on a commit because the leader campaigns at
// least at term 1. But if it is 0 for some reason, we don't want to consider
// this a term match in case zeroTermOnOutOfBounds returns 0.
if maxIndex > l.committed && term != 0 && l.zeroTermOnOutOfBounds(l.term(maxIndex)) == term {
l.commitTo(maxIndex)
func (l *raftLog) maybeCommit(at entryID) bool {
// NB: term should never be 0 on a commit because the leader campaigned at
// least at term 1. But if it is 0 for some reason, we don't consider this a
// term match.
if at.term != 0 && at.index > l.committed && l.matchTerm(at) {
l.commitTo(at.index)
return true
}
return false
Expand Down
13 changes: 6 additions & 7 deletions log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,6 @@ func TestCompactionSideEffects(t *testing.T) {
// Populate the log with 1000 entries; 750 in stable storage and 250 in unstable.
lastIndex := uint64(1000)
unstableIndex := uint64(750)
lastTerm := lastIndex
storage := NewMemoryStorage()
for i = 1; i <= unstableIndex; i++ {
storage.Append([]pb.Entry{{Term: i, Index: i}})
Expand All @@ -336,7 +335,7 @@ func TestCompactionSideEffects(t *testing.T) {
raftLog.append(pb.Entry{Term: i + 1, Index: i + 1})
}

require.True(t, raftLog.maybeCommit(lastIndex, lastTerm))
require.True(t, raftLog.maybeCommit(raftLog.lastEntryID()))
raftLog.appliedTo(raftLog.committed, 0 /* size */)

offset := uint64(500)
Expand Down Expand Up @@ -408,7 +407,7 @@ func TestHasNextCommittedEnts(t *testing.T) {
raftLog := newLog(storage, raftLogger)
raftLog.append(ents...)
raftLog.stableTo(entryID{term: 1, index: 4})
raftLog.maybeCommit(5, 1)
raftLog.maybeCommit(entryID{term: 1, index: 5})
raftLog.appliedTo(tt.applied, 0 /* size */)
raftLog.acceptApplying(tt.applying, 0 /* size */, tt.allowUnstable)
raftLog.applyingEntsPaused = tt.paused
Expand Down Expand Up @@ -466,7 +465,7 @@ func TestNextCommittedEnts(t *testing.T) {
raftLog := newLog(storage, raftLogger)
raftLog.append(ents...)
raftLog.stableTo(entryID{term: 1, index: 4})
raftLog.maybeCommit(5, 1)
raftLog.maybeCommit(entryID{term: 1, index: 5})
raftLog.appliedTo(tt.applied, 0 /* size */)
raftLog.acceptApplying(tt.applying, 0 /* size */, tt.allowUnstable)
raftLog.applyingEntsPaused = tt.paused
Expand Down Expand Up @@ -525,7 +524,7 @@ func TestAcceptApplying(t *testing.T) {
raftLog := newLogWithSize(storage, raftLogger, maxSize)
raftLog.append(ents...)
raftLog.stableTo(entryID{term: 1, index: 4})
raftLog.maybeCommit(5, 1)
raftLog.maybeCommit(entryID{term: 1, index: 5})
raftLog.appliedTo(3, 0 /* size */)

raftLog.acceptApplying(tt.index, tt.size, tt.allowUnstable)
Expand Down Expand Up @@ -574,7 +573,7 @@ func TestAppliedTo(t *testing.T) {
raftLog := newLogWithSize(storage, raftLogger, maxSize)
raftLog.append(ents...)
raftLog.stableTo(entryID{term: 1, index: 4})
raftLog.maybeCommit(5, 1)
raftLog.maybeCommit(entryID{term: 1, index: 5})
raftLog.appliedTo(3, 0 /* size */)
raftLog.acceptApplying(5, maxSize+overshoot, false /* allowUnstable */)

Expand Down Expand Up @@ -733,7 +732,7 @@ func TestCompaction(t *testing.T) {
storage.Append([]pb.Entry{{Index: i}})
}
raftLog := newLog(storage, raftLogger)
raftLog.maybeCommit(tt.lastIndex, 0)
raftLog.maybeCommit(entryID{term: 0, index: tt.lastIndex}) // TODO(pav-kv): this is a no-op

raftLog.appliedTo(raftLog.committed, 0 /* size */)
for j := 0; j < len(tt.compact); j++ {
Expand Down
9 changes: 4 additions & 5 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,12 +757,11 @@ func (r *raft) appliedSnap(snap *pb.Snapshot) {
r.appliedTo(index, 0 /* size */)
}

// maybeCommit attempts to advance the commit index. Returns true if
// the commit index changed (in which case the caller should call
// r.bcastAppend).
// maybeCommit attempts to advance the commit index. Returns true if the commit
// index changed (in which case the caller should call r.bcastAppend). This can
// only be called in StateLeader.
func (r *raft) maybeCommit() bool {
mci := r.trk.Committed()
return r.raftLog.maybeCommit(mci, r.Term)
return r.raftLog.maybeCommit(entryID{term: r.Term, index: r.trk.Committed()})
}

func (r *raft) reset(term uint64) {
Expand Down

0 comments on commit a52b6af

Please sign in to comment.