From cb994a996087450be21d4db428b0e5914b139fa4 Mon Sep 17 00:00:00 2001 From: Ermine Jose Date: Tue, 28 Jan 2025 13:41:48 +0530 Subject: [PATCH] feat: replace metric_service with rpc in antctl --- Cargo.lock | 3 + ant-networking/src/driver.rs | 44 +++++- ant-networking/src/metrics/service.rs | 43 ++++- ant-node-manager/src/bin/cli/main.rs | 4 +- ant-node-manager/src/cmd/node.rs | 28 ++-- ant-node-manager/src/lib.rs | 10 +- ant-node-manager/src/local.rs | 22 +-- ant-node-manager/src/rpc.rs | 14 +- ant-node/src/node.rs | 19 +++ ant-service-management/Cargo.toml | 5 + ant-service-management/src/lib.rs | 1 + ant-service-management/src/metric.rs | 216 ++++++++++++++++++++++++++ 12 files changed, 366 insertions(+), 43 deletions(-) create mode 100644 ant-service-management/src/metric.rs diff --git a/Cargo.lock b/Cargo.lock index b1cc4a7ab3..ca3a19fe35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1122,7 +1122,10 @@ dependencies = [ "libp2p", "libp2p-identity", "mockall 0.11.4", + "prometheus-parse", "prost 0.9.0", + "regex", + "reqwest 0.12.12", "semver 1.0.24", "serde", "serde_json", diff --git a/ant-networking/src/driver.rs b/ant-networking/src/driver.rs index d2eb834026..911851781e 100644 --- a/ant-networking/src/driver.rs +++ b/ant-networking/src/driver.rs @@ -392,7 +392,7 @@ impl NetworkBuilder { let upnp = self.upnp; let (network, events_receiver, mut swarm_driver) = - self.build(kad_cfg, Some(store_cfg), false, ProtocolSupport::Full, upnp); + self.build(kad_cfg, Some(store_cfg), false, ProtocolSupport::Full, upnp, Some(root_dir.clone())); // Listen on the provided address let listen_socket_addr = listen_addr.ok_or(NetworkError::ListenAddressNotProvided)?; @@ -425,7 +425,7 @@ impl NetworkBuilder { .set_replication_factor(REPLICATION_FACTOR); let (network, net_event_recv, driver) = - self.build(kad_cfg, None, true, ProtocolSupport::Outbound, false); + self.build(kad_cfg, None, true, ProtocolSupport::Outbound, false, None); (network, net_event_recv, driver) } @@ -438,6 +438,7 @@ impl NetworkBuilder { is_client: bool, req_res_protocol: ProtocolSupport, upnp: bool, + root_dir: Option ) -> (Network, mpsc::Receiver, SwarmDriver) { let identify_protocol_str = IDENTIFY_PROTOCOL_STR .read() @@ -510,6 +511,45 @@ impl NetworkBuilder { )]), ); + let metadata_extended_sub_reg = metrics_registries + .metadata_extended + .sub_registry_with_prefix("ant_networking"); + + metadata_extended_sub_reg.register( + "peer_id", + "Identifier of a peer of the network", + Info::new(vec![("peer_id".to_string(), peer_id.to_string())]), + ); + + metadata_extended_sub_reg.register( + "pid", + "id of the node process", + Info::new(vec![("pid".to_string(), std::process::id().to_string())]), + ); + + metadata_extended_sub_reg.register( + "bin_version", + "Package version of the node", + Info::new(vec![("bin_version".to_string(), env!("CARGO_PKG_VERSION").to_string())]), + ); + + if let Some(root_dir) = root_dir.clone() { + metadata_extended_sub_reg.register( + "data_dir", + "Root directory of the node", + Info::new(vec![("root_dir".to_string(), root_dir.clone().to_string_lossy().to_string())]), + ); + } + + if let Some(log_dir) = root_dir { + let log_dir = log_dir.join("logs"); + metadata_extended_sub_reg.register( + "log_dir", + "Root directory of the node", + Info::new(vec![("log_dir".to_string(), log_dir.clone().to_string_lossy().to_string())]), + ); + } + run_metrics_server(metrics_registries, port); Some(metrics_recorder) } else { diff --git a/ant-networking/src/metrics/service.rs b/ant-networking/src/metrics/service.rs index e64ae01701..72ad1c6eb6 100644 --- a/ant-networking/src/metrics/service.rs +++ b/ant-networking/src/metrics/service.rs @@ -22,6 +22,7 @@ pub struct MetricsRegistries { pub standard_metrics: Registry, pub extended_metrics: Registry, pub metadata: Registry, + pub metadata_extended: Registry, } const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text;charset=utf-8;version=1.0.0"; @@ -36,7 +37,7 @@ pub(crate) fn run_metrics_server(registries: MetricsRegistries, port: u16) { info!("Metrics server on http://{}/metrics", server.local_addr()); println!("Metrics server on http://{}/metrics", server.local_addr()); - info!("Metrics server on http://{} Available endpoints: /metrics, /metrics_extended, /metadata", server.local_addr()); + info!("Metrics server on http://{} Available endpoints: /metrics, /metrics_extended, /metadata, /metadata_extended", server.local_addr()); // run the server forever if let Err(e) = server.await { error!("server error: {}", e); @@ -50,6 +51,7 @@ pub(crate) struct MetricService { standard_registry: SharedRegistry, extended_registry: SharedRegistry, metadata: SharedRegistry, + metadata_extended: SharedRegistry, } impl MetricService { @@ -65,6 +67,11 @@ impl MetricService { Arc::clone(&self.metadata) } + fn get_metadata_extended_registry(&mut self) -> SharedRegistry { + Arc::clone(&self.metadata_extended) + } + + fn respond_with_metrics(&mut self) -> Result> { let mut response: Response = Response::default(); @@ -152,6 +159,28 @@ impl MetricService { Ok(response) } + fn respond_with_metadata_extended(&mut self) -> Result> { + let mut response: Response = Response::default(); + + response.headers_mut().insert( + hyper::header::CONTENT_TYPE, + METRICS_CONTENT_TYPE + .try_into() + .map_err(|_| NetworkError::NetworkMetricError)?, + ); + + let reg = self.get_metadata_extended_registry(); + let reg = reg.lock().map_err(|_| NetworkError::NetworkMetricError)?; + encode(&mut response.body_mut(), ®).map_err(|err| { + error!("Failed to encode the metadata Registry {err:?}"); + NetworkError::NetworkMetricError + })?; + + *response.status_mut() = StatusCode::OK; + + Ok(response) + } + fn respond_with_404_not_found(&mut self) -> Response { let mut resp = Response::default(); *resp.status_mut() = StatusCode::NOT_FOUND; @@ -196,7 +225,13 @@ impl Service> for MetricService { Ok(resp) => resp, Err(_) => self.respond_with_500_server_error(), } - } else { + } else if req_method == Method::GET && req_path == "/metadata_extended" { + match self.respond_with_metadata_extended() { + Ok(resp) => resp, + Err(_) => self.respond_with_500_server_error(), + } + } + else { self.respond_with_404_not_found() }; Box::pin(async { Ok(resp) }) @@ -207,6 +242,7 @@ pub(crate) struct MakeMetricService { standard_registry: SharedRegistry, extended_registry: SharedRegistry, metadata: SharedRegistry, + metadata_extended: SharedRegistry, } impl MakeMetricService { @@ -215,6 +251,7 @@ impl MakeMetricService { standard_registry: Arc::new(Mutex::new(registries.standard_metrics)), extended_registry: Arc::new(Mutex::new(registries.extended_metrics)), metadata: Arc::new(Mutex::new(registries.metadata)), + metadata_extended: Arc::new(Mutex::new(registries.metadata_extended)) } } } @@ -232,12 +269,14 @@ impl Service for MakeMetricService { let standard_registry = Arc::clone(&self.standard_registry); let extended_registry = Arc::clone(&self.extended_registry); let metadata = Arc::clone(&self.metadata); + let metadata_extended = Arc::clone(&self.metadata); let fut = async move { Ok(MetricService { standard_registry, extended_registry, metadata, + metadata_extended, }) }; Box::pin(fut) diff --git a/ant-node-manager/src/bin/cli/main.rs b/ant-node-manager/src/bin/cli/main.rs index 41a967cf4d..b87e5429c5 100644 --- a/ant-node-manager/src/bin/cli/main.rs +++ b/ant-node-manager/src/bin/cli/main.rs @@ -699,7 +699,7 @@ pub enum LocalSubCmd { /// An interval applied between launching each node. /// /// Units are milliseconds. - #[clap(long, default_value_t = 200)] + #[clap(long, default_value_t = 250)] interval: u64, /// Specify the logging format. /// @@ -798,7 +798,7 @@ pub enum LocalSubCmd { /// An interval applied between launching each node. /// /// Units are milliseconds. - #[clap(long, default_value_t = 200)] + #[clap(long, default_value_t = 250)] interval: u64, /// Specify the logging format. /// diff --git a/ant-node-manager/src/cmd/node.rs b/ant-node-manager/src/cmd/node.rs index 3812834811..de6cdfdfa5 100644 --- a/ant-node-manager/src/cmd/node.rs +++ b/ant-node-manager/src/cmd/node.rs @@ -24,9 +24,8 @@ use ant_logging::LogFormat; use ant_releases::{AntReleaseRepoActions, ReleaseType}; use ant_service_management::{ control::{ServiceControl, ServiceController}, - rpc::RpcClient, - NodeRegistry, NodeService, ServiceStateActions, ServiceStatus, UpgradeOptions, UpgradeResult, -}; + metric::MetricClient, NodeRegistry, NodeService, + ServiceStateActions, ServiceStatus, UpgradeOptions, UpgradeResult}; use color_eyre::{eyre::eyre, Help, Result}; use colored::Colorize; use libp2p_identity::PeerId; @@ -181,9 +180,8 @@ pub async fn balance( for &index in &service_indices { let node = &mut node_registry.nodes[index]; - let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr); - let service = NodeService::new(node, Box::new(rpc_client)); - // TODO: remove this as we have no way to know the reward balance of nodes since EVM payments! + let metric_client = MetricClient::new(node.metrics_port.unwrap()); + let service = NodeService::new(node, Box::new(metric_client)); // TODO: remove this as we have no way to know the reward balance of nodes since EVM payments! println!("{}: {}", service.service_data.service_name, 0,); } Ok(()) @@ -223,8 +221,8 @@ pub async fn remove( let mut failed_services = Vec::new(); for &index in &service_indices { let node = &mut node_registry.nodes[index]; - let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr); - let service = NodeService::new(node, Box::new(rpc_client)); + let metric_client = MetricClient::new(node.metrics_port.unwrap()); + let service = NodeService::new(node, Box::new(metric_client)); let mut service_manager = ServiceManager::new(service, Box::new(ServiceController {}), verbosity); match service_manager.remove(keep_directories).await { @@ -310,10 +308,8 @@ pub async fn start( let mut failed_services = Vec::new(); for &index in &service_indices { let node = &mut node_registry.nodes[index]; - let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr); - - let service = NodeService::new(node, Box::new(rpc_client)); - + let metric_client = MetricClient::new(node.metrics_port.unwrap()); + let service = NodeService::new(node, Box::new(metric_client)); // set dynamic startup delay if fixed_interval is not set let service = if fixed_interval.is_none() { service.with_connection_timeout(Duration::from_secs(connection_timeout_s)) @@ -406,8 +402,8 @@ pub async fn stop( let mut failed_services = Vec::new(); for &index in &service_indices { let node = &mut node_registry.nodes[index]; - let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr); - let service = NodeService::new(node, Box::new(rpc_client)); + let metric_client = MetricClient::new(node.metrics_port.unwrap()); + let service = NodeService::new(node, Box::new(metric_client)); let mut service_manager = ServiceManager::new(service, Box::new(ServiceController {}), verbosity); @@ -519,8 +515,8 @@ pub async fn upgrade( }; let service_name = node.service_name.clone(); - let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr); - let service = NodeService::new(node, Box::new(rpc_client)); + let metric_client = MetricClient::new(node.metrics_port.unwrap()); + let service = NodeService::new(node, Box::new(metric_client)); // set dynamic startup delay if fixed_interval is not set let service = if fixed_interval.is_none() { service.with_connection_timeout(Duration::from_secs(connection_timeout_s)) diff --git a/ant-node-manager/src/lib.rs b/ant-node-manager/src/lib.rs index e0d6d908d3..32ee0af188 100644 --- a/ant-node-manager/src/lib.rs +++ b/ant-node-manager/src/lib.rs @@ -39,9 +39,10 @@ impl From for VerbosityLevel { } use crate::error::{Error, Result}; +use ant_service_management::metric::MetricClient; use ant_service_management::rpc::RpcActions; use ant_service_management::{ - control::ServiceControl, error::Error as ServiceError, rpc::RpcClient, NodeRegistry, + control::ServiceControl, error::Error as ServiceError, NodeRegistry, NodeService, NodeServiceData, ServiceStateActions, ServiceStatus, UpgradeOptions, UpgradeResult, }; @@ -550,16 +551,15 @@ pub async fn refresh_node_registry( // TODO: remove this as we have no way to know the reward balance of nodes since EVM payments! node.reward_balance = None; - let mut rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr); - rpc_client.set_max_attempts(1); - let mut service = NodeService::new(node, Box::new(rpc_client.clone())); + let metric_client = MetricClient::new(node.metrics_port.unwrap_or(0)); + let mut service = NodeService::new(node, Box::new(metric_client.clone())); if is_local_network { // For a local network, retrieving the process by its path does not work, because the // paths are not unique: they are all launched from the same binary. Instead we will // just determine whether the node is running by connecting to its RPC service. We // only need to distinguish between `RUNNING` and `STOPPED` for a local network. - match rpc_client.node_info().await { + match metric_client.node_info().await { Ok(info) => { let pid = info.pid; debug!( diff --git a/ant-node-manager/src/local.rs b/ant-node-manager/src/local.rs index a7ed6529cb..8924a7cca0 100644 --- a/ant-node-manager/src/local.rs +++ b/ant-node-manager/src/local.rs @@ -14,9 +14,10 @@ use crate::helpers::{ use ant_bootstrap::PeersArgs; use ant_evm::{EvmNetwork, RewardsAddress}; use ant_logging::LogFormat; +use ant_service_management::metric::MetricClient; use ant_service_management::{ control::ServiceControl, - rpc::{RpcActions, RpcClient}, + rpc::RpcActions, NodeRegistry, NodeServiceData, ServiceStatus, }; use color_eyre::eyre::OptionExt; @@ -277,7 +278,7 @@ pub async fn run_network( }; let rpc_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port); - let rpc_client = RpcClient::from_socket_addr(rpc_socket_addr); + let metric_client = MetricClient::new(metrics_free_port.unwrap()); let number = (node_registry.nodes.len() as u16) + 1; let node = run_node( @@ -294,7 +295,7 @@ pub async fn run_network( version: get_bin_version(&launcher.get_antnode_path())?, }, &launcher, - &rpc_client, + &metric_client, ) .await?; node_registry.nodes.push(node.clone()); @@ -323,7 +324,8 @@ pub async fn run_network( }; let rpc_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port); - let rpc_client = RpcClient::from_socket_addr(rpc_socket_addr); + + let metric_client = MetricClient::new(metrics_free_port.unwrap()); let number = (node_registry.nodes.len() as u16) + 1; let node = run_node( @@ -340,7 +342,7 @@ pub async fn run_network( version: get_bin_version(&launcher.get_antnode_path())?, }, &launcher, - &rpc_client, + &metric_client, ) .await?; node_registry.nodes.push(node); @@ -382,7 +384,7 @@ pub struct RunNodeOptions { pub async fn run_node( run_options: RunNodeOptions, launcher: &dyn Launcher, - rpc_client: &dyn RpcActions, + metric_client: &dyn RpcActions, ) -> Result { info!("Launching node {}...", run_options.number); println!("Launching node {}...", run_options.number); @@ -397,9 +399,9 @@ pub async fn run_node( )?; launcher.wait(run_options.interval); - let node_info = rpc_client.node_info().await?; + let node_info = metric_client.node_info().await?; let peer_id = node_info.peer_id; - let network_info = rpc_client.network_info().await?; + let network_info = metric_client.network_info().await?; let connected_peers = Some(network_info.connected_peers); let listen_addrs = network_info .listeners @@ -472,8 +474,8 @@ async fn validate_network(node_registry: &mut NodeRegistry, peers: Vec Self { + Self { + peer_id: PeerId::random(), //initialization + pid: 0, + bin_version: String::from("unknown"), + root_dir: PathBuf::from("/"), + log_dir: PathBuf::from("/"), + uptime: Duration::new(0, 0), + wallet_balance: 0, + } + } +} +pub struct NetworkInfoMetrics { + pub connected_peers: Vec, + pub listeners: Vec, +} + +impl NetworkInfoMetrics { + pub fn new(connected_peers: Vec, listeners: Vec) -> Self { + Self { + connected_peers, + listeners, + } + } +} + +pub fn write_network_metrics_to_file(root_dir: PathBuf, network_info: NetworkInfoMetrics, peer_id: String) { + let network_info_dir_path = root_dir.join("network_info"); + + std::fs::create_dir_all(&network_info_dir_path).unwrap(); + + let connected_peers_path = network_info_dir_path.join(format!("connected_peers_{}", peer_id)); + let mut file = std::fs::File::create(&connected_peers_path).unwrap(); + for peer in network_info.connected_peers.iter() { + writeln!(file, "{}", peer).unwrap(); + } + + let listeners_path = network_info_dir_path.join(format!("listeners_{}", peer_id)); + let mut file = std::fs::File::create(&listeners_path).unwrap(); + for listeners in network_info.listeners.iter() { + writeln!(file, "{}", listeners).unwrap(); + } +} + +pub fn read_network_metrics_from_file(root_dir: PathBuf, peer_id: String) -> NetworkInfoMetrics { + let network_info_dir_path = root_dir.join("network_info"); + let connected_peers_path = network_info_dir_path.join(format!("connected_peers_{}", peer_id)); + let listeners_path = network_info_dir_path.join(format!("listeners_{}", peer_id)); + println!("connected_peers_path: {:?}", connected_peers_path); + + let mut connected_peers = Vec::new(); + if std::path::Path::new(&connected_peers_path).exists() { + match std::fs::read_to_string(&connected_peers_path) { + Ok(contents) => connected_peers = contents.lines().map(|s| s.to_string()).collect(), + Err(e) => eprintln!("Failed to read the connected peers file: {}", e), + } + } + + let mut listeners = Vec::new(); + if std::path::Path::new(&listeners_path).exists() { + match std::fs::read_to_string(&listeners_path) { + Ok(contents) => listeners = contents.lines().map(|s| s.to_string()).collect(), + Err(e) => eprintln!("Failed to read the listeners file: {}", e), + } + } + NetworkInfoMetrics::new(connected_peers, listeners) +} + +#[derive(Debug, Clone)] +pub struct MetricClient { + endpoint_port: String, +} + +impl MetricClient { + pub fn new(endpoint_port: u16) -> Self { + Self { + endpoint_port: endpoint_port.to_string(), + } + } + + pub async fn get_endpoint_metrics(&self, endpoint_name: &str) -> Result> { + debug!( + "Attempting connection to collect {} metrics from {}...", + endpoint_name, + self.endpoint_port + ); + + let body = reqwest::get(&format!("http://localhost:{}/{endpoint_name}", self.endpoint_port)) + .await? + .text() + .await?; + let lines: Vec<_> = body.lines().map(|s| Ok(s.to_owned())).collect(); + let all_metrics = prometheus_parse::Scrape::parse(lines.into_iter())?; + + Ok(all_metrics) + } + + pub fn get_node_info_from_metadata_extended(&self ,scrape: &prometheus_parse::Scrape, node_info: &mut NodeInfoMetrics) { + for sample in scrape.samples.iter() { + for (key, value) in sample.labels.iter() { + match key.as_str() { + "peer_id" => node_info.peer_id = value.parse().unwrap(), + "pid" => node_info.pid = value.parse().unwrap(), + "bin_version" => node_info.bin_version = value.parse().unwrap(), + "root_dir" => node_info.root_dir =value.parse().unwrap(), + "log_dir" => node_info.log_dir = value.parse().unwrap(), + _ => {} + } + } + } + } + + pub fn get_node_info_from_metrics(&self, scrape: &prometheus_parse::Scrape, node_info: &mut NodeInfoMetrics) { + for sample in scrape.samples.iter() { + if sample.metric == "ant_node_current_reward_wallet_balance" { + // Attos + match sample.value { + prometheus_parse::Value::Counter(val) + | prometheus_parse::Value::Gauge(val) + | prometheus_parse::Value::Untyped(val) => { + node_info.wallet_balance = val as u64; + } + _ => {} + } + } else if sample.metric == "ant_node_uptime" { + match sample.value { + prometheus_parse::Value::Counter(val) + | prometheus_parse::Value::Gauge(val) + | prometheus_parse::Value::Untyped(val) => { + node_info.uptime = Duration::new(val as u64, 0); + } + _ => {} + } + } + } + } +} + +#[async_trait] +impl RpcActions for MetricClient { + async fn node_info(&self) -> Result { + let scrape = self.get_endpoint_metrics("metadata_extended").await.expect("Failed to get endpoint metadata_extended"); + let mut node_info = NodeInfoMetrics::default(); + self.get_node_info_from_metadata_extended(&scrape, &mut node_info); + let scrape = self.get_endpoint_metrics("metrics").await.expect("Failed to get endpoint metrics"); + self.get_node_info_from_metrics(&scrape, &mut node_info); + println!("node_info: {:?}", node_info); + Ok(NodeInfo{ + peer_id: node_info.peer_id, + pid: node_info.pid, + version: node_info.bin_version, + data_path: node_info.root_dir, + log_path: node_info.log_dir, + uptime: node_info.uptime, + wallet_balance: node_info.wallet_balance, + }) + } + + async fn network_info(&self) -> Result { + let scrape = self.get_endpoint_metrics("metadata_extended").await.expect("Failed to get endpoint metadata_extended"); + let mut node_info = NodeInfoMetrics::default(); + self.get_node_info_from_metadata_extended(&scrape, &mut node_info); + + let network_info_metrics = read_network_metrics_from_file(node_info.root_dir, node_info.peer_id.to_string()); + // let connected_peers = network_info_metrics.connected_peers.into_iter().map(|s| s.parse().unwrap()).collect(); + // println!("network_info_metrics: {:?}", connected_peers); + Ok(NetworkInfo { + connected_peers: network_info_metrics.connected_peers.into_iter().map(|s| s.parse().unwrap()).collect(), + listeners: network_info_metrics.listeners.into_iter().map(|s| s.parse().unwrap()).collect(), + }) + } + + async fn record_addresses(&self) -> Result> { + Ok(vec![]) + } + + async fn node_restart(&self, delay_millis: u64, retain_peer_id: bool) -> Result<()> { + Ok(()) + } + + async fn node_stop(&self, delay_millis: u64) -> Result<()> { + Ok(()) + } + + async fn is_node_connected_to_network(&self, timeout: Duration) -> Result<()> { + Ok(()) + } + + async fn update_log_level(&self, log_levels: String) -> Result<()> { + Ok(()) + } + + async fn node_update(&self, delay_millis: u64) -> Result<()> { + Ok(()) + } +} \ No newline at end of file