Skip to content

Commit

Permalink
feat: prefetch proofs in prewarm thread
Browse files Browse the repository at this point in the history
  • Loading branch information
Rjected committed Jan 22, 2025
1 parent ea4a67d commit 86536dd
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 27 deletions.
88 changes: 62 additions & 26 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use crate::{
use alloy_consensus::BlockHeader;
use alloy_eips::BlockNumHash;
use alloy_primitives::{
map::{HashMap, HashSet},
keccak256,
map::{B256Set, HashMap, HashSet},
Address, BlockNumber, B256, U256,
};
use alloy_rpc_types_engine::{
Expand All @@ -35,7 +36,7 @@ use reth_errors::{ConsensusError, ProviderResult};
use reth_evm::{
execute::BlockExecutorProvider,
system_calls::{NoopHook, OnStateHook},
ConfigureEvm, Evm
ConfigureEvm, Evm,
};
use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_builder_primitives::PayloadBuilder;
Expand All @@ -52,11 +53,16 @@ use reth_provider::{
use reth_revm::database::StateProviderDatabase;
use reth_stages_api::ControlFlow;
use reth_trie::{
trie_cursor::InMemoryTrieCursorFactory, updates::TrieUpdates, HashedPostState, TrieInput,
trie_cursor::InMemoryTrieCursorFactory, updates::TrieUpdates, HashedPostState,
MultiProofTargets, TrieInput,
};
use reth_trie_db::DatabaseTrieCursorFactory;
use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
use root::{StateRootComputeOutcome, StateRootConfig, StateRootHandle, StateRootTask};
use revm_primitives::ResultAndState;
use root::{
StateHookSender, StateRootComputeOutcome, StateRootConfig, StateRootHandle, StateRootMessage,
StateRootTask,
};
use std::{
cmp::Ordering,
collections::{btree_map, hash_map, BTreeMap, VecDeque},
Expand Down Expand Up @@ -2274,6 +2280,32 @@ where
return Err(e.into())
}

let persistence_not_in_progress = !self.persistence_state.in_progress();
let (state_root_handle, state_root_task_config, state_root_sender, state_hook) =
if persistence_not_in_progress && self.config.use_state_root_task() {
let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
let state_root_config = StateRootConfig::new_from_input(
consistent_view.clone(),
self.compute_trie_input(consistent_view, block.header().parent_hash())
.map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?,
);

let state_root_task = StateRootTask::new(
state_root_config.clone(),
self.state_root_task_pool.clone(),
);
let state_root_sender = state_root_task.state_hook_sender();
let state_hook = Box::new(state_root_task.state_hook()) as Box<dyn OnStateHook>;
(
Some(state_root_task.spawn()),
Some(state_root_config),
Some(state_root_sender),
state_hook,
)
} else {
(None, None, None, Box::new(NoopHook::default()) as Box<dyn OnStateHook>)
};

// Use cached state provider before executing, this does nothing currently, will be used in
// prewarming
let caches = ProviderCacheBuilder::default().build_caches();
Expand All @@ -2288,12 +2320,15 @@ where

// Prewarm transactions
for (tx, sender) in block.body().transactions().iter().zip(block.senders()) {
let state_root_sender = state_root_sender.clone();

self.prewarm_transaction(
block.header().clone(),
tx.clone(),
*sender,
caches.clone(),
cache_metrics.clone(),
state_root_sender,
)?;
}

Expand All @@ -2304,26 +2339,6 @@ where
let block_hash = block.hash();
let sealed_block = Arc::new(block.clone_sealed_block());

let persistence_not_in_progress = !self.persistence_state.in_progress();

let (state_root_handle, state_root_task_config, state_hook) = if persistence_not_in_progress &&
self.config.use_state_root_task()
{
let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
let state_root_config = StateRootConfig::new_from_input(
consistent_view.clone(),
self.compute_trie_input(consistent_view, block.header().parent_hash())
.map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?,
);

let state_root_task =
StateRootTask::new(state_root_config.clone(), self.state_root_task_pool.clone());
let state_hook = Box::new(state_root_task.state_hook()) as Box<dyn OnStateHook>;
(Some(state_root_task.spawn()), Some(state_root_config), state_hook)
} else {
(None, None, Box::new(NoopHook::default()) as Box<dyn OnStateHook>)
};

let execution_start = Instant::now();
let output = self.metrics.executor.execute_metered(executor, &block, state_hook)?;
let execution_time = execution_start.elapsed();
Expand Down Expand Up @@ -2499,6 +2514,7 @@ where
sender: Address,
caches: ProviderCaches,
cache_metrics: CachedStateMetrics,
state_root_sender: Option<StateHookSender>,
) -> Result<(), InsertBlockErrorKind> {
let Some(state_provider) = self.state_provider(block.parent_hash())? else {
error!(target: "engine::tree", parent=?block.parent_hash(), "Could not get state provider for prewarm");
Expand All @@ -2522,9 +2538,29 @@ where
// create the tx env and reset nonce
let mut tx_env = evm_config.tx_env(&tx, sender);
tx_env.nonce = None;
if let Err(err) = evm.transact(tx_env) {
error!(target: "engine::tree", ?err, tx_hash=?tx.tx_hash(), ?sender, "Error when executing prewarm transaction");
let ResultAndState { state, .. } = match evm.transact(tx_env) {
Ok(res) => res,
Err(err) => {
error!(target: "engine::tree", ?err, tx_hash=?tx.tx_hash(), ?sender, "Error when executing prewarm transaction");
return
}
};

let Some(state_root_sender) = state_root_sender else {
return
};

let mut targets = MultiProofTargets::default();
for (addr, account) in state {
let mut storage_set = B256Set::default();
for (key, _) in account.storage {
storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
}

targets.insert(keccak256(addr), storage_set);
}

let _ = state_root_sender.send(StateRootMessage::PrefetchProofs(targets));
});

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion crates/engine/tree/src/tree/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ impl ProofSequencer {
}

/// A wrapper for the sender that signals completion when dropped
#[derive(Deref, Debug)]
#[derive(Deref, Debug, Clone)]
pub struct StateHookSender(Sender<StateRootMessage>);

impl StateHookSender {
Expand Down

0 comments on commit 86536dd

Please sign in to comment.