Skip to content

Commit

Permalink
fix: add proper error handling to metric service support
Browse files (browse the repository at this point in the history)
  • Loading branch information
ermineJose committed Jan 30, 2025
1 parent 35433fc commit cb8c46f
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 57 deletions.
5 changes: 1 addition & 4 deletions ant-node-manager/src/add_services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,9 @@ pub async fn add_node(
};
let metrics_free_port = if let Some(port) = metrics_port {
Some(port)
} else if options.enable_metrics_server {
Some(service_control.get_available_port()?)
} else {
None
Some(service_control.get_available_port()?)
};

let rpc_socket_addr = if let Some(addr) = options.rpc_address {
SocketAddr::new(IpAddr::V4(addr), rpc_free_port)
} else {
Expand Down
4 changes: 2 additions & 2 deletions ant-node-manager/src/bin/cli/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ pub enum LocalSubCmd {
/// An interval applied between launching each node.
///
/// Units are milliseconds.
#[clap(long, default_value_t = 250)]
#[clap(long, default_value_t = 220)]
interval: u64,
/// Specify the logging format.
///
Expand Down Expand Up @@ -798,7 +798,7 @@ pub enum LocalSubCmd {
/// An interval applied between launching each node.
///
/// Units are milliseconds.
#[clap(long, default_value_t = 250)]
#[clap(long, default_value_t = 220)]
interval: u64,
/// Specify the logging format.
///
Expand Down
23 changes: 13 additions & 10 deletions ant-node-manager/src/cmd/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ use crate::{
add_services::{
add_node,
config::{AddNodeServiceOptions, PortRange},
},
config::{self, is_running_as_root},
helpers::{download_and_extract_release, get_bin_version},
print_banner, refresh_node_registry, status_report, ServiceManager, VerbosityLevel,
}, config::{self, is_running_as_root}, error::Error, helpers::{download_and_extract_release, get_bin_version}, print_banner, refresh_node_registry, status_report, ServiceManager, VerbosityLevel
};
use ant_bootstrap::PeersArgs;
use ant_evm::{EvmNetwork, RewardsAddress};
Expand Down Expand Up @@ -180,7 +177,9 @@ pub async fn balance(

for &index in &service_indices {
let node = &mut node_registry.nodes[index];
let metric_client = MetricClient::new(node.metrics_port.unwrap());
let metrics_port: u16 = node.metrics_port
.ok_or(Error::MetricPortEmpty)?;
let metric_client = MetricClient::new(metrics_port);
let service = NodeService::new(node, Box::new(metric_client)); // TODO: remove this as we have no way to know the reward balance of nodes since EVM payments!
println!("{}: {}", service.service_data.service_name, 0,);

Check notice

Code scanning / devskim

A "TODO" or similar was left in source code, possibly indicating incomplete functionality. (Severity: Note)

Suspicious comment
}
Expand Down Expand Up @@ -221,7 +220,9 @@ pub async fn remove(
let mut failed_services = Vec::new();
for &index in &service_indices {
let node = &mut node_registry.nodes[index];
let metric_client = MetricClient::new(node.metrics_port.unwrap());
let metrics_port = node.metrics_port.ok_or(Error::MetricPortEmpty)?;

let metric_client = MetricClient::new(metrics_port);
let service = NodeService::new(node, Box::new(metric_client));
let mut service_manager =
ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
Expand Down Expand Up @@ -308,7 +309,8 @@ pub async fn start(
let mut failed_services = Vec::new();
for &index in &service_indices {
let node = &mut node_registry.nodes[index];
let metric_client = MetricClient::new(node.metrics_port.unwrap());
let metrics_port = node.metrics_port.ok_or(Error::MetricPortEmpty)?;
let metric_client = MetricClient::new(metrics_port);
let service = NodeService::new(node, Box::new(metric_client));
// set dynamic startup delay if fixed_interval is not set
let service = if fixed_interval.is_none() {
Expand Down Expand Up @@ -402,7 +404,8 @@ pub async fn stop(
let mut failed_services = Vec::new();
for &index in &service_indices {
let node = &mut node_registry.nodes[index];
let metric_client = MetricClient::new(node.metrics_port.unwrap());
let metrics_port = node.metrics_port.ok_or(Error::MetricPortEmpty)?;
let metric_client = MetricClient::new(metrics_port);
let service = NodeService::new(node, Box::new(metric_client));
let mut service_manager =
ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
Expand Down Expand Up @@ -514,8 +517,8 @@ pub async fn upgrade(
target_version: target_version.clone(),
};
let service_name = node.service_name.clone();

let metric_client = MetricClient::new(node.metrics_port.unwrap());
let metrics_port = node.metrics_port.ok_or(Error::MetricPortEmpty)?;
let metric_client = MetricClient::new(metrics_port);
let service = NodeService::new(node, Box::new(metric_client));
// set dynamic startup delay if fixed_interval is not set
let service = if fixed_interval.is_none() {
Expand Down
2 changes: 2 additions & 0 deletions ant-node-manager/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub enum Error {
PidNotFoundAfterStarting,
#[error("The PID of the process was not set.")]
PidNotSet,
#[error("The metric port of the node is empty")]
MetricPortEmpty,
#[error(transparent)]
SemverError(#[from] semver::Error),
#[error("The service(s) is already running: {0:?}")]
Expand Down
4 changes: 3 additions & 1 deletion ant-node-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,8 +550,10 @@ pub async fn refresh_node_registry(
// exists.
// TODO: remove this as we have no way to know the reward balance of nodes since EVM payments!
node.reward_balance = None;
let metrics_port = node.metrics_port
.ok_or(Error::MetricPortEmpty)?;

let metric_client = MetricClient::new(node.metrics_port.unwrap_or(0));
let metric_client = MetricClient::new(metrics_port);
let mut service = NodeService::new(node, Box::new(metric_client.clone()));

if is_local_network {
Expand Down
28 changes: 13 additions & 15 deletions ant-node-manager/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,21 +270,20 @@ pub async fn run_network(
service_control.get_available_port()?
};
let metrics_free_port = if let Some(port) = metrics_port {
Some(port)
} else if options.enable_metrics_server {
Some(service_control.get_available_port()?)
} else {
None
port
} else {
service_control.get_available_port()?
};

let rpc_socket_addr =
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port);
let metric_client = MetricClient::new(metrics_free_port.unwrap());
let metric_client = MetricClient::new(metrics_free_port);

let number = (node_registry.nodes.len() as u16) + 1;
let node = run_node(
RunNodeOptions {
first: true,
metrics_port: metrics_free_port,
metrics_port: Some(metrics_free_port),
node_port,
interval: options.interval,
log_format: options.log_format,
Expand Down Expand Up @@ -316,22 +315,21 @@ pub async fn run_network(
service_control.get_available_port()?
};
let metrics_free_port = if let Some(port) = metrics_port {
Some(port)
} else if options.enable_metrics_server {
Some(service_control.get_available_port()?)
} else {
None
port
} else {
service_control.get_available_port()?
};

let rpc_socket_addr =
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port);

let metric_client = MetricClient::new(metrics_free_port.unwrap());
let metric_client = MetricClient::new(metrics_free_port);

let number = (node_registry.nodes.len() as u16) + 1;
let node = run_node(
RunNodeOptions {
first: false,
metrics_port: metrics_free_port,
metrics_port: Some(metrics_free_port),
node_port,
interval: options.interval,
log_format: options.log_format,
Expand Down Expand Up @@ -474,7 +472,7 @@ async fn validate_network(node_registry: &mut NodeRegistry, peers: Vec<Multiaddr
all_peers.extend(additional_peers);

for node in node_registry.nodes.iter() {
let metric_client = MetricClient::new(node.metrics_port.unwrap());
let metric_client = MetricClient::new(node.metrics_port.ok_or_eyre("Metrics port not set")?);
let net_info = metric_client.network_info().await?;
let peers = net_info.connected_peers;
let peer_id = node.peer_id.ok_or_eyre("The PeerId was not set")?;
Expand Down
9 changes: 5 additions & 4 deletions ant-node-manager/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
// permissions and limitations relating to use of the SAFE Network Software.

use crate::{
add_services::config::InstallNodeServiceCtxBuilder, config::create_owned_dir, ServiceManager,
VerbosityLevel,
add_services::config::InstallNodeServiceCtxBuilder, config::create_owned_dir, error::Error, ServiceManager, VerbosityLevel
};
use ant_service_management::{
control::{ServiceControl, ServiceController},
Expand Down Expand Up @@ -37,7 +36,8 @@ pub async fn restart_node_service(
})?;
let current_node_clone = current_node_mut.clone();

let metric_client = MetricClient::new(current_node_mut.metrics_port.unwrap());
let metrics_port = current_node_mut.metrics_port.ok_or(Error::MetricPortEmpty)?;
let metric_client = MetricClient::new(metrics_port);
let service = NodeService::new(current_node_mut, Box::new(metric_client));

let mut service_manager = ServiceManager::new(
Expand Down Expand Up @@ -236,7 +236,8 @@ pub async fn restart_node_service(
version: current_node_clone.version.clone(),
};

let metric_client = MetricClient::new(node.metrics_port.unwrap());
let metrics_port = node.metrics_port.ok_or(Error::MetricPortEmpty)?;
let metric_client = MetricClient::new(metrics_port);
let service = NodeService::new(&mut node, Box::new(metric_client));

let mut service_manager = ServiceManager::new(
Expand Down
8 changes: 8 additions & 0 deletions ant-service-management/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ pub enum Error {
ServiceUserAccountCreationFailed,
#[error("Could not obtain user's data directory")]
UserDataDirectoryNotObtainable,
#[error("Could not connect to the metric service")]
MetricServiceConnectionError,
#[error("Could not process the metric service inforesponse")]
MetricServiceInfoResponseError,
#[error("Could not process the metrics from nodeInfo '{0}'")]
MetricServiceNodeInfoError(String),
#[error("Could not process the metrics from NetworkInfo '{0}'")]
MetricServiceNetworkInfoError(String),
#[error(transparent)]
Utf8Error(#[from] std::str::Utf8Error),
}
53 changes: 32 additions & 21 deletions ant-service-management/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::rpc::{RpcActions, NodeInfo, NetworkInfo, RecordAddress};
use tokio::time::Duration;
use std::path::PathBuf;
use std::io::Write;
use crate::error::Result;
use crate::error::{Result,Error};

#[derive(Debug, Clone)]
pub struct NodeInfoMetrics {
Expand Down Expand Up @@ -66,7 +66,6 @@ pub fn read_network_metrics_from_file(root_dir: PathBuf, peer_id: String) -> Net
let network_info_dir_path = root_dir.join("network_info");
let connected_peers_path = network_info_dir_path.join(format!("connected_peers_{}", peer_id));
let listeners_path = network_info_dir_path.join(format!("listeners_{}", peer_id));
println!("connected_peers_path: {:?}", connected_peers_path);

let mut connected_peers = Vec::new();
if std::path::Path::new(&connected_peers_path).exists() {
Expand Down Expand Up @@ -98,36 +97,37 @@ impl MetricClient {
}
}

pub async fn get_endpoint_metrics(&self, endpoint_name: &str) -> Result<prometheus_parse::Scrape, Box<dyn std::error::Error>> {
pub async fn get_endpoint_metrics(&self, endpoint_name: &str) -> Result<prometheus_parse::Scrape> {
debug!(
"Attempting connection to collect {} metrics from {}...",
endpoint_name,
self.endpoint_port
);

let body = reqwest::get(&format!("http://localhost:{}/{endpoint_name}", self.endpoint_port))
.await?
.await.map_err(|_| Error::MetricServiceConnectionError)?

Check notice

Code scanning / devskim

Accessing localhost could indicate debug code, or could hinder scaling. (Severity: Note)

Do not leave debug code in production
.text()
.await?;
.await.map_err(|_| Error::MetricServiceInfoResponseError)?;
let lines: Vec<_> = body.lines().map(|s| Ok(s.to_owned())).collect();
let all_metrics = prometheus_parse::Scrape::parse(lines.into_iter())?;

Ok(all_metrics)
}

pub fn get_node_info_from_metadata_extended(&self ,scrape: &prometheus_parse::Scrape, node_info: &mut NodeInfoMetrics) {
pub fn get_node_info_from_metadata_extended(&self ,scrape: &prometheus_parse::Scrape, node_info: &mut NodeInfoMetrics) -> Result<()>{
for sample in scrape.samples.iter() {
for (key, value) in sample.labels.iter() {
match key.as_str() {
"peer_id" => node_info.peer_id = value.parse().unwrap(),
"pid" => node_info.pid = value.parse().unwrap(),
"bin_version" => node_info.bin_version = value.parse().unwrap(),
"root_dir" => node_info.root_dir =value.parse().unwrap(),
"log_dir" => node_info.log_dir = value.parse().unwrap(),
"peer_id" => node_info.peer_id = value.parse()?,
"pid" => node_info.pid = value.parse()?,
"bin_version" => node_info.bin_version = value.to_string(),
"root_dir" => node_info.root_dir = PathBuf::from(value),
"log_dir" => node_info.log_dir = PathBuf::from(value),
_ => {}
}
}
}
Ok(())
}

pub fn get_node_info_from_metrics(&self, scrape: &prometheus_parse::Scrape, node_info: &mut NodeInfoMetrics) {
Expand Down Expand Up @@ -159,12 +159,12 @@ impl MetricClient {
#[async_trait]
impl RpcActions for MetricClient {
async fn node_info(&self) -> Result<NodeInfo> {
let scrape = self.get_endpoint_metrics("metadata_extended").await.expect("Failed to get endpoint metadata_extended");
let scrape = self.get_endpoint_metrics("metadata_extended").await?;
let mut node_info = NodeInfoMetrics::default();
self.get_node_info_from_metadata_extended(&scrape, &mut node_info);
let scrape = self.get_endpoint_metrics("metrics").await.expect("Failed to get endpoint metrics");
let _ = self.get_node_info_from_metadata_extended(&scrape, &mut node_info);
let scrape = self.get_endpoint_metrics("metrics").await?;
self.get_node_info_from_metrics(&scrape, &mut node_info);
println!("node_info: {:?}", node_info);

Ok(NodeInfo{
peer_id: node_info.peer_id,
pid: node_info.pid,
Expand All @@ -177,16 +177,27 @@ impl RpcActions for MetricClient {
}

async fn network_info(&self) -> Result<NetworkInfo> {
let scrape = self.get_endpoint_metrics("metadata_extended").await.expect("Failed to get endpoint metadata_extended");
let scrape = self.get_endpoint_metrics("metadata_extended").await?;
let mut node_info = NodeInfoMetrics::default();
self.get_node_info_from_metadata_extended(&scrape, &mut node_info);
let _ = self.get_node_info_from_metadata_extended(&scrape, &mut node_info);

let network_info_metrics = read_network_metrics_from_file(node_info.root_dir, node_info.peer_id.to_string());
// let connected_peers = network_info_metrics.connected_peers.into_iter().map(|s| s.parse().unwrap()).collect();
// println!("network_info_metrics: {:?}", connected_peers);

let connected_peers_ = network_info_metrics
.connected_peers
.into_iter()
.filter_map(|s| s.parse().ok()) // Ignores errors
.collect();

let listeners_ = network_info_metrics
.listeners
.into_iter()
.filter_map(|s| s.parse().ok()) // Ignores errors
.collect();

Ok(NetworkInfo {
connected_peers: network_info_metrics.connected_peers.into_iter().map(|s| s.parse().unwrap()).collect(),
listeners: network_info_metrics.listeners.into_iter().map(|s| s.parse().unwrap()).collect(),
connected_peers: connected_peers_,
listeners: listeners_ ,
})
}

Expand Down

0 comments on commit cb8c46f

Please sign in to comment.