Skip to content

Commit

Permalink
order bmeta segment
Browse files Browse the repository at this point in the history
put on bottom
  • Loading branch information
joshieDo committed Jan 9, 2025
1 parent 2b58217 commit 2874270
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 48 deletions.
4 changes: 2 additions & 2 deletions crates/cli/commands/src/db/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl Command {
StaticFileSegment::Receipts => {
(table_key::<tables::Receipts>(&key)?, <ReceiptMask<ReceiptTy<N>>>::MASK)
}
StaticFileSegment::BlockMeta => todo!(), // TODO(joshie),
StaticFileSegment::BlockMeta => todo!(),
};

let content = tool.provider_factory.static_file_provider().find_static_file(
Expand Down Expand Up @@ -115,7 +115,7 @@ impl Command {
println!("{}", serde_json::to_string_pretty(&receipt)?);
}
StaticFileSegment::BlockMeta => {
todo!() // TODO(joshie)
todo!()
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/static-file/static-file/src/static_file_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ mod tests {
headers: Some(1),
receipts: Some(1),
transactions: Some(1),
block_meta: Some(1),
block_meta: None,
})
.expect("get static file targets");
assert_matches!(locked_producer.run(targets.clone()), Ok(_));
Expand Down
15 changes: 7 additions & 8 deletions crates/static-file/types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,19 @@ impl HighestStaticFiles {
}
}

/// Returns an iterator over all static file segments
fn iter(&self) -> impl Iterator<Item = Option<BlockNumber>> {
[self.headers, self.transactions, self.receipts, self.block_meta].into_iter()
}

/// Returns the minimum block of all segments.
pub fn min_block_num(&self) -> Option<u64> {
[self.headers, self.transactions, self.receipts, self.block_meta]
.iter()
.filter_map(|&option| option)
.min()
self.iter().filter_map(|b| b).min()
}

/// Returns the maximum block of all segments.
pub fn max_block_num(&self) -> Option<u64> {
[self.headers, self.transactions, self.receipts, self.block_meta]
.iter()
.filter_map(|&option| option)
.max()
self.iter().filter_map(|b| b).max()
}
}

Expand Down
13 changes: 10 additions & 3 deletions crates/static-file/types/src/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use alloy_primitives::TxNumber;
use derive_more::Display;
use serde::{Deserialize, Serialize};
use std::{ops::RangeInclusive, str::FromStr};
use strum::{AsRefStr, EnumIter, EnumString};
use strum::{AsRefStr, EnumString};

#[derive(
Debug,
Expand All @@ -17,7 +17,6 @@ use strum::{AsRefStr, EnumIter, EnumString};
Deserialize,
Serialize,
EnumString,
EnumIter,
AsRefStr,
Display,
)]
Expand All @@ -34,7 +33,7 @@ pub enum StaticFileSegment {
#[strum(serialize = "receipts")]
/// Static File segment responsible for the `Receipts` table.
Receipts,
#[strum(serialize = "bmeta")]
#[strum(serialize = "blockmeta")]
/// Static File segment responsible for the `BlockBodyIndices`, `BlockOmmers`,
/// `BlockWithdrawals` tables.
BlockMeta,
Expand All @@ -51,6 +50,14 @@ impl StaticFileSegment {
}
}

/// Returns an iterator over all segments.
pub fn iter() -> impl Iterator<Item = StaticFileSegment> {
// The order of segments is significant and must be maintained to ensure correctness. For
// example, Transactions require BlockBodyIndices from Blockmeta to be sound, and thus, this
// one must be checked first.
[Self::Headers, Self::BlockMeta, Self::Transactions, Self::Receipts].into_iter()
}

/// Returns the default configuration of the segment.
pub const fn config(&self) -> SegmentConfig {
SegmentConfig { compression: Compression::Lz4 }
Expand Down
13 changes: 2 additions & 11 deletions crates/storage/provider/src/providers/static_file/jar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ use crate::{
TransactionsProvider,
};
use alloy_consensus::transaction::TransactionMeta;
use alloy_eips::{
eip2718::Encodable2718,
eip4895::{Withdrawal, Withdrawals},
BlockHashOrNumber,
};
use alloy_eips::{eip2718::Encodable2718, eip4895::Withdrawals, BlockHashOrNumber};
use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256};
use reth_chainspec::ChainInfo;
use reth_db::{
Expand All @@ -23,7 +19,7 @@ use reth_db::{
table::{Decompress, Value},
};
use reth_node_types::{FullNodePrimitives, NodePrimitives};
use reth_primitives::{transaction::recover_signers, SealedHeader};
use reth_primitives::SealedHeader;
use reth_primitives_traits::SignedTransaction;
use reth_storage_api::{BlockBodyIndicesProvider, OmmersProvider, WithdrawalsProvider};
use reth_storage_errors::provider::{ProviderError, ProviderResult};
Expand Down Expand Up @@ -371,11 +367,6 @@ impl<N: NodePrimitives> WithdrawalsProvider for StaticFileJarProvider<'_, N> {
// Only accepts block number queries
Err(ProviderError::UnsupportedProvider)
}

