Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store each relation child in its own key #325

Merged
merged 31 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
c6c5d59
Store each relation child in its own key
someone235 Nov 15, 2023
d16f049
Database version upgrade logic
someone235 Nov 18, 2023
89494a7
Improve staging relations perf
someone235 Nov 28, 2023
493b4fa
Implement CachedDbSetAccess
someone235 Nov 29, 2023
2ea8e53
Use BlockHasher for children store
someone235 Nov 29, 2023
3432558
Pass children readlock
someone235 Nov 29, 2023
b0c8f0f
clippy fix
someone235 Nov 29, 2023
198e097
Use default Debug impl for ReadLock
someone235 Nov 30, 2023
35e3122
Address review comments
someone235 Nov 30, 2023
0a4c5dd
Remove relations rwlock
someone235 Nov 30, 2023
e8f61b1
Get rid of relations service
someone235 Nov 30, 2023
b54a91c
Use RefCell instead of Mutex in MemoryRelationsStore and StagingRelat…
someone235 Nov 30, 2023
e41e34c
fix clippy warnings
michaelsutton Nov 30, 2023
dd1c892
fix simpa for low delay values
michaelsutton Nov 30, 2023
ed71486
Improve delete_children in StagingRelationsStore
someone235 Dec 1, 2023
9ca1b91
Suggestion for removing the need for `RefCell` (#4)
michaelsutton Dec 1, 2023
1f9dcf6
flatten staging (semantic change only)
michaelsutton Dec 1, 2023
7852fc5
unify deletions
michaelsutton Dec 1, 2023
48a635b
use correct prefix
michaelsutton Dec 1, 2023
de00e4f
bug fix: add to child deletions even if not removed from insertions
michaelsutton Dec 1, 2023
0dea3ce
remove unused API method
michaelsutton Dec 1, 2023
08c904f
fix user msg
michaelsutton Dec 1, 2023
0977ae3
add simpa as test
michaelsutton Dec 1, 2023
8401add
Revert "Get rid of relations service"
someone235 Dec 5, 2023
0fad12f
Revert "Remove relations rwlock"
someone235 Dec 5, 2023
26302c7
Remove redundant ChildKey
someone235 Dec 5, 2023
64a5317
set access:
michaelsutton Dec 5, 2023
fe1e82f
bug fix: make sure to propagate key not found err if staging has no d…
michaelsutton Dec 5, 2023
8336c93
clean
michaelsutton Dec 5, 2023
f118dcd
Merge branch 'master' into relations-children-prefix
someone235 Dec 5, 2023
5292b76
Remove redundant comment
someone235 Dec 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion components/consensusmanager/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ impl ConsensusSessionOwned {
self.clone().spawn_blocking(move |c| c.get_ghostdag_data(hash)).await
}

pub async fn async_get_block_children(&self, hash: Hash) -> Option<Arc<Vec<Hash>>> {
pub async fn async_get_block_children(&self, hash: Hash) -> Option<Vec<Hash>> {
self.clone().spawn_blocking(move |c| c.get_block_children(hash)).await
}

Expand Down
2 changes: 1 addition & 1 deletion consensus/core/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ pub trait ConsensusApi: Send + Sync {
unimplemented!()
}

fn get_block_children(&self, hash: Hash) -> Option<Arc<Vec<Hash>>> {
fn get_block_children(&self, hash: Hash) -> Option<Vec<Hash>> {
unimplemented!()
}

Expand Down
24 changes: 23 additions & 1 deletion consensus/src/consensus/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub enum ConsensusEntryType {
New(ConsensusEntry),
}

#[derive(Serialize, Deserialize, Clone, Default)]
#[derive(Serialize, Deserialize, Clone)]
pub struct MultiConsensusMetadata {
current_consensus_key: Option<u64>,
staging_consensus_key: Option<u64>,
Expand All @@ -54,6 +54,20 @@ pub struct MultiConsensusMetadata {
version: u32,
}

const LATEST_DB_VERSION: u32 = 1;
impl Default for MultiConsensusMetadata {
fn default() -> Self {
Self {
current_consensus_key: Default::default(),
someone235 marked this conversation as resolved.
Show resolved Hide resolved
staging_consensus_key: Default::default(),
max_key_used: Default::default(),
is_archival_node: Default::default(),
props: Default::default(),
version: LATEST_DB_VERSION,
}
}
}

#[derive(Clone)]
pub struct MultiConsensusManagementStore {
db: Arc<DB>,
Expand Down Expand Up @@ -199,6 +213,14 @@ impl MultiConsensusManagementStore {
self.metadata.write(BatchDbWriter::new(&mut batch), &metadata).unwrap();
}
}

pub fn should_upgrade(&self) -> StoreResult<bool> {
match self.metadata.read() {
Ok(data) => Ok(data.version != LATEST_DB_VERSION),
Err(StoreError::KeyNotFound(_)) => Ok(false),
Err(err) => Err(err),
}
}
}

pub struct Factory {
Expand Down
10 changes: 7 additions & 3 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -759,12 +759,16 @@ impl ConsensusApi for Consensus {
Ok((&*ghostdag).into())
}

fn get_block_children(&self, hash: Hash) -> Option<Arc<Vec<Hash>>> {
self.services.relations_service.get_children(hash).unwrap_option()
fn get_block_children(&self, hash: Hash) -> Option<Vec<Hash>> {
self.storage
.relations_primary_store
.get_children(hash)
.unwrap_option()
.map(|children| children.read().iter().copied().collect_vec())
}

fn get_block_parents(&self, hash: Hash) -> Option<Arc<Vec<Hash>>> {
self.services.relations_service.get_parents(hash).unwrap_option()
self.storage.relations_primary_store.get_parents(hash).unwrap_option()
}

fn get_block_status(&self, hash: Hash) -> Option<BlockStatus> {
Expand Down
21 changes: 8 additions & 13 deletions consensus/src/consensus/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::storage::ConsensusStorage;
use crate::{
config::Config,
model::{
services::{reachability::MTReachabilityService, relations::MTRelationsService, statuses::MTStatusesService},
services::{reachability::MTReachabilityService, statuses::MTStatusesService},
stores::{
block_window_cache::BlockWindowCacheStore, daa::DbDaaStore, depth::DbDepthStore, ghostdag::DbGhostdagStore,
headers::DbHeadersStore, headers_selected_tip::DbHeadersSelectedTipStore, past_pruning_points::DbPastPruningPointsStore,
Expand All @@ -22,14 +22,14 @@ use kaspa_txscript::caches::TxScriptCacheCounters;
use std::sync::Arc;

pub type DbGhostdagManager =
GhostdagManager<DbGhostdagStore, MTRelationsService<DbRelationsStore>, MTReachabilityService<DbReachabilityStore>, DbHeadersStore>;
GhostdagManager<DbGhostdagStore, DbRelationsStore, MTReachabilityService<DbReachabilityStore>, DbHeadersStore>;

pub type DbDagTraversalManager = DagTraversalManager<DbGhostdagStore, DbReachabilityStore, MTRelationsService<DbRelationsStore>>;
pub type DbDagTraversalManager = DagTraversalManager<DbGhostdagStore, DbReachabilityStore, DbRelationsStore>;

pub type DbWindowManager = DualWindowManager<DbGhostdagStore, BlockWindowCacheStore, DbHeadersStore, DbDaaStore>;

pub type DbSyncManager = SyncManager<
MTRelationsService<DbRelationsStore>,
DbRelationsStore,
DbReachabilityStore,
DbGhostdagStore,
DbSelectedChainStore,
Expand All @@ -41,15 +41,14 @@ pub type DbSyncManager = SyncManager<
pub type DbPruningPointManager =
PruningPointManager<DbGhostdagStore, DbReachabilityStore, DbHeadersStore, DbPastPruningPointsStore, DbHeadersSelectedTipStore>;
pub type DbBlockDepthManager = BlockDepthManager<DbDepthStore, DbReachabilityStore, DbGhostdagStore>;
pub type DbParentsManager = ParentsManager<DbHeadersStore, DbReachabilityStore, MTRelationsService<DbRelationsStore>>;
pub type DbParentsManager = ParentsManager<DbHeadersStore, DbReachabilityStore, DbRelationsStore>;

pub struct ConsensusServices {
// Underlying storage
storage: Arc<ConsensusStorage>,

// Services and managers
pub statuses_service: MTStatusesService<DbStatusesStore>,
pub relations_service: MTRelationsService<DbRelationsStore>,
pub reachability_service: MTReachabilityService<DbReachabilityStore>,
pub window_manager: DbWindowManager,
pub dag_traversal_manager: DbDagTraversalManager,
Expand All @@ -75,14 +74,11 @@ impl ConsensusServices {
let params = &config.params;

let statuses_service = MTStatusesService::new(storage.statuses_store.clone());
let relations_services =
(0..=params.max_block_level).map(|level| MTRelationsService::new(storage.relations_stores.clone(), level)).collect_vec();
let relations_service = relations_services[0].clone();
let reachability_service = MTReachabilityService::new(storage.reachability_store.clone());
let dag_traversal_manager = DagTraversalManager::new(
params.genesis.hash,
storage.ghostdag_primary_store.clone(),
relations_service.clone(),
storage.relations_primary_store.clone(),
reachability_service.clone(),
);
let window_manager = DualWindowManager::new(
Expand Down Expand Up @@ -122,7 +118,7 @@ impl ConsensusServices {
params.genesis.hash,
params.ghostdag_k,
ghostdag_store,
relations_services[level].clone(),
storage.relations_stores[level].clone(),
storage.headers_store.clone(),
reachability_service.clone(),
)
Expand Down Expand Up @@ -169,7 +165,7 @@ impl ConsensusServices {
params.genesis.hash,
storage.headers_store.clone(),
reachability_service.clone(),
relations_service.clone(),
storage.relations_primary_store.clone(),
);

let pruning_proof_manager = Arc::new(PruningProofManager::new(
Expand Down Expand Up @@ -201,7 +197,6 @@ impl ConsensusServices {
Arc::new(Self {
storage,
statuses_service,
relations_service,
reachability_service,
window_manager,
dag_traversal_manager,
Expand Down
29 changes: 18 additions & 11 deletions consensus/src/consensus/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,21 @@ use kaspa_database::registry::DatabaseStorePrefixes;
use kaspa_hashes::Hash;
use parking_lot::RwLock;
use rand::Rng;
use std::{cmp::max, mem::size_of, ops::DerefMut, sync::Arc};
use std::{
cmp::max,
mem::size_of,
ops::{Deref, DerefMut},
sync::Arc,
};

pub struct ConsensusStorage {
// DB
db: Arc<DB>,

// Locked stores
pub statuses_store: Arc<RwLock<DbStatusesStore>>,
pub relations_stores: Arc<RwLock<Vec<DbRelationsStore>>>,
pub relations_stores: Arc<[DbRelationsStore]>,
pub relations_primary_store: DbRelationsStore,
pub reachability_store: Arc<RwLock<DbReachabilityStore>>,
pub reachability_relations_store: Arc<RwLock<DbRelationsStore>>,
pub pruning_point_store: Arc<RwLock<DbPruningStore>>,
Expand Down Expand Up @@ -107,14 +113,14 @@ impl ConsensusStorage {

// Headers
let statuses_store = Arc::new(RwLock::new(DbStatusesStore::new(db.clone(), noise(statuses_cache_size))));
let relations_stores = Arc::new(RwLock::new(
(0..=params.max_block_level)
.map(|level| {
let cache_size = max(relations_cache_size.checked_shr(level as u32).unwrap_or(0), 2 * params.pruning_proof_m);
DbRelationsStore::new(db.clone(), level, noise(cache_size))
})
.collect_vec(),
));
let relations_stores: Arc<[_]> = (0..=params.max_block_level)
.map(|level| {
let cache_size = max(relations_cache_size.checked_shr(level as u32).unwrap_or(0), 2 * params.pruning_proof_m);
DbRelationsStore::new(db.clone(), level, noise(cache_size))
})
.collect_vec()
.into();
let relations_primary_store = relations_stores[0].clone();
let reachability_store = Arc::new(RwLock::new(DbReachabilityStore::new(db.clone(), noise(reachability_cache_size))));

let reachability_relations_store = Arc::new(RwLock::new(DbRelationsStore::with_prefix(
Expand Down Expand Up @@ -162,7 +168,7 @@ impl ConsensusStorage {

// Ensure that reachability stores are initialized
reachability::init(reachability_store.write().deref_mut()).unwrap();
relations::init(reachability_relations_store.write().deref_mut());
relations::init(&mut reachability_relations_store.write().deref());

Arc::new(Self {
db,
Expand All @@ -188,6 +194,7 @@ impl ConsensusStorage {
utxo_multisets_store,
block_window_cache_for_difficulty,
block_window_cache_for_past_median_time,
relations_primary_store,
})
}
}
1 change: 0 additions & 1 deletion consensus/src/model/services/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
pub mod reachability;
pub mod relations;
pub mod statuses;
36 changes: 0 additions & 36 deletions consensus/src/model/services/relations.rs

This file was deleted.

124 changes: 124 additions & 0 deletions consensus/src/model/stores/children.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use kaspa_consensus_core::BlockHashSet;
use kaspa_consensus_core::BlockHasher;
use kaspa_consensus_core::BlockLevel;
use kaspa_database::prelude::BatchDbWriter;
use kaspa_database::prelude::CachedDbSetAccess;
use kaspa_database::prelude::DbWriter;
use kaspa_database::prelude::ReadLock;
use kaspa_database::prelude::StoreError;
use kaspa_database::prelude::StoreResult;
use kaspa_database::prelude::DB;
use kaspa_database::registry::DatabaseStorePrefixes;
use kaspa_hashes::Hash;
use kaspa_hashes::HASH_SIZE;
use rocksdb::WriteBatch;
use std::fmt::Display;
use std::sync::Arc;

/// Read-only access to the children relation: maps a block hash to the set of its children.
pub trait ChildrenStoreReader {
    /// Returns the children set of `hash`, wrapped in a read lock.
    /// Propagates store errors (e.g. key-not-found) via `StoreResult`.
    fn get(&self, hash: Hash) -> StoreResult<ReadLock<BlockHashSet>>;
}

/// Write access for maintaining parent → child edges. Writes go through the
/// caller-supplied `DbWriter`, so they can be direct or batched.
pub trait ChildrenStore {
    /// Registers `child` as a child of `parent`.
    fn insert_child(&mut self, writer: impl DbWriter, parent: Hash, child: Hash) -> Result<(), StoreError>;
    /// Removes `child` from `parent`'s children set.
    fn delete_child(&mut self, writer: impl DbWriter, parent: Hash, child: Hash) -> Result<(), StoreError>;
}

/// Decoded form of a (parent, child) relation key.
// NOTE(review): only referenced by the `Display` impl for `DbChildKey` below;
// appears otherwise unused in this file — confirm before removing.
struct ChildKey {
    parent: Hash,
    child: Hash,
}

// A serialized key is the parent hash immediately followed by the child hash.
const KEY_SIZE: usize = 2 * HASH_SIZE;

/// Raw DB key bytes: `parent.as_bytes() || child.as_bytes()` (see `From<ChildKey>`).
#[derive(Eq, Hash, PartialEq, Debug, Copy, Clone)]
struct DbChildKey([u8; KEY_SIZE]);

impl AsRef<[u8]> for DbChildKey {
    /// Exposes the serialized key as a byte slice (the form the DB layer consumes).
    fn as_ref(&self) -> &[u8] {
        self.0.as_slice()
    }
}

impl Display for DbChildKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let key: ChildKey = (*self).into();
write!(f, "{}:{}", key.parent, key.child)
}
}

impl From<ChildKey> for DbChildKey {
fn from(key: ChildKey) -> Self {
let mut bytes = [0; KEY_SIZE];
bytes[..HASH_SIZE].copy_from_slice(&key.parent.as_bytes());
bytes[HASH_SIZE..].copy_from_slice(&key.child.as_bytes());
Self(bytes)
}
}

impl From<DbChildKey> for ChildKey {
fn from(k: DbChildKey) -> Self {
let parent_bytes: [u8; HASH_SIZE] = k.0[..HASH_SIZE].try_into().unwrap();
let parent: Hash = parent_bytes.into();

let child_bytes: [u8; HASH_SIZE] = k.0[HASH_SIZE..].try_into().unwrap();
let child: Hash = child_bytes.into();
Self { parent, child }
}
}

/// A DB + cache implementation of the children store traits
/// (`ChildrenStoreReader` / `ChildrenStore`), with concurrency support.
#[derive(Clone)]
pub struct DbChildrenStore {
    // Underlying DB handle.
    // NOTE(review): not read by any method visible in this file — presumably
    // retained so the store can open batches against the same DB; confirm.
    db: Arc<DB>,
    // Cached, prefixed set-access mapping a parent hash to the set of its children
    access: CachedDbSetAccess<Hash, Hash, BlockHasher, BlockHasher>,
}

impl DbChildrenStore {
    /// Creates a children store for block-level `level`. Keys live under the
    /// `RelationsChildren` prefix extended with the little-endian level bytes,
    /// so each level gets its own key space.
    pub fn new(db: Arc<DB>, level: BlockLevel, cache_size: u64) -> Self {
        let prefix = DatabaseStorePrefixes::RelationsChildren.into_iter().chain(level.to_le_bytes()).collect();
        Self { db: db.clone(), access: CachedDbSetAccess::new(db, cache_size, prefix) }
    }

    /// Creates a children store under an arbitrary caller-supplied `prefix`,
    /// extended with the `RelationsChildren` prefix.
    pub fn with_prefix(db: Arc<DB>, prefix: &[u8], cache_size: u64) -> Self {
        let full_prefix = prefix.iter().copied().chain(DatabaseStorePrefixes::RelationsChildren).collect();
        Self { db: db.clone(), access: CachedDbSetAccess::new(db, cache_size, full_prefix) }
    }

    /// Adds `child` to `parent`'s children set as part of the given write batch.
    pub fn insert_batch(&self, batch: &mut WriteBatch, parent: Hash, child: Hash) -> Result<(), StoreError> {
        self.access.write(BatchDbWriter::new(batch), parent, child).map(|_| ())
    }

    /// Deletes the entire children set of `parent`.
    pub(crate) fn delete_children(&self, mut writer: impl DbWriter, parent: Hash) -> Result<(), StoreError> {
        self.access.delete_bucket(&mut writer, parent)
    }

    /// The full DB key prefix this store reads and writes under.
    pub(crate) fn prefix(&self) -> &[u8] {
        self.access.prefix()
    }
}

impl ChildrenStoreReader for DbChildrenStore {
    /// Returns the (possibly cached) children set of `parent` behind a read lock.
    fn get(&self, parent: Hash) -> StoreResult<ReadLock<BlockHashSet>> {
        self.access.read(parent)
    }
}

// Implemented for `&DbChildrenStore` so that the mutating trait methods can be
// driven through a shared reference to the store.
// NOTE(review): soundness presumably relies on `CachedDbSetAccess` being
// internally synchronized — confirm against its implementation.
impl ChildrenStore for &DbChildrenStore {
    /// Registers `child` under `parent` via the supplied writer.
    fn insert_child(&mut self, writer: impl DbWriter, parent: Hash, child: Hash) -> Result<(), StoreError> {
        self.access.write(writer, parent, child).map(|_| ())
    }

    /// Removes `child` from `parent`'s children set via the supplied writer.
    fn delete_child(&mut self, writer: impl DbWriter, parent: Hash, child: Hash) -> Result<(), StoreError> {
        self.access.delete(writer, parent, child)
    }
}
Loading