diff --git a/sn_node_manager/src/bin/cli/main.rs b/sn_node_manager/src/bin/cli/main.rs index 948b6d4f66..7a6f1b9906 100644 --- a/sn_node_manager/src/bin/cli/main.rs +++ b/sn_node_manager/src/bin/cli/main.rs @@ -13,6 +13,7 @@ use sn_node_manager::{ cmd, VerbosityLevel, }; use sn_peers_acquisition::PeersArgs; +use std::net::SocketAddr; use std::{net::Ipv4Addr, path::PathBuf}; const DEFAULT_NODE_COUNT: u16 = 25; @@ -164,6 +165,8 @@ pub enum SubCmd { Faucet(FaucetSubCmd), #[clap(subcommand)] Local(LocalSubCmd), + #[clap(name = "rpc", subcommand)] + Rpc(RpcSubCmd), /// Remove safenode service(s). /// /// If no peer ID(s) or service name(s) are supplied, all services will be removed. @@ -367,7 +370,29 @@ pub enum DaemonSubCmd { Stop {}, } -/// Manage the faucet service. +/// Manage RPC client commands +#[derive(Subcommand, Debug)] +pub enum RpcSubCmd { + #[clap(name = "restart")] + /// Issue an RPC command to restart nodes + Restart { + /// The peer ID of the service to restart. + /// + /// The argument can be used multiple times to restart many services. + #[clap(long)] + peer_id: Vec, + /// The address of the RPC server where the command will be sent. + /// + /// It should be an IP address and port, e.g, 127.0.0.1:59091. + #[clap(long)] + rpc_server_address: SocketAddr, + /// Set to retain the peer ID of the nodes to be restarted. + #[clap(long)] + retain_peer_id: bool, + }, +} + +/// Manage faucet services. #[allow(clippy::large_enum_variant)] #[derive(Subcommand, Debug)] pub enum FaucetSubCmd { @@ -777,6 +802,11 @@ async fn main() -> Result<()> { ) .await } + SubCmd::Rpc(RpcSubCmd::Restart { + peer_id, + rpc_server_address: socket_addr, + retain_peer_id, + }) => sn_node_manager::rpc_client::restart_node(peer_id, socket_addr, retain_peer_id).await, } } diff --git a/sn_node_manager/src/lib.rs b/sn_node_manager/src/lib.rs index 0d55566973..25dfe8a231 100644 --- a/sn_node_manager/src/lib.rs +++ b/sn_node_manager/src/lib.rs @@ -12,6 +12,7 @@ pub mod config; pub mod helpers; pub mod local; pub mod rpc; +pub mod rpc_client; #[derive(Clone, PartialEq)] pub enum VerbosityLevel { diff --git a/sn_node_manager/src/rpc_client.rs b/sn_node_manager/src/rpc_client.rs new file mode 100644 index 0000000000..eb7c5f6da7 --- /dev/null +++ b/sn_node_manager/src/rpc_client.rs @@ -0,0 +1,63 @@ +use color_eyre::eyre::bail; +use color_eyre::{eyre::eyre, Result}; +use libp2p_identity::PeerId; +use sn_service_management::safenode_manager_proto::safe_node_manager_client::SafeNodeManagerClient; +use sn_service_management::safenode_manager_proto::NodeServiceRestartRequest; +use std::net::SocketAddr; +use std::str::FromStr; +use std::time::Duration; +use tonic::transport::Channel; +use tonic::Request; + +struct DaemonRpcClient { + addr: SocketAddr, + rpc: SafeNodeManagerClient, +} + +pub async fn restart_node( + peer_ids: Vec, + rpc_server_address: SocketAddr, + retain_peer_id: bool, +) -> Result<()> { + for peer_id in peer_ids { + let str_bytes = PeerId::from_str(&peer_id)?.to_bytes(); + + let mut daemon_client = get_rpc_client(rpc_server_address).await?; + + let _response = daemon_client + .rpc + .restart_node_service(Request::new(NodeServiceRestartRequest { + peer_id: str_bytes, + delay_millis: 0, + retain_peer_id, + })) + .await + .map_err(|err| { + eyre!( + "Failed to restart node service with {peer_id:?} at {:?} with err: {err:?}", + daemon_client.addr + ) + })?; + } + Ok(()) +} + +async fn get_rpc_client(socket_addr: SocketAddr) -> Result { + let endpoint = format!("https://{socket_addr}"); + let mut attempts = 0; + loop { + if let Ok(rpc_client) = SafeNodeManagerClient::connect(endpoint.clone()).await { + let rpc_client = DaemonRpcClient { + addr: socket_addr, + rpc: rpc_client, + }; + return Ok(rpc_client); + } + attempts += 1; + println!("Could not connect to rpc {endpoint:?}. Attempts: {attempts:?}/10"); + tokio::time::sleep(Duration::from_secs(1)).await; + if attempts >= 10 { + bail!("Failed to connect to {endpoint:?} even after 10 retries"); + } + } +}