Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validator health status #3207

Open
wants to merge 3 commits into
base: albatross
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions primitives/account/src/account/staking_contract/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ use crate::{
/// in the first place.
/// (**) The validator may be set to automatically reactivate itself upon inactivation.
/// If this setting is not enabled the state change can only be triggered manually.
/// However, there is a validator health status with the following states:
/// -> Green: Everything is working as expected, if the validator is deactivated its status changes to Yellow
/// -> Yellow: If the validator is deactivated again, its status is changed to Red
/// -> Red: If the validator is deactivated again, the automatic reactivate (if enabled) has no effect
/// Human intervention is required at this point
/// To go from Red to Yellow or Yellow to Green, the validator needs to be active for at least a quarter of an epoch
///
/// Create, Update, Deactivate, Retire and Re-activate are incoming transactions to the staking contract.
/// Delete is an outgoing transaction from the staking contract.
Expand Down
3 changes: 2 additions & 1 deletion test-utils/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub async fn build_validators<N: TestNetwork + NetworkInterface>(
peer_ids: &[u64],
hub: &mut Option<MockHub>,
is_prover_active: bool,
automatic_reactivate: bool,
) -> Vec<Validator<ValidatorNetworkImpl<N>>>
where
N::Error: Send + Sync,
Expand Down Expand Up @@ -113,7 +114,7 @@ where
let (v, c) = build_validator(
peer_ids[i],
Address::from(&validator_keys[i]),
false,
automatic_reactivate,
signing_keys[i].clone(),
voting_keys[i].clone(),
fee_keys[i].clone(),
Expand Down
27 changes: 26 additions & 1 deletion validator/src/micro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use futures::{future::BoxFuture, ready, FutureExt, Stream};
use nimiq_block::{Block, EquivocationProof, MicroBlock, SkipBlockInfo};
use nimiq_blockchain::{BlockProducer, BlockProducerError, Blockchain};
use nimiq_blockchain_interface::AbstractBlockchain;
use nimiq_keys::Address;
use nimiq_mempool::mempool::Mempool;
use nimiq_primitives::policy::Policy;
use nimiq_time::sleep;
Expand All @@ -18,7 +19,10 @@ use nimiq_validator_network::ValidatorNetwork;
use nimiq_vrf::VrfSeed;
use parking_lot::RwLock;

use crate::{aggregation::skip_block::SkipBlockAggregation, validator::Validator};
use crate::{
aggregation::skip_block::SkipBlockAggregation,
validator::{HealthState, Validator},
};

pub(crate) enum ProduceMicroBlockEvent {
MicroBlock,
Expand All @@ -36,6 +40,8 @@ struct NextProduceMicroBlockEvent<TValidatorNetwork> {
block_number: u32,
producer_timeout: Duration,
block_separation_time: Duration,
validator_address: Address,
health_state: Arc<RwLock<HealthState>>,
}

impl<TValidatorNetwork: ValidatorNetwork + 'static> NextProduceMicroBlockEvent<TValidatorNetwork> {
Expand All @@ -53,6 +59,8 @@ impl<TValidatorNetwork: ValidatorNetwork + 'static> NextProduceMicroBlockEvent<T
block_number: u32,
producer_timeout: Duration,
block_separation_time: Duration,
validator_address: Address,
health_state: Arc<RwLock<HealthState>>,
) -> Self {
Self {
blockchain,
Expand All @@ -65,6 +73,8 @@ impl<TValidatorNetwork: ValidatorNetwork + 'static> NextProduceMicroBlockEvent<T
block_number,
producer_timeout,
block_separation_time,
validator_address,
health_state,
}
}

Expand Down Expand Up @@ -117,6 +127,7 @@ impl<TValidatorNetwork: ValidatorNetwork + 'static> NextProduceMicroBlockEvent<T
info!(
block_number = self.block_number,
slot_band = self.validator_slot_band,
address = %self.validator_address,
"Our turn, producing micro block #{}",
self.block_number,
);
Expand Down Expand Up @@ -153,6 +164,11 @@ impl<TValidatorNetwork: ValidatorNetwork + 'static> NextProduceMicroBlockEvent<T
num_transactions
);

if !self.health_state.read().publish {
log::warn!(block = block.block_number(), "Not publishing block");
break Some(Some(ProduceMicroBlockEvent::MicroBlock));
}

// Publish the block. It is valid as we have just created it.
Validator::publish_block(Arc::clone(&self.network), block.clone());

Expand Down Expand Up @@ -181,6 +197,10 @@ impl<TValidatorNetwork: ValidatorNetwork + 'static> NextProduceMicroBlockEvent<T
continue;
}

