Skip to content

Commit

Permalink
[Consensus Observer] Add message verification to commit processing
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Jul 11, 2024
1 parent 9f49ce5 commit 7e2100f
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 21 deletions.
4 changes: 4 additions & 0 deletions consensus/src/consensus_observer/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ use thiserror::Error;

#[derive(Debug, Error)]
pub enum Error {
#[error("Invalid message error: {0}")]
InvalidMessageError(String),

#[error("Network error: {0}")]
NetworkError(String),

Expand All @@ -32,6 +35,7 @@ impl Error {
/// Returns a summary label for the error
pub fn get_label(&self) -> &'static str {
match self {
Self::InvalidMessageError(_) => "invalid_message_error",
Self::NetworkError(_) => "network_error",
Self::RpcError(_) => "rpc_error",
Self::SubscriptionDisconnected(_) => "subscription_disconnected",
Expand Down
80 changes: 61 additions & 19 deletions consensus/src/consensus_observer/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use aptos_storage_interface::DbReader;
use aptos_time_service::TimeService;
use aptos_types::{
block_info::{BlockInfo, Round},
epoch_change::Verifier,
epoch_state::EpochState,
ledger_info::LedgerInfoWithSignatures,
on_chain_config::{
Expand All @@ -69,8 +70,8 @@ pub struct ConsensusObserver {
consensus_observer_client:
Arc<ConsensusObserverClient<NetworkClient<ConsensusObserverMessage>>>,

// The current epoch
epoch: u64,
// The current epoch state
epoch_state: Option<Arc<EpochState>>,
// The latest ledger info (updated via a callback)
root: Arc<Mutex<LedgerInfoWithSignatures>>,

Expand Down Expand Up @@ -119,7 +120,7 @@ impl ConsensusObserver {
Self {
consensus_observer_config,
consensus_observer_client,
epoch: root.commit_info().epoch(),
epoch_state: None,
root: Arc::new(Mutex::new(root)),
pending_ordered_blocks: PendingOrderedBlocks::new(),
execution_client,
Expand Down Expand Up @@ -398,6 +399,13 @@ impl ConsensusObserver {
};
}

/// Returns the current epoch state, and panics if it is not set
fn get_epoch_state(&self) -> Arc<EpochState> {
self.epoch_state
.clone()
.expect("The epoch state is not set! This should never happen!")
}

/// Returns the last known block
fn get_last_block(&self) -> BlockInfo {
if let Some(last_pending_block) = self.pending_ordered_blocks.get_last_pending_block() {
Expand Down Expand Up @@ -445,19 +453,36 @@ impl ConsensusObserver {

/// Processes the commit decision
fn process_commit_decision(&mut self, commit_decision: CommitDecision) {
// TODO: verify the commit decision!
// If the commit decision is for the current epoch, verify it
let commit_decision_epoch = commit_decision.epoch();
if commit_decision_epoch == self.get_epoch_state().epoch {
// Verify the commit decision
if let Err(error) = self.verify_commit_decision(&commit_decision) {
error!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Failed to verify commit decision! Ignoring: {:?}, Error: {:?}",
commit_decision.ledger_info().commit_info(),
error
))
);
return;
}

// Update the pending blocks with the commit decision
if self.process_commit_decision_for_pending_block(&commit_decision) {
return; // The commit decision was successfully processed
// Update the pending blocks with the commit decision
if self.process_commit_decision_for_pending_block(&commit_decision) {
return; // The commit decision was successfully processed
}
}

// Otherwise, check if we need to state sync (i.e., the
// commit decision is for a future epoch or round).
let decision_epoch = commit_decision.ledger_info().commit_info().epoch();
let decision_round = commit_decision.round();
// TODO: identify the best way to handle an invalid commit decision
// for a future epoch. In such cases, we currently rely on state sync.

// Otherwise, we failed to process the commit decision. If the commit
// is for a future epoch or round, we need to state sync.
let commit_decision_round = commit_decision.round();
let last_block = self.get_last_block();
if decision_epoch > last_block.epoch() || decision_round > last_block.round() {
if commit_decision_epoch > last_block.epoch() || commit_decision_round > last_block.round()
{
info!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Started syncing to {}!",
Expand All @@ -472,8 +497,8 @@ impl ConsensusObserver {
// Start the state sync process
let abort_handle = sync_to_commit_decision(
commit_decision,
decision_epoch,
decision_round,
commit_decision_epoch,
commit_decision_round,
self.execution_client.clone(),
self.sync_notification_sender.clone(),
);
Expand Down Expand Up @@ -696,7 +721,7 @@ impl ConsensusObserver {
}

// If the epoch has changed, end the current epoch and start the new one
if epoch > self.epoch {
if epoch > self.get_epoch_state().epoch {
self.execution_client.end_epoch().await;
self.wait_for_epoch_start().await;
}
Expand Down Expand Up @@ -843,6 +868,19 @@ impl ConsensusObserver {
);
}

/// Verifies the commit decision and returns an error if the decision is invalid
fn verify_commit_decision(&self, commit_decision: &CommitDecision) -> Result<(), Error> {
self.get_epoch_state()
.verify(commit_decision.ledger_info())
.map_err(|error| {
Error::InvalidMessageError(format!(
"Failed to verify the commit decision ledger info: {:?}, Error: {:?}",
commit_decision.ledger_info().commit_info(),
error
))
})
}

/// Waits for a new epoch to start
async fn wait_for_epoch_start(&mut self) {
// Extract the epoch state and on-chain configs
Expand All @@ -856,10 +894,14 @@ impl ConsensusObserver {
panic!("Reconfig events are required to wait for a new epoch to start! Something has gone wrong!")
};

// Update the local epoch
self.epoch = epoch_state.epoch;
info!(LogSchema::new(LogEntry::ConsensusObserver)
.message(&format!("New epoch started: {}", self.epoch)));
// Update the local epoch state
self.epoch_state = Some(epoch_state.clone());
info!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"New epoch started: {}. Updated the epoch state!",
epoch_state.epoch
))
);

// Create the payload manager
let payload_manager = if consensus_config.quorum_store_enabled() {
Expand Down
2 changes: 0 additions & 2 deletions state-sync/state-sync-driver/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,6 @@ impl<
);
self.update_consensus_commit_metrics(&consensus_commit_notification);

// TODO(joshlind): can we get consensus to forward the events?

// Handle the commit notification
let committed_transactions = CommittedTransactions {
events: consensus_commit_notification.subscribable_events.clone(),
Expand Down

0 comments on commit 7e2100f

Please sign in to comment.