Skip to content

Commit

Permalink
Leader sends vote as part of its block proposal
Browse files Browse the repository at this point in the history
In order to make it simple to implement replication, it's best to couple
the block proposal and its corresponding vote produced by the block proposer.

This way, a node different than the block proposer can prove that a certain block has been
proposed by the leader of the round.

Signed-off-by: Yacov Manevich <[email protected]>
  • Loading branch information
yacovm committed Dec 30, 2024
1 parent a950b56 commit 34fdbb9
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 48 deletions.
132 changes: 88 additions & 44 deletions epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package simplex
import (
"bytes"
"context"
"errors"
"fmt"
"simplex/record"
"time"
Expand Down Expand Up @@ -284,16 +285,10 @@ func (e *Epoch) handleFinalizationMessage(message *Message, from NodeID) error {
return e.maybeCollectFinalizationCertificate(round)
}

func (e *Epoch) handleVoteMessage(message *Message, from NodeID) error {
func (e *Epoch) handleVoteMessage(message *Message, _ NodeID) error {
msg := message.VoteMessage
vote := msg.Vote

// Only process point to point votes
if !from.Equals(msg.Signature.Signer) {
e.Logger.Debug("Received a vote signed by a different party than sent it", zap.Stringer("signer", msg.Signature.Signer), zap.Stringer("sender", from))
return nil
}

// TODO: what if we've received a vote for a round we didn't instantiate yet?
round, exists := e.rounds[vote.Round]
if !exists {
Expand All @@ -310,12 +305,16 @@ func (e *Epoch) handleVoteMessage(message *Message, from NodeID) error {
return nil
}

if err := vote.Verify(msg.Signature.Value, e.Verifier, from); err != nil {
e.Logger.Debug("Vote verification failed", zap.Stringer("NodeID", msg.Signature.Signer), zap.Error(err))
return nil
// Only verify the vote if we haven't verified it in the past.
signature := msg.Signature
if _, exists := round.votes[string(signature.Signer)]; !exists {
if err := vote.Verify(signature.Value, e.Verifier, signature.Signer); err != nil {
e.Logger.Debug("Vote verification failed", zap.Stringer("NodeID", signature.Signer), zap.Error(err))
return nil
}
}

e.rounds[vote.Round].votes[string(from)] = msg
e.rounds[vote.Round].votes[string(signature.Signer)] = msg

return e.maybeCollectNotarization()
}
Expand Down Expand Up @@ -594,15 +593,24 @@ func (e *Epoch) hasSomeNodeSignedTwice(nodeIDs []NodeID) bool {
return false
}

func (e *Epoch) handleBlockMessage(message *Message, from NodeID) error {
func (e *Epoch) handleBlockMessage(message *Message, _ NodeID) error {
block := message.BlockMessage.Block
if block == nil {
e.Logger.Debug("Got empty block in a BlockMessage")
return nil
}

vote := message.BlockMessage.Vote
from := vote.Signature.Signer

md := block.BlockHeader()

// Ignore block messages sent by us
if e.ID.Equals(from) {
e.Logger.Debug("Got a BlockMessage from ourselves or created by us")
return nil
}

// Check that the node is a leader for the round corresponding to the block.
if !leaderForRound(e.nodes, md.Round).Equals(from) {
// The block is associated with a round in which the sender is not the leader,
Expand All @@ -629,26 +637,42 @@ func (e *Epoch) handleBlockMessage(message *Message, from NodeID) error {
return nil
}

// Ensure the block was voted on by its block producer:

// 1) Verify block digest corresponds to the digest voted on
if !bytes.Equal(vote.Vote.Digest[:], md.Digest[:]) {
e.Logger.Debug("Vote digest mismatches block digest", zap.Stringer("voteDigest", vote.Vote.Digest),
zap.Stringer("blockDigest", md.Digest))
return nil
}
// 2) Verify the vote is properly signed
if err := vote.Vote.Verify(vote.Signature.Value, e.Verifier, vote.Signature.Signer); err != nil {
e.Logger.Debug("Vote verification failed", zap.Stringer("NodeID", vote.Signature.Signer), zap.Error(err))
return nil
}

if !e.storeProposal(block) {
e.Logger.Warn("Unable to store proposed block for the round", zap.Stringer("NodeID", from), zap.Uint64("round", md.Round))
// TODO: timeout
}

// If this is a block we have proposed, don't write it to the WAL
// because we have done so right before sending it.
// Also, don't bother verifying it.
// Else, it's a block that we have received from the leader of this round.
// So verify it and store it in the WAL.
if !e.ID.Equals(from) {
if err := block.Verify(); err != nil {
e.Logger.Debug("Failed verifying block", zap.Error(err))
return nil
}
record := blockRecord(md, block.Bytes())
e.WAL.Append(record)
// Once we have stored the proposal, we have a Round object for the round.
// We store the vote to prevent verifying its signature again.
round, exists := e.rounds[md.Round]
if !exists {
// This shouldn't happen, but in case it does, return an error
return fmt.Errorf("programming error: round %d not found", md.Round)
}
round.votes[string(vote.Signature.Signer)] = &vote

if err := block.Verify(); err != nil {
e.Logger.Debug("Failed verifying block", zap.Error(err))
return nil
}
record := blockRecord(md, block.Bytes())
e.WAL.Append(record)

return e.doProposed()
return e.doProposed(block, vote)
}

func (e *Epoch) isMetadataValid(block Block) bool {
Expand Down Expand Up @@ -753,10 +777,10 @@ func (e *Epoch) locateBlock(seq uint64, digest []byte) (Block, bool) {
return nil, false
}

func (e *Epoch) proposeBlock() {
func (e *Epoch) proposeBlock() error {
block, ok := e.BlockBuilder.BuildBlock(e.finishCtx, e.Metadata())
if !ok {
return
return errors.New("failed to build block")
}

md := block.BlockHeader()
Expand All @@ -772,19 +796,29 @@ func (e *Epoch) proposeBlock() {
zap.Int("size", len(rawBlock)),
zap.Stringer("digest", md.Digest))

vote, err := e.voteOnBlock(block)
if err != nil {
return err
}

proposal := &Message{
BlockMessage: &BlockMessage{
Block: block,
Vote: vote,
},
}

if !e.storeProposal(block) {
return errors.New("failed to store block proposed by me")
}

e.Comm.Broadcast(proposal)
e.Logger.Debug("Proposal broadcast",
zap.Uint64("round", md.Round),
zap.Int("size", len(rawBlock)),
zap.Stringer("digest", md.Digest))

e.handleBlockMessage(proposal, e.ID)
return e.handleVoteMessage(&Message{VoteMessage: &vote}, e.ID)
}

func (e *Epoch) Metadata() ProtocolMetadata {
Expand Down Expand Up @@ -824,29 +858,18 @@ func (e *Epoch) startRound() error {
return e.handleBlockMessage(msgsForRound.proposal, leaderForCurrentRound)
}

func (e *Epoch) doProposed() error {
block := e.rounds[e.round].block

vote := Vote{BlockHeader: block.BlockHeader()}
sig, err := vote.Sign(e.Signer)
func (e *Epoch) doProposed(block Block, voteFromLeader SignedVoteMessage) error {
vote, err := e.voteOnBlock(block)
if err != nil {
return fmt.Errorf("failed signing vote %w", err)
}

sv := SignedVoteMessage{
Signature: Signature{
Signer: e.ID,
Value: sig,
},
Vote: vote,
return err
}

md := block.BlockHeader()

// We do not write the vote to the WAL as we have written the block itself to the WAL
// and we can always restore the block and sign it again if needed.
voteMsg := &Message{
VoteMessage: &sv,
VoteMessage: &vote,
}

e.Logger.Debug("Broadcasting vote",
Expand All @@ -855,7 +878,28 @@ func (e *Epoch) doProposed() error {

e.Comm.Broadcast(voteMsg)
// Send yourself a vote message
return e.handleVoteMessage(voteMsg, e.ID)
if err := e.handleVoteMessage(voteMsg, e.ID); err != nil {
return err
}

return e.handleVoteMessage(&Message{VoteMessage: &voteFromLeader}, e.ID)
}

func (e *Epoch) voteOnBlock(block Block) (SignedVoteMessage, error) {
vote := Vote{BlockHeader: block.BlockHeader()}
sig, err := vote.Sign(e.Signer)
if err != nil {
return SignedVoteMessage{}, fmt.Errorf("failed signing vote %w", err)
}

sv := SignedVoteMessage{
Signature: Signature{
Signer: e.ID,
Value: sig,
},
Vote: vote,
}
return sv, nil
}

func (e *Epoch) increaseRound() {
Expand Down
18 changes: 14 additions & 4 deletions epoch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ func TestEpochSimpleFlow(t *testing.T) {
bb := make(testBlockBuilder, 1)
storage := newInMemStorage()

nodes := []NodeID{{1}, {2}, {3}, {4}}
conf := EpochConfig{
Logger: l,
ID: NodeID{1},
Signer: &testSigner{},
WAL: &wal.InMemWAL{},
Verifier: &testVerifier{},
Storage: storage,
Comm: noopComm([]NodeID{{1}, {2}, {3}, {4}}),
Comm: noopComm(nodes),
BlockBuilder: bb,
SignatureAggregator: &testSignatureAggregator{},
}
Expand All @@ -44,8 +45,8 @@ func TestEpochSimpleFlow(t *testing.T) {
require.NoError(t, e.Start())

for i := 0; i < 100; i++ {
leaderID := i%4 + 1
shouldPropose := leaderID == 1
leaderID := nodes[i%4]
shouldPropose := leaderID.Equals(NodeID{1})

if !shouldPropose {
md := e.Metadata()
Expand All @@ -56,11 +57,20 @@ func TestEpochSimpleFlow(t *testing.T) {
block := <-bb

if !shouldPropose {
vote := SignedVoteMessage{
Signature: Signature{
Signer: nodes[i%4],
},
Vote: Vote{
BlockHeader: block.BlockHeader(),
},
}
err := e.HandleMessage(&Message{
BlockMessage: &BlockMessage{
Vote: vote,
Block: block,
},
}, NodeID{byte(leaderID)})
}, leaderID)
require.NoError(t, err)
}

Expand Down
1 change: 1 addition & 0 deletions msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (n *Notarization) Verify() error {

type BlockMessage struct {
Block Block
Vote SignedVoteMessage
}

type SignedMessage struct {
Expand Down

0 comments on commit 34fdbb9

Please sign in to comment.