From db7c11bcaa9ee9a1eab1a982698a1a589779627b Mon Sep 17 00:00:00 2001 From: Ermine Jose Date: Thu, 30 Jan 2025 18:35:36 +0530 Subject: [PATCH] fix: add proper error handling to metric service support --- ant-node-manager/src/add_services/mod.rs | 5 +-- ant-node-manager/src/bin/cli/main.rs | 4 +- ant-node-manager/src/cmd/node.rs | 23 +++++----- ant-node-manager/src/error.rs | 2 + ant-node-manager/src/lib.rs | 4 +- ant-node-manager/src/local.rs | 28 ++++++------- ant-node-manager/src/rpc.rs | 9 ++-- ant-service-management/src/error.rs | 8 ++++ ant-service-management/src/metric.rs | 53 ++++++++++++++---------- 9 files changed, 79 insertions(+), 57 deletions(-) diff --git a/ant-node-manager/src/add_services/mod.rs b/ant-node-manager/src/add_services/mod.rs index 842040b49c..bbc562d31c 100644 --- a/ant-node-manager/src/add_services/mod.rs +++ b/ant-node-manager/src/add_services/mod.rs @@ -115,12 +115,9 @@ pub async fn add_node( }; let metrics_free_port = if let Some(port) = metrics_port { Some(port) - } else if options.enable_metrics_server { - Some(service_control.get_available_port()?) } else { - None + Some(service_control.get_available_port()?) }; - let rpc_socket_addr = if let Some(addr) = options.rpc_address { SocketAddr::new(IpAddr::V4(addr), rpc_free_port) } else { diff --git a/ant-node-manager/src/bin/cli/main.rs b/ant-node-manager/src/bin/cli/main.rs index b87e5429c5..984eba136c 100644 --- a/ant-node-manager/src/bin/cli/main.rs +++ b/ant-node-manager/src/bin/cli/main.rs @@ -699,7 +699,7 @@ pub enum LocalSubCmd { /// An interval applied between launching each node. /// /// Units are milliseconds. - #[clap(long, default_value_t = 250)] + #[clap(long, default_value_t = 220)] interval: u64, /// Specify the logging format. /// @@ -798,7 +798,7 @@ pub enum LocalSubCmd { /// An interval applied between launching each node. /// /// Units are milliseconds. - #[clap(long, default_value_t = 250)] + #[clap(long, default_value_t = 220)] interval: u64, /// Specify the logging format. /// diff --git a/ant-node-manager/src/cmd/node.rs b/ant-node-manager/src/cmd/node.rs index de6cdfdfa5..d1d112ae62 100644 --- a/ant-node-manager/src/cmd/node.rs +++ b/ant-node-manager/src/cmd/node.rs @@ -13,10 +13,7 @@ use crate::{ add_services::{ add_node, config::{AddNodeServiceOptions, PortRange}, - }, - config::{self, is_running_as_root}, - helpers::{download_and_extract_release, get_bin_version}, - print_banner, refresh_node_registry, status_report, ServiceManager, VerbosityLevel, + }, config::{self, is_running_as_root}, error::Error, helpers::{download_and_extract_release, get_bin_version}, print_banner, refresh_node_registry, status_report, ServiceManager, VerbosityLevel }; use ant_bootstrap::PeersArgs; use ant_evm::{EvmNetwork, RewardsAddress}; @@ -180,7 +177,9 @@ pub async fn balance( for &index in &service_indices { let node = &mut node_registry.nodes[index]; - let metric_client = MetricClient::new(node.metrics_port.unwrap()); + let metrics_port: u16 = node.metrics_port + .ok_or(Error::MetricPortEmpty)?; + let metric_client = MetricClient::new(metrics_port); let service = NodeService::new(node, Box::new(metric_client)); // TODO: remove this as we have no way to know the reward balance of nodes since EVM payments! println!("{}: {}", service.service_data.service_name, 0,); } @@ -221,7 +220,9 @@ pub async fn remove( let mut failed_services = Vec::new(); for &index in &service_indices { let node = &mut node_registry.nodes[index]; - let metric_client = MetricClient::new(node.metrics_port.unwrap()); + let metrics_port = node.metrics_port.ok_or(Error::MetricPortEmpty)?; + + let metric_client = MetricClient::new(metrics_port); let service = NodeService::new(node, Box::new(metric_client)); let mut service_manager = ServiceManager::new(service, Box::new(ServiceController {}), verbosity); @@ -308,7 +309,8 @@ pub async fn start( let mut failed_services = Vec::new(); for &index in &service_indices { let node = &mut node_registry.nodes[index]; - let metric_client = MetricClient::new(node.metrics_port.unwrap()); + let metrics_port = node.metrics_port.ok_or(Error::MetricPortEmpty)?; + let metric_client = MetricClient::new(metrics_port); let service = NodeService::new(node, Box::new(metric_client)); // set dynamic startup delay if fixed_interval is not set let service = if fixed_interval.is_none() { @@ -402,7 +404,8 @@ pub async fn stop( let mut failed_services = Vec::new(); for &index in &service_indices { let node = &mut node_registry.nodes[index]; - let metric_client = MetricClient::new(node.metrics_port.unwrap()); + let metrics_port = node.metrics_port.ok_or(Error::MetricPortEmpty)?; + let metric_client = MetricClient::new(metrics_port); let service = NodeService::new(node, Box::new(metric_client)); let mut service_manager = ServiceManager::new(service, Box::new(ServiceController {}), verbosity); @@ -514,8 +517,8 @@ pub async fn upgrade( target_version: target_version.clone(), }; let service_name = node.service_name.clone(); - - let metric_client = MetricClient::new(node.metrics_port.unwrap()); + let metrics_port = node.metrics_port.ok_or(Error::MetricPortEmpty)?; + let metric_client = MetricClient::new(metrics_port); let service = NodeService::new(node, Box::new(metric_client)); // set dynamic startup delay if fixed_interval is not set let service = if fixed_interval.is_none() { diff --git a/ant-node-manager/src/error.rs b/ant-node-manager/src/error.rs index efb34c9369..9a45a608d1 100644 --- a/ant-node-manager/src/error.rs +++ b/ant-node-manager/src/error.rs @@ -20,6 +20,8 @@ pub enum Error { PidNotFoundAfterStarting, #[error("The PID of the process was not set.")] PidNotSet, + #[error("The metric port of the node is empty")] + MetricPortEmpty, #[error(transparent)] SemverError(#[from] semver::Error), #[error("The service(s) is already running: {0:?}")] diff --git a/ant-node-manager/src/lib.rs b/ant-node-manager/src/lib.rs index 32ee0af188..c37f454244 100644 --- a/ant-node-manager/src/lib.rs +++ b/ant-node-manager/src/lib.rs @@ -550,8 +550,10 @@ pub async fn refresh_node_registry( // exists. // TODO: remove this as we have no way to know the reward balance of nodes since EVM payments! node.reward_balance = None; + let metrics_port = node.metrics_port + .ok_or(Error::MetricPortEmpty)?; - let metric_client = MetricClient::new(node.metrics_port.unwrap_or(0)); + let metric_client = MetricClient::new(metrics_port); let mut service = NodeService::new(node, Box::new(metric_client.clone())); if is_local_network { diff --git a/ant-node-manager/src/local.rs b/ant-node-manager/src/local.rs index 8924a7cca0..adf01a850d 100644 --- a/ant-node-manager/src/local.rs +++ b/ant-node-manager/src/local.rs @@ -270,21 +270,20 @@ pub async fn run_network( service_control.get_available_port()? }; let metrics_free_port = if let Some(port) = metrics_port { - Some(port) - } else if options.enable_metrics_server { - Some(service_control.get_available_port()?) - } else { - None + port + } else { + service_control.get_available_port()? }; + let rpc_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port); - let metric_client = MetricClient::new(metrics_free_port.unwrap()); + let metric_client = MetricClient::new(metrics_free_port); let number = (node_registry.nodes.len() as u16) + 1; let node = run_node( RunNodeOptions { first: true, - metrics_port: metrics_free_port, + metrics_port: Some(metrics_free_port), node_port, interval: options.interval, log_format: options.log_format, @@ -316,22 +315,21 @@ pub async fn run_network( service_control.get_available_port()? }; let metrics_free_port = if let Some(port) = metrics_port { - Some(port) - } else if options.enable_metrics_server { - Some(service_control.get_available_port()?) - } else { - None + port + } else { + service_control.get_available_port()? }; + let rpc_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port); - let metric_client = MetricClient::new(metrics_free_port.unwrap()); + let metric_client = MetricClient::new(metrics_free_port); let number = (node_registry.nodes.len() as u16) + 1; let node = run_node( RunNodeOptions { first: false, - metrics_port: metrics_free_port, + metrics_port: Some(metrics_free_port), node_port, interval: options.interval, log_format: options.log_format, @@ -474,7 +472,7 @@ async fn validate_network(node_registry: &mut NodeRegistry, peers: Vec Net let network_info_dir_path = root_dir.join("network_info"); let connected_peers_path = network_info_dir_path.join(format!("connected_peers_{}", peer_id)); let listeners_path = network_info_dir_path.join(format!("listeners_{}", peer_id)); - println!("connected_peers_path: {:?}", connected_peers_path); let mut connected_peers = Vec::new(); if std::path::Path::new(&connected_peers_path).exists() { @@ -98,7 +97,7 @@ impl MetricClient { } } - pub async fn get_endpoint_metrics(&self, endpoint_name: &str) -> Result> { + pub async fn get_endpoint_metrics(&self, endpoint_name: &str) -> Result { debug!( "Attempting connection to collect {} metrics from {}...", endpoint_name, @@ -106,28 +105,29 @@ impl MetricClient { ); let body = reqwest::get(&format!("http://localhost:{}/{endpoint_name}", self.endpoint_port)) - .await? + .await.map_err(|_| Error::MetricServiceConnectionError(self.endpoint_port.clone()))? .text() - .await?; + .await.map_err(|_| Error::MetricServiceInfoResponseError)?; let lines: Vec<_> = body.lines().map(|s| Ok(s.to_owned())).collect(); let all_metrics = prometheus_parse::Scrape::parse(lines.into_iter())?; Ok(all_metrics) } - pub fn get_node_info_from_metadata_extended(&self ,scrape: &prometheus_parse::Scrape, node_info: &mut NodeInfoMetrics) { + pub fn get_node_info_from_metadata_extended(&self ,scrape: &prometheus_parse::Scrape, node_info: &mut NodeInfoMetrics) -> Result<()>{ for sample in scrape.samples.iter() { for (key, value) in sample.labels.iter() { match key.as_str() { - "peer_id" => node_info.peer_id = value.parse().unwrap(), - "pid" => node_info.pid = value.parse().unwrap(), - "bin_version" => node_info.bin_version = value.parse().unwrap(), - "root_dir" => node_info.root_dir =value.parse().unwrap(), - "log_dir" => node_info.log_dir = value.parse().unwrap(), + "peer_id" => node_info.peer_id = value.parse()?, + "pid" => node_info.pid = value.parse()?, + "bin_version" => node_info.bin_version = value.to_string(), + "root_dir" => node_info.root_dir = PathBuf::from(value), + "log_dir" => node_info.log_dir = PathBuf::from(value), _ => {} } } } + Ok(()) } pub fn get_node_info_from_metrics(&self, scrape: &prometheus_parse::Scrape, node_info: &mut NodeInfoMetrics) { @@ -159,12 +159,12 @@ impl MetricClient { #[async_trait] impl RpcActions for MetricClient { async fn node_info(&self) -> Result { - let scrape = self.get_endpoint_metrics("metadata_extended").await.expect("Failed to get endpoint metadata_extended"); + let scrape = self.get_endpoint_metrics("metadata_extended").await?; let mut node_info = NodeInfoMetrics::default(); - self.get_node_info_from_metadata_extended(&scrape, &mut node_info); - let scrape = self.get_endpoint_metrics("metrics").await.expect("Failed to get endpoint metrics"); + let _ = self.get_node_info_from_metadata_extended(&scrape, &mut node_info); + let scrape = self.get_endpoint_metrics("metrics").await?; self.get_node_info_from_metrics(&scrape, &mut node_info); - println!("node_info: {:?}", node_info); + Ok(NodeInfo{ peer_id: node_info.peer_id, pid: node_info.pid, @@ -177,16 +177,27 @@ impl RpcActions for MetricClient { } async fn network_info(&self) -> Result { - let scrape = self.get_endpoint_metrics("metadata_extended").await.expect("Failed to get endpoint metadata_extended"); + let scrape = self.get_endpoint_metrics("metadata_extended").await?; let mut node_info = NodeInfoMetrics::default(); - self.get_node_info_from_metadata_extended(&scrape, &mut node_info); + let _ = self.get_node_info_from_metadata_extended(&scrape, &mut node_info); let network_info_metrics = read_network_metrics_from_file(node_info.root_dir, node_info.peer_id.to_string()); - // let connected_peers = network_info_metrics.connected_peers.into_iter().map(|s| s.parse().unwrap()).collect(); - // println!("network_info_metrics: {:?}", connected_peers); + + let connected_peers_ = network_info_metrics + .connected_peers + .into_iter() + .filter_map(|s| s.parse().ok()) // Ignores errors + .collect(); + + let listeners_ = network_info_metrics + .listeners + .into_iter() + .filter_map(|s| s.parse().ok()) // Ignores errors + .collect(); + Ok(NetworkInfo { - connected_peers: network_info_metrics.connected_peers.into_iter().map(|s| s.parse().unwrap()).collect(), - listeners: network_info_metrics.listeners.into_iter().map(|s| s.parse().unwrap()).collect(), + connected_peers: connected_peers_, + listeners: listeners_ , }) }