Skip to content

Commit

Permalink
named Distributor (#319)
Browse files Browse the repository at this point in the history
* wip

* wip, a lot left!

* a bit of debug statements and more funsies

* tell and tell_everyone are working.

* ok we're almost there, lets see if i can handle a vec<replies> and if i can register ppl to a new Distributor

* yay it works!

* lints

* 19 lints remaining

* 9 warnings to go

* docs + tests

* i think we re good to go

* example fix

* san

* move miri to a .sh file

* bump nightlies and anyhow

* clippy pass + prepare to merge
  • Loading branch information
o0Ignition0o authored Apr 6, 2021
1 parent e923d55 commit c6016a9
Show file tree
Hide file tree
Showing 21 changed files with 1,282 additions and 63 deletions.
13 changes: 2 additions & 11 deletions .github/workflows/miri.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
- name: Install
uses: actions-rs/toolchain@v1
with:
toolchain: nightly-2021-01-01
toolchain: nightly-2021-03-20
override: true
- uses: davidB/rust-cargo-make@v1
with:
Expand All @@ -24,13 +24,4 @@ jobs:
RUST_BACKTRACE: full
RUST_LOG: 'trace'
run: |
rustup component add miri
cargo miri setup
cargo clean
# Do some Bastion shake
cd src/bastion && \
MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-ignore-leaks" cargo miri test --features lever/nightly dispatcher && \
MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-ignore-leaks" cargo miri test --features lever/nightly path && \
MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-ignore-leaks" cargo miri test --features lever/nightly broadcast && \
MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-ignore-leaks" cargo miri test --features lever/nightly children_ref && \
cd -
tools/miri.sh
2 changes: 1 addition & 1 deletion .github/workflows/sanitizers.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
- name: Install
uses: actions-rs/toolchain@v1
with:
toolchain: nightly-2021-01-01
toolchain: nightly-2021-03-20
override: true
- uses: davidB/rust-cargo-make@v1
with:
Expand Down
3 changes: 2 additions & 1 deletion src/bastion-executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ tokio-runtime = ["tokio"]

[dependencies]
bastion-utils = "0.3.2"
lightproc = "0.3"
# lightproc = "0.3"
lightproc = { path = "../lightproc" }
# bastion-utils = { path = "../bastion-utils" }

crossbeam-utils = "0.8"
Expand Down
2 changes: 1 addition & 1 deletion src/bastion-executor/src/placement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub fn get_num_cores() -> Option<usize> {
///
/// Sets the current threads affinity
pub fn set_for_current(core_id: CoreId) {
tracing::info!("Executor: placement: set affinity on core {}", core_id.id);
tracing::trace!("Executor: placement: set affinity on core {}", core_id.id);
set_for_current_helper(core_id);
}

Expand Down
10 changes: 7 additions & 3 deletions src/bastion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ rustdoc-args = ["--cfg", "feature=\"docs\""]
[dependencies]
bastion-executor = { git = "https://github.com/bastion-rs/bastion.git" }
lightproc = "0.3"
# bastion-executor = { version = "= 0.3.7-alpha.0", path = "../bastion-executor" }
# lightproc = { version = "= 0.3", path = "../lightproc" }
# bastion-executor = { path = "../bastion-executor" }
# lightproc = { path = "../lightproc" }

lever = "0.1"
futures = "0.3.5"
Expand All @@ -73,9 +73,12 @@ artillery-core = { version = "0.1.2-alpha.3", optional = true }
# Log crates
tracing-subscriber = "0.2.6"
tracing = "0.1.15"
anyhow = "1.0.31"
anyhow = "1.0"
crossbeam-queue = "0.3.0"
log = "0.4.14"
lasso = {version = "0.5", features = ["multi-threaded"] }
once_cell = "1.7.2"
thiserror = "1.0.24"

[target.'cfg(not(windows))'.dependencies]
nuclei = "0.1"
Expand All @@ -91,6 +94,7 @@ rayon = "1.3.1"
num_cpus = "1.13.0"
# hello_tokio example
tokio = { version="1.1", features = ["time", "macros"] }
# bastion-executor = { path = "../bastion-executor" }
bastion-executor = { git = "https://github.com/bastion-rs/bastion.git" }
once_cell = "1.5.2"
tokio-test = "0.4.0"
245 changes: 245 additions & 0 deletions src/bastion/examples/distributor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
///! Create a conference.
///!
///! 1st Group
///! Staff (5) - Going to organize the event // OK
///!
///! 2nd Group
///! Enthusiasts (50) - interested in participating to the conference (haven't registered yet) // OK
///!
///! 3rd Group
///! Attendees (empty for now) - Participate
///!
///! Enthusiast -> Ask one of the staff members "when is the conference going to happen ?" // OK
///! Broadcast / Question => Answer 0 or 1 Staff members are going to reply eventually? // OK
///!
///! Staff -> Send a Leaflet to all of the enthusiasts, letting them know that they can register. // OK
///!
///! "hey conference <awesomeconference> is going to happen. will you be there?"
///! Broadcast / Question -> if people reply with YES => fill the 3rd group
///! some enthusiasts are now attendees
///!
///! Staff -> send the actual schedule and misc infos to Attendees
///! Broadcast / Statement (Attendees)
///!
///! An attendee sends a thank you note to one staff member (and not bother everyone)
///! One message / Statement (Staff) // OK
///!
///! ```rust
///! let staff = Distributor::named("staff");
///! let enthusiasts = Distributor::named("enthusiasts");
///! let attendees = Disitributor::named("attendees");
///! // Enthusiast -> Ask the whole staff "when is the conference going to happen ?"
///! ask_one(Message + Clone) -> Result<impl Future<Output = Reply>, CouldNotSendError>
///! // await_one // await_all
///! // first ? means "have we been able to send the question?"
///! // it s in a month
///! let replies = staff.ask_one("when is the conference going to happen ?")?.await?;
///! ask_everyone(Message + Clone) -> Result<impl Stream<Item = Reply>, CouldNotSendError>
///! let participants = enthusiasts.ask_everyone("here's our super nice conference, it s happening people!").await?;
///! for participant in participants {
///! // grab the sender and add it to the attendee recipient group
///! }
///! // send the schedule
///! tell_everyone(Message + Clone) -> Result<(), CouldNotSendError>
///! attendees.tell_everyone("hey there, conf is in a week, here s where and how it s going to happen")?;
///! // send a thank you note
///! tell(Message) -> Result<(), CouldNotSendError>
///! staff.tell_one("thank's it was amazing")?;
///! children
///! .with_redundancy(10)
///! .with_distributor(Distributor::named("staff"))
///! // We create the function to exec when each children is called
///! .with_exec(move |ctx: BastionContext| async move { /* ... */ })
///! children
///! .with_redundancy(100)
///! .with_distributor(Distributor::named("enthusiasts"))
///! // We create the function to exec when each children is called
///! .with_exec(move |ctx: BastionContext| async move { /* ... */ })
///! children
///! .with_redundancy(0)
///! .with_distributor(Distributor::named("attendees"))
///! // We create the function to exec when each children is called
///! .with_exec(move |ctx: BastionContext| async move { /* ... */ })
///! ```
use anyhow::{anyhow, Context, Result as AnyResult};
use bastion::distributor::*;
use bastion::prelude::*;
use tracing::Level;

// true if the person attends the conference
#[derive(Debug)]
struct RSVP {
attends: bool,
child_ref: ChildRef,
}

#[derive(Debug, Clone)]
struct ConferenceSchedule {
start: std::time::Duration,
end: std::time::Duration,
misc: String,
}

/// cargo r --features=tokio-runtime distributor
#[tokio::main]
async fn main() -> AnyResult<()> {
let subscriber = tracing_subscriber::fmt()
.with_max_level(Level::INFO)
.finish();
tracing::subscriber::set_global_default(subscriber).unwrap();

// Initialize bastion
Bastion::init();

// 1st Group
Bastion::supervisor(|supervisor| {
supervisor.children(|children| {
// Iniit staff
// Staff (5 members) - Going to organize the event
children
.with_redundancy(5)
.with_distributor(Distributor::named("staff"))
.with_exec(organize_the_event)
})
})
// 2nd Group
.and_then(|_| {
Bastion::supervisor(|supervisor| {
supervisor.children(|children| {
// Enthusiasts (50) - interested in participating to the conference (haven't registered yet)
children
.with_redundancy(50)
.with_distributor(Distributor::named("enthusiasts"))
.with_exec(be_interested_in_the_conference)
})
})
})
.map_err(|_| anyhow!("couldn't setup the bastion"))?;

Bastion::start();

// Wait a bit until everyone is ready
// std::thread::sleep(std::time::Duration::from_secs(1));

let staff = Distributor::named("staff");
let enthusiasts = Distributor::named("enthusiasts");
let attendees = Distributor::named("attendees");

// Enthusiast -> Ask one of the staff members "when is the conference going to happen ?"
let answer = staff.ask_one("when is the next conference going to happen?")?;
MessageHandler::new(
answer
.await
.expect("coulnd't find out when the next conference is going to happen :("),
)
.on_tell(|reply: String, _sender_addr| {
tracing::info!("received a reply to my message:\n{}", reply);
});

// "hey conference <awesomeconference> is going to happen. will you be there?"
// Broadcast / Question -> if people reply with YES => fill the 3rd group
let answers = enthusiasts
.ask_everyone("hey, the conference is going to happen, will you be there?")
.expect("couldn't ask everyone");

for answer in answers.into_iter() {
MessageHandler::new(answer.await.expect("couldn't receive reply"))
.on_tell(|rsvp: RSVP, _| {
if rsvp.attends {
tracing::info!("{:?} will be there! :)", rsvp.child_ref.id());
attendees
.subscribe(rsvp.child_ref)
.expect("couldn't subscribe attendee");
} else {
tracing::error!("{:?} won't make it :(", rsvp.child_ref.id());
}
})
.on_fallback(|unknown, _sender_addr| {
tracing::error!(
"distributor_test: uh oh, I received a message I didn't understand\n {:?}",
unknown
);
});
}

// Ok now that attendees have subscribed, let's send information around!
tracing::info!("Let's send invitations!");
// Staff -> send the actual schedule and misc infos to Attendees
let total_sent = attendees
.tell_everyone(ConferenceSchedule {
start: std::time::Duration::from_secs(60),
end: std::time::Duration::from_secs(3600),
misc: "it's going to be amazing!".to_string(),
})
.context("couldn't let everyone know the conference is available!")?;

tracing::error!("total number of attendees: {}", total_sent.len());

tracing::info!("the conference is running!");
tokio::time::sleep(std::time::Duration::from_secs(10)).await;

// An attendee sends a thank you note to one staff member (and not bother everyone)
staff
.tell_one("the conference was amazing thank you so much!")
.context("couldn't thank the staff members :(")?;

tokio::time::sleep(std::time::Duration::from_secs(1)).await;
// And we're done!
Bastion::stop();

// BEWARE, this example doesn't return
Bastion::block_until_stopped();

Ok(())
}

async fn organize_the_event(ctx: BastionContext) -> Result<(), ()> {
loop {
MessageHandler::new(ctx.recv().await?)
.on_question(|message: &str, sender| {
tracing::info!("received a question: \n{}", message);
sender
.reply("uh i think it will be next month!".to_string())
.unwrap();
})
.on_tell(|message: &str, _| {
tracing::info!("received a message: \n{}", message);
})
.on_fallback(|unknown, _sender_addr| {
tracing::error!(
"staff: uh oh, I received a message I didn't understand\n {:?}",
unknown
);
});
}
}

async fn be_interested_in_the_conference(ctx: BastionContext) -> Result<(), ()> {
loop {
MessageHandler::new(ctx.recv().await?)
.on_tell(|message: std::sync::Arc<&str>, _| {
tracing::info!(
"child {}, received a broadcast message:\n{}",
ctx.current().id(),
message
);
})
.on_tell(|schedule: ConferenceSchedule, _| {
tracing::info!(
"child {}, received broadcast conference schedule!:\n{:?}",
ctx.current().id(),
schedule
);
})
.on_question(|message: &str, sender| {
tracing::info!("received a question: \n{}", message);
// ILL BE THERE!
sender
.reply(RSVP {
attends: rand::random(),
child_ref: ctx.current().clone(),
})
.unwrap();
});
}
}
3 changes: 3 additions & 0 deletions src/bastion/examples/hello_tokio.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#[cfg(feature = "tokio-runtime")]
use anyhow::Result as AnyResult;
#[cfg(feature = "tokio-runtime")]
use bastion::prelude::*;
#[cfg(feature = "tokio-runtime")]
use tokio;
#[cfg(feature = "tokio-runtime")]
use tracing::{error, warn, Level};

/// `cargo run --features=tokio-runtime --example hello_tokio`
Expand Down
4 changes: 2 additions & 2 deletions src/bastion/examples/prime_numbers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ mod prime_number {
fn number_or_panic(number_to_return: u128) -> u128 {
// Let's roll a dice
if rand::random::<u8>() % 6 == 0 {
panic!(format!(
panic!(
"I was about to return {} but I chose to panic instead!",
number_to_return
))
)
}
number_to_return
}
Expand Down
Loading

0 comments on commit c6016a9

Please sign in to comment.