Skip to content

Commit

Permalink
feat: replace runtime rpc with metric client
Browse files Browse the repository at this point in the history
  • Loading branch information
ermineJose committed Jan 25, 2025
1 parent 565c480 commit 3786eb7
Show file tree
Hide file tree
Showing 12 changed files with 283 additions and 55 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 1 addition & 13 deletions ant-networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ impl NetworkBuilder {

let metadata_extended_sub_reg = metrics_registries
.metadata_extended
.sub_registry_with_prefix("ant-networking");
.sub_registry_with_prefix("ant_networking");

metadata_extended_sub_reg.register(
"peer_id",
Expand Down Expand Up @@ -599,18 +599,6 @@ impl NetworkBuilder {
);
}

metadata_extended_sub_reg.register(
"uptime",
"id of the node process",
Info::new(vec![("pid".to_string(), "0".to_string())]),
);

metadata_extended_sub_reg.register(
"wallet balance",
"id of the node process",
Info::new(vec![("wallet_balance".to_string(), "0".to_string())]),
);

run_metrics_server(metrics_registries, port);
Some(metrics_recorder)
} else {
Expand Down
2 changes: 1 addition & 1 deletion ant-node-manager/src/bin/cli/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,7 @@ pub enum LocalSubCmd {
/// An interval applied between launching each node.
///
/// Units are milliseconds.
#[clap(long, default_value_t = 200)]
#[clap(long, default_value_t = 2000)]
interval: u64,
/// Specify the logging format.
///
Expand Down
24 changes: 11 additions & 13 deletions ant-node-manager/src/cmd/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ use ant_evm::{EvmNetwork, RewardsAddress};
use ant_logging::LogFormat;
use ant_releases::{AntReleaseRepoActions, ReleaseType};
use ant_service_management::{
control::{ServiceControl, ServiceController},
rpc::RpcClient,
NodeRegistry, NodeService, ServiceStateActions, ServiceStatus, UpgradeOptions, UpgradeResult,
control::{ServiceControl, ServiceController}, metric::MetricClient, NodeRegistry, NodeService, ServiceStateActions, ServiceStatus, UpgradeOptions, UpgradeResult
};
use color_eyre::{eyre::eyre, Help, Result};
use colored::Colorize;
Expand Down Expand Up @@ -181,8 +179,8 @@ pub async fn balance(

for &index in &service_indices {
let node = &mut node_registry.nodes[index];
let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let service = NodeService::new(node, Box::new(rpc_client));
let metric_client = MetricClient::new(node.metrics_port.unwrap());
let service = NodeService::new(node, Box::new(metric_client));
// TODO: remove this as we have no way to know the reward balance of nodes since EVM payments!
println!("{}: {}", service.service_data.service_name, 0,);
}
Expand Down Expand Up @@ -223,8 +221,8 @@ pub async fn remove(
let mut failed_services = Vec::new();
for &index in &service_indices {
let node = &mut node_registry.nodes[index];
let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let service = NodeService::new(node, Box::new(rpc_client));
let metric_client = MetricClient::new(node.metrics_port.unwrap());
let service = NodeService::new(node, Box::new(metric_client));
let mut service_manager =
ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
match service_manager.remove(keep_directories).await {
Expand Down Expand Up @@ -310,9 +308,9 @@ pub async fn start(
let mut failed_services = Vec::new();
for &index in &service_indices {
let node = &mut node_registry.nodes[index];
let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let metric_client = MetricClient::new(node.metrics_port.unwrap());

let service = NodeService::new(node, Box::new(rpc_client));
let service = NodeService::new(node, Box::new(metric_client));

// set dynamic startup delay if fixed_interval is not set
let service = if fixed_interval.is_none() {
Expand Down Expand Up @@ -406,8 +404,8 @@ pub async fn stop(
let mut failed_services = Vec::new();
for &index in &service_indices {
let node = &mut node_registry.nodes[index];
let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let service = NodeService::new(node, Box::new(rpc_client));
let metric_client = MetricClient::new(node.metrics_port.unwrap());
let service = NodeService::new(node, Box::new(metric_client));
let mut service_manager =
ServiceManager::new(service, Box::new(ServiceController {}), verbosity);

Expand Down Expand Up @@ -519,8 +517,8 @@ pub async fn upgrade(
};
let service_name = node.service_name.clone();

let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let service = NodeService::new(node, Box::new(rpc_client));
let metric_client = MetricClient::new(node.metrics_port.unwrap());
let service = NodeService::new(node, Box::new(metric_client));
// set dynamic startup delay if fixed_interval is not set
let service = if fixed_interval.is_none() {
service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
Expand Down
10 changes: 5 additions & 5 deletions ant-node-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ impl From<u8> for VerbosityLevel {
}

use crate::error::{Error, Result};
use ant_service_management::metric::MetricClient;
use ant_service_management::rpc::RpcActions;
use ant_service_management::{
control::ServiceControl, error::Error as ServiceError, rpc::RpcClient, NodeRegistry,
control::ServiceControl, error::Error as ServiceError, NodeRegistry,
NodeService, NodeServiceData, ServiceStateActions, ServiceStatus, UpgradeOptions,
UpgradeResult,
};
Expand Down Expand Up @@ -550,16 +551,15 @@ pub async fn refresh_node_registry(
// TODO: remove this as we have no way to know the reward balance of nodes since EVM payments!
node.reward_balance = None;

let mut rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
rpc_client.set_max_attempts(1);
let mut service = NodeService::new(node, Box::new(rpc_client.clone()));
let metric_client = MetricClient::new(node.metrics_port.unwrap_or(0));
let mut service = NodeService::new(node, Box::new(metric_client.clone()));

if is_local_network {
// For a local network, retrieving the process by its path does not work, because the
// paths are not unique: they are all launched from the same binary. Instead we will
// just determine whether the node is running by connecting to its RPC service. We
// only need to distinguish between `RUNNING` and `STOPPED` for a local network.
match rpc_client.node_info().await {
match metric_client.node_info().await {
Ok(info) => {
let pid = info.pid;
debug!(
Expand Down
25 changes: 13 additions & 12 deletions ant-node-manager/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ use crate::helpers::{
use ant_bootstrap::PeersArgs;
use ant_evm::{EvmNetwork, RewardsAddress};
use ant_logging::LogFormat;
use ant_service_management::metric::MetricClient;
use ant_service_management::{
control::ServiceControl,
rpc::{RpcActions, RpcClient},
rpc::RpcActions,
NodeRegistry, NodeServiceData, ServiceStatus,
};
use color_eyre::eyre::OptionExt;
Expand Down Expand Up @@ -277,8 +278,7 @@ pub async fn run_network(
};
let rpc_socket_addr =
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port);
let rpc_client = RpcClient::from_socket_addr(rpc_socket_addr);

let metric_client = MetricClient::new(metrics_free_port.unwrap());
let number = (node_registry.nodes.len() as u16) + 1;
let node = run_node(
RunNodeOptions {
Expand All @@ -294,9 +294,10 @@ pub async fn run_network(
version: get_bin_version(&launcher.get_antnode_path())?,
},
&launcher,
&rpc_client,
&metric_client,
)
.await?;

node_registry.nodes.push(node.clone());
let bootstrap_peers = node
.listen_addr
Expand All @@ -323,8 +324,7 @@ pub async fn run_network(
};
let rpc_socket_addr =
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port);
let rpc_client = RpcClient::from_socket_addr(rpc_socket_addr);

let metric_client = MetricClient::new(metrics_free_port.unwrap());
let number = (node_registry.nodes.len() as u16) + 1;
let node = run_node(
RunNodeOptions {
Expand All @@ -340,7 +340,7 @@ pub async fn run_network(
version: get_bin_version(&launcher.get_antnode_path())?,
},
&launcher,
&rpc_client,
&metric_client,
)
.await?;
node_registry.nodes.push(node);
Expand Down Expand Up @@ -382,7 +382,7 @@ pub struct RunNodeOptions {
pub async fn run_node(
run_options: RunNodeOptions,
launcher: &dyn Launcher,
rpc_client: &dyn RpcActions,
metric_client: &dyn RpcActions,
) -> Result<NodeServiceData> {
info!("Launching node {}...", run_options.number);
println!("Launching node {}...", run_options.number);
Expand All @@ -397,9 +397,10 @@ pub async fn run_node(
)?;
launcher.wait(run_options.interval);

let node_info = rpc_client.node_info().await?;
let node_info = metric_client.node_info().await?;
let peer_id = node_info.peer_id;
let network_info = rpc_client.network_info().await?;
let network_info = metric_client.network_info().await?;
println!("network_info from the antctl is {:?}", network_info);
let connected_peers = Some(network_info.connected_peers);
let listen_addrs = network_info
.listeners
Expand Down Expand Up @@ -472,8 +473,8 @@ async fn validate_network(node_registry: &mut NodeRegistry, peers: Vec<Multiaddr
all_peers.extend(additional_peers);

for node in node_registry.nodes.iter() {
let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let net_info = rpc_client.network_info().await?;
let metric_client = MetricClient::new(node.metrics_port.unwrap());
let net_info = metric_client.network_info().await?;
let peers = net_info.connected_peers;
let peer_id = node.peer_id.ok_or_eyre("The PeerId was not set")?;
debug!("Node {peer_id} has {} peers", peers.len());
Expand Down
12 changes: 5 additions & 7 deletions ant-node-manager/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ use crate::{
VerbosityLevel,
};
use ant_service_management::{
control::{ServiceControl, ServiceController},
rpc::RpcClient,
NodeRegistry, NodeService, NodeServiceData, ServiceStatus,
control::{ServiceControl, ServiceController}, metric::MetricClient, NodeRegistry, NodeService, NodeServiceData, ServiceStatus
};
use color_eyre::{
eyre::{eyre, OptionExt},
Expand All @@ -37,8 +35,8 @@ pub async fn restart_node_service(
})?;
let current_node_clone = current_node_mut.clone();

let rpc_client = RpcClient::from_socket_addr(current_node_mut.rpc_socket_addr);
let service = NodeService::new(current_node_mut, Box::new(rpc_client));
let metric_client = MetricClient::new(current_node_mut.metrics_port.unwrap());
let service = NodeService::new(current_node_mut, Box::new(metric_client));
let mut service_manager = ServiceManager::new(
service,
Box::new(ServiceController {}),
Expand Down Expand Up @@ -235,8 +233,8 @@ pub async fn restart_node_service(
version: current_node_clone.version.clone(),
};

let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let service = NodeService::new(&mut node, Box::new(rpc_client));
let metric_client = MetricClient::new(node.metrics_port.unwrap());
let service = NodeService::new(&mut node, Box::new(metric_client));
let mut service_manager = ServiceManager::new(
service,
Box::new(ServiceController {}),
Expand Down
3 changes: 1 addition & 2 deletions ant-node/src/bin/antnode/rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,8 @@ impl AntNode for SafeNodeRpcService {
.get_swarm_local_state()
.await
.expect("failed to get local swarm state");
let connected_peers = state.connected_peers.iter().map(|p| p.to_bytes()).collect();
let connected_peers:Vec<Vec<u8>> = state.connected_peers.iter().map(|p| p.to_bytes()).collect();
let listeners = state.listeners.iter().map(|m| m.to_string()).collect();

let resp = Response::new(NetworkInfoResponse {
connected_peers,
listeners,
Expand Down
22 changes: 20 additions & 2 deletions ant-node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use ant_protocol::{
storage::ValidationType,
NetworkAddress, PrettyPrintRecordKey, CLOSE_GROUP_SIZE,
};
use ant_service_management::metric::{write_network_metrics_to_file,read_network_metrics_from_file, NetworkInfoMetrics};
use bytes::Bytes;
use itertools::Itertools;
use libp2p::{identity::Keypair, Multiaddr, PeerId};
Expand Down Expand Up @@ -211,8 +212,25 @@ impl NodeBuilder {
rewards_address: self.evm_address,
};

// Run the node
node.run(swarm_driver, network_event_receiver);
// Run the node
let mut connected_peers = vec![];
let mut listeners = vec![];
let runing_node_metrics = running_node.clone();
let _return_value = tokio::spawn(async move {
sleep(Duration::from_millis(200)).await;
let state = runing_node_metrics.get_swarm_local_state().await.expect("Failed to get swarm local state");
connected_peers = state.connected_peers.iter().map(|p| p.to_string()).collect();
listeners = state.listeners.iter().map(|m| m.to_string()).collect();
let network_info = NetworkInfoMetrics::new(connected_peers, listeners);

write_network_metrics_to_file(
runing_node_metrics.root_dir_path.clone(),
network_info,
runing_node_metrics.network.peer_id().to_string()
);
});

node.run(swarm_driver, network_event_receiver);

Ok(running_node)
}
Expand Down
5 changes: 5 additions & 0 deletions ant-service-management/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ tokio = { version = "1.32.0", features = ["time"] }
tonic = { version = "0.6.2" }
tracing = { version = "~0.1.26" }
tracing-core = "0.1.30"
prometheus-parse = "0.2.5"
regex = "1.11.0"
reqwest = { version = "0.12.2", default-features = false, features = [
"rustls-tls-manual-roots",
] }

[build-dependencies]
# watch out updating this, protoc compiler needs to be installed on all build systems
Expand Down
1 change: 1 addition & 0 deletions ant-service-management/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod error;
pub mod faucet;
pub mod node;
pub mod rpc;
pub mod metric;

#[macro_use]
extern crate tracing;
Expand Down
Loading

0 comments on commit 3786eb7

Please sign in to comment.