feat: add S3Stage downloader #13784

Merged · 5 commits · Jan 21, 2025
25 changes: 25 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -485,6 +485,7 @@ backon = { version = "1.2", default-features = false, features = [
] }
bincode = "1.3"
bitflags = "2.4"
blake3 = "1.5.5"
boyer-moore-magiclen = "0.2.16"
bytes = { version = "1.5", default-features = false }
cfg-if = "1.0"
8 changes: 8 additions & 0 deletions crates/stages/stages/Cargo.toml
@@ -22,6 +22,7 @@ reth-db-api.workspace = true
reth-etl.workspace = true
reth-evm.workspace = true
reth-exex.workspace = true
reth-fs-util.workspace = true
reth-network-p2p.workspace = true
reth-primitives = { workspace = true, features = ["secp256k1"] }
reth-primitives-traits = { workspace = true, features = [
@@ -57,6 +58,12 @@ rayon.workspace = true
num-traits = "0.2.15"
tempfile = { workspace = true, optional = true }
bincode.workspace = true
blake3.workspace = true
reqwest = { workspace = true, default-features = false, features = [
    "rustls-tls-native-roots",
    "blocking",
] }
serde = { workspace = true, features = ["derive"] }

[dev-dependencies]
# reth
@@ -75,6 +82,7 @@ reth-testing-utils.workspace = true
reth-trie = { workspace = true, features = ["test-utils"] }
reth-provider = { workspace = true, features = ["test-utils"] }
reth-network-peers.workspace = true
reth-tracing.workspace = true

alloy-rlp.workspace = true
itertools.workspace = true
3 changes: 3 additions & 0 deletions crates/stages/stages/src/stages/mod.rs
@@ -17,6 +17,8 @@ mod index_storage_history;
/// Stage for computing state root.
mod merkle;
mod prune;
/// The S3 download stage.
mod s3;
/// The sender recovery stage.
mod sender_recovery;
/// The transaction lookup stage.
@@ -32,6 +34,7 @@ pub use index_account_history::*;
pub use index_storage_history::*;
pub use merkle::*;
pub use prune::*;
pub use s3::*;
pub use sender_recovery::*;
pub use tx_lookup::*;

31 changes: 31 additions & 0 deletions crates/stages/stages/src/stages/s3/downloader/error.rs
@@ -0,0 +1,31 @@
use alloy_primitives::B256;
use reth_fs_util::FsPathError;

/// Possible downloader error variants.
#[derive(Debug, thiserror::Error)]
pub enum DownloaderError {
    /// Requires a valid `total_size`.
    #[error("requires a valid total_size, got {0:?}")]
    InvalidMetadataTotalSize(Option<usize>),
    /// Invalid chunk access.
    #[error("tried to access chunk at index {0}, but there are only {1} chunks")]
    InvalidChunk(usize, usize),
    /// File hash mismatch.
    #[error("file hash does not match the expected one: {0} != {1}")]
    InvalidFileHash(B256, B256),
    /// Empty content length returned from the server.
    #[error("metadata got an empty content length from server")]
    EmptyContentLength,
    /// Filesystem path error.
    #[error(transparent)]
    FsPath(#[from] FsPathError),
    /// Reqwest error.
    #[error(transparent)]
    Reqwest(#[from] reqwest::Error),
    /// Std Io error.
    #[error(transparent)]
    StdIo(#[from] std::io::Error),
    /// Bincode error.
    #[error(transparent)]
    Bincode(#[from] bincode::Error),
}
184 changes: 184 additions & 0 deletions crates/stages/stages/src/stages/s3/downloader/fetch.rs
@@ -0,0 +1,184 @@
use crate::stages::s3::downloader::{worker::spawn_workers, RemainingChunkRange};

use super::{
    error::DownloaderError,
    meta::Metadata,
    worker::{WorkerRequest, WorkerResponse},
};
use alloy_primitives::B256;
use reqwest::{header::CONTENT_LENGTH, Client};
use std::{
    collections::HashMap,
    fs::{File, OpenOptions},
    io::BufReader,
    path::Path,
};
use tracing::{debug, error, info};

/// Downloads a file from `url` to the data file path.
///
/// If a `file_hash` is passed, it will be verified at the end.
///
/// ## Details
///
/// 1) A [`Metadata`] file is created or opened in `{target_dir}/download/{filename}.metadata`. It
///    tracks download progress, including the total file size, downloaded bytes, chunk sizes, and
///    the ranges that still need downloading. This allows for resumability.
/// 2) The target file is preallocated with the total size of the file in
///    `{target_dir}/download/{filename}`.
/// 3) Multiple `workers` are spawned to download specific chunks of the file.
/// 4) The `Orchestrator` manages the workers, distributes chunk ranges, and ensures the download
///    progresses efficiently by dynamically assigning tasks to workers as they become available.
/// 5) Once the file is downloaded:
///    * If `file_hash` is `Some`, verifies its blake3 hash.
///    * Deletes the metadata file.
///    * Moves the downloaded file to the target directory.
pub async fn fetch(
    filename: &str,
    target_dir: &Path,
    url: &str,
    mut concurrent: u64,
    file_hash: Option<B256>,
) -> Result<(), DownloaderError> {
    // Create a temporary directory to download files to, before moving them to target_dir.
    let download_dir = target_dir.join("download");
    reth_fs_util::create_dir_all(&download_dir)?;

    let data_file = download_dir.join(filename);
    let mut metadata = metadata(&data_file, url).await?;
    if metadata.is_done() {
        return Ok(())
    }

    // Ensure the file is preallocated so we can download it concurrently. Do not truncate here:
    // truncating would discard already-downloaded chunks when resuming.
    {
        let file = OpenOptions::new()
            .create(true)
            .truncate(false)
            .read(true)
            .write(true)
            .open(&data_file)?;

        if file.metadata()?.len() as usize != metadata.total_size {
            info!(target: "sync::stages::s3::downloader", ?filename, length = metadata.total_size, "Preallocating space.");
            file.set_len(metadata.total_size as u64)?;
        }
    }

    while !metadata.is_done() {
        info!(target: "sync::stages::s3::downloader", ?filename, "Downloading.");

        // Find the missing file chunks and the minimum number of workers required
        let missing_chunks = metadata.needed_ranges();
        concurrent = concurrent
            .min(std::thread::available_parallelism()?.get() as u64)
            .min(missing_chunks.len() as u64);

        let mut orchestrator_rx = spawn_workers(url, concurrent, &data_file);

        let mut workers = HashMap::new();
        let mut missing_chunks = missing_chunks.into_iter();

        // Distribute chunk ranges to workers when they free up
        while let Some(worker_msg) = orchestrator_rx.recv().await {
Comment on lines +82 to +83

Collaborator: Is this more performant than using a single downloader? I'd assume downloading is bound by bandwidth, so it's unclear whether using multiple downloaders improves this.

joshieDo (Author), Jan 15, 2025: Although in theory it's bound by bandwidth, servers will often throttle single connections, so using multiple connections helps.
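(Editorial aside: a minimal sketch of the per-connection ranged-request pattern under discussion. This is not the PR's worker implementation; `download_chunk` and its error handling are illustrative only, assuming the server honors HTTP `Range` requests.)

use std::{
    fs::OpenOptions,
    io::{Seek, SeekFrom, Write},
    path::Path,
};

use reqwest::header::RANGE;

/// Illustrative sketch: fetches bytes `start..=end` of `url` and writes them at
/// the matching offset of the preallocated `data_file`, so several connections
/// can fill disjoint ranges of the same file in parallel.
async fn download_chunk(
    client: &reqwest::Client,
    url: &str,
    data_file: &Path,
    start: usize,
    end: usize,
) -> Result<usize, Box<dyn std::error::Error>> {
    // The `Range` header asks the server for only this slice of the file.
    let resp = client.get(url).header(RANGE, format!("bytes={start}-{end}")).send().await?;
    let bytes = resp.error_for_status()?.bytes().await?;

    // Write the slice at its offset; every worker owns a disjoint range.
    let mut file = OpenOptions::new().write(true).open(data_file)?;
    file.seek(SeekFrom::Start(start as u64))?;
    file.write_all(&bytes)?;
    Ok(bytes.len())
}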

            debug!(target: "sync::stages::s3::downloader", ?worker_msg, "received message from worker");

            let available_worker = match worker_msg {
                WorkerResponse::Ready { worker_id, tx } => {
                    debug!(target: "sync::stages::s3::downloader", ?worker_id, "Worker ready.");
                    workers.insert(worker_id, tx);
                    worker_id
                }
                WorkerResponse::DownloadedChunk { worker_id, chunk_index, written_bytes } => {
                    metadata.update_chunk(chunk_index, written_bytes)?;
                    worker_id
                }
                WorkerResponse::Err { worker_id, error } => {
                    error!(target: "sync::stages::s3::downloader", ?worker_id, "Worker found an error: {:?}", error);
                    return Err(error)
                }
            };

            let msg = if let Some(RemainingChunkRange { index, start, end }) = missing_chunks.next()
            {
                debug!(target: "sync::stages::s3::downloader", ?available_worker, start, end, "Worker download request.");
                WorkerRequest::Download { chunk_index: index, start, end }
            } else {
                debug!(target: "sync::stages::s3::downloader", ?available_worker, "Sent Finish command to worker.");
                WorkerRequest::Finish
            };

            let _ = workers.get(&available_worker).expect("should exist").send(msg);
        }
    }

    if let Some(file_hash) = file_hash {
        info!(target: "sync::stages::s3::downloader", ?filename, "Checking file integrity.");
        check_file_hash(&data_file, &file_hash)?;
    }

    // No longer need the metadata file.
    metadata.delete()?;

    // Move downloaded file to desired directory.
    let file_directory = target_dir.join(filename);
    reth_fs_util::rename(data_file, &file_directory)?;
    info!(target: "sync::stages::s3::downloader", ?file_directory, "Moved file from temporary to target directory.");

    Ok(())
}
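(Editorial aside: `worker.rs` and `spawn_workers` are not part of the diff shown on this page. The message shapes below are inferred from how `fetch` uses them; the exact field types, such as the channel type in `Ready`, are assumptions.)

// Sketch only: inferred from the orchestration loop above, not the PR's worker.rs.
enum WorkerResponse {
    /// Worker came online and handed the orchestrator a request channel.
    Ready { worker_id: u64, tx: tokio::sync::mpsc::UnboundedSender<WorkerRequest> },
    /// Worker finished writing `written_bytes` for the chunk at `chunk_index`.
    DownloadedChunk { worker_id: u64, chunk_index: usize, written_bytes: usize },
    /// Worker failed; the orchestrator aborts the download.
    Err { worker_id: u64, error: DownloaderError },
}

enum WorkerRequest {
    /// Download bytes `start..=end` into the chunk at `chunk_index`.
    Download { chunk_index: usize, start: usize, end: usize },
    /// No ranges left; the worker can shut down.
    Finish,
}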

/// Creates a metadata file used to keep track of the downloaded chunks. Useful for resuming after
/// a shutdown.
async fn metadata(data_file: &Path, url: &str) -> Result<Metadata, DownloaderError> {
    if Metadata::file_path(data_file).exists() {
        debug!(target: "sync::stages::s3::downloader", ?data_file, "Loading metadata");
        return Metadata::load(data_file)
    }

    let client = Client::new();
    let resp = client.head(url).send().await?;
    let total_length: usize = resp
        .headers()
        .get(CONTENT_LENGTH)
        .and_then(|v| v.to_str().ok())
        .and_then(|s| s.parse().ok())
Comment on lines +139 to +145

Collaborator: Do we need any additional header setup?

joshieDo (Author): No, that's it. We just need to know the total size of the file.
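(Editorial aside: one related check, which the PR does not do: chunked downloading assumes the server honors range requests, and the same HEAD response can confirm that. A sketch:)

use reqwest::header::ACCEPT_RANGES;

// Sketch: probe the HEAD response for range-request support before
// spawning multiple ranged workers.
let supports_ranges = resp
    .headers()
    .get(ACCEPT_RANGES)
    .and_then(|v| v.to_str().ok())
    .is_some_and(|v| v.eq_ignore_ascii_case("bytes"));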

        .ok_or(DownloaderError::EmptyContentLength)?;

    debug!(target: "sync::stages::s3::downloader", ?data_file, "Creating metadata");

    Metadata::builder(data_file).with_total_size(total_length).build()
}

/// Ensures the file on path has the expected blake3 hash.
fn check_file_hash(path: &Path, expected: &B256) -> Result<(), DownloaderError> {
    let mut reader = BufReader::new(File::open(path)?);
    let mut hasher = blake3::Hasher::new();
Collaborator: Do we need blake3 here, or can we use sha256?

joshieDo (Author): blake3 is way faster, and we're dealing with 300GB in total.
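(Editorial aside: for very large files the hashing itself can also be parallelized. The sketch below relies on optional `blake3` crate features that this PR does not enable.)

// Sketch: requires blake3 = { version = "1.5", features = ["mmap", "rayon"] }.
// Memory-maps the file and hashes it across all cores.
fn hash_file_parallel(path: &std::path::Path) -> std::io::Result<blake3::Hash> {
    let mut hasher = blake3::Hasher::new();
    hasher.update_mmap_rayon(path)?;
    Ok(hasher.finalize())
}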

    std::io::copy(&mut reader, &mut hasher)?;

    let file_hash = hasher.finalize();
    if file_hash.as_bytes() != expected {
        return Err(DownloaderError::InvalidFileHash(file_hash.as_bytes().into(), *expected))
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::b256;

    #[tokio::test]
    async fn test_download() {
        reth_tracing::init_test_tracing();

        let b3sum = b256!("81a7318f69fc1d6bb0a58a24af302f3b978bc75a435e4ae5d075f999cd060cfd");
        let url = "https://link.testfile.org/500MB";

        let file = tempfile::NamedTempFile::new().unwrap();
        let filename = file.path().file_name().unwrap().to_str().unwrap();
        let target_dir = file.path().parent().unwrap();
        fetch(filename, target_dir, url, 4, Some(b3sum)).await.unwrap();
    }
}