Skip to content

Commit

Permalink
chore: merge branch 'main' into feat/prepared-actors
Browse files Browse the repository at this point in the history
  • Loading branch information
tqwewe committed Oct 20, 2024
2 parents 6d28b3d + b577da9 commit fe8ff8d
Show file tree
Hide file tree
Showing 21 changed files with 414 additions and 544 deletions.
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ cargo add kameo
```rust,ignore
use kameo::Actor;
use kameo::message::{Context, Message};
use kameo::request::MessageSend;
// Implement the actor
#[derive(Actor)]
Expand Down Expand Up @@ -100,7 +99,7 @@ impl Message<Inc> for Counter {
let actor_ref = kameo::spawn(Counter { count: 0 });
// Send messages to the actor
let count = actor_ref.ask(Inc { amount: 42 }).send().await?;
let count = actor_ref.ask(Inc { amount: 42 }).await?;
assert_eq!(count, 42);
```

Expand All @@ -121,7 +120,7 @@ actor_ref.register("my_actor").await?;
```rust,ignore
// Lookup the remote actor
if let Some(remote_actor_ref) = RemoteActorRef::<MyActor>::lookup("my_actor").await? {
let count = remote_actor_ref.ask(&Inc { amount: 10 }).send().await?;
let count = remote_actor_ref.ask(&Inc { amount: 10 }).await?;
println!("Incremented! Count is {count}");
}
```
Expand Down
8 changes: 4 additions & 4 deletions docs/distributed-actors/messaging-remote-actors.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ let remote_actor_ref = RemoteActorRef::<MyActor>::lookup("my_actor").await?;

// Send a message and await the reply
if let Some(actor) = remote_actor_ref {
let result = actor.ask(&Inc { amount: 10 }).send().await?;
let result = actor.ask(&Inc { amount: 10 }).await?;
println!("Incremented count: {result}");
}
```
Expand All @@ -30,7 +30,7 @@ If you don’t need a response from the actor, you can use the `tell` method to

```rust
// Send a fire-and-forget message
actor.tell(&LogMessage { text: String::from("Logging event") }).send().await?;
actor.tell(&LogMessage { text: String::from("Logging event") }).await?;
```

The `tell` method is useful for one-way communication where no acknowledgment is required, such as logging or notification systems.
Expand Down Expand Up @@ -84,7 +84,7 @@ By using the `#[remote_message]` macro, Kameo registers each message type during
When sending a message using the `ask` pattern, you’ll typically want to handle a response from the remote actor. The reply type is specified in the actor’s message handler and can be awaited asynchronously.

```rust
let result = actor.ask(&Inc { amount: 10 }).send().await?;
let result = actor.ask(&Inc { amount: 10 }).await?;
println!("Received reply: {}", result);
```

Expand All @@ -106,7 +106,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

