Skip to content

Commit

Permalink
Replace sort with hash map in duplicate detection
Browse files Browse the repository at this point in the history
  • Loading branch information
emhane committed Jan 29, 2024
1 parent 5427002 commit 1211546
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 114 deletions.
33 changes: 32 additions & 1 deletion crates/net/eth-wire/src/types/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use alloy_rlp::{
use reth_codecs::derive_arbitrary;
use reth_primitives::{Block, Bytes, TransactionSigned, TxHash, B256, U128};

use std::sync::Arc;
use std::{collections::HashMap, mem, sync::Arc};

#[cfg(feature = "arbitrary")]
use proptest::prelude::*;
Expand Down Expand Up @@ -243,6 +243,22 @@ impl NewPooledTransactionHashes {
NewPooledTransactionHashes::Eth68(_) => None,
}
}

/// Returns the inner type if this an eth68 announcement.
pub fn take_eth68(&mut self) -> Option<NewPooledTransactionHashes68> {
match self {
NewPooledTransactionHashes::Eth66(_) => None,
NewPooledTransactionHashes::Eth68(msg) => Some(mem::take(msg)),
}
}

/// Returns the inner type if this an eth66 announcement.
pub fn take_eth66(&mut self) -> Option<NewPooledTransactionHashes66> {
match self {
NewPooledTransactionHashes::Eth66(msg) => Some(mem::take(msg)),
NewPooledTransactionHashes::Eth68(_) => None,
}
}
}

impl From<NewPooledTransactionHashes> for EthMessage {
Expand Down Expand Up @@ -288,6 +304,21 @@ impl HandleAnnouncement for NewPooledTransactionHashes {
}
}

/// Announcement data that has been validated according to the configured network. For an eth68
/// announcement, values of the map are `Some((u8, usize))` - the tx metadata. For an eth66
/// announcement, values of the map are `None`.
pub type ValidAnnouncementData = HashMap<TxHash, Option<(u8, usize)>>;

impl HandleAnnouncement for ValidAnnouncementData {
fn is_empty(&self) -> bool {
self.is_empty()
}

fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) {
self.retain(|&hash, _| f(hash))
}
}

/// This informs peers of transaction hashes for transactions that have appeared on the network,
/// but have not been included in a block.
#[derive_arbitrary(rlp)]
Expand Down
4 changes: 2 additions & 2 deletions crates/net/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ reth-provider.workspace = true
reth-rpc-types.workspace = true
reth-tokio-util.workspace = true

# ethereum
enr = { workspace = true, features = ["rust-secp256k1"], optional = true }
alloy-rlp.workspace = true

# async/futures
Expand Down Expand Up @@ -66,8 +68,6 @@ secp256k1 = { workspace = true, features = ["global-context", "rand-std", "recov
derive_more.workspace = true
schnellru.workspace = true
itertools.workspace = true

enr = { workspace = true, features = ["rust-secp256k1"], optional = true }
tempfile = { workspace = true, optional = true }

[dev-dependencies]
Expand Down
33 changes: 22 additions & 11 deletions crates/net/network/src/transactions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,25 +578,36 @@ where
}

// filter out invalid entries
if let Some(eth68_msg) = msg.as_eth68_mut() {
// validate eth68 message
if let FilterOutcome::ReportPeer =
self.transaction_fetcher.filter_valid_hashes.filter_valid_entries_68(eth68_msg)
{
let mut valid_announcement_data = HashMap::new();
if let Some(eth68_msg) = msg.take_eth68() {
// validate eth68 announcement data
let (outcome, valid_data) =
self.transaction_fetcher.filter_valid_hashes.filter_valid_entries_68(eth68_msg);

if let FilterOutcome::ReportPeer = outcome {
self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
}
} else if let Some(eth66_msg) = msg.as_eth66_mut() {
if let FilterOutcome::ReportPeer =
self.transaction_fetcher.filter_valid_hashes.filter_valid_entries_66(eth66_msg)
{

valid_announcement_data = valid_data
} else if let Some(eth66_msg) = msg.take_eth66() {
// validate eth66 announcement data
let (outcome, valid_data) =
self.transaction_fetcher.filter_valid_hashes.filter_valid_entries_66(eth66_msg);

if let FilterOutcome::ReportPeer = outcome {
self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
}

valid_announcement_data = valid_data
}

// filter out already seen unknown hashes. this loads the hashes into the tx fetcher,
// hence they should be valid at this point. for any seen hashes add the peer as fallback.
self.transaction_fetcher
.filter_unseen_hashes(&mut msg, peer_id, |peer_id| self.peers.contains_key(&peer_id));
self.transaction_fetcher.filter_unseen_hashes(
&mut valid_announcement_data,
peer_id,
|peer_id| self.peers.contains_key(&peer_id),
);

if msg.is_empty() {
// nothing to request
Expand Down
Loading

0 comments on commit 1211546

Please sign in to comment.