Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of Mempool Syncer #2832

Open
wants to merge 14 commits into
base: albatross
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 11 additions & 4 deletions consensus/src/consensus/consensus_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::{
RequestBlocksProof, RequestSubscribeToAddress, RequestTransactionReceiptsByAddress,
RequestTransactionsProof, ResponseBlocksProof,
},
sync::syncer::SyncEvent,
ConsensusEvent,
};

Expand All @@ -42,7 +43,8 @@ pub struct ConsensusProxy<N: Network> {
pub network: Arc<N>,
pub(crate) established_flag: Arc<AtomicBool>,
pub(crate) synced_validity_window_flag: Arc<AtomicBool>,
pub(crate) events: broadcast::Sender<ConsensusEvent>,
pub(crate) consensus_events: broadcast::Sender<ConsensusEvent>,
pub(crate) sync_events: broadcast::Sender<SyncEvent<N::PeerId>>,
pub(crate) request: mpsc::Sender<ConsensusRequest<N>>,
}

Expand All @@ -53,7 +55,8 @@ impl<N: Network> Clone for ConsensusProxy<N> {
network: Arc::clone(&self.network),
established_flag: Arc::clone(&self.established_flag),
synced_validity_window_flag: Arc::clone(&self.synced_validity_window_flag),
events: self.events.clone(),
consensus_events: self.consensus_events.clone(),
sync_events: self.sync_events.clone(),
request: self.request.clone(),
}
}
Expand Down Expand Up @@ -81,8 +84,12 @@ impl<N: Network> ConsensusProxy<N> {
&& self.synced_validity_window_flag.load(Ordering::Acquire)
}

