Skip to content

Commit

Permalink
feat: make bodies downloader generic over header (#13259)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rjected authored Dec 10, 2024
1 parent da99986 commit 15470b4
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 92 deletions.
93 changes: 48 additions & 45 deletions crates/net/downloaders/src/bodies/bodies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ use tracing::info;
/// All blocks in a batch are fetched at the same time.
#[must_use = "Stream does nothing unless polled"]
#[derive(Debug)]
pub struct BodiesDownloader<B: BodiesClient, Provider> {
pub struct BodiesDownloader<B: BodiesClient, Provider: HeaderProvider> {
/// The bodies client
client: Arc<B>,
/// The consensus client
consensus: Arc<dyn Consensus<alloy_consensus::Header, B::Body>>,
consensus: Arc<dyn Consensus<Provider::Header, B::Body>>,
/// The database handle
provider: Provider,
/// The maximum number of non-empty blocks per one request
Expand All @@ -57,19 +57,19 @@ pub struct BodiesDownloader<B: BodiesClient, Provider> {
/// The latest block number returned.
latest_queued_block_number: Option<BlockNumber>,
/// Requests in progress
in_progress_queue: BodiesRequestQueue<B>,
in_progress_queue: BodiesRequestQueue<Provider::Header, B>,
/// Buffered responses
buffered_responses: BinaryHeap<OrderedBodiesResponse<B::Body>>,
buffered_responses: BinaryHeap<OrderedBodiesResponse<Provider::Header, B::Body>>,
/// Queued body responses that can be returned for insertion into the database.
queued_bodies: Vec<BlockResponse<alloy_consensus::Header, B::Body>>,
queued_bodies: Vec<BlockResponse<Provider::Header, B::Body>>,
/// The bodies downloader metrics.
metrics: BodyDownloaderMetrics,
}

impl<B, Provider> BodiesDownloader<B, Provider>
where
B: BodiesClient<Body: InMemorySize> + 'static,
Provider: HeaderProvider + Unpin + 'static,
Provider: HeaderProvider<Header: BlockHeader> + Unpin + 'static,
{
/// Returns the next contiguous request.
fn next_headers_request(&self) -> DownloadResult<Option<Vec<SealedHeader<Provider::Header>>>> {
Expand Down Expand Up @@ -193,14 +193,16 @@ where
}

/// Queues bodies and sets the latest queued block number
fn queue_bodies(&mut self, bodies: Vec<BlockResponse<alloy_consensus::Header, B::Body>>) {
fn queue_bodies(&mut self, bodies: Vec<BlockResponse<Provider::Header, B::Body>>) {
self.latest_queued_block_number = Some(bodies.last().expect("is not empty").block_number());
self.queued_bodies.extend(bodies);
self.metrics.queued_blocks.set(self.queued_bodies.len() as f64);
}

/// Removes the next response from the buffer.
fn pop_buffered_response(&mut self) -> Option<OrderedBodiesResponse<B::Body>> {
fn pop_buffered_response(
&mut self,
) -> Option<OrderedBodiesResponse<Provider::Header, B::Body>> {
let resp = self.buffered_responses.pop()?;
self.metrics.buffered_responses.decrement(1.);
self.buffered_blocks_size_bytes -= resp.size();
Expand All @@ -210,13 +212,10 @@ where
}

/// Adds a new response to the internal buffer
fn buffer_bodies_response(
&mut self,
response: Vec<BlockResponse<alloy_consensus::Header, B::Body>>,
) {
fn buffer_bodies_response(&mut self, response: Vec<BlockResponse<Provider::Header, B::Body>>) {
// take into account capacity
let size = response.iter().map(BlockResponse::size).sum::<usize>() +
response.capacity() * mem::size_of::<BlockResponse<B::Body>>();
response.capacity() * mem::size_of::<BlockResponse<Provider::Header, B::Body>>();

let response = OrderedBodiesResponse { resp: response, size };
let response_len = response.len();
Expand All @@ -230,9 +229,7 @@ where
}

/// Returns a response if it's first block number matches the next expected.
fn try_next_buffered(
&mut self,
) -> Option<Vec<BlockResponse<alloy_consensus::Header, B::Body>>> {
fn try_next_buffered(&mut self) -> Option<Vec<BlockResponse<Provider::Header, B::Body>>> {
if let Some(next) = self.buffered_responses.peek() {
let expected = self.next_expected_block_number();
let next_block_range = next.block_range();
Expand All @@ -258,9 +255,7 @@ where

/// Returns the next batch of block bodies that can be returned if we have enough buffered
/// bodies
fn try_split_next_batch(
&mut self,
) -> Option<Vec<BlockResponse<alloy_consensus::Header, B::Body>>> {
fn try_split_next_batch(&mut self) -> Option<Vec<BlockResponse<Provider::Header, B::Body>>> {
if self.queued_bodies.len() >= self.stream_batch_size {
let next_batch = self.queued_bodies.drain(..self.stream_batch_size).collect::<Vec<_>>();
self.queued_bodies.shrink_to_fit();
Expand Down Expand Up @@ -292,12 +287,17 @@ where
Self: BodyDownloader + 'static,
{
/// Spawns the downloader task via [`tokio::task::spawn`]
pub fn into_task(self) -> TaskDownloader<<Self as BodyDownloader>::Body> {
pub fn into_task(
self,
) -> TaskDownloader<<Self as BodyDownloader>::Header, <Self as BodyDownloader>::Body> {
self.into_task_with(&TokioTaskExecutor::default())
}

/// Convert the downloader into a [`TaskDownloader`] by spawning it via the given spawner.
pub fn into_task_with<S>(self, spawner: &S) -> TaskDownloader<<Self as BodyDownloader>::Body>
pub fn into_task_with<S>(
self,
spawner: &S,
) -> TaskDownloader<<Self as BodyDownloader>::Header, <Self as BodyDownloader>::Body>
where
S: TaskSpawner,
{
Expand All @@ -308,8 +308,9 @@ where
impl<B, Provider> BodyDownloader for BodiesDownloader<B, Provider>
where
B: BodiesClient<Body: Debug + InMemorySize> + 'static,
Provider: HeaderProvider<Header = alloy_consensus::Header> + Unpin + 'static,
Provider: HeaderProvider + Unpin + 'static,
{
type Header = Provider::Header;
type Body = B::Body;

/// Set a new download range (exclusive).
Expand Down Expand Up @@ -358,9 +359,9 @@ where
impl<B, Provider> Stream for BodiesDownloader<B, Provider>
where
B: BodiesClient<Body: InMemorySize> + 'static,
Provider: HeaderProvider<Header = alloy_consensus::Header> + Unpin + 'static,
Provider: HeaderProvider + Unpin + 'static,
{
type Item = BodyDownloaderResult<B::Body>;
type Item = BodyDownloaderResult<Provider::Header, B::Body>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
Expand Down Expand Up @@ -442,13 +443,28 @@ where
}

#[derive(Debug)]
struct OrderedBodiesResponse<B> {
resp: Vec<BlockResponse<alloy_consensus::Header, B>>,
struct OrderedBodiesResponse<H, B> {
resp: Vec<BlockResponse<H, B>>,
/// The total size of the response in bytes
size: usize,
}

impl<B> OrderedBodiesResponse<B> {
impl<H, B> OrderedBodiesResponse<H, B> {
#[inline]
fn len(&self) -> usize {
self.resp.len()
}

/// Returns the size of the response in bytes
///
/// See [`BlockResponse::size`]
#[inline]
const fn size(&self) -> usize {
self.size
}
}

impl<H: BlockHeader, B> OrderedBodiesResponse<H, B> {
/// Returns the block number of the first element
///
/// # Panics
Expand All @@ -464,36 +480,23 @@ impl<B> OrderedBodiesResponse<B> {
fn block_range(&self) -> RangeInclusive<u64> {
self.first_block_number()..=self.resp.last().expect("is not empty").block_number()
}

#[inline]
fn len(&self) -> usize {
self.resp.len()
}

/// Returns the size of the response in bytes
///
/// See [`BlockResponse::size`]
#[inline]
const fn size(&self) -> usize {
self.size
}
}

impl<B> PartialEq for OrderedBodiesResponse<B> {
impl<H: BlockHeader, B> PartialEq for OrderedBodiesResponse<H, B> {
fn eq(&self, other: &Self) -> bool {
self.first_block_number() == other.first_block_number()
}
}

impl<B> Eq for OrderedBodiesResponse<B> {}
impl<H: BlockHeader, B> Eq for OrderedBodiesResponse<H, B> {}

impl<B> PartialOrd for OrderedBodiesResponse<B> {
impl<H: BlockHeader, B> PartialOrd for OrderedBodiesResponse<H, B> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl<B> Ord for OrderedBodiesResponse<B> {
impl<H: BlockHeader, B> Ord for OrderedBodiesResponse<H, B> {
fn cmp(&self, other: &Self) -> Ordering {
self.first_block_number().cmp(&other.first_block_number()).reverse()
}
Expand Down Expand Up @@ -573,7 +576,7 @@ impl BodiesDownloaderBuilder {
pub fn build<B, Provider>(
self,
client: B,
consensus: Arc<dyn Consensus<alloy_consensus::Header, B::Body>>,
consensus: Arc<dyn Consensus<Provider::Header, B::Body>>,
provider: Provider,
) -> BodiesDownloader<B, Provider>
where
Expand Down
14 changes: 10 additions & 4 deletions crates/net/downloaders/src/bodies/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,24 @@ use std::{fmt::Debug, ops::RangeInclusive};
/// A [`BodyDownloader`] implementation that does nothing.
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct NoopBodiesDownloader<B>(std::marker::PhantomData<B>);
pub struct NoopBodiesDownloader<H, B> {
_header: std::marker::PhantomData<H>,
_body: std::marker::PhantomData<B>,
}

impl<B: Debug + Send + Sync + Unpin + 'static> BodyDownloader for NoopBodiesDownloader<B> {
impl<H: Debug + Send + Sync + Unpin + 'static, B: Debug + Send + Sync + Unpin + 'static>
BodyDownloader for NoopBodiesDownloader<H, B>
{
type Body = B;
type Header = H;

fn set_download_range(&mut self, _: RangeInclusive<BlockNumber>) -> DownloadResult<()> {
Ok(())
}
}

impl<B> Stream for NoopBodiesDownloader<B> {
type Item = Result<Vec<BlockResponse<alloy_consensus::Header, B>>, DownloadError>;
impl<H, B> Stream for NoopBodiesDownloader<H, B> {
type Item = Result<Vec<BlockResponse<H, B>>, DownloadError>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
Expand Down
21 changes: 12 additions & 9 deletions crates/net/downloaders/src/bodies/queue.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::request::BodiesRequestFuture;
use crate::metrics::BodyDownloaderMetrics;
use alloy_consensus::BlockHeader;
use alloy_primitives::BlockNumber;
use futures::{stream::FuturesUnordered, Stream};
use futures_util::StreamExt;
Expand All @@ -19,18 +20,19 @@ use std::{
/// The wrapper around [`FuturesUnordered`] that keeps information
/// about the blocks currently being requested.
#[derive(Debug)]
pub(crate) struct BodiesRequestQueue<B: BodiesClient> {
pub(crate) struct BodiesRequestQueue<H, B: BodiesClient> {
/// Inner body request queue.
inner: FuturesUnordered<BodiesRequestFuture<B>>,
inner: FuturesUnordered<BodiesRequestFuture<H, B>>,
/// The downloader metrics.
metrics: BodyDownloaderMetrics,
/// Last requested block number.
pub(crate) last_requested_block_number: Option<BlockNumber>,
}

impl<B> BodiesRequestQueue<B>
impl<H, B> BodiesRequestQueue<H, B>
where
B: BodiesClient + 'static,
H: BlockHeader,
{
/// Create new instance of request queue.
pub(crate) fn new(metrics: BodyDownloaderMetrics) -> Self {
Expand Down Expand Up @@ -58,15 +60,15 @@ where
pub(crate) fn push_new_request(
&mut self,
client: Arc<B>,
consensus: Arc<dyn Consensus<alloy_consensus::Header, B::Body>>,
request: Vec<SealedHeader>,
consensus: Arc<dyn Consensus<H, B::Body>>,
request: Vec<SealedHeader<H>>,
) {
// Set last max requested block number
self.last_requested_block_number = request
.last()
.map(|last| match self.last_requested_block_number {
Some(num) => last.number.max(num),
None => last.number,
Some(num) => last.number().max(num),
None => last.number(),
})
.or(self.last_requested_block_number);
// Create request and push into the queue.
Expand All @@ -76,11 +78,12 @@ where
}
}

impl<B> Stream for BodiesRequestQueue<B>
impl<H, B> Stream for BodiesRequestQueue<H, B>
where
H: BlockHeader + Send + Sync + Unpin + 'static,
B: BodiesClient<Body: InMemorySize> + 'static,
{
type Item = DownloadResult<Vec<BlockResponse<alloy_consensus::Header, B::Body>>>;
type Item = DownloadResult<Vec<BlockResponse<H, B::Body>>>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.get_mut().inner.poll_next_unpin(cx)
Expand Down
22 changes: 12 additions & 10 deletions crates/net/downloaders/src/bodies/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,30 +38,31 @@ use std::{
/// All errors regarding the response cause the peer to get penalized, meaning that adversaries
/// that try to give us bodies that do not match the requested order are going to be penalized
/// and eventually disconnected.
pub(crate) struct BodiesRequestFuture<B: BodiesClient> {
pub(crate) struct BodiesRequestFuture<H, B: BodiesClient> {
client: Arc<B>,
consensus: Arc<dyn Consensus<alloy_consensus::Header, B::Body>>,
consensus: Arc<dyn Consensus<H, B::Body>>,
metrics: BodyDownloaderMetrics,
/// Metrics for individual responses. This can be used to observe how the size (in bytes) of
/// responses change while bodies are being downloaded.
response_metrics: ResponseMetrics,
// Headers to download. The collection is shrunk as responses are buffered.
pending_headers: VecDeque<SealedHeader>,
pending_headers: VecDeque<SealedHeader<H>>,
/// Internal buffer for all blocks
buffer: Vec<BlockResponse<alloy_consensus::Header, B::Body>>,
buffer: Vec<BlockResponse<H, B::Body>>,
fut: Option<B::Output>,
/// Tracks how many bodies we requested in the last request.
last_request_len: Option<usize>,
}

impl<B> BodiesRequestFuture<B>
impl<H, B> BodiesRequestFuture<H, B>
where
B: BodiesClient + 'static,
H: BlockHeader,
{
/// Returns an empty future. Use [`BodiesRequestFuture::with_headers`] to set the request.
pub(crate) fn new(
client: Arc<B>,
consensus: Arc<dyn Consensus<alloy_consensus::Header, B::Body>>,
consensus: Arc<dyn Consensus<H, B::Body>>,
metrics: BodyDownloaderMetrics,
) -> Self {
Self {
Expand All @@ -76,7 +77,7 @@ where
}
}

pub(crate) fn with_headers(mut self, headers: Vec<SealedHeader>) -> Self {
pub(crate) fn with_headers(mut self, headers: Vec<SealedHeader<H>>) -> Self {
self.buffer.reserve_exact(headers.len());
self.pending_headers = VecDeque::from(headers);
// Submit the request only if there are any headers to download.
Expand Down Expand Up @@ -192,7 +193,7 @@ where
if let Err(error) = self.consensus.validate_block_pre_execution(&block) {
// Body is invalid, put the header back and return an error
let hash = block.hash();
let number = block.number;
let number = block.number();
self.pending_headers.push_front(block.header);
return Err(DownloadError::BodyValidation {
hash,
Expand All @@ -213,11 +214,12 @@ where
}
}

impl<B> Future for BodiesRequestFuture<B>
impl<H, B> Future for BodiesRequestFuture<H, B>
where
H: BlockHeader + Unpin + Send + Sync + 'static,
B: BodiesClient<Body: InMemorySize> + 'static,
{
type Output = DownloadResult<Vec<BlockResponse<alloy_consensus::Header, B::Body>>>;
type Output = DownloadResult<Vec<BlockResponse<H, B::Body>>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
Expand Down
Loading

0 comments on commit 15470b4

Please sign in to comment.