diff --git a/bin/node/src/service.rs b/bin/node/src/service.rs index 07432e801d..8d41c8492c 100644 --- a/bin/node/src/service.rs +++ b/bin/node/src/service.rs @@ -8,9 +8,9 @@ use std::{ use aleph_runtime::{self, opaque::Block, RuntimeApi}; use finality_aleph::{ run_validator_node, AlephBlockImport, AlephConfig, AllBlockMetrics, BlockImporter, - DefaultClock, Justification, JustificationTranslator, MillisecsPerBlock, Protocol, - ProtocolNaming, RateLimiterConfig, RedirectingBlockImport, SessionPeriod, SubstrateChainStatus, - SubstrateNetwork, SyncOracle, TimingBlockMetrics, TracingBlockImport, ValidatorAddressCache, + Justification, JustificationTranslator, MillisecsPerBlock, Protocol, ProtocolNaming, + RateLimiterConfig, RedirectingBlockImport, SessionPeriod, SubstrateChainStatus, + SubstrateNetwork, SyncOracle, TracingBlockImport, ValidatorAddressCache, }; use futures::channel::mpsc; use log::warn; @@ -134,17 +134,7 @@ pub fn new_partial( client.clone(), ); - let timing_metrics = match TimingBlockMetrics::new(config.prometheus_registry(), DefaultClock) { - Ok(timing_metrics) => timing_metrics, - Err(e) => { - warn!( - "Failed to register Prometheus block timing metrics: {:?}.", - e - ); - TimingBlockMetrics::noop() - } - }; - let metrics = AllBlockMetrics::new(timing_metrics); + let metrics = AllBlockMetrics::new(config.prometheus_registry()); let (justification_tx, justification_rx) = mpsc::unbounded(); let tracing_block_import = TracingBlockImport::new(client.clone(), metrics.clone()); diff --git a/finality-aleph/src/block/mock/backend.rs b/finality-aleph/src/block/mock/backend.rs index 29dd144a83..00a82ba37a 100644 --- a/finality-aleph/src/block/mock/backend.rs +++ b/finality-aleph/src/block/mock/backend.rs @@ -264,7 +264,7 @@ impl Finalizer for Backend { } impl BlockImport for Backend { - fn import_block(&mut self, block: MockBlock) { + fn import_block(&mut self, block: MockBlock, _own: bool) { if !block.verify() { return; } diff --git a/finality-aleph/src/block/mod.rs b/finality-aleph/src/block/mod.rs index a2ee9aa6c2..5f22da1a36 100644 --- a/finality-aleph/src/block/mod.rs +++ b/finality-aleph/src/block/mod.rs @@ -130,7 +130,7 @@ pub trait Block: Clone + Codec + Debug + Send + Sync + 'static { /// The block importer. pub trait BlockImport: Send + 'static { /// Import the block. - fn import_block(&mut self, block: B); + fn import_block(&mut self, block: B, own: bool); } /// A facility for finalizing blocks using justifications. diff --git a/finality-aleph/src/block/substrate/mod.rs b/finality-aleph/src/block/substrate/mod.rs index dff06267f2..bf0d1d1b2a 100644 --- a/finality-aleph/src/block/substrate/mod.rs +++ b/finality-aleph/src/block/substrate/mod.rs @@ -6,7 +6,6 @@ use crate::{ aleph_primitives::{Block, Header}, block::{Block as BlockT, BlockId, BlockImport, Header as HeaderT, UnverifiedHeader}, metrics::{AllBlockMetrics, Checkpoint}, - TimingBlockMetrics, }; mod chain_status; @@ -66,7 +65,7 @@ impl BlockImporter { pub fn new(importer: Box>) -> Self { Self { importer, - metrics: AllBlockMetrics::new(TimingBlockMetrics::Noop), + metrics: AllBlockMetrics::new(None), } } @@ -76,9 +75,15 @@ impl BlockImporter { } impl BlockImport for BlockImporter { - fn import_block(&mut self, block: Block) { - let origin = BlockOrigin::NetworkBroadcast; + fn import_block(&mut self, block: Block, own: bool) { + // We only need to distinguish between blocks produced by us and blocks incoming from the network + // for the purpose of running `FinalityRateMetrics`. We use `BlockOrigin` to make this distinction. + let origin = match own { + true => BlockOrigin::Own, + false => BlockOrigin::NetworkBroadcast, + }; let hash = block.header.hash(); + let number = *block.header.number(); let incoming_block = IncomingBlock:: { hash, header: Some(block.header), @@ -91,7 +96,8 @@ impl BlockImport for BlockImporter { import_existing: false, state: None, }; - self.metrics.report_block(hash, Checkpoint::Importing); + self.metrics + .report_block(BlockId::new(hash, number), Checkpoint::Importing, Some(own)); self.importer.import_blocks(origin, vec![incoming_block]); } } diff --git a/finality-aleph/src/data_io/data_provider.rs b/finality-aleph/src/data_io/data_provider.rs index d53e0ecb25..ec19dc6c8e 100644 --- a/finality-aleph/src/data_io/data_provider.rs +++ b/finality-aleph/src/data_io/data_provider.rs @@ -334,8 +334,9 @@ impl DataProvider { let data_to_propose = (*self.data_to_propose.lock()).take(); if let Some(data) = &data_to_propose { + let top_block = data.head_proposal.top_block(); self.metrics - .report_block(data.head_proposal.top_block().hash(), Checkpoint::Proposed); + .report_block(top_block, Checkpoint::Proposed, None); debug!(target: "aleph-data-store", "Outputting {:?} in get_data", data); }; @@ -360,7 +361,7 @@ mod tests { client_chain_builder::ClientChainBuilder, mocks::{aleph_data_from_blocks, THeader, TestClientBuilder, TestClientBuilderExt}, }, - SessionBoundaryInfo, SessionId, SessionPeriod, TimingBlockMetrics, + SessionBoundaryInfo, SessionId, SessionPeriod, }; const SESSION_LEN: u32 = 100; @@ -391,7 +392,7 @@ mod tests { client, session_boundaries, config, - AllBlockMetrics::new(TimingBlockMetrics::noop()), + AllBlockMetrics::new(None), ); let (exit_chain_tracker_tx, exit_chain_tracker_rx) = oneshot::channel(); diff --git a/finality-aleph/src/data_io/legacy/data_provider.rs b/finality-aleph/src/data_io/legacy/data_provider.rs index b97d319c7f..2d6d4f6ad6 100644 --- a/finality-aleph/src/data_io/legacy/data_provider.rs +++ b/finality-aleph/src/data_io/legacy/data_provider.rs @@ -325,10 +325,10 @@ impl DataProvider { let data_to_propose = (*self.data_to_propose.lock()).take(); if let Some(data) = &data_to_propose { - self.metrics.report_block( - *data.head_proposal.branch.last().unwrap(), - Checkpoint::Proposed, - ); + let number = data.head_proposal.number; + let hash = *data.head_proposal.branch.last().unwrap(); + self.metrics + .report_block(BlockId::new(hash, number), Checkpoint::Proposed, None); debug!(target: "aleph-data-store", "Outputting {:?} in get_data", data); }; @@ -354,7 +354,7 @@ mod tests { client_chain_builder::ClientChainBuilder, mocks::{TestClientBuilder, TestClientBuilderExt}, }, - SessionBoundaryInfo, SessionId, SessionPeriod, TimingBlockMetrics, + SessionBoundaryInfo, SessionId, SessionPeriod, }; const SESSION_LEN: u32 = 100; @@ -385,7 +385,7 @@ mod tests { client, session_boundaries, config, - AllBlockMetrics::new(TimingBlockMetrics::noop()), + AllBlockMetrics::new(None), ); let (exit_chain_tracker_tx, exit_chain_tracker_rx) = oneshot::channel(); diff --git a/finality-aleph/src/finalization.rs b/finality-aleph/src/finalization.rs index 70adc492e5..5244a1ae6b 100644 --- a/finality-aleph/src/finalization.rs +++ b/finality-aleph/src/finalization.rs @@ -74,7 +74,8 @@ where match &update_res { Ok(_) => { debug!(target: "aleph-finality", "Successfully finalized block with hash {:?} and number {:?}. Current best: #{:?}.", hash, number, status.best_number); - self.metrics.report_block(hash, Checkpoint::Finalized); + self.metrics + .report_block(block, Checkpoint::Finalized, None); } Err(_) => { debug!(target: "aleph-finality", "Failed to finalize block with hash {:?} and number {:?}. Current best: #{:?}.", hash, number, status.best_number) diff --git a/finality-aleph/src/import.rs b/finality-aleph/src/import.rs index 673a1b96e1..01173e583d 100644 --- a/finality-aleph/src/import.rs +++ b/finality-aleph/src/import.rs @@ -8,7 +8,7 @@ use log::{debug, warn}; use sc_consensus::{ BlockCheckParams, BlockImport, BlockImportParams, ImportResult, JustificationImport, }; -use sp_consensus::Error as ConsensusError; +use sp_consensus::{BlockOrigin, Error as ConsensusError}; use sp_runtime::{traits::Header as HeaderT, Justification as SubstrateJustification}; use crate::{ @@ -58,14 +58,24 @@ where block: BlockImportParams, ) -> Result { let post_hash = block.post_hash(); + let number = *block.post_header().number(); + let is_own = block.origin == BlockOrigin::Own; // Self-created blocks are imported without using the import queue, // so we need to report them here. - self.metrics.report_block(post_hash, Checkpoint::Importing); + self.metrics.report_block( + BlockId::new(post_hash, number), + Checkpoint::Importing, + Some(is_own), + ); let result = self.inner.import_block(block).await; if let Ok(ImportResult::Imported(_)) = &result { - self.metrics.report_block(post_hash, Checkpoint::Imported); + self.metrics.report_block( + BlockId::new(post_hash, number), + Checkpoint::Imported, + Some(is_own), + ); } result } diff --git a/finality-aleph/src/lib.rs b/finality-aleph/src/lib.rs index c2cc248bed..a090148014 100644 --- a/finality-aleph/src/lib.rs +++ b/finality-aleph/src/lib.rs @@ -67,7 +67,7 @@ pub use crate::{ }, import::{AlephBlockImport, RedirectingBlockImport, TracingBlockImport}, justification::AlephJustification, - metrics::{AllBlockMetrics, DefaultClock, TimingBlockMetrics}, + metrics::{AllBlockMetrics, DefaultClock, FinalityRateMetrics, TimingBlockMetrics}, network::{ address_cache::{ValidatorAddressCache, ValidatorAddressingInfo}, Protocol, ProtocolNaming, SubstrateNetwork, SubstrateNetworkEventStream, diff --git a/finality-aleph/src/metrics/all_block.rs b/finality-aleph/src/metrics/all_block.rs index fd4cc53ecd..ac66384027 100644 --- a/finality-aleph/src/metrics/all_block.rs +++ b/finality-aleph/src/metrics/all_block.rs @@ -1,21 +1,53 @@ -use primitives::BlockHash; +use log::warn; +use substrate_prometheus_endpoint::Registry; -use super::{timing::DefaultClock, Checkpoint}; -use crate::TimingBlockMetrics; +use super::{finality_rate::FinalityRateMetrics, timing::DefaultClock, Checkpoint}; +use crate::{metrics::LOG_TARGET, BlockId, TimingBlockMetrics}; /// Wrapper around various block-related metrics. #[derive(Clone)] pub struct AllBlockMetrics { timing_metrics: TimingBlockMetrics, + finality_rate_metrics: FinalityRateMetrics, } impl AllBlockMetrics { - pub fn new(timing_metrics: TimingBlockMetrics) -> Self { - AllBlockMetrics { timing_metrics } + pub fn new(registry: Option<&Registry>) -> Self { + let timing_metrics = match TimingBlockMetrics::new(registry, DefaultClock) { + Ok(timing_metrics) => timing_metrics, + Err(e) => { + warn!( + target: LOG_TARGET, + "Failed to register Prometheus block timing metrics: {:?}.", e + ); + TimingBlockMetrics::Noop + } + }; + let finality_rate_metrics = match FinalityRateMetrics::new(registry) { + Ok(finality_rate_metrics) => finality_rate_metrics, + Err(e) => { + warn!( + target: LOG_TARGET, + "Failed to register Prometheus finality rate metrics: {:?}.", e + ); + FinalityRateMetrics::Noop + } + }; + AllBlockMetrics { + timing_metrics, + finality_rate_metrics, + } } /// Triggers all contained block metrics. - pub fn report_block(&self, hash: BlockHash, checkpoint: Checkpoint) { - self.timing_metrics.report_block(hash, checkpoint); + pub fn report_block(&self, block_id: BlockId, checkpoint: Checkpoint, own: Option) { + self.timing_metrics + .report_block(block_id.hash(), checkpoint); + self.finality_rate_metrics.report_block( + block_id.hash(), + block_id.number(), + checkpoint, + own, + ); } } diff --git a/finality-aleph/src/metrics/finality_rate.rs b/finality-aleph/src/metrics/finality_rate.rs new file mode 100644 index 0000000000..09d084b5e9 --- /dev/null +++ b/finality-aleph/src/metrics/finality_rate.rs @@ -0,0 +1,193 @@ +use std::num::NonZeroUsize; + +use log::warn; +use lru::LruCache; +use parking_lot::Mutex; +use primitives::{BlockHash, BlockNumber}; +use sc_service::Arc; +use sp_core::{bounded_vec::BoundedVec, ConstU32}; +use substrate_prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64}; + +use super::Checkpoint; +use crate::metrics::LOG_TARGET; + +const MAX_CACHE_SIZE: usize = 1800; +const MAX_INNER_SIZE: u32 = 64; + +type ImportedHashesCache = + Arc>>>>; + +#[derive(Clone)] +pub enum FinalityRateMetrics { + Prometheus { + own_finalized: Counter, + own_hopeless: Counter, + imported_cache: ImportedHashesCache, + }, + Noop, +} + +impl FinalityRateMetrics { + pub fn new(registry: Option<&Registry>) -> Result { + let registry = match registry { + None => return Ok(FinalityRateMetrics::Noop), + Some(registry) => registry, + }; + + Ok(FinalityRateMetrics::Prometheus { + own_finalized: register( + Counter::new("aleph_own_finalized_blocks", "no help")?, + registry, + )?, + own_hopeless: register( + Counter::new("aleph_own_hopeless_blocks", "no help")?, + registry, + )?, + imported_cache: Arc::new(Mutex::new(LruCache::new( + NonZeroUsize::new(MAX_CACHE_SIZE).unwrap(), + ))), + }) + } + + pub fn report_block( + &self, + block_hash: BlockHash, + block_number: BlockNumber, + checkpoint: Checkpoint, + own: Option, + ) { + match checkpoint { + Checkpoint::Imported => { + if let Some(true) = own { + self.report_own_imported(block_hash, block_number); + } + } + Checkpoint::Finalized => self.report_finalized(block_hash, block_number), + _ => {} + } + } + + /// Stores the imported block's hash. Assumes that the imported block is own. + fn report_own_imported(&self, hash: BlockHash, number: BlockNumber) { + let mut imported_cache = match self { + FinalityRateMetrics::Prometheus { imported_cache, .. } => imported_cache.lock(), + FinalityRateMetrics::Noop => return, + }; + + let entry = imported_cache + .get_or_insert_mut(number, BoundedVec::<_, ConstU32>::new); + + if entry.try_push(hash).is_err() { + warn!( + target: LOG_TARGET, + "Finality Rate Metrics encountered too many own imported blocks at level {}", + number + ); + } + } + + /// Counts the blocks at the level of `number` different than the passed block + /// and reports them as hopeless. If `hash` is a hash of own block it will be found + /// in `imported_cache` and reported as finalized. + fn report_finalized(&self, hash: BlockHash, number: BlockNumber) { + let (own_finalized, own_hopeless, imported_cache) = match self { + FinalityRateMetrics::Prometheus { + own_finalized, + own_hopeless, + imported_cache, + } => (own_finalized, own_hopeless, imported_cache), + FinalityRateMetrics::Noop => return, + }; + + let mut imported_cache = imported_cache.lock(); + if let Some(hashes) = imported_cache.get_mut(&number) { + let new_hopeless_count = hashes.iter().filter(|h| **h != hash).count(); + own_hopeless.inc_by(new_hopeless_count as u64); + own_finalized.inc_by((hashes.len() - new_hopeless_count) as u64); + } + imported_cache.pop(&number); + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use primitives::{BlockHash, BlockNumber}; + use substrate_prometheus_endpoint::{Counter, Registry, U64}; + + use crate::{metrics::finality_rate::ImportedHashesCache, FinalityRateMetrics}; + + type FinalityRateMetricsInternals = (Counter, Counter, ImportedHashesCache); + + fn extract_internals(metrics: FinalityRateMetrics) -> FinalityRateMetricsInternals { + match metrics { + FinalityRateMetrics::Prometheus { + own_finalized, + own_hopeless, + imported_cache, + } => (own_finalized, own_hopeless, imported_cache), + FinalityRateMetrics::Noop => panic!("metrics should have been initialized properly"), + } + } + + fn verify_state( + metrics: &FinalityRateMetrics, + expected_finalized: u64, + expected_hopeless: u64, + expected_cache: HashMap>, + ) { + let (finalized, hopeless, cache) = extract_internals(metrics.clone()); + let cache = cache.lock(); + assert_eq!(finalized.get(), expected_finalized); + assert_eq!(hopeless.get(), expected_hopeless); + + // verify caches are equal + assert_eq!(expected_cache.len(), cache.len()); + for (level, expected_hashes) in expected_cache { + assert!(cache.contains(&level)); + let hashes = cache.peek(&level).unwrap(); + assert_eq!(expected_hashes, hashes.clone().into_inner()); + } + } + + #[test] + fn imported_cache_behaves_properly() { + let metrics = FinalityRateMetrics::new(Some(&Registry::new())).unwrap(); + + verify_state(&metrics, 0, 0, HashMap::new()); + + let hash0 = BlockHash::random(); + metrics.report_own_imported(hash0, 0); + + verify_state(&metrics, 0, 0, HashMap::from([(0, vec![hash0])])); + + let hash1 = BlockHash::random(); + metrics.report_own_imported(hash1, 1); + + verify_state( + &metrics, + 0, + 0, + HashMap::from([(0, vec![hash0]), (1, vec![hash1])]), + ); + + let hash2 = BlockHash::random(); + metrics.report_own_imported(hash2, 1); + + verify_state( + &metrics, + 0, + 0, + HashMap::from([(0, vec![hash0]), (1, vec![hash1, hash2])]), + ); + + metrics.report_finalized(hash0, 0); + + verify_state(&metrics, 1, 0, HashMap::from([(1, vec![hash1, hash2])])); + + metrics.report_finalized(BlockHash::random(), 1); + + verify_state(&metrics, 1, 2, HashMap::new()); + } +} diff --git a/finality-aleph/src/metrics/mod.rs b/finality-aleph/src/metrics/mod.rs index 7475560f9e..a0961de55a 100644 --- a/finality-aleph/src/metrics/mod.rs +++ b/finality-aleph/src/metrics/mod.rs @@ -1,8 +1,10 @@ mod all_block; mod chain_state; +mod finality_rate; mod timing; pub use all_block::AllBlockMetrics; pub use chain_state::run_chain_state_metrics; +pub use finality_rate::FinalityRateMetrics; pub use timing::{Checkpoint, DefaultClock, TimingBlockMetrics}; const LOG_TARGET: &str = "aleph-metrics"; diff --git a/finality-aleph/src/party/manager/aggregator.rs b/finality-aleph/src/party/manager/aggregator.rs index 1404c043cc..44f4f868e8 100644 --- a/finality-aleph/src/party/manager/aggregator.rs +++ b/finality-aleph/src/party/manager/aggregator.rs @@ -40,15 +40,16 @@ where async fn process_new_block_data( aggregator: &mut Aggregator<'_, CN, LN>, block: BlockId, - metrics: &AllBlockMetrics, + metrics: &mut AllBlockMetrics, ) where CN: Network, LN: Network, { trace!(target: "aleph-party", "Received unit {:?} in aggregator.", block); - metrics.report_block(block.hash(), Checkpoint::Ordered); + let hash = block.hash(); + metrics.report_block(block, Checkpoint::Ordered, None); - aggregator.start_aggregation(block.hash()).await; + aggregator.start_aggregation(hash).await; } fn process_hash( @@ -88,7 +89,7 @@ async fn run_aggregator( io: IO, client: Arc, session_boundaries: &SessionBoundaries, - metrics: AllBlockMetrics, + mut metrics: AllBlockMetrics, mut exit_rx: oneshot::Receiver<()>, ) -> Result<(), ()> where @@ -129,7 +130,7 @@ where process_new_block_data::( &mut aggregator, block, - &metrics + &mut metrics ).await; } else { debug!(target: "aleph-party", "Blocks ended in aggregator."); diff --git a/finality-aleph/src/sync/handler/mod.rs b/finality-aleph/src/sync/handler/mod.rs index b6d3c068e5..c33b12a141 100644 --- a/finality-aleph/src/sync/handler/mod.rs +++ b/finality-aleph/src/sync/handler/mod.rs @@ -472,8 +472,7 @@ where /// Check for equivocations and then send the block to the block importer. /// It's important to pass every incoming block through this function, as the block importer /// will accept equivocated headers, and then notify us by sending back a VERIFIED header. - /// Also, this is the last place we know if we've authored the block, without having to - /// check it by hand. + /// The knowledge of authorship of the block is passed further to the block importer. fn import_block( &mut self, block: B, @@ -486,7 +485,7 @@ where maybe_equivocation_proof, .. } = self.verify_header(block.header().clone(), own_block)?; - self.block_importer.import_block(block); + self.block_importer.import_block(block, own_block); Ok(maybe_equivocation_proof) } @@ -927,7 +926,7 @@ mod tests { .take(branch_length) .collect(); for header in &result { - backend.import_block(MockBlock::new(header.clone(), true)); + backend.import_block(MockBlock::new(header.clone(), true), false); } result } @@ -1075,7 +1074,7 @@ mod tests { handler .handle_justification_from_user(justification) .expect("should work"); - backend.import_block(block); + backend.import_block(block, false); match notifier.next().await { Ok(BlockImported(header)) => { handler.block_imported(header).expect("should work"); @@ -2002,7 +2001,7 @@ mod tests { let headers: Vec = import_branch(&mut backend_a, 110); for header in &headers { - backend_b.import_block(MockBlock::new(header.clone(), true)); + backend_b.import_block(MockBlock::new(header.clone(), true), false); } let justifications: Vec = headers @@ -2709,7 +2708,7 @@ mod tests { let branch: Vec<_> = genesis.random_branch().take(2137).collect(); for header in branch.iter() { let block = MockBlock::new(header.clone(), true); - backend.import_block(block); + backend.import_block(block, false); match notifier.next().await { Ok(BlockImported(header)) => { // we ignore failures, as we expect some