From 09da20a27c72239406e0998903c3a1cce65c48ae Mon Sep 17 00:00:00 2001 From: cryptoAtwill Date: Mon, 30 Dec 2024 17:37:18 +0800 Subject: [PATCH] use select --- fendermint/vm/topdown/src/vote/mod.rs | 62 +------------------ .../vm/topdown/src/vote/operation/active.rs | 61 +++++++++--------- .../vm/topdown/src/vote/operation/mod.rs | 30 +++++---- .../vm/topdown/src/vote/operation/paused.rs | 50 ++++++++------- 4 files changed, 81 insertions(+), 122 deletions(-) diff --git a/fendermint/vm/topdown/src/vote/mod.rs b/fendermint/vm/topdown/src/vote/mod.rs index d3cf2cab2..dad097398 100644 --- a/fendermint/vm/topdown/src/vote/mod.rs +++ b/fendermint/vm/topdown/src/vote/mod.rs @@ -4,10 +4,9 @@ mod operation; use crate::sync::TopDownSyncEvent; -use crate::vote::operation::{OperationMetrics, OperationStateMachine}; +use crate::vote::operation::OperationStateMachine; use crate::BlockHeight; use serde::{Deserialize, Serialize}; -use std::time::Duration; use tokio::sync::{broadcast, mpsc}; #[derive(Clone)] @@ -17,12 +16,6 @@ pub struct VoteRecord {} pub struct Config { /// The reactor request channel buffer size req_channel_buffer_size: usize, - /// The number of requests the reactor should process per run before handling other tasks - req_batch_processing_size: usize, - /// The number of vote recording requests the reactor should process per run before handling other tasks - gossip_req_processing_size: usize, - /// The time to sleep for voting loop if nothing happens - voting_sleep_interval_sec: u64, } pub struct VoteReactorClient { @@ -38,8 +31,6 @@ pub fn start_vote_reactor( let (tx, rx) = mpsc::channel(config.req_channel_buffer_size); tokio::spawn(async move { - let sleep = Duration::new(config.voting_sleep_interval_sec, 0); - let inner = VotingHandler { req_rx: rx, gossip_rx, @@ -49,8 +40,7 @@ pub fn start_vote_reactor( }; let mut machine = OperationStateMachine::new(inner); loop { - machine = machine.step(); - tokio::time::sleep(sleep).await; + machine = machine.step().await; } }); @@ -68,6 +58,7 @@ struct VotingHandler { req_rx: mpsc::Receiver, /// Receiver from gossip pub/sub, mostly listening to incoming votes gossip_rx: broadcast::Receiver, + /// Sender for gossip pub/sub, publishing new votes signed by current node gossip_tx: mpsc::Sender, /// Listens to internal events and handles the events accordingly internal_event_listener: broadcast::Receiver, @@ -80,51 +71,4 @@ impl VotingHandler { fn record_vote(&self, _vote: VoteRecord) {} fn handle_event(&self, _event: TopDownSyncEvent) {} - - fn process_external_request(&mut self, _metrics: &OperationMetrics) -> usize { - let mut n = 0; - while n < self.config.req_batch_processing_size { - match self.req_rx.try_recv() { - Ok(req) => { - self.handle_request(req); - n += 1 - } - Err(mpsc::error::TryRecvError::Disconnected) => { - tracing::warn!("voting reactor tx closed unexpected"); - break; - } - Err(mpsc::error::TryRecvError::Empty) => break, - } - } - n - } - - fn process_gossip_subscription_votes(&mut self) -> usize { - let mut n = 0; - while n < self.config.gossip_req_processing_size { - match self.gossip_rx.try_recv() { - Ok(vote) => { - self.record_vote(vote); - n += 1; - } - Err(broadcast::error::TryRecvError::Empty) => break, - _ => { - tracing::warn!("gossip sender lagging or closed"); - break; - } - } - } - n - } - - fn poll_internal_event(&mut self) -> Option { - match self.internal_event_listener.try_recv() { - Ok(event) => Some(event), - Err(broadcast::error::TryRecvError::Empty) => None, - _ => { - tracing::warn!("gossip sender lagging or closed"); - None - } - } - } } diff --git a/fendermint/vm/topdown/src/vote/operation/active.rs b/fendermint/vm/topdown/src/vote/operation/active.rs index c70497fa8..81ac7499b 100644 --- a/fendermint/vm/topdown/src/vote/operation/active.rs +++ b/fendermint/vm/topdown/src/vote/operation/active.rs @@ -3,11 +3,13 @@ use crate::vote::operation::paused::PausedOperationMode; use crate::vote::operation::{ - OperationMetrics, OperationModeHandler, OperationStateMachine, PAUSED, + OperationMetrics, OperationMode, OperationModeHandler, OperationStateMachine, }; use crate::vote::TopDownSyncEvent; use crate::vote::VotingHandler; +use async_trait::async_trait; use std::fmt::{Display, Formatter}; +use tokio::select; /// In active mode, we observe a steady rate of topdown checkpoint commitments on chain. /// Our lookahead buffer is sliding continuously. As we acquire new finalised parent blocks, @@ -23,36 +25,37 @@ impl Display for ActiveOperationMode { } } -impl OperationModeHandler for ActiveOperationMode { - fn advance(mut self) -> OperationStateMachine { - let mut n = self.handler.process_external_request(&self.metrics); - tracing::debug!( - num = n, - status = self.to_string(), - "handled external requests" - ); - - n = self.handler.process_gossip_subscription_votes(); - tracing::debug!(num = n, status = self.to_string(), "handled gossip votes"); - - if n == 0 { - todo!("handle transition to soft recover") - } +impl ActiveOperationMode { + fn into_paused(mut self) -> OperationStateMachine { + self.metrics.mode_changed(OperationMode::Paused); + OperationStateMachine::Paused(PausedOperationMode { + metrics: self.metrics, + handler: self.handler, + }) + } +} - while let Some(v) = self.handler.poll_internal_event() { - // top down is now syncing, pause everything - if matches!(v, TopDownSyncEvent::NodeSyncing) { - self.metrics.mode_changed(PAUSED); - return OperationStateMachine::Paused(PausedOperationMode { - metrics: self.metrics, - handler: self.handler, - }); +#[async_trait] +impl OperationModeHandler for ActiveOperationMode { + async fn advance(mut self) -> OperationStateMachine { + loop { + select! { + Some(req) = self.handler.req_rx.recv() => { + self.handler.handle_request(req); + }, + Ok(vote) = self.handler.gossip_rx.recv() => { + self.handler.record_vote(vote); + + // TODO: need to handle soft recovery transition + }, + Ok(event) = self.handler.internal_event_listener.recv() => { + // top down is now syncing, pause everything + if matches!(event, TopDownSyncEvent::NodeSyncing) { + return self.into_paused(); + } + self.handler.handle_event(event); + } } - - // handle the polled event - self.handler.handle_event(v); } - - OperationStateMachine::Active(self) } } diff --git a/fendermint/vm/topdown/src/vote/operation/mod.rs b/fendermint/vm/topdown/src/vote/operation/mod.rs index 578c4c30e..c14721ad1 100644 --- a/fendermint/vm/topdown/src/vote/operation/mod.rs +++ b/fendermint/vm/topdown/src/vote/operation/mod.rs @@ -7,14 +7,17 @@ mod paused; use crate::vote::operation::active::ActiveOperationMode; use crate::vote::operation::paused::PausedOperationMode; use crate::vote::VotingHandler; +use async_trait::async_trait; use std::fmt::Display; -pub type OperationMode = &'static str; -pub const INITIALIZED: &str = "init"; -pub const PAUSED: &str = "paused"; -pub const ACTIVE: &str = "active"; +#[repr(u8)] +#[derive(Debug, Copy, Clone)] +pub enum OperationMode { + Paused = 0, + Active = 1, +} -/// The operation mode of voting reactor. +/// The operation state machine of voting reactor. /// /// Active: Active publishing votes and aggregating votes normally /// Paused: Stops voting reactor due to unknown or irrecoverable issues @@ -42,34 +45,35 @@ pub enum OperationStateMachine { /// Tracks the operation mdoe metrics for the voting system pub(crate) struct OperationMetrics { current_mode: OperationMode, - previous_mode: OperationMode, + previous_mode: Option, } +#[async_trait] pub(crate) trait OperationModeHandler: Display { - fn advance(self) -> OperationStateMachine; + async fn advance(self) -> OperationStateMachine; } impl OperationStateMachine { /// Always start with Paused operation mode, one needs to know the exact status from syncer. pub fn new(handler: VotingHandler) -> OperationStateMachine { let metrics = OperationMetrics { - current_mode: PAUSED, - previous_mode: INITIALIZED, + current_mode: OperationMode::Paused, + previous_mode: None, }; Self::Paused(PausedOperationMode { metrics, handler }) } - pub fn step(self) -> Self { + pub async fn step(self) -> Self { match self { - OperationStateMachine::Paused(p) => p.advance(), - OperationStateMachine::Active(p) => p.advance(), + OperationStateMachine::Paused(p) => p.advance().await, + OperationStateMachine::Active(p) => p.advance().await, } } } impl OperationMetrics { pub fn mode_changed(&mut self, mode: OperationMode) { - self.previous_mode = self.current_mode; + self.previous_mode = Some(self.current_mode); self.current_mode = mode; } } diff --git a/fendermint/vm/topdown/src/vote/operation/paused.rs b/fendermint/vm/topdown/src/vote/operation/paused.rs index a66c01a48..b74360189 100644 --- a/fendermint/vm/topdown/src/vote/operation/paused.rs +++ b/fendermint/vm/topdown/src/vote/operation/paused.rs @@ -4,10 +4,12 @@ use crate::sync::TopDownSyncEvent; use crate::vote::operation::active::ActiveOperationMode; use crate::vote::operation::{ - OperationMetrics, OperationModeHandler, OperationStateMachine, ACTIVE, + OperationMetrics, OperationMode, OperationModeHandler, OperationStateMachine, }; use crate::vote::VotingHandler; +use async_trait::async_trait; use std::fmt::{Display, Formatter}; +use tokio::select; /// The paused operation mode handler. /// @@ -27,29 +29,35 @@ impl Display for PausedOperationMode { } } -impl OperationModeHandler for PausedOperationMode { - fn advance(mut self) -> OperationStateMachine { - let n = self.handler.process_external_request(&self.metrics); - tracing::debug!( - num = n, - status = self.to_string(), - "handled external requests" - ); - - if let Some(v) = self.handler.poll_internal_event() { - // top down is still syncing, not doing anything for now - if matches!(v, TopDownSyncEvent::NodeSyncing) { - return OperationStateMachine::Paused(self); - } - - // handle the polled event - self.handler.handle_event(v); - } - - self.metrics.mode_changed(ACTIVE); +impl PausedOperationMode { + fn into_active(mut self) -> OperationStateMachine { + self.metrics.mode_changed(OperationMode::Active); OperationStateMachine::Active(ActiveOperationMode { metrics: self.metrics, handler: self.handler, }) } } + +#[async_trait] +impl OperationModeHandler for PausedOperationMode { + async fn advance(mut self) -> OperationStateMachine { + loop { + select! { + Some(req) = self.handler.req_rx.recv() => { + self.handler.handle_request(req); + }, + Ok(vote) = self.handler.gossip_rx.recv() => { + self.handler.record_vote(vote); + }, + Ok(event) = self.handler.internal_event_listener.recv() => { + // top down is still syncing, pause everything + if !matches!(event, TopDownSyncEvent::NodeSyncing) { + return self.into_active(); + } + self.handler.handle_event(event); + } + } + } + } +}