Skip to content

Commit

Permalink
refactor: unify logic for writing receipts (#12878)
Browse files Browse the repository at this point in the history
  • Loading branch information
klkvr authored Nov 26, 2024
1 parent dee0b8c commit 2d6b893
Show file tree
Hide file tree
Showing 12 changed files with 151 additions and 332 deletions.
15 changes: 9 additions & 6 deletions bin/reth/src/commands/debug_cmd/in_memory_merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use reth_network_api::NetworkInfo;
use reth_node_ethereum::EthExecutorProvider;
use reth_primitives::BlockExt;
use reth_provider::{
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, AccountExtReader,
ChainSpecProvider, HashingWriter, HeaderProvider, LatestStateProviderRef, OriginalValuesKnown,
ProviderFactory, StageCheckpointReader, StateWriter, StorageReader,
providers::ProviderNodeTypes, AccountExtReader, ChainSpecProvider, DatabaseProviderFactory,
HashingWriter, HeaderProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderFactory,
StageCheckpointReader, StateWriter, StorageLocation, StorageReader,
};
use reth_revm::database::StateProviderDatabase;
use reth_stages::StageId;
Expand Down Expand Up @@ -163,7 +163,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
return Ok(())
}

let provider_rw = provider_factory.provider_rw()?;
let provider_rw = provider_factory.database_provider_rw()?;

// Insert block, state and hashes
provider_rw.insert_historical_block(
Expand All @@ -172,8 +172,11 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
.try_seal_with_senders()
.map_err(|_| BlockValidationError::SenderRecoveryError)?,
)?;
let mut storage_writer = UnifiedStorageWriter::from_database(&provider_rw.0);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?;
provider_rw.write_to_storage(
execution_outcome,
OriginalValuesKnown::No,
StorageLocation::Database,
)?;
let storage_lists = provider_rw.changed_storages_with_range(block.number..=block.number)?;
let storages = provider_rw.plain_state_storages(storage_lists)?;
provider_rw.insert_storage_for_hashing(storages)?;
Expand Down
13 changes: 8 additions & 5 deletions bin/reth/src/commands/debug_cmd/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ use reth_network_p2p::full_block::FullBlockClient;
use reth_node_api::BlockTy;
use reth_node_ethereum::EthExecutorProvider;
use reth_provider::{
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockNumReader, BlockWriter,
ChainSpecProvider, DatabaseProviderFactory, HeaderProvider, LatestStateProviderRef,
OriginalValuesKnown, ProviderError, ProviderFactory, StateWriter, StorageLocation,
providers::ProviderNodeTypes, BlockNumReader, BlockWriter, ChainSpecProvider,
DatabaseProviderFactory, HeaderProvider, LatestStateProviderRef, OriginalValuesKnown,
ProviderError, ProviderFactory, StateWriter, StorageLocation,
};
use reth_revm::database::StateProviderDatabase;
use reth_stages::{
Expand Down Expand Up @@ -158,8 +158,11 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
executor.execute_and_verify_one((&sealed_block.clone().unseal(), td).into())?;
let execution_outcome = executor.finalize();

let mut storage_writer = UnifiedStorageWriter::from_database(&provider_rw);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::Yes)?;
provider_rw.write_to_storage(
execution_outcome,
OriginalValuesKnown::Yes,
StorageLocation::Database,
)?;

let checkpoint = Some(StageCheckpoint::new(
block_number
Expand Down
12 changes: 6 additions & 6 deletions crates/optimism/cli/src/commands/import_receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use reth_primitives::Receipts;
use reth_provider::{
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, DatabaseProviderFactory,
OriginalValuesKnown, ProviderFactory, StageCheckpointReader, StageCheckpointWriter,
StateWriter, StaticFileProviderFactory, StaticFileWriter, StatsReader,
StateWriter, StaticFileProviderFactory, StatsReader, StorageLocation,
};
use reth_stages::{StageCheckpoint, StageId};
use reth_static_file_types::StaticFileSegment;
Expand Down Expand Up @@ -219,11 +219,11 @@ where
ExecutionOutcome::new(Default::default(), receipts, first_block, Default::default());

// finally, write the receipts
let mut storage_writer = UnifiedStorageWriter::from(
&provider,
static_file_provider.latest_writer(StaticFileSegment::Receipts)?,
);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::Yes)?;
provider.write_to_storage(
execution_outcome,
OriginalValuesKnown::Yes,
StorageLocation::StaticFiles,
)?;
}

