Skip to content

Commit

Permalink
feat: replace metric_service with rpc in antctl
Browse files Browse the repository at this point in the history
  • Loading branch information
ermineJose committed Jan 28, 2025
1 parent 1224c87 commit cb994a9
Show file tree
Hide file tree
Showing 12 changed files with 366 additions and 43 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 42 additions & 2 deletions ant-networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ impl NetworkBuilder {
let upnp = self.upnp;

let (network, events_receiver, mut swarm_driver) =
self.build(kad_cfg, Some(store_cfg), false, ProtocolSupport::Full, upnp);
self.build(kad_cfg, Some(store_cfg), false, ProtocolSupport::Full, upnp, Some(root_dir.clone()));

// Listen on the provided address
let listen_socket_addr = listen_addr.ok_or(NetworkError::ListenAddressNotProvided)?;
Expand Down Expand Up @@ -425,7 +425,7 @@ impl NetworkBuilder {
.set_replication_factor(REPLICATION_FACTOR);

let (network, net_event_recv, driver) =
self.build(kad_cfg, None, true, ProtocolSupport::Outbound, false);
self.build(kad_cfg, None, true, ProtocolSupport::Outbound, false, None);

(network, net_event_recv, driver)
}
Expand All @@ -438,6 +438,7 @@ impl NetworkBuilder {
is_client: bool,
req_res_protocol: ProtocolSupport,
upnp: bool,
root_dir: Option<PathBuf>
) -> (Network, mpsc::Receiver<NetworkEvent>, SwarmDriver) {
let identify_protocol_str = IDENTIFY_PROTOCOL_STR
.read()
Expand Down Expand Up @@ -510,6 +511,45 @@ impl NetworkBuilder {
)]),
);

let metadata_extended_sub_reg = metrics_registries
.metadata_extended
.sub_registry_with_prefix("ant_networking");

metadata_extended_sub_reg.register(
"peer_id",
"Identifier of a peer of the network",
Info::new(vec![("peer_id".to_string(), peer_id.to_string())]),
);

metadata_extended_sub_reg.register(
"pid",
"id of the node process",
Info::new(vec![("pid".to_string(), std::process::id().to_string())]),
);

metadata_extended_sub_reg.register(
"bin_version",
"Package version of the node",
Info::new(vec![("bin_version".to_string(), env!("CARGO_PKG_VERSION").to_string())]),
);

if let Some(root_dir) = root_dir.clone() {
metadata_extended_sub_reg.register(
"data_dir",
"Root directory of the node",
Info::new(vec![("root_dir".to_string(), root_dir.clone().to_string_lossy().to_string())]),
);
}

if let Some(log_dir) = root_dir {
let log_dir = log_dir.join("logs");
metadata_extended_sub_reg.register(
"log_dir",
"Root directory of the node",
Info::new(vec![("log_dir".to_string(), log_dir.clone().to_string_lossy().to_string())]),
);
}

