Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(infra): concurrent materializer tests #1243

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ tracing-subscriber = { version = "0.3", features = [
"registry",
] }
tracing-appender = "0.2.3"
text-tables = "0.3.1"
url = { version = "2.4.1", features = ["serde"] }
zeroize = "1.6"

Expand Down
1 change: 1 addition & 0 deletions fendermint/testing/materializer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ tendermint-rpc = { workspace = true }
tokio = { workspace = true }
toml = { workspace = true }
tracing = { workspace = true }
text-tables = { workspace = true }
url = { workspace = true }

arbitrary = { workspace = true, optional = true }
Expand Down
40 changes: 40 additions & 0 deletions fendermint/testing/materializer/src/bencher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2022-2024 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Collects wall-clock latency measurements for a single test run.
#[derive(Debug, Clone, Default)]
pub struct Bencher {
    /// Set by [`Bencher::start`]; all latencies are measured relative to this instant.
    pub start_time: Option<Instant>,
    /// Elapsed time since `start`, keyed by a stage label (e.g. "mempool").
    pub latencies: HashMap<String, Duration>,
    /// Block number the test transaction was included in, if recorded.
    pub block_inclusion: Option<u64>,
}

impl Bencher {
    /// Creates an empty bencher; call [`Bencher::start`] before recording latencies.
    pub fn new() -> Self {
        Self::default()
    }

    /// Marks the beginning of the measured period.
    pub fn start(&mut self) {
        self.start_time = Some(Instant::now());
    }

    /// Records the elapsed time until the transaction reached the mempool.
    pub fn mempool(&mut self) {
        self.set_latency("mempool");
    }

    /// Records the elapsed time until block inclusion and remembers the block number.
    pub fn block_inclusion(&mut self, block_number: u64) {
        self.set_latency("block_inclusion");
        self.block_inclusion = Some(block_number);
    }

    /// Stores the elapsed time since `start` under `label`.
    ///
    /// # Panics
    /// Panics if `start` was not called first — callers are expected to
    /// uphold that ordering.
    fn set_latency(&mut self, label: &str) {
        let duration = self
            .start_time
            .expect("`start` must be called before recording a latency")
            .elapsed();
        self.latencies.insert(label.to_string(), duration);
    }
}
37 changes: 37 additions & 0 deletions fendermint/testing/materializer/src/concurrency/collect.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2022-2024 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::concurrency::signal::Signal;
use ethers::prelude::H256;
use ethers::providers::Http;
use ethers::providers::{Middleware, Provider};
use ethers::types::Block;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;

/// Polls the chain and collects observed blocks until `cancel` is signalled.
///
/// Each fetched block is passed to `assert` before being stored, keyed by
/// its block number. Returns the collected blocks once cancelled.
///
/// # Errors
/// Returns an error if an RPC call fails or a reported latest block cannot
/// be fetched.
pub async fn collect_blocks<F>(
    cancel: Arc<Signal>,
    provider: Provider<Http>,
    assert: F,
) -> anyhow::Result<HashMap<u64, Block<H256>>>
where
    F: Fn(&Block<H256>),
{
    let mut blocks = HashMap::new();
    loop {
        if cancel.received() {
            break;
        }

        // TODO: improve: use less calls, make sure blocks cannot be missed
        let block_number = provider.get_block_number().await?;
        // The node just reported this height, but don't panic if the block
        // is unavailable (e.g. a reorg) — surface it as an error instead.
        let block = provider
            .get_block(block_number)
            .await?
            .ok_or_else(|| anyhow::anyhow!("block {} not found", block_number))?;
        assert(&block);
        blocks.insert(block_number.as_u64(), block);

        sleep(Duration::from_millis(100)).await;
    }
    Ok(blocks)
}
29 changes: 29 additions & 0 deletions fendermint/testing/materializer/src/concurrency/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2022-2024 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

use std::time::Duration;

/// A multi-step load profile: each step runs for a fixed duration with a
/// bounded number of concurrently executing tests.
#[derive(Debug, Clone, Default)]
pub struct Execution {
    pub steps: Vec<ExecutionStep>,
}

