Skip to content

Commit

Permalink
Optimize window cache building for ibd (#576)
Browse files Browse the repository at this point in the history
* show changes.

* optimize window caches for ibd.

* do lints and checks etc..

* bench and compare.

* clean-up

* rework lock time check a bit.

* // bool: todo!(),

* fmt

* address some reveiw points.

* address reveiw comments.

* update comments.

* pass tests.

* fix blue work assumption, update error message.

* Update window.rs

slight comment update.

* simplify a bit more.

* remove some unneeded things. rearrange access to cmpct gdd.

* fix conflicts.

* lints..

* address reveiw points from m. sutton.

* uncomplicate check_block_transactions_in_context

* commit in lazy

* fix lints.

* query compact data as much as possible.

* Use DefefMut to unify push_mergeset logic for all cases (addresses @tiram's review)

* comment on cache_sink_windows

* add comment to new_sink != prev_sink

* return out of push_mergeset, if we cannot push any more.

* remove unused diff cache and do non-daa as option.
  • Loading branch information
D-Stacks authored Nov 19, 2024
1 parent 1d3b9a9 commit a0aeec3
Show file tree
Hide file tree
Showing 9 changed files with 220 additions and 117 deletions.
2 changes: 1 addition & 1 deletion consensus/core/src/config/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ pub mod perf {

const BASELINE_HEADER_DATA_CACHE_SIZE: usize = 10_000;
const BASELINE_BLOCK_DATA_CACHE_SIZE: usize = 200;
const BASELINE_BLOCK_WINDOW_CACHE_SIZE: usize = 2000;
const BASELINE_BLOCK_WINDOW_CACHE_SIZE: usize = 2_000;
const BASELINE_UTXOSET_CACHE_SIZE: usize = 10_000;

#[derive(Clone, Debug)]
Expand Down
4 changes: 2 additions & 2 deletions consensus/core/src/errors/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ pub enum RuleError {
#[error("expected header blue work {0} but got {1}")]
UnexpectedHeaderBlueWork(BlueWorkType, BlueWorkType),

#[error("block difficulty of {0} is not the expected value of {1}")]
UnexpectedDifficulty(u32, u32),
#[error("block {0} difficulty of {1} is not the expected value of {2}")]
UnexpectedDifficulty(Hash, u32, u32),

#[error("block timestamp of {0} is not after expected {1}")]
TimeTooOld(u64, u64),
Expand Down
16 changes: 3 additions & 13 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,23 +241,13 @@ impl Consensus {
body_receiver,
virtual_sender,
block_processors_pool,
params,
db.clone(),
storage.statuses_store.clone(),
storage.ghostdag_store.clone(),
storage.headers_store.clone(),
storage.block_transactions_store.clone(),
storage.body_tips_store.clone(),
services.reachability_service.clone(),
services.coinbase_manager.clone(),
services.mass_calculator.clone(),
services.transaction_validator.clone(),
services.window_manager.clone(),
params.max_block_mass,
params.genesis.clone(),
&storage,
&services,
pruning_lock.clone(),
notification_root.clone(),
counters.clone(),
params.storage_mass_activation,
));

let virtual_processor = Arc::new(VirtualStateProcessor::new(
Expand Down
1 change: 1 addition & 0 deletions consensus/src/model/stores/ghostdag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ impl MemSizeEstimator for GhostdagData {
impl MemSizeEstimator for CompactGhostdagData {}

impl From<&GhostdagData> for CompactGhostdagData {
#[inline(always)]
fn from(value: &GhostdagData) -> Self {
Self { blue_score: value.blue_score, blue_work: value.blue_work, selected_parent: value.selected_parent }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use kaspa_consensus_core::block::Block;
use kaspa_database::prelude::StoreResultExtensions;
use kaspa_hashes::Hash;
use kaspa_utils::option::OptionExtensions;
use once_cell::unsync::Lazy;
use std::sync::Arc;

impl BlockBodyProcessor {
Expand All @@ -18,13 +19,31 @@ impl BlockBodyProcessor {
}

fn check_block_transactions_in_context(self: &Arc<Self>, block: &Block) -> BlockProcessResult<()> {
let (pmt, _) = self.window_manager.calc_past_median_time(&self.ghostdag_store.get_data(block.hash()).unwrap())?;
// Note: This is somewhat expensive during ibd, as it incurs cache misses.

// Use lazy evaluation to avoid unnecessary work, as most of the time we expect the txs not to have lock time.
let lazy_pmt_res =
Lazy::new(|| match self.window_manager.calc_past_median_time(&self.ghostdag_store.get_data(block.hash()).unwrap()) {
Ok((pmt, pmt_window)) => {
if !self.block_window_cache_for_past_median_time.contains_key(&block.hash()) {
self.block_window_cache_for_past_median_time.insert(block.hash(), pmt_window);
};
Ok(pmt)
}
Err(e) => Err(e),
});

for tx in block.transactions.iter() {
if let Err(e) = self.transaction_validator.utxo_free_tx_validation(tx, block.header.daa_score, pmt) {
return Err(RuleError::TxInContextFailed(tx.id(), e));
}
// Quick check to avoid the expensive Lazy eval during ibd (in most cases).
// TODO: refactor this and avoid classifying the tx lock outside of the transaction validator.
if tx.lock_time != 0 {
if let Err(e) =
self.transaction_validator.utxo_free_tx_validation(tx, block.header.daa_score, (*lazy_pmt_res).clone()?)
{
return Err(RuleError::TxInContextFailed(tx.id(), e));
};
};
}

Ok(())
}

Expand Down
61 changes: 32 additions & 29 deletions consensus/src/pipeline/body_processor/processor.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use crate::{
consensus::services::DbWindowManager,
consensus::{
services::{ConsensusServices, DbWindowManager},
storage::ConsensusStorage,
},
errors::{BlockProcessResult, RuleError},
model::{
services::reachability::MTReachabilityService,
stores::{
block_transactions::DbBlockTransactionsStore,
block_window_cache::BlockWindowCacheStore,
ghostdag::DbGhostdagStore,
headers::DbHeadersStore,
reachability::DbReachabilityStore,
Expand All @@ -23,7 +27,10 @@ use crossbeam_channel::{Receiver, Sender};
use kaspa_consensus_core::{
block::Block,
blockstatus::BlockStatus::{self, StatusHeaderOnly, StatusInvalid},
config::{genesis::GenesisBlock, params::ForkActivation},
config::{
genesis::GenesisBlock,
params::{ForkActivation, Params},
},
mass::MassCalculator,
tx::Transaction,
};
Expand Down Expand Up @@ -60,6 +67,7 @@ pub struct BlockBodyProcessor {
pub(super) headers_store: Arc<DbHeadersStore>,
pub(super) block_transactions_store: Arc<DbBlockTransactionsStore>,
pub(super) body_tips_store: Arc<RwLock<DbTipsStore>>,
pub(super) block_window_cache_for_past_median_time: Arc<BlockWindowCacheStore>,

// Managers and services
pub(super) reachability_service: MTReachabilityService<DbReachabilityStore>,
Expand Down Expand Up @@ -91,47 +99,42 @@ impl BlockBodyProcessor {
sender: Sender<VirtualStateProcessingMessage>,
thread_pool: Arc<ThreadPool>,

params: &Params,
db: Arc<DB>,
statuses_store: Arc<RwLock<DbStatusesStore>>,
ghostdag_store: Arc<DbGhostdagStore>,
headers_store: Arc<DbHeadersStore>,
block_transactions_store: Arc<DbBlockTransactionsStore>,
body_tips_store: Arc<RwLock<DbTipsStore>>,

reachability_service: MTReachabilityService<DbReachabilityStore>,
coinbase_manager: CoinbaseManager,
mass_calculator: MassCalculator,
transaction_validator: TransactionValidator,
window_manager: DbWindowManager,
max_block_mass: u64,
genesis: GenesisBlock,
storage: &Arc<ConsensusStorage>,
services: &Arc<ConsensusServices>,

pruning_lock: SessionLock,
notification_root: Arc<ConsensusNotificationRoot>,
counters: Arc<ProcessingCounters>,
storage_mass_activation: ForkActivation,
) -> Self {
Self {
receiver,
sender,
thread_pool,
db,
statuses_store,
reachability_service,
ghostdag_store,
headers_store,
block_transactions_store,
body_tips_store,
coinbase_manager,
mass_calculator,
transaction_validator,
window_manager,
max_block_mass,
genesis,

max_block_mass: params.max_block_mass,
genesis: params.genesis.clone(),

statuses_store: storage.statuses_store.clone(),
ghostdag_store: storage.ghostdag_store.clone(),
headers_store: storage.headers_store.clone(),
block_transactions_store: storage.block_transactions_store.clone(),
body_tips_store: storage.body_tips_store.clone(),
block_window_cache_for_past_median_time: storage.block_window_cache_for_past_median_time.clone(),

reachability_service: services.reachability_service.clone(),
coinbase_manager: services.coinbase_manager.clone(),
mass_calculator: services.mass_calculator.clone(),
transaction_validator: services.transaction_validator.clone(),
window_manager: services.window_manager.clone(),

pruning_lock,
task_manager: BlockTaskDependencyManager::new(),
notification_root,
counters,
storage_mass_activation,
storage_mass_activation: params.storage_mass_activation,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl HeaderProcessor {
ctx.mergeset_non_daa = Some(daa_window.mergeset_non_daa);

if header.bits != expected_bits {
return Err(RuleError::UnexpectedDifficulty(header.bits, expected_bits));
return Err(RuleError::UnexpectedDifficulty(header.hash, header.bits, expected_bits));
}

ctx.block_window_for_difficulty = Some(daa_window.window);
Expand Down
46 changes: 43 additions & 3 deletions consensus/src/pipeline/virtual_processor/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
stores::{
acceptance_data::{AcceptanceDataStoreReader, DbAcceptanceDataStore},
block_transactions::{BlockTransactionsStoreReader, DbBlockTransactionsStore},
block_window_cache::BlockWindowCacheStore,
daa::DbDaaStore,
depth::{DbDepthStore, DepthStoreReader},
ghostdag::{DbGhostdagStore, GhostdagData, GhostdagStoreReader},
Expand Down Expand Up @@ -76,6 +77,7 @@ use kaspa_database::prelude::{StoreError, StoreResultEmptyTuple, StoreResultExte
use kaspa_hashes::Hash;
use kaspa_muhash::MuHash;
use kaspa_notify::{events::EventType, notifier::Notify};
use once_cell::unsync::Lazy;

use super::errors::{PruningImportError, PruningImportResult};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
Expand Down Expand Up @@ -149,6 +151,10 @@ pub struct VirtualStateProcessor {
pub(super) parents_manager: DbParentsManager,
pub(super) depth_manager: DbBlockDepthManager,

// block window caches
pub(super) block_window_cache_for_difficulty: Arc<BlockWindowCacheStore>,
pub(super) block_window_cache_for_past_median_time: Arc<BlockWindowCacheStore>,

// Pruning lock
pruning_lock: SessionLock,

Expand Down Expand Up @@ -206,6 +212,9 @@ impl VirtualStateProcessor {
pruning_utxoset_stores: storage.pruning_utxoset_stores.clone(),
lkg_virtual_state: storage.lkg_virtual_state.clone(),

block_window_cache_for_difficulty: storage.block_window_cache_for_difficulty.clone(),
block_window_cache_for_past_median_time: storage.block_window_cache_for_past_median_time.clone(),

ghostdag_manager: services.ghostdag_manager.clone(),
reachability_service: services.reachability_service.clone(),
relations_service: services.relations_service.clone(),
Expand Down Expand Up @@ -291,6 +300,10 @@ impl VirtualStateProcessor {

let sink_multiset = self.utxo_multisets_store.get(new_sink).unwrap();
let chain_path = self.dag_traversal_manager.calculate_chain_path(prev_sink, new_sink, None);
let sink_ghostdag_data = Lazy::new(|| self.ghostdag_store.get_data(new_sink).unwrap());
// Cache the DAA and Median time windows of the sink for future use, as well as prepare for virtual's window calculations
self.cache_sink_windows(new_sink, prev_sink, &sink_ghostdag_data);

let new_virtual_state = self
.calculate_and_commit_virtual_state(
virtual_read,
Expand All @@ -302,12 +315,19 @@ impl VirtualStateProcessor {
)
.expect("all possible rule errors are unexpected here");

let compact_sink_ghostdag_data = if let Some(sink_ghostdag_data) = Lazy::get(&sink_ghostdag_data) {
// If we had to retrieve the full data, we convert it to compact
sink_ghostdag_data.to_compact()
} else {
// Else we query the compact data directly.
self.ghostdag_store.get_compact_data(new_sink).unwrap()
};

// Update the pruning processor about the virtual state change
let sink_ghostdag_data = self.ghostdag_store.get_compact_data(new_sink).unwrap();
// Empty the channel before sending the new message. If pruning processor is busy, this step makes sure
// the internal channel does not grow with no need (since we only care about the most recent message)
let _consume = self.pruning_receiver.try_iter().count();
self.pruning_sender.send(PruningProcessingMessage::Process { sink_ghostdag_data }).unwrap();
self.pruning_sender.send(PruningProcessingMessage::Process { sink_ghostdag_data: compact_sink_ghostdag_data }).unwrap();

// Emit notifications
let accumulated_diff = Arc::new(accumulated_diff);
Expand All @@ -319,7 +339,7 @@ impl VirtualStateProcessor {
.notify(Notification::UtxosChanged(UtxosChangedNotification::new(accumulated_diff, virtual_parents)))
.expect("expecting an open unbounded channel");
self.notification_root
.notify(Notification::SinkBlueScoreChanged(SinkBlueScoreChangedNotification::new(sink_ghostdag_data.blue_score)))
.notify(Notification::SinkBlueScoreChanged(SinkBlueScoreChangedNotification::new(compact_sink_ghostdag_data.blue_score)))
.expect("expecting an open unbounded channel");
self.notification_root
.notify(Notification::VirtualDaaScoreChanged(VirtualDaaScoreChangedNotification::new(new_virtual_state.daa_score)))
Expand Down Expand Up @@ -540,6 +560,26 @@ impl VirtualStateProcessor {
drop(selected_chain_write);
}

/// Caches the DAA and Median time windows of the sink block (if needed). Following, virtual's window calculations will
/// naturally hit the cache finding the sink's windows and building upon them.
fn cache_sink_windows(&self, new_sink: Hash, prev_sink: Hash, sink_ghostdag_data: &impl Deref<Target = Arc<GhostdagData>>) {
// We expect that the `new_sink` is cached (or some close-enough ancestor thereof) if it is equal to the `prev_sink`,
// Hence we short-circuit the check of the keys in such cases, thereby reducing the access of the read-lock
if new_sink != prev_sink {
// this is only important for ibd performance, as we incur expensive cache misses otherwise.
// this occurs because we cannot rely on header processing to pre-cache in this scenario.
if !self.block_window_cache_for_difficulty.contains_key(&new_sink) {
self.block_window_cache_for_difficulty
.insert(new_sink, self.window_manager.block_daa_window(sink_ghostdag_data.deref()).unwrap().window);
};

if !self.block_window_cache_for_past_median_time.contains_key(&new_sink) {
self.block_window_cache_for_past_median_time
.insert(new_sink, self.window_manager.calc_past_median_time(sink_ghostdag_data.deref()).unwrap().1);
};
}
}

/// Returns the max number of tips to consider as virtual parents in a single virtual resolve operation.
///
/// Guaranteed to be `>= self.max_block_parents`
Expand Down
Loading

0 comments on commit a0aeec3

Please sign in to comment.