From a15b2d0c031f5ffe6044e0fe3d66371d77059c6b Mon Sep 17 00:00:00 2001 From: Stefan Date: Wed, 14 Aug 2024 13:34:43 +0200 Subject: [PATCH 01/14] Mempool syncer: implementation. The syncers get initiated once the Mempool Executors are spawned. One Syncer per Executor is started in order to sync the regular and control transactions separately. First the Syncer starts to discover which transaction hashes other nodes with a mempool have and compare those with what we have locally. Then we distribute those unknown hashes among the peers we know have those transactions and retrieve the actual transactions. Lastly for every received transaction we do a full verification which should add it to our local mempool if everything checks out. --- mempool/Cargo.toml | 2 +- mempool/src/executor.rs | 61 ++- mempool/src/lib.rs | 2 + mempool/src/mempool.rs | 46 +- mempool/src/sync/messages.rs | 97 +++++ mempool/src/sync/mod.rs | 666 +++++++++++++++++++++++++++++ mempool/tests/mod.rs | 4 +- network-interface/src/peer_info.rs | 6 +- 8 files changed, 856 insertions(+), 28 deletions(-) create mode 100644 mempool/src/sync/messages.rs create mode 100644 mempool/src/sync/mod.rs diff --git a/mempool/Cargo.toml b/mempool/Cargo.toml index f40599d891..aaf2d46923 100644 --- a/mempool/Cargo.toml +++ b/mempool/Cargo.toml @@ -42,6 +42,7 @@ nimiq-keys = { workspace = true } nimiq-network-interface = { workspace = true } nimiq-primitives = { workspace = true, features = ["coin", "networks"] } nimiq-serde = { workspace = true } +nimiq-time = { workspace = true } nimiq-transaction = { workspace = true } nimiq-utils = { workspace = true, features = ["spawn", "time"] } @@ -58,7 +59,6 @@ nimiq-genesis-builder = { workspace = true } nimiq-network-mock = { workspace = true } nimiq-test-log = { workspace = true } nimiq-test-utils = { workspace = true } -nimiq-time = { workspace = true } nimiq-transaction-builder = { workspace = true } nimiq-vrf = { workspace = true } diff --git a/mempool/src/executor.rs b/mempool/src/executor.rs index 3b8f775a7e..5b64da1b8e 100644 --- a/mempool/src/executor.rs +++ b/mempool/src/executor.rs @@ -11,7 +11,7 @@ use std::{ use futures::{ready, stream::BoxStream, StreamExt}; use nimiq_blockchain::Blockchain; -use nimiq_network_interface::network::{MsgAcceptance, Network, Topic}; +use nimiq_network_interface::network::{CloseReason, MsgAcceptance, Network, Topic}; use nimiq_primitives::networks::NetworkId; use nimiq_transaction::Transaction; use nimiq_utils::spawn; @@ -26,6 +26,14 @@ use crate::{ const CONCURRENT_VERIF_TASKS: u32 = 10000; +/// Enum describing whether a network response directly comes from a peer or through Gossipsub +pub enum PubsubIdOrPeerId { + /// A network response coming directly from another peer + PeerId(N::PeerId), + /// A network response coming over Gossipsub + PubsubId(N::PubsubId), +} + pub(crate) struct MempoolExecutor { // Blockchain reference blockchain: Arc>, @@ -45,8 +53,8 @@ pub(crate) struct MempoolExecutor { // Network ID, used for tx verification network_id: NetworkId, - // Transaction stream that is used to listen to transactions from the network - txn_stream: BoxStream<'static, (Transaction, ::PubsubId)>, + // Transaction stream that is used to listen to transactions from the network and the Mempool Syncer + txn_stream: BoxStream<'static, (Transaction, PubsubIdOrPeerId)>, // Phantom data for the unused type T _phantom: PhantomData, @@ -58,7 +66,7 @@ impl MempoolExecutor { state: Arc>, filter: Arc>, network: Arc, - txn_stream: BoxStream<'static, (Transaction, ::PubsubId)>, + txn_stream: BoxStream<'static, (Transaction, PubsubIdOrPeerId)>, verification_tasks: Arc, ) -> Self { Self { @@ -85,7 +93,9 @@ impl Future for MempoolExecutor { } } - while let Some((tx, pubsub_id)) = ready!(self.txn_stream.as_mut().poll_next_unpin(cx)) { + while let Some((tx, pubsub_or_peer_id)) = + ready!(self.txn_stream.as_mut().poll_next_unpin(cx)) + { let decrement = Decrementer(Arc::clone(&self.verification_tasks)); if self .verification_tasks @@ -113,16 +123,37 @@ impl Future for MempoolExecutor { TxPriority::Medium, ); - let acceptance = match verify_tx_ret { - Ok(_) => MsgAcceptance::Accept, - // Reject the message if signature verification fails or transaction is invalid - // for current validation window - Err(VerifyErr::InvalidTransaction(_)) => MsgAcceptance::Reject, - Err(VerifyErr::AlreadyIncluded) => MsgAcceptance::Reject, - Err(_) => MsgAcceptance::Ignore, - }; - - network.validate_message::(pubsub_id, acceptance); + match pubsub_or_peer_id { + PubsubIdOrPeerId::PeerId(peer_id) => match verify_tx_ret { + Ok(_) => (), + Err(VerifyErr::InvalidAccount(_)) + | Err(VerifyErr::InvalidBlockNumber) + | Err(VerifyErr::InvalidTransaction(_)) => { + if network.has_peer(peer_id) { + warn!( + %peer_id, + "Banning peer because it responded with an invalid mempool transaction while syncing" + ); + network + .disconnect_peer(peer_id, CloseReason::MaliciousPeer) + .await; + } + } + Err(_) => (), + }, + PubsubIdOrPeerId::PubsubId(pubsub_id) => { + let acceptance = match verify_tx_ret { + Ok(_) => MsgAcceptance::Accept, + // Reject the message if signature verification fails or transaction is invalid + // for current validation window + Err(VerifyErr::InvalidTransaction(_)) => MsgAcceptance::Reject, + Err(VerifyErr::AlreadyIncluded) => MsgAcceptance::Reject, + Err(_) => MsgAcceptance::Ignore, + }; + + network.validate_message::(pubsub_id, acceptance); + } + } drop(decrement); }); diff --git a/mempool/src/lib.rs b/mempool/src/lib.rs index 0bd27cc77b..c1dddea8c7 100644 --- a/mempool/src/lib.rs +++ b/mempool/src/lib.rs @@ -24,5 +24,7 @@ pub mod mempool; mod mempool_metrics; /// Mempool transaction module pub mod mempool_transactions; +/// Mempool syncer module +mod sync; /// Verify transaction module pub mod verify; diff --git a/mempool/src/mempool.rs b/mempool/src/mempool.rs index 4029e502b5..dc62cb547f 100644 --- a/mempool/src/mempool.rs +++ b/mempool/src/mempool.rs @@ -6,7 +6,7 @@ use std::{ use futures::{ future::{AbortHandle, Abortable}, lock::{Mutex, MutexGuard}, - stream::{BoxStream, StreamExt}, + stream::{select, BoxStream, StreamExt}, }; use nimiq_account::ReservedBalance; use nimiq_block::Block; @@ -28,10 +28,11 @@ use tokio_metrics::TaskMonitor; use crate::mempool_metrics::MempoolMetrics; use crate::{ config::MempoolConfig, - executor::MempoolExecutor, + executor::{MempoolExecutor, PubsubIdOrPeerId}, filter::{MempoolFilter, MempoolRules}, mempool_state::{EvictionReason, MempoolState}, mempool_transactions::{MempoolTransactions, TxPriority}, + sync::{messages::MempoolTransactionType, MempoolSyncer}, verify::{verify_tx, VerifyErr}, }; @@ -90,7 +91,7 @@ impl Mempool { network: Arc, monitor: Option, mut handle: MutexGuard<'_, Option>, - txn_stream: BoxStream<'static, (Transaction, ::PubsubId)>, + txn_stream: BoxStream<'static, (Transaction, PubsubIdOrPeerId)>, ) { if handle.is_some() { // If we already have an executor running, don't do anything @@ -144,14 +145,41 @@ impl Mempool { return; } + // TODO: get correct peers + // TODO: only get peers that are synced with us + // Sync regular transactions with the mempool of other peers + let regular_transactions_syncer = MempoolSyncer::new( + network.get_peers(), + MempoolTransactionType::Regular, + Arc::clone(&network), + Arc::clone(&self.blockchain), + Arc::clone(&self.state), + ); + // Subscribe to the network TX topic - let txn_stream = network.subscribe::().await.unwrap(); + let txn_stream = network + .subscribe::() + .await + .unwrap() + .map(|(tx, pubsub_id)| (tx, PubsubIdOrPeerId::PubsubId(pubsub_id))) + .boxed(); self.start_executor::( Arc::clone(&network), monitor, executor_handle, - txn_stream, + select(regular_transactions_syncer, txn_stream).boxed(), + ); + + // TODO: get correct peers + // TODO: only get peers that are synced with us + // Sync control transactions with the mempool of other peers + let control_transactions_syncer = MempoolSyncer::new( + network.get_peers(), + MempoolTransactionType::Control, + Arc::clone(&network), + Arc::clone(&self.blockchain), + Arc::clone(&self.state), ); // Subscribe to the control transaction topic @@ -159,14 +187,14 @@ impl Mempool { .subscribe::() .await .unwrap() - .map(|(tx, pubsub_id)| (Transaction::from(tx), pubsub_id)) + .map(|(tx, pubsub_id)| (Transaction::from(tx), PubsubIdOrPeerId::PubsubId(pubsub_id))) .boxed(); self.start_executor::( network, control_monitor, control_executor_handle, - txn_stream, + select(control_transactions_syncer, txn_stream).boxed(), ); } @@ -177,7 +205,7 @@ impl Mempool { /// stream instead. pub async fn start_executor_with_txn_stream( &self, - txn_stream: BoxStream<'static, (Transaction, ::PubsubId)>, + txn_stream: BoxStream<'static, (Transaction, PubsubIdOrPeerId)>, network: Arc, ) { self.start_executor::( @@ -195,7 +223,7 @@ impl Mempool { /// stream instead. pub async fn start_control_executor_with_txn_stream( &self, - txn_stream: BoxStream<'static, (Transaction, ::PubsubId)>, + txn_stream: BoxStream<'static, (Transaction, PubsubIdOrPeerId)>, network: Arc, ) { self.start_executor::( diff --git a/mempool/src/sync/messages.rs b/mempool/src/sync/messages.rs new file mode 100644 index 0000000000..cdaeaa08bb --- /dev/null +++ b/mempool/src/sync/messages.rs @@ -0,0 +1,97 @@ +use std::sync::Arc; + +use nimiq_hash::Blake2bHash; +use nimiq_network_interface::{ + network::Network, + request::{Handle, RequestCommon, RequestMarker}, +}; +use nimiq_serde::{Deserialize, Serialize}; +use nimiq_transaction::Transaction; +use parking_lot::RwLock; + +use crate::mempool_state::MempoolState; + +const MAX_REQUEST_RESPONSE_MEMPOOL_STATE: u32 = 1000; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub enum MempoolTransactionType { + Control, + Regular, +} + +/// Request the current transaction hashes in the mempool. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct RequestMempoolHashes { + pub transaction_type: MempoolTransactionType, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct ResponseMempoolHashes { + pub hashes: Vec, +} + +impl RequestCommon for RequestMempoolHashes { + type Kind = RequestMarker; + const TYPE_ID: u16 = 219; + type Response = ResponseMempoolHashes; + const MAX_REQUESTS: u32 = MAX_REQUEST_RESPONSE_MEMPOOL_STATE; +} + +/// Request transactions in the mempool based on the provided hashes. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct RequestMempoolTransactions { + pub hashes: Vec, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct ResponseMempoolTransactions { + pub transactions: Vec, +} + +impl RequestCommon for RequestMempoolTransactions { + type Kind = RequestMarker; + const TYPE_ID: u16 = 220; + type Response = ResponseMempoolTransactions; + const MAX_REQUESTS: u32 = MAX_REQUEST_RESPONSE_MEMPOOL_STATE; +} + +impl Handle>> for RequestMempoolHashes { + fn handle(&self, _: N::PeerId, context: &Arc>) -> ResponseMempoolHashes { + let hashes: Vec = match self.transaction_type { + MempoolTransactionType::Regular => context + .read() + .regular_transactions + .best_transactions + .iter() + .map(|txn| txn.0.clone()) + .collect(), + MempoolTransactionType::Control => context + .read() + .control_transactions + .best_transactions + .iter() + .map(|txn| txn.0.clone()) + .collect(), + }; + + ResponseMempoolHashes { hashes } + } +} + +impl Handle>> for RequestMempoolTransactions { + fn handle( + &self, + _: N::PeerId, + context: &Arc>, + ) -> ResponseMempoolTransactions { + let mut transactions = Vec::with_capacity(self.hashes.len()); + let state = context.read(); + self.hashes.iter().for_each(|hash| { + if let Some(txn) = state.get(hash) { + transactions.push(txn.to_owned()); + } + }); + + ResponseMempoolTransactions { transactions } + } +} diff --git a/mempool/src/sync/mod.rs b/mempool/src/sync/mod.rs new file mode 100644 index 0000000000..cdbfb848c5 --- /dev/null +++ b/mempool/src/sync/mod.rs @@ -0,0 +1,666 @@ +pub(crate) mod messages; + +use std::{ + collections::{ + hash_map::Entry::{Occupied, Vacant}, + HashMap, VecDeque, + }, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + time::{Duration, Instant}, +}; + +use futures::{ + future::BoxFuture, + stream::{AbortHandle, Abortable}, + FutureExt, Stream, StreamExt, +}; +use messages::{ + MempoolTransactionType, RequestMempoolHashes, RequestMempoolTransactions, + ResponseMempoolHashes, ResponseMempoolTransactions, +}; +use nimiq_blockchain::Blockchain; +use nimiq_hash::{Blake2bHash, Hash}; +use nimiq_network_interface::{ + network::Network, + request::{request_handler, RequestError}, +}; +use nimiq_time::sleep_until; +use nimiq_transaction::{historic_transaction::RawTransactionHash, Transaction}; +use nimiq_utils::{spawn, stream::FuturesUnordered}; +use parking_lot::RwLock; +use tokio::time::Sleep; + +use crate::{executor::PubsubIdOrPeerId, mempool_state::MempoolState}; + +enum RequestStatus { + Pending, + Requested, + Received, +} + +/// Struct for keeping track which peers have a specific hash in their mempool and +/// store if we have a requested the corresponding transaction already to one of those peers. +struct HashRequestStatus { + status: RequestStatus, + peer_ids: Vec, +} + +impl HashRequestStatus { + pub fn new(peer_ids: Vec) -> Self { + Self { + status: RequestStatus::Pending, + peer_ids, + } + } + + pub fn add_peer(&mut self, peer_id: N::PeerId) { + self.peer_ids.push(peer_id); + } + + pub fn has_peer(&self, peer_id: &N::PeerId) -> bool { + self.peer_ids.contains(peer_id) + } + + pub fn is_requested(&self) -> bool { + matches!(self.status, RequestStatus::Requested) + } + + pub fn mark_as_requested(&mut self) { + self.status = RequestStatus::Requested; + } + + pub fn mark_as_received(&mut self) { + self.status = RequestStatus::Received + } +} + +const MAX_HASHES_PER_REQUEST: usize = 500; +const MAX_TOTAL_TRANSACTIONS: usize = 5000; +const SHUTDOWN_TIMEOUT_DURATION: Duration = Duration::from_secs(10 * 60); // 10 minutes + +/// Struct responsible for discovering hashes and retrieving transactions from the mempool of other nodes that have mempool +pub(crate) struct MempoolSyncer { + /// Timeout to gracefully shutdown the mempool syncer entirely + abort_timeout: Pin>, + + /// Blockchain reference + blockchain: Arc>, + + /// Requests to other peers for fetching transaction hashes that currently are in their mempool + hashes_requests: FuturesUnordered< + BoxFuture<'static, (N::PeerId, Result)>, + >, + + /// Reference to the network in order to send requests + network: Arc, + + /// Peers with a mempool we reach out for to discover and retrieve their mempool hashes and transactions + peers: Vec, + + /// The mempool state: the data structure where the transactions are stored locally + mempool_state: Arc>, + + /// Retrieved transactions that are ready to get verified and pushed into our local mempool + transactions: VecDeque<(Transaction, N::PeerId)>, + + /// Requests to other peers for fetching transactions by their hashes + transactions_requests: FuturesUnordered< + BoxFuture<'static, (N::PeerId, Result)>, + >, + + /// Collection of transaction hashes we don't have in our local mempool + unknown_hashes: HashMap>, + + /// Abort handle for the hashes request handler + hashes_request_abort_handle: Option, + + /// Abort handle for the transactions request handler + transactions_request_abort_handle: Option, +} + +impl MempoolSyncer { + pub fn new( + peers: Vec, + transaction_type: MempoolTransactionType, + network: Arc, + blockchain: Arc>, + mempool_state: Arc>, + ) -> Self { + let hashes_requests = peers + .iter() + .map(|peer_id| { + let peer_id = *peer_id; + let network = Arc::clone(&network); + let transaction_type = transaction_type.to_owned(); + async move { + ( + peer_id, + Self::request_mempool_hashes(network, peer_id, transaction_type).await, + ) + } + .boxed() + }) + .collect(); + + debug!(num_peers = %peers.len(), ?transaction_type, "Fetching mempool hashes from peers"); + + let mut syncer = Self { + abort_timeout: Box::pin(sleep_until(Instant::now() + SHUTDOWN_TIMEOUT_DURATION)), + blockchain, + hashes_requests, + network: Arc::clone(&network), + peers, + mempool_state: Arc::clone(&mempool_state), + unknown_hashes: HashMap::new(), + transactions: VecDeque::new(), + transactions_requests: FuturesUnordered::new(), + hashes_request_abort_handle: None, + transactions_request_abort_handle: None, + }; + + syncer.init_network_request_receivers(&network, &mempool_state); + + syncer + } + + /// Push newly discovered hashes into the `unknown_hashes` and keep track which peers have those hashes + fn push_unknown_hashes(&mut self, hashes: Vec, peer_id: N::PeerId) { + let blockchain = self.blockchain.read(); + let state = self.mempool_state.read(); + + debug!(peer_id = %peer_id, num = %hashes.len(), "Received unknown mempool hashes"); + + hashes.into_iter().for_each(|hash| { + // Perform some basic checks to reduce the amount of transactions we are going to request later + // TODO: what if I respond with MAX_TOTAL_TRANSACTIONS fake hashes + if self.unknown_hashes.len() < MAX_TOTAL_TRANSACTIONS + && !blockchain + .contains_tx_in_validity_window(&RawTransactionHash::from((hash).clone()), None) + && !state.contains(&hash) + { + match self.unknown_hashes.entry(hash) { + Occupied(mut entry) => { + entry.get_mut().add_peer(peer_id); + } + Vacant(entry) => { + entry.insert(HashRequestStatus::new(vec![peer_id])); + } + }; + } + }) + } + + /// Create a batch of transaction hashes that haven't yet been requested + fn batch_hashes_by_peer_id( + &mut self, + peer_id: N::PeerId, + ) -> Option<(RequestMempoolTransactions, N::PeerId)> { + let hashes: Vec = self + .unknown_hashes + .iter_mut() + .filter(|(_, request_status)| { + matches!(request_status.status, RequestStatus::Pending) + && request_status.has_peer(&peer_id) + }) + .take(MAX_HASHES_PER_REQUEST) + .map(|(hash, request_status)| { + request_status.mark_as_requested(); + hash.to_owned() + }) + .collect(); + + if hashes.is_empty() { + // This peer has no interesting transactions for us + return None; + } + + debug!(peer_id = %peer_id, num = %hashes.len(), "Fetching mempool transactions from peer"); + + Some((RequestMempoolTransactions { hashes }, peer_id)) + } + + /// Spawn request handlers in order to process network responses + fn init_network_request_receivers( + &mut self, + network: &Arc, + mempool_state: &Arc>, + ) { + // Register an abort handle and spawn the request handler for RequestMempoolHashes responses as a task + let stream = request_handler( + network, + network.receive_requests::(), + mempool_state, + ) + .boxed(); + let (abort_handle, abort_registration) = AbortHandle::new_pair(); + let future = async move { + let _ = Abortable::new(stream, abort_registration).await; + }; + spawn(future); + self.hashes_request_abort_handle = Some(abort_handle); + + // Register an abort handle and spawn the request handler for RequestMempoolTransactions responses as a task + let stream = request_handler( + network, + network.receive_requests::(), + mempool_state, + ) + .boxed(); + let (abort_handle, abort_registration) = AbortHandle::new_pair(); + let future = async move { + let _ = Abortable::new(stream, abort_registration).await; + }; + spawn(future); + self.transactions_request_abort_handle = Some(abort_handle); + } + + /// Abort the spawned request handlers + fn shutdown_request_handlers(&mut self) { + if let Some(abort_handle) = self.hashes_request_abort_handle.take() { + abort_handle.abort(); + } + + if let Some(abort_handle) = self.transactions_request_abort_handle.take() { + abort_handle.abort() + } + } + + /// While there still are unknown transaction hashes which are not part of a request, generate requests and send them to other peers + fn send_mempool_transactions_requests(&mut self) { + let mut prepared_requests = vec![]; + + while self + .unknown_hashes + .iter() + .any(|(_, request_status)| !request_status.is_requested()) + { + for i in 0..self.peers.len() { + let peer = self.peers[i]; + if let Some(request) = self.batch_hashes_by_peer_id(peer) { + prepared_requests.push(request); + } + } + } + + let requests = prepared_requests.into_iter().map(|request| { + let peer_id = request.1; + let network = Arc::clone(&self.network); + async move { + ( + peer_id, + Self::request_mempool_transactions( + network, + peer_id, + request.0.hashes.to_owned(), + ) + .await, + ) + } + .boxed() + }); + + self.transactions_requests.extend(requests); + } + + /// Network request for retrieving mempool hashes from other peers + async fn request_mempool_hashes( + network: Arc, + peer_id: N::PeerId, + transaction_type: MempoolTransactionType, + ) -> Result { + network + .request::(RequestMempoolHashes { transaction_type }, peer_id) + .await + } + + /// Network request for retrieving mempool transactions from other peers through a list of provided hashes + async fn request_mempool_transactions( + network: Arc, + peer_id: N::PeerId, + hashes: Vec, + ) -> Result { + network + .request::(RequestMempoolTransactions { hashes }, peer_id) + .await + } +} + +impl Stream for MempoolSyncer { + type Item = (Transaction, PubsubIdOrPeerId); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // First we check if we have a result we can yield + if let Some((transaction, peer_id)) = self.transactions.pop_front() { + return Poll::Ready(Some((transaction, PubsubIdOrPeerId::PeerId(peer_id)))); + } + + // Then we check if we should shutdown ourself + if self.abort_timeout.poll_unpin(cx).is_ready() { + info!("Shutdown mempool syncer"); + self.shutdown_request_handlers(); + return Poll::Ready(None); + } + + // Then we check our RequestMempoolHashes responses + while let Poll::Ready(Some((peer_id, result))) = self.hashes_requests.poll_next_unpin(cx) { + match result { + Ok(hashes) => { + self.push_unknown_hashes(hashes.hashes, peer_id); + } + Err(err) => { + error!(%err, %peer_id, "Failed to fetch mempool hashes"); + } + } + } + + // Then we construct our RequestMempoolTransactions requests and send them over the network to our peers + self.send_mempool_transactions_requests(); + + // Then we check our RequestMempoolTransactions responses + while let Poll::Ready(Some((peer_id, result))) = + self.transactions_requests.poll_next_unpin(cx) + { + match result { + Ok(transactions) => { + let transactions: Vec<(Transaction, N::PeerId)> = transactions + .transactions + .into_iter() + .filter_map(|txn| { + // Filter out transactions we didn't ask and mark the ones we did ask for as received + match self.unknown_hashes.entry(txn.hash()) { + Occupied(mut entry) => match entry.get().status { + RequestStatus::Requested => { + entry.get_mut().mark_as_received(); + Some((txn, peer_id)) + } + RequestStatus::Pending | RequestStatus::Received => None, + }, + Vacant(_) => None, + } + }) + .collect(); + + if transactions.is_empty() { + continue; + } + + info!(num = %transactions.len(), "Pushed synced mempool transactions into the mempool"); + self.transactions.extend(transactions); + } + Err(err) => { + error!(%err, %peer_id, "Failed to fetch mempool transactions"); + } + } + } + + // By now it could be that we have some results we can yield, so we try again + if let Some((transaction, peer_id)) = self.transactions.pop_front() { + return Poll::Ready(Some((transaction, PubsubIdOrPeerId::PeerId(peer_id)))); + } + + Poll::Pending + } +} + +#[cfg(test)] +mod tests { + use std::{sync::Arc, task::Poll, time::Duration}; + + use futures::{poll, StreamExt}; + use nimiq_blockchain::{Blockchain, BlockchainConfig}; + use nimiq_database::mdbx::MdbxDatabase; + use nimiq_genesis::NetworkId; + use nimiq_hash::{Blake2bHash, Hash}; + use nimiq_keys::{Address, KeyPair, SecureGenerate}; + use nimiq_network_interface::network::Network; + use nimiq_network_mock::MockHub; + use nimiq_test_log::test; + use nimiq_test_utils::{ + test_rng::test_rng, + test_transaction::{generate_transactions, TestAccount, TestTransaction}, + }; + use nimiq_time::sleep; + use nimiq_transaction::Transaction; + use nimiq_utils::{spawn, time::OffsetTime}; + use parking_lot::RwLock; + use rand::rngs::StdRng; + + use crate::{ + mempool_state::MempoolState, + mempool_transactions::TxPriority, + sync::{ + messages::{ + RequestMempoolHashes, RequestMempoolTransactions, ResponseMempoolHashes, + ResponseMempoolTransactions, + }, + MempoolSyncer, MempoolTransactionType, MAX_HASHES_PER_REQUEST, + }, + }; + + #[test(tokio::test)] + async fn it_can_discover_mempool_hashes_and_get_transactions_from_other_peers() { + // Generate test transactions + let num_transactions = MAX_HASHES_PER_REQUEST + 10; + let mut rng = test_rng(true); + let mempool_transactions = generate_test_transactions(num_transactions, &mut rng); + + // Turn test transactions into transactions + let (txns, _): (Vec, usize) = + generate_transactions(mempool_transactions, true); + let hashes: Vec = txns.iter().map(|txn| txn.hash()).collect(); + + // Create an empty blockchain + let time = Arc::new(OffsetTime::new()); + let env = MdbxDatabase::new_volatile(Default::default()).unwrap(); + let blockchain = Arc::new(RwLock::new( + Blockchain::new( + env, + BlockchainConfig::default(), + NetworkId::UnitAlbatross, + time, + ) + .unwrap(), + )); + + // Setup empty Mempool State + let state = Arc::new(RwLock::new(MempoolState::new(100, 100))); + + // Setup network + let mut hub = MockHub::default(); + let net1 = Arc::new(hub.new_network()); + let net2 = Arc::new(hub.new_network()); + + // Dial peer + net1.dial_mock(&net2); + let peer_ids = net1.get_peers(); + + // Setup streams to respond to requests + let hash_stream = net2.receive_requests::(); + let txns_stream = net2.receive_requests::(); + + let hashes_repsonse = ResponseMempoolHashes { + hashes: hashes.clone(), + }; + + let net2_clone = Arc::clone(&net2); + let hashes_listener_future = + hash_stream.for_each(move |(_request, request_id, _peer_id)| { + let test_response = hashes_repsonse.clone(); + let net2 = Arc::clone(&net2_clone); + async move { + net2.respond::(request_id, test_response.clone()) + .await + .unwrap(); + } + }); + + let net2_clone = Arc::clone(&net2); + let txns_listener_future = txns_stream.for_each(move |(_request, request_id, _peer_id)| { + let test_response = ResponseMempoolTransactions { + transactions: txns.clone(), + }; + let net2 = Arc::clone(&net2_clone); + async move { + net2.respond::(request_id, test_response.clone()) + .await + .unwrap(); + } + }); + + // Spawn the request responders + spawn(hashes_listener_future); + spawn(txns_listener_future); + + // Create a new mempool syncer + let mut syncer = MempoolSyncer::new( + peer_ids, + MempoolTransactionType::Regular, + net1, + Arc::clone(&blockchain), + Arc::clone(&state), + ); + + // Poll to send out requests to peers + let _ = poll!(syncer.next()); + sleep(Duration::from_millis(200)).await; + + // Poll to check request responses + let _ = poll!(syncer.next()); + + // Verify that all the hashes of the test transactions we generated, have been received by the syncer + hashes.iter().for_each(|hash| { + assert!(syncer.unknown_hashes.contains_key(hash)); + }); + assert_eq!(syncer.unknown_hashes.len(), num_transactions); + + // Poll to request the transactions and we should have at least 1 transaction by now + sleep(Duration::from_millis(200)).await; + match poll!(syncer.next()) { + Poll::Ready(txn) => assert!(txn.is_some()), + Poll::Pending => unreachable!(), + } + + // Verify that we received all the transactions we have asked for + // num_transactions - 1 here because we pop_front immediately on a Poll::Ready() + assert_eq!(syncer.transactions.len(), num_transactions - 1); + } + + #[test(tokio::test)] + async fn it_ignores_transactions_we_already_have_locally() { + // Generate test transactions + let num_transactions = 10; + let mut rng = test_rng(true); + let known_test_transactions = generate_test_transactions(num_transactions, &mut rng); + + // Turn test transactions into transactions + let (known_txns, _): (Vec, usize) = + generate_transactions(known_test_transactions, true); + let known_hashes: Vec = known_txns.iter().map(|txn| txn.hash()).collect(); + + // Setup empty Mempool State + let state = Arc::new(RwLock::new(MempoolState::new(100, 100))); + + // Load known hashes into the mempool state + let mut handle = state.write(); + known_txns.iter().for_each(|txn| { + handle.regular_transactions.insert(&txn, TxPriority::Medium); + }); + assert_eq!(handle.regular_transactions.len(), known_hashes.len()); + drop(handle); + + // Create an empty blockchain + let time = Arc::new(OffsetTime::new()); + let env = MdbxDatabase::new_volatile(Default::default()).unwrap(); + let blockchain = Arc::new(RwLock::new( + Blockchain::new( + env, + BlockchainConfig::default(), + NetworkId::UnitAlbatross, + time, + ) + .unwrap(), + )); + + // Setup network + let mut hub = MockHub::default(); + let net1 = Arc::new(hub.new_network()); + let net2 = Arc::new(hub.new_network()); + + // Dial peer + net1.dial_mock(&net2); + let peer_ids = net1.get_peers(); + + // Setup stream to respond to requests + let hash_stream = net2.receive_requests::(); + + let hashes_repsonse = ResponseMempoolHashes { + hashes: known_hashes.clone(), + }; + + let net2_clone = Arc::clone(&net2); + let hashes_listener_future = + hash_stream.for_each(move |(_request, request_id, _peer_id)| { + let test_response = hashes_repsonse.clone(); + let net2 = Arc::clone(&net2_clone); + async move { + net2.respond::(request_id, test_response.clone()) + .await + .unwrap(); + } + }); + + // Spawn the request responder + spawn(hashes_listener_future); + + // Create a new mempool syncer + let mut syncer = MempoolSyncer::new( + peer_ids, + MempoolTransactionType::Regular, + net1, + Arc::clone(&blockchain), + Arc::clone(&state), + ); + + // Poll to send out requests to peers + let _ = poll!(syncer.next()); + sleep(Duration::from_millis(200)).await; + + // Poll to check request responses + let _ = poll!(syncer.next()); + + // All the hashes we received are already in the mempool state so should be skipped + assert!(syncer.unknown_hashes.is_empty()); + } + + fn generate_test_transactions(num: usize, mut rng: &mut StdRng) -> Vec { + let mut mempool_transactions = vec![]; + + for _ in 0..num { + let kp1 = KeyPair::generate(&mut rng); + let addr1 = Address::from(&kp1.public); + let account1 = TestAccount { + address: addr1, + keypair: kp1, + }; + + let kp2 = KeyPair::generate(&mut rng); + let addr2 = Address::from(&kp2.public); + let account2 = TestAccount { + address: addr2, + keypair: kp2, + }; + + let mempool_transaction = TestTransaction { + fee: 0, + value: 10, + recipient: account1, + sender: account2, + }; + mempool_transactions.push(mempool_transaction); + } + + mempool_transactions + } +} diff --git a/mempool/tests/mod.rs b/mempool/tests/mod.rs index 693dda6264..68b2601748 100644 --- a/mempool/tests/mod.rs +++ b/mempool/tests/mod.rs @@ -89,7 +89,7 @@ async fn send_txn_to_mempool( tokio::task::spawn(async move { for txn in transactions { txn_stream_tx - .send((txn.clone(), mock_id.clone())) + .send((txn.clone(), Some(mock_id.clone()))) .await .unwrap(); } @@ -121,7 +121,7 @@ async fn send_control_txn_to_mempool( tokio::task::spawn(async move { for txn in transactions { txn_stream_tx - .send((txn.clone(), mock_id.clone())) + .send((txn.clone(), Some(mock_id.clone()))) .await .unwrap(); } diff --git a/network-interface/src/peer_info.rs b/network-interface/src/peer_info.rs index c384a709ed..b16f509562 100644 --- a/network-interface/src/peer_info.rs +++ b/network-interface/src/peer_info.rs @@ -66,10 +66,14 @@ impl Services { | Services::FULL_BLOCKS | Services::ACCOUNTS_PROOF | Services::ACCOUNTS_CHUNKS + | Services::MEMPOOL } NodeType::Light => Services::empty(), NodeType::Full => { - Services::ACCOUNTS_PROOF | Services::FULL_BLOCKS | Services::ACCOUNTS_CHUNKS + Services::ACCOUNTS_PROOF + | Services::FULL_BLOCKS + | Services::ACCOUNTS_CHUNKS + | Services::MEMPOOL } } } From 253d3debd747a98159d90f876fc7fb5cec35ed6d Mon Sep 17 00:00:00 2001 From: Stefan Date: Wed, 21 Aug 2024 12:52:18 +0200 Subject: [PATCH 02/14] Utils: extend FuturesOrdered and FuturesUnordered wrappers with `extend` method --- utils/src/stream.rs | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/utils/src/stream.rs b/utils/src/stream.rs index c67c5f1b2d..0d13dc3959 100644 --- a/utils/src/stream.rs +++ b/utils/src/stream.rs @@ -49,10 +49,20 @@ impl FuturesOrdered { } /// Returns the number of futures in the queue. /// - /// See also [`futures::stream::FuturesOrdered::is_empty`]. + /// See also [`futures::stream::FuturesOrdered::len`]. pub fn len(&self) -> usize { self.inner.len() } + /// Push all the futures produced by the iterator into the back of the queue. + /// + /// See also [`futures::stream::FuturesOrdered::extend`]. + pub fn extend(&mut self, iter: I) + where + I: IntoIterator, + { + self.inner.extend(iter); + self.waker.wake(); + } /// Push a future into the back of the queue. /// /// See also [`futures::stream::FuturesOrdered::push`]. @@ -116,10 +126,20 @@ impl FuturesUnordered { } /// Returns the number of futures in the set. /// - /// See also [`futures::stream::FuturesUnordered::is_empty`]. + /// See also [`futures::stream::FuturesUnordered::len`]. pub fn len(&self) -> usize { self.inner.len() } + /// Push all the futures produced by the iterator into the set of futures. + /// + /// See also [`futures::stream::FuturesUnordered::extend`]. + pub fn extend(&mut self, iter: I) + where + I: IntoIterator, + { + self.inner.extend(iter); + self.waker.wake(); + } /// Push a future into the set. /// /// See also [`futures::stream::FuturesUnordered::push`]. From aca394f095844c3c8777d6ce05a53e55e7298814 Mon Sep 17 00:00:00 2001 From: Stefan Date: Wed, 21 Aug 2024 14:54:02 +0200 Subject: [PATCH 03/14] Mempool syncer: organize tests --- mempool/src/sync/mod.rs | 262 +--------------------------------- mempool/src/sync/tests/mod.rs | 260 +++++++++++++++++++++++++++++++++ mempool/tests/mod.rs | 9 +- 3 files changed, 267 insertions(+), 264 deletions(-) create mode 100644 mempool/src/sync/tests/mod.rs diff --git a/mempool/src/sync/mod.rs b/mempool/src/sync/mod.rs index cdbfb848c5..45cd032b6a 100644 --- a/mempool/src/sync/mod.rs +++ b/mempool/src/sync/mod.rs @@ -1,4 +1,5 @@ pub(crate) mod messages; +mod tests; use std::{ collections::{ @@ -403,264 +404,3 @@ impl Stream for MempoolSyncer { Poll::Pending } } - -#[cfg(test)] -mod tests { - use std::{sync::Arc, task::Poll, time::Duration}; - - use futures::{poll, StreamExt}; - use nimiq_blockchain::{Blockchain, BlockchainConfig}; - use nimiq_database::mdbx::MdbxDatabase; - use nimiq_genesis::NetworkId; - use nimiq_hash::{Blake2bHash, Hash}; - use nimiq_keys::{Address, KeyPair, SecureGenerate}; - use nimiq_network_interface::network::Network; - use nimiq_network_mock::MockHub; - use nimiq_test_log::test; - use nimiq_test_utils::{ - test_rng::test_rng, - test_transaction::{generate_transactions, TestAccount, TestTransaction}, - }; - use nimiq_time::sleep; - use nimiq_transaction::Transaction; - use nimiq_utils::{spawn, time::OffsetTime}; - use parking_lot::RwLock; - use rand::rngs::StdRng; - - use crate::{ - mempool_state::MempoolState, - mempool_transactions::TxPriority, - sync::{ - messages::{ - RequestMempoolHashes, RequestMempoolTransactions, ResponseMempoolHashes, - ResponseMempoolTransactions, - }, - MempoolSyncer, MempoolTransactionType, MAX_HASHES_PER_REQUEST, - }, - }; - - #[test(tokio::test)] - async fn it_can_discover_mempool_hashes_and_get_transactions_from_other_peers() { - // Generate test transactions - let num_transactions = MAX_HASHES_PER_REQUEST + 10; - let mut rng = test_rng(true); - let mempool_transactions = generate_test_transactions(num_transactions, &mut rng); - - // Turn test transactions into transactions - let (txns, _): (Vec, usize) = - generate_transactions(mempool_transactions, true); - let hashes: Vec = txns.iter().map(|txn| txn.hash()).collect(); - - // Create an empty blockchain - let time = Arc::new(OffsetTime::new()); - let env = MdbxDatabase::new_volatile(Default::default()).unwrap(); - let blockchain = Arc::new(RwLock::new( - Blockchain::new( - env, - BlockchainConfig::default(), - NetworkId::UnitAlbatross, - time, - ) - .unwrap(), - )); - - // Setup empty Mempool State - let state = Arc::new(RwLock::new(MempoolState::new(100, 100))); - - // Setup network - let mut hub = MockHub::default(); - let net1 = Arc::new(hub.new_network()); - let net2 = Arc::new(hub.new_network()); - - // Dial peer - net1.dial_mock(&net2); - let peer_ids = net1.get_peers(); - - // Setup streams to respond to requests - let hash_stream = net2.receive_requests::(); - let txns_stream = net2.receive_requests::(); - - let hashes_repsonse = ResponseMempoolHashes { - hashes: hashes.clone(), - }; - - let net2_clone = Arc::clone(&net2); - let hashes_listener_future = - hash_stream.for_each(move |(_request, request_id, _peer_id)| { - let test_response = hashes_repsonse.clone(); - let net2 = Arc::clone(&net2_clone); - async move { - net2.respond::(request_id, test_response.clone()) - .await - .unwrap(); - } - }); - - let net2_clone = Arc::clone(&net2); - let txns_listener_future = txns_stream.for_each(move |(_request, request_id, _peer_id)| { - let test_response = ResponseMempoolTransactions { - transactions: txns.clone(), - }; - let net2 = Arc::clone(&net2_clone); - async move { - net2.respond::(request_id, test_response.clone()) - .await - .unwrap(); - } - }); - - // Spawn the request responders - spawn(hashes_listener_future); - spawn(txns_listener_future); - - // Create a new mempool syncer - let mut syncer = MempoolSyncer::new( - peer_ids, - MempoolTransactionType::Regular, - net1, - Arc::clone(&blockchain), - Arc::clone(&state), - ); - - // Poll to send out requests to peers - let _ = poll!(syncer.next()); - sleep(Duration::from_millis(200)).await; - - // Poll to check request responses - let _ = poll!(syncer.next()); - - // Verify that all the hashes of the test transactions we generated, have been received by the syncer - hashes.iter().for_each(|hash| { - assert!(syncer.unknown_hashes.contains_key(hash)); - }); - assert_eq!(syncer.unknown_hashes.len(), num_transactions); - - // Poll to request the transactions and we should have at least 1 transaction by now - sleep(Duration::from_millis(200)).await; - match poll!(syncer.next()) { - Poll::Ready(txn) => assert!(txn.is_some()), - Poll::Pending => unreachable!(), - } - - // Verify that we received all the transactions we have asked for - // num_transactions - 1 here because we pop_front immediately on a Poll::Ready() - assert_eq!(syncer.transactions.len(), num_transactions - 1); - } - - #[test(tokio::test)] - async fn it_ignores_transactions_we_already_have_locally() { - // Generate test transactions - let num_transactions = 10; - let mut rng = test_rng(true); - let known_test_transactions = generate_test_transactions(num_transactions, &mut rng); - - // Turn test transactions into transactions - let (known_txns, _): (Vec, usize) = - generate_transactions(known_test_transactions, true); - let known_hashes: Vec = known_txns.iter().map(|txn| txn.hash()).collect(); - - // Setup empty Mempool State - let state = Arc::new(RwLock::new(MempoolState::new(100, 100))); - - // Load known hashes into the mempool state - let mut handle = state.write(); - known_txns.iter().for_each(|txn| { - handle.regular_transactions.insert(&txn, TxPriority::Medium); - }); - assert_eq!(handle.regular_transactions.len(), known_hashes.len()); - drop(handle); - - // Create an empty blockchain - let time = Arc::new(OffsetTime::new()); - let env = MdbxDatabase::new_volatile(Default::default()).unwrap(); - let blockchain = Arc::new(RwLock::new( - Blockchain::new( - env, - BlockchainConfig::default(), - NetworkId::UnitAlbatross, - time, - ) - .unwrap(), - )); - - // Setup network - let mut hub = MockHub::default(); - let net1 = Arc::new(hub.new_network()); - let net2 = Arc::new(hub.new_network()); - - // Dial peer - net1.dial_mock(&net2); - let peer_ids = net1.get_peers(); - - // Setup stream to respond to requests - let hash_stream = net2.receive_requests::(); - - let hashes_repsonse = ResponseMempoolHashes { - hashes: known_hashes.clone(), - }; - - let net2_clone = Arc::clone(&net2); - let hashes_listener_future = - hash_stream.for_each(move |(_request, request_id, _peer_id)| { - let test_response = hashes_repsonse.clone(); - let net2 = Arc::clone(&net2_clone); - async move { - net2.respond::(request_id, test_response.clone()) - .await - .unwrap(); - } - }); - - // Spawn the request responder - spawn(hashes_listener_future); - - // Create a new mempool syncer - let mut syncer = MempoolSyncer::new( - peer_ids, - MempoolTransactionType::Regular, - net1, - Arc::clone(&blockchain), - Arc::clone(&state), - ); - - // Poll to send out requests to peers - let _ = poll!(syncer.next()); - sleep(Duration::from_millis(200)).await; - - // Poll to check request responses - let _ = poll!(syncer.next()); - - // All the hashes we received are already in the mempool state so should be skipped - assert!(syncer.unknown_hashes.is_empty()); - } - - fn generate_test_transactions(num: usize, mut rng: &mut StdRng) -> Vec { - let mut mempool_transactions = vec![]; - - for _ in 0..num { - let kp1 = KeyPair::generate(&mut rng); - let addr1 = Address::from(&kp1.public); - let account1 = TestAccount { - address: addr1, - keypair: kp1, - }; - - let kp2 = KeyPair::generate(&mut rng); - let addr2 = Address::from(&kp2.public); - let account2 = TestAccount { - address: addr2, - keypair: kp2, - }; - - let mempool_transaction = TestTransaction { - fee: 0, - value: 10, - recipient: account1, - sender: account2, - }; - mempool_transactions.push(mempool_transaction); - } - - mempool_transactions - } -} diff --git a/mempool/src/sync/tests/mod.rs b/mempool/src/sync/tests/mod.rs new file mode 100644 index 0000000000..a055183c7d --- /dev/null +++ b/mempool/src/sync/tests/mod.rs @@ -0,0 +1,260 @@ +#[cfg(test)] +mod tests { + use std::{sync::Arc, task::Poll, time::Duration}; + + use futures::{poll, StreamExt}; + use nimiq_blockchain::{Blockchain, BlockchainConfig}; + use nimiq_database::mdbx::MdbxDatabase; + use nimiq_genesis::NetworkId; + use nimiq_hash::{Blake2bHash, Hash}; + use nimiq_keys::{Address, KeyPair, SecureGenerate}; + use nimiq_network_interface::network::Network; + use nimiq_network_mock::MockHub; + use nimiq_test_log::test; + use nimiq_test_utils::{ + test_rng::test_rng, + test_transaction::{generate_transactions, TestAccount, TestTransaction}, + }; + use nimiq_time::sleep; + use nimiq_transaction::Transaction; + use nimiq_utils::{spawn, time::OffsetTime}; + use parking_lot::RwLock; + use rand::rngs::StdRng; + + use crate::{ + mempool_state::MempoolState, + mempool_transactions::TxPriority, + sync::{ + messages::{ + RequestMempoolHashes, RequestMempoolTransactions, ResponseMempoolHashes, + ResponseMempoolTransactions, + }, + MempoolSyncer, MempoolTransactionType, MAX_HASHES_PER_REQUEST, + }, + }; + + #[test(tokio::test)] + async fn it_can_discover_mempool_hashes_and_get_transactions_from_other_peers() { + // Generate test transactions + let num_transactions = MAX_HASHES_PER_REQUEST + 10; + let mut rng = test_rng(true); + let mempool_transactions = generate_test_transactions(num_transactions, &mut rng); + + // Turn test transactions into transactions + let (txns, _): (Vec, usize) = + generate_transactions(mempool_transactions, true); + let hashes: Vec = txns.iter().map(|txn| txn.hash()).collect(); + + // Create an empty blockchain + let time = Arc::new(OffsetTime::new()); + let env = MdbxDatabase::new_volatile(Default::default()).unwrap(); + let blockchain = Arc::new(RwLock::new( + Blockchain::new( + env, + BlockchainConfig::default(), + NetworkId::UnitAlbatross, + time, + ) + .unwrap(), + )); + + // Setup empty Mempool State + let state = Arc::new(RwLock::new(MempoolState::new(100, 100))); + + // Setup network + let mut hub = MockHub::default(); + let net1 = Arc::new(hub.new_network()); + let net2 = Arc::new(hub.new_network()); + + // Dial peer + net1.dial_mock(&net2); + let peer_ids = net1.get_peers(); + + // Setup streams to respond to requests + let hash_stream = net2.receive_requests::(); + let txns_stream = net2.receive_requests::(); + + let hashes_repsonse = ResponseMempoolHashes { + hashes: hashes.clone(), + }; + + let net2_clone = Arc::clone(&net2); + let hashes_listener_future = + hash_stream.for_each(move |(_request, request_id, _peer_id)| { + let test_response = hashes_repsonse.clone(); + let net2 = Arc::clone(&net2_clone); + async move { + net2.respond::(request_id, test_response.clone()) + .await + .unwrap(); + } + }); + + let net2_clone = Arc::clone(&net2); + let txns_listener_future = txns_stream.for_each(move |(_request, request_id, _peer_id)| { + let test_response = ResponseMempoolTransactions { + transactions: txns.clone(), + }; + let net2 = Arc::clone(&net2_clone); + async move { + net2.respond::(request_id, test_response.clone()) + .await + .unwrap(); + } + }); + + // Spawn the request responders + spawn(hashes_listener_future); + spawn(txns_listener_future); + + // Create a new mempool syncer + let mut syncer = MempoolSyncer::new( + peer_ids, + MempoolTransactionType::Regular, + net1, + Arc::clone(&blockchain), + Arc::clone(&state), + ); + + // Poll to send out requests to peers + let _ = poll!(syncer.next()); + sleep(Duration::from_millis(200)).await; + + // Poll to check request responses + let _ = poll!(syncer.next()); + + // Verify that all the hashes of the test transactions we generated, have been received by the syncer + hashes.iter().for_each(|hash| { + assert!(syncer.unknown_hashes.contains_key(hash)); + }); + assert_eq!(syncer.unknown_hashes.len(), num_transactions); + + // Poll to request the transactions and we should have at least 1 transaction by now + sleep(Duration::from_millis(200)).await; + match poll!(syncer.next()) { + Poll::Ready(txn) => assert!(txn.is_some()), + Poll::Pending => unreachable!(), + } + + // Verify that we received all the transactions we have asked for + // num_transactions - 1 here because we pop_front immediately on a Poll::Ready() + assert_eq!(syncer.transactions.len(), num_transactions - 1); + } + + #[test(tokio::test)] + async fn it_ignores_transactions_we_already_have_locally() { + // Generate test transactions + let num_transactions = 10; + let mut rng = test_rng(true); + let known_test_transactions = generate_test_transactions(num_transactions, &mut rng); + + // Turn test transactions into transactions + let (known_txns, _): (Vec, usize) = + generate_transactions(known_test_transactions, true); + let known_hashes: Vec = known_txns.iter().map(|txn| txn.hash()).collect(); + + // Setup empty Mempool State + let state = Arc::new(RwLock::new(MempoolState::new(100, 100))); + + // Load known hashes into the mempool state + let mut handle = state.write(); + known_txns.iter().for_each(|txn| { + handle.regular_transactions.insert(&txn, TxPriority::Medium); + }); + assert_eq!(handle.regular_transactions.len(), known_hashes.len()); + drop(handle); + + // Create an empty blockchain + let time = Arc::new(OffsetTime::new()); + let env = MdbxDatabase::new_volatile(Default::default()).unwrap(); + let blockchain = Arc::new(RwLock::new( + Blockchain::new( + env, + BlockchainConfig::default(), + NetworkId::UnitAlbatross, + time, + ) + .unwrap(), + )); + + // Setup network + let mut hub = MockHub::default(); + let net1 = Arc::new(hub.new_network()); + let net2 = Arc::new(hub.new_network()); + + // Dial peer + net1.dial_mock(&net2); + let peer_ids = net1.get_peers(); + + // Setup stream to respond to requests + let hash_stream = net2.receive_requests::(); + + let hashes_repsonse = ResponseMempoolHashes { + hashes: known_hashes.clone(), + }; + + let net2_clone = Arc::clone(&net2); + let hashes_listener_future = + hash_stream.for_each(move |(_request, request_id, _peer_id)| { + let test_response = hashes_repsonse.clone(); + let net2 = Arc::clone(&net2_clone); + async move { + net2.respond::(request_id, test_response.clone()) + .await + .unwrap(); + } + }); + + // Spawn the request responder + spawn(hashes_listener_future); + + // Create a new mempool syncer + let mut syncer = MempoolSyncer::new( + peer_ids, + MempoolTransactionType::Regular, + net1, + Arc::clone(&blockchain), + Arc::clone(&state), + ); + + // Poll to send out requests to peers + let _ = poll!(syncer.next()); + sleep(Duration::from_millis(200)).await; + + // Poll to check request responses + let _ = poll!(syncer.next()); + + // All the hashes we received are already in the mempool state so should be skipped + assert!(syncer.unknown_hashes.is_empty()); + } + + fn generate_test_transactions(num: usize, mut rng: &mut StdRng) -> Vec { + let mut mempool_transactions = vec![]; + + for _ in 0..num { + let kp1 = KeyPair::generate(&mut rng); + let addr1 = Address::from(&kp1.public); + let account1 = TestAccount { + address: addr1, + keypair: kp1, + }; + + let kp2 = KeyPair::generate(&mut rng); + let addr2 = Address::from(&kp2.public); + let account2 = TestAccount { + address: addr2, + keypair: kp2, + }; + + let mempool_transaction = TestTransaction { + fee: 0, + value: 10, + recipient: account1, + sender: account2, + }; + mempool_transactions.push(mempool_transaction); + } + + mempool_transactions + } +} diff --git a/mempool/tests/mod.rs b/mempool/tests/mod.rs index 68b2601748..fe9e23b52d 100644 --- a/mempool/tests/mod.rs +++ b/mempool/tests/mod.rs @@ -11,7 +11,10 @@ use nimiq_keys::{ Address, Ed25519PublicKey as SchnorrPublicKey, KeyPair as SchnorrKeyPair, PrivateKey as SchnorrPrivateKey, SecureGenerate, }; -use nimiq_mempool::{config::MempoolConfig, mempool::Mempool, mempool_transactions::TxPriority}; +use nimiq_mempool::{ + config::MempoolConfig, executor::PubsubIdOrPeerId, mempool::Mempool, + mempool_transactions::TxPriority, +}; use nimiq_network_mock::{MockHub, MockId, MockNetwork, MockPeerId}; use nimiq_primitives::{coin::Coin, networks::NetworkId, policy::Policy}; use nimiq_serde::{Deserialize, Serialize}; @@ -89,7 +92,7 @@ async fn send_txn_to_mempool( tokio::task::spawn(async move { for txn in transactions { txn_stream_tx - .send((txn.clone(), Some(mock_id.clone()))) + .send((txn.clone(), PubsubIdOrPeerId::PubsubId(mock_id.clone()))) .await .unwrap(); } @@ -121,7 +124,7 @@ async fn send_control_txn_to_mempool( tokio::task::spawn(async move { for txn in transactions { txn_stream_tx - .send((txn.clone(), Some(mock_id.clone()))) + .send((txn.clone(), PubsubIdOrPeerId::PubsubId(mock_id.clone()))) .await .unwrap(); } From ac23352873ad3cc50f8d7aa9c8420942378af6e6 Mon Sep 17 00:00:00 2001 From: Stefan Date: Thu, 22 Aug 2024 16:25:27 +0200 Subject: [PATCH 04/14] Mempool syncer: prevent flooding peers with requests --- mempool/src/sync/mod.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/mempool/src/sync/mod.rs b/mempool/src/sync/mod.rs index 45cd032b6a..a3efa48a11 100644 --- a/mempool/src/sync/mod.rs +++ b/mempool/src/sync/mod.rs @@ -65,7 +65,7 @@ impl HashRequestStatus { } pub fn is_requested(&self) -> bool { - matches!(self.status, RequestStatus::Requested) + !matches!(self.status, RequestStatus::Pending) } pub fn mark_as_requested(&mut self) { @@ -84,7 +84,7 @@ const SHUTDOWN_TIMEOUT_DURATION: Duration = Duration::from_secs(10 * 60); // 10 /// Struct responsible for discovering hashes and retrieving transactions from the mempool of other nodes that have mempool pub(crate) struct MempoolSyncer { /// Timeout to gracefully shutdown the mempool syncer entirely - abort_timeout: Pin>, + shutdown_timer: Pin>, /// Blockchain reference blockchain: Arc>, @@ -111,7 +111,7 @@ pub(crate) struct MempoolSyncer { BoxFuture<'static, (N::PeerId, Result)>, >, - /// Collection of transaction hashes we don't have in our local mempool + /// Collection of transaction hashes not present in the local mempool unknown_hashes: HashMap>, /// Abort handle for the hashes request handler @@ -148,7 +148,7 @@ impl MempoolSyncer { debug!(num_peers = %peers.len(), ?transaction_type, "Fetching mempool hashes from peers"); let mut syncer = Self { - abort_timeout: Box::pin(sleep_until(Instant::now() + SHUTDOWN_TIMEOUT_DURATION)), + shutdown_timer: Box::pin(sleep_until(Instant::now() + SHUTDOWN_TIMEOUT_DURATION)), blockchain, hashes_requests, network: Arc::clone(&network), @@ -338,8 +338,11 @@ impl Stream for MempoolSyncer { } // Then we check if we should shutdown ourself - if self.abort_timeout.poll_unpin(cx).is_ready() { - info!("Shutdown mempool syncer"); + if self.shutdown_timer.poll_unpin(cx).is_ready() { + info!( + syncer_type = ?self.mempool_transaction_type, + "Shutdown mempool syncer" + ); self.shutdown_request_handlers(); return Poll::Ready(None); } From 33c9ecec02ad8b74b78ae090f7a487ae853323c8 Mon Sep 17 00:00:00 2001 From: Stefan Date: Thu, 22 Aug 2024 16:29:40 +0200 Subject: [PATCH 05/14] Consensus: mechanism to be able to subscribe for consensus sync events --- consensus/src/sync/syncer.rs | 20 +++++++++++++++++++- consensus/src/sync/syncer_proxy.rs | 8 +++++++- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/consensus/src/sync/syncer.rs b/consensus/src/sync/syncer.rs index 16c42eadee..240dd215f8 100644 --- a/consensus/src/sync/syncer.rs +++ b/consensus/src/sync/syncer.rs @@ -16,6 +16,8 @@ use nimiq_network_interface::network::{CloseReason, Network, NetworkEvent, Subsc use nimiq_primitives::policy::Policy; use nimiq_time::{interval, Interval}; use nimiq_utils::stream::FuturesUnordered; +use tokio::sync::broadcast::{channel as broadcast, Sender as BroadcastSender}; +use tokio_stream::wrappers::BroadcastStream; use crate::{ consensus::ResolveBlockRequest, messages::RequestHead, sync::live::block_queue::BlockSource, @@ -106,6 +108,11 @@ pub enum LiveSyncPeerEvent { Ahead(TPeerId), } +#[derive(Clone)] +pub enum SyncerEvent { + AddLiveSync(TPeerId), +} + /// Syncer is the main synchronization object inside `Consensus` /// It has a reference to the main blockchain and network and has two dynamic /// trait objects: @@ -127,6 +134,9 @@ pub struct Syncer, L: LiveSync> { /// A proxy to the blockchain blockchain: BlockchainProxy, + /// Sending-half of a broadcast channel for publishing syncer events + events: BroadcastSender>, + /// A reference to the network network: Arc, @@ -159,12 +169,15 @@ impl, L: LiveSync> Syncer { macro_sync: M, ) -> Syncer { let network_events = network.subscribe_events(); + let (tx, _rx) = broadcast(256); + Syncer { live_sync, macro_sync, blockchain, network, network_events, + events: tx, outdated_peers: Default::default(), incompatible_peers: Default::default(), check_interval: interval(Self::CHECK_INTERVAL), @@ -184,13 +197,18 @@ impl, L: LiveSync> Syncer { pub fn move_peer_into_live_sync(&mut self, peer_id: N::PeerId) { debug!(%peer_id, "Adding peer to live sync"); - self.live_sync.add_peer(peer_id); + self.live_sync.add_peer(peer_id.clone()); + self.events.send(SyncerEvent::AddLiveSync(peer_id)).ok(); } pub fn num_peers(&self) -> usize { self.live_sync.num_peers() } + pub fn subscribe_events(&self) -> BroadcastStream::PeerId>> { + BroadcastStream::new(self.events.subscribe()) + } + pub fn peers(&self) -> Vec { self.live_sync.peers() } diff --git a/consensus/src/sync/syncer_proxy.rs b/consensus/src/sync/syncer_proxy.rs index 9bc4f83e41..4af5c2032c 100644 --- a/consensus/src/sync/syncer_proxy.rs +++ b/consensus/src/sync/syncer_proxy.rs @@ -15,6 +15,7 @@ use nimiq_primitives::policy::Policy; use nimiq_zkp_component::zkp_component::ZKPComponentProxy; use parking_lot::Mutex; use pin_project::pin_project; +use tokio_stream::wrappers::BroadcastStream; #[cfg(feature = "full")] use crate::sync::{ @@ -30,7 +31,7 @@ use crate::{ queue::QueueConfig, BlockLiveSync, }, - syncer::{LiveSyncPushEvent, Syncer}, + syncer::{LiveSyncPushEvent, Syncer, SyncerEvent}, }, BlsCache, }; @@ -227,6 +228,11 @@ impl SyncerProxy { gen_syncer_match!(self, accepted_block_announcements) } + /// Returns a broadcast receiver in for syncer events + pub fn subscribe_events(&self) -> BroadcastStream::PeerId>> { + gen_syncer_match!(self, subscribe_events) + } + /// Returns whether the state sync has finished (or `true` if there is no state sync required) pub fn state_complete(&self) -> bool { gen_syncer_match!(self, state_complete) From e143ef67795b2c3200c052bd3a1ec32d9a9b2d18 Mon Sep 17 00:00:00 2001 From: Stefan Date: Thu, 22 Aug 2024 16:35:57 +0200 Subject: [PATCH 06/14] Mempool task: subscribe to consensus sync and network events to maintain a collection of peers that are in live sync --- consensus/src/sync/syncer.rs | 2 +- mempool/mempool-task/src/lib.rs | 55 +++++++++++++++++++++++++++++---- mempool/src/mempool.rs | 16 +++++----- 3 files changed, 59 insertions(+), 14 deletions(-) diff --git a/consensus/src/sync/syncer.rs b/consensus/src/sync/syncer.rs index 240dd215f8..af5ac6aa36 100644 --- a/consensus/src/sync/syncer.rs +++ b/consensus/src/sync/syncer.rs @@ -197,7 +197,7 @@ impl, L: LiveSync> Syncer { pub fn move_peer_into_live_sync(&mut self, peer_id: N::PeerId) { debug!(%peer_id, "Adding peer to live sync"); - self.live_sync.add_peer(peer_id.clone()); + self.live_sync.add_peer(peer_id); self.events.send(SyncerEvent::AddLiveSync(peer_id)).ok(); } diff --git a/mempool/mempool-task/src/lib.rs b/mempool/mempool-task/src/lib.rs index d866f7a88e..ca40470332 100644 --- a/mempool/mempool-task/src/lib.rs +++ b/mempool/mempool-task/src/lib.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashSet, pin::Pin, sync::Arc, task::{Context, Poll}, @@ -8,9 +9,9 @@ use futures::{stream::BoxStream, Future, Stream, StreamExt}; use log::{debug, warn}; use nimiq_blockchain::Blockchain; use nimiq_blockchain_interface::{AbstractBlockchain, BlockchainEvent}; -use nimiq_consensus::{Consensus, ConsensusEvent, ConsensusProxy}; +use nimiq_consensus::{sync::syncer::SyncerEvent, Consensus, ConsensusEvent, ConsensusProxy}; use nimiq_mempool::{config::MempoolConfig, mempool::Mempool}; -use nimiq_network_interface::network::Network; +use nimiq_network_interface::network::{Network, NetworkEvent, SubscribeEvents}; use nimiq_utils::spawn; use parking_lot::RwLock; #[cfg(feature = "metrics")] @@ -34,6 +35,10 @@ pub struct MempoolTask { consensus_event_rx: BroadcastStream, blockchain_event_rx: BoxStream<'static, BlockchainEvent>, + network_event_rx: SubscribeEvents, + syncer_event_rx: BroadcastStream>, + + peers_in_live_sync: HashSet, pub mempool: Arc, mempool_active: bool, @@ -50,17 +55,22 @@ impl MempoolTask { mempool_config: MempoolConfig, ) -> Self { let consensus_event_rx = consensus.subscribe_events(); + let network_event_rx = consensus.network.subscribe_events(); + let syncer_event_rx = consensus.sync.subscribe_events(); + let blockchain_event_rx = blockchain.read().notifier_as_stream(); + let peers_in_live_sync = HashSet::from_iter(consensus.sync.peers()); let mempool = Arc::new(Mempool::new(Arc::clone(&blockchain), mempool_config)); let mempool_active = false; - let blockchain_event_rx = blockchain.read().notifier_as_stream(); - Self { consensus: consensus.proxy(), + peers_in_live_sync, consensus_event_rx, blockchain_event_rx, + network_event_rx, + syncer_event_rx, mempool: Arc::clone(&mempool), mempool_active, @@ -92,13 +102,14 @@ impl MempoolTask { let mempool = Arc::clone(&self.mempool); let network = Arc::clone(&self.consensus.network); + let peers = self.peers_in_live_sync.clone().into_iter().collect(); #[cfg(not(feature = "metrics"))] spawn({ async move { // The mempool is not updated while consensus is lost. // Thus, we need to check all transactions if they are still valid. mempool.cleanup(); - mempool.start_executors(network, None, None).await; + mempool.start_executors(network, None, None, peers).await; } }); #[cfg(feature = "metrics")] @@ -111,7 +122,12 @@ impl MempoolTask { mempool.cleanup(); mempool - .start_executors(network, Some(mempool_monitor), Some(ctrl_mempool_monitor)) + .start_executors( + network, + Some(mempool_monitor), + Some(ctrl_mempool_monitor), + peers, + ) .await; } }); @@ -167,12 +183,39 @@ impl MempoolTask { } } } + + fn on_syncer_event(&mut self, event: SyncerEvent) { + match event { + SyncerEvent::AddLiveSync(peer_id) => { + self.peers_in_live_sync.insert(peer_id); + } + } + } + + fn on_network_event(&mut self, event: NetworkEvent) { + match event { + NetworkEvent::PeerLeft(peer_id) => { + self.peers_in_live_sync.remove(&peer_id); + } + NetworkEvent::PeerJoined(_, _) | NetworkEvent::DhtReady => (), + } + } } impl Stream for MempoolTask { type Item = MempoolEvent; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // Process syncer updates. + while let Poll::Ready(Some(Ok(event))) = self.syncer_event_rx.poll_next_unpin(cx) { + self.on_syncer_event(event) + } + + // Process network updates. + while let Poll::Ready(Some(Ok(event))) = self.network_event_rx.poll_next_unpin(cx) { + self.on_network_event(event) + } + // Process consensus updates. // Start mempool as soon as we have consensus and can enforce the validity window. // Stop the mempool if we lose consensus or cannot enforce the validity window. diff --git a/mempool/src/mempool.rs b/mempool/src/mempool.rs index dc62cb547f..8c3defc911 100644 --- a/mempool/src/mempool.rs +++ b/mempool/src/mempool.rs @@ -14,7 +14,10 @@ use nimiq_blockchain::{Blockchain, TransactionVerificationCache}; use nimiq_blockchain_interface::AbstractBlockchain; use nimiq_hash::{Blake2bHash, Hash}; use nimiq_keys::Address; -use nimiq_network_interface::network::{Network, Topic}; +use nimiq_network_interface::{ + network::{Network, Topic}, + peer_info::Services, +}; use nimiq_serde::Serialize; use nimiq_transaction::{ historic_transaction::RawTransactionHash, ControlTransactionTopic, Transaction, @@ -136,6 +139,7 @@ impl Mempool { network: Arc, monitor: Option, control_monitor: Option, + mut peers: Vec, ) { let executor_handle = self.executor_handle.lock().await; let control_executor_handle = self.control_executor_handle.lock().await; @@ -145,11 +149,11 @@ impl Mempool { return; } - // TODO: get correct peers - // TODO: only get peers that are synced with us + info!("Initializing mempool syncers"); + peers.retain(|peer_id| network.peer_provides_services(*peer_id, Services::MEMPOOL)); // Sync regular transactions with the mempool of other peers let regular_transactions_syncer = MempoolSyncer::new( - network.get_peers(), + peers.clone(), MempoolTransactionType::Regular, Arc::clone(&network), Arc::clone(&self.blockchain), @@ -171,11 +175,9 @@ impl Mempool { select(regular_transactions_syncer, txn_stream).boxed(), ); - // TODO: get correct peers - // TODO: only get peers that are synced with us // Sync control transactions with the mempool of other peers let control_transactions_syncer = MempoolSyncer::new( - network.get_peers(), + peers, MempoolTransactionType::Control, Arc::clone(&network), Arc::clone(&self.blockchain), From e4c856e64ea7fbd805c201dfd911a89f8273fbeb Mon Sep 17 00:00:00 2001 From: Stefan Date: Fri, 23 Aug 2024 09:14:53 +0200 Subject: [PATCH 07/14] Mempool syncer: spawn request handlers regardless of whether the mempool syncer is active --- mempool/mempool-task/src/lib.rs | 6 ++- mempool/src/mempool.rs | 8 +++- mempool/src/sync/mod.rs | 75 +++++++++------------------------ mempool/tests/mod.rs | 53 +++++++++++++++++++---- 4 files changed, 76 insertions(+), 66 deletions(-) diff --git a/mempool/mempool-task/src/lib.rs b/mempool/mempool-task/src/lib.rs index ca40470332..1825af867c 100644 --- a/mempool/mempool-task/src/lib.rs +++ b/mempool/mempool-task/src/lib.rs @@ -60,7 +60,11 @@ impl MempoolTask { let blockchain_event_rx = blockchain.read().notifier_as_stream(); let peers_in_live_sync = HashSet::from_iter(consensus.sync.peers()); - let mempool = Arc::new(Mempool::new(Arc::clone(&blockchain), mempool_config)); + let mempool = Arc::new(Mempool::new( + Arc::clone(&blockchain), + mempool_config, + Arc::clone(&consensus.network), + )); let mempool_active = false; Self { diff --git a/mempool/src/mempool.rs b/mempool/src/mempool.rs index 8c3defc911..77c3aa1190 100644 --- a/mempool/src/mempool.rs +++ b/mempool/src/mempool.rs @@ -68,12 +68,18 @@ impl Mempool { pub const DEFAULT_CONTROL_SIZE_LIMIT: usize = 6_000_000; /// Creates a new mempool - pub fn new(blockchain: Arc>, config: MempoolConfig) -> Self { + pub fn new( + blockchain: Arc>, + config: MempoolConfig, + network: Arc, + ) -> Self { let state = Arc::new(RwLock::new(MempoolState::new( config.size_limit, config.control_size_limit, ))); + MempoolSyncer::init_network_request_receivers(network, Arc::clone(&state)); + Self { blockchain, state: Arc::clone(&state), diff --git a/mempool/src/sync/mod.rs b/mempool/src/sync/mod.rs index a3efa48a11..4f46de357d 100644 --- a/mempool/src/sync/mod.rs +++ b/mempool/src/sync/mod.rs @@ -12,11 +12,7 @@ use std::{ time::{Duration, Instant}, }; -use futures::{ - future::BoxFuture, - stream::{AbortHandle, Abortable}, - FutureExt, Stream, StreamExt, -}; +use futures::{future::BoxFuture, FutureExt, Stream, StreamExt}; use messages::{ MempoolTransactionType, RequestMempoolHashes, RequestMempoolTransactions, ResponseMempoolHashes, ResponseMempoolTransactions, @@ -114,11 +110,8 @@ pub(crate) struct MempoolSyncer { /// Collection of transaction hashes not present in the local mempool unknown_hashes: HashMap>, - /// Abort handle for the hashes request handler - hashes_request_abort_handle: Option, - - /// Abort handle for the transactions request handler - transactions_request_abort_handle: Option, + /// The type of mempool transactions requested to other peers + mempool_transaction_type: MempoolTransactionType, } impl MempoolSyncer { @@ -147,7 +140,7 @@ impl MempoolSyncer { debug!(num_peers = %peers.len(), ?transaction_type, "Fetching mempool hashes from peers"); - let mut syncer = Self { + Self { shutdown_timer: Box::pin(sleep_until(Instant::now() + SHUTDOWN_TIMEOUT_DURATION)), blockchain, hashes_requests, @@ -157,13 +150,8 @@ impl MempoolSyncer { unknown_hashes: HashMap::new(), transactions: VecDeque::new(), transactions_requests: FuturesUnordered::new(), - hashes_request_abort_handle: None, - transactions_request_abort_handle: None, - }; - - syncer.init_network_request_receivers(&network, &mempool_state); - - syncer + mempool_transaction_type: transaction_type, + } } /// Push newly discovered hashes into the `unknown_hashes` and keep track which peers have those hashes @@ -223,49 +211,27 @@ impl MempoolSyncer { } /// Spawn request handlers in order to process network responses - fn init_network_request_receivers( - &mut self, - network: &Arc, - mempool_state: &Arc>, + pub fn init_network_request_receivers( + network: Arc, + mempool_state: Arc>, ) { - // Register an abort handle and spawn the request handler for RequestMempoolHashes responses as a task - let stream = request_handler( - network, + // Spawn the request handler for RequestMempoolHashes responses as a task + let fut = request_handler( + &network, network.receive_requests::(), - mempool_state, + &mempool_state, ) .boxed(); - let (abort_handle, abort_registration) = AbortHandle::new_pair(); - let future = async move { - let _ = Abortable::new(stream, abort_registration).await; - }; - spawn(future); - self.hashes_request_abort_handle = Some(abort_handle); - - // Register an abort handle and spawn the request handler for RequestMempoolTransactions responses as a task - let stream = request_handler( - network, + spawn(fut); + + // Spawn the request handler for RequestMempoolTransactions responses as a task + let fut = request_handler( + &network, network.receive_requests::(), - mempool_state, + &mempool_state, ) .boxed(); - let (abort_handle, abort_registration) = AbortHandle::new_pair(); - let future = async move { - let _ = Abortable::new(stream, abort_registration).await; - }; - spawn(future); - self.transactions_request_abort_handle = Some(abort_handle); - } - - /// Abort the spawned request handlers - fn shutdown_request_handlers(&mut self) { - if let Some(abort_handle) = self.hashes_request_abort_handle.take() { - abort_handle.abort(); - } - - if let Some(abort_handle) = self.transactions_request_abort_handle.take() { - abort_handle.abort() - } + spawn(fut); } /// While there still are unknown transaction hashes which are not part of a request, generate requests and send them to other peers @@ -343,7 +309,6 @@ impl Stream for MempoolSyncer { syncer_type = ?self.mempool_transaction_type, "Shutdown mempool syncer" ); - self.shutdown_request_handlers(); return Poll::Ready(None); } diff --git a/mempool/tests/mod.rs b/mempool/tests/mod.rs index fe9e23b52d..777e5ef75f 100644 --- a/mempool/tests/mod.rs +++ b/mempool/tests/mod.rs @@ -60,10 +60,14 @@ async fn send_get_mempool_txns( txn_len: usize, ) -> (Vec, usize) { // Create mempool and subscribe with a custom txn stream. - let mempool = Mempool::new(Arc::clone(&blockchain), MempoolConfig::default()); let mut hub = MockHub::new(); let mock_id = MockId::new(hub.new_address().into()); let mock_network = Arc::new(hub.new_network()); + let mempool = Mempool::new( + Arc::clone(&blockchain), + MempoolConfig::default(), + Arc::clone(&mock_network), + ); send_txn_to_mempool(&mempool, mock_network, mock_id, transactions).await; @@ -1122,10 +1126,14 @@ async fn mempool_update() { )); // Create mempool and subscribe with a custom txn stream. - let mempool = Mempool::new(Arc::clone(&blockchain), MempoolConfig::default()); let mut hub = MockHub::new(); let mock_id = MockId::new(hub.new_address().into()); let mock_network = Arc::new(hub.new_network()); + let mempool = Mempool::new( + Arc::clone(&blockchain), + MempoolConfig::default(), + Arc::clone(&mock_network), + ); // Send txns to mempool send_txn_to_mempool(&mempool, mock_network, mock_id, txns).await; @@ -1250,10 +1258,14 @@ async fn mempool_update_aged_transaction() { )); // Create mempool and subscribe with a custom txn stream - let mempool = Mempool::new(blockchain.clone(), MempoolConfig::default()); let mut hub = MockHub::new(); let mock_id = MockId::new(hub.new_address().into()); let mock_network = Arc::new(hub.new_network()); + let mempool = Mempool::new( + blockchain.clone(), + MempoolConfig::default(), + Arc::clone(&mock_network), + ); // Send txns to mempool send_txn_to_mempool(&mempool, mock_network, mock_id, txns).await; @@ -1401,10 +1413,14 @@ async fn mempool_update_not_enough_balance() { )); // Create mempool and subscribe with a custom txn stream - let mempool = Mempool::new(blockchain.clone(), MempoolConfig::default()); let mut hub = MockHub::new(); let mock_id = MockId::new(hub.new_address().into()); let mock_network = Arc::new(hub.new_network()); + let mempool = Mempool::new( + blockchain.clone(), + MempoolConfig::default(), + Arc::clone(&mock_network), + ); // Send txns to mempool send_txn_to_mempool(&mempool, mock_network, mock_id, txns).await; @@ -1563,10 +1579,14 @@ async fn mempool_update_pruned_account() { )); // Create mempool and subscribe with a custom txn stream. - let mempool = Mempool::new(blockchain.clone(), MempoolConfig::default()); let mut hub = MockHub::new(); let mock_id = MockId::new(hub.new_address().into()); let mock_network = Arc::new(hub.new_network()); + let mempool = Mempool::new( + blockchain.clone(), + MempoolConfig::default(), + Arc::clone(&mock_network), + ); // Send txns to mempool send_txn_to_mempool(&mempool, mock_network, mock_id, txns).await; @@ -1664,10 +1684,14 @@ async fn mempool_basic_prioritization_control_tx() { )); // Create mempool and subscribe with a custom txn stream. - let mempool = Mempool::new(blockchain.clone(), MempoolConfig::default()); let mut hub = MockHub::new(); let mock_id = MockId::new(hub.new_address().into()); let mock_network = Arc::new(hub.new_network()); + let mempool = Mempool::new( + blockchain.clone(), + MempoolConfig::default(), + Arc::clone(&mock_network), + ); // Send txns to mempool send_control_txn_to_mempool(&mempool, mock_network, mock_id, txns.clone()).await; @@ -1773,10 +1797,14 @@ async fn mempool_regular_and_control_tx() { )); // Create mempool and subscribe with a custom txn stream. - let mempool = Mempool::new(blockchain.clone(), MempoolConfig::default()); let mut hub = MockHub::new(); let mock_id = MockId::new(hub.new_address().into()); let mock_network = Arc::new(hub.new_network()); + let mempool = Mempool::new( + blockchain.clone(), + MempoolConfig::default(), + Arc::clone(&mock_network), + ); // This is the transaction produced in the block let control_tx = TransactionBuilder::new_create_staker( @@ -1919,7 +1947,10 @@ async fn applies_total_tx_size_limits() { size_limit: txns_len - (1 + txns[1].serialized_size()), ..Default::default() }; - let mempool = Mempool::new(blockchain, mempool_config); + let mut hub = MockHub::new(); + let mock_id = MockId::new(hub.new_address().into()); + let mock_network = Arc::new(hub.new_network()); + let mempool = Mempool::new(blockchain, mempool_config, Arc::clone(&mock_network)); // The worst transaction is the second transaction with the lowest fee. let worst_tx = txns[1].hash::(); @@ -2132,10 +2163,14 @@ async fn it_can_reject_invalid_vesting_contract_transaction() { let producer = BlockProducer::new(signing_key(), voting_key()); // Create mempool and subscribe with a custom txn stream - let mempool = Mempool::new(blockchain.clone(), MempoolConfig::default()); let mut hub = MockHub::new(); let mock_id = MockId::new(hub.new_address().into()); let mock_network = Arc::new(hub.new_network()); + let mempool = Mempool::new( + blockchain.clone(), + MempoolConfig::default(), + Arc::clone(&mock_network), + ); // #1.0: Micro block that creates a vesting contract let bc = blockchain.upgradable_read(); From 5fa5e8938c30307e6be66da85757a3d708eb6dc0 Mon Sep 17 00:00:00 2001 From: Stefan Date: Fri, 23 Aug 2024 11:47:01 +0200 Subject: [PATCH 08/14] Mempool syncer: functionality to add peers to discover its mempool --- mempool/src/sync/mod.rs | 20 +++++++++ mempool/src/sync/tests/mod.rs | 83 +++++++++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+) diff --git a/mempool/src/sync/mod.rs b/mempool/src/sync/mod.rs index 4f46de357d..1d26def05b 100644 --- a/mempool/src/sync/mod.rs +++ b/mempool/src/sync/mod.rs @@ -181,6 +181,26 @@ impl MempoolSyncer { }) } + /// Add peer to discover its mempool + fn add_peer(&mut self, peer_id: N::PeerId) { + if self.peers.contains(&peer_id) { + return; + } + + self.peers.push(peer_id); + let network = Arc::clone(&self.network); + let transaction_type = self.mempool_transaction_type.clone(); + let request = async move { + ( + peer_id, + Self::request_mempool_hashes(network, peer_id, transaction_type).await, + ) + } + .boxed(); + + self.hashes_requests.push(request); + } + /// Create a batch of transaction hashes that haven't yet been requested fn batch_hashes_by_peer_id( &mut self, diff --git a/mempool/src/sync/tests/mod.rs b/mempool/src/sync/tests/mod.rs index a055183c7d..d3727c9144 100644 --- a/mempool/src/sync/tests/mod.rs +++ b/mempool/src/sync/tests/mod.rs @@ -228,6 +228,89 @@ mod tests { assert!(syncer.unknown_hashes.is_empty()); } + #[test(tokio::test)] + async fn it_can_dynamically_add_a_new_peer() { + // Generate test transactions + let num_transactions = 10; + let mut rng = test_rng(true); + let mempool_transactions = generate_test_transactions(num_transactions, &mut rng); + + // Turn test transactions into transactions + let (txns, _): (Vec, usize) = + generate_transactions(mempool_transactions, true); + let hashes: Vec = txns.iter().map(|txn| txn.hash()).collect(); + + // Create an empty blockchain + let time = Arc::new(OffsetTime::new()); + let env = MdbxDatabase::new_volatile(Default::default()).unwrap(); + let blockchain = Arc::new(RwLock::new( + Blockchain::new( + env, + BlockchainConfig::default(), + NetworkId::UnitAlbatross, + time, + ) + .unwrap(), + )); + + // Setup empty Mempool State + let state = Arc::new(RwLock::new(MempoolState::new(100, 100))); + + // Setup network + let mut hub = MockHub::default(); + let net1 = Arc::new(hub.new_network()); + + // Create a new mempool syncer with 0 peers to sync with + let mut syncer = MempoolSyncer::new( + vec![], + MempoolTransactionType::Regular, + Arc::clone(&net1), + Arc::clone(&blockchain), + Arc::clone(&state), + ); + + // Poll to send out requests to peers and verify nothing has been received + let _ = poll!(syncer.next()); + sleep(Duration::from_millis(200)).await; + assert!(syncer.unknown_hashes.is_empty()); + + // Add new peer and dial it + let net2 = Arc::new(hub.new_network()); + net1.dial_mock(&net2); + + // Setup stream to respond to requests + let hash_stream = net2.receive_requests::(); + + let hashes_repsonse = ResponseMempoolHashes { + hashes: hashes.clone(), + }; + + let net2_clone = Arc::clone(&net2); + let hashes_listener_future = + hash_stream.for_each(move |(_request, request_id, _peer_id)| { + let test_response = hashes_repsonse.clone(); + let net2 = Arc::clone(&net2_clone); + async move { + net2.respond::(request_id, test_response.clone()) + .await + .unwrap(); + } + }); + + // Spawn the request responder + spawn(hashes_listener_future); + + // Dynamically add new peer to mempool syncer + syncer.add_peer(net2.peer_id()); + + let _ = poll!(syncer.next()); + sleep(Duration::from_millis(200)).await; + + // we have received unknown hashes from the added peer + let _ = poll!(syncer.next()); + assert_eq!(syncer.unknown_hashes.len(), num_transactions); + } + fn generate_test_transactions(num: usize, mut rng: &mut StdRng) -> Vec { let mut mempool_transactions = vec![]; From 79fefea1b34f8b2baad19b7293dc81186306b6c7 Mon Sep 17 00:00:00 2001 From: Stefan Date: Fri, 23 Aug 2024 16:39:28 +0200 Subject: [PATCH 09/14] Consensus: acquire the consensus sync event subscription through the ConsensusProxy rather than the Consensus directly --- consensus/src/consensus/consensus_proxy.rs | 15 ++++++++++---- consensus/src/consensus/mod.rs | 23 ++++++++++++++-------- consensus/src/sync/syncer.rs | 13 ++++++------ consensus/src/sync/syncer_proxy.rs | 10 +++++----- consensus/tests/sync_utils.rs | 2 +- mempool/mempool-task/src/lib.rs | 4 ++-- web-client/src/client/lib.rs | 4 ++-- 7 files changed, 42 insertions(+), 29 deletions(-) diff --git a/consensus/src/consensus/consensus_proxy.rs b/consensus/src/consensus/consensus_proxy.rs index c8dcc20cd5..b76922867c 100644 --- a/consensus/src/consensus/consensus_proxy.rs +++ b/consensus/src/consensus/consensus_proxy.rs @@ -34,6 +34,7 @@ use crate::{ RequestBlocksProof, RequestSubscribeToAddress, RequestTransactionReceiptsByAddress, RequestTransactionsProof, ResponseBlocksProof, }, + sync::syncer::SyncEvent, ConsensusEvent, }; @@ -42,7 +43,8 @@ pub struct ConsensusProxy { pub network: Arc, pub(crate) established_flag: Arc, pub(crate) synced_validity_window_flag: Arc, - pub(crate) events: broadcast::Sender, + pub(crate) consensus_events: broadcast::Sender, + pub(crate) sync_events: broadcast::Sender>, pub(crate) request: mpsc::Sender>, } @@ -53,7 +55,8 @@ impl Clone for ConsensusProxy { network: Arc::clone(&self.network), established_flag: Arc::clone(&self.established_flag), synced_validity_window_flag: Arc::clone(&self.synced_validity_window_flag), - events: self.events.clone(), + consensus_events: self.consensus_events.clone(), + sync_events: self.sync_events.clone(), request: self.request.clone(), } } @@ -81,8 +84,12 @@ impl ConsensusProxy { && self.synced_validity_window_flag.load(Ordering::Acquire) } - pub fn subscribe_events(&self) -> BroadcastStream { - BroadcastStream::new(self.events.subscribe()) + pub fn subscribe_consensus_events(&self) -> BroadcastStream { + BroadcastStream::new(self.consensus_events.subscribe()) + } + + pub fn subscribe_sync_events(&self) -> BroadcastStream::PeerId>> { + BroadcastStream::new(self.sync_events.subscribe()) } /// Subscribe to remote address notification events diff --git a/consensus/src/consensus/mod.rs b/consensus/src/consensus/mod.rs index 8d4fac8366..5723b6e777 100644 --- a/consensus/src/consensus/mod.rs +++ b/consensus/src/consensus/mod.rs @@ -34,7 +34,11 @@ use self::remote_event_dispatcher::RemoteEventDispatcher; use crate::{ consensus::head_requests::{HeadRequests, HeadRequestsResult}, messages::{RequestBlock, RequestHead, RequestMacroChain, RequestMissingBlocks}, - sync::{live::block_queue::BlockSource, syncer::LiveSyncPushEvent, syncer_proxy::SyncerProxy}, + sync::{ + live::block_queue::BlockSource, + syncer::{LiveSyncPushEvent, SyncEvent}, + syncer_proxy::SyncerProxy, + }, }; #[cfg(feature = "full")] use crate::{ @@ -128,7 +132,8 @@ pub struct Consensus { pub sync: SyncerProxy, - events: broadcast::Sender, + consensus_events: broadcast::Sender, + sync_events: broadcast::Sender>, established_flag: Arc, #[cfg(feature = "full")] last_batch_number: u32, @@ -212,7 +217,8 @@ impl Consensus { blockchain, network, sync: syncer, - events: broadcast::Sender::new(256), + consensus_events: broadcast::Sender::new(256), + sync_events: broadcast::Sender::new(256), established_flag, #[cfg(feature = "full")] last_batch_number: 0, @@ -295,7 +301,7 @@ impl Consensus { } pub fn subscribe_events(&self) -> BroadcastStream { - BroadcastStream::new(self.events.subscribe()) + BroadcastStream::new(self.consensus_events.subscribe()) } pub fn is_established(&self) -> bool { @@ -312,7 +318,8 @@ impl Consensus { network: Arc::clone(&self.network), established_flag: Arc::clone(&self.established_flag), synced_validity_window_flag: Arc::clone(&self.synced_validity_window_flag), - events: self.events.clone(), + consensus_events: self.consensus_events.clone(), + sync_events: self.sync_events.clone(), request: self.requests.0.clone(), } } @@ -328,7 +335,7 @@ impl Consensus { // We don't care if anyone is listening. let (synced_validity_window, _) = self.check_validity_window(); - self.events + self.consensus_events .send(ConsensusEvent::Established { synced_validity_window, }) @@ -543,7 +550,7 @@ impl Future for Consensus { // Check consensus established state on changes. if let Some(event) = self.check_established(None) { - self.events.send(event).ok(); + self.consensus_events.send(event).ok(); } // Poll any head requests if active. @@ -559,7 +566,7 @@ impl Future for Consensus { // Update established state using the result. if let Some(event) = self.check_established(Some(result)) { - self.events.send(event).ok(); + self.consensus_events.send(event).ok(); } } } diff --git a/consensus/src/sync/syncer.rs b/consensus/src/sync/syncer.rs index af5ac6aa36..5544b7697a 100644 --- a/consensus/src/sync/syncer.rs +++ b/consensus/src/sync/syncer.rs @@ -17,7 +17,6 @@ use nimiq_primitives::policy::Policy; use nimiq_time::{interval, Interval}; use nimiq_utils::stream::FuturesUnordered; use tokio::sync::broadcast::{channel as broadcast, Sender as BroadcastSender}; -use tokio_stream::wrappers::BroadcastStream; use crate::{ consensus::ResolveBlockRequest, messages::RequestHead, sync::live::block_queue::BlockSource, @@ -109,7 +108,7 @@ pub enum LiveSyncPeerEvent { } #[derive(Clone)] -pub enum SyncerEvent { +pub enum SyncEvent { AddLiveSync(TPeerId), } @@ -134,8 +133,8 @@ pub struct Syncer, L: LiveSync> { /// A proxy to the blockchain blockchain: BlockchainProxy, - /// Sending-half of a broadcast channel for publishing syncer events - events: BroadcastSender>, + /// Sending-half of a broadcast channel for publishing sync events + events: BroadcastSender>, /// A reference to the network network: Arc, @@ -198,15 +197,15 @@ impl, L: LiveSync> Syncer { pub fn move_peer_into_live_sync(&mut self, peer_id: N::PeerId) { debug!(%peer_id, "Adding peer to live sync"); self.live_sync.add_peer(peer_id); - self.events.send(SyncerEvent::AddLiveSync(peer_id)).ok(); + self.events.send(SyncEvent::AddLiveSync(peer_id)).ok(); } pub fn num_peers(&self) -> usize { self.live_sync.num_peers() } - pub fn subscribe_events(&self) -> BroadcastStream::PeerId>> { - BroadcastStream::new(self.events.subscribe()) + pub fn broadcast_sender(&self) -> BroadcastSender::PeerId>> { + self.events.clone() } pub fn peers(&self) -> Vec { diff --git a/consensus/src/sync/syncer_proxy.rs b/consensus/src/sync/syncer_proxy.rs index 4af5c2032c..883aa15558 100644 --- a/consensus/src/sync/syncer_proxy.rs +++ b/consensus/src/sync/syncer_proxy.rs @@ -15,7 +15,7 @@ use nimiq_primitives::policy::Policy; use nimiq_zkp_component::zkp_component::ZKPComponentProxy; use parking_lot::Mutex; use pin_project::pin_project; -use tokio_stream::wrappers::BroadcastStream; +use tokio::sync::broadcast::Sender as BroadcastSender; #[cfg(feature = "full")] use crate::sync::{ @@ -31,7 +31,7 @@ use crate::{ queue::QueueConfig, BlockLiveSync, }, - syncer::{LiveSyncPushEvent, Syncer, SyncerEvent}, + syncer::{LiveSyncPushEvent, SyncEvent, Syncer}, }, BlsCache, }; @@ -228,9 +228,9 @@ impl SyncerProxy { gen_syncer_match!(self, accepted_block_announcements) } - /// Returns a broadcast receiver in for syncer events - pub fn subscribe_events(&self) -> BroadcastStream::PeerId>> { - gen_syncer_match!(self, subscribe_events) + /// Returns a broadcast sender for sync events + pub fn broadcast_sender(&self) -> BroadcastSender::PeerId>> { + gen_syncer_match!(self, broadcast_sender) } /// Returns whether the state sync has finished (or `true` if there is no state sync required) diff --git a/consensus/tests/sync_utils.rs b/consensus/tests/sync_utils.rs index d060ba8d20..7245ef30f9 100644 --- a/consensus/tests/sync_utils.rs +++ b/consensus/tests/sync_utils.rs @@ -202,7 +202,7 @@ pub async fn sync_two_peers( BlockchainEvent::Finalized(_) | BlockchainEvent::EpochFinalized(_) )) }); - let mut consensus_events = consensus2_proxy.subscribe_events(); + let mut consensus_events = consensus2_proxy.subscribe_consensus_events(); spawn(consensus2); for _ in 0..num_batches_live_sync { diff --git a/mempool/mempool-task/src/lib.rs b/mempool/mempool-task/src/lib.rs index 1825af867c..8406e24bb3 100644 --- a/mempool/mempool-task/src/lib.rs +++ b/mempool/mempool-task/src/lib.rs @@ -188,9 +188,9 @@ impl MempoolTask { } } - fn on_syncer_event(&mut self, event: SyncerEvent) { + fn on_syncer_event(&mut self, event: SyncEvent) { match event { - SyncerEvent::AddLiveSync(peer_id) => { + SyncEvent::AddLiveSync(peer_id) => { self.peers_in_live_sync.insert(peer_id); } } diff --git a/web-client/src/client/lib.rs b/web-client/src/client/lib.rs index 458ac924e2..eee1254c32 100644 --- a/web-client/src/client/lib.rs +++ b/web-client/src/client/lib.rs @@ -357,7 +357,7 @@ impl Client { let is_established = self .inner .consensus_proxy() - .subscribe_events() + .subscribe_consensus_events() .any(|event| async move { if let Ok(state) = event { matches!(state, ConsensusEvent::Established { .. }) @@ -989,7 +989,7 @@ impl Client { let consensus = self.inner.consensus_proxy(); let network = self.inner.network(); - let mut consensus_events = consensus.subscribe_events(); + let mut consensus_events = consensus.subscribe_consensus_events(); let consensus_listeners = Rc::clone(&self.consensus_changed_listeners); From a5eb776e960a5e7a7fce6562cf81edfbc18b3699 Mon Sep 17 00:00:00 2001 From: Stefan Date: Sat, 24 Aug 2024 15:00:02 +0200 Subject: [PATCH 10/14] Test utils: move consensus constructor into the `test_utils` crate --- test-utils/src/consensus.rs | 35 ++++++++++++++++++++++++++++++++ test-utils/src/lib.rs | 1 + validator/src/proposal_buffer.rs | 26 +----------------------- 3 files changed, 37 insertions(+), 25 deletions(-) create mode 100644 test-utils/src/consensus.rs diff --git a/test-utils/src/consensus.rs b/test-utils/src/consensus.rs new file mode 100644 index 0000000000..70f43d78e6 --- /dev/null +++ b/test-utils/src/consensus.rs @@ -0,0 +1,35 @@ +use std::sync::Arc; + +use nimiq_blockchain::Blockchain; +use nimiq_blockchain_proxy::BlockchainProxy; +use nimiq_bls::cache::PublicKeyCache; +use nimiq_consensus::{ + sync::syncer_proxy::SyncerProxy, BlsCache, Consensus, ConsensusEvent, ConsensusProxy, +}; +use nimiq_network_interface::network::Network; +use nimiq_zkp_component::ZKPComponent; +use parking_lot::{Mutex, RwLock}; + +use crate::test_network::TestNetwork; + +/// Given a blockchain and a network creates an instance of Consensus. +pub async fn consensus( + blockchain: Arc>, + net: Arc, +) -> Consensus { + let blockchain_proxy = BlockchainProxy::from(&blockchain); + + let zkp_proxy = ZKPComponent::new(blockchain_proxy.clone(), Arc::clone(&net), None) + .await + .proxy(); + + let syncer = SyncerProxy::new_history( + blockchain_proxy.clone(), + Arc::clone(&net), + Arc::new(Mutex::new(BlsCache::new_test())), + net.subscribe_events(), + ) + .await; + + Consensus::new(blockchain_proxy, net, syncer, 0, zkp_proxy) +} diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index 8ef1e0d6cb..17b5c13164 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -2,6 +2,7 @@ pub mod accounts_revert; pub mod block_production; pub mod blockchain; pub mod blockchain_with_rng; +pub mod consensus; pub mod mock_node; pub mod node; pub mod test_custom_block; diff --git a/validator/src/proposal_buffer.rs b/validator/src/proposal_buffer.rs index a5eac73d83..cbfa7b5612 100644 --- a/validator/src/proposal_buffer.rs +++ b/validator/src/proposal_buffer.rs @@ -577,40 +577,16 @@ mod test { use nimiq_test_log::test; use nimiq_test_utils::{ block_production::{TemporaryBlockProducer, SIGNING_KEY}, - test_network::TestNetwork, + consensus::consensus, }; use nimiq_time::{sleep, timeout}; use nimiq_utils::spawn; use nimiq_validator_network::network_impl::ValidatorNetworkImpl; - use nimiq_zkp_component::ZKPComponent; - use parking_lot::{Mutex, RwLock}; use tokio::select; use super::{ProposalAndPubsubId, ProposalBuffer}; use crate::{aggregation::tendermint::proposal::Header, r#macro::ProposalTopic}; - /// Given a blockchain and a network creates an instance of Consensus. - async fn consensus( - blockchain: Arc>, - net: Arc, - ) -> Consensus { - let blockchain_proxy = BlockchainProxy::from(&blockchain); - - let zkp_proxy = ZKPComponent::new(blockchain_proxy.clone(), Arc::clone(&net), None) - .await - .proxy(); - - let syncer = SyncerProxy::new_history( - blockchain_proxy.clone(), - Arc::clone(&net), - Arc::new(Mutex::new(BlsCache::new_test())), - net.subscribe_events(), - ) - .await; - - Consensus::new(blockchain_proxy, net, syncer, 0, zkp_proxy) - } - async fn setup() -> ( ConsensusProxy, TemporaryBlockProducer, From f13cb29bc36b32da6dc94449c84687044d04e445 Mon Sep 17 00:00:00 2001 From: Stefan Date: Sat, 24 Aug 2024 15:10:15 +0200 Subject: [PATCH 11/14] Mempool syncer: subscribe to consensus sync events in order add new peers dynamically --- Cargo.lock | 1 + mempool/Cargo.toml | 1 + mempool/mempool-task/src/lib.rs | 27 +++++++++++++++++---------- mempool/src/mempool.rs | 6 ++++-- mempool/src/sync/mod.rs | 23 ++++++++++++++++++----- mempool/src/sync/tests/mod.rs | 21 +++++++++++++++++---- 6 files changed, 58 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 67715bf423..bef1a168aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4389,6 +4389,7 @@ dependencies = [ "nimiq-blockchain", "nimiq-blockchain-interface", "nimiq-bls", + "nimiq-consensus", "nimiq-database", "nimiq-genesis", "nimiq-genesis-builder", diff --git a/mempool/Cargo.toml b/mempool/Cargo.toml index aaf2d46923..7f5ed567d9 100644 --- a/mempool/Cargo.toml +++ b/mempool/Cargo.toml @@ -36,6 +36,7 @@ nimiq-account = { workspace = true } nimiq-block = { workspace = true } nimiq-blockchain = { workspace = true } nimiq-blockchain-interface = { workspace = true } +nimiq-consensus = { workspace = true } nimiq-database = { workspace = true } nimiq-hash = { workspace = true } nimiq-keys = { workspace = true } diff --git a/mempool/mempool-task/src/lib.rs b/mempool/mempool-task/src/lib.rs index 8406e24bb3..273090d150 100644 --- a/mempool/mempool-task/src/lib.rs +++ b/mempool/mempool-task/src/lib.rs @@ -9,7 +9,7 @@ use futures::{stream::BoxStream, Future, Stream, StreamExt}; use log::{debug, warn}; use nimiq_blockchain::Blockchain; use nimiq_blockchain_interface::{AbstractBlockchain, BlockchainEvent}; -use nimiq_consensus::{sync::syncer::SyncerEvent, Consensus, ConsensusEvent, ConsensusProxy}; +use nimiq_consensus::{sync::syncer::SyncEvent, Consensus, ConsensusEvent, ConsensusProxy}; use nimiq_mempool::{config::MempoolConfig, mempool::Mempool}; use nimiq_network_interface::network::{Network, NetworkEvent, SubscribeEvents}; use nimiq_utils::spawn; @@ -36,7 +36,7 @@ pub struct MempoolTask { consensus_event_rx: BroadcastStream, blockchain_event_rx: BoxStream<'static, BlockchainEvent>, network_event_rx: SubscribeEvents, - syncer_event_rx: BroadcastStream>, + sync_event_rx: BroadcastStream::PeerId>>, peers_in_live_sync: HashSet, @@ -56,9 +56,11 @@ impl MempoolTask { ) -> Self { let consensus_event_rx = consensus.subscribe_events(); let network_event_rx = consensus.network.subscribe_events(); - let syncer_event_rx = consensus.sync.subscribe_events(); let blockchain_event_rx = blockchain.read().notifier_as_stream(); + let proxy = consensus.proxy(); + let sync_event_rx = proxy.subscribe_sync_events(); + let peers_in_live_sync = HashSet::from_iter(consensus.sync.peers()); let mempool = Arc::new(Mempool::new( Arc::clone(&blockchain), @@ -68,13 +70,13 @@ impl MempoolTask { let mempool_active = false; Self { - consensus: consensus.proxy(), + consensus: proxy, peers_in_live_sync, consensus_event_rx, blockchain_event_rx, network_event_rx, - syncer_event_rx, + sync_event_rx, mempool: Arc::clone(&mempool), mempool_active, @@ -109,17 +111,21 @@ impl MempoolTask { let peers = self.peers_in_live_sync.clone().into_iter().collect(); #[cfg(not(feature = "metrics"))] spawn({ + let consensus = self.consensus.clone(); async move { // The mempool is not updated while consensus is lost. // Thus, we need to check all transactions if they are still valid. mempool.cleanup(); - mempool.start_executors(network, None, None, peers).await; + mempool + .start_executors(network, None, None, peers, consensus) + .await; } }); #[cfg(feature = "metrics")] spawn({ let mempool_monitor = self.mempool_monitor.clone(); let ctrl_mempool_monitor = self.control_mempool_monitor.clone(); + let consensus = self.consensus.clone(); async move { // The mempool is not updated while consensus is lost. // Thus, we need to check all transactions if they are still valid. @@ -131,6 +137,7 @@ impl MempoolTask { Some(mempool_monitor), Some(ctrl_mempool_monitor), peers, + consensus, ) .await; } @@ -188,7 +195,7 @@ impl MempoolTask { } } - fn on_syncer_event(&mut self, event: SyncEvent) { + fn on_consensus_sync_event(&mut self, event: SyncEvent) { match event { SyncEvent::AddLiveSync(peer_id) => { self.peers_in_live_sync.insert(peer_id); @@ -210,9 +217,9 @@ impl Stream for MempoolTask { type Item = MempoolEvent; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // Process syncer updates. - while let Poll::Ready(Some(Ok(event))) = self.syncer_event_rx.poll_next_unpin(cx) { - self.on_syncer_event(event) + // Process consensus sync updates. + while let Poll::Ready(Some(Ok(event))) = self.sync_event_rx.poll_next_unpin(cx) { + self.on_consensus_sync_event(event) } // Process network updates. diff --git a/mempool/src/mempool.rs b/mempool/src/mempool.rs index 77c3aa1190..e687c72a57 100644 --- a/mempool/src/mempool.rs +++ b/mempool/src/mempool.rs @@ -12,6 +12,7 @@ use nimiq_account::ReservedBalance; use nimiq_block::Block; use nimiq_blockchain::{Blockchain, TransactionVerificationCache}; use nimiq_blockchain_interface::AbstractBlockchain; +use nimiq_consensus::ConsensusProxy; use nimiq_hash::{Blake2bHash, Hash}; use nimiq_keys::Address; use nimiq_network_interface::{ @@ -146,6 +147,7 @@ impl Mempool { monitor: Option, control_monitor: Option, mut peers: Vec, + consensus: ConsensusProxy, ) { let executor_handle = self.executor_handle.lock().await; let control_executor_handle = self.control_executor_handle.lock().await; @@ -161,8 +163,8 @@ impl Mempool { let regular_transactions_syncer = MempoolSyncer::new( peers.clone(), MempoolTransactionType::Regular, - Arc::clone(&network), Arc::clone(&self.blockchain), + consensus.clone(), Arc::clone(&self.state), ); @@ -185,8 +187,8 @@ impl Mempool { let control_transactions_syncer = MempoolSyncer::new( peers, MempoolTransactionType::Control, - Arc::clone(&network), Arc::clone(&self.blockchain), + consensus.clone(), Arc::clone(&self.state), ); diff --git a/mempool/src/sync/mod.rs b/mempool/src/sync/mod.rs index 1d26def05b..95ed53a125 100644 --- a/mempool/src/sync/mod.rs +++ b/mempool/src/sync/mod.rs @@ -18,6 +18,7 @@ use messages::{ ResponseMempoolHashes, ResponseMempoolTransactions, }; use nimiq_blockchain::Blockchain; +use nimiq_consensus::{sync::syncer::SyncEvent, ConsensusProxy}; use nimiq_hash::{Blake2bHash, Hash}; use nimiq_network_interface::{ network::Network, @@ -28,6 +29,7 @@ use nimiq_transaction::{historic_transaction::RawTransactionHash, Transaction}; use nimiq_utils::{spawn, stream::FuturesUnordered}; use parking_lot::RwLock; use tokio::time::Sleep; +use tokio_stream::wrappers::BroadcastStream; use crate::{executor::PubsubIdOrPeerId, mempool_state::MempoolState}; @@ -77,11 +79,14 @@ const MAX_HASHES_PER_REQUEST: usize = 500; const MAX_TOTAL_TRANSACTIONS: usize = 5000; const SHUTDOWN_TIMEOUT_DURATION: Duration = Duration::from_secs(10 * 60); // 10 minutes -/// Struct responsible for discovering hashes and retrieving transactions from the mempool of other nodes that have mempool +/// Struct responsible for discovering hashes and retrieving transactions from the mempool of other nodes that have a mempool pub(crate) struct MempoolSyncer { /// Timeout to gracefully shutdown the mempool syncer entirely shutdown_timer: Pin>, + /// Consensus sync event receiver + consensus_sync_event_rx: BroadcastStream::PeerId>>, + /// Blockchain reference blockchain: Arc>, @@ -118,15 +123,15 @@ impl MempoolSyncer { pub fn new( peers: Vec, transaction_type: MempoolTransactionType, - network: Arc, blockchain: Arc>, + consensus: ConsensusProxy, mempool_state: Arc>, ) -> Self { let hashes_requests = peers .iter() .map(|peer_id| { let peer_id = *peer_id; - let network = Arc::clone(&network); + let network = Arc::clone(&consensus.network); let transaction_type = transaction_type.to_owned(); async move { ( @@ -142,9 +147,10 @@ impl MempoolSyncer { Self { shutdown_timer: Box::pin(sleep_until(Instant::now() + SHUTDOWN_TIMEOUT_DURATION)), + consensus_sync_event_rx: consensus.subscribe_sync_events(), blockchain, hashes_requests, - network: Arc::clone(&network), + network: consensus.network, peers, mempool_state: Arc::clone(&mempool_state), unknown_hashes: HashMap::new(), @@ -332,6 +338,13 @@ impl Stream for MempoolSyncer { return Poll::Ready(None); } + // Then we check if peers got added to live sync in the mean time + while let Poll::Ready(Some(Ok(event))) = self.consensus_sync_event_rx.poll_next_unpin(cx) { + match event { + SyncEvent::AddLiveSync(peer_id) => self.add_peer(peer_id), + } + } + // Then we check our RequestMempoolHashes responses while let Poll::Ready(Some((peer_id, result))) = self.hashes_requests.poll_next_unpin(cx) { match result { @@ -375,7 +388,7 @@ impl Stream for MempoolSyncer { continue; } - info!(num = %transactions.len(), "Pushed synced mempool transactions into the mempool"); + info!(num = %transactions.len(), "Synced mempool transactions"); self.transactions.extend(transactions); } Err(err) => { diff --git a/mempool/src/sync/tests/mod.rs b/mempool/src/sync/tests/mod.rs index d3727c9144..85d2a68419 100644 --- a/mempool/src/sync/tests/mod.rs +++ b/mempool/src/sync/tests/mod.rs @@ -12,6 +12,7 @@ mod tests { use nimiq_network_mock::MockHub; use nimiq_test_log::test; use nimiq_test_utils::{ + consensus::consensus, test_rng::test_rng, test_transaction::{generate_transactions, TestAccount, TestTransaction}, }; @@ -70,6 +71,10 @@ mod tests { net1.dial_mock(&net2); let peer_ids = net1.get_peers(); + // Setup consensus proxy + let consensus = consensus(Arc::clone(&blockchain), Arc::clone(&net1)).await; + let proxy = consensus.proxy(); + // Setup streams to respond to requests let hash_stream = net2.receive_requests::(); let txns_stream = net2.receive_requests::(); @@ -111,8 +116,8 @@ mod tests { let mut syncer = MempoolSyncer::new( peer_ids, MempoolTransactionType::Regular, - net1, Arc::clone(&blockchain), + proxy, Arc::clone(&state), ); @@ -186,6 +191,10 @@ mod tests { net1.dial_mock(&net2); let peer_ids = net1.get_peers(); + // Setup consensus proxy + let consensus = consensus(Arc::clone(&blockchain), Arc::clone(&net1)).await; + let proxy = consensus.proxy(); + // Setup stream to respond to requests let hash_stream = net2.receive_requests::(); @@ -212,8 +221,8 @@ mod tests { let mut syncer = MempoolSyncer::new( peer_ids, MempoolTransactionType::Regular, - net1, Arc::clone(&blockchain), + proxy, Arc::clone(&state), ); @@ -260,12 +269,16 @@ mod tests { let mut hub = MockHub::default(); let net1 = Arc::new(hub.new_network()); + // Setup consensus proxy + let consensus = consensus(Arc::clone(&blockchain), Arc::clone(&net1)).await; + let proxy = consensus.proxy(); + // Create a new mempool syncer with 0 peers to sync with let mut syncer = MempoolSyncer::new( vec![], MempoolTransactionType::Regular, - Arc::clone(&net1), Arc::clone(&blockchain), + proxy, Arc::clone(&state), ); @@ -306,7 +319,7 @@ mod tests { let _ = poll!(syncer.next()); sleep(Duration::from_millis(200)).await; - // we have received unknown hashes from the added peer + // We have received unknown hashes from the added peer let _ = poll!(syncer.next()); assert_eq!(syncer.unknown_hashes.len(), num_transactions); } From d97fd1f20b9ae58068b14a8f450a777a1cba673d Mon Sep 17 00:00:00 2001 From: Stefan Date: Tue, 10 Sep 2024 16:27:33 +0200 Subject: [PATCH 12/14] Mempool syncer: minor improvements - Only try to construct mempool transaction requests if new hashes are discovered - Check the mempool state first before checking the blockchain if a transaction is already known/included - Bound the amount of received hashes that will be processed per peer - Fix tests --- mempool/mempool-task/src/lib.rs | 11 ++++---- mempool/src/sync/mod.rs | 48 ++++++++++++++++++++------------- mempool/src/sync/tests/mod.rs | 4 +-- mempool/tests/mod.rs | 23 ++++++++++------ 4 files changed, 52 insertions(+), 34 deletions(-) diff --git a/mempool/mempool-task/src/lib.rs b/mempool/mempool-task/src/lib.rs index 273090d150..34fbe6193f 100644 --- a/mempool/mempool-task/src/lib.rs +++ b/mempool/mempool-task/src/lib.rs @@ -56,12 +56,7 @@ impl MempoolTask { ) -> Self { let consensus_event_rx = consensus.subscribe_events(); let network_event_rx = consensus.network.subscribe_events(); - let blockchain_event_rx = blockchain.read().notifier_as_stream(); - - let proxy = consensus.proxy(); - let sync_event_rx = proxy.subscribe_sync_events(); - let peers_in_live_sync = HashSet::from_iter(consensus.sync.peers()); let mempool = Arc::new(Mempool::new( Arc::clone(&blockchain), mempool_config, @@ -69,6 +64,12 @@ impl MempoolTask { )); let mempool_active = false; + let blockchain_event_rx = blockchain.read().notifier_as_stream(); + + let proxy = consensus.proxy(); + let sync_event_rx = proxy.subscribe_sync_events(); + let peers_in_live_sync = HashSet::from_iter(consensus.sync.peers()); + Self { consensus: proxy, peers_in_live_sync, diff --git a/mempool/src/sync/mod.rs b/mempool/src/sync/mod.rs index 95ed53a125..fae20f959f 100644 --- a/mempool/src/sync/mod.rs +++ b/mempool/src/sync/mod.rs @@ -76,7 +76,7 @@ impl HashRequestStatus { } const MAX_HASHES_PER_REQUEST: usize = 500; -const MAX_TOTAL_TRANSACTIONS: usize = 5000; +const MAX_TOTAL_HASHES: usize = 25_000; const SHUTDOWN_TIMEOUT_DURATION: Duration = Duration::from_secs(10 * 60); // 10 minutes /// Struct responsible for discovering hashes and retrieving transactions from the mempool of other nodes that have a mempool @@ -85,7 +85,7 @@ pub(crate) struct MempoolSyncer { shutdown_timer: Pin>, /// Consensus sync event receiver - consensus_sync_event_rx: BroadcastStream::PeerId>>, + consensus_sync_event_rx: BroadcastStream::PeerId>>, /// Blockchain reference blockchain: Arc>, @@ -161,30 +161,34 @@ impl MempoolSyncer { } /// Push newly discovered hashes into the `unknown_hashes` and keep track which peers have those hashes - fn push_unknown_hashes(&mut self, hashes: Vec, peer_id: N::PeerId) { + fn push_unknown_hashes(&mut self, hashes: Vec, peer_id: N::PeerId) -> bool { let blockchain = self.blockchain.read(); let state = self.mempool_state.read(); debug!(peer_id = %peer_id, num = %hashes.len(), "Received unknown mempool hashes"); + let mut new_hashes_discovered = false; - hashes.into_iter().for_each(|hash| { + hashes.into_iter().take(MAX_TOTAL_HASHES).for_each(|hash| { // Perform some basic checks to reduce the amount of transactions we are going to request later - // TODO: what if I respond with MAX_TOTAL_TRANSACTIONS fake hashes - if self.unknown_hashes.len() < MAX_TOTAL_TRANSACTIONS - && !blockchain + if state.contains(&hash) + || blockchain .contains_tx_in_validity_window(&RawTransactionHash::from((hash).clone()), None) - && !state.contains(&hash) { - match self.unknown_hashes.entry(hash) { - Occupied(mut entry) => { - entry.get_mut().add_peer(peer_id); - } - Vacant(entry) => { - entry.insert(HashRequestStatus::new(vec![peer_id])); - } - }; + return; } - }) + + match self.unknown_hashes.entry(hash) { + Occupied(mut entry) => { + entry.get_mut().add_peer(peer_id); + } + Vacant(entry) => { + entry.insert(HashRequestStatus::new(vec![peer_id])); + new_hashes_discovered = true; + } + }; + }); + + new_hashes_discovered } /// Add peer to discover its mempool @@ -193,6 +197,7 @@ impl MempoolSyncer { return; } + debug!(%peer_id, "Peer added to mempool sync"); self.peers.push(peer_id); let network = Arc::clone(&self.network); let transaction_type = self.mempool_transaction_type.clone(); @@ -346,10 +351,13 @@ impl Stream for MempoolSyncer { } // Then we check our RequestMempoolHashes responses + let mut new_hashes_discovered = false; while let Poll::Ready(Some((peer_id, result))) = self.hashes_requests.poll_next_unpin(cx) { match result { Ok(hashes) => { - self.push_unknown_hashes(hashes.hashes, peer_id); + if self.push_unknown_hashes(hashes.hashes, peer_id) { + new_hashes_discovered = true; + } } Err(err) => { error!(%err, %peer_id, "Failed to fetch mempool hashes"); @@ -358,7 +366,9 @@ impl Stream for MempoolSyncer { } // Then we construct our RequestMempoolTransactions requests and send them over the network to our peers - self.send_mempool_transactions_requests(); + if new_hashes_discovered { + self.send_mempool_transactions_requests(); + } // Then we check our RequestMempoolTransactions responses while let Poll::Ready(Some((peer_id, result))) = diff --git a/mempool/src/sync/tests/mod.rs b/mempool/src/sync/tests/mod.rs index 85d2a68419..4cbf685bee 100644 --- a/mempool/src/sync/tests/mod.rs +++ b/mempool/src/sync/tests/mod.rs @@ -163,8 +163,8 @@ mod tests { // Load known hashes into the mempool state let mut handle = state.write(); - known_txns.iter().for_each(|txn| { - handle.regular_transactions.insert(&txn, TxPriority::Medium); + known_txns.into_iter().for_each(|txn| { + handle.regular_transactions.insert(txn, TxPriority::Medium); }); assert_eq!(handle.regular_transactions.len(), known_hashes.len()); drop(handle); diff --git a/mempool/tests/mod.rs b/mempool/tests/mod.rs index 777e5ef75f..319eaf2308 100644 --- a/mempool/tests/mod.rs +++ b/mempool/tests/mod.rs @@ -872,10 +872,14 @@ async fn multiple_start_stop() { let (txn_stream_tx, txn_stream_rx) = mpsc::channel(64); // Create mempool and subscribe with a custom txn stream. - let mempool = Mempool::new(Arc::clone(&blockchain), MempoolConfig::default()); let mut hub = MockHub::new(); - let mock_id = MockId::new(hub.new_address().into()); let mock_network = Arc::new(hub.new_network()); + let mempool = Mempool::new( + Arc::clone(&blockchain), + MempoolConfig::default(), + Arc::clone(&mock_network), + ); + let mock_id = MockId::new(hub.new_address().into()); // Subscribe mempool with the mpsc stream created mempool @@ -892,7 +896,7 @@ async fn multiple_start_stop() { tokio::task::spawn(async move { for txn in txns { txn_stream_tx1 - .send((txn.clone(), mock_id1.clone())) + .send((txn.clone(), PubsubIdOrPeerId::PubsubId(mock_id1.clone()))) .await .unwrap(); } @@ -916,7 +920,7 @@ async fn multiple_start_stop() { tokio::task::spawn(async move { for txn in txns { txn_stream_tx - .send((txn.clone(), mock_id.clone())) + .send((txn.clone(), PubsubIdOrPeerId::PubsubId(mock_id.clone()))) .await .expect_err("Send should fail, executor is stopped"); } @@ -941,10 +945,14 @@ async fn multiple_start_stop() { let (txn_stream_tx, txn_stream_rx) = mpsc::channel(64); // Create mempool and subscribe with a custom txn stream. - let mempool = Mempool::new(Arc::clone(&blockchain), MempoolConfig::default()); let mut hub = MockHub::new(); - let mock_id = MockId::new(hub.new_address().into()); let mock_network = Arc::new(hub.new_network()); + let mempool = Mempool::new( + Arc::clone(&blockchain), + MempoolConfig::default(), + mock_network.clone(), + ); + let mock_id = MockId::new(hub.new_address().into()); // Subscribe mempool with the mpsc stream created mempool @@ -959,7 +967,7 @@ async fn multiple_start_stop() { tokio::task::spawn(async move { for txn in txns { txn_stream_tx - .send((txn.clone(), mock_id.clone())) + .send((txn.clone(), PubsubIdOrPeerId::PubsubId(mock_id.clone()))) .await .unwrap(); } @@ -1948,7 +1956,6 @@ async fn applies_total_tx_size_limits() { ..Default::default() }; let mut hub = MockHub::new(); - let mock_id = MockId::new(hub.new_address().into()); let mock_network = Arc::new(hub.new_network()); let mempool = Mempool::new(blockchain, mempool_config, Arc::clone(&mock_network)); From d480f232587e3bd7a080bbbcf488cc700ab9a04c Mon Sep 17 00:00:00 2001 From: Stefan Date: Thu, 7 Nov 2024 11:44:58 +0100 Subject: [PATCH 13/14] Mempool syncer: check peer for necessary service flag when adding it dynamically --- mempool/src/sync/mod.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/mempool/src/sync/mod.rs b/mempool/src/sync/mod.rs index fae20f959f..01e0035765 100644 --- a/mempool/src/sync/mod.rs +++ b/mempool/src/sync/mod.rs @@ -22,6 +22,7 @@ use nimiq_consensus::{sync::syncer::SyncEvent, ConsensusProxy}; use nimiq_hash::{Blake2bHash, Hash}; use nimiq_network_interface::{ network::Network, + peer_info::Services, request::{request_handler, RequestError}, }; use nimiq_time::sleep_until; @@ -193,7 +194,11 @@ impl MempoolSyncer { /// Add peer to discover its mempool fn add_peer(&mut self, peer_id: N::PeerId) { - if self.peers.contains(&peer_id) { + if self.peers.contains(&peer_id) + || !self + .network + .peer_provides_services(peer_id, Services::MEMPOOL) + { return; } From 20da134c99a5226f4baa1a5d554e822dcaa423ca Mon Sep 17 00:00:00 2001 From: Stefan Date: Tue, 10 Dec 2024 16:49:11 +0100 Subject: [PATCH 14/14] Mempool syncer: fix merge conflicts --- mempool/src/sync/mod.rs | 3 +-- mempool/tests/mod.rs | 14 ++++++++------ test-utils/src/consensus.rs | 7 ++----- validator/src/proposal_buffer.rs | 6 +----- 4 files changed, 12 insertions(+), 18 deletions(-) diff --git a/mempool/src/sync/mod.rs b/mempool/src/sync/mod.rs index 01e0035765..d72065d550 100644 --- a/mempool/src/sync/mod.rs +++ b/mempool/src/sync/mod.rs @@ -25,11 +25,10 @@ use nimiq_network_interface::{ peer_info::Services, request::{request_handler, RequestError}, }; -use nimiq_time::sleep_until; +use nimiq_time::{sleep_until, Sleep}; use nimiq_transaction::{historic_transaction::RawTransactionHash, Transaction}; use nimiq_utils::{spawn, stream::FuturesUnordered}; use parking_lot::RwLock; -use tokio::time::Sleep; use tokio_stream::wrappers::BroadcastStream; use crate::{executor::PubsubIdOrPeerId, mempool_state::MempoolState}; diff --git a/mempool/tests/mod.rs b/mempool/tests/mod.rs index 319eaf2308..ec5f5b29d6 100644 --- a/mempool/tests/mod.rs +++ b/mempool/tests/mod.rs @@ -2051,14 +2051,15 @@ async fn assert_rebase_invalidates_mempool_tx(tx1: Transaction, tx2: Transaction let temp_producer2 = TemporaryBlockProducer::new(); assert_eq!(temp_producer2.push(block), Ok(PushResult::Extended)); + let mut hub = MockHub::new(); + let mock_id = MockId::new(hub.new_address().into()); + let mock_network = Arc::new(hub.new_network()); // Create mempool and subscribe with a custom txn stream. let mempool = Mempool::new( Arc::clone(&temp_producer1.blockchain), MempoolConfig::default(), + Arc::clone(&mock_network), ); - let mut hub = MockHub::new(); - let mock_id = MockId::new(hub.new_address().into()); - let mock_network = Arc::new(hub.new_network()); // Include tx1 in block1 let block1 = temp_producer1.next_block_with_txs(vec![], false, vec![tx1.clone()]); @@ -2088,14 +2089,15 @@ async fn assert_rebase_invalidates_mempool_tx(tx1: Transaction, tx2: Transaction async fn mempool_update_rebase_earlier_time() { let temp_producer1 = TemporaryBlockProducer::new(); + let mut hub = MockHub::new(); + let mock_id = MockId::new(hub.new_address().into()); + let mock_network = Arc::new(hub.new_network()); // Create mempool and subscribe with a custom txn stream. let mempool = Mempool::new( Arc::clone(&temp_producer1.blockchain), MempoolConfig::default(), + Arc::clone(&mock_network), ); - let mut hub = MockHub::new(); - let mock_id = MockId::new(hub.new_address().into()); - let mock_network = Arc::new(hub.new_network()); // Create vesting and redeeming txs let key_pair = ed25519_key_pair(ACCOUNT_SECRET_KEY); diff --git a/test-utils/src/consensus.rs b/test-utils/src/consensus.rs index 70f43d78e6..525c6a3643 100644 --- a/test-utils/src/consensus.rs +++ b/test-utils/src/consensus.rs @@ -2,10 +2,7 @@ use std::sync::Arc; use nimiq_blockchain::Blockchain; use nimiq_blockchain_proxy::BlockchainProxy; -use nimiq_bls::cache::PublicKeyCache; -use nimiq_consensus::{ - sync::syncer_proxy::SyncerProxy, BlsCache, Consensus, ConsensusEvent, ConsensusProxy, -}; +use nimiq_consensus::{sync::syncer_proxy::SyncerProxy, BlsCache, Consensus}; use nimiq_network_interface::network::Network; use nimiq_zkp_component::ZKPComponent; use parking_lot::{Mutex, RwLock}; @@ -13,7 +10,7 @@ use parking_lot::{Mutex, RwLock}; use crate::test_network::TestNetwork; /// Given a blockchain and a network creates an instance of Consensus. -pub async fn consensus( +pub async fn consensus( blockchain: Arc>, net: Arc, ) -> Consensus { diff --git a/validator/src/proposal_buffer.rs b/validator/src/proposal_buffer.rs index cbfa7b5612..0a7709d288 100644 --- a/validator/src/proposal_buffer.rs +++ b/validator/src/proposal_buffer.rs @@ -563,11 +563,7 @@ mod test { use futures::{FutureExt, StreamExt}; use nimiq_block::MacroHeader; - use nimiq_blockchain::Blockchain; - use nimiq_blockchain_proxy::BlockchainProxy; - use nimiq_consensus::{ - sync::syncer_proxy::SyncerProxy, BlsCache, Consensus, ConsensusEvent, ConsensusProxy, - }; + use nimiq_consensus::{ConsensusEvent, ConsensusProxy}; use nimiq_keys::{KeyPair as SchnorrKeyPair, PrivateKey as SchnorrPrivateKey}; use nimiq_network_interface::network::Network as NetworkInterface; use nimiq_network_mock::{MockHub, MockNetwork};