Skip to content

Commit

Permalink
fix(consensus): strong termination for the binary agreement
Browse files Browse the repository at this point in the history
  • Loading branch information
b00f committed Oct 21, 2023
1 parent 1fc2f15 commit 2d228a9
Show file tree
Hide file tree
Showing 17 changed files with 310 additions and 85 deletions.
21 changes: 14 additions & 7 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ func (cs *consensus) AddVote(v *vote.Vote) {
}

if v.Type() == vote.VoteTypeCPPreVote ||
v.Type() == vote.VoteTypeCPMainVote {
v.Type() == vote.VoteTypeCPMainVote ||
v.Type() == vote.VoteTypeCPDecided {
err := cs.changeProposer.checkJust(v)
if err != nil {
cs.logger.Error("error on adding a cp vote", "vote", v, "error", err)
Expand Down Expand Up @@ -331,6 +332,14 @@ func (cs *consensus) signAddCPMainVote(hash hash.Hash,
cs.signAddVote(v)
}

func (cs *consensus) signAddCPDecidedVote(hash hash.Hash,
cpRound int16, cpValue vote.CPValue, just vote.Just,
) {
v := vote.NewCPDecidedVote(hash, cs.height, cs.round,
cpRound, cpValue, just, cs.valKey.Address())
cs.signAddVote(v)
}

func (cs *consensus) signAddPrepareVote(hash hash.Hash) {
v := vote.NewPrepareVote(hash, cs.height, cs.round, cs.valKey.Address())
cs.signAddVote(v)
Expand Down Expand Up @@ -424,11 +433,9 @@ func (cs *consensus) PickRandomVote(round int16) *vote.Vote {
m := cs.log.RoundMessages(round)
votes = append(votes, m.AllVotes()...)
} else {
// Don't broadcast prepare and precommit votes for previous rounds
vs0 := cs.log.CPPreVoteVoteSet(round)
vs1 := cs.log.CPMainVoteVoteSet(round)
votes = append(votes, vs0.AllVotes()...)
votes = append(votes, vs1.AllVotes()...)
// Only broadcast cp:decided votes
vs := cs.log.CPDecidedVoteVoteSet(round)
votes = append(votes, vs.AllVotes()...)
}
if len(votes) == 0 {
return nil
Expand All @@ -440,7 +447,7 @@ func (cs *consensus) startChangingProposer() {
// If it is not decided yet.
// TODO: can we remove this condition in new consensus model?
if cs.cpDecided == -1 {
cs.logger.Debug("changing proposer started", "cpRound", cs.cpRound)
cs.logger.Info("changing proposer started", "cpRound", cs.cpRound)
cs.enterNewState(cs.cpPreVoteState)
}
}
49 changes: 45 additions & 4 deletions consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,13 @@ func (td *testData) addCPMainVote(cons *consensus, blockHash hash.Hash, height u
td.addVote(cons, v, valID)
}

func (td *testData) addCPDecidedVote(cons *consensus, blockHash hash.Hash, height uint32, round int16,
cpRound int16, cpVal vote.CPValue, just vote.Just, valID int,
) {
v := vote.NewCPDecidedVote(blockHash, height, round, cpRound, cpVal, just, td.valKeys[valID].Address())
td.addVote(cons, v, valID)
}

func (td *testData) addVote(cons *consensus, v *vote.Vote, valID int) *vote.Vote {
td.HelperSignVote(td.valKeys[valID], v)
cons.AddVote(v)
Expand Down Expand Up @@ -546,11 +553,44 @@ func TestPickRandomVote(t *testing.T) {

td.enterNewHeight(td.consP)
assert.Nil(t, td.consP.PickRandomVote(0))
cpRound := int16(1)

// === make valid certificate
sbPreVote := certificate.BlockCertificateSignBytes(hash.UndefHash, 1, 0)
sbPreVote = append(sbPreVote, util.StringToBytes(vote.VoteTypeCPPreVote.String())...)
sbPreVote = append(sbPreVote, util.Int16ToSlice(cpRound)...)
sbPreVote = append(sbPreVote, byte(vote.CPValueOne))

sbMainVote := certificate.BlockCertificateSignBytes(hash.UndefHash, 1, 0)
sbMainVote = append(sbMainVote, util.StringToBytes(vote.VoteTypeCPMainVote.String())...)
sbMainVote = append(sbMainVote, util.Int16ToSlice(cpRound)...)
sbMainVote = append(sbMainVote, byte(vote.CPValueOne))

committers := []int32{}
preVoteSigs := []*bls.Signature{}
mainVoteSigs := []*bls.Signature{}
for i, val := range td.consP.validators {
committers = append(committers, val.Number())
preVoteSigs = append(preVoteSigs, td.valKeys[i].Sign(sbPreVote))
mainVoteSigs = append(mainVoteSigs, td.valKeys[i].Sign(sbMainVote))
}

preVoteAggSig := bls.SignatureAggregate(preVoteSigs...)
mainVoteAggSig := bls.SignatureAggregate(mainVoteSigs...)

certPreVote := certificate.NewCertificate(1, 0, committers, []int32{}, preVoteAggSig)
certMainVote := certificate.NewCertificate(1, 0, committers, []int32{}, mainVoteAggSig)
// ====

// round 0
td.addPrepareVote(td.consP, td.RandHash(), 1, 0, tIndexX)
td.addPrepareVote(td.consP, td.RandHash(), 1, 0, tIndexY)
td.addCPPreVote(td.consP, hash.UndefHash, 1, 0, 0, vote.CPValueOne, &vote.JustInitOne{}, tIndexY)
td.addCPPreVote(td.consP, hash.UndefHash, 1, 0, cpRound+1, vote.CPValueOne,
&vote.JustPreVoteHard{QCert: certPreVote}, tIndexY)
td.addCPMainVote(td.consP, hash.UndefHash, 1, 0, cpRound, vote.CPValueOne,
&vote.JustMainVoteNoConflict{QCert: certPreVote}, tIndexY)
td.addCPDecidedVote(td.consP, hash.UndefHash, 1, 0, cpRound, vote.CPValueOne,
&vote.JustDecided{QCert: certMainVote}, tIndexY)

assert.NotNil(t, td.consP.PickRandomVote(0))

Expand Down Expand Up @@ -693,10 +733,11 @@ func TestCases(t *testing.T) {
round int16
description string
}{
{1694848856237853398, 2, "1/3+ cp:PRE-VOTE in prepare step"},
{1697898884837384019, 2, "1/3+ cp:PRE-VOTE in prepare step"},
{1694848907840926239, 0, "1/3+ cp:PRE-VOTE in precommit step"},
{1694849103290580532, 1, "Conflicting votes, cp-round=0"},
{1694849186681644508, 1, "Conflicting votes, cp-round=1"},
{1697900665869342730, 1, "Conflicting votes, cp-round=1"},
{1697887970998950590, 1, "consP & consB: Change Proposer, consX & consY: Commit (2 block announces)"},
}

for i, test := range tests {
Expand Down Expand Up @@ -910,7 +951,7 @@ func checkConsensus(td *testData, height uint32, byzVotes []*vote.Vote) (
}

// Check if more than 1/3 of nodes has committed the same block
if len(blockAnnounces) >= 3 {
if len(blockAnnounces) >= 2 {
var firstAnnounce *message.BlockAnnounceMessage
for _, msg := range blockAnnounces {
if firstAnnounce == nil {
Expand Down
61 changes: 59 additions & 2 deletions consensus/cp.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,68 @@ func (cp *changeProposer) checkJustMainVote(v *vote.Vote) error {
}
}

func (cp *changeProposer) checkJustDecide(v *vote.Vote) error {
err := cp.checkCPValue(v, vote.CPValueZero, vote.CPValueOne)
if err != nil {
return err

Check warning on line 319 in consensus/cp.go

View check run for this annotation

Codecov / codecov/patch

consensus/cp.go#L319

Added line #L319 was not covered by tests
}
j, ok := v.CPJust().(*vote.JustDecided)
if !ok {
return invalidJustificationError{
JustType: j.Type(),
Reason: "invalid just data",

Check warning on line 325 in consensus/cp.go

View check run for this annotation

Codecov / codecov/patch

consensus/cp.go#L323-L325

Added lines #L323 - L325 were not covered by tests
}
}

sb := certificate.BlockCertificateSignBytes(v.BlockHash(),
j.QCert.Height(),
j.QCert.Round())
sb = append(sb, util.StringToBytes(vote.VoteTypeCPMainVote.String())...)
sb = append(sb, util.Int16ToSlice(v.CPRound())...)
sb = append(sb, byte(v.CPValue()))

err = j.QCert.Validate(cp.height, cp.validators, sb)
if err != nil {
return invalidJustificationError{
JustType: j.Type(),
Reason: err.Error(),

Check warning on line 340 in consensus/cp.go

View check run for this annotation

Codecov / codecov/patch

consensus/cp.go#L338-L340

Added lines #L338 - L340 were not covered by tests
}
}
return nil
}

func (cp *changeProposer) checkJust(v *vote.Vote) error {
if v.Type() == vote.VoteTypeCPPreVote {
switch v.Type() {
case vote.VoteTypeCPPreVote:
return cp.checkJustPreVote(v)
} else if v.Type() == vote.VoteTypeCPMainVote {
case vote.VoteTypeCPMainVote:
return cp.checkJustMainVote(v)
case vote.VoteTypeCPDecided:
return cp.checkJustDecide(v)
default:
panic("unreachable")

Check warning on line 355 in consensus/cp.go

View check run for this annotation

Codecov / codecov/patch

consensus/cp.go#L354-L355

Added lines #L354 - L355 were not covered by tests
}
}

func (cp *changeProposer) checkForTermination(v *vote.Vote) {
if v.Type() == vote.VoteTypeCPDecided &&
v.Round() == cp.round {
cp.cpDecide(v.CPValue())
}
}

func (cp *changeProposer) cpDecide(cpValue vote.CPValue) {
if cpValue == vote.CPValueOne {
cp.round++
cp.cpDecided = 1
cp.enterNewState(cp.proposeState)
} else if cpValue == vote.CPValueZero {
roundProposal := cp.log.RoundProposal(cp.round)
if roundProposal == nil {
cp.queryProposal()
}
cp.cpDecided = 0
cp.enterNewState(cp.prepareState)
} else {
panic("unreachable")
}
Expand Down
56 changes: 30 additions & 26 deletions consensus/cp_decide.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package consensus

import (
"github.com/pactus-project/pactus/crypto/hash"
"github.com/pactus-project/pactus/types/vote"
)

Expand All @@ -13,33 +14,34 @@ func (s *cpDecideState) enter() {
}

func (s *cpDecideState) decide() {
if s.cpDecided == 1 {
s.round++
s.enterNewState(s.proposeState)
} else if s.cpDecided == 0 {
roundProposal := s.log.RoundProposal(s.round)
if roundProposal == nil {
s.queryProposal()
}
s.enterNewState(s.prepareState)
} else {
cpMainVotes := s.log.CPMainVoteVoteSet(s.round)
if cpMainVotes.HasTwoThirdOfTotalPower(s.cpRound) {
if cpMainVotes.HasQuorumVotesFor(s.cpRound, vote.CPValueOne) {
// decided for yes, and proceeds to the next round
s.logger.Info("binary agreement decided", "value", 1, "round", s.cpRound)

s.cpDecided = 1
} else if cpMainVotes.HasQuorumVotesFor(s.cpRound, vote.CPValueZero) {
// decided for no and proceeds to the next round
s.logger.Info("binary agreement decided", "value", 0, "round", s.cpRound)

s.cpDecided = 0
} else {
// conflicting votes
s.logger.Debug("conflicting main votes", "round", s.cpRound)
cpMainVotes := s.log.CPMainVoteVoteSet(s.round)
if cpMainVotes.HasTwoThirdOfTotalPower(s.cpRound) {
if cpMainVotes.HasQuorumVotesFor(s.cpRound, vote.CPValueOne) {
// decided for yes, and proceeds to the next round
s.logger.Info("binary agreement decided", "value", 1, "round", s.cpRound)

votes := cpMainVotes.BinaryVotes(s.cpRound, vote.CPValueOne)
cert := s.makeCertificate(votes)
just := &vote.JustDecided{
QCert: cert,
}

s.signAddCPDecidedVote(hash.UndefHash, s.cpRound, vote.CPValueOne, just)
s.cpDecide(vote.CPValueOne)
} else if cpMainVotes.HasQuorumVotesFor(s.cpRound, vote.CPValueZero) {
// decided for no and proceeds to the next round
s.logger.Info("binary agreement decided", "value", 0, "round", s.cpRound)

votes := cpMainVotes.BinaryVotes(s.cpRound, vote.CPValueZero)
cert := s.makeCertificate(votes)
just := &vote.JustDecided{
QCert: cert,
}
s.signAddCPDecidedVote(*s.cpWeakValidity, s.cpRound, vote.CPValueZero, just)
s.cpDecide(vote.CPValueZero)
s.cpDecided = 0
} else {
// conflicting votes
s.logger.Debug("conflicting main votes", "round", s.cpRound)
s.cpRound++
s.enterNewState(s.cpPreVoteState)
}
Expand All @@ -50,6 +52,8 @@ func (s *cpDecideState) onAddVote(v *vote.Vote) {
if v.Type() == vote.VoteTypeCPMainVote {
s.decide()
}

s.checkForTermination(v)
}

func (s *cpDecideState) name() string {
Expand Down
3 changes: 3 additions & 0 deletions consensus/cp_mainvote.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (s *cpMainVoteState) decide() {
Just0: vote0.CPJust(),
Just1: vote1.CPJust(),
}

s.signAddCPMainVote(*s.cpWeakValidity, s.cpRound, vote.CPValueAbstain, just)
s.enterNewState(s.cpDecideState)
}
Expand Down Expand Up @@ -88,6 +89,8 @@ func (s *cpMainVoteState) onAddVote(v *vote.Vote) {
if v.Type() == vote.VoteTypeCPPreVote {
s.decide()
}

s.checkForTermination(v)
}

func (s *cpMainVoteState) name() string {
Expand Down
18 changes: 2 additions & 16 deletions consensus/cp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,7 @@ func TestChangeProposerAgreement1(t *testing.T) {
td.addCPMainVote(td.consP, hash.UndefHash, h, r, 0, vote.CPValueOne, mainVote0.CPJust(), tIndexX)
td.addCPMainVote(td.consP, hash.UndefHash, h, r, 0, vote.CPValueOne, mainVote0.CPJust(), tIndexY)

preVote1 := td.shouldPublishVote(t, td.consP, vote.VoteTypeCPPreVote, hash.UndefHash)
td.addCPPreVote(td.consP, hash.UndefHash, h, r, 1, vote.CPValueOne, preVote1.CPJust(), tIndexX)
td.addCPPreVote(td.consP, hash.UndefHash, h, r, 1, vote.CPValueOne, preVote1.CPJust(), tIndexY)

mainVote1 := td.shouldPublishVote(t, td.consP, vote.VoteTypeCPMainVote, hash.UndefHash)
td.addCPMainVote(td.consP, hash.UndefHash, h, r, 1, vote.CPValueOne, mainVote1.CPJust(), tIndexX)
td.addCPMainVote(td.consP, hash.UndefHash, h, r, 1, vote.CPValueOne, mainVote1.CPJust(), tIndexY)

td.shouldPublishVote(t, td.consP, vote.VoteTypeCPDecided, hash.UndefHash)
checkHeightRound(t, td.consP, h, r+1)
}

Expand Down Expand Up @@ -90,14 +83,7 @@ func TestChangeProposerAgreement0(t *testing.T) {
td.addCPMainVote(td.consP, p.Block().Hash(), h, r, 0, vote.CPValueZero, mainVote0.CPJust(), tIndexX)
td.addCPMainVote(td.consP, p.Block().Hash(), h, r, 0, vote.CPValueZero, mainVote0.CPJust(), tIndexY)

preVote1 := td.shouldPublishVote(t, td.consP, vote.VoteTypeCPPreVote, p.Block().Hash())
td.addCPPreVote(td.consP, p.Block().Hash(), h, r, 1, vote.CPValueZero, preVote1.CPJust(), tIndexX)
td.addCPPreVote(td.consP, p.Block().Hash(), h, r, 1, vote.CPValueZero, preVote1.CPJust(), tIndexY)

mainVote1 := td.shouldPublishVote(t, td.consP, vote.VoteTypeCPMainVote, p.Block().Hash())
td.addCPMainVote(td.consP, p.Block().Hash(), h, r, 1, vote.CPValueZero, mainVote1.CPJust(), tIndexX)
td.addCPMainVote(td.consP, p.Block().Hash(), h, r, 1, vote.CPValueZero, mainVote1.CPJust(), tIndexY)

td.shouldPublishVote(t, td.consP, vote.VoteTypeCPDecided, p.Block().Hash())
td.shouldPublishQueryProposal(t, td.consP, h)
td.addPrecommitVote(td.consP, p.Block().Hash(), h, r, tIndexX)
td.addPrecommitVote(td.consP, p.Block().Hash(), h, r, tIndexY)
Expand Down
6 changes: 6 additions & 0 deletions consensus/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (log *Log) mustGetRoundMessages(round int16) *Messages {
precommitVotes: voteset.NewPrecommitVoteSet(round, log.totalPower, log.validators),
cpPreVotes: voteset.NewCPPreVoteVoteSet(round, log.totalPower, log.validators),
cpMainVotes: voteset.NewCPMainVoteVoteSet(round, log.totalPower, log.validators),
cpDecidedVotes: voteset.NewCPDecidedVoteVoteSet(round, log.totalPower, log.validators),
}
log.roundMessages[round] = rm
}
Expand Down Expand Up @@ -74,6 +75,11 @@ func (log *Log) CPMainVoteVoteSet(round int16) *voteset.BinaryVoteSet {
return m.cpMainVotes
}

func (log *Log) CPDecidedVoteVoteSet(round int16) *voteset.BinaryVoteSet {
m := log.mustGetRoundMessages(round)
return m.cpDecidedVotes

Check warning on line 80 in consensus/log/log.go

View check run for this annotation

Codecov / codecov/patch

consensus/log/log.go#L79-L80

Added lines #L79 - L80 were not covered by tests
}

func (log *Log) HasRoundProposal(round int16) bool {
return log.RoundProposal(round) != nil
}
Expand Down
4 changes: 4 additions & 0 deletions consensus/log/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Messages struct {
precommitVotes *voteset.BlockVoteSet // Precommit votes
cpPreVotes *voteset.BinaryVoteSet // Change proposer Pre-votes
cpMainVotes *voteset.BinaryVoteSet // Change proposer Main-votes
cpDecidedVotes *voteset.BinaryVoteSet // Change proposer Decided-votes
proposal *proposal.Proposal
}

Expand All @@ -27,6 +28,8 @@ func (m *Messages) addVote(v *vote.Vote) (bool, error) {
return m.cpPreVotes.AddVote(v)
case vote.VoteTypeCPMainVote:
return m.cpMainVotes.AddVote(v)
case vote.VoteTypeCPDecided:
return m.cpDecidedVotes.AddVote(v)
}

return false, fmt.Errorf("unexpected vote type: %v", v.Type())
Expand All @@ -48,6 +51,7 @@ func (m *Messages) AllVotes() []*vote.Vote {
votes = append(votes, m.precommitVotes.AllVotes()...)
votes = append(votes, m.cpPreVotes.AllVotes()...)
votes = append(votes, m.cpMainVotes.AllVotes()...)
votes = append(votes, m.cpDecidedVotes.AllVotes()...)

return votes
}
11 changes: 9 additions & 2 deletions consensus/voteset/binary_voteset.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,21 @@ type BinaryVoteSet struct {
func NewCPPreVoteVoteSet(round int16, totalPower int64,
validators map[crypto.Address]*validator.Validator,
) *BinaryVoteSet {
voteSet := newVoteSet(vote.VoteTypeCPPreVote, round, totalPower, validators)
voteSet := newVoteSet(round, totalPower, validators)
return newBinaryVoteSet(voteSet)
}

func NewCPMainVoteVoteSet(round int16, totalPower int64,
validators map[crypto.Address]*validator.Validator,
) *BinaryVoteSet {
voteSet := newVoteSet(vote.VoteTypeCPMainVote, round, totalPower, validators)
voteSet := newVoteSet(round, totalPower, validators)
return newBinaryVoteSet(voteSet)
}

func NewCPDecidedVoteVoteSet(round int16, totalPower int64,
validators map[crypto.Address]*validator.Validator,
) *BinaryVoteSet {
voteSet := newVoteSet(round, totalPower, validators)
return newBinaryVoteSet(voteSet)
}

Expand Down
Loading

0 comments on commit 2d228a9

Please sign in to comment.