Skip to content

Commit

Permalink
Merge branch 'main' into rm/libp2p-dht-persistence
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron authored Mar 6, 2025
2 parents 0880d93 + 1d535da commit 3316767
Show file tree
Hide file tree
Showing 21 changed files with 805 additions and 460 deletions.
456 changes: 236 additions & 220 deletions Cargo.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -496,8 +496,13 @@ where
// Similarly, we can initialize the payload table with a null payload, which can help us
// distinguish between blocks that haven't been produced yet and blocks we haven't received
// yet when answering queries.
self.upsert("payload", ["height"], ["height"], [(height as i64,)])
.await?;
// We don't overwrite the payload if it already exists.
// During epoch transition in PoS, the same height block is sent multiple times.
// The first block may have the payload, but subsequent blocks might be missing it.
// Overwriting would cause the payload to be lost since the block height is the same
let query = query("INSERT INTO payload (height) VALUES ($1) ON CONFLICT DO NOTHING")
.bind(height as i64);
query.execute(self.as_mut()).await?;

// Finally, we insert the leaf itself, which references the header row we created.
// Serialize the full leaf and QC to JSON for easy storage.
Expand Down
40 changes: 8 additions & 32 deletions hotshot-task-impls/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
use async_broadcast::{Receiver, Sender};
use async_lock::RwLock;
use async_trait::async_trait;
use either::Either;
use hotshot_task::task::TaskState;
use hotshot_types::{
consensus::OuterConsensus,
Expand All @@ -30,8 +29,11 @@ use tracing::instrument;
use self::handlers::{
handle_quorum_vote_recv, handle_timeout, handle_timeout_vote_recv, handle_view_change,
};
use crate::helpers::{validate_qc_and_next_epoch_qc, wait_for_next_epoch_qc};
use crate::{events::HotShotEvent, helpers::broadcast_event, vote_collection::VoteCollectorsMap};
use crate::{
events::HotShotEvent,
helpers::{broadcast_event, validate_qc_and_next_epoch_qc},
vote_collection::VoteCollectorsMap,
};

/// Event handlers for use in the `handle` method.
mod handlers;
Expand Down Expand Up @@ -140,40 +142,14 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> ConsensusTaskSt
tracing::debug!("Failed to handle Timeout event; error = {e}");
}
}
HotShotEvent::Qc2Formed(Either::Left(quorum_cert)) => {
let cert_view = quorum_cert.view_number();
if !self.upgrade_lock.epochs_enabled(cert_view).await {
tracing::debug!("QC2 formed but epochs not enabled. Do nothing");
return Ok(());
}
if !self
.consensus
.read()
.await
.is_leaf_extended(quorum_cert.data.leaf_commit)
{
tracing::debug!("We formed QC but not eQC. Do nothing");
return Ok(());
}
if wait_for_next_epoch_qc(
quorum_cert,
&self.consensus,
self.timeout,
self.view_start_time,
&receiver,
)
.await
.is_none()
{
tracing::warn!("We formed eQC but we don't have corresponding next epoch eQC.");
return Ok(());
}
HotShotEvent::ExtendedQc2Formed(eqc) => {
let cert_view = eqc.view_number();
let cert_block_number = self
.consensus
.read()
.await
.saved_leaves()
.get(&quorum_cert.data.leaf_commit)
.get(&eqc.data.leaf_commit)
.context(error!(
"Could not find the leaf for the eQC. It shouldn't happen."
))?
Expand Down
6 changes: 6 additions & 0 deletions hotshot-task-impls/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ pub enum HotShotEvent<TYPES: NodeType> {
Qc2Formed(Either<QuorumCertificate2<TYPES>, TimeoutCertificate2<TYPES>>),
/// The next leader has collected enough votes from the next epoch nodes to form a QC; emitted by the next leader in the consensus task; an internal event only
NextEpochQc2Formed(Either<NextEpochQuorumCertificate2<TYPES>, TimeoutCertificate<TYPES>>),
/// A validator formed both a current epoch eQC and a next epoch eQC
ExtendedQc2Formed(QuorumCertificate2<TYPES>),
/// The DA leader has collected enough votes to form a DAC; emitted by the DA leader in the DA task; sent to the entire network via the networking task
DacSend(DaCertificate2<TYPES>, TYPES::SignatureKey),
/// The current view has changed; emitted by the replica in the consensus task or replica in the view sync task; received by almost all other tasks
Expand Down Expand Up @@ -312,6 +314,7 @@ impl<TYPES: NodeType> HotShotEvent<TYPES> {
either::Left(qc) => Some(qc.view_number()),
either::Right(tc) => Some(tc.view_number()),
},
HotShotEvent::ExtendedQc2Formed(cert) => Some(cert.view_number()),
HotShotEvent::ViewSyncCommitVoteSend(vote)
| HotShotEvent::ViewSyncCommitVoteRecv(vote) => Some(vote.view_number()),
HotShotEvent::ViewSyncPreCommitVoteRecv(vote)
Expand Down Expand Up @@ -447,6 +450,9 @@ impl<TYPES: NodeType> Display for HotShotEvent<TYPES> {
write!(f, "NextEpochQc2Formed(view_number={:?})", tc.view_number())
}
},
HotShotEvent::ExtendedQc2Formed(cert) => {
write!(f, "ExtendedQc2Formed(view_number={:?})", cert.view_number())
}
HotShotEvent::DacSend(cert, _) => {
write!(f, "DacSend(view_number={:?})", cert.view_number())
}
Expand Down
65 changes: 57 additions & 8 deletions hotshot-task-impls/src/quorum_proposal/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,25 @@ use std::{
time::{Duration, Instant},
};

use crate::{
events::HotShotEvent,
helpers::{broadcast_event, parent_leaf_and_state, wait_for_next_epoch_qc},
quorum_proposal::{QuorumProposalTaskState, UpgradeLock, Versions},
};
use anyhow::{ensure, Context, Result};
use async_broadcast::{Receiver, Sender};
use async_lock::RwLock;
use committable::Committable;
use committable::{Commitment, Committable};
use hotshot_task::dependency_task::HandleDepOutput;
use hotshot_types::{
consensus::{CommitmentAndMetadata, OuterConsensus},
data::{Leaf2, QuorumProposal2, QuorumProposalWrapper, VidDisperse, ViewChangeEvidence2},
message::Proposal,
simple_certificate::{QuorumCertificate2, UpgradeCertificate},
traits::{
block_contents::BlockHeader, election::Membership, node_implementation::NodeType,
block_contents::BlockHeader,
election::Membership,
node_implementation::{NodeImplementation, NodeType},
signature_key::SignatureKey,
},
utils::{is_last_block_in_epoch, option_epoch_from_block_number},
Expand All @@ -35,12 +42,6 @@ use hotshot_utils::anytrace::*;
use tracing::instrument;
use vbs::version::StaticVersionType;

use crate::{
events::HotShotEvent,
helpers::{broadcast_event, parent_leaf_and_state, wait_for_next_epoch_qc},
quorum_proposal::{UpgradeLock, Versions},
};

/// Proposal dependency types. These types represent events that precipitate a proposal.
#[derive(PartialEq, Debug)]
pub(crate) enum ProposalDependency {
Expand Down Expand Up @@ -500,3 +501,51 @@ impl<TYPES: NodeType, V: Versions> HandleDepOutput for ProposalDependencyHandle<
}
}
}

pub(super) async fn handle_eqc_formed<
TYPES: NodeType,
I: NodeImplementation<TYPES>,
V: Versions,
>(
cert_view: TYPES::View,
leaf_commit: Commitment<Leaf2<TYPES>>,
task_state: &QuorumProposalTaskState<TYPES, I, V>,
event_sender: &Sender<Arc<HotShotEvent<TYPES>>>,
) {
if !task_state.upgrade_lock.epochs_enabled(cert_view).await {
tracing::debug!("QC2 formed but epochs not enabled. Do nothing");
return;
}
if !task_state
.consensus
.read()
.await
.is_leaf_extended(leaf_commit)
{
tracing::debug!("We formed QC but not eQC. Do nothing");
return;
}

let consensus_reader = task_state.consensus.read().await;
let current_epoch_qc = consensus_reader.high_qc();
let Some(next_epoch_qc) = consensus_reader.next_epoch_high_qc() else {
tracing::debug!("We formed the eQC but we don't have the next epoch eQC at all.");
return;
};
if current_epoch_qc.view_number() != next_epoch_qc.view_number()
|| current_epoch_qc.data != *next_epoch_qc.data
{
tracing::debug!(
"We formed the eQC but the current and next epoch QCs do not correspond to each other."
);
return;
}
let current_epoch_qc_clone = current_epoch_qc.clone();
drop(consensus_reader);

broadcast_event(
Arc::new(HotShotEvent::ExtendedQc2Formed(current_epoch_qc_clone)),
event_sender,
)
.await;
}
13 changes: 13 additions & 0 deletions hotshot-task-impls/src/quorum_proposal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use tracing::instrument;

use self::handlers::{ProposalDependency, ProposalDependencyHandle};
use crate::events::HotShotEvent;
use crate::quorum_proposal::handlers::handle_eqc_formed;

mod handlers;

Expand Down Expand Up @@ -437,6 +438,10 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
.await
.wrap()
.context(error!("Failed to update high QC in storage!"))?;

handle_eqc_formed(qc.view_number(), qc.data.leaf_commit, self, &event_sender)
.await;

let view_number = qc.view_number() + 1;
self.create_dependency_task_if_new(
view_number,
Expand Down Expand Up @@ -598,6 +603,14 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>
.await
.wrap()
.context(error!("Failed to update next epoch high QC in storage!"))?;

handle_eqc_formed(
next_epoch_qc.view_number(),
next_epoch_qc.data.leaf_commit,
self,
&event_sender,
)
.await;
}
_ => {}
}
Expand Down
96 changes: 48 additions & 48 deletions hotshot-testing/tests/tests_6/test_epochs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,54 +491,54 @@ cross_tests!(
}
);

