diff --git a/.gitignore b/.gitignore index e45366f9..bda3ace6 100644 --- a/.gitignore +++ b/.gitignore @@ -22,7 +22,8 @@ private .DS_Store homestar-guest-wasm/out homestar-wasm/out -**/fixtures/test_* +homestar-core/fixtures/test_* +homestar-runtime/fixtures/test_* .zed result-alejandra report.json diff --git a/Cargo.lock b/Cargo.lock index 89b2290e..4d0c296c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2391,6 +2391,7 @@ dependencies = [ "serde_with", "serial_test", "stream-cancel", + "strip-ansi-escapes", "strum 0.25.0", "sysinfo", "tabled", @@ -5640,6 +5641,15 @@ dependencies = [ "tokio", ] +[[package]] +name = "strip-ansi-escapes" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55ff8ef943b384c414f54aefa961dd2bd853add74ec75e7ac74cf91dba62bcfa" +dependencies = [ + "vte", +] + [[package]] name = "strsim" version = "0.10.0" @@ -6583,6 +6593,26 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" +[[package]] +name = "vte" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5022b5fbf9407086c180e9557be968742d839e68346af7792b8592489732197" +dependencies = [ + "utf8parse", + "vte_generate_state_changes", +] + +[[package]] +name = "vte_generate_state_changes" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d257817081c7dffcdbab24b9e62d2def62e2ff7d00b1c20062551e6cccc145ff" +dependencies = [ + "proc-macro2", + "quote", +] + [[package]] name = "wait-timeout" version = "0.2.0" diff --git a/examples/websocket-relay/src/main.rs b/examples/websocket-relay/src/main.rs index 83830e81..e3909584 100644 --- a/examples/websocket-relay/src/main.rs +++ b/examples/websocket-relay/src/main.rs @@ -10,7 +10,7 @@ use tracing::info; fn main() -> Result<()> { let settings = Settings::load().expect("runtime settings to be loaded"); - let _guard = Logger::init(); + let _guard = Logger::init(settings.monitoring()); // Just for example purposes, we're going to start the ipfs // daemon. Typically, these would be started separately. diff --git a/homestar-runtime/Cargo.toml b/homestar-runtime/Cargo.toml index 546e3b1f..8d124a69 100644 --- a/homestar-runtime/Cargo.toml +++ b/homestar-runtime/Cargo.toml @@ -181,6 +181,7 @@ rm_rf = "0.6" serial_test = { version = "2.0", default-features = false, features = [ "file_locks", ] } +strip-ansi-escapes = "0.2.0" tokio-tungstenite = { version = "0.20", default-features = false } wait-timeout = "0.2" diff --git a/homestar-runtime/config/settings.toml b/homestar-runtime/config/settings.toml index a08e9700..2be540be 100644 --- a/homestar-runtime/config/settings.toml +++ b/homestar-runtime/config/settings.toml @@ -1,5 +1,6 @@ [monitoring] process_collector_interval = 5000 metrics_port = 4000 +console_subscriber_port = 5555 [node] diff --git a/homestar-runtime/src/event_handler/swarm_event.rs b/homestar-runtime/src/event_handler/swarm_event.rs index 24bb2a99..052812e5 100644 --- a/homestar-runtime/src/event_handler/swarm_event.rs +++ b/homestar-runtime/src/event_handler/swarm_event.rs @@ -93,10 +93,11 @@ async fn handle_swarm_event( event_handler: &mut EventHandler, ) { match event { + // N.B. Labels should be ordered with peer_id first for log testing SwarmEvent::Behaviour(ComposedEvent::Identify(identify_event)) => { match identify_event { identify::Event::Error { peer_id, error } => { - warn!(err=?error, peer_id=peer_id.to_string(), "error while attempting to identify the remote") + warn!(peer_id=peer_id.to_string(), err=?error, "error while attempting to identify the remote") } identify::Event::Sent { peer_id } => { debug!(peer_id = peer_id.to_string(), "sent identify info to peer") @@ -148,8 +149,8 @@ async fn handle_swarm_event( None, ) { warn!( - err = format!("{err}"), peer_id = peer_id.to_string(), + err = format!("{err}"), "failed to register with rendezvous peer" ) } @@ -190,7 +191,7 @@ async fn handle_swarm_event( .build(); // TODO: we might be dialing too many peers here. Add settings to configure when we stop dialing new peers if let Err(err) = event_handler.swarm.dial(opts) { - warn!(err=?err, peer_id=registration.record.peer_id().to_string(), "failed to dial peer discovered through rendezvous") + warn!(peer_id=registration.record.peer_id().to_string(), err=?err, "failed to dial peer discovered through rendezvous") } } } else { @@ -203,7 +204,7 @@ async fn handle_swarm_event( error, .. } => { - error!(err=?error, peer_id=rendezvous_node.to_string(), "failed to discover peers from rendezvous peer") + error!(peer_id=rendezvous_node.to_string(), err=?error, "failed to discover peers from rendezvous peer") } rendezvous::client::Event::Registered { rendezvous_node, @@ -219,7 +220,7 @@ async fn handle_swarm_event( error, .. } => { - error!(err=?error, peer_id=rendezvous_node.to_string(), "failed to register self with rendezvous peer") + error!(peer_id=rendezvous_node.to_string(), err=?error, "failed to register self with rendezvous peer") } rendezvous::client::Event::Expired { peer } => { // re-discover records from peer @@ -244,14 +245,14 @@ async fn handle_swarm_event( "served rendezvous discover request to peer" ), rendezvous::server::Event::DiscoverNotServed { enquirer, error } => { - warn!(err=?error, peer_id=enquirer.to_string(), "did not serve rendezvous discover request") + warn!(peer_id=enquirer.to_string(), err=?error, "did not serve rendezvous discover request") } rendezvous::server::Event::PeerNotRegistered { peer, namespace, error, } => { - warn!(err=?error, namespace=?namespace, peer_id=peer.to_string(), "did not register peer with rendezvous") + warn!(peer_id=peer.to_string(), err=?error, namespace=?namespace, "did not register peer with rendezvous") } _ => (), } @@ -556,12 +557,15 @@ async fn handle_swarm_event( SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => { - debug!(endpoint=?endpoint, peer_id=peer_id.to_string(), "peer connection established"); + debug!(peer_id=peer_id.to_string(), endpoint=?endpoint, "peer connection established"); // add peer to connected peers list event_handler.connected_peers.insert(peer_id, endpoint); } SwarmEvent::ConnectionClosed { peer_id, cause, .. } => { - info!("peer connection closed {peer_id}, cause: {cause:?}"); + debug!( + peer_id = peer_id.to_string(), + "peer connection closed, cause: {cause:?}" + ); event_handler.connected_peers.remove_entry(&peer_id); } SwarmEvent::OutgoingConnectionError { @@ -570,8 +574,8 @@ async fn handle_swarm_event( error, } => { error!( - err=?error, peer_id=peer_id.map(|p| p.to_string()).unwrap_or_default(), + err=?error, connection_id=?connection_id, "outgoing connection error" ) diff --git a/homestar-runtime/src/logger.rs b/homestar-runtime/src/logger.rs index 03742988..4e5dd8f3 100644 --- a/homestar-runtime/src/logger.rs +++ b/homestar-runtime/src/logger.rs @@ -1,5 +1,6 @@ //! Logger initialization. +use crate::settings; use std::{io, path::PathBuf}; use tracing_appender::non_blocking::{NonBlocking, WorkerGuard}; use tracing_subscriber::{layer::SubscriberExt as _, prelude::*, EnvFilter}; @@ -19,9 +20,9 @@ impl Logger { /// write to [io::stdout]. /// /// [logfmt]: - pub fn init() -> WorkerGuard { + pub fn init(settings: &settings::Monitoring) -> WorkerGuard { let (writer, guard) = tracing_appender::non_blocking(io::stdout()); - init(writer, guard) + init(writer, guard, settings) } } @@ -30,14 +31,18 @@ impl FileLogger { /// write to file. /// /// [logfmt]: - pub fn init(dir: PathBuf) -> WorkerGuard { + pub fn init(dir: PathBuf, settings: &settings::Monitoring) -> WorkerGuard { let file_appender = tracing_appender::rolling::daily(dir, LOG_FILE); let (writer, guard) = tracing_appender::non_blocking(file_appender); - init(writer, guard) + init(writer, guard, settings) } } -fn init(writer: NonBlocking, guard: WorkerGuard) -> WorkerGuard { +fn init( + writer: NonBlocking, + guard: WorkerGuard, + #[allow(unused_variables)] settings: &settings::Monitoring, +) -> WorkerGuard { let format_layer = tracing_subscriber::fmt::layer() .event_format(tracing_logfmt::EventsFormatter::default()) .fmt_fields(tracing_logfmt::FieldsFormatter::default()) @@ -55,7 +60,12 @@ fn init(writer: NonBlocking, guard: WorkerGuard) -> WorkerGuard { .add_directive("tower_http=info".parse().expect(DIRECTIVE_EXPECT)) }); - #[cfg(all(feature = "console", tokio_unstable))] + #[cfg(all( + feature = "console", + not(test), + not(feature = "test-utils"), + tokio_unstable + ))] let filter = filter .add_directive("tokio=trace".parse().expect(DIRECTIVE_EXPECT)) .add_directive("runtime=trace".parse().expect(DIRECTIVE_EXPECT)); @@ -64,16 +74,27 @@ fn init(writer: NonBlocking, guard: WorkerGuard) -> WorkerGuard { .with(filter) .with(format_layer); - #[cfg(all(feature = "console", tokio_unstable))] + #[cfg(all( + feature = "console", + not(test), + not(feature = "test-utils"), + tokio_unstable + ))] { let console_layer = console_subscriber::ConsoleLayer::builder() .retention(std::time::Duration::from_secs(60)) + .server_addr(([127, 0, 0, 1], settings.console_subscriber_port)) .spawn(); registry.with(console_layer).init(); } - #[cfg(any(not(feature = "console"), not(tokio_unstable)))] + #[cfg(any( + not(feature = "console"), + test, + not(tokio_unstable), + feature = "test-utils", + ))] { registry.init(); } diff --git a/homestar-runtime/src/main.rs b/homestar-runtime/src/main.rs index 8026b7b7..4ba64d86 100644 --- a/homestar-runtime/src/main.rs +++ b/homestar-runtime/src/main.rs @@ -30,9 +30,9 @@ fn main() -> Result<()> { let _guard = if daemonize { daemon::start(daemon_dir.clone()) .expect("runner to be started as a daemon process"); - FileLogger::init(daemon_dir) + FileLogger::init(daemon_dir, settings.monitoring()) } else { - Logger::init() + Logger::init(settings.monitoring()) }; info!( diff --git a/homestar-runtime/src/settings.rs b/homestar-runtime/src/settings.rs index f9215f1c..ce81b10e 100644 --- a/homestar-runtime/src/settings.rs +++ b/homestar-runtime/src/settings.rs @@ -40,6 +40,8 @@ pub struct Monitoring { pub process_collector_interval: u64, /// Metrics port for prometheus scraping. pub metrics_port: u16, + /// Tokio console port. + pub console_subscriber_port: u16, } /// Server settings. @@ -159,6 +161,7 @@ impl Default for Monitoring { Self { metrics_port: 4000, process_collector_interval: 5000, + console_subscriber_port: 5555, } } } diff --git a/homestar-runtime/src/settings/pubkey_config.rs b/homestar-runtime/src/settings/pubkey_config.rs index dd1bdfea..171dcb87 100644 --- a/homestar-runtime/src/settings/pubkey_config.rs +++ b/homestar-runtime/src/settings/pubkey_config.rs @@ -64,14 +64,14 @@ impl PubkeyConfig { match key_type { KeyType::Ed25519 => { - info!("generating radom ed25519 key from seed"); + info!("generating random ed25519 key from seed"); identity::Keypair::ed25519_from_bytes(new_key).map_err(|e| { anyhow!("failed to generate ed25519 key from random: {:?}", e) }) } KeyType::Secp256k1 => { - info!("generating radom secp256k1 key from seed"); + info!("generating random secp256k1 key from seed"); let sk = secp256k1::SecretKey::try_from_bytes(&mut new_key).map_err(|e| { diff --git a/homestar-runtime/tests/cli.rs b/homestar-runtime/tests/cli.rs index 2fcc1b05..204b1feb 100644 --- a/homestar-runtime/tests/cli.rs +++ b/homestar-runtime/tests/cli.rs @@ -1,6 +1,6 @@ #[cfg(not(windows))] -use crate::utils::kill_homestar_process; -use crate::utils::{startup_ipfs, stop_all_bins, BIN_NAME}; +use crate::utils::kill_homestar_daemon; +use crate::utils::{kill_homestar, startup_ipfs, stop_all_bins, BIN_NAME}; use anyhow::Result; use assert_cmd::prelude::*; use once_cell::sync::Lazy; @@ -11,9 +11,7 @@ use std::{ net::{IpAddr, Ipv4Addr, Ipv6Addr, Shutdown, SocketAddr, TcpStream}, path::PathBuf, process::{Command, Stdio}, - time::Duration, }; -use wait_timeout::ChildExt; static BIN: Lazy = Lazy::new(|| assert_cmd::cargo::cargo_bin(BIN_NAME)); @@ -143,7 +141,7 @@ fn test_server_serial() -> Result<()> { .spawn() .unwrap(); - let socket = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 9991); + let socket = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 9832); let result = retry(Exponential::from_millis(1000).take(10), || { TcpStream::connect(socket).map(|stream| stream.shutdown(Shutdown::Both)) }); @@ -158,7 +156,7 @@ fn test_server_serial() -> Result<()> { .arg("--host") .arg("::1") .arg("-p") - .arg("9991") + .arg("9832") .assert() .success() .stdout(predicate::str::contains("::1")) @@ -169,7 +167,7 @@ fn test_server_serial() -> Result<()> { .arg("--host") .arg("::1") .arg("-p") - .arg("9999") + .arg("9830") .assert() .failure() .stderr( @@ -179,16 +177,7 @@ fn test_server_serial() -> Result<()> { let _ = Command::new(BIN.as_os_str()).arg("stop").output(); - if let Ok(None) = homestar_proc.try_wait() { - let _status_code = match homestar_proc.wait_timeout(Duration::from_secs(1)).unwrap() { - Some(status) => status.code(), - None => { - homestar_proc.kill().unwrap(); - homestar_proc.wait().unwrap().code() - } - }; - } - + let _ = kill_homestar(homestar_proc, None); let _ = stop_all_bins(); Ok(()) @@ -213,7 +202,7 @@ fn test_workflow_run_serial() -> Result<()> { .spawn() .unwrap(); - let socket = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 9888); + let socket = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 9840); let result = retry(Exponential::from_millis(1000).take(10), || { TcpStream::connect(socket).map(|stream| stream.shutdown(Shutdown::Both)) }); @@ -226,7 +215,7 @@ fn test_workflow_run_serial() -> Result<()> { Command::new(BIN.as_os_str()) .arg("run") .arg("-p") - .arg("9888") + .arg("9840") .arg("-w") .arg("tests/fixtures/test-workflow-add-one.json") .assert() @@ -244,7 +233,7 @@ fn test_workflow_run_serial() -> Result<()> { Command::new(BIN.as_os_str()) .arg("run") .arg("-p") - .arg("9888") + .arg("9840") .arg("-w") .arg("tests/fixtures/test-workflow-add-one.json") .assert() @@ -260,16 +249,7 @@ fn test_workflow_run_serial() -> Result<()> { let _ = Command::new(BIN.as_os_str()).arg("stop").output(); - if let Ok(None) = homestar_proc.try_wait() { - let _status_code = match homestar_proc.wait_timeout(Duration::from_secs(1)).unwrap() { - Some(status) => status.code(), - None => { - homestar_proc.kill().unwrap(); - homestar_proc.wait().unwrap().code() - } - }; - } - + let _ = kill_homestar(homestar_proc, None); let _ = stop_all_bins(); Ok(()) @@ -294,7 +274,7 @@ fn test_daemon_serial() -> Result<()> { .assert() .success(); - let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9987); + let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9831); let result = retry(Exponential::from_millis(1000).take(10), || { TcpStream::connect(socket).map(|stream| stream.shutdown(Shutdown::Both)) }); @@ -308,14 +288,14 @@ fn test_daemon_serial() -> Result<()> { .arg("--host") .arg("127.0.0.1") .arg("-p") - .arg("9987") + .arg("9831") .assert() .success() .stdout(predicate::str::contains("127.0.0.1")) .stdout(predicate::str::contains("pong")); let _ = stop_all_bins(); - let _ = kill_homestar_process(); + let _ = kill_homestar_daemon(); Ok(()) } @@ -354,16 +334,7 @@ fn test_signal_kill_serial() -> Result<()> { .stdout(predicate::str::contains("pong")); let _ = Command::new(BIN.as_os_str()).arg("stop").output(); - - if let Ok(None) = homestar_proc.try_wait() { - let _status_code = match homestar_proc.wait_timeout(Duration::from_secs(1)).unwrap() { - Some(status) => status.code(), - None => { - homestar_proc.kill().unwrap(); - homestar_proc.wait().unwrap().code() - } - }; - } + let _ = kill_homestar(homestar_proc, None); Command::new(BIN.as_os_str()).arg("ping").assert().failure(); @@ -391,7 +362,7 @@ fn test_server_v4_serial() -> Result<()> { .spawn() .unwrap(); - let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9999); + let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9830); let result = retry(Exponential::from_millis(1000).take(10), || { TcpStream::connect(socket).map(|stream| stream.shutdown(Shutdown::Both)) }); @@ -406,7 +377,7 @@ fn test_server_v4_serial() -> Result<()> { .arg("--host") .arg("127.0.0.1") .arg("-p") - .arg("9999") + .arg("9830") .assert() .success() .stdout(predicate::str::contains("127.0.0.1")) @@ -414,16 +385,7 @@ fn test_server_v4_serial() -> Result<()> { let _ = Command::new(BIN.as_os_str()).arg("stop").output(); - if let Ok(None) = homestar_proc.try_wait() { - let _status_code = match homestar_proc.wait_timeout(Duration::from_secs(1)).unwrap() { - Some(status) => status.code(), - None => { - homestar_proc.kill().unwrap(); - homestar_proc.wait().unwrap().code() - } - }; - } - + let _ = kill_homestar(homestar_proc, None); let _ = stop_all_bins(); Ok(()) @@ -448,7 +410,7 @@ fn test_daemon_v4_serial() -> Result<()> { .assert() .success(); - let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9999); + let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9830); let result = retry(Exponential::from_millis(1000).take(10), || { TcpStream::connect(socket).map(|stream| stream.shutdown(Shutdown::Both)) }); @@ -462,14 +424,14 @@ fn test_daemon_v4_serial() -> Result<()> { .arg("--host") .arg("127.0.0.1") .arg("-p") - .arg("9999") + .arg("9830") .assert() .success() .stdout(predicate::str::contains("127.0.0.1")) .stdout(predicate::str::contains("pong")); let _ = stop_all_bins(); - let _ = kill_homestar_process(); + let _ = kill_homestar_daemon(); Ok(()) } diff --git a/homestar-runtime/tests/fixtures/test_mdns_connect1.toml b/homestar-runtime/tests/fixtures/test_mdns_connect1.toml new file mode 100644 index 00000000..b651ae83 --- /dev/null +++ b/homestar-runtime/tests/fixtures/test_mdns_connect1.toml @@ -0,0 +1,14 @@ +[monitoring] +process_collector_interval = 500 +metrics_port = 4001 +console_subscriber_port = 5560 + +[node] + +[node.network] +rpc_port = 9800 +websocket_port = 8000 +listen_address = "/ip4/0.0.0.0/tcp/0" + +[node.network.keypair_config] +existing = { key_type = "ed25519", path = "./fixtures/__testkey_ed25519.pem" } diff --git a/homestar-runtime/tests/fixtures/test_mdns_connect2.toml b/homestar-runtime/tests/fixtures/test_mdns_connect2.toml new file mode 100644 index 00000000..3dbdf1a0 --- /dev/null +++ b/homestar-runtime/tests/fixtures/test_mdns_connect2.toml @@ -0,0 +1,14 @@ +[monitoring] +process_collector_interval = 500 +metrics_port = 4002 +console_subscriber_port = 5561 + +[node] + +[node.network] +rpc_port = 9801 +websocket_port = 8001 +listen_address = "/ip4/0.0.0.0/tcp/0" + +[node.network.keypair_config] +existing = { key_type = "secp256k1", path = "./fixtures/__testkey_secp256k1.der" } diff --git a/homestar-runtime/tests/fixtures/metrics_node/config/settings.toml b/homestar-runtime/tests/fixtures/test_metrics.toml similarity index 66% rename from homestar-runtime/tests/fixtures/metrics_node/config/settings.toml rename to homestar-runtime/tests/fixtures/test_metrics.toml index c8f2f4b4..726d6c60 100644 --- a/homestar-runtime/tests/fixtures/metrics_node/config/settings.toml +++ b/homestar-runtime/tests/fixtures/test_metrics.toml @@ -1,12 +1,13 @@ [monitoring] process_collector_interval = 500 -metrics_port = 4004 +metrics_port = 4020 +console_subscriber_port = 5570 [node] [node.network] -rpc_port = 3031 -websocket_port = 9090 +rpc_port = 9810 +websocket_port = 8010 [node.network.keypair_config] existing = { key_type = "ed25519", path = "./fixtures/__testkey_ed25519.pem" } diff --git a/homestar-runtime/tests/fixtures/network_node1/config/settings.toml b/homestar-runtime/tests/fixtures/test_network1.toml similarity index 78% rename from homestar-runtime/tests/fixtures/network_node1/config/settings.toml rename to homestar-runtime/tests/fixtures/test_network1.toml index 33090934..82a3acc2 100644 --- a/homestar-runtime/tests/fixtures/network_node1/config/settings.toml +++ b/homestar-runtime/tests/fixtures/test_network1.toml @@ -1,12 +1,13 @@ [monitoring] process_collector_interval = 500 -metrics_port = 4002 +metrics_port = 4030 +console_subscriber_port = 5580 [node] [node.network] -rpc_port = 3032 -websocket_port = 9092 +rpc_port = 9820 +websocket_port = 8020 listen_address = "/ip4/127.0.0.1/tcp/7000" node_addresses = [ "/ip4/127.0.0.1/tcp/7001/p2p/16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc", diff --git a/homestar-runtime/tests/fixtures/network_node2/config/settings.toml b/homestar-runtime/tests/fixtures/test_network2.toml similarity index 78% rename from homestar-runtime/tests/fixtures/network_node2/config/settings.toml rename to homestar-runtime/tests/fixtures/test_network2.toml index b1cf1e0a..a24aa028 100644 --- a/homestar-runtime/tests/fixtures/network_node2/config/settings.toml +++ b/homestar-runtime/tests/fixtures/test_network2.toml @@ -1,12 +1,13 @@ [monitoring] process_collector_interval = 500 -metrics_port = 4003 +metrics_port = 4031 +console_subscriber_port = 5581 [node] [node.network] -rpc_port = 3033 -websocket_port = 9093 +rpc_port = 9821 +websocket_port = 8021 listen_address = "/ip4/127.0.0.1/tcp/7001" node_addresses = [ "/ip4/127.0.0.1/tcp/7000/p2p/12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN", diff --git a/homestar-runtime/tests/fixtures/test_v4.toml b/homestar-runtime/tests/fixtures/test_v4.toml index f4536b6c..86d4a97c 100644 --- a/homestar-runtime/tests/fixtures/test_v4.toml +++ b/homestar-runtime/tests/fixtures/test_v4.toml @@ -1,10 +1,11 @@ [monitoring] process_collector_interval = 500 -metrics_port = 4010 +metrics_port = 4040 +console_subscriber_port = 5590 [node] [node.network] events_buffer_len = 1000 -rpc_port = 9999 +rpc_port = 9830 rpc_host = "127.0.0.1" diff --git a/homestar-runtime/tests/fixtures/test_v4_alt.toml b/homestar-runtime/tests/fixtures/test_v4_alt.toml index f111adb9..d8088378 100644 --- a/homestar-runtime/tests/fixtures/test_v4_alt.toml +++ b/homestar-runtime/tests/fixtures/test_v4_alt.toml @@ -1,10 +1,11 @@ [monitoring] process_collector_interval = 500 -metrics_port = 4007 +metrics_port = 4041 +console_subscriber_port = 5591 [node] [node.network] events_buffer_len = 1000 -rpc_port = 9987 +rpc_port = 9831 rpc_host = "127.0.0.1" diff --git a/homestar-runtime/tests/fixtures/test_v6.toml b/homestar-runtime/tests/fixtures/test_v6.toml index 938a30b2..07711b16 100644 --- a/homestar-runtime/tests/fixtures/test_v6.toml +++ b/homestar-runtime/tests/fixtures/test_v6.toml @@ -1,10 +1,12 @@ [monitoring] process_collector_interval = 500 -metrics_port = 4006 +metrics_port = 4042 +console_subscriber_port = 5592 + [node] [node.network] events_buffer_len = 1000 -rpc_port = 9991 +rpc_port = 9832 rpc_host = "::1" diff --git a/homestar-runtime/tests/fixtures/test_workflow.toml b/homestar-runtime/tests/fixtures/test_workflow.toml index 6a820d4d..549f337a 100644 --- a/homestar-runtime/tests/fixtures/test_workflow.toml +++ b/homestar-runtime/tests/fixtures/test_workflow.toml @@ -1,9 +1,10 @@ [monitoring] process_collector_interval = 500 -metrics_port = 4012 +metrics_port = 4050 +console_subscriber_port = 5600 [node] [node.network] events_buffer_len = 1000 -rpc_port = 9888 +rpc_port = 9840 diff --git a/homestar-runtime/tests/main.rs b/homestar-runtime/tests/main.rs index c9a311d8..e957b31f 100644 --- a/homestar-runtime/tests/main.rs +++ b/homestar-runtime/tests/main.rs @@ -1,3 +1,4 @@ pub(crate) mod cli; pub(crate) mod metrics; +pub(crate) mod network; pub(crate) mod utils; diff --git a/homestar-runtime/tests/metrics.rs b/homestar-runtime/tests/metrics.rs index 9081f9de..3252996c 100644 --- a/homestar-runtime/tests/metrics.rs +++ b/homestar-runtime/tests/metrics.rs @@ -1,4 +1,4 @@ -use crate::utils::{stop_homestar, BIN_NAME}; +use crate::utils::{kill_homestar, stop_homestar, BIN_NAME}; use anyhow::Result; use once_cell::sync::Lazy; use reqwest::StatusCode; @@ -8,17 +8,15 @@ use std::{ net::{IpAddr, Ipv4Addr, Shutdown, SocketAddr, TcpStream}, path::PathBuf, process::{Command, Stdio}, - time::Duration, }; -use wait_timeout::ChildExt; static BIN: Lazy = Lazy::new(|| assert_cmd::cargo::cargo_bin(BIN_NAME)); -const METRICS_URL: &str = "http://localhost:4004"; +const METRICS_URL: &str = "http://localhost:4020"; #[test] #[file_serial] fn test_metrics_serial() -> Result<()> { - fn sample_metrics() -> prometheus_parse::Value { + fn sample_metrics() -> Option { let body = retry( Exponential::from_millis(500).take(20), || match reqwest::blocking::get(METRICS_URL) { @@ -29,9 +27,10 @@ fn test_metrics_serial() -> Result<()> { Err(_) => OperationResult::Retry("Metrics server not available"), }, ) - .unwrap(); + .unwrap() + .expect("Metrics server failed to serve metrics"); - let lines: Vec<_> = body.unwrap().lines().map(|s| Ok(s.to_owned())).collect(); + let lines: Vec<_> = body.lines().map(|s| Ok(s.to_owned())).collect(); let metrics = prometheus_parse::Scrape::parse(lines.into_iter()).expect("Unable to parse metrics"); @@ -39,9 +38,7 @@ fn test_metrics_serial() -> Result<()> { .samples .iter() .find(|sample| sample.metric.as_str() == "system_used_memory_bytes") - .expect("Could not find system_used_memory_bytes metric") - .value - .to_owned() + .and_then(|sample| Some(sample.value.to_owned())) } let _ = stop_homestar(); @@ -49,14 +46,14 @@ fn test_metrics_serial() -> Result<()> { let mut homestar_proc = Command::new(BIN.as_os_str()) .arg("start") .arg("-c") - .arg("tests/fixtures/metrics_node/config/settings.toml") + .arg("tests/fixtures/test_metrics.toml") .arg("--db") .arg("homestar.db") .stdout(Stdio::piped()) .spawn() .unwrap(); - let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4004); + let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4020); let result = retry(Exponential::from_millis(1000).take(10), || { TcpStream::connect(socket).map(|stream| stream.shutdown(Shutdown::Both)) }); @@ -66,10 +63,18 @@ fn test_metrics_serial() -> Result<()> { panic!("Homestar server/runtime failed to start in time"); } - let sample1 = sample_metrics(); + // Try metrics server until the target metric is available + let sample1 = retry(Exponential::from_millis(100).take(5), || { + if let Some(sample) = sample_metrics() { + OperationResult::Ok(sample) + } else { + OperationResult::Retry("Could not find system_used_memory_bytes metric") + } + }) + .unwrap(); let sample2 = retry(Exponential::from_millis(500).take(10), || { - let sample2 = sample_metrics(); + let sample2 = sample_metrics().unwrap(); if sample1 != sample2 { OperationResult::Ok(sample2) } else { @@ -84,16 +89,7 @@ fn test_metrics_serial() -> Result<()> { assert_ne!(sample1, sample2.unwrap()); - if let Ok(None) = homestar_proc.try_wait() { - let _status_code = match homestar_proc.wait_timeout(Duration::from_secs(1)).unwrap() { - Some(status) => status.code(), - None => { - homestar_proc.kill().unwrap(); - homestar_proc.wait().unwrap().code() - } - }; - } - + let _ = kill_homestar(homestar_proc, None); let _ = stop_homestar(); Ok(()) diff --git a/homestar-runtime/tests/network.rs b/homestar-runtime/tests/network.rs new file mode 100644 index 00000000..b1f1992b --- /dev/null +++ b/homestar-runtime/tests/network.rs @@ -0,0 +1,221 @@ +use crate::utils::{kill_homestar, retrieve_output, stop_homestar, BIN_NAME}; +use anyhow::Result; +use once_cell::sync::Lazy; +use predicates::prelude::*; +use serial_test::file_serial; +use std::{ + path::PathBuf, + process::{Command, Stdio}, + time::Duration, +}; + +#[allow(dead_code)] +static BIN: Lazy = Lazy::new(|| assert_cmd::cargo::cargo_bin(BIN_NAME)); + +#[test] +#[file_serial] +fn test_libp2p_generates_peer_id_serial() -> Result<()> { + let _ = stop_homestar(); + + let homestar_proc = Command::new(BIN.as_os_str()) + .arg("start") + .arg("-c") + .arg("tests/fixtures/test_network1.toml") + .arg("--db") + .arg("homestar1.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + let dead_proc = kill_homestar(homestar_proc, None); + let stdout = retrieve_output(dead_proc); + + assert!(predicate::str::contains("message=\"local peer ID generated\" peer_id=12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN") + .eval(stdout.as_str())); + + Ok(()) +} + +#[test] +#[file_serial] +fn test_libp2p_listens_on_address_serial() -> Result<()> { + let _ = stop_homestar(); + + let homestar_proc = Command::new(BIN.as_os_str()) + .arg("start") + .arg("-c") + .arg("tests/fixtures/test_network1.toml") + .arg("--db") + .arg("homestar1.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + let dead_proc = kill_homestar(homestar_proc, None); + let stdout = retrieve_output(dead_proc); + + assert!( + predicate::str::contains("local node is listening on /ip4/127.0.0.1/tcp/7000/p2p/12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN") + .eval(stdout.as_str()) + ); + + Ok(()) +} + +#[test] +#[file_serial] +fn test_rpc_listens_on_address_serial() -> Result<()> { + let _ = stop_homestar(); + + let homestar_proc = Command::new(BIN.as_os_str()) + .arg("start") + .arg("-c") + .arg("tests/fixtures/test_network1.toml") + .arg("--db") + .arg("homestar1.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + let dead_proc = kill_homestar(homestar_proc, None); + let stdout = retrieve_output(dead_proc); + + assert!(predicate::str::contains("RPC server listening on [::1]:9820").eval(stdout.as_str())); + + Ok(()) +} + +#[cfg(feature = "websocket-server")] +#[test] +#[file_serial] +fn test_websocket_listens_on_address_serial() -> Result<()> { + let _ = stop_homestar(); + + let homestar_proc = Command::new(BIN.as_os_str()) + .arg("start") + .arg("-c") + .arg("tests/fixtures/test_network1.toml") + .arg("--db") + .arg("homestar1.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + let dead_proc = kill_homestar(homestar_proc, None); + let stdout = retrieve_output(dead_proc); + + assert!( + predicate::str::contains("websocket server listening on 127.0.0.1:8020") + .eval(stdout.as_str()) + ); + + Ok(()) +} + +#[test] +#[file_serial] +fn test_libp2p_connect_known_peers_serial() -> Result<()> { + let _ = stop_homestar(); + + // Start two nodes configured to listen at 127.0.0.1 each with their own port. + // The nodes are configured to dial each other through the node_addresses config. + let homestar_proc1 = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg("tests/fixtures/test_network1.toml") + .arg("--db") + .arg("homestar1.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + let homestar_proc2 = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg("tests/fixtures/test_network2.toml") + .arg("--db") + .arg("homestar2.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + // Collect logs for five seconds then kill proceses. + let dead_proc1 = kill_homestar(homestar_proc1, Some(Duration::from_secs(5))); + let dead_proc2 = kill_homestar(homestar_proc2, Some(Duration::from_secs(5))); + + // Retrieve logs. + let stdout1 = retrieve_output(dead_proc1); + let stdout2 = retrieve_output(dead_proc2); + + // Check that node one connected to node two. + assert!(predicate::str::contains("message=\"peer connection established\" peer_id=16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc") + .eval(stdout1.as_str())); + + // Check that node two connected to node one. + assert!(predicate::str::contains("message=\"peer connection established\" peer_id=12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN") + .eval(stdout2.as_str())); + + Ok(()) +} + +#[test] +#[file_serial] +fn test_libp2p_connect_after_mdns_discovery_serial() -> Result<()> { + let _ = stop_homestar(); + + // Start two nodes each configured to listen at 0.0.0.0 with no known peers. + // The nodes are configured with port 0 to allow the OS to selectn a port. + let homestar_proc1 = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg("tests/fixtures/test_mdns_connect1.toml") + .arg("--db") + .arg("homestar1.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + let homestar_proc2 = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg("tests/fixtures/test_mdns_connect2.toml") + .arg("--db") + .arg("homestar2.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + // Collect logs for seven seconds then kill processes. + let dead_proc1 = kill_homestar(homestar_proc1, Some(Duration::from_secs(7))); + let dead_proc2 = kill_homestar(homestar_proc2, Some(Duration::from_secs(7))); + + // Retrieve logs. + let stdout1 = retrieve_output(dead_proc1); + let stdout2 = retrieve_output(dead_proc2); + + // Check that node one connected to node two. + assert!(predicate::str::contains("message=\"peer connection established\" peer_id=16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc") + .eval(stdout1.as_str())); + + // Check that node two connected to node one. + assert!(predicate::str::contains("message=\"peer connection established\" peer_id=12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN") + .eval(stdout2.as_str())); + + Ok(()) +} diff --git a/homestar-runtime/tests/utils.rs b/homestar-runtime/tests/utils.rs index afedace7..76604a53 100644 --- a/homestar-runtime/tests/utils.rs +++ b/homestar-runtime/tests/utils.rs @@ -11,11 +11,14 @@ use retry::{delay::Fixed, retry}; use std::{ net::{IpAddr, Ipv4Addr, Shutdown, SocketAddr, TcpStream}, path::PathBuf, - process::{Command, Stdio}, + process::{Child, Command, Stdio}, + time::Duration, }; +use strip_ansi_escapes; #[cfg(not(windows))] use sysinfo::PidExt; use sysinfo::{ProcessExt, SystemExt}; +use wait_timeout::ChildExt; /// Binary name, which is different than the crate name. pub(crate) const BIN_NAME: &str = "homestar"; @@ -58,7 +61,7 @@ pub(crate) fn stop_homestar() -> Result<()> { Ok(()) } -/// Stop the IPFS daemon. +/// Stop the IPFS binary. pub(crate) fn stop_ipfs() -> Result<()> { let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(".ipfs"); Command::new(IPFS) @@ -80,9 +83,34 @@ pub(crate) fn stop_all_bins() -> Result<()> { Ok(()) } +/// Retrieve process output. +pub(crate) fn retrieve_output(proc: Child) -> String { + let output = proc.wait_with_output().expect("failed to wait on child"); + let plain_stdout_bytes = strip_ansi_escapes::strip(output.stdout); + String::from_utf8(plain_stdout_bytes).unwrap() +} + +/// Wait for process to exit or kill after timeout. +pub(crate) fn kill_homestar(mut homestar_proc: Child, timeout: Option) -> Child { + if let Ok(None) = homestar_proc.try_wait() { + let _status_code = match homestar_proc + .wait_timeout(timeout.unwrap_or(Duration::from_secs(1))) + .unwrap() + { + Some(status) => status.code(), + None => { + homestar_proc.kill().unwrap(); + homestar_proc.wait().unwrap().code() + } + }; + } + + homestar_proc +} + /// Kill the Homestar proc running as a daemon. #[cfg(not(windows))] -pub(crate) fn kill_homestar_process() -> Result<()> { +pub(crate) fn kill_homestar_daemon() -> Result<()> { let system = sysinfo::System::new_all(); let pid = system .processes_by_exact_name(BIN_NAME) @@ -111,7 +139,7 @@ pub(crate) fn kill_homestar_process() -> Result<()> { /// Kill the Homestar proc running as a daemon. #[allow(dead_code)] #[cfg(windows)] -pub(crate) fn kill_homestar_process() -> Result<()> { +pub(crate) fn kill_homestar_daemon() -> Result<()> { let system = sysinfo::System::new_all(); let pid = system .processes_by_exact_name(format!("{}.exe", BIN_NAME).as_str())