Skip to content

Commit

Permalink
Merge pull request #47 from flashbots/agent-controller
Browse files Browse the repository at this point in the history
Agent controller
  • Loading branch information
zeroXbrock authored Nov 18, 2024
2 parents 5c940c0 + cbf64c8 commit ae17ca1
Show file tree
Hide file tree
Showing 18 changed files with 492 additions and 122 deletions.
12 changes: 11 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace]
members = [
members = [ "crates/bundle_provider",
"crates/cli/",
"crates/core/",
"crates/sqlite_db/",
Expand All @@ -22,6 +22,7 @@ repository = "https://github.com/flashbots/contender"
contender_core = { path = "crates/core/" }
contender_sqlite = { path = "crates/sqlite_db/" }
contender_testfile = { path = "crates/testfile/" }
contender_bundle_provider = { path = "crates/bundle_provider/" }

eyre = "0.6.12"
tokio = { version = "1.40.0" }
Expand Down
15 changes: 15 additions & 0 deletions crates/bundle_provider/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "contender_bundle_provider"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
authors.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true

[dependencies]
alloy = { workspace = true, features = ["node-bindings", "rpc-types-mev"] }
jsonrpsee = { workspace = true, features = ["http-client", "client-core"] }
serde = { workspace = true, features = ["derive"] }
alloy-serde = { workspace = true }
File renamed without changes.
3 changes: 3 additions & 0 deletions crates/bundle_provider/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod bundle_provider;

pub use bundle_provider::{BundleClient, EthSendBundle, EthSendBundleResponse};
229 changes: 180 additions & 49 deletions crates/cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
mod commands;

use alloy::{
network::AnyNetwork,
network::{AnyNetwork, EthereumWallet, TransactionBuilder},
primitives::{
utils::{format_ether, parse_ether},
Address, U256,
},
providers::{Provider, ProviderBuilder},
providers::{PendingTransactionConfig, Provider, ProviderBuilder},
rpc::types::TransactionRequest,
signers::local::PrivateKeySigner,
transports::http::reqwest::Url,
};
use commands::{ContenderCli, ContenderSubcommand};
use contender_core::{
agent_controller::{AgentStore, SignerStore},
db::{DbOps, RunTx},
generator::{
types::{AnyProvider, FunctionCallDefinition, SpamRequest},
types::{AnyProvider, EthProvider, FunctionCallDefinition, SpamRequest},
RandSeed,
},
spammer::{BlockwiseSpammer, LogCallback, NilCallback, TimedSpammer},
Expand All @@ -32,26 +34,6 @@ static DB: LazyLock<SqliteDb> = std::sync::LazyLock::new(|| {
SqliteDb::from_file("contender.db").expect("failed to open contender.db")
});

fn get_signers_with_defaults(private_keys: Option<Vec<String>>) -> Vec<PrivateKeySigner> {
if private_keys.is_none() {
println!("No private keys provided. Using default private keys.");
}
let private_keys = private_keys.unwrap_or_default();
let private_keys = [
private_keys,
DEFAULT_PRV_KEYS
.into_iter()
.map(|s| s.to_owned())
.collect::<Vec<_>>(),
]
.concat();

private_keys
.into_iter()
.map(|k| PrivateKeySigner::from_str(&k).expect("Invalid private key"))
.collect::<Vec<PrivateKeySigner>>()
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = ContenderCli::parse_args();
Expand Down Expand Up @@ -81,7 +63,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
&testconfig.setup.to_owned().unwrap_or(vec![]),
signers.as_slice(),
);
check_balances(&user_signers, min_balance, &rpc_client).await;
let broke_accounts = find_insufficient_balance_addrs(
&user_signers.iter().map(|s| s.address()).collect::<Vec<_>>(),
min_balance,
&rpc_client,
)
.await?;
if !broke_accounts.is_empty() {
panic!("Some accounts do not have sufficient balance");
}

let scenario = TestScenario::new(
testconfig.to_owned(),
Expand All @@ -90,6 +80,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
None,
RandSeed::new(),
&signers,
Default::default(),
);

scenario.deploy_contracts().await?;
Expand All @@ -114,30 +105,107 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let rpc_client = ProviderBuilder::new()
.network::<AnyNetwork>()
.on_http(url.to_owned());
let eth_client = ProviderBuilder::new().on_http(url.to_owned());

let duration = duration.unwrap_or_default();
let min_balance = parse_ether(&min_balance)?;

let signers = get_signers_with_defaults(private_keys);
let user_signers = get_signers_with_defaults(private_keys);
let spam = testconfig
.spam
.as_ref()
.expect("No spam function calls found in testfile");

// distill all FunctionCallDefinitions from the spam requests
let mut fn_calls = vec![];
// distill all from_pool arguments from the spam requests
let mut from_pools = vec![];

for s in spam {
match s {
SpamRequest::Tx(fn_call) => {
fn_calls.push(fn_call.to_owned());
if let Some(from_pool) = &fn_call.from_pool {
from_pools.push(from_pool);
}
}
SpamRequest::Bundle(bundle) => {
fn_calls.extend(bundle.txs.iter().map(|s| s.to_owned()));
fn_calls.extend(bundle.txs.iter().map(|s| {
if let Some(from_pool) = &s.from_pool {
from_pools.push(from_pool);
}
s.to_owned()
}));
}
}
}

check_private_keys(&fn_calls, signers.as_slice());
check_balances(signers.as_slice(), min_balance, &rpc_client).await;
let mut agents = AgentStore::new();
let signers_per_block = txs_per_block.unwrap_or(spam.len()) / spam.len();

let mut all_signers = vec![];
all_signers.extend_from_slice(&user_signers);

for from_pool in from_pools {
if agents.has_agent(from_pool) {
continue;
}

let agent = SignerStore::new_random(signers_per_block, &rand_seed);
all_signers.extend_from_slice(&agent.signers);
agents.add_agent(from_pool, agent);
}

check_private_keys(&fn_calls, &all_signers);

let insufficient_balance_addrs = find_insufficient_balance_addrs(
&all_signers.iter().map(|s| s.address()).collect::<Vec<_>>(),
min_balance,
&rpc_client,
)
.await?;

let admin_signer = &user_signers[0];
let mut pending_fund_txs = vec![];
let admin_nonce = rpc_client
.get_transaction_count(admin_signer.address())
.await?;
for (idx, address) in insufficient_balance_addrs.iter().enumerate() {
if !is_balance_sufficient(&admin_signer.address(), min_balance, &rpc_client).await?
{
// panic early if admin account runs out of funds
return Err(format!(
"Admin account {} has insufficient balance to fund this account.",
admin_signer.address()
)
.into());
}

let balance = rpc_client.get_balance(*address).await?;
println!(
"Account {} has insufficient balance. (has {}, needed {})",
address,
format_ether(balance),
format_ether(min_balance)
);

let fund_amount = min_balance;
pending_fund_txs.push(
fund_account(
&admin_signer,
*address,
fund_amount,
&eth_client,
Some(admin_nonce + idx as u64),
)
.await?,
);
}

for tx in pending_fund_txs {
let pending = rpc_client.watch_pending_transaction(tx).await?;
println!("funding tx confirmed ({})", pending.await?);
}

if txs_per_block.is_some() && txs_per_second.is_some() {
panic!("Cannot set both --txs-per-block and --txs-per-second");
Expand All @@ -153,7 +221,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
url,
builder_url.map(|url| Url::parse(&url).expect("Invalid builder URL")),
rand_seed,
&signers,
&user_signers,
agents,
);
println!("Blockwise spamming with {} txs per block", txs_per_block);
match spam_callback_default(!disable_reports, Arc::new(rpc_client).into()).await {
Expand Down Expand Up @@ -185,7 +254,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
url,
None,
rand_seed,
&signers,
&user_signers,
agents,
);
let tps = txs_per_second.unwrap_or(10);
println!("Timed spamming with {} txs per second", tps);
Expand Down Expand Up @@ -236,12 +306,11 @@ enum SpamCallbackType {
/// Panics if any of the function calls' `from` addresses do not have a corresponding private key.
fn check_private_keys(fn_calls: &[FunctionCallDefinition], prv_keys: &[PrivateKeySigner]) {
for fn_call in fn_calls {
let address = fn_call
.from
.parse::<Address>()
.expect("invalid 'from' address");
if prv_keys.iter().all(|k| k.address() != address) {
panic!("No private key found for address: {}", address);
if let Some(from) = &fn_call.from {
let address = from.parse::<Address>().expect("invalid 'from' address");
if prv_keys.iter().all(|k| k.address() != address) {
panic!("No private key found for address: {}", address);
}
}
}
}
Expand All @@ -259,6 +328,63 @@ const DEFAULT_PRV_KEYS: [&str; 10] = [
"0x2a871d0798f97d79848a013d4936a73bf4cc922c825d33c1cf7073dff6d409c6",
];

fn get_signers_with_defaults(private_keys: Option<Vec<String>>) -> Vec<PrivateKeySigner> {
if private_keys.is_none() {
println!("No private keys provided. Using default private keys.");
}
let private_keys = private_keys.unwrap_or_default();
let private_keys = [
private_keys,
DEFAULT_PRV_KEYS
.into_iter()
.map(|s| s.to_owned())
.collect::<Vec<_>>(),
]
.concat();

private_keys
.into_iter()
.map(|k| PrivateKeySigner::from_str(&k).expect("Invalid private key"))
.collect::<Vec<PrivateKeySigner>>()
}

async fn fund_account(
admin_signer: &PrivateKeySigner,
recipient: Address,
amount: U256,
rpc_client: &EthProvider,
nonce: Option<u64>,
) -> Result<PendingTransactionConfig, Box<dyn std::error::Error>> {
println!(
"funding account {} with user account {}",
recipient,
admin_signer.address()
);

let gas_price = rpc_client.get_gas_price().await?;
let nonce = nonce.unwrap_or(
rpc_client
.get_transaction_count(admin_signer.address())
.await?,
);
let chain_id = rpc_client.get_chain_id().await?;
let tx_req = TransactionRequest {
from: Some(admin_signer.address()),
to: Some(alloy::primitives::TxKind::Call(recipient)),
value: Some(amount),
gas: Some(21000),
gas_price: Some(gas_price + 4_200_000_000),
nonce: Some(nonce),
chain_id: Some(chain_id),
..Default::default()
};
let eth_wallet = EthereumWallet::from(admin_signer.to_owned());
let tx = tx_req.build(&eth_wallet).await?;
let res = rpc_client.send_tx_envelope(tx).await?;

Ok(res.into_inner())
}

async fn spam_callback_default(
log_txs: bool,
rpc_client: Option<Arc<AnyProvider>>,
Expand All @@ -271,23 +397,28 @@ async fn spam_callback_default(
SpamCallbackType::Nil(NilCallback::new())
}

async fn check_balances(
prv_keys: &[PrivateKeySigner],
async fn is_balance_sufficient(
address: &Address,
min_balance: U256,
rpc_client: &AnyProvider,
) {
for prv_key in prv_keys {
let address = prv_key.address();
let balance = rpc_client.get_balance(address).await.unwrap();
if balance < min_balance {
panic!(
"Insufficient balance for address {}. Required={} Actual={}. If needed, use --min-balance to set a lower threshold.",
address,
format_ether(min_balance),
format_ether(balance)
);
) -> Result<bool, Box<dyn std::error::Error>> {
let balance = rpc_client.get_balance(*address).await?;
Ok(balance >= min_balance)
}

/// Returns an error if any of the private keys do not have sufficient balance.
async fn find_insufficient_balance_addrs(
addresses: &[Address],
min_balance: U256,
rpc_client: &AnyProvider,
) -> Result<Vec<Address>, Box<dyn std::error::Error>> {
let mut insufficient_balance_addrs = vec![];
for address in addresses {
if !is_balance_sufficient(address, min_balance, rpc_client).await? {
insufficient_balance_addrs.push(*address);
}
}
Ok(insufficient_balance_addrs)
}

fn write_run_txs<T: std::io::Write>(
Expand Down
Loading

0 comments on commit ae17ca1

Please sign in to comment.