Pass children readlock
someone235 committed Nov 29, 2023
1 parent 177cf3a commit f17fd2d
Showing 10 changed files with 87 additions and 44 deletions.
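At its core, the commit changes `get_children` across the relations/children stores to return a read lock over the cached `BlockHashSet` instead of an eagerly copied vector. A minimal sketch of the before/after calling pattern (types as introduced in this diff; the surrounding `store`/`hash` bindings are assumed, not verbatim from the commit):

    // Before: each call copied the children out of the store.
    //     fn get_children(&self, hash: Hash) -> Result<BlockHashes, StoreError>;
    // After: callers borrow the cached set and copy only when ownership is needed.
    //     fn get_children(&self, hash: Hash) -> StoreResult<ReadLock<BlockHashSet>>;
    let children = store.get_children(hash)?; // ReadLock<BlockHashSet>
    let guard = children.read();              // shared RwLock read guard
    for child in guard.iter().copied() {
        // visit each child without cloning the set
    }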
7 changes: 6 additions & 1 deletion consensus/src/consensus/mod.rs
@@ -677,7 +677,12 @@ impl ConsensusApi for Consensus {
     }

     fn get_block_children(&self, hash: Hash) -> Option<Arc<Vec<Hash>>> {
-        self.services.relations_service.get_children(hash).unwrap_option()
+        self.services
+            .relations_service
+            .get_children(hash)
+            .unwrap_option()
+            .map(|children| Arc::new(children.read().iter().copied().collect_vec()))
+        // TODO: No need for Arc
     }

     fn get_block_parents(&self, hash: Hash) -> Option<Arc<Vec<Hash>>> {
5 changes: 3 additions & 2 deletions consensus/src/model/services/relations.rs
@@ -1,5 +1,6 @@
 use crate::model::stores::relations::RelationsStoreReader;
-use kaspa_database::prelude::StoreError;
+use kaspa_consensus_core::BlockHashSet;
+use kaspa_database::prelude::{ReadLock, StoreError, StoreResult};
 use kaspa_hashes::Hash;
 use parking_lot::RwLock;
 use std::sync::Arc;
@@ -22,7 +23,7 @@ impl<T: RelationsStoreReader> RelationsStoreReader for MTRelationsService<T> {
         self.store.read()[self.level].get_parents(hash)
     }

-    fn get_children(&self, hash: Hash) -> Result<kaspa_consensus_core::blockhash::BlockHashes, StoreError> {
+    fn get_children(&self, hash: Hash) -> StoreResult<ReadLock<BlockHashSet>> {
         self.store.read()[self.level].get_children(hash)
     }
15 changes: 6 additions & 9 deletions consensus/src/model/stores/children.rs
@@ -1,9 +1,10 @@
-use itertools::Itertools;
+use kaspa_consensus_core::BlockHashSet;
 use kaspa_consensus_core::BlockHasher;
 use kaspa_consensus_core::BlockLevel;
 use kaspa_database::prelude::BatchDbWriter;
 use kaspa_database::prelude::CachedDbSetAccess;
 use kaspa_database::prelude::DbWriter;
+use kaspa_database::prelude::ReadLock;
 use kaspa_database::prelude::StoreError;
 use kaspa_database::prelude::StoreResult;
 use kaspa_database::prelude::DB;
@@ -15,7 +16,7 @@ use std::fmt::Display;
 use std::sync::Arc;

 pub trait ChildrenStoreReader {
-    fn get(&self, hash: Hash) -> StoreResult<Vec<Hash>>;
+    fn get(&self, hash: Hash) -> StoreResult<ReadLock<BlockHashSet>>;
 }

 pub trait ChildrenStore {
@@ -99,8 +100,8 @@ impl DbChildrenStore {
 }

 impl ChildrenStoreReader for DbChildrenStore {
-    fn get(&self, parent: Hash) -> StoreResult<Vec<Hash>> {
-        Ok(self.access.read(parent)?.read().iter().copied().collect_vec()) // TODO: Pass read lock
+    fn get(&self, parent: Hash) -> StoreResult<ReadLock<BlockHashSet>> {
+        self.access.read(parent)
     }
 }

@@ -111,11 +112,7 @@ impl ChildrenStore for DbChildrenStore {
     }

     fn delete_children(&self, mut writer: impl DbWriter, parent: Hash) -> Result<(), StoreError> {
-        for child in self.get(parent).unwrap() {
-            self.access.delete(&mut writer, parent, child)?;
-        }
-
-        Ok(())
+        self.access.delete_bucket(&mut writer, parent)
     }

     fn delete_child(&self, writer: impl DbWriter, parent: Hash, child: Hash) -> Result<(), StoreError> {
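Design note: replacing the per-child delete loop with `delete_bucket` drops the whole children bucket in one pass instead of reading the set first and deleting entry by entry. A hedged usage sketch (assuming a `DbChildrenStore` named `store`, a parent `Hash`, and the `WriteBatch`/`BatchDbWriter` pattern used elsewhere in this commit):

    // Sketch only: remove all children of `parent` through a batched write.
    let mut batch = WriteBatch::default();
    store.delete_children(BatchDbWriter::new(&mut batch), parent)?;
    db.write(batch)?;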
30 changes: 16 additions & 14 deletions consensus/src/model/stores/relations.rs
@@ -1,11 +1,11 @@
 use itertools::Itertools;
+use kaspa_consensus_core::BlockHashSet;
 use kaspa_consensus_core::{blockhash::BlockHashes, BlockHashMap, BlockHasher, BlockLevel, HashMapCustomHasher};
-use kaspa_database::prelude::StoreError;
-use kaspa_database::prelude::DB;
-use kaspa_database::prelude::{BatchDbWriter, DbWriter};
+use kaspa_database::prelude::{BatchDbWriter, DbWriter, StoreResultExtensions};
 use kaspa_database::prelude::{CachedDbAccess, DbKey, DirectDbWriter};
 use kaspa_database::prelude::{DirectWriter, MemoryWriter};
+use kaspa_database::prelude::{ReadLock, StoreError};
+use kaspa_database::prelude::{StoreResult, DB};
 use kaspa_database::registry::{DatabaseStorePrefixes, SEPARATOR};
 use kaspa_hashes::Hash;
 use parking_lot::RwLock;
@@ -20,7 +20,7 @@ use super::children::{ChildrenStore, ChildrenStoreReader, DbChildrenStore};
 /// Reader API for `RelationsStore`.
 pub trait RelationsStoreReader {
     fn get_parents(&self, hash: Hash) -> Result<BlockHashes, StoreError>;
-    fn get_children(&self, hash: Hash) -> Result<BlockHashes, StoreError>;
+    fn get_children(&self, hash: Hash) -> StoreResult<ReadLock<BlockHashSet>>;
     fn has(&self, hash: Hash) -> Result<bool, StoreError>;

     /// Returns the counts of entries in parents/children stores. To be used for tests only
@@ -72,11 +72,11 @@ impl RelationsStoreReader for DbRelationsStore {
         self.parents_access.read(hash)
     }

-    fn get_children(&self, hash: Hash) -> Result<BlockHashes, StoreError> {
+    fn get_children(&self, hash: Hash) -> StoreResult<ReadLock<BlockHashSet>> {
         if !self.parents_access.has(hash)? {
             Err(StoreError::KeyNotFound(DbKey::new(self.parents_access.prefix(), hash)))
         } else {
-            Ok(self.children_store.get(hash).unwrap().into())
+            self.children_store.get(hash)
         }
     }

@@ -155,7 +155,9 @@ impl<'a> ChildrenStore for StagingRelationsStore<'a> {
     fn delete_children(&self, _writer: impl DbWriter, parent: Hash) -> Result<(), StoreError> {
         let mut staging_children_write = self.staging_children.write();
         staging_children_write.insertions.remove(&parent);
-        let store_children = self.store.children_store.get(parent).unwrap_or_default();
+        let store_children =
+            self.store.children_store.get(parent).unwrap_option().unwrap_or_default().read().iter().copied().collect_vec();
+
         for child in store_children {
             match staging_children_write.deletions.entry(parent) {
                 Entry::Occupied(mut e) => {
@@ -272,15 +274,15 @@ impl RelationsStoreReader for StagingRelationsStore<'_> {
         }
     }

-    fn get_children(&self, hash: Hash) -> Result<BlockHashes, StoreError> {
+    fn get_children(&self, hash: Hash) -> StoreResult<ReadLock<BlockHashSet>> {
         self.check_not_in_deletions(hash)?;
-        let store_children = self.store.get_children(hash).unwrap_or_default();
+        let store_children = self.store.get_children(hash).unwrap_option().unwrap_or_default().read().iter().copied().collect_vec();
         let staging_children_read = self.staging_children.read();
         let insertions = staging_children_read.insertions.get(&hash).cloned().unwrap_or_default();
         let deletions = staging_children_read.deletions.get(&hash).cloned().unwrap_or_default();
-        let children: Vec<_> =
+        let children: BlockHashSet =
             BlockHashSet::from_iter(store_children.iter().copied().chain(insertions)).difference(&deletions).copied().collect();
-        Ok(BlockHashes::from(children))
+        Ok(children.into())
     }

     fn has(&self, hash: Hash) -> Result<bool, StoreError> {
@@ -366,12 +368,12 @@ impl RelationsStoreReader for MemoryRelationsStore {
         }
     }

-    fn get_children(&self, hash: Hash) -> Result<BlockHashes, StoreError> {
+    fn get_children(&self, hash: Hash) -> StoreResult<ReadLock<BlockHashSet>> {
         if !self.has(hash)? {
             Err(StoreError::KeyNotFound(DbKey::new(DatabaseStorePrefixes::RelationsChildren.as_ref(), hash)))
         } else {
             match self.children_map.read().get(&hash) {
-                Some(children) => Ok(BlockHashes::clone(children)),
+                Some(children) => Ok(BlockHashSet::from_iter(children.iter().copied()).into()),
                 None => Ok(Default::default()),
             }
         }
@@ -431,7 +433,7 @@ mod tests {

         let expected_children = [(1, vec![2, 3, 5]), (2, vec![4]), (3, vec![4]), (4, vec![5]), (5, vec![])];
         for (i, vec) in expected_children {
-            let store_children: BlockHashSet = store.get_children(i.into()).unwrap().iter().copied().collect();
+            let store_children: BlockHashSet = store.get_children(i.into()).unwrap().read().iter().copied().collect();
             let expected: BlockHashSet = vec.iter().copied().map(Hash::from).collect();
             assert_eq!(store_children, expected);
         }
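The staged view computed in `StagingRelationsStore::get_children` above is plain set algebra: (persisted children ∪ staged insertions) − staged deletions. A toy illustration mirroring that line (the hashes `h2`, `h3`, `h5` are placeholders, not from the commit):

    // effective = (persisted ∪ insertions) − deletions
    let persisted = BlockHashSet::from_iter([h2, h3]);
    let insertions = BlockHashSet::from_iter([h5]);
    let deletions = BlockHashSet::from_iter([h3]);
    let effective: BlockHashSet =
        BlockHashSet::from_iter(persisted.iter().copied().chain(insertions)).difference(&deletions).copied().collect();
    assert_eq!(effective, BlockHashSet::from_iter([h2, h5]));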
8 changes: 4 additions & 4 deletions consensus/src/processes/parents_builder.rs
@@ -52,7 +52,7 @@ impl<T: HeaderStoreReader, U: ReachabilityStoreReader, V: RelationsStoreReader>
             .expect("at least one of the parents is expected to be in the future of the pruning point");
         direct_parent_headers.swap(0, first_parent_in_future_of_pruning_point);

-        let origin_children = self.relations_service.get_children(ORIGIN).unwrap();
+        let origin_children = self.relations_service.get_children(ORIGIN).unwrap().read().iter().copied().collect_vec();
         let origin_children_headers =
             origin_children.iter().copied().map(|parent| self.headers_store.get_header(parent).unwrap()).collect_vec();

@@ -204,7 +204,7 @@ mod tests {
         header::Header,
         BlockHashSet, HashMapCustomHasher,
     };
-    use kaspa_database::prelude::StoreError;
+    use kaspa_database::prelude::{ReadLock, StoreError, StoreResult};
     use kaspa_hashes::Hash;
     use parking_lot::RwLock;

@@ -262,8 +262,8 @@ mod tests {
             unimplemented!()
         }

-        fn get_children(&self, hash: Hash) -> Result<BlockHashes, StoreError> {
-            Ok(self.children.clone())
+        fn get_children(&self, hash: Hash) -> StoreResult<ReadLock<BlockHashSet>> {
+            Ok(BlockHashSet::from_iter(self.children.iter().copied()).into())
         }

         fn has(&self, hash: Hash) -> Result<bool, StoreError> {
4 changes: 2 additions & 2 deletions consensus/src/processes/pruning_proof/mod.rs
@@ -413,7 +413,7 @@ impl PruningProofManager {
             relations_stores[level].insert_batch(&mut batch, ORIGIN, BlockHashes::new(vec![])).unwrap();
             ghostdag_stores[level].insert(ORIGIN, self.ghostdag_managers[level].origin_ghostdag_data()).unwrap();
         }

         db.write(batch).unwrap();
     }

@@ -644,7 +644,7 @@ impl PruningProofManager {
             }

             headers.push(self.headers_store.get_header(current).unwrap());
-            for child in self.relations_stores.read()[level].get_children(current).unwrap().iter().copied() {
+            for child in self.relations_stores.read()[level].get_children(current).unwrap().read().iter().copied() {
                 queue.push(Reverse(SortableBlock::new(child, self.ghostdag_stores[level].get_blue_work(child).unwrap())));
             }
         }
4 changes: 2 additions & 2 deletions consensus/src/processes/reachability/tests/mod.rs
@@ -150,10 +150,10 @@ pub fn validate_relations<S: RelationsStoreReader + ?Sized>(relations: &S) -> st
         let parents = relations.get_parents(current)?;
         assert_eq!(parents.len(), parents.iter().copied().unique_by(|&h| h).count(), "duplicate hashes in parents array");
         for parent in parents.iter().copied() {
-            let parent_children = relations.get_children(parent)?;
+            let parent_children = relations.get_children(parent)?.read().iter().copied().collect_vec();
             assert!(parent_children.contains(&current), "missing child entry");
         }
-        let children = relations.get_children(current)?;
+        let children = relations.get_children(current)?.read().iter().copied().collect_vec();
         assert_eq!(children.len(), children.iter().copied().unique_by(|&h| h).count(), "duplicate hashes in children array");
         for child in children.iter().copied() {
             if visited.insert(child) {
36 changes: 27 additions & 9 deletions consensus/src/processes/relations.rs
@@ -33,7 +33,7 @@ where
     S: RelationsStore + ChildrenStore + ?Sized,
 {
     let children = relations.get_children(hash)?; // if the first entry was found, we expect all others as well, hence we unwrap below
-    for child in children.iter().copied() {
+    for child in children.read().iter().copied() {
         let child_parents = relations.get_parents(child).unwrap();
         // If the removed hash is the only parent of child, then replace it with `origin`
         let replace_with: &[Hash] = if child_parents.as_slice() == [hash] { &[ORIGIN] } else { &[] };
@@ -60,7 +60,7 @@ where
     let parents = relations.get_parents(hash).unwrap();
     let children = relations.get_children(hash).unwrap();
     let mergeset = unordered_mergeset_without_selected_parent(relations, reachability, selected_parent, &parents);
-    for child in children.iter().copied() {
+    for child in children.read().iter().copied() {
         let other_parents = relations.get_parents(child).unwrap().iter().copied().filter(|&p| p != hash).collect_vec();
         let needed_grandparents = parents
             .iter()
@@ -167,11 +167,20 @@ mod tests {
         relations.insert(2.into(), Arc::new(vec![1.into()])).unwrap();

         assert_eq!(relations.get_parents(ORIGIN).unwrap().as_slice(), []);
-        assert_eq!(relations.get_children(ORIGIN).unwrap().as_slice(), [1.into()]);
+        assert_eq!(
+            relations.get_children(ORIGIN).unwrap().read().iter().copied().collect::<BlockHashSet>(),
+            BlockHashSet::from_iter([1.into()])
+        );
         assert_eq!(relations.get_parents(1.into()).unwrap().as_slice(), [ORIGIN]);
-        assert_eq!(relations.get_children(1.into()).unwrap().as_slice(), [2.into()]);
+        assert_eq!(
+            relations.get_children(1.into()).unwrap().read().iter().copied().collect::<BlockHashSet>(),
+            BlockHashSet::from_iter([2.into()])
+        );
         assert_eq!(relations.get_parents(2.into()).unwrap().as_slice(), [1.into()]);
-        assert_eq!(relations.get_children(2.into()).unwrap().as_slice(), []);
+        assert_eq!(
+            relations.get_children(2.into()).unwrap().read().iter().copied().collect::<BlockHashSet>(),
+            BlockHashSet::from_iter([])
+        );

         let mut batch = WriteBatch::default();
         let mut staging_relations = StagingRelationsStore::new(&mut relations);
@@ -180,12 +189,18 @@ mod tests {
         db.write(batch).unwrap();

         assert_match!(relations.get_parents(1.into()), Err(StoreError::KeyNotFound(_)));
-        assert_match!(relations.get_children(1.into()), Err(StoreError::KeyNotFound(_)));
+        assert_match!(relations.get_children(1.into()).unwrap_err(), StoreError::KeyNotFound(_));

         assert_eq!(relations.get_parents(ORIGIN).unwrap().as_slice(), []);
-        assert_eq!(relations.get_children(ORIGIN).unwrap().as_slice(), [2.into()]);
+        assert_eq!(
+            relations.get_children(ORIGIN).unwrap().read().iter().copied().collect::<BlockHashSet>(),
+            BlockHashSet::from_iter([2.into()])
+        );
         assert_eq!(relations.get_parents(2.into()).unwrap().as_slice(), [ORIGIN]);
-        assert_eq!(relations.get_children(2.into()).unwrap().as_slice(), []);
+        assert_eq!(
+            relations.get_children(2.into()).unwrap().read().iter().copied().collect::<BlockHashSet>(),
+            BlockHashSet::from_iter([])
+        );

         let mut batch = WriteBatch::default();
         let mut staging_relations = StagingRelationsStore::new(&mut relations);
@@ -197,6 +212,9 @@ mod tests {
         assert_match!(relations.get_children(2.into()), Err(StoreError::KeyNotFound(_)));

         assert_eq!(relations.get_parents(ORIGIN).unwrap().as_slice(), []);
-        assert_eq!(relations.get_children(ORIGIN).unwrap().as_slice(), []);
+        assert_eq!(
+            relations.get_children(ORIGIN).unwrap().read().iter().copied().collect::<BlockHashSet>(),
+            BlockHashSet::from_iter([])
+        );
     }
 }
20 changes: 20 additions & 0 deletions database/src/set_access.rs
@@ -7,6 +7,7 @@ use serde::{de::DeserializeOwned, Serialize};
 use std::{
     collections::{hash_map::RandomState, HashSet},
     error::Error,
+    fmt::Debug,
     hash::BuildHasher,
     sync::Arc,
 };
@@ -28,6 +29,7 @@
     prefix: Vec<u8>,
 }

+#[derive(Default)]
 pub struct ReadLock<T>(Arc<RwLock<T>>);

 impl<T> ReadLock<T> {
@@ -40,6 +42,24 @@ impl<T> ReadLock<T> {
     }
 }

+impl<T> From<T> for ReadLock<T> {
+    fn from(value: T) -> Self {
+        Self::new(Arc::new(RwLock::new(value)))
+    }
+}
+
+impl<T: Debug> Debug for ReadLock<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_tuple("ReadLock").field(&self.0).finish()
+    }
+}
+
+// impl<T: Default> Default for ReadLock<T> {
+//     fn default() -> Self {
+//         Self(Default::default())
+//     }
+// }
+
 impl<TKey, TData, S, W> CachedDbSetAccess<TKey, TData, S, W>
 where
     TKey: Clone + std::hash::Hash + Eq + Send + Sync + AsRef<[u8]>,
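With the `From<T>` impl added above, a `ReadLock` can be built straight from a value and handed to readers. A small sketch (the hashes `child_a`/`child_b` are placeholders; `read()` is the accessor used on `ReadLock` throughout this commit):

    // Construct via the new From<T> impl, then read through the lock.
    let children: ReadLock<BlockHashSet> = BlockHashSet::from_iter([child_a, child_b]).into();
    let guard = children.read();     // parking_lot RwLock read guard
    assert!(guard.contains(&child_a));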
2 changes: 1 addition & 1 deletion simpa/src/main.rs
@@ -365,7 +365,7 @@ fn topologically_ordered_hashes(src_consensus: &Consensus, genesis_hash: Hash) -
     let mut vec = Vec::new();
     let relations = src_consensus.relations_stores.read();
     while let Some(current) = queue.pop_front() {
-        for child in relations[0].get_children(current).unwrap().iter() {
+        for child in relations[0].get_children(current).unwrap().read().iter() {
             if visited.insert(*child) {
                 queue.push_back(*child);
                 vec.push(*child);
