From 41da78280c81e475a7b62e336618928d9ec8fc9c Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 16 Nov 2023 08:52:25 +1100 Subject: [PATCH] fix(relay): correctly track associated state of inflight streams In the relay client, we need to track a fair bit of data _alongside_ `Future`s that are executing within a `FuturesSet`. In particular, we need to track the channels that connect us the `Transport`. Using `?` within the actual `async` block of the protocol is crucial for ergonomics but essentially locks us out of passing the channel _into_ the `Future` itself. Hence, we need to track it outside. As we can see from #4822, doing so in a separate list is error prone. We solve this by introducing the `FuturesTupleSet` type to `futures-bounded`. This is a variation of `FuturesSet` that carries a piece of arbitrary data alongside the `Future` that is executing and returns it back upon completion. Using this within the relay code reveals another bug where we mistakenly confused a timed out `CONNECT` request for a timed out `RESERVE` request. Fixes: #4822. Pull-Request: #4841. --- Cargo.lock | 2 +- Cargo.toml | 2 +- misc/futures-bounded/CHANGELOG.md | 5 + misc/futures-bounded/Cargo.toml | 2 +- misc/futures-bounded/src/futures_tuple_set.rs | 94 +++++++++++++ misc/futures-bounded/src/lib.rs | 2 + protocols/relay/CHANGELOG.md | 4 +- protocols/relay/src/priv_client/handler.rs | 123 ++++++------------ 8 files changed, 147 insertions(+), 87 deletions(-) create mode 100644 misc/futures-bounded/src/futures_tuple_set.rs diff --git a/Cargo.lock b/Cargo.lock index 0baa7d79bcc..332764c36f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1572,7 +1572,7 @@ dependencies = [ [[package]] name = "futures-bounded" -version = "0.2.2" +version = "0.2.3" dependencies = [ "futures", "futures-timer", diff --git a/Cargo.toml b/Cargo.toml index 793c46b0454..35439a1a696 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,7 +71,7 @@ rust-version = "1.73.0" [workspace.dependencies] asynchronous-codec = { version = "0.7.0" } -futures-bounded = { version = "0.2.2", path = "misc/futures-bounded" } +futures-bounded = { version = "0.2.3", path = "misc/futures-bounded" } libp2p = { version = "0.53.0", path = "libp2p" } libp2p-allow-block-list = { version = "0.3.0", path = "misc/allow-block-list" } libp2p-autonat = { version = "0.12.0", path = "protocols/autonat" } diff --git a/misc/futures-bounded/CHANGELOG.md b/misc/futures-bounded/CHANGELOG.md index 3a26f6436ba..7a6e4deac2f 100644 --- a/misc/futures-bounded/CHANGELOG.md +++ b/misc/futures-bounded/CHANGELOG.md @@ -1,3 +1,8 @@ +## 0.2.3 - unreleased + +- Introduce `FuturesTupleSet`, holding tuples of a `Future` together with an arbitrary piece of data. + See [PR 4841](https://github.com/libp2p/rust-lib2pp/pulls/4841). + ## 0.2.2 - Fix an issue where `{Futures,Stream}Map` returns `Poll::Pending` despite being ready after an item has been replaced as part of `try_push`. diff --git a/misc/futures-bounded/Cargo.toml b/misc/futures-bounded/Cargo.toml index 42743a8ac85..b375242bb3c 100644 --- a/misc/futures-bounded/Cargo.toml +++ b/misc/futures-bounded/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "futures-bounded" -version = "0.2.2" +version = "0.2.3" edition = "2021" rust-version.workspace = true license = "MIT" diff --git a/misc/futures-bounded/src/futures_tuple_set.rs b/misc/futures-bounded/src/futures_tuple_set.rs new file mode 100644 index 00000000000..e19b236aaf8 --- /dev/null +++ b/misc/futures-bounded/src/futures_tuple_set.rs @@ -0,0 +1,94 @@ +use std::collections::HashMap; +use std::future::Future; +use std::task::{ready, Context, Poll}; +use std::time::Duration; + +use futures_util::future::BoxFuture; + +use crate::{FuturesMap, PushError, Timeout}; + +/// Represents a list of tuples of a [Future] and an associated piece of data. +/// +/// Each future must finish within the specified time and the list never outgrows its capacity. +pub struct FuturesTupleSet { + id: u32, + inner: FuturesMap, + data: HashMap, +} + +impl FuturesTupleSet { + pub fn new(timeout: Duration, capacity: usize) -> Self { + Self { + id: 0, + inner: FuturesMap::new(timeout, capacity), + data: HashMap::new(), + } + } +} + +impl FuturesTupleSet +where + O: 'static, +{ + /// Push a future into the list. + /// + /// This method adds the given future to the list. + /// If the length of the list is equal to the capacity, this method returns a error that contains the passed future. + /// In that case, the future is not added to the set. + pub fn try_push(&mut self, future: F, data: D) -> Result<(), (BoxFuture, D)> + where + F: Future + Send + 'static, + { + self.id = self.id.wrapping_add(1); + + match self.inner.try_push(self.id, future) { + Ok(()) => {} + Err(PushError::BeyondCapacity(w)) => return Err((w, data)), + Err(PushError::Replaced(_)) => unreachable!("we never reuse IDs"), + } + self.data.insert(self.id, data); + + Ok(()) + } + + pub fn len(&self) -> usize { + self.inner.len() + } + + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + pub fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<()> { + self.inner.poll_ready_unpin(cx) + } + + pub fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<(Result, D)> { + let (id, res) = ready!(self.inner.poll_unpin(cx)); + let data = self.data.remove(&id).expect("must have data for future"); + + Poll::Ready((res, data)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures_util::future::poll_fn; + use futures_util::FutureExt; + use std::future::ready; + + #[test] + fn tracks_associated_data_of_future() { + let mut set = FuturesTupleSet::new(Duration::from_secs(10), 10); + + let _ = set.try_push(ready(1), 1); + let _ = set.try_push(ready(2), 2); + + let (res1, data1) = poll_fn(|cx| set.poll_unpin(cx)).now_or_never().unwrap(); + let (res2, data2) = poll_fn(|cx| set.poll_unpin(cx)).now_or_never().unwrap(); + + assert_eq!(res1.unwrap(), data1); + assert_eq!(res2.unwrap(), data2); + } +} diff --git a/misc/futures-bounded/src/lib.rs b/misc/futures-bounded/src/lib.rs index 6882a96f5e9..da8483a595f 100644 --- a/misc/futures-bounded/src/lib.rs +++ b/misc/futures-bounded/src/lib.rs @@ -1,10 +1,12 @@ mod futures_map; mod futures_set; +mod futures_tuple_set; mod stream_map; mod stream_set; pub use futures_map::FuturesMap; pub use futures_set::FuturesSet; +pub use futures_tuple_set::FuturesTupleSet; pub use stream_map::StreamMap; pub use stream_set::StreamSet; diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index 33270787bff..2aa93a4da09 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -1,7 +1,9 @@ - ## 0.17.1 - unreleased + - Automatically register relayed addresses as external addresses. See [PR 4809](https://github.com/libp2p/rust-lib2pp/pulls/4809). +- Fix an error where performing too many reservations at once could lead to inconsistent internal state. + See [PR 4841](https://github.com/libp2p/rust-libp2p/pull/4841). ## 0.17.0 - Don't close connections on protocol failures within the relay-server. diff --git a/protocols/relay/src/priv_client/handler.rs b/protocols/relay/src/priv_client/handler.rs index 1d24493be77..1925d6f6ab4 100644 --- a/protocols/relay/src/priv_client/handler.rs +++ b/protocols/relay/src/priv_client/handler.rs @@ -107,18 +107,15 @@ pub struct Handler { /// We issue a stream upgrade for each pending request. pending_requests: VecDeque, - /// A `RESERVE` request is in-flight for each item in this queue. - active_reserve_requests: VecDeque>, - - inflight_reserve_requests: - futures_bounded::FuturesSet>, - - /// A `CONNECT` request is in-flight for each item in this queue. - active_connect_requests: - VecDeque>>, + inflight_reserve_requests: futures_bounded::FuturesTupleSet< + Result, + mpsc::Sender, + >, - inflight_outbound_connect_requests: - futures_bounded::FuturesSet>, + inflight_outbound_connect_requests: futures_bounded::FuturesTupleSet< + Result, + oneshot::Sender>, + >, inflight_inbound_circuit_requests: futures_bounded::FuturesSet>, @@ -137,8 +134,7 @@ impl Handler { remote_addr, queued_events: Default::default(), pending_requests: Default::default(), - active_reserve_requests: Default::default(), - inflight_reserve_requests: futures_bounded::FuturesSet::new( + inflight_reserve_requests: futures_bounded::FuturesTupleSet::new( STREAM_TIMEOUT, MAX_CONCURRENT_STREAMS_PER_CONNECTION, ), @@ -146,7 +142,7 @@ impl Handler { STREAM_TIMEOUT, MAX_CONCURRENT_STREAMS_PER_CONNECTION, ), - inflight_outbound_connect_requests: futures_bounded::FuturesSet::new( + inflight_outbound_connect_requests: futures_bounded::FuturesTupleSet::new( STREAM_TIMEOUT, MAX_CONCURRENT_STREAMS_PER_CONNECTION, ), @@ -154,7 +150,6 @@ impl Handler { DENYING_CIRCUIT_TIMEOUT, MAX_NUMBER_DENYING_CIRCUIT, ), - active_connect_requests: Default::default(), reservation: Reservation::None, } } @@ -276,24 +271,16 @@ impl ConnectionHandler for Handler { ConnectionHandlerEvent, > { loop { - debug_assert_eq!( - self.inflight_reserve_requests.len(), - self.active_reserve_requests.len(), - "expect to have one active request per inflight stream" - ); - // Reservations match self.inflight_reserve_requests.poll_unpin(cx) { - Poll::Ready(Ok(Ok(outbound_hop::Reservation { - renewal_timeout, - addrs, - limit, - }))) => { - let to_listener = self - .active_reserve_requests - .pop_front() - .expect("must have active request for stream"); - + Poll::Ready(( + Ok(Ok(outbound_hop::Reservation { + renewal_timeout, + addrs, + limit, + })), + to_listener, + )) => { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( self.reservation.accepted( renewal_timeout, @@ -304,12 +291,7 @@ impl ConnectionHandler for Handler { ), )); } - Poll::Ready(Ok(Err(error))) => { - let mut to_listener = self - .active_reserve_requests - .pop_front() - .expect("must have active request for stream"); - + Poll::Ready((Ok(Err(error)), mut to_listener)) => { if let Err(e) = to_listener.try_send(transport::ToListenerMsg::Reservation(Err(error))) { @@ -318,12 +300,7 @@ impl ConnectionHandler for Handler { self.reservation.failed(); continue; } - Poll::Ready(Err(futures_bounded::Timeout { .. })) => { - let mut to_listener = self - .active_reserve_requests - .pop_front() - .expect("must have active request for stream"); - + Poll::Ready((Err(futures_bounded::Timeout { .. }), mut to_listener)) => { if let Err(e) = to_listener.try_send(transport::ToListenerMsg::Reservation(Err( outbound_hop::ReserveError::Io(io::ErrorKind::TimedOut.into()), @@ -337,25 +314,17 @@ impl ConnectionHandler for Handler { Poll::Pending => {} } - debug_assert_eq!( - self.inflight_outbound_connect_requests.len(), - self.active_connect_requests.len(), - "expect to have one active request per inflight stream" - ); - // Circuits match self.inflight_outbound_connect_requests.poll_unpin(cx) { - Poll::Ready(Ok(Ok(outbound_hop::Circuit { - limit, - read_buffer, - stream, - }))) => { - let to_listener = self - .active_connect_requests - .pop_front() - .expect("must have active request for stream"); - - if to_listener + Poll::Ready(( + Ok(Ok(outbound_hop::Circuit { + limit, + read_buffer, + stream, + })), + to_dialer, + )) => { + if to_dialer .send(Ok(priv_client::Connection { state: priv_client::ConnectionState::new_outbound(stream, read_buffer), })) @@ -371,27 +340,18 @@ impl ConnectionHandler for Handler { Event::OutboundCircuitEstablished { limit }, )); } - Poll::Ready(Ok(Err(error))) => { - let to_dialer = self - .active_connect_requests - .pop_front() - .expect("must have active request for stream"); - + Poll::Ready((Ok(Err(error)), to_dialer)) => { let _ = to_dialer.send(Err(error)); continue; } - Poll::Ready(Err(futures_bounded::Timeout { .. })) => { - let mut to_listener = self - .active_reserve_requests - .pop_front() - .expect("must have active request for stream"); - - if let Err(e) = - to_listener.try_send(transport::ToListenerMsg::Reservation(Err( - outbound_hop::ReserveError::Io(io::ErrorKind::TimedOut.into()), + Poll::Ready((Err(futures_bounded::Timeout { .. }), to_dialer)) => { + if to_dialer + .send(Err(outbound_hop::ConnectError::Io( + io::ErrorKind::TimedOut.into(), ))) + .is_err() { - tracing::debug!("Unable to send error to listener: {}", e.into_send_error()) + tracing::debug!("Unable to send error to dialer") } self.reservation.failed(); continue; @@ -499,27 +459,24 @@ impl ConnectionHandler for Handler { ); match pending_request { PendingRequest::Reserve { to_listener } => { - self.active_reserve_requests.push_back(to_listener); if self .inflight_reserve_requests - .try_push(outbound_hop::make_reservation(stream)) + .try_push(outbound_hop::make_reservation(stream), to_listener) .is_err() { - tracing::warn!("Dropping outbound stream because we are at capacity") + tracing::warn!("Dropping outbound stream because we are at capacity"); } } PendingRequest::Connect { dst_peer_id, to_dial: send_back, } => { - self.active_connect_requests.push_back(send_back); - if self .inflight_outbound_connect_requests - .try_push(outbound_hop::open_circuit(stream, dst_peer_id)) + .try_push(outbound_hop::open_circuit(stream, dst_peer_id), send_back) .is_err() { - tracing::warn!("Dropping outbound stream because we are at capacity") + tracing::warn!("Dropping outbound stream because we are at capacity"); } } }