Skip to content

Commit

Permalink
MetadataFile & RemainingChunkRange
Browse files Browse the repository at this point in the history
  • Loading branch information
joshieDo committed Jan 20, 2025
1 parent e59f489 commit 3331f34
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 21 deletions.
6 changes: 3 additions & 3 deletions crates/stages/stages/src/stages/s3/downloader/fetch.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stages::s3::downloader::worker::spawn_workers;
use crate::stages::s3::downloader::{worker::spawn_workers, RemainingChunkRange};

use super::{
error::DownloaderError,
Expand Down Expand Up @@ -101,9 +101,9 @@ pub async fn fetch(

let worker = workers.get(&available_worker).expect("should exist");
match missing_chunks.next() {
Some((chunk_index, (start, end))) => {
Some(RemainingChunkRange { index, start, end }) => {
debug!(target: "sync::stages::s3::downloader", ?available_worker, start, end, "Worker download request.");
let _ = worker.send(WorkerRequest::Download { chunk_index, start, end });
let _ = worker.send(WorkerRequest::Download { chunk_index: index, start, end });
}
None => {
let _ = worker.send(WorkerRequest::Finish);
Expand Down
66 changes: 49 additions & 17 deletions crates/stages/stages/src/stages/s3/downloader/meta.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use super::error::DownloaderError;
use super::{error::DownloaderError, RemainingChunkRange};
use serde::{Deserialize, Serialize};
use std::{
fs::File,
ops::RangeInclusive,
path::{Path, PathBuf},
};
use tracing::info;

/// Tracks download progress and manages chunked downloads for resumable file transfers.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug)]
pub struct Metadata {
/// Total file size
pub total_size: usize,
Expand All @@ -16,11 +17,10 @@ pub struct Metadata {
/// Download chunk size. Default 150MB.
pub chunk_size: usize,
/// Remaining download ranges for each chunk.
/// - `Some((start, end))`: Start and end indices to be downloaded.
/// - `Some(RangeInclusive)`: range to be downloaded.
/// - `None`: Chunk fully downloaded.
chunks: Vec<Option<(usize, usize)>>,
chunks: Vec<Option<RangeInclusive<usize>>>,
/// Path with the stored metadata.
#[serde(skip)]
path: PathBuf,
}

Expand All @@ -38,15 +38,17 @@ impl Metadata {
))
}

/// Returns a list of all chunks with their remaining ranges to be downloaded.
///
/// Returns a list of `(chunk_index, (start, end))`
pub fn needed_ranges(&self) -> Vec<(usize, (usize, usize))> {
/// Returns a list of all chunks with their remaining ranges to be downloaded:
/// `RemainingChunkRange`.
pub fn needed_ranges(&self) -> Vec<RemainingChunkRange> {
self.chunks
.iter()
.enumerate()
.filter(|(_, remaining)| remaining.is_some())
.map(|(index, remaining)| (index, remaining.expect("qed")))
.map(|(index, remaining)| {
let range = remaining.as_ref().expect("qed");
RemainingChunkRange { index, start: *range.start(), end: *range.end() }
})
.collect()
}

Expand All @@ -64,12 +66,12 @@ impl Metadata {
}

// Update chunk with downloaded range
if let Some((mut start, end)) = self.chunks[index] {
start += downloaded_bytes;
if start > end {
if let Some(range) = &self.chunks[index] {
let start = range.start() + downloaded_bytes;
if start > *range.end() {
self.chunks[index] = None;
} else {
self.chunks[index] = Some((start, end));
self.chunks[index] = Some(start..=*range.end());
}
}

Expand All @@ -85,13 +87,17 @@ impl Metadata {
/// Commits the [`Metadata`] to file.
pub fn commit(&self) -> Result<(), DownloaderError> {
Ok(reth_fs_util::atomic_write_file(&self.path, |file| {
bincode::serialize_into(file, &self)
bincode::serialize_into(file, &MetadataFile::from(self))
})?)
}

/// Loads a [`Metadata`] file from disk using the target data file.
pub fn load(data_file: &Path) -> Result<Self, DownloaderError> {
Ok(bincode::deserialize_from(File::open(Self::file_path(data_file))?)?)
let metadata_file_path = Self::file_path(data_file);
let MetadataFile { total_size, downloaded, chunk_size, chunks } =
bincode::deserialize_from(File::open(&metadata_file_path)?)?;

Ok(Self { total_size, downloaded, chunk_size, chunks, path: metadata_file_path })
}

/// Returns true if we have downloaded all chunks.
Expand Down Expand Up @@ -142,7 +148,7 @@ impl MetadataBuilder {
let chunks = (0..*total_size)
.step_by(self.chunk_size)
.map(|start| {
Some((start, (start + self.chunk_size).min(*total_size).saturating_sub(1)))
Some(start..=(start + self.chunk_size).min(*total_size).saturating_sub(1))
})
.collect();

Expand All @@ -161,3 +167,29 @@ impl MetadataBuilder {
}
}
}

/// Serializable on-disk form of [`Metadata`].
///
/// Holds the same fields as [`Metadata`] except for `path`, which is not written to the file;
/// [`Metadata::load`] fills it in from the location of the metadata file it reads.
#[derive(Debug, Serialize, Deserialize)]
struct MetadataFile {
/// Total file size
total_size: usize,
/// Copy of [`Metadata::downloaded`] (presumably the amount downloaded so far).
downloaded: usize,
/// Download chunk size. Default 150MB.
chunk_size: usize,
/// Remaining download ranges for each chunk.
/// - `Some(RangeInclusive)`: range to be downloaded.
/// - `None`: Chunk fully downloaded.
chunks: Vec<Option<RangeInclusive<usize>>>,
}

impl From<&Metadata> for MetadataFile {
fn from(metadata: &Metadata) -> Self {
Self {
total_size: metadata.total_size,
downloaded: metadata.downloaded,
chunk_size: metadata.chunk_size,
chunks: metadata.chunks.clone(),
}
}
}
13 changes: 12 additions & 1 deletion crates/stages/stages/src/stages/s3/downloader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,18 @@ pub(crate) enum S3DownloaderResponse {

impl S3DownloaderResponse {
/// Whether the downloaded block range is the last requested one.
pub(crate) fn is_done(&self) -> bool {
pub(crate) const fn is_done(&self) -> bool {
matches!(self, Self::Done)
}
}

/// Remaining byte range still to be downloaded for a single chunk.
///
/// `start` and `end` are taken from the bounds of the chunk's `RangeInclusive`, so `end` is
/// inclusive.
#[derive(Debug)]
pub struct RemainingChunkRange {
/// Index of the chunk this range belongs to.
pub index: usize,
/// Start of the remaining range.
pub start: usize,
/// End of the remaining range (inclusive).
pub end: usize,
}

0 comments on commit 3331f34

Please sign in to comment.