Skip to content

Commit

Permalink
Use InflightProtocolDataQueue in libp2p-relay
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaseizinger committed Nov 14, 2023
1 parent b7243fd commit f4ab209
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 116 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions protocols/relay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ static_assertions = "1"
thiserror = "1.0"
tracing = "0.1.37"
void = "1"
libp2p-protocol-utils = { workspace = true }

[dev-dependencies]
libp2p-identity = { workspace = true, features = ["rand"] }
Expand Down
219 changes: 103 additions & 116 deletions protocols/relay/src/priv_client/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@ use libp2p_core::multiaddr::Protocol;
use libp2p_core::upgrade::ReadyUpgrade;
use libp2p_core::Multiaddr;
use libp2p_identity::PeerId;
use libp2p_swarm::handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
};
use libp2p_protocol_utils::InflightProtocolDataQueue;
use libp2p_swarm::handler::{ConnectionEvent, FullyNegotiatedInbound};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, StreamUpgradeError,
ConnectionHandler, ConnectionHandlerEvent, Stream, StreamProtocol, StreamUpgradeError,
SubstreamProtocol,
};
use std::collections::VecDeque;
use std::task::{Context, Poll};
use std::time::Duration;
use std::{fmt, io};
use void::Void;

/// The maximum number of circuits being denied concurrently.
///
Expand Down Expand Up @@ -104,8 +104,12 @@ pub struct Handler {
>,
>,

/// We issue a stream upgrade for each pending request.
pending_requests: VecDeque<PendingRequest>,
/// Manages associated data whilst we wait for outbound streams to be opened.
pending_streams: InflightProtocolDataQueue<
PendingRequest,
StreamProtocol,
Result<Stream, StreamUpgradeError<Void>>,
>,

/// A `RESERVE` request is in-flight for each item in this queue.
active_reserve_requests: VecDeque<mpsc::Sender<transport::ToListenerMsg>>,
Expand Down Expand Up @@ -136,7 +140,7 @@ impl Handler {
remote_peer_id,
remote_addr,
queued_events: Default::default(),
pending_requests: Default::default(),
pending_streams: Default::default(),
active_reserve_requests: Default::default(),
inflight_reserve_requests: futures_bounded::FuturesSet::new(
STREAM_TIMEOUT,
Expand All @@ -159,57 +163,6 @@ impl Handler {
}
}

fn on_dial_upgrade_error(
&mut self,
DialUpgradeError { error, .. }: DialUpgradeError<
<Self as ConnectionHandler>::OutboundOpenInfo,
<Self as ConnectionHandler>::OutboundProtocol,
>,
) {
let pending_request = self
.pending_requests
.pop_front()
.expect("got a stream error without a pending request");

match pending_request {
PendingRequest::Reserve { mut to_listener } => {
let error = match error {
StreamUpgradeError::Timeout => {
outbound_hop::ReserveError::Io(io::ErrorKind::TimedOut.into())
}
StreamUpgradeError::Apply(never) => void::unreachable(never),
StreamUpgradeError::NegotiationFailed => {
outbound_hop::ReserveError::Unsupported
}
StreamUpgradeError::Io(e) => outbound_hop::ReserveError::Io(e),
};

if let Err(e) =
to_listener.try_send(transport::ToListenerMsg::Reservation(Err(error)))
{
tracing::debug!("Unable to send error to listener: {}", e.into_send_error())
}
self.reservation.failed();
}
PendingRequest::Connect {
to_dial: send_back, ..
} => {
let error = match error {
StreamUpgradeError::Timeout => {
outbound_hop::ConnectError::Io(io::ErrorKind::TimedOut.into())
}
StreamUpgradeError::NegotiationFailed => {
outbound_hop::ConnectError::Unsupported
}
StreamUpgradeError::Io(e) => outbound_hop::ConnectError::Io(e),
StreamUpgradeError::Apply(v) => void::unreachable(v),
};

let _ = send_back.send(Err(error));
}
}
}

fn insert_to_deny_futs(&mut self, circuit: inbound_stop::Circuit) {
let src_peer_id = circuit.src_peer_id();

Expand Down Expand Up @@ -241,25 +194,20 @@ impl ConnectionHandler for Handler {
fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
match event {
In::Reserve { to_listener } => {
self.pending_requests
.push_back(PendingRequest::Reserve { to_listener });
self.queued_events
.push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ()),
});
self.pending_streams
.enqueue_request(HOP_PROTOCOL_NAME, PendingRequest::Reserve { to_listener });
}
In::EstablishCircuit {
to_dial: send_back,
dst_peer_id,
} => {
self.pending_requests.push_back(PendingRequest::Connect {
dst_peer_id,
to_dial: send_back,
});
self.queued_events
.push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ()),
});
self.pending_streams.enqueue_request(
HOP_PROTOCOL_NAME,
PendingRequest::Connect {
dst_peer_id,
to_dial: send_back,
},
);
}
}
}
Expand Down Expand Up @@ -442,12 +390,9 @@ impl ConnectionHandler for Handler {
}

