Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop rpc replace metrics #2670

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 42 additions & 2 deletions ant-networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ impl NetworkBuilder {
let upnp = self.upnp;

let (network, events_receiver, mut swarm_driver) =
self.build(kad_cfg, Some(store_cfg), false, ProtocolSupport::Full, upnp);
self.build(kad_cfg, Some(store_cfg), false, ProtocolSupport::Full, upnp, Some(root_dir.clone()));

// Listen on the provided address
let listen_socket_addr = listen_addr.ok_or(NetworkError::ListenAddressNotProvided)?;
Expand Down Expand Up @@ -425,7 +425,7 @@ impl NetworkBuilder {
.set_replication_factor(REPLICATION_FACTOR);

let (network, net_event_recv, driver) =
self.build(kad_cfg, None, true, ProtocolSupport::Outbound, false);
self.build(kad_cfg, None, true, ProtocolSupport::Outbound, false, None);

(network, net_event_recv, driver)
}
Expand All @@ -438,6 +438,7 @@ impl NetworkBuilder {
is_client: bool,
req_res_protocol: ProtocolSupport,
upnp: bool,
root_dir: Option<PathBuf>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is optional, maybe this should be a method inside the NetworkBuilder?

) -> (Network, mpsc::Receiver<NetworkEvent>, SwarmDriver) {
let identify_protocol_str = IDENTIFY_PROTOCOL_STR
.read()
Expand Down Expand Up @@ -510,6 +511,45 @@ impl NetworkBuilder {
)]),
);

let metadata_extended_sub_reg = metrics_registries
.metadata_extended
.sub_registry_with_prefix("ant_networking");

metadata_extended_sub_reg.register(
"peer_id",
"Identifier of a peer of the network",
Info::new(vec![("peer_id".to_string(), peer_id.to_string())]),
);

metadata_extended_sub_reg.register(
"pid",
"id of the node process",
Info::new(vec![("pid".to_string(), std::process::id().to_string())]),
);

metadata_extended_sub_reg.register(
"bin_version",
"Package version of the node",
Info::new(vec![("bin_version".to_string(), env!("CARGO_PKG_VERSION").to_string())]),
);

if let Some(root_dir) = root_dir.clone() {
metadata_extended_sub_reg.register(
"data_dir",
"Root directory of the node",
Info::new(vec![("root_dir".to_string(), root_dir.clone().to_string_lossy().to_string())]),
);
}

if let Some(log_dir) = root_dir {
let log_dir = log_dir.join("logs");
metadata_extended_sub_reg.register(
"log_dir",
"Root directory of the node",
Info::new(vec![("log_dir".to_string(), log_dir.clone().to_string_lossy().to_string())]),
);
}

