From e327cdf2e5d3437fe476972da34a3a210b4ed25d Mon Sep 17 00:00:00 2001 From: Jeb Bearer Date: Mon, 17 Jun 2024 16:49:49 -0400 Subject: [PATCH] Test framework for restartability Set up some Rust automation for tests that spin up a sequencer network and restart various combinations of nodes, checking that we recover liveness. Instantiate the framework with several combinations of nodes as outlined in https://www.notion.so/espressosys/Persistence-catchup-and-restartability-cf4ddb79df2e41a993e60e3beaa28992. As expected, the tests where we restart >f nodes do not pass yet, and are ignored. The others pass locally. There are many things left to test here, including: * Testing with a valid libp2p setup * Testing with _only_ libp2p and no CDN * Checking integrity of the DA/query service during and after restart But this is a pretty good starting point. I considered doing this with something more dynamic like Bash or Python scripting, leaning on our existing docker-compose or process-compose infrastructure to spin up a network. I avoided this for a few reasons: * process-compose is annoying to script and in particular has limited capabilities for shutting down and starting up processes * both docker-compose and process-compose make it hard to dynamically choose the network topology * once the basic test infrastructure is out of the way, Rust is far easier to work with for writing new checks and assertions. For example, checking for progress is way easier when we can plug directly into the HotShot event stream, vs subscribing to some stream via HTTP and parsing responses with jq --- Cargo.toml | 1 + sequencer/Cargo.toml | 5 +- sequencer/src/api.rs | 122 ++++--- sequencer/src/api/data_source.rs | 6 +- sequencer/src/api/fs.rs | 2 +- sequencer/src/api/sql.rs | 2 +- sequencer/src/context.rs | 4 + sequencer/src/main.rs | 37 +- sequencer/src/restart_tests.rs | 598 +++++++++++++++++++++++++++++++ 9 files changed, 698 insertions(+), 79 deletions(-) create mode 100644 sequencer/src/restart_tests.rs diff --git a/Cargo.toml b/Cargo.toml index ff58fdbc5b..8aa5081cc3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -112,6 +112,7 @@ rand_chacha = "0.3" rand_distr = "0.4" reqwest = "0.12" serde = { version = "1.0.195", features = ["derive"] } +tempfile = "3.9" toml = "0.8" url = "2.3" vbs = "0.1" diff --git a/sequencer/Cargo.toml b/sequencer/Cargo.toml index 77a03fe9d6..b6572fcc7a 100644 --- a/sequencer/Cargo.toml +++ b/sequencer/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -testing = ["hotshot-testing"] +testing = ["hotshot-testing", "tempfile"] libp2p = [] [[bin]] @@ -22,7 +22,7 @@ hotshot-testing = { workspace = true } pretty_assertions = { workspace = true } rand = "0.8.5" reqwest = { workspace = true } -tempfile = "3.9.0" +tempfile = { workspace = true } [build-dependencies] anyhow = { workspace = true } @@ -80,6 +80,7 @@ itertools = { workspace = true } jf-crhf = { workspace = true } jf-merkle-tree = { workspace = true } jf-rescue = { workspace = true } +tempfile = { workspace = true, optional = true } jf-signature = { workspace = true, features = ["bls", "schnorr"] } jf-utils = { workspace = true } # TODO temporary: used only for test_rng() diff --git a/sequencer/src/api.rs b/sequencer/src/api.rs index abba9131fb..201ae907ae 100644 --- a/sequencer/src/api.rs +++ b/sequencer/src/api.rs @@ -757,8 +757,6 @@ pub mod test_helpers { #[cfg(test)] #[espresso_macros::generic_tests] mod api_tests { - use self::options::HotshotEvents; - use super::*; use crate::{ persistence::no_storage, @@ -903,66 +901,6 @@ mod api_tests { let storage = D::create_storage().await; catchup_test_helper(|opt| D::options(&storage, opt)).await } - - #[async_std::test] - pub(crate) async fn test_hotshot_event_streaming() { - use hotshot_events_service::events_source::BuilderEvent; - use HotshotEvents; - - setup_logging(); - - setup_backtrace(); - - let hotshot_event_streaming_port = - pick_unused_port().expect("No ports free for hotshot event streaming"); - let query_service_port = pick_unused_port().expect("No ports free for query service"); - - let url = format!("http://localhost:{hotshot_event_streaming_port}") - .parse() - .unwrap(); - - let hotshot_events = HotshotEvents { - events_service_port: hotshot_event_streaming_port, - }; - - let client: Client = Client::new(url); - - let options = Options::with_port(query_service_port).hotshot_events(hotshot_events); - - let anvil = Anvil::new().spawn(); - let l1 = anvil.endpoint().parse().unwrap(); - let _network = TestNetwork::new( - options, - [no_storage::Options; TestConfig::NUM_NODES], - l1, - None, - ) - .await; - - let mut subscribed_events = client - .socket("hotshot-events/events") - .subscribe::>() - .await - .unwrap(); - - let total_count = 5; - // wait for these events to receive on client 1 - let mut receive_count = 0; - loop { - let event = subscribed_events.next().await.unwrap(); - tracing::info!( - "Received event in hotshot event streaming Client 1: {:?}", - event - ); - receive_count += 1; - if receive_count > total_count { - tracing::info!("Client Received atleast desired events, exiting loop"); - break; - } - } - // Offset 1 is due to the startup event info - assert_eq!(receive_count, total_count + 1); - } } #[cfg(test)] @@ -1599,4 +1537,64 @@ mod test { .unwrap(); assert_eq!(chain, new_chain); } + + #[async_std::test] + async fn test_hotshot_event_streaming() { + use hotshot_events_service::events_source::BuilderEvent; + use options::HotshotEvents; + + setup_logging(); + + setup_backtrace(); + + let hotshot_event_streaming_port = + pick_unused_port().expect("No ports free for hotshot event streaming"); + let query_service_port = pick_unused_port().expect("No ports free for query service"); + + let url = format!("http://localhost:{hotshot_event_streaming_port}") + .parse() + .unwrap(); + + let hotshot_events = HotshotEvents { + events_service_port: hotshot_event_streaming_port, + }; + + let client: Client = Client::new(url); + + let options = Options::with_port(query_service_port).hotshot_events(hotshot_events); + + let anvil = Anvil::new().spawn(); + let l1 = anvil.endpoint().parse().unwrap(); + let _network = TestNetwork::new( + options, + [no_storage::Options; TestConfig::NUM_NODES], + l1, + None, + ) + .await; + + let mut subscribed_events = client + .socket("hotshot-events/events") + .subscribe::>() + .await + .unwrap(); + + let total_count = 5; + // wait for these events to receive on client 1 + let mut receive_count = 0; + loop { + let event = subscribed_events.next().await.unwrap(); + tracing::info!( + "Received event in hotshot event streaming Client 1: {:?}", + event + ); + receive_count += 1; + if receive_count > total_count { + tracing::info!("Client Received atleast desired events, exiting loop"); + break; + } + } + // Offset 1 is due to the startup event info + assert_eq!(receive_count, total_count + 1); + } } diff --git a/sequencer/src/api/data_source.rs b/sequencer/src/api/data_source.rs index 392f554171..11b74f1a8a 100644 --- a/sequencer/src/api/data_source.rs +++ b/sequencer/src/api/data_source.rs @@ -284,13 +284,13 @@ impl From> for PublicHotShotConfig { } } -#[cfg(test)] -pub(crate) mod testing { +#[cfg(any(test, feature = "testing"))] +pub mod testing { use super::super::Options; use super::*; #[async_trait] - pub(crate) trait TestableSequencerDataSource: SequencerDataSource { + pub trait TestableSequencerDataSource: SequencerDataSource { type Storage: Sync; async fn create_storage() -> Self::Storage; diff --git a/sequencer/src/api/fs.rs b/sequencer/src/api/fs.rs index ce184ba650..515343fa11 100644 --- a/sequencer/src/api/fs.rs +++ b/sequencer/src/api/fs.rs @@ -26,7 +26,7 @@ impl SequencerDataSource for DataSource { impl CatchupDataSource for DataSource {} -#[cfg(test)] +#[cfg(any(test, feature = "testing"))] mod impl_testable_data_source { use super::*; use crate::api::{self, data_source::testing::TestableSequencerDataSource}; diff --git a/sequencer/src/api/sql.rs b/sequencer/src/api/sql.rs index 6b9afac5f7..aeadeab0a5 100644 --- a/sequencer/src/api/sql.rs +++ b/sequencer/src/api/sql.rs @@ -180,7 +180,7 @@ impl ChainConfigPersistence for DataSource { } } -#[cfg(test)] +#[cfg(any(test, feature = "testing"))] mod impl_testable_data_source { use super::*; diff --git a/sequencer/src/context.rs b/sequencer/src/context.rs index ab239c751b..91098aade4 100644 --- a/sequencer/src/context.rs +++ b/sequencer/src/context.rs @@ -238,6 +238,10 @@ impl u64 { + self.node_state.node_id + } + pub fn node_state(&self) -> NodeState { self.node_state.clone() } diff --git a/sequencer/src/main.rs b/sequencer/src/main.rs index 4324328bdc..6ed1a43e80 100644 --- a/sequencer/src/main.rs +++ b/sequencer/src/main.rs @@ -7,7 +7,8 @@ use futures::future::FutureExt; use hotshot_types::traits::metrics::NoMetrics; use sequencer::{ api::{self, data_source::DataSourceOptions}, - init_node, + context::SequencerContext, + init_node, network, options::{Modules, Options}, persistence, Genesis, L1Params, NetworkParams, }; @@ -24,12 +25,12 @@ async fn main() -> anyhow::Result<()> { tracing::warn!("modules: {:?}", modules); if let Some(storage) = modules.storage_fs.take() { - init_with_storage(modules, opt, storage, SEQUENCER_VERSION).await + run_with_storage(modules, opt, storage, SEQUENCER_VERSION).await } else if let Some(storage) = modules.storage_sql.take() { - init_with_storage(modules, opt, storage, SEQUENCER_VERSION).await + run_with_storage(modules, opt, storage, SEQUENCER_VERSION).await } else { // Persistence is required. If none is provided, just use the local file system. - init_with_storage( + run_with_storage( modules, opt, persistence::fs::Options::default(), @@ -39,12 +40,30 @@ async fn main() -> anyhow::Result<()> { } } -async fn init_with_storage( +async fn run_with_storage( modules: Modules, opt: Options, storage_opt: S, bind_version: Ver, ) -> anyhow::Result<()> +where + S: DataSourceOptions, +{ + let ctx = init_with_storage(modules, opt, storage_opt, bind_version).await?; + + // Start doing consensus. + ctx.start_consensus().await; + ctx.join().await; + + Ok(()) +} + +async fn init_with_storage( + modules: Modules, + opt: Options, + storage_opt: S, + bind_version: Ver, +) -> anyhow::Result> where S: DataSourceOptions, { @@ -153,13 +172,11 @@ where } }; - // Start doing consensus. - ctx.start_consensus().await; - ctx.join().await; - - Ok(()) + Ok(ctx) } +mod restart_tests; + #[cfg(test)] mod test { use super::*; diff --git a/sequencer/src/restart_tests.rs b/sequencer/src/restart_tests.rs new file mode 100644 index 0000000000..09c4d482e6 --- /dev/null +++ b/sequencer/src/restart_tests.rs @@ -0,0 +1,598 @@ +#![cfg(test)] + +use super::*; +use anyhow::bail; +use async_compatibility_layer::art::async_timeout; +use async_std::task::{sleep, spawn, JoinHandle}; +use cdn_broker::{reexports::crypto::signature::KeyPair, Broker, Config as BrokerConfig}; +use cdn_marshal::{Config as MarshalConfig, Marshal}; +use derivative::Derivative; +use es_version::SequencerVersion; +use ethers::utils::{Anvil, AnvilInstance}; +use futures::{ + future::{join_all, try_join_all, BoxFuture, FutureExt}, + stream::{BoxStream, StreamExt}, +}; +use hotshot_orchestrator::{ + config::{Libp2pConfig, NetworkConfig}, + run_orchestrator, +}; +use hotshot_types::{ + event::{Event, EventType}, + light_client::StateKeyPair, + traits::{node_implementation::ConsensusTime, signature_key::SignatureKey}, +}; +use itertools::Itertools; +use portpicker::pick_unused_port; +use sequencer::{ + api::{self, data_source::testing::TestableSequencerDataSource, options::Http}, + genesis::StakeTableConfig, + network::cdn::{TestingDef, WrappedSignatureKey}, + persistence::PersistenceOptions, + PrivKey, PubKey, SeqTypes, +}; +use std::{path::Path, time::Duration}; +use tempfile::TempDir; + +async fn test_restart_helper(network: (usize, usize), restart: (usize, usize)) { + setup_logging(); + setup_backtrace(); + + let mut network = TestNetwork::new(network.0, network.1).await; + + // Let the network get going. + network.check_progress().await; + // Restart some combination of nodes and ensure progress resumes. + network.restart(restart.0, restart.1).await; +} + +#[async_std::test] +async fn test_restart_1_da() { + test_restart_helper((2, 3), (1, 0)).await; +} + +#[async_std::test] +async fn test_restart_1_regular() { + test_restart_helper((2, 3), (0, 1)).await; +} + +#[async_std::test] +async fn test_restart_f() { + test_restart_helper((4, 6), (1, 2)).await; +} + +#[async_std::test] +async fn test_restart_f_minus_1() { + test_restart_helper((4, 6), (1, 1)).await; +} + +#[ignore] +#[async_std::test] +async fn test_restart_f_plus_1() { + test_restart_helper((4, 6), (1, 3)).await; +} + +#[ignore] +#[async_std::test] +async fn test_restart_2f() { + test_restart_helper((4, 6), (1, 5)).await; +} + +#[ignore] +#[async_std::test] +async fn test_restart_2f_minus_1() { + test_restart_helper((4, 6), (1, 4)).await; +} + +#[ignore] +#[async_std::test] +async fn test_restart_2f_plus_1() { + test_restart_helper((4, 6), (2, 5)).await; +} + +#[async_std::test] +async fn test_restart_all() { + test_restart_helper((2, 8), (2, 8)).await; +} + +#[derive(Debug)] +struct TestNode { + storage: S::Storage, + context: Option< + SequencerContext< + network::Production, + ::Persistence, + SequencerVersion, + >, + >, + modules: Modules, + opt: Options, + num_nodes: usize, +} + +impl TestNode { + fn keygen(i: u64) -> (PrivKey, StateKeyPair) { + ( + PubKey::generated_from_seed_indexed([0; 32], i).1, + StateKeyPair::generate_from_seed_indexed([0; 32], i), + ) + } + + #[tracing::instrument] + async fn new( + i: u64, + genesis_file: &Path, + sequencer_ports: &[u16], + orchestrator_port: u16, + cdn_port: u16, + l1_provider: &str, + is_da: bool, + ) -> Self { + let (staking_key, state_key) = Self::keygen(i); + tracing::info!( + staking_key = %PubKey::from_private(&staking_key), + state_key = %state_key.ver_key(), + "creating node", + ); + + let storage = S::create_storage().await; + let port = sequencer_ports[i as usize]; + + let mut modules = Modules { + http: Some(Http::with_port(port)), + status: Some(Default::default()), + catchup: Some(Default::default()), + ..Default::default() + }; + if is_da { + modules.query = Some(Default::default()); + modules.state = Some(Default::default()); + } + + let libp2p_port = pick_unused_port().unwrap(); + let mut opt = Options::parse_from([ + "sequencer", + "--private-staking-key", + &staking_key.to_string(), + "--private-state-key", + &state_key.sign_key_ref().to_string(), + "--genesis-file", + &genesis_file.display().to_string(), + "--orchestrator-url", + &format!("http://localhost:{orchestrator_port}"), + "--libp2p-bind-address", + &format!("127.0.0.1:{libp2p_port}"), + "--libp2p-advertise-address", + &format!("127.0.0.1:{libp2p_port}"), + "--cdn-endpoint", + &format!("127.0.0.1:{cdn_port}"), + "--state-peers", + &sequencer_ports + .iter() + .map(|port| format!("http://127.0.0.1:{port}")) + .join(","), + "--l1-provider-url", + l1_provider, + ]); + opt.is_da = is_da; + Self { + storage, + modules, + opt, + num_nodes: sequencer_ports.len(), + context: None, + } + } + + fn stop(&mut self) -> BoxFuture<()> { + async { + if let Some(mut context) = self.context.take() { + tracing::info!(node_id = context.node_id(), "stopping node"); + context.shut_down().await; + } + } + .boxed() + } + + fn start(&mut self) -> BoxFuture<()> + where + S::Storage: Send, + { + async { + tracing::info!("starting node"); + let ctx = init_with_storage( + self.modules.clone(), + self.opt.clone(), + S::persistence_options(&self.storage), + SEQUENCER_VERSION, + ) + .await + .unwrap(); + tracing::info!(node_id = ctx.node_id(), "starting consensus"); + ctx.start_consensus().await; + self.context = Some(ctx); + } + .boxed() + } + + async fn event_stream(&self) -> Option>> { + if let Some(ctx) = &self.context { + Some(ctx.event_stream().await.boxed()) + } else { + None + } + } + + fn check_progress_with_timeout(&self) -> BoxFuture> { + async { + let Some(context) = &self.context else { + tracing::info!("skipping progress check on stopped node"); + return Ok(()); + }; + let node_id = context.node_id(); + let next_view_timeout = { + context + .consensus() + .read() + .await + .hotshot + .config + .next_view_timeout + }; + // Give enough time for every node to propose, with every view timing out. This is + // conservative: of course if we actually make progress, not every view will time out, + // and we will take less than this amount of time. + let timeout = Duration::from_millis(next_view_timeout) * (self.num_nodes as u32); + match async_timeout(timeout, self.check_progress()).await { + Ok(res) => res, + Err(_) => bail!("timed out waiting for progress on node {node_id}"), + } + } + .boxed() + } + + async fn check_progress(&self) -> anyhow::Result<()> { + let Some(context) = &self.context else { + tracing::info!("skipping progress check on stopped node"); + return Ok(()); + }; + + let num_nodes = { + context + .consensus() + .read() + .await + .hotshot + .config + .num_nodes_with_stake + }; + let node_id = context.node_id(); + tracing::info!(node_id, num_nodes, "waiting for progress from node"); + + // Wait for a block proposed by this node. This proves that the node is tracking consensus + // (getting Decide events) and participating (able to propose). + let mut events = context.event_stream().await; + while let Some(event) = events.next().await { + let EventType::Decide { leaf_chain, .. } = event.event else { + continue; + }; + for leaf in leaf_chain.iter() { + if leaf.leaf.view_number().u64() % (num_nodes.get() as u64) == node_id { + tracing::info!( + node_id, + height = leaf.leaf.height(), + "got leaf proposed by this node" + ); + return Ok(()); + } + tracing::info!( + node_id, + height = leaf.leaf.height(), + view = leaf.leaf.view_number().u64(), + "leaf not proposed by this node" + ); + } + } + + bail!("node {node_id} event stream ended unexpectedly"); + } +} + +#[derive(Derivative)] +#[derivative(Debug)] +struct TestNetwork { + da_nodes: Vec>, + regular_nodes: Vec>, + tmp: TempDir, + orchestrator_task: Option>, + broker_task: Option>, + marshal_task: Option>, + #[derivative(Debug = "ignore")] + _anvil: AnvilInstance, +} + +impl Drop for TestNetwork { + fn drop(&mut self) { + if let Some(task) = self.orchestrator_task.take() { + async_std::task::block_on(task.cancel()); + } + if let Some(task) = self.broker_task.take() { + async_std::task::block_on(task.cancel()); + } + if let Some(task) = self.marshal_task.take() { + async_std::task::block_on(task.cancel()); + } + } +} + +impl TestNetwork { + async fn new(da_nodes: usize, regular_nodes: usize) -> Self { + let tmp = TempDir::new().unwrap(); + let genesis_file = tmp.path().join("genesis.toml"); + let genesis = Genesis { + chain_config: Default::default(), + stake_table: StakeTableConfig { capacity: 10 }, + accounts: Default::default(), + l1_finalized: Default::default(), + header: Default::default(), + upgrades: Default::default(), + }; + genesis.to_file(&genesis_file).unwrap(); + let orchestrator_port = pick_unused_port().unwrap(); + let orchestrator_task = Some(start_orchestrator( + orchestrator_port, + da_nodes + regular_nodes, + )); + + let cdn_dir = tmp.path().join("cdn"); + let cdn_port = pick_unused_port().unwrap(); + let broker_task = Some(start_broker(&cdn_dir).await); + let marshal_task = Some(start_marshal(&cdn_dir, cdn_port).await); + + let anvil_port = pick_unused_port().unwrap(); + let anvil = Anvil::new().port(anvil_port).spawn(); + let anvil_endpoint = anvil.endpoint(); + + let ports = std::iter::from_fn(|| Some(pick_unused_port().unwrap())) + .take(da_nodes + regular_nodes) + .collect::>(); + let mut network = Self { + da_nodes: join_all((0..da_nodes).map(|i| { + TestNode::new( + i as u64, + &genesis_file, + &ports, + orchestrator_port, + cdn_port, + &anvil_endpoint, + true, + ) + })) + .await, + regular_nodes: join_all((0..regular_nodes).map(|i| { + TestNode::new( + (i + da_nodes) as u64, + &genesis_file, + &ports, + orchestrator_port, + cdn_port, + &anvil_endpoint, + false, + ) + })) + .await, + tmp, + orchestrator_task, + broker_task, + marshal_task, + _anvil: anvil, + }; + + join_all( + network + .da_nodes + .iter_mut() + .map(TestNode::start) + .chain(network.regular_nodes.iter_mut().map(TestNode::start)), + ) + .await; + + network + } + + async fn check_progress(&self) { + try_join_all( + self.da_nodes + .iter() + .map(TestNode::check_progress_with_timeout) + .chain( + self.regular_nodes + .iter() + .map(TestNode::check_progress_with_timeout), + ), + ) + .await + .unwrap(); + } + + async fn restart(&mut self, da_nodes: usize, regular_nodes: usize) { + tracing::info!(da_nodes, regular_nodes, "shutting down some nodes"); + join_all( + self.da_nodes[..da_nodes] + .iter_mut() + .map(TestNode::stop) + .chain( + self.regular_nodes[..regular_nodes] + .iter_mut() + .map(TestNode::stop), + ), + ) + .await; + + let quorum_threshold = 2 * self.num_nodes() / 3 + 1; + let da_threshold = 2 * self.da_nodes.len() / 3 + 1; + if self.num_nodes() - da_nodes - regular_nodes >= quorum_threshold + && self.da_nodes.len() - da_nodes >= da_threshold + { + // If we are shutting down less than f nodes, the remaining nodes should be able to make + // progress, and we will check that that is the case. + // + // Note that not every node will be able to commit leaves, because a node requires the + // cooperation of the node after it to commit its proposal. But, as long as we have shut + // down fewer than the fault tolerance, at least *some* node will have a correct node + // after it and will be able to commit. Thus, we just grab an event stream and look for + // any decide. + tracing::info!("waiting for remaining nodes to progress"); + let mut events = if da_nodes < self.da_nodes.len() { + self.da_nodes[da_nodes].event_stream().await.unwrap() + } else { + self.regular_nodes[regular_nodes] + .event_stream() + .await + .unwrap() + }; + // Wait for a few decides, the first couple may be from before the restart. + for _ in 0..self.num_nodes() { + let timeout = Duration::from_secs((2 * self.num_nodes()) as u64); + async_timeout(timeout, async { + loop { + let event = events + .next() + .await + .expect("event stream terminated unexpectedly"); + let EventType::Decide { leaf_chain, .. } = event.event else { + continue; + }; + tracing::info!(?leaf_chain, "got decide, chain is progressing"); + break; + } + }) + .await + .expect("timed out waiting for progress with nodes down"); + } + } else { + // Make sure there is a brief delay before restarting the nodes; we need the OS to + // have time to clean up the ports they were using. + tracing::info!( + "shut down too many nodes to make progress; will continue after a brief delay" + ); + sleep(Duration::from_secs(2)).await; + } + + join_all( + self.da_nodes[..da_nodes] + .iter_mut() + .map(TestNode::start) + .chain( + self.regular_nodes[..regular_nodes] + .iter_mut() + .map(TestNode::start), + ), + ) + .await; + self.check_progress().await; + } + + fn num_nodes(&self) -> usize { + self.da_nodes.len() + self.regular_nodes.len() + } +} + +fn start_orchestrator(port: u16, num_nodes: usize) -> JoinHandle<()> { + // We don't run a builder in these tests, so use a very short timeout before nodes decide to + // build an empty block on their own. + let builder_timeout = Duration::from_millis(100); + // These tests frequently have nodes down and views failing, so we use a fairly short view + // timeout. + let view_timeout = Duration::from_secs(2); + + let mut config = NetworkConfig:: { + indexed_da: false, + libp2p_config: Some(Libp2pConfig { + bootstrap_nodes: vec![], + node_index: 0, + bootstrap_mesh_n_high: 4, + bootstrap_mesh_n_low: 4, + bootstrap_mesh_outbound_min: 4 / 2, + bootstrap_mesh_n: 4, + mesh_n_high: 4, + mesh_n_low: 4, + mesh_outbound_min: 4 / 2, + mesh_n: 4, + next_view_timeout: view_timeout.as_millis() as u64, + online_time: 10, + num_txn_per_round: 0, + server_mode: true, + builder_timeout, + }), + ..Default::default() + }; + config.config.num_nodes_with_stake = num_nodes.try_into().unwrap(); + config.config.da_staked_committee_size = num_nodes; + config.config.known_nodes_with_stake = vec![]; + config.config.known_da_nodes = vec![]; + config.config.known_nodes_without_stake = vec![]; + config.config.next_view_timeout = view_timeout.as_millis() as u64; + config.config.builder_timeout = builder_timeout; + + let bind = format!("http://0.0.0.0:{port}").parse().unwrap(); + spawn(async move { + match run_orchestrator(config, bind).await { + Ok(()) => tracing::warn!("orchestrator exited"), + Err(err) => tracing::error!(%err, "orchestrator failed"), + } + }) +} + +async fn start_broker(dir: &Path) -> JoinHandle<()> { + let (public_key, private_key) = PubKey::generated_from_seed_indexed([0; 32], 1337); + let public_port = pick_unused_port().expect("failed to find free port for broker"); + let private_port = pick_unused_port().expect("failed to find free port for broker"); + let broker_config: BrokerConfig> = BrokerConfig { + public_advertise_endpoint: format!("127.0.0.1:{}", public_port), + public_bind_endpoint: format!("127.0.0.1:{}", public_port), + private_advertise_endpoint: format!("127.0.0.1:{}", private_port), + private_bind_endpoint: format!("127.0.0.1:{}", private_port), + + metrics_bind_endpoint: None, + discovery_endpoint: dir.display().to_string(), + keypair: KeyPair { + public_key: WrappedSignatureKey(public_key), + private_key, + }, + + ca_cert_path: None, + ca_key_path: None, + global_memory_pool_size: Some(1024 * 1024 * 1024), + }; + + spawn(async move { + match Broker::new(broker_config).await.unwrap().start().await { + Ok(()) => tracing::warn!("broker exited"), + Err(err) => tracing::error!("broker failed: {err:#}"), + } + }) +} + +async fn start_marshal(dir: &Path, port: u16) -> JoinHandle<()> { + let marshal_config = MarshalConfig { + bind_endpoint: format!("0.0.0.0:{port}"), + metrics_bind_endpoint: None, + discovery_endpoint: dir.display().to_string(), + ca_cert_path: None, + ca_key_path: None, + global_memory_pool_size: Some(1024 * 1024 * 1024), + }; + + spawn(async move { + match Marshal::>::new(marshal_config) + .await + .unwrap() + .start() + .await + { + Ok(()) => tracing::warn!("marshal exited"), + Err(err) => tracing::error!("marshal failed: {err:#}"), + } + }) +}