From fcaf41cc12ea5b299e92c57f3a9300e9268c3683 Mon Sep 17 00:00:00 2001 From: KaffinPX Date: Sun, 10 Nov 2024 17:20:01 +0300 Subject: [PATCH 1/5] First pass on removing proxy -- needs help w macro --- Cargo.toml | 1 - rpc/wrpc/proxy/Cargo.toml | 28 ------ rpc/wrpc/proxy/src/error.rs | 26 ------ rpc/wrpc/proxy/src/main.rs | 104 ---------------------- rpc/wrpc/proxy/src/result.rs | 1 - rpc/wrpc/server/src/server.rs | 155 +++++++++++---------------------- rpc/wrpc/server/src/service.rs | 12 +-- 7 files changed, 52 insertions(+), 275 deletions(-) delete mode 100644 rpc/wrpc/proxy/Cargo.toml delete mode 100644 rpc/wrpc/proxy/src/error.rs delete mode 100644 rpc/wrpc/proxy/src/main.rs delete mode 100644 rpc/wrpc/proxy/src/result.rs diff --git a/Cargo.toml b/Cargo.toml index 7141101f9..db873f62e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -129,7 +129,6 @@ kaspa-wallet-macros = { version = "0.15.3", path = "wallet/macros" } kaspa-wasm = { version = "0.15.3", path = "wasm" } kaspa-wasm-core = { version = "0.15.3", path = "wasm/core" } kaspa-wrpc-client = { version = "0.15.3", path = "rpc/wrpc/client" } -kaspa-wrpc-proxy = { version = "0.15.3", path = "rpc/wrpc/proxy" } kaspa-wrpc-server = { version = "0.15.3", path = "rpc/wrpc/server" } kaspa-wrpc-wasm = { version = "0.15.3", path = "rpc/wrpc/wasm" } kaspa-wrpc-example-subscriber = { version = "0.15.3", path = "rpc/wrpc/examples/subscriber" } diff --git a/rpc/wrpc/proxy/Cargo.toml b/rpc/wrpc/proxy/Cargo.toml deleted file mode 100644 index 21b9b322a..000000000 --- a/rpc/wrpc/proxy/Cargo.toml +++ /dev/null @@ -1,28 +0,0 @@ -[package] -name = "kaspa-wrpc-proxy" -description = "Kaspa wRPC to gRPC proxy" -rust-version.workspace = true -version.workspace = true -edition.workspace = true -authors.workspace = true -include.workspace = true -license.workspace = true -repository.workspace = true - -[dependencies] -async-trait.workspace = true -clap.workspace = true -kaspa-consensus-core.workspace = true -kaspa-grpc-client.workspace = true -kaspa-rpc-core.workspace = true -kaspa-rpc-macros.workspace = true -kaspa-wrpc-server.workspace = true -num_cpus.workspace = true -thiserror.workspace = true -tokio.workspace = true -workflow-core.workspace = true -workflow-log.workspace = true -workflow-rpc.workspace = true - -[package.metadata.emanate.build] -folder = "setup" diff --git a/rpc/wrpc/proxy/src/error.rs b/rpc/wrpc/proxy/src/error.rs deleted file mode 100644 index ec3ff9f3c..000000000 --- a/rpc/wrpc/proxy/src/error.rs +++ /dev/null @@ -1,26 +0,0 @@ -#[derive(thiserror::Error, Debug)] -pub enum Error { - #[error("{0}")] - Other(String), - - #[error(transparent)] - GrpcApi(#[from] kaspa_rpc_core::error::RpcError), - - #[error(transparent)] - GrpcClient(#[from] kaspa_grpc_client::error::Error), - - #[error(transparent)] - Wrpc(#[from] kaspa_wrpc_server::error::Error), - - #[error(transparent)] - WebSocket(#[from] workflow_rpc::server::WebSocketError), - - #[error(transparent)] - WorkflowRpc(#[from] workflow_rpc::error::Error), -} - -impl From for Error { - fn from(s: String) -> Self { - Error::Other(s) - } -} diff --git a/rpc/wrpc/proxy/src/main.rs b/rpc/wrpc/proxy/src/main.rs deleted file mode 100644 index 1cb9ad5c6..000000000 --- a/rpc/wrpc/proxy/src/main.rs +++ /dev/null @@ -1,104 +0,0 @@ -mod error; -mod result; - -use clap::Parser; -use kaspa_consensus_core::network::NetworkType; -use kaspa_rpc_core::api::ops::RpcApiOps; -use kaspa_wrpc_server::{ - connection::Connection, - router::Router, - server::Server, - service::{KaspaRpcHandler, Options}, -}; -use result::Result; -use std::sync::Arc; -use workflow_log::*; -use workflow_rpc::server::prelude::*; -use workflow_rpc::server::WebSocketCounters; - -#[derive(Debug, Parser)] -#[clap(name = "proxy")] -#[clap(version)] -struct Args { - /// proxy for testnet network - #[clap(long)] - testnet: bool, - /// proxy for simnet network - #[clap(long)] - simnet: bool, - /// proxy for devnet network - #[clap(long)] - devnet: bool, - - /// proxy:port for gRPC server (grpc://127.0.0.1:16110) - #[clap(name = "grpc")] - grpc_proxy_address: Option, - - // /// wRPC port - /// interface:port for wRPC server (wrpc://127.0.0.1:17110) - #[clap(long)] - interface: Option, - /// Number of notification serializer threads - #[clap(long)] - threads: Option, - /// Enable verbose logging - #[clap(short, long)] - verbose: bool, - /// Protocol encoding - #[clap(long)] - encoding: Option, -} - -#[tokio::main] -async fn main() -> Result<()> { - let Args { testnet, simnet, devnet, grpc_proxy_address, interface, verbose, threads, encoding } = Args::parse(); - - let network_type = if testnet { - NetworkType::Testnet - } else if simnet { - NetworkType::Simnet - } else if devnet { - NetworkType::Devnet - } else { - NetworkType::Mainnet - }; - - let kaspad_port = network_type.default_rpc_port(); - - let encoding: Encoding = encoding.unwrap_or_else(|| "borsh".to_owned()).parse()?; - let proxy_port = match encoding { - Encoding::Borsh => network_type.default_borsh_rpc_port(), - Encoding::SerdeJson => network_type.default_json_rpc_port(), - }; - - let options = Arc::new(Options { - listen_address: interface.unwrap_or_else(|| format!("wrpc://127.0.0.1:{proxy_port}")), - grpc_proxy_address: Some(grpc_proxy_address.unwrap_or_else(|| format!("grpc://127.0.0.1:{kaspad_port}"))), - verbose, - // ..Options::default() - }); - log_info!(""); - log_info!("Proxy routing to `{}` on {}", network_type, options.grpc_proxy_address.as_ref().unwrap()); - - let counters = Arc::new(WebSocketCounters::default()); - let tasks = threads.unwrap_or_else(num_cpus::get); - let rpc_handler = Arc::new(KaspaRpcHandler::new(tasks, encoding, None, options.clone())); - - let router = Arc::new(Router::new(rpc_handler.server.clone())); - let server = RpcServer::new_with_encoding::( - encoding, - rpc_handler.clone(), - router.interface.clone(), - Some(counters), - false, - ); - - log_info!("Kaspa wRPC server is listening on {}", options.listen_address); - log_info!("Using `{encoding}` protocol encoding"); - - let config = WebSocketConfig { max_message_size: Some(1024 * 1024 * 1024), ..Default::default() }; - let listener = server.bind(&options.listen_address).await?; - server.listen(listener, Some(config)).await?; - - Ok(()) -} diff --git a/rpc/wrpc/proxy/src/result.rs b/rpc/wrpc/proxy/src/result.rs deleted file mode 100644 index 605dc25cf..000000000 --- a/rpc/wrpc/proxy/src/result.rs +++ /dev/null @@ -1 +0,0 @@ -pub type Result = std::result::Result; diff --git a/rpc/wrpc/server/src/server.rs b/rpc/wrpc/server/src/server.rs index 562bbf34b..dc090697d 100644 --- a/rpc/wrpc/server/src/server.rs +++ b/rpc/wrpc/server/src/server.rs @@ -41,7 +41,7 @@ struct ServerInner { pub next_connection_id: AtomicU64, pub _encoding: Encoding, pub sockets: Mutex>, - pub rpc_core: Option, + pub rpc_core: RpcCore, pub options: Arc, } @@ -53,127 +53,81 @@ pub struct Server { const WRPC_SERVER: &str = "wrpc-server"; impl Server { - pub fn new(tasks: usize, encoding: Encoding, core_service: Option>, options: Arc) -> Self { + pub fn new(tasks: usize, encoding: Encoding, service: Arc, options: Arc) -> Self { // This notifier UTXOs subscription granularity to rpc-core notifier let policies = MutationPolicies::new(UtxosChangedMutationPolicy::AddressSet); - - // Either get a core service or be called from the proxy and rely each connection having its own gRPC client - assert_eq!( - core_service.is_none(), - options.grpc_proxy_address.is_some(), - "invalid setup: Server must exclusively get either a core service or a gRPC server address" + // Prepare rpc service objects + let notification_channel = NotificationChannel::default(); + let listener_id = service.notifier().register_new_listener( + ChannelConnection::new(WRPC_SERVER, notification_channel.sender(), ChannelType::Closable), + ListenerLifespan::Static(policies), ); - - let rpc_core = if let Some(service) = core_service { - // Prepare rpc service objects - let notification_channel = NotificationChannel::default(); - let listener_id = service.notifier().register_new_listener( - ChannelConnection::new(WRPC_SERVER, notification_channel.sender(), ChannelType::Closable), - ListenerLifespan::Static(policies), - ); - - // Prepare notification internals - let enabled_events = EVENT_TYPE_ARRAY[..].into(); - let converter = Arc::new(WrpcServiceConverter::new()); - let collector = Arc::new(WrpcServiceCollector::new(WRPC_SERVER, notification_channel.receiver(), converter)); - let subscriber = Arc::new(Subscriber::new(WRPC_SERVER, enabled_events, service.notifier(), listener_id)); - let wrpc_notifier = Arc::new(Notifier::new( - WRPC_SERVER, - enabled_events, - vec![collector], - vec![subscriber], - service.subscription_context(), - tasks, - policies, - )); - Some(RpcCore { service, wrpc_notifier }) - } else { - None - }; + // Prepare notification internals + let enabled_events = EVENT_TYPE_ARRAY[..].into(); + let converter = Arc::new(WrpcServiceConverter::new()); + let collector = Arc::new(WrpcServiceCollector::new(WRPC_SERVER, notification_channel.receiver(), converter)); + let subscriber = Arc::new(Subscriber::new(WRPC_SERVER, enabled_events, service.notifier(), listener_id)); + let wrpc_notifier = Arc::new(Notifier::new( + WRPC_SERVER, + enabled_events, + vec![collector], + vec![subscriber], + service.subscription_context(), + tasks, + policies, + )); Server { inner: Arc::new(ServerInner { next_connection_id: AtomicU64::new(0), _encoding: encoding, sockets: Mutex::new(HashMap::new()), - rpc_core, + rpc_core: RpcCore { service, wrpc_notifier }, options, }), } } pub fn start(&self) { - if let Some(rpc_core) = &self.inner.rpc_core { - // Start the internal notifier - rpc_core.wrpc_notifier.clone().start(); - } + self.inner.rpc_core.wrpc_notifier.clone().start(); } pub async fn connect(&self, peer: &SocketAddr, messenger: Arc) -> Result { // log_trace!("WebSocket connected: {}", peer); + // Generate a new connection ID let id = self.inner.next_connection_id.fetch_add(1, Ordering::SeqCst); - let grpc_client = if let Some(grpc_proxy_address) = &self.inner.options.grpc_proxy_address { - // Provider::GrpcClient - - log_info!("Routing wrpc://{peer} -> {grpc_proxy_address}"); - let grpc_client = GrpcClient::connect_with_args( - NotificationMode::Direct, - grpc_proxy_address.to_owned(), - None, - false, - None, - true, - None, - Default::default(), - ) - .await - .map_err(|e| WebSocketError::Other(e.to_string()))?; - // log_trace!("Creating proxy relay..."); - Some(Arc::new(grpc_client)) - } else { - None - }; - let connection = Connection::new(id, peer, messenger, grpc_client); - if self.inner.options.grpc_proxy_address.is_some() { - // log_trace!("starting gRPC"); - connection.grpc_client().start(Some(connection.grpc_client_notify_target())).await; - // log_trace!("gRPC started..."); - } + // Create the connection without gRPC client handling + let connection = Connection::new(id, peer, messenger, None); // TODO: also remove grpc_client + + // Insert the new connection into the sockets map self.inner.sockets.lock()?.insert(id, connection.clone()); + Ok(connection) } pub async fn disconnect(&self, connection: Connection) { // log_info!("WebSocket disconnected: {}", connection.peer()); - if let Some(rpc_core) = &self.inner.rpc_core { - if let Some(listener_id) = connection.listener_id() { - rpc_core.wrpc_notifier.unregister_listener(listener_id).unwrap_or_else(|err| { - log_error!("WebSocket {} (disconnected) error unregistering the notification listener: {err}", connection.peer()); - }); - } - } else { - let _ = connection.grpc_client().disconnect().await; - let _ = connection.grpc_client().join().await; + // Unregister available subscriptions of disconnecting connection + if let Some(listener_id) = connection.listener_id() { + self.inner.rpc_core.wrpc_notifier.unregister_listener(listener_id).unwrap_or_else(|err| { + log_error!("WebSocket {} (disconnected) error unregistering the notification listener: {err}", connection.peer()); + }); } + // Remove the connection from the sockets map self.inner.sockets.lock().unwrap().remove(&connection.id()); - // FIXME: determine if messenger should be closed explicitly // connection.close(); } #[inline(always)] - pub fn notifier(&self) -> Option> { - self.inner.rpc_core.as_ref().map(|x| x.wrpc_notifier.clone()) + pub fn notifier(&self) -> Arc { + self.inner.rpc_core.wrpc_notifier.clone() } - pub fn rpc_service(&self, connection: &Connection) -> DynRpcService { - if let Some(rpc_core) = &self.inner.rpc_core { - rpc_core.service.clone() - } else { - connection.grpc_client() - } + pub fn rpc_service(&self) -> DynRpcService { + self.inner.rpc_core.service.clone() } pub async fn start_notify(&self, connection: &Connection, scope: Scope) -> RpcResult<()> { @@ -181,34 +135,27 @@ impl Server { listener_id } else { // The only possible case here is a server connected to rpc core. - // If the proxy is used, the connection has a gRPC client and the listener id - // is always set to Some(ListenerId::default()) by the connection ctor. - let notifier = - self.notifier().unwrap_or_else(|| panic!("Incorrect use: `server::Server` does not carry an internal notifier")); + // Register a new listener if one is not already set. + let notifier = self.notifier(); let listener_id = notifier.register_new_listener(connection.clone(), ListenerLifespan::Dynamic); connection.register_notification_listener(listener_id); listener_id }; + workflow_log::log_trace!("notification subscribe[0x{listener_id:x}] {scope:?}"); - if let Some(rpc_core) = &self.inner.rpc_core { - rpc_core.wrpc_notifier.clone().try_start_notify(listener_id, scope)?; - } else { - connection.grpc_client().start_notify(listener_id, scope).await?; - } + self.inner.rpc_core.wrpc_notifier.clone().try_start_notify(listener_id, scope)?; + Ok(()) } pub async fn stop_notify(&self, connection: &Connection, scope: Scope) -> RpcResult<()> { if let Some(listener_id) = connection.listener_id() { workflow_log::log_trace!("notification unsubscribe[0x{listener_id:x}] {scope:?}"); - if let Some(rpc_core) = &self.inner.rpc_core { - rpc_core.wrpc_notifier.clone().try_stop_notify(listener_id, scope)?; - } else { - connection.grpc_client().stop_notify(listener_id, scope).await?; - } + self.inner.rpc_core.wrpc_notifier.clone().try_stop_notify(listener_id, scope)?; } else { workflow_log::log_trace!("notification unsubscribe[N/A] {scope:?}"); } + Ok(()) } @@ -217,13 +164,9 @@ impl Server { } pub async fn join(&self) -> Result<()> { - if let Some(rpc_core) = &self.inner.rpc_core { - // Wait for the internal notifier to stop - rpc_core.wrpc_notifier.join().await?; - } else { - // FIXME: check if all existing connections are actually getting a call to self.disconnect(connection) - // else do it here - } + self.inner.rpc_core.wrpc_notifier.join().await?; + + // FIXME: check if all existing connections are actually getting a call to self.disconnect(connection) or do it here Ok(()) } } diff --git a/rpc/wrpc/server/src/service.rs b/rpc/wrpc/server/src/service.rs index 72d09f6e6..3b08d0337 100644 --- a/rpc/wrpc/server/src/service.rs +++ b/rpc/wrpc/server/src/service.rs @@ -18,13 +18,12 @@ static MAX_WRPC_MESSAGE_SIZE: usize = 1024 * 1024 * 128; // 128MB /// Options for configuring the wRPC server pub struct Options { pub listen_address: String, - pub grpc_proxy_address: Option, pub verbose: bool, } impl Default for Options { fn default() -> Self { - Options { listen_address: "127.0.0.1:17110".to_owned(), verbose: false, grpc_proxy_address: None } + Options { listen_address: "127.0.0.1:17110".to_owned(), verbose: false } } } @@ -49,12 +48,7 @@ pub struct KaspaRpcHandler { } impl KaspaRpcHandler { - pub fn new( - tasks: usize, - encoding: WrpcEncoding, - core_service: Option>, - options: Arc, - ) -> KaspaRpcHandler { + pub fn new(tasks: usize, encoding: WrpcEncoding, core_service: Arc, options: Arc) -> KaspaRpcHandler { KaspaRpcHandler { server: Server::new(tasks, encoding, core_service, options.clone()), options } } } @@ -106,7 +100,7 @@ impl WrpcService { /// Create and initialize RpcServer pub fn new( tasks: usize, - core_service: Option>, + core_service: Arc, encoding: &Encoding, counters: Arc, options: Options, From 7aadd3011b232115fb75d72d9297e2e972f5de30 Mon Sep 17 00:00:00 2001 From: KaffinPX Date: Mon, 11 Nov 2024 17:46:19 +0300 Subject: [PATCH 2/5] 2. pass on removing gRPC related code --- Cargo.lock | 19 ------------------- Cargo.toml | 1 - kaspad/src/daemon.rs | 2 +- rpc/macros/src/wrpc/server.rs | 2 +- rpc/wrpc/server/src/connection.rs | 21 ++------------------- rpc/wrpc/server/src/server.rs | 9 ++++----- 6 files changed, 8 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a951993e9..7c61c1e37 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3543,25 +3543,6 @@ dependencies = [ "workflow-log", ] -[[package]] -name = "kaspa-wrpc-proxy" -version = "0.15.3" -dependencies = [ - "async-trait", - "clap 4.5.19", - "kaspa-consensus-core", - "kaspa-grpc-client", - "kaspa-rpc-core", - "kaspa-rpc-macros", - "kaspa-wrpc-server", - "num_cpus", - "thiserror", - "tokio", - "workflow-core", - "workflow-log", - "workflow-rpc", -] - [[package]] name = "kaspa-wrpc-server" version = "0.15.3" diff --git a/Cargo.toml b/Cargo.toml index db873f62e..384a63c01 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,7 +38,6 @@ members = [ "rpc/grpc/server", "rpc/wrpc/server", "rpc/wrpc/client", - "rpc/wrpc/proxy", "rpc/wrpc/wasm", "rpc/wrpc/examples/subscriber", "rpc/wrpc/examples/simple_client", diff --git a/kaspad/src/daemon.rs b/kaspad/src/daemon.rs index db9f32c16..4656665b6 100644 --- a/kaspad/src/daemon.rs +++ b/kaspad/src/daemon.rs @@ -618,7 +618,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm listen_address.map(|listen_address| { Arc::new(WrpcService::new( wrpc_service_tasks, - Some(rpc_core_service.clone()), + rpc_core_service.clone(), &encoding, wrpc_server_counters, WrpcServerOptions { diff --git a/rpc/macros/src/wrpc/server.rs b/rpc/macros/src/wrpc/server.rs index 092b1edb3..d2f4061ac 100644 --- a/rpc/macros/src/wrpc/server.rs +++ b/rpc/macros/src/wrpc/server.rs @@ -54,7 +54,7 @@ impl ToTokens for RpcTable { let verbose = server_ctx.verbose(); if verbose { workflow_log::log_info!("request: {:?}",request); } // TODO: RPC-CONNECT - let response: #response_type = server_ctx.rpc_service(&connection_ctx).#fn_call(None, request.into_inner()).await + let response: #response_type = server_ctx.rpc_service().#fn_call(None, request.into_inner()).await .map_err(|e|ServerError::Text(e.to_string()))?; if verbose { workflow_log::log_info!("response: {:?}",response); } Ok(Serializable(response)) diff --git a/rpc/wrpc/server/src/connection.rs b/rpc/wrpc/server/src/connection.rs index e118d161d..9d382e322 100644 --- a/rpc/wrpc/server/src/connection.rs +++ b/rpc/wrpc/server/src/connection.rs @@ -48,7 +48,6 @@ struct ConnectionInner { pub id: u64, pub peer: SocketAddr, pub messenger: Arc, - pub grpc_client: Option>, // not using an atomic in case an Id will change type in the future... pub listener_id: Mutex>, } @@ -83,12 +82,8 @@ pub struct Connection { } impl Connection { - pub fn new(id: u64, peer: &SocketAddr, messenger: Arc, grpc_client: Option>) -> Connection { - // If a GrpcClient is provided, it has to come configured in direct mode - assert!(grpc_client.is_none() || grpc_client.as_ref().unwrap().notification_mode() == NotificationMode::Direct); - // Should a gRPC client be provided, no listener_id is required for subscriptions so the listener id is set to default - let listener_id = Mutex::new(grpc_client.clone().map(|_| ListenerId::default())); - Connection { inner: Arc::new(ConnectionInner { id, peer: *peer, messenger, grpc_client, listener_id }) } + pub fn new(id: u64, peer: &SocketAddr, messenger: Arc) -> Connection { + Connection { inner: Arc::new(ConnectionInner { id, peer: *peer, messenger, listener_id: None.into() }) } } /// Obtain the connection id @@ -101,18 +96,6 @@ impl Connection { &self.inner.messenger } - pub fn grpc_client(&self) -> Arc { - self.inner - .grpc_client - .as_ref() - .cloned() - .unwrap_or_else(|| panic!("Incorrect use: `server::Connection` does not carry RpcApi references")) - } - - pub fn grpc_client_notify_target(&self) -> GrpcClientNotify { - self.inner.clone() - } - pub fn listener_id(&self) -> Option { *self.inner.listener_id.lock().unwrap() } diff --git a/rpc/wrpc/server/src/server.rs b/rpc/wrpc/server/src/server.rs index dc090697d..6bd0fa4fa 100644 --- a/rpc/wrpc/server/src/server.rs +++ b/rpc/wrpc/server/src/server.rs @@ -4,7 +4,6 @@ use crate::{ result::Result, service::Options, }; -use kaspa_grpc_client::GrpcClient; use kaspa_notify::{ connection::ChannelType, events::EVENT_TYPE_ARRAY, @@ -15,8 +14,8 @@ use kaspa_notify::{ subscription::{MutationPolicies, UtxosChangedMutationPolicy}, }; use kaspa_rpc_core::{ - api::rpc::{DynRpcService, RpcApi}, - notify::{channel::NotificationChannel, connection::ChannelConnection, mode::NotificationMode}, + api::rpc::DynRpcService, + notify::{channel::NotificationChannel, connection::ChannelConnection}, Notification, RpcResult, }; use kaspa_rpc_service::service::RpcCoreService; @@ -98,7 +97,7 @@ impl Server { let id = self.inner.next_connection_id.fetch_add(1, Ordering::SeqCst); // Create the connection without gRPC client handling - let connection = Connection::new(id, peer, messenger, None); // TODO: also remove grpc_client + let connection = Connection::new(id, peer, messenger); // Insert the new connection into the sockets map self.inner.sockets.lock()?.insert(id, connection.clone()); @@ -115,7 +114,7 @@ impl Server { }); } - // Remove the connection from the sockets map + // Remove the connection from the sockets self.inner.sockets.lock().unwrap().remove(&connection.id()); // FIXME: determine if messenger should be closed explicitly // connection.close(); From ba881c599636c1dc94dc692b75b9c66debd8931c Mon Sep 17 00:00:00 2001 From: KaffinPX Date: Mon, 11 Nov 2024 17:47:44 +0300 Subject: [PATCH 3/5] Remove gRPC dependency --- Cargo.lock | 1 - rpc/wrpc/server/Cargo.toml | 1 - rpc/wrpc/server/src/connection.rs | 3 +-- 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7c61c1e37..804792f9a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3552,7 +3552,6 @@ dependencies = [ "futures", "kaspa-consensus-core", "kaspa-core", - "kaspa-grpc-client", "kaspa-notify", "kaspa-rpc-core", "kaspa-rpc-macros", diff --git a/rpc/wrpc/server/Cargo.toml b/rpc/wrpc/server/Cargo.toml index 54137e95b..2d385ca2a 100644 --- a/rpc/wrpc/server/Cargo.toml +++ b/rpc/wrpc/server/Cargo.toml @@ -18,7 +18,6 @@ borsh = { workspace = true, features = ["rc"] } futures.workspace = true kaspa-consensus-core.workspace = true kaspa-core.workspace = true -kaspa-grpc-client.workspace = true kaspa-notify.workspace = true kaspa-rpc-core.workspace = true kaspa-rpc-macros.workspace = true diff --git a/rpc/wrpc/server/src/connection.rs b/rpc/wrpc/server/src/connection.rs index 9d382e322..05e2be55f 100644 --- a/rpc/wrpc/server/src/connection.rs +++ b/rpc/wrpc/server/src/connection.rs @@ -1,4 +1,3 @@ -use kaspa_grpc_client::{GrpcClient, GrpcClientNotify}; use kaspa_notify::{ connection::Connection as ConnectionT, error::{Error as NotifyError, Result as NotifyResult}, @@ -6,7 +5,7 @@ use kaspa_notify::{ notification::Notification as NotificationT, notifier::Notify, }; -use kaspa_rpc_core::{api::ops::RpcApiOps, notify::mode::NotificationMode, Notification}; +use kaspa_rpc_core::{api::ops::RpcApiOps, Notification}; use std::{ fmt::{Debug, Display}, sync::{Arc, Mutex}, From 8dddd1d6976941c332a0f4289dcd010f575abad2 Mon Sep 17 00:00:00 2001 From: KaffinPX Date: Mon, 11 Nov 2024 17:54:50 +0300 Subject: [PATCH 4/5] A simple formatting change --- rpc/wrpc/server/src/connection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpc/wrpc/server/src/connection.rs b/rpc/wrpc/server/src/connection.rs index 05e2be55f..021ba7912 100644 --- a/rpc/wrpc/server/src/connection.rs +++ b/rpc/wrpc/server/src/connection.rs @@ -82,7 +82,7 @@ pub struct Connection { impl Connection { pub fn new(id: u64, peer: &SocketAddr, messenger: Arc) -> Connection { - Connection { inner: Arc::new(ConnectionInner { id, peer: *peer, messenger, listener_id: None.into() }) } + Connection { inner: Arc::new(ConnectionInner { id, peer: *peer, messenger, listener_id: Mutex::new(None) }) } } /// Obtain the connection id From ba3ca33225eacb701d96366c8a5eba4e1db57b97 Mon Sep 17 00:00:00 2001 From: KaffinPX Date: Mon, 11 Nov 2024 18:16:09 +0300 Subject: [PATCH 5/5] Clippy --- kaspad/src/daemon.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/kaspad/src/daemon.rs b/kaspad/src/daemon.rs index 4656665b6..23ceb4c02 100644 --- a/kaspad/src/daemon.rs +++ b/kaspad/src/daemon.rs @@ -624,7 +624,6 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm WrpcServerOptions { listen_address: listen_address.to_address(&network.network_type, &encoding).to_string(), // TODO: use a normalized ContextualNetAddress instead of a String verbose: args.wrpc_verbose, - ..WrpcServerOptions::default() }, )) })