run_metrics_server(metrics_registries, port);
Some(metrics_recorder)
} else {
Expand Down
43 changes: 41 additions & 2 deletions ant-networking/src/metrics/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub struct MetricsRegistries {
pub standard_metrics: Registry,
pub extended_metrics: Registry,
pub metadata: Registry,
pub metadata_extended: Registry,
}

const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text;charset=utf-8;version=1.0.0";
Expand All @@ -36,7 +37,7 @@ pub(crate) fn run_metrics_server(registries: MetricsRegistries, port: u16) {
info!("Metrics server on http://{}/metrics", server.local_addr());
println!("Metrics server on http://{}/metrics", server.local_addr());

info!("Metrics server on http://{} Available endpoints: /metrics, /metrics_extended, /metadata", server.local_addr());
info!("Metrics server on http://{} Available endpoints: /metrics, /metrics_extended, /metadata, /metadata_extended", server.local_addr());
Dismissed Show dismissed Hide dismissed
// run the server forever
if let Err(e) = server.await {
error!("server error: {}", e);
Expand All @@ -50,6 +51,7 @@ pub(crate) struct MetricService {
standard_registry: SharedRegistry,
extended_registry: SharedRegistry,
metadata: SharedRegistry,
metadata_extended: SharedRegistry,
}

impl MetricService {
Expand All @@ -65,6 +67,11 @@ impl MetricService {
Arc::clone(&self.metadata)
}

/// Returns a new shared handle to the `metadata_extended` registry.
///
/// Only the `Arc` is cloned; the returned handle refers to the same
/// registry that this service holds.
fn get_metadata_extended_registry(&mut self) -> SharedRegistry {
    let shared = &self.metadata_extended;
    Arc::clone(shared)
}


fn respond_with_metrics(&mut self) -> Result<Response<String>> {
let mut response: Response<String> = Response::default();

Expand Down Expand Up @@ -152,6 +159,28 @@ impl MetricService {
Ok(response)
}

fn respond_with_metadata_extended(&mut self) -> Result<Response<String>> {
let mut response: Response<String> = Response::default();

response.headers_mut().insert(
hyper::header::CONTENT_TYPE,
METRICS_CONTENT_TYPE
.try_into()
.map_err(|_| NetworkError::NetworkMetricError)?,
);

let reg = self.get_metadata_extended_registry();
let reg = reg.lock().map_err(|_| NetworkError::NetworkMetricError)?;
encode(&mut response.body_mut(), &reg).map_err(|err| {
error!("Failed to encode the metadata Registry {err:?}");
NetworkError::NetworkMetricError
})?;

*response.status_mut() = StatusCode::OK;

Ok(response)
}

fn respond_with_404_not_found(&mut self) -> Response<String> {
let mut resp = Response::default();
*resp.status_mut() = StatusCode::NOT_FOUND;
Expand Down Expand Up @@ -196,7 +225,13 @@ impl Service<Request<Body>> for MetricService {
Ok(resp) => resp,
Err(_) => self.respond_with_500_server_error(),
}
} else {
} else if req_method == Method::GET && req_path == "/metadata_extended" {
match self.respond_with_metadata_extended() {
Ok(resp) => resp,
Err(_) => self.respond_with_500_server_error(),
}
}
else {
self.respond_with_404_not_found()
};
Box::pin(async { Ok(resp) })
Expand All @@ -207,6 +242,7 @@ pub(crate) struct MakeMetricService {
standard_registry: SharedRegistry,
extended_registry: SharedRegistry,
metadata: SharedRegistry,
metadata_extended: SharedRegistry,
}

impl MakeMetricService {
Expand All @@ -215,6 +251,7 @@ impl MakeMetricService {
standard_registry: Arc::new(Mutex::new(registries.standard_metrics)),
extended_registry: Arc::new(Mutex::new(registries.extended_metrics)),
metadata: Arc::new(Mutex::new(registries.metadata)),
metadata_extended: Arc::new(Mutex::new(registries.metadata_extended))
}
}
}
Expand All @@ -232,12 +269,14 @@ impl<T> Service<T> for MakeMetricService {
let standard_registry = Arc::clone(&self.standard_registry);
let extended_registry = Arc::clone(&self.extended_registry);
let metadata = Arc::clone(&self.metadata);
let metadata_extended = Arc::clone(&self.metadata_extended);

let fut = async move {
Ok(MetricService {
standard_registry,
extended_registry,
metadata,
metadata_extended,
})
};
Box::pin(fut)
Expand Down
5 changes: 1 addition & 4 deletions ant-node-manager/src/add_services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,9 @@ pub async fn add_node(
};
let metrics_free_port = if let Some(port) = metrics_port {
Some(port)
} else if options.enable_metrics_server {
Some(service_control.get_available_port()?)
} else {
None
Some(service_control.get_available_port()?)
};

let rpc_socket_addr = if let Some(addr) = options.rpc_address {
SocketAddr::new(IpAddr::V4(addr), rpc_free_port)
} else {
Expand Down
4 changes: 2 additions & 2 deletions ant-node-manager/src/bin/cli/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ pub enum LocalSubCmd {
/// An interval applied between launching each node.
///
/// Units are milliseconds.
#[clap(long, default_value_t = 200)]
#[clap(long, default_value_t = 220)]
interval: u64,
/// Specify the logging format.
///
Expand Down Expand Up @@ -798,7 +798,7 @@ pub enum LocalSubCmd {
/// An interval applied between launching each node.
///
/// Units are milliseconds.
#[clap(long, default_value_t = 200)]
#[clap(long, default_value_t = 220)]
interval: u64,
/// Specify the logging format.
///
Expand Down
41 changes: 20 additions & 21 deletions ant-node-manager/src/cmd/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,16 @@
add_services::{
add_node,
config::{AddNodeServiceOptions, PortRange},
},
config::{self, is_running_as_root},
helpers::{download_and_extract_release, get_bin_version},
print_banner, refresh_node_registry, status_report, ServiceManager, VerbosityLevel,
}, config::{self, is_running_as_root}, error::Error, helpers::{download_and_extract_release, get_bin_version}, print_banner, refresh_node_registry, status_report, ServiceManager, VerbosityLevel
};
use ant_bootstrap::PeersArgs;
use ant_evm::{EvmNetwork, RewardsAddress};
use ant_logging::LogFormat;
use ant_releases::{AntReleaseRepoActions, ReleaseType};
use ant_service_management::{
control::{ServiceControl, ServiceController},
rpc::RpcClient,
NodeRegistry, NodeService, ServiceStateActions, ServiceStatus, UpgradeOptions, UpgradeResult,
};
metric::MetricClient, NodeRegistry, NodeService,
ServiceStateActions, ServiceStatus, UpgradeOptions, UpgradeResult};
use color_eyre::{eyre::eyre, Help, Result};
use colored::Colorize;
use libp2p_identity::PeerId;
Expand Down Expand Up @@ -181,10 +177,11 @@

for &index in &service_indices {
let node = &mut node_registry.nodes[index];
let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let service = NodeService::new(node, Box::new(rpc_client));
// TODO: remove this as we have no way to know the reward balance of nodes since EVM payments!
let metrics_port: u16 = node.metrics_port
.ok_or(Error::MetricPortEmpty)?;
let metric_client = MetricClient::new(metrics_port);
let service = NodeService::new(node, Box::new(metric_client)); // TODO: remove this as we have no way to know the reward balance of nodes since EVM payments!

Check notice

Code scanning / devskim

A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note

Suspicious comment
println!("{}: {}", service.service_data.service_name, 0,);
}
Ok(())
}
Expand Down Expand Up @@ -223,8 +220,10 @@
let mut failed_services = Vec::new();
for &index in &service_indices {
let node = &mut node_registry.nodes[index];
let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let service = NodeService::new(node, Box::new(rpc_client));
let metrics_port = node.metrics_port.ok_or(Error::MetricPortEmpty)?;

let metric_client = MetricClient::new(metrics_port);
let service = NodeService::new(node, Box::new(metric_client));
let mut service_manager =
ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
match service_manager.remove(keep_directories).await {
Expand Down Expand Up @@ -310,10 +309,9 @@
let mut failed_services = Vec::new();
for &index in &service_indices {
let node = &mut node_registry.nodes[index];
let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);

let service = NodeService::new(node, Box::new(rpc_client));

let metrics_port = node.metrics_port.ok_or(Error::MetricPortEmpty)?;
let metric_client = MetricClient::new(metrics_port);
let service = NodeService::new(node, Box::new(metric_client));
// set dynamic startup delay if fixed_interval is not set
let service = if fixed_interval.is_none() {
service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
Expand Down Expand Up @@ -406,8 +404,9 @@
let mut failed_services = Vec::new();
for &index in &service_indices {
let node = &mut node_registry.nodes[index];
let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let service = NodeService::new(node, Box::new(rpc_client));
let metrics_port = node.metrics_port.ok_or(Error::MetricPortEmpty)?;
let metric_client = MetricClient::new(metrics_port);
let service = NodeService::new(node, Box::new(metric_client));
let mut service_manager =
ServiceManager::new(service, Box::new(ServiceController {}), verbosity);

Expand Down Expand Up @@ -518,9 +517,9 @@
target_version: target_version.clone(),
};
let service_name = node.service_name.clone();

let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let service = NodeService::new(node, Box::new(rpc_client));
let metrics_port = node.metrics_port.ok_or(Error::MetricPortEmpty)?;
let metric_client = MetricClient::new(metrics_port);
let service = NodeService::new(node, Box::new(metric_client));
// set dynamic startup delay if fixed_interval is not set
let service = if fixed_interval.is_none() {
service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
Expand Down
2 changes: 2 additions & 0 deletions ant-node-manager/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub enum Error {
PidNotFoundAfterStarting,
#[error("The PID of the process was not set.")]
PidNotSet,
#[error("The metric port of the node is empty")]
MetricPortEmpty,
#[error(transparent)]
SemverError(#[from] semver::Error),
#[error("The service(s) is already running: {0:?}")]
Expand Down
12 changes: 7 additions & 5 deletions ant-node-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ impl From<u8> for VerbosityLevel {
}

use crate::error::{Error, Result};
use ant_service_management::metric::MetricClient;
use ant_service_management::rpc::RpcActions;
use ant_service_management::{
control::ServiceControl, error::Error as ServiceError, rpc::RpcClient, NodeRegistry,
control::ServiceControl, error::Error as ServiceError, NodeRegistry,
NodeService, NodeServiceData, ServiceStateActions, ServiceStatus, UpgradeOptions,
UpgradeResult,
};
Expand Down Expand Up @@ -549,17 +550,18 @@ pub async fn refresh_node_registry(
// exists.
// TODO: remove this as we have no way to know the reward balance of nodes since EVM payments!
node.reward_balance = None;
let metrics_port = node.metrics_port
.ok_or(Error::MetricPortEmpty)?;

let mut rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
rpc_client.set_max_attempts(1);
let mut service = NodeService::new(node, Box::new(rpc_client.clone()));
let metric_client = MetricClient::new(metrics_port);
let mut service = NodeService::new(node, Box::new(metric_client.clone()));

if is_local_network {
// For a local network, retrieving the process by its path does not work, because the
// paths are not unique: they are all launched from the same binary. Instead we will
// just determine whether the node is running by querying its metrics service. We
// only need to distinguish between `RUNNING` and `STOPPED` for a local network.
match rpc_client.node_info().await {
match metric_client.node_info().await {
Ok(info) => {
let pid = info.pid;
debug!(
Expand Down
Loading
Loading