impl Execution {
    /// Creates an execution plan with no steps.
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends a step capped at `max_concurrency` parallel tests that runs
    /// for `secs` seconds; returns `self` for chaining.
    pub fn add_step(mut self, max_concurrency: usize, secs: u64) -> Self {
        let step = ExecutionStep {
            max_concurrency,
            duration: Duration::from_secs(secs),
        };
        self.steps.push(step);
        self
    }
}

/// A single phase of the execution plan.
#[derive(Debug, Clone)]
pub struct ExecutionStep {
    /// Maximum number of tests allowed to run at the same time.
    pub max_concurrency: usize,
    /// How long new tests keep being spawned during this step.
    pub duration: Duration,
}
80 changes: 80 additions & 0 deletions fendermint/testing/materializer/src/concurrency/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2022-2024 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

pub mod collect;
pub mod config;
pub mod nonce_manager;
pub mod reporting;
pub mod signal;

use crate::bencher::Bencher;
use crate::concurrency::reporting::TestResult;
use ethers::types::H256;
use futures::FutureExt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::Semaphore;

/// Input handed to each spawned test by `execute`.
#[derive(Debug)]
pub struct TestInput {
    /// Id of this test run, incremented for every spawned test across all steps.
    pub test_id: usize,
    /// Fresh bencher the test should use to record its latencies.
    pub bencher: Bencher,
}

/// Output returned by a successful test run.
#[derive(Debug)]
pub struct TestOutput {
    /// Bencher carrying the latencies recorded during the test.
    pub bencher: Bencher,
    /// Hash of the transaction the test submitted.
    pub tx_hash: H256,
}

