Skip to content

Commit

Permalink
fix(relay): correctly track associated state of inflight streams
Browse files Browse the repository at this point in the history
In the relay client, we need to track a fair bit of data _alongside_ `Future`s that are executing within a `FuturesSet`. In particular, we need to track the channels that connect us to the `Transport`. Using `?` within the actual `async` block of the protocol is crucial for ergonomics but essentially locks us out of passing the channel _into_ the `Future` itself. Hence, we need to track it outside.

As we can see from #4822, doing so in a separate list is error prone.

We solve this by introducing the `FuturesTupleSet` type to `futures-bounded`. This is a variation of `FuturesSet` that carries a piece of arbitrary data alongside the `Future` that is executing and returns it back upon completion.

Using this within the relay code reveals another bug where we mistakenly confused a timed out `CONNECT` request for a timed out `RESERVE` request.

Fixes: #4822.

Pull-Request: #4841.
  • Loading branch information
thomaseizinger authored Nov 15, 2023
1 parent e3db5a5 commit 41da782
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 87 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ rust-version = "1.73.0"

[workspace.dependencies]
asynchronous-codec = { version = "0.7.0" }
futures-bounded = { version = "0.2.2", path = "misc/futures-bounded" }
futures-bounded = { version = "0.2.3", path = "misc/futures-bounded" }
libp2p = { version = "0.53.0", path = "libp2p" }
libp2p-allow-block-list = { version = "0.3.0", path = "misc/allow-block-list" }
libp2p-autonat = { version = "0.12.0", path = "protocols/autonat" }
Expand Down
5 changes: 5 additions & 0 deletions misc/futures-bounded/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.2.3 - unreleased

