From c1cfdc34601c4683b4fbca2c9f88dd18089290a6 Mon Sep 17 00:00:00 2001 From: Ari Seyhun Date: Mon, 20 Jan 2025 23:17:24 +0800 Subject: [PATCH] refactor!: remove internment dependency and simplify peer ID handling --- Cargo.toml | 3 +- src/actor/actor_ref.rs | 10 ++---- src/actor/id.rs | 72 +++++++++++++++++++----------------------- src/actor/spawn.rs | 4 --- src/remote/swarm.rs | 24 +++++--------- src/request/ask.rs | 9 +++--- src/request/tell.rs | 7 ++-- 7 files changed, 50 insertions(+), 79 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c9407c2..9b473ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ keywords = ["actor", "tokio"] [features] default = ["macros", "tracing"] macros = ["dep:kameo_macros"] -remote = ["dep:internment", "dep:libp2p", "dep:libp2p-identity", "dep:linkme", "dep:rmp-serde"] +remote = ["dep:libp2p", "dep:libp2p-identity", "dep:linkme", "dep:rmp-serde"] tracing = ["dep:tracing", "tokio/tracing"] [dependencies] @@ -32,7 +32,6 @@ kameo_macros = { version = "0.14.0", path = "./macros", optional = true } dyn-clone = "1.0" futures = "0.3" -internment = { version = "0.8.5", features = ["serde"], optional = true } libp2p = { version = "0.55.0", features = ["cbor", "dns", "kad", "mdns", "macros", "quic", "request-response", "rsa", "serde", "tokio"], optional = true } libp2p-identity = { version = "0.2.9", features = ["rand", "rsa"], optional = true } linkme = { version= "0.3.28", optional = true } diff --git a/src/actor/actor_ref.rs b/src/actor/actor_ref.rs index ad05231..428dc2b 100644 --- a/src/actor/actor_ref.rs +++ b/src/actor/actor_ref.rs @@ -464,10 +464,7 @@ where ); remote::ActorSwarm::get() .ok_or(RemoteSendError::SwarmNotBootstrapped)? - .link::( - self.id.with_hydrate_peer_id(), - sibbling_ref.id.with_hydrate_peer_id(), - ) + .link::(self.id, sibbling_ref.id) .await } @@ -574,10 +571,7 @@ where self.links.lock().await.remove(&sibbling_ref.id); remote::ActorSwarm::get() .ok_or(RemoteSendError::SwarmNotBootstrapped)? - .unlink::( - self.id.with_hydrate_peer_id(), - sibbling_ref.id.with_hydrate_peer_id(), - ) + .unlink::(self.id, sibbling_ref.id) .await } diff --git a/src/actor/id.rs b/src/actor/id.rs index 5108e26..9a5c4fe 100644 --- a/src/actor/id.rs +++ b/src/actor/id.rs @@ -1,8 +1,6 @@ use std::sync::atomic::Ordering; use std::{fmt, sync::atomic::AtomicU64}; -#[cfg(feature = "remote")] -use internment::Intern; use serde::{Deserialize, Serialize}; use crate::error::ActorIDFromBytesError; @@ -18,9 +16,8 @@ static ACTOR_COUNTER: AtomicU64 = AtomicU64::new(0); #[derive(Clone, Copy, PartialEq, Eq, Hash)] pub struct ActorID { sequence_id: u64, - /// None indicates a local actor; the local peer ID should be used in this case. #[cfg(feature = "remote")] - peer_id: Option>, + peer_id: PeerIdKind, } impl ActorID { @@ -40,7 +37,7 @@ impl ActorID { ActorID { sequence_id, #[cfg(feature = "remote")] - peer_id: ActorSwarm::get().map(|swarm| *swarm.local_peer_id_intern()), + peer_id: PeerIdKind::Local, } } @@ -58,7 +55,7 @@ impl ActorID { pub fn new_with_peer_id(sequence_id: u64, peer_id: libp2p::PeerId) -> Self { ActorID { sequence_id, - peer_id: Some(Intern::new(peer_id)), + peer_id: PeerIdKind::PeerId(peer_id), } } @@ -89,10 +86,10 @@ impl ActorID { /// /// # Returns /// - /// An `Option`. `None` indicates a local actor. + /// An `Option`. `None` is returned if the peer ID is local and no [`ActorSwarm`] has been bootstrapped. #[cfg(feature = "remote")] pub fn peer_id(&self) -> Option<&libp2p::PeerId> { - self.peer_id.as_deref() + self.peer_id.peer_id() } /// Serializes the `ActorID` into a byte vector. @@ -109,6 +106,7 @@ impl ActorID { #[cfg(feature = "remote")] let peer_id_bytes = self .peer_id + .peer_id() .map(|peer_id| peer_id.to_bytes()) .or_else(|| ActorSwarm::get().map(|swarm| swarm.local_peer_id().to_bytes())); #[cfg(feature = "remote")] @@ -139,9 +137,9 @@ impl ActorID { // Extract the peer id #[cfg(feature = "remote")] let peer_id = if bytes.len() > 8 { - Some(Intern::new(libp2p::PeerId::from_bytes(&bytes[8..])?)) + PeerIdKind::PeerId(libp2p::PeerId::from_bytes(&bytes[8..])?) } else { - None + PeerIdKind::Local }; Ok(ActorID { @@ -150,34 +148,6 @@ impl ActorID { peer_id, }) } - - /// Returns the interned `PeerId` associated with this `ActorID`, if any. - /// - /// This method is primarily for internal use. - /// - /// # Returns - /// - /// An `Option<&Intern>`. `None` indicates a local actor. - #[cfg(feature = "remote")] - pub(crate) fn peer_id_intern(&self) -> Option<&Intern> { - self.peer_id.as_ref() - } - - /// Ensures the `ActorID` has an associated `peer_id`. - /// - /// If the `ActorID` doesn't have a `peer_id`, this method associates it with the local `PeerId`. - /// This method is primarily for internal use. - /// - /// # Returns - /// - /// An `ActorID` with a guaranteed `peer_id`. - #[cfg(feature = "remote")] - pub(crate) fn with_hydrate_peer_id(mut self) -> ActorID { - if self.peer_id.is_none() { - self.peer_id = Some(*ActorSwarm::get().unwrap().local_peer_id_intern()); - } - self - } } impl fmt::Display for ActorID { @@ -186,7 +156,7 @@ impl fmt::Display for ActorID { return write!(f, "ActorID({})", self.sequence_id); #[cfg(feature = "remote")] - match self.peer_id { + match self.peer_id.peer_id() { Some(peer_id) => write!(f, "ActorID({}, {peer_id})", self.sequence_id), None => write!(f, "ActorID({}, local)", self.sequence_id), } @@ -196,7 +166,12 @@ impl fmt::Display for ActorID { impl fmt::Debug for ActorID { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { #[cfg(feature = "remote")] - return write!(f, "ActorID({:?}, {:?})", self.sequence_id, self.peer_id); + return write!( + f, + "ActorID({:?}, {:?})", + self.sequence_id, + self.peer_id.peer_id() + ); #[cfg(not(feature = "remote"))] return write!(f, "ActorID({:?})", self.sequence_id); @@ -228,3 +203,20 @@ impl<'de> Deserialize<'de> for ActorID { }) } } + +#[cfg(feature = "remote")] +#[derive(Clone, Copy, PartialEq, Eq, Hash)] +enum PeerIdKind { + Local, + PeerId(libp2p::PeerId), +} + +#[cfg(feature = "remote")] +impl PeerIdKind { + fn peer_id(&self) -> Option<&libp2p::PeerId> { + match self { + PeerIdKind::Local => ActorSwarm::get().map(ActorSwarm::local_peer_id), + PeerIdKind::PeerId(peer_id) => Some(peer_id), + } + } +} diff --git a/src/actor/spawn.rs b/src/actor/spawn.rs index 5e945d3..b0c572f 100644 --- a/src/actor/spawn.rs +++ b/src/actor/spawn.rs @@ -344,10 +344,6 @@ where let mut actor = state.shutdown().await; let mut link_notificication_futures = FuturesUnordered::new(); - #[cfg(feature = "remote")] - { - id = id.with_hydrate_peer_id(); - } { let mut links = links.lock().await; #[allow(unused_variables)] diff --git a/src/remote/swarm.rs b/src/remote/swarm.rs index e6d9859..a9632b8 100644 --- a/src/remote/swarm.rs +++ b/src/remote/swarm.rs @@ -2,7 +2,6 @@ use core::task; use std::{borrow::Cow, collections::HashMap, io, pin, time::Duration}; use futures::{ready, stream::FuturesUnordered, Future, FutureExt}; -use internment::Intern; use libp2p::{ core::{transport::ListenerId, ConnectedPoint}, identity::Keypair, @@ -67,7 +66,7 @@ static ACTOR_SWARM: OnceCell = OnceCell::new(); #[derive(Clone, Debug)] pub struct ActorSwarm { swarm_tx: SwarmSender, - local_peer_id: Intern, + local_peer_id: PeerId, } impl ActorSwarm { @@ -141,7 +140,7 @@ impl ActorSwarm { pub fn bootstrap_with_swarm( mut swarm: Swarm, ) -> Result<&'static Self, BootstrapError> { - let local_peer_id = Intern::new(*swarm.local_peer_id()); + let local_peer_id = *swarm.local_peer_id(); let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); let swarm_tx = SwarmSender(cmd_tx); @@ -172,7 +171,6 @@ impl ActorSwarm { /// This is for advanced cases and provides full control, returning an `ActorSwarmBehaviour` instance which /// should be used to process the swarm manually. pub fn bootstrap_manual(local_peer_id: PeerId) -> Option<(&'static Self, ActorSwarmHandler)> { - let local_peer_id = Intern::new(local_peer_id); let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); let swarm_tx = SwarmSender(cmd_tx); @@ -239,10 +237,6 @@ impl ActorSwarm { &self.local_peer_id } - pub(crate) fn local_peer_id_intern(&self) -> &Intern { - &self.local_peer_id - } - /// Dials a peer using the provided dialing options. /// /// This method can be used to connect to a known or unknown peer, specified by the options @@ -362,10 +356,8 @@ impl ActorSwarm { actor_ref: ActorRef, name: String, ) -> impl Future> { - let actor_registration = ActorRegistration::new( - actor_ref.id().with_hydrate_peer_id(), - Cow::Borrowed(A::REMOTE_ID), - ); + let actor_registration = + ActorRegistration::new(actor_ref.id(), Cow::Borrowed(A::REMOTE_ID)); let reply_rx = self .swarm_tx .send_with_reply(|reply| SwarmCommand::Register { @@ -379,7 +371,7 @@ impl ActorSwarm { let signal_mailbox = actor_ref.weak_signal_mailbox(); let links = actor_ref.links.clone(); REMOTE_REGISTRY.lock().await.insert( - actor_ref.id().with_hydrate_peer_id(), + actor_ref.id(), RemoteRegistryActorRef { actor_ref: Box::new(actor_ref), signal_mailbox, @@ -1039,7 +1031,7 @@ pub enum SwarmCommand { /// An actor ask request. Ask { /// Peer ID. - peer_id: Intern, + peer_id: PeerId, /// Actor ID. actor_id: ActorID, /// Actor remote ID. @@ -1060,7 +1052,7 @@ pub enum SwarmCommand { /// An actor tell request. Tell { /// Peer ID. - peer_id: Intern, + peer_id: PeerId, /// Actor ID. actor_id: ActorID, /// Actor remote ID. @@ -1544,7 +1536,7 @@ impl SwarmBehaviour for ActorSwarmBehaviour { sibbling_remote_id: Cow<'static, str>, ) -> OutboundRequestId { self.request_response.send_request( - actor_id.peer_id().unwrap(), + actor_id.peer_id().expect("swarm should be bootstrapped"), SwarmRequest::Link { actor_id, actor_remote_id, diff --git a/src/request/ask.rs b/src/request/ask.rs index a4e17ae..f5b5128 100644 --- a/src/request/ask.rs +++ b/src/request/ask.rs @@ -3,7 +3,7 @@ use std::{future::IntoFuture, marker::PhantomData, time::Duration}; use tokio::{sync::oneshot, time::timeout}; #[cfg(feature = "remote")] -use crate::remote::{ActorSwarm, RemoteActor, RemoteMessage, SwarmCommand, SwarmResponse}; +use crate::remote::{RemoteActor, RemoteMessage, SwarmCommand, SwarmResponse}; use crate::{ actor, @@ -955,10 +955,9 @@ where let actor_id = actor_ref.id(); let (reply_tx, reply_rx) = oneshot::channel(); actor_ref.send_to_swarm(SwarmCommand::Ask { - peer_id: actor_id - .peer_id_intern() - .cloned() - .unwrap_or_else(|| *ActorSwarm::get().unwrap().local_peer_id_intern()), + peer_id: *actor_id + .peer_id() + .expect("actor swarm should be bootstrapped"), actor_id, actor_remote_id: Cow::Borrowed(::REMOTE_ID), message_remote_id: Cow::Borrowed(>::REMOTE_ID), diff --git a/src/request/tell.rs b/src/request/tell.rs index 28b2295..398e5bc 100644 --- a/src/request/tell.rs +++ b/src/request/tell.rs @@ -478,10 +478,9 @@ where let actor_id = actor_ref.id(); let (reply_tx, reply_rx) = tokio::sync::oneshot::channel(); actor_ref.send_to_swarm(SwarmCommand::Tell { - peer_id: actor_id - .peer_id_intern() - .cloned() - .unwrap_or_else(|| *ActorSwarm::get().unwrap().local_peer_id_intern()), + peer_id: *actor_id + .peer_id() + .expect("actor swarm should be bootstrapped"), actor_id, actor_remote_id: Cow::Borrowed(::REMOTE_ID), message_remote_id: Cow::Borrowed(>::REMOTE_ID),