Skip to content

Commit

Permalink
use select
Browse files Browse the repository at this point in the history
cryptoAtwill committed Dec 30, 2024
1 parent fa6ca03 commit 09da20a
Showing 4 changed files with 81 additions and 122 deletions.
62 changes: 3 additions & 59 deletions fendermint/vm/topdown/src/vote/mod.rs
Original file line number Diff line number Diff line change
@@ -4,10 +4,9 @@
mod operation;

use crate::sync::TopDownSyncEvent;
use crate::vote::operation::{OperationMetrics, OperationStateMachine};
use crate::vote::operation::OperationStateMachine;
use crate::BlockHeight;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tokio::sync::{broadcast, mpsc};

#[derive(Clone)]
@@ -17,12 +16,6 @@ pub struct VoteRecord {}
/// Configuration of the vote reactor.
pub struct Config {
/// The reactor request channel buffer size (used as the capacity of the request `mpsc` channel)
req_channel_buffer_size: usize,
/// The number of requests the reactor should process per run before handling other tasks
req_batch_processing_size: usize,
/// The number of vote recording requests the reactor should process per run before handling other tasks
gossip_req_processing_size: usize,
/// The time to sleep for voting loop if nothing happens, in whole seconds
voting_sleep_interval_sec: u64,
}

pub struct VoteReactorClient {
@@ -38,8 +31,6 @@ pub fn start_vote_reactor(
let (tx, rx) = mpsc::channel(config.req_channel_buffer_size);

tokio::spawn(async move {
let sleep = Duration::new(config.voting_sleep_interval_sec, 0);

let inner = VotingHandler {
req_rx: rx,
gossip_rx,
@@ -49,8 +40,7 @@ pub fn start_vote_reactor(
};
let mut machine = OperationStateMachine::new(inner);
loop {
machine = machine.step();
tokio::time::sleep(sleep).await;
machine = machine.step().await;
}
});

@@ -68,6 +58,7 @@ struct VotingHandler {
req_rx: mpsc::Receiver<VoteReactorRequest>,
/// Receiver from gossip pub/sub, mostly listening to incoming votes
gossip_rx: broadcast::Receiver<VoteRecord>,
/// Sender for gossip pub/sub, publishing new votes signed by current node
gossip_tx: mpsc::Sender<VoteRecord>,
/// Listens to internal events and handles the events accordingly
internal_event_listener: broadcast::Receiver<TopDownSyncEvent>,
@@ -80,51 +71,4 @@ impl VotingHandler {
/// Placeholder for recording a vote: the body is currently empty, so the
/// vote passed in is dropped without any effect.
fn record_vote(&self, _vote: VoteRecord) {}

/// Placeholder for reacting to a top-down sync event: the body is currently
/// empty, so the event passed in is dropped without any effect.
fn handle_event(&self, _event: TopDownSyncEvent) {}

/// Drains pending requests from the request channel without blocking.
///
/// At most `config.req_batch_processing_size` requests are handled per call.
/// Draining stops early when the channel is empty or its sender side has
/// been dropped (the latter is logged as a warning).
///
/// Returns the number of requests handled.
fn process_external_request(&mut self, _metrics: &OperationMetrics) -> usize {
    let mut handled = 0;
    while handled < self.config.req_batch_processing_size {
        let req = match self.req_rx.try_recv() {
            Ok(req) => req,
            Err(mpsc::error::TryRecvError::Empty) => break,
            Err(mpsc::error::TryRecvError::Disconnected) => {
                tracing::warn!("voting reactor tx closed unexpected");
                break;
            }
        };
        self.handle_request(req);
        handled += 1;
    }
    handled
}

/// Drains pending votes from the gossip subscription without blocking.
///
/// At most `config.gossip_req_processing_size` votes are recorded per call.
/// Draining stops early when the channel is empty, or (with a warning) on
/// any other receive error.
///
/// Returns the number of votes recorded.
fn process_gossip_subscription_votes(&mut self) -> usize {
    let mut recorded = 0;
    while recorded < self.config.gossip_req_processing_size {
        let vote = match self.gossip_rx.try_recv() {
            Ok(vote) => vote,
            Err(broadcast::error::TryRecvError::Empty) => break,
            Err(_) => {
                tracing::warn!("gossip sender lagging or closed");
                break;
            }
        };
        self.record_vote(vote);
        recorded += 1;
    }
    recorded
}

fn poll_internal_event(&mut self) -> Option<TopDownSyncEvent> {
match self.internal_event_listener.try_recv() {
Ok(event) => Some(event),
Err(broadcast::error::TryRecvError::Empty) => None,
_ => {
tracing::warn!("gossip sender lagging or closed");
None
}
}
}
}
61 changes: 32 additions & 29 deletions fendermint/vm/topdown/src/vote/operation/active.rs
Original file line number Diff line number Diff line change
@@ -3,11 +3,13 @@

use crate::vote::operation::paused::PausedOperationMode;
use crate::vote::operation::{
OperationMetrics, OperationModeHandler, OperationStateMachine, PAUSED,
OperationMetrics, OperationMode, OperationModeHandler, OperationStateMachine,
};
use crate::vote::TopDownSyncEvent;
use crate::vote::VotingHandler;
use async_trait::async_trait;
use std::fmt::{Display, Formatter};
use tokio::select;

/// In active mode, we observe a steady rate of topdown checkpoint commitments on chain.
/// Our lookahead buffer is sliding continuously. As we acquire new finalised parent blocks,
@@ -23,36 +25,37 @@ impl Display for ActiveOperationMode {
}
}

impl OperationModeHandler for ActiveOperationMode {
fn advance(mut self) -> OperationStateMachine {
let mut n = self.handler.process_external_request(&self.metrics);
tracing::debug!(
num = n,
status = self.to_string(),
"handled external requests"
);

n = self.handler.process_gossip_subscription_votes();
tracing::debug!(num = n, status = self.to_string(), "handled gossip votes");

if n == 0 {
todo!("handle transition to soft recover")
}
impl ActiveOperationMode {
    /// Consumes the active mode and returns the state machine in the paused
    /// state, recording the mode change in the metrics on the way.
    fn into_paused(self) -> OperationStateMachine {
        let Self {
            mut metrics,
            handler,
        } = self;
        metrics.mode_changed(OperationMode::Paused);
        OperationStateMachine::Paused(PausedOperationMode { metrics, handler })
    }
}

while let Some(v) = self.handler.poll_internal_event() {
// top down is now syncing, pause everything
if matches!(v, TopDownSyncEvent::NodeSyncing) {
self.metrics.mode_changed(PAUSED);
return OperationStateMachine::Paused(PausedOperationMode {
metrics: self.metrics,
handler: self.handler,
});
#[async_trait]
impl OperationModeHandler for ActiveOperationMode {
    /// Runs the active mode until a transition is required.
    ///
    /// Waits on the request channel, the gossip subscription and the internal
    /// event listener at the same time and dispatches whichever yields first.
    /// The only exit is a `TopDownSyncEvent::NodeSyncing` event, which moves
    /// the state machine to the paused state.
    async fn advance(mut self) -> OperationStateMachine {
        loop {
            // NOTE(review): a branch whose pattern does not match (`None` from
            // a closed request channel, `Err` from a closed or lagging
            // broadcast receiver) is disabled for that `select!` call. With no
            // `else` branch, `select!` panics if all three are disabled at
            // once. Decide how the reactor should shut down in that case.
            select! {
                Some(req) = self.handler.req_rx.recv() => {
                    self.handler.handle_request(req);
                },
                Ok(vote) = self.handler.gossip_rx.recv() => {
                    self.handler.record_vote(vote);

                    // TODO: need to handle soft recovery transition
                },
                Ok(event) = self.handler.internal_event_listener.recv() => {
                    // top down is now syncing, pause everything
                    if matches!(event, TopDownSyncEvent::NodeSyncing) {
                        return self.into_paused();
                    }
                    self.handler.handle_event(event);
                }
            }
        }
    }
}
30 changes: 17 additions & 13 deletions fendermint/vm/topdown/src/vote/operation/mod.rs
Original file line number Diff line number Diff line change
@@ -7,14 +7,17 @@ mod paused;
use crate::vote::operation::active::ActiveOperationMode;
use crate::vote::operation::paused::PausedOperationMode;
use crate::vote::VotingHandler;
use async_trait::async_trait;
use std::fmt::Display;

pub type OperationMode = &'static str;
pub const INITIALIZED: &str = "init";
pub const PAUSED: &str = "paused";
pub const ACTIVE: &str = "active";
/// The mode the voting reactor is operating in.
///
/// Represented as a `u8` with fixed discriminants.
#[repr(u8)]
#[derive(Clone, Copy, Debug)]
pub enum OperationMode {
    /// The voting reactor is stopped.
    Paused = 0,
    /// Votes are published and aggregated normally.
    Active = 1,
}

/// The operation mode of voting reactor.
/// The operation state machine of voting reactor.
///
/// Active: Actively publishing and aggregating votes normally
/// Paused: Stops voting reactor due to unknown or irrecoverable issues
@@ -42,34 +45,35 @@ pub enum OperationStateMachine {
/// Tracks the operation mdoe metrics for the voting system
pub(crate) struct OperationMetrics {
current_mode: OperationMode,
previous_mode: OperationMode,
previous_mode: Option<OperationMode>,
}

#[async_trait]
pub(crate) trait OperationModeHandler: Display {
fn advance(self) -> OperationStateMachine;
async fn advance(self) -> OperationStateMachine;
}

impl OperationStateMachine {
/// Always start with Paused operation mode, one needs to know the exact status from syncer.
pub fn new(handler: VotingHandler) -> OperationStateMachine {
let metrics = OperationMetrics {
current_mode: PAUSED,
previous_mode: INITIALIZED,
current_mode: OperationMode::Paused,
previous_mode: None,
};
Self::Paused(PausedOperationMode { metrics, handler })
}

pub fn step(self) -> Self {
pub async fn step(self) -> Self {
match self {
OperationStateMachine::Paused(p) => p.advance(),
OperationStateMachine::Active(p) => p.advance(),
OperationStateMachine::Paused(p) => p.advance().await,
OperationStateMachine::Active(p) => p.advance().await,
}
}
}

impl OperationMetrics {
    /// Records a mode change: `mode` becomes the current mode and the mode
    /// that was current until now is remembered as the previous one.
    pub fn mode_changed(&mut self, mode: OperationMode) {
        self.previous_mode = Some(std::mem::replace(&mut self.current_mode, mode));
    }
}
50 changes: 29 additions & 21 deletions fendermint/vm/topdown/src/vote/operation/paused.rs
Original file line number Diff line number Diff line change
@@ -4,10 +4,12 @@
use crate::sync::TopDownSyncEvent;
use crate::vote::operation::active::ActiveOperationMode;
use crate::vote::operation::{
OperationMetrics, OperationModeHandler, OperationStateMachine, ACTIVE,
OperationMetrics, OperationMode, OperationModeHandler, OperationStateMachine,
};
use crate::vote::VotingHandler;
use async_trait::async_trait;
use std::fmt::{Display, Formatter};
use tokio::select;

/// The paused operation mode handler.
///
@@ -27,29 +29,35 @@ impl Display for PausedOperationMode {
}
}

impl OperationModeHandler for PausedOperationMode {
fn advance(mut self) -> OperationStateMachine {
let n = self.handler.process_external_request(&self.metrics);
tracing::debug!(
num = n,
status = self.to_string(),
"handled external requests"
);

if let Some(v) = self.handler.poll_internal_event() {
// top down is still syncing, not doing anything for now
if matches!(v, TopDownSyncEvent::NodeSyncing) {
return OperationStateMachine::Paused(self);
}

// handle the polled event
self.handler.handle_event(v);
}

self.metrics.mode_changed(ACTIVE);
impl PausedOperationMode {
    /// Consumes the paused mode and returns the state machine in the active
    /// state, recording the mode change in the metrics on the way.
    fn into_active(self) -> OperationStateMachine {
        let Self {
            mut metrics,
            handler,
        } = self;
        metrics.mode_changed(OperationMode::Active);
        OperationStateMachine::Active(ActiveOperationMode { metrics, handler })
    }
}

#[async_trait]
impl OperationModeHandler for PausedOperationMode {
    /// Runs the paused mode until a transition is required.
    ///
    /// Waits on the request channel, the gossip subscription and the internal
    /// event listener at the same time and dispatches whichever yields first.
    /// The mode stays paused while `TopDownSyncEvent::NodeSyncing` events
    /// arrive. Any other internal event is handled and then moves the state
    /// machine to the active state.
    async fn advance(mut self) -> OperationStateMachine {
        loop {
            // NOTE(review): a branch whose pattern does not match (`None` from
            // a closed request channel, `Err` from a closed or lagging
            // broadcast receiver) is disabled for that `select!` call. With no
            // `else` branch, `select!` panics if all three are disabled at
            // once. Decide how the reactor should shut down in that case.
            select! {
                Some(req) = self.handler.req_rx.recv() => {
                    self.handler.handle_request(req);
                },
                Ok(vote) = self.handler.gossip_rx.recv() => {
                    self.handler.record_vote(vote);
                },
                Ok(event) = self.handler.internal_event_listener.recv() => {
                    // top down is still syncing, stay paused
                    if matches!(event, TopDownSyncEvent::NodeSyncing) {
                        continue;
                    }

                    // Handle the event before switching modes so the event
                    // that ends the pause is not silently dropped.
                    self.handler.handle_event(event);
                    return self.into_active();
                }
            }
        }
    }
}

0 comments on commit 09da20a

Please sign in to comment.