Skip to content

Commit

Permalink
Acquire the consensus sync event subscription through the ConsensusPr…
Browse files Browse the repository at this point in the history
…oxy rather than the Consensus directly
  • Loading branch information
Eligioo committed Aug 23, 2024
1 parent c0834d8 commit c23fbfe
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 29 deletions.
15 changes: 11 additions & 4 deletions consensus/src/consensus/consensus_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::{
RequestBlocksProof, RequestSubscribeToAddress, RequestTransactionReceiptsByAddress,
RequestTransactionsProof, ResponseBlocksProof,
},
sync::syncer::SyncEvent,
ConsensusEvent,
};

Expand All @@ -45,7 +46,8 @@ pub struct ConsensusProxy<N: Network> {
pub network: Arc<N>,
pub(crate) established_flag: Arc<AtomicBool>,
pub(crate) synced_validity_window_flag: Arc<AtomicBool>,
pub(crate) events: BroadcastSender<ConsensusEvent>,
pub(crate) consensus_events: BroadcastSender<ConsensusEvent>,
pub(crate) sync_events: BroadcastSender<SyncEvent<N::PeerId>>,
pub(crate) request: MpscSender<ConsensusRequest<N>>,
}

Expand All @@ -56,7 +58,8 @@ impl<N: Network> Clone for ConsensusProxy<N> {
network: Arc::clone(&self.network),
established_flag: Arc::clone(&self.established_flag),
synced_validity_window_flag: Arc::clone(&self.synced_validity_window_flag),
events: self.events.clone(),
consensus_events: self.consensus_events.clone(),
sync_events: self.sync_events.clone(),
request: self.request.clone(),
}
}
Expand Down Expand Up @@ -84,8 +87,12 @@ impl<N: Network> ConsensusProxy<N> {
&& self.synced_validity_window_flag.load(Ordering::Acquire)
}

