Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: impl IntoFuture for requests #72

Merged
merged 6 commits into from
Oct 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ cargo add kameo
```rust,ignore
use kameo::Actor;
use kameo::message::{Context, Message};
use kameo::request::MessageSend;

// Implement the actor
#[derive(Actor)]
Expand Down Expand Up @@ -100,7 +99,7 @@ impl Message<Inc> for Counter {
let actor_ref = kameo::spawn(Counter { count: 0 });

// Send messages to the actor
let count = actor_ref.ask(Inc { amount: 42 }).send().await?;
let count = actor_ref.ask(Inc { amount: 42 }).await?;
assert_eq!(count, 42);
```

Expand All @@ -121,7 +120,7 @@ actor_ref.register("my_actor").await?;
```rust,ignore
// Lookup the remote actor
if let Some(remote_actor_ref) = RemoteActorRef::<MyActor>::lookup("my_actor").await? {
let count = remote_actor_ref.ask(&Inc { amount: 10 }).send().await?;
let count = remote_actor_ref.ask(&Inc { amount: 10 }).await?;
println!("Incremented! Count is {count}");
}
```
Expand Down
8 changes: 4 additions & 4 deletions docs/distributed-actors/messaging-remote-actors.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ let remote_actor_ref = RemoteActorRef::<MyActor>::lookup("my_actor").await?;

// Send a message and await the reply
if let Some(actor) = remote_actor_ref {
let result = actor.ask(&Inc { amount: 10 }).send().await?;
let result = actor.ask(&Inc { amount: 10 }).await?;
println!("Incremented count: {result}");
}
```
Expand All @@ -30,7 +30,7 @@ If you don’t need a response from the actor, you can use the `tell` method to

```rust
// Send a fire-and-forget message
actor.tell(&LogMessage { text: String::from("Logging event") }).send().await?;
actor.tell(&LogMessage { text: String::from("Logging event") }).await?;
```

The `tell` method is useful for one-way communication where no acknowledgment is required, such as logging or notification systems.
Expand Down Expand Up @@ -84,7 +84,7 @@ By using the `#[remote_message]` macro, Kameo registers each message type during
When sending a message using the `ask` pattern, you’ll typically want to handle a response from the remote actor. The reply type is specified in the actor’s message handler and can be awaited asynchronously.

```rust
let result = actor.ask(&Inc { amount: 10 }).send().await?;
let result = actor.ask(&Inc { amount: 10 }).await?;
println!("Received reply: {}", result);
```

Expand All @@ -106,7 +106,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

