diff --git a/ant-networking/src/event/mod.rs b/ant-networking/src/event/mod.rs index 5932ce7ad6..2946b42182 100644 --- a/ant-networking/src/event/mod.rs +++ b/ant-networking/src/event/mod.rs @@ -126,6 +126,8 @@ pub enum NetworkEvent { KeysToFetchForReplication(Vec<(PeerId, RecordKey)>), /// Started listening on a new address NewListenAddr(Multiaddr), + /// stopped Listening from a address + ClosedListenAddr(Vec), /// Report unverified record UnverifiedRecord(Record), /// Terminate Node on unrecoverable errors @@ -180,6 +182,9 @@ impl Debug for NetworkEvent { NetworkEvent::NewListenAddr(addr) => { write!(f, "NetworkEvent::NewListenAddr({addr:?})") } + NetworkEvent::ClosedListenAddr(addr ) => { + write!(f, "NetworkEvent::ClosedListenAddr({addr:?})") + } NetworkEvent::UnverifiedRecord(record) => { let pretty_key = PrettyPrintRecordKey::from(&record.key); write!(f, "NetworkEvent::UnverifiedRecord({pretty_key:?})") diff --git a/ant-networking/src/event/swarm.rs b/ant-networking/src/event/swarm.rs index 330427c41f..379665336c 100644 --- a/ant-networking/src/event/swarm.rs +++ b/ant-networking/src/event/swarm.rs @@ -348,6 +348,8 @@ impl SwarmDriver { if let Some(relay_manager) = self.relay_manager.as_mut() { relay_manager.on_listener_closed(&listener_id, &mut self.swarm); } + + self.send_event(NetworkEvent::ClosedListenAddr(addresses.clone())); } SwarmEvent::IncomingConnection { connection_id, diff --git a/ant-node-manager/src/lib.rs b/ant-node-manager/src/lib.rs index c37f454244..2580299676 100644 --- a/ant-node-manager/src/lib.rs +++ b/ant-node-manager/src/lib.rs @@ -393,7 +393,18 @@ pub async fn status_report( println!("{json}"); } else if detailed_view { for node in &node_registry.nodes { - print_banner(&format!( + // TODO: add this parameter as connected_peer_count in registry instead of handling it from here directly. + let metrics_port = node.metrics_port + .ok_or(Error::MetricPortEmpty)?; + let metric_client = MetricClient::new(metrics_port); + let scrape = metric_client.get_endpoint_metrics("metrics").await?; + let connected_peer_count; + if let Ok(peer_num) = metric_client.get_connected_peer_num_from_metrics(&scrape) { + connected_peer_count = peer_num; + } else { + connected_peer_count = 0; + } + print_banner(&format!( "{} - {}", &node.service_name, format_status_without_colour(&node.status) @@ -414,9 +425,7 @@ pub async fn status_report( println!("Bin path: {}", node.antnode_path.to_string_lossy()); println!( "Connected peers: {}", - node.connected_peers - .as_ref() - .map_or("-".to_string(), |p| p.len().to_string()) + connected_peer_count ); println!( "Reward balance: {}", @@ -459,10 +468,18 @@ pub async fn status_report( .collect::>(); for node in nodes { let peer_id = node.peer_id.map_or("-".to_string(), |p| p.to_string()); - let connected_peers = node - .connected_peers - .clone() - .map_or("-".to_string(), |p| p.len().to_string()); + // TODO: add this parameter as connected_peer_count in registry instead of handling it from here directly. + let metrics_port = node.metrics_port + .ok_or(Error::MetricPortEmpty)?; + let metric_client = MetricClient::new(metrics_port); + let scrape = metric_client.get_endpoint_metrics("metrics").await?; + let connected_peer_count; + if let Ok(peer_num) = metric_client.get_connected_peer_num_from_metrics(&scrape) { + connected_peer_count = peer_num; + } else { + connected_peer_count = 0; + } + let connected_peers = connected_peer_count.to_string(); println!( "{:<18} {:<52} {:<7} {:>15}", node.service_name, diff --git a/ant-node/src/error.rs b/ant-node/src/error.rs index 896185c1a8..c6e231b61f 100644 --- a/ant-node/src/error.rs +++ b/ant-node/src/error.rs @@ -83,4 +83,6 @@ pub enum Error { InvalidRequest(String), #[error("EVM Network error: {0}")] EvmNetwork(String), + #[error("Not able to open the file for Listeners with option")] + InvalidListenerFileOperation, } diff --git a/ant-node/src/node.rs b/ant-node/src/node.rs index 76230ac314..1cadf92c53 100644 --- a/ant-node/src/node.rs +++ b/ant-node/src/node.rs @@ -9,6 +9,7 @@ use super::{ error::Result, event::NodeEventsChannel, quote::quotes_verification, Marker, NodeEvent, }; +use crate::error::Error; #[cfg(feature = "open-metrics")] use crate::metrics::NodeMetricsRecorder; use crate::RunningNode; @@ -28,6 +29,7 @@ use ant_protocol::{ NetworkAddress, PrettyPrintRecordKey, CLOSE_GROUP_SIZE, }; use ant_service_management::metric::{write_network_metrics_to_file,NetworkInfoMetrics}; +// use autonomi::client::address; use bytes::Bytes; use itertools::Itertools; use libp2p::{identity::Keypair, Multiaddr, PeerId}; @@ -36,6 +38,7 @@ use rand::{ rngs::{OsRng, StdRng}, thread_rng, Rng, SeedableRng, }; +use std::io::{BufRead, Write}; use std::{ collections::HashMap, net::SocketAddr, @@ -198,6 +201,7 @@ impl NodeBuilder { #[cfg(feature = "open-metrics")] metrics_recorder, evm_network: self.evm_network, + root_dir: self.root_dir.clone(), }; let node = Node { @@ -219,21 +223,6 @@ impl NodeBuilder { }; // Run the node - let runing_node_metrics = running_node.clone(); - let _return_value = tokio::spawn(async move { - sleep(Duration::from_millis(200)).await; - let state = runing_node_metrics.get_swarm_local_state().await.expect("Failed to get swarm local state"); - let connected_peers = state.connected_peers.iter().map(|p| p.to_string()).collect(); - let listeners = state.listeners.iter().map(|m| m.to_string()).collect(); - let network_info = NetworkInfoMetrics::new(connected_peers, listeners); - - let _ = write_network_metrics_to_file( - runing_node_metrics.root_dir_path.clone(), - network_info, - runing_node_metrics.network.peer_id().to_string() - ); - }); - Ok(running_node) } } @@ -257,6 +246,7 @@ struct NodeInner { metrics_recorder: Option, reward_address: RewardsAddress, evm_network: EvmNetwork, + root_dir: PathBuf, } impl Node { @@ -275,6 +265,10 @@ impl Node { &self.inner.network } + pub(crate) fn get_root_dir(&self) -> &PathBuf { + &self.inner.root_dir + } + #[cfg(feature = "open-metrics")] /// Returns a reference to the NodeMetricsRecorder if the `open-metrics` feature flag is enabled /// This is used to record various metrics for the node. @@ -451,7 +445,7 @@ impl Node { fn handle_network_event(&self, event: NetworkEvent, peers_connected: &Arc) { let start = Instant::now(); let event_string = format!("{event:?}"); - let event_header; + let mut event_header = "UnknownEvent"; debug!("Handling NetworkEvent {event_string:?}"); match event { @@ -492,14 +486,53 @@ impl Node { event_header = "NewListenAddr"; let network = self.network().clone(); let peers = self.initial_peers().clone(); + let peer_id = self.network().peer_id().clone(); + let root_dir_nw_info = self + .get_root_dir() + .clone() + .join("network_info") + .join(format!("listeners_{}", peer_id)); + + let path = std::path::Path::new(&root_dir_nw_info); + + if !path.exists() { + println!("File does not exist. Creating it now..."); + match std::fs::File::create(&root_dir_nw_info) { + Ok(_) => println!("File created successfully: {:?}", root_dir_nw_info), + Err(e) => eprintln!("Failed to create file: {}", e), + } + } + let _handle = spawn(async move { for addr in peers { + if !contains_string(&root_dir_nw_info, &addr.to_string()) { + _ = append_to_file(&root_dir_nw_info, &addr.clone().to_string()); + } if let Err(err) = network.dial(addr.clone()).await { tracing::error!("Failed to dial {addr}: {err:?}"); }; } }); } + NetworkEvent::ClosedListenAddr(address ) => { + let peer_id = self.network().peer_id().clone(); + let root_dir_nw_info = self + .get_root_dir() + .clone() + .join("network_info") + .join(format!("listeners_{}", peer_id)); + let path = std::path::Path::new(&root_dir_nw_info); + + if path.exists() { + let _handle = spawn(async move { + for addr in address { + if contains_string(&root_dir_nw_info, &addr.to_string()) { + _ = remove_from_file(&root_dir_nw_info, &addr.clone().to_string()); + } + } + }); + } + } NetworkEvent::ResponseReceived { res } => { event_header = "ResponseReceived"; debug!("NetworkEvent::ResponseReceived {res:?}"); @@ -1110,6 +1143,53 @@ fn challenge_score_scheme( ) } +fn contains_string(file_path: &PathBuf, search_str: &str) -> bool { + match std::fs::read_to_string(file_path) { + Ok(contents) => contents.contains(search_str), + Err(e) => { + eprintln!("Failed to read file: {}", e); + false + } + } +} + +fn append_to_file(file_path: &PathBuf, new_str: &str) -> Result<()> { + let mut file = std::fs::OpenOptions::new() + .write(true) + .append(true) + .create(true) + .open(file_path) + .map_err(|_| Error::InvalidListenerFileOperation)?; + if let Err(e) = file.write_all(format!("{}\n", new_str).as_bytes()) { + eprintln!("Failed to write to file: {}", e); + } + Ok(()) +} + +fn remove_from_file(file_path: &PathBuf, new_str: &str) -> Result<()> { + // Read all lines from the file + let file = std::fs::File::open(file_path).map_err(|_| Error::InvalidListenerFileOperation)?; + let reader = std::io::BufReader::new(file); + + let lines: Vec = reader + .lines() + .filter_map(|line| line.ok()) // Handle errors while reading + .filter(|line| !line.contains(new_str)) // Remove lines containing the keyword + .collect(); + + // Write back the filtered content + let mut file = std::fs::OpenOptions::new() + .write(true) + .truncate(true) + .open(file_path).map_err(|_| Error::InvalidListenerFileOperation)?; + + for line in lines { + writeln!(file, "{}", line).map_err(|_| Error::InvalidListenerFileOperation)?; // Write each line with a newline + } + + Ok(()) +} + #[cfg(test)] mod tests { use super::*;