Skip to content

Commit

Permalink
Consensus: acquire the consensus sync event subscription through the …
Browse files Browse the repository at this point in the history
…ConsensusProxy rather than the Consensus directly
  • Loading branch information
Eligioo committed Oct 9, 2024
1 parent 6d61614 commit 8ca80e9
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 29 deletions.
15 changes: 11 additions & 4 deletions consensus/src/consensus/consensus_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::{
RequestBlocksProof, RequestSubscribeToAddress, RequestTransactionReceiptsByAddress,
RequestTransactionsProof, ResponseBlocksProof,
},
sync::syncer::SyncEvent,
ConsensusEvent,
};

Expand All @@ -45,7 +46,8 @@ pub struct ConsensusProxy<N: Network> {
pub network: Arc<N>,
pub(crate) established_flag: Arc<AtomicBool>,
pub(crate) synced_validity_window_flag: Arc<AtomicBool>,
pub(crate) events: BroadcastSender<ConsensusEvent>,
pub(crate) consensus_events: BroadcastSender<ConsensusEvent>,
pub(crate) sync_events: BroadcastSender<SyncEvent<N::PeerId>>,
pub(crate) request: MpscSender<ConsensusRequest<N>>,
}

Expand All @@ -56,7 +58,8 @@ impl<N: Network> Clone for ConsensusProxy<N> {
network: Arc::clone(&self.network),
established_flag: Arc::clone(&self.established_flag),
synced_validity_window_flag: Arc::clone(&self.synced_validity_window_flag),
events: self.events.clone(),
consensus_events: self.consensus_events.clone(),
sync_events: self.sync_events.clone(),
request: self.request.clone(),
}
}
Expand Down Expand Up @@ -84,8 +87,12 @@ impl<N: Network> ConsensusProxy<N> {
&& self.synced_validity_window_flag.load(Ordering::Acquire)
}

