diff --git a/crates/chain-state/src/in_memory.rs b/crates/chain-state/src/in_memory.rs index be33e1fd79a8..a850e66521a6 100644 --- a/crates/chain-state/src/in_memory.rs +++ b/crates/chain-state/src/in_memory.rs @@ -4,7 +4,7 @@ use crate::{ CanonStateNotification, CanonStateNotificationSender, CanonStateNotifications, ChainInfoTracker, MemoryOverlayStateProvider, }; -use alloy_eips::BlockNumHash; +use alloy_eips::{BlockHashOrNumber, BlockNumHash}; use alloy_primitives::{map::HashMap, Address, TxHash, B256}; use parking_lot::RwLock; use reth_chainspec::ChainInfo; @@ -514,7 +514,7 @@ impl CanonicalInMemoryState { historical: StateProviderBox, ) -> MemoryOverlayStateProvider { let in_memory = if let Some(state) = self.state_by_hash(hash) { - state.chain().into_iter().map(|block_state| block_state.block()).collect() + state.chain().map(|block_state| block_state.block()).collect() } else { Vec::new() }; @@ -692,10 +692,8 @@ impl BlockState { /// Returns a vector of `BlockStates` representing the entire in memory chain. /// The block state order in the output vector is newest to oldest (highest to lowest), /// including self as the first element. - pub fn chain(&self) -> Vec<&Self> { - let mut chain = vec![self]; - self.append_parent_chain(&mut chain); - chain + pub fn chain(&self) -> impl Iterator { + std::iter::successors(Some(self), |state| state.parent.as_deref()) } /// Appends the parent chain of this [`BlockState`] to the given vector. @@ -715,10 +713,59 @@ impl BlockState { /// This merges the state of all blocks that are part of the chain that the this block is /// the head of. This includes all blocks that connect back to the canonical block on disk. pub fn state_provider(&self, historical: StateProviderBox) -> MemoryOverlayStateProvider { - let in_memory = self.chain().into_iter().map(|block_state| block_state.block()).collect(); + let in_memory = self.chain().map(|block_state| block_state.block()).collect(); MemoryOverlayStateProvider::new(historical, in_memory) } + + /// Tries to find a block by [`BlockHashOrNumber`] in the chain ending at this block. + pub fn block_on_chain(&self, hash_or_num: BlockHashOrNumber) -> Option<&Self> { + self.chain().find(|block| match hash_or_num { + BlockHashOrNumber::Hash(hash) => block.hash() == hash, + BlockHashOrNumber::Number(number) => block.number() == number, + }) + } + + /// Tries to find a transaction by [`TxHash`] in the chain ending at this block. + pub fn transaction_on_chain(&self, hash: TxHash) -> Option { + self.chain().find_map(|block_state| { + block_state + .block_ref() + .block() + .body + .transactions() + .find(|tx| tx.hash() == hash) + .cloned() + }) + } + + /// Tries to find a transaction with meta by [`TxHash`] in the chain ending at this block. + pub fn transaction_meta_on_chain( + &self, + tx_hash: TxHash, + ) -> Option<(TransactionSigned, TransactionMeta)> { + self.chain().find_map(|block_state| { + block_state + .block_ref() + .block() + .body + .transactions() + .enumerate() + .find(|(_, tx)| tx.hash() == tx_hash) + .map(|(index, tx)| { + let meta = TransactionMeta { + tx_hash, + index: index as u64, + block_hash: block_state.hash(), + block_number: block_state.block_ref().block.number, + base_fee: block_state.block_ref().block.header.base_fee_per_gas, + timestamp: block_state.block_ref().block.timestamp, + excess_blob_gas: block_state.block_ref().block.excess_blob_gas, + }; + (tx.clone(), meta) + }) + }) + } } /// Represents an executed block stored in-memory. @@ -1382,7 +1429,7 @@ mod tests { let parents = single_block.parent_state_chain(); assert_eq!(parents.len(), 0); - let block_state_chain = single_block.chain(); + let block_state_chain = single_block.chain().collect::>(); assert_eq!(block_state_chain.len(), 1); assert_eq!(block_state_chain[0].block().block.number, single_block_number); assert_eq!(block_state_chain[0].block().block.hash(), single_block_hash); @@ -1393,18 +1440,18 @@ mod tests { let mut test_block_builder = TestBlockBuilder::default(); let chain = create_mock_state_chain(&mut test_block_builder, 3); - let block_state_chain = chain[2].chain(); + let block_state_chain = chain[2].chain().collect::>(); assert_eq!(block_state_chain.len(), 3); assert_eq!(block_state_chain[0].block().block.number, 3); assert_eq!(block_state_chain[1].block().block.number, 2); assert_eq!(block_state_chain[2].block().block.number, 1); - let block_state_chain = chain[1].chain(); + let block_state_chain = chain[1].chain().collect::>(); assert_eq!(block_state_chain.len(), 2); assert_eq!(block_state_chain[0].block().block.number, 2); assert_eq!(block_state_chain[1].block().block.number, 1); - let block_state_chain = chain[0].chain(); + let block_state_chain = chain[0].chain().collect::>(); assert_eq!(block_state_chain.len(), 1); assert_eq!(block_state_chain[0].block().block.number, 1); } diff --git a/crates/chain-state/src/lib.rs b/crates/chain-state/src/lib.rs index 50a103111071..bd9b43a59eae 100644 --- a/crates/chain-state/src/lib.rs +++ b/crates/chain-state/src/lib.rs @@ -22,7 +22,7 @@ pub use notifications::{ }; mod memory_overlay; -pub use memory_overlay::MemoryOverlayStateProvider; +pub use memory_overlay::{MemoryOverlayStateProvider, MemoryOverlayStateProviderRef}; #[cfg(any(test, feature = "test-utils"))] /// Common test helpers diff --git a/crates/chain-state/src/memory_overlay.rs b/crates/chain-state/src/memory_overlay.rs index eb125dad115e..ada0faee4907 100644 --- a/crates/chain-state/src/memory_overlay.rs +++ b/crates/chain-state/src/memory_overlay.rs @@ -7,14 +7,26 @@ use alloy_primitives::{ use reth_errors::ProviderResult; use reth_primitives::{Account, Bytecode}; use reth_storage_api::{ - AccountReader, BlockHashReader, StateProofProvider, StateProvider, StateProviderBox, - StateRootProvider, StorageRootProvider, + AccountReader, BlockHashReader, StateProofProvider, StateProvider, StateRootProvider, + StorageRootProvider, }; use reth_trie::{ updates::TrieUpdates, AccountProof, HashedPostState, HashedStorage, MultiProof, TrieInput, }; use std::sync::OnceLock; +/// A state provider that stores references to in-memory blocks along with their state as well as a +/// reference of the historical state provider for fallback lookups. +#[allow(missing_debug_implementations)] +pub struct MemoryOverlayStateProviderRef<'a> { + /// Historical state provider for state lookups that are not found in in-memory blocks. + pub(crate) historical: Box, + /// The collection of executed parent blocks. Expected order is newest to oldest. + pub(crate) in_memory: Vec, + /// Lazy-loaded in-memory trie data. + pub(crate) trie_state: OnceLock, +} + /// A state provider that stores references to in-memory blocks along with their state as well as /// the historical state provider for fallback lookups. #[allow(missing_debug_implementations)] @@ -27,193 +39,200 @@ pub struct MemoryOverlayStateProvider { pub(crate) trie_state: OnceLock, } -impl MemoryOverlayStateProvider { - /// Create new memory overlay state provider. - /// - /// ## Arguments - /// - /// - `in_memory` - the collection of executed ancestor blocks in reverse. - /// - `historical` - a historical state provider for the latest ancestor block stored in the - /// database. - pub fn new(historical: Box, in_memory: Vec) -> Self { - Self { historical, in_memory, trie_state: OnceLock::new() } - } - - /// Turn this state provider into a [`StateProviderBox`] - pub fn boxed(self) -> StateProviderBox { - Box::new(self) - } - - /// Return lazy-loaded trie state aggregated from in-memory blocks. - fn trie_state(&self) -> &MemoryOverlayTrieState { - self.trie_state.get_or_init(|| { - let mut trie_state = MemoryOverlayTrieState::default(); - for block in self.in_memory.iter().rev() { - trie_state.state.extend_ref(block.hashed_state.as_ref()); - trie_state.nodes.extend_ref(block.trie.as_ref()); - } - trie_state - }) - } -} +macro_rules! impl_state_provider { + ([$($tokens:tt)*],$type:ty, $historical_type:ty) => { + impl $($tokens)* $type { + /// Create new memory overlay state provider. + /// + /// ## Arguments + /// + /// - `in_memory` - the collection of executed ancestor blocks in reverse. + /// - `historical` - a historical state provider for the latest ancestor block stored in the + /// database. + pub fn new(historical: $historical_type, in_memory: Vec) -> Self { + Self { historical, in_memory, trie_state: OnceLock::new() } + } + + /// Turn this state provider into a state provider + pub fn boxed(self) -> $historical_type { + Box::new(self) + } -impl BlockHashReader for MemoryOverlayStateProvider { - fn block_hash(&self, number: BlockNumber) -> ProviderResult> { - for block in &self.in_memory { - if block.block.number == number { - return Ok(Some(block.block.hash())) + /// Return lazy-loaded trie state aggregated from in-memory blocks. + fn trie_state(&self) -> &MemoryOverlayTrieState { + self.trie_state.get_or_init(|| { + let mut trie_state = MemoryOverlayTrieState::default(); + for block in self.in_memory.iter().rev() { + trie_state.state.extend_ref(block.hashed_state.as_ref()); + trie_state.nodes.extend_ref(block.trie.as_ref()); + } + trie_state + }) } } - self.historical.block_hash(number) - } - - fn canonical_hashes_range( - &self, - start: BlockNumber, - end: BlockNumber, - ) -> ProviderResult> { - let range = start..end; - let mut earliest_block_number = None; - let mut in_memory_hashes = Vec::new(); - for block in &self.in_memory { - if range.contains(&block.block.number) { - in_memory_hashes.insert(0, block.block.hash()); - earliest_block_number = Some(block.block.number); + impl $($tokens)* BlockHashReader for $type { + fn block_hash(&self, number: BlockNumber) -> ProviderResult> { + for block in &self.in_memory { + if block.block.number == number { + return Ok(Some(block.block.hash())) + } + } + + self.historical.block_hash(number) + } + + fn canonical_hashes_range( + &self, + start: BlockNumber, + end: BlockNumber, + ) -> ProviderResult> { + let range = start..end; + let mut earliest_block_number = None; + let mut in_memory_hashes = Vec::new(); + for block in &self.in_memory { + if range.contains(&block.block.number) { + in_memory_hashes.insert(0, block.block.hash()); + earliest_block_number = Some(block.block.number); + } + } + + let mut hashes = + self.historical.canonical_hashes_range(start, earliest_block_number.unwrap_or(end))?; + hashes.append(&mut in_memory_hashes); + Ok(hashes) } } - let mut hashes = - self.historical.canonical_hashes_range(start, earliest_block_number.unwrap_or(end))?; - hashes.append(&mut in_memory_hashes); - Ok(hashes) - } -} + impl $($tokens)* AccountReader for $type { + fn basic_account(&self, address: Address) -> ProviderResult> { + for block in &self.in_memory { + if let Some(account) = block.execution_output.account(&address) { + return Ok(account) + } + } -impl AccountReader for MemoryOverlayStateProvider { - fn basic_account(&self, address: Address) -> ProviderResult> { - for block in &self.in_memory { - if let Some(account) = block.execution_output.account(&address) { - return Ok(account) + self.historical.basic_account(address) } } - self.historical.basic_account(address) - } -} + impl $($tokens)* StateRootProvider for $type { + fn state_root(&self, state: HashedPostState) -> ProviderResult { + self.state_root_from_nodes(TrieInput::from_state(state)) + } -impl StateRootProvider for MemoryOverlayStateProvider { - fn state_root(&self, state: HashedPostState) -> ProviderResult { - self.state_root_from_nodes(TrieInput::from_state(state)) - } - - fn state_root_from_nodes(&self, mut input: TrieInput) -> ProviderResult { - let MemoryOverlayTrieState { nodes, state } = self.trie_state().clone(); - input.prepend_cached(nodes, state); - self.historical.state_root_from_nodes(input) - } - - fn state_root_with_updates( - &self, - state: HashedPostState, - ) -> ProviderResult<(B256, TrieUpdates)> { - self.state_root_from_nodes_with_updates(TrieInput::from_state(state)) - } - - fn state_root_from_nodes_with_updates( - &self, - mut input: TrieInput, - ) -> ProviderResult<(B256, TrieUpdates)> { - let MemoryOverlayTrieState { nodes, state } = self.trie_state().clone(); - input.prepend_cached(nodes, state); - self.historical.state_root_from_nodes_with_updates(input) - } -} + fn state_root_from_nodes(&self, mut input: TrieInput) -> ProviderResult { + let MemoryOverlayTrieState { nodes, state } = self.trie_state().clone(); + input.prepend_cached(nodes, state); + self.historical.state_root_from_nodes(input) + } -impl StorageRootProvider for MemoryOverlayStateProvider { - // TODO: Currently this does not reuse available in-memory trie nodes. - fn storage_root(&self, address: Address, storage: HashedStorage) -> ProviderResult { - let state = &self.trie_state().state; - let mut hashed_storage = - state.storages.get(&keccak256(address)).cloned().unwrap_or_default(); - hashed_storage.extend(&storage); - self.historical.storage_root(address, hashed_storage) - } - - // TODO: Currently this does not reuse available in-memory trie nodes. - fn storage_proof( - &self, - address: Address, - slot: B256, - storage: HashedStorage, - ) -> ProviderResult { - let state = &self.trie_state().state; - let mut hashed_storage = - state.storages.get(&keccak256(address)).cloned().unwrap_or_default(); - hashed_storage.extend(&storage); - self.historical.storage_proof(address, slot, hashed_storage) - } -} + fn state_root_with_updates( + &self, + state: HashedPostState, + ) -> ProviderResult<(B256, TrieUpdates)> { + self.state_root_from_nodes_with_updates(TrieInput::from_state(state)) + } -impl StateProofProvider for MemoryOverlayStateProvider { - fn proof( - &self, - mut input: TrieInput, - address: Address, - slots: &[B256], - ) -> ProviderResult { - let MemoryOverlayTrieState { nodes, state } = self.trie_state().clone(); - input.prepend_cached(nodes, state); - self.historical.proof(input, address, slots) - } - - fn multiproof( - &self, - mut input: TrieInput, - targets: HashMap>, - ) -> ProviderResult { - let MemoryOverlayTrieState { nodes, state } = self.trie_state().clone(); - input.prepend_cached(nodes, state); - self.historical.multiproof(input, targets) - } - - fn witness( - &self, - mut input: TrieInput, - target: HashedPostState, - ) -> ProviderResult> { - let MemoryOverlayTrieState { nodes, state } = self.trie_state().clone(); - input.prepend_cached(nodes, state); - self.historical.witness(input, target) - } -} + fn state_root_from_nodes_with_updates( + &self, + mut input: TrieInput, + ) -> ProviderResult<(B256, TrieUpdates)> { + let MemoryOverlayTrieState { nodes, state } = self.trie_state().clone(); + input.prepend_cached(nodes, state); + self.historical.state_root_from_nodes_with_updates(input) + } + } + + impl $($tokens)* StorageRootProvider for $type { + // TODO: Currently this does not reuse available in-memory trie nodes. + fn storage_root(&self, address: Address, storage: HashedStorage) -> ProviderResult { + let state = &self.trie_state().state; + let mut hashed_storage = + state.storages.get(&keccak256(address)).cloned().unwrap_or_default(); + hashed_storage.extend(&storage); + self.historical.storage_root(address, hashed_storage) + } -impl StateProvider for MemoryOverlayStateProvider { - fn storage( - &self, - address: Address, - storage_key: StorageKey, - ) -> ProviderResult> { - for block in &self.in_memory { - if let Some(value) = block.execution_output.storage(&address, storage_key.into()) { - return Ok(Some(value)) + // TODO: Currently this does not reuse available in-memory trie nodes. + fn storage_proof( + &self, + address: Address, + slot: B256, + storage: HashedStorage, + ) -> ProviderResult { + let state = &self.trie_state().state; + let mut hashed_storage = + state.storages.get(&keccak256(address)).cloned().unwrap_or_default(); + hashed_storage.extend(&storage); + self.historical.storage_proof(address, slot, hashed_storage) } } - self.historical.storage(address, storage_key) - } + impl $($tokens)* StateProofProvider for $type { + fn proof( + &self, + mut input: TrieInput, + address: Address, + slots: &[B256], + ) -> ProviderResult { + let MemoryOverlayTrieState { nodes, state } = self.trie_state().clone(); + input.prepend_cached(nodes, state); + self.historical.proof(input, address, slots) + } + + fn multiproof( + &self, + mut input: TrieInput, + targets: HashMap>, + ) -> ProviderResult { + let MemoryOverlayTrieState { nodes, state } = self.trie_state().clone(); + input.prepend_cached(nodes, state); + self.historical.multiproof(input, targets) + } - fn bytecode_by_hash(&self, code_hash: B256) -> ProviderResult> { - for block in &self.in_memory { - if let Some(contract) = block.execution_output.bytecode(&code_hash) { - return Ok(Some(contract)) + fn witness( + &self, + mut input: TrieInput, + target: HashedPostState, + ) -> ProviderResult> { + let MemoryOverlayTrieState { nodes, state } = self.trie_state().clone(); + input.prepend_cached(nodes, state); + self.historical.witness(input, target) } } - self.historical.bytecode_by_hash(code_hash) - } + impl $($tokens)* StateProvider for $type { + fn storage( + &self, + address: Address, + storage_key: StorageKey, + ) -> ProviderResult> { + for block in &self.in_memory { + if let Some(value) = block.execution_output.storage(&address, storage_key.into()) { + return Ok(Some(value)) + } + } + + self.historical.storage(address, storage_key) + } + + fn bytecode_by_hash(&self, code_hash: B256) -> ProviderResult> { + for block in &self.in_memory { + if let Some(contract) = block.execution_output.bytecode(&code_hash) { + return Ok(Some(contract)) + } + } + + self.historical.bytecode_by_hash(code_hash) + } + } + }; } +impl_state_provider!([], MemoryOverlayStateProvider, Box); +impl_state_provider!([<'a>], MemoryOverlayStateProviderRef<'a>, Box); + /// The collection of data necessary for trie-related operations for [`MemoryOverlayStateProvider`]. #[derive(Clone, Default, Debug)] pub(crate) struct MemoryOverlayTrieState { diff --git a/crates/engine/service/src/service.rs b/crates/engine/service/src/service.rs index 026476a8260f..198438d457f1 100644 --- a/crates/engine/service/src/service.rs +++ b/crates/engine/service/src/service.rs @@ -151,7 +151,9 @@ mod tests { use reth_exex_types::FinishedExExHeight; use reth_network_p2p::test_utils::TestFullBlockClient; use reth_primitives::SealedHeader; - use reth_provider::test_utils::create_test_provider_factory_with_chain_spec; + use reth_provider::{ + providers::BlockchainProvider2, test_utils::create_test_provider_factory_with_chain_spec, + }; use reth_prune::Pruner; use reth_tasks::TokioTaskExecutor; use std::sync::Arc; diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index 13215e11a8e2..64a8a204a329 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -1,13 +1,15 @@ +#![allow(unused)] use crate::{ - providers::StaticFileProvider, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, - BlockReader, BlockReaderIdExt, BlockSource, CanonChainTracker, CanonStateNotifications, - CanonStateSubscriptions, ChainSpecProvider, ChainStateBlockReader, ChangeSetReader, - DatabaseProviderFactory, DatabaseProviderRO, EvmEnvProvider, HeaderProvider, ProviderError, + providers::{ConsistentProvider, StaticFileProvider}, + AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt, + BlockSource, CanonChainTracker, CanonStateNotifications, CanonStateSubscriptions, + ChainSpecProvider, ChainStateBlockReader, ChangeSetReader, DatabaseProvider, + DatabaseProviderFactory, EvmEnvProvider, FullProvider, HeaderProvider, ProviderError, ProviderFactory, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt, StageCheckpointReader, StateProviderBox, StateProviderFactory, StateReader, StaticFileProviderFactory, TransactionVariant, TransactionsProvider, WithdrawalsProvider, }; -use alloy_eips::{BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag, HashOrNumber}; +use alloy_eips::{BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag}; use alloy_primitives::{Address, BlockHash, BlockNumber, Sealable, TxHash, TxNumber, B256, U256}; use alloy_rpc_types_engine::ForkchoiceState; use reth_chain_state::{ @@ -15,10 +17,10 @@ use reth_chain_state::{ MemoryOverlayStateProvider, }; use reth_chainspec::{ChainInfo, EthereumHardforks}; -use reth_db::models::BlockNumberAddress; +use reth_db::{models::BlockNumberAddress, transaction::DbTx, Database}; use reth_db_api::models::{AccountBeforeTx, StoredBlockBodyIndices}; use reth_evm::ConfigureEvmEnv; -use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit}; +use reth_execution_types::ExecutionOutcome; use reth_node_types::NodeTypesWithDB; use reth_primitives::{ Account, Block, BlockWithSenders, Header, Receipt, SealedBlock, SealedBlockWithSenders, @@ -27,21 +29,17 @@ use reth_primitives::{ }; use reth_prune_types::{PruneCheckpoint, PruneSegment}; use reth_stages_types::{StageCheckpoint, StageId}; -use reth_storage_api::StorageChangeSetReader; +use reth_storage_api::{DBProvider, StorageChangeSetReader}; use reth_storage_errors::provider::ProviderResult; -use revm::{ - db::states::PlainStorageRevert, - primitives::{BlockEnv, CfgEnvWithHandlerCfg}, -}; +use revm::primitives::{BlockEnv, CfgEnvWithHandlerCfg}; use std::{ - collections::{hash_map, HashMap}, - ops::{Add, Bound, RangeBounds, RangeInclusive, Sub}, + ops::{Add, RangeBounds, RangeInclusive, Sub}, sync::Arc, time::Instant, }; use tracing::trace; -use super::ProviderNodeTypes; +use crate::providers::ProviderNodeTypes; /// The main type for interacting with the blockchain. /// @@ -50,11 +48,11 @@ use super::ProviderNodeTypes; /// type that holds an instance of the database and the blockchain tree. #[derive(Debug)] pub struct BlockchainProvider2 { - /// Provider type used to access the database. - database: ProviderFactory, + /// Provider factory used to access the database. + pub(crate) database: ProviderFactory, /// Tracks the chain info wrt forkchoice updates and in memory canonical /// state. - pub(super) canonical_in_memory_state: CanonicalInMemoryState, + pub(crate) canonical_in_memory_state: CanonicalInMemoryState, } impl Clone for BlockchainProvider2 { @@ -67,15 +65,15 @@ impl Clone for BlockchainProvider2 { } impl BlockchainProvider2 { - /// Create a new provider using only the database, fetching the latest header from - /// the database to initialize the provider. - pub fn new(database: ProviderFactory) -> ProviderResult { - let provider = database.provider()?; + /// Create a new [`BlockchainProvider2`] using only the storage, fetching the latest + /// header from the database to initialize the provider. + pub fn new(storage: ProviderFactory) -> ProviderResult { + let provider = storage.provider()?; let best = provider.chain_info()?; match provider.header_by_number(best.best_number)? { Some(header) => { drop(provider); - Ok(Self::with_latest(database, SealedHeader::new(header, best.best_hash))?) + Ok(Self::with_latest(storage, SealedHeader::new(header, best.best_hash))?) } None => Err(ProviderError::HeaderNotFound(best.best_number.into())), } @@ -86,8 +84,8 @@ impl BlockchainProvider2 { /// /// This returns a `ProviderResult` since it tries the retrieve the last finalized header from /// `database`. - pub fn with_latest(database: ProviderFactory, latest: SealedHeader) -> ProviderResult { - let provider = database.provider()?; + pub fn with_latest(storage: ProviderFactory, latest: SealedHeader) -> ProviderResult { + let provider = storage.provider()?; let finalized_header = provider .last_finalized_block_number()? .map(|num| provider.sealed_header(num)) @@ -104,7 +102,7 @@ impl BlockchainProvider2 { .transpose()? .flatten(); Ok(Self { - database, + database: storage, canonical_in_memory_state: CanonicalInMemoryState::with_head( latest, finalized_header, @@ -118,281 +116,12 @@ impl BlockchainProvider2 { self.canonical_in_memory_state.clone() } - // Helper function to convert range bounds - fn convert_range_bounds( - &self, - range: impl RangeBounds, - end_unbounded: impl FnOnce() -> T, - ) -> (T, T) - where - T: Copy + Add + Sub + From, - { - let start = match range.start_bound() { - Bound::Included(&n) => n, - Bound::Excluded(&n) => n + T::from(1u8), - Bound::Unbounded => T::from(0u8), - }; - - let end = match range.end_bound() { - Bound::Included(&n) => n, - Bound::Excluded(&n) => n - T::from(1u8), - Bound::Unbounded => end_unbounded(), - }; - - (start, end) - } - - /// Return the last N blocks of state, recreating the [`ExecutionOutcome`]. - /// - /// If the range is empty, or there are no blocks for the given range, then this returns `None`. - pub fn get_state( - &self, - range: RangeInclusive, - ) -> ProviderResult> { - if range.is_empty() { - return Ok(None) - } - let start_block_number = *range.start(); - let end_block_number = *range.end(); - - // We are not removing block meta as it is used to get block changesets. - let mut block_bodies = Vec::new(); - for block_num in range.clone() { - let block_body = self - .block_body_indices(block_num)? - .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?; - block_bodies.push((block_num, block_body)) - } - - // get transaction receipts - let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num()) - else { - return Ok(None) - }; - let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else { - return Ok(None) - }; - - let mut account_changeset = Vec::new(); - for block_num in range.clone() { - let changeset = - self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem)); - account_changeset.extend(changeset); - } - - let mut storage_changeset = Vec::new(); - for block_num in range { - let changeset = self.storage_changeset(block_num)?; - storage_changeset.extend(changeset); - } - - let (state, reverts) = - self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?; - - let mut receipt_iter = - self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter(); - - let mut receipts = Vec::with_capacity(block_bodies.len()); - // loop break if we are at the end of the blocks. - for (_, block_body) in block_bodies { - let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize); - for tx_num in block_body.tx_num_range() { - let receipt = receipt_iter - .next() - .ok_or_else(|| ProviderError::ReceiptNotFound(tx_num.into()))?; - block_receipts.push(Some(receipt)); - } - receipts.push(block_receipts); - } - - Ok(Some(ExecutionOutcome::new_init( - state, - reverts, - // We skip new contracts since we never delete them from the database - Vec::new(), - receipts.into(), - start_block_number, - Vec::new(), - ))) - } - - /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the - /// [`reth_db::PlainAccountState`] and [`reth_db::PlainStorageState`] tables, based on the given - /// storage and account changesets. - fn populate_bundle_state( - &self, - account_changeset: Vec<(u64, AccountBeforeTx)>, - storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>, - block_range_end: BlockNumber, - ) -> ProviderResult<(BundleStateInit, RevertsInit)> { - let mut state: BundleStateInit = HashMap::new(); - let mut reverts: RevertsInit = HashMap::new(); - let state_provider = self.state_by_block_number_or_tag(block_range_end.into())?; - - // add account changeset changes - for (block_number, account_before) in account_changeset.into_iter().rev() { - let AccountBeforeTx { info: old_info, address } = account_before; - match state.entry(address) { - hash_map::Entry::Vacant(entry) => { - let new_info = state_provider.basic_account(address)?; - entry.insert((old_info, new_info, HashMap::new())); - } - hash_map::Entry::Occupied(mut entry) => { - // overwrite old account state. - entry.get_mut().0 = old_info; - } - } - // insert old info into reverts. - reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info); - } - - // add storage changeset changes - for (block_and_address, old_storage) in storage_changeset.into_iter().rev() { - let BlockNumberAddress((block_number, address)) = block_and_address; - // get account state or insert from plain state. - let account_state = match state.entry(address) { - hash_map::Entry::Vacant(entry) => { - let present_info = state_provider.basic_account(address)?; - entry.insert((present_info, present_info, HashMap::new())) - } - hash_map::Entry::Occupied(entry) => entry.into_mut(), - }; - - // match storage. - match account_state.2.entry(old_storage.key) { - hash_map::Entry::Vacant(entry) => { - let new_storage_value = - state_provider.storage(address, old_storage.key)?.unwrap_or_default(); - entry.insert((old_storage.value, new_storage_value)); - } - hash_map::Entry::Occupied(mut entry) => { - entry.get_mut().0 = old_storage.value; - } - }; - - reverts - .entry(block_number) - .or_default() - .entry(address) - .or_default() - .1 - .push(old_storage); - } - - Ok((state, reverts)) - } - - /// Fetches a range of data from both in-memory state and persistent storage while a predicate - /// is met. - /// - /// Creates a snapshot of the in-memory chain state and database provider to prevent - /// inconsistencies. Splits the range into in-memory and storage sections, prioritizing - /// recent in-memory blocks in case of overlaps. - /// - /// * `fetch_db_range` function (`F`) provides access to the database provider, allowing the - /// user to retrieve the required items from the database using [`RangeInclusive`]. - /// * `map_block_state_item` function (`G`) provides each block of the range in the in-memory - /// state, allowing for selection or filtering for the desired data. - fn get_in_memory_or_storage_by_block_range_while( - &self, - range: impl RangeBounds, - fetch_db_range: F, - map_block_state_item: G, - mut predicate: P, - ) -> ProviderResult> - where - F: FnOnce( - &DatabaseProviderRO, - RangeInclusive, - &mut P, - ) -> ProviderResult>, - G: Fn(Arc, &mut P) -> Option, - P: FnMut(&T) -> bool, - { - // Each one provides a snapshot at the time of instantiation, but its order matters. - // - // If we acquire first the database provider, it's possible that before the in-memory chain - // snapshot is instantiated, it will flush blocks to disk. This would - // mean that our database provider would not have access to the flushed blocks (since it's - // working under an older view), while the in-memory state may have deleted them - // entirely. Resulting in gaps on the range. - let mut in_memory_chain = - self.canonical_in_memory_state.canonical_chain().collect::>(); - let db_provider = self.database_provider_ro()?; - - let (start, end) = self.convert_range_bounds(range, || { - // the first block is the highest one. - in_memory_chain - .first() - .map(|b| b.number()) - .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default()) - }); - - if start > end { - return Ok(vec![]) - } - - // Split range into storage_range and in-memory range. If the in-memory range is not - // necessary drop it early. - // - // The last block of `in_memory_chain` is the lowest block number. - let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) { - Some(lowest_memory_block) if lowest_memory_block <= end => { - let highest_memory_block = - in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed"); - - // Database will for a time overlap with in-memory-chain blocks. In - // case of a re-org, it can mean that the database blocks are of a forked chain, and - // so, we should prioritize the in-memory overlapped blocks. - let in_memory_range = - lowest_memory_block.max(start)..=end.min(highest_memory_block); - - // If requested range is in the middle of the in-memory range, remove the necessary - // lowest blocks - in_memory_chain.truncate( - in_memory_chain - .len() - .saturating_sub(start.saturating_sub(lowest_memory_block) as usize), - ); - - let storage_range = - (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1); - - (Some((in_memory_chain, in_memory_range)), storage_range) - } - _ => { - // Drop the in-memory chain so we don't hold blocks in memory. - drop(in_memory_chain); - - (None, Some(start..=end)) - } - }; - - let mut items = Vec::with_capacity((end - start + 1) as usize); - - if let Some(storage_range) = storage_range { - let mut db_items = fetch_db_range(&db_provider, storage_range.clone(), &mut predicate)?; - items.append(&mut db_items); - - // The predicate was not met, if the number of items differs from the expected. So, we - // return what we have. - if items.len() as u64 != storage_range.end() - storage_range.start() + 1 { - return Ok(items) - } - } - - if let Some((in_memory_chain, in_memory_range)) = in_memory { - for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) { - debug_assert!(num == block.number()); - if let Some(item) = map_block_state_item(block, &mut predicate) { - items.push(item); - } else { - break - } - } - } - - Ok(items) + /// Returns a provider with a created `DbTx` inside, which allows fetching data from the + /// database using different types of providers. Example: [`HeaderProvider`] + /// [`BlockHashReader`]. This may fail if the inner read database transaction fails to open. + #[track_caller] + pub fn consistent_provider(&self) -> ProviderResult> { + ConsistentProvider::new(self.database.clone(), self.canonical_in_memory_state()) } /// This uses a given [`BlockState`] to initialize a state provider for that block. @@ -405,222 +134,14 @@ impl BlockchainProvider2 { Ok(state.state_provider(latest_historical)) } - /// Fetches data from either in-memory state or persistent storage for a range of transactions. + /// Return the last N blocks of state, recreating the [`ExecutionOutcome`]. /// - /// * `fetch_from_db`: has a [`DatabaseProviderRO`] and the storage specific range. - /// * `fetch_from_block_state`: has a [`RangeInclusive`] of elements that should be fetched from - /// [`BlockState`]. [`RangeInclusive`] is necessary to handle partial look-ups of a block. - fn get_in_memory_or_storage_by_tx_range( - &self, - range: impl RangeBounds, - fetch_from_db: S, - fetch_from_block_state: M, - ) -> ProviderResult> - where - S: FnOnce( - DatabaseProviderRO, - RangeInclusive, - ) -> ProviderResult>, - M: Fn(RangeInclusive, Arc) -> ProviderResult>, - { - let in_mem_chain = self.canonical_in_memory_state.canonical_chain().collect::>(); - let provider = self.database.provider()?; - - // Get the last block number stored in the storage which does NOT overlap with in-memory - // chain. - let last_database_block_number = in_mem_chain - .last() - .map(|b| Ok(b.anchor().number)) - .unwrap_or_else(|| provider.last_block_number())?; - - // Get the next tx number for the last block stored in the storage, which marks the start of - // the in-memory state. - let last_block_body_index = provider - .block_body_indices(last_database_block_number)? - .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?; - let mut in_memory_tx_num = last_block_body_index.next_tx_num(); - - let (start, end) = self.convert_range_bounds(range, || { - in_mem_chain - .iter() - .map(|b| b.block_ref().block().body.transactions.len() as u64) - .sum::() + - last_block_body_index.last_tx_num() - }); - - if start > end { - return Ok(vec![]) - } - - let mut tx_range = start..=end; - - // If the range is entirely before the first in-memory transaction number, fetch from - // storage - if *tx_range.end() < in_memory_tx_num { - return fetch_from_db(provider, tx_range); - } - - let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize); - - // If the range spans storage and memory, get elements from storage first. - if *tx_range.start() < in_memory_tx_num { - // Determine the range that needs to be fetched from storage. - let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1); - - // Set the remaining transaction range for in-memory - tx_range = in_memory_tx_num..=*tx_range.end(); - - items.extend(fetch_from_db(provider, db_range)?); - } - - // Iterate from the lowest block to the highest in-memory chain - for block_state in in_mem_chain.into_iter().rev() { - let block_tx_count = block_state.block_ref().block().body.transactions.len(); - let remaining = (tx_range.end() - tx_range.start() + 1) as usize; - - // If the transaction range start is equal or higher than the next block first - // transaction, advance - if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 { - in_memory_tx_num += block_tx_count as u64; - continue - } - - // This should only be more than 0 once, in case of a partial range inside a block. - let skip = (tx_range.start() - in_memory_tx_num) as usize; - - items.extend(fetch_from_block_state( - skip..=skip + (remaining.min(block_tx_count - skip) - 1), - block_state, - )?); - - in_memory_tx_num += block_tx_count as u64; - - // Break if the range has been fully processed - if in_memory_tx_num > *tx_range.end() { - break - } - - // Set updated range - tx_range = in_memory_tx_num..=*tx_range.end(); - } - - Ok(items) - } - - /// Fetches data from either in-memory state or persistent storage by transaction - /// [`HashOrNumber`]. - fn get_in_memory_or_storage_by_tx( - &self, - id: HashOrNumber, - fetch_from_db: S, - fetch_from_block_state: M, - ) -> ProviderResult> - where - S: FnOnce(DatabaseProviderRO) -> ProviderResult>, - M: Fn(usize, TxNumber, Arc) -> ProviderResult>, - { - // Order of instantiation matters. More information on: - // `get_in_memory_or_storage_by_block_range_while`. - let in_mem_chain = self.canonical_in_memory_state.canonical_chain().collect::>(); - let provider = self.database.provider()?; - - // Get the last block number stored in the database which does NOT overlap with in-memory - // chain. - let last_database_block_number = in_mem_chain - .last() - .map(|b| Ok(b.anchor().number)) - .unwrap_or_else(|| provider.last_block_number())?; - - // Get the next tx number for the last block stored in the database and consider it the - // first tx number of the in-memory state - let last_block_body_index = provider - .block_body_indices(last_database_block_number)? - .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?; - let mut in_memory_tx_num = last_block_body_index.next_tx_num(); - - // If the transaction number is less than the first in-memory transaction number, make a - // database lookup - if let HashOrNumber::Number(id) = id { - if id < in_memory_tx_num { - return fetch_from_db(provider) - } - } - - // Iterate from the lowest block to the highest - for block_state in in_mem_chain.into_iter().rev() { - let executed_block = block_state.block_ref(); - let block = executed_block.block(); - - for tx_index in 0..block.body.transactions.len() { - match id { - HashOrNumber::Hash(tx_hash) => { - if tx_hash == block.body.transactions[tx_index].hash() { - return fetch_from_block_state(tx_index, in_memory_tx_num, block_state) - } - } - HashOrNumber::Number(id) => { - if id == in_memory_tx_num { - return fetch_from_block_state(tx_index, in_memory_tx_num, block_state) - } - } - } - - in_memory_tx_num += 1; - } - } - - // Not found in-memory, so check database. - if let HashOrNumber::Hash(_) = id { - return fetch_from_db(provider) - } - - Ok(None) - } - - /// Fetches data from either in-memory state or persistent storage by [`BlockHashOrNumber`]. - fn get_in_memory_or_storage_by_block( + /// If the range is empty, or there are no blocks for the given range, then this returns `None`. + pub fn get_state( &self, - id: BlockHashOrNumber, - fetch_from_db: S, - fetch_from_block_state: M, - ) -> ProviderResult - where - S: FnOnce(DatabaseProviderRO) -> ProviderResult, - M: Fn(Arc) -> ProviderResult, - { - let block_state = match id { - BlockHashOrNumber::Hash(block_hash) => { - self.canonical_in_memory_state.state_by_hash(block_hash) - } - BlockHashOrNumber::Number(block_number) => { - self.canonical_in_memory_state.state_by_number(block_number) - } - }; - - if let Some(block_state) = block_state { - return fetch_from_block_state(block_state) - } - fetch_from_db(self.database_provider_ro()?) - } -} - -impl BlockchainProvider2 { - /// Ensures that the given block number is canonical (synced) - /// - /// This is a helper for guarding the `HistoricalStateProvider` against block numbers that are - /// out of range and would lead to invalid results, mainly during initial sync. - /// - /// Verifying the `block_number` would be expensive since we need to lookup sync table - /// Instead, we ensure that the `block_number` is within the range of the - /// [`Self::best_block_number`] which is updated when a block is synced. - #[inline] - fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> { - let latest = self.best_block_number()?; - if block_number > latest { - Err(ProviderError::HeaderNotFound(block_number.into())) - } else { - Ok(()) - } + range: RangeInclusive, + ) -> ProviderResult> { + self.consistent_provider()?.get_state(range) } } @@ -646,78 +167,34 @@ impl StaticFileProviderFactory for BlockchainProvider2 impl HeaderProvider for BlockchainProvider2 { fn header(&self, block_hash: &BlockHash) -> ProviderResult> { - self.get_in_memory_or_storage_by_block( - (*block_hash).into(), - |db_provider| db_provider.header(block_hash), - |block_state| Ok(Some(block_state.block_ref().block().header.header().clone())), - ) + self.consistent_provider()?.header(block_hash) } fn header_by_number(&self, num: BlockNumber) -> ProviderResult> { - self.get_in_memory_or_storage_by_block( - num.into(), - |db_provider| db_provider.header_by_number(num), - |block_state| Ok(Some(block_state.block_ref().block().header.header().clone())), - ) + self.consistent_provider()?.header_by_number(num) } fn header_td(&self, hash: &BlockHash) -> ProviderResult> { - if let Some(num) = self.block_number(*hash)? { - self.header_td_by_number(num) - } else { - Ok(None) - } + self.consistent_provider()?.header_td(hash) } fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult> { - let number = if self.canonical_in_memory_state.hash_by_number(number).is_some() { - // If the block exists in memory, we should return a TD for it. - // - // The canonical in memory state should only store post-merge blocks. Post-merge blocks - // have zero difficulty. This means we can use the total difficulty for the last - // finalized block number if present (so that we are not affected by reorgs), if not the - // last number in the database will be used. - if let Some(last_finalized_num_hash) = - self.canonical_in_memory_state.get_finalized_num_hash() - { - last_finalized_num_hash.number - } else { - self.last_block_number()? - } - } else { - // Otherwise, return what we have on disk for the input block - number - }; - self.database.header_td_by_number(number) + self.consistent_provider()?.header_td_by_number(number) } fn headers_range(&self, range: impl RangeBounds) -> ProviderResult> { - self.get_in_memory_or_storage_by_block_range_while( - range, - |db_provider, range, _| db_provider.headers_range(range), - |block_state, _| Some(block_state.block_ref().block().header.header().clone()), - |_| true, - ) + self.consistent_provider()?.headers_range(range) } fn sealed_header(&self, number: BlockNumber) -> ProviderResult> { - self.get_in_memory_or_storage_by_block( - number.into(), - |db_provider| db_provider.sealed_header(number), - |block_state| Ok(Some(block_state.block_ref().block().header.clone())), - ) + self.consistent_provider()?.sealed_header(number) } fn sealed_headers_range( &self, range: impl RangeBounds, ) -> ProviderResult> { - self.get_in_memory_or_storage_by_block_range_while( - range, - |db_provider, range, _| db_provider.sealed_headers_range(range), - |block_state, _| Some(block_state.block_ref().block().header.clone()), - |_| true, - ) + self.consistent_provider()?.sealed_headers_range(range) } fn sealed_headers_while( @@ -725,25 +202,13 @@ impl HeaderProvider for BlockchainProvider2 { range: impl RangeBounds, predicate: impl FnMut(&SealedHeader) -> bool, ) -> ProviderResult> { - self.get_in_memory_or_storage_by_block_range_while( - range, - |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate), - |block_state, predicate| { - let header = &block_state.block_ref().block().header; - predicate(header).then(|| header.clone()) - }, - predicate, - ) + self.consistent_provider()?.sealed_headers_while(range, predicate) } } impl BlockHashReader for BlockchainProvider2 { fn block_hash(&self, number: u64) -> ProviderResult> { - self.get_in_memory_or_storage_by_block( - number.into(), - |db_provider| db_provider.block_hash(number), - |block_state| Ok(Some(block_state.hash())), - ) + self.consistent_provider()?.block_hash(number) } fn canonical_hashes_range( @@ -751,15 +216,7 @@ impl BlockHashReader for BlockchainProvider2 { start: BlockNumber, end: BlockNumber, ) -> ProviderResult> { - self.get_in_memory_or_storage_by_block_range_while( - start..end, - |db_provider, inclusive_range, _| { - db_provider - .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1) - }, - |block_state, _| Some(block_state.hash()), - |_| true, - ) + self.consistent_provider()?.canonical_hashes_range(start, end) } } @@ -777,11 +234,7 @@ impl BlockNumReader for BlockchainProvider2 { } fn block_number(&self, hash: B256) -> ProviderResult> { - self.get_in_memory_or_storage_by_block( - hash.into(), - |db_provider| db_provider.block_number(hash), - |block_state| Ok(Some(block_state.number())), - ) + self.consistent_provider()?.block_number(hash) } } @@ -801,28 +254,11 @@ impl BlockIdReader for BlockchainProvider2 { impl BlockReader for BlockchainProvider2 { fn find_block_by_hash(&self, hash: B256, source: BlockSource) -> ProviderResult> { - match source { - BlockSource::Any | BlockSource::Canonical => { - // Note: it's fine to return the unsealed block because the caller already has - // the hash - self.get_in_memory_or_storage_by_block( - hash.into(), - |db_provider| db_provider.find_block_by_hash(hash, source), - |block_state| Ok(Some(block_state.block_ref().block().clone().unseal())), - ) - } - BlockSource::Pending => { - Ok(self.canonical_in_memory_state.pending_block().map(|block| block.unseal())) - } - } + self.consistent_provider()?.find_block_by_hash(hash, source) } fn block(&self, id: BlockHashOrNumber) -> ProviderResult> { - self.get_in_memory_or_storage_by_block( - id, - |db_provider| db_provider.block(id), - |block_state| Ok(Some(block_state.block_ref().block().clone().unseal())), - ) + self.consistent_provider()?.block(id) } fn pending_block(&self) -> ProviderResult> { @@ -838,51 +274,14 @@ impl BlockReader for BlockchainProvider2 { } fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult>> { - self.get_in_memory_or_storage_by_block( - id, - |db_provider| db_provider.ommers(id), - |block_state| { - if self.chain_spec().final_paris_total_difficulty(block_state.number()).is_some() { - return Ok(Some(Vec::new())) - } - - Ok(Some(block_state.block_ref().block().body.ommers.clone())) - }, - ) + self.consistent_provider()?.ommers(id) } fn block_body_indices( &self, number: BlockNumber, ) -> ProviderResult> { - self.get_in_memory_or_storage_by_block( - number.into(), - |db_provider| db_provider.block_body_indices(number), - |block_state| { - // Find the last block indices on database - let last_storage_block_number = block_state.anchor().number; - let mut stored_indices = self - .database - .block_body_indices(last_storage_block_number)? - .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?; - - // Prepare our block indices - stored_indices.first_tx_num = stored_indices.next_tx_num(); - stored_indices.tx_count = 0; - - // Iterate from the lowest block in memory until our target block - for state in block_state.chain().into_iter().rev() { - let block_tx_count = state.block_ref().block.body.transactions.len() as u64; - if state.block_ref().block().number == number { - stored_indices.tx_count = block_tx_count; - } else { - stored_indices.first_tx_num += block_tx_count; - } - } - - Ok(Some(stored_indices)) - }, - ) + self.consistent_provider()?.block_body_indices(number) } /// Returns the block with senders with matching number or hash from database. @@ -896,11 +295,7 @@ impl BlockReader for BlockchainProvider2 { id: BlockHashOrNumber, transaction_kind: TransactionVariant, ) -> ProviderResult> { - self.get_in_memory_or_storage_by_block( - id, - |db_provider| db_provider.block_with_senders(id, transaction_kind), - |block_state| Ok(Some(block_state.block_with_senders())), - ) + self.consistent_provider()?.block_with_senders(id, transaction_kind) } fn sealed_block_with_senders( @@ -908,259 +303,116 @@ impl BlockReader for BlockchainProvider2 { id: BlockHashOrNumber, transaction_kind: TransactionVariant, ) -> ProviderResult> { - self.get_in_memory_or_storage_by_block( - id, - |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind), - |block_state| Ok(Some(block_state.sealed_block_with_senders())), - ) + self.consistent_provider()?.sealed_block_with_senders(id, transaction_kind) } fn block_range(&self, range: RangeInclusive) -> ProviderResult> { - self.get_in_memory_or_storage_by_block_range_while( - range, - |db_provider, range, _| db_provider.block_range(range), - |block_state, _| Some(block_state.block_ref().block().clone().unseal()), - |_| true, - ) + self.consistent_provider()?.block_range(range) } fn block_with_senders_range( &self, range: RangeInclusive, ) -> ProviderResult> { - self.get_in_memory_or_storage_by_block_range_while( - range, - |db_provider, range, _| db_provider.block_with_senders_range(range), - |block_state, _| Some(block_state.block_with_senders()), - |_| true, - ) + self.consistent_provider()?.block_with_senders_range(range) } fn sealed_block_with_senders_range( &self, range: RangeInclusive, ) -> ProviderResult> { - self.get_in_memory_or_storage_by_block_range_while( - range, - |db_provider, range, _| db_provider.sealed_block_with_senders_range(range), - |block_state, _| Some(block_state.sealed_block_with_senders()), - |_| true, - ) + self.consistent_provider()?.sealed_block_with_senders_range(range) } } impl TransactionsProvider for BlockchainProvider2 { fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult> { - self.get_in_memory_or_storage_by_tx( - tx_hash.into(), - |db_provider| db_provider.transaction_id(tx_hash), - |_, tx_number, _| Ok(Some(tx_number)), - ) + self.consistent_provider()?.transaction_id(tx_hash) } fn transaction_by_id(&self, id: TxNumber) -> ProviderResult> { - self.get_in_memory_or_storage_by_tx( - id.into(), - |provider| provider.transaction_by_id(id), - |tx_index, _, block_state| { - Ok(block_state.block_ref().block().body.transactions.get(tx_index).cloned()) - }, - ) + self.consistent_provider()?.transaction_by_id(id) } fn transaction_by_id_no_hash( &self, id: TxNumber, ) -> ProviderResult> { - self.get_in_memory_or_storage_by_tx( - id.into(), - |provider| provider.transaction_by_id_no_hash(id), - |tx_index, _, block_state| { - Ok(block_state - .block_ref() - .block() - .body - .transactions - .get(tx_index) - .cloned() - .map(Into::into)) - }, - ) + self.consistent_provider()?.transaction_by_id_no_hash(id) } fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult> { - if let Some(tx) = self.canonical_in_memory_state.transaction_by_hash(hash) { - return Ok(Some(tx)) - } - - self.database.transaction_by_hash(hash) + self.consistent_provider()?.transaction_by_hash(hash) } fn transaction_by_hash_with_meta( &self, tx_hash: TxHash, ) -> ProviderResult> { - if let Some((tx, meta)) = - self.canonical_in_memory_state.transaction_by_hash_with_meta(tx_hash) - { - return Ok(Some((tx, meta))) - } - - self.database.transaction_by_hash_with_meta(tx_hash) + self.consistent_provider()?.transaction_by_hash_with_meta(tx_hash) } fn transaction_block(&self, id: TxNumber) -> ProviderResult> { - self.get_in_memory_or_storage_by_tx( - id.into(), - |provider| provider.transaction_block(id), - |_, _, block_state| Ok(Some(block_state.block_ref().block().number)), - ) + self.consistent_provider()?.transaction_block(id) } fn transactions_by_block( &self, id: BlockHashOrNumber, ) -> ProviderResult>> { - self.get_in_memory_or_storage_by_block( - id, - |provider| provider.transactions_by_block(id), - |block_state| Ok(Some(block_state.block_ref().block().body.transactions.clone())), - ) + self.consistent_provider()?.transactions_by_block(id) } fn transactions_by_block_range( &self, range: impl RangeBounds, ) -> ProviderResult>> { - self.get_in_memory_or_storage_by_block_range_while( - range, - |db_provider, range, _| db_provider.transactions_by_block_range(range), - |block_state, _| Some(block_state.block_ref().block().body.transactions.clone()), - |_| true, - ) + self.consistent_provider()?.transactions_by_block_range(range) } fn transactions_by_tx_range( &self, range: impl RangeBounds, ) -> ProviderResult> { - self.get_in_memory_or_storage_by_tx_range( - range, - |db_provider, db_range| db_provider.transactions_by_tx_range(db_range), - |index_range, block_state| { - Ok(block_state.block_ref().block().body.transactions[index_range] - .iter() - .cloned() - .map(Into::into) - .collect()) - }, - ) + self.consistent_provider()?.transactions_by_tx_range(range) } fn senders_by_tx_range( &self, range: impl RangeBounds, ) -> ProviderResult> { - self.get_in_memory_or_storage_by_tx_range( - range, - |db_provider, db_range| db_provider.senders_by_tx_range(db_range), - |index_range, block_state| Ok(block_state.block_ref().senders[index_range].to_vec()), - ) + self.consistent_provider()?.senders_by_tx_range(range) } fn transaction_sender(&self, id: TxNumber) -> ProviderResult> { - self.get_in_memory_or_storage_by_tx( - id.into(), - |provider| provider.transaction_sender(id), - |tx_index, _, block_state| Ok(block_state.block_ref().senders.get(tx_index).copied()), - ) + self.consistent_provider()?.transaction_sender(id) } } impl ReceiptProvider for BlockchainProvider2 { fn receipt(&self, id: TxNumber) -> ProviderResult> { - self.get_in_memory_or_storage_by_tx( - id.into(), - |provider| provider.receipt(id), - |tx_index, _, block_state| { - Ok(block_state.executed_block_receipts().get(tx_index).cloned()) - }, - ) + self.consistent_provider()?.receipt(id) } fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult> { - for block_state in self.canonical_in_memory_state.canonical_chain() { - let executed_block = block_state.block_ref(); - let block = executed_block.block(); - let receipts = block_state.executed_block_receipts(); - - // assuming 1:1 correspondence between transactions and receipts - debug_assert_eq!( - block.body.transactions.len(), - receipts.len(), - "Mismatch between transaction and receipt count" - ); - - if let Some(tx_index) = block.body.transactions.iter().position(|tx| tx.hash() == hash) - { - // safe to use tx_index for receipts due to 1:1 correspondence - return Ok(receipts.get(tx_index).cloned()); - } - } - - self.database.receipt_by_hash(hash) + self.consistent_provider()?.receipt_by_hash(hash) } fn receipts_by_block(&self, block: BlockHashOrNumber) -> ProviderResult>> { - self.get_in_memory_or_storage_by_block( - block, - |db_provider| db_provider.receipts_by_block(block), - |block_state| Ok(Some(block_state.executed_block_receipts())), - ) + self.consistent_provider()?.receipts_by_block(block) } fn receipts_by_tx_range( &self, range: impl RangeBounds, ) -> ProviderResult> { - self.get_in_memory_or_storage_by_tx_range( - range, - |db_provider, db_range| db_provider.receipts_by_tx_range(db_range), - |index_range, block_state| { - Ok(block_state.executed_block_receipts().drain(index_range).collect()) - }, - ) + self.consistent_provider()?.receipts_by_tx_range(range) } } impl ReceiptProviderIdExt for BlockchainProvider2 { fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult>> { - match block { - BlockId::Hash(rpc_block_hash) => { - let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?; - if receipts.is_none() && !rpc_block_hash.require_canonical.unwrap_or(false) { - let block_state = self - .canonical_in_memory_state - .state_by_hash(rpc_block_hash.block_hash) - .ok_or(ProviderError::StateForHashNotFound(rpc_block_hash.block_hash))?; - receipts = Some(block_state.executed_block_receipts()); - } - Ok(receipts) - } - BlockId::Number(num_tag) => match num_tag { - BlockNumberOrTag::Pending => Ok(self - .canonical_in_memory_state - .pending_state() - .map(|block_state| block_state.executed_block_receipts())), - _ => { - if let Some(num) = self.convert_block_number(num_tag)? { - self.receipts_by_block(num.into()) - } else { - Ok(None) - } - } - }, - } + self.consistent_provider()?.receipts_by_block_id(block) } } @@ -1170,47 +422,25 @@ impl WithdrawalsProvider for BlockchainProvider2 { id: BlockHashOrNumber, timestamp: u64, ) -> ProviderResult> { - if !self.chain_spec().is_shanghai_active_at_timestamp(timestamp) { - return Ok(None) - } - - self.get_in_memory_or_storage_by_block( - id, - |db_provider| db_provider.withdrawals_by_block(id, timestamp), - |block_state| Ok(block_state.block_ref().block().body.withdrawals.clone()), - ) + self.consistent_provider()?.withdrawals_by_block(id, timestamp) } fn latest_withdrawal(&self) -> ProviderResult> { - let best_block_num = self.best_block_number()?; - - self.get_in_memory_or_storage_by_block( - best_block_num.into(), - |db_provider| db_provider.latest_withdrawal(), - |block_state| { - Ok(block_state - .block_ref() - .block() - .body - .withdrawals - .clone() - .and_then(|mut w| w.pop())) - }, - ) + self.consistent_provider()?.latest_withdrawal() } } impl StageCheckpointReader for BlockchainProvider2 { fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult> { - self.database.provider()?.get_stage_checkpoint(id) + self.consistent_provider()?.get_stage_checkpoint(id) } fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult>> { - self.database.provider()?.get_stage_checkpoint_progress(id) + self.consistent_provider()?.get_stage_checkpoint_progress(id) } fn get_all_checkpoints(&self) -> ProviderResult> { - self.database.provider()?.get_all_checkpoints() + self.consistent_provider()?.get_all_checkpoints() } } @@ -1225,9 +455,7 @@ impl EvmEnvProvider for BlockchainProvider2 { where EvmConfig: ConfigureEvmEnv
, { - let hash = self.convert_number(at)?.ok_or(ProviderError::HeaderNotFound(at))?; - let header = self.header(&hash)?.ok_or(ProviderError::HeaderNotFound(at))?; - self.fill_env_with_header(cfg, block_env, &header, evm_config) + self.consistent_provider()?.fill_env_at(cfg, block_env, at, evm_config) } fn fill_env_with_header( @@ -1240,11 +468,7 @@ impl EvmEnvProvider for BlockchainProvider2 { where EvmConfig: ConfigureEvmEnv
, { - let total_difficulty = self - .header_td_by_number(header.number)? - .ok_or_else(|| ProviderError::HeaderNotFound(header.number.into()))?; - evm_config.fill_cfg_and_block_env(cfg, block_env, header, total_difficulty); - Ok(()) + self.consistent_provider()?.fill_env_with_header(cfg, block_env, header, evm_config) } fn fill_cfg_env_at( @@ -1256,9 +480,7 @@ impl EvmEnvProvider for BlockchainProvider2 { where EvmConfig: ConfigureEvmEnv
, { - let hash = self.convert_number(at)?.ok_or(ProviderError::HeaderNotFound(at))?; - let header = self.header(&hash)?.ok_or(ProviderError::HeaderNotFound(at))?; - self.fill_cfg_env_with_header(cfg, &header, evm_config) + self.consistent_provider()?.fill_cfg_env_at(cfg, at, evm_config) } fn fill_cfg_env_with_header( @@ -1270,11 +492,7 @@ impl EvmEnvProvider for BlockchainProvider2 { where EvmConfig: ConfigureEvmEnv
, { - let total_difficulty = self - .header_td_by_number(header.number)? - .ok_or_else(|| ProviderError::HeaderNotFound(header.number.into()))?; - evm_config.fill_cfg_env(cfg, header, total_difficulty); - Ok(()) + self.consistent_provider()?.fill_cfg_env_with_header(cfg, header, evm_config) } } @@ -1283,11 +501,11 @@ impl PruneCheckpointReader for BlockchainProvider2 { &self, segment: PruneSegment, ) -> ProviderResult> { - self.database.provider()?.get_prune_checkpoint(segment) + self.consistent_provider()?.get_prune_checkpoint(segment) } fn get_prune_checkpoints(&self) -> ProviderResult> { - self.database.provider()?.get_prune_checkpoints() + self.consistent_provider()?.get_prune_checkpoints() } } @@ -1318,8 +536,9 @@ impl StateProviderFactory for BlockchainProvider2 { block_number: BlockNumber, ) -> ProviderResult { trace!(target: "providers::blockchain", ?block_number, "Getting history by block number"); - self.ensure_canonical_block(block_number)?; - let hash = self + let provider = self.consistent_provider()?; + provider.ensure_canonical_block(block_number)?; + let hash = provider .block_hash(block_number)? .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?; self.history_by_block_hash(hash) @@ -1328,14 +547,11 @@ impl StateProviderFactory for BlockchainProvider2 { fn history_by_block_hash(&self, block_hash: BlockHash) -> ProviderResult { trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash"); - self.get_in_memory_or_storage_by_block( + self.consistent_provider()?.get_in_memory_or_storage_by_block( block_hash.into(), - |_| { - // TODO(joshie): port history_by_block_hash to DatabaseProvider and use db_provider - self.database.history_by_block_hash(block_hash) - }, + |_| self.database.history_by_block_hash(block_hash), |block_state| { - let state_provider = self.block_state_provider(&block_state)?; + let state_provider = self.block_state_provider(block_state)?; Ok(Box::new(state_provider)) }, ) @@ -1444,105 +660,35 @@ where } } -impl BlockReaderIdExt for BlockchainProvider2 +impl BlockReaderIdExt for BlockchainProvider2 where Self: BlockReader + ReceiptProviderIdExt, { fn block_by_id(&self, id: BlockId) -> ProviderResult> { - match id { - BlockId::Number(num) => self.block_by_number_or_tag(num), - BlockId::Hash(hash) => { - // TODO: should we only apply this for the RPCs that are listed in EIP-1898? - // so not at the provider level? - // if we decide to do this at a higher level, then we can make this an automatic - // trait impl - if Some(true) == hash.require_canonical { - // check the database, canonical blocks are only stored in the database - self.find_block_by_hash(hash.block_hash, BlockSource::Canonical) - } else { - self.block_by_hash(hash.block_hash) - } - } - } + self.consistent_provider()?.block_by_id(id) } fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult> { - Ok(match id { - BlockNumberOrTag::Latest => { - Some(self.canonical_in_memory_state.get_canonical_head().unseal()) - } - BlockNumberOrTag::Finalized => { - self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal()) - } - BlockNumberOrTag::Safe => { - self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal()) - } - BlockNumberOrTag::Earliest => self.header_by_number(0)?, - BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(), - - BlockNumberOrTag::Number(num) => self.header_by_number(num)?, - }) + self.consistent_provider()?.header_by_number_or_tag(id) } fn sealed_header_by_number_or_tag( &self, id: BlockNumberOrTag, ) -> ProviderResult> { - match id { - BlockNumberOrTag::Latest => { - Ok(Some(self.canonical_in_memory_state.get_canonical_head())) - } - BlockNumberOrTag::Finalized => { - Ok(self.canonical_in_memory_state.get_finalized_header()) - } - BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()), - BlockNumberOrTag::Earliest => self.header_by_number(0)?.map_or_else( - || Ok(None), - |h| { - let sealed = h.seal_slow(); - let (header, seal) = sealed.into_parts(); - Ok(Some(SealedHeader::new(header, seal))) - }, - ), - BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()), - BlockNumberOrTag::Number(num) => self.header_by_number(num)?.map_or_else( - || Ok(None), - |h| { - let sealed = h.seal_slow(); - let (header, seal) = sealed.into_parts(); - Ok(Some(SealedHeader::new(header, seal))) - }, - ), - } + self.consistent_provider()?.sealed_header_by_number_or_tag(id) } fn sealed_header_by_id(&self, id: BlockId) -> ProviderResult> { - Ok(match id { - BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?, - BlockId::Hash(hash) => self.header(&hash.block_hash)?.map(|h| { - let sealed = h.seal_slow(); - let (header, seal) = sealed.into_parts(); - SealedHeader::new(header, seal) - }), - }) + self.consistent_provider()?.sealed_header_by_id(id) } fn header_by_id(&self, id: BlockId) -> ProviderResult> { - Ok(match id { - BlockId::Number(num) => self.header_by_number_or_tag(num)?, - BlockId::Hash(hash) => self.header(&hash.block_hash)?, - }) + self.consistent_provider()?.header_by_id(id) } fn ommers_by_id(&self, id: BlockId) -> ProviderResult>> { - match id { - BlockId::Number(num) => self.ommers_by_number_or_tag(num), - BlockId::Hash(hash) => { - // TODO: EIP-1898 question, see above - // here it is not handled - self.ommers(BlockHashOrNumber::Hash(hash.block_hash)) - } - } + self.consistent_provider()?.ommers_by_id(id) } } @@ -1569,49 +715,7 @@ impl StorageChangeSetReader for BlockchainProvider2 { &self, block_number: BlockNumber, ) -> ProviderResult> { - if let Some(state) = self.canonical_in_memory_state.state_by_number(block_number) { - let changesets = state - .block() - .execution_output - .bundle - .reverts - .clone() - .into_plain_state_reverts() - .storage - .into_iter() - .flatten() - .flat_map(|revert: PlainStorageRevert| { - revert.storage_revert.into_iter().map(move |(key, value)| { - ( - BlockNumberAddress((block_number, revert.address)), - StorageEntry { key: key.into(), value: value.to_previous_value() }, - ) - }) - }) - .collect(); - Ok(changesets) - } else { - // Perform checks on whether or not changesets exist for the block. - let provider = self.database.provider()?; - - // No prune checkpoint means history should exist and we should `unwrap_or(true)` - let storage_history_exists = provider - .get_prune_checkpoint(PruneSegment::StorageHistory)? - .and_then(|checkpoint| { - // return true if the block number is ahead of the prune checkpoint. - // - // The checkpoint stores the highest pruned block number, so we should make - // sure the block_number is strictly greater. - checkpoint.block_number.map(|checkpoint| block_number > checkpoint) - }) - .unwrap_or(true); - - if !storage_history_exists { - return Err(ProviderError::StateAtBlockPruned(block_number)) - } - - provider.storage_changeset(block_number) - } + self.consistent_provider()?.storage_changeset(block_number) } } @@ -1620,50 +724,14 @@ impl ChangeSetReader for BlockchainProvider2 { &self, block_number: BlockNumber, ) -> ProviderResult> { - if let Some(state) = self.canonical_in_memory_state.state_by_number(block_number) { - let changesets = state - .block_ref() - .execution_output - .bundle - .reverts - .clone() - .into_plain_state_reverts() - .accounts - .into_iter() - .flatten() - .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) }) - .collect(); - Ok(changesets) - } else { - // Perform checks on whether or not changesets exist for the block. - let provider = self.database.provider()?; - // No prune checkpoint means history should exist and we should `unwrap_or(true)` - let account_history_exists = provider - .get_prune_checkpoint(PruneSegment::AccountHistory)? - .and_then(|checkpoint| { - // return true if the block number is ahead of the prune checkpoint. - // - // The checkpoint stores the highest pruned block number, so we should make - // sure the block_number is strictly greater. - checkpoint.block_number.map(|checkpoint| block_number > checkpoint) - }) - .unwrap_or(true); - - if !account_history_exists { - return Err(ProviderError::StateAtBlockPruned(block_number)) - } - - provider.account_block_changeset(block_number) - } + self.consistent_provider()?.account_block_changeset(block_number) } } impl AccountReader for BlockchainProvider2 { /// Get basic account information. fn basic_account(&self, address: Address) -> ProviderResult> { - // use latest state provider - let state_provider = self.latest()?; - state_provider.basic_account(address) + self.consistent_provider()?.basic_account(address) } } @@ -1678,12 +746,7 @@ impl StateReader for BlockchainProvider2 { /// because the tree thread is responsible for modifying the [`CanonicalInMemoryState`] in the /// first place. fn get_state(&self, block: BlockNumber) -> ProviderResult> { - if let Some(state) = self.canonical_in_memory_state.state_by_number(block) { - let state = state.block_ref().execution_outcome().clone(); - Ok(Some(state)) - } else { - self.get_state(block..=block) - } + StateReader::get_state(&self.consistent_provider()?, block) } } diff --git a/crates/storage/provider/src/providers/consistent.rs b/crates/storage/provider/src/providers/consistent.rs new file mode 100644 index 000000000000..d6847fa1b8f6 --- /dev/null +++ b/crates/storage/provider/src/providers/consistent.rs @@ -0,0 +1,1871 @@ +use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes}; +use crate::{ + providers::StaticFileProvider, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, + BlockReader, BlockReaderIdExt, BlockSource, ChainSpecProvider, ChangeSetReader, EvmEnvProvider, + HeaderProvider, ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt, + StageCheckpointReader, StateReader, StaticFileProviderFactory, TransactionVariant, + TransactionsProvider, WithdrawalsProvider, +}; +use alloy_eips::{BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag, HashOrNumber}; +use alloy_primitives::{Address, BlockHash, BlockNumber, Sealable, TxHash, TxNumber, B256, U256}; +use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProviderRef}; +use reth_chainspec::{ChainInfo, EthereumHardforks}; +use reth_db::models::BlockNumberAddress; +use reth_db_api::models::{AccountBeforeTx, StoredBlockBodyIndices}; +use reth_evm::ConfigureEvmEnv; +use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit}; +use reth_primitives::{ + Account, Block, BlockWithSenders, Header, Receipt, SealedBlock, SealedBlockWithSenders, + SealedHeader, StorageEntry, TransactionMeta, TransactionSigned, TransactionSignedNoHash, + Withdrawal, Withdrawals, +}; +use reth_prune_types::{PruneCheckpoint, PruneSegment}; +use reth_stages_types::{StageCheckpoint, StageId}; +use reth_storage_api::{DatabaseProviderFactory, StateProvider, StorageChangeSetReader}; +use reth_storage_errors::provider::ProviderResult; +use revm::{ + db::states::PlainStorageRevert, + primitives::{BlockEnv, CfgEnvWithHandlerCfg}, +}; +use std::{ + collections::{hash_map, HashMap}, + ops::{Add, Bound, RangeBounds, RangeInclusive, Sub}, + sync::Arc, +}; +use tracing::trace; + +/// Type that interacts with a snapshot view of the blockchain (storage and in-memory) at time of +/// instantiation, EXCEPT for pending, safe and finalized block which might change while holding +/// this provider. +/// +/// CAUTION: Avoid holding this provider for too long or the inner database transaction will +/// time-out. +#[derive(Debug)] +pub struct ConsistentProvider { + /// Storage provider. + storage_provider: as DatabaseProviderFactory>::Provider, + /// Head block at time of [`Self`] creation + head_block: Option>, + /// In-memory canonical state. This is not a snapshot, and can change! Use with caution. + canonical_in_memory_state: CanonicalInMemoryState, +} + +impl ConsistentProvider { + /// Create a new provider using [`ProviderFactory`] and [`CanonicalInMemoryState`], + /// + /// Underneath it will take a snapshot by fetching [`CanonicalInMemoryState::head_state`] and + /// [`ProviderFactory::database_provider_ro`] effectively maintaining one single snapshotted + /// view of memory and database. + pub fn new( + storage_provider_factory: ProviderFactory, + state: CanonicalInMemoryState, + ) -> ProviderResult { + // Each one provides a snapshot at the time of instantiation, but its order matters. + // + // If we acquire first the database provider, it's possible that before the in-memory chain + // snapshot is instantiated, it will flush blocks to disk. This would + // mean that our database provider would not have access to the flushed blocks (since it's + // working under an older view), while the in-memory state may have deleted them + // entirely. Resulting in gaps on the range. + let head_block = state.head_state(); + let storage_provider = storage_provider_factory.database_provider_ro()?; + Ok(Self { storage_provider, head_block, canonical_in_memory_state: state }) + } + + // Helper function to convert range bounds + fn convert_range_bounds( + &self, + range: impl RangeBounds, + end_unbounded: impl FnOnce() -> T, + ) -> (T, T) + where + T: Copy + Add + Sub + From, + { + let start = match range.start_bound() { + Bound::Included(&n) => n, + Bound::Excluded(&n) => n + T::from(1u8), + Bound::Unbounded => T::from(0u8), + }; + + let end = match range.end_bound() { + Bound::Included(&n) => n, + Bound::Excluded(&n) => n - T::from(1u8), + Bound::Unbounded => end_unbounded(), + }; + + (start, end) + } + + /// Storage provider for latest block + fn latest_ref<'a>(&'a self) -> ProviderResult> { + trace!(target: "providers::blockchain", "Getting latest block state provider"); + + // use latest state provider if the head state exists + if let Some(state) = &self.head_block { + trace!(target: "providers::blockchain", "Using head state for latest state provider"); + Ok(self.block_state_provider_ref(state)?.boxed()) + } else { + trace!(target: "providers::blockchain", "Using database state for latest state provider"); + self.storage_provider.latest() + } + } + + fn history_by_block_hash_ref<'a>( + &'a self, + block_hash: BlockHash, + ) -> ProviderResult> { + trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash"); + + self.get_in_memory_or_storage_by_block( + block_hash.into(), + |_| self.storage_provider.history_by_block_hash(block_hash), + |block_state| { + let state_provider = self.block_state_provider_ref(block_state)?; + Ok(Box::new(state_provider)) + }, + ) + } + + /// Returns a state provider indexed by the given block number or tag. + fn state_by_block_number_ref<'a>( + &'a self, + number: BlockNumber, + ) -> ProviderResult> { + let hash = + self.block_hash(number)?.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?; + self.history_by_block_hash_ref(hash) + } + + /// Return the last N blocks of state, recreating the [`ExecutionOutcome`]. + /// + /// If the range is empty, or there are no blocks for the given range, then this returns `None`. + pub fn get_state( + &self, + range: RangeInclusive, + ) -> ProviderResult> { + if range.is_empty() { + return Ok(None) + } + let start_block_number = *range.start(); + let end_block_number = *range.end(); + + // We are not removing block meta as it is used to get block changesets. + let mut block_bodies = Vec::new(); + for block_num in range.clone() { + let block_body = self + .block_body_indices(block_num)? + .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?; + block_bodies.push((block_num, block_body)) + } + + // get transaction receipts + let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num()) + else { + return Ok(None) + }; + let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else { + return Ok(None) + }; + + let mut account_changeset = Vec::new(); + for block_num in range.clone() { + let changeset = + self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem)); + account_changeset.extend(changeset); + } + + let mut storage_changeset = Vec::new(); + for block_num in range { + let changeset = self.storage_changeset(block_num)?; + storage_changeset.extend(changeset); + } + + let (state, reverts) = + self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?; + + let mut receipt_iter = + self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter(); + + let mut receipts = Vec::with_capacity(block_bodies.len()); + // loop break if we are at the end of the blocks. + for (_, block_body) in block_bodies { + let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize); + for tx_num in block_body.tx_num_range() { + let receipt = receipt_iter + .next() + .ok_or_else(|| ProviderError::ReceiptNotFound(tx_num.into()))?; + block_receipts.push(Some(receipt)); + } + receipts.push(block_receipts); + } + + Ok(Some(ExecutionOutcome::new_init( + state, + reverts, + // We skip new contracts since we never delete them from the database + Vec::new(), + receipts.into(), + start_block_number, + Vec::new(), + ))) + } + + /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the + /// [`reth_db::PlainAccountState`] and [`reth_db::PlainStorageState`] tables, based on the given + /// storage and account changesets. + fn populate_bundle_state( + &self, + account_changeset: Vec<(u64, AccountBeforeTx)>, + storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>, + block_range_end: BlockNumber, + ) -> ProviderResult<(BundleStateInit, RevertsInit)> { + let mut state: BundleStateInit = HashMap::new(); + let mut reverts: RevertsInit = HashMap::new(); + let state_provider = self.state_by_block_number_ref(block_range_end)?; + + // add account changeset changes + for (block_number, account_before) in account_changeset.into_iter().rev() { + let AccountBeforeTx { info: old_info, address } = account_before; + match state.entry(address) { + hash_map::Entry::Vacant(entry) => { + let new_info = state_provider.basic_account(address)?; + entry.insert((old_info, new_info, HashMap::new())); + } + hash_map::Entry::Occupied(mut entry) => { + // overwrite old account state. + entry.get_mut().0 = old_info; + } + } + // insert old info into reverts. + reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info); + } + + // add storage changeset changes + for (block_and_address, old_storage) in storage_changeset.into_iter().rev() { + let BlockNumberAddress((block_number, address)) = block_and_address; + // get account state or insert from plain state. + let account_state = match state.entry(address) { + hash_map::Entry::Vacant(entry) => { + let present_info = state_provider.basic_account(address)?; + entry.insert((present_info, present_info, HashMap::new())) + } + hash_map::Entry::Occupied(entry) => entry.into_mut(), + }; + + // match storage. + match account_state.2.entry(old_storage.key) { + hash_map::Entry::Vacant(entry) => { + let new_storage_value = + state_provider.storage(address, old_storage.key)?.unwrap_or_default(); + entry.insert((old_storage.value, new_storage_value)); + } + hash_map::Entry::Occupied(mut entry) => { + entry.get_mut().0 = old_storage.value; + } + }; + + reverts + .entry(block_number) + .or_default() + .entry(address) + .or_default() + .1 + .push(old_storage); + } + + Ok((state, reverts)) + } + + /// Fetches a range of data from both in-memory state and persistent storage while a predicate + /// is met. + /// + /// Creates a snapshot of the in-memory chain state and database provider to prevent + /// inconsistencies. Splits the range into in-memory and storage sections, prioritizing + /// recent in-memory blocks in case of overlaps. + /// + /// * `fetch_db_range` function (`F`) provides access to the database provider, allowing the + /// user to retrieve the required items from the database using [`RangeInclusive`]. + /// * `map_block_state_item` function (`G`) provides each block of the range in the in-memory + /// state, allowing for selection or filtering for the desired data. + fn get_in_memory_or_storage_by_block_range_while( + &self, + range: impl RangeBounds, + fetch_db_range: F, + map_block_state_item: G, + mut predicate: P, + ) -> ProviderResult> + where + F: FnOnce( + &DatabaseProviderRO, + RangeInclusive, + &mut P, + ) -> ProviderResult>, + G: Fn(&BlockState, &mut P) -> Option, + P: FnMut(&T) -> bool, + { + // Each one provides a snapshot at the time of instantiation, but its order matters. + // + // If we acquire first the database provider, it's possible that before the in-memory chain + // snapshot is instantiated, it will flush blocks to disk. This would + // mean that our database provider would not have access to the flushed blocks (since it's + // working under an older view), while the in-memory state may have deleted them + // entirely. Resulting in gaps on the range. + let mut in_memory_chain = + self.head_block.as_ref().map(|b| b.chain().collect::>()).unwrap_or_default(); + let db_provider = &self.storage_provider; + + let (start, end) = self.convert_range_bounds(range, || { + // the first block is the highest one. + in_memory_chain + .first() + .map(|b| b.number()) + .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default()) + }); + + if start > end { + return Ok(vec![]) + } + + // Split range into storage_range and in-memory range. If the in-memory range is not + // necessary drop it early. + // + // The last block of `in_memory_chain` is the lowest block number. + let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) { + Some(lowest_memory_block) if lowest_memory_block <= end => { + let highest_memory_block = + in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed"); + + // Database will for a time overlap with in-memory-chain blocks. In + // case of a re-org, it can mean that the database blocks are of a forked chain, and + // so, we should prioritize the in-memory overlapped blocks. + let in_memory_range = + lowest_memory_block.max(start)..=end.min(highest_memory_block); + + // If requested range is in the middle of the in-memory range, remove the necessary + // lowest blocks + in_memory_chain.truncate( + in_memory_chain + .len() + .saturating_sub(start.saturating_sub(lowest_memory_block) as usize), + ); + + let storage_range = + (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1); + + (Some((in_memory_chain, in_memory_range)), storage_range) + } + _ => { + // Drop the in-memory chain so we don't hold blocks in memory. + drop(in_memory_chain); + + (None, Some(start..=end)) + } + }; + + let mut items = Vec::with_capacity((end - start + 1) as usize); + + if let Some(storage_range) = storage_range { + let mut db_items = fetch_db_range(db_provider, storage_range.clone(), &mut predicate)?; + items.append(&mut db_items); + + // The predicate was not met, if the number of items differs from the expected. So, we + // return what we have. + if items.len() as u64 != storage_range.end() - storage_range.start() + 1 { + return Ok(items) + } + } + + if let Some((in_memory_chain, in_memory_range)) = in_memory { + for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) { + debug_assert!(num == block.number()); + if let Some(item) = map_block_state_item(block, &mut predicate) { + items.push(item); + } else { + break + } + } + } + + Ok(items) + } + + /// This uses a given [`BlockState`] to initialize a state provider for that block. + fn block_state_provider_ref( + &self, + state: &BlockState, + ) -> ProviderResult> { + let anchor_hash = state.anchor().hash; + let latest_historical = self.history_by_block_hash_ref(anchor_hash)?; + let in_memory = state.chain().map(|block_state| block_state.block()).collect(); + Ok(MemoryOverlayStateProviderRef::new(latest_historical, in_memory)) + } + + /// Fetches data from either in-memory state or persistent storage for a range of transactions. + /// + /// * `fetch_from_db`: has a `DatabaseProviderRO` and the storage specific range. + /// * `fetch_from_block_state`: has a [`RangeInclusive`] of elements that should be fetched from + /// [`BlockState`]. [`RangeInclusive`] is necessary to handle partial look-ups of a block. + fn get_in_memory_or_storage_by_tx_range( + &self, + range: impl RangeBounds, + fetch_from_db: S, + fetch_from_block_state: M, + ) -> ProviderResult> + where + S: FnOnce( + &DatabaseProviderRO, + RangeInclusive, + ) -> ProviderResult>, + M: Fn(RangeInclusive, &BlockState) -> ProviderResult>, + { + let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::>(); + let provider = &self.storage_provider; + + // Get the last block number stored in the storage which does NOT overlap with in-memory + // chain. + let last_database_block_number = in_mem_chain + .last() + .map(|b| Ok(b.anchor().number)) + .unwrap_or_else(|| provider.last_block_number())?; + + // Get the next tx number for the last block stored in the storage, which marks the start of + // the in-memory state. + let last_block_body_index = provider + .block_body_indices(last_database_block_number)? + .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?; + let mut in_memory_tx_num = last_block_body_index.next_tx_num(); + + let (start, end) = self.convert_range_bounds(range, || { + in_mem_chain + .iter() + .map(|b| b.block_ref().block().body.transactions.len() as u64) + .sum::() + + last_block_body_index.last_tx_num() + }); + + if start > end { + return Ok(vec![]) + } + + let mut tx_range = start..=end; + + // If the range is entirely before the first in-memory transaction number, fetch from + // storage + if *tx_range.end() < in_memory_tx_num { + return fetch_from_db(provider, tx_range); + } + + let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize); + + // If the range spans storage and memory, get elements from storage first. + if *tx_range.start() < in_memory_tx_num { + // Determine the range that needs to be fetched from storage. + let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1); + + // Set the remaining transaction range for in-memory + tx_range = in_memory_tx_num..=*tx_range.end(); + + items.extend(fetch_from_db(provider, db_range)?); + } + + // Iterate from the lowest block to the highest in-memory chain + for block_state in in_mem_chain.iter().rev() { + let block_tx_count = block_state.block_ref().block().body.transactions.len(); + let remaining = (tx_range.end() - tx_range.start() + 1) as usize; + + // If the transaction range start is equal or higher than the next block first + // transaction, advance + if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 { + in_memory_tx_num += block_tx_count as u64; + continue + } + + // This should only be more than 0 once, in case of a partial range inside a block. + let skip = (tx_range.start() - in_memory_tx_num) as usize; + + items.extend(fetch_from_block_state( + skip..=skip + (remaining.min(block_tx_count - skip) - 1), + block_state, + )?); + + in_memory_tx_num += block_tx_count as u64; + + // Break if the range has been fully processed + if in_memory_tx_num > *tx_range.end() { + break + } + + // Set updated range + tx_range = in_memory_tx_num..=*tx_range.end(); + } + + Ok(items) + } + + /// Fetches data from either in-memory state or persistent storage by transaction + /// [`HashOrNumber`]. + fn get_in_memory_or_storage_by_tx( + &self, + id: HashOrNumber, + fetch_from_db: S, + fetch_from_block_state: M, + ) -> ProviderResult> + where + S: FnOnce(&DatabaseProviderRO) -> ProviderResult>, + M: Fn(usize, TxNumber, &BlockState) -> ProviderResult>, + { + let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::>(); + let provider = &self.storage_provider; + + // Get the last block number stored in the database which does NOT overlap with in-memory + // chain. + let last_database_block_number = in_mem_chain + .last() + .map(|b| Ok(b.anchor().number)) + .unwrap_or_else(|| provider.last_block_number())?; + + // Get the next tx number for the last block stored in the database and consider it the + // first tx number of the in-memory state + let last_block_body_index = provider + .block_body_indices(last_database_block_number)? + .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?; + let mut in_memory_tx_num = last_block_body_index.next_tx_num(); + + // If the transaction number is less than the first in-memory transaction number, make a + // database lookup + if let HashOrNumber::Number(id) = id { + if id < in_memory_tx_num { + return fetch_from_db(provider) + } + } + + // Iterate from the lowest block to the highest + for block_state in in_mem_chain.iter().rev() { + let executed_block = block_state.block_ref(); + let block = executed_block.block(); + + for tx_index in 0..block.body.transactions.len() { + match id { + HashOrNumber::Hash(tx_hash) => { + if tx_hash == block.body.transactions[tx_index].hash() { + return fetch_from_block_state(tx_index, in_memory_tx_num, block_state) + } + } + HashOrNumber::Number(id) => { + if id == in_memory_tx_num { + return fetch_from_block_state(tx_index, in_memory_tx_num, block_state) + } + } + } + + in_memory_tx_num += 1; + } + } + + // Not found in-memory, so check database. + if let HashOrNumber::Hash(_) = id { + return fetch_from_db(provider) + } + + Ok(None) + } + + /// Fetches data from either in-memory state or persistent storage by [`BlockHashOrNumber`]. + pub(crate) fn get_in_memory_or_storage_by_block( + &self, + id: BlockHashOrNumber, + fetch_from_db: S, + fetch_from_block_state: M, + ) -> ProviderResult + where + S: FnOnce(&DatabaseProviderRO) -> ProviderResult, + M: Fn(&BlockState) -> ProviderResult, + { + if let Some(Some(block_state)) = self.head_block.as_ref().map(|b| b.block_on_chain(id)) { + return fetch_from_block_state(block_state) + } + fetch_from_db(&self.storage_provider) + } +} + +impl ConsistentProvider { + /// Ensures that the given block number is canonical (synced) + /// + /// This is a helper for guarding the `HistoricalStateProvider` against block numbers that are + /// out of range and would lead to invalid results, mainly during initial sync. + /// + /// Verifying the `block_number` would be expensive since we need to lookup sync table + /// Instead, we ensure that the `block_number` is within the range of the + /// [`Self::best_block_number`] which is updated when a block is synced. + #[inline] + pub(crate) fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> { + let latest = self.best_block_number()?; + if block_number > latest { + Err(ProviderError::HeaderNotFound(block_number.into())) + } else { + Ok(()) + } + } +} + +impl StaticFileProviderFactory for ConsistentProvider { + fn static_file_provider(&self) -> StaticFileProvider { + self.storage_provider.static_file_provider() + } +} + +impl HeaderProvider for ConsistentProvider { + fn header(&self, block_hash: &BlockHash) -> ProviderResult> { + self.get_in_memory_or_storage_by_block( + (*block_hash).into(), + |db_provider| db_provider.header(block_hash), + |block_state| Ok(Some(block_state.block_ref().block().header.header().clone())), + ) + } + + fn header_by_number(&self, num: BlockNumber) -> ProviderResult> { + self.get_in_memory_or_storage_by_block( + num.into(), + |db_provider| db_provider.header_by_number(num), + |block_state| Ok(Some(block_state.block_ref().block().header.header().clone())), + ) + } + + fn header_td(&self, hash: &BlockHash) -> ProviderResult> { + if let Some(num) = self.block_number(*hash)? { + self.header_td_by_number(num) + } else { + Ok(None) + } + } + + fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult> { + let number = if self.head_block.as_ref().map(|b| b.block_on_chain(number.into())).is_some() + { + // If the block exists in memory, we should return a TD for it. + // + // The canonical in memory state should only store post-merge blocks. Post-merge blocks + // have zero difficulty. This means we can use the total difficulty for the last + // finalized block number if present (so that we are not affected by reorgs), if not the + // last number in the database will be used. + if let Some(last_finalized_num_hash) = + self.canonical_in_memory_state.get_finalized_num_hash() + { + last_finalized_num_hash.number + } else { + self.last_block_number()? + } + } else { + // Otherwise, return what we have on disk for the input block + number + }; + self.storage_provider.header_td_by_number(number) + } + + fn headers_range(&self, range: impl RangeBounds) -> ProviderResult> { + self.get_in_memory_or_storage_by_block_range_while( + range, + |db_provider, range, _| db_provider.headers_range(range), + |block_state, _| Some(block_state.block_ref().block().header.header().clone()), + |_| true, + ) + } + + fn sealed_header(&self, number: BlockNumber) -> ProviderResult> { + self.get_in_memory_or_storage_by_block( + number.into(), + |db_provider| db_provider.sealed_header(number), + |block_state| Ok(Some(block_state.block_ref().block().header.clone())), + ) + } + + fn sealed_headers_range( + &self, + range: impl RangeBounds, + ) -> ProviderResult> { + self.get_in_memory_or_storage_by_block_range_while( + range, + |db_provider, range, _| db_provider.sealed_headers_range(range), + |block_state, _| Some(block_state.block_ref().block().header.clone()), + |_| true, + ) + } + + fn sealed_headers_while( + &self, + range: impl RangeBounds, + predicate: impl FnMut(&SealedHeader) -> bool, + ) -> ProviderResult> { + self.get_in_memory_or_storage_by_block_range_while( + range, + |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate), + |block_state, predicate| { + let header = &block_state.block_ref().block().header; + predicate(header).then(|| header.clone()) + }, + predicate, + ) + } +} + +impl BlockHashReader for ConsistentProvider { + fn block_hash(&self, number: u64) -> ProviderResult> { + self.get_in_memory_or_storage_by_block( + number.into(), + |db_provider| db_provider.block_hash(number), + |block_state| Ok(Some(block_state.hash())), + ) + } + + fn canonical_hashes_range( + &self, + start: BlockNumber, + end: BlockNumber, + ) -> ProviderResult> { + self.get_in_memory_or_storage_by_block_range_while( + start..end, + |db_provider, inclusive_range, _| { + db_provider + .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1) + }, + |block_state, _| Some(block_state.hash()), + |_| true, + ) + } +} + +impl BlockNumReader for ConsistentProvider { + fn chain_info(&self) -> ProviderResult { + let best_number = self.best_block_number()?; + Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number }) + } + + fn best_block_number(&self) -> ProviderResult { + self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number()) + } + + fn last_block_number(&self) -> ProviderResult { + self.storage_provider.last_block_number() + } + + fn block_number(&self, hash: B256) -> ProviderResult> { + self.get_in_memory_or_storage_by_block( + hash.into(), + |db_provider| db_provider.block_number(hash), + |block_state| Ok(Some(block_state.number())), + ) + } +} + +impl BlockIdReader for ConsistentProvider { + fn pending_block_num_hash(&self) -> ProviderResult> { + Ok(self.canonical_in_memory_state.pending_block_num_hash()) + } + + fn safe_block_num_hash(&self) -> ProviderResult> { + Ok(self.canonical_in_memory_state.get_safe_num_hash()) + } + + fn finalized_block_num_hash(&self) -> ProviderResult> { + Ok(self.canonical_in_memory_state.get_finalized_num_hash()) + } +} + +impl BlockReader for ConsistentProvider { + fn find_block_by_hash(&self, hash: B256, source: BlockSource) -> ProviderResult> { + match source { + BlockSource::Any | BlockSource::Canonical => { + // Note: it's fine to return the unsealed block because the caller already has + // the hash + self.get_in_memory_or_storage_by_block( + hash.into(), + |db_provider| db_provider.find_block_by_hash(hash, source), + |block_state| Ok(Some(block_state.block_ref().block().clone().unseal())), + ) + } + BlockSource::Pending => { + Ok(self.canonical_in_memory_state.pending_block().map(|block| block.unseal())) + } + } + } + + fn block(&self, id: BlockHashOrNumber) -> ProviderResult> { + self.get_in_memory_or_storage_by_block( + id, + |db_provider| db_provider.block(id), + |block_state| Ok(Some(block_state.block_ref().block().clone().unseal())), + ) + } + + fn pending_block(&self) -> ProviderResult> { + Ok(self.canonical_in_memory_state.pending_block()) + } + + fn pending_block_with_senders(&self) -> ProviderResult> { + Ok(self.canonical_in_memory_state.pending_block_with_senders()) + } + + fn pending_block_and_receipts(&self) -> ProviderResult)>> { + Ok(self.canonical_in_memory_state.pending_block_and_receipts()) + } + + fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult>> { + self.get_in_memory_or_storage_by_block( + id, + |db_provider| db_provider.ommers(id), + |block_state| { + if self.chain_spec().final_paris_total_difficulty(block_state.number()).is_some() { + return Ok(Some(Vec::new())) + } + + Ok(Some(block_state.block_ref().block().body.ommers.clone())) + }, + ) + } + + fn block_body_indices( + &self, + number: BlockNumber, + ) -> ProviderResult> { + self.get_in_memory_or_storage_by_block( + number.into(), + |db_provider| db_provider.block_body_indices(number), + |block_state| { + // Find the last block indices on database + let last_storage_block_number = block_state.anchor().number; + let mut stored_indices = self + .storage_provider + .block_body_indices(last_storage_block_number)? + .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?; + + // Prepare our block indices + stored_indices.first_tx_num = stored_indices.next_tx_num(); + stored_indices.tx_count = 0; + + // Iterate from the lowest block in memory until our target block + for state in block_state.chain().collect::>().into_iter().rev() { + let block_tx_count = state.block_ref().block.body.transactions.len() as u64; + if state.block_ref().block().number == number { + stored_indices.tx_count = block_tx_count; + } else { + stored_indices.first_tx_num += block_tx_count; + } + } + + Ok(Some(stored_indices)) + }, + ) + } + + /// Returns the block with senders with matching number or hash from database. + /// + /// **NOTE: If [`TransactionVariant::NoHash`] is provided then the transactions have invalid + /// hashes, since they would need to be calculated on the spot, and we want fast querying.** + /// + /// Returns `None` if block is not found. + fn block_with_senders( + &self, + id: BlockHashOrNumber, + transaction_kind: TransactionVariant, + ) -> ProviderResult> { + self.get_in_memory_or_storage_by_block( + id, + |db_provider| db_provider.block_with_senders(id, transaction_kind), + |block_state| Ok(Some(block_state.block_with_senders())), + ) + } + + fn sealed_block_with_senders( + &self, + id: BlockHashOrNumber, + transaction_kind: TransactionVariant, + ) -> ProviderResult> { + self.get_in_memory_or_storage_by_block( + id, + |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind), + |block_state| Ok(Some(block_state.sealed_block_with_senders())), + ) + } + + fn block_range(&self, range: RangeInclusive) -> ProviderResult> { + self.get_in_memory_or_storage_by_block_range_while( + range, + |db_provider, range, _| db_provider.block_range(range), + |block_state, _| Some(block_state.block_ref().block().clone().unseal()), + |_| true, + ) + } + + fn block_with_senders_range( + &self, + range: RangeInclusive, + ) -> ProviderResult> { + self.get_in_memory_or_storage_by_block_range_while( + range, + |db_provider, range, _| db_provider.block_with_senders_range(range), + |block_state, _| Some(block_state.block_with_senders()), + |_| true, + ) + } + + fn sealed_block_with_senders_range( + &self, + range: RangeInclusive, + ) -> ProviderResult> { + self.get_in_memory_or_storage_by_block_range_while( + range, + |db_provider, range, _| db_provider.sealed_block_with_senders_range(range), + |block_state, _| Some(block_state.sealed_block_with_senders()), + |_| true, + ) + } +} + +impl TransactionsProvider for ConsistentProvider { + fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult> { + self.get_in_memory_or_storage_by_tx( + tx_hash.into(), + |db_provider| db_provider.transaction_id(tx_hash), + |_, tx_number, _| Ok(Some(tx_number)), + ) + } + + fn transaction_by_id(&self, id: TxNumber) -> ProviderResult> { + self.get_in_memory_or_storage_by_tx( + id.into(), + |provider| provider.transaction_by_id(id), + |tx_index, _, block_state| { + Ok(block_state.block_ref().block().body.transactions.get(tx_index).cloned()) + }, + ) + } + + fn transaction_by_id_no_hash( + &self, + id: TxNumber, + ) -> ProviderResult> { + self.get_in_memory_or_storage_by_tx( + id.into(), + |provider| provider.transaction_by_id_no_hash(id), + |tx_index, _, block_state| { + Ok(block_state + .block_ref() + .block() + .body + .transactions + .get(tx_index) + .cloned() + .map(Into::into)) + }, + ) + } + + fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult> { + if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) { + return Ok(Some(tx)) + } + + self.storage_provider.transaction_by_hash(hash) + } + + fn transaction_by_hash_with_meta( + &self, + tx_hash: TxHash, + ) -> ProviderResult> { + if let Some((tx, meta)) = + self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash)) + { + return Ok(Some((tx, meta))) + } + + self.storage_provider.transaction_by_hash_with_meta(tx_hash) + } + + fn transaction_block(&self, id: TxNumber) -> ProviderResult> { + self.get_in_memory_or_storage_by_tx( + id.into(), + |provider| provider.transaction_block(id), + |_, _, block_state| Ok(Some(block_state.block_ref().block().number)), + ) + } + + fn transactions_by_block( + &self, + id: BlockHashOrNumber, + ) -> ProviderResult>> { + self.get_in_memory_or_storage_by_block( + id, + |provider| provider.transactions_by_block(id), + |block_state| Ok(Some(block_state.block_ref().block().body.transactions.clone())), + ) + } + + fn transactions_by_block_range( + &self, + range: impl RangeBounds, + ) -> ProviderResult>> { + self.get_in_memory_or_storage_by_block_range_while( + range, + |db_provider, range, _| db_provider.transactions_by_block_range(range), + |block_state, _| Some(block_state.block_ref().block().body.transactions.clone()), + |_| true, + ) + } + + fn transactions_by_tx_range( + &self, + range: impl RangeBounds, + ) -> ProviderResult> { + self.get_in_memory_or_storage_by_tx_range( + range, + |db_provider, db_range| db_provider.transactions_by_tx_range(db_range), + |index_range, block_state| { + Ok(block_state.block_ref().block().body.transactions[index_range] + .iter() + .cloned() + .map(Into::into) + .collect()) + }, + ) + } + + fn senders_by_tx_range( + &self, + range: impl RangeBounds, + ) -> ProviderResult> { + self.get_in_memory_or_storage_by_tx_range( + range, + |db_provider, db_range| db_provider.senders_by_tx_range(db_range), + |index_range, block_state| Ok(block_state.block_ref().senders[index_range].to_vec()), + ) + } + + fn transaction_sender(&self, id: TxNumber) -> ProviderResult> { + self.get_in_memory_or_storage_by_tx( + id.into(), + |provider| provider.transaction_sender(id), + |tx_index, _, block_state| Ok(block_state.block_ref().senders.get(tx_index).copied()), + ) + } +} + +impl ReceiptProvider for ConsistentProvider { + fn receipt(&self, id: TxNumber) -> ProviderResult> { + self.get_in_memory_or_storage_by_tx( + id.into(), + |provider| provider.receipt(id), + |tx_index, _, block_state| { + Ok(block_state.executed_block_receipts().get(tx_index).cloned()) + }, + ) + } + + fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult> { + for block_state in self.head_block.iter().flat_map(|b| b.chain()) { + let executed_block = block_state.block_ref(); + let block = executed_block.block(); + let receipts = block_state.executed_block_receipts(); + + // assuming 1:1 correspondence between transactions and receipts + debug_assert_eq!( + block.body.transactions.len(), + receipts.len(), + "Mismatch between transaction and receipt count" + ); + + if let Some(tx_index) = block.body.transactions.iter().position(|tx| tx.hash() == hash) + { + // safe to use tx_index for receipts due to 1:1 correspondence + return Ok(receipts.get(tx_index).cloned()); + } + } + + self.storage_provider.receipt_by_hash(hash) + } + + fn receipts_by_block(&self, block: BlockHashOrNumber) -> ProviderResult>> { + self.get_in_memory_or_storage_by_block( + block, + |db_provider| db_provider.receipts_by_block(block), + |block_state| Ok(Some(block_state.executed_block_receipts())), + ) + } + + fn receipts_by_tx_range( + &self, + range: impl RangeBounds, + ) -> ProviderResult> { + self.get_in_memory_or_storage_by_tx_range( + range, + |db_provider, db_range| db_provider.receipts_by_tx_range(db_range), + |index_range, block_state| { + Ok(block_state.executed_block_receipts().drain(index_range).collect()) + }, + ) + } +} + +impl ReceiptProviderIdExt for ConsistentProvider { + fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult>> { + match block { + BlockId::Hash(rpc_block_hash) => { + let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?; + if receipts.is_none() && !rpc_block_hash.require_canonical.unwrap_or(false) { + if let Some(state) = self + .head_block + .as_ref() + .and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into())) + { + receipts = Some(state.executed_block_receipts()); + } + } + Ok(receipts) + } + BlockId::Number(num_tag) => match num_tag { + BlockNumberOrTag::Pending => Ok(self + .canonical_in_memory_state + .pending_state() + .map(|block_state| block_state.executed_block_receipts())), + _ => { + if let Some(num) = self.convert_block_number(num_tag)? { + self.receipts_by_block(num.into()) + } else { + Ok(None) + } + } + }, + } + } +} + +impl WithdrawalsProvider for ConsistentProvider { + fn withdrawals_by_block( + &self, + id: BlockHashOrNumber, + timestamp: u64, + ) -> ProviderResult> { + if !self.chain_spec().is_shanghai_active_at_timestamp(timestamp) { + return Ok(None) + } + + self.get_in_memory_or_storage_by_block( + id, + |db_provider| db_provider.withdrawals_by_block(id, timestamp), + |block_state| Ok(block_state.block_ref().block().body.withdrawals.clone()), + ) + } + + fn latest_withdrawal(&self) -> ProviderResult> { + let best_block_num = self.best_block_number()?; + + self.get_in_memory_or_storage_by_block( + best_block_num.into(), + |db_provider| db_provider.latest_withdrawal(), + |block_state| { + Ok(block_state + .block_ref() + .block() + .body + .withdrawals + .clone() + .and_then(|mut w| w.pop())) + }, + ) + } +} + +impl StageCheckpointReader for ConsistentProvider { + fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult> { + self.storage_provider.get_stage_checkpoint(id) + } + + fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult>> { + self.storage_provider.get_stage_checkpoint_progress(id) + } + + fn get_all_checkpoints(&self) -> ProviderResult> { + self.storage_provider.get_all_checkpoints() + } +} + +impl EvmEnvProvider for ConsistentProvider { + fn fill_env_at( + &self, + cfg: &mut CfgEnvWithHandlerCfg, + block_env: &mut BlockEnv, + at: BlockHashOrNumber, + evm_config: EvmConfig, + ) -> ProviderResult<()> + where + EvmConfig: ConfigureEvmEnv
, + { + let hash = self.convert_number(at)?.ok_or(ProviderError::HeaderNotFound(at))?; + let header = self.header(&hash)?.ok_or(ProviderError::HeaderNotFound(at))?; + self.fill_env_with_header(cfg, block_env, &header, evm_config) + } + + fn fill_env_with_header( + &self, + cfg: &mut CfgEnvWithHandlerCfg, + block_env: &mut BlockEnv, + header: &Header, + evm_config: EvmConfig, + ) -> ProviderResult<()> + where + EvmConfig: ConfigureEvmEnv
, + { + let total_difficulty = self + .header_td_by_number(header.number)? + .ok_or_else(|| ProviderError::HeaderNotFound(header.number.into()))?; + evm_config.fill_cfg_and_block_env(cfg, block_env, header, total_difficulty); + Ok(()) + } + + fn fill_cfg_env_at( + &self, + cfg: &mut CfgEnvWithHandlerCfg, + at: BlockHashOrNumber, + evm_config: EvmConfig, + ) -> ProviderResult<()> + where + EvmConfig: ConfigureEvmEnv
, + { + let hash = self.convert_number(at)?.ok_or(ProviderError::HeaderNotFound(at))?; + let header = self.header(&hash)?.ok_or(ProviderError::HeaderNotFound(at))?; + self.fill_cfg_env_with_header(cfg, &header, evm_config) + } + + fn fill_cfg_env_with_header( + &self, + cfg: &mut CfgEnvWithHandlerCfg, + header: &Header, + evm_config: EvmConfig, + ) -> ProviderResult<()> + where + EvmConfig: ConfigureEvmEnv
, + { + let total_difficulty = self + .header_td_by_number(header.number)? + .ok_or_else(|| ProviderError::HeaderNotFound(header.number.into()))?; + evm_config.fill_cfg_env(cfg, header, total_difficulty); + Ok(()) + } +} + +impl PruneCheckpointReader for ConsistentProvider { + fn get_prune_checkpoint( + &self, + segment: PruneSegment, + ) -> ProviderResult> { + self.storage_provider.get_prune_checkpoint(segment) + } + + fn get_prune_checkpoints(&self) -> ProviderResult> { + self.storage_provider.get_prune_checkpoints() + } +} + +impl ChainSpecProvider for ConsistentProvider { + type ChainSpec = N::ChainSpec; + + fn chain_spec(&self) -> Arc { + ChainSpecProvider::chain_spec(&self.storage_provider) + } +} + +impl BlockReaderIdExt for ConsistentProvider { + fn block_by_id(&self, id: BlockId) -> ProviderResult> { + match id { + BlockId::Number(num) => self.block_by_number_or_tag(num), + BlockId::Hash(hash) => { + // TODO: should we only apply this for the RPCs that are listed in EIP-1898? + // so not at the provider level? + // if we decide to do this at a higher level, then we can make this an automatic + // trait impl + if Some(true) == hash.require_canonical { + // check the database, canonical blocks are only stored in the database + self.find_block_by_hash(hash.block_hash, BlockSource::Canonical) + } else { + self.block_by_hash(hash.block_hash) + } + } + } + } + + fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult> { + Ok(match id { + BlockNumberOrTag::Latest => { + Some(self.canonical_in_memory_state.get_canonical_head().unseal()) + } + BlockNumberOrTag::Finalized => { + self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal()) + } + BlockNumberOrTag::Safe => { + self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal()) + } + BlockNumberOrTag::Earliest => self.header_by_number(0)?, + BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(), + + BlockNumberOrTag::Number(num) => self.header_by_number(num)?, + }) + } + + fn sealed_header_by_number_or_tag( + &self, + id: BlockNumberOrTag, + ) -> ProviderResult> { + match id { + BlockNumberOrTag::Latest => { + Ok(Some(self.canonical_in_memory_state.get_canonical_head())) + } + BlockNumberOrTag::Finalized => { + Ok(self.canonical_in_memory_state.get_finalized_header()) + } + BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()), + BlockNumberOrTag::Earliest => self.header_by_number(0)?.map_or_else( + || Ok(None), + |h| { + let sealed = h.seal_slow(); + let (header, seal) = sealed.into_parts(); + Ok(Some(SealedHeader::new(header, seal))) + }, + ), + BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()), + BlockNumberOrTag::Number(num) => self.header_by_number(num)?.map_or_else( + || Ok(None), + |h| { + let sealed = h.seal_slow(); + let (header, seal) = sealed.into_parts(); + Ok(Some(SealedHeader::new(header, seal))) + }, + ), + } + } + + fn sealed_header_by_id(&self, id: BlockId) -> ProviderResult> { + Ok(match id { + BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?, + BlockId::Hash(hash) => self.header(&hash.block_hash)?.map(|h| { + let sealed = h.seal_slow(); + let (header, seal) = sealed.into_parts(); + SealedHeader::new(header, seal) + }), + }) + } + + fn header_by_id(&self, id: BlockId) -> ProviderResult> { + Ok(match id { + BlockId::Number(num) => self.header_by_number_or_tag(num)?, + BlockId::Hash(hash) => self.header(&hash.block_hash)?, + }) + } + + fn ommers_by_id(&self, id: BlockId) -> ProviderResult>> { + match id { + BlockId::Number(num) => self.ommers_by_number_or_tag(num), + BlockId::Hash(hash) => { + // TODO: EIP-1898 question, see above + // here it is not handled + self.ommers(BlockHashOrNumber::Hash(hash.block_hash)) + } + } + } +} + +impl StorageChangeSetReader for ConsistentProvider { + fn storage_changeset( + &self, + block_number: BlockNumber, + ) -> ProviderResult> { + if let Some(state) = + self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into())) + { + let changesets = state + .block() + .execution_output + .bundle + .reverts + .clone() + .into_plain_state_reverts() + .storage + .into_iter() + .flatten() + .flat_map(|revert: PlainStorageRevert| { + revert.storage_revert.into_iter().map(move |(key, value)| { + ( + BlockNumberAddress((block_number, revert.address)), + StorageEntry { key: key.into(), value: value.to_previous_value() }, + ) + }) + }) + .collect(); + Ok(changesets) + } else { + // Perform checks on whether or not changesets exist for the block. + + // No prune checkpoint means history should exist and we should `unwrap_or(true)` + let storage_history_exists = self + .storage_provider + .get_prune_checkpoint(PruneSegment::StorageHistory)? + .and_then(|checkpoint| { + // return true if the block number is ahead of the prune checkpoint. + // + // The checkpoint stores the highest pruned block number, so we should make + // sure the block_number is strictly greater. + checkpoint.block_number.map(|checkpoint| block_number > checkpoint) + }) + .unwrap_or(true); + + if !storage_history_exists { + return Err(ProviderError::StateAtBlockPruned(block_number)) + } + + self.storage_provider.storage_changeset(block_number) + } + } +} + +impl ChangeSetReader for ConsistentProvider { + fn account_block_changeset( + &self, + block_number: BlockNumber, + ) -> ProviderResult> { + if let Some(state) = + self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into())) + { + let changesets = state + .block_ref() + .execution_output + .bundle + .reverts + .clone() + .into_plain_state_reverts() + .accounts + .into_iter() + .flatten() + .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) }) + .collect(); + Ok(changesets) + } else { + // Perform checks on whether or not changesets exist for the block. + + // No prune checkpoint means history should exist and we should `unwrap_or(true)` + let account_history_exists = self + .storage_provider + .get_prune_checkpoint(PruneSegment::AccountHistory)? + .and_then(|checkpoint| { + // return true if the block number is ahead of the prune checkpoint. + // + // The checkpoint stores the highest pruned block number, so we should make + // sure the block_number is strictly greater. + checkpoint.block_number.map(|checkpoint| block_number > checkpoint) + }) + .unwrap_or(true); + + if !account_history_exists { + return Err(ProviderError::StateAtBlockPruned(block_number)) + } + + self.storage_provider.account_block_changeset(block_number) + } + } +} + +impl AccountReader for ConsistentProvider { + /// Get basic account information. + fn basic_account(&self, address: Address) -> ProviderResult> { + // use latest state provider + let state_provider = self.latest_ref()?; + state_provider.basic_account(address) + } +} + +impl StateReader for ConsistentProvider { + /// Re-constructs the [`ExecutionOutcome`] from in-memory and database state, if necessary. + /// + /// If data for the block does not exist, this will return [`None`]. + /// + /// NOTE: This cannot be called safely in a loop outside of the blockchain tree thread. This is + /// because the [`CanonicalInMemoryState`] could change during a reorg, causing results to be + /// inconsistent. Currently this can safely be called within the blockchain tree thread, + /// because the tree thread is responsible for modifying the [`CanonicalInMemoryState`] in the + /// first place. + fn get_state(&self, block: BlockNumber) -> ProviderResult> { + if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) { + let state = state.block_ref().execution_outcome().clone(); + Ok(Some(state)) + } else { + Self::get_state(self, block..=block) + } + } +} + +#[cfg(test)] +mod tests { + use crate::{ + providers::blockchain_provider::BlockchainProvider2, + test_utils::create_test_provider_factory, BlockWriter, + }; + use alloy_eips::BlockHashOrNumber; + use alloy_primitives::B256; + use itertools::Itertools; + use rand::Rng; + use reth_chain_state::{ExecutedBlock, NewCanonicalChain}; + use reth_db::models::AccountBeforeTx; + use reth_execution_types::ExecutionOutcome; + use reth_primitives::SealedBlock; + use reth_storage_api::{BlockReader, BlockSource, ChangeSetReader}; + use reth_testing_utils::generators::{ + self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams, + }; + use revm::db::BundleState; + use std::{ + ops::{Bound, Range, RangeBounds}, + sync::Arc, + }; + + const TEST_BLOCKS_COUNT: usize = 5; + + fn random_blocks( + rng: &mut impl Rng, + database_blocks: usize, + in_memory_blocks: usize, + requests_count: Option>, + withdrawals_count: Option>, + tx_count: impl RangeBounds, + ) -> (Vec, Vec) { + let block_range = (database_blocks + in_memory_blocks - 1) as u64; + + let tx_start = match tx_count.start_bound() { + Bound::Included(&n) | Bound::Excluded(&n) => n, + Bound::Unbounded => u8::MIN, + }; + let tx_end = match tx_count.end_bound() { + Bound::Included(&n) | Bound::Excluded(&n) => n + 1, + Bound::Unbounded => u8::MAX, + }; + + let blocks = random_block_range( + rng, + 0..=block_range, + BlockRangeParams { + parent: Some(B256::ZERO), + tx_count: tx_start..tx_end, + requests_count, + withdrawals_count, + }, + ); + let (database_blocks, in_memory_blocks) = blocks.split_at(database_blocks); + (database_blocks.to_vec(), in_memory_blocks.to_vec()) + } + + #[test] + fn test_block_reader_find_block_by_hash() -> eyre::Result<()> { + // Initialize random number generator and provider factory + let mut rng = generators::rng(); + let factory = create_test_provider_factory(); + + // Generate 10 random blocks and split into database and in-memory blocks + let blocks = random_block_range( + &mut rng, + 0..=10, + BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() }, + ); + let (database_blocks, in_memory_blocks) = blocks.split_at(5); + + // Insert first 5 blocks into the database + let provider_rw = factory.provider_rw()?; + for block in database_blocks { + provider_rw.insert_historical_block( + block.clone().seal_with_senders().expect("failed to seal block with senders"), + )?; + } + provider_rw.commit()?; + + // Create a new provider + let provider = BlockchainProvider2::new(factory)?; + let consistent_provider = provider.consistent_provider()?; + + // Useful blocks + let first_db_block = database_blocks.first().unwrap(); + let first_in_mem_block = in_memory_blocks.first().unwrap(); + let last_in_mem_block = in_memory_blocks.last().unwrap(); + + // No block in memory before setting in memory state + assert_eq!( + consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?, + None + ); + assert_eq!( + consistent_provider + .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?, + None + ); + // No pending block in memory + assert_eq!( + consistent_provider + .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Pending)?, + None + ); + + // Insert first block into the in-memory state + let in_memory_block_senders = + first_in_mem_block.senders().expect("failed to recover senders"); + let chain = NewCanonicalChain::Commit { + new: vec![ExecutedBlock::new( + Arc::new(first_in_mem_block.clone()), + Arc::new(in_memory_block_senders), + Default::default(), + Default::default(), + Default::default(), + )], + }; + consistent_provider.canonical_in_memory_state.update_chain(chain); + let consistent_provider = provider.consistent_provider()?; + + // Now the block should be found in memory + assert_eq!( + consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?, + Some(first_in_mem_block.clone().into()) + ); + assert_eq!( + consistent_provider + .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?, + Some(first_in_mem_block.clone().into()) + ); + + // Find the first block in database by hash + assert_eq!( + consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Any)?, + Some(first_db_block.clone().into()) + ); + assert_eq!( + consistent_provider + .find_block_by_hash(first_db_block.hash(), BlockSource::Canonical)?, + Some(first_db_block.clone().into()) + ); + + // No pending block in database + assert_eq!( + consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Pending)?, + None + ); + + // Insert the last block into the pending state + provider.canonical_in_memory_state.set_pending_block(ExecutedBlock { + block: Arc::new(last_in_mem_block.clone()), + senders: Default::default(), + execution_output: Default::default(), + hashed_state: Default::default(), + trie: Default::default(), + }); + + // Now the last block should be found in memory + assert_eq!( + consistent_provider + .find_block_by_hash(last_in_mem_block.hash(), BlockSource::Pending)?, + Some(last_in_mem_block.clone().into()) + ); + + Ok(()) + } + + #[test] + fn test_block_reader_block() -> eyre::Result<()> { + // Initialize random number generator and provider factory + let mut rng = generators::rng(); + let factory = create_test_provider_factory(); + + // Generate 10 random blocks and split into database and in-memory blocks + let blocks = random_block_range( + &mut rng, + 0..=10, + BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() }, + ); + let (database_blocks, in_memory_blocks) = blocks.split_at(5); + + // Insert first 5 blocks into the database + let provider_rw = factory.provider_rw()?; + for block in database_blocks { + provider_rw.insert_historical_block( + block.clone().seal_with_senders().expect("failed to seal block with senders"), + )?; + } + provider_rw.commit()?; + + // Create a new provider + let provider = BlockchainProvider2::new(factory)?; + let consistent_provider = provider.consistent_provider()?; + + // First in memory block + let first_in_mem_block = in_memory_blocks.first().unwrap(); + // First database block + let first_db_block = database_blocks.first().unwrap(); + + // First in memory block should not be found yet as not integrated to the in-memory state + assert_eq!( + consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?, + None + ); + assert_eq!( + consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?, + None + ); + + // Insert first block into the in-memory state + let in_memory_block_senders = + first_in_mem_block.senders().expect("failed to recover senders"); + let chain = NewCanonicalChain::Commit { + new: vec![ExecutedBlock::new( + Arc::new(first_in_mem_block.clone()), + Arc::new(in_memory_block_senders), + Default::default(), + Default::default(), + Default::default(), + )], + }; + consistent_provider.canonical_in_memory_state.update_chain(chain); + + let consistent_provider = provider.consistent_provider()?; + + // First in memory block should be found + assert_eq!( + consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?, + Some(first_in_mem_block.clone().into()) + ); + assert_eq!( + consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?, + Some(first_in_mem_block.clone().into()) + ); + + // First database block should be found + assert_eq!( + consistent_provider.block(BlockHashOrNumber::Hash(first_db_block.hash()))?, + Some(first_db_block.clone().into()) + ); + assert_eq!( + consistent_provider.block(BlockHashOrNumber::Number(first_db_block.number))?, + Some(first_db_block.clone().into()) + ); + + Ok(()) + } + + #[test] + fn test_changeset_reader() -> eyre::Result<()> { + let mut rng = generators::rng(); + + let (database_blocks, in_memory_blocks) = + random_blocks(&mut rng, TEST_BLOCKS_COUNT, 1, None, None, 0..1); + + let first_database_block = database_blocks.first().map(|block| block.number).unwrap(); + let last_database_block = database_blocks.last().map(|block| block.number).unwrap(); + let first_in_memory_block = in_memory_blocks.first().map(|block| block.number).unwrap(); + + let accounts = random_eoa_accounts(&mut rng, 2); + + let (database_changesets, database_state) = random_changeset_range( + &mut rng, + &database_blocks, + accounts.into_iter().map(|(address, account)| (address, (account, Vec::new()))), + 0..0, + 0..0, + ); + let (in_memory_changesets, in_memory_state) = random_changeset_range( + &mut rng, + &in_memory_blocks, + database_state + .iter() + .map(|(address, (account, storage))| (*address, (*account, storage.clone()))), + 0..0, + 0..0, + ); + + let factory = create_test_provider_factory(); + + let provider_rw = factory.provider_rw()?; + provider_rw.append_blocks_with_state( + database_blocks + .into_iter() + .map(|b| b.seal_with_senders().expect("failed to seal block with senders")) + .collect(), + ExecutionOutcome { + bundle: BundleState::new( + database_state.into_iter().map(|(address, (account, _))| { + (address, None, Some(account.into()), Default::default()) + }), + database_changesets + .iter() + .map(|block_changesets| { + block_changesets.iter().map(|(address, account, _)| { + (*address, Some(Some((*account).into())), []) + }) + }) + .collect::>(), + Vec::new(), + ), + first_block: first_database_block, + ..Default::default() + }, + Default::default(), + Default::default(), + )?; + provider_rw.commit()?; + + let provider = BlockchainProvider2::new(factory)?; + + let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap(); + let chain = NewCanonicalChain::Commit { + new: vec![in_memory_blocks + .first() + .map(|block| { + let senders = block.senders().expect("failed to recover senders"); + ExecutedBlock::new( + Arc::new(block.clone()), + Arc::new(senders), + Arc::new(ExecutionOutcome { + bundle: BundleState::new( + in_memory_state.into_iter().map(|(address, (account, _))| { + (address, None, Some(account.into()), Default::default()) + }), + [in_memory_changesets.iter().map(|(address, account, _)| { + (*address, Some(Some((*account).into())), Vec::new()) + })], + [], + ), + first_block: first_in_memory_block, + ..Default::default() + }), + Default::default(), + Default::default(), + ) + }) + .unwrap()], + }; + provider.canonical_in_memory_state.update_chain(chain); + + let consistent_provider = provider.consistent_provider()?; + + assert_eq!( + consistent_provider.account_block_changeset(last_database_block).unwrap(), + database_changesets + .into_iter() + .last() + .unwrap() + .into_iter() + .sorted_by_key(|(address, _, _)| *address) + .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) }) + .collect::>() + ); + assert_eq!( + consistent_provider.account_block_changeset(first_in_memory_block).unwrap(), + in_memory_changesets + .into_iter() + .sorted_by_key(|(address, _, _)| *address) + .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) }) + .collect::>() + ); + + Ok(()) + } +} diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index a67ebf89ba62..c81ef05d2eaa 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -61,6 +61,9 @@ pub use consistent_view::{ConsistentDbView, ConsistentViewError}; mod blockchain_provider; pub use blockchain_provider::BlockchainProvider2; +mod consistent; +pub use consistent::ConsistentProvider; + /// Helper trait keeping common requirements of providers for [`NodeTypesWithDB`]. pub trait ProviderNodeTypes: NodeTypesWithDB {}