Skip to content

Commit

Permalink
feat: add public BoxReplySender type alias (#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
tqwewe authored Oct 21, 2024
1 parent b577da9 commit 810409a
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 22 deletions.
10 changes: 5 additions & 5 deletions src/actor/kind.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::{collections::VecDeque, mem, panic::AssertUnwindSafe};

use futures::{Future, FutureExt};
use tokio::sync::oneshot;

use crate::{
actor::{Actor, ActorRef, WeakActorRef},
error::{ActorStopReason, BoxSendError, PanicError},
error::{ActorStopReason, PanicError},
mailbox::Signal,
message::{BoxReply, DynMessage},
message::DynMessage,
reply::BoxReplySender,
};

use super::ActorID;
Expand All @@ -21,7 +21,7 @@ pub(crate) trait ActorState<A: Actor>: Sized {
&mut self,
message: Box<dyn DynMessage<A>>,
actor_ref: ActorRef<A>,
reply: Option<oneshot::Sender<Result<BoxReply, BoxSendError>>>,
reply: Option<BoxReplySender>,
sent_within_actor: bool,
) -> impl Future<Output = Option<ActorStopReason>> + Send;

Expand Down Expand Up @@ -91,7 +91,7 @@ where
&mut self,
message: Box<dyn DynMessage<A>>,
actor_ref: ActorRef<A>,
reply: Option<oneshot::Sender<Result<BoxReply, BoxSendError>>>,
reply: Option<BoxReplySender>,
sent_within_actor: bool,
) -> Option<ActorStopReason> {
if !sent_within_actor && !self.finished_startup {
Expand Down
8 changes: 4 additions & 4 deletions src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ pub mod unbounded;

use dyn_clone::DynClone;
use futures::{future::BoxFuture, Future};
use tokio::sync::oneshot;

use crate::{
actor::{ActorID, ActorRef},
error::{ActorStopReason, BoxSendError, SendError},
message::{BoxReply, DynMessage},
error::{ActorStopReason, SendError},
message::DynMessage,
reply::BoxReplySender,
Actor,
};

Expand Down Expand Up @@ -72,7 +72,7 @@ pub enum Signal<A: Actor> {
Message {
message: Box<dyn DynMessage<A>>,
actor_ref: ActorRef<A>,
reply: Option<oneshot::Sender<Result<BoxReply, BoxSendError>>>,
reply: Option<BoxReplySender>,
sent_within_actor: bool,
},
LinkDied {
Expand Down
18 changes: 12 additions & 6 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
use std::{any, fmt};

use futures::{future::BoxFuture, Future, FutureExt};
use tokio::sync::oneshot;

use crate::{
actor::ActorRef,
error::{BoxSendError, SendError},
reply::{DelegatedReply, ForwardedReply, Reply, ReplySender},
error::SendError,
reply::{BoxReplySender, DelegatedReply, ForwardedReply, Reply, ReplySender},
request::{AskRequest, LocalAskRequest, MessageSend, WithoutRequestTimeout},
Actor,
};
Expand Down Expand Up @@ -139,6 +138,7 @@ where
///
/// It is important to ensure that [ReplySender::send] is called to complete the transaction and send the response
/// back to the requester. Failure to do so could result in the requester waiting indefinitely for a response.
#[must_use]
pub fn reply_sender(&mut self) -> (DelegatedReply<R::Value>, Option<ReplySender<R::Value>>) {
(DelegatedReply::new(), self.reply.take())
}
Expand Down Expand Up @@ -178,18 +178,24 @@ where
}
}

#[doc(hidden)]
/// An object safe message which can be handled by an actor `A`.
///
/// This trait is implemented for all types which implement [`Message`], and is typically used for advanced cases such
/// as buffering actor messages.
pub trait DynMessage<A>
where
Self: Send,
A: Actor,
{
/// Handles the dyn message with the provided actor state, ref, and reply sender.
fn handle_dyn(
self: Box<Self>,
state: &mut A,
actor_ref: ActorRef<A>,
tx: Option<oneshot::Sender<Result<BoxReply, BoxSendError>>>,
tx: Option<BoxReplySender>,
) -> BoxFuture<'_, Option<BoxDebug>>;

/// Casts the type to a `Box<dyn Any>`.
fn as_any(self: Box<Self>) -> Box<dyn any::Any>;
}

Expand All @@ -202,7 +208,7 @@ where
self: Box<Self>,
state: &mut A,
actor_ref: ActorRef<A>,
tx: Option<oneshot::Sender<Result<BoxReply, BoxSendError>>>,
tx: Option<BoxReplySender>,
) -> BoxFuture<'_, Option<BoxDebug>> {
async move {
let mut reply_sender = tx.map(ReplySender::new);
Expand Down
12 changes: 9 additions & 3 deletions src/reply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ use crate::{
message::{BoxDebug, BoxReply},
};

/// A boxed reply sender which will be downcasted to the correct type when receiving a reply.
///
/// This is reserved for advanced use cases, and misuse of this can result in panics.
pub type BoxReplySender = oneshot::Sender<Result<BoxReply, BoxSendError>>;

/// A deligated reply that has been forwarded to another actor.
pub type ForwardedReply<T, M, E = ()> = DelegatedReply<Result<T, SendError<M, E>>>;

Expand Down Expand Up @@ -152,19 +157,20 @@ where
#[must_use = "the receiver expects a reply to be sent"]
pub struct ReplySender<R: ?Sized> {
tx: oneshot::Sender<Result<BoxReply, BoxSendError>>,
tx: BoxReplySender,
phantom: PhantomData<R>,
}

impl<R> ReplySender<R> {
pub(crate) fn new(tx: oneshot::Sender<Result<BoxReply, BoxSendError>>) -> Self {
pub(crate) fn new(tx: BoxReplySender) -> Self {
ReplySender {
tx,
phantom: PhantomData,
}
}

pub(crate) fn box_sender(self) -> oneshot::Sender<Result<BoxReply, BoxSendError>> {
/// Converts the reply sender to a generic `BoxReplySender`.
pub fn boxed(self) -> BoxReplySender {
self.tx
}

Expand Down
8 changes: 4 additions & 4 deletions src/request/ask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ impl_forward_message!(
WithoutRequestTimeout,
|req, tx| {
match &mut req.location.signal {
Signal::Message { reply, .. } => *reply = Some(tx.box_sender()),
Signal::Message { reply, .. } => *reply = Some(tx.boxed()),
_ => unreachable!("ask requests only support messages"),
}

Expand Down Expand Up @@ -784,7 +784,7 @@ impl_forward_message!(
WithoutRequestTimeout,
|req, tx| {
match &mut req.location.signal {
Signal::Message { reply, .. } => *reply = Some(tx.box_sender()),
Signal::Message { reply, .. } => *reply = Some(tx.boxed()),
_ => unreachable!("ask requests only support messages"),
}

Expand All @@ -805,7 +805,7 @@ impl_forward_message!(
WithoutRequestTimeout,
|req, tx| {
match &mut req.location.signal {
Signal::Message { reply, .. } => *reply = Some(tx.box_sender()),
Signal::Message { reply, .. } => *reply = Some(tx.boxed()),
_ => unreachable!("ask requests only support messages"),
}

Expand Down Expand Up @@ -860,7 +860,7 @@ impl_forward_message_sync!(
WithoutRequestTimeout,
|req, tx| {
match &mut req.location.signal {
Signal::Message { reply, .. } => *reply = Some(tx.box_sender()),
Signal::Message { reply, .. } => *reply = Some(tx.boxed()),
_ => unreachable!("ask requests only support messages"),
}

Expand Down

0 comments on commit 810409a

Please sign in to comment.