From c9538dc47b25640e44a373e9ab43cd9623715a1c Mon Sep 17 00:00:00 2001
From: Marc Paquete
Date: Mon, 24 Oct 2022 13:36:50 -0400
Subject: [PATCH 01/11] Add btreeset example for CNR

---
 node-replication/examples/cnr_btreeset.rs | 157 ++++++++++++++++++++++
 1 file changed, 157 insertions(+)
 create mode 100644 node-replication/examples/cnr_btreeset.rs

diff --git a/node-replication/examples/cnr_btreeset.rs b/node-replication/examples/cnr_btreeset.rs
new file mode 100644
index 00000000..614631ed
--- /dev/null
+++ b/node-replication/examples/cnr_btreeset.rs
@@ -0,0 +1,157 @@
+// Copyright © 2019-2022 VMware, Inc. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0 OR MIT
+
+//! A minimal example that implements a replicated BTreeSet.
+#![feature(generic_associated_types)]
+
+use crossbeam_skiplist::SkipSet;
+
+use std::sync::Arc;
+
+use node_replication::cnr::{Dispatch, Log, LogMapper, LogMetaData, Replica};
+
+#[derive(Default)]
+struct CnrBtreeSet {
+    storage: SkipSet<u64>,
+}
+
+#[derive(Clone, Debug, PartialEq)]
+enum Modify {
+    Put(u64),
+    Delete(u64),
+}
+
+impl LogMapper for Modify {
+    fn hash(&self, _nlogs: usize, logs: &mut Vec<usize>) {
+        logs.push(0);
+    }
+}
+
+#[derive(Clone, Debug, PartialEq)]
+enum Access {
+    Get(u64),
+    Contains(u64),
+}
+
+impl LogMapper for Access {
+    fn hash(&self, _nlogs: usize, logs: &mut Vec<usize>) {
+        logs.push(0);
+    }
+}
+
+impl Dispatch for CnrBtreeSet {
+    type ReadOperation<'rop> = Access;
+    type WriteOperation = Modify;
+    type Response = Option<u64>;
+
+    fn dispatch<'rop>(&self, op: Self::ReadOperation<'rop>) -> Self::Response {
+        match op {
+            Access::Get(key) => self.storage.get(&key).map(|v| *v),
+            Access::Contains(key) => {
+                let response = self.storage.contains(&key);
+                Some(response as u64)
+            }
+        }
+    }
+
+    fn dispatch_mut(&self, op: Self::WriteOperation) -> Self::Response {
+        match op {
+            Modify::Put(key) => {
+                let response = self.storage.insert(key);
+                Some(*response)
+            }
+            Modify::Delete(key) => {
+                let response = self.storage.remove(&key).unwrap();
+                Some(*response)
+            }
+        }
+    }
+}
+
+/// We initialize a log and two replicas for a B-tree, register with the
+/// replica, and then execute operations.
+fn main() {
+    const N_OPS: u64 = 10_000;
+    // The operation log for storing `WriteOperation`; it has a size of 2 MiB:
+    let log = Arc::new(
+        Log::<<CnrBtreeSet as Dispatch>::WriteOperation>::new_with_bytes(
+            2 * 1024 * 1024,
+            LogMetaData::new(1),
+        ),
+    );
+
+    // Create two replicas of the b-tree
+    let replica1 = Replica::<CnrBtreeSet>::new(vec![log.clone()]);
+    let replica2 = Replica::<CnrBtreeSet>::new(vec![log.clone()]);
+
+    // The replica executes Modify or Access operations by calling
+    // `execute_mut` and `execute`. Eventually they end up in the `Dispatch` trait.
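+    // For instance, `replica1.execute_mut(Modify::Put(1), ridx)` enqueues the
+    // operation on the shared log and is applied via `dispatch_mut`, while
+    // `replica1.execute(Access::Get(1), ridx)` reads from this replica's
+    // local copy via `dispatch`.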
+    let thread_loop = |replica: &Arc<Replica<CnrBtreeSet>>, starting_point: u64, ridx| {
+        for i in starting_point..starting_point + N_OPS {
+            let _r = match i % 4 {
+                0 => {
+                    let response = replica.execute_mut(Modify::Put(i), ridx);
+                    assert_eq!(response, Some(i));
+                    response
+                }
+                1 => {
+                    let response = replica.execute(Access::Contains(i - 1), ridx);
+                    assert_eq!(response, Some(1));
+                    response
+                }
+                2 => {
+                    let response = replica.execute(Access::Get(i - 2), ridx);
+                    assert_eq!(response, Some(i - 2));
+                    response
+                }
+                3 => {
+                    let response = replica.execute_mut(Modify::Delete(i - 3), ridx);
+                    assert_eq!(response, Some(i - 3));
+                    response
+                }
+
+                _ => unreachable!(),
+            };
+        }
+    };
+
+    // Finally, we spawn three threads that issue operations; threads 1 and 2
+    // will use replica1 and thread 3 will use replica2:
+    let mut threads = Vec::with_capacity(3);
+    let replica11 = replica1.clone();
+    threads.push(
+        std::thread::Builder::new()
+            .name("thread 1".to_string())
+            .spawn(move || {
+                let ridx = replica11.register().expect("Unable to register with log");
+                thread_loop(&replica11, 0, ridx);
+            }),
+    );
+
+    let replica12 = replica1.clone();
+    threads.push(
+        std::thread::Builder::new()
+            .name("thread 2".to_string())
+            .spawn(move || {
+                let ridx = replica12.register().expect("Unable to register with log");
+                thread_loop(&replica12, 100000, ridx);
+            }),
+    );
+
+    threads.push(
+        std::thread::Builder::new()
+            .name("thread 3".to_string())
+            .spawn(move || {
+                let ridx = replica2.register().expect("Unable to register with log");
+                thread_loop(&replica2, 200000, ridx);
+            }),
+    );
+
+    // Wait for all the threads to finish
+    for thread in threads {
+        thread
+            .expect("all threads should complete")
+            .join()
+            .unwrap_or_default();
+    }
+}

From 9f50ad36f1b96e645a39b1a5d9b2a5b3a79ed091 Mon Sep 17 00:00:00 2001
From: Marc Paquette
Date: Wed, 30 Nov 2022 16:41:41 -0500
Subject: [PATCH 02/11] Implement add_replica

Implement add_replica to dynamically add replicas to a Node Replicated
data structure.
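
A minimal usage sketch of the intended API (illustrative only; `Data` stands
in for any `Default + Dispatch + Sync + Clone` structure, as used in the
tests added by this patch):

    let replicas = NonZeroUsize::new(1).unwrap();
    let mut nr = NodeReplicated::<Data>::new(replicas, |_ac| 0).unwrap();
    let tkn = nr.register(0).expect("Unable to register with log");
    let _ = nr.execute_mut(1, tkn);
    // Grow to a second replica; it is initialized from the replica with
    // the highest local tail:
    let rid = nr.add_replica().unwrap();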
--- bench_utils/src/mkbench.rs | 4 +- node-replication/examples/nr_async_hashmap.rs | 1 + node-replication/examples/nr_btreeset.rs | 2 +- node-replication/examples/nr_hashmap.rs | 2 +- node-replication/examples/nr_stack.rs | 1 + node-replication/src/log.rs | 55 +++++- node-replication/src/nr/mod.rs | 160 ++++++++++++++++-- node-replication/src/nr/replica.rs | 34 ++-- node-replication/src/nr/rwlock.rs | 26 +-- node-replication/tests/nr_stack.rs | 41 ++--- 10 files changed, 251 insertions(+), 75 deletions(-) diff --git a/bench_utils/src/mkbench.rs b/bench_utils/src/mkbench.rs index f63a88ee..a164e5af 100644 --- a/bench_utils/src/mkbench.rs +++ b/bench_utils/src/mkbench.rs @@ -140,7 +140,7 @@ pub trait DsInterface { ) -> ::Response; } -impl<'a, T: Dispatch + Sync + Default> DsInterface for NodeReplicated { +impl<'a, T: Dispatch + Sync + Default + Clone> DsInterface for NodeReplicated { type D = T; fn new(replicas: NonZeroUsize, _logs: NonZeroUsize, log_size: usize) -> Arc { @@ -235,7 +235,7 @@ pub fn baseline_comparison( >, log_size: usize, ) where - R::D: Dispatch + Sync + Default, + R::D: Dispatch + Sync + Default + Clone, ::WriteOperation: Send + Sync, ::ReadOperation<'static>: Sync + Send + Clone, ::Response: Send, diff --git a/node-replication/examples/nr_async_hashmap.rs b/node-replication/examples/nr_async_hashmap.rs index c9e6e9f3..a614f037 100644 --- a/node-replication/examples/nr_async_hashmap.rs +++ b/node-replication/examples/nr_async_hashmap.rs @@ -17,6 +17,7 @@ use node_replication::nr::NodeReplicated; const CAPACITY: usize = 32; /// The node-replicated hashmap uses a std hashmap internally. +#[derive(Clone)] struct NrHashMap { storage: HashMap, } diff --git a/node-replication/examples/nr_btreeset.rs b/node-replication/examples/nr_btreeset.rs index 52d89b89..6e47ca4d 100644 --- a/node-replication/examples/nr_btreeset.rs +++ b/node-replication/examples/nr_btreeset.rs @@ -11,7 +11,7 @@ use std::sync::Arc; use node_replication::nr::Dispatch; use node_replication::nr::NodeReplicated; -#[derive(Default)] +#[derive(Default, Clone)] struct NrBtreeSet { storage: BTreeSet, } diff --git a/node-replication/examples/nr_hashmap.rs b/node-replication/examples/nr_hashmap.rs index 68f37738..dcd9774f 100644 --- a/node-replication/examples/nr_hashmap.rs +++ b/node-replication/examples/nr_hashmap.rs @@ -12,7 +12,7 @@ use node_replication::nr::Dispatch; use node_replication::nr::NodeReplicated; /// The node-replicated hashmap uses a std hashmap internally. -#[derive(Default)] +#[derive(Default, Clone)] struct NrHashMap { storage: HashMap, } diff --git a/node-replication/examples/nr_stack.rs b/node-replication/examples/nr_stack.rs index 0e25bdcd..453c1fbd 100644 --- a/node-replication/examples/nr_stack.rs +++ b/node-replication/examples/nr_stack.rs @@ -28,6 +28,7 @@ enum Access { } /// The actual stack is implemented with a (single-threaded) Vec. +#[derive(Clone)] struct Stack where T: Default, diff --git a/node-replication/src/log.rs b/node-replication/src/log.rs index 4eb19c7c..bb5a6300 100644 --- a/node-replication/src/log.rs +++ b/node-replication/src/log.rs @@ -25,7 +25,7 @@ use crate::replica::MAX_THREADS_PER_REPLICA; /// A token that identifies a replica for a log. /// /// The replica is supposed to call [`Log::register()`] to get the token. 
-#[derive(Eq, PartialEq, Debug)] +#[derive(Eq, PartialEq, Debug, Clone)] #[cfg(not(loom))] pub struct LogToken(pub(crate) usize); #[cfg(loom)] @@ -429,6 +429,29 @@ where (min_replica_idx, min_local_tail) } + /// Loops over all `ltails` and finds the replica with the highest tail. + /// + /// # Returns + /// The ID (in `LogToken`) of the replica with the highest tail and the + /// corresponding/highest tail `idx` in the `Log`. + pub(crate) fn find_max_tail(&self) -> (usize, usize) { + let r = self.next.load(Ordering::Relaxed); + let (mut max_replica_idx, mut max_local_tail) = (0, self.ltails[0].load(Ordering::Relaxed)); + + // Find the local tail across all replicas. + for idx in 1..r { + let cur_local_tail = self.ltails[idx - 1].load(Ordering::Relaxed); + //info!("Replica {} cur_local_tail {}.", idx - 1, cur_local_tail); + + if cur_local_tail > max_local_tail { + max_local_tail = cur_local_tail; + max_replica_idx = idx - 1; + } + } + + (max_replica_idx, max_local_tail) + } + /// Resets the log. This is required for microbenchmarking the log; with /// this method, we can re-use the log across experimental runs without /// having to re-allocate the log over and over again (which blows up the @@ -680,4 +703,34 @@ mod tests { } assert!(l.register().is_none()); } + + // Tests to ensure find_max_tail() gets the most up to date replica when called + #[test] + fn test_find_max_tail_gets_highest() { + let l = Log::::default(); + let _lt = l.register().unwrap(); + + l.next.store(5, Ordering::Relaxed); + l.ltails[0].store(1023, Ordering::Relaxed); + l.ltails[1].store(224, Ordering::Relaxed); + l.ltails[2].store(4096, Ordering::Relaxed); + l.ltails[3].store(799, Ordering::Relaxed); + + assert_eq!(l.find_max_tail(), (2, 4096)) + } + + // Tests to ensure find_min_tail() finds the replica with the lowest tail. + #[test] + fn test_find_min_tail_gets_lowest() { + let l = Log::::default(); + let _lt = l.register().unwrap(); + + l.next.store(5, Ordering::Relaxed); + l.ltails[0].store(1023, Ordering::Relaxed); + l.ltails[1].store(224, Ordering::Relaxed); + l.ltails[2].store(4096, Ordering::Relaxed); + l.ltails[3].store(799, Ordering::Relaxed); + + assert_eq!(l.find_min_tail(), (1, 224)) + } } diff --git a/node-replication/src/nr/mod.rs b/node-replication/src/nr/mod.rs index bedf790e..c84d6492 100644 --- a/node-replication/src/nr/mod.rs +++ b/node-replication/src/nr/mod.rs @@ -80,6 +80,8 @@ use reusable_box::ReusableBoxFuture; use arrayvec::ArrayVec; +use core::sync::atomic::Ordering; + mod context; pub mod log; pub mod replica; @@ -277,7 +279,7 @@ impl<'f> Drop for AffinityToken<'f> { } /// Errors that can be encountered by interacting with [`NodeReplicated`]. -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq)] pub enum NodeReplicatedError { /// Not enough memory to create a [`NodeReplicated`] instance. OutOfMemory, @@ -304,15 +306,16 @@ impl From for NodeReplicatedError { /// [`NodeReplicated`] instance. Finally, it routes threads to the correct /// replica and handles liveness of replicas by making sure to advance replicas /// which are behind automatically. -pub struct NodeReplicated { +pub struct NodeReplicated { log: Log, replicas: Vec>>, + // thread_routing: HashMap, affinity_mngr: AffinityManager, } impl NodeReplicated where - D: Default + Dispatch + Sized + Sync, + D: Default + Dispatch + Sized + Sync + Clone, { /// Creates a new, replicated data-structure from a single-threaded /// data-structure that implements [`Dispatch`]. 
It uses the [`Default`] @@ -399,7 +402,7 @@ where .register() .expect("Succeeds (num_replicas < MAX_REPLICAS_PER_LOG)"); - let r = { + let r = { // Allocate the replica on the proper NUMA node let _aff_tkn = affinity_mngr.switch(replica_id); Box::try_new(Replica::new(log_token))? @@ -417,6 +420,68 @@ where affinity_mngr, }) } + + + + /// Adds a new replica to the NodeReplicated. It returns the index of the added replica within + /// NodeReplicated.replicas[x]. + /// + /// # Example + /// + /// Test ignored for lack of access to `MACHINE_TOPOLOGY` (see benchmark code + /// for an example). + /// + /// ```ignore + /// let replicas = NonZeroUsize::new(1).unwrap(); + /// let mut async_ds = NodeReplicated::::new(replicas, |_ac| 0).expect("Can't create Ds"); + /// let ttkn_a = async_ds.register(0).expect("Unable to register with log"); + /// let _ = async_ds.execute_mut(1, ttkn_a); + /// let _ = async_ds.execute_mut(2, ttkn_a); + /// let added_replica = async_ds.add_replica().unwrap(); + /// let added_replica_data = async_ds.replicas[added_replica].data.read(0).junk; + /// assert_eq!(2, added_replica_data); + /// ``` + pub fn add_replica(&mut self) -> Result { + self.replicas.try_reserve(1)?; + + let log_token = self + .log + .register() + .expect("Succeeds (num_replicas < MAX_REPLICAS_PER_LOG)"); + + let r = { + // Allocate the replica on the proper NUMA node + let _aff_tkn = self.affinity_mngr.switch(self.replicas.len()); + Box::try_new(Replica::new(log_token.clone()))? + // aff_tkn is dropped here + }; + + // This succeeds, we did `try_reserve` earlier so no `try_push` is + // necessary. + self.replicas.push(r); + + let replica_id = self.replicas.len() - 1; + + // get the most up to date replica + let (max_replica_idx, max_local_tail) = self.log.find_max_tail(); + + // copy data from existing replica + let replica_locked = self.replicas[max_replica_idx] + .data + .read(log_token.0) + .clone(); + let new_replica_data = &mut self.replicas[replica_id].data.write(log_token.0); + **new_replica_data = replica_locked; + + // push ltail entry for new replica + self.log.ltails[replica_id].store(max_local_tail, Ordering::Relaxed); + + // find and push existing lmask entry for new replica + let lmask_status = self.log.lmasks[max_replica_idx].get(); + self.log.lmasks[replica_id].set(lmask_status); + + Ok(replica_id) + } } impl NodeReplicated @@ -438,7 +503,7 @@ where impl NodeReplicated where - D: Dispatch + Sized + Sync, + D: Dispatch + Sized + Sync + Clone, { /// Registers a thread with a given replica in the [`NodeReplicated`] /// data-structure. Returns an Option containing a [`ThreadToken`] if the @@ -463,7 +528,7 @@ where /// use node_replication::nr::NodeReplicated; /// use node_replication::nr::Dispatch; /// - /// #[derive(Default)] + /// #[derive(Default, Clone)] /// struct Void; /// impl Dispatch for Void { /// type ReadOperation<'rop> = (); @@ -528,7 +593,7 @@ where /// use node_replication::nr::NodeReplicated; /// use node_replication::nr::Dispatch; /// - /// #[derive(Default)] + /// #[derive(Default,Clone)] /// struct Void; /// impl Dispatch for Void { /// type ReadOperation<'rop> = (); @@ -562,7 +627,7 @@ where /// /// e.g., either `Sync` an out-of-date, behind replica, or call `execute_locked` or /// `execute_mut_locked` to resume the operation with a combiner lock. - enum ResolveOp<'a, D: core::marker::Sync + Dispatch + Sized> { + enum ResolveOp<'a, D: core::marker::Sync + Dispatch + Sized + Clone> { /// Resumes a replica that earlier returned with an Error (and the CombinerLock). 
Exec(Option>), /// Indicates need to [`Replica::sync()`] a replica with the given ID. @@ -644,7 +709,7 @@ where /// use node_replication::nr::NodeReplicated; /// use node_replication::nr::Dispatch; /// - /// #[derive(Default)] + /// #[derive(Default, Clone)] /// struct Void; /// impl Dispatch for Void { /// type ReadOperation<'rop> = usize; @@ -675,7 +740,7 @@ where /// /// e.g., either `Sync` an out-of-date, behind replica, or call `execute_locked` or /// `execute_mut_locked` to resume the operation with a combiner lock. - enum ResolveOp<'a, 'rop, D: core::marker::Sync + Dispatch + Sized> { + enum ResolveOp<'a, 'rop, D: core::marker::Sync + Dispatch + Sized + Clone> { /// Resumes a replica that earlier returned with an Error (and the CombinerLock). Exec(Option>, D::ReadOperation<'rop>), /// Indicates need to [`Replica::sync()`] a replica with the given ID. @@ -778,4 +843,79 @@ mod test { let res = block_on(resp).unwrap(); assert_eq!(res, 1); } + + #[test] + fn test_add_replica_increments_replica_count() { + let replicas = NonZeroUsize::new(1).unwrap(); + let mut async_ds = NodeReplicated::::new(replicas, |_ac| 0).expect("Can't create Ds"); + assert_eq!(async_ds.replicas.len(), 1); + let _ = async_ds.add_replica(); + assert_eq!(async_ds.replicas.len(), 2); + let _ = async_ds.add_replica(); + assert_eq!(async_ds.replicas.len(), 3); + } + + #[test] + #[should_panic(expected = "Succeeds (num_replicas < MAX_REPLICAS_PER_LOG")] + fn test_add_replica_does_not_exceed_max_replicas() { + let replicas = NonZeroUsize::new(1).unwrap(); + let mut async_ds = NodeReplicated::::new(replicas, |_ac| 0).expect("Can't create Ds"); + for _ in 0..MAX_REPLICAS_PER_LOG { + let _ = async_ds.add_replica(); + } + } + + #[test] + fn test_add_replica_syncs_replica_data() { + let replicas = NonZeroUsize::new(1).unwrap(); + let mut async_ds = NodeReplicated::::new(replicas, |_ac| 0).expect("Can't create Ds"); + + let ttkn_a = async_ds.register(0).expect("Unable to register with log"); + + //add a few iterations of log entries + let _ = async_ds.execute_mut(1, ttkn_a); + let _ = async_ds.execute_mut(5, ttkn_a); + let _ = async_ds.execute_mut(2, ttkn_a); + + let added_replica = async_ds.add_replica().unwrap(); + + let added_replica_data = async_ds.replicas[added_replica].data.read(0).junk; + + assert_eq!(3, added_replica_data); + } + + #[test] + fn test_add_replica_syncs_replica_lmask() { + let replicas = NonZeroUsize::new(1).unwrap(); + let mut async_ds = NodeReplicated::::new(replicas, |_ac| 0).expect("Can't create Ds"); + + let ttkn_a = async_ds.register(0).expect("Unable to register with log"); + //add a few iterations of log entries + let _ = async_ds.execute_mut(1, ttkn_a); + let _ = async_ds.execute_mut(5, ttkn_a); + let _ = async_ds.execute_mut(2, ttkn_a); + + let replica_lmask = async_ds.log.lmasks[0].get(); + let added_replica = async_ds.add_replica().unwrap(); + let added_replica_lmask = async_ds.log.lmasks[added_replica].get(); + assert_eq!(replica_lmask, added_replica_lmask); + } + + #[test] + fn test_add_replica_syncs_replica_ltail() { + let replicas = NonZeroUsize::new(1).unwrap(); + let mut async_ds = NodeReplicated::::new(replicas, |_ac| 0).expect("Can't create Ds"); + + let ttkn_a = async_ds.register(0).expect("Unable to register with log"); + + //add a few iterations of log entries + let _ = async_ds.execute_mut(1, ttkn_a); + let _ = async_ds.execute_mut(5, ttkn_a); + let _ = async_ds.execute_mut(2, ttkn_a); + + let replica_ltails = async_ds.log.ltails[0].load(Ordering::Relaxed); + let added_replica = 
async_ds.add_replica().unwrap(); + let added_replica_ltails = async_ds.log.ltails[added_replica].load(Ordering::Relaxed); + assert_eq!(replica_ltails, added_replica_ltails); + } } diff --git a/node-replication/src/nr/replica.rs b/node-replication/src/nr/replica.rs index 207e7ccb..feb46ece 100644 --- a/node-replication/src/nr/replica.rs +++ b/node-replication/src/nr/replica.rs @@ -36,7 +36,7 @@ pub use crate::replica::MAX_THREADS_PER_REPLICA; /// implement their own version of [`crate::nr::NodeReplicated`]. pub enum ReplicaError<'r, D> where - D: Sized + Dispatch + Sync, + D: Sized + Dispatch + Sync + Clone, { /// We don't have space in the log to enqueue our batch of operations. /// @@ -76,7 +76,7 @@ where impl Debug for ReplicaError<'_, D> where - D: Sized + Dispatch + Sync, + D: Sized + Dispatch + Sync + Clone, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { @@ -116,7 +116,7 @@ where /// replicas. pub struct Replica where - D: Sized + Dispatch + Sync, + D: Sized + Dispatch + Sync + Clone, { /// An identifier that we got from the Log when the replica was registered /// against the shared-log ([`Log::register()`]). Required to pass to the @@ -160,18 +160,18 @@ where /// The underlying data structure. This is shared among all threads that are /// registered with this replica. Each replica maintains its own copy of /// `data`. - data: CachePadded>, + pub data: CachePadded>, } /// The Replica is [`Sync`]. /// /// Member variables are protected by the combiner lock of the replica /// (`combiner`). Contexts are thread-safe. -unsafe impl Sync for Replica where D: Sized + Sync + Dispatch {} +unsafe impl Sync for Replica where D: Sized + Sync + Dispatch + Clone {} impl core::fmt::Debug for Replica where - D: Sized + Sync + Dispatch, + D: Sized + Sync + Dispatch + Clone, { fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { write!(f, "Replica") @@ -180,7 +180,7 @@ where impl Replica where - D: Sized + Default + Dispatch + Sync, + D: Sized + Default + Dispatch + Sync + Clone, { /// Constructs an instance of a replicated data structure. /// @@ -200,7 +200,7 @@ where /// use node_replication::nr::Replica; /// /// // The data structure we want replicated. - /// #[derive(Default)] + /// #[derive(Default, Clone)] /// struct Data { /// junk: u64, /// } @@ -248,14 +248,14 @@ where /// to reset it to 0. pub struct CombinerLock<'a, D> where - D: Sized + Dispatch + Sync, + D: Sized + Dispatch + Sync + Clone, { replica: &'a Replica, } impl<'a, D> CombinerLock<'a, D> where - D: Sized + Dispatch + Sync, + D: Sized + Dispatch + Sync + Clone, { /// Inidcates we're holding the CombinerLock. /// @@ -269,7 +269,7 @@ where impl Drop for CombinerLock<'_, D> where - D: Sized + Dispatch + Sync, + D: Sized + Dispatch + Sync + Clone, { /// Allow other threads to perform flat combining once we have finished all /// our work. 
@@ -290,7 +290,7 @@ where impl Debug for CombinerLock<'_, D> where - D: Sized + Dispatch + Sync, + D: Sized + Dispatch + Sync + Clone, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "CombinerLock") @@ -299,7 +299,7 @@ where impl Replica where - D: Sized + Dispatch + Sync, + D: Sized + Dispatch + Sync + Clone, { /// Similar to [`Replica::new`], but we pass an existing data-structure as /// an argument (`d`) rather than relying on the [`Default`] trait to create @@ -362,7 +362,7 @@ where /// use node_replication::nr::Log; /// use node_replication::nr::Replica; /// - /// #[derive(Default)] + /// #[derive(Default, Clone)] /// struct Data { /// junk: u64, /// } @@ -443,7 +443,7 @@ where /// use node_replication::nr::Log; /// use node_replication::nr::Replica; /// - /// #[derive(Default)] + /// #[derive(Default, Clone)] /// struct Data { /// junk: u64, /// } @@ -542,7 +542,7 @@ where /// /// use std::sync::Arc; /// - /// #[derive(Default)] + /// #[derive(Default, Clone)] /// struct Data { /// junk: u64, /// } @@ -912,7 +912,7 @@ pub(crate) mod test { use std::vec; // Really dumb data structure to test against the Replica and shared log. - #[derive(Default)] + #[derive(Default, Clone)] pub(crate) struct Data { pub(crate) junk: u64, } diff --git a/node-replication/src/nr/rwlock.rs b/node-replication/src/nr/rwlock.rs index ba80fe6f..802dfeaa 100644 --- a/node-replication/src/nr/rwlock.rs +++ b/node-replication/src/nr/rwlock.rs @@ -42,7 +42,7 @@ const RLOCK_DEFAULT: CachePadded = CachePadded::new(AtomicUsize::ne /// Calling `write()` returns a write-guard that can be used to safely mutate `T`. pub struct RwLock where - T: Sized + Sync, + T: Sized + Sync + Clone, { /// The writer lock. There can be at most one writer at any given point of time. wlock: CachePadded, @@ -56,7 +56,7 @@ where /// A read-guard that can be used to read the underlying data structure. Writes on /// the data structure will be blocked as long as one of these is lying around. -pub struct ReadGuard<'a, T: Sized + Sync + 'a> { +pub struct ReadGuard<'a, T: Sized + Sync + Clone + 'a> { /// Id of the thread that acquired this guard. Required at drop time so that /// we can release the appropriate read lock. tid: usize, @@ -67,14 +67,14 @@ pub struct ReadGuard<'a, T: Sized + Sync + 'a> { /// A write-guard that can be used to write to the underlying data structure. All /// reads will be blocked until this is dropped. -pub struct WriteGuard<'a, T: Sized + Sync + 'a> { +pub struct WriteGuard<'a, T: Sized + Sync + Clone + 'a> { /// A reference to the Rwlock wrapping the data-structure. lock: &'a RwLock, } impl Default for RwLock where - T: Sized + Default + Sync, + T: Sized + Default + Sync + Clone, { /// Returns a new instance of a RwLock. Default constructs the /// underlying data structure. @@ -89,7 +89,7 @@ where impl RwLock where - T: Sized + Sync, + T: Sized + Sync + Clone, { /// Returns a new instance of a RwLock. Default constructs the /// underlying data structure. @@ -218,14 +218,14 @@ where } } -impl<'rwlock, T: Sized + Sync> ReadGuard<'rwlock, T> { +impl<'rwlock, T: Sized + Sync + Clone> ReadGuard<'rwlock, T> { /// Returns a read guard over a passed in reader-writer lock. unsafe fn new(lock: &'rwlock RwLock, tid: usize) -> ReadGuard<'rwlock, T> { ReadGuard { tid, lock } } } -impl<'rwlock, T: Sized + Sync> WriteGuard<'rwlock, T> { +impl<'rwlock, T: Sized + Sync + Clone> WriteGuard<'rwlock, T> { /// Returns a write guard over a passed in reader-writer lock. 
unsafe fn new(lock: &'rwlock RwLock) -> WriteGuard<'rwlock, T> { WriteGuard { lock } @@ -235,11 +235,11 @@ impl<'rwlock, T: Sized + Sync> WriteGuard<'rwlock, T> { /// `Sync` trait allows `RwLock` to be shared between threads. The `read()` and /// `write()` logic ensures that we will never have threads writing to and /// reading from the underlying data structure simultaneously. -unsafe impl Sync for RwLock {} +unsafe impl Sync for RwLock {} /// This `Deref` trait allows a thread to use T from a ReadGuard. /// ReadGuard can only be dereferenced into an immutable reference. -impl Deref for ReadGuard<'_, T> { +impl Deref for ReadGuard<'_, T> { type Target = T; fn deref(&self) -> &T { @@ -249,7 +249,7 @@ impl Deref for ReadGuard<'_, T> { /// This `Deref` trait allows a thread to use T from a WriteGuard. /// This allows us to dereference an immutable reference. -impl Deref for WriteGuard<'_, T> { +impl Deref for WriteGuard<'_, T> { type Target = T; fn deref(&self) -> &T { @@ -259,7 +259,7 @@ impl Deref for WriteGuard<'_, T> { /// This `DerefMut` trait allow a thread to use T from a WriteGuard. /// This allows us to dereference a mutable reference. -impl DerefMut for WriteGuard<'_, T> { +impl DerefMut for WriteGuard<'_, T> { fn deref_mut(&mut self) -> &mut T { unsafe { &mut *self.lock.data.get() } } @@ -267,7 +267,7 @@ impl DerefMut for WriteGuard<'_, T> { /// This `Drop` trait implements the unlock logic for a reader lock. Once the `ReadGuard` /// goes out of scope, the corresponding read lock is marked as released. -impl Drop for ReadGuard<'_, T> { +impl Drop for ReadGuard<'_, T> { fn drop(&mut self) { unsafe { let tid = self.tid; @@ -278,7 +278,7 @@ impl Drop for ReadGuard<'_, T> { /// This `Drop` trait implements the unlock logic for a writer lock. Once the `WriteGuard` /// goes out of scope, the corresponding write lock is marked as released. 
-impl Drop for WriteGuard<'_, T> { +impl Drop for WriteGuard<'_, T> { fn drop(&mut self) { unsafe { self.lock.write_unlock(); diff --git a/node-replication/tests/nr_stack.rs b/node-replication/tests/nr_stack.rs index 78df701e..7dc7f6dc 100644 --- a/node-replication/tests/nr_stack.rs +++ b/node-replication/tests/nr_stack.rs @@ -8,7 +8,7 @@ extern crate rand; extern crate std; use std::collections::HashMap; -use std::sync::{Arc, Barrier, RwLock}; +use std::sync::{Arc, Barrier}; use std::thread; use std::usize; @@ -27,10 +27,9 @@ enum OpRd { Peek, } +#[derive(Clone)] struct Stack { storage: Vec, - popped: Vec>, - peeked: RwLock>>, } fn compare_vectors(a: &Vec, b: &Vec) -> bool { @@ -44,19 +43,15 @@ impl Stack { } pub fn pop(&mut self) -> Option { - let r = self.storage.pop(); - self.popped.push(r); - return r; + self.storage.pop() } pub fn peek(&self) -> Option { - let mut r = None; let len = self.storage.len(); if len > 0 { - r = Some(self.storage[len - 1]); + return Some(self.storage[len - 1]); } - self.peeked.write().unwrap().push(r); - return r; + return None; } } @@ -64,8 +59,6 @@ impl Default for Stack { fn default() -> Stack { let s = Stack { storage: Default::default(), - popped: Default::default(), - peeked: Default::default(), }; s @@ -109,8 +102,6 @@ fn sequential_test() { let r = Replica::::new(ltkn); let idx = r.register().expect("Failed to register with Replica."); let mut correct_stack: Vec = Vec::new(); - let mut correct_popped: Vec> = Vec::new(); - let mut correct_peeked: Vec> = Vec::new(); // Populate with some initial data for _i in 0..50 { @@ -124,15 +115,14 @@ fn sequential_test() { match op % 3usize { 0usize => { let o = r.execute_mut(&log, OpWr::Pop, idx).unwrap(); - let popped = correct_stack.pop(); - assert_eq!(popped, o); - correct_popped.push(popped); + let popped = correct_stack.pop().unwrap(); + assert_eq!(popped, o.unwrap()); } 1usize => { let element = orng.gen(); let pushed = r.execute_mut(&log, OpWr::Push(element), idx).unwrap(); - assert_eq!(pushed, Some(element)); correct_stack.push(element); + assert_eq!(pushed, Some(element)); } 2usize => { let o = r.execute(&log, OpRd::Peek, idx).unwrap(); @@ -142,31 +132,22 @@ fn sequential_test() { ele = Some(correct_stack[len - 1]); } assert_eq!(ele, o); - correct_peeked.push(ele); } _ => unreachable!(), } } let v = |data: &Stack| { - assert!( - compare_vectors(&correct_popped, &data.popped), - "Pop operation error detected" - ); assert!( compare_vectors(&correct_stack, &data.storage), "Push operation error detected" ); - assert!( - compare_vectors(&correct_peeked, &data.peeked.read().unwrap()), - "Peek operation error detected" - ); }; r.verify(&log, v); } /// A stack to verify that the log works correctly with multiple threads. 
-#[derive(Eq, PartialEq)] +#[derive(Eq, PartialEq, Clone)] struct VerifyStack { storage: Vec, per_replica_counter: HashMap, @@ -481,7 +462,7 @@ fn replicas_are_equal() { let mut p0 = vec![]; let v = |data: &Stack| { d0.extend_from_slice(&data.storage); - p0.extend_from_slice(&data.popped); + p0.extend_from_slice(&data.storage); }; replicas[0].verify(&log, v); @@ -489,7 +470,7 @@ fn replicas_are_equal() { let mut p1 = vec![]; let v = |data: &Stack| { d1.extend_from_slice(&data.storage); - p1.extend_from_slice(&data.popped); + p1.extend_from_slice(&data.storage); }; replicas[1].verify(&log, v); From 2e54b0014b75c51bcf2528bfed2b800dcec5eaea Mon Sep 17 00:00:00 2001 From: Marc Paquette Date: Thu, 1 Dec 2022 11:29:47 -0500 Subject: [PATCH 03/11] Update nr.yml --- .github/workflows/nr.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/nr.yml b/.github/workflows/nr.yml index fcae7098..44ec754f 100644 --- a/.github/workflows/nr.yml +++ b/.github/workflows/nr.yml @@ -13,7 +13,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Install dependencies - run: sudo apt-get update && sudo apt-get install -y libhwloc-dev gnuplot libfuse-dev liburcu-dev liburcu6 + run: sudo apt-get update && sudo apt-get install -y libhwloc-dev gnuplot libfuse-dev liburcu-dev liburcu8 - uses: actions/checkout@v2.4.0 - name: Install rust toolchain working-directory: ./node-replication From 61f25d5c6d9920980604ce31b0136d19917a2110 Mon Sep 17 00:00:00 2001 From: Marc Paquette Date: Mon, 19 Dec 2022 11:09:22 -0500 Subject: [PATCH 04/11] Add initial Replica remove and refactor data structures Added a remove_replica to NR. Refactored `replicas` `lmasks` and `ltails` to use hashmaps for better add/remove semantics. --- node-replication/Cargo.toml | 3 +- node-replication/src/cnr/log.rs | 68 ++++++------ node-replication/src/log.rs | 161 +++++++++++++++++----------- node-replication/src/nr/log.rs | 59 ++++++---- node-replication/src/nr/mod.rs | 184 +++++++++++++++++++++++++------- 5 files changed, 323 insertions(+), 152 deletions(-) diff --git a/node-replication/Cargo.toml b/node-replication/Cargo.toml index 868bde58..eef2d7d8 100644 --- a/node-replication/Cargo.toml +++ b/node-replication/Cargo.toml @@ -21,6 +21,7 @@ crossbeam-utils = {version = "0.8.5", default-features = false} # renamed to avoid confusion with our own `log` modules: logging = { version = "0.4", package = "log" } static_assertions = "1.1.0" +bm = { git = "https://github.com/reynoldsbd/bm" } [target.'cfg(loom)'.dependencies] arr_macro = "0.1.3" @@ -103,4 +104,4 @@ harness = false [[bench]] name = "chashbench" -harness = false \ No newline at end of file +harness = false diff --git a/node-replication/src/cnr/log.rs b/node-replication/src/cnr/log.rs index acedb669..d7ff6555 100644 --- a/node-replication/src/cnr/log.rs +++ b/node-replication/src/cnr/log.rs @@ -136,13 +136,12 @@ where let used = tail - head + 1; if used > self.slog.len() / 3 { - let r = self.next.load(Ordering::Relaxed); let mut is_stuck = false; - let cur_local_tail = self.ltails[idx.0 - 1].load(Ordering::Relaxed); + let cur_local_tail = self.ltails[&(idx.0)].load(Ordering::Relaxed); // Find the smallest local tail across all replicas. 
- for idx_iter in 1..r { - let local_tail = self.ltails[idx_iter - 1].load(Ordering::Relaxed); + for idx_iter in 1..MAX_REPLICAS_PER_LOG { + let local_tail = self.ltails[&(idx_iter - 1)].load(Ordering::Relaxed); if cur_local_tail > local_tail && cur_local_tail - local_tail > self.slog.len() / 3 { @@ -297,9 +296,9 @@ where is_scan: bool, depends_on: Option>>, ) { - let num_replicas = self.next.load(Ordering::Relaxed) - 1; + let num_replicas = self.replica_count(); let e = self.slog[self.index(offset)].as_ptr(); - let mut m = self.lmasks[idx - 1].get(); + let mut m = self.lmasks[&(idx - 1)].get(); // This entry was just reserved so it should be dead (!= m). However, if // the log has wrapped around, then the alive mask has flipped. In this @@ -362,7 +361,7 @@ where d: &mut F, ) { // Load the logical log offset from which we must execute operations. - let ltail = self.ltails[idx.0 - 1].load(Ordering::Relaxed); + let ltail = self.ltails[&(idx.0 - 1)].load(Ordering::Relaxed); // Check if we have any work to do by comparing our local tail with the log's // global tail. If they're equal, then we're done here and can simply return. @@ -385,14 +384,15 @@ where let mut iteration = 1; let e = self.slog[self.index(i)].as_ptr(); - while unsafe { (*e).alivef.load(Ordering::Acquire) != self.lmasks[idx.0 - 1].get() } { + while unsafe { (*e).alivef.load(Ordering::Acquire) != self.lmasks[&(idx.0 - 1)].get() } + { if iteration % WARN_THRESHOLD == 0 { warn!( "alivef not being set for self.index(i={}) = {} (self.lmasks[{}] is {})...", i, self.index(i), idx.0 - 1, - self.lmasks[idx.0 - 1].get() + self.lmasks[&(idx.0 - 1)].get() ); } iteration += 1; @@ -424,11 +424,11 @@ where // Increment ltail for each operations, needed for scan // operations as the rubberband is ltail sensitive. - self.ltails[idx.0 - 1].fetch_add(1, Ordering::Relaxed); + self.ltails[&(idx.0 - 1)].fetch_add(1, Ordering::Relaxed); // Looks like we're going to wrap around now; flip this replica's local mask. if self.index(i) == self.slog.len() - 1 { - self.lmasks[idx.0 - 1].set(!self.lmasks[idx.0 - 1].get()); + self.lmasks[&(idx.0 - 1)].set(!self.lmasks[&(idx.0 - 1)].get()); //trace!("idx: {} lmask: {}", idx, self.lmasks[idx - 1].get()); } } @@ -436,7 +436,7 @@ where // Update the completed tail after we've executed these operations. // Also update this replica's local tail. self.ctail.fetch_max(gtail, Ordering::Relaxed); - self.ltails[idx.0 - 1].store(gtail, Ordering::Relaxed); + self.ltails[&(idx.0 - 1)].store(gtail, Ordering::Relaxed); } /// Advances the head of the log forward. 
If a replica has stopped making progress, @@ -569,15 +569,14 @@ mod tests { assert_eq!(l.slog.len(), n); assert_eq!(l.head.load(Ordering::Relaxed), 0); assert_eq!(l.tail.load(Ordering::Relaxed), 0); - assert_eq!(l.next.load(Ordering::Relaxed), 1); assert_eq!(l.ctail.load(Ordering::Relaxed), 0); for i in 0..MAX_REPLICAS_PER_LOG { - assert_eq!(l.ltails[i].load(Ordering::Relaxed), 0); + assert_eq!(l.ltails[&i].load(Ordering::Relaxed), 0); } for i in 0..MAX_REPLICAS_PER_LOG { - assert_eq!(l.lmasks[i].get(), true); + assert_eq!(l.lmasks[&i].get(), true); } } @@ -621,11 +620,15 @@ mod tests { let l = Log::::new_with_metadata(LogMetaData::new(1)); let tkn = l.register().unwrap(); - l.next.store(5, Ordering::Relaxed); - l.ltails[0].store(1023, Ordering::Relaxed); - l.ltails[1].store(224, Ordering::Relaxed); - l.ltails[2].store(4096, Ordering::Relaxed); - l.ltails[3].store(799, Ordering::Relaxed); + for i in 0..4 { + l.replica_inventory + .compare_and_swap(i, false, true, Ordering::Relaxed); + } + + l.ltails[&0].store(1023, Ordering::Relaxed); + l.ltails[&1].store(224, Ordering::Relaxed); + l.ltails[&2].store(4096, Ordering::Relaxed); + l.ltails[&3].store(799, Ordering::Relaxed); l.advance_head(&tkn, &mut |_o: Operation, _i: usize, _, _, _, _| -> bool { true @@ -648,10 +651,9 @@ mod tests { a }; - l.next.store(2, Ordering::Relaxed); l.tail .store(l.slog.len() - GC_FROM_HEAD - 1, Ordering::Relaxed); - l.ltails[0].store(1024, Ordering::Relaxed); + l.ltails[&0].store(1024, Ordering::Relaxed); l.append(&o, &tkn, |_o: Operation, _i: usize, _, _, _, _| -> bool { true }); @@ -679,14 +681,18 @@ mod tests { a }; - l.next.store(2, Ordering::Relaxed); + for i in 0..5 { + l.replica_inventory + .compare_and_swap(i, false, true, Ordering::Relaxed); + } + l.head.store(2 * 8192, Ordering::Relaxed); l.tail.store(l.slog.len() - 10, Ordering::Relaxed); l.append(&o, &tkn, |_o: Operation, _i: usize, _, _, _, _| -> bool { true }); - assert_eq!(l.lmasks[0].get(), true); + assert_eq!(l.lmasks[&0].get(), true); assert_eq!(l.tail.load(Ordering::Relaxed), l.slog.len() + 1014); } @@ -714,7 +720,7 @@ mod tests { ); assert_eq!( l.tail.load(Ordering::Relaxed), - l.ltails[0].load(Ordering::Relaxed) + l.ltails[&0].load(Ordering::Relaxed) ); } @@ -788,7 +794,7 @@ mod tests { ); assert_eq!( l.tail.load(Ordering::Relaxed), - l.ltails[0].load(Ordering::Relaxed) + l.ltails[&0].load(Ordering::Relaxed) ); } @@ -816,17 +822,16 @@ mod tests { l.append(&o, &tkn, |_o: Operation, _i: usize, _, _, _, _| -> bool { true }); // Required for GC to work correctly. - l.next.store(2, Ordering::SeqCst); l.head.store(2 * 8192, Ordering::SeqCst); l.tail.store(l.slog.len() - 10, Ordering::SeqCst); l.append(&o, &tkn, |_o: Operation, _i: usize, _, _, _, _| -> bool { true }); - l.ltails[0].store(l.slog.len() - 10, Ordering::SeqCst); + l.ltails[&0].store(l.slog.len() - 10, Ordering::SeqCst); l.exec(&tkn, &mut f); - assert_eq!(l.lmasks[0].get(), false); + assert_eq!(l.lmasks[&0].get(), false); assert_eq!(l.tail.load(Ordering::Relaxed), l.slog.len() + 1014); } @@ -916,7 +921,10 @@ mod tests { assert_eq!(l.slog.len(), total_entries); // Intentionally not using `register()`, (will fail the test due to GC). 
- let tkn = LogToken(1); + // let tkn = LogToken(1); + // l.replica_inventory.set_bit(1,true); + + let tkn = l.register().unwrap(); let o1 = [(Arc::new(Operation::Read), 1, false)]; let o2 = [(Arc::new(Operation::Read), 1, false)]; diff --git a/node-replication/src/log.rs b/node-replication/src/log.rs index bb5a6300..a14287d9 100644 --- a/node-replication/src/log.rs +++ b/node-replication/src/log.rs @@ -4,6 +4,10 @@ //! Contains the shared Log, in a nutshell it's a multi-producer, multi-consumer //! circular-buffer. +extern crate std; + +use std::collections::HashMap; + use alloc::boxed::Box; use alloc::vec::Vec; @@ -14,6 +18,7 @@ use core::mem::size_of; #[cfg(not(loom))] use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use bm::AtomicBitmap; use crossbeam_utils::CachePadded; #[cfg(loom)] pub use loom::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; @@ -153,16 +158,17 @@ where /// Required for garbage collection; since replicas make progress over the log /// independently, we want to make sure that we don't garbage collect operations /// that haven't been executed by all replicas. - pub(crate) ltails: [CachePadded; MAX_REPLICAS_PER_LOG], + pub(crate) ltails: HashMap>, /// Identifier that will be allocated to the next replica that registers with /// this Log. Also required to correctly index into ltails above. - pub(crate) next: CachePadded, + pub replica_inventory: Box, /// Array consisting of local alive masks for each registered replica. Required /// because replicas make independent progress over the log, so we need to /// track log wrap-arounds for each of them separately. - pub(crate) lmasks: [CachePadded>; MAX_REPLICAS_PER_LOG], + // pub(crate) lmasks: [CachePadded>; MAX_REPLICAS_PER_LOG], + pub(crate) lmasks: HashMap>>, /// Meta-data used by log implementations. 
pub(crate) metadata: LM, @@ -242,19 +248,30 @@ where #[allow(clippy::declare_interior_mutable_const)] const LMASK_DEFAULT: CachePadded> = CachePadded::new(Cell::new(true)); + let mut lmask_init = HashMap::with_capacity(MAX_REPLICAS_PER_LOG); + for i in 0..MAX_REPLICAS_PER_LOG { + lmask_init.insert(i, LMASK_DEFAULT); + } + #[cfg(not(loom))] { #[allow(clippy::declare_interior_mutable_const)] const LTAIL_DEFAULT: CachePadded = CachePadded::new(AtomicUsize::new(0)); + let mut ltails_init = HashMap::with_capacity(MAX_REPLICAS_PER_LOG); + + for i in 0..MAX_REPLICAS_PER_LOG { + ltails_init.insert(i, LTAIL_DEFAULT); + } + Log { slog: raw, head: CachePadded::new(AtomicUsize::new(0usize)), tail: CachePadded::new(AtomicUsize::new(0usize)), ctail: CachePadded::new(AtomicUsize::new(0usize)), - ltails: [LTAIL_DEFAULT; MAX_REPLICAS_PER_LOG], - next: CachePadded::new(AtomicUsize::new(1usize)), - lmasks: [LMASK_DEFAULT; MAX_REPLICAS_PER_LOG], + ltails: ltails_init, + replica_inventory: Box::new(AtomicUsize::new(1usize)), + lmasks: lmask_init, metadata, } } @@ -264,14 +281,22 @@ where #[cfg(loom)] { use arr_macro::arr; + + const LTAIL_DEFAULT: CachePadded = CachePadded::new(AtomicUsize::new(0)); + + let mut ltails_init = HashMap::with_capacity(3); + + for i in 0..3 { + ltails_init.insert(i, LTAIL_DEFAULT); + } Log { slog: raw, head: CachePadded::new(AtomicUsize::new(0usize)), tail: CachePadded::new(AtomicUsize::new(0usize)), ctail: CachePadded::new(AtomicUsize::new(0usize)), - ltails: arr![CachePadded::new(AtomicUsize::new(0)); 3], // MAX_REPLICAS_PER_LOG - next: CachePadded::new(AtomicUsize::new(1usize)), - lmasks: [LMASK_DEFAULT; MAX_REPLICAS_PER_LOG], + ltails: ltails_init, + replica_inventory: Box::new(AtomicUsize::new(1usize)), + lmasks: HashMap::with_capacity(MAX_REPLICAS_PER_LOG), metadata, } } @@ -379,25 +404,17 @@ where /// let idx = l.register().expect("Failed to register with the Log."); /// ``` pub fn register(&self) -> Option { - // Loop until we either run out of identifiers or we manage to increment `next`. - loop { - let n = self.next.load(Ordering::Relaxed); - - // Check if we've exceeded the maximum number of replicas the log can support. - if n > MAX_REPLICAS_PER_LOG { - return None; - }; - - if self - .next - .compare_exchange_weak(n, n + 1, Ordering::SeqCst, Ordering::SeqCst) - != Ok(n) + for replica_id in 0..=MAX_REPLICAS_PER_LOG { + if !self + .replica_inventory + .load_bit(replica_id, Ordering::Relaxed) { - continue; - }; - - return Some(LogToken(n)); + self.replica_inventory + .compare_and_swap(replica_id, false, true, Ordering::Relaxed); + return Some(LogToken(replica_id)); + } } + None } /// Returns a physical index given a logical index into the shared log. @@ -406,23 +423,38 @@ where logical & (self.slog.len() - 1) } + pub(crate) fn replica_count(&self) -> usize { + let mut count = 0; + for replica_id in 0..MAX_REPLICAS_PER_LOG { + if self + .replica_inventory + .load_bit(replica_id, Ordering::Relaxed) + { + count += 1; + } + } + count + } + /// Loops over all `ltails` and finds the replica with the lowest tail. /// /// # Returns /// The ID (in `LogToken`) of the replica with the lowest tail and the /// corresponding/lowest tail `idx` in the `Log`. 
pub(crate) fn find_min_tail(&self) -> (usize, usize) { - let r = self.next.load(Ordering::Relaxed); - let (mut min_replica_idx, mut min_local_tail) = (0, self.ltails[0].load(Ordering::Relaxed)); + let (mut min_replica_idx, mut min_local_tail) = + (0, self.ltails[&0].load(Ordering::Relaxed)); // Find the smallest local tail across all replicas. - for idx in 1..r { - let cur_local_tail = self.ltails[idx - 1].load(Ordering::Relaxed); - //info!("Replica {} cur_local_tail {}.", idx - 1, cur_local_tail); - - if cur_local_tail < min_local_tail { - min_local_tail = cur_local_tail; - min_replica_idx = idx - 1; + for idx in 1..MAX_REPLICAS_PER_LOG { + if self.replica_inventory.get_bit(idx) { + let cur_local_tail = self.ltails[&(idx - 1)].load(Ordering::Relaxed); + //info!("Replica {} cur_local_tail {}.", idx - 1, cur_local_tail); + + if cur_local_tail < min_local_tail { + min_local_tail = cur_local_tail; + min_replica_idx = idx - 1; + } } } @@ -435,12 +467,12 @@ where /// The ID (in `LogToken`) of the replica with the highest tail and the /// corresponding/highest tail `idx` in the `Log`. pub(crate) fn find_max_tail(&self) -> (usize, usize) { - let r = self.next.load(Ordering::Relaxed); - let (mut max_replica_idx, mut max_local_tail) = (0, self.ltails[0].load(Ordering::Relaxed)); + let (mut max_replica_idx, mut max_local_tail) = + (0, self.ltails[&0].load(Ordering::Relaxed)); // Find the local tail across all replicas. - for idx in 1..r { - let cur_local_tail = self.ltails[idx - 1].load(Ordering::Relaxed); + for idx in 1..MAX_REPLICAS_PER_LOG { + let cur_local_tail = self.ltails[&(idx - 1)].load(Ordering::Relaxed); //info!("Replica {} cur_local_tail {}.", idx - 1, cur_local_tail); if cur_local_tail > max_local_tail { @@ -469,12 +501,17 @@ where // First, reset global metadata. self.head.store(0, Ordering::SeqCst); self.tail.store(0, Ordering::SeqCst); - self.next.store(1, Ordering::SeqCst); + for i in 0..MAX_REPLICAS_PER_LOG { + self.replica_inventory + .compare_and_swap(i, true, false, Ordering::Relaxed); + } + self.replica_inventory + .compare_and_swap(0, false, true, Ordering::Relaxed); // Next, reset replica-local metadata. for r in 0..MAX_REPLICAS_PER_LOG { - self.ltails[r].store(0, Ordering::Relaxed); - self.lmasks[r].set(true); + self.ltails[&r].store(0, Ordering::Relaxed); + self.lmasks[&r].set(true); } // Next, free up all log entries. Use pointers to avoid memcpy and speed up the @@ -546,7 +583,7 @@ where /// ``` #[inline(always)] pub(crate) fn is_replica_synced_for_reads(&self, idx: &LogToken, ctail: usize) -> bool { - self.ltails[idx.0 - 1].load(Ordering::Relaxed) >= ctail + self.ltails[&(idx.0 - 1)].load(Ordering::Relaxed) >= ctail } /// This method returns the current ctail value for the log. @@ -608,22 +645,22 @@ mod tests { // Tests if a small log can be correctly constructed. 
#[test] - fn test_log_create() { + fn test_std_log_create() { let l = Log::::new_with_bytes(1024 * 1024, ()); let n = (1024 * 1024) / Log::::entry_size(); assert_eq!(l.slog.len(), n); assert_eq!(l.head.load(Ordering::Relaxed), 0); assert_eq!(l.tail.load(Ordering::Relaxed), 0); - assert_eq!(l.next.load(Ordering::Relaxed), 1); assert_eq!(l.ctail.load(Ordering::Relaxed), 0); assert_eq!(l.metadata, ()); for i in 0..MAX_REPLICAS_PER_LOG { - assert_eq!(l.ltails[i].load(Ordering::Relaxed), 0); + assert_eq!(l.ltails[&i].load(Ordering::Relaxed), 0); } for i in 0..MAX_REPLICAS_PER_LOG { - assert_eq!(l.lmasks[i].get(), true); + std::dbg!(i); + assert_eq!(l.lmasks[&i].get(), true); } } @@ -667,15 +704,14 @@ mod tests { assert_eq!(l.slog.len(), n); assert_eq!(l.head.load(Ordering::Relaxed), 0); assert_eq!(l.tail.load(Ordering::Relaxed), 0); - assert_eq!(l.next.load(Ordering::Relaxed), 1); assert_eq!(l.ctail.load(Ordering::Relaxed), 0); for i in 0..MAX_REPLICAS_PER_LOG { - assert_eq!(l.ltails[i].load(Ordering::Relaxed), 0); + assert_eq!(l.ltails[&i].load(Ordering::Relaxed), 0); } for i in 0..MAX_REPLICAS_PER_LOG { - assert_eq!(l.lmasks[i].get(), true); + assert_eq!(l.lmasks[&i].get(), true); } } @@ -691,7 +727,7 @@ mod tests { fn test_log_register() { let l = Log::::new_with_bytes(1024, ()); assert_eq!(l.register(), Some(LogToken(1))); - assert_eq!(l.next.load(Ordering::Relaxed), 2); + assert_eq!(l.replica_count(), 2); } // Tests that we can register exactly `MAX_REPLICAS_PER_LOG` replicas. @@ -710,11 +746,10 @@ mod tests { let l = Log::::default(); let _lt = l.register().unwrap(); - l.next.store(5, Ordering::Relaxed); - l.ltails[0].store(1023, Ordering::Relaxed); - l.ltails[1].store(224, Ordering::Relaxed); - l.ltails[2].store(4096, Ordering::Relaxed); - l.ltails[3].store(799, Ordering::Relaxed); + l.ltails[&0].store(1023, Ordering::Relaxed); + l.ltails[&1].store(224, Ordering::Relaxed); + l.ltails[&2].store(4096, Ordering::Relaxed); + l.ltails[&3].store(799, Ordering::Relaxed); assert_eq!(l.find_max_tail(), (2, 4096)) } @@ -725,11 +760,15 @@ mod tests { let l = Log::::default(); let _lt = l.register().unwrap(); - l.next.store(5, Ordering::Relaxed); - l.ltails[0].store(1023, Ordering::Relaxed); - l.ltails[1].store(224, Ordering::Relaxed); - l.ltails[2].store(4096, Ordering::Relaxed); - l.ltails[3].store(799, Ordering::Relaxed); + for i in 0..4 { + l.replica_inventory + .compare_and_swap(i, false, true, Ordering::Relaxed); + } + + l.ltails[&0].store(1023, Ordering::Relaxed); + l.ltails[&1].store(224, Ordering::Relaxed); + l.ltails[&2].store(4096, Ordering::Relaxed); + l.ltails[&3].store(799, Ordering::Relaxed); assert_eq!(l.find_min_tail(), (1, 224)) } diff --git a/node-replication/src/nr/log.rs b/node-replication/src/nr/log.rs index 3fbbe0c1..2c202de7 100644 --- a/node-replication/src/nr/log.rs +++ b/node-replication/src/nr/log.rs @@ -152,7 +152,7 @@ where // Successfully reserved entries on the shared log. Add the operations in. for (i, op) in ops.iter().enumerate().take(nops) { let e = self.slog[self.index(tail + i)].as_ptr(); - let mut m = self.lmasks[idx.0 - 1].get(); + let mut m = self.lmasks[&(idx.0 - 1)].get(); // This entry was just reserved so it should be dead (!= m). However, if // the log has wrapped around, then the alive mask has flipped. In this @@ -236,7 +236,7 @@ where #[inline(always)] pub(crate) fn exec(&self, idx: &LogToken, d: &mut F) { // Load the logical log offset from which we must execute operations. 
- let ltail = self.ltails[idx.0 - 1].load(Ordering::Relaxed); + let ltail = self.ltails[&(idx.0 - 1)].load(Ordering::Relaxed); // Check if we have any work to do by comparing our local tail with the log's // global tail. If they're equal, then we're done here and can simply return. @@ -259,14 +259,15 @@ where let mut iteration = 1; let e = self.slog[self.index(i)].as_ptr(); - while unsafe { (*e).alivef.load(Ordering::Acquire) != self.lmasks[idx.0 - 1].get() } { + while unsafe { (*e).alivef.load(Ordering::Acquire) != self.lmasks[&(idx.0 - 1)].get() } + { if iteration % WARN_THRESHOLD == 0 { warn!( "alivef not being set for self.index(i={}) = {} (self.lmasks[{}] is {})...", i, self.index(i), idx.0 - 1, - self.lmasks[idx.0 - 1].get() + self.lmasks[&(idx.0 - 1)].get() ); } iteration += 1; @@ -284,7 +285,7 @@ where // Looks like we're going to wrap around now; flip this replica's local mask. if self.index(i) == self.slog.len() - 1 { - self.lmasks[idx.0 - 1].set(!self.lmasks[idx.0 - 1].get()); + self.lmasks[&(idx.0 - 1)].set(!self.lmasks[&(idx.0 - 1)].get()); //trace!("idx: {} lmask: {}", idx, self.lmasks[idx - 1].get()); } } @@ -292,7 +293,7 @@ where // Update the completed tail after we've executed these operations. Also update // this replica's local tail. self.ctail.fetch_max(gtail, Ordering::Relaxed); - self.ltails[idx.0 - 1].store(gtail, Ordering::Relaxed); + self.ltails[&(idx.0 - 1)].store(gtail, Ordering::Relaxed); } /// Advances the head of the log forward. If a replica has stopped making @@ -395,15 +396,19 @@ mod tests { // Tests that we can advance the head of the log to the smallest of all replica-local tails. #[test] - fn test_log_advance_head() { + fn test_log_nr_advance_head() { let l = Log::::default(); let lt = l.register().unwrap(); - l.next.store(5, Ordering::Relaxed); - l.ltails[0].store(1023, Ordering::Relaxed); - l.ltails[1].store(224, Ordering::Relaxed); - l.ltails[2].store(4096, Ordering::Relaxed); - l.ltails[3].store(799, Ordering::Relaxed); + for i in 0..3 { + l.replica_inventory + .compare_and_swap(i, false, true, Ordering::Relaxed); + } + + l.ltails[&0].store(1023, Ordering::Relaxed); + l.ltails[&1].store(224, Ordering::Relaxed); + l.ltails[&2].store(4096, Ordering::Relaxed); + // l.ltails[3].store(799, Ordering::Relaxed); assert!(l .advance_head(<, &mut |_o: Operation, _mine: bool| {}) @@ -425,10 +430,14 @@ mod tests { a }; - l.next.store(2, Ordering::Relaxed); + for i in 0..2 { + l.replica_inventory + .compare_and_swap(i, false, true, Ordering::Relaxed); + } + l.tail .store(l.slog.len() - GC_FROM_HEAD - 1, Ordering::Relaxed); - l.ltails[0].store(1024, Ordering::Relaxed); + l.ltails[&0].store(1024, Ordering::Relaxed); assert!(l.append(&o, <, |_o: Operation, _mine: bool| {}).is_ok()); assert_eq!(l.head.load(Ordering::Relaxed), 1024); @@ -453,12 +462,15 @@ mod tests { a }; - l.next.store(2, Ordering::Relaxed); + for i in 0..5 { + l.replica_inventory + .compare_and_swap(i, false, true, Ordering::Relaxed); + } l.head.store(2 * 8192, Ordering::Relaxed); l.tail.store(l.slog.len() - 10, Ordering::Relaxed); assert!(l.append(&o, <, |_o: Operation, _mine: bool| {}).is_ok()); - assert_eq!(l.lmasks[0].get(), true); + assert_eq!(l.lmasks[&0].get(), true); assert_eq!(l.tail.load(Ordering::Relaxed), l.slog.len() + 1014); } @@ -483,7 +495,7 @@ mod tests { ); assert_eq!( l.tail.load(Ordering::Relaxed), - l.ltails[0].load(Ordering::Relaxed) + l.ltails[&0].load(Ordering::Relaxed) ); } @@ -544,7 +556,7 @@ mod tests { ); assert_eq!( l.tail.load(Ordering::Relaxed), - 
l.ltails[0].load(Ordering::Relaxed) + l.ltails[&0].load(Ordering::Relaxed) ); } @@ -568,15 +580,20 @@ mod tests { }; assert!(l.append(&o, <, |_o: Operation, _mine| {}).is_ok()); // Required for GC to work correctly. - l.next.store(2, Ordering::SeqCst); + + for i in 0..2 { + l.replica_inventory + .compare_and_swap(i, false, true, Ordering::Relaxed); + } + l.head.store(2 * 8192, Ordering::SeqCst); l.tail.store(l.slog.len() - 10, Ordering::SeqCst); assert!(l.append(&o, <, |_o: Operation, _mine| {}).is_ok()); - l.ltails[0].store(l.slog.len() - 10, Ordering::SeqCst); + l.ltails[&0].store(l.slog.len() - 10, Ordering::SeqCst); l.exec(<, &mut f); - assert_eq!(l.lmasks[0].get(), false); + assert_eq!(l.lmasks[&0].get(), false); assert_eq!(l.tail.load(Ordering::Relaxed), l.slog.len() + 1014); } diff --git a/node-replication/src/nr/mod.rs b/node-replication/src/nr/mod.rs index c84d6492..416e3235 100644 --- a/node-replication/src/nr/mod.rs +++ b/node-replication/src/nr/mod.rs @@ -70,11 +70,14 @@ //! } //! ``` +extern crate std; +use std::collections::HashMap; + use alloc::boxed::Box; -use alloc::vec::Vec; use core::fmt::Debug; use core::marker::Sync; use core::num::NonZeroUsize; + #[cfg(feature = "async")] use reusable_box::ReusableBoxFuture; @@ -283,6 +286,7 @@ impl<'f> Drop for AffinityToken<'f> { pub enum NodeReplicatedError { /// Not enough memory to create a [`NodeReplicated`] instance. OutOfMemory, + DuplicateReplica, } impl From for NodeReplicatedError { @@ -308,8 +312,14 @@ impl From for NodeReplicatedError { /// which are behind automatically. pub struct NodeReplicated { log: Log, - replicas: Vec>>, + replicas: HashMap>, // thread_routing: HashMap, + // contexts: Vec::WriteOperation, ::Response>>, // reroute + // threads' + // work + // queue to + // new + // replica affinity_mngr: AffinityManager, } @@ -394,35 +404,30 @@ where let affinity_mngr = AffinityManager::new(Box::try_new(chg_mem_affinity)?); let log = Log::new_with_bytes(log_size, ()); - let mut replicas = Vec::new(); - replicas.try_reserve(num_replicas.get())?; + let mut replicas = HashMap::with_capacity(MAX_REPLICAS_PER_LOG); for replica_id in 0..num_replicas.get() { let log_token = log .register() .expect("Succeeds (num_replicas < MAX_REPLICAS_PER_LOG)"); - let r = { + let r = { // Allocate the replica on the proper NUMA node let _aff_tkn = affinity_mngr.switch(replica_id); - Box::try_new(Replica::new(log_token))? + Replica::new(log_token) // aff_tkn is dropped here }; - // This succeeds, we did `try_reserve` earlier so no `try_push` is - // necessary. - replicas.push(r); + replicas.insert(replica_id, r); } Ok(NodeReplicated { - replicas, log, + replicas, affinity_mngr, }) } - - /// Adds a new replica to the NodeReplicated. It returns the index of the added replica within /// NodeReplicated.replicas[x]. /// @@ -442,8 +447,6 @@ where /// assert_eq!(2, added_replica_data); /// ``` pub fn add_replica(&mut self) -> Result { - self.replicas.try_reserve(1)?; - let log_token = self .log .register() @@ -452,36 +455,59 @@ where let r = { // Allocate the replica on the proper NUMA node let _aff_tkn = self.affinity_mngr.switch(self.replicas.len()); - Box::try_new(Replica::new(log_token.clone()))? + Replica::new(log_token.clone()) // aff_tkn is dropped here }; - // This succeeds, we did `try_reserve` earlier so no `try_push` is - // necessary. 
- self.replicas.push(r); + // TODO: Need better algo to establish replica_id + let replica_id = self.new_replica_id(); - let replica_id = self.replicas.len() - 1; + if self.replicas.contains_key(&replica_id) { + return core::prelude::v1::Err(NodeReplicatedError::DuplicateReplica); + } + + self.replicas.insert(replica_id, r); // get the most up to date replica let (max_replica_idx, max_local_tail) = self.log.find_max_tail(); // copy data from existing replica - let replica_locked = self.replicas[max_replica_idx] + std::dbg!(&max_replica_idx, &max_local_tail); + let replica_locked = self.replicas[&max_replica_idx] .data .read(log_token.0) .clone(); - let new_replica_data = &mut self.replicas[replica_id].data.write(log_token.0); + let new_replica_data = &mut self.replicas[&replica_id].data.write(log_token.0); **new_replica_data = replica_locked; // push ltail entry for new replica - self.log.ltails[replica_id].store(max_local_tail, Ordering::Relaxed); + self.log.ltails[&replica_id].store(max_local_tail, Ordering::Relaxed); // find and push existing lmask entry for new replica - let lmask_status = self.log.lmasks[max_replica_idx].get(); - self.log.lmasks[replica_id].set(lmask_status); + let lmask_status = self.log.lmasks[&max_replica_idx].get(); + self.log.lmasks[&replica_id].set(lmask_status); + + // Register the the replica with a thread_id and return the ThreadToken + Ok(replica_id) + } + pub fn remove_replica( + &mut self, + replica_id: ReplicaId, + ) -> Result { + self.replicas.remove(&replica_id); Ok(replica_id) } + + fn new_replica_id(&self) -> usize { + // identify max replica id + let max_key = self.replicas.iter().max_by_key(|&(k, _v)| k); + if let Some(key) = max_key { + *key.0 + 1 + } else { + 0 + } + } } impl NodeReplicated @@ -546,7 +572,7 @@ where /// ``` pub fn register(&self, replica_id: ReplicaId) -> Option { if replica_id < self.replicas.len() { - let rtkn = self.replicas[replica_id].register()?; + let rtkn = self.replicas[&replica_id].register()?; Some(ThreadToken::new(replica_id, rtkn)) } else { None @@ -562,9 +588,9 @@ where if let Some(combiner_lock) = cl { // We expect to have already enqueued the op (it's a re-try since have the combiner lock), // so technically its not needed to supply it again (but we currently do it anyways...) - self.replicas[tkn.rid].execute_mut_locked(&self.log, op, tkn.rtkn, combiner_lock) + self.replicas[&tkn.rid].execute_mut_locked(&self.log, op, tkn.rtkn, combiner_lock) } else { - self.replicas[tkn.rid].execute_mut(&self.log, op, tkn.rtkn) + self.replicas[&tkn.rid].execute_mut(&self.log, op, tkn.rtkn) } } @@ -651,10 +677,10 @@ where { assert_ne!(stuck_ridx, tkn.rid); let _aftkn = self.affinity_mngr.switch(stuck_ridx); - self.replicas[stuck_ridx].sync(&self.log); + self.replicas[&stuck_ridx].sync(&self.log); // Affinity is reverted here, _aftkn is dropped. 
} - return self.replicas[tkn.rid] + return self.replicas[&tkn.rid] .get_response(&self.log, tkn.rtkn.tid()) .expect("GcFailed has to produced a response"); } @@ -664,7 +690,7 @@ where debug_assert_ne!(ridx, tkn.rid); //warn!("execute_mut ResolveOp::Sync {}", ridx); let _aftkn = self.affinity_mngr.switch(ridx); - self.replicas[ridx].try_sync(&self.log); + self.replicas[&ridx].try_sync(&self.log); // _aftkn is dropped here, reverting affinity change } } @@ -679,9 +705,9 @@ where ) -> Result<::Response, (ReplicaError, ::ReadOperation<'rop>)> { if let Some(combiner_lock) = cl { - self.replicas[tkn.rid].execute_locked(&self.log, op, tkn.rtkn, combiner_lock) + self.replicas[&tkn.rid].execute_locked(&self.log, op, tkn.rtkn, combiner_lock) } else { - self.replicas[tkn.rid].execute(&self.log, op, tkn.rtkn) + self.replicas[&tkn.rid].execute(&self.log, op, tkn.rtkn) } } @@ -771,7 +797,7 @@ where // Holds trivially because of all the other asserts in this function debug_assert_ne!(ridx, tkn.rid); let _aftkn = self.affinity_mngr.switch(ridx); - self.replicas[ridx].try_sync(&self.log); + self.replicas[&ridx].try_sync(&self.log); // _aftkn is dropped here, reverting affinity change } } @@ -812,7 +838,7 @@ where #[doc(hidden)] pub fn sync(&self, tkn: ThreadToken) { - self.replicas[tkn.rid].sync(&self.log) + self.replicas[&tkn.rid].sync(&self.log) } } @@ -879,7 +905,7 @@ mod test { let added_replica = async_ds.add_replica().unwrap(); - let added_replica_data = async_ds.replicas[added_replica].data.read(0).junk; + let added_replica_data = async_ds.replicas[&added_replica].data.read(0).junk; assert_eq!(3, added_replica_data); } @@ -895,9 +921,9 @@ mod test { let _ = async_ds.execute_mut(5, ttkn_a); let _ = async_ds.execute_mut(2, ttkn_a); - let replica_lmask = async_ds.log.lmasks[0].get(); + let replica_lmask = async_ds.log.lmasks[&0].get(); let added_replica = async_ds.add_replica().unwrap(); - let added_replica_lmask = async_ds.log.lmasks[added_replica].get(); + let added_replica_lmask = async_ds.log.lmasks[&added_replica].get(); assert_eq!(replica_lmask, added_replica_lmask); } @@ -913,9 +939,89 @@ mod test { let _ = async_ds.execute_mut(5, ttkn_a); let _ = async_ds.execute_mut(2, ttkn_a); - let replica_ltails = async_ds.log.ltails[0].load(Ordering::Relaxed); + let replica_ltails = async_ds.log.ltails[&0].load(Ordering::Relaxed); let added_replica = async_ds.add_replica().unwrap(); - let added_replica_ltails = async_ds.log.ltails[added_replica].load(Ordering::Relaxed); + let added_replica_ltails = async_ds.log.ltails[&added_replica].load(Ordering::Relaxed); assert_eq!(replica_ltails, added_replica_ltails); } + + #[test] + fn test_add_replica_adds_replicas_in_order() { + let replicas = NonZeroUsize::new(4).unwrap(); + let mut async_ds = NodeReplicated::::new(replicas, |_ac| 0).expect("Can't create Ds"); + let ttkn_a = async_ds.register(0).expect("Unable to register with log"); + let ttkn_b = async_ds.register(1).expect("Unable to register with log"); + let ttkn_c = async_ds.register(2).expect("Unable to register with log"); + let _ttkn_d = async_ds.register(3).expect("Unable to register with log"); + + let _ = async_ds.execute_mut(1, ttkn_a); + let _ = async_ds.execute_mut(5, ttkn_b); + let _ = async_ds.execute_mut(2, ttkn_c); + + let _ = async_ds.remove_replica(1); + let _ = async_ds.remove_replica(0); + + let added_replica_four = async_ds.add_replica().unwrap(); + + assert_eq!(added_replica_four, 4); + } + + #[test] + fn test_remove_replica_returns_replica_id() { + let replicas = 
NonZeroUsize::new(1).unwrap();
+        let mut async_ds = NodeReplicated::<Data>::new(replicas, |_ac| 0).expect("Can't create Ds");
+        let _ = async_ds.register(0).expect("Unable to register with log");
+        let replica_id = async_ds.remove_replica(0);
+        assert_eq!(replica_id.unwrap(), 0);
+    }
+
+    #[test]
+    fn test_remove_replica_removes_replica_in_order() {
+        let replicas = NonZeroUsize::new(2).unwrap();
+        let mut async_ds = NodeReplicated::<Data>::new(replicas, |_ac| 0).expect("Can't create Ds");
+        let ttkn_a = async_ds.register(0).expect("Unable to register with log");
+        let ttkn_b = async_ds.register(1).expect("Unable to register with log");
+
+        let _ = async_ds.execute_mut(1, ttkn_a);
+        let _ = async_ds.execute_mut(4, ttkn_b);
+
+        let _ = async_ds.remove_replica(999);
+    }
+
+    #[test]
+    fn test_remove_replica_error_on_invalid_replica_id_move() {
+        let replicas = NonZeroUsize::new(2).unwrap();
+        let mut async_ds = NodeReplicated::<Data>::new(replicas, |_ac| 0).expect("Can't create Ds");
+        let ttkn_a = async_ds.register(0).expect("Unable to register with log");
+        let ttkn_b = async_ds.register(1).expect("Unable to register with log");
+
+        let _ = async_ds.execute_mut(1, ttkn_a);
+        let _ = async_ds.execute_mut(4, ttkn_b);
+
+        let _ = async_ds.remove_replica(999);
+
+        assert_eq!(async_ds.replicas.len(), 2);
+    }
+    #[test]
+    fn test_remove_replica_() {
+        let replicas = NonZeroUsize::new(2).unwrap();
+        let mut async_ds = NodeReplicated::<Data>::new(replicas, |_ac| 0).expect("Can't create Ds");
+        let ttkn_a = async_ds.register(0).expect("Unable to register with log");
+        let ttkn_b = async_ds.register(1).expect("Unable to register with log");
+
+        let _ = async_ds.execute_mut(1, ttkn_a);
+        let _ = async_ds.execute_mut(4, ttkn_b);
+
+        let _ = async_ds.remove_replica(999);
+
+        assert_eq!(async_ds.replicas.len(), 2);
+    }
+
+    // Check Lock before removing
+    // Check replica integrity after deletion
+    // Ensure correct keys
+
+    // ltails and lmasks need hashmap managment
+    // remove ltails and lmask entries on removal
+    //
 }
From b3112c1b67b7adc1b73f50869befa79bc7cce117 Mon Sep 17 00:00:00 2001
From: Marc Paquette
Date: Thu, 22 Dec 2022 13:39:35 -0500
Subject: [PATCH 05/11] Implement remove_replica for NR

Thread registration and deregistration remain outstanding, and further
work needs to be done.
---
 node-replication/src/log.rs       | 10 ++++-
 node-replication/src/nr/mod.rs    | 63 ++++++++++++++++---------------
 node-replication/src/nr/rwlock.rs |  2 +-
 3 files changed, 42 insertions(+), 33 deletions(-)

diff --git a/node-replication/src/log.rs b/node-replication/src/log.rs
index a14287d9..e157b0c2 100644
--- a/node-replication/src/log.rs
+++ b/node-replication/src/log.rs
@@ -473,7 +473,6 @@ where
         // Find the local tail across all replicas.
         for idx in 1..MAX_REPLICAS_PER_LOG {
             let cur_local_tail = self.ltails[&(idx - 1)].load(Ordering::Relaxed);
-            //info!("Replica {} cur_local_tail {}.", idx - 1, cur_local_tail);
 
             if cur_local_tail > max_local_tail {
                 max_local_tail = cur_local_tail;
@@ -484,6 +483,14 @@ where
         (max_replica_idx, max_local_tail)
     }
 
+    /// Removes log entries for associated replicas. This is to allow dynamic adding and removing
+    /// of replicas for memory efficiency & performance purposes.
+    pub(crate) fn remove_replica(&mut self, log_token: LogToken) {
+        self.replica_inventory.compare_and_swap(log_token.0,true,false,Ordering::Relaxed);
+        self.ltails.insert(log_token.0, CachePadded::new(AtomicUsize::new(0)));
+        self.lmasks.insert(log_token.0, CachePadded::new(Cell::new(true)));
+    }
+
    /// Resets the log.
This is required for microbenchmarking the log; with /// this method, we can re-use the log across experimental runs without /// having to re-allocate the log over and over again (which blows up the @@ -659,7 +666,6 @@ mod tests { } for i in 0..MAX_REPLICAS_PER_LOG { - std::dbg!(i); assert_eq!(l.lmasks[&i].get(), true); } } diff --git a/node-replication/src/nr/mod.rs b/node-replication/src/nr/mod.rs index 416e3235..3d514d3f 100644 --- a/node-replication/src/nr/mod.rs +++ b/node-replication/src/nr/mod.rs @@ -446,7 +446,7 @@ where /// let added_replica_data = async_ds.replicas[added_replica].data.read(0).junk; /// assert_eq!(2, added_replica_data); /// ``` - pub fn add_replica(&mut self) -> Result { + pub fn add_replica(&mut self) -> Result { let log_token = self .log .register() @@ -459,7 +459,6 @@ where // aff_tkn is dropped here }; - // TODO: Need better algo to establish replica_id let replica_id = self.new_replica_id(); if self.replicas.contains_key(&replica_id) { @@ -472,7 +471,6 @@ where let (max_replica_idx, max_local_tail) = self.log.find_max_tail(); // copy data from existing replica - std::dbg!(&max_replica_idx, &max_local_tail); let replica_locked = self.replicas[&max_replica_idx] .data .read(log_token.0) @@ -488,13 +486,19 @@ where self.log.lmasks[&replica_id].set(lmask_status); // Register the the replica with a thread_id and return the ThreadToken - Ok(replica_id) + let registered_replica = self.register(replica_id); + + match registered_replica { + Some(thread_token) => Ok(thread_token), + None => Err(NodeReplicatedError::DuplicateReplica), + } } pub fn remove_replica( &mut self, replica_id: ReplicaId, ) -> Result { + self.log.remove_replica(log::LogToken(replica_id)); self.replicas.remove(&replica_id); Ok(replica_id) } @@ -541,10 +545,7 @@ where /// /// # Arguments /// - /// - `replica_id`: Which replica the thread should be registered with. This - /// should be less than the `num_replicas` argument provided in the - /// constructor (see [`NodeReplicated::new`]). In most cases, `replica_id` - /// will correspond to the NUMA node that the thread is running on. + /// - `replica_id`: Which replica the thread should be registered with. 
/// /// # Example /// @@ -567,11 +568,10 @@ where /// /// let replicas = NonZeroUsize::new(2).unwrap(); /// let nrht = NodeReplicated::::new(replicas, |_| { 0 }).unwrap(); - /// let ttkn = nrht.register(0).unwrap(); - /// assert!(nrht.register(replicas.get()).is_none()); + /// assert!(nrht.register(0).is_some()); /// ``` pub fn register(&self, replica_id: ReplicaId) -> Option { - if replica_id < self.replicas.len() { + if self.replicas.len() < MAX_REPLICAS_PER_LOG { let rtkn = self.replicas[&replica_id].register()?; Some(ThreadToken::new(replica_id, rtkn)) } else { @@ -905,7 +905,7 @@ mod test { let added_replica = async_ds.add_replica().unwrap(); - let added_replica_data = async_ds.replicas[&added_replica].data.read(0).junk; + let added_replica_data = async_ds.replicas[&added_replica.rid].data.read(0).junk; assert_eq!(3, added_replica_data); } @@ -923,7 +923,7 @@ mod test { let replica_lmask = async_ds.log.lmasks[&0].get(); let added_replica = async_ds.add_replica().unwrap(); - let added_replica_lmask = async_ds.log.lmasks[&added_replica].get(); + let added_replica_lmask = async_ds.log.lmasks[&added_replica.rid].get(); assert_eq!(replica_lmask, added_replica_lmask); } @@ -941,7 +941,7 @@ mod test { let replica_ltails = async_ds.log.ltails[&0].load(Ordering::Relaxed); let added_replica = async_ds.add_replica().unwrap(); - let added_replica_ltails = async_ds.log.ltails[&added_replica].load(Ordering::Relaxed); + let added_replica_ltails = async_ds.log.ltails[&added_replica.rid].load(Ordering::Relaxed); assert_eq!(replica_ltails, added_replica_ltails); } @@ -959,11 +959,11 @@ mod test { let _ = async_ds.execute_mut(2, ttkn_c); let _ = async_ds.remove_replica(1); - let _ = async_ds.remove_replica(0); + let _ = async_ds.remove_replica(2); let added_replica_four = async_ds.add_replica().unwrap(); - assert_eq!(added_replica_four, 4); + assert_eq!(added_replica_four.rid, 4); } #[test] @@ -976,7 +976,7 @@ mod test { } #[test] - fn test_remove_replica_removes_replica_in_order() { + fn test_remove_replica_noop_on_invalid_replica_id_removal() { let replicas = NonZeroUsize::new(2).unwrap(); let mut async_ds = NodeReplicated::::new(replicas, |_ac| 0).expect("Can't create Ds"); let ttkn_a = async_ds.register(0).expect("Unable to register with log"); @@ -985,22 +985,30 @@ mod test { let _ = async_ds.execute_mut(1, ttkn_a); let _ = async_ds.execute_mut(4, ttkn_b); - let _ = async_ds.remove_replica(999); + let _ = async_ds.remove_replica(15); + + assert_eq!(async_ds.replicas.len(), 2); } #[test] - fn test_remove_replica_error_on_invalid_replica_id_move() { - let replicas = NonZeroUsize::new(2).unwrap(); + fn test_remove_replica_syncs_replica_data() { + let replicas = NonZeroUsize::new(1).unwrap(); let mut async_ds = NodeReplicated::::new(replicas, |_ac| 0).expect("Can't create Ds"); + let ttkn_a = async_ds.register(0).expect("Unable to register with log"); - let ttkn_b = async_ds.register(1).expect("Unable to register with log"); + //add a few iterations of log entries let _ = async_ds.execute_mut(1, ttkn_a); - let _ = async_ds.execute_mut(4, ttkn_b); + let _ = async_ds.execute_mut(5, ttkn_a); + let _ = async_ds.execute_mut(2, ttkn_a); - let _ = async_ds.remove_replica(999); + let added_replica = async_ds.add_replica().unwrap(); - assert_eq!(async_ds.replicas.len(), 2); + let _ = async_ds.remove_replica(0); + let _ = async_ds.execute_mut(5, added_replica); + + let added_replica_data = async_ds.replicas[&added_replica.rid].data.read(0).junk; + assert_eq!(4, added_replica_data); } #[test] fn 
test_remove_replica_() { @@ -1012,16 +1020,11 @@ mod test { let _ = async_ds.execute_mut(1, ttkn_a); let _ = async_ds.execute_mut(4, ttkn_b); - let _ = async_ds.remove_replica(999); + let _ = async_ds.remove_replica(15); assert_eq!(async_ds.replicas.len(), 2); } // Check Lock before removing // Check replica integrity after deletion - // Ensure correct keys - - // ltails and lmasks need hashmap managment - // remove ltails and lmask entries on removal - // } diff --git a/node-replication/src/nr/rwlock.rs b/node-replication/src/nr/rwlock.rs index 802dfeaa..1d895ab3 100644 --- a/node-replication/src/nr/rwlock.rs +++ b/node-replication/src/nr/rwlock.rs @@ -67,7 +67,7 @@ pub struct ReadGuard<'a, T: Sized + Sync + Clone + 'a> { /// A write-guard that can be used to write to the underlying data structure. All /// reads will be blocked until this is dropped. -pub struct WriteGuard<'a, T: Sized + Sync + Clone + 'a> { +pub struct WriteGuard<'a, T: Sized + Sync + Clone + 'a> { /// A reference to the Rwlock wrapping the data-structure. lock: &'a RwLock, } From 3809090ba72d6a9e197fcf20db0ab56f0c16a059 Mon Sep 17 00:00:00 2001 From: Marc Paquette Date: Fri, 13 Jan 2023 14:18:30 -0500 Subject: [PATCH 06/11] Add tests for remove_replicate --- node-replication/src/log.rs | 35 ++++++++++++++++++++++++++++++---- node-replication/src/nr/mod.rs | 17 +++++++++-------- 2 files changed, 40 insertions(+), 12 deletions(-) diff --git a/node-replication/src/log.rs b/node-replication/src/log.rs index e157b0c2..653d8a51 100644 --- a/node-replication/src/log.rs +++ b/node-replication/src/log.rs @@ -485,10 +485,13 @@ where /// Removes log entries for associated replicas. This is to allow dynamic adding and removing /// of replicas for memory efficiency & performance purposes. - pub(crate) fn remove_replica(&mut self, log_token: LogToken) { - self.replica_inventory.compare_and_swap(log_token.0,true,false,Ordering::Relaxed); - self.ltails.insert(log_token.0, CachePadded::new(AtomicUsize::new(0))); - self.lmasks.insert(log_token.0, CachePadded::new(Cell::new(true))); + pub(crate) fn remove_log_replica(&mut self, log_token: LogToken) { + self.replica_inventory + .compare_and_swap(log_token.0, true, false, Ordering::Relaxed); + self.ltails + .insert(log_token.0, CachePadded::new(AtomicUsize::new(0))); + self.lmasks + .insert(log_token.0, CachePadded::new(Cell::new(true))); } /// Resets the log. This is required for microbenchmarking the log; with @@ -778,4 +781,28 @@ mod tests { assert_eq!(l.find_min_tail(), (1, 224)) } + + // Test to validate that remove_replica operates correctly + #[test] + fn test_remove_replica() { + let mut log = Log::::default(); + let mut replicas: Vec = Vec::new(); + for _i in 0..MAX_REPLICAS_PER_LOG { + replicas.insert(0, log.register().unwrap()); + } + + let chosen_one = replicas.pop().unwrap(); + let log_token = &chosen_one.0.clone(); + + log.remove_log_replica(chosen_one); + + // replica inventory to be false for the deleted entry + assert_eq!(log.replica_inventory.get_bit(*log_token), false); + + // ltails to be zerod out + assert_eq!(log.ltails[log_token].load(Ordering::Relaxed), 0); + + // lmasks to be set to true + assert_eq!(log.lmasks[log_token].get(), true); + } } diff --git a/node-replication/src/nr/mod.rs b/node-replication/src/nr/mod.rs index 3d514d3f..dd32640e 100644 --- a/node-replication/src/nr/mod.rs +++ b/node-replication/src/nr/mod.rs @@ -287,6 +287,7 @@ pub enum NodeReplicatedError { /// Not enough memory to create a [`NodeReplicated`] instance. 
OutOfMemory, DuplicateReplica, + UnableToRemoveReplica, } impl From for NodeReplicatedError { @@ -314,12 +315,6 @@ pub struct NodeReplicated { log: Log, replicas: HashMap>, // thread_routing: HashMap, - // contexts: Vec::WriteOperation, ::Response>>, // reroute - // threads' - // work - // queue to - // new - // replica affinity_mngr: AffinityManager, } @@ -498,8 +493,14 @@ where &mut self, replica_id: ReplicaId, ) -> Result { - self.log.remove_replica(log::LogToken(replica_id)); - self.replicas.remove(&replica_id); + self.log.remove_log_replica(log::LogToken(replica_id)); + + if self.replicas.contains_key(&replica_id) { + self.replicas.remove(&replica_id); + } else { + return Err(NodeReplicatedError::UnableToRemoveReplica); + } + Ok(replica_id) } From 02ad960660a504fb350b0621c232f130aba67bde Mon Sep 17 00:00:00 2001 From: Marc Paquette Date: Fri, 13 Jan 2023 14:38:37 -0500 Subject: [PATCH 07/11] Add Clone to failing benchmarks to have tests pass. --- node-replication/benches/vspace.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/node-replication/benches/vspace.rs b/node-replication/benches/vspace.rs index e3d447f7..a56f8480 100644 --- a/node-replication/benches/vspace.rs +++ b/node-replication/benches/vspace.rs @@ -135,6 +135,7 @@ impl fmt::Display for MapAction { } } +#[derive(Clone)] pub struct VSpace { pub pml4: Pin>, allocs: Vec<(*mut u8, usize)>, @@ -487,7 +488,7 @@ enum OpcodeRd { Identify(u64), } -#[derive(Default)] +#[derive(Default, Clone)] struct VSpaceDispatcher { vspace: VSpace, } From e2a8357277c546f99fee3b3671697cf147e84f8e Mon Sep 17 00:00:00 2001 From: Gerd Zellweger Date: Fri, 3 Feb 2023 16:11:19 -0500 Subject: [PATCH 08/11] Add an example to test add_replica and remove_replica. --- .../examples/nr_dynamic_replica.rs | 142 ++++++++++++++++++ 1 file changed, 142 insertions(+) create mode 100644 node-replication/examples/nr_dynamic_replica.rs diff --git a/node-replication/examples/nr_dynamic_replica.rs b/node-replication/examples/nr_dynamic_replica.rs new file mode 100644 index 00000000..c61152b9 --- /dev/null +++ b/node-replication/examples/nr_dynamic_replica.rs @@ -0,0 +1,142 @@ +// Copyright © 2019-2022 VMware, Inc. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 OR MIT + +//! An example that dynamically varies the amount of replicas over time. +#![feature(generic_associated_types)] + +use std::collections::HashMap; +use std::num::NonZeroUsize; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::sync::RwLock; + +use node_replication::nr::Dispatch; +use node_replication::nr::NodeReplicated; + +/// The node-replicated hashmap uses a std hashmap internally. +#[derive(Default, Clone)] +struct NrHashMap { + storage: HashMap, +} + +/// We support a mutable put operation to insert a value for a given key. +#[derive(Clone, Debug, PartialEq)] +enum Modify { + /// Insert (key, value) + Put(u64, u64), +} + +/// We support an immutable read operation to lookup a key from the hashmap. +#[derive(Clone, Debug, PartialEq)] +enum Access { + // Retrieve key. + Get(u64), +} + +/// The Dispatch trait executes `ReadOperation` (our Access enum) and `WriteOperation` +/// (our `Modify` enum) against the replicated data-structure. +impl Dispatch for NrHashMap { + type ReadOperation<'rop> = Access; + type WriteOperation = Modify; + type Response = Option; + + /// The `dispatch` function contains the logic for the immutable operations. 
+ fn dispatch<'rop>(&self, op: Self::ReadOperation<'rop>) -> Self::Response { + match op { + Access::Get(key) => self.storage.get(&key).map(|v| *v), + } + } + + /// The `dispatch_mut` function contains the logic for the mutable operations. + fn dispatch_mut(&mut self, op: Self::WriteOperation) -> Self::Response { + match op { + Modify::Put(key, value) => self.storage.insert(key, value), + } + } +} + +fn main() { + // Setup logging and some constants. + let _r = env_logger::try_init(); + + const NUM_THREADS: usize = 4; + + // We start with 4 replicas. + let initial_replicas: NonZeroUsize = NonZeroUsize::new(4).unwrap(); + let finished = Arc::new(AtomicBool::new(false)); + + // The node-replicated hashmap is wrapped in an Arc> to allow for + // the RwLock is currently needed because `add_replica` and `remove_replica` + // are not yet thread-safe. We will remove this in the future: + let nrht = Arc::new(RwLock::new( + NodeReplicated::::new(initial_replicas, |_rid| 0).unwrap(), + )); + + // The worker threads will just issue operations until the `finished` flag is set. + let thread_loop = + |replica: Arc>>, ttkn, finished: Arc| { + let mut i = 0; + while !finished.load(Ordering::Relaxed) { + let _r = match i % 2 { + 0 => replica + .read() + .unwrap() + .execute_mut(Modify::Put(i, i + 1), ttkn), + 1 => { + let response = replica.read().unwrap().execute(Access::Get(i - 1), ttkn); + assert_eq!(response, Some(i)); + response + } + _ => unreachable!(), + }; + i += 1; + + if i % 1_000_000 == 0 { + println!("Thread {:?} executed {} operations", ttkn, i); + } + } + }; + + let mut threads = Vec::with_capacity(NUM_THREADS); + for t in 0..NUM_THREADS { + let nrht_cln = nrht.clone(); + let finished = finished.clone(); + threads.push(std::thread::spawn(move || { + let ttkn = nrht_cln + .read() + .unwrap() + .register(t % initial_replicas) + .expect( + format!( + "Unable to register thread with replica {}", + t % initial_replicas + ) + .as_str(), + ); + thread_loop(nrht_cln, ttkn, finished); + })); + } + + // First we move from 4 to 1 replica by removing one every 3 seconds: + for next_rid in &[3, 2, 1] { + std::thread::sleep(std::time::Duration::from_secs(3)); + println!("About to remove replica {:?}", next_rid); + let x = nrht.write().unwrap().remove_replica(*next_rid).unwrap(); + println!("Removed replica {:?}", x); + } + + // Then we increase back from 1 to 4 replicas: + for next_rid in &[1, 2, 3] { + std::thread::sleep(std::time::Duration::from_secs(3)); + println!("About to add replica {:?}", next_rid); + let x = nrht.write().unwrap().add_replica().unwrap(); + println!("Added replica {:?}", x); + } + + finished.store(true, Ordering::Relaxed); + // Wait for all the threads to finish + for thread in threads { + thread.join().unwrap(); + } +} From c5b4ea4a7bc96f0b75b58a6b679b494a7043f74b Mon Sep 17 00:00:00 2001 From: Gerd Zellweger Date: Fri, 3 Feb 2023 16:22:42 -0500 Subject: [PATCH 09/11] Add two replicas first. 
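
With this change, the example first grows the instance from four to six
replicas (spawning an extra worker thread for each one it adds) before
shrinking down to a single replica and growing back to four. Growing
first exercises add_replica() on an instance that already has
registered threads issuing operations against it.

The ids of the added replicas (4, then 5) follow from new_replica_id(),
which hands out one past the largest live id. A minimal sketch of that
invariant, assuming an instance `nr` built like the example's
NodeReplicated<NrHashMap> with replicas 0..=3 alive (`nr` is a
placeholder name, not code from this patch):

    // ids 0..=3 exist, so two consecutive adds target ids 4 and 5;
    // at this point in the series add_replica() hands back the
    // ThreadToken it registered against the new replica.
    let t4 = nr.add_replica().unwrap(); // t4.rid == 4
    let t5 = nr.add_replica().unwrap(); // t5.rid == 5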
--- .../examples/nr_dynamic_replica.rs | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/node-replication/examples/nr_dynamic_replica.rs b/node-replication/examples/nr_dynamic_replica.rs index c61152b9..1205763d 100644 --- a/node-replication/examples/nr_dynamic_replica.rs +++ b/node-replication/examples/nr_dynamic_replica.rs @@ -118,15 +118,33 @@ fn main() { })); } - // First we move from 4 to 1 replica by removing one every 3 seconds: - for next_rid in &[3, 2, 1] { + // Initially, we go from 4 to 6 replicas: + for next_rid in &[4, 5] { + std::thread::sleep(std::time::Duration::from_secs(3)); + println!("About to add a new replica {:?}", next_rid); + let r = nrht.write().unwrap().add_replica().unwrap(); + println!("Added replica {:?}", r); + // We also spawn a thread that accesses that new replica: + let nrht_cln = nrht.clone(); + let finished = finished.clone(); + threads.push(std::thread::spawn(move || { + let ttkn = + nrht_cln.read().unwrap().register(*next_rid).expect( + format!("Unable to register thread with replica {}", next_rid).as_str(), + ); + thread_loop(nrht_cln, ttkn, finished); + })); + } + + // Then we go from 6 to 1 replica by removing one every 3 seconds: + for next_rid in &[5, 4, 3, 2, 1] { std::thread::sleep(std::time::Duration::from_secs(3)); println!("About to remove replica {:?}", next_rid); let x = nrht.write().unwrap().remove_replica(*next_rid).unwrap(); println!("Removed replica {:?}", x); } - // Then we increase back from 1 to 4 replicas: + // Then we increase again from 1 to 4 replicas: for next_rid in &[1, 2, 3] { std::thread::sleep(std::time::Duration::from_secs(3)); println!("About to add replica {:?}", next_rid); From 90679625ee5b04d3c1419cca32fbfe4d933782f7 Mon Sep 17 00:00:00 2001 From: Gerd Zellweger Date: Tue, 11 Apr 2023 12:16:56 -0700 Subject: [PATCH 10/11] Breakout context, wip thread routing. 
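
This moves the per-thread operation contexts out of the individual
replicas and into the NodeReplicated instance itself, sized at
MAX_REPLICAS_PER_LOG * MAX_THREADS_PER_REPLICA slots, so that pending
operations can later be re-routed when a thread's home replica goes
away. A ThreadToken now maps to its context slot through a global
thread id, gtid() = rid * MAX_THREADS_PER_REPLICA + rtkn.0. A small
sketch of the indexing, assuming replica-local thread ids remain
1-based as handed out by Replica::register() (the `slot` helper below
is illustrative, not part of the patch):

    // With MAX_THREADS_PER_REPLICA = 32 (as set in this patch), the
    // thread with id 3 on replica 2 has gtid 2 * 32 + 3 = 67 and
    // buffers its operations in contexts[66].
    fn slot(rid: usize, tid: usize) -> usize {
        rid * 32 + tid - 1
    }
    assert_eq!(slot(2, 3), 66);

Known wip inconsistency: make_pending() indexes contexts[gtid() - 1]
while get_response() reads contexts[gtid()]; one of the two has to
change before the routing works end to end.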
--- bench_utils/src/cnr_mkbench.rs | 27 +-- node-replication/Cargo.toml | 2 +- node-replication/src/lib.rs | 1 + node-replication/src/log.rs | 2 +- node-replication/src/nr/mod.rs | 280 ++++++++++++++++++++---------- node-replication/src/nr/rwlock.rs | 2 +- node-replication/src/replica.rs | 4 +- 7 files changed, 213 insertions(+), 105 deletions(-) diff --git a/bench_utils/src/cnr_mkbench.rs b/bench_utils/src/cnr_mkbench.rs index 7af27093..73392057 100644 --- a/bench_utils/src/cnr_mkbench.rs +++ b/bench_utils/src/cnr_mkbench.rs @@ -33,6 +33,7 @@ use tokio::runtime::Runtime; use node_replication::cnr::{ Dispatch, Log, LogMetaData, Replica, ReplicaToken, MAX_REPLICAS_PER_LOG, }; +#[cfg(feature = "async")] use node_replication::nr::reusable_box::ReusableBoxFuture; use crate::benchmark::*; @@ -783,18 +784,24 @@ where duration ); - let mut futures: Vec< - ReusableBoxFuture<<::D as Dispatch>::Response>, - > = Vec::with_capacity(batch_size); + #[cfg(feature = "async")] + { + let mut futures: Vec< + ReusableBoxFuture<<::D as Dispatch>::Response>, + > = Vec::with_capacity(batch_size); - for _i in 0..batch_size { - let resp = match &operations[0] { - Operation::ReadOperation(op) => replica.exec_ro(*op, replica_token), - Operation::WriteOperation(op) => replica.exec(*op, replica_token), - }; - futures.push(ReusableBoxFuture::new(async { resp })); + for _i in 0..batch_size { + let resp = match &operations[0] { + Operation::ReadOperation(op) => { + replica.exec_ro(*op, replica_token) + } + Operation::WriteOperation(op) => { + replica.exec(*op, replica_token) + } + }; + futures.push(ReusableBoxFuture::new(async { resp })); + } } - let mut operations_per_second: Vec = Vec::with_capacity(32); let mut operations_completed: usize = 0; let mut iter: usize = 0; diff --git a/node-replication/Cargo.toml b/node-replication/Cargo.toml index eef2d7d8..8754523c 100644 --- a/node-replication/Cargo.toml +++ b/node-replication/Cargo.toml @@ -55,7 +55,7 @@ futures = { version = "0.3.17" } debug = true [features] -default = ["async"] +default = [] async = [] # Benchmark features (not intended for public use, no impact on library code) diff --git a/node-replication/src/lib.rs b/node-replication/src/lib.rs index f0c6adf2..0eb009c4 100644 --- a/node-replication/src/lib.rs +++ b/node-replication/src/lib.rs @@ -36,6 +36,7 @@ pub mod replica; pub mod cnr; pub mod nr; +pub mod simdr; #[cfg(doctest)] mod test_readme { diff --git a/node-replication/src/log.rs b/node-replication/src/log.rs index 653d8a51..38d34706 100644 --- a/node-replication/src/log.rs +++ b/node-replication/src/log.rs @@ -52,7 +52,7 @@ const_assert!(DEFAULT_LOG_BYTES.is_power_of_two()); /// our system Can't make it arbitrarily high as it will lead to more memory /// overheads / bigger structs. #[cfg(not(loom))] -pub const MAX_REPLICAS_PER_LOG: usize = 16; +pub const MAX_REPLICAS_PER_LOG: usize = 8; #[cfg(loom)] // Otherwise uses too much stack space wich crashes in loom... pub const MAX_REPLICAS_PER_LOG: usize = 3; diff --git a/node-replication/src/nr/mod.rs b/node-replication/src/nr/mod.rs index dd32640e..17c1f250 100644 --- a/node-replication/src/nr/mod.rs +++ b/node-replication/src/nr/mod.rs @@ -71,12 +71,13 @@ //! 
``` extern crate std; -use std::collections::HashMap; -use alloc::boxed::Box; +use alloc::collections::BTreeMap; +use alloc::{boxed::Box, vec::Vec}; use core::fmt::Debug; use core::marker::Sync; use core::num::NonZeroUsize; +use replica::MAX_THREADS_PER_REPLICA; #[cfg(feature = "async")] use reusable_box::ReusableBoxFuture; @@ -98,9 +99,12 @@ pub mod rwlock; #[path = "loom_rwlock.rs"] pub mod rwlock; +use crate::nr::context::Context; pub use log::{Log, MAX_REPLICAS_PER_LOG}; pub use replica::{CombinerLock, Replica, ReplicaError, ReplicaId, ReplicaToken}; +const MAX_THREADS_PER_INSTANCE: usize = MAX_REPLICAS_PER_LOG * MAX_THREADS_PER_REPLICA; + /// Trait that a (single-threaded) data structure must implement to be usable /// with NR. /// @@ -147,7 +151,7 @@ pub trait Dispatch { /// For maximum type-safety this would be an affine type, then we'd have to /// return it again in `execute` and `execute_mut`. However it feels like this /// would hurt API ergonomics a lot. -#[derive(Copy, Clone, Debug, Eq, PartialEq)] +#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] pub struct ThreadToken { /// The replica this thread is registered with (reading from). /// @@ -170,6 +174,10 @@ impl ThreadToken { pub fn new(rid: ReplicaId, rtkn: ReplicaToken) -> Self { Self { rid, rtkn } } + + pub fn gtid(&self) -> usize { + self.rid * MAX_THREADS_PER_REPLICA + self.rtkn.0 + } } /// To make it harder to use the same ThreadToken on multiple threads. @@ -313,8 +321,14 @@ impl From for NodeReplicatedError { /// which are behind automatically. pub struct NodeReplicated { log: Log, - replicas: HashMap>, - // thread_routing: HashMap, + replicas: BTreeMap>, + /// List of per-thread contexts. Threads buffer write operations here when + /// they cannot perform flat combining (because another thread might already + /// be doing so). + /// + /// The vector is initialized with [`MAX_THREADS_PER_REPLICA`] [`Context`] + /// elements. 
+ contexts: Vec::WriteOperation, ::Response>>, affinity_mngr: AffinityManager, } @@ -399,7 +413,12 @@ where let affinity_mngr = AffinityManager::new(Box::try_new(chg_mem_affinity)?); let log = Log::new_with_bytes(log_size, ()); - let mut replicas = HashMap::with_capacity(MAX_REPLICAS_PER_LOG); + let mut contexts = Vec::with_capacity(MAX_THREADS_PER_INSTANCE); + for _idx in 0..MAX_THREADS_PER_INSTANCE { + contexts.push(Default::default()); + } + + let mut replicas = BTreeMap::new(); for replica_id in 0..num_replicas.get() { let log_token = log @@ -417,6 +436,7 @@ where } Ok(NodeReplicated { + contexts, log, replicas, affinity_mngr, @@ -441,7 +461,7 @@ where /// let added_replica_data = async_ds.replicas[added_replica].data.read(0).junk; /// assert_eq!(2, added_replica_data); /// ``` - pub fn add_replica(&mut self) -> Result { + pub fn add_replica(&mut self) -> Result<(), NodeReplicatedError> { let log_token = self .log .register() @@ -481,12 +501,13 @@ where self.log.lmasks[&replica_id].set(lmask_status); // Register the the replica with a thread_id and return the ThreadToken - let registered_replica = self.register(replica_id); + //let registered_replica = self.register(replica_id); - match registered_replica { + /*match registered_replica { Some(thread_token) => Ok(thread_token), None => Err(NodeReplicatedError::DuplicateReplica), - } + }*/ + Ok(()) } pub fn remove_replica( @@ -574,7 +595,9 @@ where pub fn register(&self, replica_id: ReplicaId) -> Option { if self.replicas.len() < MAX_REPLICAS_PER_LOG { let rtkn = self.replicas[&replica_id].register()?; - Some(ThreadToken::new(replica_id, rtkn)) + let ttkn = ThreadToken::new(replica_id, rtkn); + + Some(ttkn) } else { None } @@ -650,6 +673,8 @@ where op: ::WriteOperation, tkn: ThreadToken, ) -> ::Response { + while !self.make_pending(op.clone(), tkn.gtid()) {} + /// An enum to keep track of a stack of operations we should do on Replicas. /// /// e.g., either `Sync` an out-of-date, behind replica, or call `execute_locked` or @@ -681,9 +706,11 @@ where self.replicas[&stuck_ridx].sync(&self.log); // Affinity is reverted here, _aftkn is dropped. } - return self.replicas[&tkn.rid] - .get_response(&self.log, tkn.rtkn.tid()) - .expect("GcFailed has to produced a response"); + + //return self.replicas[&tkn.rid] + //.get_response(&self.log, tkn.rtkn.tid()) + //.expect("GcFailed has to produced a response"); + return self.get_response(tkn); } }, ResolveOp::Sync(ridx) => { @@ -837,20 +864,90 @@ where resp.set(async move { self.execute(op, tkn) }); } + fn select_replica(&self, tkn: ThreadToken) -> ReplicaId { + if self.replicas.contains_key(&tkn.rid) { + // Use the replica where the thread originally registered with if it + // exists + tkn.rid + } else { + // Distribute evenly across remaining replicas + let key_idx = tkn.rtkn.0 % self.replicas.len(); + *self.replicas.keys().nth(key_idx).unwrap() + } + } + + /// Enqueues an operation inside a thread local context. Returns a boolean + /// indicating whether the operation was enqueued (true) or not (false). + #[inline(always)] + fn make_pending(&self, op: ::WriteOperation, idx: usize) -> bool { + self.contexts[idx - 1].enqueue(op, ()) + } + + /// Busy waits until a response is available within the thread's context. + /// + /// # Arguments + /// - `slog`: The shared log. + /// - `idx`: identifies this thread. + pub(crate) fn get_response(&self, tkn: ThreadToken) -> ::Response { + let mut iter = 0; + let interval = 1 << 29; + + // Keep trying to retrieve a response from the thread context. 
After trying `interval` + // times with no luck, try to perform flat combining to make some progress. + loop { + let r = self.contexts[tkn.gtid()].res(); + if let Some(resp) = r { + return resp; + } + + iter += 1; + + if iter == interval { + self.sync(tkn); + iter = 0; + } + } + } + #[doc(hidden)] pub fn sync(&self, tkn: ThreadToken) { self.replicas[&tkn.rid].sync(&self.log) } } -#[cfg(feature = "async")] #[cfg(test)] mod test { use super::replica::test::Data; + #[cfg(feature = "async")] use super::reusable_box::ReusableBoxFuture; use super::*; use core::num::NonZeroUsize; + #[test] + fn select_correct_replica() { + fn mkttkn(rid: usize, tid: usize) -> ThreadToken { + ThreadToken { + rid, + rtkn: ReplicaToken(tid), + } + } + + let replicas = NonZeroUsize::new(2).unwrap(); + let nds = NodeReplicated::::new(replicas, |_ac| 0).expect("Can't create Ds"); + + assert_eq!(nds.select_replica(mkttkn(0, 0)), 0); + assert_eq!(nds.select_replica(mkttkn(1, 1)), 1); + // Doesn't have active replica, assign to 0 or 1: + assert_eq!(nds.select_replica(mkttkn(3, 0)), 0); + // Threads on same (inactive) replicas are split evenly among active + // replicas: + assert_eq!(nds.select_replica(mkttkn(3, 1)), 1); + assert_eq!(nds.select_replica(mkttkn(3, 2)), 0); + assert_eq!(nds.select_replica(mkttkn(4, 0)), 0); + assert_eq!(nds.select_replica(mkttkn(4, 1)), 1); + } + + #[cfg(feature = "async")] #[tokio::test] async fn test_box_reuse() { use futures::executor::block_on; @@ -892,81 +989,82 @@ mod test { } } - #[test] - fn test_add_replica_syncs_replica_data() { - let replicas = NonZeroUsize::new(1).unwrap(); - let mut async_ds = NodeReplicated::::new(replicas, |_ac| 0).expect("Can't create Ds"); - - let ttkn_a = async_ds.register(0).expect("Unable to register with log"); + /* + #[test] + fn test_add_replica_syncs_replica_data() { + let replicas = NonZeroUsize::new(1).unwrap(); + let mut async_ds = NodeReplicated::::new(replicas, |_ac| 0).expect("Can't create Ds"); - //add a few iterations of log entries - let _ = async_ds.execute_mut(1, ttkn_a); - let _ = async_ds.execute_mut(5, ttkn_a); - let _ = async_ds.execute_mut(2, ttkn_a); + let ttkn_a = async_ds.register(0).expect("Unable to register with log"); - let added_replica = async_ds.add_replica().unwrap(); + //add a few iterations of log entries + let _ = async_ds.execute_mut(1, ttkn_a); + let _ = async_ds.execute_mut(5, ttkn_a); + let _ = async_ds.execute_mut(2, ttkn_a); - let added_replica_data = async_ds.replicas[&added_replica.rid].data.read(0).junk; + let added_replica = async_ds.add_replica().unwrap(); - assert_eq!(3, added_replica_data); - } + let added_replica_data = async_ds.replicas[&added_replica.rid].data.read(0).junk; - #[test] - fn test_add_replica_syncs_replica_lmask() { - let replicas = NonZeroUsize::new(1).unwrap(); - let mut async_ds = NodeReplicated::::new(replicas, |_ac| 0).expect("Can't create Ds"); - - let ttkn_a = async_ds.register(0).expect("Unable to register with log"); - //add a few iterations of log entries - let _ = async_ds.execute_mut(1, ttkn_a); - let _ = async_ds.execute_mut(5, ttkn_a); - let _ = async_ds.execute_mut(2, ttkn_a); - - let replica_lmask = async_ds.log.lmasks[&0].get(); - let added_replica = async_ds.add_replica().unwrap(); - let added_replica_lmask = async_ds.log.lmasks[&added_replica.rid].get(); - assert_eq!(replica_lmask, added_replica_lmask); - } + assert_eq!(3, added_replica_data); + } - #[test] - fn test_add_replica_syncs_replica_ltail() { - let replicas = NonZeroUsize::new(1).unwrap(); - let mut async_ds = 
NodeReplicated::::new(replicas, |_ac| 0).expect("Can't create Ds"); + #[test] + fn test_add_replica_syncs_replica_lmask() { + let replicas = NonZeroUsize::new(1).unwrap(); + let mut async_ds = NodeReplicated::::new(replicas, |_ac| 0).expect("Can't create Ds"); + + let ttkn_a = async_ds.register(0).expect("Unable to register with log"); + //add a few iterations of log entries + let _ = async_ds.execute_mut(1, ttkn_a); + let _ = async_ds.execute_mut(5, ttkn_a); + let _ = async_ds.execute_mut(2, ttkn_a); + + let replica_lmask = async_ds.log.lmasks[&0].get(); + let added_replica = async_ds.add_replica().unwrap(); + let added_replica_lmask = async_ds.log.lmasks[&added_replica.rid].get(); + assert_eq!(replica_lmask, added_replica_lmask); + } - let ttkn_a = async_ds.register(0).expect("Unable to register with log"); + #[test] + fn test_add_replica_syncs_replica_ltail() { + let replicas = NonZeroUsize::new(1).unwrap(); + let mut async_ds = NodeReplicated::::new(replicas, |_ac| 0).expect("Can't create Ds"); - //add a few iterations of log entries - let _ = async_ds.execute_mut(1, ttkn_a); - let _ = async_ds.execute_mut(5, ttkn_a); - let _ = async_ds.execute_mut(2, ttkn_a); + let ttkn_a = async_ds.register(0).expect("Unable to register with log"); - let replica_ltails = async_ds.log.ltails[&0].load(Ordering::Relaxed); - let added_replica = async_ds.add_replica().unwrap(); - let added_replica_ltails = async_ds.log.ltails[&added_replica.rid].load(Ordering::Relaxed); - assert_eq!(replica_ltails, added_replica_ltails); - } + //add a few iterations of log entries + let _ = async_ds.execute_mut(1, ttkn_a); + let _ = async_ds.execute_mut(5, ttkn_a); + let _ = async_ds.execute_mut(2, ttkn_a); - #[test] - fn test_add_replica_adds_replicas_in_order() { - let replicas = NonZeroUsize::new(4).unwrap(); - let mut async_ds = NodeReplicated::::new(replicas, |_ac| 0).expect("Can't create Ds"); - let ttkn_a = async_ds.register(0).expect("Unable to register with log"); - let ttkn_b = async_ds.register(1).expect("Unable to register with log"); - let ttkn_c = async_ds.register(2).expect("Unable to register with log"); - let _ttkn_d = async_ds.register(3).expect("Unable to register with log"); + let replica_ltails = async_ds.log.ltails[&0].load(Ordering::Relaxed); + let added_replica = async_ds.add_replica().unwrap(); + let added_replica_ltails = async_ds.log.ltails[&added_replica.rid].load(Ordering::Relaxed); + assert_eq!(replica_ltails, added_replica_ltails); + } - let _ = async_ds.execute_mut(1, ttkn_a); - let _ = async_ds.execute_mut(5, ttkn_b); - let _ = async_ds.execute_mut(2, ttkn_c); + #[test] + fn test_add_replica_adds_replicas_in_order() { + let replicas = NonZeroUsize::new(4).unwrap(); + let mut async_ds = NodeReplicated::::new(replicas, |_ac| 0).expect("Can't create Ds"); + let ttkn_a = async_ds.register(0).expect("Unable to register with log"); + let ttkn_b = async_ds.register(1).expect("Unable to register with log"); + let ttkn_c = async_ds.register(2).expect("Unable to register with log"); + let _ttkn_d = async_ds.register(3).expect("Unable to register with log"); - let _ = async_ds.remove_replica(1); - let _ = async_ds.remove_replica(2); + let _ = async_ds.execute_mut(1, ttkn_a); + let _ = async_ds.execute_mut(5, ttkn_b); + let _ = async_ds.execute_mut(2, ttkn_c); - let added_replica_four = async_ds.add_replica().unwrap(); + let _ = async_ds.remove_replica(1); + let _ = async_ds.remove_replica(2); - assert_eq!(added_replica_four.rid, 4); - } + let added_replica_four = async_ds.add_replica().unwrap(); + 
assert_eq!(added_replica_four.rid, 4); + } + */ #[test] fn test_remove_replica_returns_replica_id() { let replicas = NonZeroUsize::new(1).unwrap(); @@ -991,26 +1089,28 @@ mod test { assert_eq!(async_ds.replicas.len(), 2); } - #[test] - fn test_remove_replica_syncs_replica_data() { - let replicas = NonZeroUsize::new(1).unwrap(); - let mut async_ds = NodeReplicated::::new(replicas, |_ac| 0).expect("Can't create Ds"); + /* + #[test] + fn test_remove_replica_syncs_replica_data() { + let replicas = NonZeroUsize::new(1).unwrap(); + let mut async_ds = NodeReplicated::::new(replicas, |_ac| 0).expect("Can't create Ds"); - let ttkn_a = async_ds.register(0).expect("Unable to register with log"); + let ttkn_a = async_ds.register(0).expect("Unable to register with log"); - //add a few iterations of log entries - let _ = async_ds.execute_mut(1, ttkn_a); - let _ = async_ds.execute_mut(5, ttkn_a); - let _ = async_ds.execute_mut(2, ttkn_a); + //add a few iterations of log entries + let _ = async_ds.execute_mut(1, ttkn_a); + let _ = async_ds.execute_mut(5, ttkn_a); + let _ = async_ds.execute_mut(2, ttkn_a); - let added_replica = async_ds.add_replica().unwrap(); + let added_replica = async_ds.add_replica().unwrap(); - let _ = async_ds.remove_replica(0); - let _ = async_ds.execute_mut(5, added_replica); + let _ = async_ds.remove_replica(0); + let _ = async_ds.execute_mut(5, added_replica); - let added_replica_data = async_ds.replicas[&added_replica.rid].data.read(0).junk; - assert_eq!(4, added_replica_data); - } + let added_replica_data = async_ds.replicas[&added_replica.rid].data.read(0).junk; + assert_eq!(4, added_replica_data); + } + */ #[test] fn test_remove_replica_() { let replicas = NonZeroUsize::new(2).unwrap(); diff --git a/node-replication/src/nr/rwlock.rs b/node-replication/src/nr/rwlock.rs index 1d895ab3..0d99b50d 100644 --- a/node-replication/src/nr/rwlock.rs +++ b/node-replication/src/nr/rwlock.rs @@ -441,7 +441,7 @@ mod tests { #[test] fn test_parallel_readers() { let lock = Arc::new(RwLock::::default()); - let t = 100; + let t = MAX_READER_THREADS; unsafe { *lock.data.get() = t; diff --git a/node-replication/src/replica.rs b/node-replication/src/replica.rs index 4f41807b..c700dbed 100644 --- a/node-replication/src/replica.rs +++ b/node-replication/src/replica.rs @@ -11,7 +11,7 @@ use static_assertions::const_assert; /// [`crate::nr::Replica::register()`] or [`crate::cnr::Replica::register()`] /// function will start to return None. #[cfg(not(loom))] -pub const MAX_THREADS_PER_REPLICA: usize = 256; +pub const MAX_THREADS_PER_REPLICA: usize = 32; #[cfg(loom)] pub const MAX_THREADS_PER_REPLICA: usize = 2; // MAX_THREADS_PER_REPLICA must be a power of two @@ -46,7 +46,7 @@ pub type ThreadIdx = usize; /// and returned again by [`crate::nr::Replica::execute`] and /// [`crate::nr::Replica::execute_mut`]. However it feels like this would hurt /// API ergonomics a lot. -#[derive(Copy, Clone, Debug, Eq, PartialEq)] +#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] pub struct ReplicaToken(pub(crate) ThreadIdx); /// Make it harder to accidentially use the same ReplicaToken on multiple From 2695e6b1ce19d263debc7ccbf7aea5a353b75903 Mon Sep 17 00:00:00 2001 From: Gerd Zellweger Date: Tue, 11 Apr 2023 18:03:27 -0700 Subject: [PATCH 11/11] Select replica fn. 
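
select_replica2() is a bit-scan variant of select_replica(): it folds
the live replica ids into a bitmask, maps a thread whose home replica
is gone to key_idx = rtkn.0 % popcount(mask), and returns the position
of the (key_idx + 1)-th set bit, instead of walking the BTreeMap keys
with nth(). The new test asserts that both variants agree.

The heart of the scan is "find the k-th set bit of a mask". A
standalone sketch of that idea (illustrative only, not the library
code):

    fn kth_set_bit(mut mask: usize, k: usize) -> usize {
        let mut pos = 0;
        for _ in 0..k {
            let tz = mask.trailing_zeros() as usize;
            pos += tz + 1; // skip past the current lowest set bit
            mask >>= tz + 1;
        }
        pos + mask.trailing_zeros() as usize
    }

    // Live replicas {0, 2, 3} give the mask 0b1101; the second set
    // bit (k = 1) sits at position 2, i.e. replica 2:
    assert_eq!(kth_set_bit(0b1101, 1), 2);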
---
 node-replication/src/lib.rs    |  1 -
 node-replication/src/nr/mod.rs | 74 ++++++++++++++++++++++++++++++++++
 2 files changed, 74 insertions(+), 1 deletion(-)

diff --git a/node-replication/src/lib.rs b/node-replication/src/lib.rs
index 0eb009c4..f0c6adf2 100644
--- a/node-replication/src/lib.rs
+++ b/node-replication/src/lib.rs
@@ -36,7 +36,6 @@ pub mod replica;
 
 pub mod cnr;
 pub mod nr;
-pub mod simdr;
 
 #[cfg(doctest)]
 mod test_readme {
diff --git a/node-replication/src/nr/mod.rs b/node-replication/src/nr/mod.rs
index 17c1f250..20429365 100644
--- a/node-replication/src/nr/mod.rs
+++ b/node-replication/src/nr/mod.rs
@@ -876,6 +876,46 @@ where
         }
     }
 
+    fn select_replica2(&self, tkn: ThreadToken) -> ReplicaId {
+        let replicas = self.replicas.keys().fold(0, |acc, rid| acc | (1 << rid)) as usize;
+        logging::info!("replicas: {:b}", replicas);
+
+        if ((1 << tkn.rid) & replicas) > 0 {
+            // Use the replica the thread originally registered with, if it
+            // still exists.
+            logging::info!("using original replica: {}", tkn.rid);
+            tkn.rid
+        } else {
+            let key_idx = tkn.rtkn.0 % (replicas.count_ones() as usize);
+            logging::info!("key_idx: {key_idx}");
+            let mut replicas = replicas;
+            logging::info!("replica: {:b}", replicas);
+            let mut idx = 0;
+            logging::info!("idx: {idx}");
+            let mut replica_idx = 0;
+            logging::info!("replica_idx: {replica_idx}");
+
+            // Scan for the (key_idx + 1)-th set bit: skip the trailing
+            // zeros, then consume the set bit by shifting it out to the
+            // right.
+            while idx <= key_idx {
+                let tz = replicas.trailing_zeros();
+                replica_idx += tz + 1;
+                logging::info!("replica_idx: {replica_idx}, trailing zeros was {tz}");
+                replicas >>= tz + 1;
+                idx += 1;
+            }
+
+            (replica_idx - 1) as usize
+        }
+    }
+
+    fn context_iterator(&self, for_replica: ReplicaId) -> ContextIterator<'_, D> {
+        ContextIterator {
+            contexts: &self.contexts,
+            for_replica,
+            next_idx: 0,
+        }
+    }
+
     /// Enqueues an operation inside a thread local context. Returns a boolean
     /// indicating whether the operation was enqueued (true) or not (false).
#[inline(always)]
@@ -915,6 +955,30 @@ where
     }
 }
 
+struct ContextIterator<'a, D: Dispatch> {
+    contexts: &'a Vec<Context<<D as Dispatch>::WriteOperation, <D as Dispatch>::Response>>,
+    for_replica: ReplicaId,
+    next_idx: usize,
+}
+
+/*
+impl<'a, D: Dispatch> core::iter::Iterator for ContextIterator<'a, D> {
+    type Item = &'a Context<<D as Dispatch>::WriteOperation, <D as Dispatch>::Response>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        while self.next_idx < self.contexts.len() {
+            let idx = self.next_idx;
+            self.next_idx += 1;
+
+            if let Some(ctx) = self.contexts[idx] {
+                return Some(ctx);
+            }
+        }
+        None
+    }
+}
+*/
+
 #[cfg(test)]
 mod test {
     use super::replica::test::Data;
@@ -925,6 +989,8 @@ mod test {
 
     #[test]
     fn select_correct_replica() {
+        let _ = env_logger::try_init();
+
         fn mkttkn(rid: usize, tid: usize) -> ThreadToken {
             ThreadToken {
                 rid,
                 rtkn: ReplicaToken(tid),
             }
         }
@@ -936,15 +1002,23 @@ mod test {
         let nds = NodeReplicated::<Data>::new(replicas, |_ac| 0).expect("Can't create Ds");
 
         assert_eq!(nds.select_replica(mkttkn(0, 0)), 0);
+        assert_eq!(nds.select_replica(mkttkn(0, 0)), nds.select_replica2(mkttkn(0, 0)));
         assert_eq!(nds.select_replica(mkttkn(1, 1)), 1);
+        assert_eq!(nds.select_replica(mkttkn(1, 1)), nds.select_replica2(mkttkn(1, 1)));
+
         // Doesn't have active replica, assign to 0 or 1:
         assert_eq!(nds.select_replica(mkttkn(3, 0)), 0);
+        assert_eq!(nds.select_replica(mkttkn(3, 0)), nds.select_replica2(mkttkn(3, 0)));
         // Threads on same (inactive) replicas are split evenly among active
         // replicas:
         assert_eq!(nds.select_replica(mkttkn(3, 1)), 1);
+        assert_eq!(nds.select_replica(mkttkn(3, 1)), nds.select_replica2(mkttkn(3, 1)));
         assert_eq!(nds.select_replica(mkttkn(3, 2)), 0);
+        assert_eq!(nds.select_replica(mkttkn(3, 2)), nds.select_replica2(mkttkn(3, 2)));
         assert_eq!(nds.select_replica(mkttkn(4, 0)), 0);
+        assert_eq!(nds.select_replica(mkttkn(4, 0)), nds.select_replica2(mkttkn(4, 0)));
         assert_eq!(nds.select_replica(mkttkn(4, 1)), 1);
+        assert_eq!(nds.select_replica(mkttkn(4, 1)), nds.select_replica2(mkttkn(4, 1)));
     }
 
     #[cfg(feature = "async")]