pub fn subscribe_events(&self) -> BroadcastStream<ConsensusEvent> {
BroadcastStream::new(self.events.subscribe())
pub fn subscribe_consensus_events(&self) -> BroadcastStream<ConsensusEvent> {
BroadcastStream::new(self.consensus_events.subscribe())
}

pub fn subscribe_sync_events(&self) -> BroadcastStream<SyncEvent<<N as Network>::PeerId>> {
BroadcastStream::new(self.sync_events.subscribe())
}

/// Subscribe to remote address notification events
Expand Down
23 changes: 15 additions & 8 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ use self::remote_event_dispatcher::RemoteEventDispatcher;
use crate::{
consensus::head_requests::{HeadRequests, HeadRequestsResult},
messages::{RequestBlock, RequestHead, RequestMacroChain, RequestMissingBlocks},
sync::{live::block_queue::BlockSource, syncer::LiveSyncPushEvent, syncer_proxy::SyncerProxy},
sync::{
live::block_queue::BlockSource,
syncer::{LiveSyncPushEvent, SyncEvent},
syncer_proxy::SyncerProxy,
},
};
#[cfg(feature = "full")]
use crate::{
Expand Down Expand Up @@ -128,7 +132,8 @@ pub struct Consensus<N: Network> {

pub sync: SyncerProxy<N>,

events: broadcast::Sender<ConsensusEvent>,
consensus_events: broadcast::Sender<ConsensusEvent>,
sync_events: broadcast::Sender<SyncEvent<N::PeerId>>,
established_flag: Arc<AtomicBool>,
#[cfg(feature = "full")]
last_batch_number: u32,
Expand Down Expand Up @@ -212,7 +217,8 @@ impl<N: Network> Consensus<N> {
blockchain,
network,
sync: syncer,
events: broadcast::Sender::new(256),
consensus_events: broadcast::Sender::new(256),
sync_events: broadcast::Sender::new(256),
established_flag,
#[cfg(feature = "full")]
last_batch_number: 0,
Expand Down Expand Up @@ -295,7 +301,7 @@ impl<N: Network> Consensus<N> {
}

pub fn subscribe_events(&self) -> BroadcastStream<ConsensusEvent> {
BroadcastStream::new(self.events.subscribe())
BroadcastStream::new(self.consensus_events.subscribe())
}

pub fn is_established(&self) -> bool {
Expand All @@ -312,7 +318,8 @@ impl<N: Network> Consensus<N> {
network: Arc::clone(&self.network),
established_flag: Arc::clone(&self.established_flag),
synced_validity_window_flag: Arc::clone(&self.synced_validity_window_flag),
events: self.events.clone(),
consensus_events: self.consensus_events.clone(),
sync_events: self.sync_events.clone(),
request: self.requests.0.clone(),
}
}
Expand All @@ -328,7 +335,7 @@ impl<N: Network> Consensus<N> {

// We don't care if anyone is listening.
let (synced_validity_window, _) = self.check_validity_window();
self.events
self.consensus_events
.send(ConsensusEvent::Established {
synced_validity_window,
})
Expand Down Expand Up @@ -543,7 +550,7 @@ impl<N: Network> Future for Consensus<N> {

// Check consensus established state on changes.
if let Some(event) = self.check_established(None) {
self.events.send(event).ok();
self.consensus_events.send(event).ok();
}

// Poll any head requests if active.
Expand All @@ -559,7 +566,7 @@ impl<N: Network> Future for Consensus<N> {

// Update established state using the result.
if let Some(event) = self.check_established(Some(result)) {
self.events.send(event).ok();
self.consensus_events.send(event).ok();
}
}
}
Expand Down
17 changes: 17 additions & 0 deletions consensus/src/sync/syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use nimiq_network_interface::network::{CloseReason, Network, NetworkEvent, Subsc
use nimiq_primitives::policy::Policy;
use nimiq_time::{interval, Interval};
use nimiq_utils::stream::FuturesUnordered;
use tokio::sync::broadcast::{channel as broadcast, Sender as BroadcastSender};

use crate::{
consensus::ResolveBlockRequest, messages::RequestHead, sync::live::block_queue::BlockSource,
Expand Down Expand Up @@ -106,6 +107,11 @@ pub enum LiveSyncPeerEvent<TPeerId> {
Ahead(TPeerId),
}

#[derive(Clone)]
pub enum SyncEvent<TPeerId> {
AddLiveSync(TPeerId),
}

/// Syncer is the main synchronization object inside `Consensus`
/// It has a reference to the main blockchain and network and has two dynamic
/// trait objects:
Expand All @@ -127,6 +133,9 @@ pub struct Syncer<N: Network, M: MacroSync<N::PeerId>, L: LiveSync<N>> {
/// A proxy to the blockchain
blockchain: BlockchainProxy,

/// Sending-half of a broadcast channel for publishing sync events
events: BroadcastSender<SyncEvent<N::PeerId>>,

/// A reference to the network
network: Arc<N>,

Expand Down Expand Up @@ -159,12 +168,15 @@ impl<N: Network, M: MacroSync<N::PeerId>, L: LiveSync<N>> Syncer<N, M, L> {
macro_sync: M,
) -> Syncer<N, M, L> {
let network_events = network.subscribe_events();
let (tx, _rx) = broadcast(256);

Syncer {
live_sync,
macro_sync,
blockchain,
network,
network_events,
events: tx,
outdated_peers: Default::default(),
incompatible_peers: Default::default(),
check_interval: interval(Self::CHECK_INTERVAL),
Expand All @@ -185,12 +197,17 @@ impl<N: Network, M: MacroSync<N::PeerId>, L: LiveSync<N>> Syncer<N, M, L> {
pub fn move_peer_into_live_sync(&mut self, peer_id: N::PeerId) {
debug!(%peer_id, "Adding peer to live sync");
self.live_sync.add_peer(peer_id);
self.events.send(SyncEvent::AddLiveSync(peer_id)).ok();
}

pub fn num_peers(&self) -> usize {
self.live_sync.num_peers()
}

pub fn broadcast_sender(&self) -> BroadcastSender<SyncEvent<<N as Network>::PeerId>> {
self.events.clone()
}

pub fn peers(&self) -> Vec<N::PeerId> {
self.live_sync.peers()
}
Expand Down
8 changes: 7 additions & 1 deletion consensus/src/sync/syncer_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use nimiq_primitives::policy::Policy;
use nimiq_zkp_component::zkp_component::ZKPComponentProxy;
use parking_lot::Mutex;
use pin_project::pin_project;
use tokio::sync::broadcast::Sender as BroadcastSender;

#[cfg(feature = "full")]
use crate::sync::{
Expand All @@ -30,7 +31,7 @@ use crate::{
queue::QueueConfig,
BlockLiveSync,
},
syncer::{LiveSyncPushEvent, Syncer},
syncer::{LiveSyncPushEvent, SyncEvent, Syncer},
},
BlsCache,
};
Expand Down Expand Up @@ -227,6 +228,11 @@ impl<N: Network> SyncerProxy<N> {
gen_syncer_match!(self, accepted_block_announcements)
}

/// Returns a broadcast sender for sync events
pub fn broadcast_sender(&self) -> BroadcastSender<SyncEvent<<N as Network>::PeerId>> {
gen_syncer_match!(self, broadcast_sender)
}

/// Returns whether the state sync has finished (or `true` if there is no state sync required)
pub fn state_complete(&self) -> bool {
gen_syncer_match!(self, state_complete)
Expand Down
2 changes: 1 addition & 1 deletion consensus/tests/sync_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ pub async fn sync_two_peers(
BlockchainEvent::Finalized(_) | BlockchainEvent::EpochFinalized(_)
))
});
let mut consensus_events = consensus2_proxy.subscribe_events();
let mut consensus_events = consensus2_proxy.subscribe_consensus_events();
spawn(consensus2);

for _ in 0..num_batches_live_sync {
Expand Down
3 changes: 2 additions & 1 deletion mempool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ nimiq-account = { workspace = true }
nimiq-block = { workspace = true }
nimiq-blockchain = { workspace = true }
nimiq-blockchain-interface = { workspace = true }
nimiq-consensus = { workspace = true }
nimiq-database = { workspace = true }
nimiq-hash = { workspace = true }
nimiq-keys = { workspace = true }
nimiq-network-interface = { workspace = true }
nimiq-primitives = { workspace = true, features = ["coin", "networks"] }
nimiq-serde = { workspace = true }
nimiq-time = { workspace = true }
nimiq-transaction = { workspace = true }
nimiq-utils = { workspace = true, features = ["spawn", "time"] }

Expand All @@ -58,7 +60,6 @@ nimiq-genesis-builder = { workspace = true }
nimiq-network-mock = { workspace = true }
nimiq-test-log = { workspace = true }
nimiq-test-utils = { workspace = true }
nimiq-time = { workspace = true }
nimiq-transaction-builder = { workspace = true }
nimiq-vrf = { workspace = true }

Expand Down
67 changes: 61 additions & 6 deletions mempool/mempool-task/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
collections::HashSet,
pin::Pin,
sync::Arc,
task::{Context, Poll},
Expand All @@ -8,9 +9,9 @@ use futures::{stream::BoxStream, Future, Stream, StreamExt};
use log::{debug, warn};
use nimiq_blockchain::Blockchain;
use nimiq_blockchain_interface::{AbstractBlockchain, BlockchainEvent};
use nimiq_consensus::{Consensus, ConsensusEvent, ConsensusProxy};
use nimiq_consensus::{sync::syncer::SyncEvent, Consensus, ConsensusEvent, ConsensusProxy};
use nimiq_mempool::{config::MempoolConfig, mempool::Mempool};
use nimiq_network_interface::network::Network;
use nimiq_network_interface::network::{Network, NetworkEvent, SubscribeEvents};
use nimiq_utils::spawn;
use parking_lot::RwLock;
#[cfg(feature = "metrics")]
Expand All @@ -34,6 +35,10 @@ pub struct MempoolTask<N: Network> {

consensus_event_rx: BroadcastStream<ConsensusEvent>,
blockchain_event_rx: BoxStream<'static, BlockchainEvent>,
network_event_rx: SubscribeEvents<N::PeerId>,
sync_event_rx: BroadcastStream<SyncEvent<<N as Network>::PeerId>>,

peers_in_live_sync: HashSet<N::PeerId>,

pub mempool: Arc<Mempool>,
mempool_active: bool,
Expand All @@ -50,17 +55,29 @@ impl<N: Network> MempoolTask<N> {
mempool_config: MempoolConfig,
) -> Self {
let consensus_event_rx = consensus.subscribe_events();
let network_event_rx = consensus.network.subscribe_events();

let mempool = Arc::new(Mempool::new(Arc::clone(&blockchain), mempool_config));
let mempool = Arc::new(Mempool::new(
Arc::clone(&blockchain),
mempool_config,
Arc::clone(&consensus.network),
));
let mempool_active = false;

let blockchain_event_rx = blockchain.read().notifier_as_stream();

let proxy = consensus.proxy();
let sync_event_rx = proxy.subscribe_sync_events();
let peers_in_live_sync = HashSet::from_iter(consensus.sync.peers());

Self {
consensus: consensus.proxy(),
consensus: proxy,
peers_in_live_sync,

consensus_event_rx,
blockchain_event_rx,
network_event_rx,
sync_event_rx,

mempool: Arc::clone(&mempool),
mempool_active,
Expand Down Expand Up @@ -92,26 +109,37 @@ impl<N: Network> MempoolTask<N> {

let mempool = Arc::clone(&self.mempool);
let network = Arc::clone(&self.consensus.network);
let peers = self.peers_in_live_sync.clone().into_iter().collect();
#[cfg(not(feature = "metrics"))]
spawn({
let consensus = self.consensus.clone();
async move {
// The mempool is not updated while consensus is lost.
// Thus, we need to check all transactions if they are still valid.
mempool.cleanup();
mempool.start_executors(network, None, None).await;
mempool
.start_executors(network, None, None, peers, consensus)
.await;
}
});
#[cfg(feature = "metrics")]
spawn({
let mempool_monitor = self.mempool_monitor.clone();
let ctrl_mempool_monitor = self.control_mempool_monitor.clone();
let consensus = self.consensus.clone();
async move {
// The mempool is not updated while consensus is lost.
// Thus, we need to check all transactions if they are still valid.
mempool.cleanup();

mempool
.start_executors(network, Some(mempool_monitor), Some(ctrl_mempool_monitor))
.start_executors(
network,
Some(mempool_monitor),
Some(ctrl_mempool_monitor),
peers,
consensus,
)
.await;
}
});
Expand Down Expand Up @@ -167,12 +195,39 @@ impl<N: Network> MempoolTask<N> {
}
}
}

fn on_consensus_sync_event(&mut self, event: SyncEvent<N::PeerId>) {
match event {
SyncEvent::AddLiveSync(peer_id) => {
self.peers_in_live_sync.insert(peer_id);
}
}
}

fn on_network_event(&mut self, event: NetworkEvent<N::PeerId>) {
match event {
NetworkEvent::PeerLeft(peer_id) => {
self.peers_in_live_sync.remove(&peer_id);
}
NetworkEvent::PeerJoined(_, _) | NetworkEvent::DhtReady => (),
}
}
}

impl<N: Network> Stream for MempoolTask<N> {
type Item = MempoolEvent;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Process consensus sync updates.
while let Poll::Ready(Some(Ok(event))) = self.sync_event_rx.poll_next_unpin(cx) {
self.on_consensus_sync_event(event)
}

// Process network updates.
while let Poll::Ready(Some(Ok(event))) = self.network_event_rx.poll_next_unpin(cx) {
self.on_network_event(event)
}

// Process consensus updates.
// Start mempool as soon as we have consensus and can enforce the validity window.
// Stop the mempool if we lose consensus or cannot enforce the validity window.
Expand Down
Loading
Loading