feat(infra): concurrent materializer tests #1243

Open
wants to merge 14 commits into base: main
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -172,6 +172,7 @@ tracing-subscriber = { version = "0.3", features = [
"registry",
] }
tracing-appender = "0.2.3"
text-tables = "0.3.1"
url = { version = "2.4.1", features = ["serde"] }
zeroize = "1.6"

1 change: 1 addition & 0 deletions fendermint/testing/materializer/Cargo.toml
@@ -29,6 +29,7 @@ tendermint-rpc = { workspace = true }
tokio = { workspace = true }
toml = { workspace = true }
tracing = { workspace = true }
text-tables = { workspace = true }
url = { workspace = true }

arbitrary = { workspace = true, optional = true }
29 changes: 29 additions & 0 deletions fendermint/testing/materializer/src/bencher.rs
@@ -0,0 +1,29 @@
// Copyright 2022-2024 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

use std::collections::HashMap;
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Default)]
pub struct Bencher {
    pub start_time: Option<Instant>,
    pub records: HashMap<String, Duration>,
}

impl Bencher {
    pub fn new() -> Self {
        Self {
            start_time: None,
            records: HashMap::new(),
        }
    }

    pub async fn start(&mut self) {
        self.start_time = Some(Instant::now());
    }

    pub async fn record(&mut self, label: String) {
        let duration = self.start_time.unwrap().elapsed();
Contributor
I feel expect here is better if you assume the caller knows that calling "start" should happen first.
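For illustration, the suggested change might look roughly like this (an editorial sketch, not part of the diff; the panic message is made up):

    pub async fn record(&mut self, label: String) {
        let duration = self
            .start_time
            .expect("Bencher::start must be called before record")
            .elapsed();
        self.records.insert(label, duration);
    }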

Contributor Author
I'll revise this API once the reporting summary is more solid.

        self.records.insert(label, duration);
    }
}
29 changes: 29 additions & 0 deletions fendermint/testing/materializer/src/concurrency/config.rs
@@ -0,0 +1,29 @@
// Copyright 2022-2024 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

use std::time::Duration;

#[derive(Debug, Clone, Default)]
pub struct Execution {
    pub steps: Vec<ExecutionStep>,
}

impl Execution {
    pub fn new() -> Self {
        Self { steps: Vec::new() }
    }

    pub fn add_step(mut self, max_concurrency: usize, secs: u64) -> Self {
        self.steps.push(ExecutionStep {
            max_concurrency,
            duration: Duration::from_secs(secs),
        });
        self
    }
}

#[derive(Debug, Clone)]
pub struct ExecutionStep {
    pub max_concurrency: usize,
    pub duration: Duration,
}
64 changes: 64 additions & 0 deletions fendermint/testing/materializer/src/concurrency/mod.rs
@@ -0,0 +1,64 @@
// Copyright 2022-2024 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

pub mod config;
pub mod nonce_manager;
pub mod reporting;

pub use reporting::*;

use crate::bencher::Bencher;
use futures::FutureExt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::Semaphore;

pub async fn execute<F>(cfg: config::Execution, test_factory: F) -> Vec<Vec<TestResult>>
where
    F: Fn(usize, Bencher) -> Pin<Box<dyn Future<Output = anyhow::Result<Bencher>> + Send>>,
{
    let mut test_id = 0;
    let mut results = Vec::new();
    for (step_id, step) in cfg.steps.iter().enumerate() {
        let semaphore = Arc::new(Semaphore::new(step.max_concurrency));
        let mut handles = Vec::new();
        let step_results = Arc::new(tokio::sync::Mutex::new(Vec::new()));
        let execution_start = Instant::now();
        loop {
            if execution_start.elapsed() > step.duration {
                break;
            }
            let permit = semaphore.clone().acquire_owned().await.unwrap();
            let bencher = Bencher::new();
            let task = test_factory(test_id, bencher).boxed();
            let step_results = step_results.clone();
            let handle = tokio::spawn(async move {
                let res = task.await;
                let (bencher, err) = match res {
                    Ok(bencher) => (Some(bencher), None),
                    Err(err) => (None, Some(err)),
                };
                step_results.lock().await.push(TestResult {
Contributor
If this triggers a lot of threads, then everyone is waiting on this lock as well, and as step_results gets big, allocation might take some time. Just curious: if step_results is not updated at all, will there be a big difference?

Contributor Author
This is unlikely to be a bottleneck; it can only impose a small delay in the after-test lifecycle of the future, which isn't recorded and isn't time-sensitive. But I'll double-check that once I get to high max-concurrency figures.

                    test_id,
                    step_id,
                    bencher,
                    err,
                });
                drop(permit);
            });
            handles.push(handle);
            test_id += 1;
        }

        // Exhaust unfinished handles.
        for handle in handles {
            handle.await.unwrap();
        }

        let step_results = Arc::try_unwrap(step_results).unwrap().into_inner();
        results.push(step_results);
    }
    results
}
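As a side note on the step_results discussion above: one possible alternative (a sketch only, not what this PR implements) is to have each spawned task return its TestResult and collect the results from the join handles, removing the shared Mutex<Vec<_>> entirely:

    // Inside the loop body: the task returns its TestResult instead of pushing
    // into a shared Vec behind a Mutex.
    let handle = tokio::spawn(async move {
        let res = task.await;
        let (bencher, err) = match res {
            Ok(bencher) => (Some(bencher), None),
            Err(err) => (None, Some(err)),
        };
        drop(permit);
        TestResult { test_id, step_id, bencher, err }
    });
    handles.push(handle);

    // After the loop: awaiting the handles yields the results directly.
    let mut step_results = Vec::with_capacity(handles.len());
    for handle in handles {
        step_results.push(handle.await.unwrap());
    }

Whether this matters in practice depends on how long the post-test bookkeeping takes relative to the test itself, as the author notes above.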
34 changes: 34 additions &amp; 0 deletions fendermint/testing/materializer/src/concurrency/nonce_manager.rs
@@ -0,0 +1,34 @@
// Copyright 2022-2024 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

use ethers::prelude::H160;
use ethers::types::U256;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;

#[derive(Default)]
pub struct NonceManager {
    nonces: Arc<Mutex<HashMap<H160, U256>>>,
Contributor
I think this is a bottleneck as well: every address is waiting on the same lock. Maybe this might help: https://github.com/xacrimon/dashmap

Contributor Author
Yes, this is just a temporary solution; I was hoping to remove it entirely. If not, I'll optimize it.

}

impl NonceManager {
    pub fn new() -> Self {
        NonceManager {
            nonces: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    pub async fn set(&self, addr: H160, amount: U256) {
        let mut nonces = self.nonces.lock().await;
        nonces.insert(addr, amount);
    }

    pub async fn get_and_increment(&self, addr: H160) -> U256 {
        let mut nonces = self.nonces.lock().await;
        let next_nonce = nonces.entry(addr).or_insert_with(U256::zero);
        let current_nonce = *next_nonce;
        *next_nonce += U256::one();
        current_nonce
    }
}
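For reference on the DashMap suggestion above, a sharded variant could look roughly like this (a sketch assuming the dashmap crate were added as a dependency; not part of this PR):

    use dashmap::DashMap;
    use ethers::prelude::H160;
    use ethers::types::U256;

    #[derive(Default)]
    pub struct NonceManager {
        nonces: DashMap<H160, U256>,
    }

    impl NonceManager {
        pub fn set(&self, addr: H160, amount: U256) {
            self.nonces.insert(addr, amount);
        }

        pub fn get_and_increment(&self, addr: H160) -> U256 {
            // Only the shard holding `addr` is locked, not the whole map.
            let mut entry = self.nonces.entry(addr).or_insert_with(U256::zero);
            let current = *entry;
            *entry += U256::one();
            current
        }
    }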
127 changes: 127 additions & 0 deletions fendermint/testing/materializer/src/concurrency/reporting.rs
@@ -0,0 +1,127 @@
// Copyright 2022-2024 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::bencher::Bencher;
use crate::concurrency::config;
use crate::concurrency::config::ExecutionStep;
use anyhow::anyhow;
use std::collections::{HashMap, HashSet};
use std::io;
use std::time::Duration;

#[derive(Debug)]
pub struct TestResult {
    pub test_id: usize,
    pub step_id: usize,
    pub bencher: Option<Bencher>,
    pub err: Option<anyhow::Error>,
}

#[derive(Debug)]
pub struct StepSummary {
    pub cfg: ExecutionStep,
    pub avg_latencies: HashMap<String, Duration>,
    pub errs: Vec<anyhow::Error>,
}

impl StepSummary {
    fn new(cfg: ExecutionStep, results: Vec<TestResult>) -> Self {
        let mut total_durations: HashMap<String, Duration> = HashMap::new();
        let mut counts: HashMap<String, usize> = HashMap::new();
        let mut errs = Vec::new();
        for res in results {
            let Some(bencher) = res.bencher else { continue };
            for (key, duration) in bencher.records.clone() {
                *total_durations.entry(key.clone()).or_insert(Duration::ZERO) += duration;
                *counts.entry(key).or_insert(0) += 1;
            }
            if let Some(err) = res.err {
                errs.push(err);
            }
        }

        let avg_latencies = total_durations
            .into_iter()
            .map(|(key, total)| {
                let count = counts[&key];
                (key, total / count as u32)
            })
            .collect();

        Self {
            cfg,
            avg_latencies,
            errs,
        }
    }
}

#[derive(Debug)]
pub struct ExecutionSummary {
    pub summaries: Vec<StepSummary>,
}

impl ExecutionSummary {
    pub fn new(cfg: config::Execution, results: Vec<Vec<TestResult>>) -> Self {
        let mut summaries = Vec::new();
        for (i, step_results) in results.into_iter().enumerate() {
            let cfg = cfg.steps[i].clone();
            summaries.push(StepSummary::new(cfg, step_results));
        }

        Self { summaries }
    }

    pub fn to_result(&self) -> anyhow::Result<()> {
        let errs = self.errs();
        if errs.is_empty() {
            Ok(())
        } else {
            Err(anyhow!(errs.join("\n")))
        }
    }

    pub fn errs(&self) -> Vec<String> {
        let mut errs = Vec::new();
        for summary in self.summaries.iter() {
            let cloned_errs: Vec<String> =
                summary.errs.iter().map(|e| format!("{:?}", e)).collect();
            errs.extend(cloned_errs);
        }
        errs
    }

    pub fn print(&self) {
        let mut data = vec![];

        let latencies: HashSet<String> = self
            .summaries
            .iter()
            .flat_map(|summary| summary.avg_latencies.keys().cloned())
            .collect();

        let mut header = vec!["max_concurrency".to_string(), "duration".to_string()];
        header.extend(latencies.iter().map(|key| format!("{} latency (ms)", key)));
        data.push(header);

        for summary in self.summaries.iter() {
            let mut row = vec![];
            row.push(summary.cfg.max_concurrency.to_string());
            row.push(summary.cfg.duration.as_secs().to_string());

            for key in &latencies {
                let latency = summary
                    .avg_latencies
                    .get(key)
                    .map_or(String::from("-"), |duration| {
                        duration.as_millis().to_string()
                    });
                row.push(latency);
            }

            data.push(row);
        }

        text_tables::render(&mut io::stdout(), data).unwrap();
    }
}
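Putting the pieces together, a minimal end-to-end sketch of how these modules appear intended to be used (the crate path fendermint_materializer, the sleep-based workload, and the "tx_latency" label are assumptions, not taken from this PR):

    use fendermint_materializer::bencher::Bencher;
    use fendermint_materializer::concurrency::{self, config::Execution, ExecutionSummary};
    use futures::FutureExt;
    use std::time::Duration;

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        let cfg = Execution::new().add_step(5, 2).add_step(20, 2);

        let results = concurrency::execute(cfg.clone(), |_test_id, mut bencher| {
            async move {
                bencher.start().await;
                // Stand-in for the real test body (e.g. sending a transaction).
                tokio::time::sleep(Duration::from_millis(10)).await;
                bencher.record("tx_latency".to_string()).await;
                anyhow::Ok(bencher)
            }
            .boxed()
        })
        .await;

        let summary = ExecutionSummary::new(cfg, results);
        summary.print();
        summary.to_result()
    }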
2 changes: 2 additions & 0 deletions fendermint/testing/materializer/src/docker/mod.rs
@@ -90,6 +90,7 @@ macro_rules! env_vars {
};
}

#[derive(Debug)]
pub struct DockerMaterials;

impl Materials for DockerMaterials {
@@ -160,6 +161,7 @@ pub struct DockerMaterializerState {
    port_ranges: BTreeMap<NodeName, DockerPortRange>,
}

#[derive(Debug)]
pub struct DockerMaterializer {
    dir: PathBuf,
    rng: StdRng,
2 changes: 2 additions & 0 deletions fendermint/testing/materializer/src/lib.rs
@@ -20,6 +20,8 @@ pub mod validation;

#[cfg(feature = "arb")]
mod arb;
pub mod bencher;
pub mod concurrency;

/// An ID identifying a resource within its parent.
#[derive(Clone, Serialize, PartialEq, Eq, PartialOrd, Ord)]
9 changes: 9 additions & 0 deletions fendermint/testing/materializer/src/testnet.rs
@@ -140,6 +140,15 @@
            .ok_or_else(|| anyhow!("account {id} does not exist"))
    }

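    /// Get an account by taking `v` modulo the number of accounts (in iteration order).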
    pub fn account_mod_nth(&self, v: usize) -> &M::Account {
        let nth = v % self.accounts.len();
        self.accounts
            .iter()
            .nth(nth)
            .map(|(_, account)| account)
            .unwrap()
    }

    /// Get a node by name.
    pub fn node(&self, name: &NodeName) -> anyhow::Result<&M::Node> {
        self.nodes