fn latest_withdrawal(&self) -> ProviderResult<Option<Withdrawal>> {
// Required data not present in static_files
Err(ProviderError::UnsupportedProvider)
}
}

impl<N: FullNodePrimitives<BlockHeader: Value>> OmmersProvider for StaticFileJarProvider<'_, N> {
Expand Down
49 changes: 30 additions & 19 deletions crates/storage/provider/src/providers/static_file/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ use std::{
path::{Path, PathBuf},
sync::{mpsc, Arc},
};
use strum::IntoEnumIterator;
use tracing::{info, trace, warn};

/// Alias type for a map that can be queried for block ranges from a transaction
Expand Down Expand Up @@ -682,6 +681,11 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
};

for segment in StaticFileSegment::iter() {
// Not integrated yet
if segment.is_block_meta() {
continue
}

if has_receipt_pruning && segment.is_receipts() {
// Pruned nodes (including full node) do not store receipts as static files.
continue
Expand Down Expand Up @@ -780,7 +784,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
.ensure_invariants::<_, tables::BlockBodyIndices>(
provider,
segment,
highest_tx,
highest_block,
highest_block,
)?,
} {
Expand Down Expand Up @@ -832,35 +836,40 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
where
Provider: DBProvider + BlockReader + StageCheckpointReader,
{
let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
let highest_static_file_block = highest_static_file_block.unwrap_or_default();
let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;

if let Some((db_first_entry, _)) = db_cursor.first()? {
// If there is a gap between the entry found in static file and
// database, then we have most likely lost static file data and need to unwind so we can
// load it again
if !(db_first_entry <= highest_static_file_entry ||
highest_static_file_entry + 1 == db_first_entry)
if let (Some(highest_entry), Some(highest_block)) =
(highest_static_file_entry, highest_static_file_block)
{
info!(
target: "reth::providers::static_file",
?db_first_entry,
?highest_static_file_entry,
unwind_target = highest_static_file_block,
?segment,
"Setting unwind target."
);
return Ok(Some(highest_static_file_block))
// If there is a gap between the entry found in static file and
// database, then we have most likely lost static file data and need to unwind so we
// can load it again
if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
info!(
target: "reth::providers::static_file",
?db_first_entry,
?highest_entry,
unwind_target = highest_block,
?segment,
"Setting unwind target."
);
return Ok(Some(highest_block))
}
}

if let Some((db_last_entry, _)) = db_cursor.last()? {
if db_last_entry > highest_static_file_entry {
if highest_static_file_entry
.is_none_or(|highest_entry| db_last_entry > highest_entry)
{
return Ok(None)
}
}
}

let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
let highest_static_file_block = highest_static_file_block.unwrap_or_default();

// If static file entry is ahead of the database entries, then ensure the checkpoint block
// number matches.
let checkpoint_block_number = provider
Expand Down Expand Up @@ -900,6 +909,8 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
// TODO(joshie): is_block_meta
writer.prune_headers(highest_static_file_block - checkpoint_block_number)?;
} else if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
// todo joshie: is querying block_body_indices a potential issue once bbi is moved
// to sf as well
let number = highest_static_file_entry - block.last_tx_num();
if segment.is_receipts() {
writer.prune_receipts(number, checkpoint_block_number)?;
Expand Down
24 changes: 20 additions & 4 deletions crates/storage/provider/src/providers/static_file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use alloy_consensus::BlockHeader;
use alloy_primitives::{BlockHash, BlockNumber, TxNumber, U256};
use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
use reth_codecs::Compact;
use reth_db::models::StoredBlockBodyIndices;
use reth_db::models::{StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals};
use reth_db_api::models::CompactU256;
use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
use reth_node_types::NodePrimitives;
Expand Down Expand Up @@ -234,7 +234,7 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
StaticFileSegment::Receipts => {
self.prune_receipt_data(to_delete, last_block_number.expect("should exist"))?
}
StaticFileSegment::BlockMeta => todo!(), // TODO(joshie),
StaticFileSegment::BlockMeta => todo!(),
}
}

Expand Down Expand Up @@ -557,9 +557,25 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
Ok(())
}

/// Appends [`StoredBlockBodyIndices`], [`StoredBlockOmmers`] and [`StoredBlockWithdrawals`] to
/// static file.
///
/// It **CALLS** `increment_block()` since it's a block based segment.
pub fn append_eth_block_meta(
&mut self,
body_indices: &StoredBlockBodyIndices,
ommers: &StoredBlockOmmers<N::BlockHeader>,
withdrawals: &StoredBlockWithdrawals,
expected_block_number: BlockNumber,
) -> ProviderResult<()>
where
N::BlockHeader: Compact,
{
self.append_block_meta(body_indices, ommers, withdrawals, expected_block_number)
}

/// Appends [`StoredBlockBodyIndices`] and any other two arbitrary types belonging to the block
/// body ([`reth_db::models::StoredBlockOmmers`] and
/// [`reth_db::models::StoredBlockWithdrawals`] on ethereum) to static file.
/// body to static file.
///
/// It **CALLS** `increment_block()` since it's a block based segment.
pub fn append_block_meta<F1, F2>(
Expand Down

0 comments on commit 2874270

Please sign in to comment.