// Only commit if we have imported as many receipts as the number of transactions.
Expand Down
17 changes: 7 additions & 10 deletions crates/stages/stages/src/stages/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ use reth_primitives::{SealedHeader, StaticFileSegment};
use reth_primitives_traits::{format_gas_throughput, Block, BlockBody, NodePrimitives};
use reth_provider::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter},
writer::UnifiedStorageWriter,
BlockHashReader, BlockReader, DBProvider, HeaderProvider, LatestStateProviderRef,
OriginalValuesKnown, ProviderError, StateChangeWriter, StateWriter, StaticFileProviderFactory,
StatsReader, TransactionVariant,
StatsReader, StorageLocation, TransactionVariant,
};
use reth_prune_types::PruneModes;
use reth_revm::database::StateProviderDatabase;
Expand Down Expand Up @@ -180,9 +179,8 @@ where
+ StaticFileProviderFactory
+ StatsReader
+ StateChangeWriter
+ BlockHashReader,
for<'a> UnifiedStorageWriter<'a, Provider, StaticFileProviderRWRefMut<'a, Provider::Primitives>>:
StateWriter,
+ BlockHashReader
+ StateWriter,
{
/// Return the id of the stage
fn id(&self) -> StageId {
Expand Down Expand Up @@ -211,7 +209,7 @@ where
let static_file_provider = provider.static_file_provider();

// We only use static files for Receipts, if there is no receipt pruning of any kind.
let static_file_producer = if self.prune_modes.receipts.is_none() &&
let write_receipts_to = if self.prune_modes.receipts.is_none() &&
self.prune_modes.receipts_log_filter.is_empty()
{
debug!(target: "sync::stages::execution", start = start_block, "Preparing static file producer");
Expand All @@ -220,9 +218,9 @@ where
// Since there might be a database <-> static file inconsistency (read
// `prepare_static_file_producer` for context), we commit the change straight away.
producer.commit()?;
Some(producer)
StorageLocation::StaticFiles
} else {
None
StorageLocation::Database
};

let db = StateProviderDatabase(LatestStateProviderRef::new(provider));
Expand Down Expand Up @@ -362,8 +360,7 @@ where
let time = Instant::now();

// write output
let mut writer = UnifiedStorageWriter::new(provider, static_file_producer);
writer.write_to_storage(state, OriginalValuesKnown::Yes)?;
provider.write_to_storage(state, OriginalValuesKnown::Yes, write_receipts_to)?;

let db_write_duration = time.elapsed();
debug!(
Expand Down
14 changes: 11 additions & 3 deletions crates/storage/db-common/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use reth_provider::{
BlockHashReader, BlockNumReader, BundleStateInit, ChainSpecProvider, DBProvider,
DatabaseProviderFactory, ExecutionOutcome, HashingWriter, HeaderProvider, HistoryWriter,
OriginalValuesKnown, ProviderError, RevertsInit, StageCheckpointWriter, StateChangeWriter,
StateWriter, StaticFileProviderFactory, TrieWriter,
StateWriter, StaticFileProviderFactory, StorageLocation, TrieWriter,
};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_trie::{IntermediateStateRootState, StateRoot as StateRootComputer, StateRootProgress};
Expand Down Expand Up @@ -76,6 +76,7 @@ where
+ HeaderProvider
+ HashingWriter
+ StateChangeWriter
+ StateWriter
+ AsRef<PF::ProviderRW>,
{
let chain = factory.chain_spec();
Expand Down Expand Up @@ -147,6 +148,7 @@ where
+ DBProvider<Tx: DbTxMut>
+ StateChangeWriter
+ HeaderProvider
+ StateWriter
+ AsRef<Provider>,
{
insert_state(provider, alloc, 0)
Expand All @@ -163,6 +165,7 @@ where
+ DBProvider<Tx: DbTxMut>
+ StateChangeWriter
+ HeaderProvider
+ StateWriter
+ AsRef<Provider>,
{
let capacity = alloc.size_hint().1.unwrap_or(0);
Expand Down Expand Up @@ -230,8 +233,11 @@ where
Vec::new(),
);

let mut storage_writer = UnifiedStorageWriter::from_database(&provider);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::Yes)?;
provider.write_to_storage(
execution_outcome,
OriginalValuesKnown::Yes,
StorageLocation::Database,
)?;

trace!(target: "reth::cli", "Inserted state");

Expand Down Expand Up @@ -351,6 +357,7 @@ where
+ HashingWriter
+ StateChangeWriter
+ TrieWriter
+ StateWriter
+ AsRef<Provider>,
{
let block = provider_rw.last_block_number()?;
Expand Down Expand Up @@ -470,6 +477,7 @@ where
+ HeaderProvider
+ HashingWriter
+ HistoryWriter
+ StateWriter
+ StateChangeWriter
+ AsRef<Provider>,
{
Expand Down
22 changes: 7 additions & 15 deletions crates/storage/provider/src/providers/blockchain_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -904,13 +904,18 @@ mod tests {
.unwrap_or_default();

// Insert blocks into the database
for block in &database_blocks {
for (block, receipts) in database_blocks.iter().zip(&receipts) {
// TODO: this should be moved inside `insert_historical_block`: <https://github.com/paradigmxyz/reth/issues/11524>
let mut transactions_writer =
static_file_provider.latest_writer(StaticFileSegment::Transactions)?;
let mut receipts_writer =
static_file_provider.latest_writer(StaticFileSegment::Receipts)?;
transactions_writer.increment_block(block.number)?;
for tx in block.body.transactions() {
receipts_writer.increment_block(block.number)?;

for (tx, receipt) in block.body.transactions().iter().zip(receipts) {
transactions_writer.append_transaction(tx_num, tx)?;
receipts_writer.append_receipt(tx_num, receipt)?;
tx_num += 1;
}

Expand All @@ -919,19 +924,6 @@ mod tests {
)?;
}

// Insert receipts into the static files
UnifiedStorageWriter::new(
&provider_rw,
Some(factory.static_file_provider().latest_writer(StaticFileSegment::Receipts)?),
)
.append_receipts_from_blocks(
// The initial block number is required
database_blocks.first().map(|b| b.number).unwrap_or_default(),
receipts[..database_blocks.len()]
.iter()
.map(|vec| vec.clone().into_iter().map(Some).collect::<Vec<_>>()),
)?;

// Commit to both storages: database and static files
UnifiedStorageWriter::commit(provider_rw)?;

Expand Down
82 changes: 75 additions & 7 deletions crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::{
traits::{
AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
},
writer::UnifiedStorageWriter,
AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
DBProvider, EvmEnvProvider, HashingWriter, HeaderProvider, HeaderSyncGap,
Expand Down Expand Up @@ -3017,12 +3016,11 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWrite
durations_recorder.record_relative(metrics::Action::InsertBlock);
}

// Write state and changesets to the database.
// Must be written after blocks because of the receipt lookup.
// TODO: should _these_ be moved to storagewriter? seems like storagewriter should be
// _above_ db provider
let mut storage_writer = UnifiedStorageWriter::from_database(self);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?;
self.write_to_storage(
execution_outcome,
OriginalValuesKnown::No,
StorageLocation::Database,
)?;
durations_recorder.record_relative(metrics::Action::InsertState);

// insert hashes and intermediate merkle nodes
Expand Down Expand Up @@ -3142,3 +3140,73 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
self.prune_modes_ref()
}
}

impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> StateWriter
for DatabaseProvider<TX, N>
{
fn write_to_storage(
&self,
execution_outcome: ExecutionOutcome,
is_value_known: OriginalValuesKnown,
write_receipts_to: StorageLocation,
) -> ProviderResult<()> {
let (plain_state, reverts) =
execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);

self.write_state_reverts(reverts, execution_outcome.first_block)?;
self.write_state_changes(plain_state)?;

let mut bodies_cursor = self.tx.cursor_read::<tables::BlockBodyIndices>()?;

let has_receipts_pruning = self.prune_modes.has_receipts_pruning() ||
execution_outcome.receipts.iter().flatten().any(|receipt| receipt.is_none());

// Prepare receipts cursor if we are going to write receipts to the database
//
// We are writing to database if requested or if there's any kind of receipt pruning
// configured
let mut receipts_cursor = (write_receipts_to.database() || has_receipts_pruning)
.then(|| self.tx.cursor_write::<tables::Receipts>())
.transpose()?;

// Prepare receipts static writer if we are going to write receipts to static files
//
// We are writing to static files if requested and if there's no receipt pruning configured
let mut receipts_static_writer = (write_receipts_to.static_files() &&
!has_receipts_pruning)
.then(|| {
self.static_file_provider
.get_writer(execution_outcome.first_block, StaticFileSegment::Receipts)
})
.transpose()?;

for (idx, receipts) in execution_outcome.receipts.into_iter().enumerate() {
let block_number = execution_outcome.first_block + idx as u64;

// Increment block number for receipts static file writer
if let Some(writer) = receipts_static_writer.as_mut() {
writer.increment_block(block_number)?;
}

let first_tx_index = bodies_cursor
.seek_exact(block_number)?
.map(|(_, indices)| indices.first_tx_num())
.ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))?;

for (idx, receipt) in receipts.into_iter().enumerate() {
let receipt_idx = first_tx_index + idx as u64;
if let Some(receipt) = receipt {
if let Some(writer) = &mut receipts_static_writer {
writer.append_receipt(receipt_idx, &receipt)?;
}

if let Some(cursor) = &mut receipts_cursor {
cursor.append(receipt_idx, receipt)?;
}
}
}
}

Ok(())
}
}
5 changes: 4 additions & 1 deletion crates/storage/provider/src/traits/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@ use revm::db::{
};
use std::ops::RangeInclusive;

use super::StorageLocation;

/// A helper trait for [`ExecutionOutcome`] to write state and receipts to storage.
pub trait StateWriter {
/// Write the data and receipts to the database or static files if `static_file_producer` is
/// `Some`. It should be `None` if there is any kind of pruning/filtering over the receipts.
fn write_to_storage(
&mut self,
&self,
execution_outcome: ExecutionOutcome,
is_value_known: OriginalValuesKnown,
write_receipts_to: StorageLocation,
) -> ProviderResult<()>;
}

Expand Down
29 changes: 0 additions & 29 deletions crates/storage/provider/src/writer/database.rs

This file was deleted.

Loading

0 comments on commit 2d6b893

Please sign in to comment.