pub fn subscribe_events(&self) -> BroadcastStream<ConsensusEvent> {
BroadcastStream::new(self.events.subscribe())
pub fn subscribe_consensus_events(&self) -> BroadcastStream<ConsensusEvent> {
BroadcastStream::new(self.consensus_events.subscribe())
}

pub fn subscribe_sync_events(&self) -> BroadcastStream<SyncEvent<<N as Network>::PeerId>> {
BroadcastStream::new(self.sync_events.subscribe())
}

/// Subscribe to remote address notification events
Expand Down
24 changes: 16 additions & 8 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ use self::remote_event_dispatcher::RemoteEventDispatcher;
use crate::{
consensus::head_requests::{HeadRequests, HeadRequestsResult},
messages::{RequestBlock, RequestHead, RequestMacroChain, RequestMissingBlocks},
sync::{live::block_queue::BlockSource, syncer::LiveSyncPushEvent, syncer_proxy::SyncerProxy},
sync::{
live::block_queue::BlockSource,
syncer::{LiveSyncPushEvent, SyncEvent},
syncer_proxy::SyncerProxy,
},
};
#[cfg(feature = "full")]
use crate::{
Expand Down Expand Up @@ -130,7 +134,8 @@ pub struct Consensus<N: Network> {

pub sync: SyncerProxy<N>,

events: BroadcastSender<ConsensusEvent>,
consensus_events: BroadcastSender<ConsensusEvent>,
sync_events: BroadcastSender<SyncEvent<N::PeerId>>,
established_flag: Arc<AtomicBool>,
#[cfg(feature = "full")]
last_batch_number: u32,
Expand Down Expand Up @@ -196,6 +201,7 @@ impl<N: Network> Consensus<N> {
zkp_proxy: ZKPComponentProxy<N>,
) -> Self {
let (tx, _rx) = broadcast(256);
let sync_event_sender = syncer.broadcast_sender();

Self::init_network_request_receivers(&network, &blockchain);

Expand All @@ -216,7 +222,8 @@ impl<N: Network> Consensus<N> {
blockchain,
network,
sync: syncer,
events: tx,
consensus_events: tx,
sync_events: sync_event_sender,
established_flag,
#[cfg(feature = "full")]
last_batch_number: 0,
Expand Down Expand Up @@ -299,7 +306,7 @@ impl<N: Network> Consensus<N> {
}

pub fn subscribe_events(&self) -> BroadcastStream<ConsensusEvent> {
BroadcastStream::new(self.events.subscribe())
BroadcastStream::new(self.consensus_events.subscribe())
}

pub fn is_established(&self) -> bool {
Expand All @@ -316,7 +323,8 @@ impl<N: Network> Consensus<N> {
network: Arc::clone(&self.network),
established_flag: Arc::clone(&self.established_flag),
synced_validity_window_flag: Arc::clone(&self.synced_validity_window_flag),
events: self.events.clone(),
consensus_events: self.consensus_events.clone(),
sync_events: self.sync_events.clone(),
request: self.requests.0.clone(),
}
}
Expand All @@ -332,7 +340,7 @@ impl<N: Network> Consensus<N> {

// We don't care if anyone is listening.
let (synced_validity_window, _) = self.check_validity_window();
self.events
self.consensus_events
.send(ConsensusEvent::Established {
synced_validity_window,
})
Expand Down Expand Up @@ -547,7 +555,7 @@ impl<N: Network> Future for Consensus<N> {

// Check consensus established state on changes.
if let Some(event) = self.check_established(None) {
self.events.send(event).ok();
self.consensus_events.send(event).ok();
}

// Poll any head requests if active.
Expand All @@ -563,7 +571,7 @@ impl<N: Network> Future for Consensus<N> {

// Update established state using the result.
if let Some(event) = self.check_established(Some(result)) {
self.events.send(event).ok();
self.consensus_events.send(event).ok();
}
}
}
Expand Down
13 changes: 6 additions & 7 deletions consensus/src/sync/syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use nimiq_network_interface::network::{CloseReason, Network, NetworkEvent, Subsc
use nimiq_time::{interval, Interval};
use nimiq_utils::stream::FuturesUnordered;
use tokio::sync::broadcast::{channel as broadcast, Sender as BroadcastSender};
use tokio_stream::wrappers::BroadcastStream;

use crate::{
consensus::ResolveBlockRequest, messages::RequestHead, sync::live::block_queue::BlockSource,
Expand Down Expand Up @@ -106,7 +105,7 @@ pub enum LiveSyncPeerEvent<TPeerId> {
}

#[derive(Clone)]
pub enum SyncerEvent<TPeerId> {
pub enum SyncEvent<TPeerId> {
AddLiveSync(TPeerId),
}

Expand All @@ -131,8 +130,8 @@ pub struct Syncer<N: Network, M: MacroSync<N::PeerId>, L: LiveSync<N>> {
/// A proxy to the blockchain
blockchain: BlockchainProxy,

/// Sending-half of a broadcast channel for publishing syncer events
events: BroadcastSender<SyncerEvent<N::PeerId>>,
/// Sending-half of a broadcast channel for publishing sync events
events: BroadcastSender<SyncEvent<N::PeerId>>,

/// A reference to the network
network: Arc<N>,
Expand Down Expand Up @@ -195,15 +194,15 @@ impl<N: Network, M: MacroSync<N::PeerId>, L: LiveSync<N>> Syncer<N, M, L> {
pub fn move_peer_into_live_sync(&mut self, peer_id: N::PeerId) {
debug!(%peer_id, "Adding peer to live sync");
self.live_sync.add_peer(peer_id);
self.events.send(SyncerEvent::AddLiveSync(peer_id)).ok();
self.events.send(SyncEvent::AddLiveSync(peer_id)).ok();
}

pub fn num_peers(&self) -> usize {
self.live_sync.num_peers()
}

pub fn subscribe_events(&self) -> BroadcastStream<SyncerEvent<<N as Network>::PeerId>> {
BroadcastStream::new(self.events.subscribe())
pub fn broadcast_sender(&self) -> BroadcastSender<SyncEvent<<N as Network>::PeerId>> {
self.events.clone()
}

pub fn peers(&self) -> Vec<N::PeerId> {
Expand Down
10 changes: 5 additions & 5 deletions consensus/src/sync/syncer_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use nimiq_primitives::policy::Policy;
use nimiq_zkp_component::zkp_component::ZKPComponentProxy;
use parking_lot::Mutex;
use pin_project::pin_project;
use tokio_stream::wrappers::BroadcastStream;
use tokio::sync::broadcast::Sender as BroadcastSender;

#[cfg(feature = "full")]
use crate::sync::{
Expand All @@ -32,7 +32,7 @@ use crate::{
queue::QueueConfig,
BlockLiveSync,
},
syncer::{LiveSyncPushEvent, Syncer, SyncerEvent},
syncer::{LiveSyncPushEvent, SyncEvent, Syncer},
},
};

Expand Down Expand Up @@ -228,9 +228,9 @@ impl<N: Network> SyncerProxy<N> {
gen_syncer_match!(self, accepted_block_announcements)
}

/// Returns a broadcast receiver in for syncer events
pub fn subscribe_events(&self) -> BroadcastStream<SyncerEvent<<N as Network>::PeerId>> {
gen_syncer_match!(self, subscribe_events)
/// Returns a broadcast sender for sync events
pub fn broadcast_sender(&self) -> BroadcastSender<SyncEvent<<N as Network>::PeerId>> {
gen_syncer_match!(self, broadcast_sender)
}

/// Returns whether the state sync has finished (or `true` if there is no state sync required)
Expand Down
2 changes: 1 addition & 1 deletion consensus/tests/sync_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ pub async fn sync_two_peers(
BlockchainEvent::Finalized(_) | BlockchainEvent::EpochFinalized(_)
))
});
let mut consensus_events = consensus2_proxy.subscribe_events();
let mut consensus_events = consensus2_proxy.subscribe_consensus_events();
spawn(consensus2);

for _ in 0..num_batches_live_sync {
Expand Down
4 changes: 2 additions & 2 deletions mempool/mempool-task/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,9 @@ impl<N: Network> MempoolTask<N> {
}
}

fn on_syncer_event(&mut self, event: SyncerEvent<N::PeerId>) {
fn on_syncer_event(&mut self, event: SyncEvent<N::PeerId>) {
match event {
SyncerEvent::AddLiveSync(peer_id) => {
SyncEvent::AddLiveSync(peer_id) => {
self.peers_in_live_sync.insert(peer_id);
}
}
Expand Down
4 changes: 2 additions & 2 deletions web-client/src/client/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ impl Client {
let is_established = self
.inner
.consensus_proxy()
.subscribe_events()
.subscribe_consensus_events()
.any(|event| async move {
if let Ok(state) = event {
matches!(state, ConsensusEvent::Established { .. })
Expand Down Expand Up @@ -904,7 +904,7 @@ impl Client {
let consensus = self.inner.consensus_proxy();
let network = self.inner.network();

let mut consensus_events = consensus.subscribe_events();
let mut consensus_events = consensus.subscribe_consensus_events();

let consensus_listeners = Rc::clone(&self.consensus_changed_listeners);

Expand Down

0 comments on commit 8ca80e9

Please sign in to comment.