Skip to content

Commit

Permalink
fix(consensus): strong termination for the binary agreement (#765)
Browse files Browse the repository at this point in the history
  • Loading branch information
b00f authored Oct 21, 2023
1 parent 1fc2f15 commit 34eff4f
Show file tree
Hide file tree
Showing 17 changed files with 339 additions and 86 deletions.
21 changes: 14 additions & 7 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ func (cs *consensus) AddVote(v *vote.Vote) {
}

if v.Type() == vote.VoteTypeCPPreVote ||
v.Type() == vote.VoteTypeCPMainVote {
v.Type() == vote.VoteTypeCPMainVote ||
v.Type() == vote.VoteTypeCPDecided {
err := cs.changeProposer.checkJust(v)
if err != nil {
cs.logger.Error("error on adding a cp vote", "vote", v, "error", err)
Expand Down Expand Up @@ -331,6 +332,14 @@ func (cs *consensus) signAddCPMainVote(hash hash.Hash,
cs.signAddVote(v)
}

func (cs *consensus) signAddCPDecidedVote(hash hash.Hash,
cpRound int16, cpValue vote.CPValue, just vote.Just,
) {
v := vote.NewCPDecidedVote(hash, cs.height, cs.round,
cpRound, cpValue, just, cs.valKey.Address())
cs.signAddVote(v)
}

func (cs *consensus) signAddPrepareVote(hash hash.Hash) {
v := vote.NewPrepareVote(hash, cs.height, cs.round, cs.valKey.Address())
cs.signAddVote(v)
Expand Down Expand Up @@ -424,11 +433,9 @@ func (cs *consensus) PickRandomVote(round int16) *vote.Vote {
m := cs.log.RoundMessages(round)
votes = append(votes, m.AllVotes()...)
} else {
// Don't broadcast prepare and precommit votes for previous rounds
vs0 := cs.log.CPPreVoteVoteSet(round)
vs1 := cs.log.CPMainVoteVoteSet(round)
votes = append(votes, vs0.AllVotes()...)
votes = append(votes, vs1.AllVotes()...)
// Only broadcast cp:decided votes
vs := cs.log.CPDecidedVoteVoteSet(round)
votes = append(votes, vs.AllVotes()...)
}
if len(votes) == 0 {
return nil
Expand All @@ -440,7 +447,7 @@ func (cs *consensus) startChangingProposer() {
// If it is not decided yet.
// TODO: can we remove this condition in new consensus model?
if cs.cpDecided == -1 {
cs.logger.Debug("changing proposer started", "cpRound", cs.cpRound)
cs.logger.Info("changing proposer started", "cpRound", cs.cpRound)
cs.enterNewState(cs.cpPreVoteState)
}
}
49 changes: 45 additions & 4 deletions consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,13 @@ func (td *testData) addCPMainVote(cons *consensus, blockHash hash.Hash, height u
td.addVote(cons, v, valID)
}

func (td *testData) addCPDecidedVote(cons *consensus, blockHash hash.Hash, height uint32, round int16,
cpRound int16, cpVal vote.CPValue, just vote.Just, valID int,
) {
v := vote.NewCPDecidedVote(blockHash, height, round, cpRound, cpVal, just, td.valKeys[valID].Address())
td.addVote(cons, v, valID)
}

func (td *testData) addVote(cons *consensus, v *vote.Vote, valID int) *vote.Vote {
td.HelperSignVote(td.valKeys[valID], v)
cons.AddVote(v)
Expand Down Expand Up @@ -546,11 +553,44 @@ func TestPickRandomVote(t *testing.T) {

td.enterNewHeight(td.consP)
assert.Nil(t, td.consP.PickRandomVote(0))
cpRound := int16(1)

// === make valid certificate
sbPreVote := certificate.BlockCertificateSignBytes(hash.UndefHash, 1, 0)
sbPreVote = append(sbPreVote, util.StringToBytes(vote.VoteTypeCPPreVote.String())...)
sbPreVote = append(sbPreVote, util.Int16ToSlice(cpRound)...)
sbPreVote = append(sbPreVote, byte(vote.CPValueOne))

sbMainVote := certificate.BlockCertificateSignBytes(hash.UndefHash, 1, 0)
sbMainVote = append(sbMainVote, util.StringToBytes(vote.VoteTypeCPMainVote.String())...)
sbMainVote = append(sbMainVote, util.Int16ToSlice(cpRound)...)
sbMainVote = append(sbMainVote, byte(vote.CPValueOne))

committers := []int32{}
preVoteSigs := []*bls.Signature{}
mainVoteSigs := []*bls.Signature{}
for i, val := range td.consP.validators {
committers = append(committers, val.Number())
preVoteSigs = append(preVoteSigs, td.valKeys[i].Sign(sbPreVote))
mainVoteSigs = append(mainVoteSigs, td.valKeys[i].Sign(sbMainVote))
}

preVoteAggSig := bls.SignatureAggregate(preVoteSigs...)
mainVoteAggSig := bls.SignatureAggregate(mainVoteSigs...)

certPreVote := certificate.NewCertificate(1, 0, committers, []int32{}, preVoteAggSig)
certMainVote := certificate.NewCertificate(1, 0, committers, []int32{}, mainVoteAggSig)
// ====

// round 0
td.addPrepareVote(td.consP, td.RandHash(), 1, 0, tIndexX)
td.addPrepareVote(td.consP, td.RandHash(), 1, 0, tIndexY)
td.addCPPreVote(td.consP, hash.UndefHash, 1, 0, 0, vote.CPValueOne, &vote.JustInitOne{}, tIndexY)
td.addCPPreVote(td.consP, hash.UndefHash, 1, 0, cpRound+1, vote.CPValueOne,
&vote.JustPreVoteHard{QCert: certPreVote}, tIndexY)
td.addCPMainVote(td.consP, hash.UndefHash, 1, 0, cpRound, vote.CPValueOne,
&vote.JustMainVoteNoConflict{QCert: certPreVote}, tIndexY)
td.addCPDecidedVote(td.consP, hash.UndefHash, 1, 0, cpRound, vote.CPValueOne,
&vote.JustDecided{QCert: certMainVote}, tIndexY)

assert.NotNil(t, td.consP.PickRandomVote(0))

Expand Down Expand Up @@ -693,10 +733,11 @@ func TestCases(t *testing.T) {
round int16
description string
}{
{1694848856237853398, 2, "1/3+ cp:PRE-VOTE in prepare step"},
{1697898884837384019, 2, "1/3+ cp:PRE-VOTE in prepare step"},
{1694848907840926239, 0, "1/3+ cp:PRE-VOTE in precommit step"},
{1694849103290580532, 1, "Conflicting votes, cp-round=0"},
{1694849186681644508, 1, "Conflicting votes, cp-round=1"},
{1697900665869342730, 1, "Conflicting votes, cp-round=1"},
{1697887970998950590, 1, "consP & consB: Change Proposer, consX & consY: Commit (2 block announces)"},
}

for i, test := range tests {
Expand Down Expand Up @@ -910,7 +951,7 @@ func checkConsensus(td *testData, height uint32, byzVotes []*vote.Vote) (
}

// Check if more than 1/3 of nodes has committed the same block
if len(blockAnnounces) >= 3 {
if len(blockAnnounces) >= 2 {
var firstAnnounce *message.BlockAnnounceMessage
for _, msg := range blockAnnounces {
if firstAnnounce == nil {
Expand Down
61 changes: 58 additions & 3 deletions consensus/cp.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,67 @@ func (cp *changeProposer) checkJustMainVote(v *vote.Vote) error {
}
}

func (cp *changeProposer) checkJustDecide(v *vote.Vote) error {
err := cp.checkCPValue(v, vote.CPValueZero, vote.CPValueOne)
if err != nil {
return err
}
j, ok := v.CPJust().(*vote.JustDecided)
if !ok {
return invalidJustificationError{
JustType: j.Type(),
Reason: "invalid just data",
}
}

sb := certificate.BlockCertificateSignBytes(v.BlockHash(),
j.QCert.Height(),
j.QCert.Round())
sb = append(sb, util.StringToBytes(vote.VoteTypeCPMainVote.String())...)
sb = append(sb, util.Int16ToSlice(v.CPRound())...)
sb = append(sb, byte(v.CPValue()))

err = j.QCert.Validate(cp.height, cp.validators, sb)
if err != nil {
return invalidJustificationError{
JustType: j.Type(),
Reason: err.Error(),
}
}
return nil
}

func (cp *changeProposer) checkJust(v *vote.Vote) error {
if v.Type() == vote.VoteTypeCPPreVote {
switch v.Type() {
case vote.VoteTypeCPPreVote:
return cp.checkJustPreVote(v)
} else if v.Type() == vote.VoteTypeCPMainVote {
case vote.VoteTypeCPMainVote:
return cp.checkJustMainVote(v)
} else {
case vote.VoteTypeCPDecided:
return cp.checkJustDecide(v)
default:
panic("unreachable")
}
}

func (cp *changeProposer) checkForTermination(v *vote.Vote) {
if v.Type() == vote.VoteTypeCPDecided &&
v.Round() == cp.round {
cp.cpDecide(v.CPValue())
}
}

func (cp *changeProposer) cpDecide(cpValue vote.CPValue) {
if cpValue == vote.CPValueOne {
cp.round++
cp.cpDecided = 1
cp.enterNewState(cp.proposeState)
} else if cpValue == vote.CPValueZero {
roundProposal := cp.log.RoundProposal(cp.round)
if roundProposal == nil {
cp.queryProposal()
}
cp.cpDecided = 0
cp.enterNewState(cp.prepareState)
}
}
55 changes: 29 additions & 26 deletions consensus/cp_decide.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package consensus

import (
"github.com/pactus-project/pactus/crypto/hash"
"github.com/pactus-project/pactus/types/vote"
)

Expand All @@ -13,33 +14,33 @@ func (s *cpDecideState) enter() {
}

func (s *cpDecideState) decide() {
if s.cpDecided == 1 {
s.round++
s.enterNewState(s.proposeState)
} else if s.cpDecided == 0 {
roundProposal := s.log.RoundProposal(s.round)
if roundProposal == nil {
s.queryProposal()
}
s.enterNewState(s.prepareState)
} else {
cpMainVotes := s.log.CPMainVoteVoteSet(s.round)
if cpMainVotes.HasTwoThirdOfTotalPower(s.cpRound) {
if cpMainVotes.HasQuorumVotesFor(s.cpRound, vote.CPValueOne) {
// decided for yes, and proceeds to the next round
s.logger.Info("binary agreement decided", "value", 1, "round", s.cpRound)

s.cpDecided = 1
} else if cpMainVotes.HasQuorumVotesFor(s.cpRound, vote.CPValueZero) {
// decided for no and proceeds to the next round
s.logger.Info("binary agreement decided", "value", 0, "round", s.cpRound)

s.cpDecided = 0
} else {
// conflicting votes
s.logger.Debug("conflicting main votes", "round", s.cpRound)
cpMainVotes := s.log.CPMainVoteVoteSet(s.round)
if cpMainVotes.HasTwoThirdOfTotalPower(s.cpRound) {
if cpMainVotes.HasQuorumVotesFor(s.cpRound, vote.CPValueOne) {
// decided for yes, and proceeds to the next round
s.logger.Info("binary agreement decided", "value", 1, "round", s.cpRound)

votes := cpMainVotes.BinaryVotes(s.cpRound, vote.CPValueOne)
cert := s.makeCertificate(votes)
just := &vote.JustDecided{
QCert: cert,
}

s.signAddCPDecidedVote(hash.UndefHash, s.cpRound, vote.CPValueOne, just)
s.cpDecide(vote.CPValueOne)
} else if cpMainVotes.HasQuorumVotesFor(s.cpRound, vote.CPValueZero) {
// decided for no and proceeds to the next round
s.logger.Info("binary agreement decided", "value", 0, "round", s.cpRound)

votes := cpMainVotes.BinaryVotes(s.cpRound, vote.CPValueZero)
cert := s.makeCertificate(votes)
just := &vote.JustDecided{
QCert: cert,
}
s.signAddCPDecidedVote(*s.cpWeakValidity, s.cpRound, vote.CPValueZero, just)
s.cpDecide(vote.CPValueZero)
} else {
// conflicting votes
s.logger.Debug("conflicting main votes", "round", s.cpRound)
s.cpRound++
s.enterNewState(s.cpPreVoteState)
}
Expand All @@ -50,6 +51,8 @@ func (s *cpDecideState) onAddVote(v *vote.Vote) {
if v.Type() == vote.VoteTypeCPMainVote {
s.decide()
}

s.checkForTermination(v)
}

func (s *cpDecideState) name() string {
Expand Down
3 changes: 3 additions & 0 deletions consensus/cp_mainvote.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (s *cpMainVoteState) decide() {
Just0: vote0.CPJust(),
Just1: vote1.CPJust(),
}

s.signAddCPMainVote(*s.cpWeakValidity, s.cpRound, vote.CPValueAbstain, just)
s.enterNewState(s.cpDecideState)
}
Expand Down Expand Up @@ -88,6 +89,8 @@ func (s *cpMainVoteState) onAddVote(v *vote.Vote) {
if v.Type() == vote.VoteTypeCPPreVote {
s.decide()
}

s.checkForTermination(v)
}

func (s *cpMainVoteState) name() string {
Expand Down
49 changes: 33 additions & 16 deletions consensus/cp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,7 @@ func TestChangeProposerAgreement1(t *testing.T) {
td.addCPMainVote(td.consP, hash.UndefHash, h, r, 0, vote.CPValueOne, mainVote0.CPJust(), tIndexX)
td.addCPMainVote(td.consP, hash.UndefHash, h, r, 0, vote.CPValueOne, mainVote0.CPJust(), tIndexY)

preVote1 := td.shouldPublishVote(t, td.consP, vote.VoteTypeCPPreVote, hash.UndefHash)
td.addCPPreVote(td.consP, hash.UndefHash, h, r, 1, vote.CPValueOne, preVote1.CPJust(), tIndexX)
td.addCPPreVote(td.consP, hash.UndefHash, h, r, 1, vote.CPValueOne, preVote1.CPJust(), tIndexY)

mainVote1 := td.shouldPublishVote(t, td.consP, vote.VoteTypeCPMainVote, hash.UndefHash)
td.addCPMainVote(td.consP, hash.UndefHash, h, r, 1, vote.CPValueOne, mainVote1.CPJust(), tIndexX)
td.addCPMainVote(td.consP, hash.UndefHash, h, r, 1, vote.CPValueOne, mainVote1.CPJust(), tIndexY)

td.shouldPublishVote(t, td.consP, vote.VoteTypeCPDecided, hash.UndefHash)
checkHeightRound(t, td.consP, h, r+1)
}

Expand Down Expand Up @@ -90,14 +83,7 @@ func TestChangeProposerAgreement0(t *testing.T) {
td.addCPMainVote(td.consP, p.Block().Hash(), h, r, 0, vote.CPValueZero, mainVote0.CPJust(), tIndexX)
td.addCPMainVote(td.consP, p.Block().Hash(), h, r, 0, vote.CPValueZero, mainVote0.CPJust(), tIndexY)

preVote1 := td.shouldPublishVote(t, td.consP, vote.VoteTypeCPPreVote, p.Block().Hash())
td.addCPPreVote(td.consP, p.Block().Hash(), h, r, 1, vote.CPValueZero, preVote1.CPJust(), tIndexX)
td.addCPPreVote(td.consP, p.Block().Hash(), h, r, 1, vote.CPValueZero, preVote1.CPJust(), tIndexY)

mainVote1 := td.shouldPublishVote(t, td.consP, vote.VoteTypeCPMainVote, p.Block().Hash())
td.addCPMainVote(td.consP, p.Block().Hash(), h, r, 1, vote.CPValueZero, mainVote1.CPJust(), tIndexX)
td.addCPMainVote(td.consP, p.Block().Hash(), h, r, 1, vote.CPValueZero, mainVote1.CPJust(), tIndexY)

td.shouldPublishVote(t, td.consP, vote.VoteTypeCPDecided, p.Block().Hash())
td.shouldPublishQueryProposal(t, td.consP, h)
td.addPrecommitVote(td.consP, p.Block().Hash(), h, r, tIndexX)
td.addPrecommitVote(td.consP, p.Block().Hash(), h, r, tIndexY)
Expand Down Expand Up @@ -457,3 +443,34 @@ func TestInvalidJustMainVoteConflict(t *testing.T) {
})
})
}

func TestInvalidJustDecided(t *testing.T) {
td := setup(t)

td.enterNewHeight(td.consX)
h := uint32(1)
r := int16(0)
just := &vote.JustDecided{
QCert: td.GenerateTestCertificate(h),
}

t.Run("invalid value: abstain", func(t *testing.T) {
v := vote.NewCPDecidedVote(td.RandHash(), h, r, 0, vote.CPValueAbstain, just, td.consB.valKey.Address())

err := td.consX.changeProposer.checkJust(v)
assert.ErrorIs(t, err, invalidJustificationError{
JustType: just.Type(),
Reason: "invalid value: abstain",
})
})

t.Run("invalid certificate", func(t *testing.T) {
v := vote.NewCPDecidedVote(td.RandHash(), h, r, 0, vote.CPValueOne, just, td.consB.valKey.Address())

err := td.consX.changeProposer.checkJust(v)
assert.ErrorIs(t, err, invalidJustificationError{
JustType: just.Type(),
Reason: fmt.Sprintf("certificate has an unexpected committers: %v", just.QCert.Committers()),
})
})
}
6 changes: 6 additions & 0 deletions consensus/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (log *Log) mustGetRoundMessages(round int16) *Messages {
precommitVotes: voteset.NewPrecommitVoteSet(round, log.totalPower, log.validators),
cpPreVotes: voteset.NewCPPreVoteVoteSet(round, log.totalPower, log.validators),
cpMainVotes: voteset.NewCPMainVoteVoteSet(round, log.totalPower, log.validators),
cpDecidedVotes: voteset.NewCPDecidedVoteVoteSet(round, log.totalPower, log.validators),
}
log.roundMessages[round] = rm
}
Expand Down Expand Up @@ -74,6 +75,11 @@ func (log *Log) CPMainVoteVoteSet(round int16) *voteset.BinaryVoteSet {
return m.cpMainVotes
}

func (log *Log) CPDecidedVoteVoteSet(round int16) *voteset.BinaryVoteSet {
m := log.mustGetRoundMessages(round)
return m.cpDecidedVotes
}

func (log *Log) HasRoundProposal(round int16) bool {
return log.RoundProposal(round) != nil
}
Expand Down
Loading

0 comments on commit 34eff4f

Please sign in to comment.