Skip to content

Commit

Permalink
feat: extract zstd compressors (paradigmxyz#13250)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse authored Dec 9, 2024
1 parent eb60808 commit 386e4b3
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 45 deletions.
9 changes: 8 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ members = [
"crates/storage/nippy-jar/",
"crates/storage/provider/",
"crates/storage/storage-api/",
"crates/storage/zstd-compressors/",
"crates/tasks/",
"crates/tokio-util/",
"crates/tracing/",
Expand Down Expand Up @@ -422,6 +423,7 @@ reth-trie-common = { path = "crates/trie/common" }
reth-trie-db = { path = "crates/trie/db" }
reth-trie-parallel = { path = "crates/trie/parallel" }
reth-trie-sparse = { path = "crates/trie/sparse" }
reth-zstd-compressors = { path = "crates/storage/zstd-compressors", default-features = false }

# revm
revm = { version = "18.0.0", features = ["std"], default-features = false }
Expand Down
7 changes: 4 additions & 3 deletions crates/primitives/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ reth-ethereum-forks.workspace = true
reth-static-file-types.workspace = true
revm-primitives = { workspace = true, features = ["serde"] }
reth-codecs = { workspace = true, optional = true }
reth-zstd-compressors = { workspace = true, optional = true }

# ethereum
alloy-consensus.workspace = true
Expand Down Expand Up @@ -55,7 +56,6 @@ rand = { workspace = true, optional = true }
rayon.workspace = true
serde.workspace = true
serde_with = { workspace = true, optional = true }
zstd = { workspace = true, features = ["experimental"], optional = true }

# arbitrary utils
arbitrary = { workspace = true, features = ["derive"], optional = true }
Expand Down Expand Up @@ -108,11 +108,12 @@ std = [
"alloy-rlp/std",
"reth-ethereum-forks/std",
"bytes/std",
"derive_more/std"
"derive_more/std",
"reth-zstd-compressors?/std"
]
reth-codec = [
"dep:reth-codecs",
"dep:zstd",
"dep:reth-zstd-compressors",
"dep:modular-bitfield", "std",
"reth-primitives-traits/reth-codec",
]
Expand Down
4 changes: 0 additions & 4 deletions crates/primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ pub use traits::*;
#[cfg(feature = "alloy-compat")]
mod alloy_compat;
mod block;
#[cfg(feature = "reth-codec")]
mod compression;
pub mod proofs;
mod receipt;
pub use reth_static_file_types as static_file;
Expand All @@ -38,8 +36,6 @@ pub use block::{generate_valid_header, valid_header_strategy};
pub use block::{
Block, BlockBody, BlockWithSenders, SealedBlock, SealedBlockFor, SealedBlockWithSenders,
};
#[cfg(feature = "reth-codec")]
pub use compression::*;
pub use receipt::{gas_spent_by_transactions, Receipt, Receipts};
pub use reth_primitives_traits::{
logs_bloom, Account, Bytecode, GotExpected, GotExpectedBoxed, Header, HeaderError, Log,
Expand Down
4 changes: 2 additions & 2 deletions crates/primitives/src/receipt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use derive_more::{DerefMut, From, IntoIterator};
use reth_primitives_traits::receipt::ReceiptExt;
use serde::{Deserialize, Serialize};

#[cfg(feature = "reth-codec")]
use crate::compression::{RECEIPT_COMPRESSOR, RECEIPT_DECOMPRESSOR};
use crate::TxType;
#[cfg(feature = "reth-codec")]
use reth_zstd_compressors::{RECEIPT_COMPRESSOR, RECEIPT_DECOMPRESSOR};

/// Retrieves gas spent by transactions as a vector of tuples (transaction index, gas used).
pub use reth_primitives_traits::receipt::gas_spent_by_transactions;
Expand Down
8 changes: 4 additions & 4 deletions crates/primitives/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1365,14 +1365,14 @@ impl reth_codecs::Compact for TransactionSigned {
let tx_bits = if zstd_bit {
let mut tmp = Vec::with_capacity(256);
if cfg!(feature = "std") {
crate::compression::TRANSACTION_COMPRESSOR.with(|compressor| {
reth_zstd_compressors::TRANSACTION_COMPRESSOR.with(|compressor| {
let mut compressor = compressor.borrow_mut();
let tx_bits = self.transaction.to_compact(&mut tmp);
buf.put_slice(&compressor.compress(&tmp).expect("Failed to compress"));
tx_bits as u8
})
} else {
let mut compressor = crate::compression::create_tx_compressor();
let mut compressor = reth_zstd_compressors::create_tx_compressor();
let tx_bits = self.transaction.to_compact(&mut tmp);
buf.put_slice(&compressor.compress(&tmp).expect("Failed to compress"));
tx_bits as u8
Expand All @@ -1399,7 +1399,7 @@ impl reth_codecs::Compact for TransactionSigned {
let zstd_bit = bitflags >> 3;
let (transaction, buf) = if zstd_bit != 0 {
if cfg!(feature = "std") {
crate::compression::TRANSACTION_DECOMPRESSOR.with(|decompressor| {
reth_zstd_compressors::TRANSACTION_DECOMPRESSOR.with(|decompressor| {
let mut decompressor = decompressor.borrow_mut();

// TODO: enforce that zstd is only present at a "top" level type
Expand All @@ -1411,7 +1411,7 @@ impl reth_codecs::Compact for TransactionSigned {
(transaction, buf)
})
} else {
let mut decompressor = crate::compression::create_tx_decompressor();
let mut decompressor = reth_zstd_compressors::create_tx_decompressor();
let transaction_type = (bitflags & 0b110) >> 1;
let (transaction, _) =
Transaction::from_compact(decompressor.decompress(buf), transaction_type);
Expand Down
19 changes: 19 additions & 0 deletions crates/storage/zstd-compressors/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "reth-zstd-compressors"
version.workspace = true
edition.workspace = true
homepage.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
description = "Commonly used zstd compressors."

[lints]
workspace = true

[dependencies]
zstd = { workspace = true, features = ["experimental"] }

[features]
default = ["std"]
std = []
Original file line number Diff line number Diff line change
@@ -1,41 +1,61 @@
//! Commonly used zstd [`Compressor`] and [`Decompressor`] for reth types.
#![doc(
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
#![cfg_attr(not(feature = "std"), no_std)]

extern crate alloc;

use crate::alloc::string::ToString;
use alloc::vec::Vec;
use core::cell::RefCell;
use zstd::bulk::{Compressor, Decompressor};

/// Compression/Decompression dictionary for `Receipt`.
pub static RECEIPT_DICTIONARY: &[u8] = include_bytes!("./receipt_dictionary.bin");
pub static RECEIPT_DICTIONARY: &[u8] = include_bytes!("../receipt_dictionary.bin");
/// Compression/Decompression dictionary for `Transaction`.
pub static TRANSACTION_DICTIONARY: &[u8] = include_bytes!("./transaction_dictionary.bin");
pub static TRANSACTION_DICTIONARY: &[u8] = include_bytes!("../transaction_dictionary.bin");

// We use `thread_local` compressors and decompressors because dictionaries can be quite big, and
// zstd-rs recommends to use one context/compressor per thread
#[cfg(feature = "std")]
std::thread_local! {
/// Thread Transaction compressor.
pub static TRANSACTION_COMPRESSOR: RefCell<Compressor<'static>> = RefCell::new(
Compressor::with_dictionary(0, TRANSACTION_DICTIONARY)
.expect("failed to initialize transaction compressor"),
);

/// Thread Transaction decompressor.
pub static TRANSACTION_DECOMPRESSOR: RefCell<ReusableDecompressor> =
RefCell::new(ReusableDecompressor::new(
Decompressor::with_dictionary(TRANSACTION_DICTIONARY)
.expect("failed to initialize transaction decompressor"),
));

/// Thread receipt compressor.
pub static RECEIPT_COMPRESSOR: RefCell<Compressor<'static>> = RefCell::new(
Compressor::with_dictionary(0, RECEIPT_DICTIONARY)
.expect("failed to initialize receipt compressor"),
);

/// Thread receipt decompressor.
pub static RECEIPT_DECOMPRESSOR: RefCell<ReusableDecompressor> =
RefCell::new(ReusableDecompressor::new(
Decompressor::with_dictionary(RECEIPT_DICTIONARY)
.expect("failed to initialize receipt decompressor"),
));
pub use locals::*;
#[cfg(feature = "std")]
mod locals {
use super::*;
use core::cell::RefCell;

// We use `thread_local` compressors and decompressors because dictionaries can be quite big,
// and zstd-rs recommends to use one context/compressor per thread
std::thread_local! {
/// Thread Transaction compressor.
pub static TRANSACTION_COMPRESSOR: RefCell<Compressor<'static>> = RefCell::new(
Compressor::with_dictionary(0, TRANSACTION_DICTIONARY)
.expect("failed to initialize transaction compressor"),
);

/// Thread Transaction decompressor.
pub static TRANSACTION_DECOMPRESSOR: RefCell<ReusableDecompressor> =
RefCell::new(ReusableDecompressor::new(
Decompressor::with_dictionary(TRANSACTION_DICTIONARY)
.expect("failed to initialize transaction decompressor"),
));

/// Thread receipt compressor.
pub static RECEIPT_COMPRESSOR: RefCell<Compressor<'static>> = RefCell::new(
Compressor::with_dictionary(0, RECEIPT_DICTIONARY)
.expect("failed to initialize receipt compressor"),
);

/// Thread receipt decompressor.
pub static RECEIPT_DECOMPRESSOR: RefCell<ReusableDecompressor> =
RefCell::new(ReusableDecompressor::new(
Decompressor::with_dictionary(RECEIPT_DICTIONARY)
.expect("failed to initialize receipt decompressor"),
));
}
}

/// Fn creates tx [`Compressor`]
Expand Down

0 comments on commit 386e4b3

Please sign in to comment.