diff --git a/consensus/src/consensus/mod.rs b/consensus/src/consensus/mod.rs
index 9921acefe0..73196307c2 100644
--- a/consensus/src/consensus/mod.rs
+++ b/consensus/src/consensus/mod.rs
@@ -677,7 +677,12 @@ impl ConsensusApi for Consensus {
     }
 
     fn get_block_children(&self, hash: Hash) -> Option<Arc<Vec<Hash>>> {
-        self.services.relations_service.get_children(hash).unwrap_option()
+        self.services
+            .relations_service
+            .get_children(hash)
+            .unwrap_option()
+            .map(|children| Arc::new(children.read().iter().copied().collect_vec()))
+        // TODO: No need for Arc
     }
 
     fn get_block_parents(&self, hash: Hash) -> Option<Arc<Vec<Hash>>> {
diff --git a/consensus/src/model/services/relations.rs b/consensus/src/model/services/relations.rs
index 2d2b158877..b1fcaa0e35 100644
--- a/consensus/src/model/services/relations.rs
+++ b/consensus/src/model/services/relations.rs
@@ -1,5 +1,6 @@
 use crate::model::stores::relations::RelationsStoreReader;
-use kaspa_database::prelude::StoreError;
+use kaspa_consensus_core::BlockHashSet;
+use kaspa_database::prelude::{ReadLock, StoreError, StoreResult};
 use kaspa_hashes::Hash;
 use parking_lot::RwLock;
 use std::sync::Arc;
@@ -22,7 +23,7 @@ impl<T: RelationsStoreReader> RelationsStoreReader for MTRelationsService<T> {
         self.store.read()[self.level].get_parents(hash)
     }
 
-    fn get_children(&self, hash: Hash) -> Result<BlockHashes, StoreError> {
+    fn get_children(&self, hash: Hash) -> StoreResult<ReadLock<BlockHashSet>> {
         self.store.read()[self.level].get_children(hash)
     }
 
diff --git a/consensus/src/model/stores/children.rs b/consensus/src/model/stores/children.rs
index b00ab190e6..6b68277a69 100644
--- a/consensus/src/model/stores/children.rs
+++ b/consensus/src/model/stores/children.rs
@@ -1,9 +1,10 @@
-use itertools::Itertools;
+use kaspa_consensus_core::BlockHashSet;
 use kaspa_consensus_core::BlockHasher;
 use kaspa_consensus_core::BlockLevel;
 use kaspa_database::prelude::BatchDbWriter;
 use kaspa_database::prelude::CachedDbSetAccess;
 use kaspa_database::prelude::DbWriter;
+use kaspa_database::prelude::ReadLock;
 use kaspa_database::prelude::StoreError;
 use kaspa_database::prelude::StoreResult;
 use kaspa_database::prelude::DB;
@@ -15,7 +16,7 @@ use std::fmt::Display;
 use std::sync::Arc;
 
 pub trait ChildrenStoreReader {
-    fn get(&self, hash: Hash) -> StoreResult<Vec<Hash>>;
+    fn get(&self, hash: Hash) -> StoreResult<ReadLock<BlockHashSet>>;
 }
 
 pub trait ChildrenStore {
@@ -99,8 +100,8 @@ impl DbChildrenStore {
 }
 
 impl ChildrenStoreReader for DbChildrenStore {
-    fn get(&self, parent: Hash) -> StoreResult<Vec<Hash>> {
-        Ok(self.access.read(parent)?.read().iter().copied().collect_vec()) // TODO: Pass read lock
+    fn get(&self, parent: Hash) -> StoreResult<ReadLock<BlockHashSet>> {
+        self.access.read(parent) // TODO: Pass read lock
     }
 }
 
@@ -111,11 +112,7 @@ impl ChildrenStore for DbChildrenStore {
     }
 
     fn delete_children(&self, mut writer: impl DbWriter, parent: Hash) -> Result<(), StoreError> {
-        for child in self.get(parent).unwrap() {
-            self.access.delete(&mut writer, parent, child)?;
-        }
-
-        Ok(())
+        self.access.delete_bucket(&mut writer, parent)
     }
 
     fn delete_child(&self, writer: impl DbWriter, parent: Hash, child: Hash) -> Result<(), StoreError> {
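The `ChildrenStoreReader::get` change above is the heart of this patch: instead of materializing each children set into a fresh `Vec<Hash>` on every call, the store now hands back the cached set behind a `ReadLock`, and each call site decides whether to iterate under the guard or to take a snapshot. A minimal caller-side sketch of the new contract (the helper `children_snapshot` is illustrative, not part of the patch):

```rust
use kaspa_consensus_core::BlockHashSet;
use kaspa_database::prelude::{ReadLock, StoreResult};
use kaspa_hashes::Hash;

use crate::model::stores::children::ChildrenStoreReader;

// Snapshot the children of `parent`. The guard returned by `read()` lives only
// for the duration of the second statement, so the lock is held briefly.
fn children_snapshot(store: &impl ChildrenStoreReader, parent: Hash) -> StoreResult<Vec<Hash>> {
    let children: ReadLock<BlockHashSet> = store.get(parent)?;
    let snapshot: Vec<Hash> = children.read().iter().copied().collect();
    Ok(snapshot)
}
```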
diff --git a/consensus/src/model/stores/relations.rs b/consensus/src/model/stores/relations.rs
index 192d5f4222..c7b4063324 100644
--- a/consensus/src/model/stores/relations.rs
+++ b/consensus/src/model/stores/relations.rs
@@ -1,11 +1,11 @@
 use itertools::Itertools;
 use kaspa_consensus_core::BlockHashSet;
 use kaspa_consensus_core::{blockhash::BlockHashes, BlockHashMap, BlockHasher, BlockLevel, HashMapCustomHasher};
-use kaspa_database::prelude::StoreError;
-use kaspa_database::prelude::DB;
-use kaspa_database::prelude::{BatchDbWriter, DbWriter};
+use kaspa_database::prelude::{BatchDbWriter, DbWriter, StoreResultExtensions};
 use kaspa_database::prelude::{CachedDbAccess, DbKey, DirectDbWriter};
 use kaspa_database::prelude::{DirectWriter, MemoryWriter};
+use kaspa_database::prelude::{ReadLock, StoreError};
+use kaspa_database::prelude::{StoreResult, DB};
 use kaspa_database::registry::{DatabaseStorePrefixes, SEPARATOR};
 use kaspa_hashes::Hash;
 use parking_lot::RwLock;
@@ -20,7 +20,7 @@ use super::children::{ChildrenStore, ChildrenStoreReader, DbChildrenStore};
 /// Reader API for `RelationsStore`.
 pub trait RelationsStoreReader {
     fn get_parents(&self, hash: Hash) -> Result<BlockHashes, StoreError>;
-    fn get_children(&self, hash: Hash) -> Result<BlockHashes, StoreError>;
+    fn get_children(&self, hash: Hash) -> StoreResult<ReadLock<BlockHashSet>>;
     fn has(&self, hash: Hash) -> Result<bool, StoreError>;
 
     /// Returns the counts of entries in parents/children stores. To be used for tests only
@@ -72,11 +72,11 @@ impl RelationsStoreReader for DbRelationsStore {
         self.parents_access.read(hash)
     }
 
-    fn get_children(&self, hash: Hash) -> Result<BlockHashes, StoreError> {
+    fn get_children(&self, hash: Hash) -> StoreResult<ReadLock<BlockHashSet>> {
         if !self.parents_access.has(hash)? {
             Err(StoreError::KeyNotFound(DbKey::new(self.parents_access.prefix(), hash)))
         } else {
-            Ok(self.children_store.get(hash).unwrap().into())
+            self.children_store.get(hash)
         }
     }
 
@@ -155,7 +155,9 @@ impl<'a> ChildrenStore for StagingRelationsStore<'a> {
     fn delete_children(&self, _writer: impl DbWriter, parent: Hash) -> Result<(), StoreError> {
         let mut staging_children_write = self.staging_children.write();
         staging_children_write.insertions.remove(&parent);
-        let store_children = self.store.children_store.get(parent).unwrap_or_default();
+        let store_children =
+            self.store.children_store.get(parent).unwrap_option().unwrap_or_default().read().iter().copied().collect_vec();
+
         for child in store_children {
             match staging_children_write.deletions.entry(parent) {
                 Entry::Occupied(mut e) => {
@@ -272,15 +274,15 @@ impl RelationsStoreReader for StagingRelationsStore<'_> {
         }
     }
 
-    fn get_children(&self, hash: Hash) -> Result<BlockHashes, StoreError> {
+    fn get_children(&self, hash: Hash) -> StoreResult<ReadLock<BlockHashSet>> {
         self.check_not_in_deletions(hash)?;
-        let store_children = self.store.get_children(hash).unwrap_or_default();
+        let store_children = self.store.get_children(hash).unwrap_option().unwrap_or_default().read().iter().copied().collect_vec();
         let staging_children_read = self.staging_children.read();
         let insertions = staging_children_read.insertions.get(&hash).cloned().unwrap_or_default();
         let deletions = staging_children_read.deletions.get(&hash).cloned().unwrap_or_default();
-        let children: Vec<_> =
+        let children: BlockHashSet =
             BlockHashSet::from_iter(store_children.iter().copied().chain(insertions)).difference(&deletions).copied().collect();
-        Ok(BlockHashes::from(children))
+        Ok(children.into())
     }
 
     fn has(&self, hash: Hash) -> Result<bool, StoreError> {
@@ -366,12 +368,12 @@ impl RelationsStoreReader for MemoryRelationsStore {
         }
     }
-    fn get_children(&self, hash: Hash) -> Result<BlockHashes, StoreError> {
+    fn get_children(&self, hash: Hash) -> StoreResult<ReadLock<BlockHashSet>> {
         if !self.has(hash)? {
             Err(StoreError::KeyNotFound(DbKey::new(DatabaseStorePrefixes::RelationsChildren.as_ref(), hash)))
         } else {
             match self.children_map.read().get(&hash) {
-                Some(children) => Ok(BlockHashes::clone(children)),
+                Some(children) => Ok(BlockHashSet::from_iter(children.iter().copied()).into()),
                 None => Ok(Default::default()),
             }
         }
     }
@@ -431,7 +433,7 @@ mod tests {
 
         let expected_children = [(1, vec![2, 3, 5]), (2, vec![4]), (3, vec![4]), (4, vec![5]), (5, vec![])];
         for (i, vec) in expected_children {
-            let store_children: BlockHashSet = store.get_children(i.into()).unwrap().iter().copied().collect();
+            let store_children: BlockHashSet = store.get_children(i.into()).unwrap().read().iter().copied().collect();
             let expected: BlockHashSet = vec.iter().copied().map(Hash::from).collect();
             assert_eq!(store_children, expected);
         }
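Worth spelling out: `StagingRelationsStore::get_children` above resolves a child set from three layers — the committed store, staged insertions, and staged deletions. The merge rule it implements reduces to the following (a standalone sketch; `merged_children` is not a function in the patch):

```rust
use kaspa_consensus_core::BlockHashSet;

// children = (committed ∪ staged insertions) minus staged deletions,
// the same set algebra as the BlockHashSet::from_iter(..).difference(..) chain above.
fn merged_children(committed: &BlockHashSet, insertions: &BlockHashSet, deletions: &BlockHashSet) -> BlockHashSet {
    let combined: BlockHashSet = committed.union(insertions).copied().collect();
    combined.difference(deletions).copied().collect()
}
```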
diff --git a/consensus/src/processes/parents_builder.rs b/consensus/src/processes/parents_builder.rs
index 851770a8d9..49b3822d94 100644
--- a/consensus/src/processes/parents_builder.rs
+++ b/consensus/src/processes/parents_builder.rs
@@ -52,7 +52,7 @@ impl<T: HeaderStoreReader, U: ReachabilityService, V: RelationsStoreReader>
             .expect("at least one of the parents is expected to be in the future of the pruning point");
         direct_parent_headers.swap(0, first_parent_in_future_of_pruning_point);
 
-        let origin_children = self.relations_service.get_children(ORIGIN).unwrap();
+        let origin_children = self.relations_service.get_children(ORIGIN).unwrap().read().iter().copied().collect_vec();
         let origin_children_headers =
             origin_children.iter().copied().map(|parent| self.headers_store.get_header(parent).unwrap()).collect_vec();
 
@@ -204,7 +204,7 @@ mod tests {
         header::Header,
         BlockHashSet, HashMapCustomHasher,
     };
-    use kaspa_database::prelude::StoreError;
+    use kaspa_database::prelude::{ReadLock, StoreError, StoreResult};
     use kaspa_hashes::Hash;
     use parking_lot::RwLock;
 
@@ -262,8 +262,8 @@ mod tests {
             unimplemented!()
         }
 
-        fn get_children(&self, hash: Hash) -> Result<BlockHashes, StoreError> {
-            Ok(self.children.clone())
+        fn get_children(&self, hash: Hash) -> StoreResult<ReadLock<BlockHashSet>> {
+            Ok(BlockHashSet::from_iter(self.children.iter().copied()).into())
         }
 
         fn has(&self, hash: Hash) -> Result<bool, StoreError> {
diff --git a/consensus/src/processes/pruning_proof/mod.rs b/consensus/src/processes/pruning_proof/mod.rs
index 74bee081b7..174608da2d 100644
--- a/consensus/src/processes/pruning_proof/mod.rs
+++ b/consensus/src/processes/pruning_proof/mod.rs
@@ -413,7 +413,7 @@ impl PruningProofManager {
             relations_stores[level].insert_batch(&mut batch, ORIGIN, BlockHashes::new(vec![])).unwrap();
             ghostdag_stores[level].insert(ORIGIN, self.ghostdag_managers[level].origin_ghostdag_data()).unwrap();
         }
-
+
         db.write(batch).unwrap();
     }
 
@@ -644,7 +644,7 @@ impl PruningProofManager {
             }
 
             headers.push(self.headers_store.get_header(current).unwrap());
-            for child in self.relations_stores.read()[level].get_children(current).unwrap().iter().copied() {
+            for child in self.relations_stores.read()[level].get_children(current).unwrap().read().iter().copied() {
                 queue.push(Reverse(SortableBlock::new(child, self.ghostdag_stores[level].get_blue_work(child).unwrap())));
             }
         }
diff --git a/consensus/src/processes/reachability/tests/mod.rs b/consensus/src/processes/reachability/tests/mod.rs
index 543c934e27..f9cf2ec0cb 100644
--- a/consensus/src/processes/reachability/tests/mod.rs
+++ b/consensus/src/processes/reachability/tests/mod.rs
@@ -150,10 +150,10 @@ pub fn validate_relations<S: RelationsStoreReader + ?Sized>(relations: &S) -> st
         let parents = relations.get_parents(current)?;
         assert_eq!(parents.len(), parents.iter().copied().unique_by(|&h| h).count(), "duplicate hashes in parents array");
         for parent in parents.iter().copied() {
-            let parent_children = relations.get_children(parent)?;
+            let parent_children = relations.get_children(parent)?.read().iter().copied().collect_vec();
             assert!(parent_children.contains(&current), "missing child entry");
         }
-        let children = relations.get_children(current)?;
+        let children = relations.get_children(current)?.read().iter().copied().collect_vec();
         assert_eq!(children.len(), children.iter().copied().unique_by(|&h| h).count(), "duplicate hashes in children array");
         for child in children.iter().copied() {
             if visited.insert(child) {
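A side effect visible in these tests: the two directions of an edge are now stored asymmetrically — parents remain an ordered `Arc<Vec<Hash>>`, while children are an unordered `BlockHashSet` behind a `ReadLock`. A sketch of the symmetry the validation above checks (`assert_edge` is a hypothetical helper, not in the patch):

```rust
use kaspa_hashes::Hash;

use crate::model::stores::relations::RelationsStoreReader;

// Both directions of a parent -> child edge must agree.
fn assert_edge<S: RelationsStoreReader + ?Sized>(relations: &S, parent: Hash, child: Hash) {
    // Children are a set behind a ReadLock: membership is a hash lookup.
    assert!(relations.get_children(parent).unwrap().read().contains(&child));
    // Parents remain an ordered Vec: membership is a linear scan.
    assert!(relations.get_parents(child).unwrap().contains(&parent));
}
```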
diff --git a/consensus/src/processes/relations.rs b/consensus/src/processes/relations.rs
index 4304306147..34a0e58c04 100644
--- a/consensus/src/processes/relations.rs
+++ b/consensus/src/processes/relations.rs
@@ -33,7 +33,7 @@ where
     S: RelationsStore + ChildrenStore + ?Sized,
 {
     let children = relations.get_children(hash)?; // if the first entry was found, we expect all others as well, hence we unwrap below
-    for child in children.iter().copied() {
+    for child in children.read().iter().copied() {
         let child_parents = relations.get_parents(child).unwrap();
         // If the removed hash is the only parent of child, then replace it with `origin`
         let replace_with: &[Hash] = if child_parents.as_slice() == [hash] { &[ORIGIN] } else { &[] };
@@ -60,7 +60,7 @@ where
     let parents = relations.get_parents(hash).unwrap();
     let children = relations.get_children(hash).unwrap();
     let mergeset = unordered_mergeset_without_selected_parent(relations, reachability, selected_parent, &parents);
-    for child in children.iter().copied() {
+    for child in children.read().iter().copied() {
         let other_parents = relations.get_parents(child).unwrap().iter().copied().filter(|&p| p != hash).collect_vec();
         let needed_grandparents = parents
             .iter()
@@ -167,11 +167,20 @@ mod tests {
         relations.insert(2.into(), Arc::new(vec![1.into()])).unwrap();
 
         assert_eq!(relations.get_parents(ORIGIN).unwrap().as_slice(), []);
-        assert_eq!(relations.get_children(ORIGIN).unwrap().as_slice(), [1.into()]);
+        assert_eq!(
+            relations.get_children(ORIGIN).unwrap().read().iter().copied().collect::<BlockHashSet>(),
+            BlockHashSet::from_iter([1.into()])
+        );
         assert_eq!(relations.get_parents(1.into()).unwrap().as_slice(), [ORIGIN]);
-        assert_eq!(relations.get_children(1.into()).unwrap().as_slice(), [2.into()]);
+        assert_eq!(
+            relations.get_children(1.into()).unwrap().read().iter().copied().collect::<BlockHashSet>(),
+            BlockHashSet::from_iter([2.into()])
+        );
         assert_eq!(relations.get_parents(2.into()).unwrap().as_slice(), [1.into()]);
-        assert_eq!(relations.get_children(2.into()).unwrap().as_slice(), []);
+        assert_eq!(
+            relations.get_children(2.into()).unwrap().read().iter().copied().collect::<BlockHashSet>(),
+            BlockHashSet::from_iter([])
+        );
 
         let mut batch = WriteBatch::default();
         let mut staging_relations = StagingRelationsStore::new(&mut relations);
@@ -180,12 +189,18 @@ mod tests {
         staging_relations.delete_entries(&mut batch, 1.into()).unwrap();
         db.write(batch).unwrap();
         assert_match!(relations.get_parents(1.into()), Err(StoreError::KeyNotFound(_)));
-        assert_match!(relations.get_children(1.into()), Err(StoreError::KeyNotFound(_)));
+        assert_match!(relations.get_children(1.into()).unwrap_err(), StoreError::KeyNotFound(_));
 
         assert_eq!(relations.get_parents(ORIGIN).unwrap().as_slice(), []);
-        assert_eq!(relations.get_children(ORIGIN).unwrap().as_slice(), [2.into()]);
+        assert_eq!(
+            relations.get_children(ORIGIN).unwrap().read().iter().copied().collect::<BlockHashSet>(),
+            BlockHashSet::from_iter([2.into()])
+        );
         assert_eq!(relations.get_parents(2.into()).unwrap().as_slice(), [ORIGIN]);
-        assert_eq!(relations.get_children(2.into()).unwrap().as_slice(), []);
+        assert_eq!(
+            relations.get_children(2.into()).unwrap().read().iter().copied().collect::<BlockHashSet>(),
+            BlockHashSet::from_iter([])
+        );
 
         let mut batch = WriteBatch::default();
         let mut staging_relations = StagingRelationsStore::new(&mut relations);
@@ -197,6 +212,9 @@ mod tests {
         assert_match!(relations.get_children(2.into()), Err(StoreError::KeyNotFound(_)));
 
         assert_eq!(relations.get_parents(ORIGIN).unwrap().as_slice(), []);
-        assert_eq!(relations.get_children(ORIGIN).unwrap().as_slice(), []);
+        assert_eq!(
+            relations.get_children(ORIGIN).unwrap().read().iter().copied().collect::<BlockHashSet>(),
+            BlockHashSet::from_iter([])
+        );
     }
 }
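Two call styles coexist in the file above: iterating directly under the guard (`for child in children.read().iter().copied()`) where the loop only reads, and snapshotting with `collect_vec()` first, as the staging store does. Snapshotting is the conservative choice when the loop body goes on to write to the same children bucket, since the `RwLock` guard is not reentrant. A sketch of that shape under stated assumptions (`prune_children` and `retire` are illustrative names, not APIs from this patch):

```rust
use itertools::Itertools;
use kaspa_database::prelude::StoreError;
use kaspa_hashes::Hash;

use crate::model::stores::relations::RelationsStoreReader;

// Snapshot first, then mutate: the guard is dropped at the end of the `let`
// statement, so `retire` may safely write back to the same children bucket.
fn prune_children<S: RelationsStoreReader + ?Sized>(
    relations: &S,
    hash: Hash,
    mut retire: impl FnMut(Hash) -> Result<(), StoreError>,
) -> Result<(), StoreError> {
    let children = relations.get_children(hash)?.read().iter().copied().collect_vec();
    for child in children {
        retire(child)?;
    }
    Ok(())
}
```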
diff --git a/database/src/set_access.rs b/database/src/set_access.rs
index 9b106cda2f..52e587f4e4 100644
--- a/database/src/set_access.rs
+++ b/database/src/set_access.rs
@@ -7,6 +7,7 @@ use serde::{de::DeserializeOwned, Serialize};
 use std::{
     collections::{hash_map::RandomState, HashSet},
     error::Error,
+    fmt::Debug,
     hash::BuildHasher,
     sync::Arc,
 };
@@ -28,6 +29,7 @@ where
     prefix: Vec<u8>,
 }
 
+#[derive(Default)]
 pub struct ReadLock<T>(Arc<RwLock<T>>);
 
 impl<T> ReadLock<T> {
@@ -40,6 +42,24 @@ impl<T> ReadLock<T> {
     }
 }
 
+impl<T> From<T> for ReadLock<T> {
+    fn from(value: T) -> Self {
+        Self::new(Arc::new(RwLock::new(value)))
+    }
+}
+
+impl<T: Debug> Debug for ReadLock<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_tuple("ReadLock").field(&self.0).finish()
+    }
+}
+
+// impl<T: Default> Default for ReadLock<T> {
+//     fn default() -> Self {
+//         Self(Default::default())
+//     }
+// }
+
 impl<TKey, TData, S, W> CachedDbSetAccess<TKey, TData, S, W>
 where
     TKey: Clone + std::hash::Hash + Eq + Send + Sync + AsRef<[u8]>,
diff --git a/simpa/src/main.rs b/simpa/src/main.rs
index 8089dc7b25..4e7e16a0d1 100644
--- a/simpa/src/main.rs
+++ b/simpa/src/main.rs
@@ -365,7 +365,7 @@ fn topologically_ordered_hashes(src_consensus: &Consensus, genesis_hash: Hash) -
     let mut vec = Vec::new();
     let relations = src_consensus.relations_stores.read();
     while let Some(current) = queue.pop_front() {
-        for child in relations[0].get_children(current).unwrap().iter() {
+        for child in relations[0].get_children(current).unwrap().read().iter() {
             if visited.insert(*child) {
                 queue.push_back(*child);
                 vec.push(*child);
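For reference, the `set_access.rs` additions above are what the store changes lean on: `From<T>` makes `Ok(children.into())` work for freshly built sets, `#[derive(Default)]` backs the `unwrap_or_default()` call sites, and the `Debug` impl keeps `assert_match!` failures printable. A small usage sketch (standalone; `demo` is illustrative):

```rust
use kaspa_consensus_core::BlockHashSet;
use kaspa_database::prelude::ReadLock;
use kaspa_hashes::Hash;

fn demo() {
    let one: Hash = 1.into();
    // From<T> for ReadLock<T>: wrap an owned set without naming Arc/RwLock.
    let children: ReadLock<BlockHashSet> = BlockHashSet::from_iter([one]).into();
    assert!(children.read().contains(&one));
    // derive(Default) yields an empty set for unwrap_or_default() call sites.
    let empty = ReadLock::<BlockHashSet>::default();
    assert!(empty.read().is_empty());
}
```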