Skip to content

Commit

Permalink
feat: add proper listeners support and add a workaround for connected…
Browse files Browse the repository at this point in the history
…_peers
  • Loading branch information
ermineJose committed Feb 2, 2025
1 parent 65c07df commit d7e2457
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 24 deletions.
5 changes: 5 additions & 0 deletions ant-networking/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ pub enum NetworkEvent {
KeysToFetchForReplication(Vec<(PeerId, RecordKey)>),
/// Started listening on a new address
NewListenAddr(Multiaddr),
/// stopped Listening from a address
ClosedListenAddr(Vec<Multiaddr>),
/// Report unverified record
UnverifiedRecord(Record),
/// Terminate Node on unrecoverable errors
Expand Down Expand Up @@ -180,6 +182,9 @@ impl Debug for NetworkEvent {
NetworkEvent::NewListenAddr(addr) => {
write!(f, "NetworkEvent::NewListenAddr({addr:?})")
}
NetworkEvent::ClosedListenAddr(addr ) => {
write!(f, "NetworkEvent::ClosedListenAddr({addr:?})")
}
NetworkEvent::UnverifiedRecord(record) => {
let pretty_key = PrettyPrintRecordKey::from(&record.key);
write!(f, "NetworkEvent::UnverifiedRecord({pretty_key:?})")
Expand Down
2 changes: 2 additions & 0 deletions ant-networking/src/event/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,8 @@ impl SwarmDriver {
if let Some(relay_manager) = self.relay_manager.as_mut() {
relay_manager.on_listener_closed(&listener_id, &mut self.swarm);
}

self.send_event(NetworkEvent::ClosedListenAddr(addresses.clone()));
}
SwarmEvent::IncomingConnection {
connection_id,
Expand Down
33 changes: 25 additions & 8 deletions ant-node-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,18 @@ pub async fn status_report(
println!("{json}");
} else if detailed_view {
for node in &node_registry.nodes {
print_banner(&format!(
// TODO: add this parameter as connected_peer_count in registry instead of handling it from here directly.

Check notice

Code scanning / devskim

A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note

Suspicious comment
let metrics_port = node.metrics_port
.ok_or(Error::MetricPortEmpty)?;
let metric_client = MetricClient::new(metrics_port);
let scrape = metric_client.get_endpoint_metrics("metrics").await?;
let connected_peer_count;
if let Ok(peer_num) = metric_client.get_connected_peer_num_from_metrics(&scrape) {
connected_peer_count = peer_num;
} else {
connected_peer_count = 0;
}
print_banner(&format!(
"{} - {}",
&node.service_name,
format_status_without_colour(&node.status)
Expand All @@ -414,9 +425,7 @@ pub async fn status_report(
println!("Bin path: {}", node.antnode_path.to_string_lossy());
println!(
"Connected peers: {}",
node.connected_peers
.as_ref()
.map_or("-".to_string(), |p| p.len().to_string())
connected_peer_count
);
println!(
"Reward balance: {}",
Expand Down Expand Up @@ -459,10 +468,18 @@ pub async fn status_report(
.collect::<Vec<&NodeServiceData>>();
for node in nodes {
let peer_id = node.peer_id.map_or("-".to_string(), |p| p.to_string());
let connected_peers = node
.connected_peers
.clone()
.map_or("-".to_string(), |p| p.len().to_string());
// TODO: add this parameter as connected_peer_count in registry instead of handling it from here directly.

Check notice

Code scanning / devskim

A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note

Suspicious comment
let metrics_port = node.metrics_port
.ok_or(Error::MetricPortEmpty)?;
let metric_client = MetricClient::new(metrics_port);
let scrape = metric_client.get_endpoint_metrics("metrics").await?;
let connected_peer_count;
if let Ok(peer_num) = metric_client.get_connected_peer_num_from_metrics(&scrape) {
connected_peer_count = peer_num;
} else {
connected_peer_count = 0;
}
let connected_peers = connected_peer_count.to_string();
println!(
"{:<18} {:<52} {:<7} {:>15}",
node.service_name,
Expand Down
2 changes: 2 additions & 0 deletions ant-node/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,6 @@ pub enum Error {
InvalidRequest(String),
#[error("EVM Network error: {0}")]
EvmNetwork(String),
#[error("Not able to open the file for Listeners with option")]
InvalidListenerFileOperation,
}
112 changes: 96 additions & 16 deletions ant-node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use super::{
error::Result, event::NodeEventsChannel, quote::quotes_verification, Marker, NodeEvent,
};
use crate::error::Error;
#[cfg(feature = "open-metrics")]
use crate::metrics::NodeMetricsRecorder;
use crate::RunningNode;
Expand All @@ -28,6 +29,7 @@ use ant_protocol::{
NetworkAddress, PrettyPrintRecordKey, CLOSE_GROUP_SIZE,
};
use ant_service_management::metric::{write_network_metrics_to_file,NetworkInfoMetrics};
// use autonomi::client::address;
use bytes::Bytes;
use itertools::Itertools;
use libp2p::{identity::Keypair, Multiaddr, PeerId};
Expand All @@ -36,6 +38,7 @@ use rand::{
rngs::{OsRng, StdRng},
thread_rng, Rng, SeedableRng,
};
use std::io::{BufRead, Write};
use std::{
collections::HashMap,
net::SocketAddr,
Expand Down Expand Up @@ -198,6 +201,7 @@ impl NodeBuilder {
#[cfg(feature = "open-metrics")]
metrics_recorder,
evm_network: self.evm_network,
root_dir: self.root_dir.clone(),
};

let node = Node {
Expand All @@ -219,21 +223,6 @@ impl NodeBuilder {
};

// Run the node
let runing_node_metrics = running_node.clone();
let _return_value = tokio::spawn(async move {
sleep(Duration::from_millis(200)).await;
let state = runing_node_metrics.get_swarm_local_state().await.expect("Failed to get swarm local state");
let connected_peers = state.connected_peers.iter().map(|p| p.to_string()).collect();
let listeners = state.listeners.iter().map(|m| m.to_string()).collect();
let network_info = NetworkInfoMetrics::new(connected_peers, listeners);

let _ = write_network_metrics_to_file(
runing_node_metrics.root_dir_path.clone(),
network_info,
runing_node_metrics.network.peer_id().to_string()
);
});

Ok(running_node)
}
}
Expand All @@ -257,6 +246,7 @@ struct NodeInner {
metrics_recorder: Option<NodeMetricsRecorder>,
reward_address: RewardsAddress,
evm_network: EvmNetwork,
root_dir: PathBuf,
}

impl Node {
Expand All @@ -275,6 +265,10 @@ impl Node {
&self.inner.network
}

pub(crate) fn get_root_dir(&self) -> &PathBuf {
&self.inner.root_dir
}

#[cfg(feature = "open-metrics")]
/// Returns a reference to the NodeMetricsRecorder if the `open-metrics` feature flag is enabled
/// This is used to record various metrics for the node.
Expand Down Expand Up @@ -451,7 +445,7 @@ impl Node {
fn handle_network_event(&self, event: NetworkEvent, peers_connected: &Arc<AtomicUsize>) {
let start = Instant::now();
let event_string = format!("{event:?}");
let event_header;
let mut event_header = "UnknownEvent";
debug!("Handling NetworkEvent {event_string:?}");

match event {
Expand Down Expand Up @@ -492,14 +486,53 @@ impl Node {
event_header = "NewListenAddr";
let network = self.network().clone();
let peers = self.initial_peers().clone();
let peer_id = self.network().peer_id().clone();
let root_dir_nw_info = self
.get_root_dir()
.clone()
.join("network_info")
.join(format!("listeners_{}", peer_id));

let path = std::path::Path::new(&root_dir_nw_info);

if !path.exists() {
println!("File does not exist. Creating it now...");
match std::fs::File::create(&root_dir_nw_info) {
Ok(_) => println!("File created successfully: {:?}", root_dir_nw_info),
Err(e) => eprintln!("Failed to create file: {}", e),
}
}

let _handle = spawn(async move {
for addr in peers {
if !contains_string(&root_dir_nw_info, &addr.to_string()) {
_ = append_to_file(&root_dir_nw_info, &addr.clone().to_string());
}
if let Err(err) = network.dial(addr.clone()).await {
tracing::error!("Failed to dial {addr}: {err:?}");
};
}
});
}
NetworkEvent::ClosedListenAddr(address ) => {
let peer_id = self.network().peer_id().clone();
let root_dir_nw_info = self
.get_root_dir()
.clone()
.join("network_info")
.join(format!("listeners_{}", peer_id));
let path = std::path::Path::new(&root_dir_nw_info);

if path.exists() {
let _handle = spawn(async move {
for addr in address {
if contains_string(&root_dir_nw_info, &addr.to_string()) {
_ = remove_from_file(&root_dir_nw_info, &addr.clone().to_string());
}
}
});
}
}
NetworkEvent::ResponseReceived { res } => {
event_header = "ResponseReceived";
debug!("NetworkEvent::ResponseReceived {res:?}");
Expand Down Expand Up @@ -1110,6 +1143,53 @@ fn challenge_score_scheme(
)
}

fn contains_string(file_path: &PathBuf, search_str: &str) -> bool {
match std::fs::read_to_string(file_path) {
Ok(contents) => contents.contains(search_str),
Err(e) => {
eprintln!("Failed to read file: {}", e);
false
}
}
}

fn append_to_file(file_path: &PathBuf, new_str: &str) -> Result<()> {
let mut file = std::fs::OpenOptions::new()
.write(true)
.append(true)
.create(true)
.open(file_path)
.map_err(|_| Error::InvalidListenerFileOperation)?;
if let Err(e) = file.write_all(format!("{}\n", new_str).as_bytes()) {
eprintln!("Failed to write to file: {}", e);
}
Ok(())
}

fn remove_from_file(file_path: &PathBuf, new_str: &str) -> Result<()> {
// Read all lines from the file
let file = std::fs::File::open(file_path).map_err(|_| Error::InvalidListenerFileOperation)?;
let reader = std::io::BufReader::new(file);

let lines: Vec<String> = reader
.lines()
.filter_map(|line| line.ok()) // Handle errors while reading
.filter(|line| !line.contains(new_str)) // Remove lines containing the keyword
.collect();

// Write back the filtered content
let mut file = std::fs::OpenOptions::new()
.write(true)
.truncate(true)
.open(file_path).map_err(|_| Error::InvalidListenerFileOperation)?;

for line in lines {
writeln!(file, "{}", line).map_err(|_| Error::InvalidListenerFileOperation)?; // Write each line with a newline
}

Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit d7e2457

Please sign in to comment.