Skip to content

Commit

Permalink
Add ActorGroup::send_to_one
Browse files Browse the repository at this point in the history
Future version of try_send_to_one.

Closes #565
  • Loading branch information
Thomasdezeeuw committed Mar 31, 2024
1 parent 8bdcd25 commit a699eb7
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 2 deletions.
17 changes: 17 additions & 0 deletions src/actor_ref/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,23 @@ impl<M> ActorGroup<M> {
self.actor_refs[idx].try_send(msg)
}

/// Send a message to one of the actors in the group.
pub fn send_to_one<'r, Msg>(&'r self, msg: Msg) -> SendValue<'r, M>
where
Msg: Into<M>,
{
if self.actor_refs.is_empty() {
return SendValue {
kind: SendValueKind::Mapped(MappedSendValue::SendErr),
};
}

// SAFETY: this needs to sync with all other accesses to `send_next`.
// NOTE: this wraps around on overflow.
let idx = self.send_next.fetch_add(1, Ordering::AcqRel) % self.actor_refs.len();
self.actor_refs[idx].send(msg)
}

/// Attempts to send a message to all of the actors in the group.
///
/// This will first `clone` the message and then attempts to send it to each
Expand Down
10 changes: 9 additions & 1 deletion tests/functional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
mod util {
use std::future::Future;
use std::mem::size_of;
use std::pin::pin;
use std::pin::{pin, Pin};
use std::task::{self, Poll};

pub fn assert_send<T: Send>() {}
Expand All @@ -27,6 +27,14 @@ mod util {
}
}
}

pub fn poll_once<Fut: Future>(fut: Pin<&mut Fut>) {
let mut ctx = task::Context::from_waker(task::Waker::noop());
match fut.poll(&mut ctx) {
Poll::Ready(_) => panic!("unexpected output"),
Poll::Pending => {}
}
}
}

#[path = "functional"] // rustfmt can't find the files.
Expand Down
45 changes: 44 additions & 1 deletion tests/functional/actor_group.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
//! Tests related to `ActorGroup`.
use std::pin::pin;

use heph::actor_ref::{ActorGroup, SendError};
use heph::future::{ActorFuture, ActorFutureBuilder, InboxSize};
use heph::supervisor::NoSupervisor;
use heph::{actor, actor_fn};

use crate::util::{assert_send, assert_size, assert_sync, block_on};
use crate::util::{assert_send, assert_size, assert_sync, block_on, poll_once};

#[test]
fn size() {
Expand Down Expand Up @@ -70,6 +72,47 @@ fn try_send_to_one_empty() {
assert_eq!(group.try_send_to_one(()), Err(SendError));
}

#[test]
fn send_to_one() {
let (future, actor_ref) = ActorFuture::new(NoSupervisor, actor_fn(count_actor), 1).unwrap();

let group = ActorGroup::from(actor_ref);
assert_eq!(block_on(group.send_to_one(())), Ok(()));
drop(group);

block_on(future);
}

#[test]
fn send_to_one_full_inbox() {
let (future, actor_ref) = ActorFutureBuilder::new()
.with_inbox_size(InboxSize::ONE)
.build(NoSupervisor, actor_fn(count_actor), 1)
.unwrap();
let mut future = pin!(future);

let group = ActorGroup::from(actor_ref);
assert_eq!(block_on(group.send_to_one(())), Ok(()));

{
let mut send_future = pin!(group.send_to_one(()));
poll_once(send_future.as_mut()); // Should return Poll::Pending.

// Emptying the actor's inbox should allow us to send the message.
poll_once(future.as_mut());
assert_eq!(block_on(send_future), Ok(()));
} // Drops `send_future`, to allow us to drop `group`.
drop(group);

block_on(future);
}

#[test]
fn send_to_one_empty() {
let group = ActorGroup::<()>::empty();
assert_eq!(block_on(group.send_to_one(())), Err(SendError));
}

#[test]
fn try_send_to_all() {
let (future1, actor_ref1) = ActorFuture::new(NoSupervisor, actor_fn(count_actor), 1).unwrap();
Expand Down

0 comments on commit a699eb7

Please sign in to comment.