Skip to content

Commit

Permalink
migrate p2p
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse committed Jan 9, 2025
1 parent 0c390e1 commit 5312afa
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 66 deletions.
14 changes: 6 additions & 8 deletions crates/net/p2p/src/bodies/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,22 @@ use super::response::BlockResponse;
use crate::error::DownloadResult;
use alloy_primitives::BlockNumber;
use futures::Stream;
use std::{fmt::Debug, ops::RangeInclusive};
use reth_primitives_traits::Block;
use std::ops::RangeInclusive;

/// Body downloader return type.
pub type BodyDownloaderResult<H, B> = DownloadResult<Vec<BlockResponse<H, B>>>;
pub type BodyDownloaderResult<B> = DownloadResult<Vec<BlockResponse<B>>>;

/// A downloader capable of fetching and yielding block bodies from block headers.
///
/// A downloader represents a distinct strategy for submitting requests to download block bodies,
/// while a [`BodiesClient`][crate::bodies::client::BodiesClient] represents a client capable of
/// fulfilling these requests.
pub trait BodyDownloader:
Send + Sync + Stream<Item = BodyDownloaderResult<Self::Header, Self::Body>> + Unpin
Send + Sync + Stream<Item = BodyDownloaderResult<Self::Block>> + Unpin
{
/// The type of header that is being used
type Header: Debug + Send + Sync + Unpin + 'static;

/// The type of the body that is being downloaded.
type Body: Debug + Send + Sync + Unpin + 'static;
/// The Block type this downloader supports
type Block: Block + 'static;

/// Method for setting the download range.
fn set_download_range(&mut self, range: RangeInclusive<BlockNumber>) -> DownloadResult<()>;
Expand Down
31 changes: 13 additions & 18 deletions crates/net/p2p/src/bodies/response.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,26 @@
use alloy_consensus::BlockHeader;
use alloy_primitives::{BlockNumber, U256};
use reth_primitives::{BlockBody, SealedBlock, SealedHeader};
use reth_primitives_traits::InMemorySize;
use reth_primitives::{SealedBlock, SealedHeader};
use reth_primitives_traits::{Block, InMemorySize};
/// The block response
#[derive(PartialEq, Eq, Debug, Clone)]
pub enum BlockResponse<H, B = BlockBody> {
pub enum BlockResponse<B: Block> {
/// Full block response (with transactions or ommers)
Full(SealedBlock<H, B>),
Full(SealedBlock<B>),
/// The empty block response
Empty(SealedHeader<H>),
Empty(SealedHeader<B::Header>),
}

impl<H, B> BlockResponse<H, B>
impl<B> BlockResponse<B>
where
H: BlockHeader,
B: Block,
{
/// Return the reference to the response header
pub const fn header(&self) -> &SealedHeader<H> {
match self {
Self::Full(block) => block.sealed_header(),
Self::Empty(header) => header,
}
}

/// Return the block number
pub fn block_number(&self) -> BlockNumber {
self.header().number()
match self {
Self::Full(block) => block.number(),
Self::Empty(header) => header.number(),
}
}

/// Return the reference to the response header
Expand All @@ -37,15 +32,15 @@ where
}

/// Return the reference to the response body
pub fn into_body(self) -> Option<B> {
pub fn into_body(self) -> Option<B::Body> {
match self {
Self::Full(block) => Some(block.into_body()),
Self::Empty(_) => None,
}
}
}

impl<H: InMemorySize, B: InMemorySize> InMemorySize for BlockResponse<H, B> {
impl<B: Block> InMemorySize for BlockResponse<B> {
#[inline]
fn size(&self) -> usize {
match self {
Expand Down
33 changes: 17 additions & 16 deletions crates/net/p2p/src/full_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ where
Client: BlockClient,
{
client: Client,
consensus: Arc<dyn Consensus<Client::Header, Client::Body, Error = ConsensusError>>,
consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
}

impl<Client> FullBlockClient<Client>
Expand All @@ -40,7 +40,7 @@ where
/// Creates a new instance of `FullBlockClient`.
pub fn new(
client: Client,
consensus: Arc<dyn Consensus<Client::Header, Client::Body, Error = ConsensusError>>,
consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
) -> Self {
Self { client, consensus }
}
Expand Down Expand Up @@ -118,7 +118,7 @@ where
Client: BlockClient,
{
client: Client,
consensus: Arc<dyn Consensus<Client::Header, Client::Body, Error = ConsensusError>>,
consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
hash: B256,
request: FullBlockRequest<Client>,
header: Option<SealedHeader<Client::Header>>,
Expand All @@ -140,15 +140,15 @@ where
}

/// Returns the [`SealedBlock`] if the request is complete and valid.
fn take_block(&mut self) -> Option<SealedBlock<Client::Header, Client::Body>> {
fn take_block(&mut self) -> Option<SealedBlock<Client::Block>> {
if self.header.is_none() || self.body.is_none() {
return None
}

let header = self.header.take().unwrap();
let resp = self.body.take().unwrap();
match resp {
BodyResponse::Validated(body) => Some(SealedBlock::new(header, body)),
BodyResponse::Validated(body) => Some(SealedBlock::from_sealed_parts(header, body)),
BodyResponse::PendingValidation(resp) => {
// ensure the block is valid, else retry
if let Err(err) = self.consensus.validate_body_against_header(resp.data(), &header)
Expand All @@ -159,7 +159,7 @@ where
self.request.body = Some(self.client.get_block_body(self.hash));
return None
}
Some(SealedBlock::new(header, resp.into_data()))
Some(SealedBlock::from_sealed_parts(header, resp.into_data()))
}
}
}
Expand All @@ -182,7 +182,7 @@ impl<Client> Future for FetchFullBlockFuture<Client>
where
Client: BlockClient<Header: BlockHeader + Sealable> + 'static,
{
type Output = SealedBlock<Client::Header, Client::Body>;
type Output = SealedBlock<Client::Block>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
Expand Down Expand Up @@ -330,7 +330,7 @@ where
/// The client used to fetch headers and bodies.
client: Client,
/// The consensus instance used to validate the blocks.
consensus: Arc<dyn Consensus<Client::Header, Client::Body, Error = ConsensusError>>,
consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
/// The block hash to start fetching from (inclusive).
start_hash: B256,
/// How many blocks to fetch: `len([start_hash, ..]) == count`
Expand Down Expand Up @@ -388,7 +388,7 @@ where
///
/// These are returned in falling order starting with the requested `hash`, i.e. with
/// descending block numbers.
fn take_blocks(&mut self) -> Option<Vec<SealedBlock<Client::Header, Client::Body>>> {
fn take_blocks(&mut self) -> Option<Vec<SealedBlock<Client::Block>>> {
if !self.is_bodies_complete() {
// not done with bodies yet
return None
Expand Down Expand Up @@ -421,15 +421,16 @@ where
}
};

valid_responses.push(SealedBlock::new(header.clone(), body));
valid_responses
.push(SealedBlock::<Client::Block>::from_sealed_parts(header.clone(), body));
}
}

if needs_retry {
// put response hashes back into bodies map since we aren't returning them as a
// response
for block in valid_responses {
let (header, body) = block.split();
let (header, body) = block.split_sealed_header_body();
self.bodies.insert(header, BodyResponse::Validated(body));
}

Expand Down Expand Up @@ -505,7 +506,7 @@ impl<Client> Future for FetchFullBlockRangeFuture<Client>
where
Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq> + 'static,
{
type Output = Vec<SealedBlock<Client::Header, Client::Body>>;
type Output = Vec<SealedBlock<Client::Block>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
Expand Down Expand Up @@ -658,7 +659,7 @@ mod tests {
let client = FullBlockClient::test_client(client);

let received = client.get_full_block(header.hash()).await;
assert_eq!(received, SealedBlock::new(header, body));
assert_eq!(received, SealedBlock::from_sealed_parts(header, body));
}

#[tokio::test]
Expand All @@ -671,7 +672,7 @@ mod tests {

let received = client.get_full_block_range(header.hash(), 1).await;
let received = received.first().expect("response should include a block");
assert_eq!(*received, SealedBlock::new(header, body));
assert_eq!(*received, SealedBlock::from_sealed_parts(header, body));
}

/// Inserts headers and returns the last header and block body.
Expand Down Expand Up @@ -703,7 +704,7 @@ mod tests {

let received = client.get_full_block_range(header.hash(), 1).await;
let received = received.first().expect("response should include a block");
assert_eq!(*received, SealedBlock::new(header.clone(), body));
assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));

let received = client.get_full_block_range(header.hash(), 10).await;
assert_eq!(received.len(), 10);
Expand All @@ -722,7 +723,7 @@ mod tests {

let received = client.get_full_block_range(header.hash(), 1).await;
let received = received.first().expect("response should include a block");
assert_eq!(*received, SealedBlock::new(header.clone(), body));
assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));

let received = client.get_full_block_range(header.hash(), 50).await;
assert_eq!(received.len(), 50);
Expand Down
2 changes: 1 addition & 1 deletion crates/net/p2p/src/headers/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub type HeadersFut<H = Header> =
Pin<Box<dyn Future<Output = PeerRequestResult<Vec<H>>> + Send + Sync>>;

/// The block headers downloader client
#[auto_impl::auto_impl(&, Arc, Box)]
// #[auto_impl::auto_impl(&, Arc, Box)]
pub trait HeadersClient: DownloadClient {
/// The header type this client fetches.
type Header: BlockHeader;
Expand Down
25 changes: 13 additions & 12 deletions crates/net/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,20 @@ pub mod test_utils;

pub use bodies::client::BodiesClient;
pub use headers::client::HeadersClient;
use reth_primitives_traits::Block;

/// Helper trait that unifies network behaviour needed for fetching blocks.
pub trait BlockClient: HeadersClient + BodiesClient + Unpin + Clone {}

impl<T> BlockClient for T where T: HeadersClient + BodiesClient + Unpin + Clone {}

/// The [`BlockClient`] providing Ethereum block parts.
pub trait EthBlockClient:
BlockClient<Header = alloy_consensus::Header, Body = reth_primitives::BlockBody>
/// Helper trait that unifies network behaviour needed for fetching entire blocks.
pub trait BlockClient:
HeadersClient<Header = <Self::Block as Block>::Header>
+ BodiesClient<Body = <Self::Block as Block>::Body>
+ Unpin
+ Clone
{
/// The Block type that this client fetches.
type Block: Block;
}

impl<T> EthBlockClient for T where
T: BlockClient<Header = alloy_consensus::Header, Body = reth_primitives::BlockBody>
{
}
/// The [`BlockClient`] providing Ethereum block parts.
pub trait EthBlockClient: BlockClient<Block = reth_primitives::Block> {}

impl<T> EthBlockClient for T where T: BlockClient<Block = reth_primitives::Block> {}
12 changes: 9 additions & 3 deletions crates/net/p2p/src/test_utils/full_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
error::PeerRequestResult,
headers::client::{HeadersClient, HeadersRequest},
priority::Priority,
BlockClient,
};
use alloy_consensus::Header;
use alloy_eips::{BlockHashOrNumber, BlockNumHash};
Expand Down Expand Up @@ -133,9 +134,10 @@ impl TestFullBlockClient {
pub fn highest_block(&self) -> Option<SealedBlock> {
self.headers.lock().iter().max_by_key(|(_, header)| header.number).and_then(
|(hash, header)| {
self.bodies.lock().get(hash).map(|body| {
SealedBlock::new(SealedHeader::new(header.clone(), *hash), body.clone())
})
self.bodies
.lock()
.get(hash)
.map(|body| SealedBlock::from_parts(header.clone(), body.clone(), *hash))
},
)
}
Expand Down Expand Up @@ -243,3 +245,7 @@ impl BodiesClient for TestFullBlockClient {
)))
}
}

impl BlockClient for TestFullBlockClient {
type Block = reth_primitives::Block;
}
10 changes: 2 additions & 8 deletions crates/net/p2p/src/test_utils/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
};
use alloy_consensus::Header;
use futures::{Future, FutureExt, Stream, StreamExt};
use reth_consensus::{test_utils::TestConsensus, Consensus, ConsensusError};
use reth_consensus::{test_utils::TestConsensus, HeaderValidator};
use reth_eth_wire_types::HeadersDirection;
use reth_network_peers::{PeerId, WithPeerId};
use reth_primitives::SealedHeader;
Expand Down Expand Up @@ -146,13 +146,7 @@ impl Stream for TestDownload {
}

let empty: SealedHeader = SealedHeader::default();
if let Err(error) =
<dyn Consensus<_, Error = ConsensusError>>::validate_header_against_parent(
&this.consensus,
&empty,
&empty,
)
{
if let Err(error) = this.consensus.validate_header_against_parent(&empty, &empty) {
this.done = true;
return Poll::Ready(Some(Err(DownloadError::HeaderValidation {
hash: empty.hash(),
Expand Down
17 changes: 17 additions & 0 deletions crates/primitives-traits/src/block/sealed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ impl<B> SealedBlock<B>
where
B: Block,
{
/// Creates the [`SealedBlock`] from the block's parts.
pub fn from_parts(header: B::Header, body: B::Body, hash: BlockHash) -> Self {
Self::new(B::new(header, body), hash)
}

/// Creates the [`SealedBlock`] from the [`SealedHeader`] and the body.
pub fn from_sealed_parts(header: SealedHeader<B::Header>, body: B::Body) -> Self {
let (header, hash) = header.split();
Self::from_parts(header, body, hash)
}

/// Converts this block into a [`RecoveredBlock`] with the given senders if the number of
/// senders is equal to the number of transactions in the block and recovers the senders from
/// the transactions, if
Expand Down Expand Up @@ -142,6 +153,12 @@ where
self.block.split()
}

/// Splits the block into body and header into separate components.
pub fn split_sealed_header_body(self) -> (SealedHeader<B::Header>, B::Body) {
let (header, block) = self.block.split();
(SealedHeader::new(header, self.hash), block)
}

/// Returns an iterator over all blob versioned hashes from the block body.
#[inline]
pub fn blob_versioned_hashes_iter(&self) -> impl Iterator<Item = &B256> + '_ {
Expand Down

0 comments on commit 5312afa

Please sign in to comment.