From 2011b402b8ac8c2e239c7ac477d60b66bfc690a2 Mon Sep 17 00:00:00 2001 From: Stefan Date: Tue, 10 Sep 2024 16:27:33 +0200 Subject: [PATCH] Mempool syncer: minor improvements - Only try to construct mempool transaction requests if new hashes are discovered - Check the mempool state first before checking the blockchain if a transaction is already known/included - Bound the amount of received hashes that will be processed per peer - Fix tests --- mempool/mempool-task/src/lib.rs | 11 ++++---- mempool/src/sync/mod.rs | 48 ++++++++++++++++++++------------- mempool/src/sync/tests/mod.rs | 4 +-- mempool/tests/mod.rs | 33 +++++++++++++++++------ 4 files changed, 62 insertions(+), 34 deletions(-) diff --git a/mempool/mempool-task/src/lib.rs b/mempool/mempool-task/src/lib.rs index 273090d150..34fbe6193f 100644 --- a/mempool/mempool-task/src/lib.rs +++ b/mempool/mempool-task/src/lib.rs @@ -56,12 +56,7 @@ impl MempoolTask { ) -> Self { let consensus_event_rx = consensus.subscribe_events(); let network_event_rx = consensus.network.subscribe_events(); - let blockchain_event_rx = blockchain.read().notifier_as_stream(); - - let proxy = consensus.proxy(); - let sync_event_rx = proxy.subscribe_sync_events(); - let peers_in_live_sync = HashSet::from_iter(consensus.sync.peers()); let mempool = Arc::new(Mempool::new( Arc::clone(&blockchain), mempool_config, @@ -69,6 +64,12 @@ impl MempoolTask { )); let mempool_active = false; + let blockchain_event_rx = blockchain.read().notifier_as_stream(); + + let proxy = consensus.proxy(); + let sync_event_rx = proxy.subscribe_sync_events(); + let peers_in_live_sync = HashSet::from_iter(consensus.sync.peers()); + Self { consensus: proxy, peers_in_live_sync, diff --git a/mempool/src/sync/mod.rs b/mempool/src/sync/mod.rs index 95ed53a125..fae20f959f 100644 --- a/mempool/src/sync/mod.rs +++ b/mempool/src/sync/mod.rs @@ -76,7 +76,7 @@ impl HashRequestStatus { } const MAX_HASHES_PER_REQUEST: usize = 500; -const MAX_TOTAL_TRANSACTIONS: usize = 5000; +const MAX_TOTAL_HASHES: usize = 25_000; const SHUTDOWN_TIMEOUT_DURATION: Duration = Duration::from_secs(10 * 60); // 10 minutes /// Struct responsible for discovering hashes and retrieving transactions from the mempool of other nodes that have a mempool @@ -85,7 +85,7 @@ pub(crate) struct MempoolSyncer { shutdown_timer: Pin>, /// Consensus sync event receiver - consensus_sync_event_rx: BroadcastStream::PeerId>>, + consensus_sync_event_rx: BroadcastStream::PeerId>>, /// Blockchain reference blockchain: Arc>, @@ -161,30 +161,34 @@ impl MempoolSyncer { } /// Push newly discovered hashes into the `unknown_hashes` and keep track which peers have those hashes - fn push_unknown_hashes(&mut self, hashes: Vec, peer_id: N::PeerId) { + fn push_unknown_hashes(&mut self, hashes: Vec, peer_id: N::PeerId) -> bool { let blockchain = self.blockchain.read(); let state = self.mempool_state.read(); debug!(peer_id = %peer_id, num = %hashes.len(), "Received unknown mempool hashes"); + let mut new_hashes_discovered = false; - hashes.into_iter().for_each(|hash| { + hashes.into_iter().take(MAX_TOTAL_HASHES).for_each(|hash| { // Perform some basic checks to reduce the amount of transactions we are going to request later - // TODO: what if I respond with MAX_TOTAL_TRANSACTIONS fake hashes - if self.unknown_hashes.len() < MAX_TOTAL_TRANSACTIONS - && !blockchain + if state.contains(&hash) + || blockchain .contains_tx_in_validity_window(&RawTransactionHash::from((hash).clone()), None) - && !state.contains(&hash) { - match self.unknown_hashes.entry(hash) { - Occupied(mut entry) => { - entry.get_mut().add_peer(peer_id); - } - Vacant(entry) => { - entry.insert(HashRequestStatus::new(vec![peer_id])); - } - }; + return; } - }) + + match self.unknown_hashes.entry(hash) { + Occupied(mut entry) => { + entry.get_mut().add_peer(peer_id); + } + Vacant(entry) => { + entry.insert(HashRequestStatus::new(vec![peer_id])); + new_hashes_discovered = true; + } + }; + }); + + new_hashes_discovered } /// Add peer to discover its mempool @@ -193,6 +197,7 @@ impl MempoolSyncer { return; } + debug!(%peer_id, "Peer added to mempool sync"); self.peers.push(peer_id); let network = Arc::clone(&self.network); let transaction_type = self.mempool_transaction_type.clone(); @@ -346,10 +351,13 @@ impl Stream for MempoolSyncer { } // Then we check our RequestMempoolHashes responses + let mut new_hashes_discovered = false; while let Poll::Ready(Some((peer_id, result))) = self.hashes_requests.poll_next_unpin(cx) { match result { Ok(hashes) => { - self.push_unknown_hashes(hashes.hashes, peer_id); + if self.push_unknown_hashes(hashes.hashes, peer_id) { + new_hashes_discovered = true; + } } Err(err) => { error!(%err, %peer_id, "Failed to fetch mempool hashes"); @@ -358,7 +366,9 @@ impl Stream for MempoolSyncer { } // Then we construct our RequestMempoolTransactions requests and send them over the network to our peers - self.send_mempool_transactions_requests(); + if new_hashes_discovered { + self.send_mempool_transactions_requests(); + } // Then we check our RequestMempoolTransactions responses while let Poll::Ready(Some((peer_id, result))) = diff --git a/mempool/src/sync/tests/mod.rs b/mempool/src/sync/tests/mod.rs index 85d2a68419..4cbf685bee 100644 --- a/mempool/src/sync/tests/mod.rs +++ b/mempool/src/sync/tests/mod.rs @@ -163,8 +163,8 @@ mod tests { // Load known hashes into the mempool state let mut handle = state.write(); - known_txns.iter().for_each(|txn| { - handle.regular_transactions.insert(&txn, TxPriority::Medium); + known_txns.into_iter().for_each(|txn| { + handle.regular_transactions.insert(txn, TxPriority::Medium); }); assert_eq!(handle.regular_transactions.len(), known_hashes.len()); drop(handle); diff --git a/mempool/tests/mod.rs b/mempool/tests/mod.rs index 639a259df5..bd824669dd 100644 --- a/mempool/tests/mod.rs +++ b/mempool/tests/mod.rs @@ -15,6 +15,7 @@ use nimiq_mempool::{ config::MempoolConfig, executor::PubsubIdOrPeerId, mempool::Mempool, mempool_transactions::TxPriority, }; +use nimiq_network_interface::network::PubsubId; use nimiq_network_mock::{MockHub, MockId, MockNetwork, MockPeerId}; use nimiq_primitives::{coin::Coin, networks::NetworkId, policy::Policy}; use nimiq_serde::{Deserialize, Serialize}; @@ -871,10 +872,14 @@ async fn multiple_start_stop() { let (txn_stream_tx, txn_stream_rx) = mpsc::channel(64); // Create mempool and subscribe with a custom txn stream. - let mempool = Mempool::new(Arc::clone(&blockchain), MempoolConfig::default()); let mut hub = MockHub::new(); - let mock_id = MockId::new(hub.new_address().into()); let mock_network = Arc::new(hub.new_network()); + let mempool = Mempool::new( + Arc::clone(&blockchain), + MempoolConfig::default(), + Arc::clone(&mock_network), + ); + let mock_id = MockId::new(hub.new_address().into()); // Subscribe mempool with the mpsc stream created mempool @@ -891,7 +896,10 @@ async fn multiple_start_stop() { tokio::task::spawn(async move { for txn in txns { txn_stream_tx1 - .send((txn.clone(), mock_id1.clone())) + .send(( + txn.clone(), + PubsubIdOrPeerId::PeerId(mock_id1.propagation_source().clone()), + )) .await .unwrap(); } @@ -915,7 +923,10 @@ async fn multiple_start_stop() { tokio::task::spawn(async move { for txn in txns { txn_stream_tx - .send((txn.clone(), mock_id.clone())) + .send(( + txn.clone(), + PubsubIdOrPeerId::PeerId(mock_id.propagation_source()), + )) .await .expect_err("Send should fail, executor is stopped"); } @@ -940,10 +951,14 @@ async fn multiple_start_stop() { let (txn_stream_tx, txn_stream_rx) = mpsc::channel(64); // Create mempool and subscribe with a custom txn stream. - let mempool = Mempool::new(Arc::clone(&blockchain), MempoolConfig::default()); let mut hub = MockHub::new(); - let mock_id = MockId::new(hub.new_address().into()); let mock_network = Arc::new(hub.new_network()); + let mempool = Mempool::new( + Arc::clone(&blockchain), + MempoolConfig::default(), + mock_network.clone(), + ); + let mock_id = MockId::new(hub.new_address().into()); // Subscribe mempool with the mpsc stream created mempool @@ -958,7 +973,10 @@ async fn multiple_start_stop() { tokio::task::spawn(async move { for txn in txns { txn_stream_tx - .send((txn.clone(), mock_id.clone())) + .send(( + txn.clone(), + PubsubIdOrPeerId::PeerId(mock_id.propagation_source()), + )) .await .unwrap(); } @@ -1947,7 +1965,6 @@ async fn applies_total_tx_size_limits() { ..Default::default() }; let mut hub = MockHub::new(); - let mock_id = MockId::new(hub.new_address().into()); let mock_network = Arc::new(hub.new_network()); let mempool = Mempool::new(blockchain, mempool_config, Arc::clone(&mock_network));