Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kad): introduce AsyncBehaviour #5294

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat(kad): introduce AsyncBehaviour
stormshield-frb committed Jul 12, 2024
commit 9823487e93be03177b405795d2915bc3593aaa25
2 changes: 2 additions & 0 deletions protocols/kad/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -27,6 +27,8 @@
trigger when the routing table is updated and we have less that `K_VALUE` peers in it,
trigger when a new listen address is discovered and we have no connected peers.
See [PR 5474](https://github.com/libp2p/rust-libp2p/pull/5474).
- Introduce `AsyncBehaviour`, a wrapper of `Behaviour` allowing to easily track Kademlia queries.
See [PR 5294](https://github.com/libp2p/rust-libp2p/pull/5294).

## 0.45.3

356 changes: 356 additions & 0 deletions protocols/kad/src/async_behaviour.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,356 @@
use std::{collections::HashMap, task::Poll};

use futures::{channel::mpsc, StreamExt};
use libp2p_core::{Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_swarm::{
ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent,
THandlerOutEvent, ToSwarm,
};

use crate::{
kbucket, store, AddProviderResult, Behaviour, BootstrapResult, Event, GetClosestPeersResult,
GetProvidersResult, GetRecordResult, NoKnownPeers, PutRecordResult, QueryId, QueryResult,
QueryStats, Quorum, Record, RecordKey,
};

/// The results of Kademlia queries (strongly typed) and only
/// those initiated by the user.
pub struct AsyncQueryResult<T> {
pub id: QueryId,
pub result: T,
pub stats: QueryStats,
}
impl<T> AsyncQueryResult<T> {
fn map<Out>(self, f: impl Fn(T) -> Out) -> AsyncQueryResult<Out> {
AsyncQueryResult {
id: self.id,
stats: self.stats,
result: f(self.result),
}
}
}

type UnboundedQueryResultSender<T> = mpsc::UnboundedSender<AsyncQueryResult<T>>;

enum QueryResultSender {
Bootstrap(UnboundedQueryResultSender<BootstrapResult>),
GetClosestPeers(UnboundedQueryResultSender<GetClosestPeersResult>),
GetProviders(UnboundedQueryResultSender<GetProvidersResult>),
StartProviding(UnboundedQueryResultSender<AddProviderResult>),
GetRecord(UnboundedQueryResultSender<GetRecordResult>),
PutRecord(UnboundedQueryResultSender<PutRecordResult>),
}

/// A handle to receive [`AsyncQueryResult`].
#[must_use = "Streams do nothing unless polled."]
pub struct AsyncQueryResultStream<T>(mpsc::UnboundedReceiver<AsyncQueryResult<T>>);
impl<T> futures::Stream for AsyncQueryResultStream<T> {
type Item = AsyncQueryResult<T>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
self.0.poll_next_unpin(cx)
}
}

/// A wrapper of [`Behaviour`] allowing to easily track
/// [`QueryResult`] of user initiated Kademlia queries.
///
/// For each queries like [`Behaviour::bootstrap`], [`Behaviour::get_closest_peers`], etc
/// a corresponding method ([`AsyncBehaviour::bootstrap_async`], [`AsyncBehaviour::get_closest_peers_async`])
/// is available, allowing the developer to be notified from a typed [`AsyncQueryResultStream`]
/// instead from the normal [`Event::OutboundQueryProgressed`] event.
///
/// If a [`QueryResult`] has no corresponding [`AsyncQueryResultStream`] or
/// if the corresponding one has been dropped, it will simply be forwarded to the `Swarm`
/// with an [`Event::OutboundQueryProgressed`] like nothing happen.
///
/// For more information, see [`Behaviour`].
pub struct AsyncBehaviour<TStore> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of following the design introduced with stream, i.e. introducing a clonable Control
handle which implements the methods, that way we don't need to keep a reference to the Swarm to call the methods.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it would probably be great !!

I'm thinking about it but I really don't see how I could implement it since I need to capture the events emitted by the kad::Behaviour. Do you have an idea ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jxs , I've thought a little bit about this. The only way I see to make a Control work for the kad::Behaviour is to have the entire Behaviour behind an Arc<Mutex<>>. I don't know if it is a good or bad idea.

Will the "control" semantic become something standard for all the behaviours ? If so why not. But if not, I don't know how it will be understood by the end user to have some behaviours using a control and others not.

I'd really like to also have your opinion on this @guillaumemichel when you have the time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am discovering how Control works and I think it would make sense in this case. What would be the arguments against using it?

Copy link
Contributor

@elenaf9 elenaf9 Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jxs , I've thought a little bit about this. The only way I see to make a Control work for the kad::Behaviour is to have the entire Behaviour behind an Arc<Mutex<>>. I don't know if it is a good or bad idea.

I don't think you need an Arc/ Mutex.
You could split the current AsyncBehavior logic into two parts: the Control part and the NetworkBehavior part, that communicate through an additional mpsc channel (the "command-channel").

  • Control would have all the async variants of the kademlia behavior (get_closest_peers_async, get_providers_async, etc.).
    There, it would create the mpsc channel for the results, and then send the a tuple (Command::GetProviders {..}, mpsc::Sender<AsyncQueryResult>) through the "command-channel" to the AsyncBehavior.
  • AsyncBehavior would wrap the kademlia behavior and impl NetworkBehavior. In its poll_next loop it would poll the receiving side of the command-channel, and handle the incoming command by calling the matching function on the wrapped kademlia behavior and storing the Sender in the query_result_senders hashmap. poll_next would also still have the already existing logic for intercepting ToSwarm::GenerateEvent events from the inner behavior and forwarding the results.

Does that make sense? @jxs was that roughly what you had in mind?

inner: Behaviour<TStore>,
query_result_senders: HashMap<QueryId, QueryResultSender>,
}

impl<TStore> std::ops::Deref for AsyncBehaviour<TStore> {
type Target = Behaviour<TStore>;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl<TStore> std::ops::DerefMut for AsyncBehaviour<TStore> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}

impl<TStore> AsyncBehaviour<TStore>
where
TStore: store::RecordStore + Send + 'static,
{
pub fn new(inner: Behaviour<TStore>) -> Self {
Self {
inner,
query_result_senders: Default::default(),
}
}

fn handle_inner_event(&mut self, event: Event) -> Option<<Self as NetworkBehaviour>::ToSwarm> {
match event {
Event::OutboundQueryProgressed {
id,
result,
stats,
step,
} => {
fn do_send<T>(
sender: &UnboundedQueryResultSender<T>,
id: QueryId,
result: T,
stats: QueryStats,
) -> Option<AsyncQueryResult<T>> {
match sender.unbounded_send(AsyncQueryResult { id, result, stats }) {
Ok(_) => {
// The event has been successfully sent into the channel so there is no
// need to forward it backup to the swarm.
None
}
Err(err) => {
// The receiver is closed. This is probably normal (the user got what he needed and dropped the receiver).
// So we don't log anything but we still forward this event back up to the swarm.
Some(err.into_inner())
}
}
}

let Some(sender) = self.query_result_senders.get(&id) else {
// This query was either not triggered by the user or the receiver has been dropped and removed
// so we simply forward it back up to the swarm like nothing happened.
Comment on lines +107 to +108
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The query could also have been triggered through the non-async methods of the inner kad Behavior that we deref to, right?

return Some(Event::OutboundQueryProgressed {
id,
result,
stats,
step,
});
};
let event = match (result, sender) {
(QueryResult::Bootstrap(result), QueryResultSender::Bootstrap(sender)) => {
do_send(sender, id, result, stats).map(|qr| qr.map(QueryResult::Bootstrap))
}
(
QueryResult::GetClosestPeers(result),
QueryResultSender::GetClosestPeers(sender),
) => do_send(sender, id, result, stats)
.map(|qr| qr.map(QueryResult::GetClosestPeers)),
(
QueryResult::GetProviders(result),
QueryResultSender::GetProviders(sender),
) => do_send(sender, id, result, stats)
.map(|qr| qr.map(QueryResult::GetProviders)),
(
QueryResult::StartProviding(result),
QueryResultSender::StartProviding(sender),
) => do_send(sender, id, result, stats)
.map(|qr| qr.map(QueryResult::StartProviding)),
(QueryResult::GetRecord(result), QueryResultSender::GetRecord(sender)) => {
do_send(sender, id, result, stats).map(|qr| qr.map(QueryResult::GetRecord))
}
(QueryResult::PutRecord(result), QueryResultSender::PutRecord(sender)) => {
do_send(sender, id, result, stats).map(|qr| qr.map(QueryResult::PutRecord))
}
(result, _) => {
unreachable!("Wrong sender type for query {id} : unable to send {result:?}")
}
};

if let Some(AsyncQueryResult { id, result, stats }) = event {
// The receiver was closed so we were unable to send the result.
// We remove the sender and forward the event back up to the swarm
self.query_result_senders.remove(&id);
return Some(Event::OutboundQueryProgressed {
id,
result,
stats,
step,
});
}

if step.last {
// This was the last query_result and we just send it successfully.
// We remove the sender. Dropping it will close the channel and
// the receiver will be notified.
self.query_result_senders.remove(&id);
}

None
}
event => Some(event),
}
}

fn add_query<T>(
&mut self,
query_id: QueryId,
f: impl Fn(UnboundedQueryResultSender<T>) -> QueryResultSender,
) -> AsyncQueryResultStream<T> {
let (tx, rx) = mpsc::unbounded();
self.query_result_senders.insert(query_id, f(tx));
AsyncQueryResultStream(rx)
}

/// Start a Bootstrap query and capture its results in a typed [`AsyncQueryResultStream`].
///
/// For more information, see [`Behaviour::bootstrap`].
pub fn bootstrap_async(
&mut self,
) -> Result<AsyncQueryResultStream<BootstrapResult>, NoKnownPeers> {
let query_id = self.inner.bootstrap()?;
Ok(self.add_query(query_id, QueryResultSender::Bootstrap))
}

/// Start a GetClosestPeers query and capture its results in a typed [`AsyncQueryResultStream`].
///
/// For more information, see [`Behaviour::get_closest_peers`].
pub fn get_closest_peers_async<K>(
&mut self,
key: K,
) -> AsyncQueryResultStream<GetClosestPeersResult>
where
K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
{
let query_id = self.inner.get_closest_peers(key);
self.add_query(query_id, QueryResultSender::GetClosestPeers)
}

/// Start a GetProviders query and capture its results in a typed [`AsyncQueryResultStream`].
///
/// For more information, see [`Behaviour::get_providers`].
pub fn get_providers_async(
&mut self,
key: RecordKey,
) -> AsyncQueryResultStream<GetProvidersResult> {
let query_id = self.inner.get_providers(key);
self.add_query(query_id, QueryResultSender::GetProviders)
}

/// Start a StartProviding query and capture its results in a typed [`AsyncQueryResultStream`].
///
/// For more information, see [`Behaviour::start_providing`].
pub fn start_providing_async(
&mut self,
key: RecordKey,
) -> Result<AsyncQueryResultStream<AddProviderResult>, store::Error> {
let query_id = self.inner.start_providing(key)?;
Ok(self.add_query(query_id, QueryResultSender::StartProviding))
}

/// Start a GetRecord query and capture its results in a typed [`AsyncQueryResultStream`].
///
/// For more information, see [`Behaviour::get_record`].
pub fn get_record_async(&mut self, key: RecordKey) -> AsyncQueryResultStream<GetRecordResult> {
let query_id = self.inner.get_record(key);
self.add_query(query_id, QueryResultSender::GetRecord)
}

/// Start a PutRecord query and capture its results in a typed [`AsyncQueryResultStream`].
///
/// For more information, see [`Behaviour::put_record`].
pub fn put_record_async(
&mut self,
record: Record,
quorum: Quorum,
) -> Result<AsyncQueryResultStream<PutRecordResult>, store::Error> {
let query_id = self.inner.put_record(record, quorum)?;
Ok(self.add_query(query_id, QueryResultSender::PutRecord))
}
}

impl<TStore> NetworkBehaviour for AsyncBehaviour<TStore>
where
TStore: store::RecordStore + Send + 'static,
{
type ConnectionHandler = <Behaviour<TStore> as NetworkBehaviour>::ConnectionHandler;
type ToSwarm = <Behaviour<TStore> as NetworkBehaviour>::ToSwarm;

fn handle_pending_inbound_connection(
&mut self,
connection_id: ConnectionId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<(), ConnectionDenied> {
self.inner
.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
}

fn handle_established_inbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
self.inner.handle_established_inbound_connection(
connection_id,
peer,
local_addr,
remote_addr,
)
}

fn handle_pending_outbound_connection(
&mut self,
connection_id: ConnectionId,
maybe_peer: Option<PeerId>,
addresses: &[Multiaddr],
effective_role: Endpoint,
) -> Result<Vec<Multiaddr>, ConnectionDenied> {
self.inner.handle_pending_outbound_connection(
connection_id,
maybe_peer,
addresses,
effective_role,
)
}

fn handle_established_outbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
addr: &Multiaddr,
role_override: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
self.inner
.handle_established_outbound_connection(connection_id, peer, addr, role_override)
}

fn on_swarm_event(&mut self, event: FromSwarm<'_>) {
self.inner.on_swarm_event(event);
}

fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
connection_id: ConnectionId,
event: THandlerOutEvent<Self>,
) {
self.inner
.on_connection_handler_event(peer_id, connection_id, event);
}

