Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nr dynamic replication #56

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion .github/workflows/nr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Install dependencies
run: sudo apt-get update && sudo apt-get install -y libhwloc-dev gnuplot libfuse-dev liburcu-dev liburcu6
run: sudo apt-get update && sudo apt-get install -y libhwloc-dev gnuplot libfuse-dev liburcu-dev liburcu8
- uses: actions/[email protected]
- name: Install rust toolchain
working-directory: ./node-replication
Expand Down
27 changes: 17 additions & 10 deletions bench_utils/src/cnr_mkbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use tokio::runtime::Runtime;
use node_replication::cnr::{
Dispatch, Log, LogMetaData, Replica, ReplicaToken, MAX_REPLICAS_PER_LOG,
};
#[cfg(feature = "async")]
use node_replication::nr::reusable_box::ReusableBoxFuture;

use crate::benchmark::*;
Expand Down Expand Up @@ -783,18 +784,24 @@ where
duration
);

let mut futures: Vec<
ReusableBoxFuture<<<R as ReplicaTrait>::D as Dispatch>::Response>,
> = Vec::with_capacity(batch_size);
#[cfg(feature = "async")]
{
let mut futures: Vec<
ReusableBoxFuture<<<R as ReplicaTrait>::D as Dispatch>::Response>,
> = Vec::with_capacity(batch_size);

for _i in 0..batch_size {
let resp = match &operations[0] {
Operation::ReadOperation(op) => replica.exec_ro(*op, replica_token),
Operation::WriteOperation(op) => replica.exec(*op, replica_token),
};
futures.push(ReusableBoxFuture::new(async { resp }));
for _i in 0..batch_size {
let resp = match &operations[0] {
Operation::ReadOperation(op) => {
replica.exec_ro(*op, replica_token)
}
Operation::WriteOperation(op) => {
replica.exec(*op, replica_token)
}
};
futures.push(ReusableBoxFuture::new(async { resp }));
}
}

let mut operations_per_second: Vec<usize> = Vec::with_capacity(32);
let mut operations_completed: usize = 0;
let mut iter: usize = 0;
Expand Down
4 changes: 2 additions & 2 deletions bench_utils/src/mkbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ pub trait DsInterface {
) -> <Self::D as Dispatch>::Response;
}

impl<'a, T: Dispatch + Sync + Default> DsInterface for NodeReplicated<T> {
impl<'a, T: Dispatch + Sync + Default + Clone> DsInterface for NodeReplicated<T> {
type D = T;

fn new(replicas: NonZeroUsize, _logs: NonZeroUsize, log_size: usize) -> Arc<Self> {
Expand Down Expand Up @@ -235,7 +235,7 @@ pub fn baseline_comparison<R: DsInterface>(
>,
log_size: usize,
) where
R::D: Dispatch + Sync + Default,
R::D: Dispatch + Sync + Default + Clone,
<R::D as Dispatch>::WriteOperation: Send + Sync,
<R::D as Dispatch>::ReadOperation<'static>: Sync + Send + Clone,
<R::D as Dispatch>::Response: Send,
Expand Down
5 changes: 3 additions & 2 deletions node-replication/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ crossbeam-utils = {version = "0.8.5", default-features = false}
# renamed to avoid confusion with our own `log` modules:
logging = { version = "0.4", package = "log" }
static_assertions = "1.1.0"
bm = { git = "https://github.com/reynoldsbd/bm" }

[target.'cfg(loom)'.dependencies]
arr_macro = "0.1.3"
Expand Down Expand Up @@ -54,7 +55,7 @@ futures = { version = "0.3.17" }
debug = true

[features]
default = ["async"]
default = []
async = []

# Benchmark features (not intended for public use, no impact on library code)
Expand Down Expand Up @@ -103,4 +104,4 @@ harness = false

[[bench]]
name = "chashbench"
harness = false
harness = false
3 changes: 2 additions & 1 deletion node-replication/benches/vspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ impl fmt::Display for MapAction {
}
}

