From 5312afa160f3b03cd1a46eaec9a434ab31f49b70 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Thu, 9 Jan 2025 17:20:48 +0100 Subject: [PATCH] migrate p2p --- crates/net/p2p/src/bodies/downloader.rs | 14 ++++----- crates/net/p2p/src/bodies/response.rs | 31 ++++++++---------- crates/net/p2p/src/full_block.rs | 33 ++++++++++---------- crates/net/p2p/src/headers/client.rs | 2 +- crates/net/p2p/src/lib.rs | 25 ++++++++------- crates/net/p2p/src/test_utils/full_block.rs | 12 +++++-- crates/net/p2p/src/test_utils/headers.rs | 10 ++---- crates/primitives-traits/src/block/sealed.rs | 17 ++++++++++ 8 files changed, 78 insertions(+), 66 deletions(-) diff --git a/crates/net/p2p/src/bodies/downloader.rs b/crates/net/p2p/src/bodies/downloader.rs index ce7827c8e8851..f11473daa96c0 100644 --- a/crates/net/p2p/src/bodies/downloader.rs +++ b/crates/net/p2p/src/bodies/downloader.rs @@ -2,10 +2,11 @@ use super::response::BlockResponse; use crate::error::DownloadResult; use alloy_primitives::BlockNumber; use futures::Stream; -use std::{fmt::Debug, ops::RangeInclusive}; +use reth_primitives_traits::Block; +use std::ops::RangeInclusive; /// Body downloader return type. -pub type BodyDownloaderResult = DownloadResult>>; +pub type BodyDownloaderResult = DownloadResult>>; /// A downloader capable of fetching and yielding block bodies from block headers. /// @@ -13,13 +14,10 @@ pub type BodyDownloaderResult = DownloadResult>>; /// while a [`BodiesClient`][crate::bodies::client::BodiesClient] represents a client capable of /// fulfilling these requests. pub trait BodyDownloader: - Send + Sync + Stream> + Unpin + Send + Sync + Stream> + Unpin { - /// The type of header that is being used - type Header: Debug + Send + Sync + Unpin + 'static; - - /// The type of the body that is being downloaded. - type Body: Debug + Send + Sync + Unpin + 'static; + /// The Block type this downloader supports + type Block: Block + 'static; /// Method for setting the download range. fn set_download_range(&mut self, range: RangeInclusive) -> DownloadResult<()>; diff --git a/crates/net/p2p/src/bodies/response.rs b/crates/net/p2p/src/bodies/response.rs index 517c5b879835a..d53ca32eb3391 100644 --- a/crates/net/p2p/src/bodies/response.rs +++ b/crates/net/p2p/src/bodies/response.rs @@ -1,31 +1,26 @@ use alloy_consensus::BlockHeader; use alloy_primitives::{BlockNumber, U256}; -use reth_primitives::{BlockBody, SealedBlock, SealedHeader}; -use reth_primitives_traits::InMemorySize; +use reth_primitives::{SealedBlock, SealedHeader}; +use reth_primitives_traits::{Block, InMemorySize}; /// The block response #[derive(PartialEq, Eq, Debug, Clone)] -pub enum BlockResponse { +pub enum BlockResponse { /// Full block response (with transactions or ommers) - Full(SealedBlock), + Full(SealedBlock), /// The empty block response - Empty(SealedHeader), + Empty(SealedHeader), } -impl BlockResponse +impl BlockResponse where - H: BlockHeader, + B: Block, { - /// Return the reference to the response header - pub const fn header(&self) -> &SealedHeader { - match self { - Self::Full(block) => block.sealed_header(), - Self::Empty(header) => header, - } - } - /// Return the block number pub fn block_number(&self) -> BlockNumber { - self.header().number() + match self { + Self::Full(block) => block.number(), + Self::Empty(header) => header.number(), + } } /// Return the reference to the response header @@ -37,7 +32,7 @@ where } /// Return the reference to the response body - pub fn into_body(self) -> Option { + pub fn into_body(self) -> Option { match self { Self::Full(block) => Some(block.into_body()), Self::Empty(_) => None, @@ -45,7 +40,7 @@ where } } -impl InMemorySize for BlockResponse { +impl InMemorySize for BlockResponse { #[inline] fn size(&self) -> usize { match self { diff --git a/crates/net/p2p/src/full_block.rs b/crates/net/p2p/src/full_block.rs index 309252bb8f26a..368d9c4e6d4ad 100644 --- a/crates/net/p2p/src/full_block.rs +++ b/crates/net/p2p/src/full_block.rs @@ -30,7 +30,7 @@ where Client: BlockClient, { client: Client, - consensus: Arc>, + consensus: Arc>, } impl FullBlockClient @@ -40,7 +40,7 @@ where /// Creates a new instance of `FullBlockClient`. pub fn new( client: Client, - consensus: Arc>, + consensus: Arc>, ) -> Self { Self { client, consensus } } @@ -118,7 +118,7 @@ where Client: BlockClient, { client: Client, - consensus: Arc>, + consensus: Arc>, hash: B256, request: FullBlockRequest, header: Option>, @@ -140,7 +140,7 @@ where } /// Returns the [`SealedBlock`] if the request is complete and valid. - fn take_block(&mut self) -> Option> { + fn take_block(&mut self) -> Option> { if self.header.is_none() || self.body.is_none() { return None } @@ -148,7 +148,7 @@ where let header = self.header.take().unwrap(); let resp = self.body.take().unwrap(); match resp { - BodyResponse::Validated(body) => Some(SealedBlock::new(header, body)), + BodyResponse::Validated(body) => Some(SealedBlock::from_sealed_parts(header, body)), BodyResponse::PendingValidation(resp) => { // ensure the block is valid, else retry if let Err(err) = self.consensus.validate_body_against_header(resp.data(), &header) @@ -159,7 +159,7 @@ where self.request.body = Some(self.client.get_block_body(self.hash)); return None } - Some(SealedBlock::new(header, resp.into_data())) + Some(SealedBlock::from_sealed_parts(header, resp.into_data())) } } } @@ -182,7 +182,7 @@ impl Future for FetchFullBlockFuture where Client: BlockClient + 'static, { - type Output = SealedBlock; + type Output = SealedBlock; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); @@ -330,7 +330,7 @@ where /// The client used to fetch headers and bodies. client: Client, /// The consensus instance used to validate the blocks. - consensus: Arc>, + consensus: Arc>, /// The block hash to start fetching from (inclusive). start_hash: B256, /// How many blocks to fetch: `len([start_hash, ..]) == count` @@ -388,7 +388,7 @@ where /// /// These are returned in falling order starting with the requested `hash`, i.e. with /// descending block numbers. - fn take_blocks(&mut self) -> Option>> { + fn take_blocks(&mut self) -> Option>> { if !self.is_bodies_complete() { // not done with bodies yet return None @@ -421,7 +421,8 @@ where } }; - valid_responses.push(SealedBlock::new(header.clone(), body)); + valid_responses + .push(SealedBlock::::from_sealed_parts(header.clone(), body)); } } @@ -429,7 +430,7 @@ where // put response hashes back into bodies map since we aren't returning them as a // response for block in valid_responses { - let (header, body) = block.split(); + let (header, body) = block.split_sealed_header_body(); self.bodies.insert(header, BodyResponse::Validated(body)); } @@ -505,7 +506,7 @@ impl Future for FetchFullBlockRangeFuture where Client: BlockClient + 'static, { - type Output = Vec>; + type Output = Vec>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); @@ -658,7 +659,7 @@ mod tests { let client = FullBlockClient::test_client(client); let received = client.get_full_block(header.hash()).await; - assert_eq!(received, SealedBlock::new(header, body)); + assert_eq!(received, SealedBlock::from_sealed_parts(header, body)); } #[tokio::test] @@ -671,7 +672,7 @@ mod tests { let received = client.get_full_block_range(header.hash(), 1).await; let received = received.first().expect("response should include a block"); - assert_eq!(*received, SealedBlock::new(header, body)); + assert_eq!(*received, SealedBlock::from_sealed_parts(header, body)); } /// Inserts headers and returns the last header and block body. @@ -703,7 +704,7 @@ mod tests { let received = client.get_full_block_range(header.hash(), 1).await; let received = received.first().expect("response should include a block"); - assert_eq!(*received, SealedBlock::new(header.clone(), body)); + assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body)); let received = client.get_full_block_range(header.hash(), 10).await; assert_eq!(received.len(), 10); @@ -722,7 +723,7 @@ mod tests { let received = client.get_full_block_range(header.hash(), 1).await; let received = received.first().expect("response should include a block"); - assert_eq!(*received, SealedBlock::new(header.clone(), body)); + assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body)); let received = client.get_full_block_range(header.hash(), 50).await; assert_eq!(received.len(), 50); diff --git a/crates/net/p2p/src/headers/client.rs b/crates/net/p2p/src/headers/client.rs index 606d8f389a84e..b942661c2d2d3 100644 --- a/crates/net/p2p/src/headers/client.rs +++ b/crates/net/p2p/src/headers/client.rs @@ -55,7 +55,7 @@ pub type HeadersFut = Pin>> + Send + Sync>>; /// The block headers downloader client -#[auto_impl::auto_impl(&, Arc, Box)] +// #[auto_impl::auto_impl(&, Arc, Box)] pub trait HeadersClient: DownloadClient { /// The header type this client fetches. type Header: BlockHeader; diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 7dcb77671d469..bef537bdcf3d4 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -47,19 +47,20 @@ pub mod test_utils; pub use bodies::client::BodiesClient; pub use headers::client::HeadersClient; +use reth_primitives_traits::Block; -/// Helper trait that unifies network behaviour needed for fetching blocks. -pub trait BlockClient: HeadersClient + BodiesClient + Unpin + Clone {} - -impl BlockClient for T where T: HeadersClient + BodiesClient + Unpin + Clone {} - -/// The [`BlockClient`] providing Ethereum block parts. -pub trait EthBlockClient: - BlockClient
+/// Helper trait that unifies network behaviour needed for fetching entire blocks. +pub trait BlockClient: + HeadersClient
::Header> + + BodiesClient::Body> + + Unpin + + Clone { + /// The Block type that this client fetches. + type Block: Block; } -impl EthBlockClient for T where - T: BlockClient
-{ -} +/// The [`BlockClient`] providing Ethereum block parts. +pub trait EthBlockClient: BlockClient {} + +impl EthBlockClient for T where T: BlockClient {} diff --git a/crates/net/p2p/src/test_utils/full_block.rs b/crates/net/p2p/src/test_utils/full_block.rs index ee65bcb3f0720..bb000aa7c9533 100644 --- a/crates/net/p2p/src/test_utils/full_block.rs +++ b/crates/net/p2p/src/test_utils/full_block.rs @@ -4,6 +4,7 @@ use crate::{ error::PeerRequestResult, headers::client::{HeadersClient, HeadersRequest}, priority::Priority, + BlockClient, }; use alloy_consensus::Header; use alloy_eips::{BlockHashOrNumber, BlockNumHash}; @@ -133,9 +134,10 @@ impl TestFullBlockClient { pub fn highest_block(&self) -> Option { self.headers.lock().iter().max_by_key(|(_, header)| header.number).and_then( |(hash, header)| { - self.bodies.lock().get(hash).map(|body| { - SealedBlock::new(SealedHeader::new(header.clone(), *hash), body.clone()) - }) + self.bodies + .lock() + .get(hash) + .map(|body| SealedBlock::from_parts(header.clone(), body.clone(), *hash)) }, ) } @@ -243,3 +245,7 @@ impl BodiesClient for TestFullBlockClient { ))) } } + +impl BlockClient for TestFullBlockClient { + type Block = reth_primitives::Block; +} diff --git a/crates/net/p2p/src/test_utils/headers.rs b/crates/net/p2p/src/test_utils/headers.rs index 6e20b335a1078..3975cfb147f2a 100644 --- a/crates/net/p2p/src/test_utils/headers.rs +++ b/crates/net/p2p/src/test_utils/headers.rs @@ -12,7 +12,7 @@ use crate::{ }; use alloy_consensus::Header; use futures::{Future, FutureExt, Stream, StreamExt}; -use reth_consensus::{test_utils::TestConsensus, Consensus, ConsensusError}; +use reth_consensus::{test_utils::TestConsensus, HeaderValidator}; use reth_eth_wire_types::HeadersDirection; use reth_network_peers::{PeerId, WithPeerId}; use reth_primitives::SealedHeader; @@ -146,13 +146,7 @@ impl Stream for TestDownload { } let empty: SealedHeader = SealedHeader::default(); - if let Err(error) = - >::validate_header_against_parent( - &this.consensus, - &empty, - &empty, - ) - { + if let Err(error) = this.consensus.validate_header_against_parent(&empty, &empty) { this.done = true; return Poll::Ready(Some(Err(DownloadError::HeaderValidation { hash: empty.hash(), diff --git a/crates/primitives-traits/src/block/sealed.rs b/crates/primitives-traits/src/block/sealed.rs index 24ea8c4ee35c4..11e7fb53a37c0 100644 --- a/crates/primitives-traits/src/block/sealed.rs +++ b/crates/primitives-traits/src/block/sealed.rs @@ -48,6 +48,17 @@ impl SealedBlock where B: Block, { + /// Creates the [`SealedBlock`] from the block's parts. + pub fn from_parts(header: B::Header, body: B::Body, hash: BlockHash) -> Self { + Self::new(B::new(header, body), hash) + } + + /// Creates the [`SealedBlock`] from the [`SealedHeader`] and the body. + pub fn from_sealed_parts(header: SealedHeader, body: B::Body) -> Self { + let (header, hash) = header.split(); + Self::from_parts(header, body, hash) + } + /// Converts this block into a [`RecoveredBlock`] with the given senders if the number of /// senders is equal to the number of transactions in the block and recovers the senders from /// the transactions, if @@ -142,6 +153,12 @@ where self.block.split() } + /// Splits the block into body and header into separate components. + pub fn split_sealed_header_body(self) -> (SealedHeader, B::Body) { + let (header, block) = self.block.split(); + (SealedHeader::new(header, self.hash), block) + } + /// Returns an iterator over all blob versioned hashes from the block body. #[inline] pub fn blob_versioned_hashes_iter(&self) -> impl Iterator + '_ {