// Each successful block will decrease the number of inactivations
let current_inactivations = self.health_state.read().inactivations;
self.health_state.write().inactivations = current_inactivations.saturating_sub(1);

let event = result
.map(move |_result| ProduceMicroBlockEvent::MicroBlock)
.ok();
Expand All @@ -194,6 +214,7 @@ impl<TValidatorNetwork: ValidatorNetwork + 'static> NextProduceMicroBlockEvent<T
debug!(
block_number = self.block_number,
slot_band = self.validator_slot_band,
address = %self.validator_address,
"Not our turn, waiting for micro block #{}",
self.block_number,
);
Expand Down Expand Up @@ -404,6 +425,8 @@ impl<TValidatorNetwork: ValidatorNetwork + 'static> ProduceMicroBlock<TValidator
block_number: u32,
producer_timeout: Duration,
block_separation_time: Duration,
validator_address: Address,
health_state: Arc<RwLock<HealthState>>,
) -> Self {
let next_event = NextProduceMicroBlockEvent::new(
blockchain,
Expand All @@ -416,6 +439,8 @@ impl<TValidatorNetwork: ValidatorNetwork + 'static> ProduceMicroBlock<TValidator
block_number,
producer_timeout,
block_separation_time,
validator_address,
health_state,
)
.next()
.boxed();
Expand Down
147 changes: 142 additions & 5 deletions validator/src/validator.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
cmp,
error::Error,
future::Future,
pin::Pin,
Expand Down Expand Up @@ -51,6 +52,13 @@ use crate::{
r#macro::{MappedReturn, ProduceMacroBlock, ProposalTopic},
};

/// Inactivation-counter threshold for yellow health: a Green validator whose counter reaches
/// this value is moved to Yellow, and a Yellow validator whose counter is below it (while the
/// validator is active) is moved back to Green.
const VALIDATOR_YELLOW_HEALTH_INACTIVATIONS: u32 = 2;
/// Inactivation-counter threshold for red health: a Yellow validator whose counter reaches
/// this value is moved to Red, and a Red validator whose counter is below it (while the
/// validator is active) is moved back to Yellow.
const VALIDATOR_RED_HEALTH_INACTIVATIONS: u32 = 4;
/// The maximum number of blocks the reactivate transaction can be delayed.
/// Used as the upper bound of the delay computed by `get_reactivate_delay`.
const MAX_REACTIVATE_DELAY: u32 = 10_000;

#[derive(PartialEq)]
enum ValidatorStakingState {
Active,
Expand All @@ -62,6 +70,34 @@ pub struct ConsensusState {
equivocation_proofs: EquivocationProofPool,
}

/// Enum that represents the overall health of a validator.
///
/// - `Green`: the validator is working as expected.
/// - `Yellow`: the validator was `Green` and its inactivation counter reached
///   `VALIDATOR_YELLOW_HEALTH_INACTIVATIONS`.
/// - `Red`: the validator was `Yellow` and its inactivation counter reached
///   `VALIDATOR_RED_HEALTH_INACTIVATIONS`.
///
/// Health recovers one step at a time (Red -> Yellow -> Green): while the validator is active
/// and its inactivation counter is below the threshold of its current state, it is moved to
/// the next healthier state. The health is also reset to `Green` when an epoch is finalized.
///
/// NOTE(review): the design intent is that while in the `Red` state the automatic reactivate
/// has no effect and human intervention is required — confirm this is enforced where the
/// reactivate transaction is sent.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ValidatorHealth {
    /// Everything is working as expected.
    Green,
    /// The validator has been deactivated repeatedly; one step away from `Red`.
    Yellow,
    /// The validator keeps being deactivated; the worst health level.
    Red,
}

/// Struct that represents the overall Validator Health.
///
/// It bundles the current health level with the bookkeeping used to decide when the
/// reactivate transaction is sent. The validator shares it (behind `Arc<RwLock<_>>`) with the
/// micro block producer and the validator proxy.
pub struct HealthState {
/// The current validator health
pub health: ValidatorHealth,
/// For testing/debug purposes control whether produced blocks are published by the validator.
/// When `false`, a produced micro block is not published to the network.
pub publish: bool,
/// Inactivation counter: incremented when a new deactivation is detected, decreased
/// (saturating at zero) in the micro block production path, and reset to zero when an
/// epoch is finalized.
pub inactivations: u32,
/// Next block number where the re-activate txns should be sent
pub reactivate_bn: u32,
/// Flag that indicates an ongoing reactivation: set when a reactivation is scheduled and
/// cleared once the validator is observed as active again.
pub pending_reactivate: bool,
}

/// Validator inactivity
struct InactivityState {
inactive_tx_hash: Blake2bHash,
Expand All @@ -76,6 +112,7 @@ pub struct ValidatorProxy {
pub automatic_reactivate: Arc<AtomicBool>,
pub slot_band: Arc<RwLock<Option<u16>>>,
pub consensus_state: Arc<RwLock<ConsensusState>>,
pub validator_health: Arc<RwLock<HealthState>>,
}

impl Clone for ValidatorProxy {
Expand All @@ -88,6 +125,7 @@ impl Clone for ValidatorProxy {
automatic_reactivate: Arc::clone(&self.automatic_reactivate),
slot_band: Arc::clone(&self.slot_band),
consensus_state: Arc::clone(&self.consensus_state),
validator_health: Arc::clone(&self.validator_health),
}
}
}
Expand Down Expand Up @@ -119,6 +157,8 @@ where
slot_band: Arc<RwLock<Option<u16>>>,
consensus_state: Arc<RwLock<ConsensusState>>,
validator_state: Option<InactivityState>,
health_state: Arc<RwLock<HealthState>>,

automatic_reactivate: Arc<AtomicBool>,

macro_producer: Option<ProduceMacroBlock<TValidatorNetwork>>,
Expand Down Expand Up @@ -200,6 +240,14 @@ where
.await
});