fn poll(
&mut self,
cx: &mut std::task::Context<'_>,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
while let Poll::Ready(event) = self.inner.poll(cx) {
if let Some(event) = event.map_out_opt(|e| self.handle_inner_event(e)) {
return Poll::Ready(event);
}
}

Poll::Pending
}
}
2 changes: 2 additions & 0 deletions protocols/kad/src/lib.rs
Original file line number Diff line number Diff line change
@@ -36,6 +36,7 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

mod addresses;
mod async_behaviour;
mod behaviour;
mod bootstrap;
mod handler;
@@ -55,6 +56,7 @@ mod proto {
}

pub use addresses::Addresses;
pub use async_behaviour::{AsyncBehaviour, AsyncQueryResult, AsyncQueryResultStream};
pub use behaviour::{
AddProviderContext, AddProviderError, AddProviderOk, AddProviderPhase, AddProviderResult,
BootstrapError, BootstrapOk, BootstrapResult, GetClosestPeersError, GetClosestPeersOk,
2 changes: 2 additions & 0 deletions swarm/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -20,6 +20,8 @@
The address is broadcast to all behaviours via `FromSwarm::NewExternalAddrOfPeer`.
Protocols that want to collect these addresses can use the new `PeerAddresses` utility.
See [PR 4371](https://github.com/libp2p/rust-libp2p/pull/4371).
- Add utility function `map_out_opt` on `ToSwarm`.
See [PR 5294](https://github.com/libp2p/rust-libp2p/pull/5294).

## 0.44.1

37 changes: 37 additions & 0 deletions swarm/src/behaviour.rs
Original file line number Diff line number Diff line change
@@ -393,6 +393,43 @@ impl<TOutEvent, THandlerIn> ToSwarm<TOutEvent, THandlerIn> {
},
}
}

/// Map the event the swarm will return to an optional one.
pub fn map_out_opt<E>(
self,
f: impl FnOnce(TOutEvent) -> Option<E>,
) -> Option<ToSwarm<E, THandlerIn>> {
match self {
ToSwarm::GenerateEvent(e) => f(e).map(ToSwarm::GenerateEvent),
ToSwarm::Dial { opts } => Some(ToSwarm::Dial { opts }),
ToSwarm::ListenOn { opts } => Some(ToSwarm::ListenOn { opts }),
ToSwarm::RemoveListener { id } => Some(ToSwarm::RemoveListener { id }),
ToSwarm::NotifyHandler {
peer_id,
handler,
event,
} => Some(ToSwarm::NotifyHandler {
peer_id,
handler,
event,
}),
ToSwarm::NewExternalAddrCandidate(addr) => {
Some(ToSwarm::NewExternalAddrCandidate(addr))
}
ToSwarm::ExternalAddrConfirmed(addr) => Some(ToSwarm::ExternalAddrConfirmed(addr)),
ToSwarm::ExternalAddrExpired(addr) => Some(ToSwarm::ExternalAddrExpired(addr)),
ToSwarm::CloseConnection {
peer_id,
connection,
} => Some(ToSwarm::CloseConnection {
peer_id,
connection,
}),
ToSwarm::NewExternalAddrOfPeer { peer_id, address } => {
Some(ToSwarm::NewExternalAddrOfPeer { peer_id, address })
}
}
}
}

/// The options w.r.t. which connection handler to notify of an event.