From e59f4895de4677851a1daa4769e735244129482a Mon Sep 17 00:00:00 2001 From: joshie <93316087+joshieDo@users.noreply.github.com> Date: Wed, 15 Jan 2025 14:40:10 +0000 Subject: [PATCH] maybe_spawn_fetch --- .../stages/src/stages/s3/downloader/mod.rs | 17 ++-- crates/stages/stages/src/stages/s3/mod.rs | 89 +++++++++---------- 2 files changed, 54 insertions(+), 52 deletions(-) diff --git a/crates/stages/stages/src/stages/s3/downloader/mod.rs b/crates/stages/stages/src/stages/s3/downloader/mod.rs index 8a804f68425b..9f77807273a5 100644 --- a/crates/stages/stages/src/stages/s3/downloader/mod.rs +++ b/crates/stages/stages/src/stages/s3/downloader/mod.rs @@ -12,9 +12,16 @@ pub use meta::Metadata; /// Response sent by the fetch task to `S3Stage` once it has downloaded all files of a block /// range. -pub(crate) struct S3DownloaderResponse { - /// Downloaded range. - pub range: std::ops::RangeInclusive, - /// Whether the fetch task has downloaded the last requested block range. - pub is_done: bool, +pub(crate) enum S3DownloaderResponse { + /// A new block range was downloaded. + AddedNewRange, + /// The last requested block range was downloaded. + Done, +} + +impl S3DownloaderResponse { + /// Whether the downloaded block range is the last requested one. 
+ pub(crate) fn is_done(&self) -> bool { + matches!(self, Self::Done) + } } diff --git a/crates/stages/stages/src/stages/s3/mod.rs b/crates/stages/stages/src/stages/s3/mod.rs index 6b92f8d3a73f..073e8fdcf990 100644 --- a/crates/stages/stages/src/stages/s3/mod.rs +++ b/crates/stages/stages/src/stages/s3/mod.rs @@ -5,17 +5,15 @@ use downloader::{DownloaderError, S3DownloaderResponse}; mod filelist; use filelist::DOWNLOAD_FILE_LIST; -use reth_db::{cursor::DbCursorRW, tables, transaction::DbTxMut}; +use reth_db::transaction::DbTxMut; use reth_primitives::StaticFileSegment; use reth_provider::{ - BlockBodyIndicesProvider, DBProvider, StageCheckpointReader, StageCheckpointWriter, - StaticFileProviderFactory, + DBProvider, StageCheckpointReader, StageCheckpointWriter, StaticFileProviderFactory, }; use reth_stages_api::{ ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput, }; use std::{ - ops::RangeInclusive, path::PathBuf, task::{ready, Context, Poll}, }; @@ -36,8 +34,6 @@ pub struct S3Stage { max_concurrent_requests: u64, /// Channel to receive the downloaded ranges from the fetch task. fetch_rx: Option>>, - /// Downloaded ranges by the fetch task. 
- downloaded_ranges: Vec>, } impl Stage for S3Stage @@ -63,8 +59,7 @@ where let response = match ready!(rx.poll_recv(cx)) { Some(Ok(response)) => { - is_done = response.is_done; - self.downloaded_ranges.push(response.range); + is_done = response.is_done(); Ok(()) } Some(Err(_)) => todo!(), // TODO: DownloaderError -> StageError @@ -78,10 +73,13 @@ where return Poll::Ready(response) } - // Spawns the downloader task - self.spawn_fetch(input); + // Spawns the downloader task if there are any missing files + if let Some(fetch_rx) = self.maybe_spawn_fetch(input) { + self.fetch_rx = Some(fetch_rx); + return Poll::Pending + } - Poll::Pending + Poll::Ready(Ok(())) } fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result @@ -91,39 +89,26 @@ where + StageCheckpointReader + StageCheckpointWriter, { - let (_, to_block) = input.next_block_range().into_inner(); - let static_file_provider = provider.static_file_provider(); - let mut tx_block_cursor = provider.tx_ref().cursor_write::()?; - // Re-initializes the provider to detect the new additions - static_file_provider.initialize_index()?; - - let highest_block = self - .downloaded_ranges - .last() - .map(|r| *r.end()) - .unwrap_or_else(|| input.checkpoint().block_number); - - for block_range in self.downloaded_ranges.drain(..) { - // Populate TransactionBlock table - for block_number in block_range { - // TODO: should be error if none since we always expect them to exist here - if let Some(indice) = static_file_provider.block_body_indices(block_number)? 
{ - if indice.tx_count() > 0 { - tx_block_cursor.append(indice.last_tx_num(), &block_number)?; - } - } - } - } + provider.static_file_provider().initialize_index()?; - let checkpoint = StageCheckpoint { block_number: highest_block, stage_checkpoint: None }; - provider.save_stage_checkpoint(StageId::Bodies, checkpoint)?; - provider.save_stage_checkpoint(S3_STAGE_ID, checkpoint)?; + // TODO logic for appending tx_block - // TODO: verify input.target according to s3 stage specifications - let done = highest_block == to_block; + // let (_, _to_block) = input.next_block_range().into_inner(); + // let static_file_provider = provider.static_file_provider(); + // let mut _tx_block_cursor = + // provider.tx_ref().cursor_write::()?; - Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done }) + // tx_block_cursor.append(indice.last_tx_num(), &block_number)?; + + // let checkpoint = StageCheckpoint { block_number: highest_block, stage_checkpoint: None }; + // provider.save_stage_checkpoint(StageId::Bodies, checkpoint)?; + // provider.save_stage_checkpoint(S3_STAGE_ID, checkpoint)?; + + // // TODO: verify input.target according to s3 stage specifications + // let done = highest_block == to_block; + + Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true }) } fn unwind( @@ -137,12 +122,16 @@ where } impl S3Stage { - /// Spawns a task that will fetch all the missing static files from the remote server. + /// It will only spawn a task to fetch files from the remote server, if there are any missing + /// static files. /// /// Every time a block range is ready with all the necessary files, it sends a /// [`S3DownloaderResponse`] to `self.fetch_rx`. If it's the last requested block range, the /// response will have `is_done` set to true. - fn spawn_fetch(&mut self, input: ExecInput) { + fn maybe_spawn_fetch( + &self, + input: ExecInput, + ) -> Option>> { let checkpoint = input.checkpoint(); // TODO: input target can only be certain numbers. eg. 
499_999 , 999_999 etc. @@ -171,6 +160,11 @@ impl S3Stage { requests.push((block_range, block_range_requests)); } + // Return None, if we have downloaded all the files that are required. + if requests.is_empty() { + return None + } + let static_file_directory = self.static_file_directory.clone(); let url = self.url.clone(); let max_concurrent_requests = self.max_concurrent_requests; @@ -179,7 +173,7 @@ impl S3Stage { tokio::spawn(async move { let mut requests_iter = requests.into_iter().peekable(); - while let Some((block_range, file_requests)) = requests_iter.next() { + while let Some((_, file_requests)) = requests_iter.next() { for (filename, file_hash) in file_requests { if let Err(err) = fetch( filename, @@ -195,16 +189,17 @@ impl S3Stage { } } - let response = S3DownloaderResponse { - range: block_range.into(), - is_done: requests_iter.peek().is_none(), + let response = if requests_iter.peek().is_none() { + S3DownloaderResponse::Done + } else { + S3DownloaderResponse::AddedNewRange }; let _ = fetch_tx.send(Ok(response)); } }); - self.fetch_rx = Some(fetch_rx); + Some(fetch_rx) } }