From a153df9658199f6381187f72fa9069b096a2bf79 Mon Sep 17 00:00:00 2001 From: StemCll Date: Wed, 24 Jan 2024 22:09:03 +0100 Subject: [PATCH] feat(swarm): allow behaviours to share addresses of peers Resolves: #4302. Pull-Request: #4371. --- Cargo.lock | 9 +- Cargo.toml | 8 +- protocols/autonat/Cargo.toml | 2 +- protocols/autonat/src/behaviour.rs | 1 + protocols/dcutr/tests/lib.rs | 1 + protocols/identify/CHANGELOG.md | 7 + protocols/identify/Cargo.toml | 2 +- protocols/identify/src/behaviour.rs | 72 ++-- protocols/identify/tests/smoke.rs | 74 +++- protocols/request-response/CHANGELOG.md | 5 + protocols/request-response/Cargo.toml | 4 +- protocols/request-response/src/lib.rs | 30 +- .../request-response/tests/peer_address.rs | 60 ++++ swarm-derive/CHANGELOG.md | 5 + swarm-derive/Cargo.toml | 2 +- swarm/CHANGELOG.md | 8 + swarm/Cargo.toml | 3 +- swarm/src/behaviour.rs | 28 ++ swarm/src/behaviour/peer_addresses.rs | 338 ++++++++++++++++++ swarm/src/lib.rs | 27 +- 20 files changed, 611 insertions(+), 75 deletions(-) create mode 100644 protocols/request-response/tests/peer_address.rs create mode 100644 swarm/src/behaviour/peer_addresses.rs diff --git a/Cargo.lock b/Cargo.lock index 9c293080c01..b9d42bc47ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2797,7 +2797,7 @@ dependencies = [ [[package]] name = "libp2p-identify" -version = "0.44.1" +version = "0.44.2" dependencies = [ "async-std", "asynchronous-codec", @@ -3187,7 +3187,7 @@ dependencies = [ [[package]] name = "libp2p-request-response" -version = "0.26.1" +version = "0.26.2" dependencies = [ "anyhow", "async-std", @@ -3252,7 +3252,7 @@ dependencies = [ [[package]] name = "libp2p-swarm" -version = "0.44.1" +version = "0.44.2" dependencies = [ "async-std", "either", @@ -3270,6 +3270,7 @@ dependencies = [ "libp2p-swarm-derive", "libp2p-swarm-test", "libp2p-yamux", + "lru", "multistream-select", "once_cell", "quickcheck-ext", @@ -3285,7 +3286,7 @@ dependencies = [ [[package]] name = "libp2p-swarm-derive" -version = "0.34.2" +version = "0.34.3" dependencies = [ "heck", "proc-macro2", diff --git a/Cargo.toml b/Cargo.toml index 43d3834b974..4afc0d34c6f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,7 +83,7 @@ libp2p-dcutr = { version = "0.11.0", path = "protocols/dcutr" } libp2p-dns = { version = "0.41.1", path = "transports/dns" } libp2p-floodsub = { version = "0.44.0", path = "protocols/floodsub" } libp2p-gossipsub = { version = "0.46.1", path = "protocols/gossipsub" } -libp2p-identify = { version = "0.44.1", path = "protocols/identify" } +libp2p-identify = { version = "0.44.2", path = "protocols/identify" } libp2p-identity = { version = "0.2.8" } libp2p-kad = { version = "0.46.0", path = "protocols/kad" } libp2p-mdns = { version = "0.45.1", path = "protocols/mdns" } @@ -99,11 +99,11 @@ libp2p-pnet = { version = "0.24.0", path = "transports/pnet" } libp2p-quic = { version = "0.10.2", path = "transports/quic" } libp2p-relay = { version = "0.17.1", path = "protocols/relay" } libp2p-rendezvous = { version = "0.14.0", path = "protocols/rendezvous" } -libp2p-request-response = { version = "0.26.1", path = "protocols/request-response" } +libp2p-request-response = { version = "0.26.2", path = "protocols/request-response" } libp2p-server = { version = "0.12.5", path = "misc/server" } libp2p-stream = { version = "0.1.0-alpha", path = "protocols/stream" } -libp2p-swarm = { version = "0.44.1", path = "swarm" } -libp2p-swarm-derive = { version = "=0.34.2", path = "swarm-derive" } # `libp2p-swarm-derive` may not be compatible with different `libp2p-swarm` non-breaking releases. E.g. `libp2p-swarm` might introduce a new enum variant `FromSwarm` (which is `#[non-exhaustive]`) in a non-breaking release. Older versions of `libp2p-swarm-derive` would not forward this enum variant within the `NetworkBehaviour` hierarchy. Thus the version pinning is required. +libp2p-swarm = { version = "0.44.2", path = "swarm" } +libp2p-swarm-derive = { version = "=0.34.3", path = "swarm-derive" } # `libp2p-swarm-derive` may not be compatible with different `libp2p-swarm` non-breaking releases. E.g. `libp2p-swarm` might introduce a new enum variant `FromSwarm` (which is `#[non-exhaustive]`) in a non-breaking release. Older versions of `libp2p-swarm-derive` would not forward this enum variant within the `NetworkBehaviour` hierarchy. Thus the version pinning is required. libp2p-swarm-test = { version = "0.3.0", path = "swarm-test" } libp2p-tcp = { version = "0.41.0", path = "transports/tcp" } libp2p-tls = { version = "0.3.0", path = "transports/tls" } diff --git a/protocols/autonat/Cargo.toml b/protocols/autonat/Cargo.toml index a1ecae7ccab..fce64ad0c12 100644 --- a/protocols/autonat/Cargo.toml +++ b/protocols/autonat/Cargo.toml @@ -3,8 +3,8 @@ name = "libp2p-autonat" edition = "2021" rust-version = { workspace = true } description = "NAT and firewall detection for libp2p" -version = "0.12.0" authors = ["David Craven ", "Elena Frank "] +version = "0.12.0" license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" keywords = ["peer-to-peer", "libp2p", "networking"] diff --git a/protocols/autonat/src/behaviour.rs b/protocols/autonat/src/behaviour.rs index e95163ab23f..a770e61e88a 100644 --- a/protocols/autonat/src/behaviour.rs +++ b/protocols/autonat/src/behaviour.rs @@ -268,6 +268,7 @@ impl Behaviour { pub fn add_server(&mut self, peer: PeerId, address: Option) { self.servers.insert(peer); if let Some(addr) = address { + #[allow(deprecated)] self.inner.add_address(&peer, addr); } } diff --git a/protocols/dcutr/tests/lib.rs b/protocols/dcutr/tests/lib.rs index 9e1f0591e6d..084ee744145 100644 --- a/protocols/dcutr/tests/lib.rs +++ b/protocols/dcutr/tests/lib.rs @@ -203,6 +203,7 @@ async fn wait_for_reservation( SwarmEvent::ExternalAddrConfirmed { address } if !is_renewal => { assert_eq!(address, client_addr); } + SwarmEvent::NewExternalAddrOfPeer { .. } => {} e => panic!("{e:?}"), } } diff --git a/protocols/identify/CHANGELOG.md b/protocols/identify/CHANGELOG.md index 22c74b28cae..83984448d07 100644 --- a/protocols/identify/CHANGELOG.md +++ b/protocols/identify/CHANGELOG.md @@ -1,3 +1,10 @@ +## 0.44.2 + +- Emit `ToSwarm::NewExternalAddrOfPeer` for all external addresses of remote peers. + For this work, the address cache must be enabled via `identify::Config::with_cache_size`. + The default is 0, i.e. disabled. + See [PR 4371](https://github.com/libp2p/rust-libp2p/pull/4371). + ## 0.44.1 - Ensure `Multiaddr` handled and returned by `Behaviour` are `/p2p` terminated. diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index 61ef9c47044..2fb51d87627 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-identify" edition = "2021" rust-version = { workspace = true } description = "Nodes identifcation protocol for libp2p" -version = "0.44.1" +version = "0.44.2" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index e4da898f44c..43bddb52fe7 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -26,15 +26,14 @@ use libp2p_identity::PublicKey; use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}; use libp2p_swarm::{ ConnectionDenied, DialError, ExternalAddresses, ListenAddresses, NetworkBehaviour, - NotifyHandler, StreamUpgradeError, THandlerInEvent, ToSwarm, + NotifyHandler, PeerAddresses, StreamUpgradeError, THandlerInEvent, ToSwarm, }; use libp2p_swarm::{ConnectionId, THandler, THandlerOutEvent}; -use lru::LruCache; + use std::collections::hash_map::Entry; use std::num::NonZeroUsize; use std::{ collections::{HashMap, HashSet, VecDeque}, - iter::FromIterator, task::Context, task::Poll, time::Duration, @@ -200,9 +199,9 @@ impl Behaviour { .or_default() .insert(conn, addr); - if let Some(entry) = self.discovered_peers.get_mut(&peer_id) { + if let Some(cache) = self.discovered_peers.0.as_mut() { for addr in failed_addresses { - entry.remove(addr); + cache.remove(&peer_id, addr); } } } @@ -268,13 +267,23 @@ impl NetworkBehaviour for Behaviour { info.listen_addrs .retain(|addr| multiaddr_matches_peer_id(addr, &peer_id)); - // Replace existing addresses to prevent other peer from filling up our memory. - self.discovered_peers - .put(peer_id, info.listen_addrs.iter().cloned()); - let observed = info.observed_addr.clone(); self.events - .push_back(ToSwarm::GenerateEvent(Event::Received { peer_id, info })); + .push_back(ToSwarm::GenerateEvent(Event::Received { + peer_id, + info: info.clone(), + })); + + if let Some(ref mut discovered_peers) = self.discovered_peers.0 { + for address in &info.listen_addrs { + if discovered_peers.add(peer_id, address.clone()) { + self.events.push_back(ToSwarm::NewExternalAddrOfPeer { + peer_id, + address: address.clone(), + }); + } + } + } match self.our_observed_addresses.entry(id) { Entry::Vacant(not_yet_observed) => { @@ -387,11 +396,11 @@ impl NetworkBehaviour for Behaviour { self.our_observed_addresses.remove(&connection_id); } FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => { - if let Some(entry) = peer_id.and_then(|id| self.discovered_peers.get_mut(&id)) { - if let DialError::Transport(errors) = error { - for (addr, _error) in errors { - entry.remove(addr); - } + if let (Some(peer_id), Some(cache), DialError::Transport(errors)) = + (peer_id, self.discovered_peers.0.as_mut(), error) + { + for (addr, _error) in errors { + cache.remove(&peer_id, addr); } } } @@ -445,7 +454,7 @@ fn multiaddr_matches_peer_id(addr: &Multiaddr, peer_id: &PeerId) -> bool { true } -struct PeerCache(Option>>); +struct PeerCache(Option); impl PeerCache { fn disabled() -> Self { @@ -453,34 +462,15 @@ impl PeerCache { } fn enabled(size: NonZeroUsize) -> Self { - Self(Some(LruCache::new(size))) - } - - fn get_mut(&mut self, peer: &PeerId) -> Option<&mut HashSet> { - self.0.as_mut()?.get_mut(peer) - } - - fn put(&mut self, peer: PeerId, addresses: impl Iterator) { - let cache = match self.0.as_mut() { - None => return, - Some(cache) => cache, - }; - - let addresses = addresses.filter_map(|a| a.with_p2p(peer).ok()); - cache.put(peer, HashSet::from_iter(addresses)); + Self(Some(PeerAddresses::new(size))) } fn get(&mut self, peer: &PeerId) -> Vec { - let cache = match self.0.as_mut() { - None => return Vec::new(), - Some(cache) => cache, - }; - - cache - .get(peer) - .cloned() - .map(Vec::from_iter) - .unwrap_or_default() + if let Some(cache) = self.0.as_mut() { + cache.get(peer).collect() + } else { + Vec::new() + } } } diff --git a/protocols/identify/tests/smoke.rs b/protocols/identify/tests/smoke.rs index 5cccc09d863..dd92d10979a 100644 --- a/protocols/identify/tests/smoke.rs +++ b/protocols/identify/tests/smoke.rs @@ -110,7 +110,7 @@ async fn only_emits_address_candidate_once_per_connection() { async_std::task::spawn(swarm2.loop_on_next()); let swarm_events = futures::stream::poll_fn(|cx| swarm1.poll_next_unpin(cx)) - .take(5) + .take(8) .collect::>() .await; @@ -156,6 +156,78 @@ async fn only_emits_address_candidate_once_per_connection() { ); } +#[async_std::test] +async fn emits_unique_listen_addresses() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); + + let mut swarm1 = Swarm::new_ephemeral(|identity| { + identify::Behaviour::new( + identify::Config::new("a".to_string(), identity.public()) + .with_agent_version("b".to_string()) + .with_interval(Duration::from_secs(1)) + .with_cache_size(10), + ) + }); + let mut swarm2 = Swarm::new_ephemeral(|identity| { + identify::Behaviour::new( + identify::Config::new("c".to_string(), identity.public()) + .with_agent_version("d".to_string()), + ) + }); + + let (swarm2_mem_listen_addr, swarm2_tcp_listen_addr) = + swarm2.listen().with_memory_addr_external().await; + let swarm2_peer_id = *swarm2.local_peer_id(); + swarm1.connect(&mut swarm2).await; + + async_std::task::spawn(swarm2.loop_on_next()); + + let swarm_events = futures::stream::poll_fn(|cx| swarm1.poll_next_unpin(cx)) + .take(8) + .collect::>() + .await; + + let infos = swarm_events + .iter() + .filter_map(|e| match e { + SwarmEvent::Behaviour(identify::Event::Received { info, .. }) => Some(info.clone()), + _ => None, + }) + .collect::>(); + + assert!( + infos.len() > 1, + "should exchange identify payload more than once" + ); + + let listen_addrs = infos + .iter() + .map(|i| i.listen_addrs.clone()) + .collect::>(); + + for addrs in listen_addrs { + assert_eq!(addrs.len(), 2); + assert!(addrs.contains(&swarm2_mem_listen_addr)); + assert!(addrs.contains(&swarm2_tcp_listen_addr)); + } + + let reported_addrs = swarm_events + .iter() + .filter_map(|e| match e { + SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => { + Some((*peer_id, address.clone())) + } + _ => None, + }) + .collect::>(); + + assert_eq!(reported_addrs.len(), 2, "To have two addresses of remote"); + assert!(reported_addrs.contains(&(swarm2_peer_id, swarm2_mem_listen_addr))); + assert!(reported_addrs.contains(&(swarm2_peer_id, swarm2_tcp_listen_addr))); +} + #[async_std::test] async fn identify_push() { let _ = tracing_subscriber::fmt() diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index d53ff479ee2..92417508786 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -1,3 +1,8 @@ +## 0.26.2 + +- Deprecate `Behaviour::add_address` in favor of `Swarm::add_peer_address`. + See [PR 4371](https://github.com/libp2p/rust-libp2p/pull/4371). + ## 0.26.1 - Derive `PartialOrd` and `Ord` for `{Out,In}boundRequestId`. diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index c2c8d6d3085..198e9fe401e 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-request-response" edition = "2021" rust-version = { workspace = true } description = "Generic Request/Response Protocols" -version = "0.26.1" +version = "0.26.2" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -40,7 +40,7 @@ libp2p-yamux = { workspace = true } rand = "0.8" libp2p-swarm-test = { path = "../../swarm-test" } futures_ringbuf = "0.4.0" -serde = { version = "1.0", features = ["derive"]} +serde = { version = "1.0", features = ["derive"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] } # Passing arguments to the docsrs builder in order to properly document cfg's. diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index fc68bd6cf1f..4362b3255ad 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -84,8 +84,8 @@ use libp2p_identity::PeerId; use libp2p_swarm::{ behaviour::{AddressChange, ConnectionClosed, DialFailure, FromSwarm}, dial_opts::DialOpts, - ConnectionDenied, ConnectionHandler, ConnectionId, NetworkBehaviour, NotifyHandler, THandler, - THandlerInEvent, THandlerOutEvent, ToSwarm, + ConnectionDenied, ConnectionHandler, ConnectionId, NetworkBehaviour, NotifyHandler, + PeerAddresses, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; use smallvec::SmallVec; use std::{ @@ -357,7 +357,7 @@ where /// reachable addresses, if any. connected: HashMap>, /// Externally managed addresses via `add_address` and `remove_address`. - addresses: HashMap>, + addresses: PeerAddresses, /// Requests that have not yet been sent and are waiting for a connection /// to be established. pending_outbound_requests: HashMap; 10]>>, @@ -406,7 +406,7 @@ where pending_events: VecDeque::new(), connected: HashMap::new(), pending_outbound_requests: HashMap::new(), - addresses: HashMap::new(), + addresses: PeerAddresses::default(), } } @@ -470,20 +470,15 @@ where /// /// Returns true if the address was added, false otherwise (i.e. if the /// address is already in the list). + #[deprecated(note = "Use `Swarm::add_peer_address` instead.")] pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) -> bool { - self.addresses.entry(*peer).or_default().insert(address) + self.addresses.add(*peer, address) } - /// Removes an address of a peer previously added via `add_address`. + /// Removes an address of a peer previously added via [`Behaviour::add_address`]. + #[deprecated(note = "Will be removed with the next breaking release and won't be replaced.")] pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) { - let mut last = false; - if let Some(addresses) = self.addresses.get_mut(peer) { - addresses.retain(|a| a != address); - last = addresses.is_empty(); - } - if last { - self.addresses.remove(peer); - } + self.addresses.remove(peer, address); } /// Checks whether a peer is currently connected. @@ -764,9 +759,9 @@ where if let Some(connections) = self.connected.get(&peer) { addresses.extend(connections.iter().filter_map(|c| c.remote_address.clone())) } - if let Some(more) = self.addresses.get(&peer) { - addresses.extend(more.iter().cloned()); - } + + let cached_addrs = self.addresses.get(&peer); + addresses.extend(cached_addrs); Ok(addresses) } @@ -797,6 +792,7 @@ where } fn on_swarm_event(&mut self, event: FromSwarm) { + self.addresses.on_swarm_event(&event); match event { FromSwarm::ConnectionEstablished(_) => {} FromSwarm::ConnectionClosed(connection_closed) => { diff --git a/protocols/request-response/tests/peer_address.rs b/protocols/request-response/tests/peer_address.rs new file mode 100644 index 00000000000..2a120931dcd --- /dev/null +++ b/protocols/request-response/tests/peer_address.rs @@ -0,0 +1,60 @@ +use libp2p_core::ConnectedPoint; +use libp2p_request_response as request_response; +use libp2p_request_response::ProtocolSupport; +use libp2p_swarm::{StreamProtocol, Swarm, SwarmEvent}; +use libp2p_swarm_test::SwarmExt; +use serde::{Deserialize, Serialize}; +use std::iter; +use tracing_subscriber::EnvFilter; + +#[async_std::test] +async fn dial_succeeds_after_adding_peers_address() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); + + let protocols = iter::once((StreamProtocol::new("/ping/1"), ProtocolSupport::Full)); + let config = request_response::Config::default(); + + let mut swarm = Swarm::new_ephemeral(|_| { + request_response::cbor::Behaviour::::new(protocols.clone(), config.clone()) + }); + + let mut swarm2 = Swarm::new_ephemeral(|_| { + request_response::cbor::Behaviour::::new(protocols.clone(), config.clone()) + }); + + let peer_id2 = *swarm2.local_peer_id(); + + let (listen_addr, _) = swarm2.listen().with_memory_addr_external().await; + + swarm.add_peer_address(peer_id2, listen_addr.clone()); + + swarm.dial(peer_id2).unwrap(); + + async_std::task::spawn(swarm2.loop_on_next()); + + let (connected_peer_id, connected_address) = swarm + .wait(|event| match event { + SwarmEvent::ConnectionEstablished { + peer_id, endpoint, .. + } => { + let address = match endpoint { + ConnectedPoint::Dialer { address, .. } => Some(address), + _ => None, + }; + Some((peer_id, address)) + } + _ => None, + }) + .await; + let expected_address = listen_addr.with_p2p(peer_id2).unwrap(); + + assert_eq!(connected_peer_id, peer_id2); + assert_eq!(expected_address, connected_address.unwrap()); +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +struct Ping(Vec); +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +struct Pong(Vec); diff --git a/swarm-derive/CHANGELOG.md b/swarm-derive/CHANGELOG.md index 74f35806eec..55f5e571664 100644 --- a/swarm-derive/CHANGELOG.md +++ b/swarm-derive/CHANGELOG.md @@ -1,3 +1,8 @@ +## 0.34.3 + +- Generate code for `libp2p-swarm`'s `FromSwarm::NewExternalAddrOfPeer` enum variant. + See [PR 4371](https://github.com/libp2p/rust-libp2p/pull/4371). + ## 0.34.2 - Restore support for generic constraints on behaviours combined with `out_event` generated by `NetworkBehaviour` where no where clause is used. diff --git a/swarm-derive/Cargo.toml b/swarm-derive/Cargo.toml index 7740d57feb1..f3c75749602 100644 --- a/swarm-derive/Cargo.toml +++ b/swarm-derive/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-swarm-derive" edition = "2021" rust-version = { workspace = true } description = "Procedural macros of libp2p-swarm" -version = "0.34.2" +version = "0.34.3" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 65dce4b002a..3d8e3981c96 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,3 +1,11 @@ +## 0.44.2 + +- Allow `NetworkBehaviour`s to share addresses of peers. + This is enabled via the new `ToSwarm::NewExternalAddrOfPeer` event. + The address is broadcast to all behaviours via `FromSwarm::NewExternalAddrOfPeer`. + Protocols that want to collect these addresses can use the new `PeerAddresses` utility. + See [PR 4371](https://github.com/libp2p/rust-libp2p/pull/4371). + ## 0.44.1 - Implement `Clone` & `Copy` for `FromSwarm. diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index 3a77aede919..085b59f3b51 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-swarm" edition = "2021" rust-version = { workspace = true } description = "The libp2p swarm" -version = "0.44.1" +version = "0.44.2" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -20,6 +20,7 @@ instant = "0.1.12" libp2p-core = { workspace = true } libp2p-identity = { workspace = true } libp2p-swarm-derive = { workspace = true, optional = true } +lru = "0.12.1" multistream-select = { workspace = true } once_cell = "1.19.0" rand = "0.8" diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index 4be129a4eea..5070871a4c1 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -21,10 +21,12 @@ mod either; mod external_addresses; mod listen_addresses; +mod peer_addresses; pub mod toggle; pub use external_addresses::ExternalAddresses; pub use listen_addresses::ListenAddresses; +pub use peer_addresses::PeerAddresses; use crate::connection::ConnectionId; use crate::dial_opts::DialOpts; @@ -303,6 +305,9 @@ pub enum ToSwarm { /// Whether to close a specific or all connections to the given peer. connection: CloseConnection, }, + + /// Reports external address of a remote peer to the [`Swarm`](crate::Swarm) and through that to other [`NetworkBehaviour`]s. + NewExternalAddrOfPeer { peer_id: PeerId, address: Multiaddr }, } impl ToSwarm { @@ -335,6 +340,13 @@ impl ToSwarm { ToSwarm::NewExternalAddrCandidate(addr) => ToSwarm::NewExternalAddrCandidate(addr), ToSwarm::ExternalAddrConfirmed(addr) => ToSwarm::ExternalAddrConfirmed(addr), ToSwarm::ExternalAddrExpired(addr) => ToSwarm::ExternalAddrExpired(addr), + ToSwarm::NewExternalAddrOfPeer { + address: addr, + peer_id, + } => ToSwarm::NewExternalAddrOfPeer { + address: addr, + peer_id, + }, } } } @@ -366,6 +378,13 @@ impl ToSwarm { peer_id, connection, }, + ToSwarm::NewExternalAddrOfPeer { + address: addr, + peer_id, + } => ToSwarm::NewExternalAddrOfPeer { + address: addr, + peer_id, + }, } } } @@ -432,6 +451,8 @@ pub enum FromSwarm<'a> { ExternalAddrConfirmed(ExternalAddrConfirmed<'a>), /// Informs the behaviour that an external address of the local node expired, i.e. is no-longer confirmed. ExternalAddrExpired(ExternalAddrExpired<'a>), + /// Informs the behaviour that we have discovered a new external address for a remote peer. + NewExternalAddrOfPeer(NewExternalAddrOfPeer<'a>), } /// [`FromSwarm`] variant that informs the behaviour about a newly established connection to a peer. @@ -543,3 +564,10 @@ pub struct ExternalAddrConfirmed<'a> { pub struct ExternalAddrExpired<'a> { pub addr: &'a Multiaddr, } + +/// [`FromSwarm`] variant that informs the behaviour that a new external address for a remote peer was detected. +#[derive(Clone, Copy, Debug)] +pub struct NewExternalAddrOfPeer<'a> { + pub peer_id: PeerId, + pub addr: &'a Multiaddr, +} diff --git a/swarm/src/behaviour/peer_addresses.rs b/swarm/src/behaviour/peer_addresses.rs new file mode 100644 index 00000000000..a011867dcdf --- /dev/null +++ b/swarm/src/behaviour/peer_addresses.rs @@ -0,0 +1,338 @@ +use crate::behaviour::FromSwarm; +use crate::{DialError, DialFailure, NewExternalAddrOfPeer}; + +use libp2p_core::Multiaddr; +use libp2p_identity::PeerId; + +use lru::LruCache; + +use std::num::NonZeroUsize; + +/// Struct for tracking peers' external addresses of the [`Swarm`](crate::Swarm). +#[derive(Debug)] +pub struct PeerAddresses(LruCache>); + +impl PeerAddresses { + /// Creates a [`PeerAddresses`] cache with capacity for the given number of peers. + /// + /// For each peer, we will at most store 10 addresses. + pub fn new(number_of_peers: NonZeroUsize) -> Self { + Self(LruCache::new(number_of_peers)) + } + + /// Feed a [`FromSwarm`] event to this struct. + /// + /// Returns whether the event changed peer's known external addresses. + pub fn on_swarm_event(&mut self, event: &FromSwarm) -> bool { + match event { + FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer { peer_id, addr }) => { + self.add(*peer_id, (*addr).clone()) + } + FromSwarm::DialFailure(DialFailure { + peer_id: Some(peer_id), + error: DialError::Transport(errors), + .. + }) => { + for (addr, _error) in errors { + self.remove(peer_id, addr); + } + true + } + _ => false, + } + } + + /// Adds address to cache. + /// Appends address to the existing set if peer addresses already exist. + /// Creates a new cache entry for peer_id if no addresses are present. + /// Returns true if the newly added address was not previously in the cache. + /// + pub fn add(&mut self, peer: PeerId, address: Multiaddr) -> bool { + match prepare_addr(&peer, &address) { + Ok(address) => { + if let Some(cached) = self.0.get_mut(&peer) { + cached.put(address, ()).is_none() + } else { + let mut set = LruCache::new(NonZeroUsize::new(10).expect("10 > 0")); + set.put(address, ()); + self.0.put(peer, set); + + true + } + } + Err(_) => false, + } + } + + /// Returns peer's external addresses. + pub fn get(&mut self, peer: &PeerId) -> impl Iterator + '_ { + self.0 + .get(peer) + .into_iter() + .flat_map(|c| c.iter().map(|(m, ())| m)) + .cloned() + } + + /// Removes address from peer addresses cache. + /// Returns true if the address was removed. + pub fn remove(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { + match self.0.get_mut(peer) { + Some(addrs) => match prepare_addr(peer, address) { + Ok(address) => addrs.pop(&address).is_some(), + Err(_) => false, + }, + None => false, + } + } +} + +fn prepare_addr(peer: &PeerId, addr: &Multiaddr) -> Result { + addr.clone().with_p2p(*peer) +} + +impl Default for PeerAddresses { + fn default() -> Self { + Self(LruCache::new(NonZeroUsize::new(100).unwrap())) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io; + + use crate::{ConnectionId, DialError}; + use libp2p_core::{ + multiaddr::Protocol, + transport::{memory::MemoryTransportError, TransportError}, + }; + + use once_cell::sync::Lazy; + + #[test] + fn new_peer_addr_returns_correct_changed_value() { + let mut cache = PeerAddresses::default(); + let peer_id = PeerId::random(); + + let event = new_external_addr_of_peer1(peer_id); + + let changed = cache.on_swarm_event(&event); + assert!(changed); + + let changed = cache.on_swarm_event(&event); + assert!(!changed); + } + + #[test] + fn new_peer_addr_saves_peer_addrs() { + let mut cache = PeerAddresses::default(); + let peer_id = PeerId::random(); + let event = new_external_addr_of_peer1(peer_id); + + let changed = cache.on_swarm_event(&event); + assert!(changed); + + let addr1 = MEMORY_ADDR_1000.clone().with_p2p(peer_id).unwrap(); + let expected = cache.get(&peer_id).collect::>(); + assert_eq!(expected, vec![addr1]); + + let event = new_external_addr_of_peer2(peer_id); + let changed = cache.on_swarm_event(&event); + + let addr1 = MEMORY_ADDR_1000.clone().with_p2p(peer_id).unwrap(); + let addr2 = MEMORY_ADDR_2000.clone().with_p2p(peer_id).unwrap(); + + let expected_addrs = cache.get(&peer_id).collect::>(); + assert!(expected_addrs.contains(&addr1)); + assert!(expected_addrs.contains(&addr2)); + + let expected = cache.get(&peer_id).collect::>().len(); + assert_eq!(expected, 2); + + assert!(changed); + } + + #[test] + fn existing_addr_is_not_added_to_cache() { + let mut cache = PeerAddresses::default(); + let peer_id = PeerId::random(); + + let event = new_external_addr_of_peer1(peer_id); + + let addr1 = MEMORY_ADDR_1000.clone().with_p2p(peer_id).unwrap(); + let changed = cache.on_swarm_event(&event); + let expected = cache.get(&peer_id).collect::>(); + assert!(changed); + assert_eq!(expected, vec![addr1]); + + let addr1 = MEMORY_ADDR_1000.clone().with_p2p(peer_id).unwrap(); + let changed = cache.on_swarm_event(&event); + let expected = cache.get(&peer_id).collect::>(); + assert!(!changed); + assert_eq!(expected, [addr1]); + } + + #[test] + fn addresses_of_peer_are_removed_when_received_dial_failure() { + let mut cache = PeerAddresses::default(); + let peer_id = PeerId::random(); + + let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap(); + let addr2: Multiaddr = "/ip4/127.0.0.1/tcp/8081".parse().unwrap(); + let addr3: Multiaddr = "/ip4/127.0.0.1/tcp/8082".parse().unwrap(); + + cache.add(peer_id, addr.clone()); + cache.add(peer_id, addr2.clone()); + cache.add(peer_id, addr3.clone()); + + let error = DialError::Transport(prepare_errors(vec![addr, addr3])); + + let event = FromSwarm::DialFailure(DialFailure { + peer_id: Some(peer_id), + error: &error, + connection_id: ConnectionId::new_unchecked(8), + }); + + let changed = cache.on_swarm_event(&event); + + assert!(changed); + + let cached = cache.get(&peer_id).collect::>(); + let expected = prepare_expected_addrs(peer_id, [addr2].into_iter()); + + assert_eq!(cached, expected); + } + + #[test] + fn remove_removes_address_if_present() { + let mut cache = PeerAddresses::default(); + let peer_id = PeerId::random(); + let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap(); + + cache.add(peer_id, addr.clone()); + + assert!(cache.remove(&peer_id, &addr)); + } + + #[test] + fn remove_returns_false_if_address_not_present() { + let mut cache = PeerAddresses::default(); + let peer_id = PeerId::random(); + let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap(); + + assert!(!cache.remove(&peer_id, &addr)); + } + + #[test] + fn remove_returns_false_if_peer_not_present() { + let mut cache = PeerAddresses::default(); + let peer_id = PeerId::random(); + let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap(); + + assert!(!cache.remove(&peer_id, &addr)); + } + + #[test] + fn remove_removes_address_provided_in_param() { + let mut cache = PeerAddresses::default(); + let peer_id = PeerId::random(); + let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap(); + let addr2: Multiaddr = "/ip4/127.0.0.1/tcp/8081".parse().unwrap(); + let addr3: Multiaddr = "/ip4/127.0.0.1/tcp/8082".parse().unwrap(); + + cache.add(peer_id, addr.clone()); + cache.add(peer_id, addr2.clone()); + cache.add(peer_id, addr3.clone()); + + assert!(cache.remove(&peer_id, &addr2)); + + let mut cached = cache.get(&peer_id).collect::>(); + cached.sort(); + + let expected = prepare_expected_addrs(peer_id, [addr, addr3].into_iter()); + + assert_eq!(cached, expected); + } + + #[test] + fn add_adds_new_address_to_cache() { + let mut cache = PeerAddresses::default(); + let peer_id = PeerId::random(); + let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap(); + + assert!(cache.add(peer_id, addr.clone())); + + let mut cached = cache.get(&peer_id).collect::>(); + cached.sort(); + let expected = prepare_expected_addrs(peer_id, [addr].into_iter()); + + assert_eq!(cached, expected); + } + + #[test] + fn add_adds_address_to_cache_to_existing_key() { + let mut cache = PeerAddresses::default(); + let peer_id = PeerId::random(); + let addr: Multiaddr = "/ip4/127.0.0.1/tcp/8080".parse().unwrap(); + let addr2: Multiaddr = "/ip4/127.0.0.1/tcp/8081".parse().unwrap(); + let addr3: Multiaddr = "/ip4/127.0.0.1/tcp/8082".parse().unwrap(); + + assert!(cache.add(peer_id, addr.clone())); + + cache.add(peer_id, addr2.clone()); + cache.add(peer_id, addr3.clone()); + + let expected = prepare_expected_addrs(peer_id, [addr, addr2, addr3].into_iter()); + + let mut cached = cache.get(&peer_id).collect::>(); + cached.sort(); + + assert_eq!(cached, expected); + } + + fn prepare_expected_addrs( + peer_id: PeerId, + addrs: impl Iterator, + ) -> Vec { + let mut addrs = addrs + .filter_map(|a| a.with_p2p(peer_id).ok()) + .collect::>(); + addrs.sort(); + addrs + } + + fn new_external_addr_of_peer1(peer_id: PeerId) -> FromSwarm<'static> { + FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer { + peer_id, + addr: &MEMORY_ADDR_1000, + }) + } + + fn new_external_addr_of_peer2(peer_id: PeerId) -> FromSwarm<'static> { + FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer { + peer_id, + addr: &MEMORY_ADDR_2000, + }) + } + + fn prepare_errors(addrs: Vec) -> Vec<(Multiaddr, TransportError)> { + let errors: Vec<(Multiaddr, TransportError)> = addrs + .iter() + .map(|addr| { + ( + addr.clone(), + TransportError::Other(io::Error::new( + io::ErrorKind::Other, + MemoryTransportError::Unreachable, + )), + ) + }) + .collect(); + errors + } + + static MEMORY_ADDR_1000: Lazy = + Lazy::new(|| Multiaddr::empty().with(Protocol::Memory(1000))); + static MEMORY_ADDR_2000: Lazy = + Lazy::new(|| Multiaddr::empty().with(Protocol::Memory(2000))); +} diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 0354f39cfdc..2f02e43348d 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -84,6 +84,7 @@ pub mod derive_prelude { pub use crate::behaviour::ListenerClosed; pub use crate::behaviour::ListenerError; pub use crate::behaviour::NewExternalAddrCandidate; + pub use crate::behaviour::NewExternalAddrOfPeer; pub use crate::behaviour::NewListenAddr; pub use crate::behaviour::NewListener; pub use crate::connection::ConnectionId; @@ -108,8 +109,8 @@ pub mod derive_prelude { pub use behaviour::{ AddressChange, CloseConnection, ConnectionClosed, DialFailure, ExpiredListenAddr, ExternalAddrExpired, ExternalAddresses, FromSwarm, ListenAddresses, ListenFailure, - ListenerClosed, ListenerError, NetworkBehaviour, NewExternalAddrCandidate, NewListenAddr, - NotifyHandler, ToSwarm, + ListenerClosed, ListenerError, NetworkBehaviour, NewExternalAddrCandidate, + NewExternalAddrOfPeer, NewListenAddr, NotifyHandler, PeerAddresses, ToSwarm, }; pub use connection::pool::ConnectionCounters; pub use connection::{ConnectionError, ConnectionId, SupportedProtocols}; @@ -298,6 +299,8 @@ pub enum SwarmEvent { ExternalAddrConfirmed { address: Multiaddr }, /// An external address of the local node expired, i.e. is no-longer confirmed. ExternalAddrExpired { address: Multiaddr }, + /// We have discovered a new address of a peer. + NewExternalAddrOfPeer { peer_id: PeerId, address: Multiaddr }, } impl SwarmEvent { @@ -623,6 +626,17 @@ where self.confirmed_external_addr.remove(addr); } + /// Add a new external address of a remote peer. + /// + /// The address is broadcast to all [`NetworkBehaviour`]s via [`FromSwarm::NewExternalAddrOfPeer`]. + pub fn add_peer_address(&mut self, peer_id: PeerId, addr: Multiaddr) { + self.behaviour + .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer { + peer_id, + addr: &addr, + })) + } + /// Disconnects a peer by its peer ID, closing all connections to said peer. /// /// Returns `Ok(())` if there was one or more established connections to the peer. @@ -1176,6 +1190,15 @@ where self.pool.disconnect(peer_id); } }, + ToSwarm::NewExternalAddrOfPeer { peer_id, address } => { + self.behaviour + .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer { + peer_id, + addr: &address, + })); + self.pending_swarm_events + .push_back(SwarmEvent::NewExternalAddrOfPeer { peer_id, address }); + } } }