- Introduce `FuturesTupleSet`, holding tuples of a `Future` together with an arbitrary piece of data.
See [PR 4841](https://github.com/libp2p/rust-libp2p/pull/4841).

## 0.2.2

- Fix an issue where `{Futures,Stream}Map` returns `Poll::Pending` despite being ready after an item has been replaced as part of `try_push`.
Expand Down
2 changes: 1 addition & 1 deletion misc/futures-bounded/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "futures-bounded"
version = "0.2.2"
version = "0.2.3"
edition = "2021"
rust-version.workspace = true
license = "MIT"
Expand Down
94 changes: 94 additions & 0 deletions misc/futures-bounded/src/futures_tuple_set.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use std::collections::HashMap;
use std::future::Future;
use std::task::{ready, Context, Poll};
use std::time::Duration;

use futures_util::future::BoxFuture;

use crate::{FuturesMap, PushError, Timeout};

/// Represents a list of tuples of a [Future] and an associated piece of data.
///
/// Each future must finish within the specified time and the list never outgrows its capacity.
///
/// `O` is the output type of the tracked futures and `D` is the type of the data that is
/// carried alongside each of them.
pub struct FuturesTupleSet<O, D> {
/// The most recently assigned ID; bumped on every push and used as the key in both maps below.
id: u32,
/// The executing futures, keyed by their assigned ID.
inner: FuturesMap<u32, O>,
/// The data associated with each future, keyed by the same ID as in `inner`.
data: HashMap<u32, D>,
}

impl<O, D> FuturesTupleSet<O, D> {
    /// Creates an empty set.
    ///
    /// `timeout` and `capacity` are forwarded unchanged to the underlying [`FuturesMap`].
    pub fn new(timeout: Duration, capacity: usize) -> Self {
        let inner = FuturesMap::new(timeout, capacity);
        let data = HashMap::new();

        Self { id: 0, inner, data }
    }
}

impl<O, D> FuturesTupleSet<O, D>
where
    O: 'static,
{
    /// Attempts to add a future together with its associated piece of data.
    ///
    /// On success, the data is stored and handed back by [`Self::poll_unpin`] together with the
    /// outcome of the future.
    /// If the underlying map reports that it is beyond capacity, nothing is stored and both the
    /// (boxed) future and the data are returned to the caller inside the `Err` variant.
    pub fn try_push<F>(&mut self, future: F, data: D) -> Result<(), (BoxFuture<O>, D)>
    where
        F: Future<Output = O> + Send + 'static,
    {
        // Every push attempt consumes a fresh ID, even if the push is rejected afterwards.
        let id = self.id.wrapping_add(1);
        self.id = id;

        match self.inner.try_push(id, future) {
            Ok(()) => {
                self.data.insert(id, data);
                Ok(())
            }
            Err(PushError::BeyondCapacity(rejected)) => Err((rejected, data)),
            Err(PushError::Replaced(_)) => unreachable!("we never reuse IDs"),
        }
    }

    /// Returns the number of futures currently held, as reported by the underlying map.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Returns whether the underlying map currently holds no futures.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Forwards to the readiness check of the underlying [`FuturesMap`].
    pub fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        self.inner.poll_ready_unpin(cx)
    }

    /// Polls the underlying map for a finished future.
    ///
    /// Once one is available, its outcome is returned together with the data that was passed
    /// to [`Self::try_push`] alongside it. The outcome is `Err(Timeout)` when the underlying map
    /// reports a timeout for that future.
    ///
    /// # Panics
    ///
    /// Panics if no data is stored under the ID of the finished future.
    pub fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<(Result<O, Timeout>, D)> {
        let (finished_id, outcome) = ready!(self.inner.poll_unpin(cx));
        let associated = self
            .data
            .remove(&finished_id)
            .expect("must have data for future");

        Poll::Ready((outcome, associated))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::future::poll_fn;
    use futures_util::FutureExt;
    use std::future::ready;

    /// Pushes two immediately-ready futures whose output equals their associated data and
    /// checks that every completed future is handed back with its own piece of data.
    #[test]
    fn tracks_associated_data_of_future() {
        let mut set = FuturesTupleSet::new(Duration::from_secs(10), 10);

        for value in [1, 2] {
            let _ = set.try_push(ready(value), value);
        }

        for _ in 0..2 {
            let (result, data) = poll_fn(|cx| set.poll_unpin(cx)).now_or_never().unwrap();

            assert_eq!(result.unwrap(), data);
        }
    }
}
2 changes: 2 additions & 0 deletions misc/futures-bounded/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
mod futures_map;
mod futures_set;
mod futures_tuple_set;
mod stream_map;
mod stream_set;

pub use futures_map::FuturesMap;
pub use futures_set::FuturesSet;
pub use futures_tuple_set::FuturesTupleSet;
pub use stream_map::StreamMap;
pub use stream_set::StreamSet;

Expand Down
4 changes: 3 additions & 1 deletion protocols/relay/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@

## 0.17.1 - unreleased

- Automatically register relayed addresses as external addresses.
See [PR 4809](https://github.com/libp2p/rust-libp2p/pull/4809).
- Fix an error where performing too many reservations at once could lead to inconsistent internal state.
See [PR 4841](https://github.com/libp2p/rust-libp2p/pull/4841).

## 0.17.0
- Don't close connections on protocol failures within the relay-server.
Expand Down
123 changes: 40 additions & 83 deletions protocols/relay/src/priv_client/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,18 +107,15 @@ pub struct Handler {
/// We issue a stream upgrade for each pending request.
pending_requests: VecDeque<PendingRequest>,

/// A `RESERVE` request is in-flight for each item in this queue.
active_reserve_requests: VecDeque<mpsc::Sender<transport::ToListenerMsg>>,

inflight_reserve_requests:
futures_bounded::FuturesSet<Result<outbound_hop::Reservation, outbound_hop::ReserveError>>,

/// A `CONNECT` request is in-flight for each item in this queue.
active_connect_requests:
VecDeque<oneshot::Sender<Result<priv_client::Connection, outbound_hop::ConnectError>>>,
inflight_reserve_requests: futures_bounded::FuturesTupleSet<
Result<outbound_hop::Reservation, outbound_hop::ReserveError>,
mpsc::Sender<transport::ToListenerMsg>,
>,

inflight_outbound_connect_requests:
futures_bounded::FuturesSet<Result<outbound_hop::Circuit, outbound_hop::ConnectError>>,
inflight_outbound_connect_requests: futures_bounded::FuturesTupleSet<
Result<outbound_hop::Circuit, outbound_hop::ConnectError>,
oneshot::Sender<Result<priv_client::Connection, outbound_hop::ConnectError>>,
>,

inflight_inbound_circuit_requests:
futures_bounded::FuturesSet<Result<inbound_stop::Circuit, inbound_stop::Error>>,
Expand All @@ -137,24 +134,22 @@ impl Handler {
remote_addr,
queued_events: Default::default(),
pending_requests: Default::default(),
active_reserve_requests: Default::default(),
inflight_reserve_requests: futures_bounded::FuturesSet::new(
inflight_reserve_requests: futures_bounded::FuturesTupleSet::new(
STREAM_TIMEOUT,
MAX_CONCURRENT_STREAMS_PER_CONNECTION,
),
inflight_inbound_circuit_requests: futures_bounded::FuturesSet::new(
STREAM_TIMEOUT,
MAX_CONCURRENT_STREAMS_PER_CONNECTION,
),
inflight_outbound_connect_requests: futures_bounded::FuturesSet::new(
inflight_outbound_connect_requests: futures_bounded::FuturesTupleSet::new(
STREAM_TIMEOUT,
MAX_CONCURRENT_STREAMS_PER_CONNECTION,
),
inflight_outbound_circuit_deny_requests: futures_bounded::FuturesSet::new(
DENYING_CIRCUIT_TIMEOUT,
MAX_NUMBER_DENYING_CIRCUIT,
),
active_connect_requests: Default::default(),
reservation: Reservation::None,
}
}
Expand Down Expand Up @@ -276,24 +271,16 @@ impl ConnectionHandler for Handler {
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
> {
loop {
debug_assert_eq!(
self.inflight_reserve_requests.len(),
self.active_reserve_requests.len(),
"expect to have one active request per inflight stream"
);

// Reservations
match self.inflight_reserve_requests.poll_unpin(cx) {
Poll::Ready(Ok(Ok(outbound_hop::Reservation {
renewal_timeout,
addrs,
limit,
}))) => {
let to_listener = self
.active_reserve_requests
.pop_front()
.expect("must have active request for stream");

Poll::Ready((
Ok(Ok(outbound_hop::Reservation {
renewal_timeout,
addrs,
limit,
})),
to_listener,
)) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
self.reservation.accepted(
renewal_timeout,
Expand All @@ -304,12 +291,7 @@ impl ConnectionHandler for Handler {
),
));
}
Poll::Ready(Ok(Err(error))) => {
let mut to_listener = self
.active_reserve_requests
.pop_front()
.expect("must have active request for stream");

Poll::Ready((Ok(Err(error)), mut to_listener)) => {
if let Err(e) =
to_listener.try_send(transport::ToListenerMsg::Reservation(Err(error)))
{
Expand All @@ -318,12 +300,7 @@ impl ConnectionHandler for Handler {
self.reservation.failed();
continue;
}
Poll::Ready(Err(futures_bounded::Timeout { .. })) => {
let mut to_listener = self
.active_reserve_requests
.pop_front()
.expect("must have active request for stream");

Poll::Ready((Err(futures_bounded::Timeout { .. }), mut to_listener)) => {
if let Err(e) =
to_listener.try_send(transport::ToListenerMsg::Reservation(Err(
outbound_hop::ReserveError::Io(io::ErrorKind::TimedOut.into()),
Expand All @@ -337,25 +314,17 @@ impl ConnectionHandler for Handler {
Poll::Pending => {}
}

debug_assert_eq!(
self.inflight_outbound_connect_requests.len(),
self.active_connect_requests.len(),
"expect to have one active request per inflight stream"
);

// Circuits
match self.inflight_outbound_connect_requests.poll_unpin(cx) {
Poll::Ready(Ok(Ok(outbound_hop::Circuit {
limit,
read_buffer,
stream,
}))) => {
let to_listener = self
.active_connect_requests
.pop_front()
.expect("must have active request for stream");

if to_listener
Poll::Ready((
Ok(Ok(outbound_hop::Circuit {
limit,
read_buffer,
stream,
})),
to_dialer,
)) => {
if to_dialer
.send(Ok(priv_client::Connection {
state: priv_client::ConnectionState::new_outbound(stream, read_buffer),
}))
Expand All @@ -371,27 +340,18 @@ impl ConnectionHandler for Handler {
Event::OutboundCircuitEstablished { limit },
));
}
Poll::Ready(Ok(Err(error))) => {
let to_dialer = self
.active_connect_requests
.pop_front()
.expect("must have active request for stream");

Poll::Ready((Ok(Err(error)), to_dialer)) => {
let _ = to_dialer.send(Err(error));
continue;
}
Poll::Ready(Err(futures_bounded::Timeout { .. })) => {
let mut to_listener = self
.active_reserve_requests
.pop_front()
.expect("must have active request for stream");

if let Err(e) =
to_listener.try_send(transport::ToListenerMsg::Reservation(Err(
outbound_hop::ReserveError::Io(io::ErrorKind::TimedOut.into()),
Poll::Ready((Err(futures_bounded::Timeout { .. }), to_dialer)) => {
if to_dialer
.send(Err(outbound_hop::ConnectError::Io(
io::ErrorKind::TimedOut.into(),
)))
.is_err()
{
tracing::debug!("Unable to send error to listener: {}", e.into_send_error())
tracing::debug!("Unable to send error to dialer")
}
self.reservation.failed();
continue;
Expand Down Expand Up @@ -499,27 +459,24 @@ impl ConnectionHandler for Handler {
);
match pending_request {
PendingRequest::Reserve { to_listener } => {
self.active_reserve_requests.push_back(to_listener);
if self
.inflight_reserve_requests
.try_push(outbound_hop::make_reservation(stream))
.try_push(outbound_hop::make_reservation(stream), to_listener)
.is_err()
{
tracing::warn!("Dropping outbound stream because we are at capacity")
tracing::warn!("Dropping outbound stream because we are at capacity");
}
}
PendingRequest::Connect {
dst_peer_id,
to_dial: send_back,
} => {
self.active_connect_requests.push_back(send_back);

if self
.inflight_outbound_connect_requests
.try_push(outbound_hop::open_circuit(stream, dst_peer_id))
.try_push(outbound_hop::open_circuit(stream, dst_peer_id), send_back)
.is_err()
{
tracing::warn!("Dropping outbound stream because we are at capacity")
tracing::warn!("Dropping outbound stream because we are at capacity");
}
}
}
Expand Down

0 comments on commit 41da782

Please sign in to comment.