diff --git a/consensus/core/src/config/constants.rs b/consensus/core/src/config/constants.rs index c4635083b7..146e30a17c 100644 --- a/consensus/core/src/config/constants.rs +++ b/consensus/core/src/config/constants.rs @@ -121,7 +121,7 @@ pub mod perf { const BASELINE_HEADER_DATA_CACHE_SIZE: usize = 10_000; const BASELINE_BLOCK_DATA_CACHE_SIZE: usize = 200; - const BASELINE_BLOCK_WINDOW_CACHE_SIZE: usize = 2000; + const BASELINE_BLOCK_WINDOW_CACHE_SIZE: usize = 2_000; const BASELINE_UTXOSET_CACHE_SIZE: usize = 10_000; #[derive(Clone, Debug)] diff --git a/consensus/core/src/errors/block.rs b/consensus/core/src/errors/block.rs index f5c235476a..132c6619f7 100644 --- a/consensus/core/src/errors/block.rs +++ b/consensus/core/src/errors/block.rs @@ -64,8 +64,8 @@ pub enum RuleError { #[error("expected header blue work {0} but got {1}")] UnexpectedHeaderBlueWork(BlueWorkType, BlueWorkType), - #[error("block difficulty of {0} is not the expected value of {1}")] - UnexpectedDifficulty(u32, u32), + #[error("block {0} difficulty of {1} is not the expected value of {2}")] + UnexpectedDifficulty(Hash, u32, u32), #[error("block timestamp of {0} is not after expected {1}")] TimeTooOld(u64, u64), diff --git a/consensus/src/consensus/mod.rs b/consensus/src/consensus/mod.rs index b3edd55ca4..eca78ee2a4 100644 --- a/consensus/src/consensus/mod.rs +++ b/consensus/src/consensus/mod.rs @@ -241,23 +241,13 @@ impl Consensus { body_receiver, virtual_sender, block_processors_pool, + params, db.clone(), - storage.statuses_store.clone(), - storage.ghostdag_store.clone(), - storage.headers_store.clone(), - storage.block_transactions_store.clone(), - storage.body_tips_store.clone(), - services.reachability_service.clone(), - services.coinbase_manager.clone(), - services.mass_calculator.clone(), - services.transaction_validator.clone(), - services.window_manager.clone(), - params.max_block_mass, - params.genesis.clone(), + &storage, + &services, pruning_lock.clone(), notification_root.clone(), 
counters.clone(), - params.storage_mass_activation, )); let virtual_processor = Arc::new(VirtualStateProcessor::new( diff --git a/consensus/src/model/stores/ghostdag.rs b/consensus/src/model/stores/ghostdag.rs index fd2600a1c4..4ed02e4cec 100644 --- a/consensus/src/model/stores/ghostdag.rs +++ b/consensus/src/model/stores/ghostdag.rs @@ -48,6 +48,7 @@ impl MemSizeEstimator for GhostdagData { impl MemSizeEstimator for CompactGhostdagData {} impl From<&GhostdagData> for CompactGhostdagData { + #[inline(always)] fn from(value: &GhostdagData) -> Self { Self { blue_score: value.blue_score, blue_work: value.blue_work, selected_parent: value.selected_parent } } diff --git a/consensus/src/pipeline/body_processor/body_validation_in_context.rs b/consensus/src/pipeline/body_processor/body_validation_in_context.rs index b03643df87..ec42f0f447 100644 --- a/consensus/src/pipeline/body_processor/body_validation_in_context.rs +++ b/consensus/src/pipeline/body_processor/body_validation_in_context.rs @@ -8,6 +8,7 @@ use kaspa_consensus_core::block::Block; use kaspa_database::prelude::StoreResultExtensions; use kaspa_hashes::Hash; use kaspa_utils::option::OptionExtensions; +use once_cell::unsync::Lazy; use std::sync::Arc; impl BlockBodyProcessor { @@ -18,13 +19,31 @@ impl BlockBodyProcessor { } fn check_block_transactions_in_context(self: &Arc, block: &Block) -> BlockProcessResult<()> { - let (pmt, _) = self.window_manager.calc_past_median_time(&self.ghostdag_store.get_data(block.hash()).unwrap())?; + // Note: This is somewhat expensive during ibd, as it incurs cache misses. + + // Use lazy evaluation to avoid unnecessary work, as most of the time we expect the txs not to have lock time. 
+ let lazy_pmt_res = + Lazy::new(|| match self.window_manager.calc_past_median_time(&self.ghostdag_store.get_data(block.hash()).unwrap()) { + Ok((pmt, pmt_window)) => { + if !self.block_window_cache_for_past_median_time.contains_key(&block.hash()) { + self.block_window_cache_for_past_median_time.insert(block.hash(), pmt_window); + }; + Ok(pmt) + } + Err(e) => Err(e), + }); + for tx in block.transactions.iter() { - if let Err(e) = self.transaction_validator.utxo_free_tx_validation(tx, block.header.daa_score, pmt) { - return Err(RuleError::TxInContextFailed(tx.id(), e)); - } + // Quick check to avoid the expensive Lazy eval during ibd (in most cases). + // TODO: refactor this and avoid classifying the tx lock outside of the transaction validator. + if tx.lock_time != 0 { + if let Err(e) = + self.transaction_validator.utxo_free_tx_validation(tx, block.header.daa_score, (*lazy_pmt_res).clone()?) + { + return Err(RuleError::TxInContextFailed(tx.id(), e)); + }; + }; } - Ok(()) } diff --git a/consensus/src/pipeline/body_processor/processor.rs b/consensus/src/pipeline/body_processor/processor.rs index 6885c78b5e..ebb11a2003 100644 --- a/consensus/src/pipeline/body_processor/processor.rs +++ b/consensus/src/pipeline/body_processor/processor.rs @@ -1,10 +1,14 @@ use crate::{ - consensus::services::DbWindowManager, + consensus::{ + services::{ConsensusServices, DbWindowManager}, + storage::ConsensusStorage, + }, errors::{BlockProcessResult, RuleError}, model::{ services::reachability::MTReachabilityService, stores::{ block_transactions::DbBlockTransactionsStore, + block_window_cache::BlockWindowCacheStore, ghostdag::DbGhostdagStore, headers::DbHeadersStore, reachability::DbReachabilityStore, @@ -23,7 +27,10 @@ use crossbeam_channel::{Receiver, Sender}; use kaspa_consensus_core::{ block::Block, blockstatus::BlockStatus::{self, StatusHeaderOnly, StatusInvalid}, - config::{genesis::GenesisBlock, params::ForkActivation}, + config::{ + genesis::GenesisBlock, + 
params::{ForkActivation, Params}, + }, mass::MassCalculator, tx::Transaction, }; @@ -60,6 +67,7 @@ pub struct BlockBodyProcessor { pub(super) headers_store: Arc, pub(super) block_transactions_store: Arc, pub(super) body_tips_store: Arc>, + pub(super) block_window_cache_for_past_median_time: Arc, // Managers and services pub(super) reachability_service: MTReachabilityService, @@ -91,47 +99,42 @@ impl BlockBodyProcessor { sender: Sender, thread_pool: Arc, + params: &Params, db: Arc, - statuses_store: Arc>, - ghostdag_store: Arc, - headers_store: Arc, - block_transactions_store: Arc, - body_tips_store: Arc>, - - reachability_service: MTReachabilityService, - coinbase_manager: CoinbaseManager, - mass_calculator: MassCalculator, - transaction_validator: TransactionValidator, - window_manager: DbWindowManager, - max_block_mass: u64, - genesis: GenesisBlock, + storage: &Arc, + services: &Arc, + pruning_lock: SessionLock, notification_root: Arc, counters: Arc, - storage_mass_activation: ForkActivation, ) -> Self { Self { receiver, sender, thread_pool, db, - statuses_store, - reachability_service, - ghostdag_store, - headers_store, - block_transactions_store, - body_tips_store, - coinbase_manager, - mass_calculator, - transaction_validator, - window_manager, - max_block_mass, - genesis, + + max_block_mass: params.max_block_mass, + genesis: params.genesis.clone(), + + statuses_store: storage.statuses_store.clone(), + ghostdag_store: storage.ghostdag_store.clone(), + headers_store: storage.headers_store.clone(), + block_transactions_store: storage.block_transactions_store.clone(), + body_tips_store: storage.body_tips_store.clone(), + block_window_cache_for_past_median_time: storage.block_window_cache_for_past_median_time.clone(), + + reachability_service: services.reachability_service.clone(), + coinbase_manager: services.coinbase_manager.clone(), + mass_calculator: services.mass_calculator.clone(), + transaction_validator: services.transaction_validator.clone(), + 
window_manager: services.window_manager.clone(), + pruning_lock, task_manager: BlockTaskDependencyManager::new(), notification_root, counters, - storage_mass_activation, + storage_mass_activation: params.storage_mass_activation, } } diff --git a/consensus/src/pipeline/header_processor/pre_pow_validation.rs b/consensus/src/pipeline/header_processor/pre_pow_validation.rs index a4dfb8b1e7..7764e1c150 100644 --- a/consensus/src/pipeline/header_processor/pre_pow_validation.rs +++ b/consensus/src/pipeline/header_processor/pre_pow_validation.rs @@ -35,7 +35,7 @@ impl HeaderProcessor { ctx.mergeset_non_daa = Some(daa_window.mergeset_non_daa); if header.bits != expected_bits { - return Err(RuleError::UnexpectedDifficulty(header.bits, expected_bits)); + return Err(RuleError::UnexpectedDifficulty(header.hash, header.bits, expected_bits)); } ctx.block_window_for_difficulty = Some(daa_window.window); diff --git a/consensus/src/pipeline/virtual_processor/processor.rs b/consensus/src/pipeline/virtual_processor/processor.rs index c654fef430..1f0c4ff38b 100644 --- a/consensus/src/pipeline/virtual_processor/processor.rs +++ b/consensus/src/pipeline/virtual_processor/processor.rs @@ -16,6 +16,7 @@ use crate::{ stores::{ acceptance_data::{AcceptanceDataStoreReader, DbAcceptanceDataStore}, block_transactions::{BlockTransactionsStoreReader, DbBlockTransactionsStore}, + block_window_cache::BlockWindowCacheStore, daa::DbDaaStore, depth::{DbDepthStore, DepthStoreReader}, ghostdag::{DbGhostdagStore, GhostdagData, GhostdagStoreReader}, @@ -76,6 +77,7 @@ use kaspa_database::prelude::{StoreError, StoreResultEmptyTuple, StoreResultExte use kaspa_hashes::Hash; use kaspa_muhash::MuHash; use kaspa_notify::{events::EventType, notifier::Notify}; +use once_cell::unsync::Lazy; use super::errors::{PruningImportError, PruningImportResult}; use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender}; @@ -149,6 +151,10 @@ pub struct VirtualStateProcessor { pub(super) parents_manager: 
DbParentsManager, pub(super) depth_manager: DbBlockDepthManager, + // block window caches + pub(super) block_window_cache_for_difficulty: Arc, + pub(super) block_window_cache_for_past_median_time: Arc, + // Pruning lock pruning_lock: SessionLock, @@ -206,6 +212,9 @@ impl VirtualStateProcessor { pruning_utxoset_stores: storage.pruning_utxoset_stores.clone(), lkg_virtual_state: storage.lkg_virtual_state.clone(), + block_window_cache_for_difficulty: storage.block_window_cache_for_difficulty.clone(), + block_window_cache_for_past_median_time: storage.block_window_cache_for_past_median_time.clone(), + ghostdag_manager: services.ghostdag_manager.clone(), reachability_service: services.reachability_service.clone(), relations_service: services.relations_service.clone(), @@ -291,6 +300,10 @@ impl VirtualStateProcessor { let sink_multiset = self.utxo_multisets_store.get(new_sink).unwrap(); let chain_path = self.dag_traversal_manager.calculate_chain_path(prev_sink, new_sink, None); + let sink_ghostdag_data = Lazy::new(|| self.ghostdag_store.get_data(new_sink).unwrap()); + // Cache the DAA and Median time windows of the sink for future use, as well as prepare for virtual's window calculations + self.cache_sink_windows(new_sink, prev_sink, &sink_ghostdag_data); + let new_virtual_state = self .calculate_and_commit_virtual_state( virtual_read, @@ -302,12 +315,19 @@ impl VirtualStateProcessor { ) .expect("all possible rule errors are unexpected here"); + let compact_sink_ghostdag_data = if let Some(sink_ghostdag_data) = Lazy::get(&sink_ghostdag_data) { + // If we had to retrieve the full data, we convert it to compact + sink_ghostdag_data.to_compact() + } else { + // Else we query the compact data directly. + self.ghostdag_store.get_compact_data(new_sink).unwrap() + }; + // Update the pruning processor about the virtual state change - let sink_ghostdag_data = self.ghostdag_store.get_compact_data(new_sink).unwrap(); // Empty the channel before sending the new message. 
If pruning processor is busy, this step makes sure // the internal channel does not grow with no need (since we only care about the most recent message) let _consume = self.pruning_receiver.try_iter().count(); - self.pruning_sender.send(PruningProcessingMessage::Process { sink_ghostdag_data }).unwrap(); + self.pruning_sender.send(PruningProcessingMessage::Process { sink_ghostdag_data: compact_sink_ghostdag_data }).unwrap(); // Emit notifications let accumulated_diff = Arc::new(accumulated_diff); @@ -319,7 +339,7 @@ impl VirtualStateProcessor { .notify(Notification::UtxosChanged(UtxosChangedNotification::new(accumulated_diff, virtual_parents))) .expect("expecting an open unbounded channel"); self.notification_root - .notify(Notification::SinkBlueScoreChanged(SinkBlueScoreChangedNotification::new(sink_ghostdag_data.blue_score))) + .notify(Notification::SinkBlueScoreChanged(SinkBlueScoreChangedNotification::new(compact_sink_ghostdag_data.blue_score))) .expect("expecting an open unbounded channel"); self.notification_root .notify(Notification::VirtualDaaScoreChanged(VirtualDaaScoreChangedNotification::new(new_virtual_state.daa_score))) @@ -540,6 +560,26 @@ impl VirtualStateProcessor { drop(selected_chain_write); } + /// Caches the DAA and Median time windows of the sink block (if needed). Following, virtual's window calculations will + /// naturally hit the cache finding the sink's windows and building upon them. + fn cache_sink_windows(&self, new_sink: Hash, prev_sink: Hash, sink_ghostdag_data: &impl Deref>) { + // We expect that the `new_sink` is cached (or some close-enough ancestor thereof) if it is equal to the `prev_sink`, + // Hence we short-circuit the check of the keys in such cases, thereby reducing the access of the read-lock + if new_sink != prev_sink { + // this is only important for ibd performance, as we incur expensive cache misses otherwise. + // this occurs because we cannot rely on header processing to pre-cache in this scenario. 
+ if !self.block_window_cache_for_difficulty.contains_key(&new_sink) { + self.block_window_cache_for_difficulty + .insert(new_sink, self.window_manager.block_daa_window(sink_ghostdag_data.deref()).unwrap().window); + }; + + if !self.block_window_cache_for_past_median_time.contains_key(&new_sink) { + self.block_window_cache_for_past_median_time + .insert(new_sink, self.window_manager.calc_past_median_time(sink_ghostdag_data.deref()).unwrap().1); + }; + } + } + /// Returns the max number of tips to consider as virtual parents in a single virtual resolve operation. /// /// Guaranteed to be `>= self.max_block_parents` diff --git a/consensus/src/processes/window.rs b/consensus/src/processes/window.rs index ca0f71cf20..ab09b1e7cb 100644 --- a/consensus/src/processes/window.rs +++ b/consensus/src/processes/window.rs @@ -17,7 +17,12 @@ use kaspa_hashes::Hash; use kaspa_math::Uint256; use kaspa_utils::refs::Refs; use once_cell::unsync::Lazy; -use std::{cmp::Reverse, iter::once, ops::Deref, sync::Arc}; +use std::{ + cmp::Reverse, + iter::once, + ops::{Deref, DerefMut}, + sync::Arc, +}; use super::{ difficulty::{FullDifficultyManager, SampledDifficultyManager}, @@ -332,52 +337,36 @@ impl None, }; - if let Some(cache) = cache { - if let Some(selected_parent_binary_heap) = cache.get(&ghostdag_data.selected_parent) { - // Only use the cached window if it originates from here - if let WindowOrigin::Sampled = selected_parent_binary_heap.origin() { - let selected_parent_blue_work = self.ghostdag_store.get_blue_work(ghostdag_data.selected_parent).unwrap(); - - let mut heap = - Lazy::new(|| BoundedSizeBlockHeap::from_binary_heap(window_size, (*selected_parent_binary_heap).clone())); - for block in self.sampled_mergeset_iterator(sample_rate, ghostdag_data, selected_parent_blue_work) { - match block { - SampledBlock::Sampled(block) => { - heap.try_push(block.hash, block.blue_work); - } - SampledBlock::NonDaa(hash) => { - mergeset_non_daa_inserter(hash); - } - } - } - - return if let 
Ok(heap) = Lazy::into_value(heap) { - Ok(Arc::new(heap.binary_heap)) - } else { - Ok(selected_parent_binary_heap.clone()) - }; - } - } + let selected_parent_blue_work = self.ghostdag_store.get_blue_work(ghostdag_data.selected_parent).unwrap(); + + // Try to initialize the window from the cache directly + if let Some(res) = self.try_init_from_cache( + window_size, + sample_rate, + cache, + ghostdag_data, + selected_parent_blue_work, + Some(&mut mergeset_non_daa_inserter), + ) { + return Ok(res); } + // else we populate the window with the passed ghostdag_data. let mut window_heap = BoundedSizeBlockHeap::new(WindowOrigin::Sampled, window_size); - let parent_ghostdag = self.ghostdag_store.get_data(ghostdag_data.selected_parent).unwrap(); - - for block in self.sampled_mergeset_iterator(sample_rate, ghostdag_data, parent_ghostdag.blue_work) { - match block { - SampledBlock::Sampled(block) => { - window_heap.try_push(block.hash, block.blue_work); - } - SampledBlock::NonDaa(hash) => { - mergeset_non_daa_inserter(hash); - } - } - } + self.push_mergeset( + &mut &mut window_heap, + sample_rate, + ghostdag_data, + selected_parent_blue_work, + Some(&mut mergeset_non_daa_inserter), + ); + let mut current_ghostdag = self.ghostdag_store.get_data(ghostdag_data.selected_parent).unwrap(); - let mut current_ghostdag = parent_ghostdag; + // Note: no need to check for cache here, as we already tried to initialize from the passed ghostdag's selected parent cache in `self.try_init_from_cache` - // Walk down the chain until we cross the window boundaries + // Walk down the chain until we cross the window boundaries. loop { + // check if we may exit early. if current_ghostdag.selected_parent.is_origin() { // Reaching origin means there's no more data, so we expect the window to already be full, otherwise we err. 
// This error can happen only during an IBD from pruning proof when processing the first headers in the pruning point's @@ -387,50 +376,101 @@ impl); + + // see if we can inherit and merge with the selected parent cache + if self.try_merge_with_selected_parent_cache(&mut window_heap, cache, &current_ghostdag.selected_parent) { + // if successful, we may break out of the loop, with the window already filled. + break; + }; + + // update the current ghostdag to the parent ghostdag, and continue the loop. current_ghostdag = parent_ghostdag; } Ok(Arc::new(window_heap.binary_heap)) } - fn try_push_mergeset( + /// Push the mergeset samples into the bounded heap. + /// Note: receives the heap argument as a DerefMut so that Lazy can be passed and be evaluated *only if an actual push is needed* + fn push_mergeset( &self, - heap: &mut BoundedSizeBlockHeap, + heap: &mut impl DerefMut, sample_rate: u64, ghostdag_data: &GhostdagData, selected_parent_blue_work: BlueWorkType, - ) -> bool { - // If the window is full and the selected parent is less than the minimum then we break - // because this means that there cannot be any more blocks in the past with higher blue work - if !heap.can_push(ghostdag_data.selected_parent, selected_parent_blue_work) { - return true; - } - - for block in self.sampled_mergeset_iterator(sample_rate, ghostdag_data, selected_parent_blue_work) { - match block { - SampledBlock::Sampled(block) => { + mergeset_non_daa_inserter: Option, + ) { + if let Some(mut mergeset_non_daa_inserter) = mergeset_non_daa_inserter { + // If we have a non-daa inserter, we must iterate over the whole mergeset and process both the sampled and non-daa blocks. 
+ for block in self.sampled_mergeset_iterator(sample_rate, ghostdag_data, selected_parent_blue_work) { + match block { + SampledBlock::Sampled(block) => { + heap.try_push(block.hash, block.blue_work); + } + SampledBlock::NonDaa(hash) => mergeset_non_daa_inserter(hash), + }; + } + } else { + // If we don't have a non-daa inserter, we can iterate over the sampled mergeset and return early if we can't push anymore. + for block in self.sampled_mergeset_iterator(sample_rate, ghostdag_data, selected_parent_blue_work) { + if let SampledBlock::Sampled(block) = block { if !heap.try_push(block.hash, block.blue_work) { - break; + return; } } - SampledBlock::NonDaa(_) => {} } } - false + } + + fn try_init_from_cache( + &self, + window_size: usize, + sample_rate: u64, + cache: Option<&Arc>, + ghostdag_data: &GhostdagData, + selected_parent_blue_work: BlueWorkType, + mergeset_non_daa_inserter: Option, + ) -> Option> { + cache.and_then(|cache| { + cache.get(&ghostdag_data.selected_parent).map(|selected_parent_window| { + let mut heap = Lazy::new(|| BoundedSizeBlockHeap::from_binary_heap(window_size, (*selected_parent_window).clone())); + // We pass a Lazy heap as an optimization to avoid cloning the selected parent heap in cases where the mergeset contains no samples + self.push_mergeset(&mut heap, sample_rate, ghostdag_data, selected_parent_blue_work, mergeset_non_daa_inserter); + if let Ok(heap) = Lazy::into_value(heap) { + Arc::new(heap.binary_heap) + } else { + selected_parent_window.clone() + } + }) + }) + } + + fn try_merge_with_selected_parent_cache( + &self, + heap: &mut BoundedSizeBlockHeap, + cache: Option<&Arc>, + selected_parent: &Hash, + ) -> bool { + cache + .and_then(|cache| { + cache.get(selected_parent).map(|selected_parent_window| { + heap.merge_ancestor_heap(&mut (*selected_parent_window).clone()); + }) + }) + .is_some() } fn sampled_mergeset_iterator<'a>( @@ -686,4 +726,14 @@ impl BoundedSizeBlockHeap { self.binary_heap.push(r_sortable_block); true } + + // 
This method is intended to be used to merge the ancestor heap with the current heap. + fn merge_ancestor_heap(&mut self, ancestor_heap: &mut BlockWindowHeap) { + self.binary_heap.blocks.append(&mut ancestor_heap.blocks); + // Below we saturate for cases where the ancestor may be close to the origin or genesis. + // Note: the loop below is a no-op when the saturating sub yields 0, i.e. when the combined size of the two heaps is less than or equal to the size bound. + for _ in 0..self.binary_heap.len().saturating_sub(self.size_bound) { + self.binary_heap.blocks.pop(); + } + } }