diff --git a/crates/stages/stages/src/stages/s3/downloader/fetch.rs b/crates/stages/stages/src/stages/s3/downloader/fetch.rs
index ae329f02e1ce..1a31c8bc0b51 100644
--- a/crates/stages/stages/src/stages/s3/downloader/fetch.rs
+++ b/crates/stages/stages/src/stages/s3/downloader/fetch.rs
@@ -1,4 +1,4 @@
-use crate::stages::s3::downloader::worker::spawn_workers;
+use crate::stages::s3::downloader::{worker::spawn_workers, RemainingChunkRange};
 
 use super::{
     error::DownloaderError,
@@ -101,9 +101,9 @@ pub async fn fetch(
         let worker = workers.get(&available_worker).expect("should exist");
 
         match missing_chunks.next() {
-            Some((chunk_index, (start, end))) => {
+            Some(RemainingChunkRange { index, start, end }) => {
                 debug!(target: "sync::stages::s3::downloader", ?available_worker, start, end, "Worker download request.");
-                let _ = worker.send(WorkerRequest::Download { chunk_index, start, end });
+                let _ = worker.send(WorkerRequest::Download { chunk_index: index, start, end });
             }
             None => {
                 let _ = worker.send(WorkerRequest::Finish);
diff --git a/crates/stages/stages/src/stages/s3/downloader/meta.rs b/crates/stages/stages/src/stages/s3/downloader/meta.rs
index 290286e4c334..7ff4213fffc3 100644
--- a/crates/stages/stages/src/stages/s3/downloader/meta.rs
+++ b/crates/stages/stages/src/stages/s3/downloader/meta.rs
@@ -1,13 +1,14 @@
-use super::error::DownloaderError;
+use super::{error::DownloaderError, RemainingChunkRange};
 use serde::{Deserialize, Serialize};
 use std::{
     fs::File,
+    ops::RangeInclusive,
     path::{Path, PathBuf},
 };
 use tracing::info;
 
 /// Tracks download progress and manages chunked downloads for resumable file transfers.
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug)]
 pub struct Metadata {
     /// Total file size
     pub total_size: usize,
@@ -16,11 +17,10 @@ pub struct Metadata {
     /// Download chunk size. Default 150MB.
     pub chunk_size: usize,
     /// Remaining download ranges for each chunk.
-    /// - `Some((start, end))`: Start and end indices to be downloaded.
+    /// - `Some(RangeInclusive)`: range to be downloaded.
     /// - `None`: Chunk fully downloaded.
-    chunks: Vec<Option<(usize, usize)>>,
+    chunks: Vec<Option<RangeInclusive<usize>>>,
     /// Path with the stored metadata.
-    #[serde(skip)]
     path: PathBuf,
 }
 
@@ -38,15 +38,17 @@ impl Metadata {
         ))
     }
 
-    /// Returns a list of all chunks with their remaining ranges to be downloaded.
-    ///
-    /// Returns a list of `(chunk_index, (start, end))`
-    pub fn needed_ranges(&self) -> Vec<(usize, (usize, usize))> {
+    /// Returns a list of all chunks with their remaining ranges to be downloaded:
+    /// `RemainingChunkRange`.
+    pub fn needed_ranges(&self) -> Vec<RemainingChunkRange> {
         self.chunks
             .iter()
             .enumerate()
             .filter(|(_, remaining)| remaining.is_some())
-            .map(|(index, remaining)| (index, remaining.expect("qed")))
+            .map(|(index, remaining)| {
+                let range = remaining.as_ref().expect("qed");
+                RemainingChunkRange { index, start: *range.start(), end: *range.end() }
+            })
             .collect()
     }
 
@@ -64,12 +66,12 @@ impl Metadata {
         }
 
         // Update chunk with downloaded range
-        if let Some((mut start, end)) = self.chunks[index] {
-            start += downloaded_bytes;
-            if start > end {
+        if let Some(range) = &self.chunks[index] {
+            let start = range.start() + downloaded_bytes;
+            if start > *range.end() {
                 self.chunks[index] = None;
             } else {
-                self.chunks[index] = Some((start, end));
+                self.chunks[index] = Some(start..=*range.end());
             }
         }
 
@@ -85,13 +87,17 @@ impl Metadata {
     /// Commits the [`Metadata`] to file.
     pub fn commit(&self) -> Result<(), DownloaderError> {
         Ok(reth_fs_util::atomic_write_file(&self.path, |file| {
-            bincode::serialize_into(file, &self)
+            bincode::serialize_into(file, &MetadataFile::from(self))
         })?)
     }
 
     /// Loads a [`Metadata`] file from disk using the target data file.
     pub fn load(data_file: &Path) -> Result<Self, DownloaderError> {
-        Ok(bincode::deserialize_from(File::open(Self::file_path(data_file))?)?)
+        let metadata_file_path = Self::file_path(data_file);
+        let MetadataFile { total_size, downloaded, chunk_size, chunks } =
+            bincode::deserialize_from(File::open(&metadata_file_path)?)?;
+
+        Ok(Self { total_size, downloaded, chunk_size, chunks, path: metadata_file_path })
     }
 
     /// Returns true if we have downloaded all chunks.
@@ -142,7 +148,7 @@ impl MetadataBuilder {
                 let chunks = (0..*total_size)
                     .step_by(self.chunk_size)
                     .map(|start| {
-                        Some((start, (start + self.chunk_size).min(*total_size).saturating_sub(1)))
+                        Some(start..=(start + self.chunk_size).min(*total_size).saturating_sub(1))
                     })
                     .collect();
 
@@ -161,3 +167,29 @@ impl MetadataBuilder {
         }
     }
 }
+
+/// Helper type that can serialize and deserialize [`Metadata`] to disk.
+#[derive(Debug, Serialize, Deserialize)]
+struct MetadataFile {
+    /// Total file size
+    total_size: usize,
+    /// Downloaded file size
+    downloaded: usize,
+    /// Download chunk size. Default 150MB.
+    chunk_size: usize,
+    /// Remaining download ranges for each chunk.
+    /// - `Some(RangeInclusive)`: range to be downloaded.
+    /// - `None`: Chunk fully downloaded.
+    chunks: Vec<Option<RangeInclusive<usize>>>,
+}
+
+impl From<&Metadata> for MetadataFile {
+    fn from(metadata: &Metadata) -> Self {
+        Self {
+            total_size: metadata.total_size,
+            downloaded: metadata.downloaded,
+            chunk_size: metadata.chunk_size,
+            chunks: metadata.chunks.clone(),
+        }
+    }
+}
diff --git a/crates/stages/stages/src/stages/s3/downloader/mod.rs b/crates/stages/stages/src/stages/s3/downloader/mod.rs
index 9f77807273a5..d42c8251a079 100644
--- a/crates/stages/stages/src/stages/s3/downloader/mod.rs
+++ b/crates/stages/stages/src/stages/s3/downloader/mod.rs
@@ -21,7 +21,18 @@ pub(crate) enum S3DownloaderResponse {
 
 impl S3DownloaderResponse {
     /// Whether the downloaded block range is the last requested one.
-    pub(crate) fn is_done(&self) -> bool {
+    pub(crate) const fn is_done(&self) -> bool {
         matches!(self, Self::Done)
     }
 }
+
+/// Chunk nth remaining range to be downloaded.
+#[derive(Debug)]
+pub struct RemainingChunkRange {
+    /// The nth chunk
+    pub index: usize,
+    /// Start of range
+    pub start: usize,
+    /// End of range
+    pub end: usize,
+}
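For context only, not part of the patch: a minimal, self-contained sketch of the range-based chunk bookkeeping this diff switches to. The free functions `needed_ranges` and `advance_chunk`, the stand-in `RemainingChunkRange` struct, and `main` below are illustrative assumptions rather than the crate's API; they mirror the logic of `Metadata::needed_ranges` and the chunk update above.

```rust
use std::ops::RangeInclusive;

/// Stand-in for the `RemainingChunkRange` type added in this diff.
#[derive(Debug)]
struct RemainingChunkRange {
    index: usize,
    start: usize,
    end: usize,
}

/// Mirrors the reworked `needed_ranges`: keep only chunks that still have a range left.
fn needed_ranges(chunks: &[Option<RangeInclusive<usize>>]) -> Vec<RemainingChunkRange> {
    chunks
        .iter()
        .enumerate()
        .filter_map(|(index, remaining)| {
            remaining
                .as_ref()
                .map(|range| RemainingChunkRange { index, start: *range.start(), end: *range.end() })
        })
        .collect()
}

/// Mirrors the reworked chunk update: advance the range start by the bytes written,
/// clearing the entry once the whole inclusive range is covered.
fn advance_chunk(chunks: &mut [Option<RangeInclusive<usize>>], index: usize, downloaded_bytes: usize) {
    if let Some(range) = &chunks[index] {
        let start = range.start() + downloaded_bytes;
        if start > *range.end() {
            chunks[index] = None;
        } else {
            chunks[index] = Some(start..=*range.end());
        }
    }
}

fn main() {
    // Two 150-byte chunks of a 300-byte file, laid out the way `MetadataBuilder` splits them.
    let mut chunks = vec![Some(0..=149), Some(150..=299)];
    advance_chunk(&mut chunks, 0, 150); // chunk 0 fully downloaded -> None
    advance_chunk(&mut chunks, 1, 50); // chunk 1 still needs 200..=299
    println!("{:?}", needed_ranges(&chunks));
}
```

Keeping the in-memory `chunks` as `Option<RangeInclusive<usize>>` while serializing through the separate `MetadataFile` helper is what lets `Metadata` drop its `Serialize`/`Deserialize` derives and the `#[serde(skip)]` on `path`.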