Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Randomized leader election based on the VRF #1841

Merged
merged 13 commits into from
Jan 28, 2020
40 changes: 0 additions & 40 deletions core/src/consensus/tendermint/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,43 +284,3 @@ impl TwoThirdsMajority {
}
}
}

#[derive(Debug, PartialEq)]
pub enum Proposal {
ProposalReceived(BlockHash, Bytes, SchnorrSignature),
ProposalImported(BlockHash),
None,
}

impl Proposal {
pub fn new_received(hash: BlockHash, block: Bytes, signature: SchnorrSignature) -> Self {
Proposal::ProposalReceived(hash, block, signature)
}

pub fn new_imported(hash: BlockHash) -> Self {
Proposal::ProposalImported(hash)
}

pub fn block_hash(&self) -> Option<BlockHash> {
match self {
Proposal::ProposalReceived(hash, ..) => Some(*hash),
Proposal::ProposalImported(hash) => Some(*hash),
Proposal::None => None,
}
}

pub fn imported_block_hash(&self) -> Option<BlockHash> {
match self {
Proposal::ProposalReceived(..) => None,
Proposal::ProposalImported(hash) => Some(*hash),
Proposal::None => None,
}
}

pub fn is_none(&self) -> bool {
match self {
Proposal::None => true,
_ => false,
}
}
}
65 changes: 62 additions & 3 deletions core/src/consensus/tendermint/vote_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::iter::Iterator;
use std::collections::{btree_set::Iter, BTreeMap, BTreeSet, HashMap};
use std::iter::{Iterator, Rev};

use ckey::SchnorrSignature;
use ctypes::BlockHash;
use rlp::{Encodable, RlpStream};

use super::super::PriorityInfo;
use super::stake::Action;
use super::{ConsensusMessage, SortitionRound, Step, VoteStep};
use super::{ConsensusMessage, ProposalSummary, SortitionRound, Step, VoteStep};
use crate::consensus::BitSet;

/// Storing all Proposals, Prevotes and Precommits.
Expand Down Expand Up @@ -130,6 +130,14 @@ impl PriorityCollector {
fn insert(&mut self, info: PriorityInfo) -> bool {
self.priorities.insert(info)
}

fn get_highest(&self) -> Option<PriorityInfo> {
self.priorities.iter().rev().next().cloned()
}

fn iter_from_highest(&self) -> Rev<Iter<'_, PriorityInfo>> {
self.priorities.iter().rev()
}
}

impl MessageCollector {
Expand Down Expand Up @@ -317,3 +325,54 @@ impl VoteCollector {
.unwrap_or_default()
}
}

