Skip to content

Commit

Permalink
add basic TPS analysis
Browse files Browse the repository at this point in the history
  • Loading branch information
LePremierHomme committed Jan 10, 2025
1 parent 2cae29d commit 886a0e2
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 23 deletions.
21 changes: 16 additions & 5 deletions fendermint/testing/materializer/src/bencher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,34 @@ use std::time::{Duration, Instant};
#[derive(Debug, Clone, Default)]
pub struct Bencher {
pub start_time: Option<Instant>,
pub records: HashMap<String, Duration>,
pub latencies: HashMap<String, Duration>,
pub block_inclusion: Option<u64>,
}

impl Bencher {
pub fn new() -> Self {
Self {
start_time: None,
records: HashMap::new(),
latencies: HashMap::new(),
block_inclusion: None,
}
}

pub async fn start(&mut self) {
pub fn start(&mut self) {
self.start_time = Some(Instant::now());
}

pub async fn record(&mut self, label: String) {
pub fn mempool(&mut self) {
self.set_latency("mempool".to_string());
}

pub fn block_inclusion(&mut self, block_number: u64) {
self.set_latency("block_inclusion".to_string());
self.block_inclusion = Some(block_number);
}

fn set_latency(&mut self, label: String) {
let duration = self.start_time.unwrap().elapsed();
self.records.insert(label, duration);
self.latencies.insert(label, duration);
}
}
37 changes: 37 additions & 0 deletions fendermint/testing/materializer/src/concurrency/collect.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2022-2024 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::concurrency::signal::Signal;
use ethers::prelude::H256;
use ethers::providers::Http;
use ethers::providers::{Middleware, Provider};
use ethers::types::Block;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;

pub async fn collect_blocks<F>(
cancel: Arc<Signal>,
provider: Provider<Http>,
assert: F,
) -> anyhow::Result<HashMap<u64, Block<H256>>>
where
F: Fn(&Block<H256>),
{
let mut blocks = HashMap::new();
loop {
if cancel.received() {
break;
}

// TODO: improve: use less calls, make sure blocks cannot be missed
let block_number = provider.get_block_number().await?;
let block = provider.get_block(block_number).await?.unwrap();
assert(&block);
blocks.insert(block_number.as_u64(), block);

sleep(Duration::from_millis(100)).await;
}
Ok(blocks)
}
2 changes: 2 additions & 0 deletions fendermint/testing/materializer/src/concurrency/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
// Copyright 2022-2024 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

pub mod collect;
pub mod config;
pub mod nonce_manager;
pub mod signal;
pub mod reporting;

pub use reporting::*;
Expand Down
50 changes: 41 additions & 9 deletions fendermint/testing/materializer/src/concurrency/reporting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::bencher::Bencher;
use crate::concurrency::config;
use crate::concurrency::config::ExecutionStep;
use anyhow::anyhow;
use ethers::prelude::{Block, H256};
use std::collections::{HashMap, HashSet};
use std::io;
use std::time::Duration;
Expand All @@ -21,36 +22,58 @@ pub struct TestResult {
pub struct StepSummary {
pub cfg: ExecutionStep,
pub avg_latencies: HashMap<String, Duration>,
pub avg_tps: u64,
pub errs: Vec<anyhow::Error>,
}

impl StepSummary {
fn new(cfg: ExecutionStep, results: Vec<TestResult>) -> Self {
let mut total_durations: HashMap<String, Duration> = HashMap::new();
let mut counts: HashMap<String, usize> = HashMap::new();
let mut sum_latencies: HashMap<String, Duration> = HashMap::new();
let mut count_latencies: HashMap<String, usize> = HashMap::new();
let mut block_inclusions: HashMap<u64, u64> = HashMap::new();
let mut errs = Vec::new();

for res in results {
let Some(bencher) = res.bencher else { continue };
for (key, duration) in bencher.records.clone() {
*total_durations.entry(key.clone()).or_insert(Duration::ZERO) += duration;
*counts.entry(key).or_insert(0) += 1;

for (key, duration) in bencher.latencies.clone() {
*sum_latencies.entry(key.clone()).or_insert(Duration::ZERO) += duration;
*count_latencies.entry(key).or_insert(0) += 1;
}

if let Some(block) = bencher.block_inclusion {
*block_inclusions.entry(block).or_insert(0) += 1;
}

if let Some(err) = res.err {
errs.push(err);
}
}

let avg_latencies = total_durations
// TODO: improve:
// 1. don't assume block time is 1s.
// 2. don't scope the count to execution step,
// because blocks might be shared with prev/next step,
// which skews the results.
// 3. don't use naive avg.
let avg_tps = {
let sum: u64 = block_inclusions.values().sum();
let count = block_inclusions.len();
sum / count as u64
};

let avg_latencies = sum_latencies
.into_iter()
.map(|(key, total)| {
let count = counts[&key];
let count = count_latencies[&key];
(key, total / count as u32)
})
.collect();

Self {
cfg,
avg_latencies,
avg_tps,
errs,
}
}
Expand All @@ -62,7 +85,11 @@ pub struct ExecutionSummary {
}

impl ExecutionSummary {
pub fn new(cfg: config::Execution, results: Vec<Vec<TestResult>>) -> Self {
pub fn new(
cfg: config::Execution,
_blocks: HashMap<u64, Block<H256>>,
results: Vec<Vec<TestResult>>,
) -> Self {
let mut summaries = Vec::new();
for (i, step_results) in results.into_iter().enumerate() {
let cfg = cfg.steps[i].clone();
Expand Down Expand Up @@ -100,14 +127,19 @@ impl ExecutionSummary {
.flat_map(|summary| summary.avg_latencies.keys().cloned())
.collect();

let mut header = vec!["max_concurrency".to_string(), "duration".to_string()];
let mut header = vec![
"max_concurrency".to_string(),
"duration (s)".to_string(),
"TPS".to_string(),
];
header.extend(latencies.iter().map(|key| format!("{} latency (ms)", key)));
data.push(header);

for summary in self.summaries.iter() {
let mut row = vec![];
row.push(summary.cfg.max_concurrency.to_string());
row.push(summary.cfg.duration.as_secs().to_string());
row.push(summary.avg_tps.to_string());

for key in &latencies {
let latency = summary
Expand Down
24 changes: 24 additions & 0 deletions fendermint/testing/materializer/src/concurrency/signal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2022-2024 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

pub struct Signal(tokio::sync::Semaphore);

impl Signal {
pub fn new() -> Self {
Self(tokio::sync::Semaphore::new(0))
}

pub fn send(&self) {
self.0.close();
}

pub fn received(&self) -> bool {
self.0.is_closed()
}
}

impl Default for Signal {
fn default() -> Self {
Self::new()
}
}
53 changes: 44 additions & 9 deletions fendermint/testing/materializer/tests/docker_tests/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::time::Duration;

use crate::make_testnet;
use anyhow::{bail, Context};
use ethers::prelude::{Block, H256};
use ethers::types::U256;
use ethers::{
core::k256::ecdsa::SigningKey,
Expand All @@ -15,7 +16,9 @@ use ethers::{
types::{Eip1559TransactionRequest, H160},
};
use fendermint_materializer::bencher::Bencher;
use fendermint_materializer::concurrency::collect::collect_blocks;
use fendermint_materializer::concurrency::nonce_manager::NonceManager;
use fendermint_materializer::concurrency::signal::Signal;
use fendermint_materializer::{
concurrency::{self, config::Execution},
materials::DefaultAccount,
Expand Down Expand Up @@ -43,11 +46,17 @@ where
Ok(SignerMiddleware::new(provider, wallet))
}

const BLOCK_GAS_LIMIT: u64 = 10_000_000_000;
const MAX_TX_GAS_LIMIT: u64 = 3_000_000;

#[serial_test::serial]
#[tokio::test]
async fn test_concurrent_transfer() -> Result<(), anyhow::Error> {
let (testnet, cleanup) = make_testnet(MANIFEST, |_| {}).await?;

let block_gas_limit = U256::from(BLOCK_GAS_LIMIT);
let max_tx_gas_limit = U256::from(MAX_TX_GAS_LIMIT);

let pangea = testnet.node(&testnet.root().node("pangea"))?;
let provider = pangea
.ethapi_http_provider()?
Expand All @@ -57,11 +66,27 @@ async fn test_concurrent_transfer() -> Result<(), anyhow::Error> {
.await
.context("failed to get chain ID")?;

let cancel = Arc::new(Signal::new());

// Set up background blocks collector.
let blocks_collector = {
let cancel = cancel.clone();
let provider = provider.clone();
let assert = move |block: &Block<H256>| {
// Make sure block gas limit isn't the bottleneck.
let unused_gas_limit = block_gas_limit - block.gas_limit;
assert!(unused_gas_limit >= max_tx_gas_limit);
};
tokio::spawn(collect_blocks(cancel, provider, assert))
};

// Drive concurrency.
let cfg = Execution::new()
.add_step(1, 5)
.add_step(10, 5)
.add_step(100, 5)
.add_step(150, 5);
.add_step(150, 5)
.add_step(200, 5);
let testnet = Arc::new(testnet);
let testnet_clone = testnet.clone();
let nonce_manager = Arc::new(NonceManager::new());
Expand All @@ -78,26 +103,33 @@ async fn test_concurrent_transfer() -> Result<(), anyhow::Error> {

let middleware = make_middleware(provider, sender, chain_id)
.await
.context("failed to set up middleware")?;
.context("make_middleware")?;

let sender: H160 = sender.eth_addr().into();
let nonce = nonce_manager.get_and_increment(sender).await;

// Create the simplest transaction possible: send tokens between accounts.
let to: H160 = recipient.eth_addr().into();
let transfer = Eip1559TransactionRequest::new()
let mut tx = Eip1559TransactionRequest::new()
.to(to)
.value(1)
.nonce(nonce);

bencher.start().await;
let gas_estimation = middleware
.estimate_gas(&tx.clone().into(), None)
.await
.unwrap();
tx = tx.gas(gas_estimation);
assert!(gas_estimation <= max_tx_gas_limit);

bencher.start();

let pending: PendingTransaction<_> = middleware
.send_transaction(transfer, None)
.send_transaction(tx, None)
.await
.context("failed to send txn")?;
let tx_hash = pending.tx_hash();
println!("sent pending txn {:?} (test_id={})", tx_hash, test_id);
println!("sent tx {:?} (test_id={})", tx_hash, test_id);

// We expect that the transaction is pending, however it should not return an error.
match middleware.get_transaction(tx_hash).await {
Expand All @@ -107,27 +139,30 @@ async fn test_concurrent_transfer() -> Result<(), anyhow::Error> {
bail!("failed to get pending transaction: {e}")
}
}
bencher.record("mempool".to_string()).await;
bencher.mempool();

loop {
if let Ok(Some(tx)) = middleware.get_transaction_receipt(tx_hash).await {
println!(
"tx included in block {:?} (test_id={})",
tx.block_number, test_id
);
let block_number = tx.block_number.unwrap().as_u64();
bencher.block_inclusion(block_number);
break;
}
sleep(Duration::from_millis(100)).await;
}
bencher.record("block_inclusion".to_string()).await;

Ok(bencher)
};
test.boxed()
})
.await;

let summary = concurrency::ExecutionSummary::new(cfg.clone(), results);
cancel.send();
let blocks = blocks_collector.await??;
let summary = concurrency::ExecutionSummary::new(cfg.clone(), blocks, results);
summary.print();

let res = summary.to_result();
Expand Down

0 comments on commit 886a0e2

Please sign in to comment.