pub async fn execute<F>(cfg: config::Execution, test_factory: F) -> Vec<Vec<TestResult>>
where
F: Fn(TestInput) -> Pin<Box<dyn Future<Output = anyhow::Result<TestOutput>> + Send>>,
{
let mut test_id = 0;
let mut results = Vec::new();
for (step_id, step) in cfg.steps.iter().enumerate() {
let semaphore = Arc::new(Semaphore::new(step.max_concurrency));
let mut handles = Vec::new();
let step_results = Arc::new(tokio::sync::Mutex::new(Vec::new()));
let execution_start = Instant::now();
loop {
if execution_start.elapsed() > step.duration {
break;
}
let permit = semaphore.clone().acquire_owned().await.unwrap();
let bencher = Bencher::new();
let test_input = TestInput { test_id, bencher };
let task = test_factory(test_input).boxed();
let step_results = step_results.clone();
let handle = tokio::spawn(async move {
let test_output = task.await;
let (bencher, tx_hash, err) = match test_output {
Ok(test_output) => (Some(test_output.bencher), Some(test_output.tx_hash), None),
Err(err) => (None, None, Some(err)),
};
step_results.lock().await.push(TestResult {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this triggers a lot of threads, then everyone is waiting on this as well, also as step_results gets big, so allocation might take some time. Just curious, if step_results are not updated at all, will there be a big difference?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unlikely to be a bottleneck, it can only impose a small delay in the after-test lifecycle of the future, which isn't recorded nor is time sensitive. But I'll double check that once I'll get to high max concurrency figures.

test_id,
step_id,
bencher,
tx_hash,
err,
});
drop(permit);
});
handles.push(handle);
test_id += 1;
}

// Exhaust unfinished handles.
for handle in handles {
handle.await.unwrap();
}

let step_results = Arc::try_unwrap(step_results).unwrap().into_inner();
results.push(step_results)
}
results
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2022-2024 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

use ethers::prelude::H160;
use ethers::types::U256;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;

/// Tracks per-address transaction nonces for concurrently submitted txs.
///
/// NOTE(review): a single lock serializes access across all addresses; fine
/// for now, but consider sharding (e.g. a concurrent map) under high load.
#[derive(Default)]
pub struct NonceManager {
    // No inner Arc needed: NonceManager is not Clone, so callers share the
    // whole manager (e.g. wrapped in an Arc) rather than this field.
    nonces: Mutex<HashMap<H160, U256>>,
}

impl NonceManager {
    /// Creates a manager with no known nonces.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the next nonce to use for `addr`, overwriting any previous value.
    pub async fn set(&self, addr: H160, amount: U256) {
        let mut nonces = self.nonces.lock().await;
        nonces.insert(addr, amount);
    }

    /// Returns the next nonce for `addr` (starting at zero if unknown) and
    /// increments the stored value.
    pub async fn get_and_increment(&self, addr: H160) -> U256 {
        let mut nonces = self.nonces.lock().await;
        let next_nonce = nonces.entry(addr).or_insert_with(U256::zero);
        let current_nonce = *next_nonce;
        *next_nonce += U256::one();
        current_nonce
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2022-2024 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

use anyhow::anyhow;
use std::fmt::{Display, Formatter};

/// Summary statistics over a set of samples (e.g. latencies in seconds).
#[derive(Debug)]
pub struct Metrics {
    pub mean: f64,
    pub median: f64,
    pub max: f64,
    pub min: f64,
    pub percentile_90: f64,
}

impl Display for Metrics {
    /// Renders every metric on one line with two-decimal precision.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Metrics {
            mean,
            median,
            max,
            min,
            percentile_90,
        } = self;
        write!(
            f,
            "mean: {mean:.2}, median: {median:.2}, max: {max:.2}, min: {min:.2}, 90th: {percentile_90:.2}"
        )
    }
}

impl Metrics {
    /// Renders only the median, with two-decimal precision.
    pub fn format_median(&self) -> String {
        format!("median: {:.2}", self.median)
    }
}

/// Computes summary statistics (mean, median, min/max, nearest-rank 90th
/// percentile) over the given samples.
///
/// # Errors
/// Returns an error if `data` is empty.
pub fn calc_metrics(data: Vec<f64>) -> anyhow::Result<Metrics> {
    if data.is_empty() {
        return Err(anyhow!("empty data"));
    }

    // We own `data`, so sort it in place instead of cloning first.
    let mut sorted_data = data;
    // `total_cmp` is a total order, so NaN samples cannot panic the sort
    // (they order after +inf and will skew the stats instead).
    sorted_data.sort_by(f64::total_cmp);

    let count = sorted_data.len();
    let mean: f64 = sorted_data.iter().sum::<f64>() / count as f64;

    let median = if count % 2 == 0 {
        (sorted_data[count / 2 - 1] + sorted_data[count / 2]) / 2.0
    } else {
        sorted_data[count / 2]
    };

    // Non-emptiness was checked above, so first/last always exist.
    let max = *sorted_data.last().unwrap();
    let min = *sorted_data.first().unwrap();

    // Nearest-rank percentile: index = ceil(0.9 * count) - 1.
    let percentile_90_index = ((count as f64) * 0.9).ceil() as usize - 1;
    let percentile_90 = sorted_data[percentile_90_index];

    Ok(Metrics {
        mean,
        median,
        max,
        min,
        percentile_90,
    })
}

#[cfg(test)]
mod tests {
    use super::super::FLOAT_TOLERANCE;
    use super::*;

    #[test]
    fn test_calc_dataset_metrics() {
        // The samples 1..=10 have known statistics: mean/median 5.5,
        // min 1, max 10, nearest-rank 90th percentile 9.
        let data: Vec<f64> = (1..=10).map(f64::from).collect();

        let metrics = calc_metrics(data).unwrap();

        let cases = [
            (metrics.mean, 5.5),
            (metrics.median, 5.5),
            (metrics.max, 10.0),
            (metrics.min, 1.0),
            (metrics.percentile_90, 9.0),
        ];
        for (actual, expected) in cases {
            assert!((actual - expected).abs() < FLOAT_TOLERANCE);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2022-2024 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

pub mod dataset;
pub mod summary;
pub mod tps;

use crate::bencher::Bencher;
use ethers::prelude::H256;

#[cfg(test)]
const FLOAT_TOLERANCE: f64 = 1e-6;

/// Outcome of a single test execution within a step.
#[derive(Debug)]
pub struct TestResult {
    /// Id of the test run, unique across all steps.
    pub test_id: usize,
    /// Index of the execution step this test ran in.
    pub step_id: usize,
    /// Hash of the submitted transaction; `None` if the test failed.
    pub tx_hash: Option<H256>,
    /// Latencies recorded by the test; `None` if the test failed.
    pub bencher: Option<Bencher>,
    /// The error, if the test returned one.
    pub err: Option<anyhow::Error>,
}
Loading
Loading