#[derive(Clone)]
pub struct VSpace {
pub pml4: Pin<Box<PML4>>,
allocs: Vec<(*mut u8, usize)>,
Expand Down Expand Up @@ -487,7 +488,7 @@ enum OpcodeRd {
Identify(u64),
}

#[derive(Default)]
#[derive(Default, Clone)]
struct VSpaceDispatcher {
vspace: VSpace,
}
Expand Down
157 changes: 157 additions & 0 deletions node-replication/examples/cnr_btreeset.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright © 2019-2022 VMware, Inc. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0 OR MIT

//! A minimal example that impements a replicated BTreeSet.
#![feature(generic_associated_types)]

use crossbeam_skiplist::SkipSet;

use std::sync::Arc;

use node_replication::cnr::{Dispatch, Log, LogMapper, LogMetaData, Replica};

#[derive(Default)]
struct CnrBtreeSet {
storage: SkipSet<u64>,
}

#[derive(Clone, Debug, PartialEq)]
enum Modify {
Put(u64),
Delete(u64),
}

impl LogMapper for Modify {
fn hash(&self, _nlogs: usize, logs: &mut Vec<usize>) {
logs.push(0);
}
}

#[derive(Clone, Debug, PartialEq)]
enum Access {
Get(u64),
Contains(u64),
}

impl LogMapper for Access {
fn hash(&self, _nlogs: usize, logs: &mut Vec<usize>) {
logs.push(0);
}
}

impl Dispatch for CnrBtreeSet {
type ReadOperation<'rop> = Access;
type WriteOperation = Modify;
type Response = Option<u64>;

fn dispatch<'rop>(&self, op: Self::ReadOperation<'rop>) -> Self::Response {
match op {
Access::Get(key) => self.storage.get(&key).map(|v| *v),
Access::Contains(key) => {
let response = self.storage.contains(&key);
Some(response as u64)
}
}
}

fn dispatch_mut(&self, op: Self::WriteOperation) -> Self::Response {
match op {
Modify::Put(key) => {
let response = self.storage.insert(key);
Some(*response)
}
Modify::Delete(key) => {
let response = self.storage.remove(&key).unwrap();
Some(*response)
}
}
}
}

/// We initialize a log, and two replicas for a B-tree, register with the replica
/// and then execute operations.
fn main() {
const N_OPS: u64 = 10_000;
// The operation log for storing `WriteOperation`, it has a size of 2 MiB:
let log = Arc::new(
Log::<<CnrBtreeSet as Dispatch>::WriteOperation>::new_with_bytes(
2 * 1024 * 1024,
LogMetaData::new(1),
),
);

// Create two replicas of the b-tree
let replica1 = Replica::<CnrBtreeSet>::new(vec![log.clone()]);
let replica2 = Replica::<CnrBtreeSet>::new(vec![log.clone()]);

// The replica executes Modify or Access operations by calling
// 'execute_mut' and `execute`. Eventually they end up in the `Dispatch` trait.
let thread_loop = |replica: &Arc<Replica<CnrBtreeSet>>, starting_point: u64, ridx| {
for i in starting_point..starting_point + N_OPS {
let _r = match i % 4 {
0 => {
let response = replica.execute_mut(Modify::Put(i), ridx);
assert_eq!(response, Some(i));
response
}
1 => {
let response = replica.execute(Access::Contains(i - 1), ridx);
assert_eq!(response, Some(1));
response
}
2 => {
let response = replica.execute(Access::Get(i - 2), ridx);
assert_eq!(response, Some(i - 2));
response
}
3 => {
let response = replica.execute_mut(Modify::Delete(i - 3), ridx);
assert_eq!(response, Some(i - 3));
response
}

_ => unreachable!(),
};
}
};

// Finally, we spawn three threads that issue operations, thread 1 and 2
// will use replica1 and thread 3 will use replica 2:
let mut threads = Vec::with_capacity(3);
let replica11 = replica1.clone();
threads.push(
std::thread::Builder::new()
.name("thread 1".to_string())
.spawn(move || {
let ridx = replica11.register().expect("Unable to register with log");
thread_loop(&replica11, 0, ridx);
}),
);

let replica12 = replica1.clone();
threads.push(
std::thread::Builder::new()
.name("thread 2".to_string())
.spawn(move || {
let ridx = replica12.register().expect("Unable to register with log");
thread_loop(&replica12, 100000, ridx);
}),
);

threads.push(
std::thread::Builder::new()
.name("thread 3".to_string())
.spawn(move || {
let ridx = replica2.register().expect("Unable to register with log");
thread_loop(&replica2, 200000, ridx);
}),
);

// Wait for all the threads to finish
for thread in threads {
thread
.expect("all threads should complete")
.join()
.unwrap_or_default();
}
}
1 change: 1 addition & 0 deletions node-replication/examples/nr_async_hashmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use node_replication::nr::NodeReplicated;
const CAPACITY: usize = 32;

/// The node-replicated hashmap uses a std hashmap internally.
#[derive(Clone)]
struct NrHashMap {
storage: HashMap<usize, usize>,
}
Expand Down
2 changes: 1 addition & 1 deletion node-replication/examples/nr_btreeset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::sync::Arc;
use node_replication::nr::Dispatch;
use node_replication::nr::NodeReplicated;

#[derive(Default)]
#[derive(Default, Clone)]
struct NrBtreeSet {
storage: BTreeSet<u64>,
}
Expand Down
Loading