if let Poll::Ready(Some(to_listener)) = self.reservation.poll(cx) {
self.pending_requests
.push_back(PendingRequest::Reserve { to_listener });

return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(ReadyUpgrade::new(HOP_PROTOCOL_NAME), ()),
});
self.pending_streams
.enqueue_request(HOP_PROTOCOL_NAME, PendingRequest::Reserve { to_listener });
continue;
}

// Deny incoming circuit requests.
Expand All @@ -464,6 +409,81 @@ impl ConnectionHandler for Handler {
Poll::Pending => {}
}

if let Some(protocol) = self.pending_streams.next_request() {
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(ReadyUpgrade::new(protocol), ()),
});
}

match self.pending_streams.next_completed() {
Some((Ok(stream), PendingRequest::Reserve { to_listener })) => {
self.active_reserve_requests.push_back(to_listener);
if self
.inflight_reserve_requests
.try_push(outbound_hop::make_reservation(stream))
.is_err()
{
tracing::warn!("Dropping outbound stream because we are at capacity")
}
continue;
}
Some((Err(error), PendingRequest::Reserve { mut to_listener })) => {
let error = match error {
StreamUpgradeError::Timeout => {
outbound_hop::ReserveError::Io(io::ErrorKind::TimedOut.into())
}
StreamUpgradeError::Apply(never) => void::unreachable(never),
StreamUpgradeError::NegotiationFailed => {
outbound_hop::ReserveError::Unsupported
}
StreamUpgradeError::Io(e) => outbound_hop::ReserveError::Io(e),
};

if let Err(e) =
to_listener.try_send(transport::ToListenerMsg::Reservation(Err(error)))
{
tracing::debug!("Unable to send error to listener: {}", e.into_send_error())
}
self.reservation.failed();
continue;
}
Some((
Ok(stream),
PendingRequest::Connect {
to_dial,
dst_peer_id,
},
)) => {
self.active_connect_requests.push_back(to_dial);

if self
.inflight_outbound_connect_requests
.try_push(outbound_hop::open_circuit(stream, dst_peer_id))
.is_err()
{
tracing::warn!("Dropping outbound stream because we are at capacity")
}
continue;
}

Some((Err(error), PendingRequest::Connect { to_dial, .. })) => {
let error = match error {
StreamUpgradeError::Timeout => {
outbound_hop::ConnectError::Io(io::ErrorKind::TimedOut.into())
}
StreamUpgradeError::NegotiationFailed => {
outbound_hop::ConnectError::Unsupported
}
StreamUpgradeError::Io(e) => outbound_hop::ConnectError::Io(e),
StreamUpgradeError::Apply(v) => void::unreachable(v),
};

let _ = to_dial.send(Err(error));
continue;
}
None => {}
}

return Poll::Pending;
}
}
Expand All @@ -490,45 +510,12 @@ impl ConnectionHandler for Handler {
tracing::warn!("Dropping inbound stream because we are at capacity")
}
}
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
protocol: stream,
..
}) => {
let pending_request = self.pending_requests.pop_front().expect(
"opened a stream without a pending connection command or a reserve listener",
);
match pending_request {
PendingRequest::Reserve { to_listener } => {
self.active_reserve_requests.push_back(to_listener);
if self
.inflight_reserve_requests
.try_push(outbound_hop::make_reservation(stream))
.is_err()
{
tracing::warn!("Dropping outbound stream because we are at capacity")
}
}
PendingRequest::Connect {
dst_peer_id,
to_dial: send_back,
} => {
self.active_connect_requests.push_back(send_back);

if self
.inflight_outbound_connect_requests
.try_push(outbound_hop::open_circuit(stream, dst_peer_id))
.is_err()
{
tracing::warn!("Dropping outbound stream because we are at capacity")
}
}
}
}
ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => {
void::unreachable(listen_upgrade_error.error)
ConnectionEvent::FullyNegotiatedOutbound(ev) => {
self.pending_streams.submit_response(Ok(ev.protocol));
}
ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
self.on_dial_upgrade_error(dial_upgrade_error)
ConnectionEvent::ListenUpgradeError(ev) => void::unreachable(ev.error),
ConnectionEvent::DialUpgradeError(ev) => {
self.pending_streams.submit_response(Err(ev.error));
}
_ => {}
}
Expand Down

0 comments on commit f4ab209

Please sign in to comment.