Skip to content

Commit

Permalink
Revert "Remove relations rwlock"
Browse files Browse the repository at this point in the history
This reverts commit 0a4c5dd.
  • Loading branch information
someone235 committed Dec 5, 2023
1 parent 8401add commit 0fad12f
Show file tree
Hide file tree
Showing 12 changed files with 81 additions and 81 deletions.
26 changes: 11 additions & 15 deletions consensus/src/consensus/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,15 @@ use kaspa_database::registry::DatabaseStorePrefixes;
use kaspa_hashes::Hash;
use parking_lot::RwLock;
use rand::Rng;
use std::{
cmp::max,
mem::size_of,
ops::{Deref, DerefMut},
sync::Arc,
};
use std::{cmp::max, mem::size_of, ops::DerefMut, sync::Arc};

pub struct ConsensusStorage {
// DB
db: Arc<DB>,

// Locked stores
pub statuses_store: Arc<RwLock<DbStatusesStore>>,
pub relations_stores: Arc<[DbRelationsStore]>,
pub relations_stores: Arc<RwLock<Vec<DbRelationsStore>>>,
pub reachability_store: Arc<RwLock<DbReachabilityStore>>,
pub reachability_relations_store: Arc<RwLock<DbRelationsStore>>,
pub pruning_point_store: Arc<RwLock<DbPruningStore>>,
Expand Down Expand Up @@ -112,13 +107,14 @@ impl ConsensusStorage {

// Headers
let statuses_store = Arc::new(RwLock::new(DbStatusesStore::new(db.clone(), noise(statuses_cache_size))));
let relations_stores: Arc<[_]> = (0..=params.max_block_level)
.map(|level| {
let cache_size = max(relations_cache_size.checked_shr(level as u32).unwrap_or(0), 2 * params.pruning_proof_m);
DbRelationsStore::new(db.clone(), level, noise(cache_size))
})
.collect_vec()
.into();
let relations_stores = Arc::new(RwLock::new(
(0..=params.max_block_level)
.map(|level| {
let cache_size = max(relations_cache_size.checked_shr(level as u32).unwrap_or(0), 2 * params.pruning_proof_m);
DbRelationsStore::new(db.clone(), level, noise(cache_size))
})
.collect_vec(),
));
let reachability_store = Arc::new(RwLock::new(DbReachabilityStore::new(db.clone(), noise(reachability_cache_size))));

let reachability_relations_store = Arc::new(RwLock::new(DbRelationsStore::with_prefix(
Expand Down Expand Up @@ -166,7 +162,7 @@ impl ConsensusStorage {

// Ensure that reachability stores are initialized
reachability::init(reachability_store.write().deref_mut()).unwrap();
relations::init(&mut reachability_relations_store.write().deref());
relations::init(reachability_relations_store.write().deref_mut());

Arc::new(Self {
db,
Expand Down
14 changes: 7 additions & 7 deletions consensus/src/model/services/relations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,36 @@ use crate::model::stores::relations::RelationsStoreReader;
use kaspa_consensus_core::BlockHashSet;
use kaspa_database::prelude::{ReadLock, StoreError, StoreResult};
use kaspa_hashes::Hash;
use parking_lot::RwLock;
use std::sync::Arc;

/// Multi-threaded block-relations service imp
#[derive(Clone)]
pub struct MTRelationsService<T: RelationsStoreReader> {
// TODO: Remove this wrapper
store: Arc<[T]>,
store: Arc<RwLock<Vec<T>>>,
level: usize,
}

impl<T: RelationsStoreReader> MTRelationsService<T> {
pub fn new(store: Arc<[T]>, level: u8) -> Self {
pub fn new(store: Arc<RwLock<Vec<T>>>, level: u8) -> Self {
Self { store, level: level as usize }
}
}

impl<T: RelationsStoreReader> RelationsStoreReader for MTRelationsService<T> {
fn get_parents(&self, hash: Hash) -> Result<kaspa_consensus_core::blockhash::BlockHashes, StoreError> {
self.store[self.level].get_parents(hash)
self.store.read()[self.level].get_parents(hash)
}

fn get_children(&self, hash: Hash) -> StoreResult<ReadLock<BlockHashSet>> {
self.store[self.level].get_children(hash)
self.store.read()[self.level].get_children(hash)
}

fn has(&self, hash: Hash) -> Result<bool, StoreError> {
self.store[self.level].has(hash)
self.store.read()[self.level].has(hash)
}

fn counts(&self) -> Result<(usize, usize), StoreError> {
self.store[self.level].counts()
self.store.read()[self.level].counts()
}
}
2 changes: 1 addition & 1 deletion consensus/src/model/stores/children.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl ChildrenStoreReader for DbChildrenStore {
}
}

impl ChildrenStore for &DbChildrenStore {
impl ChildrenStore for DbChildrenStore {
fn insert_child(&mut self, writer: impl DbWriter, parent: Hash, child: Hash) -> Result<(), StoreError> {
self.access.write(writer, parent, child)?;
Ok(())
Expand Down
30 changes: 15 additions & 15 deletions consensus/src/model/stores/relations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,18 @@ impl RelationsStoreReader for DbRelationsStore {
///
/// The trait methods itself must remain `&mut self` in order to support staging implementations
/// which are indeed mutated locally
impl ChildrenStore for &DbRelationsStore {
impl ChildrenStore for DbRelationsStore {
fn insert_child(&mut self, writer: impl DbWriter, parent: Hash, child: Hash) -> Result<(), StoreError> {
(&self.children_store).insert_child(writer, parent, child)
self.children_store.insert_child(writer, parent, child)
}

fn delete_child(&mut self, writer: impl DbWriter, parent: Hash, child: Hash) -> Result<(), StoreError> {
(&self.children_store).delete_child(writer, parent, child)
self.children_store.delete_child(writer, parent, child)
}
}

/// The comment above over `impl ChildrenStore` applies here as well
impl RelationsStore for &DbRelationsStore {
impl RelationsStore for DbRelationsStore {
type DefaultWriter = DirectDbWriter<'static>;

fn default_writer(&self) -> Self::DefaultWriter {
Expand All @@ -154,7 +154,7 @@ impl RelationsStore for &DbRelationsStore {

pub struct StagingRelationsStore<'a> {
// The underlying DB store to commit to
store: &'a DbRelationsStore,
store: &'a mut DbRelationsStore,

/// Full entry deletions (including parents and all children)
/// Assumed to be final, i.e., no other mutations to this entry
Expand Down Expand Up @@ -205,7 +205,7 @@ impl<'a> ChildrenStore for StagingRelationsStore<'a> {
}

impl<'a> StagingRelationsStore<'a> {
pub fn new(store: &'a DbRelationsStore) -> Self {
pub fn new(store: &'a mut DbRelationsStore) -> Self {
Self {
store,
parents_overrides: Default::default(),
Expand All @@ -215,14 +215,14 @@ impl<'a> StagingRelationsStore<'a> {
}
}

pub fn commit(mut self, batch: &mut WriteBatch) -> Result<(), StoreError> {
for (k, v) in self.parents_overrides {
self.store.parents_access.write(BatchDbWriter::new(batch), k, v)?
pub fn commit(&mut self, batch: &mut WriteBatch) -> Result<(), StoreError> {
for (k, v) in self.parents_overrides.iter() {
self.store.parents_access.write(BatchDbWriter::new(batch), *k, (*v).clone())?
}

for (parent, children) in self.children_insertions {
for (parent, children) in self.children_insertions.iter() {
for child in children {
self.store.insert_child(BatchDbWriter::new(batch), parent, child)?;
self.store.insert_child(BatchDbWriter::new(batch), *parent, *child)?;
}
}

Expand All @@ -234,14 +234,14 @@ impl<'a> StagingRelationsStore<'a> {
self.store.parents_access.delete_many(BatchDbWriter::new(batch), &mut self.entry_deletions.iter().copied())?;

// For deleted entries, delete all children
for parent in self.entry_deletions {
for parent in self.entry_deletions.iter().copied() {
self.store.delete_children(BatchDbWriter::new(batch), parent)?;
}

// Delete only the requested children
for (parent, children_to_delete) in self.children_deletions {
for (parent, children_to_delete) in self.children_deletions.iter() {
for child in children_to_delete {
self.store.delete_child(BatchDbWriter::new(batch), parent, child)?;
self.store.delete_child(BatchDbWriter::new(batch), *parent, *child)?;
}
}

Expand Down Expand Up @@ -435,7 +435,7 @@ mod tests {
#[test]
fn test_db_relations_store() {
let (lt, db) = create_temp_db!(kaspa_database::prelude::ConnBuilder::default().with_files_limit(10));
test_relations_store(&DbRelationsStore::new(db, 0, 2));
test_relations_store(DbRelationsStore::new(db, 0, 2));
drop(lt)
}

Expand Down
27 changes: 14 additions & 13 deletions consensus/src/pipeline/header_processor/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,7 @@ use kaspa_utils::vec::VecExtensions;
use parking_lot::RwLock;
use rayon::ThreadPool;
use rocksdb::WriteBatch;
use std::{
ops::Deref,
sync::{atomic::Ordering, Arc},
};
use std::sync::{atomic::Ordering, Arc};

use super::super::ProcessingCounters;

Expand Down Expand Up @@ -127,7 +124,7 @@ pub struct HeaderProcessor {
db: Arc<DB>,

// Stores
pub(super) relations_stores: Arc<[DbRelationsStore]>,
pub(super) relations_stores: Arc<RwLock<Vec<DbRelationsStore>>>, // TODO: Remove RwLock
pub(super) reachability_store: Arc<RwLock<DbReachabilityStore>>,
pub(super) reachability_relations_store: Arc<RwLock<DbRelationsStore>>,
pub(super) ghostdag_stores: Arc<Vec<Arc<DbGhostdagStore>>>,
Expand Down Expand Up @@ -331,14 +328,15 @@ impl HeaderProcessor {

/// Collects the known parents for all block levels
fn collect_known_parents(&self, header: &Header, block_level: BlockLevel) -> Vec<Arc<Vec<Hash>>> {
let relations_read = self.relations_stores.read();
(0..=block_level)
.map(|level| {
Arc::new(
self.parents_manager
.parents_at_level(header, level)
.iter()
.copied()
.filter(|parent| self.relations_stores[level as usize].has(*parent).unwrap())
.filter(|parent| relations_read[level as usize].has(*parent).unwrap())
.collect_vec()
// This kicks-in only for trusted blocks or for level > 0. If an ordinary block is
// missing direct parents it will fail validation.
Expand Down Expand Up @@ -417,13 +415,14 @@ impl HeaderProcessor {

let reachability_parents = ctx.known_parents[0].clone();

let mut relations_write = self.relations_stores.write();
ctx.known_parents.into_iter().enumerate().for_each(|(level, parents_by_level)| {
(&self.relations_stores[level]).insert_batch(&mut batch, header.hash, parents_by_level).unwrap();
relations_write[level].insert_batch(&mut batch, header.hash, parents_by_level).unwrap();
});

// Write reachability relations. These relations are only needed during header pruning
let reachability_relations_write = self.reachability_relations_store.write();
reachability_relations_write.deref().insert_batch(&mut batch, ctx.hash, reachability_parents).unwrap();
let mut reachability_relations_write = self.reachability_relations_store.write();
reachability_relations_write.insert_batch(&mut batch, ctx.hash, reachability_parents).unwrap();

let statuses_write = self.statuses_store.set_batch(&mut batch, ctx.hash, StatusHeaderOnly).unwrap();

Expand All @@ -439,6 +438,7 @@ impl HeaderProcessor {
drop(reachability_write);
drop(statuses_write);
drop(reachability_relations_write);
drop(relations_write);
drop(hst_write);
}

Expand Down Expand Up @@ -494,17 +494,18 @@ impl HeaderProcessor {
}

pub fn init(&self) {
if self.relations_stores[0].has(ORIGIN).unwrap() {
if self.relations_stores.read()[0].has(ORIGIN).unwrap() {
return;
}

let mut batch = WriteBatch::default();
(0..=self.max_block_level).for_each(|level| {
(&self.relations_stores[level as usize]).insert_batch(&mut batch, ORIGIN, BlockHashes::new(vec![])).unwrap()
});
let mut relations_write = self.relations_stores.write();
(0..=self.max_block_level)
.for_each(|level| relations_write[level as usize].insert_batch(&mut batch, ORIGIN, BlockHashes::new(vec![])).unwrap());
let mut hst_write = self.headers_selected_tip_store.write();
hst_write.set_batch(&mut batch, SortableBlock::new(ORIGIN, 0.into())).unwrap();
self.db.write(batch).unwrap();
drop(hst_write);
drop(relations_write);
}
}
8 changes: 5 additions & 3 deletions consensus/src/pipeline/pruning_processor/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,9 @@ impl PruningProcessor {

if !keep_blocks.contains(&current) {
let mut batch = WriteBatch::default();
let reachability_relations_write = self.reachability_relations_store.write();
let mut staging_relations = StagingRelationsStore::new(&reachability_relations_write);
let mut level_relations_write = self.relations_stores.write();
let mut reachability_relations_write = self.reachability_relations_store.write();
let mut staging_relations = StagingRelationsStore::new(&mut reachability_relations_write);
let mut staging_reachability = StagingReachabilityStore::new(reachability_read);
let mut statuses_write = self.statuses_store.write();

Expand Down Expand Up @@ -381,7 +382,7 @@ impl PruningProcessor {
// TODO: consider adding block level to compact header data
let block_level = self.headers_store.get_header_with_block_level(current).unwrap().block_level;
(0..=block_level as usize).for_each(|level| {
let mut staging_level_relations = StagingRelationsStore::new(&self.relations_stores[level]);
let mut staging_level_relations = StagingRelationsStore::new(&mut level_relations_write[level]);
relations::delete_level_relations(MemoryWriter, &mut staging_level_relations, current).unwrap_option();
staging_level_relations.commit(&mut batch).unwrap();
self.ghostdag_stores[level].delete_batch(&mut batch, current).unwrap_option();
Expand Down Expand Up @@ -409,6 +410,7 @@ impl PruningProcessor {
drop(reachability_write);
drop(statuses_write);
drop(reachability_relations_write);
drop(level_relations_write);

reachability_read = self.reachability_store.upgradable_read();
}
Expand Down
6 changes: 3 additions & 3 deletions consensus/src/processes/parents_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,8 @@ mod tests {
}

let reachability_service = MTReachabilityService::new(Arc::new(RwLock::new(reachability_store)));
let relations_store: Arc<[_]> =
vec![RelationsStoreMock { children: BlockHashes::new(vec![pruning_point, pp_anticone_block]) }].into();
let relations_store =
Arc::new(RwLock::new(vec![RelationsStoreMock { children: BlockHashes::new(vec![pruning_point, pp_anticone_block]) }]));
let relations_service = MTRelationsService::new(relations_store, 0);
let parents_manager = ParentsManager::new(250, genesis_hash, headers_store, reachability_service, relations_service);

Expand Down Expand Up @@ -583,7 +583,7 @@ mod tests {
}

let reachability_service = MTReachabilityService::new(Arc::new(RwLock::new(reachability_store)));
let relations_store: Arc<[_]> = vec![RelationsStoreMock { children: BlockHashes::new(vec![pruning_point]) }].into();
let relations_store = Arc::new(RwLock::new(vec![RelationsStoreMock { children: BlockHashes::new(vec![pruning_point]) }]));
let relations_service = MTRelationsService::new(relations_store, 0);
let parents_manager = ParentsManager::new(250, genesis_hash, headers_store, reachability_service, relations_service);

Expand Down
Loading

0 comments on commit 0fad12f

Please sign in to comment.