pub fn subscribe_events(&self) -> BroadcastStream<ConsensusEvent> {
BroadcastStream::new(self.events.subscribe())
pub fn subscribe_consensus_events(&self) -> BroadcastStream<ConsensusEvent> {
BroadcastStream::new(self.consensus_events.subscribe())
}

pub fn subscribe_sync_events(&self) -> BroadcastStream<SyncEvent<<N as Network>::PeerId>> {
BroadcastStream::new(self.sync_events.subscribe())
}

/// Subscribe to remote address notification events
Expand Down
24 changes: 16 additions & 8 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ use self::remote_event_dispatcher::RemoteEventDispatcher;
use crate::{
consensus::head_requests::{HeadRequests, HeadRequestsResult},
messages::{RequestBlock, RequestHead, RequestMacroChain, RequestMissingBlocks},
sync::{live::block_queue::BlockSource, syncer::LiveSyncPushEvent, syncer_proxy::SyncerProxy},
sync::{
live::block_queue::BlockSource,
syncer::{LiveSyncPushEvent, SyncEvent},
syncer_proxy::SyncerProxy,
},
};
#[cfg(feature = "full")]
use crate::{
Expand Down Expand Up @@ -127,7 +131,8 @@ pub struct Consensus<N: Network> {

pub sync: SyncerProxy<N>,

events: BroadcastSender<ConsensusEvent>,
consensus_events: BroadcastSender<ConsensusEvent>,
sync_events: BroadcastSender<SyncEvent<N::PeerId>>,
established_flag: Arc<AtomicBool>,
last_batch_number: u32,
synced_validity_window_flag: Arc<AtomicBool>,
Expand Down Expand Up @@ -188,6 +193,7 @@ impl<N: Network> Consensus<N> {
zkp_proxy: ZKPComponentProxy<N>,
) -> Self {
let (tx, _rx) = broadcast(256);
let sync_event_sender = syncer.broadcast_sender();

Self::init_network_request_receivers(&network, &blockchain);

Expand All @@ -208,7 +214,8 @@ impl<N: Network> Consensus<N> {
blockchain,
network,
sync: syncer,
events: tx,
consensus_events: tx,
sync_events: sync_event_sender,
established_flag,
last_batch_number: 0,
synced_validity_window_flag,
Expand Down Expand Up @@ -288,7 +295,7 @@ impl<N: Network> Consensus<N> {
}

pub fn subscribe_events(&self) -> BroadcastStream<ConsensusEvent> {
BroadcastStream::new(self.events.subscribe())
BroadcastStream::new(self.consensus_events.subscribe())
}

pub fn is_established(&self) -> bool {
Expand All @@ -305,7 +312,8 @@ impl<N: Network> Consensus<N> {
network: Arc::clone(&self.network),
established_flag: Arc::clone(&self.established_flag),
synced_validity_window_flag: Arc::clone(&self.synced_validity_window_flag),
events: self.events.clone(),
consensus_events: self.consensus_events.clone(),
sync_events: self.sync_events.clone(),
request: self.requests.0.clone(),
}
}
Expand All @@ -321,7 +329,7 @@ impl<N: Network> Consensus<N> {

// We don't care if anyone is listening.
let (synced_validity_window, _) = self.check_validity_window();
self.events
self.consensus_events
.send(ConsensusEvent::Established {
synced_validity_window,
})
Expand Down Expand Up @@ -514,7 +522,7 @@ impl<N: Network> Future for Consensus<N> {

// Check consensus established state on changes.
if let Some(event) = self.check_established(None) {
self.events.send(event).ok();
self.consensus_events.send(event).ok();
}

// Poll any head requests if active.
Expand All @@ -530,7 +538,7 @@ impl<N: Network> Future for Consensus<N> {

// Update established state using the result.
if let Some(event) = self.check_established(Some(result)) {
self.events.send(event).ok();
self.consensus_events.send(event).ok();
}
}
}
Expand Down
13 changes: 6 additions & 7 deletions consensus/src/sync/syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use nimiq_network_interface::network::{CloseReason, Network, NetworkEvent, Subsc
use nimiq_time::{interval, Interval};
use nimiq_utils::stream::FuturesUnordered;
use tokio::sync::broadcast::{channel as broadcast, Sender as BroadcastSender};
use tokio_stream::wrappers::BroadcastStream;

use crate::{
consensus::ResolveBlockRequest, messages::RequestHead, sync::live::block_queue::BlockSource,
Expand Down Expand Up @@ -106,7 +105,7 @@ pub enum LiveSyncPeerEvent<TPeerId> {
}

#[derive(Clone)]
pub enum SyncerEvent<TPeerId> {
pub enum SyncEvent<TPeerId> {
AddLiveSync(TPeerId),
}

Expand All @@ -131,8 +130,8 @@ pub struct Syncer<N: Network, M: MacroSync<N::PeerId>, L: LiveSync<N>> {
/// A proxy to the blockchain
blockchain: BlockchainProxy,

/// Sending-half of a broadcast channel for publishing syncer events
events: BroadcastSender<SyncerEvent<N::PeerId>>,
/// Sending-half of a broadcast channel for publishing sync events
events: BroadcastSender<SyncEvent<N::PeerId>>,

/// A reference to the network
network: Arc<N>,
Expand Down Expand Up @@ -195,15 +194,15 @@ impl<N: Network, M: MacroSync<N::PeerId>, L: LiveSync<N>> Syncer<N, M, L> {
pub fn move_peer_into_live_sync(&mut self, peer_id: N::PeerId) {
debug!(%peer_id, "Adding peer to live sync");
self.live_sync.add_peer(peer_id);
self.events.send(SyncerEvent::AddLiveSync(peer_id)).ok();
self.events.send(SyncEvent::AddLiveSync(peer_id)).ok();
}

pub fn num_peers(&self) -> usize {
self.live_sync.num_peers()
}

pub fn subscribe_events(&self) -> BroadcastStream<SyncerEvent<<N as Network>::PeerId>> {
BroadcastStream::new(self.events.subscribe())
pub fn broadcast_sender(&self) -> BroadcastSender<SyncEvent<<N as Network>::PeerId>> {
self.events.clone()
}

pub fn peers(&self) -> Vec<N::PeerId> {
Expand Down
10 changes: 5 additions & 5 deletions consensus/src/sync/syncer_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use nimiq_primitives::policy::Policy;
use nimiq_zkp_component::zkp_component::ZKPComponentProxy;
use parking_lot::Mutex;
use pin_project::pin_project;
use tokio_stream::wrappers::BroadcastStream;
use tokio::sync::broadcast::Sender as BroadcastSender;

#[cfg(feature = "full")]
use crate::sync::{
Expand All @@ -31,7 +31,7 @@ use crate::{
queue::QueueConfig,
BlockLiveSync,
},
syncer::{LiveSyncPushEvent, Syncer, SyncerEvent},
syncer::{LiveSyncPushEvent, SyncEvent, Syncer},
},
};

Expand Down Expand Up @@ -227,9 +227,9 @@ impl<N: Network> SyncerProxy<N> {
gen_syncer_match!(self, accepted_block_announcements)
}

/// Returns a broadcast receiver in for syncer events
pub fn subscribe_events(&self) -> BroadcastStream<SyncerEvent<<N as Network>::PeerId>> {
gen_syncer_match!(self, subscribe_events)
/// Returns a broadcast sender for sync events
pub fn broadcast_sender(&self) -> BroadcastSender<SyncEvent<<N as Network>::PeerId>> {
gen_syncer_match!(self, broadcast_sender)
}

/// Returns whether the state sync has finished (or `true` if there is no state sync required)
Expand Down
2 changes: 1 addition & 1 deletion consensus/tests/sync_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ pub async fn sync_two_peers(
BlockchainEvent::Finalized(_) | BlockchainEvent::EpochFinalized(_)
))
});
let mut consensus_events = consensus2_proxy.subscribe_events();
let mut consensus_events = consensus2_proxy.subscribe_consensus_events();
spawn(consensus2);

for _ in 0..num_batches_live_sync {
Expand Down
4 changes: 2 additions & 2 deletions mempool/mempool-task/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,9 @@ impl<N: Network> MempoolTask<N> {
}
}

fn on_syncer_event(&mut self, event: SyncerEvent<N::PeerId>) {
fn on_syncer_event(&mut self, event: SyncEvent<N::PeerId>) {
match event {
SyncerEvent::AddLiveSync(peer_id) => {
SyncEvent::AddLiveSync(peer_id) => {
self.peers_in_live_sync.insert(peer_id);
}
}
Expand Down
4 changes: 2 additions & 2 deletions web-client/src/client/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ impl Client {
let is_established = self
.inner
.consensus_proxy()
.subscribe_events()
.subscribe_consensus_events()
.any(|event| async move {
if let Ok(state) = event {
matches!(state, ConsensusEvent::Established { .. })
Expand Down Expand Up @@ -904,7 +904,7 @@ impl Client {
let consensus = self.inner.consensus_proxy();
let network = self.inner.network();

let mut consensus_events = consensus.subscribe_events();
let mut consensus_events = consensus.subscribe_consensus_events();

let consensus_listeners = Rc::clone(&self.consensus_changed_listeners);

Expand Down

0 comments on commit c23fbfe

Please sign in to comment.