diff --git a/epoch.go b/epoch.go index 8300dd7..add9d23 100644 --- a/epoch.go +++ b/epoch.go @@ -6,6 +6,7 @@ package simplex import ( "bytes" "context" + "errors" "fmt" "simplex/record" "time" @@ -284,16 +285,10 @@ func (e *Epoch) handleFinalizationMessage(message *Message, from NodeID) error { return e.maybeCollectFinalizationCertificate(round) } -func (e *Epoch) handleVoteMessage(message *Message, from NodeID) error { +func (e *Epoch) handleVoteMessage(message *Message, _ NodeID) error { msg := message.VoteMessage vote := msg.Vote - // Only process point to point votes - if !from.Equals(msg.Signature.Signer) { - e.Logger.Debug("Received a vote signed by a different party than sent it", zap.Stringer("signer", msg.Signature.Signer), zap.Stringer("sender", from)) - return nil - } - // TODO: what if we've received a vote for a round we didn't instantiate yet? round, exists := e.rounds[vote.Round] if !exists { @@ -310,12 +305,16 @@ func (e *Epoch) handleVoteMessage(message *Message, from NodeID) error { return nil } - if err := vote.Verify(msg.Signature.Value, e.Verifier, from); err != nil { - e.Logger.Debug("Vote verification failed", zap.Stringer("NodeID", msg.Signature.Signer), zap.Error(err)) - return nil + // Only verify the vote if we haven't verified it in the past. + signature := msg.Signature + if _, exists := round.votes[string(signature.Signer)]; !exists { + if err := vote.Verify(signature.Value, e.Verifier, signature.Signer); err != nil { + e.Logger.Debug("Vote verification failed", zap.Stringer("NodeID", signature.Signer), zap.Error(err)) + return nil + } } - e.rounds[vote.Round].votes[string(from)] = msg + e.rounds[vote.Round].votes[string(signature.Signer)] = msg return e.maybeCollectNotarization() } @@ -594,15 +593,24 @@ func (e *Epoch) hasSomeNodeSignedTwice(nodeIDs []NodeID) bool { return false } -func (e *Epoch) handleBlockMessage(message *Message, from NodeID) error { +func (e *Epoch) handleBlockMessage(message *Message, _ NodeID) error { block := message.BlockMessage.Block if block == nil { e.Logger.Debug("Got empty block in a BlockMessage") return nil } + vote := message.BlockMessage.Vote + from := vote.Signature.Signer + md := block.BlockHeader() + // Ignore block messages sent by us + if e.ID.Equals(from) || e.ID.Equals(from) { + e.Logger.Debug("Got a BlockMessage from ourselves or created by us") + return nil + } + // Check that the node is a leader for the round corresponding to the block. if !leaderForRound(e.nodes, md.Round).Equals(from) { // The block is associated with a round in which the sender is not the leader, @@ -629,26 +637,42 @@ func (e *Epoch) handleBlockMessage(message *Message, from NodeID) error { return nil } + // Ensure the block was voted on by its block producer: + + // 1) Verify block digest corresponds to the digest voted on + if !bytes.Equal(vote.Vote.Digest[:], md.Digest[:]) { + e.Logger.Debug("Vote digest mismatches block digest", zap.Stringer("voteDigest", vote.Vote.Digest), + zap.Stringer("blockDigest", md.Digest)) + return nil + } + // 2) Verify the vote is properly signed + if err := vote.Vote.Verify(vote.Signature.Value, e.Verifier, vote.Signature.Signer); err != nil { + e.Logger.Debug("Vote verification failed", zap.Stringer("NodeID", vote.Signature.Signer), zap.Error(err)) + return nil + } + if !e.storeProposal(block) { e.Logger.Warn("Unable to store proposed block for the round", zap.Stringer("NodeID", from), zap.Uint64("round", md.Round)) // TODO: timeout } - // If this is a block we have proposed, don't write it to the WAL - // because we have done so right before sending it. - // Also, don't bother verifying it. - // Else, it's a block that we have received from the leader of this round. - // So verify it and store it in the WAL. - if !e.ID.Equals(from) { - if err := block.Verify(); err != nil { - e.Logger.Debug("Failed verifying block", zap.Error(err)) - return nil - } - record := blockRecord(md, block.Bytes()) - e.WAL.Append(record) + // Once we have stored the proposal, we have a Round object for the round. + // We store the vote to prevent verifying its signature again. + round, exists := e.rounds[md.Round] + if !exists { + // This shouldn't happen, but in case it does, return an error + return fmt.Errorf("programming error: round %d not found", md.Round) } + round.votes[string(vote.Signature.Signer)] = &vote + + if err := block.Verify(); err != nil { + e.Logger.Debug("Failed verifying block", zap.Error(err)) + return nil + } + record := blockRecord(md, block.Bytes()) + e.WAL.Append(record) - return e.doProposed() + return e.doProposed(block, vote) } func (e *Epoch) isMetadataValid(block Block) bool { @@ -753,10 +777,10 @@ func (e *Epoch) locateBlock(seq uint64, digest []byte) (Block, bool) { return nil, false } -func (e *Epoch) proposeBlock() { +func (e *Epoch) proposeBlock() error { block, ok := e.BlockBuilder.BuildBlock(e.finishCtx, e.Metadata()) if !ok { - return + return errors.New("failed to build block") } md := block.BlockHeader() @@ -772,19 +796,29 @@ func (e *Epoch) proposeBlock() { zap.Int("size", len(rawBlock)), zap.Stringer("digest", md.Digest)) + vote, err := e.voteOnBlock(block) + if err != nil { + return err + } + proposal := &Message{ BlockMessage: &BlockMessage{ Block: block, + Vote: vote, }, } + if !e.storeProposal(block) { + return errors.New("failed to store block proposed by me") + } + e.Comm.Broadcast(proposal) e.Logger.Debug("Proposal broadcast", zap.Uint64("round", md.Round), zap.Int("size", len(rawBlock)), zap.Stringer("digest", md.Digest)) - e.handleBlockMessage(proposal, e.ID) + return e.handleVoteMessage(&Message{VoteMessage: &vote}, e.ID) } func (e *Epoch) Metadata() ProtocolMetadata { @@ -824,21 +858,10 @@ func (e *Epoch) startRound() error { return e.handleBlockMessage(msgsForRound.proposal, leaderForCurrentRound) } -func (e *Epoch) doProposed() error { - block := e.rounds[e.round].block - - vote := Vote{BlockHeader: block.BlockHeader()} - sig, err := vote.Sign(e.Signer) +func (e *Epoch) doProposed(block Block, voteFromLeader SignedVoteMessage) error { + vote, err := e.voteOnBlock(block) if err != nil { - return fmt.Errorf("failed signing vote %w", err) - } - - sv := SignedVoteMessage{ - Signature: Signature{ - Signer: e.ID, - Value: sig, - }, - Vote: vote, + return err } md := block.BlockHeader() @@ -846,7 +869,7 @@ func (e *Epoch) doProposed() error { // We do not write the vote to the WAL as we have written the block itself to the WAL // and we can always restore the block and sign it again if needed. voteMsg := &Message{ - VoteMessage: &sv, + VoteMessage: &vote, } e.Logger.Debug("Broadcasting vote", @@ -855,7 +878,28 @@ func (e *Epoch) doProposed() error { e.Comm.Broadcast(voteMsg) // Send yourself a vote message - return e.handleVoteMessage(voteMsg, e.ID) + if err := e.handleVoteMessage(voteMsg, e.ID); err != nil { + return err + } + + return e.handleVoteMessage(&Message{VoteMessage: &voteFromLeader}, e.ID) +} + +func (e *Epoch) voteOnBlock(block Block) (SignedVoteMessage, error) { + vote := Vote{BlockHeader: block.BlockHeader()} + sig, err := vote.Sign(e.Signer) + if err != nil { + return SignedVoteMessage{}, fmt.Errorf("failed signing vote %w", err) + } + + sv := SignedVoteMessage{ + Signature: Signature{ + Signer: e.ID, + Value: sig, + }, + Vote: vote, + } + return sv, nil } func (e *Epoch) increaseRound() { diff --git a/epoch_test.go b/epoch_test.go index a972ccd..7703f7b 100644 --- a/epoch_test.go +++ b/epoch_test.go @@ -26,6 +26,7 @@ func TestEpochSimpleFlow(t *testing.T) { bb := make(testBlockBuilder, 1) storage := newInMemStorage() + nodes := []NodeID{{1}, {2}, {3}, {4}} conf := EpochConfig{ Logger: l, ID: NodeID{1}, @@ -33,7 +34,7 @@ func TestEpochSimpleFlow(t *testing.T) { WAL: &wal.InMemWAL{}, Verifier: &testVerifier{}, Storage: storage, - Comm: noopComm([]NodeID{{1}, {2}, {3}, {4}}), + Comm: noopComm(nodes), BlockBuilder: bb, SignatureAggregator: &testSignatureAggregator{}, } @@ -44,8 +45,8 @@ func TestEpochSimpleFlow(t *testing.T) { require.NoError(t, e.Start()) for i := 0; i < 100; i++ { - leaderID := i%4 + 1 - shouldPropose := leaderID == 1 + leaderID := nodes[i%4] + shouldPropose := leaderID.Equals(NodeID{1}) if !shouldPropose { md := e.Metadata() @@ -56,11 +57,20 @@ func TestEpochSimpleFlow(t *testing.T) { block := <-bb if !shouldPropose { + vote := SignedVoteMessage{ + Signature: Signature{ + Signer: nodes[i%4], + }, + Vote: Vote{ + BlockHeader: block.BlockHeader(), + }, + } err := e.HandleMessage(&Message{ BlockMessage: &BlockMessage{ + Vote: vote, Block: block, }, - }, NodeID{byte(leaderID)}) + }, leaderID) require.NoError(t, err) } diff --git a/msg.go b/msg.go index c665e5a..7e59148 100644 --- a/msg.go +++ b/msg.go @@ -109,6 +109,7 @@ func (n *Notarization) Verify() error { type BlockMessage struct { Block Block + Vote SignedVoteMessage } type SignedMessage struct {