impl VoteCollector {
pub fn collect_priority(&mut self, sortition_round: SortitionRound, info: PriorityInfo) -> bool {
self.votes.entry(sortition_round.into()).or_insert_with(StepCollector::new_pp).insert_priority(info)
}

pub fn get_highest_priority_info(&self, sortition_round: SortitionRound) -> Option<PriorityInfo> {
self.votes
.get(&sortition_round.into())
.and_then(|step_collector| step_collector.priority_collector().get_highest())
}

pub fn get_highest_proposal_hash(&self, sortition_round: SortitionRound) -> Option<BlockHash> {
self.votes.get(&sortition_round.into()).and_then(|step_collector| {
let highest_priority_idx =
step_collector.priority_collector().get_highest().map(|priority_info| priority_info.signer_idx())?;
step_collector
.message_collector()
.fetch_by_idx(highest_priority_idx)
.and_then(|priority_message| priority_message.block_hash())
})
}

pub fn get_highest_proposal_summary(&self, sortition_round: SortitionRound) -> Option<ProposalSummary> {
let block_hash = self.get_highest_proposal_hash(sortition_round)?;
let priority_info = self.get_highest_priority_info(sortition_round)?;
Some(ProposalSummary {
priority_info,
block_hash,
})
}

pub fn block_hashes_from_highest(&self, sortition_round: SortitionRound) -> Vec<BlockHash> {
match self.votes.get(&sortition_round.into()) {
Some(step_collector) => {
let message_collector = step_collector.message_collector();
let priority_iter_from_highest = step_collector.priority_collector().iter_from_highest();
priority_iter_from_highest
.map(|priority_info| {
message_collector
.fetch_by_idx(priority_info.signer_idx())
.expect("Signer index was verified")
.block_hash()
.expect("Proposal vote always have BlockHash")
})
.collect()
}
None => vec![],
}
}
}
109 changes: 50 additions & 59 deletions core/src/consensus/tendermint/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use super::message::*;
use super::network;
use super::params::TimeGapParams;
use super::stake::CUSTOM_ACTION_HANDLER_ID;
use super::types::{Height, Proposal, Step, TendermintSealView, TendermintState, TwoThirdsMajority, View};
use super::types::{Height, Step, TendermintSealView, TendermintState, TwoThirdsMajority, View};
use super::vote_collector::{DoubleVote, VoteCollector};
use super::vote_regression_checker::VoteRegressionChecker;
use super::{ENGINE_TIMEOUT_BROADCAST_STEP_STATE, ENGINE_TIMEOUT_TOKEN_NONCE_BASE, SEAL_FIELDS};
Expand All @@ -52,6 +52,7 @@ use crate::consensus::{
use crate::encoded;
use crate::error::{BlockError, Error};
use crate::transaction::{SignedTransaction, UnverifiedTransaction};
use crate::types::BlockStatus;
use crate::views::BlockView;
use crate::BlockId;
use std::cell::Cell;
Expand Down Expand Up @@ -84,8 +85,6 @@ struct Worker {
signer: EngineSigner,
/// Last majority
last_two_thirds_majority: TwoThirdsMajority,
/// hash of the proposed block, used for seal submission.
proposal: Proposal,
/// The finalized view of the previous height's block.
/// The signatures for the previous block is signed for the view below.
finalized_view_of_previous_block: View,
Expand Down Expand Up @@ -192,7 +191,6 @@ impl Worker {
votes: Default::default(),
signer: Default::default(),
last_two_thirds_majority: TwoThirdsMajority::Empty,
proposal: Proposal::None,
finalized_view_of_previous_block: 0,
finalized_view_of_current_block: None,
validators,
Expand Down Expand Up @@ -637,7 +635,6 @@ impl Worker {
fn increment_view(&mut self, n: View) {
cinfo!(ENGINE, "increment_view: New view.");
self.view += n;
self.proposal = Proposal::None;
self.votes_received = MutTrigger::new(BitSet::new());
}

Expand All @@ -654,7 +651,6 @@ impl Worker {
self.last_two_thirds_majority = TwoThirdsMajority::Empty;
self.height += 1;
self.view = 0;
self.proposal = Proposal::None;
self.votes_received = MutTrigger::new(BitSet::new());
self.finalized_view_of_previous_block =
self.finalized_view_of_current_block.expect("self.step == Step::Commit");
Expand All @@ -672,7 +668,6 @@ impl Worker {
self.last_two_thirds_majority = TwoThirdsMajority::Empty;
self.height = height;
self.view = 0;
self.proposal = Proposal::None;
self.votes_received = MutTrigger::new(BitSet::new());
self.finalized_view_of_previous_block = finalized_view_of_previous_height;
self.finalized_view_of_current_block = None;
Expand Down Expand Up @@ -708,7 +703,7 @@ impl Worker {
// need to reset vote
self.broadcast_state(
vote_step,
self.proposal.block_hash(),
self.votes.get_highest_proposal_summary(self.current_sortition_round()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a concern about this line. The concern is how to handle the highest proposal block if the block was invalid. There are many strategies to make a better situation.

However, we can think of a situation like this. A proposer is elected by the VRF. The proposer chooses not to create a block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If validators are elected as eligible proposers but they send invalid proposal block then all validators may agree on Nil. I think this is natural behavior.

self.last_two_thirds_majority.view(),
self.votes_received.borrow_anyway(),
);
Expand All @@ -723,9 +718,6 @@ impl Worker {
// Wait for verification.
return
}
self.proposal = Proposal::new_imported(*hash);
self.move_to_step(TendermintState::Prevote, is_restoring);
return
}
let parent_block_hash = self.prev_block_hash();
if !self.is_signer_proposer(&parent_block_hash) {
Expand Down Expand Up @@ -754,8 +746,8 @@ impl Worker {
self.request_messages_to_all(vote_step, &BitSet::all_set() - &self.votes_received);
if !self.already_generated_message() {
let block_hash_candidate = match &self.last_two_thirds_majority {
TwoThirdsMajority::Empty => self.proposal.imported_block_hash(),
TwoThirdsMajority::Unlock(_) => self.proposal.imported_block_hash(),
TwoThirdsMajority::Empty => self.highest_imported_block_hash(),
TwoThirdsMajority::Unlock(_) => self.highest_imported_block_hash(),
TwoThirdsMajority::Lock(_, block_hash) => Some(*block_hash),
};
let block_hash = block_hash_candidate.filter(|hash| {
Expand Down Expand Up @@ -826,6 +818,21 @@ impl Worker {
}
}

fn highest_imported_block_hash(&self) -> Option<BlockHash> {
match self.step {
TendermintState::Prevote => {
self.votes.block_hashes_from_highest(self.current_sortition_round()).into_iter().find(|block_hash| {
if let BlockStatus::InChain = self.client().block_status(&(*block_hash).into()) {
true
} else {
false
}
})
}
_ => panic!(),
}
}

fn is_generation_time_relevant(&self, block_header: &Header) -> bool {
let acceptable_past_gap = self.time_gap_params.allowed_past_gap;
let acceptable_future_gap = self.time_gap_params.allowed_future_gap;
Expand Down Expand Up @@ -988,7 +995,6 @@ impl Worker {
let current_vote_step = VoteStep::new(self.height, self.view, self.step.to_step());
let proposal_is_for_current = self.votes.has_votes_for(&current_vote_step, proposal.hash());
if proposal_is_for_current {
self.proposal = Proposal::new_imported(proposal.hash());
let current_step = self.step.clone();
match current_step {
TendermintState::Propose => {
Expand All @@ -1008,19 +1014,7 @@ impl Worker {
TendermintSealView::new(proposal.seal()).parent_block_finalized_view().unwrap();

self.jump_to_height(height, finalized_view_of_previous_height);

let proposal_is_for_view0 = self.votes.has_votes_for(
&VoteStep {
height,
view: 0,
step: Step::Propose,
},
proposal.hash(),
);
if proposal_is_for_view0 {
self.proposal = Proposal::new_imported(proposal.hash())
}
self.move_to_step(TendermintState::Prevote, false);
self.move_to_step(TendermintState::new_propose_step(), false);
}
}

Expand Down Expand Up @@ -1054,12 +1048,6 @@ impl Worker {
self.finalized_view_of_previous_block = backup.finalized_view_of_previous_block;
self.finalized_view_of_current_block = backup.finalized_view_of_current_block;

if let Some(proposal) = backup.proposal {
if client.block(&BlockId::Hash(proposal)).is_some() {
self.proposal = Proposal::ProposalImported(proposal);
}
}

for vote in backup.votes {
let bytes = rlp::encode(&vote);
if let Err(err) = self.handle_message(&bytes, true) {
Expand All @@ -1086,7 +1074,6 @@ impl Worker {
return Seal::None
}

assert_eq!(Proposal::None, self.proposal);
assert_eq!(height, self.height);

let view = self.view;
Expand Down Expand Up @@ -1274,7 +1261,7 @@ impl Worker {
if let Some(votes_received) = self.votes_received.borrow_if_mutated() {
self.broadcast_state(
self.vote_step(),
self.proposal.block_hash(),
self.votes.get_highest_proposal_summary(self.current_sortition_round()),
self.last_two_thirds_majority.view(),
votes_received,
);
Expand Down Expand Up @@ -1490,7 +1477,7 @@ impl Worker {
fn repropose_block(&mut self, block: encoded::Block) {
let header = block.decode_header();
self.vote_on_header_for_proposal(&header).expect("I am proposer");
self.proposal = Proposal::new_imported(header.hash());
debug_assert_eq!(self.client().block_status(&header.hash().into()), BlockStatus::InChain);
self.broadcast_proposal_block(self.view, block);
}

Expand Down Expand Up @@ -1799,13 +1786,21 @@ impl Worker {
proposed_view,
author_view
);
self.proposal = Proposal::new_imported(header_view.hash());
} else {
self.proposal = Proposal::new_received(header_view.hash(), bytes.clone(), signature);
} else if Some(priority_info.priority())
>= self
.votes
.get_highest_priority_info(self.current_sortition_round())
.map(|priority_info| priority_info.priority())
{
cdebug!(
ENGINE,
"Received a proposal with the priority {}. Replace the highest proposal",
priority_info.priority()
);
}
self.broadcast_state(
VoteStep::new(self.height, self.view, self.step.to_step()),
self.proposal.block_hash(),
self.votes.get_highest_proposal_summary(self.current_sortition_round()),
self.last_two_thirds_majority.view(),
self.votes_received.borrow_anyway(),
);
Expand Down Expand Up @@ -1862,9 +1857,12 @@ impl Worker {
|| self.view < peer_vote_step.view
|| self.height < peer_vote_step.height;

let need_proposal = self.need_proposal();
if need_proposal && peer_has_proposal {
self.send_request_proposal(token, self.height, self.view, &result);
let is_not_commit_step = !self.step.is_commit();
let peer_has_higher =
self.votes.get_highest_priority(peer_vote_step.into()) < peer_proposal.map(|summary| summary.priority());

if is_not_commit_step && peer_has_proposal && peer_has_higher {
self.send_request_proposal(token, self.current_sortition_round(), &result);
}

let current_step = current_vote_step.step;
Expand Down Expand Up @@ -1945,20 +1943,15 @@ impl Worker {
return
}

if request_height == self.height && request_view > self.view {
return
}

if let Some((signature, _signer_index, block)) = self.first_proposal_at(request_height, request_view) {
ctrace!(ENGINE, "Send proposal {}-{} to {:?}", request_height, request_view, token);
self.send_proposal_block(signature, request_view, block, result);
return
}

if request_height == self.height && request_view == self.view {
if let Proposal::ProposalReceived(_hash, block, signature) = &self.proposal {
self.send_proposal_block(*signature, request_view, block.clone(), result);
}
if let Some((signature, highest_priority_info, block)) = self.highest_proposal_at(requested_round) {
ctrace!(
ENGINE,
"Send proposal of priority {:?} in a round {:?} to {:?}",
highest_priority_info.priority(),
requested_round,
token
);
self.send_proposal_block(signature, highest_priority_info, requested_round.view, block, result);
}
}

Expand Down Expand Up @@ -2155,8 +2148,6 @@ impl Worker {
}
}

// Since we don't have proposal vote, set proposal = None
self.proposal = Proposal::None;
self.view = commit_view;
self.votes_received = MutTrigger::new(vote_bitset);
self.last_two_thirds_majority = TwoThirdsMajority::Empty;
Expand Down