let health_state = HealthState {
health: ValidatorHealth::Green,
publish: true,
inactivations: 0,
reactivate_bn: 0,
pending_reactivate: false,
};

Self {
consensus: consensus.proxy(),
blockchain,
Expand All @@ -222,6 +270,8 @@ where
slot_band: Arc::new(RwLock::new(None)),
consensus_state: Arc::new(RwLock::new(blockchain_state)),
validator_state: None,
health_state: Arc::new(RwLock::new(health_state)),

automatic_reactivate,

macro_producer: None,
Expand Down Expand Up @@ -448,6 +498,8 @@ where
next_block_number,
Self::compute_micro_block_producer_timeout(head, &blockchain),
Self::BLOCK_SEPARATION_TIME,
self.validator_address.read().clone(),
Arc::clone(&self.health_state),
));
}
}
Expand All @@ -467,6 +519,10 @@ where
self.on_blockchain_extended(hash);
}
BlockchainEvent::EpochFinalized(ref hash) => {
// Reset the inactivations counter
self.health_state.write().inactivations = 0;
// Reset the validator health every epoch
self.health_state.write().health = ValidatorHealth::Green;
self.init_epoch();
// The on_blockchain_extended is necessary for the order of events to not matter.
self.on_blockchain_extended(hash);
Expand Down Expand Up @@ -496,6 +552,14 @@ where

self.check_reactivate(block.block_number());
self.init_block_producer(Some(hash));

let block_number = block.block_number();
let blockchain = self.blockchain.read();

if block_number == self.health_state.read().reactivate_bn {
let inactivity_state = self.reactivate(&blockchain);
self.validator_state = Some(inactivity_state);
}
}

fn on_blockchain_rebranched(
Expand Down Expand Up @@ -677,6 +741,11 @@ where
)
}

