diff --git a/src/actor/actor_ref.rs b/src/actor/actor_ref.rs index 1666184..d627472 100644 --- a/src/actor/actor_ref.rs +++ b/src/actor/actor_ref.rs @@ -770,7 +770,7 @@ impl RemoteActorRef { > where A: remote::RemoteActor + Message + remote::RemoteMessage, - M: serde::Serialize, + M: serde::Serialize + Send + 'static, ::Ok: for<'de> serde::Deserialize<'de>, { AskRequest::new_remote(self, msg) diff --git a/src/actor/pool.rs b/src/actor/pool.rs index 343e358..9ca7aef 100644 --- a/src/actor/pool.rs +++ b/src/actor/pool.rs @@ -212,6 +212,7 @@ where pub enum WorkerReply where A: Actor + Message, + M: Send + 'static, { /// The message was forwarded to a worker. Forwarded, diff --git a/src/mailbox.rs b/src/mailbox.rs index e8429a5..53f7097 100644 --- a/src/mailbox.rs +++ b/src/mailbox.rs @@ -30,6 +30,10 @@ pub trait Mailbox: SignalMailbox + Clone + Send + Sync { &self, signal: Signal, ) -> impl Future, E>>> + Send + '_; + /// Tries to send a signal to the mailbox, failing if the mailbox is full. + fn try_send(&self, signal: Signal) -> Result<(), SendError, E>>; + /// Sends a signal to the mailbox, blocking the current thread. + fn blocking_send(&self, signal: Signal) -> Result<(), SendError, E>>; /// Waits for the mailbox to be closed. fn closed(&self) -> impl Future + Send + '_; /// Checks if the mailbox is closed. diff --git a/src/mailbox/bounded.rs b/src/mailbox/bounded.rs index bf8ce20..9a57e69 100644 --- a/src/mailbox/bounded.rs +++ b/src/mailbox/bounded.rs @@ -39,6 +39,16 @@ impl Mailbox for BoundedMailbox { Ok(self.0.send(signal).await?) } + #[inline] + fn try_send(&self, signal: Signal) -> Result<(), SendError, E>> { + Ok(self.0.try_send(signal)?) + } + + #[inline] + fn blocking_send(&self, signal: Signal) -> Result<(), SendError, E>> { + Ok(self.0.blocking_send(signal)?) + } + #[inline] async fn closed(&self) { self.0.closed().await diff --git a/src/mailbox/unbounded.rs b/src/mailbox/unbounded.rs index b8e463a..22f012c 100644 --- a/src/mailbox/unbounded.rs +++ b/src/mailbox/unbounded.rs @@ -39,6 +39,16 @@ impl Mailbox for UnboundedMailbox { Ok(self.0.send(signal)?) } + #[inline] + fn try_send(&self, signal: Signal) -> Result<(), SendError, E>> { + Ok(self.0.send(signal)?) + } + + #[inline] + fn blocking_send(&self, signal: Signal) -> Result<(), SendError, E>> { + Ok(self.0.send(signal)?) + } + #[inline] async fn closed(&self) { self.0.closed().await diff --git a/src/message.rs b/src/message.rs index 329a574..e710712 100644 --- a/src/message.rs +++ b/src/message.rs @@ -35,7 +35,7 @@ pub(crate) type BoxReply = Box; /// Messages are processed sequentially one at a time, with exclusive mutable access to the actors state. /// /// The reply type must implement [Reply]. -pub trait Message: Actor { +pub trait Message: Actor { /// The reply sent back to the message caller. type Reply: Reply; diff --git a/src/request/ask.rs b/src/request/ask.rs index 2161acd..82aec9a 100644 --- a/src/request/ask.rs +++ b/src/request/ask.rs @@ -7,7 +7,7 @@ use crate::remote::{ActorSwarm, RemoteActor, RemoteMessage, SwarmCommand, SwarmR use crate::{ actor, error::{self, SendError}, - mailbox::{bounded::BoundedMailbox, unbounded::UnboundedMailbox, Signal}, + mailbox::{bounded::BoundedMailbox, unbounded::UnboundedMailbox, Mailbox, Signal}, message::{BoxReply, Message}, reply::ReplySender, Actor, Reply, @@ -158,42 +158,88 @@ impl AskRequest { } } -///////////////////////// -// === MessageSend === // -///////////////////////// -macro_rules! impl_message_send { - (local, $mailbox:ident, $mailbox_timeout:ident, $reply_timeout:ident, |$req:ident| $($body:tt)*) => { - impl<'a, A, M> MessageSend +macro_rules! impl_message_trait { + (local, $($async:ident)? => $trait:ident :: $method:ident, $mailbox_timeout:ident, $reply_timeout:ident, |$req:ident| $($body:tt)*) => { + impl<'a, A, M> $trait for AskRequest< - LocalAskRequest<'a, A, $mailbox>, - $mailbox, + LocalAskRequest<'a, A, A::Mailbox>, + A::Mailbox, M, $mailbox_timeout, $reply_timeout, > where - A: Actor> + Message, + A: Actor + Message, M: Send + 'static, { type Ok = ::Ok; - type Error = SendError::Error>; + type Error = error::SendError::Error>; #[inline] - async fn send(self) -> Result { + $($async)? fn $method(self) -> Result { let $req = self; $($body)* } } }; - (remote, $mailbox:ident, $mailbox_timeout:ident, $reply_timeout:ident, |$req:ident| ($mailbox_timeout_body:expr, $reply_timeout_body:expr)) => { - impl<'a, A, M> MessageSend + (local, $($async:ident)? => $trait:ident :: $method:ident, $mailbox:ident, $mailbox_timeout:ident, $reply_timeout:ident, |$req:ident| $($body:tt)*) => { + impl<'a, A, M> $trait for AskRequest< - RemoteAskRequest<'a, A, M>, + LocalAskRequest<'a, A, $mailbox>, $mailbox, M, $mailbox_timeout, $reply_timeout, > + where + A: Actor> + Message, + M: Send + 'static, + { + type Ok = ::Ok; + type Error = error::SendError::Error>; + + #[inline] + $($async)? fn $method(self) -> Result { + let $req = self; + $($body)* + } + } + }; + (remote, $($async:ident)? => $trait:ident :: $method:ident, $mailbox_timeout:ident, $reply_timeout:ident, |$req:ident| ($mailbox_timeout_body:expr, $reply_timeout_body:expr)) => { + impl<'a, A, M> $trait + for AskRequest, A::Mailbox, M, $mailbox_timeout, $reply_timeout> + where + AskRequest< + LocalAskRequest<'a, A, A::Mailbox>, + A::Mailbox, + M, + $mailbox_timeout, + $reply_timeout, + >: $trait, + A: Actor + Message + RemoteActor + RemoteMessage, + M: serde::Serialize + Send + Sync + 'static, + ::Ok: for<'de> serde::Deserialize<'de>, + ::Error: for<'de> serde::Deserialize<'de>, + { + type Ok = ::Ok; + type Error = error::RemoteSendError<::Error>; + + #[inline] + $($async)? fn $method(self) -> Result { + let $req = self; + remote_ask( + $req.location.actor_ref, + &$req.location.msg, + $mailbox_timeout_body, + $reply_timeout_body, + false + ).await + } + } + }; + (remote, $($async:ident)? => $trait:ident :: $method:ident, $mailbox:ident, $mailbox_timeout:ident, $reply_timeout:ident, |$req:ident| ($mailbox_timeout_body:expr, $reply_timeout_body:expr)) => { + impl<'a, A, M> $trait + for AskRequest, $mailbox, M, $mailbox_timeout, $reply_timeout> where AskRequest< LocalAskRequest<'a, A, $mailbox>, @@ -201,9 +247,9 @@ macro_rules! impl_message_send { M, $mailbox_timeout, $reply_timeout, - >: MessageSend, + >: $trait, A: Actor> + Message + RemoteActor + RemoteMessage, - M: serde::Serialize + Send + Sync, + M: serde::Serialize + Send + Sync + 'static, ::Ok: for<'de> serde::Deserialize<'de>, ::Error: for<'de> serde::Deserialize<'de>, { @@ -211,7 +257,7 @@ macro_rules! impl_message_send { type Error = error::RemoteSendError<::Error>; #[inline] - async fn send(self) -> Result { + $($async)? fn $method(self) -> Result { let $req = self; remote_ask( $req.location.actor_ref, @@ -225,21 +271,26 @@ macro_rules! impl_message_send { }; } -impl_message_send!( +///////////////////////// +// === MessageSend === // +///////////////////////// +impl_message_trait!( local, - BoundedMailbox, + async => MessageSend::send, WithoutRequestTimeout, WithoutRequestTimeout, |req| { - req.location.mailbox.0.send(req.location.signal).await?; + req.location.mailbox.send(req.location.signal).await + .map_err(|err| err.map_msg(|signal| signal.downcast_message().unwrap()))?; match req.location.rx.await? { Ok(val) => Ok(*val.downcast().unwrap()), Err(err) => Err(err.downcast()), } } ); -impl_message_send!( +impl_message_trait!( local, + async => MessageSend::send, BoundedMailbox, WithoutRequestTimeout, WithRequestTimeout, @@ -251,8 +302,9 @@ impl_message_send!( } } ); -impl_message_send!( +impl_message_trait!( local, + async => MessageSend::send, BoundedMailbox, WithRequestTimeout, WithoutRequestTimeout, @@ -268,8 +320,9 @@ impl_message_send!( } } ); -impl_message_send!( +impl_message_trait!( local, + async => MessageSend::send, BoundedMailbox, WithRequestTimeout, WithRequestTimeout, @@ -286,21 +339,9 @@ impl_message_send!( } ); -impl_message_send!( - local, - UnboundedMailbox, - WithoutRequestTimeout, - WithoutRequestTimeout, - |req| { - req.location.mailbox.0.send(req.location.signal)?; - match req.location.rx.await? { - Ok(val) => Ok(*val.downcast().unwrap()), - Err(err) => Err(err.downcast()), - } - } -); -impl_message_send!( +impl_message_trait!( local, + async => MessageSend::send, UnboundedMailbox, WithoutRequestTimeout, WithRequestTimeout, @@ -314,32 +355,35 @@ impl_message_send!( ); #[cfg(feature = "remote")] -impl_message_send!( +impl_message_trait!( remote, - BoundedMailbox, + async => MessageSend::send, WithoutRequestTimeout, WithoutRequestTimeout, |req| (None, None) ); #[cfg(feature = "remote")] -impl_message_send!( +impl_message_trait!( remote, + async => MessageSend::send, BoundedMailbox, WithoutRequestTimeout, WithRequestTimeout, |req| (None, Some(req.reply_timeout.0)) ); #[cfg(feature = "remote")] -impl_message_send!( +impl_message_trait!( remote, + async => MessageSend::send, BoundedMailbox, WithRequestTimeout, WithoutRequestTimeout, |req| (Some(req.mailbox_timeout.0), None) ); #[cfg(feature = "remote")] -impl_message_send!( +impl_message_trait!( remote, + async => MessageSend::send, BoundedMailbox, WithRequestTimeout, WithRequestTimeout, @@ -347,24 +391,18 @@ impl_message_send!( ); #[cfg(feature = "remote")] -impl_message_send!( - remote, - UnboundedMailbox, - WithoutRequestTimeout, - WithoutRequestTimeout, - |req| (None, None) -); -#[cfg(feature = "remote")] -impl_message_send!( +impl_message_trait!( remote, + async => MessageSend::send, UnboundedMailbox, WithoutRequestTimeout, WithRequestTimeout, |req| (None, Some(req.reply_timeout.0)) ); -impl_message_send!( +impl_message_trait!( local, + async => MessageSend::send, BoundedMailbox, MaybeRequestTimeout, MaybeRequestTimeout, @@ -417,8 +455,9 @@ impl_message_send!( } ); -impl_message_send!( +impl_message_trait!( local, + async => MessageSend::send, UnboundedMailbox, MaybeRequestTimeout, MaybeRequestTimeout, @@ -454,88 +493,26 @@ impl_message_send!( } ); -///////////////////////// -// === MessageSend === // -///////////////////////// -macro_rules! impl_try_message_send { - (local, $mailbox:ident, $mailbox_timeout:ident, $reply_timeout:ident, |$req:ident| $($body:tt)*) => { - impl<'a, A, M> TryMessageSend - for AskRequest< - LocalAskRequest<'a, A, $mailbox>, - $mailbox, - M, - $mailbox_timeout, - $reply_timeout, - > - where - A: Actor> + Message, - M: Send + 'static, - { - type Ok = ::Ok; - type Error = SendError::Error>; - - #[inline] - async fn try_send(self) -> Result { - let $req = self; - $($body)* - } - } - }; - (remote, $mailbox:ident, $mailbox_timeout:ident, $reply_timeout:ident, |$req:ident| ($mailbox_timeout_body:expr, $reply_timeout_body:expr)) => { - impl<'a, A, M> TryMessageSend - for AskRequest< - RemoteAskRequest<'a, A, M>, - $mailbox, - M, - $mailbox_timeout, - $reply_timeout, - > - where - AskRequest< - LocalAskRequest<'a, A, $mailbox>, - $mailbox, - M, - $mailbox_timeout, - $reply_timeout, - >: TryMessageSend, - A: Actor> + Message + RemoteActor + RemoteMessage, - M: serde::Serialize + Send + Sync, - ::Ok: for<'de> serde::Deserialize<'de>, - ::Error: for<'de> serde::Deserialize<'de>, - { - type Ok = ::Ok; - type Error = error::RemoteSendError<::Error>; - - #[inline] - async fn try_send(self) -> Result { - let $req = self; - remote_ask( - $req.location.actor_ref, - &$req.location.msg, - $mailbox_timeout_body, - $reply_timeout_body, - true - ).await - } - } - }; -} - -impl_try_message_send!( +//////////////////////////// +// === TryMessageSend === // +//////////////////////////// +impl_message_trait!( local, - BoundedMailbox, + async => TryMessageSend::try_send, WithoutRequestTimeout, WithoutRequestTimeout, |req| { - req.location.mailbox.0.try_send(req.location.signal)?; + req.location.mailbox.try_send(req.location.signal) + .map_err(|err| err.map_msg(|signal| signal.downcast_message().unwrap()))?; match req.location.rx.await? { Ok(val) => Ok(*val.downcast().unwrap()), Err(err) => Err(err.downcast()), } } ); -impl_try_message_send!( +impl_message_trait!( local, + async => TryMessageSend::try_send, BoundedMailbox, WithoutRequestTimeout, WithRequestTimeout, @@ -548,21 +525,9 @@ impl_try_message_send!( } ); -impl_try_message_send!( - local, - UnboundedMailbox, - WithoutRequestTimeout, - WithoutRequestTimeout, - |req| { - req.location.mailbox.0.send(req.location.signal)?; - match req.location.rx.await? { - Ok(val) => Ok(*val.downcast().unwrap()), - Err(err) => Err(err.downcast()), - } - } -); -impl_try_message_send!( +impl_message_trait!( local, + async => TryMessageSend::try_send, UnboundedMailbox, WithoutRequestTimeout, WithRequestTimeout, @@ -576,16 +541,17 @@ impl_try_message_send!( ); #[cfg(feature = "remote")] -impl_try_message_send!( +impl_message_trait!( remote, - BoundedMailbox, + async => TryMessageSend::try_send, WithoutRequestTimeout, WithoutRequestTimeout, |req| (None, None) ); #[cfg(feature = "remote")] -impl_try_message_send!( +impl_message_trait!( remote, + async => TryMessageSend::try_send, BoundedMailbox, WithoutRequestTimeout, WithRequestTimeout, @@ -593,24 +559,18 @@ impl_try_message_send!( ); #[cfg(feature = "remote")] -impl_try_message_send!( - remote, - UnboundedMailbox, - WithoutRequestTimeout, - WithoutRequestTimeout, - |req| (None, None) -); -#[cfg(feature = "remote")] -impl_try_message_send!( +impl_message_trait!( remote, + async => TryMessageSend::try_send, UnboundedMailbox, WithoutRequestTimeout, WithRequestTimeout, |req| (None, Some(req.reply_timeout.0)) ); -impl_try_message_send!( +impl_message_trait!( local, + async => TryMessageSend::try_send, BoundedMailbox, MaybeRequestTimeout, MaybeRequestTimeout, @@ -646,8 +606,9 @@ impl_try_message_send!( } ); -impl_try_message_send!( +impl_message_trait!( local, + async => TryMessageSend::try_send, UnboundedMailbox, MaybeRequestTimeout, MaybeRequestTimeout, @@ -686,96 +647,14 @@ impl_try_message_send!( ///////////////////////////////// // === BlockingMessageSend === // ///////////////////////////////// -macro_rules! impl_blocking_message_send { - (local, $mailbox:ident, $mailbox_timeout:ident, $reply_timeout:ident, |$req:ident| $($body:tt)*) => { - impl<'a, A, M> BlockingMessageSend - for AskRequest< - LocalAskRequest<'a, A, $mailbox>, - $mailbox, - M, - $mailbox_timeout, - $reply_timeout, - > - where - A: Actor> + Message, - M: 'static, - { - type Ok = ::Ok; - type Error = SendError::Error>; - - #[inline] - fn blocking_send(self) -> Result { - let $req = self; - $($body)* - } - } - }; -} - -impl_blocking_message_send!( +impl_message_trait!( local, - BoundedMailbox, - WithoutRequestTimeout, - WithoutRequestTimeout, - |req| { - req.location.mailbox.0.blocking_send(req.location.signal)?; - match req.location.rx.blocking_recv()? { - Ok(val) => Ok(*val.downcast().unwrap()), - Err(err) => Err(err.downcast()), - } - } -); - -impl_blocking_message_send!( - local, - UnboundedMailbox, - WithoutRequestTimeout, - WithoutRequestTimeout, - |req| { - req.location.mailbox.0.send(req.location.signal)?; - match req.location.rx.blocking_recv()? { - Ok(val) => Ok(*val.downcast().unwrap()), - Err(err) => Err(err.downcast()), - } - } -); - -///////////////////////////////// -// === BlockingMessageSend === // -///////////////////////////////// -macro_rules! impl_try_blocking_message_send { - (local, $mailbox:ident, $mailbox_timeout:ident, $reply_timeout:ident, |$req:ident| $($body:tt)*) => { - impl<'a, A, M> TryBlockingMessageSend - for AskRequest< - LocalAskRequest<'a, A, $mailbox>, - $mailbox, - M, - $mailbox_timeout, - $reply_timeout, - > - where - A: Actor> + Message, - M: 'static, - { - type Ok = ::Ok; - type Error = SendError::Error>; - - #[inline] - fn try_blocking_send(self) -> Result { - let $req = self; - $($body)* - } - } - }; -} - -impl_try_blocking_message_send!( - local, - BoundedMailbox, + => BlockingMessageSend::blocking_send, WithoutRequestTimeout, WithoutRequestTimeout, |req| { - req.location.mailbox.0.try_send(req.location.signal)?; + req.location.mailbox.blocking_send(req.location.signal) + .map_err(|err| err.map_msg(|signal| signal.downcast_message().unwrap()))?; match req.location.rx.blocking_recv()? { Ok(val) => Ok(*val.downcast().unwrap()), Err(err) => Err(err.downcast()), @@ -783,13 +662,17 @@ impl_try_blocking_message_send!( } ); -impl_try_blocking_message_send!( +//////////////////////////////////// +// === TryBlockingMessageSend === // +//////////////////////////////////// +impl_message_trait!( local, - UnboundedMailbox, + => TryBlockingMessageSend::try_blocking_send, WithoutRequestTimeout, WithoutRequestTimeout, |req| { - req.location.mailbox.0.send(req.location.signal)?; + req.location.mailbox.try_send(req.location.signal) + .map_err(|err| err.map_msg(|signal| signal.downcast_message().unwrap()))?; match req.location.rx.blocking_recv()? { Ok(val) => Ok(*val.downcast().unwrap()), Err(err) => Err(err.downcast()), @@ -965,7 +848,7 @@ async fn remote_ask<'a, A, M>( ) -> Result<::Ok, error::RemoteSendError<::Error>> where A: Actor + Message + RemoteActor + RemoteMessage, - M: serde::Serialize, + M: serde::Serialize + Send + 'static, ::Ok: for<'de> serde::Deserialize<'de>, ::Error: for<'de> serde::Deserialize<'de>, { diff --git a/src/request/tell.rs b/src/request/tell.rs index 0d4300b..7eccc82 100644 --- a/src/request/tell.rs +++ b/src/request/tell.rs @@ -6,7 +6,7 @@ use crate::remote; use crate::{ actor, error, - mailbox::{bounded::BoundedMailbox, unbounded::UnboundedMailbox, Signal}, + mailbox::{bounded::BoundedMailbox, unbounded::UnboundedMailbox, Mailbox, Signal}, message::Message, Actor, Reply, }; @@ -122,123 +122,31 @@ impl TellRequest { } } -///////////////////////// -// === MessageSend === // -///////////////////////// -macro_rules! impl_message_send { - (local, $mailbox:ident, $timeout:ident, |$req:ident| $($body:tt)*) => { - impl<'a, A, M> MessageSend +macro_rules! impl_message_trait { + (local, $($async:ident)? => $trait:ident :: $method:ident, $timeout:ident, |$req:ident| $($body:tt)*) => { + impl<'a, A, M> $trait for TellRequest< - LocalTellRequest<'a, A, $mailbox>, - $mailbox, + LocalTellRequest<'a, A, A::Mailbox>, + A::Mailbox, M, $timeout, > where - A: Actor> + Message, + A: Actor + Message, M: Send + 'static, { type Ok = (); type Error = error::SendError::Error>; #[inline] - async fn send(self) -> Result { + $($async)? fn $method(self) -> Result { let $req = self; $($body)* } } }; - (remote, $mailbox:ident, $timeout:ident, |$req:ident| $($body:tt)*) => { - impl<'a, A, M> MessageSend - for TellRequest, $mailbox, M, $timeout> - where - TellRequest< - LocalTellRequest<'a, A, $mailbox>, - $mailbox, - M, - $timeout, - >: MessageSend, - A: Actor> + Message + remote::RemoteActor + remote::RemoteMessage, - M: serde::Serialize + Send + Sync, - ::Error: for<'de> serde::Deserialize<'de>, - { - type Ok = (); - type Error = error::RemoteSendError<::Error>; - - #[inline] - async fn send(self) -> Result { - let $req = self; - remote_tell($req.location.actor_ref, &$req.location.msg, $($body)*, false).await - } - } - }; -} - -impl_message_send!(local, BoundedMailbox, WithoutRequestTimeout, |req| { - req.location.mailbox.0.send(req.location.signal).await?; - Ok(()) -}); -impl_message_send!(local, BoundedMailbox, WithRequestTimeout, |req| { - req.location - .mailbox - .0 - .send_timeout(req.location.signal, req.timeout.0) - .await?; - Ok(()) -}); -#[cfg(feature = "remote")] -impl_message_send!(remote, BoundedMailbox, WithoutRequestTimeout, |req| None); -#[cfg(feature = "remote")] -impl_message_send!(remote, BoundedMailbox, WithRequestTimeout, |req| Some( - req.timeout.0 -)); - -impl_message_send!(local, UnboundedMailbox, WithoutRequestTimeout, |req| { - req.location.mailbox.0.send(req.location.signal)?; - Ok(()) -}); -#[cfg(feature = "remote")] -impl_message_send!(remote, UnboundedMailbox, WithoutRequestTimeout, |req| None); - -impl_message_send!(local, BoundedMailbox, MaybeRequestTimeout, |req| { - match req.timeout { - MaybeRequestTimeout::NoTimeout => { - req.location.mailbox.0.send(req.location.signal).await?; - } - MaybeRequestTimeout::Timeout(timeout) => { - req.location - .mailbox - .0 - .send_timeout(req.location.signal, timeout) - .await?; - } - } - Ok(()) -}); - -impl_message_send!(local, UnboundedMailbox, MaybeRequestTimeout, |req| { - match req.timeout { - MaybeRequestTimeout::NoTimeout => { - TellRequest { - location: req.location, - timeout: WithoutRequestTimeout, - phantom: PhantomData, - } - .send() - .await - } - MaybeRequestTimeout::Timeout(_) => { - panic!("mailbox timeout is not available with unbounded mailboxes") - } - } -}); - -///////////////////////////// -// === MessageSendSync === // -///////////////////////////// -macro_rules! impl_message_send_sync { - (local, $mailbox:ident, $timeout:ident, |$req:ident| $($body:tt)*) => { - impl<'a, A, M> MessageSendSync + (local, $($async:ident)? => $trait:ident :: $method:ident, $mailbox:ident, $timeout:ident, |$req:ident| $($body:tt)*) => { + impl<'a, A, M> $trait for TellRequest< LocalTellRequest<'a, A, $mailbox>, $mailbox, @@ -247,53 +155,44 @@ macro_rules! impl_message_send_sync { > where A: Actor> + Message, - M: 'static, + M: Send + 'static, { type Ok = (); type Error = error::SendError::Error>; #[inline] - fn send_sync(self) -> Result { + $($async)? fn $method(self) -> Result { let $req = self; $($body)* } } }; -} - -impl_message_send_sync!(local, UnboundedMailbox, WithoutRequestTimeout, |req| { - req.location.mailbox.0.send(req.location.signal)?; - Ok(()) -}); - -//////////////////////////// -// === TryMessageSend === // -//////////////////////////// -macro_rules! impl_try_message_send { - (local, $mailbox:ident, $timeout:ident, |$req:ident| $($body:tt)*) => { - impl<'a, A, M> TryMessageSend - for TellRequest< - LocalTellRequest<'a, A, $mailbox>, - $mailbox, + (remote, $($async:ident)? => $trait:ident :: $method:ident, $timeout:ident, |$req:ident| $($body:tt)*) => { + impl<'a, A, M> $trait + for TellRequest, A::Mailbox, M, $timeout> + where + TellRequest< + LocalTellRequest<'a, A, A::Mailbox>, + A::Mailbox, M, $timeout, - > - where - A: Actor> + Message, - M: Send + 'static, + >: $trait, + A: Actor + Message + remote::RemoteActor + remote::RemoteMessage, + M: serde::Serialize + Send + Sync + 'static, + ::Error: for<'de> serde::Deserialize<'de>, { type Ok = (); - type Error = error::SendError::Error>; + type Error = error::RemoteSendError<::Error>; #[inline] - async fn try_send(self) -> Result { + $($async)? fn $method(self) -> Result { let $req = self; - $($body)* + remote_tell($req.location.actor_ref, &$req.location.msg, $($body)*, false).await } } }; - (remote, $mailbox:ident, $timeout:ident, |$req:ident| $($body:tt)*) => { - impl<'a, A, M> TryMessageSend + (remote, $($async:ident)? => $trait:ident :: $method:ident, $mailbox:ident, $timeout:ident, |$req:ident| $($body:tt)*) => { + impl<'a, A, M> $trait for TellRequest, $mailbox, M, $timeout> where TellRequest< @@ -301,37 +200,130 @@ macro_rules! impl_try_message_send { $mailbox, M, $timeout, - >: TryMessageSend, + >: $trait, A: Actor> + Message + remote::RemoteActor + remote::RemoteMessage, - M: serde::Serialize + Send + Sync, + M: serde::Serialize + Send + Sync + 'static, ::Error: for<'de> serde::Deserialize<'de>, { type Ok = (); type Error = error::RemoteSendError<::Error>; #[inline] - async fn try_send(self) -> Result { + $($async)? fn $method(self) -> Result { let $req = self; - remote_tell($req.location.actor_ref, &$req.location.msg, $($body)*, true).await + remote_tell($req.location.actor_ref, &$req.location.msg, $($body)*, false).await } } }; } -impl_try_message_send!(local, BoundedMailbox, WithoutRequestTimeout, |req| { - req.location.mailbox.0.try_send(req.location.signal)?; - Ok(()) +///////////////////////// +// === MessageSend === // +///////////////////////// +impl_message_trait!(local, async => MessageSend::send, WithoutRequestTimeout, |req| { + req.location + .mailbox + .send(req.location.signal) + .await + .map_err(|err| err.map_msg(|signal| signal.downcast_message().unwrap())) }); +impl_message_trait!( + local, + async => MessageSend::send, + BoundedMailbox, + WithRequestTimeout, + |req| { + req.location + .mailbox + .0 + .send_timeout(req.location.signal, req.timeout.0) + .await?; + Ok(()) + } +); +#[cfg(feature = "remote")] +impl_message_trait!(remote, async => MessageSend::send, WithoutRequestTimeout, |req| None); #[cfg(feature = "remote")] -impl_try_message_send!(remote, BoundedMailbox, WithoutRequestTimeout, |req| None); -impl_try_message_send!(local, UnboundedMailbox, WithoutRequestTimeout, |req| { - req.location.mailbox.0.send(req.location.signal)?; - Ok(()) +impl_message_trait!( + remote, + async => MessageSend::send, + BoundedMailbox, + WithRequestTimeout, + |req| Some(req.timeout.0) +); + +impl_message_trait!( + local, + async => MessageSend::send, + BoundedMailbox, + MaybeRequestTimeout, + |req| { + match req.timeout { + MaybeRequestTimeout::NoTimeout => { + req.location.mailbox.0.send(req.location.signal).await?; + } + MaybeRequestTimeout::Timeout(timeout) => { + req.location + .mailbox + .0 + .send_timeout(req.location.signal, timeout) + .await?; + } + } + Ok(()) + } +); + +impl_message_trait!( + local, + async => MessageSend::send, + UnboundedMailbox, + MaybeRequestTimeout, + |req| { + match req.timeout { + MaybeRequestTimeout::NoTimeout => { + TellRequest { + location: req.location, + timeout: WithoutRequestTimeout, + phantom: PhantomData, + } + .send() + .await + } + MaybeRequestTimeout::Timeout(_) => { + panic!("mailbox timeout is not available with unbounded mailboxes") + } + } + } +); + +///////////////////////////// +// === MessageSendSync === // +///////////////////////////// +impl_message_trait!( + local, + => MessageSendSync::send_sync, + UnboundedMailbox, + WithoutRequestTimeout, + |req| { + req.location.mailbox.0.send(req.location.signal)?; + Ok(()) + } +); + +//////////////////////////// +// === TryMessageSend === // +//////////////////////////// +impl_message_trait!(local, async => TryMessageSend::try_send, WithoutRequestTimeout, |req| { + req.location + .mailbox + .try_send(req.location.signal) + .map_err(|err| err.map_msg(|signal| signal.downcast_message().unwrap())) }); #[cfg(feature = "remote")] -impl_try_message_send!(remote, UnboundedMailbox, WithoutRequestTimeout, |req| None); +impl_message_trait!(remote, async => TryMessageSend::try_send, WithoutRequestTimeout, |req| None); -impl_try_message_send!(local, BoundedMailbox, MaybeRequestTimeout, |req| { +impl_message_trait!(local, async => TryMessageSend::try_send, BoundedMailbox, MaybeRequestTimeout, |req| { match req.timeout { MaybeRequestTimeout::NoTimeout => { TellRequest { @@ -348,7 +340,7 @@ impl_try_message_send!(local, BoundedMailbox, MaybeRequestTimeout, |req| { } }); -impl_try_message_send!(local, UnboundedMailbox, MaybeRequestTimeout, |req| { +impl_message_trait!(local, async => TryMessageSend::try_send, UnboundedMailbox, MaybeRequestTimeout, |req| { match req.timeout { MaybeRequestTimeout::NoTimeout => { TellRequest { @@ -368,115 +360,25 @@ impl_try_message_send!(local, UnboundedMailbox, MaybeRequestTimeout, |req| { //////////////////////////////// // === TryMessageSendSync === // //////////////////////////////// -macro_rules! impl_try_message_send_sync { - (local, $mailbox:ident, $timeout:ident, |$req:ident| $($body:tt)*) => { - impl<'a, A, M> TryMessageSendSync - for TellRequest< - LocalTellRequest<'a, A, $mailbox>, - $mailbox, - M, - $timeout, - > - where - A: Actor> + Message, - M: 'static, - { - type Ok = (); - type Error = error::SendError::Error>; - - #[inline] - fn try_send_sync(self) -> Result { - let $req = self; - $($body)* - } - } - }; -} - -impl_try_message_send_sync!(local, BoundedMailbox, WithoutRequestTimeout, |req| { - req.location.mailbox.0.try_send(req.location.signal)?; - Ok(()) -}); - -impl_try_message_send_sync!(local, UnboundedMailbox, WithoutRequestTimeout, |req| { - req.location.mailbox.0.send(req.location.signal)?; - Ok(()) +impl_message_trait!(local, => TryMessageSendSync::try_send_sync, WithoutRequestTimeout, |req| { + req.location.mailbox.try_send(req.location.signal) + .map_err(|err| err.map_msg(|signal| signal.downcast_message().unwrap())) }); //////////////////////////////// // === BlockingMessageSend === // //////////////////////////////// -macro_rules! impl_blocking_message_send { - (local, $mailbox:ident, $timeout:ident, |$req:ident| $($body:tt)*) => { - impl<'a, A, M> BlockingMessageSend - for TellRequest< - LocalTellRequest<'a, A, $mailbox>, - $mailbox, - M, - $timeout, - > - where - A: Actor> + Message, - M: 'static, - { - type Ok = (); - type Error = error::SendError::Error>; - - #[inline] - fn blocking_send(self) -> Result { - let $req = self; - $($body)* - } - } - }; -} - -impl_blocking_message_send!(local, BoundedMailbox, WithoutRequestTimeout, |req| { - req.location.mailbox.0.blocking_send(req.location.signal)?; - Ok(()) -}); - -impl_blocking_message_send!(local, UnboundedMailbox, WithoutRequestTimeout, |req| { - req.location.mailbox.0.send(req.location.signal)?; - Ok(()) +impl_message_trait!(local, => BlockingMessageSend::blocking_send, WithoutRequestTimeout, |req| { + req.location.mailbox.blocking_send(req.location.signal) + .map_err(|err| err.map_msg(|signal| signal.downcast_message().unwrap())) }); //////////////////////////////////// // === TryBlockingMessageSend === // //////////////////////////////////// -macro_rules! impl_try_blocking_message_send { - (local, $mailbox:ident, $timeout:ident, |$req:ident| $($body:tt)*) => { - impl<'a, A, M> TryBlockingMessageSend - for TellRequest< - LocalTellRequest<'a, A, $mailbox>, - $mailbox, - M, - $timeout, - > - where - A: Actor> + Message, - M: 'static, - { - type Ok = (); - type Error = error::SendError::Error>; - - #[inline] - fn try_blocking_send(self) -> Result { - let $req = self; - $($body)* - } - } - }; -} - -impl_try_blocking_message_send!(local, BoundedMailbox, WithoutRequestTimeout, |req| { - req.location.mailbox.0.try_send(req.location.signal)?; - Ok(()) -}); - -impl_try_blocking_message_send!(local, UnboundedMailbox, WithoutRequestTimeout, |req| { - req.location.mailbox.0.send(req.location.signal)?; - Ok(()) +impl_message_trait!(local, => TryBlockingMessageSend::try_blocking_send, WithoutRequestTimeout, |req| { + req.location.mailbox.try_send(req.location.signal) + .map_err(|err| err.map_msg(|signal| signal.downcast_message().unwrap())) }); #[cfg(feature = "remote")] @@ -488,7 +390,7 @@ async fn remote_tell( ) -> Result<(), error::RemoteSendError<::Error>> where A: Actor + Message + remote::RemoteActor + remote::RemoteMessage, - M: serde::Serialize, + M: serde::Serialize + Send + 'static, ::Error: for<'de> serde::Deserialize<'de>, { use remote::*;