diff --git a/Cargo.lock b/Cargo.lock index 65f46ad601..c3472da35e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1143,7 +1143,10 @@ dependencies = [ "libp2p", "libp2p-identity", "mockall 0.11.4", + "prometheus-parse", "prost 0.9.0", + "regex", + "reqwest 0.12.12", "semver 1.0.24", "serde", "serde_json", diff --git a/ant-networking/src/driver.rs b/ant-networking/src/driver.rs index 1f3938fb20..0b347cbd69 100644 --- a/ant-networking/src/driver.rs +++ b/ant-networking/src/driver.rs @@ -561,7 +561,7 @@ impl NetworkBuilder { let metadata_extended_sub_reg = metrics_registries .metadata_extended - .sub_registry_with_prefix("ant-networking"); + .sub_registry_with_prefix("ant_networking"); metadata_extended_sub_reg.register( "peer_id", @@ -599,18 +599,6 @@ impl NetworkBuilder { ); } - metadata_extended_sub_reg.register( - "uptime", - "id of the node process", - Info::new(vec![("pid".to_string(), "0".to_string())]), - ); - - metadata_extended_sub_reg.register( - "wallet balance", - "id of the node process", - Info::new(vec![("wallet_balance".to_string(), "0".to_string())]), - ); - run_metrics_server(metrics_registries, port); Some(metrics_recorder) } else { diff --git a/ant-node-manager/src/bin/cli/main.rs b/ant-node-manager/src/bin/cli/main.rs index f0be1a2bce..4848d30bb9 100644 --- a/ant-node-manager/src/bin/cli/main.rs +++ b/ant-node-manager/src/bin/cli/main.rs @@ -904,7 +904,7 @@ pub enum LocalSubCmd { /// An interval applied between launching each node. /// /// Units are milliseconds. - #[clap(long, default_value_t = 200)] + #[clap(long, default_value_t = 2000)] interval: u64, /// Specify the logging format. /// diff --git a/ant-node-manager/src/cmd/node.rs b/ant-node-manager/src/cmd/node.rs index 3812834811..a1b3163ad3 100644 --- a/ant-node-manager/src/cmd/node.rs +++ b/ant-node-manager/src/cmd/node.rs @@ -23,9 +23,7 @@ use ant_evm::{EvmNetwork, RewardsAddress}; use ant_logging::LogFormat; use ant_releases::{AntReleaseRepoActions, ReleaseType}; use ant_service_management::{ - control::{ServiceControl, ServiceController}, - rpc::RpcClient, - NodeRegistry, NodeService, ServiceStateActions, ServiceStatus, UpgradeOptions, UpgradeResult, + control::{ServiceControl, ServiceController}, metric::MetricClient, NodeRegistry, NodeService, ServiceStateActions, ServiceStatus, UpgradeOptions, UpgradeResult }; use color_eyre::{eyre::eyre, Help, Result}; use colored::Colorize; @@ -181,8 +179,8 @@ pub async fn balance( for &index in &service_indices { let node = &mut node_registry.nodes[index]; - let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr); - let service = NodeService::new(node, Box::new(rpc_client)); + let metric_client = MetricClient::new(node.metrics_port.unwrap()); + let service = NodeService::new(node, Box::new(metric_client)); // TODO: remove this as we have no way to know the reward balance of nodes since EVM payments! println!("{}: {}", service.service_data.service_name, 0,); } @@ -223,8 +221,8 @@ pub async fn remove( let mut failed_services = Vec::new(); for &index in &service_indices { let node = &mut node_registry.nodes[index]; - let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr); - let service = NodeService::new(node, Box::new(rpc_client)); + let metric_client = MetricClient::new(node.metrics_port.unwrap()); + let service = NodeService::new(node, Box::new(metric_client)); let mut service_manager = ServiceManager::new(service, Box::new(ServiceController {}), verbosity); match service_manager.remove(keep_directories).await { @@ -310,9 +308,9 @@ pub async fn start( let mut failed_services = Vec::new(); for &index in &service_indices { let node = &mut node_registry.nodes[index]; - let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr); + let metric_client = MetricClient::new(node.metrics_port.unwrap()); - let service = NodeService::new(node, Box::new(rpc_client)); + let service = NodeService::new(node, Box::new(metric_client)); // set dynamic startup delay if fixed_interval is not set let service = if fixed_interval.is_none() { @@ -406,8 +404,8 @@ pub async fn stop( let mut failed_services = Vec::new(); for &index in &service_indices { let node = &mut node_registry.nodes[index]; - let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr); - let service = NodeService::new(node, Box::new(rpc_client)); + let metric_client = MetricClient::new(node.metrics_port.unwrap()); + let service = NodeService::new(node, Box::new(metric_client)); let mut service_manager = ServiceManager::new(service, Box::new(ServiceController {}), verbosity); @@ -519,8 +517,8 @@ pub async fn upgrade( }; let service_name = node.service_name.clone(); - let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr); - let service = NodeService::new(node, Box::new(rpc_client)); + let metric_client = MetricClient::new(node.metrics_port.unwrap()); + let service = NodeService::new(node, Box::new(metric_client)); // set dynamic startup delay if fixed_interval is not set let service = if fixed_interval.is_none() { service.with_connection_timeout(Duration::from_secs(connection_timeout_s)) diff --git a/ant-node-manager/src/lib.rs b/ant-node-manager/src/lib.rs index e0d6d908d3..32ee0af188 100644 --- a/ant-node-manager/src/lib.rs +++ b/ant-node-manager/src/lib.rs @@ -39,9 +39,10 @@ impl From for VerbosityLevel { } use crate::error::{Error, Result}; +use ant_service_management::metric::MetricClient; use ant_service_management::rpc::RpcActions; use ant_service_management::{ - control::ServiceControl, error::Error as ServiceError, rpc::RpcClient, NodeRegistry, + control::ServiceControl, error::Error as ServiceError, NodeRegistry, NodeService, NodeServiceData, ServiceStateActions, ServiceStatus, UpgradeOptions, UpgradeResult, }; @@ -550,16 +551,15 @@ pub async fn refresh_node_registry( // TODO: remove this as we have no way to know the reward balance of nodes since EVM payments! node.reward_balance = None; - let mut rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr); - rpc_client.set_max_attempts(1); - let mut service = NodeService::new(node, Box::new(rpc_client.clone())); + let metric_client = MetricClient::new(node.metrics_port.unwrap_or(0)); + let mut service = NodeService::new(node, Box::new(metric_client.clone())); if is_local_network { // For a local network, retrieving the process by its path does not work, because the // paths are not unique: they are all launched from the same binary. Instead we will // just determine whether the node is running by connecting to its RPC service. We // only need to distinguish between `RUNNING` and `STOPPED` for a local network. - match rpc_client.node_info().await { + match metric_client.node_info().await { Ok(info) => { let pid = info.pid; debug!( diff --git a/ant-node-manager/src/local.rs b/ant-node-manager/src/local.rs index a7ed6529cb..ec030f7351 100644 --- a/ant-node-manager/src/local.rs +++ b/ant-node-manager/src/local.rs @@ -14,9 +14,10 @@ use crate::helpers::{ use ant_bootstrap::PeersArgs; use ant_evm::{EvmNetwork, RewardsAddress}; use ant_logging::LogFormat; +use ant_service_management::metric::MetricClient; use ant_service_management::{ control::ServiceControl, - rpc::{RpcActions, RpcClient}, + rpc::RpcActions, NodeRegistry, NodeServiceData, ServiceStatus, }; use color_eyre::eyre::OptionExt; @@ -277,8 +278,7 @@ pub async fn run_network( }; let rpc_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port); - let rpc_client = RpcClient::from_socket_addr(rpc_socket_addr); - + let metric_client = MetricClient::new(metrics_free_port.unwrap()); let number = (node_registry.nodes.len() as u16) + 1; let node = run_node( RunNodeOptions { @@ -294,9 +294,10 @@ pub async fn run_network( version: get_bin_version(&launcher.get_antnode_path())?, }, &launcher, - &rpc_client, + &metric_client, ) .await?; + node_registry.nodes.push(node.clone()); let bootstrap_peers = node .listen_addr @@ -323,8 +324,7 @@ pub async fn run_network( }; let rpc_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port); - let rpc_client = RpcClient::from_socket_addr(rpc_socket_addr); - + let metric_client = MetricClient::new(metrics_free_port.unwrap()); let number = (node_registry.nodes.len() as u16) + 1; let node = run_node( RunNodeOptions { @@ -340,7 +340,7 @@ pub async fn run_network( version: get_bin_version(&launcher.get_antnode_path())?, }, &launcher, - &rpc_client, + &metric_client, ) .await?; node_registry.nodes.push(node); @@ -382,7 +382,7 @@ pub struct RunNodeOptions { pub async fn run_node( run_options: RunNodeOptions, launcher: &dyn Launcher, - rpc_client: &dyn RpcActions, + metric_client: &dyn RpcActions, ) -> Result { info!("Launching node {}...", run_options.number); println!("Launching node {}...", run_options.number); @@ -397,9 +397,10 @@ pub async fn run_node( )?; launcher.wait(run_options.interval); - let node_info = rpc_client.node_info().await?; + let node_info = metric_client.node_info().await?; let peer_id = node_info.peer_id; - let network_info = rpc_client.network_info().await?; + let network_info = metric_client.network_info().await?; + println!("network_info from the antctl is {:?}", network_info); let connected_peers = Some(network_info.connected_peers); let listen_addrs = network_info .listeners @@ -472,8 +473,8 @@ async fn validate_network(node_registry: &mut NodeRegistry, peers: Vec> = state.connected_peers.iter().map(|p| p.to_bytes()).collect(); let listeners = state.listeners.iter().map(|m| m.to_string()).collect(); - let resp = Response::new(NetworkInfoResponse { connected_peers, listeners, diff --git a/ant-node/src/node.rs b/ant-node/src/node.rs index 81395821f4..635fb307af 100644 --- a/ant-node/src/node.rs +++ b/ant-node/src/node.rs @@ -26,6 +26,7 @@ use ant_protocol::{ storage::ValidationType, NetworkAddress, PrettyPrintRecordKey, CLOSE_GROUP_SIZE, }; +use ant_service_management::metric::{write_network_metrics_to_file,read_network_metrics_from_file, NetworkInfoMetrics}; use bytes::Bytes; use itertools::Itertools; use libp2p::{identity::Keypair, Multiaddr, PeerId}; @@ -211,8 +212,25 @@ impl NodeBuilder { rewards_address: self.evm_address, }; - // Run the node - node.run(swarm_driver, network_event_receiver); + // Run the node + let mut connected_peers = vec![]; + let mut listeners = vec![]; + let runing_node_metrics = running_node.clone(); + let _return_value = tokio::spawn(async move { + sleep(Duration::from_millis(200)).await; + let state = runing_node_metrics.get_swarm_local_state().await.expect("Failed to get swarm local state"); + connected_peers = state.connected_peers.iter().map(|p| p.to_string()).collect(); + listeners = state.listeners.iter().map(|m| m.to_string()).collect(); + let network_info = NetworkInfoMetrics::new(connected_peers, listeners); + + write_network_metrics_to_file( + runing_node_metrics.root_dir_path.clone(), + network_info, + runing_node_metrics.network.peer_id().to_string() + ); + }); + + node.run(swarm_driver, network_event_receiver); Ok(running_node) } diff --git a/ant-service-management/Cargo.toml b/ant-service-management/Cargo.toml index acc2fe7d36..457c2fa06d 100644 --- a/ant-service-management/Cargo.toml +++ b/ant-service-management/Cargo.toml @@ -29,6 +29,11 @@ tokio = { version = "1.32.0", features = ["time"] } tonic = { version = "0.6.2" } tracing = { version = "~0.1.26" } tracing-core = "0.1.30" +prometheus-parse = "0.2.5" +regex = "1.11.0" +reqwest = { version = "0.12.2", default-features = false, features = [ + "rustls-tls-manual-roots", +] } [build-dependencies] # watch out updating this, protoc compiler needs to be installed on all build systems diff --git a/ant-service-management/src/lib.rs b/ant-service-management/src/lib.rs index 1e4c970808..2ac7fa1067 100644 --- a/ant-service-management/src/lib.rs +++ b/ant-service-management/src/lib.rs @@ -13,6 +13,7 @@ pub mod error; pub mod faucet; pub mod node; pub mod rpc; +pub mod metric; #[macro_use] extern crate tracing; diff --git a/ant-service-management/src/metric.rs b/ant-service-management/src/metric.rs new file mode 100644 index 0000000000..5eedb621f2 --- /dev/null +++ b/ant-service-management/src/metric.rs @@ -0,0 +1,217 @@ + +use async_trait::async_trait; +use libp2p::PeerId; +use crate::rpc::{RpcActions, NodeInfo, NetworkInfo, RecordAddress}; +use tokio::time::Duration; +use std::path::PathBuf; +use std::io::{Write, BufRead}; +use crate::error::{Error, Result}; + +#[derive(Debug, Clone)] +pub struct NodeInfoMetrics { + peer_id: PeerId, + pid: u32, + bin_version: String, + root_dir: PathBuf, + log_dir: PathBuf, + uptime: Duration, + wallet_balance: u64, +} + +impl Default for NodeInfoMetrics { + fn default() -> Self { + Self { + peer_id: PeerId::random(), //initialization + pid: 0, + bin_version: String::from("unknown"), + root_dir: PathBuf::from("/"), + log_dir: PathBuf::from("/"), + uptime: Duration::new(0, 0), + wallet_balance: 0, + } + } +} +pub struct NetworkInfoMetrics { + pub connected_peers: Vec, + pub listeners: Vec, +} + +impl NetworkInfoMetrics { + pub fn new(connected_peers: Vec, listeners: Vec) -> Self { + Self { + connected_peers, + listeners, + } + } +} + +pub fn write_network_metrics_to_file(root_dir: PathBuf, network_info: NetworkInfoMetrics, peer_id: String) { + let network_info_dir_path = root_dir.join("network_info"); + + std::fs::create_dir_all(&network_info_dir_path).unwrap(); + + let connected_peers_path = network_info_dir_path.join(format!("connected_peers_{}", peer_id)); + let mut file = std::fs::File::create(&connected_peers_path).unwrap(); + for peer in network_info.connected_peers.iter() { + writeln!(file, "{}", peer).unwrap(); + } + + let listeners_path = network_info_dir_path.join(format!("listeners_{}", peer_id)); + let mut file = std::fs::File::create(&listeners_path).unwrap(); + for listeners in network_info.listeners.iter() { + writeln!(file, "{}", listeners).unwrap(); + } +} + +pub fn read_network_metrics_from_file(root_dir: PathBuf, peer_id: String) -> NetworkInfoMetrics { + let network_info_dir_path = root_dir.join("network_info"); + let connected_peers_path = network_info_dir_path.join(format!("connected_peers_{}", peer_id)); + let listeners_path = network_info_dir_path.join(format!("listeners_{}", peer_id)); + println!("connected_peers_path: {:?}", connected_peers_path); + + let mut connected_peers = Vec::new(); + if std::path::Path::new(&connected_peers_path).exists() { + match std::fs::read_to_string(&connected_peers_path) { + Ok(contents) => connected_peers = contents.lines().map(|s| s.to_string()).collect(), + Err(e) => eprintln!("Failed to read the connected peers file: {}", e), + } + } + + let mut listeners = Vec::new(); + if std::path::Path::new(&listeners_path).exists() { + match std::fs::read_to_string(&listeners_path) { + Ok(contents) => listeners = contents.lines().map(|s| s.to_string()).collect(), + Err(e) => eprintln!("Failed to read the listeners file: {}", e), + } + } + NetworkInfoMetrics::new(connected_peers, listeners) +} + +#[derive(Debug, Clone)] +pub struct MetricClient { + endpoint_port: String, +} + +impl MetricClient { + pub fn new(endpoint_port: u16) -> Self { + Self { + endpoint_port: endpoint_port.to_string(), + } + } + + pub async fn get_endpoint_metrics(&self, endpoint_name: &str) -> Result> { + debug!( + "Attempting connection to collect {} metrics from {}...", + endpoint_name, + self.endpoint_port + ); + + let body = reqwest::get(&format!("http://localhost:{}/{endpoint_name}", self.endpoint_port)) + .await? + .text() + .await?; + let lines: Vec<_> = body.lines().map(|s| Ok(s.to_owned())).collect(); + let all_metrics = prometheus_parse::Scrape::parse(lines.into_iter())?; + + Ok(all_metrics) + } + + pub fn get_node_info_from_metadata_extended(&self ,scrape: &prometheus_parse::Scrape, node_info: &mut NodeInfoMetrics) { + for sample in scrape.samples.iter() { + for (key, value) in sample.labels.iter() { + match key.as_str() { + "peer_id" => node_info.peer_id = value.parse().unwrap(), + "pid" => node_info.pid = value.parse().unwrap(), + "bin_version" => node_info.bin_version = value.parse().unwrap(), + "root_dir" => node_info.root_dir =value.parse().unwrap(), + "log_dir" => node_info.log_dir = value.parse().unwrap(), + _ => {} + } + } + } + } + + pub fn get_node_info_from_metrics(&self, scrape: &prometheus_parse::Scrape, node_info: &mut NodeInfoMetrics) { + for sample in scrape.samples.iter() { + if sample.metric == "ant_node_current_reward_wallet_balance" { + // Attos + match sample.value { + prometheus_parse::Value::Counter(val) + | prometheus_parse::Value::Gauge(val) + | prometheus_parse::Value::Untyped(val) => { + node_info.wallet_balance = val as u64; + } + _ => {} + } + } else if sample.metric == "ant_node_uptime" { + match sample.value { + prometheus_parse::Value::Counter(val) + | prometheus_parse::Value::Gauge(val) + | prometheus_parse::Value::Untyped(val) => { + node_info.uptime = Duration::new(val as u64, 0); + } + _ => {} + } + } + } + } +} + +#[async_trait] +impl RpcActions for MetricClient { + async fn node_info(&self) -> Result { + let scrape = self.get_endpoint_metrics("metadata_extended").await.expect("Failed to get endpoint metadata_extended"); + let mut node_info = NodeInfoMetrics::default(); + self.get_node_info_from_metadata_extended(&scrape, &mut node_info); + let scrape = self.get_endpoint_metrics("metrics").await.expect("Failed to get endpoint metrics"); + self.get_node_info_from_metrics(&scrape, &mut node_info); + println!("node_info: {:?}", node_info); + Ok(NodeInfo{ + peer_id: node_info.peer_id, + pid: node_info.pid, + version: node_info.bin_version, + data_path: node_info.root_dir, + log_path: node_info.log_dir, + uptime: node_info.uptime, + wallet_balance: node_info.wallet_balance, + }) + } + + async fn network_info(&self) -> Result { + let scrape = self.get_endpoint_metrics("metadata_extended").await.expect("Failed to get endpoint metadata_extended"); + let mut node_info = NodeInfoMetrics::default(); + self.get_node_info_from_metadata_extended(&scrape, &mut node_info); + + let network_info_metrics = read_network_metrics_from_file(node_info.root_dir, node_info.peer_id.to_string()); + // let connected_peers = network_info_metrics.connected_peers.into_iter().map(|s| s.parse().unwrap()).collect(); + // println!("network_info_metrics: {:?}", connected_peers); + Ok(NetworkInfo { + connected_peers: network_info_metrics.connected_peers.into_iter().map(|s| s.parse().unwrap()).collect(), + listeners: network_info_metrics.listeners.into_iter().map(|s| s.parse().unwrap()).collect(), + }) + } + + async fn record_addresses(&self) -> Result> { + Ok(vec![]) + } + + async fn node_restart(&self, delay_millis: u64, retain_peer_id: bool) -> Result<()> { + Ok(()) + } + + async fn node_stop(&self, delay_millis: u64) -> Result<()> { + Ok(()) + } + + async fn is_node_connected_to_network(&self, timeout: Duration) -> Result<()> { + Ok(()) + } + + async fn update_log_level(&self, log_levels: String) -> Result<()> { + Ok(()) + } + + async fn node_update(&self, delay_millis: u64) -> Result<()> { + Ok(()) + } +} \ No newline at end of file