diff --git a/Cargo.toml b/Cargo.toml
index 15e2d4f65b..b453830eb3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -111,6 +111,7 @@ rand_chacha = "0.3"
 rand_distr = "0.4"
 reqwest = "0.12"
 serde = { version = "1.0.195", features = ["derive"] }
+tempfile = "3.9"
 toml = "0.8"
 url = "2.3"
 vbs = "0.1"
diff --git a/sequencer/Cargo.toml b/sequencer/Cargo.toml
index 8c8c5fe582..71d602f800 100644
--- a/sequencer/Cargo.toml
+++ b/sequencer/Cargo.toml
@@ -6,7 +6,7 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [features]
-testing = ["hotshot-testing"]
+testing = ["hotshot-testing", "tempfile"]
 libp2p = []
 
 [[bin]]
@@ -22,7 +22,7 @@ hotshot-testing = { workspace = true }
 pretty_assertions = { workspace = true }
 rand = "0.8.5"
 reqwest = { workspace = true }
-tempfile = "3.9.0"
+tempfile = { workspace = true }
 
 [build-dependencies]
 anyhow = { workspace = true }
@@ -80,6 +80,7 @@ itertools = { workspace = true }
 jf-crhf = { workspace = true }
 jf-merkle-tree = { workspace = true }
 jf-rescue = { workspace = true }
+tempfile = { workspace = true, optional = true }
 jf-signature = { workspace = true, features = ["bls", "schnorr"] }
 jf-utils = { workspace = true } # TODO temporary: used only for test_rng()
diff --git a/sequencer/src/api.rs b/sequencer/src/api.rs
index 5ac1925787..d5427e372e 100644
--- a/sequencer/src/api.rs
+++ b/sequencer/src/api.rs
@@ -681,8 +681,6 @@ pub mod test_helpers {
 #[cfg(test)]
 #[espresso_macros::generic_tests]
 mod api_tests {
-    use self::options::HotshotEvents;
-
     use super::*;
     use crate::{
         persistence::no_storage,
@@ -827,66 +825,6 @@ mod api_tests {
         let storage = D::create_storage().await;
         catchup_test_helper(|opt| D::options(&storage, opt)).await
     }
-
-    #[async_std::test]
-    pub(crate) async fn test_hotshot_event_streaming() {
-        use hotshot_events_service::events_source::BuilderEvent;
-        use HotshotEvents;
-
-        setup_logging();
-
-        setup_backtrace();
-
-        let hotshot_event_streaming_port =
-            pick_unused_port().expect("No ports free for hotshot event streaming");
-        let query_service_port = pick_unused_port().expect("No ports free for query service");
-
-        let url = format!("http://localhost:{hotshot_event_streaming_port}")
-            .parse()
-            .unwrap();
-
-        let hotshot_events = HotshotEvents {
-            events_service_port: hotshot_event_streaming_port,
-        };
-
-        let client: Client<ServerError, SequencerVersion> = Client::new(url);
-
-        let options = Options::with_port(query_service_port).hotshot_events(hotshot_events);
-
-        let anvil = Anvil::new().spawn();
-        let l1 = anvil.endpoint().parse().unwrap();
-        let _network = TestNetwork::new(
-            options,
-            [no_storage::Options; TestConfig::NUM_NODES],
-            l1,
-            None,
-        )
-        .await;
-
-        let mut subscribed_events = client
-            .socket("hotshot-events/events")
-            .subscribe::<BuilderEvent<SeqTypes>>()
-            .await
-            .unwrap();
-
-        let total_count = 5;
-        // wait for these events to receive on client 1
-        let mut receive_count = 0;
-        loop {
-            let event = subscribed_events.next().await.unwrap();
-            tracing::info!(
-                "Received event in hotshot event streaming Client 1: {:?}",
-                event
-            );
-            receive_count += 1;
-            if receive_count > total_count {
-                tracing::info!("Client Received atleast desired events, exiting loop");
-                break;
-            }
-        }
-        // Offset 1 is due to the startup event info
-        assert_eq!(receive_count, total_count + 1);
-    }
 }
 
 #[cfg(test)]
@@ -1281,4 +1219,66 @@ mod test {
         .unwrap();
         assert_eq!(chain, new_chain);
     }
+
+    #[async_std::test]
+    async fn test_hotshot_event_streaming() {
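+        // Exercise the hotshot events streaming API: subscribe to the event stream over a
+        // websocket and check that events arrive on the client.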
+        use hotshot_events_service::events_source::BuilderEvent;
+        use options::HotshotEvents;
+
+        setup_logging();
+
+        setup_backtrace();
+
+        let hotshot_event_streaming_port =
+            pick_unused_port().expect("No ports free for hotshot event streaming");
+        let query_service_port = pick_unused_port().expect("No ports free for query service");
+
+        let url = format!("http://localhost:{hotshot_event_streaming_port}")
+            .parse()
+            .unwrap();
+
+        let hotshot_events = HotshotEvents {
+            events_service_port: hotshot_event_streaming_port,
+        };
+
+        let client: Client<ServerError, SequencerVersion> = Client::new(url);
+
+        let options = Options::with_port(query_service_port).hotshot_events(hotshot_events);
+
+        let anvil = Anvil::new().spawn();
+        let l1 = anvil.endpoint().parse().unwrap();
+        let _network = TestNetwork::new(
+            options,
+            [no_storage::Options; TestConfig::NUM_NODES],
+            l1,
+            None,
+        )
+        .await;
+
+        let mut subscribed_events = client
+            .socket("hotshot-events/events")
+            .subscribe::<BuilderEvent<SeqTypes>>()
+            .await
+            .unwrap();
+
+        let total_count = 5;
+        // Wait to receive at least this many events on the client.
+        let mut receive_count = 0;
+        loop {
+            let event = subscribed_events.next().await.unwrap();
+            tracing::info!(
+                "Received event in hotshot event streaming Client 1: {:?}",
+                event
+            );
+            receive_count += 1;
+            if receive_count > total_count {
+                tracing::info!("Client received at least the desired number of events; exiting loop");
+                break;
+            }
+        }
+        // The offset of 1 accounts for the startup event.
+        assert_eq!(receive_count, total_count + 1);
+    }
 }
diff --git a/sequencer/src/api/data_source.rs b/sequencer/src/api/data_source.rs
index 82e1dbc06f..6b9c100210 100644
--- a/sequencer/src/api/data_source.rs
+++ b/sequencer/src/api/data_source.rs
@@ -261,13 +261,13 @@ impl From<HotShotConfig<PubKey>> for PublicHotShotConfig {
     }
 }
 
-#[cfg(test)]
-pub(crate) mod testing {
+#[cfg(any(test, feature = "testing"))]
+pub mod testing {
     use super::super::Options;
     use super::*;
 
     #[async_trait]
-    pub(crate) trait TestableSequencerDataSource: SequencerDataSource {
+    pub trait TestableSequencerDataSource: SequencerDataSource {
         type Storage: Sync;
 
         async fn create_storage() -> Self::Storage;
diff --git a/sequencer/src/api/fs.rs b/sequencer/src/api/fs.rs
index ce184ba650..515343fa11 100644
--- a/sequencer/src/api/fs.rs
+++ b/sequencer/src/api/fs.rs
@@ -26,7 +26,7 @@ impl SequencerDataSource for DataSource {
 
 impl CatchupDataSource for DataSource {}
 
-#[cfg(test)]
+#[cfg(any(test, feature = "testing"))]
 mod impl_testable_data_source {
     use super::*;
     use crate::api::{self, data_source::testing::TestableSequencerDataSource};
diff --git a/sequencer/src/api/sql.rs b/sequencer/src/api/sql.rs
index 4ea3dc5cb0..2c53d65ae9 100644
--- a/sequencer/src/api/sql.rs
+++ b/sequencer/src/api/sql.rs
@@ -112,7 +112,7 @@ impl CatchupDataSource for DataSource {
     }
 }
 
-#[cfg(test)]
+#[cfg(any(test, feature = "testing"))]
 mod impl_testable_data_source {
     use super::*;
     use crate::api::{self, data_source::testing::TestableSequencerDataSource};
diff --git a/sequencer/src/context.rs b/sequencer/src/context.rs
index ab239c751b..91098aade4 100644
--- a/sequencer/src/context.rs
+++ b/sequencer/src/context.rs
@@ -238,6 +238,10 @@ impl<N: network::Type, P: SequencerPersistence, Ver: StaticVersionType + 'static>
+    pub fn node_id(&self) -> u64 {
+        self.node_state.node_id
+    }
+
     pub fn node_state(&self) -> NodeState {
         self.node_state.clone()
     }
diff --git a/sequencer/src/main.rs b/sequencer/src/main.rs
index 62468b3799..536bcd2ef7 100644
--- a/sequencer/src/main.rs
+++ b/sequencer/src/main.rs
@@ -7,7 +7,8 @@ use futures::future::FutureExt;
 use hotshot_types::traits::metrics::NoMetrics;
 use sequencer::{
     api::{self, data_source::DataSourceOptions},
-    init_node,
+    context::SequencerContext,
+    init_node, network,
     options::{Modules, Options},
     persistence, Genesis, L1Params, NetworkParams,
 };
@@ -24,12 +25,12 @@ async fn main() -> anyhow::Result<()> {
     tracing::warn!("modules: {:?}", modules);
 
     if let Some(storage) = modules.storage_fs.take() {
-        init_with_storage(modules, opt, storage, SEQUENCER_VERSION).await
+        run_with_storage(modules, opt, storage, SEQUENCER_VERSION).await
     } else if let Some(storage) = modules.storage_sql.take() {
-        init_with_storage(modules, opt, storage, SEQUENCER_VERSION).await
+        run_with_storage(modules, opt, storage, SEQUENCER_VERSION).await
     } else {
         // Persistence is required. If none is provided, just use the local file system.
-        init_with_storage(
+        run_with_storage(
             modules,
             opt,
             persistence::fs::Options::default(),
@@ -39,12 +40,32 @@
     }
 }
 
-async fn init_with_storage<S, Ver: StaticVersionType + 'static>(
+async fn run_with_storage<S, Ver: StaticVersionType + 'static>(
     modules: Modules,
     opt: Options,
     storage_opt: S,
     bind_version: Ver,
 ) -> anyhow::Result<()>
+where
+    S: DataSourceOptions,
+{
+    let ctx = init_with_storage(modules, opt, storage_opt, bind_version).await?;
+
+    // Start doing consensus.
+    ctx.start_consensus().await;
+    ctx.join().await;
+
+    Ok(())
+}
+
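+/// Initialize a sequencer node without starting consensus, returning its context so the
+/// caller (e.g. the restart tests) decides exactly when the node starts and stops.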
+async fn init_with_storage<S, Ver: StaticVersionType + 'static>(
+    modules: Modules,
+    opt: Options,
+    storage_opt: S,
+    bind_version: Ver,
+) -> anyhow::Result<SequencerContext<network::Production, S::Persistence, Ver>>
 where
     S: DataSourceOptions,
 {
@@ -152,13 +173,11 @@ where
         }
     };
 
-    // Start doing consensus.
-    ctx.start_consensus().await;
-    ctx.join().await;
-
-    Ok(())
+    Ok(ctx)
 }
 
+mod restart_tests;
+
 #[cfg(test)]
 mod test {
     use super::*;
diff --git a/sequencer/src/restart_tests.rs b/sequencer/src/restart_tests.rs
new file mode 100644
index 0000000000..409d840bfa
--- /dev/null
+++ b/sequencer/src/restart_tests.rs
@@ -0,0 +1,605 @@
+#![cfg(test)]
+
+use super::*;
+use anyhow::bail;
+use async_compatibility_layer::art::async_timeout;
+use async_std::task::{sleep, spawn, JoinHandle};
+use cdn_broker::{reexports::crypto::signature::KeyPair, Broker, Config as BrokerConfig};
+use cdn_marshal::{Config as MarshalConfig, Marshal};
+use derivative::Derivative;
+use es_version::SequencerVersion;
+use ethers::utils::{Anvil, AnvilInstance};
+use futures::{
+    future::{join_all, try_join_all, BoxFuture, FutureExt},
+    stream::{BoxStream, StreamExt},
+};
+use hotshot_orchestrator::{
+    config::{Libp2pConfig, NetworkConfig},
+    run_orchestrator,
+};
+use hotshot_types::{
+    event::{Event, EventType},
+    light_client::StateKeyPair,
+    traits::{node_implementation::ConsensusTime, signature_key::SignatureKey},
+};
+use itertools::Itertools;
+use portpicker::pick_unused_port;
+use sequencer::{
+    api::{self, data_source::testing::TestableSequencerDataSource, options::Http},
+    genesis::StakeTableConfig,
+    network::cdn::{TestingDef, WrappedSignatureKey},
+    persistence::PersistenceOptions,
+    PrivKey, PubKey, SeqTypes,
+};
+use std::{path::Path, time::Duration};
+use tempfile::TempDir;
+
+async fn test_restart_helper(network: (usize, usize), restart: (usize, usize)) {
+    setup_logging();
+    setup_backtrace();
+
+    let mut network = TestNetwork::new(network.0, network.1).await;
+
+    // Let the network get going.
+    network.check_progress().await;
+    // Restart some combination of nodes and ensure progress resumes.
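+    // restart() shuts the chosen nodes down, waits for the survivors to keep making progress
+    // (when they still form a quorum), then restarts the stopped nodes and checks again.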
+    network.restart(restart.0, restart.1).await;
+}
+
+#[async_std::test]
+async fn test_restart_1_da() {
+    test_restart_helper((2, 3), (1, 0)).await;
+}
+
+#[async_std::test]
+async fn test_restart_1_regular() {
+    test_restart_helper((2, 3), (0, 1)).await;
+}
+
+#[async_std::test]
+async fn test_restart_f() {
+    test_restart_helper((4, 6), (1, 2)).await;
+}
+
+#[async_std::test]
+async fn test_restart_f_minus_1() {
+    test_restart_helper((4, 6), (1, 1)).await;
+}
+
+#[ignore]
+#[async_std::test]
+async fn test_restart_f_plus_1() {
+    test_restart_helper((4, 6), (1, 3)).await;
+}
+
+#[ignore]
+#[async_std::test]
+async fn test_restart_2f() {
+    test_restart_helper((4, 6), (1, 5)).await;
+}
+
+#[ignore]
+#[async_std::test]
+async fn test_restart_2f_minus_1() {
+    test_restart_helper((4, 6), (1, 4)).await;
+}
+
+#[ignore]
+#[async_std::test]
+async fn test_restart_2f_plus_1() {
+    test_restart_helper((4, 6), (2, 5)).await;
+}
+
+#[async_std::test]
+async fn test_restart_all() {
+    test_restart_helper((2, 8), (2, 8)).await;
+}
+
+#[derive(Debug)]
+struct TestNode<S: TestableSequencerDataSource> {
+    storage: S::Storage,
+    context: Option<
+        SequencerContext<
+            network::Production,
+            <S::Options as PersistenceOptions>::Persistence,
+            SequencerVersion,
+        >,
+    >,
+    modules: Modules,
+    opt: Options,
+    num_nodes: usize,
+}
+
+impl<S: TestableSequencerDataSource> TestNode<S> {
+    fn keygen(i: u64) -> (PrivKey, StateKeyPair) {
+        (
+            PubKey::generated_from_seed_indexed([0; 32], i).1,
+            StateKeyPair::generate_from_seed_indexed([0; 32], i),
+        )
+    }
+
+    #[tracing::instrument]
+    async fn new(
+        i: u64,
+        genesis_file: &Path,
+        sequencer_ports: &[u16],
+        orchestrator_port: u16,
+        cdn_port: u16,
+        l1_provider: &str,
+        is_da: bool,
+    ) -> Self {
+        let (staking_key, state_key) = Self::keygen(i);
+        tracing::info!(
+            staking_key = %PubKey::from_private(&staking_key),
+            state_key = %state_key.ver_key(),
+            "creating node",
+        );
+
+        let storage = S::create_storage().await;
+        let port = sequencer_ports[i as usize];
+
+        let mut modules = Modules {
+            http: Some(Http::with_port(port)),
+            status: Some(Default::default()),
+            catchup: Some(Default::default()),
+            ..Default::default()
+        };
+        if is_da {
+            modules.query = Some(Default::default());
+            modules.state = Some(Default::default());
+        }
+
+        let libp2p_port = pick_unused_port().unwrap();
+        let mut opt = Options::parse_from([
+            "sequencer",
+            "--private-staking-key",
+            &staking_key.to_string(),
+            "--private-state-key",
+            &state_key.sign_key_ref().to_string(),
+            "--genesis-file",
+            &genesis_file.display().to_string(),
+            "--orchestrator-url",
+            &format!("http://localhost:{orchestrator_port}"),
+            "--libp2p-bind-address",
+            &format!("127.0.0.1:{libp2p_port}"),
+            "--libp2p-advertise-address",
+            &format!("127.0.0.1:{libp2p_port}"),
+            "--cdn-endpoint",
+            &format!("127.0.0.1:{cdn_port}"),
+            "--state-peers",
+            &sequencer_ports
+                .iter()
+                .map(|port| format!("http://127.0.0.1:{port}"))
+                .join(","),
+            "--l1-provider-url",
+            l1_provider,
+        ]);
+        opt.is_da = is_da;
+        Self {
+            storage,
+            modules,
+            opt,
+            num_nodes: sequencer_ports.len(),
+            context: None,
+        }
+    }
+
+    fn stop(&mut self) -> BoxFuture<()> {
+        async {
+            if let Some(mut context) = self.context.take() {
+                tracing::info!(node_id = context.node_id(), "stopping node");
+                context.shut_down().await;
+            }
+        }
+        .boxed()
+    }
+
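+    // Starting a node again reuses its persistent storage, so a restarted node has to recover
+    // its consensus state rather than join as a brand new node.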
+    fn start(&mut self) -> BoxFuture<()>
+    where
+        S::Storage: Send,
+    {
+        async {
+            tracing::info!("starting node");
+            let ctx = init_with_storage(
+                self.modules.clone(),
+                self.opt.clone(),
+                S::persistence_options(&self.storage),
+                SEQUENCER_VERSION,
+            )
+            .await
+            .unwrap();
+            tracing::info!(node_id = ctx.node_id(), "starting consensus");
+            ctx.start_consensus().await;
+            self.context = Some(ctx);
+        }
+        .boxed()
+    }
+
+    async fn event_stream(&self) -> Option<BoxStream<'static, Event<SeqTypes>>> {
+        if let Some(ctx) = &self.context {
+            Some(ctx.event_stream().await.boxed())
+        } else {
+            None
+        }
+    }
+
+    fn check_progress_with_timeout(&self) -> BoxFuture<anyhow::Result<()>> {
+        async {
+            let Some(context) = &self.context else {
+                tracing::info!("skipping progress check on stopped node");
+                return Ok(());
+            };
+            let node_id = context.node_id();
+            let next_view_timeout = {
+                context
+                    .consensus()
+                    .read()
+                    .await
+                    .hotshot
+                    .config
+                    .next_view_timeout
+            };
+            // Give enough time for every node to propose, even if every view times out. This is
+            // conservative: if we actually make progress, not every view will time out, and we
+            // will take less than this amount of time.
+            let timeout = Duration::from_millis(next_view_timeout) * (self.num_nodes as u32);
+            match async_timeout(timeout, self.check_progress()).await {
+                Ok(res) => res,
+                Err(_) => bail!("timed out waiting for progress on node {node_id}"),
+            }
+        }
+        .boxed()
+    }
+
+    async fn check_progress(&self) -> anyhow::Result<()> {
+        let Some(context) = &self.context else {
+            tracing::info!("skipping progress check on stopped node");
+            return Ok(());
+        };
+
+        let num_nodes = {
+            context
+                .consensus()
+                .read()
+                .await
+                .hotshot
+                .config
+                .num_nodes_with_stake
+        };
+        let node_id = context.node_id();
+        tracing::info!(node_id, num_nodes, "waiting for progress from node");
+
+        // Wait for a block proposed by this node. This proves that the node is tracking consensus
+        // (getting Decide events) and participating (able to propose).
+        let mut events = context.event_stream().await;
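+        // The test network rotates leaders round-robin, so this node is the proposer exactly in
+        // views where view % num_nodes == node_id.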
+        while let Some(event) = events.next().await {
+            let EventType::Decide { leaf_chain, .. } = event.event else {
+                continue;
+            };
+            for leaf in leaf_chain.iter() {
+                if leaf.leaf.view_number().u64() % (num_nodes.get() as u64) == node_id {
+                    tracing::info!(
+                        node_id,
+                        height = leaf.leaf.height(),
+                        "got leaf proposed by this node"
+                    );
+                    return Ok(());
+                }
+                tracing::info!(
+                    node_id,
+                    height = leaf.leaf.height(),
+                    view = leaf.leaf.view_number().u64(),
+                    "leaf not proposed by this node"
+                );
+            }
+        }
+
+        bail!("node {node_id} event stream ended unexpectedly");
+    }
+}
+
+#[derive(Derivative)]
+#[derivative(Debug)]
+struct TestNetwork {
+    da_nodes: Vec<TestNode<api::sql::DataSource>>,
+    regular_nodes: Vec<TestNode<api::fs::DataSource>>,
+    tmp: TempDir,
+    orchestrator_task: Option<JoinHandle<()>>,
+    broker_task: Option<JoinHandle<()>>,
+    marshal_task: Option<JoinHandle<()>>,
+    #[derivative(Debug = "ignore")]
+    _anvil: AnvilInstance,
+}
+
+impl Drop for TestNetwork {
+    fn drop(&mut self) {
+        if let Some(task) = self.orchestrator_task.take() {
+            async_std::task::block_on(task.cancel());
+        }
+        if let Some(task) = self.broker_task.take() {
+            async_std::task::block_on(task.cancel());
+        }
+        if let Some(task) = self.marshal_task.take() {
+            async_std::task::block_on(task.cancel());
+        }
+    }
+}
+
+impl TestNetwork {
+    async fn new(da_nodes: usize, regular_nodes: usize) -> Self {
+        let tmp = TempDir::new().unwrap();
+        let genesis_file = tmp.path().join("genesis.toml");
+        let genesis = Genesis {
+            chain_config: Default::default(),
+            stake_table: StakeTableConfig { capacity: 10 },
+            accounts: Default::default(),
+            l1_finalized: Default::default(),
+            header: Default::default(),
+            network: Default::default(),
+        };
+        genesis.to_file(&genesis_file).unwrap();
+
+        let orchestrator_port = pick_unused_port().unwrap();
+        let orchestrator_task = Some(start_orchestrator(
+            orchestrator_port,
+            da_nodes + regular_nodes,
+        ));
+
+        let cdn_dir = tmp.path().join("cdn");
+        let cdn_port = pick_unused_port().unwrap();
+        let broker_task = Some(start_broker(&cdn_dir).await);
+        let marshal_task = Some(start_marshal(&cdn_dir, cdn_port).await);
+
+        let anvil_port = pick_unused_port().unwrap();
+        let anvil = Anvil::new().port(anvil_port).spawn();
+        let anvil_endpoint = anvil.endpoint();
+
+        let ports = std::iter::from_fn(|| Some(pick_unused_port().unwrap()))
+            .take(da_nodes + regular_nodes)
+            .collect::<Vec<_>>();
+        let mut network = Self {
+            da_nodes: join_all((0..da_nodes).map(|i| {
+                TestNode::new(
+                    i as u64,
+                    &genesis_file,
+                    &ports,
+                    orchestrator_port,
+                    cdn_port,
+                    &anvil_endpoint,
+                    true,
+                )
+            }))
+            .await,
+            regular_nodes: join_all((0..regular_nodes).map(|i| {
+                TestNode::new(
+                    (i + da_nodes) as u64,
+                    &genesis_file,
+                    &ports,
+                    orchestrator_port,
+                    cdn_port,
+                    &anvil_endpoint,
+                    false,
+                )
+            }))
+            .await,
+            tmp,
+            orchestrator_task,
+            broker_task,
+            marshal_task,
+            _anvil: anvil,
+        };
+
+        join_all(
+            network
+                .da_nodes
+                .iter_mut()
+                .map(TestNode::start)
+                .chain(network.regular_nodes.iter_mut().map(TestNode::start)),
+        )
+        .await;
+
+        network
+    }
+
+    async fn check_progress(&self) {
+        try_join_all(
+            self.da_nodes
+                .iter()
+                .map(TestNode::check_progress_with_timeout)
+                .chain(
+                    self.regular_nodes
+                        .iter()
+                        .map(TestNode::check_progress_with_timeout),
+                ),
+        )
+        .await
+        .unwrap();
+    }
+
+    async fn restart(&mut self, da_nodes: usize, regular_nodes: usize) {
+        tracing::info!(da_nodes, regular_nodes, "shutting down some nodes");
+        join_all(
+            self.da_nodes[..da_nodes]
+                .iter_mut()
+                .map(TestNode::stop)
+                .chain(
+                    self.regular_nodes[..regular_nodes]
+                        .iter_mut()
+                        .map(TestNode::stop),
+                ),
+        )
+        .await;
+
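+        // Consensus requires more than two thirds of the total stake (and of the DA committee)
+        // to remain online in order to make progress.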
+        let quorum_threshold = 2 * self.num_nodes() / 3 + 1;
+        let da_threshold = 2 * self.da_nodes.len() / 3 + 1;
+        if self.num_nodes() - da_nodes - regular_nodes >= quorum_threshold
+            && self.da_nodes.len() - da_nodes >= da_threshold
+        {
+            // If we shut down fewer nodes than the fault tolerance, the remaining nodes should be
+            // able to make progress, and we check that this is the case.
+            //
+            // Note that not every node will be able to commit leaves, because a node requires the
+            // cooperation of the node after it to commit its proposal. But as long as we have shut
+            // down fewer than the fault tolerance, at least *some* node will have a correct node
+            // after it and will be able to commit. Thus, we just grab an event stream and look for
+            // any decide.
+            tracing::info!("waiting for remaining nodes to progress");
+            let mut events = if da_nodes < self.da_nodes.len() {
+                self.da_nodes[da_nodes].event_stream().await.unwrap()
+            } else {
+                self.regular_nodes[regular_nodes]
+                    .event_stream()
+                    .await
+                    .unwrap()
+            };
+            // Wait for a few decides; the first couple may be from before the restart.
+            for _ in 0..self.num_nodes() {
+                let timeout = Duration::from_secs((2 * self.num_nodes()) as u64);
+                async_timeout(timeout, async {
+                    loop {
+                        let event = events
+                            .next()
+                            .await
+                            .expect("event stream terminated unexpectedly");
+                        let EventType::Decide { leaf_chain, .. } = event.event else {
+                            continue;
+                        };
+                        tracing::info!(?leaf_chain, "got decide, chain is progressing");
+                        break;
+                    }
+                })
+                .await
+                .expect("timed out waiting for progress with nodes down");
+            }
+        } else {
+            // Make sure there is a brief delay before restarting the nodes; we need the OS to
+            // have time to clean up the ports they were using.
+            tracing::info!(
+                "shut down too many nodes to make progress; will continue after a brief delay"
+            );
+            sleep(Duration::from_secs(2)).await;
+        }
+
+        join_all(
+            self.da_nodes[..da_nodes]
+                .iter_mut()
+                .map(TestNode::start)
+                .chain(
+                    self.regular_nodes[..regular_nodes]
+                        .iter_mut()
+                        .map(TestNode::start),
+                ),
+        )
+        .await;
+        self.check_progress().await;
+    }
+
+    fn num_nodes(&self) -> usize {
+        self.da_nodes.len() + self.regular_nodes.len()
+    }
+}
+
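+/// Run an orchestrator task, which serves the network config to all test nodes on startup.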
+fn start_orchestrator(port: u16, num_nodes: usize) -> JoinHandle<()> {
+    // We don't run a builder in these tests, so use a very short timeout before nodes decide to
+    // build an empty block on their own.
+    let builder_timeout = Duration::from_millis(100);
+    // These tests frequently have nodes down and views failing, so we use a fairly short view
+    // timeout.
+    let view_timeout = Duration::from_secs(2);
+
+    let mut config = NetworkConfig::<PubKey> {
+        indexed_da: false,
+        libp2p_config: Some(Libp2pConfig {
+            bootstrap_nodes: vec![],
+            node_index: 0,
+            bootstrap_mesh_n_high: 4,
+            bootstrap_mesh_n_low: 4,
+            bootstrap_mesh_outbound_min: 4 / 2,
+            bootstrap_mesh_n: 4,
+            mesh_n_high: 4,
+            mesh_n_low: 4,
+            mesh_outbound_min: 4 / 2,
+            mesh_n: 4,
+            next_view_timeout: view_timeout.as_millis() as u64,
+            online_time: 10,
+            num_txn_per_round: 0,
+            server_mode: true,
+            builder_timeout,
+        }),
+        ..Default::default()
+    };
+    config.config.num_nodes_with_stake = num_nodes.try_into().unwrap();
+    config.config.da_staked_committee_size = num_nodes;
+    config.config.known_nodes_with_stake = vec![];
+    config.config.known_da_nodes = vec![];
+    config.config.known_nodes_without_stake = vec![];
+    config.config.next_view_timeout = view_timeout.as_millis() as u64;
+    config.config.builder_timeout = builder_timeout;
+
+    let bind = format!("http://0.0.0.0:{port}").parse().unwrap();
+    spawn(async move {
+        match run_orchestrator(config, bind).await {
+            Ok(()) => tracing::warn!("orchestrator exited"),
+            Err(err) => tracing::error!(%err, "orchestrator failed"),
+        }
+    })
+}
+
+async fn start_broker(dir: &Path) -> JoinHandle<()> {
+    let (public_key, private_key) = PubKey::generated_from_seed_indexed([0; 32], 1337);
+    let public_port = pick_unused_port().expect("failed to find free port for broker");
+    let private_port = pick_unused_port().expect("failed to find free port for broker");
+    let broker_config: BrokerConfig<TestingDef<SeqTypes>> = BrokerConfig {
+        public_advertise_endpoint: format!("127.0.0.1:{}", public_port),
+        public_bind_endpoint: format!("127.0.0.1:{}", public_port),
+        private_advertise_endpoint: format!("127.0.0.1:{}", private_port),
+        private_bind_endpoint: format!("127.0.0.1:{}", private_port),
+
+        metrics_bind_endpoint: None,
+        discovery_endpoint: dir.display().to_string(),
+        keypair: KeyPair {
+            public_key: WrappedSignatureKey(public_key),
+            private_key,
+        },
+
+        ca_cert_path: None,
+        ca_key_path: None,
+    };
+
+    spawn(async move {
+        match Broker::new(broker_config).await.unwrap().start().await {
+            Ok(()) => tracing::warn!("broker exited"),
+            Err(err) => tracing::error!("broker failed: {err:#}"),
+        }
+    })
+}
+
+async fn start_marshal(dir: &Path, port: u16) -> JoinHandle<()> {
+    let marshal_config = MarshalConfig {
+        bind_endpoint: format!("0.0.0.0:{port}"),
+        metrics_bind_endpoint: None,
+        discovery_endpoint: dir.display().to_string(),
+        ca_cert_path: None,
+        ca_key_path: None,
+    };
+
+    spawn(async move {
+        match Marshal::<TestingDef<SeqTypes>>::new(marshal_config)
+            .await
+            .unwrap()
+            .start()
+            .await
+        {
+            Ok(()) => tracing::warn!("marshal exited"),
+            Err(err) => tracing::error!("marshal failed: {err:#}"),
+        }
+    })
+}