diff --git a/bindings/nostr-sdk-ffi/src/transport/websocket.rs b/bindings/nostr-sdk-ffi/src/transport/websocket.rs index 465f58b08..159cf0d33 100644 --- a/bindings/nostr-sdk-ffi/src/transport/websocket.rs +++ b/bindings/nostr-sdk-ffi/src/transport/websocket.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use std::time::Duration; use async_wsocket::message::{CloseFrame, Message}; -use nostr_sdk::pool::transport::websocket::{Sink, Stream}; +use nostr_sdk::pool::transport::websocket::{BoxSink, BoxStream}; use uniffi::{Enum, Object, Record}; use crate::error::Result; @@ -359,7 +359,7 @@ mod inner { url: &'a Url, mode: &'a ConnectionMode, timeout: Duration, - ) -> BoxedFuture<'a, Result<(Sink, Stream), TransportError>> { + ) -> BoxedFuture<'a, Result<(BoxSink, BoxStream), TransportError>> { Box::pin(async move { let intermediate = self .inner @@ -376,8 +376,8 @@ mod inner { // Split it let (tx, rx) = socket.split(); - let sink: Sink = Box::new(tx) as Sink; - let stream: Stream = Box::new(rx) as Stream; + let sink: BoxSink = Box::new(tx) as BoxSink; + let stream: BoxStream = Box::new(rx) as BoxStream; Ok((sink, stream)) }) diff --git a/crates/nostr-relay-pool/src/relay/inner.rs b/crates/nostr-relay-pool/src/relay/inner.rs index 7f5df11fb..2348f2559 100644 --- a/crates/nostr-relay-pool/src/relay/inner.rs +++ b/crates/nostr-relay-pool/src/relay/inner.rs @@ -36,7 +36,7 @@ use super::{Error, Reconciliation, RelayNotification, RelayStatus, SubscriptionA use crate::pool::RelayPoolNotification; use crate::relay::status::AtomicRelayStatus; use crate::shared::SharedState; -use crate::transport::websocket::{Sink, Stream}; +use crate::transport::websocket::{BoxSink, BoxStream}; #[derive(Debug)] struct RelayChannels { @@ -388,7 +388,7 @@ impl InnerRelay { } } - pub(super) fn spawn_connection_task(&self, mut stream: Option<(Sink, Stream)>) { + pub(super) fn spawn_connection_task(&self, mut stream: Option<(BoxSink, BoxStream)>) { if self.is_running() { tracing::warn!(url = %self.url, "Connection task is already running."); return; @@ -521,7 +521,7 @@ impl InnerRelay { &self, timeout: Duration, status_on_failure: RelayStatus, - ) -> Result<(Sink, Stream), Error> { + ) -> Result<(BoxSink, BoxStream), Error> { // Update status self.set_status(RelayStatus::Connecting, true); @@ -561,7 +561,7 @@ impl InnerRelay { /// If `stream` arg is passed, no connection attempt will be done. async fn connect_and_run( &self, - stream: Option<(Sink, Stream)>, + stream: Option<(BoxSink, BoxStream)>, rx_nostr: &mut MutexGuard<'_, Receiver>>, last_ws_error: &mut Option, ) { @@ -603,8 +603,8 @@ impl InnerRelay { /// Run message handlers, pinger and other services async fn post_connection( &self, - mut ws_tx: Sink, - ws_rx: Stream, + mut ws_tx: BoxSink, + ws_rx: BoxStream, rx_nostr: &mut MutexGuard<'_, Receiver>>, ) { // Request information document @@ -648,7 +648,7 @@ impl InnerRelay { async fn sender_message_handler( &self, - ws_tx: &mut Sink, + ws_tx: &mut BoxSink, rx_nostr: &mut MutexGuard<'_, Receiver>>, ping: &PingTracker, ) -> Result<(), Error> { @@ -720,7 +720,7 @@ impl InnerRelay { async fn receiver_message_handler( &self, - mut ws_rx: Stream, + mut ws_rx: BoxStream, ping: &PingTracker, ) -> Result<(), Error> { #[cfg(target_arch = "wasm32")] @@ -1852,7 +1852,7 @@ impl InnerRelay { } /// Send WebSocket messages with timeout set to [WEBSOCKET_TX_TIMEOUT]. -async fn send_ws_msgs(tx: &mut Sink, msgs: Vec) -> Result<(), Error> { +async fn send_ws_msgs(tx: &mut BoxSink, msgs: Vec) -> Result<(), Error> { let mut stream = futures_util::stream::iter(msgs.into_iter().map(Ok)); match time::timeout(Some(WEBSOCKET_TX_TIMEOUT), tx.send_all(&mut stream)).await { Some(res) => Ok(res?), @@ -1861,7 +1861,7 @@ async fn send_ws_msgs(tx: &mut Sink, msgs: Vec) -> Result<(), Error> { } /// Send WebSocket messages with timeout set to [WEBSOCKET_TX_TIMEOUT]. -async fn close_ws(tx: &mut Sink) -> Result<(), Error> { +async fn close_ws(tx: &mut BoxSink) -> Result<(), Error> { // TODO: remove timeout from here? match time::timeout(Some(WEBSOCKET_TX_TIMEOUT), tx.close()).await { Some(res) => Ok(res?), diff --git a/crates/nostr-relay-pool/src/relay/mod.rs b/crates/nostr-relay-pool/src/relay/mod.rs index 01830daed..f144afb88 100644 --- a/crates/nostr-relay-pool/src/relay/mod.rs +++ b/crates/nostr-relay-pool/src/relay/mod.rs @@ -40,7 +40,7 @@ pub use self::options::{ pub use self::stats::RelayConnectionStats; pub use self::status::RelayStatus; use crate::shared::SharedState; -use crate::transport::websocket::{Sink, Stream}; +use crate::transport::websocket::{BoxSink, BoxStream}; /// Subscription auto-closed reason #[derive(Debug, Clone, PartialEq, Eq)] @@ -322,7 +322,7 @@ impl Relay { // Try to connect // This will set the status to "terminated" if the connection fails - let stream: (Sink, Stream) = self + let stream: (BoxSink, BoxStream) = self .inner ._try_connect(timeout, RelayStatus::Terminated) .await?; diff --git a/crates/nostr-relay-pool/src/transport/websocket.rs b/crates/nostr-relay-pool/src/transport/websocket.rs index 4402efc4f..940ea6aa7 100644 --- a/crates/nostr-relay-pool/src/transport/websocket.rs +++ b/crates/nostr-relay-pool/src/transport/websocket.rs @@ -8,7 +8,7 @@ use std::fmt; use std::sync::Arc; use std::time::Duration; -use async_wsocket::futures_util::{self, SinkExt, StreamExt, TryStreamExt}; +use async_wsocket::futures_util::{Sink, SinkExt, Stream, StreamExt, TryStreamExt}; use async_wsocket::{ConnectionMode, Message, WebSocket}; use nostr::util::BoxedFuture; use nostr::Url; @@ -17,17 +17,16 @@ use super::error::TransportError; /// WebSocket transport sink #[cfg(not(target_arch = "wasm32"))] -pub type Sink = Box + Send + Unpin>; +pub type BoxSink = Box + Send + Unpin>; /// WebSocket transport stream #[cfg(not(target_arch = "wasm32"))] -pub type Stream = - Box> + Send + Unpin>; +pub type BoxStream = Box> + Send + Unpin>; /// WebSocket transport sink #[cfg(target_arch = "wasm32")] -pub type Sink = Box + Unpin>; +pub type BoxSink = Box + Unpin>; /// WebSocket transport stream #[cfg(target_arch = "wasm32")] -pub type Stream = Box> + Unpin>; +pub type BoxStream = Box> + Unpin>; #[doc(hidden)] pub trait IntoWebSocketTransport { @@ -69,7 +68,7 @@ pub trait WebSocketTransport: fmt::Debug + Send + Sync { url: &'a Url, mode: &'a ConnectionMode, timeout: Duration, - ) -> BoxedFuture<'a, Result<(Sink, Stream), TransportError>>; + ) -> BoxedFuture<'a, Result<(BoxSink, BoxStream), TransportError>>; } /// Default websocket transport @@ -86,7 +85,7 @@ impl WebSocketTransport for DefaultWebsocketTransport { url: &'a Url, mode: &'a ConnectionMode, timeout: Duration, - ) -> BoxedFuture<'a, Result<(Sink, Stream), TransportError>> { + ) -> BoxedFuture<'a, Result<(BoxSink, BoxStream), TransportError>> { Box::pin(async move { // Connect let socket: WebSocket = async_wsocket::connect(url, mode, timeout) @@ -95,8 +94,8 @@ impl WebSocketTransport for DefaultWebsocketTransport { // Split sink and stream let (tx, rx) = socket.split(); - let sink: Sink = Box::new(tx.sink_map_err(TransportError::backend)) as Sink; - let stream: Stream = Box::new(rx.map_err(TransportError::backend)) as Stream; + let sink: BoxSink = Box::new(tx.sink_map_err(TransportError::backend)) as BoxSink; + let stream: BoxStream = Box::new(rx.map_err(TransportError::backend)) as BoxStream; Ok((sink, stream)) }) }