diff --git a/primitives/account/src/account/staking_contract/validator.rs b/primitives/account/src/account/staking_contract/validator.rs index eab42ad87c..93ce458dca 100644 --- a/primitives/account/src/account/staking_contract/validator.rs +++ b/primitives/account/src/account/staking_contract/validator.rs @@ -46,6 +46,12 @@ use crate::{ /// in the first place. /// (**) The validator may be set to automatically reactivate itself upon inactivation. /// If this setting is not enabled the state change can only be triggered manually. +/// However, there is a validator health status with the following states: +/// -> Green: Everything is working as expected, if the validator is deactivated its status changes to Yellow +/// -> Yellow: If the validator is deactivated again, its status is changed to Red +/// -> Red: If the validator is deactivated again, the automatic reactivate (if enabled) has no effect +/// Human intervention is required at this point +/// To go from Red to Yellow or Yellow to Green, the validator needs to be active for at least a quarter of an epoch /// /// Create, Update, Deactivate, Retire and Re-activate are incoming transactions to the staking contract. /// Delete is an outgoing transaction from the staking contract. diff --git a/test-utils/src/validator.rs b/test-utils/src/validator.rs index 7be7081fb0..3fe39dde7b 100644 --- a/test-utils/src/validator.rs +++ b/test-utils/src/validator.rs @@ -66,6 +66,7 @@ pub async fn build_validators( peer_ids: &[u64], hub: &mut Option, is_prover_active: bool, + automatic_reactivate: bool, ) -> Vec>> where N::Error: Send + Sync, @@ -113,7 +114,7 @@ where let (v, c) = build_validator( peer_ids[i], Address::from(&validator_keys[i]), - false, + automatic_reactivate, signing_keys[i].clone(), voting_keys[i].clone(), fee_keys[i].clone(), diff --git a/validator/src/micro.rs b/validator/src/micro.rs index 4d3e60494c..ea764a82fd 100644 --- a/validator/src/micro.rs +++ b/validator/src/micro.rs @@ -10,6 +10,7 @@ use futures::{future::BoxFuture, ready, FutureExt, Stream}; use nimiq_block::{Block, EquivocationProof, MicroBlock, SkipBlockInfo}; use nimiq_blockchain::{BlockProducer, BlockProducerError, Blockchain}; use nimiq_blockchain_interface::AbstractBlockchain; +use nimiq_keys::Address; use nimiq_mempool::mempool::Mempool; use nimiq_primitives::policy::Policy; use nimiq_time::sleep; @@ -18,7 +19,10 @@ use nimiq_validator_network::ValidatorNetwork; use nimiq_vrf::VrfSeed; use parking_lot::RwLock; -use crate::{aggregation::skip_block::SkipBlockAggregation, validator::Validator}; +use crate::{ + aggregation::skip_block::SkipBlockAggregation, + validator::{HealthState, Validator}, +}; pub(crate) enum ProduceMicroBlockEvent { MicroBlock, @@ -36,6 +40,8 @@ struct NextProduceMicroBlockEvent { block_number: u32, producer_timeout: Duration, block_separation_time: Duration, + validator_address: Address, + health_state: Arc>, } impl NextProduceMicroBlockEvent { @@ -53,6 +59,8 @@ impl NextProduceMicroBlockEvent>, ) -> Self { Self { blockchain, @@ -65,6 +73,8 @@ impl NextProduceMicroBlockEvent NextProduceMicroBlockEvent NextProduceMicroBlockEvent NextProduceMicroBlockEvent NextProduceMicroBlockEvent ProduceMicroBlock>, ) -> Self { let next_event = NextProduceMicroBlockEvent::new( blockchain, @@ -416,6 +439,8 @@ impl ProduceMicroBlock, pub slot_band: Arc>>, pub consensus_state: Arc>, + pub validator_health: Arc>, } impl Clone for ValidatorProxy { @@ -88,6 +125,7 @@ impl Clone for ValidatorProxy { automatic_reactivate: Arc::clone(&self.automatic_reactivate), slot_band: Arc::clone(&self.slot_band), consensus_state: Arc::clone(&self.consensus_state), + validator_health: Arc::clone(&self.validator_health), } } } @@ -119,6 +157,8 @@ where slot_band: Arc>>, consensus_state: Arc>, validator_state: Option, + health_state: Arc>, + automatic_reactivate: Arc, macro_producer: Option>, @@ -200,6 +240,14 @@ where .await }); + let health_state = HealthState { + health: ValidatorHealth::Green, + publish: true, + inactivations: 0, + reactivate_bn: 0, + pending_reactivate: false, + }; + Self { consensus: consensus.proxy(), blockchain, @@ -222,6 +270,8 @@ where slot_band: Arc::new(RwLock::new(None)), consensus_state: Arc::new(RwLock::new(blockchain_state)), validator_state: None, + health_state: Arc::new(RwLock::new(health_state)), + automatic_reactivate, macro_producer: None, @@ -448,6 +498,8 @@ where next_block_number, Self::compute_micro_block_producer_timeout(head, &blockchain), Self::BLOCK_SEPARATION_TIME, + self.validator_address.read().clone(), + Arc::clone(&self.health_state), )); } } @@ -467,6 +519,10 @@ where self.on_blockchain_extended(hash); } BlockchainEvent::EpochFinalized(ref hash) => { + // Reset the inactivations counter + self.health_state.write().inactivations = 0; + // Reset the validator health every epoch + self.health_state.write().health = ValidatorHealth::Green; self.init_epoch(); // The on_blockchain_extended is necessary for the order of events to not matter. self.on_blockchain_extended(hash); @@ -496,6 +552,14 @@ where self.check_reactivate(block.block_number()); self.init_block_producer(Some(hash)); + + let block_number = block.block_number(); + let blockchain = self.blockchain.read(); + + if block_number == self.health_state.read().reactivate_bn { + let inactivity_state = self.reactivate(&blockchain); + self.validator_state = Some(inactivity_state); + } } fn on_blockchain_rebranched( @@ -677,6 +741,11 @@ where ) } + // Computes the next block number where we should send the next reactivate transaction + fn get_reactivate_delay(&self, inactivations: u32) -> u32 { + cmp::min(inactivations.pow(2), MAX_REACTIVATE_DELAY) + } + fn reactivate(&self, blockchain: &Blockchain) -> InactivityState { let validity_start_height = blockchain.block_number(); @@ -692,7 +761,7 @@ where let cn = self.consensus.clone(); spawn(async move { - debug!("Sending reactivate transaction to the network"); + info!("Sending reactivate transaction to the network"); if cn .send_transaction(reactivate_transaction.clone()) .await @@ -737,6 +806,7 @@ where automatic_reactivate: Arc::clone(&self.automatic_reactivate), slot_band: Arc::clone(&self.slot_band), consensus_state: Arc::clone(&self.consensus_state), + validator_health: Arc::clone(&self.health_state), } } @@ -833,13 +903,40 @@ where // Once the validator can be active is established, check the validator staking state. if self.is_synced() { let blockchain = self.blockchain.read(); + let block_number = blockchain.block_number(); match self.get_staking_state(&blockchain) { ValidatorStakingState::Active => { - drop(blockchain); if self.validator_state.is_some() { + drop(blockchain); self.validator_state = None; + self.health_state.write().pending_reactivate = false; info!("Automatically reactivated."); } + let inactivations = self.health_state.read().inactivations; + + log::warn!( + address=%self.validator_address.read(), + inactivations, + "Inactivations counter", + ); + + let validator_health = self.health_state.read().health; + + match validator_health { + ValidatorHealth::Green => {} + ValidatorHealth::Yellow => { + if inactivations < VALIDATOR_YELLOW_HEALTH_INACTIVATIONS { + log::info!(inactivations, "Changed validator health to green"); + self.health_state.write().health = ValidatorHealth::Green; + } + } + ValidatorHealth::Red => { + if inactivations < VALIDATOR_RED_HEALTH_INACTIVATIONS { + log::info!(inactivations, "Changed validator health to yellow"); + self.health_state.write().health = ValidatorHealth::Yellow; + } + } + } } ValidatorStakingState::Inactive(jailed_from) => { if self.validator_state.is_none() @@ -850,9 +947,49 @@ where .unwrap_or(true) && self.automatic_reactivate.load(Ordering::Acquire) { - let inactivity_state = self.reactivate(&blockchain); - drop(blockchain); - self.validator_state = Some(inactivity_state); + // Keep track of how many times we have been deactivated in the current epoch. + if !self.health_state.read().pending_reactivate { + let mut health_state = self.health_state.write(); + health_state.inactivations += 1; + health_state.reactivate_bn = block_number + + self.get_reactivate_delay(health_state.inactivations); + health_state.pending_reactivate = true; + drop(health_state); + + let inactivations = self.health_state.read().inactivations; + + let validator_health = self.health_state.read().health; + match validator_health { + ValidatorHealth::Green => { + if inactivations >= VALIDATOR_YELLOW_HEALTH_INACTIVATIONS { + log::warn!( + inactivations, + "Changed validator health to yellow" + ); + self.health_state.write().health = ValidatorHealth::Yellow; + } + } + ValidatorHealth::Yellow => { + if inactivations >= VALIDATOR_RED_HEALTH_INACTIVATIONS { + log::warn!( + inactivations, + "Changed validator health to red" + ); + self.health_state.write().health = ValidatorHealth::Red; + } + } + ValidatorHealth::Red => { + log::warn!("Validator health is still red") + } + } + + log::warn!( + "Current inactivations counter: {}, next reactivate bn {}, current bn {} ", + inactivations, + self.health_state.read().reactivate_bn, + block_number + ); + } } } ValidatorStakingState::UnknownOrNoStake => {} diff --git a/validator/tests/integration.rs b/validator/tests/integration.rs index 371127bbdd..d6f83efb5a 100644 --- a/validator/tests/integration.rs +++ b/validator/tests/integration.rs @@ -86,9 +86,14 @@ async fn four_validators_can_create_an_epoch() { let env = MdbxDatabase::new_volatile(Default::default()).expect("Could not open a volatile database"); - let validators = - build_validators::(env, &(1u64..=4u64).collect::>(), &mut None, false) - .await; + let validators = build_validators::( + env, + &(1u64..=4u64).collect::>(), + &mut None, + false, + false, + ) + .await; let blockchain = Arc::clone(&validators.first().unwrap().blockchain); diff --git a/validator/tests/mock.rs b/validator/tests/mock.rs index a16dfd1141..65c13866b0 100644 --- a/validator/tests/mock.rs +++ b/validator/tests/mock.rs @@ -15,7 +15,7 @@ use nimiq_network_interface::{ }; use nimiq_network_libp2p::Network; use nimiq_network_mock::{MockHub, MockNetwork}; -use nimiq_primitives::{networks::NetworkId, policy::Policy}; +use nimiq_primitives::{coin::Coin, networks::NetworkId, policy::Policy}; use nimiq_test_log::test; use nimiq_test_utils::{ test_network::TestNetwork, @@ -24,9 +24,11 @@ use nimiq_test_utils::{ }, }; use nimiq_time::{sleep, timeout}; +use nimiq_transaction_builder::TransactionBuilder; use nimiq_utils::spawn; -use nimiq_validator::aggregation::{ - skip_block::SignedSkipBlockMessage, update::SerializableLevelUpdate, +use nimiq_validator::{ + aggregation::{skip_block::SignedSkipBlockMessage, update::SerializableLevelUpdate}, + validator::ValidatorHealth, }; use serde::{Deserialize, Serialize}; @@ -105,6 +107,7 @@ async fn four_validators_can_create_micro_blocks() { &(1u64..=4u64).collect::>(), &mut Some(hub), false, + false, ) .await; @@ -143,9 +146,14 @@ async fn validators_can_do_skip_block() { let env = MdbxDatabase::new_volatile(Default::default()).expect("Could not open a volatile database"); - let mut validators = - build_validators::(env, &(5u64..=10u64).collect::>(), &mut None, false) - .await; + let mut validators = build_validators::( + env, + &(5u64..=10u64).collect::>(), + &mut None, + false, + false, + ) + .await; // Disconnect the next block producer. let _validator = pop_validator_for_slot( @@ -176,6 +184,189 @@ async fn validators_can_do_skip_block() { assert!(block.block_number() > Policy::genesis_block_number()); } +#[test(tokio::test)] +async fn validator_can_recover_from_yellow_health() { + let env = + MdbxDatabase::new_volatile(Default::default()).expect("Could not open a volatile database"); + + let validators = build_validators::( + env, + &(5u64..=10u64).collect::>(), + &mut None, + false, + true, + ) + .await; + + // Listen for blockchain events from the new block producer (after a skip block). + let validator = validators.first().unwrap(); + let validator_proxy = validator.proxy(); + + let validator = validators.last().unwrap(); + let blockchain = Arc::clone(&validator.blockchain); + let events = blockchain.read().notifier_as_stream(); + + for validator in validators { + log::info!("Spawning validator: {}", validator.validator_address()); + spawn(validator); + } + + validator_proxy.validator_health.write().publish = false; + + log::info!( + "Validator proxy address {}", + validator_proxy.validator_address.read() + ); + + events.take(30).for_each(|_| future::ready(())).await; + + let current_validator_health = validator_proxy.validator_health.read().health; + + match current_validator_health { + ValidatorHealth::Yellow => { + log::info!("Current validator health is yellow, as expecteds",) + } + _ => panic!("Validator Health different than expected"), + }; + + // The validator should no longer be skip blocked: + validator_proxy.validator_health.write().publish = true; + + let events = blockchain.read().notifier_as_stream(); + events.take(30).for_each(|_| future::ready(())).await; + + assert_eq!( + validator_proxy.validator_health.read().health, + ValidatorHealth::Green + ); +} + +#[test(tokio::test)] +async fn validator_health_to_red() { + let env = + MdbxDatabase::new_volatile(Default::default()).expect("Could not open a volatile database"); + + let validators = build_validators::( + env, + &(5u64..=10u64).collect::>(), + &mut None, + false, + true, + ) + .await; + + // Listen for blockchain events from the new block producer (after a skip block). + let validator = validators.first().unwrap(); + let validator_proxy = validator.proxy(); + + let validator = validators.last().unwrap(); + let blockchain = Arc::clone(&validator.blockchain); + let events = blockchain.read().notifier_as_stream(); + + for validator in validators { + log::info!("Spawning validator: {}", validator.validator_address()); + spawn(validator); + } + + validator_proxy.validator_health.write().publish = false; + + events.take(30).for_each(|_| future::ready(())).await; + + let current_validator_health = validator_proxy.validator_health.read().health; + + match current_validator_health { + ValidatorHealth::Yellow => { + log::info!("Current validator health is yellow, as expected",) + } + _ => panic!("Validator Health different than expected"), + }; + + let events = blockchain.read().notifier_as_stream(); + + // Now we produce more blocks, and the validator should be inactivated again + events.take(20).for_each(|_| future::ready(())).await; + + let current_validator_health = validator_proxy.validator_health.read().health; + + match current_validator_health { + ValidatorHealth::Red => { + log::info!("Current validator health is red, as expected",) + } + _ => panic!("Validator Health different than expected"), + }; +} + +#[test(tokio::test)] +async fn validator_health_fully_recover() { + let env = + MdbxDatabase::new_volatile(Default::default()).expect("Could not open a volatile database"); + + let validators = build_validators::( + env, + &(5u64..=10u64).collect::>(), + &mut None, + false, + true, + ) + .await; + + // Listen for blockchain events from the new block producer (after a skip block). + let validator = validators.first().unwrap(); + let validator_proxy = validator.proxy(); + let validator_address = validator.validator_address(); + + log::info!( + "Listening to blockchain events from validator {} ", + validator_address, + ); + + let validator = validators.last().unwrap(); + let blockchain = Arc::clone(&validator.blockchain); + + let events = blockchain.read().notifier_as_stream(); + + for validator in validators { + log::info!("Spawning validator: {}", validator.validator_address()); + spawn(validator); + } + + validator_proxy.validator_health.write().publish = false; + + events.take(30).for_each(|_| future::ready(())).await; + + let current_validator_health = validator_proxy.validator_health.read().health; + + match current_validator_health { + ValidatorHealth::Yellow => { + log::info!("Current validator health is yellow, as expected",) + } + _ => panic!("Validator Health different than expected"), + }; + + let events = blockchain.read().notifier_as_stream(); + + // Now we produce more blocks, and the validator should be inactivated again + events.take(20).for_each(|_| future::ready(())).await; + + let current_validator_health = validator_proxy.validator_health.read().health; + + match current_validator_health { + ValidatorHealth::Red => { + log::info!("Current validator health is red, as expected") + } + _ => panic!("Validator Health different than expected"), + }; + + validator_proxy.validator_health.write().publish = true; + + let events = blockchain.read().notifier_as_stream(); + events.take(100).for_each(|_| future::ready(())).await; + + let current_validator_health = validator_proxy.validator_health.read().health; + + assert_eq!(current_validator_health, ValidatorHealth::Green); +} + fn create_skip_block_update( skip_block_info: SkipBlockInfo, key_pair: BlsKeyPair, @@ -227,6 +418,7 @@ async fn validator_can_catch_up() { &(9u64..=16u64).collect::>(), &mut Some(hub), false, + false, ) .await; // Maintain a collection of the corresponding networks.