diff --git a/Cargo.lock b/Cargo.lock index 03fd74c60664..ea94bd74a0de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1374,6 +1374,19 @@ dependencies = [ "wyz", ] +[[package]] +name = "blake3" +version = "1.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8ee0c1824c4dea5b5f81736aff91bae041d2c07ee1192bec91054e10e3e601e" +dependencies = [ + "arrayref", + "arrayvec", + "cc", + "cfg-if", + "constant_time_eq", +] + [[package]] name = "block-buffer" version = "0.9.0" @@ -2044,6 +2057,12 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "constant_time_eq" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6" + [[package]] name = "convert_case" version = "0.6.0" @@ -6369,6 +6388,7 @@ checksum = "43e734407157c3c2034e0258f5e4473ddb361b1e85f95a66690d67264d7cd1da" dependencies = [ "base64 0.22.1", "bytes", + "futures-channel", "futures-core", "futures-util", "http", @@ -9088,6 +9108,7 @@ dependencies = [ "alloy-rlp", "assert_matches", "bincode", + "blake3", "codspeed-criterion-compat", "futures-util", "itertools 0.13.0", @@ -9096,6 +9117,7 @@ dependencies = [ "pprof", "rand 0.8.5", "rayon", + "reqwest", "reth-chainspec", "reth-codecs", "reth-config", @@ -9109,6 +9131,7 @@ dependencies = [ "reth-execution-errors", "reth-execution-types", "reth-exex", + "reth-fs-util", "reth-network-p2p", "reth-network-peers", "reth-primitives", @@ -9121,8 +9144,10 @@ dependencies = [ "reth-static-file", "reth-storage-errors", "reth-testing-utils", + "reth-tracing", "reth-trie", "reth-trie-db", + "serde", "tempfile", "thiserror 2.0.11", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 0e6a39084b2e..4a6f90d7380b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -485,6 +485,7 @@ backon = { version = "1.2", default-features = false, features = [ ] } bincode = "1.3" bitflags = "2.4" +blake3 = "1.5.5" boyer-moore-magiclen = "0.2.16" bytes = { version = "1.5", default-features = false } cfg-if = "1.0" diff --git a/crates/stages/stages/Cargo.toml b/crates/stages/stages/Cargo.toml index e7114eeb16ac..2b519558c078 100644 --- a/crates/stages/stages/Cargo.toml +++ b/crates/stages/stages/Cargo.toml @@ -22,6 +22,7 @@ reth-db-api.workspace = true reth-etl.workspace = true reth-evm.workspace = true reth-exex.workspace = true +reth-fs-util.workspace = true reth-network-p2p.workspace = true reth-primitives = { workspace = true, features = ["secp256k1"] } reth-primitives-traits = { workspace = true, features = [ @@ -57,6 +58,12 @@ rayon.workspace = true num-traits = "0.2.15" tempfile = { workspace = true, optional = true } bincode.workspace = true +blake3.workspace = true +reqwest = { workspace = true, default-features = false, features = [ + "rustls-tls-native-roots", + "blocking" +] } +serde = { workspace = true, features = ["derive"] } [dev-dependencies] # reth @@ -75,6 +82,7 @@ reth-testing-utils.workspace = true reth-trie = { workspace = true, features = ["test-utils"] } reth-provider = { workspace = true, features = ["test-utils"] } reth-network-peers.workspace = true +reth-tracing.workspace = true alloy-rlp.workspace = true itertools.workspace = true diff --git a/crates/stages/stages/src/stages/mod.rs b/crates/stages/stages/src/stages/mod.rs index 33a4d76a11f9..142452aa5344 100644 --- a/crates/stages/stages/src/stages/mod.rs +++ 
b/crates/stages/stages/src/stages/mod.rs
@@ -17,6 +17,8 @@ mod index_storage_history;
 /// Stage for computing state root.
 mod merkle;
 mod prune;
+/// The s3 download stage
+mod s3;
 /// The sender recovery stage.
 mod sender_recovery;
 /// The transaction lookup stage
@@ -32,6 +34,7 @@ pub use index_account_history::*;
 pub use index_storage_history::*;
 pub use merkle::*;
 pub use prune::*;
+pub use s3::*;
 pub use sender_recovery::*;
 pub use tx_lookup::*;
diff --git a/crates/stages/stages/src/stages/s3/downloader/error.rs b/crates/stages/stages/src/stages/s3/downloader/error.rs
new file mode 100644
index 000000000000..49f4b418aad2
--- /dev/null
+++ b/crates/stages/stages/src/stages/s3/downloader/error.rs
@@ -0,0 +1,31 @@
+use alloy_primitives::B256;
+use reth_fs_util::FsPathError;
+
+/// Possible downloader error variants.
+#[derive(Debug, thiserror::Error)]
+pub enum DownloaderError {
+    /// The metadata requires a valid `total_size`.
+    #[error("requires a valid total_size")]
+    InvalidMetadataTotalSize(Option<usize>),
+    /// Invalid chunk access.
+    #[error("tried to access chunk on index {0}, but there's only {1} chunks")]
+    InvalidChunk(usize, usize),
+    /// File hash mismatch.
+    #[error("file hash does not match the expected one {0} != {1}")]
+    InvalidFileHash(B256, B256),
+    /// Empty content length returned from the server.
+    #[error("metadata got an empty content length from server")]
+    EmptyContentLength,
+    /// File system path error.
+    #[error(transparent)]
+    FsPath(#[from] FsPathError),
+    /// Reqwest error.
+    #[error(transparent)]
+    Reqwest(#[from] reqwest::Error),
+    /// Std IO error.
+    #[error(transparent)]
+    StdIo(#[from] std::io::Error),
+    /// Bincode error.
+    #[error(transparent)]
+    Bincode(#[from] bincode::Error),
+}
diff --git a/crates/stages/stages/src/stages/s3/downloader/fetch.rs b/crates/stages/stages/src/stages/s3/downloader/fetch.rs
new file mode 100644
index 000000000000..b9ae5774753a
--- /dev/null
+++ b/crates/stages/stages/src/stages/s3/downloader/fetch.rs
@@ -0,0 +1,184 @@
+use crate::stages::s3::downloader::{worker::spawn_workers, RemainingChunkRange};
+
+use super::{
+    error::DownloaderError,
+    meta::Metadata,
+    worker::{WorkerRequest, WorkerResponse},
+};
+use alloy_primitives::B256;
+use reqwest::{header::CONTENT_LENGTH, Client};
+use std::{
+    collections::HashMap,
+    fs::{File, OpenOptions},
+    io::BufReader,
+    path::Path,
+};
+use tracing::{debug, error, info};
+
+/// Downloads a file from a url to a data file path.
+///
+/// If a `file_hash` is passed, it will be verified at the end.
+///
+/// ## Details
+///
+/// 1) A [`Metadata`] file is created or opened in `{target_dir}/download/{filename}.metadata`. It
+///    tracks the download progress including total file size, downloaded bytes, chunk sizes, and
+///    ranges that still need downloading. This allows for resumability.
+/// 2) The target file is preallocated with the total size of the file in
+///    `{target_dir}/download/{filename}`.
+/// 3) Multiple `workers` are spawned for downloading specific chunks of the file.
+/// 4) The `Orchestrator` manages workers, distributes chunk ranges, and ensures the download
+///    progresses efficiently by dynamically assigning tasks to workers as they become available.
+/// 5) Once the file is downloaded:
+///    * If `file_hash` is `Some`, its blake3 hash is verified.
+///    * The metadata file is deleted.
+///    * The downloaded file is moved to the target directory.
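As a usage sketch (annotation, not part of the diff): a caller could drive `fetch` like this and branch on the `DownloaderError` variants defined above. The directory, URL, and hash literal are placeholders.

```rust
use std::path::Path;
use alloy_primitives::b256;

// Hypothetical caller; `fetch` and `DownloaderError` are the items introduced by this PR.
async fn download_segment() {
    let target_dir = Path::new("/tmp/static-files"); // placeholder path
    let expected = b256!("81a7318f69fc1d6bb0a58a24af302f3b978bc75a435e4ae5d075f999cd060cfd"); // placeholder hash

    let result = fetch(
        "static_file_transactions_0_499999",
        target_dir,
        "https://example.org/static_file_transactions_0_499999", // placeholder URL
        4, // up to 4 concurrent range requests
        Some(expected),
    )
    .await;

    match result {
        Ok(()) => println!("segment downloaded and verified"),
        // Hash mismatch: the bytes on disk are corrupt and need a clean re-download.
        Err(DownloaderError::InvalidFileHash(got, want)) => eprintln!("corrupt file: {got} != {want}"),
        // Network/IO/metadata errors can simply be retried; progress is kept in the metadata file.
        Err(err) => eprintln!("download failed, will resume on retry: {err}"),
    }
}
```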
+pub async fn fetch( + filename: &str, + target_dir: &Path, + url: &str, + mut concurrent: u64, + file_hash: Option, +) -> Result<(), DownloaderError> { + // Create a temporary directory to download files to, before moving them to target_dir. + let download_dir = target_dir.join("download"); + reth_fs_util::create_dir_all(&download_dir)?; + + let data_file = download_dir.join(filename); + let mut metadata = metadata(&data_file, url).await?; + if metadata.is_done() { + return Ok(()) + } + + // Ensure the file is preallocated so we can download it concurrently + { + let file = OpenOptions::new() + .create(true) + .truncate(true) + .read(true) + .write(true) + .open(&data_file)?; + + if file.metadata()?.len() as usize != metadata.total_size { + info!(target: "sync::stages::s3::downloader", ?filename, length = metadata.total_size, "Preallocating space."); + file.set_len(metadata.total_size as u64)?; + } + } + + while !metadata.is_done() { + info!(target: "sync::stages::s3::downloader", ?filename, "Downloading."); + + // Find the missing file chunks and the minimum number of workers required + let missing_chunks = metadata.needed_ranges(); + concurrent = concurrent + .min(std::thread::available_parallelism()?.get() as u64) + .min(missing_chunks.len() as u64); + + let mut orchestrator_rx = spawn_workers(url, concurrent, &data_file); + + let mut workers = HashMap::new(); + let mut missing_chunks = missing_chunks.into_iter(); + + // Distribute chunk ranges to workers when they free up + while let Some(worker_msg) = orchestrator_rx.recv().await { + debug!(target: "sync::stages::s3::downloader", ?worker_msg, "received message from worker"); + + let available_worker = match worker_msg { + WorkerResponse::Ready { worker_id, tx } => { + debug!(target: "sync::stages::s3::downloader", ?worker_id, "Worker ready."); + workers.insert(worker_id, tx); + worker_id + } + WorkerResponse::DownloadedChunk { worker_id, chunk_index, written_bytes } => { + metadata.update_chunk(chunk_index, written_bytes)?; + worker_id + } + WorkerResponse::Err { worker_id, error } => { + error!(target: "sync::stages::s3::downloader", ?worker_id, "Worker found an error: {:?}", error); + return Err(error) + } + }; + + let msg = if let Some(RemainingChunkRange { index, start, end }) = missing_chunks.next() + { + debug!(target: "sync::stages::s3::downloader", ?available_worker, start, end, "Worker download request."); + WorkerRequest::Download { chunk_index: index, start, end } + } else { + debug!(target: "sync::stages::s3::downloader", ?available_worker, "Sent Finish command to worker."); + WorkerRequest::Finish + }; + + let _ = workers.get(&available_worker).expect("should exist").send(msg); + } + } + + if let Some(file_hash) = file_hash { + info!(target: "sync::stages::s3::downloader", ?filename, "Checking file integrity."); + check_file_hash(&data_file, &file_hash)?; + } + + // No longer need the metadata file. + metadata.delete()?; + + // Move downloaded file to desired directory. + let file_directory = target_dir.join(filename); + reth_fs_util::rename(data_file, &file_directory)?; + info!(target: "sync::stages::s3::downloader", ?file_directory, "Moved file from temporary to target directory."); + + Ok(()) +} + +/// Creates a metadata file used to keep track of the downloaded chunks. Useful on resuming after a +/// shutdown. 
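The preallocation block above is what lets multiple workers write disjoint byte ranges of the same file without coordination. One thing worth double-checking there: opening with `.truncate(true)` zeroes a partially downloaded file from a previous run, which works against the resumability the metadata provides. A minimal standalone illustration of the preallocate-then-seek pattern (sizes and path are made up):

```rust
use std::fs::OpenOptions;
use std::io::{Seek, SeekFrom, Write};

fn preallocate_demo() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("prealloc-demo.bin"); // throwaway file
    let total_size = 4096u64;

    let mut file = OpenOptions::new().create(true).read(true).write(true).open(&path)?;
    // Reserve the final size once, so offsets past the current end are valid write targets.
    file.set_len(total_size)?;

    // Two "workers" writing disjoint chunks; completion order does not matter because
    // each write lands at a fixed offset inside the preallocated file.
    file.seek(SeekFrom::Start(2048))?;
    file.write_all(&[0xBB; 2048])?;
    file.seek(SeekFrom::Start(0))?;
    file.write_all(&[0xAA; 2048])?;

    assert_eq!(std::fs::metadata(&path)?.len(), total_size);
    Ok(())
}
```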
+async fn metadata(data_file: &Path, url: &str) -> Result { + if Metadata::file_path(data_file).exists() { + debug!(target: "sync::stages::s3::downloader", ?data_file, "Loading metadata "); + return Metadata::load(data_file) + } + + let client = Client::new(); + let resp = client.head(url).send().await?; + let total_length: usize = resp + .headers() + .get(CONTENT_LENGTH) + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse().ok()) + .ok_or(DownloaderError::EmptyContentLength)?; + + debug!(target: "sync::stages::s3::downloader", ?data_file, "Creating metadata "); + + Metadata::builder(data_file).with_total_size(total_length).build() +} + +/// Ensures the file on path has the expected blake3 hash. +fn check_file_hash(path: &Path, expected: &B256) -> Result<(), DownloaderError> { + let mut reader = BufReader::new(File::open(path)?); + let mut hasher = blake3::Hasher::new(); + std::io::copy(&mut reader, &mut hasher)?; + + let file_hash = hasher.finalize(); + if file_hash.as_bytes() != expected { + return Err(DownloaderError::InvalidFileHash(file_hash.as_bytes().into(), *expected)) + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy_primitives::b256; + + #[tokio::test] + async fn test_download() { + reth_tracing::init_test_tracing(); + + let b3sum = b256!("81a7318f69fc1d6bb0a58a24af302f3b978bc75a435e4ae5d075f999cd060cfd"); + let url = "https://link.testfile.org/500MB"; + + let file = tempfile::NamedTempFile::new().unwrap(); + let filename = file.path().file_name().unwrap().to_str().unwrap(); + let target_dir = file.path().parent().unwrap(); + fetch(filename, target_dir, url, 4, Some(b3sum)).await.unwrap(); + } +} diff --git a/crates/stages/stages/src/stages/s3/downloader/meta.rs b/crates/stages/stages/src/stages/s3/downloader/meta.rs new file mode 100644 index 000000000000..7ff4213fffc3 --- /dev/null +++ b/crates/stages/stages/src/stages/s3/downloader/meta.rs @@ -0,0 +1,195 @@ +use super::{error::DownloaderError, RemainingChunkRange}; +use serde::{Deserialize, Serialize}; +use std::{ + fs::File, + ops::RangeInclusive, + path::{Path, PathBuf}, +}; +use tracing::info; + +/// Tracks download progress and manages chunked downloads for resumable file transfers. +#[derive(Debug)] +pub struct Metadata { + /// Total file size + pub total_size: usize, + /// Total file size + pub downloaded: usize, + /// Download chunk size. Default 150MB. + pub chunk_size: usize, + /// Remaining download ranges for each chunk. + /// - `Some(RangeInclusive)`: range to be downloaded. + /// - `None`: Chunk fully downloaded. + chunks: Vec>>, + /// Path with the stored metadata. + path: PathBuf, +} + +impl Metadata { + /// Build a [`Metadata`] using a builder. + pub fn builder(data_file: &Path) -> MetadataBuilder { + MetadataBuilder::new(Self::file_path(data_file)) + } + + /// Returns the metadata file path of a data file: `{data_file}.metadata` + pub fn file_path(data_file: &Path) -> PathBuf { + data_file.with_file_name(format!( + "{}.metadata", + data_file.file_name().unwrap_or_default().to_string_lossy() + )) + } + + /// Returns a list of all chunks with their remaining ranges to be downloaded: + /// `RemainingChunkRange`. + pub fn needed_ranges(&self) -> Vec { + self.chunks + .iter() + .enumerate() + .filter(|(_, remaining)| remaining.is_some()) + .map(|(index, remaining)| { + let range = remaining.as_ref().expect("qed"); + RemainingChunkRange { index, start: *range.start(), end: *range.end() } + }) + .collect() + } + + /// Updates a downloaded chunk. 
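`check_file_hash` above is byte-for-byte equivalent to `b3sum`, so the expected digests (for example the placeholder hashes in `DOWNLOAD_FILE_LIST` further down) can be produced with the same few lines. Sketch with a made-up path:

```rust
use std::{fs::File, io::BufReader};

// Prints the blake3 digest of a file, equivalent to `b3sum <path>`.
fn print_b3sum(path: &str) -> std::io::Result<()> {
    let mut reader = BufReader::new(File::open(path)?);
    let mut hasher = blake3::Hasher::new();
    // `blake3::Hasher` implements `std::io::Write`, so `io::copy` streams the file through it.
    std::io::copy(&mut reader, &mut hasher)?;
    println!("{}", hasher.finalize().to_hex());
    Ok(())
}
```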
+ pub fn update_chunk( + &mut self, + index: usize, + downloaded_bytes: usize, + ) -> Result<(), DownloaderError> { + self.downloaded += downloaded_bytes; + + let num_chunks = self.chunks.len(); + if index >= self.chunks.len() { + return Err(DownloaderError::InvalidChunk(index, num_chunks)) + } + + // Update chunk with downloaded range + if let Some(range) = &self.chunks[index] { + let start = range.start() + downloaded_bytes; + if start > *range.end() { + self.chunks[index] = None; + } else { + self.chunks[index] = Some(start..=*range.end()); + } + } + + let file = self.path.file_stem().unwrap_or_default().to_string_lossy().into_owned(); + info!( + target: "sync::stages::s3::downloader", + file, + "{}/{}", self.downloaded / 1024 / 1024, self.total_size / 1024 / 1024); + + self.commit() + } + + /// Commits the [`Metadata`] to file. + pub fn commit(&self) -> Result<(), DownloaderError> { + Ok(reth_fs_util::atomic_write_file(&self.path, |file| { + bincode::serialize_into(file, &MetadataFile::from(self)) + })?) + } + + /// Loads a [`Metadata`] file from disk using the target data file. + pub fn load(data_file: &Path) -> Result { + let metadata_file_path = Self::file_path(data_file); + let MetadataFile { total_size, downloaded, chunk_size, chunks } = + bincode::deserialize_from(File::open(&metadata_file_path)?)?; + + Ok(Self { total_size, downloaded, chunk_size, chunks, path: metadata_file_path }) + } + + /// Returns true if we have downloaded all chunks. + pub fn is_done(&self) -> bool { + !self.chunks.iter().any(|c| c.is_some()) + } + + /// Deletes [`Metadata`] file from disk. + pub fn delete(self) -> Result<(), DownloaderError> { + Ok(reth_fs_util::remove_file(&self.path)?) + } +} + +/// A builder that can configure [Metadata] +#[derive(Debug)] +pub struct MetadataBuilder { + /// Path with the stored metadata. + metadata_path: PathBuf, + /// Total file size + total_size: Option, + /// Download chunk size. Default 150MB. + chunk_size: usize, +} + +impl MetadataBuilder { + const fn new(metadata_path: PathBuf) -> Self { + Self { + metadata_path, + total_size: None, + chunk_size: 150 * (1024 * 1024), // 150MB + } + } + + pub const fn with_total_size(mut self, total_size: usize) -> Self { + self.total_size = Some(total_size); + self + } + + pub const fn with_chunk_size(mut self, chunk_size: usize) -> Self { + self.chunk_size = chunk_size; + self + } + + /// Returns a [Metadata] if + pub fn build(&self) -> Result { + match &self.total_size { + Some(total_size) if *total_size > 0 => { + let chunks = (0..*total_size) + .step_by(self.chunk_size) + .map(|start| { + Some(start..=(start + self.chunk_size).min(*total_size).saturating_sub(1)) + }) + .collect(); + + let metadata = Metadata { + path: self.metadata_path.clone(), + total_size: *total_size, + downloaded: 0, + chunk_size: self.chunk_size, + chunks, + }; + metadata.commit()?; + + Ok(metadata) + } + _ => Err(DownloaderError::InvalidMetadataTotalSize(self.total_size)), + } + } +} + +/// Helper type that can serialize and deserialize [`Metadata`] to disk. +#[derive(Debug, Serialize, Deserialize)] +struct MetadataFile { + /// Total file size + total_size: usize, + /// Total file size + downloaded: usize, + /// Download chunk size. Default 150MB. + chunk_size: usize, + /// Remaining download ranges for each chunk. + /// - `Some(RangeInclusive)`: range to be downloaded. + /// - `None`: Chunk fully downloaded. 
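Worked example of the chunk-range computation in `MetadataBuilder::build` above: step over the total size and clamp the last inclusive end. Small numbers stand in for the 150MB default; this is a standalone sketch, not code from the diff.

```rust
use std::ops::RangeInclusive;

// Mirrors the range arithmetic used by `MetadataBuilder::build`.
fn chunk_ranges(total_size: usize, chunk_size: usize) -> Vec<RangeInclusive<usize>> {
    (0..total_size)
        .step_by(chunk_size)
        .map(|start| start..=(start + chunk_size).min(total_size).saturating_sub(1))
        .collect()
}

fn main() {
    // 250 bytes in 100-byte chunks: two full chunks plus a 50-byte tail.
    assert_eq!(chunk_ranges(250, 100), vec![0..=99, 100..=199, 200..=249]);
    // An exact multiple produces no short tail chunk.
    assert_eq!(chunk_ranges(200, 100), vec![0..=99, 100..=199]);
}
```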
+    chunks: Vec<Option<RangeInclusive<usize>>>,
+}
+
+impl From<&Metadata> for MetadataFile {
+    fn from(metadata: &Metadata) -> Self {
+        Self {
+            total_size: metadata.total_size,
+            downloaded: metadata.downloaded,
+            chunk_size: metadata.chunk_size,
+            chunks: metadata.chunks.clone(),
+        }
+    }
+}
diff --git a/crates/stages/stages/src/stages/s3/downloader/mod.rs b/crates/stages/stages/src/stages/s3/downloader/mod.rs
new file mode 100644
index 000000000000..d42c8251a079
--- /dev/null
+++ b/crates/stages/stages/src/stages/s3/downloader/mod.rs
@@ -0,0 +1,38 @@
+//! Provides functionality for downloading files in chunks from a remote source. It supports
+//! concurrent downloads, resuming interrupted downloads, and file integrity verification.
+
+mod error;
+mod fetch;
+mod meta;
+mod worker;
+
+pub(crate) use error::DownloaderError;
+pub use fetch::fetch;
+pub use meta::Metadata;
+
+/// Response sent by the fetch task to `S3Stage` once it has downloaded all files of a block
+/// range.
+pub(crate) enum S3DownloaderResponse {
+    /// A new block range was downloaded.
+    AddedNewRange,
+    /// The last requested block range was downloaded.
+    Done,
+}
+
+impl S3DownloaderResponse {
+    /// Whether the downloaded block range is the last requested one.
+    pub(crate) const fn is_done(&self) -> bool {
+        matches!(self, Self::Done)
+    }
+}
+
+/// The nth chunk's remaining range to be downloaded.
+#[derive(Debug)]
+pub struct RemainingChunkRange {
+    /// The nth chunk
+    pub index: usize,
+    /// Start of range
+    pub start: usize,
+    /// End of range
+    pub end: usize,
+}
diff --git a/crates/stages/stages/src/stages/s3/downloader/worker.rs b/crates/stages/stages/src/stages/s3/downloader/worker.rs
new file mode 100644
index 000000000000..a5300a0b2471
--- /dev/null
+++ b/crates/stages/stages/src/stages/s3/downloader/worker.rs
@@ -0,0 +1,110 @@
+use super::error::DownloaderError;
+use reqwest::{header::RANGE, Client};
+use std::path::{Path, PathBuf};
+use tokio::{
+    fs::OpenOptions,
+    io::{AsyncSeekExt, AsyncWriteExt, BufWriter},
+    sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
+};
+use tracing::debug;
+
+/// Responses sent by a worker.
+#[derive(Debug)]
+pub(crate) enum WorkerResponse {
+    /// Worker has been spawned and is awaiting work.
+    Ready { worker_id: u64, tx: UnboundedSender<WorkerRequest> },
+    /// Worker has downloaded a chunk.
+    DownloadedChunk { worker_id: u64, chunk_index: usize, written_bytes: usize },
+    /// Worker has encountered an error.
+    Err { worker_id: u64, error: DownloaderError },
+}
+
+/// Requests sent to a worker.
+#[derive(Debug)]
+pub(crate) enum WorkerRequest {
+    /// Requests a range to be downloaded.
+    Download { chunk_index: usize, start: usize, end: usize },
+    /// Signals a worker exit.
+    Finish,
+}
+
+/// Spawns the requested number of workers and returns an `UnboundedReceiver` that all of them
+/// will respond to.
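Before the worker implementation below, a condensed sketch of the handshake these enums encode: every worker announces itself with `Ready`, handing the orchestrator a private sender on which it then receives `Download` requests or a final `Finish`. The types here are simplified stand-ins for `WorkerResponse`/`WorkerRequest`, and unlike the real orchestrator this demo hands each worker a single range instead of re-assigning work on every `DownloadedChunk`.

```rust
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};

// Simplified stand-ins for the worker protocol.
enum Response {
    Ready { worker_id: u64, tx: UnboundedSender<Request> },
}
enum Request {
    Download { start: usize, end: usize },
    Finish,
}

#[tokio::main]
async fn main() {
    let (orchestrator_tx, mut orchestrator_rx) = unbounded_channel();

    for worker_id in 0..2u64 {
        let orchestrator_tx = orchestrator_tx.clone();
        tokio::spawn(async move {
            // Announce readiness and hand over a private work channel.
            let (tx, mut rx) = unbounded_channel();
            let _ = orchestrator_tx.send(Response::Ready { worker_id, tx });
            while let Some(req) = rx.recv().await {
                match req {
                    Request::Download { start, end } => println!("worker {worker_id}: bytes {start}-{end}"),
                    Request::Finish => break,
                }
            }
        });
    }
    // Drop the original sender so the receiver closes once every worker has exited.
    drop(orchestrator_tx);

    let mut ranges = [(0usize, 99usize), (100, 199)].into_iter();
    while let Some(Response::Ready { tx, .. }) = orchestrator_rx.recv().await {
        if let Some((start, end)) = ranges.next() {
            let _ = tx.send(Request::Download { start, end });
        }
        let _ = tx.send(Request::Finish);
    }
}
```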
+pub(crate) fn spawn_workers( + url: &str, + worker_count: u64, + data_file: &Path, +) -> UnboundedReceiver { + // Create channels for communication between workers and orchestrator + let (orchestrator_tx, orchestrator_rx) = unbounded_channel(); + + // Initiate workers + for worker_id in 0..worker_count { + let orchestrator_tx = orchestrator_tx.clone(); + let data_file = data_file.to_path_buf(); + let url = url.to_string(); + debug!(target: "sync::stages::s3::downloader", ?worker_id, "Spawning."); + + tokio::spawn(async move { + if let Err(error) = worker_fetch(worker_id, &orchestrator_tx, data_file, url).await { + let _ = orchestrator_tx.send(WorkerResponse::Err { worker_id, error }); + } + }); + } + + orchestrator_rx +} + +/// Downloads requested chunk ranges to the data file. +async fn worker_fetch( + worker_id: u64, + orchestrator_tx: &UnboundedSender, + data_file: PathBuf, + url: String, +) -> Result<(), DownloaderError> { + let client = Client::new(); + let mut data_file = BufWriter::new(OpenOptions::new().write(true).open(data_file).await?); + + // Signals readiness to download + let (tx, mut rx) = unbounded_channel::(); + orchestrator_tx.send(WorkerResponse::Ready { worker_id, tx }).unwrap_or_else(|_| { + debug!("Failed to notify orchestrator of readiness"); + }); + + while let Some(req) = rx.recv().await { + debug!( + target: "sync::stages::s3::downloader", + worker_id, + ?req, + "received from orchestrator" + ); + + match req { + WorkerRequest::Download { chunk_index, start, end } => { + data_file.seek(tokio::io::SeekFrom::Start(start as u64)).await?; + + let mut response = client + .get(&url) + .header(RANGE, format!("bytes={}-{}", start, end)) + .send() + .await?; + + let mut written_bytes = 0; + while let Some(chunk) = response.chunk().await? { + written_bytes += chunk.len(); + data_file.write_all(&chunk).await?; + } + data_file.flush().await?; + + let _ = orchestrator_tx.send(WorkerResponse::DownloadedChunk { + worker_id, + chunk_index, + written_bytes, + }); + } + WorkerRequest::Finish => break, + } + } + + Ok(()) +} diff --git a/crates/stages/stages/src/stages/s3/filelist.rs b/crates/stages/stages/src/stages/s3/filelist.rs new file mode 100644 index 000000000000..683c4a208862 --- /dev/null +++ b/crates/stages/stages/src/stages/s3/filelist.rs @@ -0,0 +1,21 @@ +use alloy_primitives::B256; + +/// File list to be downloaded with their hashes. 
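The ranged `GET`s in `worker_fetch` above assume the remote end honours `Range` headers; when pointing the stage at a new bucket or mirror it may be worth probing that first. Sketch with a made-up URL: a range-aware server advertises `Accept-Ranges: bytes` and answers a ranged request with `206 Partial Content`.

```rust
use reqwest::{
    header::{ACCEPT_RANGES, RANGE},
    Client, StatusCode,
};

// Returns true if `url` looks safe to download with concurrent byte-range requests.
async fn supports_ranges(url: &str) -> Result<bool, reqwest::Error> {
    let client = Client::new();

    let head = client.head(url).send().await?;
    let advertises_bytes = head
        .headers()
        .get(ACCEPT_RANGES)
        .and_then(|v| v.to_str().ok())
        .is_some_and(|v| v.contains("bytes"));

    // Ask for the first ten bytes; a range-aware server answers 206 rather than 200.
    let probe = client.get(url).header(RANGE, "bytes=0-9").send().await?;

    Ok(advertises_bytes && probe.status() == StatusCode::PARTIAL_CONTENT)
}
```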
+pub(crate) static DOWNLOAD_FILE_LIST: [[(&str, B256); 3]; 2] = [ + [ + ("static_file_transactions_0_499999", B256::ZERO), + ("static_file_transactions_0_499999.off", B256::ZERO), + ("static_file_transactions_0_499999.conf", B256::ZERO), + // ("static_file_blockmeta_0_499999", B256::ZERO), + // ("static_file_blockmeta_0_499999.off", B256::ZERO), + // ("static_file_blockmeta_0_499999.conf", B256::ZERO), + ], + [ + ("static_file_transactions_500000_999999", B256::ZERO), + ("static_file_transactions_500000_999999.off", B256::ZERO), + ("static_file_transactions_500000_999999.conf", B256::ZERO), + // ("static_file_blockmeta_500000_999999", B256::ZERO), + // ("static_file_blockmeta_500000_999999.off", B256::ZERO), + // ("static_file_blockmeta_500000_999999.conf", B256::ZERO), + ], +]; diff --git a/crates/stages/stages/src/stages/s3/mod.rs b/crates/stages/stages/src/stages/s3/mod.rs new file mode 100644 index 000000000000..18e8a99c7162 --- /dev/null +++ b/crates/stages/stages/src/stages/s3/mod.rs @@ -0,0 +1,294 @@ +mod downloader; +pub use downloader::{fetch, Metadata}; +use downloader::{DownloaderError, S3DownloaderResponse}; + +mod filelist; +use filelist::DOWNLOAD_FILE_LIST; + +use reth_db::transaction::DbTxMut; +use reth_primitives::StaticFileSegment; +use reth_provider::{ + DBProvider, StageCheckpointReader, StageCheckpointWriter, StaticFileProviderFactory, +}; +use reth_stages_api::{ + ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput, +}; +use std::{ + path::PathBuf, + task::{ready, Context, Poll}, +}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; + +/// S3 `StageId` +const S3_STAGE_ID: StageId = StageId::Other("S3"); + +/// The S3 stage. +#[derive(Default, Debug)] +#[non_exhaustive] +pub struct S3Stage { + /// Static file directory. + static_file_directory: PathBuf, + /// Remote server URL. + url: String, + /// Maximum number of connections per download. + max_concurrent_requests: u64, + /// Channel to receive the downloaded ranges from the fetch task. + fetch_rx: Option>>, +} + +impl Stage for S3Stage +where + Provider: DBProvider + + StaticFileProviderFactory + + StageCheckpointReader + + StageCheckpointWriter, +{ + fn id(&self) -> StageId { + S3_STAGE_ID + } + + fn poll_execute_ready( + &mut self, + cx: &mut Context<'_>, + input: ExecInput, + ) -> Poll> { + loop { + // We are currently fetching and may have downloaded ranges that we can process. + if let Some(rx) = &mut self.fetch_rx { + // Whether we have downloaded all the required files. 
+ let mut is_done = false; + + let response = match ready!(rx.poll_recv(cx)) { + Some(Ok(response)) => { + is_done = response.is_done(); + Ok(()) + } + Some(Err(_)) => todo!(), // TODO: DownloaderError -> StageError + None => Err(StageError::ChannelClosed), + }; + + if is_done { + self.fetch_rx = None; + } + + return Poll::Ready(response) + } + + // Spawns the downloader task if there are any missing files + if let Some(fetch_rx) = self.maybe_spawn_fetch(input) { + self.fetch_rx = Some(fetch_rx); + + // Polls fetch_rx & registers waker + continue + } + + // No files to be downloaded + return Poll::Ready(Ok(())) + } + } + + fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result + where + Provider: DBProvider + + StaticFileProviderFactory + + StageCheckpointReader + + StageCheckpointWriter, + { + // Re-initializes the provider to detect the new additions + provider.static_file_provider().initialize_index()?; + + // TODO logic for appending tx_block + + // let (_, _to_block) = input.next_block_range().into_inner(); + // let static_file_provider = provider.static_file_provider(); + // let mut _tx_block_cursor = + // provider.tx_ref().cursor_write::()?; + + // tx_block_cursor.append(indice.last_tx_num(), &block_number)?; + + // let checkpoint = StageCheckpoint { block_number: highest_block, stage_checkpoint: None }; + // provider.save_stage_checkpoint(StageId::Bodies, checkpoint)?; + // provider.save_stage_checkpoint(S3_STAGE_ID, checkpoint)?; + + // // TODO: verify input.target according to s3 stage specifications + // let done = highest_block == to_block; + + Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true }) + } + + fn unwind( + &mut self, + _provider: &Provider, + input: UnwindInput, + ) -> Result { + // TODO + Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) }) + } +} + +impl S3Stage { + /// It will only spawn a task to fetch files from the remote server, it there are any missing + /// static files. + /// + /// Every time a block range is ready with all the necessary files, it sends a + /// [`S3DownloaderResponse`] to `self.fetch_rx`. If it's the last requested block range, the + /// response will have `is_done` set to true. + fn maybe_spawn_fetch( + &self, + input: ExecInput, + ) -> Option>> { + let checkpoint = input.checkpoint(); + // TODO: input target can only be certain numbers. eg. 499_999 , 999_999 etc. + + // Create a list of all the missing files per block range that need to be downloaded. + let mut requests = vec![]; + for block_range_files in &DOWNLOAD_FILE_LIST { + let (_, block_range) = + StaticFileSegment::parse_filename(block_range_files[0].0).expect("qed"); + + if block_range.end() <= checkpoint.block_number { + continue + } + + let mut block_range_requests = vec![]; + for (filename, file_hash) in block_range_files { + // If the file already exists, then we are resuming a previously interrupted stage + // run. + if self.static_file_directory.join(filename).exists() { + // TODO: check hash if the file already exists + continue + } + + block_range_requests.push((filename, file_hash)); + } + + requests.push((block_range, block_range_requests)); + } + + // Return None, if we have downloaded all the files that are required. 
+ if requests.is_empty() { + return None + } + + let static_file_directory = self.static_file_directory.clone(); + let url = self.url.clone(); + let max_concurrent_requests = self.max_concurrent_requests; + + let (fetch_tx, fetch_rx) = unbounded_channel(); + tokio::spawn(async move { + let mut requests_iter = requests.into_iter().peekable(); + + while let Some((_, file_requests)) = requests_iter.next() { + for (filename, file_hash) in file_requests { + if let Err(err) = fetch( + filename, + &static_file_directory, + &format!("{}/{filename}", url), + max_concurrent_requests, + Some(*file_hash), + ) + .await + { + let _ = fetch_tx.send(Err(err)); + return + } + } + + let response = if requests_iter.peek().is_none() { + S3DownloaderResponse::Done + } else { + S3DownloaderResponse::AddedNewRange + }; + + let _ = fetch_tx.send(Ok(response)); + } + }); + + Some(fetch_rx) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::{ + ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestStageDB, + UnwindStageTestRunner, + }; + use reth_primitives::SealedHeader; + use reth_testing_utils::{ + generators, + generators::{random_header, random_header_range}, + }; + + // stage_test_suite_ext!(S3TestRunner, s3); + + #[derive(Default)] + struct S3TestRunner { + db: TestStageDB, + } + + impl StageTestRunner for S3TestRunner { + type S = S3Stage; + + fn db(&self) -> &TestStageDB { + &self.db + } + + fn stage(&self) -> Self::S { + S3Stage::default() + } + } + + impl ExecuteStageTestRunner for S3TestRunner { + type Seed = Vec; + + fn seed_execution(&mut self, input: ExecInput) -> Result { + let start = input.checkpoint().block_number; + let mut rng = generators::rng(); + let head = random_header(&mut rng, start, None); + self.db.insert_headers_with_td(std::iter::once(&head))?; + + // use previous progress as seed size + let end = input.target.unwrap_or_default() + 1; + + if start + 1 >= end { + return Ok(Vec::default()) + } + + let mut headers = random_header_range(&mut rng, start + 1..end, head.hash()); + self.db.insert_headers_with_td(headers.iter())?; + headers.insert(0, head); + Ok(headers) + } + + fn validate_execution( + &self, + input: ExecInput, + output: Option, + ) -> Result<(), TestRunnerError> { + if let Some(output) = output { + assert!(output.done, "stage should always be done"); + assert_eq!( + output.checkpoint.block_number, + input.target(), + "stage progress should always match progress of previous stage" + ); + } + Ok(()) + } + } + + impl UnwindStageTestRunner for S3TestRunner { + fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> { + Ok(()) + } + } + + #[test] + fn parse_files() { + for block_range_files in &DOWNLOAD_FILE_LIST { + let (_, _) = StaticFileSegment::parse_filename(block_range_files[0].0).expect("qed"); + } + } +}
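For reference, the `parse_files` test above only checks that the hardcoded names in `DOWNLOAD_FILE_LIST` follow the `static_file_{segment}_{first_block}_{last_block}` naming scheme. A small sketch of what that parsing recovers, assuming the block range returned by `parse_filename` exposes `start()`/`end()` accessors, as the checkpoint comparison in `maybe_spawn_fetch` relies on for `end()`:

```rust
use reth_primitives::StaticFileSegment;

fn main() {
    let (segment, block_range) =
        StaticFileSegment::parse_filename("static_file_transactions_0_499999")
            .expect("well-formed static file name");

    // The stage uses the parsed range to decide whether a block range still needs downloading.
    assert_eq!(segment, StaticFileSegment::Transactions);
    assert_eq!(block_range.start(), 0);
    assert_eq!(block_range.end(), 499_999);
}
```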