Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip #325

Open
wants to merge 6 commits into
base: relrin/bastion-actor-trait
Choose a base branch
from
Open

wip #325

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/bastion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,6 @@ lever = "0.1.1-alpha.11"
lightproc = "0.3.5"
regex = "1.3.9"
uuid = { version = "0.8", features = ["v4"] }
once_cell = "1.7.2"

[dev-dependencies]
119 changes: 88 additions & 31 deletions src/bastion/src/system/global_state.rs
Original file line number Diff line number Diff line change
@@ -1,73 +1,90 @@
use std::sync::Arc;
/// This module contains implementation of the global state that
/// available to all actors in runtime. To provide safety and avoid
/// data races, the implementation is heavily relies on software
/// transaction memory (or shortly STM) mechanisms to eliminate any
/// potential data races and provide consistency across actors.
use std::any::{Any, TypeId};
use std::ops::Deref;
use std::sync::Arc;
use std::{
any::{Any, TypeId},
sync::RwLock,
};
use std::{collections::hash_map::Entry, ops::Deref};

use lever::sync::atomics::AtomicBox;
use lever::table::lotable::LOTable;
use lightproc::proc_state::AsAny;
use std::collections::HashMap;

use crate::error::{BastionError, Result};

#[derive(Debug)]
pub struct GlobalState {
table: LOTable<TypeId, GlobalDataContainer>,
table: Arc<RwLock<HashMap<TypeId, Arc<dyn Any + Send + Sync>>>>, // todo: remove the arc<rwlock< once we figure it out
}

#[derive(Debug, Clone)]
/// A container for user-defined types.
struct GlobalDataContainer(Arc<AtomicBox<Box<dyn Any>>>);

impl GlobalState {
/// Returns a new instance of global state.
pub(crate) fn new() -> Self {
GlobalState {
table: LOTable::new(),
table: Arc::new(RwLock::new(HashMap::new())),
}
}

/// Inserts the given value in the global state. If the value
/// exists, it will be overridden.
pub fn insert<T: Send + Sync + 'static>(&mut self, value: T) -> bool {
let container = GlobalDataContainer::new(value);
self.table
.insert(TypeId::of::<T>(), container)
.ok()
.write()
.unwrap()
.insert(
TypeId::of::<T>(),
Arc::new(value) as Arc<dyn Any + Send + Sync>,
)
.is_some()
}

/// Returns the requested data type to the caller.
pub fn read<'a, T: Send + Sync + 'static>(&mut self) -> Option<&'a T> {
/// Invokes a function with the requested data type.
pub fn read<T: Send + Sync + 'static>(&mut self, f: impl FnOnce(Option<&T>)) {
self.table
.read()
.unwrap()
.get(&TypeId::of::<T>())
.and_then(|container| container.read::<T>())
.map(|value| f(value.downcast_ref()));
}

/// Invokes a function with the requested data type.
pub fn write<T: std::fmt::Debug + Send + Sync + 'static, F>(&mut self, f: F)
where
F: Fn(Option<&T>) -> Option<T>,
{
let mut hm = self.table.write().unwrap();
let stuff_to_insert = match hm.entry(TypeId::of::<T>()) {
Entry::Occupied(data) => f(data.get().downcast_ref()),
Entry::Vacant(_) => f(None),
};

if let Some(stuff) = stuff_to_insert {
hm.insert(
TypeId::of::<T>(),
Arc::new(stuff) as Arc<dyn Any + Send + Sync>,
);
} else {
hm.remove(&TypeId::of::<T>());
};
}

/// Checks the given values is storing in the global state.
pub fn contains<T: Send + Sync + 'static>(&self) -> bool {
self.table.contains_key(&TypeId::of::<T>())
self.table.read().unwrap().contains_key(&TypeId::of::<T>())
}

/// Deletes the entry from the global state.
pub fn remove<T: Send + Sync + 'static>(&mut self) -> bool {
match self.table.remove(&TypeId::of::<T>()) {
Ok(entry) => entry.is_some(),
Err(_) => false,
}
}
}

impl GlobalDataContainer {
pub fn new<T: Send + Sync + 'static>(value: T) -> Self {
GlobalDataContainer(Arc::new(AtomicBox::new(Box::new(value))))
}

pub fn read<'a, T: Send + Sync + 'static>(&self) -> Option<&'a T> {
let inner = self.0.get();
inner.downcast_ref::<T>()
self.table
.write()
.unwrap()
.remove(&TypeId::of::<T>())
.is_some()
}
}

Expand Down Expand Up @@ -136,4 +153,44 @@ mod tests {
let is_removed = instance.remove::<usize>();
assert_eq!(is_removed, false);
}

#[test]
fn test_write_read() {
let mut instance = GlobalState::new();

#[derive(Debug, PartialEq, Clone)]
struct Hello {
foo: bool,
bar: usize,
}

let expected = Hello { foo: true, bar: 42 };

instance.insert(expected.clone());

instance.read(|actual: Option<&Hello>| {
assert_eq!(&expected, actual.unwrap());
});

let expected_updated = Hello {
foo: false,
bar: 43,
};

instance.write::<Hello, _>(|maybe_to_update| {
let to_update = maybe_to_update.unwrap();

let updated = Hello {
foo: !to_update.foo,
bar: to_update.bar + 1,
};

Some(updated)
});

instance.read(|updated: Option<&Hello>| {
let updated = updated.unwrap();
assert_eq!(updated, &expected_updated);
});
}
}
6 changes: 2 additions & 4 deletions src/bastion/src/system/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
mod global_state;
mod node;

use lazy_static::lazy_static;
use once_cell::sync::Lazy;

use crate::system::node::Node;

lazy_static! {
pub static ref SYSTEM: Node = Node::new();
}
pub static SYSTEM: Lazy<Node> = Lazy::new(Node::new);