Skip to content

Commit

Permalink
fix(dcutr): handle empty holepunch_candidates
Browse files Browse the repository at this point in the history
  • Loading branch information
stormshield-frb committed Aug 30, 2024
1 parent cefd22b commit 0129730
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 60 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ libp2p-allow-block-list = { version = "0.4.1", path = "misc/allow-block-list" }
libp2p-autonat = { version = "0.13.0", path = "protocols/autonat" }
libp2p-connection-limits = { version = "0.4.0", path = "misc/connection-limits" }
libp2p-core = { version = "0.42.0", path = "core" }
libp2p-dcutr = { version = "0.12.0", path = "protocols/dcutr" }
libp2p-dcutr = { version = "0.12.1", path = "protocols/dcutr" }
libp2p-dns = { version = "0.42.0", path = "transports/dns" }
libp2p-floodsub = { version = "0.45.0", path = "protocols/floodsub" }
libp2p-gossipsub = { version = "0.47.0", path = "protocols/gossipsub" }
Expand Down
5 changes: 5 additions & 0 deletions protocols/dcutr/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.12.1

- Handle empty hole-punch candidates at relayed connection startup.
See [PR XXXX](https://github.com/libp2p/rust-libp2p/pull/XXXX).

## 0.12.0

<!-- Update to libp2p-swarm v0.45.0 -->
Expand Down
2 changes: 1 addition & 1 deletion protocols/dcutr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-dcutr"
edition = "2021"
rust-version = { workspace = true }
description = "Direct connection upgrade through relay"
version = "0.12.0"
version = "0.12.1"
authors = ["Max Inden <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
70 changes: 43 additions & 27 deletions protocols/dcutr/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ pub struct Behaviour {
/// Queue of actions to return when polled.
queued_events: VecDeque<ToSwarm<Event, Either<handler::relayed::Command, Void>>>,

/// All direct (non-relayed) connections.
direct_connections: HashMap<PeerId, HashSet<ConnectionId>>,
/// All relayed connections.
relayed_connections: HashMap<PeerId, HashSet<ConnectionId>>,

address_candidates: Candidates,

Expand All @@ -86,15 +86,15 @@ impl Behaviour {
pub fn new(local_peer_id: PeerId) -> Self {
Behaviour {
queued_events: Default::default(),
direct_connections: Default::default(),
relayed_connections: Default::default(),
address_candidates: Candidates::new(local_peer_id),
direct_to_relayed_connections: Default::default(),
outgoing_direct_connection_attempts: Default::default(),
}
}

fn observed_addresses(&self) -> Vec<Multiaddr> {
self.address_candidates.iter().cloned().collect()
fn observed_addresses(&self) -> LruCache<Multiaddr, ()> {
self.address_candidates.inner.clone()
}

fn on_dial_failure(
Expand Down Expand Up @@ -148,17 +148,17 @@ impl Behaviour {
..
}: ConnectionClosed,
) {
if !connected_point.is_relayed() {
if connected_point.is_relayed() {
let connections = self
.direct_connections
.relayed_connections
.get_mut(&peer_id)
.expect("Peer of direct connection to be tracked.");
.expect("Peer of relayed connection to be tracked.");
connections
.remove(&connection_id)
.then_some(())
.expect("Direct connection to be tracked.");
.expect("Relayed connection to be tracked.");
if connections.is_empty() {
self.direct_connections.remove(&peer_id);
self.relayed_connections.remove(&peer_id);
}
}
}
Expand All @@ -176,6 +176,11 @@ impl NetworkBehaviour for Behaviour {
remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
if is_relayed(local_addr) {
self.relayed_connections
.entry(peer)
.or_default()
.insert(connection_id);

let connected_point = ConnectedPoint::Listener {
local_addr: local_addr.clone(),
send_back_addr: remote_addr.clone(),
Expand All @@ -186,10 +191,6 @@ impl NetworkBehaviour for Behaviour {

return Ok(Either::Left(handler)); // TODO: We could make two `handler::relayed::Handler` here, one inbound one outbound.
}
self.direct_connections
.entry(peer)
.or_default()
.insert(connection_id);

assert!(
!self
Expand All @@ -210,6 +211,11 @@ impl NetworkBehaviour for Behaviour {
port_use: PortUse,
) -> Result<THandler<Self>, ConnectionDenied> {
if is_relayed(addr) {
self.relayed_connections
.entry(peer)
.or_default()
.insert(connection_id);

return Ok(Either::Left(handler::relayed::Handler::new(
ConnectedPoint::Dialer {
address: addr.clone(),
Expand All @@ -220,11 +226,6 @@ impl NetworkBehaviour for Behaviour {
))); // TODO: We could make two `handler::relayed::Handler` here, one inbound one outbound.
}

self.direct_connections
.entry(peer)
.or_default()
.insert(connection_id);

// Whether this is a connection requested by this behaviour.
if let Some(&relayed_connection_id) = self.direct_to_relayed_connections.get(&connection_id)
{
Expand Down Expand Up @@ -334,7 +335,21 @@ impl NetworkBehaviour for Behaviour {
}
FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
FromSwarm::NewExternalAddrCandidate(NewExternalAddrCandidate { addr }) => {
self.address_candidates.add(addr.clone());
if let Some(address) = self.address_candidates.add(addr.clone()) {
for (peer, connections) in &self.relayed_connections {
for connection in connections {
self.queued_events.push_back(ToSwarm::NotifyHandler {
handler: NotifyHandler::One(*connection),
peer_id: *peer,
event: Either::Left(
handler::relayed::Command::NewExternalAddrCandidate(
address.clone(),
),
),
})
}
}
}
}
_ => {}
}
Expand All @@ -360,20 +375,21 @@ impl Candidates {
}
}

fn add(&mut self, mut address: Multiaddr) {
/// Format the address and push it into the LruCache if it is not relayed.
///
/// Returns the provided address formatted if it was pushed.
/// Returns `None` otherwise.
fn add(&mut self, mut address: Multiaddr) -> Option<Multiaddr> {
if is_relayed(&address) {
return;
return None;
}

if address.iter().last() != Some(Protocol::P2p(self.me)) {
address.push(Protocol::P2p(self.me));
}

self.inner.push(address, ());
}

fn iter(&self) -> impl Iterator<Item = &Multiaddr> {
self.inner.iter().map(|(a, _)| a)
self.inner.push(address.clone(), ());
Some(address)
}
}

Expand Down
114 changes: 84 additions & 30 deletions protocols/dcutr/src/handler/relayed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,21 @@ use libp2p_swarm::handler::{
ListenUpgradeError,
};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, StreamUpgradeError,
ConnectionHandler, ConnectionHandlerEvent, Stream, StreamProtocol, StreamUpgradeError,
SubstreamProtocol,
};
use lru::LruCache;
use protocol::{inbound, outbound};
use std::collections::VecDeque;
use std::io;
use std::task::{Context, Poll};
use std::time::Duration;
use tracing::{debug, warn};

#[derive(Debug)]
pub enum Command {
Connect,
NewExternalAddrCandidate(Multiaddr),
}

#[derive(Debug)]
Expand All @@ -67,28 +70,76 @@ pub struct Handler {

// Inbound DCUtR handshakes
inbound_stream: futures_bounded::FuturesSet<Result<Vec<Multiaddr>, inbound::Error>>,
pending_inbound_stream: Option<Stream>,

// Outbound DCUtR handshake.
outbound_stream: futures_bounded::FuturesSet<Result<Vec<Multiaddr>, outbound::Error>>,

/// The addresses we will send to the other party for hole-punching attempts.
holepunch_candidates: Vec<Multiaddr>,
holepunch_candidates: LruCache<Multiaddr, ()>,
pending_outbound_stream: Option<Stream>,

attempts: u8,
}

impl Handler {
pub fn new(endpoint: ConnectedPoint, holepunch_candidates: Vec<Multiaddr>) -> Self {
pub fn new(endpoint: ConnectedPoint, holepunch_candidates: LruCache<Multiaddr, ()>) -> Self {
Self {
endpoint,
queued_events: Default::default(),
inbound_stream: futures_bounded::FuturesSet::new(Duration::from_secs(10), 1),
pending_inbound_stream: None,
outbound_stream: futures_bounded::FuturesSet::new(Duration::from_secs(10), 1),
pending_outbound_stream: None,
holepunch_candidates,
attempts: 0,
}
}

fn set_stream(&mut self, stream_type: StreamType, stream: Stream) {
if self.holepunch_candidates.is_empty() {
debug!(
"New {stream_type} connect stream but holepunch_candidates is empty. Buffering it."
);
let has_replaced_old_stream = match stream_type {
StreamType::Inbound => self.pending_inbound_stream.replace(stream).is_some(),
StreamType::Outbound => self.pending_outbound_stream.replace(stream).is_some(),
};
if has_replaced_old_stream {
warn!("New {stream_type} connect stream while still buffering previous one. Replacing previous with new.");
}
} else {
let holepunch_candidates = self
.holepunch_candidates
.iter()
.map(|(a, _)| a)
.cloned()
.collect();

let has_replaced_old_stream = match stream_type {
StreamType::Inbound => {
// TODO: when upstreaming this fix, ask libp2p about merging the `attempts` incrementation.
self.attempts += 1;

// FIXME: actually does not replace !!
self.inbound_stream
.try_push(inbound::handshake(stream, holepunch_candidates))
.is_err()
}
StreamType::Outbound => {
// FIXME: actually does not replace !!
self.outbound_stream
.try_push(outbound::handshake(stream, holepunch_candidates))
.is_err()
}
};

if has_replaced_old_stream {
warn!("New {stream_type} connect stream while still upgrading previous one. Replacing previous with new.");
}
}
}

fn on_fully_negotiated_inbound(
&mut self,
FullyNegotiatedInbound {
Expand All @@ -99,21 +150,7 @@ impl Handler {
>,
) {
match output {
future::Either::Left(stream) => {
if self
.inbound_stream
.try_push(inbound::handshake(
stream,
self.holepunch_candidates.clone(),
))
.is_err()
{
tracing::warn!(
"New inbound connect stream while still upgrading previous one. Replacing previous with new.",
);
}
self.attempts += 1;
}
future::Either::Left(stream) => self.set_stream(StreamType::Inbound, stream),
// A connection listener denies all incoming substreams, thus none can ever be fully negotiated.
future::Either::Right(output) => void::unreachable(output),
}
Expand All @@ -132,18 +169,7 @@ impl Handler {
self.endpoint.is_listener(),
"A connection dialer never initiates a connection upgrade."
);
if self
.outbound_stream
.try_push(outbound::handshake(
stream,
self.holepunch_candidates.clone(),
))
.is_err()
{
tracing::warn!(
"New outbound connect stream while still upgrading previous one. Replacing previous with new.",
);
}
self.set_stream(StreamType::Outbound, stream);
}

fn on_listen_upgrade_error(
Expand Down Expand Up @@ -208,8 +234,22 @@ impl ConnectionHandler for Handler {
.push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()),
});

// TODO: when upstreaming this fix, ask libp2p about merging the `attempts` incrementation.
// Indeed, even if the `queued_event` above will be trigger on stream opening, it might not start
self.attempts += 1;
}
Command::NewExternalAddrCandidate(address) => {
self.holepunch_candidates.push(address, ());

if let Some(stream) = self.pending_inbound_stream.take() {
self.set_stream(StreamType::Inbound, stream);
}

if let Some(stream) = self.pending_outbound_stream.take() {
self.set_stream(StreamType::Outbound, stream);
}
}
}
}

Expand Down Expand Up @@ -308,3 +348,17 @@ impl ConnectionHandler for Handler {
}
}
}

enum StreamType {
Inbound,
Outbound,
}

impl std::fmt::Display for StreamType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Inbound => write!(f, "inbound"),
Self::Outbound => write!(f, "outbound"),
}
}
}

0 comments on commit 0129730

Please sign in to comment.