Skip to content

Commit

Permalink
pool: rename Sink and Stream to BoxSink and BoxStream
Browse files Browse the repository at this point in the history
Signed-off-by: Yuki Kishimoto <[email protected]>
  • Loading branch information
yukibtc committed Jan 28, 2025
1 parent 59429cf commit 4664dc4
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 26 deletions.
8 changes: 4 additions & 4 deletions bindings/nostr-sdk-ffi/src/transport/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::Arc;
use std::time::Duration;

use async_wsocket::message::{CloseFrame, Message};
use nostr_sdk::pool::transport::websocket::{Sink, Stream};
use nostr_sdk::pool::transport::websocket::{BoxSink, BoxStream};
use uniffi::{Enum, Object, Record};

use crate::error::Result;
Expand Down Expand Up @@ -359,7 +359,7 @@ mod inner {
url: &'a Url,
mode: &'a ConnectionMode,
timeout: Duration,
) -> BoxedFuture<'a, Result<(Sink, Stream), TransportError>> {
) -> BoxedFuture<'a, Result<(BoxSink, BoxStream), TransportError>> {
Box::pin(async move {
let intermediate = self
.inner
Expand All @@ -376,8 +376,8 @@ mod inner {
// Split it
let (tx, rx) = socket.split();

let sink: Sink = Box::new(tx) as Sink;
let stream: Stream = Box::new(rx) as Stream;
let sink: BoxSink = Box::new(tx) as BoxSink;
let stream: BoxStream = Box::new(rx) as BoxStream;

Ok((sink, stream))
})
Expand Down
20 changes: 10 additions & 10 deletions crates/nostr-relay-pool/src/relay/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use super::{Error, Reconciliation, RelayNotification, RelayStatus, SubscriptionA
use crate::pool::RelayPoolNotification;
use crate::relay::status::AtomicRelayStatus;
use crate::shared::SharedState;
use crate::transport::websocket::{Sink, Stream};
use crate::transport::websocket::{BoxSink, BoxStream};

#[derive(Debug)]
struct RelayChannels {
Expand Down Expand Up @@ -388,7 +388,7 @@ impl InnerRelay {
}
}

pub(super) fn spawn_connection_task(&self, mut stream: Option<(Sink, Stream)>) {
pub(super) fn spawn_connection_task(&self, mut stream: Option<(BoxSink, BoxStream)>) {
if self.is_running() {
tracing::warn!(url = %self.url, "Connection task is already running.");
return;
Expand Down Expand Up @@ -521,7 +521,7 @@ impl InnerRelay {
&self,
timeout: Duration,
status_on_failure: RelayStatus,
) -> Result<(Sink, Stream), Error> {
) -> Result<(BoxSink, BoxStream), Error> {
// Update status
self.set_status(RelayStatus::Connecting, true);

Expand Down Expand Up @@ -561,7 +561,7 @@ impl InnerRelay {
/// If `stream` arg is passed, no connection attempt will be done.
async fn connect_and_run(
&self,
stream: Option<(Sink, Stream)>,
stream: Option<(BoxSink, BoxStream)>,
rx_nostr: &mut MutexGuard<'_, Receiver<Vec<ClientMessage>>>,
last_ws_error: &mut Option<String>,
) {
Expand Down Expand Up @@ -603,8 +603,8 @@ impl InnerRelay {
/// Run message handlers, pinger and other services
async fn post_connection(
&self,
mut ws_tx: Sink,
ws_rx: Stream,
mut ws_tx: BoxSink,
ws_rx: BoxStream,
rx_nostr: &mut MutexGuard<'_, Receiver<Vec<ClientMessage>>>,
) {
// Request information document
Expand Down Expand Up @@ -648,7 +648,7 @@ impl InnerRelay {

async fn sender_message_handler(
&self,
ws_tx: &mut Sink,
ws_tx: &mut BoxSink,
rx_nostr: &mut MutexGuard<'_, Receiver<Vec<ClientMessage>>>,
ping: &PingTracker,
) -> Result<(), Error> {
Expand Down Expand Up @@ -720,7 +720,7 @@ impl InnerRelay {

async fn receiver_message_handler(
&self,
mut ws_rx: Stream,
mut ws_rx: BoxStream,
ping: &PingTracker,
) -> Result<(), Error> {
#[cfg(target_arch = "wasm32")]
Expand Down Expand Up @@ -1852,7 +1852,7 @@ impl InnerRelay {
}

/// Send WebSocket messages with timeout set to [WEBSOCKET_TX_TIMEOUT].
async fn send_ws_msgs(tx: &mut Sink, msgs: Vec<Message>) -> Result<(), Error> {
async fn send_ws_msgs(tx: &mut BoxSink, msgs: Vec<Message>) -> Result<(), Error> {
let mut stream = futures_util::stream::iter(msgs.into_iter().map(Ok));
match time::timeout(Some(WEBSOCKET_TX_TIMEOUT), tx.send_all(&mut stream)).await {
Some(res) => Ok(res?),
Expand All @@ -1861,7 +1861,7 @@ async fn send_ws_msgs(tx: &mut Sink, msgs: Vec<Message>) -> Result<(), Error> {
}

/// Send WebSocket messages with timeout set to [WEBSOCKET_TX_TIMEOUT].
async fn close_ws(tx: &mut Sink) -> Result<(), Error> {
async fn close_ws(tx: &mut BoxSink) -> Result<(), Error> {
// TODO: remove timeout from here?
match time::timeout(Some(WEBSOCKET_TX_TIMEOUT), tx.close()).await {
Some(res) => Ok(res?),
Expand Down
4 changes: 2 additions & 2 deletions crates/nostr-relay-pool/src/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub use self::options::{
pub use self::stats::RelayConnectionStats;
pub use self::status::RelayStatus;
use crate::shared::SharedState;
use crate::transport::websocket::{Sink, Stream};
use crate::transport::websocket::{BoxSink, BoxStream};

/// Subscription auto-closed reason
#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -322,7 +322,7 @@ impl Relay {

// Try to connect
// This will set the status to "terminated" if the connection fails
let stream: (Sink, Stream) = self
let stream: (BoxSink, BoxStream) = self
.inner
._try_connect(timeout, RelayStatus::Terminated)
.await?;
Expand Down
19 changes: 9 additions & 10 deletions crates/nostr-relay-pool/src/transport/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::fmt;
use std::sync::Arc;
use std::time::Duration;

use async_wsocket::futures_util::{self, SinkExt, StreamExt, TryStreamExt};
use async_wsocket::futures_util::{Sink, SinkExt, Stream, StreamExt, TryStreamExt};
use async_wsocket::{ConnectionMode, Message, WebSocket};
use nostr::util::BoxedFuture;
use nostr::Url;
Expand All @@ -17,17 +17,16 @@ use super::error::TransportError;

/// WebSocket transport sink
#[cfg(not(target_arch = "wasm32"))]
pub type Sink = Box<dyn futures_util::Sink<Message, Error = TransportError> + Send + Unpin>;
pub type BoxSink = Box<dyn Sink<Message, Error = TransportError> + Send + Unpin>;
/// WebSocket transport stream
#[cfg(not(target_arch = "wasm32"))]
pub type Stream =
Box<dyn futures_util::Stream<Item = Result<Message, TransportError>> + Send + Unpin>;
pub type BoxStream = Box<dyn Stream<Item = Result<Message, TransportError>> + Send + Unpin>;
/// WebSocket transport sink
#[cfg(target_arch = "wasm32")]
pub type Sink = Box<dyn futures_util::Sink<Message, Error = TransportError> + Unpin>;
pub type BoxSink = Box<dyn Sink<Message, Error = TransportError> + Unpin>;
/// WebSocket transport stream
#[cfg(target_arch = "wasm32")]
pub type Stream = Box<dyn futures_util::Stream<Item = Result<Message, TransportError>> + Unpin>;
pub type BoxStream = Box<dyn Stream<Item = Result<Message, TransportError>> + Unpin>;

#[doc(hidden)]
pub trait IntoWebSocketTransport {
Expand Down Expand Up @@ -69,7 +68,7 @@ pub trait WebSocketTransport: fmt::Debug + Send + Sync {
url: &'a Url,
mode: &'a ConnectionMode,
timeout: Duration,
) -> BoxedFuture<'a, Result<(Sink, Stream), TransportError>>;
) -> BoxedFuture<'a, Result<(BoxSink, BoxStream), TransportError>>;
}

/// Default websocket transport
Expand All @@ -86,7 +85,7 @@ impl WebSocketTransport for DefaultWebsocketTransport {
url: &'a Url,
mode: &'a ConnectionMode,
timeout: Duration,
) -> BoxedFuture<'a, Result<(Sink, Stream), TransportError>> {
) -> BoxedFuture<'a, Result<(BoxSink, BoxStream), TransportError>> {
Box::pin(async move {
// Connect
let socket: WebSocket = async_wsocket::connect(url, mode, timeout)
Expand All @@ -95,8 +94,8 @@ impl WebSocketTransport for DefaultWebsocketTransport {

// Split sink and stream
let (tx, rx) = socket.split();
let sink: Sink = Box::new(tx.sink_map_err(TransportError::backend)) as Sink;
let stream: Stream = Box::new(rx.map_err(TransportError::backend)) as Stream;
let sink: BoxSink = Box::new(tx.sink_map_err(TransportError::backend)) as BoxSink;
let stream: BoxStream = Box::new(rx.map_err(TransportError::backend)) as BoxStream;
Ok((sink, stream))
})
}
Expand Down

0 comments on commit 4664dc4

Please sign in to comment.