Skip to content

Commit

Permalink
Add RpcMessage::try_handle
Browse files Browse the repository at this point in the history
Failable version of RpcMessage::handle.
  • Loading branch information
Thomasdezeeuw committed Dec 31, 2023
1 parent 6f8662a commit 964df81
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 0 deletions.
50 changes: 50 additions & 0 deletions rt/tests/functional/actor_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -951,10 +951,12 @@ fn join_before_actor_finished() {
enum CalcMessage {
Get(RpcMessage<(), usize>),
Add(RpcMessage<usize, ()>),
Add2(RpcMessage<(usize, usize), ()>),
}

from_message!(CalcMessage::Get(()) -> usize);
from_message!(CalcMessage::Add(usize) -> ());
from_message!(CalcMessage::Add2((usize, usize)) -> ());

#[derive(Debug, Eq, PartialEq)]
struct Overflow;
Expand All @@ -970,6 +972,16 @@ async fn calc_actor(mut ctx: actor::Context<CalcMessage, ThreadLocal>) -> Result
.await
.unwrap()
}
CalcMessage::Add2(msg) => {
let c = &mut count;
msg.try_handle(|(a, b)| async move {
*c = c.checked_add(a).ok_or(Overflow)?;
*c = c.checked_add(b).ok_or(Overflow)?;
Ok(())
})
.await?
.unwrap()
}
}
}
Ok(())
Expand Down Expand Up @@ -1019,3 +1031,41 @@ fn rpc_message_handle_skip_if_no_receiver() {
drop(actor_ref);
assert_eq!(poll_future(Pin::new(&mut actor)), Poll::Ready(Ok(())));
}

#[test]
fn rpc_message_try_handle() {
let calc_actor = actor_fn(calc_actor);
let (actor, actor_ref) = init_local_actor(calc_actor, ()).unwrap();
let mut actor = pin!(actor);

let mut add_rpc = actor_ref.rpc((usize::MAX, usize::MAX));
assert_eq!(poll_future(Pin::new(&mut add_rpc)), Poll::Pending);

assert_eq!(
poll_future(Pin::new(&mut actor)),
Poll::Ready(Err(Overflow))
);
}

#[test]
fn rpc_message_try_handle_skip_if_no_receiver() {
let calc_actor = actor_fn(calc_actor);
let (actor, actor_ref) = init_local_actor(calc_actor, ()).unwrap();
let mut actor = pin!(actor);

let mut add_rpc = actor_ref.rpc((usize::MAX, usize::MAX));
// Make sure the value is send.
assert_eq!(poll_future(Pin::new(&mut add_rpc)), Poll::Pending);
drop(add_rpc);

let mut get_rpc = actor_ref.rpc(());
assert_eq!(poll_future(Pin::new(&mut get_rpc)), Poll::Pending);

assert_eq!(poll_future(Pin::new(&mut actor)), Poll::Pending);

assert_eq!(poll_future(Pin::new(&mut get_rpc)), Poll::Ready(Ok(10)));
drop(get_rpc);

drop(actor_ref);
assert_eq!(poll_future(Pin::new(&mut actor)), Poll::Ready(Ok(())));
}
20 changes: 20 additions & 0 deletions src/actor_ref/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,26 @@ impl<Req, Res> RpcMessage<Req, Res> {
Ok(())
}
}

/// Convenience method to handle a `Req`uest and return a `Res`ponse.
///
/// This is similar to [`handle`], but allows `f` to be failable.
///
/// [`handle`]: RpcMessage::handle
pub async fn try_handle<F, Fut, E>(self, f: F) -> Result<Result<(), SendError>, E>
where
F: FnOnce(Req) -> Fut,
Fut: Future<Output = Result<Res, E>>,
{
if self.response.is_connected() {
let response = f(self.request).await?;
Ok(self.response.respond(response))
} else {
// If the receiving actor is no longer waiting we can skip the
// request.
Ok(Ok(()))
}
}
}

/// Structure to respond to an [`Rpc`] request.
Expand Down

0 comments on commit 964df81

Please sign in to comment.