diff --git a/Cargo.lock b/Cargo.lock index a738154288..c518fddb4e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4139,6 +4139,7 @@ dependencies = [ "nimiq-blockchain", "nimiq-blockchain-interface", "nimiq-bls", + "nimiq-consensus", "nimiq-database", "nimiq-genesis", "nimiq-genesis-builder", diff --git a/mempool/Cargo.toml b/mempool/Cargo.toml index 8b62010d8b..90c92f635e 100644 --- a/mempool/Cargo.toml +++ b/mempool/Cargo.toml @@ -36,6 +36,7 @@ nimiq-account = { workspace = true } nimiq-block = { workspace = true } nimiq-blockchain = { workspace = true } nimiq-blockchain-interface = { workspace = true } +nimiq-consensus = { workspace = true } nimiq-database = { workspace = true } nimiq-hash = { workspace = true } nimiq-keys = { workspace = true } diff --git a/mempool/mempool-task/src/lib.rs b/mempool/mempool-task/src/lib.rs index 7c205eec6a..7710b616a9 100644 --- a/mempool/mempool-task/src/lib.rs +++ b/mempool/mempool-task/src/lib.rs @@ -9,7 +9,7 @@ use futures::{stream::BoxStream, Future, Stream, StreamExt}; use log::{debug, trace, warn}; use nimiq_blockchain::Blockchain; use nimiq_blockchain_interface::{AbstractBlockchain, BlockchainEvent}; -use nimiq_consensus::{sync::syncer::SyncerEvent, Consensus, ConsensusEvent, ConsensusProxy}; +use nimiq_consensus::{sync::syncer::SyncEvent, Consensus, ConsensusEvent, ConsensusProxy}; use nimiq_mempool::{config::MempoolConfig, mempool::Mempool}; use nimiq_network_interface::network::{Network, NetworkEvent, SubscribeEvents}; use nimiq_utils::spawn; @@ -37,7 +37,7 @@ pub struct MempoolTask { consensus_event_rx: BroadcastStream, blockchain_event_rx: BoxStream<'static, BlockchainEvent>, network_event_rx: SubscribeEvents, - syncer_event_rx: BroadcastStream>, + sync_event_rx: BroadcastStream::PeerId>>, peers_in_live_sync: HashSet, @@ -57,7 +57,6 @@ impl MempoolTask { ) -> Self { let consensus_event_rx = consensus.subscribe_events(); let network_event_rx = consensus.network.subscribe_events(); - let syncer_event_rx = consensus.sync.subscribe_events(); let blockchain_rg = blockchain.read(); let blockchain_event_rx = blockchain_rg.notifier_as_stream(); @@ -71,16 +70,18 @@ impl MempoolTask { Arc::clone(&consensus.network), )); let mempool_active = false; + let proxy = consensus.proxy(); + let sync_event_rx = proxy.subscribe_sync_events(); Self { - consensus: consensus.proxy(), + consensus: proxy, blockchain, peers_in_live_sync, consensus_event_rx, blockchain_event_rx, network_event_rx, - syncer_event_rx, + sync_event_rx, mempool: Arc::clone(&mempool), mempool_active, @@ -115,17 +116,21 @@ impl MempoolTask { let peers = self.peers_in_live_sync.clone().into_iter().collect(); #[cfg(not(feature = "metrics"))] spawn({ + let consensus = self.consensus.clone(); async move { // The mempool is not updated while consensus is lost. // Thus, we need to check all transactions if they are still valid. mempool.cleanup(); - mempool.start_executors(network, None, None, peers).await; + mempool + .start_executors(network, None, None, peers, consensus) + .await; } }); #[cfg(feature = "metrics")] spawn({ let mempool_monitor = self.mempool_monitor.clone(); let ctrl_mempool_monitor = self.control_mempool_monitor.clone(); + let consensus = self.consensus.clone(); async move { // The mempool is not updated while consensus is lost. // Thus, we need to check all transactions if they are still valid. @@ -137,6 +142,7 @@ impl MempoolTask { Some(mempool_monitor), Some(ctrl_mempool_monitor), peers, + consensus, ) .await; } @@ -194,7 +200,7 @@ impl MempoolTask { } } - fn on_syncer_event(&mut self, event: SyncEvent) { + fn on_consensus_sync_event(&mut self, event: SyncEvent) { match event { SyncEvent::AddLiveSync(peer_id) => { self.peers_in_live_sync.insert(peer_id); @@ -216,9 +222,9 @@ impl Stream for MempoolTask { type Item = MempoolEvent; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // Process syncer updates. - while let Poll::Ready(Some(Ok(event))) = self.syncer_event_rx.poll_next_unpin(cx) { - self.on_syncer_event(event) + // Process consensus sync updates. + while let Poll::Ready(Some(Ok(event))) = self.sync_event_rx.poll_next_unpin(cx) { + self.on_consensus_sync_event(event) } // Process network updates. diff --git a/mempool/src/mempool.rs b/mempool/src/mempool.rs index bd20823401..7975fb8036 100644 --- a/mempool/src/mempool.rs +++ b/mempool/src/mempool.rs @@ -12,6 +12,7 @@ use nimiq_account::ReservedBalance; use nimiq_block::Block; use nimiq_blockchain::{Blockchain, TransactionVerificationCache}; use nimiq_blockchain_interface::AbstractBlockchain; +use nimiq_consensus::ConsensusProxy; use nimiq_hash::{Blake2bHash, Hash}; use nimiq_keys::Address; use nimiq_network_interface::{ @@ -146,6 +147,7 @@ impl Mempool { monitor: Option, control_monitor: Option, mut peers: Vec, + consensus: ConsensusProxy, ) { let executor_handle = self.executor_handle.lock().await; let control_executor_handle = self.control_executor_handle.lock().await; @@ -161,8 +163,8 @@ impl Mempool { let regular_transactions_syncer = MempoolSyncer::new( peers.clone(), MempoolTransactionType::Regular, - Arc::clone(&network), Arc::clone(&self.blockchain), + consensus.clone(), Arc::clone(&self.state), ); @@ -185,8 +187,8 @@ impl Mempool { let control_transactions_syncer = MempoolSyncer::new( peers, MempoolTransactionType::Control, - Arc::clone(&network), Arc::clone(&self.blockchain), + consensus.clone(), Arc::clone(&self.state), ); diff --git a/mempool/src/sync/mod.rs b/mempool/src/sync/mod.rs index 1d26def05b..95ed53a125 100644 --- a/mempool/src/sync/mod.rs +++ b/mempool/src/sync/mod.rs @@ -18,6 +18,7 @@ use messages::{ ResponseMempoolHashes, ResponseMempoolTransactions, }; use nimiq_blockchain::Blockchain; +use nimiq_consensus::{sync::syncer::SyncEvent, ConsensusProxy}; use nimiq_hash::{Blake2bHash, Hash}; use nimiq_network_interface::{ network::Network, @@ -28,6 +29,7 @@ use nimiq_transaction::{historic_transaction::RawTransactionHash, Transaction}; use nimiq_utils::{spawn, stream::FuturesUnordered}; use parking_lot::RwLock; use tokio::time::Sleep; +use tokio_stream::wrappers::BroadcastStream; use crate::{executor::PubsubIdOrPeerId, mempool_state::MempoolState}; @@ -77,11 +79,14 @@ const MAX_HASHES_PER_REQUEST: usize = 500; const MAX_TOTAL_TRANSACTIONS: usize = 5000; const SHUTDOWN_TIMEOUT_DURATION: Duration = Duration::from_secs(10 * 60); // 10 minutes -/// Struct responsible for discovering hashes and retrieving transactions from the mempool of other nodes that have mempool +/// Struct responsible for discovering hashes and retrieving transactions from the mempool of other nodes that have a mempool pub(crate) struct MempoolSyncer { /// Timeout to gracefully shutdown the mempool syncer entirely shutdown_timer: Pin>, + /// Consensus sync event receiver + consensus_sync_event_rx: BroadcastStream::PeerId>>, + /// Blockchain reference blockchain: Arc>, @@ -118,15 +123,15 @@ impl MempoolSyncer { pub fn new( peers: Vec, transaction_type: MempoolTransactionType, - network: Arc, blockchain: Arc>, + consensus: ConsensusProxy, mempool_state: Arc>, ) -> Self { let hashes_requests = peers .iter() .map(|peer_id| { let peer_id = *peer_id; - let network = Arc::clone(&network); + let network = Arc::clone(&consensus.network); let transaction_type = transaction_type.to_owned(); async move { ( @@ -142,9 +147,10 @@ impl MempoolSyncer { Self { shutdown_timer: Box::pin(sleep_until(Instant::now() + SHUTDOWN_TIMEOUT_DURATION)), + consensus_sync_event_rx: consensus.subscribe_sync_events(), blockchain, hashes_requests, - network: Arc::clone(&network), + network: consensus.network, peers, mempool_state: Arc::clone(&mempool_state), unknown_hashes: HashMap::new(), @@ -332,6 +338,13 @@ impl Stream for MempoolSyncer { return Poll::Ready(None); } + // Then we check if peers got added to live sync in the mean time + while let Poll::Ready(Some(Ok(event))) = self.consensus_sync_event_rx.poll_next_unpin(cx) { + match event { + SyncEvent::AddLiveSync(peer_id) => self.add_peer(peer_id), + } + } + // Then we check our RequestMempoolHashes responses while let Poll::Ready(Some((peer_id, result))) = self.hashes_requests.poll_next_unpin(cx) { match result { @@ -375,7 +388,7 @@ impl Stream for MempoolSyncer { continue; } - info!(num = %transactions.len(), "Pushed synced mempool transactions into the mempool"); + info!(num = %transactions.len(), "Synced mempool transactions"); self.transactions.extend(transactions); } Err(err) => { diff --git a/mempool/src/sync/tests/mod.rs b/mempool/src/sync/tests/mod.rs index d3727c9144..85d2a68419 100644 --- a/mempool/src/sync/tests/mod.rs +++ b/mempool/src/sync/tests/mod.rs @@ -12,6 +12,7 @@ mod tests { use nimiq_network_mock::MockHub; use nimiq_test_log::test; use nimiq_test_utils::{ + consensus::consensus, test_rng::test_rng, test_transaction::{generate_transactions, TestAccount, TestTransaction}, }; @@ -70,6 +71,10 @@ mod tests { net1.dial_mock(&net2); let peer_ids = net1.get_peers(); + // Setup consensus proxy + let consensus = consensus(Arc::clone(&blockchain), Arc::clone(&net1)).await; + let proxy = consensus.proxy(); + // Setup streams to respond to requests let hash_stream = net2.receive_requests::(); let txns_stream = net2.receive_requests::(); @@ -111,8 +116,8 @@ mod tests { let mut syncer = MempoolSyncer::new( peer_ids, MempoolTransactionType::Regular, - net1, Arc::clone(&blockchain), + proxy, Arc::clone(&state), ); @@ -186,6 +191,10 @@ mod tests { net1.dial_mock(&net2); let peer_ids = net1.get_peers(); + // Setup consensus proxy + let consensus = consensus(Arc::clone(&blockchain), Arc::clone(&net1)).await; + let proxy = consensus.proxy(); + // Setup stream to respond to requests let hash_stream = net2.receive_requests::(); @@ -212,8 +221,8 @@ mod tests { let mut syncer = MempoolSyncer::new( peer_ids, MempoolTransactionType::Regular, - net1, Arc::clone(&blockchain), + proxy, Arc::clone(&state), ); @@ -260,12 +269,16 @@ mod tests { let mut hub = MockHub::default(); let net1 = Arc::new(hub.new_network()); + // Setup consensus proxy + let consensus = consensus(Arc::clone(&blockchain), Arc::clone(&net1)).await; + let proxy = consensus.proxy(); + // Create a new mempool syncer with 0 peers to sync with let mut syncer = MempoolSyncer::new( vec![], MempoolTransactionType::Regular, - Arc::clone(&net1), Arc::clone(&blockchain), + proxy, Arc::clone(&state), ); @@ -306,7 +319,7 @@ mod tests { let _ = poll!(syncer.next()); sleep(Duration::from_millis(200)).await; - // we have received unknown hashes from the added peer + // We have received unknown hashes from the added peer let _ = poll!(syncer.next()); assert_eq!(syncer.unknown_hashes.len(), num_transactions); }