run_metrics_server(metrics_registries, port);
Some(metrics_recorder)
} else {
Expand Down
43 changes: 41 additions & 2 deletions ant-networking/src/metrics/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub struct MetricsRegistries {
pub standard_metrics: Registry,
pub extended_metrics: Registry,
pub metadata: Registry,
pub metadata_extended: Registry,
}

const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text;charset=utf-8;version=1.0.0";
Expand All @@ -36,7 +37,7 @@ pub(crate) fn run_metrics_server(registries: MetricsRegistries, port: u16) {
info!("Metrics server on http://{}/metrics", server.local_addr());
println!("Metrics server on http://{}/metrics", server.local_addr());

info!("Metrics server on http://{} Available endpoints: /metrics, /metrics_extended, /metadata", server.local_addr());
info!("Metrics server on http://{} Available endpoints: /metrics, /metrics_extended, /metadata, /metadata_extended", server.local_addr());

Check warning

Code scanning / devskim

An HTTP-based URL without TLS was detected. Warning

Insecure URL
// run the server forever
if let Err(e) = server.await {
error!("server error: {}", e);
Expand All @@ -50,6 +51,7 @@ pub(crate) struct MetricService {
standard_registry: SharedRegistry,
extended_registry: SharedRegistry,
metadata: SharedRegistry,
metadata_extended: SharedRegistry,
}

impl MetricService {
Expand All @@ -65,6 +67,11 @@ impl MetricService {
Arc::clone(&self.metadata)
}

fn get_metadata_extended_registry(&mut self) -> SharedRegistry {
Arc::clone(&self.metadata_extended)
}


fn respond_with_metrics(&mut self) -> Result<Response<String>> {
let mut response: Response<String> = Response::default();

Expand Down Expand Up @@ -152,6 +159,28 @@ impl MetricService {
Ok(response)
}

fn respond_with_metadata_extended(&mut self) -> Result<Response<String>> {
let mut response: Response<String> = Response::default();

response.headers_mut().insert(
hyper::header::CONTENT_TYPE,
METRICS_CONTENT_TYPE
.try_into()
.map_err(|_| NetworkError::NetworkMetricError)?,
);

let reg = self.get_metadata_extended_registry();
let reg = reg.lock().map_err(|_| NetworkError::NetworkMetricError)?;
encode(&mut response.body_mut(), &reg).map_err(|err| {
error!("Failed to encode the metadata Registry {err:?}");
NetworkError::NetworkMetricError
})?;

*response.status_mut() = StatusCode::OK;

Ok(response)
}

fn respond_with_404_not_found(&mut self) -> Response<String> {
let mut resp = Response::default();
*resp.status_mut() = StatusCode::NOT_FOUND;
Expand Down Expand Up @@ -196,7 +225,13 @@ impl Service<Request<Body>> for MetricService {
Ok(resp) => resp,
Err(_) => self.respond_with_500_server_error(),
}
} else {
} else if req_method == Method::GET && req_path == "/metadata_extended" {
match self.respond_with_metadata_extended() {
Ok(resp) => resp,
Err(_) => self.respond_with_500_server_error(),
}
}
else {
self.respond_with_404_not_found()
};
Box::pin(async { Ok(resp) })
Expand All @@ -207,6 +242,7 @@ pub(crate) struct MakeMetricService {
standard_registry: SharedRegistry,
extended_registry: SharedRegistry,
metadata: SharedRegistry,
metadata_extended: SharedRegistry,
}

impl MakeMetricService {
Expand All @@ -215,6 +251,7 @@ impl MakeMetricService {
standard_registry: Arc::new(Mutex::new(registries.standard_metrics)),
extended_registry: Arc::new(Mutex::new(registries.extended_metrics)),
metadata: Arc::new(Mutex::new(registries.metadata)),
metadata_extended: Arc::new(Mutex::new(registries.metadata_extended))
}
}
}
Expand All @@ -232,12 +269,14 @@ impl<T> Service<T> for MakeMetricService {
let standard_registry = Arc::clone(&self.standard_registry);
let extended_registry = Arc::clone(&self.extended_registry);
let metadata = Arc::clone(&self.metadata);
let metadata_extended = Arc::clone(&self.metadata);

let fut = async move {
Ok(MetricService {
standard_registry,
extended_registry,
metadata,
metadata_extended,
})
};
Box::pin(fut)
Expand Down
4 changes: 2 additions & 2 deletions ant-node-manager/src/bin/cli/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ pub enum LocalSubCmd {
/// An interval applied between launching each node.
///
/// Units are milliseconds.
#[clap(long, default_value_t = 200)]
#[clap(long, default_value_t = 250)]
interval: u64,
/// Specify the logging format.
///
Expand Down Expand Up @@ -798,7 +798,7 @@ pub enum LocalSubCmd {
/// An interval applied between launching each node.
///
/// Units are milliseconds.
#[clap(long, default_value_t = 200)]
#[clap(long, default_value_t = 250)]
interval: u64,
/// Specify the logging format.
///
Expand Down
28 changes: 12 additions & 16 deletions ant-node-manager/src/cmd/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ use ant_logging::LogFormat;
use ant_releases::{AntReleaseRepoActions, ReleaseType};
use ant_service_management::{
control::{ServiceControl, ServiceController},
rpc::RpcClient,
NodeRegistry, NodeService, ServiceStateActions, ServiceStatus, UpgradeOptions, UpgradeResult,
};
metric::MetricClient, NodeRegistry, NodeService,
ServiceStateActions, ServiceStatus, UpgradeOptions, UpgradeResult};
use color_eyre::{eyre::eyre, Help, Result};
use colored::Colorize;
use libp2p_identity::PeerId;
Expand Down Expand Up @@ -181,9 +180,8 @@ pub async fn balance(

for &index in &service_indices {
let node = &mut node_registry.nodes[index];
let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let service = NodeService::new(node, Box::new(rpc_client));
// TODO: remove this as we have no way to know the reward balance of nodes since EVM payments!
let metric_client = MetricClient::new(node.metrics_port.unwrap());
let service = NodeService::new(node, Box::new(metric_client)); // TODO: remove this as we have no way to know the reward balance of nodes since EVM payments!

Check notice

Code scanning / devskim

A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note

Suspicious comment
println!("{}: {}", service.service_data.service_name, 0,);
}
Ok(())
Expand Down Expand Up @@ -223,8 +221,8 @@ pub async fn remove(
let mut failed_services = Vec::new();
for &index in &service_indices {
let node = &mut node_registry.nodes[index];
let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let service = NodeService::new(node, Box::new(rpc_client));
let metric_client = MetricClient::new(node.metrics_port.unwrap());
let service = NodeService::new(node, Box::new(metric_client));
let mut service_manager =
ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
match service_manager.remove(keep_directories).await {
Expand Down Expand Up @@ -310,10 +308,8 @@ pub async fn start(
let mut failed_services = Vec::new();
for &index in &service_indices {
let node = &mut node_registry.nodes[index];
let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);

let service = NodeService::new(node, Box::new(rpc_client));

let metric_client = MetricClient::new(node.metrics_port.unwrap());
let service = NodeService::new(node, Box::new(metric_client));
// set dynamic startup delay if fixed_interval is not set
let service = if fixed_interval.is_none() {
service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
Expand Down Expand Up @@ -406,8 +402,8 @@ pub async fn stop(
let mut failed_services = Vec::new();
for &index in &service_indices {
let node = &mut node_registry.nodes[index];
let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let service = NodeService::new(node, Box::new(rpc_client));
let metric_client = MetricClient::new(node.metrics_port.unwrap());
let service = NodeService::new(node, Box::new(metric_client));
let mut service_manager =
ServiceManager::new(service, Box::new(ServiceController {}), verbosity);

Expand Down Expand Up @@ -519,8 +515,8 @@ pub async fn upgrade(
};
let service_name = node.service_name.clone();

let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let service = NodeService::new(node, Box::new(rpc_client));
let metric_client = MetricClient::new(node.metrics_port.unwrap());
let service = NodeService::new(node, Box::new(metric_client));
// set dynamic startup delay if fixed_interval is not set
let service = if fixed_interval.is_none() {
service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
Expand Down
10 changes: 5 additions & 5 deletions ant-node-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ impl From<u8> for VerbosityLevel {
}

use crate::error::{Error, Result};
use ant_service_management::metric::MetricClient;
use ant_service_management::rpc::RpcActions;
use ant_service_management::{
control::ServiceControl, error::Error as ServiceError, rpc::RpcClient, NodeRegistry,
control::ServiceControl, error::Error as ServiceError, NodeRegistry,
NodeService, NodeServiceData, ServiceStateActions, ServiceStatus, UpgradeOptions,
UpgradeResult,
};
Expand Down Expand Up @@ -550,16 +551,15 @@ pub async fn refresh_node_registry(
// TODO: remove this as we have no way to know the reward balance of nodes since EVM payments!
node.reward_balance = None;

let mut rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
rpc_client.set_max_attempts(1);
let mut service = NodeService::new(node, Box::new(rpc_client.clone()));
let metric_client = MetricClient::new(node.metrics_port.unwrap_or(0));
let mut service = NodeService::new(node, Box::new(metric_client.clone()));

if is_local_network {
// For a local network, retrieving the process by its path does not work, because the
// paths are not unique: they are all launched from the same binary. Instead we will
// just determine whether the node is running by connecting to its RPC service. We
// only need to distinguish between `RUNNING` and `STOPPED` for a local network.
match rpc_client.node_info().await {
match metric_client.node_info().await {
Ok(info) => {
let pid = info.pid;
debug!(
Expand Down
22 changes: 12 additions & 10 deletions ant-node-manager/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ use crate::helpers::{
use ant_bootstrap::PeersArgs;
use ant_evm::{EvmNetwork, RewardsAddress};
use ant_logging::LogFormat;
use ant_service_management::metric::MetricClient;
use ant_service_management::{
control::ServiceControl,
rpc::{RpcActions, RpcClient},
rpc::RpcActions,
NodeRegistry, NodeServiceData, ServiceStatus,
};
use color_eyre::eyre::OptionExt;
Expand Down Expand Up @@ -277,7 +278,7 @@ pub async fn run_network(
};
let rpc_socket_addr =
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port);
let rpc_client = RpcClient::from_socket_addr(rpc_socket_addr);
let metric_client = MetricClient::new(metrics_free_port.unwrap());

let number = (node_registry.nodes.len() as u16) + 1;
let node = run_node(
Expand All @@ -294,7 +295,7 @@ pub async fn run_network(
version: get_bin_version(&launcher.get_antnode_path())?,
},
&launcher,
&rpc_client,
&metric_client,
)
.await?;
node_registry.nodes.push(node.clone());
Expand Down Expand Up @@ -323,7 +324,8 @@ pub async fn run_network(
};
let rpc_socket_addr =
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_free_port);
let rpc_client = RpcClient::from_socket_addr(rpc_socket_addr);

let metric_client = MetricClient::new(metrics_free_port.unwrap());

let number = (node_registry.nodes.len() as u16) + 1;
let node = run_node(
Expand All @@ -340,7 +342,7 @@ pub async fn run_network(
version: get_bin_version(&launcher.get_antnode_path())?,
},
&launcher,
&rpc_client,
&metric_client,
)
.await?;
node_registry.nodes.push(node);
Expand Down Expand Up @@ -382,7 +384,7 @@ pub struct RunNodeOptions {
pub async fn run_node(
run_options: RunNodeOptions,
launcher: &dyn Launcher,
rpc_client: &dyn RpcActions,
metric_client: &dyn RpcActions,
) -> Result<NodeServiceData> {
info!("Launching node {}...", run_options.number);
println!("Launching node {}...", run_options.number);
Expand All @@ -397,9 +399,9 @@ pub async fn run_node(
)?;
launcher.wait(run_options.interval);

let node_info = rpc_client.node_info().await?;
let node_info = metric_client.node_info().await?;
let peer_id = node_info.peer_id;
let network_info = rpc_client.network_info().await?;
let network_info = metric_client.network_info().await?;
let connected_peers = Some(network_info.connected_peers);
let listen_addrs = network_info
.listeners
Expand Down Expand Up @@ -472,8 +474,8 @@ async fn validate_network(node_registry: &mut NodeRegistry, peers: Vec<Multiaddr
all_peers.extend(additional_peers);

for node in node_registry.nodes.iter() {
let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let net_info = rpc_client.network_info().await?;
let metric_client = MetricClient::new(node.metrics_port.unwrap());
let net_info = metric_client.network_info().await?;
let peers = net_info.connected_peers;
let peer_id = node.peer_id.ok_or_eyre("The PeerId was not set")?;
debug!("Node {peer_id} has {} peers", peers.len());
Expand Down
Loading

0 comments on commit cb994a9

Please sign in to comment.