cross_tests!(
TestName: test_all_restart_epochs,
Impls: [CombinedImpl, PushCdnImpl],
Types: [TestTypes, TestTypesRandomizedLeader, TestTwoStakeTablesTypes],
Versions: [EpochsTestVersions],
Ignore: false,
Metadata: {
let timing_data = TimingData {
next_view_timeout: 2000,
..Default::default()
};
let mut metadata = TestDescription::default().set_num_nodes(20,20);
let mut catchup_nodes = vec![];

for i in 0..20 {
catchup_nodes.push(ChangeNode {
idx: i,
updown: NodeAction::RestartDown(0),
})
}

metadata.timing_data = timing_data;

metadata.spinning_properties = SpinningTaskDescription {
// Restart all the nodes in view 10
node_changes: vec![(10, catchup_nodes)],
};
metadata.view_sync_properties =
hotshot_testing::view_sync_task::ViewSyncTaskDescription::Threshold(0, 20);

metadata.completion_task_description =
CompletionTaskDescription::TimeBasedCompletionTaskBuilder(
TimeBasedCompletionTaskDescription {
duration: Duration::from_secs(60),
},
);
metadata.overall_safety_properties = OverallSafetyPropertiesDescription {
// Make sure we keep committing rounds after the catchup, but not the full 50.
num_successful_views: 22,
expected_view_failures: vec![10],
possible_view_failures: vec![9, 11],
decide_timeout: Duration::from_secs(20),
..Default::default()
};

metadata
},
);
// cross_tests!(
// TestName: test_all_restart_epochs,
// Impls: [CombinedImpl, PushCdnImpl],
// Types: [TestTypes, TestTypesRandomizedLeader, TestTwoStakeTablesTypes],
// Versions: [EpochsTestVersions],
// Ignore: false,
// Metadata: {
// let timing_data = TimingData {
// next_view_timeout: 2000,
// ..Default::default()
// };
// let mut metadata = TestDescription::default().set_num_nodes(20,20);
// let mut catchup_nodes = vec![];
//
// for i in 0..20 {
// catchup_nodes.push(ChangeNode {
// idx: i,
// updown: NodeAction::RestartDown(0),
// })
// }
//
// metadata.timing_data = timing_data;
//
// metadata.spinning_properties = SpinningTaskDescription {
// // Restart all the nodes in view 10
// node_changes: vec![(10, catchup_nodes)],
// };
// metadata.view_sync_properties =
// hotshot_testing::view_sync_task::ViewSyncTaskDescription::Threshold(0, 20);
//
// metadata.completion_task_description =
// CompletionTaskDescription::TimeBasedCompletionTaskBuilder(
// TimeBasedCompletionTaskDescription {
// duration: Duration::from_secs(60),
// },
// );
// metadata.overall_safety_properties = OverallSafetyPropertiesDescription {
// // Make sure we keep committing rounds after the catchup, but not the full 50.
// num_successful_views: 22,
// expected_view_failures: vec![10],
// possible_view_failures: vec![9, 11],
// decide_timeout: Duration::from_secs(20),
// ..Default::default()
// };
//
// metadata
// },
// );

cross_tests!(
TestName: test_all_restart_one_da_with_epochs,
Expand Down
1 change: 0 additions & 1 deletion request-response/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ async-trait = { workspace = true }
bincode = { workspace = true }
blake3 = { workspace = true }
byteorder = { version = "1", default-features = false }
derive_builder = { workspace = true }
derive_more = { workspace = true }
hotshot-types = { workspace = true }
parking_lot = { workspace = true }
Expand Down
Loading

0 comments on commit 3316767

Please sign in to comment.