/// Computes the delay, in blocks, to wait before sending the next reactivate transaction.
///
/// The delay grows quadratically with `inactivations` and is capped at
/// `MAX_REACTIVATE_DELAY`. The caller adds the returned delay to the current block number
/// to obtain the block at which the reactivate transaction is sent.
fn get_reactivate_delay(&self, inactivations: u32) -> u32 {
    // `saturating_pow` avoids the overflow panic (debug builds) or silent wrap-around
    // (release builds) that `pow` would produce for `inactivations > 65_535`; the saturated
    // value is capped by `MAX_REACTIVATE_DELAY` anyway.
    cmp::min(inactivations.saturating_pow(2), MAX_REACTIVATE_DELAY)
}

fn reactivate(&self, blockchain: &Blockchain) -> InactivityState {
let validity_start_height = blockchain.block_number();

Expand All @@ -692,7 +761,7 @@ where

let cn = self.consensus.clone();
spawn(async move {
debug!("Sending reactivate transaction to the network");
info!("Sending reactivate transaction to the network");
if cn
.send_transaction(reactivate_transaction.clone())
.await
Expand Down Expand Up @@ -737,6 +806,7 @@ where
automatic_reactivate: Arc::clone(&self.automatic_reactivate),
slot_band: Arc::clone(&self.slot_band),
consensus_state: Arc::clone(&self.consensus_state),
validator_health: Arc::clone(&self.health_state),
}
}

Expand Down Expand Up @@ -833,13 +903,40 @@ where
// Once the validator can be active is established, check the validator staking state.
if self.is_synced() {
let blockchain = self.blockchain.read();
let block_number = blockchain.block_number();
match self.get_staking_state(&blockchain) {
ValidatorStakingState::Active => {
drop(blockchain);
if self.validator_state.is_some() {
drop(blockchain);
self.validator_state = None;
self.health_state.write().pending_reactivate = false;
info!("Automatically reactivated.");
}
let inactivations = self.health_state.read().inactivations;

log::warn!(
address=%self.validator_address.read(),
inactivations,
"Inactivations counter",
);

let validator_health = self.health_state.read().health;

match validator_health {
ValidatorHealth::Green => {}
ValidatorHealth::Yellow => {
if inactivations < VALIDATOR_YELLOW_HEALTH_INACTIVATIONS {
log::info!(inactivations, "Changed validator health to green");
self.health_state.write().health = ValidatorHealth::Green;
}
}
ValidatorHealth::Red => {
if inactivations < VALIDATOR_RED_HEALTH_INACTIVATIONS {
log::info!(inactivations, "Changed validator health to yellow");
self.health_state.write().health = ValidatorHealth::Yellow;
}
}
}
}
ValidatorStakingState::Inactive(jailed_from) => {
if self.validator_state.is_none()
Expand All @@ -850,9 +947,49 @@ where
.unwrap_or(true)
&& self.automatic_reactivate.load(Ordering::Acquire)
{
let inactivity_state = self.reactivate(&blockchain);
drop(blockchain);
self.validator_state = Some(inactivity_state);
// Keep track of how many times we have been deactivated in the current epoch.
if !self.health_state.read().pending_reactivate {
let mut health_state = self.health_state.write();
health_state.inactivations += 1;
health_state.reactivate_bn = block_number
+ self.get_reactivate_delay(health_state.inactivations);
health_state.pending_reactivate = true;
drop(health_state);

let inactivations = self.health_state.read().inactivations;

let validator_health = self.health_state.read().health;
match validator_health {
ValidatorHealth::Green => {
if inactivations >= VALIDATOR_YELLOW_HEALTH_INACTIVATIONS {
log::warn!(
inactivations,
"Changed validator health to yellow"
);
self.health_state.write().health = ValidatorHealth::Yellow;
}
}
ValidatorHealth::Yellow => {
if inactivations >= VALIDATOR_RED_HEALTH_INACTIVATIONS {
log::warn!(
inactivations,
"Changed validator health to red"
);
self.health_state.write().health = ValidatorHealth::Red;
}
}
ValidatorHealth::Red => {
log::warn!("Validator health is still red")
}
}

log::warn!(
"Current inactivations counter: {}, next reactivate bn {}, current bn {} ",
inactivations,
self.health_state.read().reactivate_bn,
block_number
);
}
}
}
ValidatorStakingState::UnknownOrNoStake => {}
Expand Down
Loading
Loading