if let Some(actor) = remote_actor_ref {
// Send a message and await the reply
let result = actor.ask(&Inc { amount: 10 }).send().await?;
let result = actor.ask(&Inc { amount: 10 }).await?;
println!("Incremented count: {result}");
} else {
println!("Actor not found");
Expand Down
4 changes: 2 additions & 2 deletions docs/distributed-actors/registering-looking-up-actors.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ let remote_actor_ref = RemoteActorRef::<MyActor>::lookup("my_actor").await?;

if let Some(actor) = remote_actor_ref {
// Use the actor reference to send a message
let result = actor.ask(&Inc { amount: 10 }).send().await?;
let result = actor.ask(&Inc { amount: 10 }).await?;
println!("Incremented count: {result}");
} else {
println!("Actor not found");
Expand Down Expand Up @@ -62,7 +62,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Node 2: Lookup the actor and send a message
let remote_actor_ref = RemoteActorRef::<MyActor>::lookup("my_actor").await?;
if let Some(actor) = remote_actor_ref {
let result = actor.ask(&Inc { amount: 10 }).send().await?;
let result = actor.ask(&Inc { amount: 10 }).await?;
println!("Incremented count: {result}");
} else {
println!("Actor not found");
Expand Down
2 changes: 1 addition & 1 deletion docs/faq.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Kameo uses [libp2p](https://libp2p.io) for networking, with Kademlia Distributed
You can query an actor’s state by sending a message using the `ask` pattern, which allows you to request a response without modifying the actor's state.

```rust
let result = actor_ref.ask(QueryState).send().await?;
let result = actor_ref.ask(QueryState).await?;
println!("Actor state: {:?}", result);
```

Expand Down
2 changes: 0 additions & 2 deletions docs/getting-started.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ To interact with the `HelloWorldActor`, we spawn it and send a `Greet` message.

```rust
use kameo::spawn;
use kameo::request::MessageSend;

#[tokio::main] // Mark the entry point as an asynchronous main function
async fn main() -> Result<(), Box<dyn std::error::Error>> { // Use a Result return type for error handling
Expand All @@ -66,7 +65,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { // Use a Result retu
// Send a Greet message to the actor
actor_ref
.tell(Greet("Hello, world!".to_string()))
.send()
.await?;

Ok(())
Expand Down
3 changes: 1 addition & 2 deletions examples/ask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Duration;
use kameo::{
mailbox::unbounded::UnboundedMailbox,
message::{Context, Message},
request::{MessageSend, MessageSendSync},
request::MessageSendSync,
Actor,
};
use tracing::info;
Expand Down Expand Up @@ -53,7 +53,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let count = my_actor_ref
.ask(Inc { amount: 3 })
.reply_timeout(Duration::from_millis(10))
.send()
.await?;
info!("Count is {count}");

Expand Down
11 changes: 6 additions & 5 deletions examples/basic.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use kameo::{
mailbox::unbounded::UnboundedMailbox,
message::{Context, Message},
request::{MessageSend, MessageSendSync},
request::MessageSendSync,
Actor,
};
use tracing::info;
Expand Down Expand Up @@ -60,21 +60,22 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let my_actor_ref = kameo::spawn(MyActor::default());

// Increment the count by 3
let count = my_actor_ref.ask(Inc { amount: 3 }).send().await?;
let count = my_actor_ref.ask(Inc { amount: 3 }).await?;
info!("Count is {count}");

// Increment the count by 50 in the background
my_actor_ref.tell(Inc { amount: 50 }).send_sync()?;
my_actor_ref.tell(Inc { amount: 50 }).await?;

// Increment the count by 2
let count = my_actor_ref.ask(Inc { amount: 2 }).send().await?;
let count = my_actor_ref.ask(Inc { amount: 2 }).await?;
info!("Count is {count}");

// Async messages that return an Err will cause the actor to panic
// send_sync is possible since the mailbox is unbounded, so we don't need to wait for any capacity
my_actor_ref.tell(ForceErr).send_sync()?;

// Actor should be stopped, so we cannot send more messages to it
assert!(my_actor_ref.ask(Inc { amount: 2 }).send().await.is_err());
assert!(my_actor_ref.ask(Inc { amount: 2 }).await.is_err());

Ok(())
}
11 changes: 3 additions & 8 deletions examples/macro.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
use std::fmt;

use kameo::{
messages,
request::{MessageSend, MessageSendSync},
Actor,
};
use kameo::{messages, request::MessageSendSync, Actor};
use tracing::info;
use tracing_subscriber::EnvFilter;

Expand Down Expand Up @@ -54,7 +50,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let my_actor_ref = kameo::spawn(MyActor::new());

// Increment the count by 3
let count = my_actor_ref.ask(Inc { amount: 3 }).send().await?;
let count = my_actor_ref.ask(Inc { amount: 3 }).await?;
info!("Count is {count}");

// Increment the count by 50 in the background
Expand All @@ -65,14 +61,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.ask(Print {
msg: "Generics work!",
})
.send()
.await?;

// Async messages that return an Err will cause the actor to panic
my_actor_ref.tell(ForceErr).send_sync()?;

// Actor should be stopped, so we cannot send more messages to it
assert!(my_actor_ref.ask(Inc { amount: 2 }).send().await.is_err());
assert!(my_actor_ref.ask(Inc { amount: 2 }).await.is_err());

Ok(())
}
7 changes: 3 additions & 4 deletions examples/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::time::Duration;
use kameo::{
actor::pool::{ActorPool, BroadcastMsg, WorkerMsg},
message::{Context, Message},
request::MessageSend,
Actor,
};
use tracing_subscriber::EnvFilter;
Expand Down Expand Up @@ -49,18 +48,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

// Print IDs from 0..=4
for _ in 0..5 {
pool.ask(WorkerMsg(PrintActorID)).send().await?;
pool.ask(WorkerMsg(PrintActorID)).await?;
}

// Force all workers to stop, causing them to be restarted
pool.ask(BroadcastMsg(ForceStop)).send().await?;
pool.ask(BroadcastMsg(ForceStop)).await?;
tokio::time::sleep(Duration::from_millis(200)).await;

println!("Restarted all workers");

// New IDs from 6..=10 will be printed
for _ in 0..5 {
pool.ask(WorkerMsg(PrintActorID)).send().await?;
pool.ask(WorkerMsg(PrintActorID)).await?;
}

Ok(())
Expand Down
7 changes: 3 additions & 4 deletions examples/pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use kameo::{
actor::pubsub::{PubSub, Publish, Subscribe},
message::{Context, Message},
request::MessageSend,
Actor,
};
use tracing_subscriber::EnvFilter;
Expand Down Expand Up @@ -50,9 +49,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pubsub = kameo::spawn(PubSub::<PrintActorID>::new());
let actor_a = kameo::spawn(ActorA);
let actor_b = kameo::spawn(ActorB);
pubsub.ask(Subscribe(actor_a)).send().await?;
pubsub.ask(Subscribe(actor_b)).send().await?;
pubsub.ask(Publish(PrintActorID)).send().await?;
pubsub.ask(Subscribe(actor_a)).await?;
pubsub.ask(Subscribe(actor_b)).await?;
pubsub.ask(Publish(PrintActorID)).await?;

Ok(())
}
4 changes: 2 additions & 2 deletions examples/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use kameo::{
actor::RemoteActorRef,
message::{Context, Message},
remote::{dial_opts::DialOpts, ActorSwarm},
request::MessageSend,
Actor,
};
use kameo_macros::{remote_message, RemoteActor};
Expand Down Expand Up @@ -93,7 +92,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let remote_actor_ref = RemoteActorRef::<MyActor>::lookup("my_actor").await?;
match remote_actor_ref {
Some(remote_actor_ref) => {
let count = remote_actor_ref.ask(&Inc { amount: 10 }).send().await?;
let count = remote_actor_ref.ask(&Inc { amount: 10 }).await?;
remote_actor_ref.tell(&Inc { amount: 10 }).await?;
println!("Incremented! Count is {count}");
}
None => {
Expand Down
4 changes: 2 additions & 2 deletions macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ use syn::parse_macro_input;
/// }
/// }
///
/// counter_ref.ask(Inc { amount: 5 }).send().await?;
/// counter_ref.ask(Dec { amount: 2 }.clone()).send().await?;
/// counter_ref.ask(Inc { amount: 5 }).await?;
/// counter_ref.ask(Dec { amount: 2 }.clone()).await?;
/// ```
///
/// <details>
Expand Down
13 changes: 4 additions & 9 deletions src/actor/actor_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ where
/// use kameo::actor::{Actor, ActorRef};
/// use kameo::error::BoxError;
/// use kameo::mailbox::unbounded::UnboundedMailbox;
/// use kameo::request::MessageSend;
/// use tokio::time::sleep;
///
/// struct MyActor;
Expand Down Expand Up @@ -229,7 +228,6 @@ where
///
/// ```
/// use kameo::actor::ActorRef;
/// use kameo::request::MessageSend;
///
/// # #[derive(kameo::Actor)]
/// # struct MyActor;
Expand All @@ -244,7 +242,7 @@ where
/// # tokio_test::block_on(async {
/// let actor_ref = kameo::spawn(MyActor);
/// # let msg = Msg;
/// let reply = actor_ref.ask(msg).send().await?;
/// let reply = actor_ref.ask(msg).await?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// # });
/// ```
Expand Down Expand Up @@ -275,7 +273,6 @@ where
///
/// ```
/// use kameo::actor::ActorRef;
/// use kameo::request::MessageSend;
///
/// # #[derive(kameo::Actor)]
/// # struct MyActor;
Expand All @@ -290,7 +287,7 @@ where
/// # tokio_test::block_on(async {
/// let actor_ref = kameo::spawn(MyActor);
/// # let msg = Msg;
/// actor_ref.tell(msg).send().await?;
/// actor_ref.tell(msg).await?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// # });
/// ```
Expand Down Expand Up @@ -735,7 +732,6 @@ impl<A: Actor> RemoteActorRef<A> {
///
/// ```no_run
/// use kameo::actor::RemoteActorRef;
/// use kameo::request::MessageSend;
///
/// # #[derive(kameo::Actor, kameo::RemoteActor)]
/// # #[actor(mailbox = bounded)]
Expand All @@ -753,7 +749,7 @@ impl<A: Actor> RemoteActorRef<A> {
/// # tokio_test::block_on(async {
/// let remote_actor_ref = RemoteActorRef::<MyActor>::lookup("my_actor").await?.unwrap();
/// # let msg = Msg;
/// let reply = remote_actor_ref.ask(&msg).send().await?;
/// let reply = remote_actor_ref.ask(&msg).await?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// # });
/// ```
Expand Down Expand Up @@ -785,7 +781,6 @@ impl<A: Actor> RemoteActorRef<A> {
///
/// ```no_run
/// use kameo::actor::RemoteActorRef;
/// use kameo::request::MessageSend;
///
/// # #[derive(kameo::Actor, kameo::RemoteActor)]
/// # #[actor(mailbox = bounded)]
Expand All @@ -803,7 +798,7 @@ impl<A: Actor> RemoteActorRef<A> {
/// # tokio_test::block_on(async {
/// let remote_actor_ref = RemoteActorRef::<MyActor>::lookup("my_actor").await?.unwrap();
/// # let msg = Msg;
/// remote_actor_ref.tell(&msg).send().await?;
/// remote_actor_ref.tell(&msg).await?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// # });
/// ```
Expand Down
5 changes: 2 additions & 3 deletions src/actor/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
//! use kameo::Actor;
//! use kameo::actor::pool::{ActorPool, WorkerMsg, BroadcastMsg};
//! # use kameo::message::{Context, Message};
//! use kameo::request::MessageSend;
//!
//! #[derive(Actor)]
//! struct MyWorker;
Expand All @@ -35,8 +34,8 @@
//! let pool_actor = kameo::spawn(ActorPool::new(4, || kameo::spawn(MyWorker)));
//!
//! // Send tasks to the pool
//! pool_actor.tell(WorkerMsg("Hello worker!")).send().await?;
//! pool_actor.tell(BroadcastMsg("Hello all workers!")).send().await?;
//! pool_actor.tell(WorkerMsg("Hello worker!")).await?;
//! pool_actor.tell(BroadcastMsg("Hello all workers!")).await?;
//! # Ok::<(), Box<dyn std::error::Error>>(())
//! # });
//! ```
Expand Down
5 changes: 2 additions & 3 deletions src/actor/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
//! use kameo::Actor;
//! use kameo::actor::pubsub::{PubSub, Publish, Subscribe};
//! # use kameo::message::{Context, Message};
//! use kameo::request::MessageSend;
//!
//! #[derive(Actor)]
//! struct MyActor;
Expand All @@ -39,8 +38,8 @@
//!
//! // Or spawn PubSub as an actor and use messages
//! let pubsub_actor_ref = kameo::spawn(PubSub::new());
//! pubsub_actor_ref.tell(Subscribe(actor_ref)).send().await?;
//! pubsub_actor_ref.tell(Publish("Hello, spawned world!")).send().await?;
//! pubsub_actor_ref.tell(Subscribe(actor_ref)).await?;
//! pubsub_actor_ref.tell(Publish("Hello, spawned world!")).await?;
//! # Ok::<(), Box<dyn std::error::Error>>(())
//! # });
//! ```
Expand Down
Loading