if let Some(actor) = remote_actor_ref {
// Send a message and await the reply
let result = actor.ask(&Inc { amount: 10 }).send().await?;
let result = actor.ask(&Inc { amount: 10 }).await?;
println!("Incremented count: {result}");
} else {
println!("Actor not found");
Expand Down
4 changes: 2 additions & 2 deletions docs/distributed-actors/registering-looking-up-actors.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ let remote_actor_ref = RemoteActorRef::<MyActor>::lookup("my_actor").await?;

if let Some(actor) = remote_actor_ref {
// Use the actor reference to send a message
let result = actor.ask(&Inc { amount: 10 }).send().await?;
let result = actor.ask(&Inc { amount: 10 }).await?;
println!("Incremented count: {result}");
} else {
println!("Actor not found");
Expand Down Expand Up @@ -62,7 +62,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Node 2: Lookup the actor and send a message
let remote_actor_ref = RemoteActorRef::<MyActor>::lookup("my_actor").await?;
if let Some(actor) = remote_actor_ref {
let result = actor.ask(&Inc { amount: 10 }).send().await?;
let result = actor.ask(&Inc { amount: 10 }).await?;
println!("Incremented count: {result}");
} else {
println!("Actor not found");
Expand Down
2 changes: 1 addition & 1 deletion docs/faq.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Kameo uses [libp2p](https://libp2p.io) for networking, with Kademlia Distributed
You can query an actor’s state by sending a message using the `ask` pattern, which allows you to request a response without modifying the actor's state.

```rust
let result = actor_ref.ask(QueryState).send().await?;
let result = actor_ref.ask(QueryState).await?;
println!("Actor state: {:?}", result);
```

Expand Down
2 changes: 0 additions & 2 deletions docs/getting-started.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ To interact with the `HelloWorldActor`, we spawn it and send a `Greet` message.

```rust
use kameo::spawn;
use kameo::request::MessageSend;

#[tokio::main] // Mark the entry point as an asynchronous main function
async fn main() -> Result<(), Box<dyn std::error::Error>> { // Use a Result return type for error handling
Expand All @@ -66,7 +65,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { // Use a Result retu
// Send a Greet message to the actor
actor_ref
.tell(Greet("Hello, world!".to_string()))
.send()
.await?;

Ok(())
Expand Down
3 changes: 1 addition & 2 deletions examples/ask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Duration;
use kameo::{
mailbox::unbounded::UnboundedMailbox,
message::{Context, Message},
request::{MessageSend, MessageSendSync},
request::MessageSendSync,
Actor,
};
use tracing::info;
Expand Down Expand Up @@ -53,7 +53,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let count = my_actor_ref
.ask(Inc { amount: 3 })
.reply_timeout(Duration::from_millis(10))
.send()
.await?;
info!("Count is {count}");

Expand Down
11 changes: 6 additions & 5 deletions examples/basic.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use kameo::{
mailbox::unbounded::UnboundedMailbox,
message::{Context, Message},
request::{MessageSend, MessageSendSync},
request::MessageSendSync,
Actor,
};
use tracing::info;
Expand Down Expand Up @@ -60,21 +60,22 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let my_actor_ref = kameo::spawn(MyActor::default());

// Increment the count by 3
let count = my_actor_ref.ask(Inc { amount: 3 }).send().await?;
let count = my_actor_ref.ask(Inc { amount: 3 }).await?;
info!("Count is {count}");

// Increment the count by 50 in the background
my_actor_ref.tell(Inc { amount: 50 }).send_sync()?;
my_actor_ref.tell(Inc { amount: 50 }).await?;

// Increment the count by 2
let count = my_actor_ref.ask(Inc { amount: 2 }).send().await?;
let count = my_actor_ref.ask(Inc { amount: 2 }).await?;
info!("Count is {count}");

// Async messages that return an Err will cause the actor to panic
// send_sync is possible since the mailbox is unbounded, so we don't need to wait for any capacity
my_actor_ref.tell(ForceErr).send_sync()?;

// Actor should be stopped, so we cannot send more messages to it
assert!(my_actor_ref.ask(Inc { amount: 2 }).send().await.is_err());
assert!(my_actor_ref.ask(Inc { amount: 2 }).await.is_err());

Ok(())
}
11 changes: 3 additions & 8 deletions examples/macro.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
use std::fmt;

use kameo::{
messages,
request::{MessageSend, MessageSendSync},
Actor,
};
use kameo::{messages, request::MessageSendSync, Actor};
use tracing::info;
use tracing_subscriber::EnvFilter;

Expand Down Expand Up @@ -54,7 +50,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let my_actor_ref = kameo::spawn(MyActor::new());

// Increment the count by 3
let count = my_actor_ref.ask(Inc { amount: 3 }).send().await?;
let count = my_actor_ref.ask(Inc { amount: 3 }).await?;
info!("Count is {count}");

// Increment the count by 50 in the background
Expand All @@ -65,14 +61,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.ask(Print {
msg: "Generics work!",
})
.send()
.await?;

// Async messages that return an Err will cause the actor to panic
my_actor_ref.tell(ForceErr).send_sync()?;

// Actor should be stopped, so we cannot send more messages to it
assert!(my_actor_ref.ask(Inc { amount: 2 }).send().await.is_err());
assert!(my_actor_ref.ask(Inc { amount: 2 }).await.is_err());

Ok(())
}
7 changes: 3 additions & 4 deletions examples/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::time::Duration;
use kameo::{
actor::pool::{ActorPool, BroadcastMsg, WorkerMsg},
message::{Context, Message},
request::MessageSend,
Actor,
};
use tracing_subscriber::EnvFilter;
Expand Down Expand Up @@ -49,18 +48,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

// Print IDs from 0..=4
for _ in 0..5 {
pool.ask(WorkerMsg(PrintActorID)).send().await?;
pool.ask(WorkerMsg(PrintActorID)).await?;
}

// Force all workers to stop, causing them to be restarted
pool.ask(BroadcastMsg(ForceStop)).send().await?;
pool.ask(BroadcastMsg(ForceStop)).await?;
tokio::time::sleep(Duration::from_millis(200)).await;

println!("Restarted all workers");

// New IDs from 6..=10 will be printed
for _ in 0..5 {
pool.ask(WorkerMsg(PrintActorID)).send().await?;
pool.ask(WorkerMsg(PrintActorID)).await?;
}

Ok(())
Expand Down
7 changes: 3 additions & 4 deletions examples/pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use kameo::{
actor::pubsub::{PubSub, Publish, Subscribe},
message::{Context, Message},
request::MessageSend,
Actor,
};
use tracing_subscriber::EnvFilter;
Expand Down Expand Up @@ -50,9 +49,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pubsub = kameo::spawn(PubSub::<PrintActorID>::new());
let actor_a = kameo::spawn(ActorA);
let actor_b = kameo::spawn(ActorB);
pubsub.ask(Subscribe(actor_a)).send().await?;
pubsub.ask(Subscribe(actor_b)).send().await?;
pubsub.ask(Publish(PrintActorID)).send().await?;
pubsub.ask(Subscribe(actor_a)).await?;
pubsub.ask(Subscribe(actor_b)).await?;
pubsub.ask(Publish(PrintActorID)).await?;

Ok(())
}
4 changes: 2 additions & 2 deletions examples/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use kameo::{
actor::RemoteActorRef,
message::{Context, Message},
remote::{dial_opts::DialOpts, ActorSwarm},
request::MessageSend,
Actor,
};
use kameo_macros::{remote_message, RemoteActor};
Expand Down Expand Up @@ -93,7 +92,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let remote_actor_ref = RemoteActorRef::<MyActor>::lookup("my_actor").await?;
match remote_actor_ref {
Some(remote_actor_ref) => {
let count = remote_actor_ref.ask(&Inc { amount: 10 }).send().await?;
let count = remote_actor_ref.ask(&Inc { amount: 10 }).await?;
remote_actor_ref.tell(&Inc { amount: 10 }).await?;
println!("Incremented! Count is {count}");
}
None => {
Expand Down
4 changes: 2 additions & 2 deletions macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ use syn::parse_macro_input;
/// }
/// }
///
/// counter_ref.ask(Inc { amount: 5 }).send().await?;
/// counter_ref.ask(Dec { amount: 2 }.clone()).send().await?;
/// counter_ref.ask(Inc { amount: 5 }).await?;
/// counter_ref.ask(Dec { amount: 2 }.clone()).await?;
/// ```
///
/// <details>
Expand Down
15 changes: 5 additions & 10 deletions src/actor/actor_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ where
/// use kameo::actor::{Actor, ActorRef};
/// use kameo::error::BoxError;
/// use kameo::mailbox::unbounded::UnboundedMailbox;
/// use kameo::request::MessageSend;
/// use tokio::time::sleep;
///
/// struct MyActor;
Expand Down Expand Up @@ -229,7 +228,6 @@ where
///
/// ```
/// use kameo::actor::ActorRef;
/// use kameo::request::MessageSend;
///
/// # #[derive(kameo::Actor)]
/// # struct MyActor;
Expand All @@ -244,7 +242,7 @@ where
/// # tokio_test::block_on(async {
/// let actor_ref = kameo::spawn(MyActor);
/// # let msg = Msg;
/// let reply = actor_ref.ask(msg).send().await?;
/// let reply = actor_ref.ask(msg).await?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// # });
/// ```
Expand Down Expand Up @@ -275,7 +273,6 @@ where
///
/// ```
/// use kameo::actor::ActorRef;
/// use kameo::request::MessageSend;
///
/// # #[derive(kameo::Actor)]
/// # struct MyActor;
Expand All @@ -290,7 +287,7 @@ where
/// # tokio_test::block_on(async {
/// let actor_ref = kameo::spawn(MyActor);
/// # let msg = Msg;
/// actor_ref.tell(msg).send().await?;
/// actor_ref.tell(msg).await?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// # });
/// ```
Expand Down Expand Up @@ -735,7 +732,6 @@ impl<A: Actor> RemoteActorRef<A> {
///
/// ```no_run
/// use kameo::actor::RemoteActorRef;
/// use kameo::request::MessageSend;
///
/// # #[derive(kameo::Actor, kameo::RemoteActor)]
/// # #[actor(mailbox = bounded)]
Expand All @@ -753,7 +749,7 @@ impl<A: Actor> RemoteActorRef<A> {
/// # tokio_test::block_on(async {
/// let remote_actor_ref = RemoteActorRef::<MyActor>::lookup("my_actor").await?.unwrap();
/// # let msg = Msg;
/// let reply = remote_actor_ref.ask(&msg).send().await?;
/// let reply = remote_actor_ref.ask(&msg).await?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// # });
/// ```
Expand All @@ -770,7 +766,7 @@ impl<A: Actor> RemoteActorRef<A> {
>
where
A: remote::RemoteActor + Message<M> + remote::RemoteMessage<M>,
M: serde::Serialize,
M: serde::Serialize + Send + 'static,
<A::Reply as Reply>::Ok: for<'de> serde::Deserialize<'de>,
{
AskRequest::new_remote(self, msg)
Expand All @@ -785,7 +781,6 @@ impl<A: Actor> RemoteActorRef<A> {
///
/// ```no_run
/// use kameo::actor::RemoteActorRef;
/// use kameo::request::MessageSend;
///
/// # #[derive(kameo::Actor, kameo::RemoteActor)]
/// # #[actor(mailbox = bounded)]
Expand All @@ -803,7 +798,7 @@ impl<A: Actor> RemoteActorRef<A> {
/// # tokio_test::block_on(async {
/// let remote_actor_ref = RemoteActorRef::<MyActor>::lookup("my_actor").await?.unwrap();
/// # let msg = Msg;
/// remote_actor_ref.tell(&msg).send().await?;
/// remote_actor_ref.tell(&msg).await?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// # });
/// ```
Expand Down
6 changes: 3 additions & 3 deletions src/actor/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
//! use kameo::Actor;
//! use kameo::actor::pool::{ActorPool, WorkerMsg, BroadcastMsg};
//! # use kameo::message::{Context, Message};
//! use kameo::request::MessageSend;
//!
//! #[derive(Actor)]
//! struct MyWorker;
Expand All @@ -35,8 +34,8 @@
//! let pool_actor = kameo::spawn(ActorPool::new(4, || kameo::spawn(MyWorker)));
//!
//! // Send tasks to the pool
//! pool_actor.tell(WorkerMsg("Hello worker!")).send().await?;
//! pool_actor.tell(BroadcastMsg("Hello all workers!")).send().await?;
//! pool_actor.tell(WorkerMsg("Hello worker!")).await?;
//! pool_actor.tell(BroadcastMsg("Hello all workers!")).await?;
//! # Ok::<(), Box<dyn std::error::Error>>(())
//! # });
//! ```
Expand Down Expand Up @@ -212,6 +211,7 @@ where
pub enum WorkerReply<A, M>
where
A: Actor + Message<M>,
M: Send + 'static,
{
/// The message was forwarded to a worker.
Forwarded,
Expand Down
Loading

0 comments on commit fe8ff8d

Please sign in to comment.