diff --git a/Cargo.lock b/Cargo.lock index 67715bf423..bef1a168aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4389,6 +4389,7 @@ dependencies = [ "nimiq-blockchain", "nimiq-blockchain-interface", "nimiq-bls", + "nimiq-consensus", "nimiq-database", "nimiq-genesis", "nimiq-genesis-builder", diff --git a/consensus/src/consensus/consensus_proxy.rs b/consensus/src/consensus/consensus_proxy.rs index c8dcc20cd5..b76922867c 100644 --- a/consensus/src/consensus/consensus_proxy.rs +++ b/consensus/src/consensus/consensus_proxy.rs @@ -34,6 +34,7 @@ use crate::{ RequestBlocksProof, RequestSubscribeToAddress, RequestTransactionReceiptsByAddress, RequestTransactionsProof, ResponseBlocksProof, }, + sync::syncer::SyncEvent, ConsensusEvent, }; @@ -42,7 +43,8 @@ pub struct ConsensusProxy { pub network: Arc, pub(crate) established_flag: Arc, pub(crate) synced_validity_window_flag: Arc, - pub(crate) events: broadcast::Sender, + pub(crate) consensus_events: broadcast::Sender, + pub(crate) sync_events: broadcast::Sender>, pub(crate) request: mpsc::Sender>, } @@ -53,7 +55,8 @@ impl Clone for ConsensusProxy { network: Arc::clone(&self.network), established_flag: Arc::clone(&self.established_flag), synced_validity_window_flag: Arc::clone(&self.synced_validity_window_flag), - events: self.events.clone(), + consensus_events: self.consensus_events.clone(), + sync_events: self.sync_events.clone(), request: self.request.clone(), } } @@ -81,8 +84,12 @@ impl ConsensusProxy { && self.synced_validity_window_flag.load(Ordering::Acquire) } - pub fn subscribe_events(&self) -> BroadcastStream { - BroadcastStream::new(self.events.subscribe()) + pub fn subscribe_consensus_events(&self) -> BroadcastStream { + BroadcastStream::new(self.consensus_events.subscribe()) + } + + pub fn subscribe_sync_events(&self) -> BroadcastStream::PeerId>> { + BroadcastStream::new(self.sync_events.subscribe()) } /// Subscribe to remote address notification events diff --git a/consensus/src/consensus/mod.rs b/consensus/src/consensus/mod.rs index 8d4fac8366..5723b6e777 100644 --- a/consensus/src/consensus/mod.rs +++ b/consensus/src/consensus/mod.rs @@ -34,7 +34,11 @@ use self::remote_event_dispatcher::RemoteEventDispatcher; use crate::{ consensus::head_requests::{HeadRequests, HeadRequestsResult}, messages::{RequestBlock, RequestHead, RequestMacroChain, RequestMissingBlocks}, - sync::{live::block_queue::BlockSource, syncer::LiveSyncPushEvent, syncer_proxy::SyncerProxy}, + sync::{ + live::block_queue::BlockSource, + syncer::{LiveSyncPushEvent, SyncEvent}, + syncer_proxy::SyncerProxy, + }, }; #[cfg(feature = "full")] use crate::{ @@ -128,7 +132,8 @@ pub struct Consensus { pub sync: SyncerProxy, - events: broadcast::Sender, + consensus_events: broadcast::Sender, + sync_events: broadcast::Sender>, established_flag: Arc, #[cfg(feature = "full")] last_batch_number: u32, @@ -212,7 +217,8 @@ impl Consensus { blockchain, network, sync: syncer, - events: broadcast::Sender::new(256), + consensus_events: broadcast::Sender::new(256), + sync_events: broadcast::Sender::new(256), established_flag, #[cfg(feature = "full")] last_batch_number: 0, @@ -295,7 +301,7 @@ impl Consensus { } pub fn subscribe_events(&self) -> BroadcastStream { - BroadcastStream::new(self.events.subscribe()) + BroadcastStream::new(self.consensus_events.subscribe()) } pub fn is_established(&self) -> bool { @@ -312,7 +318,8 @@ impl Consensus { network: Arc::clone(&self.network), established_flag: Arc::clone(&self.established_flag), synced_validity_window_flag: Arc::clone(&self.synced_validity_window_flag), - events: self.events.clone(), + consensus_events: self.consensus_events.clone(), + sync_events: self.sync_events.clone(), request: self.requests.0.clone(), } } @@ -328,7 +335,7 @@ impl Consensus { // We don't care if anyone is listening. let (synced_validity_window, _) = self.check_validity_window(); - self.events + self.consensus_events .send(ConsensusEvent::Established { synced_validity_window, }) @@ -543,7 +550,7 @@ impl Future for Consensus { // Check consensus established state on changes. if let Some(event) = self.check_established(None) { - self.events.send(event).ok(); + self.consensus_events.send(event).ok(); } // Poll any head requests if active. @@ -559,7 +566,7 @@ impl Future for Consensus { // Update established state using the result. if let Some(event) = self.check_established(Some(result)) { - self.events.send(event).ok(); + self.consensus_events.send(event).ok(); } } } diff --git a/consensus/src/sync/syncer.rs b/consensus/src/sync/syncer.rs index 16c42eadee..5544b7697a 100644 --- a/consensus/src/sync/syncer.rs +++ b/consensus/src/sync/syncer.rs @@ -16,6 +16,7 @@ use nimiq_network_interface::network::{CloseReason, Network, NetworkEvent, Subsc use nimiq_primitives::policy::Policy; use nimiq_time::{interval, Interval}; use nimiq_utils::stream::FuturesUnordered; +use tokio::sync::broadcast::{channel as broadcast, Sender as BroadcastSender}; use crate::{ consensus::ResolveBlockRequest, messages::RequestHead, sync::live::block_queue::BlockSource, @@ -106,6 +107,11 @@ pub enum LiveSyncPeerEvent { Ahead(TPeerId), } +#[derive(Clone)] +pub enum SyncEvent { + AddLiveSync(TPeerId), +} + /// Syncer is the main synchronization object inside `Consensus` /// It has a reference to the main blockchain and network and has two dynamic /// trait objects: @@ -127,6 +133,9 @@ pub struct Syncer, L: LiveSync> { /// A proxy to the blockchain blockchain: BlockchainProxy, + /// Sending-half of a broadcast channel for publishing sync events + events: BroadcastSender>, + /// A reference to the network network: Arc, @@ -159,12 +168,15 @@ impl, L: LiveSync> Syncer { macro_sync: M, ) -> Syncer { let network_events = network.subscribe_events(); + let (tx, _rx) = broadcast(256); + Syncer { live_sync, macro_sync, blockchain, network, network_events, + events: tx, outdated_peers: Default::default(), incompatible_peers: Default::default(), check_interval: interval(Self::CHECK_INTERVAL), @@ -185,12 +197,17 @@ impl, L: LiveSync> Syncer { pub fn move_peer_into_live_sync(&mut self, peer_id: N::PeerId) { debug!(%peer_id, "Adding peer to live sync"); self.live_sync.add_peer(peer_id); + self.events.send(SyncEvent::AddLiveSync(peer_id)).ok(); } pub fn num_peers(&self) -> usize { self.live_sync.num_peers() } + pub fn broadcast_sender(&self) -> BroadcastSender::PeerId>> { + self.events.clone() + } + pub fn peers(&self) -> Vec { self.live_sync.peers() } diff --git a/consensus/src/sync/syncer_proxy.rs b/consensus/src/sync/syncer_proxy.rs index 9bc4f83e41..883aa15558 100644 --- a/consensus/src/sync/syncer_proxy.rs +++ b/consensus/src/sync/syncer_proxy.rs @@ -15,6 +15,7 @@ use nimiq_primitives::policy::Policy; use nimiq_zkp_component::zkp_component::ZKPComponentProxy; use parking_lot::Mutex; use pin_project::pin_project; +use tokio::sync::broadcast::Sender as BroadcastSender; #[cfg(feature = "full")] use crate::sync::{ @@ -30,7 +31,7 @@ use crate::{ queue::QueueConfig, BlockLiveSync, }, - syncer::{LiveSyncPushEvent, Syncer}, + syncer::{LiveSyncPushEvent, SyncEvent, Syncer}, }, BlsCache, }; @@ -227,6 +228,11 @@ impl SyncerProxy { gen_syncer_match!(self, accepted_block_announcements) } + /// Returns a broadcast sender for sync events + pub fn broadcast_sender(&self) -> BroadcastSender::PeerId>> { + gen_syncer_match!(self, broadcast_sender) + } + /// Returns whether the state sync has finished (or `true` if there is no state sync required) pub fn state_complete(&self) -> bool { gen_syncer_match!(self, state_complete) diff --git a/consensus/tests/sync_utils.rs b/consensus/tests/sync_utils.rs index d060ba8d20..7245ef30f9 100644 --- a/consensus/tests/sync_utils.rs +++ b/consensus/tests/sync_utils.rs @@ -202,7 +202,7 @@ pub async fn sync_two_peers( BlockchainEvent::Finalized(_) | BlockchainEvent::EpochFinalized(_) )) }); - let mut consensus_events = consensus2_proxy.subscribe_events(); + let mut consensus_events = consensus2_proxy.subscribe_consensus_events(); spawn(consensus2); for _ in 0..num_batches_live_sync { diff --git a/mempool/Cargo.toml b/mempool/Cargo.toml index f40599d891..7f5ed567d9 100644 --- a/mempool/Cargo.toml +++ b/mempool/Cargo.toml @@ -36,12 +36,14 @@ nimiq-account = { workspace = true } nimiq-block = { workspace = true } nimiq-blockchain = { workspace = true } nimiq-blockchain-interface = { workspace = true } +nimiq-consensus = { workspace = true } nimiq-database = { workspace = true } nimiq-hash = { workspace = true } nimiq-keys = { workspace = true } nimiq-network-interface = { workspace = true } nimiq-primitives = { workspace = true, features = ["coin", "networks"] } nimiq-serde = { workspace = true } +nimiq-time = { workspace = true } nimiq-transaction = { workspace = true } nimiq-utils = { workspace = true, features = ["spawn", "time"] } @@ -58,7 +60,6 @@ nimiq-genesis-builder = { workspace = true } nimiq-network-mock = { workspace = true } nimiq-test-log = { workspace = true } nimiq-test-utils = { workspace = true } -nimiq-time = { workspace = true } nimiq-transaction-builder = { workspace = true } nimiq-vrf = { workspace = true } diff --git a/mempool/mempool-task/src/lib.rs b/mempool/mempool-task/src/lib.rs index d866f7a88e..34fbe6193f 100644 --- a/mempool/mempool-task/src/lib.rs +++ b/mempool/mempool-task/src/lib.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashSet, pin::Pin, sync::Arc, task::{Context, Poll}, @@ -8,9 +9,9 @@ use futures::{stream::BoxStream, Future, Stream, StreamExt}; use log::{debug, warn}; use nimiq_blockchain::Blockchain; use nimiq_blockchain_interface::{AbstractBlockchain, BlockchainEvent}; -use nimiq_consensus::{Consensus, ConsensusEvent, ConsensusProxy}; +use nimiq_consensus::{sync::syncer::SyncEvent, Consensus, ConsensusEvent, ConsensusProxy}; use nimiq_mempool::{config::MempoolConfig, mempool::Mempool}; -use nimiq_network_interface::network::Network; +use nimiq_network_interface::network::{Network, NetworkEvent, SubscribeEvents}; use nimiq_utils::spawn; use parking_lot::RwLock; #[cfg(feature = "metrics")] @@ -34,6 +35,10 @@ pub struct MempoolTask { consensus_event_rx: BroadcastStream, blockchain_event_rx: BoxStream<'static, BlockchainEvent>, + network_event_rx: SubscribeEvents, + sync_event_rx: BroadcastStream::PeerId>>, + + peers_in_live_sync: HashSet, pub mempool: Arc, mempool_active: bool, @@ -50,17 +55,29 @@ impl MempoolTask { mempool_config: MempoolConfig, ) -> Self { let consensus_event_rx = consensus.subscribe_events(); + let network_event_rx = consensus.network.subscribe_events(); - let mempool = Arc::new(Mempool::new(Arc::clone(&blockchain), mempool_config)); + let mempool = Arc::new(Mempool::new( + Arc::clone(&blockchain), + mempool_config, + Arc::clone(&consensus.network), + )); let mempool_active = false; let blockchain_event_rx = blockchain.read().notifier_as_stream(); + let proxy = consensus.proxy(); + let sync_event_rx = proxy.subscribe_sync_events(); + let peers_in_live_sync = HashSet::from_iter(consensus.sync.peers()); + Self { - consensus: consensus.proxy(), + consensus: proxy, + peers_in_live_sync, consensus_event_rx, blockchain_event_rx, + network_event_rx, + sync_event_rx, mempool: Arc::clone(&mempool), mempool_active, @@ -92,26 +109,37 @@ impl MempoolTask { let mempool = Arc::clone(&self.mempool); let network = Arc::clone(&self.consensus.network); + let peers = self.peers_in_live_sync.clone().into_iter().collect(); #[cfg(not(feature = "metrics"))] spawn({ + let consensus = self.consensus.clone(); async move { // The mempool is not updated while consensus is lost. // Thus, we need to check all transactions if they are still valid. mempool.cleanup(); - mempool.start_executors(network, None, None).await; + mempool + .start_executors(network, None, None, peers, consensus) + .await; } }); #[cfg(feature = "metrics")] spawn({ let mempool_monitor = self.mempool_monitor.clone(); let ctrl_mempool_monitor = self.control_mempool_monitor.clone(); + let consensus = self.consensus.clone(); async move { // The mempool is not updated while consensus is lost. // Thus, we need to check all transactions if they are still valid. mempool.cleanup(); mempool - .start_executors(network, Some(mempool_monitor), Some(ctrl_mempool_monitor)) + .start_executors( + network, + Some(mempool_monitor), + Some(ctrl_mempool_monitor), + peers, + consensus, + ) .await; } }); @@ -167,12 +195,39 @@ impl MempoolTask { } } } + + fn on_consensus_sync_event(&mut self, event: SyncEvent) { + match event { + SyncEvent::AddLiveSync(peer_id) => { + self.peers_in_live_sync.insert(peer_id); + } + } + } + + fn on_network_event(&mut self, event: NetworkEvent) { + match event { + NetworkEvent::PeerLeft(peer_id) => { + self.peers_in_live_sync.remove(&peer_id); + } + NetworkEvent::PeerJoined(_, _) | NetworkEvent::DhtReady => (), + } + } } impl Stream for MempoolTask { type Item = MempoolEvent; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // Process consensus sync updates. + while let Poll::Ready(Some(Ok(event))) = self.sync_event_rx.poll_next_unpin(cx) { + self.on_consensus_sync_event(event) + } + + // Process network updates. + while let Poll::Ready(Some(Ok(event))) = self.network_event_rx.poll_next_unpin(cx) { + self.on_network_event(event) + } + // Process consensus updates. // Start mempool as soon as we have consensus and can enforce the validity window. // Stop the mempool if we lose consensus or cannot enforce the validity window. diff --git a/mempool/src/executor.rs b/mempool/src/executor.rs index 3b8f775a7e..5b64da1b8e 100644 --- a/mempool/src/executor.rs +++ b/mempool/src/executor.rs @@ -11,7 +11,7 @@ use std::{ use futures::{ready, stream::BoxStream, StreamExt}; use nimiq_blockchain::Blockchain; -use nimiq_network_interface::network::{MsgAcceptance, Network, Topic}; +use nimiq_network_interface::network::{CloseReason, MsgAcceptance, Network, Topic}; use nimiq_primitives::networks::NetworkId; use nimiq_transaction::Transaction; use nimiq_utils::spawn; @@ -26,6 +26,14 @@ use crate::{ const CONCURRENT_VERIF_TASKS: u32 = 10000; +/// Enum describing whether a network response directly comes from a peer or through Gossipsub +pub enum PubsubIdOrPeerId { + /// A network response coming directly from another peer + PeerId(N::PeerId), + /// A network response coming over Gossipsub + PubsubId(N::PubsubId), +} + pub(crate) struct MempoolExecutor { // Blockchain reference blockchain: Arc>, @@ -45,8 +53,8 @@ pub(crate) struct MempoolExecutor { // Network ID, used for tx verification network_id: NetworkId, - // Transaction stream that is used to listen to transactions from the network - txn_stream: BoxStream<'static, (Transaction, ::PubsubId)>, + // Transaction stream that is used to listen to transactions from the network and the Mempool Syncer + txn_stream: BoxStream<'static, (Transaction, PubsubIdOrPeerId)>, // Phantom data for the unused type T _phantom: PhantomData, @@ -58,7 +66,7 @@ impl MempoolExecutor { state: Arc>, filter: Arc>, network: Arc, - txn_stream: BoxStream<'static, (Transaction, ::PubsubId)>, + txn_stream: BoxStream<'static, (Transaction, PubsubIdOrPeerId)>, verification_tasks: Arc, ) -> Self { Self { @@ -85,7 +93,9 @@ impl Future for MempoolExecutor { } } - while let Some((tx, pubsub_id)) = ready!(self.txn_stream.as_mut().poll_next_unpin(cx)) { + while let Some((tx, pubsub_or_peer_id)) = + ready!(self.txn_stream.as_mut().poll_next_unpin(cx)) + { let decrement = Decrementer(Arc::clone(&self.verification_tasks)); if self .verification_tasks @@ -113,16 +123,37 @@ impl Future for MempoolExecutor { TxPriority::Medium, ); - let acceptance = match verify_tx_ret { - Ok(_) => MsgAcceptance::Accept, - // Reject the message if signature verification fails or transaction is invalid - // for current validation window - Err(VerifyErr::InvalidTransaction(_)) => MsgAcceptance::Reject, - Err(VerifyErr::AlreadyIncluded) => MsgAcceptance::Reject, - Err(_) => MsgAcceptance::Ignore, - }; - - network.validate_message::(pubsub_id, acceptance); + match pubsub_or_peer_id { + PubsubIdOrPeerId::PeerId(peer_id) => match verify_tx_ret { + Ok(_) => (), + Err(VerifyErr::InvalidAccount(_)) + | Err(VerifyErr::InvalidBlockNumber) + | Err(VerifyErr::InvalidTransaction(_)) => { + if network.has_peer(peer_id) { + warn!( + %peer_id, + "Banning peer because it responded with an invalid mempool transaction while syncing" + ); + network + .disconnect_peer(peer_id, CloseReason::MaliciousPeer) + .await; + } + } + Err(_) => (), + }, + PubsubIdOrPeerId::PubsubId(pubsub_id) => { + let acceptance = match verify_tx_ret { + Ok(_) => MsgAcceptance::Accept, + // Reject the message if signature verification fails or transaction is invalid + // for current validation window + Err(VerifyErr::InvalidTransaction(_)) => MsgAcceptance::Reject, + Err(VerifyErr::AlreadyIncluded) => MsgAcceptance::Reject, + Err(_) => MsgAcceptance::Ignore, + }; + + network.validate_message::(pubsub_id, acceptance); + } + } drop(decrement); }); diff --git a/mempool/src/lib.rs b/mempool/src/lib.rs index 0bd27cc77b..c1dddea8c7 100644 --- a/mempool/src/lib.rs +++ b/mempool/src/lib.rs @@ -24,5 +24,7 @@ pub mod mempool; mod mempool_metrics; /// Mempool transaction module pub mod mempool_transactions; +/// Mempool syncer module +mod sync; /// Verify transaction module pub mod verify; diff --git a/mempool/src/mempool.rs b/mempool/src/mempool.rs index 4029e502b5..e687c72a57 100644 --- a/mempool/src/mempool.rs +++ b/mempool/src/mempool.rs @@ -6,15 +6,19 @@ use std::{ use futures::{ future::{AbortHandle, Abortable}, lock::{Mutex, MutexGuard}, - stream::{BoxStream, StreamExt}, + stream::{select, BoxStream, StreamExt}, }; use nimiq_account::ReservedBalance; use nimiq_block::Block; use nimiq_blockchain::{Blockchain, TransactionVerificationCache}; use nimiq_blockchain_interface::AbstractBlockchain; +use nimiq_consensus::ConsensusProxy; use nimiq_hash::{Blake2bHash, Hash}; use nimiq_keys::Address; -use nimiq_network_interface::network::{Network, Topic}; +use nimiq_network_interface::{ + network::{Network, Topic}, + peer_info::Services, +}; use nimiq_serde::Serialize; use nimiq_transaction::{ historic_transaction::RawTransactionHash, ControlTransactionTopic, Transaction, @@ -28,10 +32,11 @@ use tokio_metrics::TaskMonitor; use crate::mempool_metrics::MempoolMetrics; use crate::{ config::MempoolConfig, - executor::MempoolExecutor, + executor::{MempoolExecutor, PubsubIdOrPeerId}, filter::{MempoolFilter, MempoolRules}, mempool_state::{EvictionReason, MempoolState}, mempool_transactions::{MempoolTransactions, TxPriority}, + sync::{messages::MempoolTransactionType, MempoolSyncer}, verify::{verify_tx, VerifyErr}, }; @@ -64,12 +69,18 @@ impl Mempool { pub const DEFAULT_CONTROL_SIZE_LIMIT: usize = 6_000_000; /// Creates a new mempool - pub fn new(blockchain: Arc>, config: MempoolConfig) -> Self { + pub fn new( + blockchain: Arc>, + config: MempoolConfig, + network: Arc, + ) -> Self { let state = Arc::new(RwLock::new(MempoolState::new( config.size_limit, config.control_size_limit, ))); + MempoolSyncer::init_network_request_receivers(network, Arc::clone(&state)); + Self { blockchain, state: Arc::clone(&state), @@ -90,7 +101,7 @@ impl Mempool { network: Arc, monitor: Option, mut handle: MutexGuard<'_, Option>, - txn_stream: BoxStream<'static, (Transaction, ::PubsubId)>, + txn_stream: BoxStream<'static, (Transaction, PubsubIdOrPeerId)>, ) { if handle.is_some() { // If we already have an executor running, don't do anything @@ -135,6 +146,8 @@ impl Mempool { network: Arc, monitor: Option, control_monitor: Option, + mut peers: Vec, + consensus: ConsensusProxy, ) { let executor_handle = self.executor_handle.lock().await; let control_executor_handle = self.control_executor_handle.lock().await; @@ -144,14 +157,39 @@ impl Mempool { return; } + info!("Initializing mempool syncers"); + peers.retain(|peer_id| network.peer_provides_services(*peer_id, Services::MEMPOOL)); + // Sync regular transactions with the mempool of other peers + let regular_transactions_syncer = MempoolSyncer::new( + peers.clone(), + MempoolTransactionType::Regular, + Arc::clone(&self.blockchain), + consensus.clone(), + Arc::clone(&self.state), + ); + // Subscribe to the network TX topic - let txn_stream = network.subscribe::().await.unwrap(); + let txn_stream = network + .subscribe::() + .await + .unwrap() + .map(|(tx, pubsub_id)| (tx, PubsubIdOrPeerId::PubsubId(pubsub_id))) + .boxed(); self.start_executor::( Arc::clone(&network), monitor, executor_handle, - txn_stream, + select(regular_transactions_syncer, txn_stream).boxed(), + ); + + // Sync control transactions with the mempool of other peers + let control_transactions_syncer = MempoolSyncer::new( + peers, + MempoolTransactionType::Control, + Arc::clone(&self.blockchain), + consensus.clone(), + Arc::clone(&self.state), ); // Subscribe to the control transaction topic @@ -159,14 +197,14 @@ impl Mempool { .subscribe::() .await .unwrap() - .map(|(tx, pubsub_id)| (Transaction::from(tx), pubsub_id)) + .map(|(tx, pubsub_id)| (Transaction::from(tx), PubsubIdOrPeerId::PubsubId(pubsub_id))) .boxed(); self.start_executor::( network, control_monitor, control_executor_handle, - txn_stream, + select(control_transactions_syncer, txn_stream).boxed(), ); } @@ -177,7 +215,7 @@ impl Mempool { /// stream instead. pub async fn start_executor_with_txn_stream( &self, - txn_stream: BoxStream<'static, (Transaction, ::PubsubId)>, + txn_stream: BoxStream<'static, (Transaction, PubsubIdOrPeerId)>, network: Arc, ) { self.start_executor::( @@ -195,7 +233,7 @@ impl Mempool { /// stream instead. pub async fn start_control_executor_with_txn_stream( &self, - txn_stream: BoxStream<'static, (Transaction, ::PubsubId)>, + txn_stream: BoxStream<'static, (Transaction, PubsubIdOrPeerId)>, network: Arc, ) { self.start_executor::( diff --git a/mempool/src/sync/messages.rs b/mempool/src/sync/messages.rs new file mode 100644 index 0000000000..cdaeaa08bb --- /dev/null +++ b/mempool/src/sync/messages.rs @@ -0,0 +1,97 @@ +use std::sync::Arc; + +use nimiq_hash::Blake2bHash; +use nimiq_network_interface::{ + network::Network, + request::{Handle, RequestCommon, RequestMarker}, +}; +use nimiq_serde::{Deserialize, Serialize}; +use nimiq_transaction::Transaction; +use parking_lot::RwLock; + +use crate::mempool_state::MempoolState; + +const MAX_REQUEST_RESPONSE_MEMPOOL_STATE: u32 = 1000; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub enum MempoolTransactionType { + Control, + Regular, +} + +/// Request the current transaction hashes in the mempool. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct RequestMempoolHashes { + pub transaction_type: MempoolTransactionType, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct ResponseMempoolHashes { + pub hashes: Vec, +} + +impl RequestCommon for RequestMempoolHashes { + type Kind = RequestMarker; + const TYPE_ID: u16 = 219; + type Response = ResponseMempoolHashes; + const MAX_REQUESTS: u32 = MAX_REQUEST_RESPONSE_MEMPOOL_STATE; +} + +/// Request transactions in the mempool based on the provided hashes. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct RequestMempoolTransactions { + pub hashes: Vec, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct ResponseMempoolTransactions { + pub transactions: Vec, +} + +impl RequestCommon for RequestMempoolTransactions { + type Kind = RequestMarker; + const TYPE_ID: u16 = 220; + type Response = ResponseMempoolTransactions; + const MAX_REQUESTS: u32 = MAX_REQUEST_RESPONSE_MEMPOOL_STATE; +} + +impl Handle>> for RequestMempoolHashes { + fn handle(&self, _: N::PeerId, context: &Arc>) -> ResponseMempoolHashes { + let hashes: Vec = match self.transaction_type { + MempoolTransactionType::Regular => context + .read() + .regular_transactions + .best_transactions + .iter() + .map(|txn| txn.0.clone()) + .collect(), + MempoolTransactionType::Control => context + .read() + .control_transactions + .best_transactions + .iter() + .map(|txn| txn.0.clone()) + .collect(), + }; + + ResponseMempoolHashes { hashes } + } +} + +impl Handle>> for RequestMempoolTransactions { + fn handle( + &self, + _: N::PeerId, + context: &Arc>, + ) -> ResponseMempoolTransactions { + let mut transactions = Vec::with_capacity(self.hashes.len()); + let state = context.read(); + self.hashes.iter().for_each(|hash| { + if let Some(txn) = state.get(hash) { + transactions.push(txn.to_owned()); + } + }); + + ResponseMempoolTransactions { transactions } + } +} diff --git a/mempool/src/sync/mod.rs b/mempool/src/sync/mod.rs new file mode 100644 index 0000000000..d72065d550 --- /dev/null +++ b/mempool/src/sync/mod.rs @@ -0,0 +1,421 @@ +pub(crate) mod messages; +mod tests; + +use std::{ + collections::{ + hash_map::Entry::{Occupied, Vacant}, + HashMap, VecDeque, + }, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + time::{Duration, Instant}, +}; + +use futures::{future::BoxFuture, FutureExt, Stream, StreamExt}; +use messages::{ + MempoolTransactionType, RequestMempoolHashes, RequestMempoolTransactions, + ResponseMempoolHashes, ResponseMempoolTransactions, +}; +use nimiq_blockchain::Blockchain; +use nimiq_consensus::{sync::syncer::SyncEvent, ConsensusProxy}; +use nimiq_hash::{Blake2bHash, Hash}; +use nimiq_network_interface::{ + network::Network, + peer_info::Services, + request::{request_handler, RequestError}, +}; +use nimiq_time::{sleep_until, Sleep}; +use nimiq_transaction::{historic_transaction::RawTransactionHash, Transaction}; +use nimiq_utils::{spawn, stream::FuturesUnordered}; +use parking_lot::RwLock; +use tokio_stream::wrappers::BroadcastStream; + +use crate::{executor::PubsubIdOrPeerId, mempool_state::MempoolState}; + +enum RequestStatus { + Pending, + Requested, + Received, +} + +/// Struct for keeping track which peers have a specific hash in their mempool and +/// store if we have a requested the corresponding transaction already to one of those peers. +struct HashRequestStatus { + status: RequestStatus, + peer_ids: Vec, +} + +impl HashRequestStatus { + pub fn new(peer_ids: Vec) -> Self { + Self { + status: RequestStatus::Pending, + peer_ids, + } + } + + pub fn add_peer(&mut self, peer_id: N::PeerId) { + self.peer_ids.push(peer_id); + } + + pub fn has_peer(&self, peer_id: &N::PeerId) -> bool { + self.peer_ids.contains(peer_id) + } + + pub fn is_requested(&self) -> bool { + !matches!(self.status, RequestStatus::Pending) + } + + pub fn mark_as_requested(&mut self) { + self.status = RequestStatus::Requested; + } + + pub fn mark_as_received(&mut self) { + self.status = RequestStatus::Received + } +} + +const MAX_HASHES_PER_REQUEST: usize = 500; +const MAX_TOTAL_HASHES: usize = 25_000; +const SHUTDOWN_TIMEOUT_DURATION: Duration = Duration::from_secs(10 * 60); // 10 minutes + +/// Struct responsible for discovering hashes and retrieving transactions from the mempool of other nodes that have a mempool +pub(crate) struct MempoolSyncer { + /// Timeout to gracefully shutdown the mempool syncer entirely + shutdown_timer: Pin>, + + /// Consensus sync event receiver + consensus_sync_event_rx: BroadcastStream::PeerId>>, + + /// Blockchain reference + blockchain: Arc>, + + /// Requests to other peers for fetching transaction hashes that currently are in their mempool + hashes_requests: FuturesUnordered< + BoxFuture<'static, (N::PeerId, Result)>, + >, + + /// Reference to the network in order to send requests + network: Arc, + + /// Peers with a mempool we reach out for to discover and retrieve their mempool hashes and transactions + peers: Vec, + + /// The mempool state: the data structure where the transactions are stored locally + mempool_state: Arc>, + + /// Retrieved transactions that are ready to get verified and pushed into our local mempool + transactions: VecDeque<(Transaction, N::PeerId)>, + + /// Requests to other peers for fetching transactions by their hashes + transactions_requests: FuturesUnordered< + BoxFuture<'static, (N::PeerId, Result)>, + >, + + /// Collection of transaction hashes not present in the local mempool + unknown_hashes: HashMap>, + + /// The type of mempool transactions requested to other peers + mempool_transaction_type: MempoolTransactionType, +} + +impl MempoolSyncer { + pub fn new( + peers: Vec, + transaction_type: MempoolTransactionType, + blockchain: Arc>, + consensus: ConsensusProxy, + mempool_state: Arc>, + ) -> Self { + let hashes_requests = peers + .iter() + .map(|peer_id| { + let peer_id = *peer_id; + let network = Arc::clone(&consensus.network); + let transaction_type = transaction_type.to_owned(); + async move { + ( + peer_id, + Self::request_mempool_hashes(network, peer_id, transaction_type).await, + ) + } + .boxed() + }) + .collect(); + + debug!(num_peers = %peers.len(), ?transaction_type, "Fetching mempool hashes from peers"); + + Self { + shutdown_timer: Box::pin(sleep_until(Instant::now() + SHUTDOWN_TIMEOUT_DURATION)), + consensus_sync_event_rx: consensus.subscribe_sync_events(), + blockchain, + hashes_requests, + network: consensus.network, + peers, + mempool_state: Arc::clone(&mempool_state), + unknown_hashes: HashMap::new(), + transactions: VecDeque::new(), + transactions_requests: FuturesUnordered::new(), + mempool_transaction_type: transaction_type, + } + } + + /// Push newly discovered hashes into the `unknown_hashes` and keep track which peers have those hashes + fn push_unknown_hashes(&mut self, hashes: Vec, peer_id: N::PeerId) -> bool { + let blockchain = self.blockchain.read(); + let state = self.mempool_state.read(); + + debug!(peer_id = %peer_id, num = %hashes.len(), "Received unknown mempool hashes"); + let mut new_hashes_discovered = false; + + hashes.into_iter().take(MAX_TOTAL_HASHES).for_each(|hash| { + // Perform some basic checks to reduce the amount of transactions we are going to request later + if state.contains(&hash) + || blockchain + .contains_tx_in_validity_window(&RawTransactionHash::from((hash).clone()), None) + { + return; + } + + match self.unknown_hashes.entry(hash) { + Occupied(mut entry) => { + entry.get_mut().add_peer(peer_id); + } + Vacant(entry) => { + entry.insert(HashRequestStatus::new(vec![peer_id])); + new_hashes_discovered = true; + } + }; + }); + + new_hashes_discovered + } + + /// Add peer to discover its mempool + fn add_peer(&mut self, peer_id: N::PeerId) { + if self.peers.contains(&peer_id) + || !self + .network + .peer_provides_services(peer_id, Services::MEMPOOL) + { + return; + } + + debug!(%peer_id, "Peer added to mempool sync"); + self.peers.push(peer_id); + let network = Arc::clone(&self.network); + let transaction_type = self.mempool_transaction_type.clone(); + let request = async move { + ( + peer_id, + Self::request_mempool_hashes(network, peer_id, transaction_type).await, + ) + } + .boxed(); + + self.hashes_requests.push(request); + } + + /// Create a batch of transaction hashes that haven't yet been requested + fn batch_hashes_by_peer_id( + &mut self, + peer_id: N::PeerId, + ) -> Option<(RequestMempoolTransactions, N::PeerId)> { + let hashes: Vec = self + .unknown_hashes + .iter_mut() + .filter(|(_, request_status)| { + matches!(request_status.status, RequestStatus::Pending) + && request_status.has_peer(&peer_id) + }) + .take(MAX_HASHES_PER_REQUEST) + .map(|(hash, request_status)| { + request_status.mark_as_requested(); + hash.to_owned() + }) + .collect(); + + if hashes.is_empty() { + // This peer has no interesting transactions for us + return None; + } + + debug!(peer_id = %peer_id, num = %hashes.len(), "Fetching mempool transactions from peer"); + + Some((RequestMempoolTransactions { hashes }, peer_id)) + } + + /// Spawn request handlers in order to process network responses + pub fn init_network_request_receivers( + network: Arc, + mempool_state: Arc>, + ) { + // Spawn the request handler for RequestMempoolHashes responses as a task + let fut = request_handler( + &network, + network.receive_requests::(), + &mempool_state, + ) + .boxed(); + spawn(fut); + + // Spawn the request handler for RequestMempoolTransactions responses as a task + let fut = request_handler( + &network, + network.receive_requests::(), + &mempool_state, + ) + .boxed(); + spawn(fut); + } + + /// While there still are unknown transaction hashes which are not part of a request, generate requests and send them to other peers + fn send_mempool_transactions_requests(&mut self) { + let mut prepared_requests = vec![]; + + while self + .unknown_hashes + .iter() + .any(|(_, request_status)| !request_status.is_requested()) + { + for i in 0..self.peers.len() { + let peer = self.peers[i]; + if let Some(request) = self.batch_hashes_by_peer_id(peer) { + prepared_requests.push(request); + } + } + } + + let requests = prepared_requests.into_iter().map(|request| { + let peer_id = request.1; + let network = Arc::clone(&self.network); + async move { + ( + peer_id, + Self::request_mempool_transactions( + network, + peer_id, + request.0.hashes.to_owned(), + ) + .await, + ) + } + .boxed() + }); + + self.transactions_requests.extend(requests); + } + + /// Network request for retrieving mempool hashes from other peers + async fn request_mempool_hashes( + network: Arc, + peer_id: N::PeerId, + transaction_type: MempoolTransactionType, + ) -> Result { + network + .request::(RequestMempoolHashes { transaction_type }, peer_id) + .await + } + + /// Network request for retrieving mempool transactions from other peers through a list of provided hashes + async fn request_mempool_transactions( + network: Arc, + peer_id: N::PeerId, + hashes: Vec, + ) -> Result { + network + .request::(RequestMempoolTransactions { hashes }, peer_id) + .await + } +} + +impl Stream for MempoolSyncer { + type Item = (Transaction, PubsubIdOrPeerId); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // First we check if we have a result we can yield + if let Some((transaction, peer_id)) = self.transactions.pop_front() { + return Poll::Ready(Some((transaction, PubsubIdOrPeerId::PeerId(peer_id)))); + } + + // Then we check if we should shutdown ourself + if self.shutdown_timer.poll_unpin(cx).is_ready() { + info!( + syncer_type = ?self.mempool_transaction_type, + "Shutdown mempool syncer" + ); + return Poll::Ready(None); + } + + // Then we check if peers got added to live sync in the mean time + while let Poll::Ready(Some(Ok(event))) = self.consensus_sync_event_rx.poll_next_unpin(cx) { + match event { + SyncEvent::AddLiveSync(peer_id) => self.add_peer(peer_id), + } + } + + // Then we check our RequestMempoolHashes responses + let mut new_hashes_discovered = false; + while let Poll::Ready(Some((peer_id, result))) = self.hashes_requests.poll_next_unpin(cx) { + match result { + Ok(hashes) => { + if self.push_unknown_hashes(hashes.hashes, peer_id) { + new_hashes_discovered = true; + } + } + Err(err) => { + error!(%err, %peer_id, "Failed to fetch mempool hashes"); + } + } + } + + // Then we construct our RequestMempoolTransactions requests and send them over the network to our peers + if new_hashes_discovered { + self.send_mempool_transactions_requests(); + } + + // Then we check our RequestMempoolTransactions responses + while let Poll::Ready(Some((peer_id, result))) = + self.transactions_requests.poll_next_unpin(cx) + { + match result { + Ok(transactions) => { + let transactions: Vec<(Transaction, N::PeerId)> = transactions + .transactions + .into_iter() + .filter_map(|txn| { + // Filter out transactions we didn't ask and mark the ones we did ask for as received + match self.unknown_hashes.entry(txn.hash()) { + Occupied(mut entry) => match entry.get().status { + RequestStatus::Requested => { + entry.get_mut().mark_as_received(); + Some((txn, peer_id)) + } + RequestStatus::Pending | RequestStatus::Received => None, + }, + Vacant(_) => None, + } + }) + .collect(); + + if transactions.is_empty() { + continue; + } + + info!(num = %transactions.len(), "Synced mempool transactions"); + self.transactions.extend(transactions); + } + Err(err) => { + error!(%err, %peer_id, "Failed to fetch mempool transactions"); + } + } + } + + // By now it could be that we have some results we can yield, so we try again + if let Some((transaction, peer_id)) = self.transactions.pop_front() { + return Poll::Ready(Some((transaction, PubsubIdOrPeerId::PeerId(peer_id)))); + } + + Poll::Pending + } +} diff --git a/mempool/src/sync/tests/mod.rs b/mempool/src/sync/tests/mod.rs new file mode 100644 index 0000000000..4cbf685bee --- /dev/null +++ b/mempool/src/sync/tests/mod.rs @@ -0,0 +1,356 @@ +#[cfg(test)] +mod tests { + use std::{sync::Arc, task::Poll, time::Duration}; + + use futures::{poll, StreamExt}; + use nimiq_blockchain::{Blockchain, BlockchainConfig}; + use nimiq_database::mdbx::MdbxDatabase; + use nimiq_genesis::NetworkId; + use nimiq_hash::{Blake2bHash, Hash}; + use nimiq_keys::{Address, KeyPair, SecureGenerate}; + use nimiq_network_interface::network::Network; + use nimiq_network_mock::MockHub; + use nimiq_test_log::test; + use nimiq_test_utils::{ + consensus::consensus, + test_rng::test_rng, + test_transaction::{generate_transactions, TestAccount, TestTransaction}, + }; + use nimiq_time::sleep; + use nimiq_transaction::Transaction; + use nimiq_utils::{spawn, time::OffsetTime}; + use parking_lot::RwLock; + use rand::rngs::StdRng; + + use crate::{ + mempool_state::MempoolState, + mempool_transactions::TxPriority, + sync::{ + messages::{ + RequestMempoolHashes, RequestMempoolTransactions, ResponseMempoolHashes, + ResponseMempoolTransactions, + }, + MempoolSyncer, MempoolTransactionType, MAX_HASHES_PER_REQUEST, + }, + }; + + #[test(tokio::test)] + async fn it_can_discover_mempool_hashes_and_get_transactions_from_other_peers() { + // Generate test transactions + let num_transactions = MAX_HASHES_PER_REQUEST + 10; + let mut rng = test_rng(true); + let mempool_transactions = generate_test_transactions(num_transactions, &mut rng); + + // Turn test transactions into transactions + let (txns, _): (Vec, usize) = + generate_transactions(mempool_transactions, true); + let hashes: Vec = txns.iter().map(|txn| txn.hash()).collect(); + + // Create an empty blockchain + let time = Arc::new(OffsetTime::new()); + let env = MdbxDatabase::new_volatile(Default::default()).unwrap(); + let blockchain = Arc::new(RwLock::new( + Blockchain::new( + env, + BlockchainConfig::default(), + NetworkId::UnitAlbatross, + time, + ) + .unwrap(), + )); + + // Setup empty Mempool State + let state = Arc::new(RwLock::new(MempoolState::new(100, 100))); + + // Setup network + let mut hub = MockHub::default(); + let net1 = Arc::new(hub.new_network()); + let net2 = Arc::new(hub.new_network()); + + // Dial peer + net1.dial_mock(&net2); + let peer_ids = net1.get_peers(); + + // Setup consensus proxy + let consensus = consensus(Arc::clone(&blockchain), Arc::clone(&net1)).await; + let proxy = consensus.proxy(); + + // Setup streams to respond to requests + let hash_stream = net2.receive_requests::(); + let txns_stream = net2.receive_requests::(); + + let hashes_repsonse = ResponseMempoolHashes { + hashes: hashes.clone(), + }; + + let net2_clone = Arc::clone(&net2); + let hashes_listener_future = + hash_stream.for_each(move |(_request, request_id, _peer_id)| { + let test_response = hashes_repsonse.clone(); + let net2 = Arc::clone(&net2_clone); + async move { + net2.respond::(request_id, test_response.clone()) + .await + .unwrap(); + } + }); + + let net2_clone = Arc::clone(&net2); + let txns_listener_future = txns_stream.for_each(move |(_request, request_id, _peer_id)| { + let test_response = ResponseMempoolTransactions { + transactions: txns.clone(), + }; + let net2 = Arc::clone(&net2_clone); + async move { + net2.respond::(request_id, test_response.clone()) + .await + .unwrap(); + } + }); + + // Spawn the request responders + spawn(hashes_listener_future); + spawn(txns_listener_future); + + // Create a new mempool syncer + let mut syncer = MempoolSyncer::new( + peer_ids, + MempoolTransactionType::Regular, + Arc::clone(&blockchain), + proxy, + Arc::clone(&state), + ); + + // Poll to send out requests to peers + let _ = poll!(syncer.next()); + sleep(Duration::from_millis(200)).await; + + // Poll to check request responses + let _ = poll!(syncer.next()); + + // Verify that all the hashes of the test transactions we generated, have been received by the syncer + hashes.iter().for_each(|hash| { + assert!(syncer.unknown_hashes.contains_key(hash)); + }); + assert_eq!(syncer.unknown_hashes.len(), num_transactions); + + // Poll to request the transactions and we should have at least 1 transaction by now + sleep(Duration::from_millis(200)).await; + match poll!(syncer.next()) { + Poll::Ready(txn) => assert!(txn.is_some()), + Poll::Pending => unreachable!(), + } + + // Verify that we received all the transactions we have asked for + // num_transactions - 1 here because we pop_front immediately on a Poll::Ready() + assert_eq!(syncer.transactions.len(), num_transactions - 1); + } + + #[test(tokio::test)] + async fn it_ignores_transactions_we_already_have_locally() { + // Generate test transactions + let num_transactions = 10; + let mut rng = test_rng(true); + let known_test_transactions = generate_test_transactions(num_transactions, &mut rng); + + // Turn test transactions into transactions + let (known_txns, _): (Vec, usize) = + generate_transactions(known_test_transactions, true); + let known_hashes: Vec = known_txns.iter().map(|txn| txn.hash()).collect(); + + // Setup empty Mempool State + let state = Arc::new(RwLock::new(MempoolState::new(100, 100))); + + // Load known hashes into the mempool state + let mut handle = state.write(); + known_txns.into_iter().for_each(|txn| { + handle.regular_transactions.insert(txn, TxPriority::Medium); + }); + assert_eq!(handle.regular_transactions.len(), known_hashes.len()); + drop(handle); + + // Create an empty blockchain + let time = Arc::new(OffsetTime::new()); + let env = MdbxDatabase::new_volatile(Default::default()).unwrap(); + let blockchain = Arc::new(RwLock::new( + Blockchain::new( + env, + BlockchainConfig::default(), + NetworkId::UnitAlbatross, + time, + ) + .unwrap(), + )); + + // Setup network + let mut hub = MockHub::default(); + let net1 = Arc::new(hub.new_network()); + let net2 = Arc::new(hub.new_network()); + + // Dial peer + net1.dial_mock(&net2); + let peer_ids = net1.get_peers(); + + // Setup consensus proxy + let consensus = consensus(Arc::clone(&blockchain), Arc::clone(&net1)).await; + let proxy = consensus.proxy(); + + // Setup stream to respond to requests + let hash_stream = net2.receive_requests::(); + + let hashes_repsonse = ResponseMempoolHashes { + hashes: known_hashes.clone(), + }; + + let net2_clone = Arc::clone(&net2); + let hashes_listener_future = + hash_stream.for_each(move |(_request, request_id, _peer_id)| { + let test_response = hashes_repsonse.clone(); + let net2 = Arc::clone(&net2_clone); + async move { + net2.respond::(request_id, test_response.clone()) + .await + .unwrap(); + } + }); + + // Spawn the request responder + spawn(hashes_listener_future); + + // Create a new mempool syncer + let mut syncer = MempoolSyncer::new( + peer_ids, + MempoolTransactionType::Regular, + Arc::clone(&blockchain), + proxy, + Arc::clone(&state), + ); + + // Poll to send out requests to peers + let _ = poll!(syncer.next()); + sleep(Duration::from_millis(200)).await; + + // Poll to check request responses + let _ = poll!(syncer.next()); + + // All the hashes we received are already in the mempool state so should be skipped + assert!(syncer.unknown_hashes.is_empty()); + } + + #[test(tokio::test)] + async fn it_can_dynamically_add_a_new_peer() { + // Generate test transactions + let num_transactions = 10; + let mut rng = test_rng(true); + let mempool_transactions = generate_test_transactions(num_transactions, &mut rng); + + // Turn test transactions into transactions + let (txns, _): (Vec, usize) = + generate_transactions(mempool_transactions, true); + let hashes: Vec = txns.iter().map(|txn| txn.hash()).collect(); + + // Create an empty blockchain + let time = Arc::new(OffsetTime::new()); + let env = MdbxDatabase::new_volatile(Default::default()).unwrap(); + let blockchain = Arc::new(RwLock::new( + Blockchain::new( + env, + BlockchainConfig::default(), + NetworkId::UnitAlbatross, + time, + ) + .unwrap(), + )); + + // Setup empty Mempool State + let state = Arc::new(RwLock::new(MempoolState::new(100, 100))); + + // Setup network + let mut hub = MockHub::default(); + let net1 = Arc::new(hub.new_network()); + + // Setup consensus proxy + let consensus = consensus(Arc::clone(&blockchain), Arc::clone(&net1)).await; + let proxy = consensus.proxy(); + + // Create a new mempool syncer with 0 peers to sync with + let mut syncer = MempoolSyncer::new( + vec![], + MempoolTransactionType::Regular, + Arc::clone(&blockchain), + proxy, + Arc::clone(&state), + ); + + // Poll to send out requests to peers and verify nothing has been received + let _ = poll!(syncer.next()); + sleep(Duration::from_millis(200)).await; + assert!(syncer.unknown_hashes.is_empty()); + + // Add new peer and dial it + let net2 = Arc::new(hub.new_network()); + net1.dial_mock(&net2); + + // Setup stream to respond to requests + let hash_stream = net2.receive_requests::(); + + let hashes_repsonse = ResponseMempoolHashes { + hashes: hashes.clone(), + }; + + let net2_clone = Arc::clone(&net2); + let hashes_listener_future = + hash_stream.for_each(move |(_request, request_id, _peer_id)| { + let test_response = hashes_repsonse.clone(); + let net2 = Arc::clone(&net2_clone); + async move { + net2.respond::(request_id, test_response.clone()) + .await + .unwrap(); + } + }); + + // Spawn the request responder + spawn(hashes_listener_future); + + // Dynamically add new peer to mempool syncer + syncer.add_peer(net2.peer_id()); + + let _ = poll!(syncer.next()); + sleep(Duration::from_millis(200)).await; + + // We have received unknown hashes from the added peer + let _ = poll!(syncer.next()); + assert_eq!(syncer.unknown_hashes.len(), num_transactions); + } + + fn generate_test_transactions(num: usize, mut rng: &mut StdRng) -> Vec { + let mut mempool_transactions = vec![]; + + for _ in 0..num { + let kp1 = KeyPair::generate(&mut rng); + let addr1 = Address::from(&kp1.public); + let account1 = TestAccount { + address: addr1, + keypair: kp1, + }; + + let kp2 = KeyPair::generate(&mut rng); + let addr2 = Address::from(&kp2.public); + let account2 = TestAccount { + address: addr2, + keypair: kp2, + }; + + let mempool_transaction = TestTransaction { + fee: 0, + value: 10, + recipient: account1, + sender: account2, + }; + mempool_transactions.push(mempool_transaction); + } + + mempool_transactions + } +} diff --git a/mempool/tests/mod.rs b/mempool/tests/mod.rs index 693dda6264..ec5f5b29d6 100644 --- a/mempool/tests/mod.rs +++ b/mempool/tests/mod.rs @@ -11,7 +11,10 @@ use nimiq_keys::{ Address, Ed25519PublicKey as SchnorrPublicKey, KeyPair as SchnorrKeyPair, PrivateKey as SchnorrPrivateKey, SecureGenerate, }; -use nimiq_mempool::{config::MempoolConfig, mempool::Mempool, mempool_transactions::TxPriority}; +use nimiq_mempool::{ + config::MempoolConfig, executor::PubsubIdOrPeerId, mempool::Mempool, + mempool_transactions::TxPriority, +}; use nimiq_network_mock::{MockHub, MockId, MockNetwork, MockPeerId}; use nimiq_primitives::{coin::Coin, networks::NetworkId, policy::Policy}; use nimiq_serde::{Deserialize, Serialize}; @@ -57,10 +60,14 @@ async fn send_get_mempool_txns( txn_len: usize, ) -> (Vec, usize) { // Create mempool and subscribe with a custom txn stream. - let mempool = Mempool::new(Arc::clone(&blockchain), MempoolConfig::default()); let mut hub = MockHub::new(); let mock_id = MockId::new(hub.new_address().into()); let mock_network = Arc::new(hub.new_network()); + let mempool = Mempool::new( + Arc::clone(&blockchain), + MempoolConfig::default(), + Arc::clone(&mock_network), + ); send_txn_to_mempool(&mempool, mock_network, mock_id, transactions).await; @@ -89,7 +96,7 @@ async fn send_txn_to_mempool( tokio::task::spawn(async move { for txn in transactions { txn_stream_tx - .send((txn.clone(), mock_id.clone())) + .send((txn.clone(), PubsubIdOrPeerId::PubsubId(mock_id.clone()))) .await .unwrap(); } @@ -121,7 +128,7 @@ async fn send_control_txn_to_mempool( tokio::task::spawn(async move { for txn in transactions { txn_stream_tx - .send((txn.clone(), mock_id.clone())) + .send((txn.clone(), PubsubIdOrPeerId::PubsubId(mock_id.clone()))) .await .unwrap(); } @@ -865,10 +872,14 @@ async fn multiple_start_stop() { let (txn_stream_tx, txn_stream_rx) = mpsc::channel(64); // Create mempool and subscribe with a custom txn stream. - let mempool = Mempool::new(Arc::clone(&blockchain), MempoolConfig::default()); let mut hub = MockHub::new(); - let mock_id = MockId::new(hub.new_address().into()); let mock_network = Arc::new(hub.new_network()); + let mempool = Mempool::new( + Arc::clone(&blockchain), + MempoolConfig::default(), + Arc::clone(&mock_network), + ); + let mock_id = MockId::new(hub.new_address().into()); // Subscribe mempool with the mpsc stream created mempool @@ -885,7 +896,7 @@ async fn multiple_start_stop() { tokio::task::spawn(async move { for txn in txns { txn_stream_tx1 - .send((txn.clone(), mock_id1.clone())) + .send((txn.clone(), PubsubIdOrPeerId::PubsubId(mock_id1.clone()))) .await .unwrap(); } @@ -909,7 +920,7 @@ async fn multiple_start_stop() { tokio::task::spawn(async move { for txn in txns { txn_stream_tx - .send((txn.clone(), mock_id.clone())) + .send((txn.clone(), PubsubIdOrPeerId::PubsubId(mock_id.clone()))) .await .expect_err("Send should fail, executor is stopped"); } @@ -934,10 +945,14 @@ async fn multiple_start_stop() { let (txn_stream_tx, txn_stream_rx) = mpsc::channel(64); // Create mempool and subscribe with a custom txn stream. - let mempool = Mempool::new(Arc::clone(&blockchain), MempoolConfig::default()); let mut hub = MockHub::new(); - let mock_id = MockId::new(hub.new_address().into()); let mock_network = Arc::new(hub.new_network()); + let mempool = Mempool::new( + Arc::clone(&blockchain), + MempoolConfig::default(), + mock_network.clone(), + ); + let mock_id = MockId::new(hub.new_address().into()); // Subscribe mempool with the mpsc stream created mempool @@ -952,7 +967,7 @@ async fn multiple_start_stop() { tokio::task::spawn(async move { for txn in txns { txn_stream_tx - .send((txn.clone(), mock_id.clone())) + .send((txn.clone(), PubsubIdOrPeerId::PubsubId(mock_id.clone()))) .await .unwrap(); } @@ -1119,10 +1134,14 @@ async fn mempool_update() { )); // Create mempool and subscribe with a custom txn stream. - let mempool = Mempool::new(Arc::clone(&blockchain), MempoolConfig::default()); let mut hub = MockHub::new(); let mock_id = MockId::new(hub.new_address().into()); let mock_network = Arc::new(hub.new_network()); + let mempool = Mempool::new( + Arc::clone(&blockchain), + MempoolConfig::default(), + Arc::clone(&mock_network), + ); // Send txns to mempool send_txn_to_mempool(&mempool, mock_network, mock_id, txns).await; @@ -1247,10 +1266,14 @@ async fn mempool_update_aged_transaction() { )); // Create mempool and subscribe with a custom txn stream - let mempool = Mempool::new(blockchain.clone(), MempoolConfig::default()); let mut hub = MockHub::new(); let mock_id = MockId::new(hub.new_address().into()); let mock_network = Arc::new(hub.new_network()); + let mempool = Mempool::new( + blockchain.clone(), + MempoolConfig::default(), + Arc::clone(&mock_network), + ); // Send txns to mempool send_txn_to_mempool(&mempool, mock_network, mock_id, txns).await; @@ -1398,10 +1421,14 @@ async fn mempool_update_not_enough_balance() { )); // Create mempool and subscribe with a custom txn stream - let mempool = Mempool::new(blockchain.clone(), MempoolConfig::default()); let mut hub = MockHub::new(); let mock_id = MockId::new(hub.new_address().into()); let mock_network = Arc::new(hub.new_network()); + let mempool = Mempool::new( + blockchain.clone(), + MempoolConfig::default(), + Arc::clone(&mock_network), + ); // Send txns to mempool send_txn_to_mempool(&mempool, mock_network, mock_id, txns).await; @@ -1560,10 +1587,14 @@ async fn mempool_update_pruned_account() { )); // Create mempool and subscribe with a custom txn stream. - let mempool = Mempool::new(blockchain.clone(), MempoolConfig::default()); let mut hub = MockHub::new(); let mock_id = MockId::new(hub.new_address().into()); let mock_network = Arc::new(hub.new_network()); + let mempool = Mempool::new( + blockchain.clone(), + MempoolConfig::default(), + Arc::clone(&mock_network), + ); // Send txns to mempool send_txn_to_mempool(&mempool, mock_network, mock_id, txns).await; @@ -1661,10 +1692,14 @@ async fn mempool_basic_prioritization_control_tx() { )); // Create mempool and subscribe with a custom txn stream. - let mempool = Mempool::new(blockchain.clone(), MempoolConfig::default()); let mut hub = MockHub::new(); let mock_id = MockId::new(hub.new_address().into()); let mock_network = Arc::new(hub.new_network()); + let mempool = Mempool::new( + blockchain.clone(), + MempoolConfig::default(), + Arc::clone(&mock_network), + ); // Send txns to mempool send_control_txn_to_mempool(&mempool, mock_network, mock_id, txns.clone()).await; @@ -1770,10 +1805,14 @@ async fn mempool_regular_and_control_tx() { )); // Create mempool and subscribe with a custom txn stream. - let mempool = Mempool::new(blockchain.clone(), MempoolConfig::default()); let mut hub = MockHub::new(); let mock_id = MockId::new(hub.new_address().into()); let mock_network = Arc::new(hub.new_network()); + let mempool = Mempool::new( + blockchain.clone(), + MempoolConfig::default(), + Arc::clone(&mock_network), + ); // This is the transaction produced in the block let control_tx = TransactionBuilder::new_create_staker( @@ -1916,7 +1955,9 @@ async fn applies_total_tx_size_limits() { size_limit: txns_len - (1 + txns[1].serialized_size()), ..Default::default() }; - let mempool = Mempool::new(blockchain, mempool_config); + let mut hub = MockHub::new(); + let mock_network = Arc::new(hub.new_network()); + let mempool = Mempool::new(blockchain, mempool_config, Arc::clone(&mock_network)); // The worst transaction is the second transaction with the lowest fee. let worst_tx = txns[1].hash::(); @@ -2010,14 +2051,15 @@ async fn assert_rebase_invalidates_mempool_tx(tx1: Transaction, tx2: Transaction let temp_producer2 = TemporaryBlockProducer::new(); assert_eq!(temp_producer2.push(block), Ok(PushResult::Extended)); + let mut hub = MockHub::new(); + let mock_id = MockId::new(hub.new_address().into()); + let mock_network = Arc::new(hub.new_network()); // Create mempool and subscribe with a custom txn stream. let mempool = Mempool::new( Arc::clone(&temp_producer1.blockchain), MempoolConfig::default(), + Arc::clone(&mock_network), ); - let mut hub = MockHub::new(); - let mock_id = MockId::new(hub.new_address().into()); - let mock_network = Arc::new(hub.new_network()); // Include tx1 in block1 let block1 = temp_producer1.next_block_with_txs(vec![], false, vec![tx1.clone()]); @@ -2047,14 +2089,15 @@ async fn assert_rebase_invalidates_mempool_tx(tx1: Transaction, tx2: Transaction async fn mempool_update_rebase_earlier_time() { let temp_producer1 = TemporaryBlockProducer::new(); + let mut hub = MockHub::new(); + let mock_id = MockId::new(hub.new_address().into()); + let mock_network = Arc::new(hub.new_network()); // Create mempool and subscribe with a custom txn stream. let mempool = Mempool::new( Arc::clone(&temp_producer1.blockchain), MempoolConfig::default(), + Arc::clone(&mock_network), ); - let mut hub = MockHub::new(); - let mock_id = MockId::new(hub.new_address().into()); - let mock_network = Arc::new(hub.new_network()); // Create vesting and redeeming txs let key_pair = ed25519_key_pair(ACCOUNT_SECRET_KEY); @@ -2129,10 +2172,14 @@ async fn it_can_reject_invalid_vesting_contract_transaction() { let producer = BlockProducer::new(signing_key(), voting_key()); // Create mempool and subscribe with a custom txn stream - let mempool = Mempool::new(blockchain.clone(), MempoolConfig::default()); let mut hub = MockHub::new(); let mock_id = MockId::new(hub.new_address().into()); let mock_network = Arc::new(hub.new_network()); + let mempool = Mempool::new( + blockchain.clone(), + MempoolConfig::default(), + Arc::clone(&mock_network), + ); // #1.0: Micro block that creates a vesting contract let bc = blockchain.upgradable_read(); diff --git a/network-interface/src/peer_info.rs b/network-interface/src/peer_info.rs index c384a709ed..b16f509562 100644 --- a/network-interface/src/peer_info.rs +++ b/network-interface/src/peer_info.rs @@ -66,10 +66,14 @@ impl Services { | Services::FULL_BLOCKS | Services::ACCOUNTS_PROOF | Services::ACCOUNTS_CHUNKS + | Services::MEMPOOL } NodeType::Light => Services::empty(), NodeType::Full => { - Services::ACCOUNTS_PROOF | Services::FULL_BLOCKS | Services::ACCOUNTS_CHUNKS + Services::ACCOUNTS_PROOF + | Services::FULL_BLOCKS + | Services::ACCOUNTS_CHUNKS + | Services::MEMPOOL } } } diff --git a/test-utils/src/consensus.rs b/test-utils/src/consensus.rs new file mode 100644 index 0000000000..525c6a3643 --- /dev/null +++ b/test-utils/src/consensus.rs @@ -0,0 +1,32 @@ +use std::sync::Arc; + +use nimiq_blockchain::Blockchain; +use nimiq_blockchain_proxy::BlockchainProxy; +use nimiq_consensus::{sync::syncer_proxy::SyncerProxy, BlsCache, Consensus}; +use nimiq_network_interface::network::Network; +use nimiq_zkp_component::ZKPComponent; +use parking_lot::{Mutex, RwLock}; + +use crate::test_network::TestNetwork; + +/// Given a blockchain and a network creates an instance of Consensus. +pub async fn consensus( + blockchain: Arc>, + net: Arc, +) -> Consensus { + let blockchain_proxy = BlockchainProxy::from(&blockchain); + + let zkp_proxy = ZKPComponent::new(blockchain_proxy.clone(), Arc::clone(&net), None) + .await + .proxy(); + + let syncer = SyncerProxy::new_history( + blockchain_proxy.clone(), + Arc::clone(&net), + Arc::new(Mutex::new(BlsCache::new_test())), + net.subscribe_events(), + ) + .await; + + Consensus::new(blockchain_proxy, net, syncer, 0, zkp_proxy) +} diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index 8ef1e0d6cb..17b5c13164 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -2,6 +2,7 @@ pub mod accounts_revert; pub mod block_production; pub mod blockchain; pub mod blockchain_with_rng; +pub mod consensus; pub mod mock_node; pub mod node; pub mod test_custom_block; diff --git a/utils/src/stream.rs b/utils/src/stream.rs index c67c5f1b2d..0d13dc3959 100644 --- a/utils/src/stream.rs +++ b/utils/src/stream.rs @@ -49,10 +49,20 @@ impl FuturesOrdered { } /// Returns the number of futures in the queue. /// - /// See also [`futures::stream::FuturesOrdered::is_empty`]. + /// See also [`futures::stream::FuturesOrdered::len`]. pub fn len(&self) -> usize { self.inner.len() } + /// Push all the futures produced by the iterator into the back of the queue. + /// + /// See also [`futures::stream::FuturesOrdered::extend`]. + pub fn extend(&mut self, iter: I) + where + I: IntoIterator, + { + self.inner.extend(iter); + self.waker.wake(); + } /// Push a future into the back of the queue. /// /// See also [`futures::stream::FuturesOrdered::push`]. @@ -116,10 +126,20 @@ impl FuturesUnordered { } /// Returns the number of futures in the set. /// - /// See also [`futures::stream::FuturesUnordered::is_empty`]. + /// See also [`futures::stream::FuturesUnordered::len`]. pub fn len(&self) -> usize { self.inner.len() } + /// Push all the futures produced by the iterator into the set of futures. + /// + /// See also [`futures::stream::FuturesUnordered::extend`]. + pub fn extend(&mut self, iter: I) + where + I: IntoIterator, + { + self.inner.extend(iter); + self.waker.wake(); + } /// Push a future into the set. /// /// See also [`futures::stream::FuturesUnordered::push`]. diff --git a/validator/src/proposal_buffer.rs b/validator/src/proposal_buffer.rs index a5eac73d83..0a7709d288 100644 --- a/validator/src/proposal_buffer.rs +++ b/validator/src/proposal_buffer.rs @@ -563,11 +563,7 @@ mod test { use futures::{FutureExt, StreamExt}; use nimiq_block::MacroHeader; - use nimiq_blockchain::Blockchain; - use nimiq_blockchain_proxy::BlockchainProxy; - use nimiq_consensus::{ - sync::syncer_proxy::SyncerProxy, BlsCache, Consensus, ConsensusEvent, ConsensusProxy, - }; + use nimiq_consensus::{ConsensusEvent, ConsensusProxy}; use nimiq_keys::{KeyPair as SchnorrKeyPair, PrivateKey as SchnorrPrivateKey}; use nimiq_network_interface::network::Network as NetworkInterface; use nimiq_network_mock::{MockHub, MockNetwork}; @@ -577,40 +573,16 @@ mod test { use nimiq_test_log::test; use nimiq_test_utils::{ block_production::{TemporaryBlockProducer, SIGNING_KEY}, - test_network::TestNetwork, + consensus::consensus, }; use nimiq_time::{sleep, timeout}; use nimiq_utils::spawn; use nimiq_validator_network::network_impl::ValidatorNetworkImpl; - use nimiq_zkp_component::ZKPComponent; - use parking_lot::{Mutex, RwLock}; use tokio::select; use super::{ProposalAndPubsubId, ProposalBuffer}; use crate::{aggregation::tendermint::proposal::Header, r#macro::ProposalTopic}; - /// Given a blockchain and a network creates an instance of Consensus. - async fn consensus( - blockchain: Arc>, - net: Arc, - ) -> Consensus { - let blockchain_proxy = BlockchainProxy::from(&blockchain); - - let zkp_proxy = ZKPComponent::new(blockchain_proxy.clone(), Arc::clone(&net), None) - .await - .proxy(); - - let syncer = SyncerProxy::new_history( - blockchain_proxy.clone(), - Arc::clone(&net), - Arc::new(Mutex::new(BlsCache::new_test())), - net.subscribe_events(), - ) - .await; - - Consensus::new(blockchain_proxy, net, syncer, 0, zkp_proxy) - } - async fn setup() -> ( ConsensusProxy, TemporaryBlockProducer, diff --git a/web-client/src/client/lib.rs b/web-client/src/client/lib.rs index 458ac924e2..eee1254c32 100644 --- a/web-client/src/client/lib.rs +++ b/web-client/src/client/lib.rs @@ -357,7 +357,7 @@ impl Client { let is_established = self .inner .consensus_proxy() - .subscribe_events() + .subscribe_consensus_events() .any(|event| async move { if let Ok(state) = event { matches!(state, ConsensusEvent::Established { .. }) @@ -989,7 +989,7 @@ impl Client { let consensus = self.inner.consensus_proxy(); let network = self.inner.network(); - let mut consensus_events = consensus.subscribe_events(); + let mut consensus_events = consensus.subscribe_consensus_events(); let consensus_listeners = Rc::clone(&self.consensus_changed_listeners);