Skip to content

Commit

Permalink
maybe_spawn_fetch
Browse files Browse the repository at this point in the history
  • Loading branch information
joshieDo committed Jan 16, 2025
1 parent 61811df commit e59f489
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 52 deletions.
17 changes: 12 additions & 5 deletions crates/stages/stages/src/stages/s3/downloader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,16 @@ pub use meta::Metadata;

/// Response sent by the fetch task to `S3Stage` once it has downloaded all files of a block
/// range.
pub(crate) struct S3DownloaderResponse {
/// Downloaded range.
pub range: std::ops::RangeInclusive<u64>,
/// Whether the fetch task has downloaded the last requested block range.
pub is_done: bool,
pub(crate) enum S3DownloaderResponse {
/// A new block range was downloaded.
AddedNewRange,
/// The last requested block range was downloaded.
Done,
}

impl S3DownloaderResponse {
/// Whether the downloaded block range is the last requested one.
pub(crate) fn is_done(&self) -> bool {
matches!(self, Self::Done)
}
}
89 changes: 42 additions & 47 deletions crates/stages/stages/src/stages/s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,15 @@ use downloader::{DownloaderError, S3DownloaderResponse};
mod filelist;
use filelist::DOWNLOAD_FILE_LIST;

use reth_db::{cursor::DbCursorRW, tables, transaction::DbTxMut};
use reth_db::transaction::DbTxMut;
use reth_primitives::StaticFileSegment;
use reth_provider::{
BlockBodyIndicesProvider, DBProvider, StageCheckpointReader, StageCheckpointWriter,
StaticFileProviderFactory,
DBProvider, StageCheckpointReader, StageCheckpointWriter, StaticFileProviderFactory,
};
use reth_stages_api::{
ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
};
use std::{
ops::RangeInclusive,
path::PathBuf,
task::{ready, Context, Poll},
};
Expand All @@ -36,8 +34,6 @@ pub struct S3Stage {
max_concurrent_requests: u64,
/// Channel to receive the downloaded ranges from the fetch task.
fetch_rx: Option<UnboundedReceiver<Result<S3DownloaderResponse, DownloaderError>>>,
/// Downloaded ranges by the fetch task.
downloaded_ranges: Vec<RangeInclusive<u64>>,
}

impl<Provider> Stage<Provider> for S3Stage
Expand All @@ -63,8 +59,7 @@ where

let response = match ready!(rx.poll_recv(cx)) {
Some(Ok(response)) => {
is_done = response.is_done;
self.downloaded_ranges.push(response.range);
is_done = response.is_done();
Ok(())
}
Some(Err(_)) => todo!(), // TODO: DownloaderError -> StageError
Expand All @@ -78,10 +73,13 @@ where
return Poll::Ready(response)
}

// Spawns the downloader task
self.spawn_fetch(input);
// Spawns the downloader task if there are any missing files
if let Some(fetch_rx) = self.maybe_spawn_fetch(input) {
self.fetch_rx = Some(fetch_rx);
return Poll::Pending
}

Poll::Pending
Poll::Ready(Ok(()))
}

fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError>
Expand All @@ -91,39 +89,26 @@ where
+ StageCheckpointReader
+ StageCheckpointWriter,
{
let (_, to_block) = input.next_block_range().into_inner();
let static_file_provider = provider.static_file_provider();
let mut tx_block_cursor = provider.tx_ref().cursor_write::<tables::TransactionBlocks>()?;

// Re-initializes the provider to detect the new additions
static_file_provider.initialize_index()?;

let highest_block = self
.downloaded_ranges
.last()
.map(|r| *r.end())
.unwrap_or_else(|| input.checkpoint().block_number);

for block_range in self.downloaded_ranges.drain(..) {
// Populate TransactionBlock table
for block_number in block_range {
// TODO: should be error if none since we always expect them to exist here
if let Some(indice) = static_file_provider.block_body_indices(block_number)? {
if indice.tx_count() > 0 {
tx_block_cursor.append(indice.last_tx_num(), &block_number)?;
}
}
}
}
provider.static_file_provider().initialize_index()?;

let checkpoint = StageCheckpoint { block_number: highest_block, stage_checkpoint: None };
provider.save_stage_checkpoint(StageId::Bodies, checkpoint)?;
provider.save_stage_checkpoint(S3_STAGE_ID, checkpoint)?;
// TODO logic for appending tx_block

// TODO: verify input.target according to s3 stage specifications
let done = highest_block == to_block;
// let (_, _to_block) = input.next_block_range().into_inner();
// let static_file_provider = provider.static_file_provider();
// let mut _tx_block_cursor =
// provider.tx_ref().cursor_write::<tables::TransactionBlocks>()?;

Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done })
// tx_block_cursor.append(indice.last_tx_num(), &block_number)?;

// let checkpoint = StageCheckpoint { block_number: highest_block, stage_checkpoint: None };
// provider.save_stage_checkpoint(StageId::Bodies, checkpoint)?;
// provider.save_stage_checkpoint(S3_STAGE_ID, checkpoint)?;

// // TODO: verify input.target according to s3 stage specifications
// let done = highest_block == to_block;

Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true })
}

fn unwind(
Expand All @@ -137,12 +122,16 @@ where
}

impl S3Stage {
/// Spawns a task that will fetch all the missing static files from the remote server.
/// It will only spawn a task to fetch files from the remote server, it there are any missing
/// static files.
///
/// Every time a block range is ready with all the necessary files, it sends a
/// [`S3DownloaderResponse`] to `self.fetch_rx`. If it's the last requested block range, the
/// response will have `is_done` set to true.
fn spawn_fetch(&mut self, input: ExecInput) {
fn maybe_spawn_fetch(
&self,
input: ExecInput,
) -> Option<UnboundedReceiver<Result<S3DownloaderResponse, DownloaderError>>> {
let checkpoint = input.checkpoint();
// TODO: input target can only be certain numbers. eg. 499_999 , 999_999 etc.

Expand Down Expand Up @@ -171,6 +160,11 @@ impl S3Stage {
requests.push((block_range, block_range_requests));
}

// Return None, if we have downloaded all the files that are required.
if requests.is_empty() {
return None
}

let static_file_directory = self.static_file_directory.clone();
let url = self.url.clone();
let max_concurrent_requests = self.max_concurrent_requests;
Expand All @@ -179,7 +173,7 @@ impl S3Stage {
tokio::spawn(async move {
let mut requests_iter = requests.into_iter().peekable();

while let Some((block_range, file_requests)) = requests_iter.next() {
while let Some((_, file_requests)) = requests_iter.next() {
for (filename, file_hash) in file_requests {
if let Err(err) = fetch(
filename,
Expand All @@ -195,16 +189,17 @@ impl S3Stage {
}
}

let response = S3DownloaderResponse {
range: block_range.into(),
is_done: requests_iter.peek().is_none(),
let response = if requests_iter.peek().is_none() {
S3DownloaderResponse::Done
} else {
S3DownloaderResponse::AddedNewRange
};

let _ = fetch_tx.send(Ok(response));
}
});

self.fetch_rx = Some(fetch_rx);
Some(fetch_rx)
}
}

Expand Down

0 comments on commit e59f489

Please sign in to comment.