From 29289ccaec2685750c2a1239c594ec3c80516b8e Mon Sep 17 00:00:00 2001 From: Arsenii Kulikov Date: Fri, 29 Nov 2024 16:53:25 +0400 Subject: [PATCH] feat: add `Primitives` AT to `BlockExecutorProvider` (#12994) --- book/sources/exex/hello-world/src/bin/3.rs | 4 +- book/sources/exex/remote/src/exex.rs | 4 +- book/sources/exex/remote/src/exex_4.rs | 4 +- book/sources/exex/tracking-state/src/bin/1.rs | 6 +- book/sources/exex/tracking-state/src/bin/2.rs | 6 +- crates/blockchain-tree/src/blockchain_tree.rs | 2 +- crates/blockchain-tree/src/chain.rs | 18 +- crates/blockchain-tree/src/shareable.rs | 6 +- crates/cli/commands/src/import.rs | 4 +- .../cli/commands/src/stage/dump/execution.rs | 4 +- crates/cli/commands/src/stage/dump/mod.rs | 2 +- crates/cli/commands/src/stage/mod.rs | 2 +- crates/cli/commands/src/stage/run.rs | 2 +- crates/e2e-test-utils/src/rpc.rs | 6 +- crates/engine/local/src/service.rs | 2 +- crates/engine/service/src/service.rs | 2 +- crates/engine/tree/src/tree/mod.rs | 4 +- crates/ethereum/evm/src/execute.rs | 9 +- crates/ethereum/node/src/node.rs | 2 +- crates/evm/src/either.rs | 47 ++--- crates/evm/src/execute.rs | 133 +++++++------ crates/evm/src/metrics.rs | 24 ++- crates/evm/src/noop.rs | 4 +- crates/evm/src/test_utils.rs | 24 +-- crates/exex/exex/src/backfill/job.rs | 39 ++-- crates/exex/exex/src/backfill/stream.rs | 43 ++--- crates/exex/exex/src/context.rs | 20 +- crates/exex/exex/src/dyn_context.rs | 17 +- crates/exex/exex/src/manager.rs | 175 ++++++++++++------ crates/exex/exex/src/notifications.rs | 112 ++++++----- crates/exex/exex/src/wal/cache.rs | 14 +- crates/exex/exex/src/wal/mod.rs | 41 ++-- crates/exex/exex/src/wal/storage.rs | 26 ++- crates/exex/test-utils/src/lib.rs | 2 +- crates/exex/types/src/notification.rs | 16 +- crates/node/api/src/node.rs | 2 +- crates/node/builder/src/components/builder.rs | 4 +- crates/node/builder/src/components/execute.rs | 7 +- crates/node/builder/src/components/mod.rs | 6 +- crates/node/builder/src/setup.rs | 8 +- crates/optimism/evm/Cargo.toml | 1 + crates/optimism/evm/src/execute.rs | 6 +- crates/optimism/node/src/node.rs | 2 +- crates/revm/src/batch.rs | 20 +- crates/rpc/rpc-builder/src/lib.rs | 33 +++- crates/rpc/rpc/src/debug.rs | 16 +- crates/rpc/rpc/src/validation.rs | 22 ++- crates/stages/stages/src/sets.rs | 3 +- crates/stages/stages/src/stages/execution.rs | 29 +-- .../custom-beacon-withdrawals/src/main.rs | 9 +- examples/custom-evm/src/main.rs | 2 +- examples/stateful-precompile/src/main.rs | 4 +- 52 files changed, 591 insertions(+), 409 deletions(-) diff --git a/book/sources/exex/hello-world/src/bin/3.rs b/book/sources/exex/hello-world/src/bin/3.rs index ebeaf6c84f19..9b429d3eb086 100644 --- a/book/sources/exex/hello-world/src/bin/3.rs +++ b/book/sources/exex/hello-world/src/bin/3.rs @@ -1,10 +1,10 @@ use futures_util::TryStreamExt; -use reth::{api::FullNodeComponents, primitives::Block, providers::BlockReader}; +use reth::{api::FullNodeComponents, builder::NodeTypes, primitives::EthPrimitives}; use reth_exex::{ExExContext, ExExEvent, ExExNotification}; use reth_node_ethereum::EthereumNode; use reth_tracing::tracing::info; -async fn my_exex>>( +async fn my_exex>>( mut ctx: ExExContext, ) -> eyre::Result<()> { while let Some(notification) = ctx.notifications.try_next().await? { diff --git a/book/sources/exex/remote/src/exex.rs b/book/sources/exex/remote/src/exex.rs index 00392b4dad10..c823d98ded49 100644 --- a/book/sources/exex/remote/src/exex.rs +++ b/book/sources/exex/remote/src/exex.rs @@ -3,7 +3,7 @@ use remote_exex::proto::{ self, remote_ex_ex_server::{RemoteExEx, RemoteExExServer}, }; -use reth::{primitives::Block, providers::BlockReader}; +use reth::{builder::NodeTypes, primitives::EthPrimitives}; use reth_exex::{ExExContext, ExExEvent, ExExNotification}; use reth_node_api::FullNodeComponents; use reth_node_ethereum::EthereumNode; @@ -45,7 +45,7 @@ impl RemoteExEx for ExExService { } } -async fn remote_exex>>( +async fn remote_exex>>( mut ctx: ExExContext, notifications: Arc>, ) -> eyre::Result<()> { diff --git a/book/sources/exex/remote/src/exex_4.rs b/book/sources/exex/remote/src/exex_4.rs index c37f26d739dc..8286c0289342 100644 --- a/book/sources/exex/remote/src/exex_4.rs +++ b/book/sources/exex/remote/src/exex_4.rs @@ -3,7 +3,7 @@ use remote_exex::proto::{ self, remote_ex_ex_server::{RemoteExEx, RemoteExExServer}, }; -use reth::{primitives::Block, providers::BlockReader}; +use reth::{builder::NodeTypes, primitives::EthPrimitives}; use reth_exex::{ExExContext, ExExEvent, ExExNotification}; use reth_node_api::FullNodeComponents; use reth_node_ethereum::EthereumNode; @@ -47,7 +47,7 @@ impl RemoteExEx for ExExService { // ANCHOR: snippet #[allow(dead_code)] -async fn remote_exex>>( +async fn remote_exex>>( mut ctx: ExExContext, notifications: Arc>, ) -> eyre::Result<()> { diff --git a/book/sources/exex/tracking-state/src/bin/1.rs b/book/sources/exex/tracking-state/src/bin/1.rs index 2cf43bec3a17..b1a8609b727a 100644 --- a/book/sources/exex/tracking-state/src/bin/1.rs +++ b/book/sources/exex/tracking-state/src/bin/1.rs @@ -5,7 +5,7 @@ use std::{ }; use futures_util::{FutureExt, TryStreamExt}; -use reth::{api::FullNodeComponents, primitives::Block, providers::BlockReader}; +use reth::{api::FullNodeComponents, builder::NodeTypes, primitives::EthPrimitives}; use reth_exex::{ExExContext, ExExEvent, ExExNotification}; use reth_node_ethereum::EthereumNode; use reth_tracing::tracing::info; @@ -14,7 +14,9 @@ struct MyExEx { ctx: ExExContext, } -impl>> Future for MyExEx { +impl>> Future + for MyExEx +{ type Output = eyre::Result<()>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/book/sources/exex/tracking-state/src/bin/2.rs b/book/sources/exex/tracking-state/src/bin/2.rs index b58d2a39c85c..7e9aadf8a04f 100644 --- a/book/sources/exex/tracking-state/src/bin/2.rs +++ b/book/sources/exex/tracking-state/src/bin/2.rs @@ -6,7 +6,7 @@ use std::{ use alloy_primitives::BlockNumber; use futures_util::{FutureExt, TryStreamExt}; -use reth::{api::FullNodeComponents, primitives::Block, providers::BlockReader}; +use reth::{api::FullNodeComponents, builder::NodeTypes, primitives::EthPrimitives}; use reth_exex::{ExExContext, ExExEvent}; use reth_node_ethereum::EthereumNode; use reth_tracing::tracing::info; @@ -25,7 +25,9 @@ impl MyExEx { } } -impl>> Future for MyExEx { +impl>> Future + for MyExEx +{ type Output = eyre::Result<()>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/crates/blockchain-tree/src/blockchain_tree.rs b/crates/blockchain-tree/src/blockchain_tree.rs index bbf1cb099617..ec9beb20a076 100644 --- a/crates/blockchain-tree/src/blockchain_tree.rs +++ b/crates/blockchain-tree/src/blockchain_tree.rs @@ -95,7 +95,7 @@ impl BlockchainTree { impl BlockchainTree where N: TreeNodeTypes, - E: BlockExecutorProvider, + E: BlockExecutorProvider, { /// Builds the blockchain tree for the node. /// diff --git a/crates/blockchain-tree/src/chain.rs b/crates/blockchain-tree/src/chain.rs index 6ac39c316702..ba4f91d9c798 100644 --- a/crates/blockchain-tree/src/chain.rs +++ b/crates/blockchain-tree/src/chain.rs @@ -17,7 +17,7 @@ use reth_execution_errors::BlockExecutionError; use reth_execution_types::{Chain, ExecutionOutcome}; use reth_primitives::{GotExpected, SealedBlockWithSenders, SealedHeader}; use reth_provider::{ - providers::{BundleStateProvider, ConsistentDbView, ProviderNodeTypes}, + providers::{BundleStateProvider, ConsistentDbView, TreeNodeTypes}, DBProvider, FullExecutionDataProvider, ProviderError, StateRootProvider, TryIntoHistoricalStateProvider, }; @@ -76,8 +76,8 @@ impl AppendableChain { block_validation_kind: BlockValidationKind, ) -> Result where - N: ProviderNodeTypes, - E: BlockExecutorProvider, + N: TreeNodeTypes, + E: BlockExecutorProvider, { let execution_outcome = ExecutionOutcome::default(); let empty = BTreeMap::new(); @@ -114,8 +114,8 @@ impl AppendableChain { block_validation_kind: BlockValidationKind, ) -> Result where - N: ProviderNodeTypes, - E: BlockExecutorProvider, + N: TreeNodeTypes, + E: BlockExecutorProvider, { let parent_number = block.number.checked_sub(1).ok_or(BlockchainTreeError::GenesisBlockHasNoParent)?; @@ -177,8 +177,8 @@ impl AppendableChain { ) -> Result<(ExecutionOutcome, Option), BlockExecutionError> where EDP: FullExecutionDataProvider, - N: ProviderNodeTypes, - E: BlockExecutorProvider, + N: TreeNodeTypes, + E: BlockExecutorProvider, { // some checks are done before blocks comes here. externals.consensus.validate_header_against_parent(&block, parent_block)?; @@ -284,8 +284,8 @@ impl AppendableChain { block_validation_kind: BlockValidationKind, ) -> Result<(), InsertBlockErrorKind> where - N: ProviderNodeTypes, - E: BlockExecutorProvider, + N: TreeNodeTypes, + E: BlockExecutorProvider, { let parent_block = self.chain.tip(); diff --git a/crates/blockchain-tree/src/shareable.rs b/crates/blockchain-tree/src/shareable.rs index 484b4b51869e..e668f4e2dac0 100644 --- a/crates/blockchain-tree/src/shareable.rs +++ b/crates/blockchain-tree/src/shareable.rs @@ -39,7 +39,7 @@ impl ShareableBlockchainTree { impl BlockchainTreeEngine for ShareableBlockchainTree where N: TreeNodeTypes, - E: BlockExecutorProvider, + E: BlockExecutorProvider, { fn buffer_block(&self, block: SealedBlockWithSenders) -> Result<(), InsertBlockError> { let mut tree = self.tree.write(); @@ -110,7 +110,7 @@ where impl BlockchainTreeViewer for ShareableBlockchainTree where N: TreeNodeTypes, - E: BlockExecutorProvider, + E: BlockExecutorProvider, { fn header_by_hash(&self, hash: BlockHash) -> Option { trace!(target: "blockchain_tree", ?hash, "Returning header by hash"); @@ -173,7 +173,7 @@ where impl BlockchainTreePendingStateProvider for ShareableBlockchainTree where N: TreeNodeTypes, - E: BlockExecutorProvider, + E: BlockExecutorProvider, { fn find_pending_state_provider( &self, diff --git a/crates/cli/commands/src/import.rs b/crates/cli/commands/src/import.rs index c1f6408b49b0..dc99ae7f98d0 100644 --- a/crates/cli/commands/src/import.rs +++ b/crates/cli/commands/src/import.rs @@ -60,7 +60,7 @@ impl> ImportComm pub async fn execute(self, executor: F) -> eyre::Result<()> where N: CliNodeTypes, - E: BlockExecutorProvider, + E: BlockExecutorProvider, F: FnOnce(Arc) -> E, { info!(target: "reth::cli", "reth {} starting", SHORT_VERSION); @@ -169,7 +169,7 @@ pub fn build_import_pipeline( where N: ProviderNodeTypes + CliNodeTypes, C: Consensus + 'static, - E: BlockExecutorProvider, + E: BlockExecutorProvider, { if !file_client.has_canonical_blocks() { eyre::bail!("unable to import non canonical blocks"); diff --git a/crates/cli/commands/src/stage/dump/execution.rs b/crates/cli/commands/src/stage/dump/execution.rs index 000c1b542dbf..70fd23f9847c 100644 --- a/crates/cli/commands/src/stage/dump/execution.rs +++ b/crates/cli/commands/src/stage/dump/execution.rs @@ -33,7 +33,7 @@ where Receipt = reth_primitives::Receipt, >, >, - E: BlockExecutorProvider, + E: BlockExecutorProvider, { let (output_db, tip_block_number) = setup(from, to, &output_datadir.db(), db_tool)?; @@ -188,7 +188,7 @@ where Receipt = reth_primitives::Receipt, >, >, - E: BlockExecutorProvider, + E: BlockExecutorProvider, { info!(target: "reth::cli", "Executing stage. [dry-run]"); diff --git a/crates/cli/commands/src/stage/dump/mod.rs b/crates/cli/commands/src/stage/dump/mod.rs index 36b8fb122580..9cc0f54dd33f 100644 --- a/crates/cli/commands/src/stage/dump/mod.rs +++ b/crates/cli/commands/src/stage/dump/mod.rs @@ -93,7 +93,7 @@ impl> Command pub async fn execute(self, executor: F) -> eyre::Result<()> where N: CliNodeTypes, - E: BlockExecutorProvider, + E: BlockExecutorProvider, F: FnOnce(Arc) -> E, { let Environment { provider_factory, .. } = self.env.init::(AccessRights::RO)?; diff --git a/crates/cli/commands/src/stage/mod.rs b/crates/cli/commands/src/stage/mod.rs index b9e0725428a9..91ab458daf64 100644 --- a/crates/cli/commands/src/stage/mod.rs +++ b/crates/cli/commands/src/stage/mod.rs @@ -44,7 +44,7 @@ impl> Command pub async fn execute(self, ctx: CliContext, executor: F) -> eyre::Result<()> where N: CliNodeTypes, - E: BlockExecutorProvider, + E: BlockExecutorProvider, F: FnOnce(Arc) -> E, { match self.command { diff --git a/crates/cli/commands/src/stage/run.rs b/crates/cli/commands/src/stage/run.rs index c852eea05a7e..88a5fa6204e8 100644 --- a/crates/cli/commands/src/stage/run.rs +++ b/crates/cli/commands/src/stage/run.rs @@ -107,7 +107,7 @@ impl> Command pub async fn execute(self, ctx: CliContext, executor: F) -> eyre::Result<()> where N: CliNodeTypes, - E: BlockExecutorProvider, + E: BlockExecutorProvider, F: FnOnce(Arc) -> E, { // Raise the fd limit of the process. diff --git a/crates/e2e-test-utils/src/rpc.rs b/crates/e2e-test-utils/src/rpc.rs index 8399a482dfd6..37ee12987ca5 100644 --- a/crates/e2e-test-utils/src/rpc.rs +++ b/crates/e2e-test-utils/src/rpc.rs @@ -1,7 +1,6 @@ use alloy_consensus::TxEnvelope; use alloy_network::eip2718::Decodable2718; use alloy_primitives::{Bytes, B256}; -use alloy_rlp::Encodable; use reth_chainspec::EthereumHardforks; use reth_node_api::{FullNodeComponents, NodePrimitives}; use reth_node_builder::{rpc::RpcRegistry, NodeTypes}; @@ -21,7 +20,10 @@ where Node: FullNodeComponents< Types: NodeTypes< ChainSpec: EthereumHardforks, - Primitives: NodePrimitives, + Primitives: NodePrimitives< + Block = reth_primitives::Block, + Receipt = reth_primitives::Receipt, + >, >, >, EthApi: EthApiSpec + EthTransactions + TraceExt, diff --git a/crates/engine/local/src/service.rs b/crates/engine/local/src/service.rs index 5838cb89116b..79d9d844d734 100644 --- a/crates/engine/local/src/service.rs +++ b/crates/engine/local/src/service.rs @@ -65,7 +65,7 @@ where #[allow(clippy::too_many_arguments)] pub fn new( consensus: Arc, - executor_factory: impl BlockExecutorProvider, + executor_factory: impl BlockExecutorProvider, provider: ProviderFactory, blockchain_db: BlockchainProvider2, pruner: PrunerWithFactory>, diff --git a/crates/engine/service/src/service.rs b/crates/engine/service/src/service.rs index a54a2ef9e1a1..8bb26d69140c 100644 --- a/crates/engine/service/src/service.rs +++ b/crates/engine/service/src/service.rs @@ -60,7 +60,7 @@ impl EngineService where N: EngineNodeTypes + PersistenceNodeTypes, Client: EthBlockClient + 'static, - E: BlockExecutorProvider + 'static, + E: BlockExecutorProvider + 'static, { /// Constructor for `EngineService`. #[allow(clippy::too_many_arguments)] diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index fd0e5aeec838..7b8ec8838922 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -540,7 +540,7 @@ impl std::fmt::Debug impl EngineApiTreeHandler where - N: NodePrimitives, + N: NodePrimitives, P: DatabaseProviderFactory + BlockReader + StateProviderFactory @@ -548,7 +548,7 @@ where + Clone + 'static,

::Provider: BlockReader, - E: BlockExecutorProvider, + E: BlockExecutorProvider, T: EngineTypes, V: EngineValidator, { diff --git a/crates/ethereum/evm/src/execute.rs b/crates/ethereum/evm/src/execute.rs index 8642df89698d..f2a3925572bc 100644 --- a/crates/ethereum/evm/src/execute.rs +++ b/crates/ethereum/evm/src/execute.rs @@ -20,7 +20,7 @@ use reth_evm::{ system_calls::{OnStateHook, SystemCaller}, ConfigureEvm, TxEnvOverrides, }; -use reth_primitives::{BlockWithSenders, Receipt}; +use reth_primitives::{BlockWithSenders, EthPrimitives, Receipt}; use reth_revm::db::State; use revm_primitives::{ db::{Database, DatabaseCommit}, @@ -60,6 +60,8 @@ where EvmConfig: Clone + Unpin + Sync + Send + 'static + ConfigureEvm

, { + type Primitives = EthPrimitives; + type Strategy + Display>> = EthExecutionStrategy; @@ -122,13 +124,16 @@ where } } -impl BlockExecutionStrategy for EthExecutionStrategy +impl BlockExecutionStrategy for EthExecutionStrategy where DB: Database + Display>, EvmConfig: ConfigureEvm
, { + type DB = DB; type Error = BlockExecutionError; + type Primitives = EthPrimitives; + fn init(&mut self, tx_env_overrides: Box) { self.tx_env_overrides = Some(tx_env_overrides); } diff --git a/crates/ethereum/node/src/node.rs b/crates/ethereum/node/src/node.rs index a536b9dff907..9db0c44c6c69 100644 --- a/crates/ethereum/node/src/node.rs +++ b/crates/ethereum/node/src/node.rs @@ -133,7 +133,7 @@ pub struct EthereumExecutorBuilder; impl ExecutorBuilder for EthereumExecutorBuilder where - Types: NodeTypesWithEngine, + Types: NodeTypesWithEngine, Node: FullNodeTypes, { type EVM = EthEvmConfig; diff --git a/crates/evm/src/either.rs b/crates/evm/src/either.rs index 85bc7e7f9a79..4faeb1a72030 100644 --- a/crates/evm/src/either.rs +++ b/crates/evm/src/either.rs @@ -8,9 +8,6 @@ use crate::{ }; use alloc::boxed::Box; use alloy_primitives::BlockNumber; -use reth_execution_errors::BlockExecutionError; -use reth_execution_types::{BlockExecutionInput, BlockExecutionOutput, ExecutionOutcome}; -use reth_primitives::{BlockWithSenders, Receipt}; use reth_prune_types::PruneModes; use reth_storage_errors::provider::ProviderError; use revm_primitives::db::Database; @@ -22,8 +19,10 @@ use revm::State; impl BlockExecutorProvider for Either where A: BlockExecutorProvider, - B: BlockExecutorProvider, + B: BlockExecutorProvider, { + type Primitives = A::Primitives; + type Executor + Display>> = Either, B::Executor>; @@ -53,23 +52,13 @@ where impl Executor for Either where - A: for<'a> Executor< - DB, - Input<'a> = BlockExecutionInput<'a, BlockWithSenders>, - Output = BlockExecutionOutput, - Error = BlockExecutionError, - >, - B: for<'a> Executor< - DB, - Input<'a> = BlockExecutionInput<'a, BlockWithSenders>, - Output = BlockExecutionOutput, - Error = BlockExecutionError, - >, + A: Executor, + B: for<'a> Executor = A::Input<'a>, Output = A::Output, Error = A::Error>, DB: Database + Display>, { - type Input<'a> = BlockExecutionInput<'a, BlockWithSenders>; - type Output = BlockExecutionOutput; - type Error = BlockExecutionError; + type Input<'a> = A::Input<'a>; + type Output = A::Output; + type Error = A::Error; fn init(&mut self, tx_env_overrides: Box) { match self { @@ -116,23 +105,13 @@ where impl BatchExecutor for Either where - A: for<'a> BatchExecutor< - DB, - Input<'a> = BlockExecutionInput<'a, BlockWithSenders>, - Output = ExecutionOutcome, - Error = BlockExecutionError, - >, - B: for<'a> BatchExecutor< - DB, - Input<'a> = BlockExecutionInput<'a, BlockWithSenders>, - Output = ExecutionOutcome, - Error = BlockExecutionError, - >, + A: BatchExecutor, + B: for<'a> BatchExecutor = A::Input<'a>, Output = A::Output, Error = A::Error>, DB: Database + Display>, { - type Input<'a> = BlockExecutionInput<'a, BlockWithSenders>; - type Output = ExecutionOutcome; - type Error = BlockExecutionError; + type Input<'a> = A::Input<'a>; + type Output = A::Output; + type Error = A::Error; fn execute_and_verify_one(&mut self, input: Self::Input<'_>) -> Result<(), Self::Error> { match self { diff --git a/crates/evm/src/execute.rs b/crates/evm/src/execute.rs index 42c756f4d93f..bc6e535b7b79 100644 --- a/crates/evm/src/execute.rs +++ b/crates/evm/src/execute.rs @@ -1,19 +1,21 @@ //! Traits for execution. +use alloy_consensus::BlockHeader; // Re-export execution types pub use reth_execution_errors::{ BlockExecutionError, BlockValidationError, InternalBlockExecutionError, }; pub use reth_execution_types::{BlockExecutionInput, BlockExecutionOutput, ExecutionOutcome}; +use reth_primitives_traits::Block as _; pub use reth_storage_errors::provider::ProviderError; use crate::{system_calls::OnStateHook, TxEnvOverrides}; use alloc::{boxed::Box, vec::Vec}; use alloy_eips::eip7685::Requests; use alloy_primitives::BlockNumber; -use core::{fmt::Display, marker::PhantomData}; +use core::fmt::Display; use reth_consensus::ConsensusError; -use reth_primitives::{BlockWithSenders, Receipt}; +use reth_primitives::{BlockWithSenders, NodePrimitives, Receipt}; use reth_prune_types::PruneModes; use reth_revm::batch::BlockBatchRecord; use revm::{ @@ -130,6 +132,9 @@ pub trait BatchExecutor { /// A type that can create a new executor for block execution. pub trait BlockExecutorProvider: Send + Sync + Clone + Unpin + 'static { + /// Receipt type. + type Primitives: NodePrimitives; + /// An executor that can execute a single block given a database. /// /// # Verification @@ -143,16 +148,22 @@ pub trait BlockExecutorProvider: Send + Sync + Clone + Unpin + 'static { /// the returned state. type Executor + Display>>: for<'a> Executor< DB, - Input<'a> = BlockExecutionInput<'a, BlockWithSenders>, - Output = BlockExecutionOutput, + Input<'a> = BlockExecutionInput< + 'a, + BlockWithSenders<::Block>, + >, + Output = BlockExecutionOutput<::Receipt>, Error = BlockExecutionError, >; /// An executor that can execute a batch of blocks given a database. type BatchExecutor + Display>>: for<'a> BatchExecutor< DB, - Input<'a> = BlockExecutionInput<'a, BlockWithSenders>, - Output = ExecutionOutcome, + Input<'a> = BlockExecutionInput< + 'a, + BlockWithSenders<::Block>, + >, + Output = ExecutionOutcome<::Receipt>, Error = BlockExecutionError, >; @@ -174,18 +185,21 @@ pub trait BlockExecutorProvider: Send + Sync + Clone + Unpin + 'static { /// Helper type for the output of executing a block. #[derive(Debug, Clone)] -pub struct ExecuteOutput { +pub struct ExecuteOutput { /// Receipts obtained after executing a block. - pub receipts: Vec, + pub receipts: Vec, /// Cumulative gas used in the block execution. pub gas_used: u64, } /// Defines the strategy for executing a single block. -pub trait BlockExecutionStrategy -where - DB: Database, -{ +pub trait BlockExecutionStrategy { + /// Database this strategy operates on. + type DB: Database; + + /// Primitive types used by the strategy. + type Primitives: NodePrimitives; + /// The error type returned by this strategy's methods. type Error: From + core::error::Error; @@ -195,30 +209,30 @@ where /// Applies any necessary changes before executing the block's transactions. fn apply_pre_execution_changes( &mut self, - block: &BlockWithSenders, + block: &BlockWithSenders<::Block>, total_difficulty: U256, ) -> Result<(), Self::Error>; /// Executes all transactions in the block. fn execute_transactions( &mut self, - block: &BlockWithSenders, + block: &BlockWithSenders<::Block>, total_difficulty: U256, - ) -> Result; + ) -> Result::Receipt>, Self::Error>; /// Applies any necessary changes after executing the block's transactions. fn apply_post_execution_changes( &mut self, - block: &BlockWithSenders, + block: &BlockWithSenders<::Block>, total_difficulty: U256, - receipts: &[Receipt], + receipts: &[::Receipt], ) -> Result; /// Returns a reference to the current state. - fn state_ref(&self) -> &State; + fn state_ref(&self) -> &State; /// Returns a mutable reference to the current state. - fn state_mut(&mut self) -> &mut State; + fn state_mut(&mut self) -> &mut State; /// Sets a hook to be called after each state change during execution. fn with_state_hook(&mut self, _hook: Option>) {} @@ -232,8 +246,8 @@ where /// Validate a block with regard to execution results. fn validate_block_post_execution( &self, - _block: &BlockWithSenders, - _receipts: &[Receipt], + _block: &BlockWithSenders<::Block>, + _receipts: &[::Receipt], _requests: &Requests, ) -> Result<(), ConsensusError> { Ok(()) @@ -242,9 +256,13 @@ where /// A strategy factory that can create block execution strategies. pub trait BlockExecutionStrategyFactory: Send + Sync + Clone + Unpin + 'static { + /// Primitive types used by the strategy. + type Primitives: NodePrimitives; + /// Associated strategy type. type Strategy + Display>>: BlockExecutionStrategy< - DB, + DB = DB, + Primitives = Self::Primitives, Error = BlockExecutionError, >; @@ -280,11 +298,13 @@ impl BlockExecutorProvider for BasicBlockExecutorProvider where F: BlockExecutionStrategyFactory, { + type Primitives = F::Primitives; + type Executor + Display>> = - BasicBlockExecutor, DB>; + BasicBlockExecutor>; type BatchExecutor + Display>> = - BasicBatchExecutor, DB>; + BasicBatchExecutor>; fn executor(&self, db: DB) -> Self::Executor where @@ -307,34 +327,26 @@ where /// A generic block executor that uses a [`BlockExecutionStrategy`] to /// execute blocks. #[allow(missing_debug_implementations, dead_code)] -pub struct BasicBlockExecutor -where - S: BlockExecutionStrategy, - DB: Database, -{ +pub struct BasicBlockExecutor { /// Block execution strategy. pub(crate) strategy: S, - _phantom: PhantomData, } -impl BasicBlockExecutor -where - S: BlockExecutionStrategy, - DB: Database, -{ +impl BasicBlockExecutor { /// Creates a new `BasicBlockExecutor` with the given strategy. pub const fn new(strategy: S) -> Self { - Self { strategy, _phantom: PhantomData } + Self { strategy } } } -impl Executor for BasicBlockExecutor +impl Executor for BasicBlockExecutor where - S: BlockExecutionStrategy, + S: BlockExecutionStrategy, DB: Database + Display>, { - type Input<'a> = BlockExecutionInput<'a, BlockWithSenders>; - type Output = BlockExecutionOutput; + type Input<'a> = + BlockExecutionInput<'a, BlockWithSenders<::Block>>; + type Output = BlockExecutionOutput<::Receipt>; type Error = S::Error; fn init(&mut self, env_overrides: Box) { @@ -404,43 +416,44 @@ where /// A generic batch executor that uses a [`BlockExecutionStrategy`] to /// execute batches. #[allow(missing_debug_implementations)] -pub struct BasicBatchExecutor +pub struct BasicBatchExecutor where - S: BlockExecutionStrategy, - DB: Database, + S: BlockExecutionStrategy, { /// Batch execution strategy. pub(crate) strategy: S, /// Keeps track of batch execution receipts and requests. - pub(crate) batch_record: BlockBatchRecord, - _phantom: PhantomData, + pub(crate) batch_record: BlockBatchRecord<::Receipt>, } -impl BasicBatchExecutor +impl BasicBatchExecutor where - S: BlockExecutionStrategy, - DB: Database, + S: BlockExecutionStrategy, { /// Creates a new `BasicBatchExecutor` with the given strategy. - pub const fn new(strategy: S, batch_record: BlockBatchRecord) -> Self { - Self { strategy, batch_record, _phantom: PhantomData } + pub const fn new( + strategy: S, + batch_record: BlockBatchRecord<::Receipt>, + ) -> Self { + Self { strategy, batch_record } } } -impl BatchExecutor for BasicBatchExecutor +impl BatchExecutor for BasicBatchExecutor where - S: BlockExecutionStrategy, + S: BlockExecutionStrategy, DB: Database + Display>, { - type Input<'a> = BlockExecutionInput<'a, BlockWithSenders>; - type Output = ExecutionOutcome; + type Input<'a> = + BlockExecutionInput<'a, BlockWithSenders<::Block>>; + type Output = ExecutionOutcome<::Receipt>; type Error = BlockExecutionError; fn execute_and_verify_one(&mut self, input: Self::Input<'_>) -> Result<(), Self::Error> { let BlockExecutionInput { block, total_difficulty } = input; if self.batch_record.first_block().is_none() { - self.batch_record.set_first_block(block.number); + self.batch_record.set_first_block(block.header().number()); } self.strategy.apply_pre_execution_changes(block, total_difficulty)?; @@ -452,7 +465,7 @@ where self.strategy.validate_block_post_execution(block, &receipts, &requests)?; // prepare the state according to the prune mode - let retention = self.batch_record.bundle_retention(block.number); + let retention = self.batch_record.bundle_retention(block.header().number()); self.strategy.state_mut().merge_transitions(retention); // store receipts in the set @@ -490,7 +503,9 @@ where mod tests { use super::*; use alloy_primitives::U256; + use core::marker::PhantomData; use reth_chainspec::{ChainSpec, MAINNET}; + use reth_primitives::EthPrimitives; use revm::db::{CacheDB, EmptyDBTyped}; use revm_primitives::{bytes, TxEnv}; use std::sync::Arc; @@ -499,6 +514,7 @@ mod tests { struct TestExecutorProvider; impl BlockExecutorProvider for TestExecutorProvider { + type Primitives = EthPrimitives; type Executor + Display>> = TestExecutor; type BatchExecutor + Display>> = TestExecutor; @@ -596,6 +612,7 @@ mod tests { } impl BlockExecutionStrategyFactory for TestExecutorStrategyFactory { + type Primitives = EthPrimitives; type Strategy + Display>> = TestExecutorStrategy; @@ -622,10 +639,12 @@ mod tests { } } - impl BlockExecutionStrategy for TestExecutorStrategy + impl BlockExecutionStrategy for TestExecutorStrategy where DB: Database, { + type DB = DB; + type Primitives = EthPrimitives; type Error = BlockExecutionError; fn apply_pre_execution_changes( diff --git a/crates/evm/src/metrics.rs b/crates/evm/src/metrics.rs index 3464bb96f4c7..f42b942afd93 100644 --- a/crates/evm/src/metrics.rs +++ b/crates/evm/src/metrics.rs @@ -3,6 +3,7 @@ //! Block processing related to syncing should take care to update the metrics by using either //! [`ExecutorMetrics::execute_metered`] or [`ExecutorMetrics::metered_one`]. use crate::{execute::Executor, system_calls::OnStateHook}; +use alloy_consensus::BlockHeader; use metrics::{Counter, Gauge, Histogram}; use reth_execution_types::{BlockExecutionInput, BlockExecutionOutput}; use reth_metrics::Metrics; @@ -69,9 +70,10 @@ pub struct ExecutorMetrics { } impl ExecutorMetrics { - fn metered(&self, block: &BlockWithSenders, f: F) -> R + fn metered(&self, block: &BlockWithSenders, f: F) -> R where F: FnOnce() -> R, + B: reth_primitives_traits::Block, { // Execute the block and record the elapsed time. let execute_start = Instant::now(); @@ -79,8 +81,8 @@ impl ExecutorMetrics { let execution_duration = execute_start.elapsed().as_secs_f64(); // Update gas metrics. - self.gas_processed_total.increment(block.gas_used); - self.gas_per_second.set(block.gas_used as f64 / execution_duration); + self.gas_processed_total.increment(block.header().gas_used()); + self.gas_per_second.set(block.header().gas_used() as f64 / execution_duration); self.execution_histogram.record(execution_duration); self.execution_duration.set(execution_duration); @@ -94,19 +96,20 @@ impl ExecutorMetrics { /// of accounts, storage slots and bytecodes loaded and updated. /// Execute the given block using the provided [`Executor`] and update metrics for the /// execution. - pub fn execute_metered<'a, E, DB, O, Error>( + pub fn execute_metered<'a, E, DB, O, Error, B>( &self, executor: E, - input: BlockExecutionInput<'a, BlockWithSenders>, + input: BlockExecutionInput<'a, BlockWithSenders>, state_hook: Box, ) -> Result, Error> where E: Executor< DB, - Input<'a> = BlockExecutionInput<'a, BlockWithSenders>, + Input<'a> = BlockExecutionInput<'a, BlockWithSenders>, Output = BlockExecutionOutput, Error = Error, >, + B: reth_primitives_traits::Block, { // clone here is cheap, all the metrics are Option>. additionally // they are gloally registered so that the data recorded in the hook will @@ -133,9 +136,14 @@ impl ExecutorMetrics { } /// Execute the given block and update metrics for the execution. - pub fn metered_one(&self, input: BlockExecutionInput<'_, BlockWithSenders>, f: F) -> R + pub fn metered_one( + &self, + input: BlockExecutionInput<'_, BlockWithSenders>, + f: F, + ) -> R where - F: FnOnce(BlockExecutionInput<'_, BlockWithSenders>) -> R, + F: FnOnce(BlockExecutionInput<'_, BlockWithSenders>) -> R, + B: reth_primitives_traits::Block, { self.metered(input.block, || f(input)) } diff --git a/crates/evm/src/noop.rs b/crates/evm/src/noop.rs index 4fdc6d367a27..7b1063533da3 100644 --- a/crates/evm/src/noop.rs +++ b/crates/evm/src/noop.rs @@ -4,7 +4,7 @@ use alloy_primitives::BlockNumber; use core::fmt::Display; use reth_execution_errors::BlockExecutionError; use reth_execution_types::{BlockExecutionInput, BlockExecutionOutput, ExecutionOutcome}; -use reth_primitives::{BlockWithSenders, Receipt}; +use reth_primitives::{BlockWithSenders, EthPrimitives, Receipt}; use reth_prune_types::PruneModes; use reth_storage_errors::provider::ProviderError; use revm::State; @@ -23,6 +23,8 @@ const UNAVAILABLE_FOR_NOOP: &str = "execution unavailable for noop"; pub struct NoopBlockExecutorProvider; impl BlockExecutorProvider for NoopBlockExecutorProvider { + type Primitives = EthPrimitives; + type Executor + Display>> = Self; type BatchExecutor + Display>> = Self; diff --git a/crates/evm/src/test_utils.rs b/crates/evm/src/test_utils.rs index a4dc906494ce..22ba4a316e2e 100644 --- a/crates/evm/src/test_utils.rs +++ b/crates/evm/src/test_utils.rs @@ -12,7 +12,7 @@ use alloy_primitives::BlockNumber; use parking_lot::Mutex; use reth_execution_errors::BlockExecutionError; use reth_execution_types::ExecutionOutcome; -use reth_primitives::{BlockWithSenders, Receipt, Receipts}; +use reth_primitives::{BlockWithSenders, EthPrimitives, NodePrimitives, Receipt, Receipts}; use reth_prune_types::PruneModes; use reth_storage_errors::provider::ProviderError; use revm::State; @@ -33,6 +33,8 @@ impl MockExecutorProvider { } impl BlockExecutorProvider for MockExecutorProvider { + type Primitives = EthPrimitives; + type Executor + Display>> = Self; type BatchExecutor + Display>> = Self; @@ -116,15 +118,14 @@ impl BatchExecutor for MockExecutorProvider { } } -impl BasicBlockExecutor +impl BasicBlockExecutor where - S: BlockExecutionStrategy, - DB: Database, + S: BlockExecutionStrategy, { /// Provides safe read access to the state pub fn with_state(&self, f: F) -> R where - F: FnOnce(&State) -> R, + F: FnOnce(&State) -> R, { f(self.strategy.state_ref()) } @@ -132,21 +133,20 @@ where /// Provides safe write access to the state pub fn with_state_mut(&mut self, f: F) -> R where - F: FnOnce(&mut State) -> R, + F: FnOnce(&mut State) -> R, { f(self.strategy.state_mut()) } } -impl BasicBatchExecutor +impl BasicBatchExecutor where - S: BlockExecutionStrategy, - DB: Database, + S: BlockExecutionStrategy, { /// Provides safe read access to the state pub fn with_state(&self, f: F) -> R where - F: FnOnce(&State) -> R, + F: FnOnce(&State) -> R, { f(self.strategy.state_ref()) } @@ -154,13 +154,13 @@ where /// Provides safe write access to the state pub fn with_state_mut(&mut self, f: F) -> R where - F: FnOnce(&mut State) -> R, + F: FnOnce(&mut State) -> R, { f(self.strategy.state_mut()) } /// Accessor for batch executor receipts. - pub const fn receipts(&self) -> &Receipts { + pub const fn receipts(&self) -> &Receipts<::Receipt> { self.batch_record.receipts() } } diff --git a/crates/exex/exex/src/backfill/job.rs b/crates/exex/exex/src/backfill/job.rs index 7e670620472c..f93c5efa7212 100644 --- a/crates/exex/exex/src/backfill/job.rs +++ b/crates/exex/exex/src/backfill/job.rs @@ -9,7 +9,7 @@ use alloy_primitives::BlockNumber; use reth_evm::execute::{ BatchExecutor, BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider, Executor, }; -use reth_node_api::{Block as _, BlockBody as _}; +use reth_node_api::{Block as _, BlockBody as _, NodePrimitives}; use reth_primitives::{BlockExt, BlockWithSenders, Receipt}; use reth_primitives_traits::{format_gas_throughput, SignedTransaction}; use reth_provider::{ @@ -38,12 +38,10 @@ pub struct BackfillJob { impl Iterator for BackfillJob where - E: BlockExecutorProvider, - P: HeaderProvider - + BlockReader - + StateProviderFactory, + E: BlockExecutorProvider>, + P: HeaderProvider + BlockReader + StateProviderFactory, { - type Item = BackfillJobResult; + type Item = BackfillJobResult>; fn next(&mut self) -> Option { if self.range.is_empty() { @@ -56,10 +54,8 @@ where impl BackfillJob where - E: BlockExecutorProvider, - P: BlockReader - + HeaderProvider - + StateProviderFactory, + E: BlockExecutorProvider>, + P: BlockReader + HeaderProvider + StateProviderFactory, { /// Converts the backfill job into a single block backfill job. pub fn into_single_blocks(self) -> SingleBlockBackfillJob { @@ -67,11 +63,11 @@ where } /// Converts the backfill job into a stream. - pub fn into_stream(self) -> StreamBackfillJob { + pub fn into_stream(self) -> StreamBackfillJob> { self.into() } - fn execute_range(&mut self) -> BackfillJobResult { + fn execute_range(&mut self) -> BackfillJobResult> { debug!( target: "exex::backfill", range = ?self.range, @@ -169,10 +165,13 @@ pub struct SingleBlockBackfillJob { impl Iterator for SingleBlockBackfillJob where - E: BlockExecutorProvider, - P: HeaderProvider + BlockReader + StateProviderFactory, + E: BlockExecutorProvider>, + P: HeaderProvider + BlockReader + StateProviderFactory, { - type Item = BackfillJobResult<(BlockWithSenders, BlockExecutionOutput)>; + type Item = BackfillJobResult<( + BlockWithSenders, + BlockExecutionOutput<::Receipt>, + )>; fn next(&mut self) -> Option { self.range.next().map(|block_number| self.execute_block(block_number)) @@ -181,8 +180,8 @@ where impl SingleBlockBackfillJob where - E: BlockExecutorProvider, - P: HeaderProvider + BlockReader + StateProviderFactory, + E: BlockExecutorProvider>, + P: HeaderProvider + BlockReader + StateProviderFactory, { /// Converts the single block backfill job into a stream. pub fn into_stream( @@ -191,10 +190,14 @@ where self.into() } + #[expect(clippy::type_complexity)] pub(crate) fn execute_block( &self, block_number: u64, - ) -> BackfillJobResult<(BlockWithSenders, BlockExecutionOutput)> { + ) -> BackfillJobResult<( + BlockWithSenders, + BlockExecutionOutput<::Receipt>, + )> { let td = self .provider .header_td_by_number(block_number)? diff --git a/crates/exex/exex/src/backfill/stream.rs b/crates/exex/exex/src/backfill/stream.rs index 46177ceda122..95da076c7c87 100644 --- a/crates/exex/exex/src/backfill/stream.rs +++ b/crates/exex/exex/src/backfill/stream.rs @@ -11,7 +11,8 @@ use futures::{ StreamExt, }; use reth_evm::execute::{BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider}; -use reth_primitives::{BlockWithSenders, Receipt}; +use reth_node_api::NodePrimitives; +use reth_primitives::{BlockWithSenders, EthPrimitives}; use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory}; use reth_prune_types::PruneModes; use reth_stages_api::ExecutionStageThresholds; @@ -38,8 +39,11 @@ struct BackfillTaskOutput { /// Ordered queue of [`JoinHandle`]s that yield [`BackfillTaskOutput`]s. type BackfillTasks = FuturesOrdered>>; -type SingleBlockStreamItem = (BlockWithSenders, BlockExecutionOutput); -type BatchBlockStreamItem = Chain; +type SingleBlockStreamItem = ( + BlockWithSenders<::Block>, + BlockExecutionOutput<::Receipt>, +); +type BatchBlockStreamItem = Chain; /// Stream for processing backfill jobs asynchronously. /// @@ -100,18 +104,12 @@ where } } -impl Stream for StreamBackfillJob +impl Stream for StreamBackfillJob> where - E: BlockExecutorProvider + Clone + Send + 'static, - P: HeaderProvider - + BlockReader - + StateProviderFactory - + Clone - + Send - + Unpin - + 'static, + E: BlockExecutorProvider> + Clone + Send + 'static, + P: HeaderProvider + BlockReader + StateProviderFactory + Clone + Send + Unpin + 'static, { - type Item = BackfillJobResult; + type Item = BackfillJobResult>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); @@ -139,18 +137,12 @@ where } } -impl Stream for StreamBackfillJob +impl Stream for StreamBackfillJob> where - E: BlockExecutorProvider + Clone + Send + 'static, - P: HeaderProvider - + BlockReader - + StateProviderFactory - + Clone - + Send - + Unpin - + 'static, + E: BlockExecutorProvider> + Clone + Send + 'static, + P: HeaderProvider + BlockReader + StateProviderFactory + Clone + Send + Unpin + 'static, { - type Item = BackfillJobResult; + type Item = BackfillJobResult>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); @@ -200,7 +192,10 @@ impl From> for StreamBackfillJob From> for StreamBackfillJob { +impl From> for StreamBackfillJob> +where + E: BlockExecutorProvider, +{ fn from(job: BackfillJob) -> Self { let batch_size = job.thresholds.max_blocks.map_or(DEFAULT_BATCH_SIZE, |max| max as usize); Self { diff --git a/crates/exex/exex/src/context.rs b/crates/exex/exex/src/context.rs index 3d303c9bbac0..f536ed515f9e 100644 --- a/crates/exex/exex/src/context.rs +++ b/crates/exex/exex/src/context.rs @@ -1,6 +1,6 @@ use crate::{ExExContextDyn, ExExEvent, ExExNotifications, ExExNotificationsStream}; use reth_exex_types::ExExHead; -use reth_node_api::{FullNodeComponents, NodeTypes}; +use reth_node_api::{FullNodeComponents, NodePrimitives, NodeTypes}; use reth_node_core::node_config::NodeConfig; use reth_primitives::Head; use reth_provider::BlockReader; @@ -57,11 +57,12 @@ where impl ExExContext where Node: FullNodeComponents, - Node::Provider: Debug + BlockReader, + Node::Provider: Debug + BlockReader, Node::Executor: Debug, + Node::Types: NodeTypes, { /// Returns dynamic version of the context - pub fn into_dyn(self) -> ExExContextDyn { + pub fn into_dyn(self) -> ExExContextDyn<::Primitives> { ExExContextDyn::from(self) } } @@ -69,6 +70,7 @@ where impl ExExContext where Node: FullNodeComponents, + Node::Types: NodeTypes, { /// Returns the transaction pool of the node. pub fn pool(&self) -> &Node::Pool { @@ -107,19 +109,13 @@ where /// Sets notifications stream to [`crate::ExExNotificationsWithoutHead`], a stream of /// notifications without a head. - pub fn set_notifications_without_head(&mut self) - where - Node::Provider: BlockReader, - { + pub fn set_notifications_without_head(&mut self) { self.notifications.set_without_head(); } /// Sets notifications stream to [`crate::ExExNotificationsWithHead`], a stream of notifications /// with the provided head. - pub fn set_notifications_with_head(&mut self, head: ExExHead) - where - Node::Provider: BlockReader, - { + pub fn set_notifications_with_head(&mut self, head: ExExHead) { self.notifications.set_with_head(head); } } @@ -142,7 +138,7 @@ mod tests { impl ExEx where - Node::Provider: BlockReader, + Node::Provider: BlockReader, { async fn _test_bounds(mut self) -> eyre::Result<()> { self.ctx.pool(); diff --git a/crates/exex/exex/src/dyn_context.rs b/crates/exex/exex/src/dyn_context.rs index 3ce0f488f40c..12efa5f069b7 100644 --- a/crates/exex/exex/src/dyn_context.rs +++ b/crates/exex/exex/src/dyn_context.rs @@ -4,8 +4,9 @@ use std::fmt::Debug; use reth_chainspec::{EthChainSpec, Head}; -use reth_node_api::FullNodeComponents; +use reth_node_api::{FullNodeComponents, NodePrimitives, NodeTypes}; use reth_node_core::node_config::NodeConfig; +use reth_primitives::EthPrimitives; use reth_provider::BlockReader; use tokio::sync::mpsc; @@ -13,7 +14,7 @@ use crate::{ExExContext, ExExEvent, ExExNotificationsStream}; // TODO(0xurb) - add `node` after abstractions /// Captures the context that an `ExEx` has access to. -pub struct ExExContextDyn { +pub struct ExExContextDyn { /// The current head of the blockchain at launch. pub head: Head, /// The config of the node @@ -34,10 +35,10 @@ pub struct ExExContextDyn { /// /// Once an [`ExExNotification`](crate::ExExNotification) is sent over the channel, it is /// considered delivered by the node. - pub notifications: Box, + pub notifications: Box>, } -impl Debug for ExExContextDyn { +impl Debug for ExExContextDyn { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ExExContext") .field("head", &self.head) @@ -49,16 +50,16 @@ impl Debug for ExExContextDyn { } } -impl From> for ExExContextDyn +impl From> for ExExContextDyn<::Primitives> where - Node: FullNodeComponents, - Node::Provider: Debug + BlockReader, + Node: FullNodeComponents>, + Node::Provider: Debug + BlockReader, Node::Executor: Debug, { fn from(ctx: ExExContext) -> Self { let config = ctx.config.map_chainspec(|chainspec| Box::new(chainspec) as Box); - let notifications = Box::new(ctx.notifications) as Box; + let notifications = Box::new(ctx.notifications) as Box<_>; Self { head: ctx.head, diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index ea5ddf2e8c62..a3b92e9f17ab 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -1,14 +1,17 @@ use crate::{ wal::Wal, ExExEvent, ExExNotification, ExExNotifications, FinishedExExHeight, WalHandle, }; +use alloy_consensus::BlockHeader; use alloy_eips::BlockNumHash; use futures::StreamExt; use itertools::Itertools; use metrics::Gauge; use reth_chain_state::ForkChoiceStream; use reth_chainspec::Head; +use reth_evm::execute::BlockExecutorProvider; use reth_metrics::{metrics::Counter, Metrics}; -use reth_primitives::SealedHeader; +use reth_node_api::NodePrimitives; +use reth_primitives::{EthPrimitives, SealedHeader}; use reth_provider::HeaderProvider; use reth_tracing::tracing::{debug, warn}; use std::{ @@ -69,13 +72,13 @@ struct ExExMetrics { /// [`ExExHandle::new`] should be given to the `ExEx`, while the handle itself should be given to /// the manager in [`ExExManager::new`]. #[derive(Debug)] -pub struct ExExHandle { +pub struct ExExHandle { /// The execution extension's ID. id: String, /// Metrics for an `ExEx`. metrics: ExExMetrics, /// Channel to send [`ExExNotification`]s to the `ExEx`. - sender: PollSender, + sender: PollSender>, /// Channel to receive [`ExExEvent`]s from the `ExEx`. receiver: UnboundedReceiver, /// The ID of the next notification to send to this `ExEx`. @@ -86,17 +89,17 @@ pub struct ExExHandle { finished_height: Option, } -impl ExExHandle { +impl ExExHandle { /// Create a new handle for the given `ExEx`. /// /// Returns the handle, as well as a [`UnboundedSender`] for [`ExExEvent`]s and a /// [`mpsc::Receiver`] for [`ExExNotification`]s that should be given to the `ExEx`. - pub fn new( + pub fn new>( id: String, node_head: Head, provider: P, executor: E, - wal_handle: WalHandle, + wal_handle: WalHandle, ) -> (Self, UnboundedSender, ExExNotifications) { let (notification_tx, notification_rx) = mpsc::channel(1); let (event_tx, event_rx) = mpsc::unbounded_channel(); @@ -124,21 +127,21 @@ impl ExExHandle { fn send( &mut self, cx: &mut Context<'_>, - (notification_id, notification): &(usize, ExExNotification), - ) -> Poll>> { + (notification_id, notification): &(usize, ExExNotification), + ) -> Poll>>> { if let Some(finished_height) = self.finished_height { match notification { ExExNotification::ChainCommitted { new } => { // Skip the chain commit notification if the finished height of the ExEx is // higher than or equal to the tip of the new notification. // I.e., the ExEx has already processed the notification. - if finished_height.number >= new.tip().number { + if finished_height.number >= new.tip().number() { debug!( target: "exex::manager", exex_id = %self.id, %notification_id, ?finished_height, - new_tip = %new.tip().number, + new_tip = %new.tip().number(), "Skipping notification" ); @@ -208,15 +211,15 @@ pub struct ExExManagerMetrics { /// - Error handling /// - Monitoring #[derive(Debug)] -pub struct ExExManager

{ +pub struct ExExManager { /// Provider for querying headers. provider: P, /// Handles to communicate with the `ExEx`'s. - exex_handles: Vec, + exex_handles: Vec>, /// [`ExExNotification`] channel from the [`ExExManagerHandle`]s. - handle_rx: UnboundedReceiver<(ExExNotificationSource, ExExNotification)>, + handle_rx: UnboundedReceiver<(ExExNotificationSource, ExExNotification)>, /// The minimum notification ID currently present in the buffer. min_id: usize, @@ -226,7 +229,7 @@ pub struct ExExManager

{ /// /// The first element of the tuple is a monotonically increasing ID unique to the notification /// (the second element of the tuple). - buffer: VecDeque<(usize, ExExNotification)>, + buffer: VecDeque<(usize, ExExNotification)>, /// Max size of the internal state notifications buffer. max_capacity: usize, /// Current state notifications buffer capacity. @@ -241,17 +244,20 @@ pub struct ExExManager

{ finished_height: watch::Sender, /// Write-Ahead Log for the [`ExExNotification`]s. - wal: Wal, + wal: Wal, /// A stream of finalized headers. finalized_header_stream: ForkChoiceStream, /// A handle to the `ExEx` manager. - handle: ExExManagerHandle, + handle: ExExManagerHandle, /// Metrics for the `ExEx` manager. metrics: ExExManagerMetrics, } -impl

ExExManager

{ +impl ExExManager +where + N: NodePrimitives, +{ /// Create a new [`ExExManager`]. /// /// You must provide an [`ExExHandle`] for each `ExEx` and the maximum capacity of the @@ -261,9 +267,9 @@ impl

ExExManager

{ /// notifications over [`ExExManagerHandle`]s until there is capacity again. pub fn new( provider: P, - handles: Vec, + handles: Vec>, max_capacity: usize, - wal: Wal, + wal: Wal, finalized_header_stream: ForkChoiceStream, ) -> Self { let num_exexs = handles.len(); @@ -314,7 +320,7 @@ impl

ExExManager

{ } /// Returns the handle to the manager. - pub fn handle(&self) -> ExExManagerHandle { + pub fn handle(&self) -> ExExManagerHandle { self.handle.clone() } @@ -333,16 +339,17 @@ impl

ExExManager

{ /// Pushes a new notification into the managers internal buffer, assigning the notification a /// unique ID. - fn push_notification(&mut self, notification: ExExNotification) { + fn push_notification(&mut self, notification: ExExNotification) { let next_id = self.next_id; self.buffer.push_back((next_id, notification)); self.next_id += 1; } } -impl

ExExManager

+impl ExExManager where P: HeaderProvider, + N: NodePrimitives, { /// Finalizes the WAL according to the passed finalized header. /// @@ -413,9 +420,10 @@ where } } -impl

Future for ExExManager

+impl Future for ExExManager where P: HeaderProvider + Unpin + 'static, + N: NodePrimitives, { type Output = eyre::Result<()>; @@ -456,8 +464,9 @@ where // Drain handle notifications while this.buffer.len() < this.max_capacity { if let Poll::Ready(Some((source, notification))) = this.handle_rx.poll_recv(cx) { - let committed_tip = notification.committed_chain().map(|chain| chain.tip().number); - let reverted_tip = notification.reverted_chain().map(|chain| chain.tip().number); + let committed_tip = + notification.committed_chain().map(|chain| chain.tip().number()); + let reverted_tip = notification.reverted_chain().map(|chain| chain.tip().number()); debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Received new notification"); // Commit to WAL only notifications from blockchain tree. Pipeline notifications @@ -524,9 +533,9 @@ where /// A handle to communicate with the [`ExExManager`]. #[derive(Debug)] -pub struct ExExManagerHandle { +pub struct ExExManagerHandle { /// Channel to send notifications to the `ExEx` manager. - exex_tx: UnboundedSender<(ExExNotificationSource, ExExNotification)>, + exex_tx: UnboundedSender<(ExExNotificationSource, ExExNotification)>, /// The number of `ExEx`'s running on the node. num_exexs: usize, /// A watch channel denoting whether the manager is ready for new notifications or not. @@ -544,7 +553,7 @@ pub struct ExExManagerHandle { finished_height: watch::Receiver, } -impl ExExManagerHandle { +impl ExExManagerHandle { /// Creates an empty manager handle. /// /// Use this if there is no manager present. @@ -571,8 +580,8 @@ impl ExExManagerHandle { pub fn send( &self, source: ExExNotificationSource, - notification: ExExNotification, - ) -> Result<(), SendError<(ExExNotificationSource, ExExNotification)>> { + notification: ExExNotification, + ) -> Result<(), SendError<(ExExNotificationSource, ExExNotification)>> { self.exex_tx.send((source, notification)) } @@ -583,8 +592,8 @@ impl ExExManagerHandle { pub async fn send_async( &mut self, source: ExExNotificationSource, - notification: ExExNotification, - ) -> Result<(), SendError<(ExExNotificationSource, ExExNotification)>> { + notification: ExExNotification, + ) -> Result<(), SendError<(ExExNotificationSource, ExExNotification)>> { self.ready().await; self.exex_tx.send((source, notification)) } @@ -633,7 +642,7 @@ async fn make_wait_future(mut rx: watch::Receiver) -> watch::Receiver Clone for ExExManagerHandle { fn clone(&self) -> Self { Self { exex_tx: self.exex_tx.clone(), @@ -653,6 +662,7 @@ mod tests { use futures::{StreamExt, TryStreamExt}; use rand::Rng; use reth_db_common::init::init_genesis; + use reth_evm::test_utils::MockExecutorProvider; use reth_evm_ethereum::execute::EthExecutorProvider; use reth_primitives::SealedBlockWithSenders; use reth_provider::{ @@ -673,8 +683,13 @@ mod tests { let temp_dir = tempfile::tempdir().unwrap(); let wal = Wal::new(temp_dir.path()).unwrap(); - let (mut exex_handle, event_tx, mut _notification_rx) = - ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle()); + let (mut exex_handle, event_tx, mut _notification_rx) = ExExHandle::new( + "test_exex".to_string(), + Head::default(), + (), + MockExecutorProvider::default(), + wal.handle(), + ); // Send an event and check that it's delivered correctly let event = ExExEvent::FinishedHeight(BlockNumHash::new(42, B256::random())); @@ -688,8 +703,13 @@ mod tests { let temp_dir = tempfile::tempdir().unwrap(); let wal = Wal::new(temp_dir.path()).unwrap(); - let (exex_handle_1, _, _) = - ExExHandle::new("test_exex_1".to_string(), Head::default(), (), (), wal.handle()); + let (exex_handle_1, _, _) = ExExHandle::new( + "test_exex_1".to_string(), + Head::default(), + (), + MockExecutorProvider::default(), + wal.handle(), + ); assert!(!ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream()) .handle @@ -705,8 +725,13 @@ mod tests { let temp_dir = tempfile::tempdir().unwrap(); let wal = Wal::new(temp_dir.path()).unwrap(); - let (exex_handle_1, _, _) = - ExExHandle::new("test_exex_1".to_string(), Head::default(), (), (), wal.handle()); + let (exex_handle_1, _, _) = ExExHandle::new( + "test_exex_1".to_string(), + Head::default(), + (), + MockExecutorProvider::default(), + wal.handle(), + ); assert!(!ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream()) .handle @@ -728,8 +753,13 @@ mod tests { let temp_dir = tempfile::tempdir().unwrap(); let wal = Wal::new(temp_dir.path()).unwrap(); - let (exex_handle, _, _) = - ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle()); + let (exex_handle, _, _) = ExExHandle::new( + "test_exex".to_string(), + Head::default(), + (), + MockExecutorProvider::default(), + wal.handle(), + ); // Create a mock ExExManager and add the exex_handle to it let mut exex_manager = @@ -778,8 +808,13 @@ mod tests { let temp_dir = tempfile::tempdir().unwrap(); let wal = Wal::new(temp_dir.path()).unwrap(); - let (exex_handle, _, _) = - ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle()); + let (exex_handle, _, _) = ExExHandle::new( + "test_exex".to_string(), + Head::default(), + (), + MockExecutorProvider::default(), + wal.handle(), + ); // Create a mock ExExManager and add the exex_handle to it let max_capacity = 5; @@ -824,8 +859,13 @@ mod tests { let provider_factory = create_test_provider_factory(); - let (exex_handle, event_tx, mut _notification_rx) = - ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle()); + let (exex_handle, event_tx, mut _notification_rx) = ExExHandle::new( + "test_exex".to_string(), + Head::default(), + (), + MockExecutorProvider::default(), + wal.handle(), + ); // Check initial block height assert!(exex_handle.finished_height.is_none()); @@ -874,10 +914,20 @@ mod tests { let provider_factory = create_test_provider_factory(); // Create two `ExExHandle` instances - let (exex_handle1, event_tx1, _) = - ExExHandle::new("test_exex1".to_string(), Head::default(), (), (), wal.handle()); - let (exex_handle2, event_tx2, _) = - ExExHandle::new("test_exex2".to_string(), Head::default(), (), (), wal.handle()); + let (exex_handle1, event_tx1, _) = ExExHandle::new( + "test_exex1".to_string(), + Head::default(), + (), + MockExecutorProvider::default(), + wal.handle(), + ); + let (exex_handle2, event_tx2, _) = ExExHandle::new( + "test_exex2".to_string(), + Head::default(), + (), + MockExecutorProvider::default(), + wal.handle(), + ); let block1 = BlockNumHash::new(42, B256::random()); let block2 = BlockNumHash::new(10, B256::random()); @@ -921,10 +971,20 @@ mod tests { let provider_factory = create_test_provider_factory(); // Create two `ExExHandle` instances - let (exex_handle1, event_tx1, _) = - ExExHandle::new("test_exex1".to_string(), Head::default(), (), (), wal.handle()); - let (exex_handle2, event_tx2, _) = - ExExHandle::new("test_exex2".to_string(), Head::default(), (), (), wal.handle()); + let (exex_handle1, event_tx1, _) = ExExHandle::new( + "test_exex1".to_string(), + Head::default(), + (), + MockExecutorProvider::default(), + wal.handle(), + ); + let (exex_handle2, event_tx2, _) = ExExHandle::new( + "test_exex2".to_string(), + Head::default(), + (), + MockExecutorProvider::default(), + wal.handle(), + ); // Assert that the initial block height is `None` for the first `ExExHandle`. assert!(exex_handle1.finished_height.is_none()); @@ -974,8 +1034,13 @@ mod tests { let provider_factory = create_test_provider_factory(); - let (exex_handle_1, _, _) = - ExExHandle::new("test_exex_1".to_string(), Head::default(), (), (), wal.handle()); + let (exex_handle_1, _, _) = ExExHandle::new( + "test_exex_1".to_string(), + Head::default(), + (), + MockExecutorProvider::default(), + wal.handle(), + ); // Create an ExExManager with a small max capacity let max_capacity = 2; diff --git a/crates/exex/exex/src/notifications.rs b/crates/exex/exex/src/notifications.rs index 954a057fc09c..05892e2f90d5 100644 --- a/crates/exex/exex/src/notifications.rs +++ b/crates/exex/exex/src/notifications.rs @@ -1,8 +1,11 @@ use crate::{BackfillJobFactory, ExExNotification, StreamBackfillJob, WalHandle}; +use alloy_consensus::BlockHeader; use futures::{Stream, StreamExt}; use reth_chainspec::Head; use reth_evm::execute::BlockExecutorProvider; use reth_exex_types::ExExHead; +use reth_node_api::NodePrimitives; +use reth_primitives::EthPrimitives; use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory}; use reth_tracing::tracing::debug; use std::{ @@ -17,14 +20,19 @@ use tokio::sync::mpsc::Receiver; /// stream is configured with a head via [`ExExNotifications::set_with_head`] or /// [`ExExNotifications::with_head`], it will run backfill jobs to catch up to the node head. #[derive(Debug)] -pub struct ExExNotifications { +pub struct ExExNotifications +where + E: BlockExecutorProvider, +{ inner: ExExNotificationsInner, } /// A trait, that represents a stream of [`ExExNotification`]s. The stream will emit notifications /// for all blocks. If the stream is configured with a head via [`ExExNotifications::set_with_head`] /// or [`ExExNotifications::with_head`], it will run backfill jobs to catch up to the node head. -pub trait ExExNotificationsStream: Stream> + Unpin { +pub trait ExExNotificationsStream: + Stream>> + Unpin +{ /// Sets [`ExExNotificationsStream`] to a stream of [`ExExNotification`]s without a head. /// /// It's a no-op if the stream has already been configured without a head. @@ -56,7 +64,10 @@ pub trait ExExNotificationsStream: Stream> } #[derive(Debug)] -enum ExExNotificationsInner { +enum ExExNotificationsInner +where + E: BlockExecutorProvider, +{ /// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. WithoutHead(ExExNotificationsWithoutHead), /// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that @@ -67,14 +78,17 @@ enum ExExNotificationsInner { Invalid, } -impl ExExNotifications { +impl ExExNotifications +where + E: BlockExecutorProvider, +{ /// Creates a new stream of [`ExExNotifications`] without a head. pub const fn new( node_head: Head, provider: P, executor: E, - notifications: Receiver, - wal_handle: WalHandle, + notifications: Receiver>, + wal_handle: WalHandle, ) -> Self { Self { inner: ExExNotificationsInner::WithoutHead(ExExNotificationsWithoutHead::new( @@ -88,15 +102,13 @@ impl ExExNotifications { } } -impl ExExNotificationsStream for ExExNotifications +impl ExExNotificationsStream for ExExNotifications where - P: BlockReader - + HeaderProvider - + StateProviderFactory + P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, + E: BlockExecutorProvider> + Clone + Unpin + 'static, - E: BlockExecutorProvider + Clone + Unpin + 'static, { fn set_without_head(&mut self) { let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid); @@ -144,15 +156,13 @@ where impl Stream for ExExNotifications where - P: BlockReader - + HeaderProvider - + StateProviderFactory + P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, + E: BlockExecutorProvider> + Clone + Unpin + 'static, - E: BlockExecutorProvider + Clone + Unpin + 'static, { - type Item = eyre::Result; + type Item = eyre::Result>; fn poll_next( self: std::pin::Pin<&mut Self>, @@ -169,15 +179,21 @@ where } /// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. -pub struct ExExNotificationsWithoutHead { +pub struct ExExNotificationsWithoutHead +where + E: BlockExecutorProvider, +{ node_head: Head, provider: P, executor: E, - notifications: Receiver, - wal_handle: WalHandle, + notifications: Receiver>, + wal_handle: WalHandle, } -impl Debug for ExExNotificationsWithoutHead { +impl Debug for ExExNotificationsWithoutHead +where + E: Debug + BlockExecutorProvider, +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ExExNotifications") .field("provider", &self.provider) @@ -187,14 +203,17 @@ impl Debug for ExExNotificationsWithoutHead { } } -impl ExExNotificationsWithoutHead { +impl ExExNotificationsWithoutHead +where + E: BlockExecutorProvider, +{ /// Creates a new instance of [`ExExNotificationsWithoutHead`]. const fn new( node_head: Head, provider: P, executor: E, - notifications: Receiver, - wal_handle: WalHandle, + notifications: Receiver>, + wal_handle: WalHandle, ) -> Self { Self { node_head, provider, executor, notifications, wal_handle } } @@ -212,8 +231,11 @@ impl ExExNotificationsWithoutHead { } } -impl Stream for ExExNotificationsWithoutHead { - type Item = ExExNotification; +impl Stream for ExExNotificationsWithoutHead +where + E: Unpin + BlockExecutorProvider, +{ + type Item = ExExNotification; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.get_mut().notifications.poll_recv(cx) @@ -229,12 +251,15 @@ impl Stream for ExExNotificationsWithoutHead { /// `exex_head.number` of 10 indicates that the ExEx has processed up to block 10, and is ready to /// process block 11. #[derive(Debug)] -pub struct ExExNotificationsWithHead { +pub struct ExExNotificationsWithHead +where + E: BlockExecutorProvider, +{ node_head: Head, provider: P, executor: E, - notifications: Receiver, - wal_handle: WalHandle, + notifications: Receiver>, + wal_handle: WalHandle, exex_head: ExExHead, /// If true, then we need to check if the ExEx head is on the canonical chain and if not, /// revert its head. @@ -243,17 +268,20 @@ pub struct ExExNotificationsWithHead { /// the missing blocks. pending_check_backfill: bool, /// The backfill job to run before consuming any notifications. - backfill_job: Option>, + backfill_job: Option>>, } -impl ExExNotificationsWithHead { +impl ExExNotificationsWithHead +where + E: BlockExecutorProvider, +{ /// Creates a new [`ExExNotificationsWithHead`]. const fn new( node_head: Head, provider: P, executor: E, - notifications: Receiver, - wal_handle: WalHandle, + notifications: Receiver>, + wal_handle: WalHandle, exex_head: ExExHead, ) -> Self { Self { @@ -272,20 +300,18 @@ impl ExExNotificationsWithHead { impl ExExNotificationsWithHead where - P: BlockReader - + HeaderProvider - + StateProviderFactory + P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, + E: BlockExecutorProvider> + Clone + Unpin + 'static, - E: BlockExecutorProvider + Clone + Unpin + 'static, { /// Checks if the ExEx head is on the canonical chain. /// /// If the head block is not found in the database or it's ahead of the node head, it means /// we're not on the canonical chain and we need to revert the notification with the ExEx /// head block. - fn check_canonical(&mut self) -> eyre::Result> { + fn check_canonical(&mut self) -> eyre::Result>> { if self.provider.is_known(&self.exex_head.block.hash)? && self.exex_head.block.number <= self.node_head.number { @@ -309,7 +335,7 @@ where // Update the head block hash to the parent hash of the first committed block. let committed_chain = notification.committed_chain().unwrap(); let new_exex_head = - (committed_chain.first().parent_hash, committed_chain.first().number - 1).into(); + (committed_chain.first().parent_hash(), committed_chain.first().number() - 1).into(); debug!(target: "exex::notifications", old_exex_head = ?self.exex_head.block, new_exex_head = ?new_exex_head, "ExEx head updated"); self.exex_head.block = new_exex_head; @@ -354,15 +380,13 @@ where impl Stream for ExExNotificationsWithHead where - P: BlockReader - + HeaderProvider - + StateProviderFactory + P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, + E: BlockExecutorProvider> + Clone + Unpin + 'static, - E: BlockExecutorProvider + Clone + Unpin + 'static, { - type Item = eyre::Result; + type Item = eyre::Result>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); @@ -402,7 +426,7 @@ where this.exex_head.block = committed_chain.tip().num_hash(); } else if let Some(reverted_chain) = notification.reverted_chain() { let first_block = reverted_chain.first(); - this.exex_head.block = (first_block.parent_hash, first_block.number - 1).into(); + this.exex_head.block = (first_block.parent_hash(), first_block.number() - 1).into(); } Poll::Ready(Some(Ok(notification))) diff --git a/crates/exex/exex/src/wal/cache.rs b/crates/exex/exex/src/wal/cache.rs index 86943f33cfa0..b5e0f2034e8d 100644 --- a/crates/exex/exex/src/wal/cache.rs +++ b/crates/exex/exex/src/wal/cache.rs @@ -3,9 +3,11 @@ use std::{ collections::{BinaryHeap, HashSet}, }; +use alloy_consensus::BlockHeader; use alloy_eips::BlockNumHash; use alloy_primitives::{map::FbHashMap, BlockNumber, B256}; use reth_exex_types::ExExNotification; +use reth_node_api::NodePrimitives; /// The block cache of the WAL. /// @@ -91,16 +93,16 @@ impl BlockCache { } /// Inserts the blocks from the notification into the cache with the given file ID. - pub(super) fn insert_notification_blocks_with_file_id( + pub(super) fn insert_notification_blocks_with_file_id( &mut self, file_id: u32, - notification: &ExExNotification, + notification: &ExExNotification, ) { let reverted_chain = notification.reverted_chain(); let committed_chain = notification.committed_chain(); let max_block = - reverted_chain.iter().chain(&committed_chain).map(|chain| chain.tip().number).max(); + reverted_chain.iter().chain(&committed_chain).map(|chain| chain.tip().number()).max(); if let Some(max_block) = max_block { self.notification_max_blocks.push(Reverse((max_block, file_id))); } @@ -108,13 +110,13 @@ impl BlockCache { if let Some(committed_chain) = &committed_chain { for block in committed_chain.blocks().values() { let cached_block = CachedBlock { - block: (block.number, block.hash()).into(), - parent_hash: block.parent_hash, + block: (block.number(), block.hash()).into(), + parent_hash: block.parent_hash(), }; self.committed_blocks.insert(block.hash(), (file_id, cached_block)); } - self.highest_committed_block_height = Some(committed_chain.tip().number); + self.highest_committed_block_height = Some(committed_chain.tip().number()); } } diff --git a/crates/exex/exex/src/wal/mod.rs b/crates/exex/exex/src/wal/mod.rs index 066fbe1b58c1..fb6be6e8c852 100644 --- a/crates/exex/exex/src/wal/mod.rs +++ b/crates/exex/exex/src/wal/mod.rs @@ -3,6 +3,8 @@ mod cache; pub use cache::BlockCache; mod storage; +use reth_node_api::NodePrimitives; +use reth_primitives::EthPrimitives; pub use storage::Storage; mod metrics; use metrics::Metrics; @@ -32,23 +34,26 @@ use reth_tracing::tracing::{debug, instrument}; /// 2. When the chain is finalized, call [`Wal::finalize`] to prevent the infinite growth of the /// WAL. #[derive(Debug, Clone)] -pub struct Wal { - inner: Arc, +pub struct Wal { + inner: Arc>, } -impl Wal { +impl Wal +where + N: NodePrimitives, +{ /// Creates a new instance of [`Wal`]. pub fn new(directory: impl AsRef) -> eyre::Result { Ok(Self { inner: Arc::new(WalInner::new(directory)?) }) } /// Returns a read-only handle to the WAL. - pub fn handle(&self) -> WalHandle { + pub fn handle(&self) -> WalHandle { WalHandle { wal: self.inner.clone() } } /// Commits the notification to WAL. - pub fn commit(&self, notification: &ExExNotification) -> eyre::Result<()> { + pub fn commit(&self, notification: &ExExNotification) -> eyre::Result<()> { self.inner.commit(notification) } @@ -63,7 +68,7 @@ impl Wal { /// Returns an iterator over all notifications in the WAL. pub fn iter_notifications( &self, - ) -> eyre::Result> + '_>> { + ) -> eyre::Result>> + '_>> { self.inner.iter_notifications() } @@ -75,16 +80,19 @@ impl Wal { /// Inner type for the WAL. #[derive(Debug)] -struct WalInner { +struct WalInner { next_file_id: AtomicU32, /// The underlying WAL storage backed by a file. - storage: Storage, + storage: Storage, /// WAL block cache. See [`cache::BlockCache`] docs for more details. block_cache: RwLock, metrics: Metrics, } -impl WalInner { +impl WalInner +where + N: NodePrimitives, +{ fn new(directory: impl AsRef) -> eyre::Result { let mut wal = Self { next_file_id: AtomicU32::new(0), @@ -137,7 +145,7 @@ impl WalInner { reverted_block_range = ?notification.reverted_chain().as_ref().map(|chain| chain.range()), committed_block_range = ?notification.committed_chain().as_ref().map(|chain| chain.range()) ))] - fn commit(&self, notification: &ExExNotification) -> eyre::Result<()> { + fn commit(&self, notification: &ExExNotification) -> eyre::Result<()> { let mut block_cache = self.block_cache.write(); let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed); @@ -187,7 +195,7 @@ impl WalInner { /// Returns an iterator over all notifications in the WAL. fn iter_notifications( &self, - ) -> eyre::Result> + '_>> { + ) -> eyre::Result>> + '_>> { let Some(range) = self.storage.files_range()? else { return Ok(Box::new(std::iter::empty())) }; @@ -198,16 +206,19 @@ impl WalInner { /// A read-only handle to the WAL that can be shared. #[derive(Debug)] -pub struct WalHandle { - wal: Arc, +pub struct WalHandle { + wal: Arc>, } -impl WalHandle { +impl WalHandle +where + N: NodePrimitives, +{ /// Returns the notification for the given committed block hash if it exists. pub fn get_committed_notification_by_block_hash( &self, block_hash: &B256, - ) -> eyre::Result> { + ) -> eyre::Result>> { let Some(file_id) = self.wal.block_cache().get_file_id_by_committed_block_hash(block_hash) else { return Ok(None) diff --git a/crates/exex/exex/src/wal/storage.rs b/crates/exex/exex/src/wal/storage.rs index aaa4398fd0b6..699d88ba2a74 100644 --- a/crates/exex/exex/src/wal/storage.rs +++ b/crates/exex/exex/src/wal/storage.rs @@ -6,6 +6,8 @@ use std::{ use eyre::OptionExt; use reth_exex_types::ExExNotification; +use reth_node_api::NodePrimitives; +use reth_primitives::EthPrimitives; use reth_tracing::tracing::debug; use tracing::instrument; @@ -16,18 +18,22 @@ static FILE_EXTENSION: &str = "wal"; /// Each notification is represented by a single file that contains a MessagePack-encoded /// notification. #[derive(Debug, Clone)] -pub struct Storage { +pub struct Storage { /// The path to the WAL file. path: PathBuf, + _pd: std::marker::PhantomData, } -impl Storage { +impl Storage +where + N: NodePrimitives, +{ /// Creates a new instance of [`Storage`] backed by the file at the given path and creates /// it doesn't exist. pub(super) fn new(path: impl AsRef) -> eyre::Result { reth_fs_util::create_dir_all(&path)?; - Ok(Self { path: path.as_ref().to_path_buf() }) + Ok(Self { path: path.as_ref().to_path_buf(), _pd: std::marker::PhantomData }) } fn file_path(&self, id: u32) -> PathBuf { @@ -110,7 +116,7 @@ impl Storage { pub(super) fn iter_notifications( &self, range: RangeInclusive, - ) -> impl Iterator> + '_ { + ) -> impl Iterator)>> + '_ { range.map(move |id| { let (notification, size) = self.read_notification(id)?.ok_or_eyre("notification {id} not found")?; @@ -124,7 +130,7 @@ impl Storage { pub(super) fn read_notification( &self, file_id: u32, - ) -> eyre::Result> { + ) -> eyre::Result, u64)>> { let file_path = self.file_path(file_id); debug!(target: "exex::wal::storage", ?file_path, "Reading notification from WAL"); @@ -136,7 +142,7 @@ impl Storage { let size = file.metadata()?.len(); // Deserialize using the bincode- and msgpack-compatible serde wrapper - let notification: reth_exex_types::serde_bincode_compat::ExExNotification<'_> = + let notification: reth_exex_types::serde_bincode_compat::ExExNotification<'_, N> = rmp_serde::decode::from_read(&mut file).map_err(|err| { eyre::eyre!("failed to decode notification from {file_path:?}: {err:?}") })?; @@ -153,14 +159,14 @@ impl Storage { pub(super) fn write_notification( &self, file_id: u32, - notification: &ExExNotification, + notification: &ExExNotification, ) -> eyre::Result { let file_path = self.file_path(file_id); debug!(target: "exex::wal::storage", ?file_path, "Writing notification to WAL"); // Serialize using the bincode- and msgpack-compatible serde wrapper let notification = - reth_exex_types::serde_bincode_compat::ExExNotification::from(notification); + reth_exex_types::serde_bincode_compat::ExExNotification::::from(notification); reth_fs_util::atomic_write_file(&file_path, |file| { rmp_serde::encode::write(file, ¬ification) @@ -186,7 +192,7 @@ mod tests { let mut rng = generators::rng(); let temp_dir = tempfile::tempdir()?; - let storage = Storage::new(&temp_dir)?; + let storage: Storage = Storage::new(&temp_dir)?; let old_block = random_block(&mut rng, 0, Default::default()) .seal_with_senders() @@ -215,7 +221,7 @@ mod tests { #[test] fn test_files_range() -> eyre::Result<()> { let temp_dir = tempfile::tempdir()?; - let storage = Storage::new(&temp_dir)?; + let storage: Storage = Storage::new(&temp_dir)?; // Create WAL files File::create(storage.file_path(1))?; diff --git a/crates/exex/test-utils/src/lib.rs b/crates/exex/test-utils/src/lib.rs index ca0ea46551c5..9acad4d4b659 100644 --- a/crates/exex/test-utils/src/lib.rs +++ b/crates/exex/test-utils/src/lib.rs @@ -80,7 +80,7 @@ pub struct TestExecutorBuilder; impl ExecutorBuilder for TestExecutorBuilder where - Node: FullNodeTypes>, + Node: FullNodeTypes>, { type EVM = EthEvmConfig; type Executor = MockExecutorProvider; diff --git a/crates/exex/types/src/notification.rs b/crates/exex/types/src/notification.rs index 5ded40d061bb..44eeb25084a2 100644 --- a/crates/exex/types/src/notification.rs +++ b/crates/exex/types/src/notification.rs @@ -7,30 +7,30 @@ use reth_primitives_traits::NodePrimitives; /// Notifications sent to an `ExEx`. #[derive(Debug, Clone, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -pub enum ExExNotification { +pub enum ExExNotification { /// Chain got committed without a reorg, and only the new chain is returned. ChainCommitted { /// The new chain after commit. - new: Arc>, + new: Arc>, }, /// Chain got reorged, and both the old and the new chains are returned. ChainReorged { /// The old chain before reorg. - old: Arc>, + old: Arc>, /// The new chain after reorg. - new: Arc>, + new: Arc>, }, /// Chain got reverted, and only the old chain is returned. ChainReverted { /// The old chain before reversion. - old: Arc>, + old: Arc>, }, } -impl ExExNotification { +impl ExExNotification { /// Returns the committed chain from the [`Self::ChainCommitted`] and [`Self::ChainReorged`] /// variants, if any. - pub fn committed_chain(&self) -> Option> { + pub fn committed_chain(&self) -> Option>> { match self { Self::ChainCommitted { new } | Self::ChainReorged { old: _, new } => Some(new.clone()), Self::ChainReverted { .. } => None, @@ -39,7 +39,7 @@ impl ExExNotification { /// Returns the reverted chain from the [`Self::ChainReorged`] and [`Self::ChainReverted`] /// variants, if any. - pub fn reverted_chain(&self) -> Option> { + pub fn reverted_chain(&self) -> Option>> { match self { Self::ChainReorged { old, new: _ } | Self::ChainReverted { old } => Some(old.clone()), Self::ChainCommitted { .. } => None, diff --git a/crates/node/api/src/node.rs b/crates/node/api/src/node.rs index 5d25d8d592c7..d640c0dbb0ea 100644 --- a/crates/node/api/src/node.rs +++ b/crates/node/api/src/node.rs @@ -53,7 +53,7 @@ pub trait FullNodeComponents: FullNodeTypes + Clone + 'static { type Evm: ConfigureEvm

; /// The type that knows how to execute blocks. - type Executor: BlockExecutorProvider; + type Executor: BlockExecutorProvider::Primitives>; /// The consensus type of the node. type Consensus: Consensus + Clone + Unpin + 'static; diff --git a/crates/node/builder/src/components/builder.rs b/crates/node/builder/src/components/builder.rs index 95c0c764b5c3..46b6824dba73 100644 --- a/crates/node/builder/src/components/builder.rs +++ b/crates/node/builder/src/components/builder.rs @@ -10,7 +10,7 @@ use crate::{ use alloy_consensus::Header; use reth_consensus::Consensus; use reth_evm::execute::BlockExecutorProvider; -use reth_node_api::NodeTypesWithEngine; +use reth_node_api::{NodeTypes, NodeTypesWithEngine}; use reth_payload_builder::PayloadBuilderHandle; use reth_transaction_pool::TransactionPool; use std::{future::Future, marker::PhantomData}; @@ -377,7 +377,7 @@ where Fut: Future>> + Send, Pool: TransactionPool + Unpin + 'static, EVM: ConfigureEvm
, - Executor: BlockExecutorProvider, + Executor: BlockExecutorProvider::Primitives>, Cons: Consensus + Clone + Unpin + 'static, { type Components = Components; diff --git a/crates/node/builder/src/components/execute.rs b/crates/node/builder/src/components/execute.rs index 4e8f63f412bc..0c75ef3016f2 100644 --- a/crates/node/builder/src/components/execute.rs +++ b/crates/node/builder/src/components/execute.rs @@ -13,7 +13,9 @@ pub trait ExecutorBuilder: Send { type EVM: ConfigureEvm
; /// The type that knows how to execute blocks. - type Executor: BlockExecutorProvider; + type Executor: BlockExecutorProvider< + Primitives = ::Primitives, + >; /// Creates the EVM config. fn build_evm( @@ -26,7 +28,8 @@ impl ExecutorBuilder for F where Node: FullNodeTypes, EVM: ConfigureEvm
, - Executor: BlockExecutorProvider, + Executor: + BlockExecutorProvider::Primitives>, F: FnOnce(&BuilderContext) -> Fut + Send, Fut: Future> + Send, { diff --git a/crates/node/builder/src/components/mod.rs b/crates/node/builder/src/components/mod.rs index 1fe35e554d51..a3f3017463db 100644 --- a/crates/node/builder/src/components/mod.rs +++ b/crates/node/builder/src/components/mod.rs @@ -27,7 +27,7 @@ use reth_consensus::Consensus; use reth_evm::execute::BlockExecutorProvider; use reth_network::NetworkHandle; use reth_network_api::FullNetwork; -use reth_node_api::NodeTypesWithEngine; +use reth_node_api::{NodeTypes, NodeTypesWithEngine}; use reth_payload_builder::PayloadBuilderHandle; use reth_transaction_pool::TransactionPool; @@ -44,7 +44,7 @@ pub trait NodeComponents: Clone + Unpin + Send + Sync + 'stati type Evm: ConfigureEvm
; /// The type that knows how to execute blocks. - type Executor: BlockExecutorProvider; + type Executor: BlockExecutorProvider::Primitives>; /// The consensus type of the node. type Consensus: Consensus + Clone + Unpin + 'static; @@ -99,7 +99,7 @@ where Node: FullNodeTypes, Pool: TransactionPool + Unpin + 'static, EVM: ConfigureEvm
, - Executor: BlockExecutorProvider, + Executor: BlockExecutorProvider::Primitives>, Cons: Consensus + Clone + Unpin + 'static, { type Pool = Pool; diff --git a/crates/node/builder/src/setup.rs b/crates/node/builder/src/setup.rs index 092c1fdf6518..5c76718a3b17 100644 --- a/crates/node/builder/src/setup.rs +++ b/crates/node/builder/src/setup.rs @@ -35,12 +35,12 @@ pub fn build_networked_pipeline( max_block: Option, static_file_producer: StaticFileProducer>, executor: Executor, - exex_manager_handle: ExExManagerHandle, + exex_manager_handle: ExExManagerHandle, ) -> eyre::Result> where N: ProviderNodeTypes, Client: EthBlockClient + 'static, - Executor: BlockExecutorProvider, + Executor: BlockExecutorProvider, N::Primitives: FullNodePrimitives< Block = reth_primitives::Block, BlockBody = reth_primitives::BlockBody, @@ -86,13 +86,13 @@ pub fn build_pipeline( prune_config: Option, static_file_producer: StaticFileProducer>, executor: Executor, - exex_manager_handle: ExExManagerHandle, + exex_manager_handle: ExExManagerHandle, ) -> eyre::Result> where N: ProviderNodeTypes, H: HeaderDownloader
+ 'static, B: BodyDownloader> + 'static, - Executor: BlockExecutorProvider, + Executor: BlockExecutorProvider, N::Primitives: FullNodePrimitives< Block = reth_primitives::Block, BlockBody = reth_primitives::BlockBody, diff --git a/crates/optimism/evm/Cargo.toml b/crates/optimism/evm/Cargo.toml index 807f224ca4b8..b5d6fac50733 100644 --- a/crates/optimism/evm/Cargo.toml +++ b/crates/optimism/evm/Cargo.toml @@ -32,6 +32,7 @@ alloy-consensus.workspace = true reth-optimism-consensus.workspace = true reth-optimism-chainspec.workspace = true reth-optimism-forks.workspace = true +reth-optimism-primitives.workspace = true # revm revm.workspace = true diff --git a/crates/optimism/evm/src/execute.rs b/crates/optimism/evm/src/execute.rs index 1c93d2b71d03..4b4bccae4067 100644 --- a/crates/optimism/evm/src/execute.rs +++ b/crates/optimism/evm/src/execute.rs @@ -20,6 +20,7 @@ use reth_evm::{ use reth_optimism_chainspec::OpChainSpec; use reth_optimism_consensus::validate_block_post_execution; use reth_optimism_forks::OpHardfork; +use reth_optimism_primitives::OpPrimitives; use reth_primitives::{BlockWithSenders, Receipt, TxType}; use reth_revm::{Database, State}; use revm_primitives::{db::DatabaseCommit, EnvWithHandlerCfg, ResultAndState, U256}; @@ -53,6 +54,7 @@ where EvmConfig: Clone + Unpin + Sync + Send + 'static + ConfigureEvm
, { + type Primitives = OpPrimitives; type Strategy + Display>> = OpExecutionStrategy; @@ -109,11 +111,13 @@ where } } -impl BlockExecutionStrategy for OpExecutionStrategy +impl BlockExecutionStrategy for OpExecutionStrategy where DB: Database + Display>, EvmConfig: ConfigureEvm
, { + type DB = DB; + type Primitives = OpPrimitives; type Error = BlockExecutionError; fn init(&mut self, tx_env_overrides: Box) { diff --git a/crates/optimism/node/src/node.rs b/crates/optimism/node/src/node.rs index d6cd47cf2af1..429cb9ae229b 100644 --- a/crates/optimism/node/src/node.rs +++ b/crates/optimism/node/src/node.rs @@ -277,7 +277,7 @@ pub struct OpExecutorBuilder; impl ExecutorBuilder for OpExecutorBuilder where - Node: FullNodeTypes>, + Node: FullNodeTypes>, { type EVM = OpEvmConfig; type Executor = BasicBlockExecutorProvider; diff --git a/crates/revm/src/batch.rs b/crates/revm/src/batch.rs index 15ba049250f5..01b0bd421d75 100644 --- a/crates/revm/src/batch.rs +++ b/crates/revm/src/batch.rs @@ -14,7 +14,7 @@ use revm::db::states::bundle_state::BundleRetention; /// - recording receipts during execution of multiple blocks. /// - pruning receipts according to the pruning configuration. /// - batch range if known -#[derive(Debug, Default)] +#[derive(Debug)] pub struct BlockBatchRecord { /// Pruning configuration. prune_modes: PruneModes, @@ -43,6 +43,19 @@ pub struct BlockBatchRecord { tip: Option, } +impl Default for BlockBatchRecord { + fn default() -> Self { + Self { + prune_modes: Default::default(), + receipts: Default::default(), + requests: Default::default(), + pruning_address_filter: Default::default(), + first_block: Default::default(), + tip: Default::default(), + } + } +} + impl BlockBatchRecord { /// Create a new receipts recorder with the given pruning configuration. pub fn new(prune_modes: PruneModes) -> Self @@ -83,10 +96,7 @@ impl BlockBatchRecord { } /// Returns all recorded receipts. - pub fn take_receipts(&mut self) -> Receipts - where - T: Default, - { + pub fn take_receipts(&mut self) -> Receipts { core::mem::take(&mut self.receipts) } diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 8f5c84835aaa..46b3888f05b1 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -48,7 +48,7 @@ //! Events: //! CanonStateSubscriptions + Clone + 'static, //! EvmConfig: ConfigureEvm
, -//! BlockExecutor: BlockExecutorProvider, +//! BlockExecutor: BlockExecutorProvider, //! Consensus: reth_consensus::Consensus + Clone + 'static, //! { //! // configure the rpc module per transport @@ -130,7 +130,7 @@ //! EngineApi: EngineApiServer, //! EngineT: EngineTypes, //! EvmConfig: ConfigureEvm
, -//! BlockExecutor: BlockExecutorProvider, +//! BlockExecutor: BlockExecutorProvider, //! Consensus: reth_consensus::Consensus + Clone + 'static, //! { //! // configure the rpc module per transport @@ -198,7 +198,7 @@ use reth_consensus::Consensus; use reth_engine_primitives::EngineTypes; use reth_evm::{execute::BlockExecutorProvider, ConfigureEvm}; use reth_network_api::{noop::NoopNetwork, NetworkInfo, Peers}; -use reth_primitives::EthPrimitives; +use reth_primitives::{EthPrimitives, NodePrimitives}; use reth_provider::{ AccountReader, BlockReader, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, EvmEnvProvider, FullRpcProvider, ReceiptProvider, StateProviderFactory, @@ -278,7 +278,12 @@ where Events: CanonStateSubscriptions + Clone + 'static, EvmConfig: ConfigureEvm
, EthApi: FullEthApiServer, - BlockExecutor: BlockExecutorProvider, + BlockExecutor: BlockExecutorProvider< + Primitives: NodePrimitives< + Block = reth_primitives::Block, + Receipt = reth_primitives::Receipt, + >, + >, { let module_config = module_config.into(); server_config @@ -630,7 +635,12 @@ where Tasks: TaskSpawner + Clone + 'static, Events: CanonStateSubscriptions + Clone + 'static, EvmConfig: ConfigureEvm
, - BlockExecutor: BlockExecutorProvider, + BlockExecutor: BlockExecutorProvider< + Primitives: NodePrimitives< + Block = reth_primitives::Block, + Receipt = reth_primitives::Receipt, + >, + >, Consensus: reth_consensus::Consensus + Clone + 'static, { /// Configures all [`RpcModule`]s specific to the given [`TransportRpcModuleConfig`] which can @@ -1104,7 +1114,8 @@ where RpcBlock, RpcReceipt, > + EthApiTypes, - BlockExecutor: BlockExecutorProvider, + BlockExecutor: + BlockExecutorProvider>, { /// Register Eth Namespace /// @@ -1250,7 +1261,8 @@ where pub fn debug_api(&self) -> DebugApi where EthApi: EthApiSpec + EthTransactions + TraceExt, - BlockExecutor: BlockExecutorProvider, + BlockExecutor: + BlockExecutorProvider>, { DebugApi::new( self.provider.clone(), @@ -1306,7 +1318,12 @@ where Tasks: TaskSpawner + Clone + 'static, Events: CanonStateSubscriptions + Clone + 'static, EthApi: FullEthApiServer, - BlockExecutor: BlockExecutorProvider, + BlockExecutor: BlockExecutorProvider< + Primitives: NodePrimitives< + Block = reth_primitives::Block, + Receipt = reth_primitives::Receipt, + >, + >, Consensus: reth_consensus::Consensus + Clone + 'static, { /// Configures the auth module that includes the diff --git a/crates/rpc/rpc/src/debug.rs b/crates/rpc/rpc/src/debug.rs index 9fc1be93a2f2..d64cdf3afeac 100644 --- a/crates/rpc/rpc/src/debug.rs +++ b/crates/rpc/rpc/src/debug.rs @@ -18,17 +18,17 @@ use reth_evm::{ execute::{BlockExecutorProvider, Executor}, ConfigureEvmEnv, }; -use reth_primitives::{Block, BlockExt, SealedBlockWithSenders}; +use reth_primitives::{Block, BlockExt, NodePrimitives, SealedBlockWithSenders}; use reth_primitives_traits::SignedTransaction; use reth_provider::{ - BlockReaderIdExt, ChainSpecProvider, HeaderProvider, StateProofProvider, StateProviderFactory, - TransactionVariant, + BlockReader, BlockReaderIdExt, ChainSpecProvider, HeaderProvider, StateProofProvider, + StateProviderFactory, TransactionVariant, }; use reth_revm::{database::StateProviderDatabase, witness::ExecutionWitnessRecord}; use reth_rpc_api::DebugApiServer; use reth_rpc_eth_api::{ helpers::{EthApiSpec, EthTransactions, TraceExt}, - EthApiTypes, FromEthApiError, + EthApiTypes, FromEthApiError, RpcNodeCore, }; use reth_rpc_eth_types::{EthApiError, StateCacheDb}; use reth_rpc_server_types::{result::internal_rpc_err, ToRpcResult}; @@ -81,7 +81,9 @@ where + StateProviderFactory + 'static, Eth: EthApiTypes + TraceExt + 'static, - BlockExecutor: BlockExecutorProvider, + BlockExecutor: BlockExecutorProvider< + Primitives: NodePrimitives::Provider as BlockReader>::Block>, + >, { /// Acquires a permit to execute a tracing call. async fn acquire_trace_permit(&self) -> Result { @@ -800,7 +802,9 @@ where + StateProviderFactory + 'static, Eth: EthApiSpec + EthTransactions + TraceExt + 'static, - BlockExecutor: BlockExecutorProvider, + BlockExecutor: BlockExecutorProvider< + Primitives: NodePrimitives::Provider as BlockReader>::Block>, + >, { /// Handler for `debug_getRawHeader` async fn raw_header(&self, block_id: BlockId) -> RpcResult { diff --git a/crates/rpc/rpc/src/validation.rs b/crates/rpc/rpc/src/validation.rs index a5e29bb739f9..1885c8ad2e0c 100644 --- a/crates/rpc/rpc/src/validation.rs +++ b/crates/rpc/rpc/src/validation.rs @@ -1,4 +1,4 @@ -use alloy_consensus::{BlobTransactionValidationError, EnvKzgSettings, Transaction}; +use alloy_consensus::{BlobTransactionValidationError, EnvKzgSettings, Transaction, TxReceipt}; use alloy_eips::eip4844::kzg_to_versioned_hash; use alloy_rpc_types_beacon::relay::{ BidTrace, BuilderBlockValidationRequest, BuilderBlockValidationRequestV2, @@ -15,7 +15,7 @@ use reth_errors::{BlockExecutionError, ConsensusError, ProviderError}; use reth_ethereum_consensus::GAS_LIMIT_BOUND_DIVISOR; use reth_evm::execute::{BlockExecutorProvider, Executor}; use reth_payload_validator::ExecutionPayloadValidator; -use reth_primitives::{Block, GotExpected, Receipt, SealedBlockWithSenders, SealedHeader}; +use reth_primitives::{Block, GotExpected, NodePrimitives, SealedBlockWithSenders, SealedHeader}; use reth_provider::{ AccountReader, BlockExecutionInput, BlockExecutionOutput, BlockReaderIdExt, HeaderProvider, StateProviderFactory, WithdrawalsProvider, @@ -95,7 +95,12 @@ where + AccountReader + WithdrawalsProvider + 'static, - E: BlockExecutorProvider, + E: BlockExecutorProvider< + Primitives: NodePrimitives< + Block = reth_primitives::Block, + Receipt = reth_primitives::Receipt, + >, + >, { /// Validates the given block and a [`BidTrace`] against it. pub async fn validate_message_against_block( @@ -258,7 +263,7 @@ where fn ensure_payment( &self, block: &Block, - output: &BlockExecutionOutput, + output: &BlockExecutionOutput<::Receipt>, message: &BidTrace, ) -> Result<(), ValidationApiError> { let (mut balance_before, balance_after) = if let Some(acc) = @@ -292,7 +297,7 @@ where .zip(block.body.transactions.last()) .ok_or(ValidationApiError::ProposerPayment)?; - if !receipt.success { + if !receipt.status() { return Err(ValidationApiError::ProposerPayment) } @@ -407,7 +412,12 @@ where + WithdrawalsProvider + Clone + 'static, - E: BlockExecutorProvider, + E: BlockExecutorProvider< + Primitives: NodePrimitives< + Block = reth_primitives::Block, + Receipt = reth_primitives::Receipt, + >, + >, { async fn validate_builder_submission_v1( &self, diff --git a/crates/stages/stages/src/sets.rs b/crates/stages/stages/src/sets.rs index d04a96470a03..df5a4c542bfa 100644 --- a/crates/stages/stages/src/sets.rs +++ b/crates/stages/stages/src/sets.rs @@ -20,8 +20,9 @@ //! # use reth_static_file::StaticFileProducer; //! # use reth_config::config::StageConfig; //! # use reth_evm::execute::BlockExecutorProvider; +//! # use reth_primitives::EthPrimitives; //! -//! # fn create(exec: impl BlockExecutorProvider) { +//! # fn create(exec: impl BlockExecutorProvider) { //! //! let provider_factory = create_test_provider_factory(); //! let static_file_producer = diff --git a/crates/stages/stages/src/stages/execution.rs b/crates/stages/stages/src/stages/execution.rs index ce969f2577d8..f7832dd788ee 100644 --- a/crates/stages/stages/src/stages/execution.rs +++ b/crates/stages/stages/src/stages/execution.rs @@ -67,7 +67,10 @@ use super::missing_static_data_error; /// values to [`tables::PlainStorageState`] // false positive, we cannot derive it if !DB: Debug. #[allow(missing_debug_implementations)] -pub struct ExecutionStage { +pub struct ExecutionStage +where + E: BlockExecutorProvider, +{ /// The stage's internal block executor executor_provider: E, /// The commit thresholds of the execution stage. @@ -82,25 +85,28 @@ pub struct ExecutionStage { /// Input for the post execute commit hook. /// Set after every [`ExecutionStage::execute`] and cleared after /// [`ExecutionStage::post_execute_commit`]. - post_execute_commit_input: Option, + post_execute_commit_input: Option>, /// Input for the post unwind commit hook. /// Set after every [`ExecutionStage::unwind`] and cleared after /// [`ExecutionStage::post_unwind_commit`]. - post_unwind_commit_input: Option, + post_unwind_commit_input: Option>, /// Handle to communicate with `ExEx` manager. - exex_manager_handle: ExExManagerHandle, + exex_manager_handle: ExExManagerHandle, /// Executor metrics. metrics: ExecutorMetrics, } -impl ExecutionStage { +impl ExecutionStage +where + E: BlockExecutorProvider, +{ /// Create new execution stage with specified config. pub fn new( executor_provider: E, thresholds: ExecutionStageThresholds, external_clean_threshold: u64, prune_modes: PruneModes, - exex_manager_handle: ExExManagerHandle, + exex_manager_handle: ExExManagerHandle, ) -> Self { Self { external_clean_threshold, @@ -257,13 +263,13 @@ impl ExecutionStage { impl Stage for ExecutionStage where - E: BlockExecutorProvider, + E: BlockExecutorProvider>, Provider: DBProvider - + BlockReader + + BlockReader::Block> + StaticFileProviderFactory + StatsReader + BlockHashReader - + StateWriter + + StateWriter::Receipt> + StateCommitmentProvider, { /// Return the id of the stage @@ -373,7 +379,7 @@ where } stage_progress = block_number; - stage_checkpoint.progress.processed += block.gas_used(); + stage_checkpoint.progress.processed += block.header().gas_used(); // If we have ExExes we need to save the block in memory for later if self.exex_manager_handle.has_exexs() { @@ -512,7 +518,8 @@ where stage_checkpoint.progress.processed -= provider .block_by_number(block_number)? .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))? - .gas_used; + .header() + .gas_used(); } } let checkpoint = if let Some(stage_checkpoint) = stage_checkpoint { diff --git a/examples/custom-beacon-withdrawals/src/main.rs b/examples/custom-beacon-withdrawals/src/main.rs index ccba73afbc1d..3d756ae92c0f 100644 --- a/examples/custom-beacon-withdrawals/src/main.rs +++ b/examples/custom-beacon-withdrawals/src/main.rs @@ -26,7 +26,7 @@ use reth_evm::execute::{ }; use reth_evm_ethereum::EthEvmConfig; use reth_node_ethereum::{node::EthereumAddOns, BasicBlockExecutorProvider, EthereumNode}; -use reth_primitives::{BlockWithSenders, Receipt}; +use reth_primitives::{BlockWithSenders, EthPrimitives, Receipt}; use std::{fmt::Display, sync::Arc}; pub const SYSTEM_ADDRESS: Address = address!("fffffffffffffffffffffffffffffffffffffffe"); @@ -59,7 +59,7 @@ pub struct CustomExecutorBuilder; impl ExecutorBuilder for CustomExecutorBuilder where - Types: NodeTypesWithEngine, + Types: NodeTypesWithEngine, Node: FullNodeTypes, { type EVM = EthEvmConfig; @@ -88,6 +88,7 @@ pub struct CustomExecutorStrategyFactory { } impl BlockExecutionStrategyFactory for CustomExecutorStrategyFactory { + type Primitives = EthPrimitives; type Strategy + Display>> = CustomExecutorStrategy; fn create_strategy(&self, db: DB) -> Self::Strategy @@ -135,10 +136,12 @@ where } } -impl BlockExecutionStrategy for CustomExecutorStrategy +impl BlockExecutionStrategy for CustomExecutorStrategy where DB: Database + Display>, { + type DB = DB; + type Primitives = EthPrimitives; type Error = BlockExecutionError; fn apply_pre_execution_changes( diff --git a/examples/custom-evm/src/main.rs b/examples/custom-evm/src/main.rs index b9a4fc26a95b..e7999818ae18 100644 --- a/examples/custom-evm/src/main.rs +++ b/examples/custom-evm/src/main.rs @@ -153,7 +153,7 @@ pub struct MyExecutorBuilder; impl ExecutorBuilder for MyExecutorBuilder where - Node: FullNodeTypes>, + Node: FullNodeTypes>, { type EVM = MyEvmConfig; type Executor = BasicBlockExecutorProvider>; diff --git a/examples/stateful-precompile/src/main.rs b/examples/stateful-precompile/src/main.rs index f683af4e430a..29d5051434bf 100644 --- a/examples/stateful-precompile/src/main.rs +++ b/examples/stateful-precompile/src/main.rs @@ -28,7 +28,7 @@ use reth_node_ethereum::{ node::EthereumAddOns, BasicBlockExecutorProvider, EthEvmConfig, EthExecutionStrategyFactory, EthereumNode, }; -use reth_primitives::TransactionSigned; +use reth_primitives::{EthPrimitives, TransactionSigned}; use reth_tracing::{RethTracer, Tracer}; use schnellru::{ByLength, LruMap}; use std::{collections::HashMap, convert::Infallible, sync::Arc}; @@ -226,7 +226,7 @@ pub struct MyExecutorBuilder { impl ExecutorBuilder for MyExecutorBuilder where - Node: FullNodeTypes>, + Node: FullNodeTypes>, { type EVM = MyEvmConfig; type Executor = BasicBlockExecutorProvider>;