Skip to content

Commit

Permalink
Merge pull request #2 from subspace/concurrent-verification-backport
Browse files Browse the repository at this point in the history
  • Loading branch information
vedhavyas authored Sep 27, 2023
2 parents a20ecc4 + f62d9cc commit c90d6ed
Show file tree
Hide file tree
Showing 26 changed files with 484 additions and 203 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cumulus/client/consensus/aura/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ edition.workspace = true
async-trait = "0.1.73"
codec = { package = "parity-scale-codec", version = "3.0.0", features = [ "derive" ] }
futures = "0.3.28"
parking_lot = "0.12.1"
tracing = "0.1.37"
schnellru = "0.2.1"

Expand Down
13 changes: 7 additions & 6 deletions cumulus/client/consensus/aura/src/equivocation_import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
/// should be thrown out and which ones should be kept.
use codec::Codec;
use cumulus_client_consensus_common::ParachainBlockImportMarker;
use parking_lot::Mutex;
use schnellru::{ByLength, LruMap};

use sc_consensus::{
import_queue::{BasicQueue, Verifier as VerifierT},
BlockImport, BlockImportParams, ForkChoiceStrategy,
BlockImport, BlockImportParams, ForkChoiceStrategy, SharedBlockImport,
};
use sc_consensus_aura::standalone as aura_internal;
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE};
Expand Down Expand Up @@ -71,7 +72,7 @@ struct Verifier<P, Client, Block, CIDP> {
client: Arc<Client>,
create_inherent_data_providers: CIDP,
slot_duration: SlotDuration,
defender: NaiveEquivocationDefender,
defender: Mutex<NaiveEquivocationDefender>,
telemetry: Option<TelemetryHandle>,
_phantom: std::marker::PhantomData<fn() -> (Block, P)>,
}
Expand All @@ -89,7 +90,7 @@ where
CIDP: CreateInherentDataProviders<Block, ()>,
{
async fn verify(
&mut self,
&self,
mut block_params: BlockImportParams<Block>,
) -> Result<BlockImportParams<Block>, String> {
// Skip checks that include execution, if being told so, or when importing only state.
Expand Down Expand Up @@ -132,7 +133,7 @@ where
block_params.post_hash = Some(post_hash);

// Check for and reject egregious amounts of equivocations.
if self.defender.insert_and_check(slot) {
if self.defender.lock().insert_and_check(slot) {
return Err(format!(
"Rejecting block {:?} due to excessive equivocations at slot",
post_hash,
Expand Down Expand Up @@ -239,11 +240,11 @@ where
let verifier = Verifier::<P, _, _, _> {
client,
create_inherent_data_providers,
defender: NaiveEquivocationDefender::default(),
defender: Mutex::new(NaiveEquivocationDefender::default()),
slot_duration,
telemetry,
_phantom: std::marker::PhantomData,
};

BasicQueue::new(verifier, Box::new(block_import), None, spawner, registry)
BasicQueue::new(verifier, SharedBlockImport::new(block_import), None, spawner, registry)
}
5 changes: 3 additions & 2 deletions cumulus/client/consensus/common/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use sp_runtime::traits::Block as BlockT;
use sc_consensus::{
block_import::{BlockImport, BlockImportParams},
import_queue::{BasicQueue, Verifier},
SharedBlockImport,
};

use crate::ParachainBlockImportMarker;
Expand All @@ -50,7 +51,7 @@ pub struct VerifyNothing;
#[async_trait::async_trait]
impl<Block: BlockT> Verifier<Block> for VerifyNothing {
async fn verify(
&mut self,
&self,
params: BlockImportParams<Block>,
) -> Result<BlockImportParams<Block>, String> {
Ok(params)
Expand All @@ -72,5 +73,5 @@ where
+ Sync
+ 'static,
{
BasicQueue::new(VerifyNothing, Box::new(block_import), None, spawner, registry)
BasicQueue::new(VerifyNothing, SharedBlockImport::new(block_import), None, spawner, registry)
}
4 changes: 2 additions & 2 deletions cumulus/client/consensus/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,13 @@ impl<Block: BlockT, I: Clone, BE> Clone for ParachainBlockImport<Block, I, BE> {
impl<Block, BI, BE> BlockImport<Block> for ParachainBlockImport<Block, BI, BE>
where
Block: BlockT,
BI: BlockImport<Block> + Send,
BI: BlockImport<Block> + Send + Sync,
BE: Backend<Block>,
{
type Error = BI::Error;

async fn check_block(
&mut self,
&self,
block: sc_consensus::BlockCheckParams<Block>,
) -> Result<sc_consensus::ImportResult, Self::Error> {
self.inner.check_block(block).await
Expand Down
6 changes: 3 additions & 3 deletions cumulus/client/consensus/relay-chain/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use cumulus_client_consensus_common::ParachainBlockImportMarker;

use sc_consensus::{
import_queue::{BasicQueue, Verifier as VerifierT},
BlockImport, BlockImportParams,
BlockImport, BlockImportParams, SharedBlockImport,
};
use sp_api::ProvideRuntimeApi;
use sp_block_builder::BlockBuilder as BlockBuilderApi;
Expand Down Expand Up @@ -52,7 +52,7 @@ where
CIDP: CreateInherentDataProviders<Block, ()>,
{
async fn verify(
&mut self,
&self,
mut block_params: BlockImportParams<Block>,
) -> Result<BlockImportParams<Block>, String> {
// Skip checks that include execution, if being told so, or when importing only state.
Expand Down Expand Up @@ -125,5 +125,5 @@ where
{
let verifier = Verifier::new(client, create_inherent_data_providers);

Ok(BasicQueue::new(verifier, Box::new(block_import), None, spawner, registry))
Ok(BasicQueue::new(verifier, SharedBlockImport::new(block_import), None, spawner, registry))
}
2 changes: 1 addition & 1 deletion cumulus/client/pov-recovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ where

/// Import the given `block`.
///
/// This will also recursivley drain `waiting_for_parent` and import them as well.
/// This will also recursively drain `waiting_for_parent` and import them as well.
async fn import_block(&mut self, block: Block) {
let mut blocks = VecDeque::new();

Expand Down
12 changes: 6 additions & 6 deletions cumulus/polkadot-parachain/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use cumulus_client_consensus_relay_chain::Verifier as RelayChainVerifier;
use futures::lock::Mutex;
use sc_consensus::{
import_queue::{BasicQueue, Verifier as VerifierT},
BlockImportParams, ImportQueue,
BlockImportParams, ImportQueue, SharedBlockImport,
};
use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
use sc_network::{config::FullNetworkConfiguration, NetworkBlock};
Expand Down Expand Up @@ -1022,7 +1022,7 @@ where

struct Verifier<Client, AuraId> {
client: Arc<Client>,
aura_verifier: BuildOnAccess<Box<dyn VerifierT<Block>>>,
aura_verifier: Mutex<BuildOnAccess<Box<dyn VerifierT<Block>>>>,
relay_chain_verifier: Box<dyn VerifierT<Block>>,
_phantom: PhantomData<AuraId>,
}
Expand All @@ -1035,7 +1035,7 @@ where
AuraId: Send + Sync + Codec,
{
async fn verify(
&mut self,
&self,
block_import: BlockImportParams<Block>,
) -> Result<BlockImportParams<Block>, String> {
if self
Expand All @@ -1044,7 +1044,7 @@ where
.has_api::<dyn AuraApi<Block, AuraId>>(*block_import.header.parent_hash())
.unwrap_or(false)
{
self.aura_verifier.get_mut().verify(block_import).await
self.aura_verifier.lock().await.get_mut().verify(block_import).await
} else {
self.relay_chain_verifier.verify(block_import).await
}
Expand Down Expand Up @@ -1104,14 +1104,14 @@ where
let verifier = Verifier {
client,
relay_chain_verifier,
aura_verifier: BuildOnAccess::Uninitialized(Some(Box::new(aura_verifier))),
aura_verifier: Mutex::new(BuildOnAccess::Uninitialized(Some(Box::new(aura_verifier)))),
_phantom: PhantomData,
};

let registry = config.prometheus_registry();
let spawner = task_manager.spawn_essential_handle();

Ok(BasicQueue::new(verifier, Box::new(block_import), None, &spawner, registry))
Ok(BasicQueue::new(verifier, SharedBlockImport::new(block_import), None, &spawner, registry))
}

/// Start an aura powered parachain node. Asset Hub and Collectives use this.
Expand Down
11 changes: 9 additions & 2 deletions substrate/client/consensus/aura/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use sc_client_api::{backend::AuxStore, BlockOf, UsageProvider};
use sc_consensus::{
block_import::{BlockImport, BlockImportParams, ForkChoiceStrategy},
import_queue::{BasicQueue, BoxJustificationImport, DefaultImportQueue, Verifier},
SharedBlockImport,
};
use sc_consensus_slots::{check_equivocation, CheckedHeader, InherentDataProviderExt};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE};
Expand Down Expand Up @@ -174,7 +175,7 @@ where
CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync,
{
async fn verify(
&mut self,
&self,
mut block: BlockImportParams<B>,
) -> Result<BlockImportParams<B>, String> {
// Skip checks that include execution, if being told so or when importing only state.
Expand Down Expand Up @@ -376,7 +377,13 @@ where
compatibility_mode,
});

Ok(BasicQueue::new(verifier, Box::new(block_import), justification_import, spawner, registry))
Ok(BasicQueue::new(
verifier,
SharedBlockImport::new(block_import),
justification_import,
spawner,
registry,
))
}

/// Parameters of [`build_verifier`].
Expand Down
13 changes: 10 additions & 3 deletions substrate/client/consensus/babe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ use sc_consensus::{
StateAction,
},
import_queue::{BasicQueue, BoxJustificationImport, DefaultImportQueue, Verifier},
SharedBlockImport,
};
use sc_consensus_epochs::{
descendent_query, Epoch as EpochT, EpochChangesFor, SharedEpochChanges, ViableEpochDescriptor,
Expand Down Expand Up @@ -1130,7 +1131,7 @@ where
CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync,
{
async fn verify(
&mut self,
&self,
mut block: BlockImportParams<Block>,
) -> Result<BlockImportParams<Block>, String> {
trace!(
Expand Down Expand Up @@ -1683,7 +1684,7 @@ where
}

async fn check_block(
&mut self,
&self,
block: BlockCheckParams<Block>,
) -> Result<ImportResult, Self::Error> {
self.inner.check_block(block).await.map_err(Into::into)
Expand Down Expand Up @@ -1856,7 +1857,13 @@ where
spawner.spawn_essential("babe-worker", Some("babe"), answer_requests.boxed());

Ok((
BasicQueue::new(verifier, Box::new(block_import), justification_import, spawner, registry),
BasicQueue::new(
verifier,
SharedBlockImport::new(block_import),
justification_import,
spawner,
registry,
),
BabeWorkerHandle(worker_tx),
))
}
Expand Down
Loading

0 comments on commit c90d6ed

Please sign in to comment.