From cbfcd07a65b3416345b3ed3247d1979ca4164677 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 9 Oct 2023 16:05:05 +1100 Subject: [PATCH 01/14] Rename modules --- misc/futures-bounded/src/{map.rs => futures_map.rs} | 0 misc/futures-bounded/src/{set.rs => futures_set.rs} | 0 misc/futures-bounded/src/lib.rs | 8 ++++---- 3 files changed, 4 insertions(+), 4 deletions(-) rename misc/futures-bounded/src/{map.rs => futures_map.rs} (100%) rename misc/futures-bounded/src/{set.rs => futures_set.rs} (100%) diff --git a/misc/futures-bounded/src/map.rs b/misc/futures-bounded/src/futures_map.rs similarity index 100% rename from misc/futures-bounded/src/map.rs rename to misc/futures-bounded/src/futures_map.rs diff --git a/misc/futures-bounded/src/set.rs b/misc/futures-bounded/src/futures_set.rs similarity index 100% rename from misc/futures-bounded/src/set.rs rename to misc/futures-bounded/src/futures_set.rs diff --git a/misc/futures-bounded/src/lib.rs b/misc/futures-bounded/src/lib.rs index e7b461dc822..b76ec52eaee 100644 --- a/misc/futures-bounded/src/lib.rs +++ b/misc/futures-bounded/src/lib.rs @@ -1,8 +1,8 @@ -mod map; -mod set; +mod futures_map; +mod futures_set; -pub use map::{FuturesMap, PushError}; -pub use set::FuturesSet; +pub use futures_map::{FuturesMap, PushError}; +pub use futures_set::FuturesSet; use std::fmt; use std::fmt::Formatter; use std::time::Duration; From 55de00e7f9072448b12dff5db2b5ff57ec260af7 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 9 Oct 2023 16:13:31 +1100 Subject: [PATCH 02/14] Add `StreamMap` to `futures-bounded` --- misc/futures-bounded/src/futures_map.rs | 11 +- misc/futures-bounded/src/lib.rs | 16 ++- misc/futures-bounded/src/stream_map.rs | 166 ++++++++++++++++++++++++ 3 files changed, 182 insertions(+), 11 deletions(-) create mode 100644 misc/futures-bounded/src/stream_map.rs diff --git a/misc/futures-bounded/src/futures_map.rs b/misc/futures-bounded/src/futures_map.rs index cecf6070efe..5bba659ca0a 100644 --- a/misc/futures-bounded/src/futures_map.rs +++ b/misc/futures-bounded/src/futures_map.rs @@ -10,7 +10,7 @@ use futures_util::future::BoxFuture; use futures_util::stream::FuturesUnordered; use futures_util::{FutureExt, StreamExt}; -use crate::Timeout; +use crate::{PushError, Timeout}; /// Represents a map of [`Future`]s. /// @@ -23,15 +23,6 @@ pub struct FuturesMap { full_waker: Option, } -/// Error of a future pushing -#[derive(PartialEq, Debug)] -pub enum PushError { - /// The length of the set is equal to the capacity - BeyondCapacity(F), - /// The set already contains the given future's ID - ReplacedFuture(F), -} - impl FuturesMap { pub fn new(timeout: Duration, capacity: usize) -> Self { Self { diff --git a/misc/futures-bounded/src/lib.rs b/misc/futures-bounded/src/lib.rs index b76ec52eaee..3c7df3c5fdc 100644 --- a/misc/futures-bounded/src/lib.rs +++ b/misc/futures-bounded/src/lib.rs @@ -1,8 +1,11 @@ mod futures_map; mod futures_set; +mod stream_map; -pub use futures_map::{FuturesMap, PushError}; +pub use futures_map::FuturesMap; pub use futures_set::FuturesSet; +pub use stream_map::StreamMap; + use std::fmt; use std::fmt::Formatter; use std::time::Duration; @@ -25,4 +28,15 @@ impl fmt::Display for Timeout { } } +/// Error of a future pushing +#[derive(PartialEq, Debug)] +pub enum PushError { + /// The length of the set is equal to the capacity + BeyondCapacity(T), + /// The set already contained an item with this key. + /// + /// The old item is returned. + Replaced(T), +} + impl std::error::Error for Timeout {} diff --git a/misc/futures-bounded/src/stream_map.rs b/misc/futures-bounded/src/stream_map.rs new file mode 100644 index 00000000000..805d87d8558 --- /dev/null +++ b/misc/futures-bounded/src/stream_map.rs @@ -0,0 +1,166 @@ +use std::future::Future; +use std::hash::Hash; +use std::mem; +use std::pin::Pin; +use std::task::{Context, Poll, Waker}; +use std::time::Duration; + +use futures_timer::Delay; +use futures_util::future::BoxFuture; +use futures_util::stream::{BoxStream, SelectAll}; +use futures_util::{FutureExt, Stream, StreamExt}; + +use crate::{PushError, Timeout}; + +/// Represents a map of [`Stream`]s. +/// +/// Each stream must finish within the specified time and the map never outgrows its capacity. +pub struct StreamMap { + timeout: Duration, + capacity: usize, + inner: SelectAll>>>, + empty_waker: Option, + full_waker: Option, +} + +impl StreamMap { + pub fn new(timeout: Duration, capacity: usize) -> Self { + Self { + timeout, + capacity, + inner: Default::default(), + empty_waker: None, + full_waker: None, + } + } +} + +impl StreamMap +where + ID: Clone + Hash + Eq + Send + Unpin + 'static, +{ + /// Push a stream into the map. + pub fn try_push(&mut self, id: ID, stream: F) -> Result<(), PushError>> + where + F: Stream + Send + 'static, + { + if self.inner.len() >= self.capacity { + return Err(PushError::BeyondCapacity(stream.boxed())); + } + + if let Some(waker) = self.empty_waker.take() { + waker.wake(); + } + + match self.inner.iter_mut().find(|tagged| tagged.tag == id) { + None => { + self.inner.push(TaggedStream::new( + id, + TimeoutStream { + inner: stream.boxed(), + timeout: Delay::new(self.timeout), + }, + )); + + Ok(()) + } + Some(existing) => { + let old = mem::replace( + &mut existing.inner, + TimeoutStream { + inner: stream.boxed(), + timeout: Delay::new(self.timeout), + }, + ); + + Err(PushError::Replaced(old.inner)) + } + } + } + + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + #[allow(unknown_lints, clippy::needless_pass_by_ref_mut)] // &mut Context is idiomatic. + pub fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<()> { + if self.inner.len() < self.capacity { + return Poll::Ready(()); + } + + self.full_waker = Some(cx.waker().clone()); + + Poll::Pending + } + + pub fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<(ID, Result)> { + match futures_util::ready!(self.inner.poll_next_unpin(cx)) { + None => { + self.empty_waker = Some(cx.waker().clone()); + Poll::Pending + } + Some((id, Ok(output))) => Poll::Ready((id, Ok(output))), + Some((id, Err(_timeout))) => Poll::Ready((id, Err(Timeout::new(self.timeout)))), + } + } +} + +struct TimeoutStream { + inner: F, + timeout: Delay, +} + +impl Stream for TimeoutStream +where + F: Stream + Unpin, +{ + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.timeout.poll_unpin(cx).is_ready() { + return Poll::Ready(Some(Err(()))); + } + + self.inner.poll_next_unpin(cx).map(|a| a.map(Ok)) + } +} + +struct TaggedStream { + key: K, + inner: S, + + reported_none: bool, +} + +impl TaggedStream { + fn new(key: K, inner: S) -> Self { + Self { + key, + inner, + reported_none: false, + } + } +} + +impl Stream for TaggedStream +where + K: Copy, + S: Stream, +{ + type Item = (K, Option); + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.reported_none { + return Poll::Ready(None); + } + + match futures_util::ready!(self.inner.poll_next_unpin(cx)) { + Some(item) => Poll::Ready(Some((*self.key, Some(item)))), + None => { + *self.reported_none = true; + + Poll::Ready(Some((*self.key, None))) + } + } + } +} From dc275f84639d6e20a3bb466d51c469487419ad54 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 9 Oct 2023 16:15:17 +1100 Subject: [PATCH 03/14] Bump version of `futures-bounded` --- Cargo.lock | 2 +- Cargo.toml | 2 +- misc/futures-bounded/CHANGELOG.md | 5 +++++ misc/futures-bounded/Cargo.toml | 2 +- 4 files changed, 8 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 19c9404938c..a5da7078c09 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1598,7 +1598,7 @@ dependencies = [ [[package]] name = "futures-bounded" -version = "0.1.0" +version = "0.2.0" dependencies = [ "futures-timer", "futures-util", diff --git a/Cargo.toml b/Cargo.toml index 55129a8ea96..3caf94cba01 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,7 +70,7 @@ resolver = "2" rust-version = "1.65.0" [workspace.dependencies] -futures-bounded = { version = "0.1.0", path = "misc/futures-bounded" } +futures-bounded = { version = "0.2.0", path = "misc/futures-bounded" } libp2p = { version = "0.52.3", path = "libp2p" } libp2p-allow-block-list = { version = "0.2.0", path = "misc/allow-block-list" } libp2p-autonat = { version = "0.11.0", path = "protocols/autonat" } diff --git a/misc/futures-bounded/CHANGELOG.md b/misc/futures-bounded/CHANGELOG.md index bd05a0f8261..1c7f642152a 100644 --- a/misc/futures-bounded/CHANGELOG.md +++ b/misc/futures-bounded/CHANGELOG.md @@ -1,3 +1,8 @@ +## 0.2.0 - unreleased + +- Add `StreamMap` type and remove `Future`-suffix from `PushError::ReplacedFuture` to reuse it for `StreamMap`. + See [PR XXXX](https://github.com/libp2p/rust-lib2pp/pulls/XXXX). + ## 0.1.0 Initial release. diff --git a/misc/futures-bounded/Cargo.toml b/misc/futures-bounded/Cargo.toml index 4d70779e282..64d2ae5e7c2 100644 --- a/misc/futures-bounded/Cargo.toml +++ b/misc/futures-bounded/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "futures-bounded" -version = "0.1.0" +version = "0.2.0" edition = "2021" rust-version.workspace = true license = "MIT" From 1e4ad64558159dfc94b50daf701b3ee7315553b9 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 9 Oct 2023 16:20:35 +1100 Subject: [PATCH 04/14] Fix compile errors --- misc/futures-bounded/src/futures_map.rs | 2 +- misc/futures-bounded/src/futures_set.rs | 2 +- misc/futures-bounded/src/stream_map.rs | 38 ++++++++++++---------- protocols/relay/src/priv_client/handler.rs | 2 +- 4 files changed, 24 insertions(+), 20 deletions(-) diff --git a/misc/futures-bounded/src/futures_map.rs b/misc/futures-bounded/src/futures_map.rs index 5bba659ca0a..4952ee33a12 100644 --- a/misc/futures-bounded/src/futures_map.rs +++ b/misc/futures-bounded/src/futures_map.rs @@ -79,7 +79,7 @@ where }, ); - Err(PushError::ReplacedFuture(old_future.inner)) + Err(PushError::Replaced(old_future.inner)) } } } diff --git a/misc/futures-bounded/src/futures_set.rs b/misc/futures-bounded/src/futures_set.rs index 96140d82f9a..79a82fde110 100644 --- a/misc/futures-bounded/src/futures_set.rs +++ b/misc/futures-bounded/src/futures_set.rs @@ -38,7 +38,7 @@ impl FuturesSet { match self.inner.try_push(self.id, future) { Ok(()) => Ok(()), Err(PushError::BeyondCapacity(w)) => Err(w), - Err(PushError::ReplacedFuture(_)) => unreachable!("we never reuse IDs"), + Err(PushError::Replaced(_)) => unreachable!("we never reuse IDs"), } } diff --git a/misc/futures-bounded/src/stream_map.rs b/misc/futures-bounded/src/stream_map.rs index 805d87d8558..f7fd8e68dd1 100644 --- a/misc/futures-bounded/src/stream_map.rs +++ b/misc/futures-bounded/src/stream_map.rs @@ -1,12 +1,9 @@ -use std::future::Future; -use std::hash::Hash; use std::mem; use std::pin::Pin; use std::task::{Context, Poll, Waker}; use std::time::Duration; use futures_timer::Delay; -use futures_util::future::BoxFuture; use futures_util::stream::{BoxStream, SelectAll}; use futures_util::{FutureExt, Stream, StreamExt}; @@ -18,12 +15,15 @@ use crate::{PushError, Timeout}; pub struct StreamMap { timeout: Duration, capacity: usize, - inner: SelectAll>>>, + inner: SelectAll>>>, empty_waker: Option, full_waker: Option, } -impl StreamMap { +impl StreamMap +where + ID: Clone + Unpin, +{ pub fn new(timeout: Duration, capacity: usize) -> Self { Self { timeout, @@ -37,7 +37,7 @@ impl StreamMap { impl StreamMap where - ID: Clone + Hash + Eq + Send + Unpin + 'static, + ID: Clone + PartialEq + Send + Unpin + 'static, { /// Push a stream into the map. pub fn try_push(&mut self, id: ID, stream: F) -> Result<(), PushError>> @@ -52,7 +52,7 @@ where waker.wake(); } - match self.inner.iter_mut().find(|tagged| tagged.tag == id) { + match self.inner.iter_mut().find(|tagged| tagged.key == id) { None => { self.inner.push(TaggedStream::new( id, @@ -93,14 +93,18 @@ where Poll::Pending } - pub fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<(ID, Result)> { + pub fn poll_next_unpin( + &mut self, + cx: &mut Context<'_>, + ) -> Poll<(ID, Option>)> { match futures_util::ready!(self.inner.poll_next_unpin(cx)) { None => { self.empty_waker = Some(cx.waker().clone()); Poll::Pending } - Some((id, Ok(output))) => Poll::Ready((id, Ok(output))), - Some((id, Err(_timeout))) => Poll::Ready((id, Err(Timeout::new(self.timeout)))), + Some((id, Some(Ok(output)))) => Poll::Ready((id, Some(Ok(output)))), + Some((id, Some(Err(())))) => Poll::Ready((id, Some(Err(Timeout::new(self.timeout))))), + Some((id, None)) => Poll::Ready((id, None)), } } } @@ -116,7 +120,7 @@ where { type Item = Result; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if self.timeout.poll_unpin(cx).is_ready() { return Poll::Ready(Some(Err(()))); } @@ -144,22 +148,22 @@ impl TaggedStream { impl Stream for TaggedStream where - K: Copy, - S: Stream, + K: Clone + Unpin, + S: Stream + Unpin, { type Item = (K, Option); - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if self.reported_none { return Poll::Ready(None); } match futures_util::ready!(self.inner.poll_next_unpin(cx)) { - Some(item) => Poll::Ready(Some((*self.key, Some(item)))), + Some(item) => Poll::Ready(Some((self.key.clone(), Some(item)))), None => { - *self.reported_none = true; + self.reported_none = true; - Poll::Ready(Some((*self.key, None))) + Poll::Ready(Some((self.key.clone(), None))) } } } diff --git a/protocols/relay/src/priv_client/handler.rs b/protocols/relay/src/priv_client/handler.rs index 25488ac3041..a0fddf1f358 100644 --- a/protocols/relay/src/priv_client/handler.rs +++ b/protocols/relay/src/priv_client/handler.rs @@ -277,7 +277,7 @@ impl Handler { Err(PushError::BeyondCapacity(_)) => log::warn!( "Dropping inbound circuit request to be denied from {src_peer_id} due to exceeding limit." ), - Err(PushError::ReplacedFuture(_)) => log::warn!( + Err(PushError::Replaced(_)) => log::warn!( "Dropping existing inbound circuit request to be denied from {src_peer_id} in favor of new one." ), Ok(()) => {} From 8efba32d502b544d0fced28db4c44da7e08dbcda Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 9 Oct 2023 16:26:46 +1100 Subject: [PATCH 05/14] Add `StreamSet` --- misc/futures-bounded/src/lib.rs | 2 + misc/futures-bounded/src/stream_set.rs | 57 ++++++++++++++++++++++++++ 2 files changed, 59 insertions(+) create mode 100644 misc/futures-bounded/src/stream_set.rs diff --git a/misc/futures-bounded/src/lib.rs b/misc/futures-bounded/src/lib.rs index 3c7df3c5fdc..23e67651fe3 100644 --- a/misc/futures-bounded/src/lib.rs +++ b/misc/futures-bounded/src/lib.rs @@ -1,10 +1,12 @@ mod futures_map; mod futures_set; mod stream_map; +mod stream_set; pub use futures_map::FuturesMap; pub use futures_set::FuturesSet; pub use stream_map::StreamMap; +pub use stream_set::StreamSet; use std::fmt; use std::fmt::Formatter; diff --git a/misc/futures-bounded/src/stream_set.rs b/misc/futures-bounded/src/stream_set.rs new file mode 100644 index 00000000000..e17b27a2a85 --- /dev/null +++ b/misc/futures-bounded/src/stream_set.rs @@ -0,0 +1,57 @@ +use futures_util::stream::BoxStream; +use futures_util::Stream; +use std::task::{ready, Context, Poll}; +use std::time::Duration; + +use crate::{PushError, StreamMap, Timeout}; + +/// Represents a list of [Stream]s. +/// +/// Each stream must finish within the specified time and the list never outgrows its capacity. +pub struct StreamSet { + id: u32, + inner: StreamMap, +} + +impl StreamSet { + pub fn new(timeout: Duration, capacity: usize) -> Self { + Self { + id: 0, + inner: StreamMap::new(timeout, capacity), + } + } +} + +impl StreamSet { + /// Push a stream into the list. + /// + /// This method adds the given stream to the list. + /// If the length of the list is equal to the capacity, this method returns a error that contains the passed stream. + /// In that case, the stream is not added to the set. + pub fn try_push(&mut self, stream: F) -> Result<(), BoxStream> + where + F: Stream + Send + 'static, + { + self.id = self.id.wrapping_add(1); + + match self.inner.try_push(self.id, stream) { + Ok(()) => Ok(()), + Err(PushError::BeyondCapacity(w)) => Err(w), + Err(PushError::Replaced(_)) => unreachable!("we never reuse IDs"), + } + } + + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + pub fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<()> { + self.inner.poll_ready_unpin(cx) + } + + pub fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll>> { + let (_, res) = ready!(self.inner.poll_next_unpin(cx)); + + Poll::Ready(res) + } +} From 80176abc55b6f0ece723f481043a05901dc1295a Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 10 Oct 2023 11:35:30 +1100 Subject: [PATCH 06/14] Add tests --- misc/futures-bounded/src/futures_map.rs | 2 +- misc/futures-bounded/src/stream_map.rs | 112 ++++++++++++++++++++++++ 2 files changed, 113 insertions(+), 1 deletion(-) diff --git a/misc/futures-bounded/src/futures_map.rs b/misc/futures-bounded/src/futures_map.rs index 4952ee33a12..0b5699f5456 100644 --- a/misc/futures-bounded/src/futures_map.rs +++ b/misc/futures-bounded/src/futures_map.rs @@ -178,7 +178,7 @@ mod tests { assert!(futures.try_push("ID", ready(())).is_ok()); matches!( futures.try_push("ID", ready(())), - Err(PushError::ReplacedFuture(_)) + Err(PushError::Replaced(_)) ); } diff --git a/misc/futures-bounded/src/stream_map.rs b/misc/futures-bounded/src/stream_map.rs index f7fd8e68dd1..d42633cc687 100644 --- a/misc/futures-bounded/src/stream_map.rs +++ b/misc/futures-bounded/src/stream_map.rs @@ -168,3 +168,115 @@ where } } } + +#[cfg(test)] +mod tests { + use futures_util::stream::{once, pending}; + use std::future::{poll_fn, ready, Future}; + use std::pin::Pin; + use std::time::Instant; + + use super::*; + + #[test] + fn cannot_push_more_than_capacity_tasks() { + let mut streams = StreamMap::new(Duration::from_secs(10), 1); + + assert!(streams.try_push("ID_1", once(ready(()))).is_ok()); + matches!( + streams.try_push("ID_2", once(ready(()))), + Err(PushError::BeyondCapacity(_)) + ); + } + + #[test] + fn cannot_push_the_same_id_few_times() { + let mut streams = StreamMap::new(Duration::from_secs(10), 5); + + assert!(streams.try_push("ID", once(ready(()))).is_ok()); + matches!( + streams.try_push("ID", once(ready(()))), + Err(PushError::Replaced(_)) + ); + } + + #[tokio::test] + async fn streams_timeout() { + let mut streams = StreamMap::new(Duration::from_millis(100), 1); + + let _ = streams.try_push("ID", pending::<()>()); + Delay::new(Duration::from_millis(150)).await; + let (_, result) = poll_fn(|cx| streams.poll_next_unpin(cx)).await; + + assert!(result.unwrap().is_err()) + } + + // Each stream emits 1 item with delay, `Task` only has a capacity of 1, meaning they must be processed in sequence. + // We stop after NUM_STREAMS tasks, meaning the overall execution must at least take DELAY * NUM_FUTURES. + #[tokio::test] + async fn backpressure() { + const DELAY: Duration = Duration::from_millis(100); + const NUM_STREAMS: u32 = 10; + + let start = Instant::now(); + Task::new(DELAY, NUM_STREAMS, 1).await; + let duration = start.elapsed(); + + assert!(duration >= DELAY * NUM_STREAMS); + } + + struct Task { + item_delay: Duration, + num_streams: usize, + num_processed: usize, + inner: StreamMap, + } + + impl Task { + fn new(item_delay: Duration, num_streams: u32, capacity: usize) -> Self { + Self { + item_delay, + num_streams: num_streams as usize, + num_processed: 0, + inner: StreamMap::new(Duration::from_secs(60), capacity), + } + } + } + + impl Future for Task { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + while this.num_processed < this.num_streams { + match this.inner.poll_next_unpin(cx) { + Poll::Ready((_, Some(result))) => { + if result.is_err() { + panic!("Timeout is great than item delay") + } + + this.num_processed += 1; + continue; + } + Poll::Ready((_, None)) => { + continue; + } + _ => {} + } + + if let Poll::Ready(()) = this.inner.poll_ready_unpin(cx) { + // We push the constant ID to prove that user can use the same ID if the stream was finished + let maybe_future = this.inner.try_push(1u8, once(Delay::new(this.item_delay))); + assert!(maybe_future.is_ok(), "we polled for readiness"); + + continue; + } + + return Poll::Pending; + } + + Poll::Ready(()) + } + } +} From eeb3c52290b7372254f3ac6cd8cd1062be1429da Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 10 Oct 2023 11:38:52 +1100 Subject: [PATCH 07/14] Update misc/futures-bounded/CHANGELOG.md --- misc/futures-bounded/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/misc/futures-bounded/CHANGELOG.md b/misc/futures-bounded/CHANGELOG.md index 1c7f642152a..64dc9d6c1c4 100644 --- a/misc/futures-bounded/CHANGELOG.md +++ b/misc/futures-bounded/CHANGELOG.md @@ -1,7 +1,7 @@ ## 0.2.0 - unreleased - Add `StreamMap` type and remove `Future`-suffix from `PushError::ReplacedFuture` to reuse it for `StreamMap`. - See [PR XXXX](https://github.com/libp2p/rust-lib2pp/pulls/XXXX). + See [PR 4616](https://github.com/libp2p/rust-lib2pp/pulls/4616). ## 0.1.0 From 677da7512425218bfc5cf244527afae0e8a310b4 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 10 Oct 2023 11:53:55 +1100 Subject: [PATCH 08/14] Fix intra-doc link --- misc/futures-bounded/src/futures_map.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/misc/futures-bounded/src/futures_map.rs b/misc/futures-bounded/src/futures_map.rs index 0b5699f5456..5fd06037608 100644 --- a/misc/futures-bounded/src/futures_map.rs +++ b/misc/futures-bounded/src/futures_map.rs @@ -45,7 +45,7 @@ where /// If the length of the map is equal to the capacity, this method returns [PushError::BeyondCapacity], /// that contains the passed future. In that case, the future is not inserted to the map. /// If a future with the given `future_id` already exists, then the old future will be replaced by a new one. - /// In that case, the returned error [PushError::ReplacedFuture] contains the old future. + /// In that case, the returned error [PushError::Replaced] contains the old future. pub fn try_push(&mut self, future_id: ID, future: F) -> Result<(), PushError>> where F: Future + Send + 'static, From 797494652d84e2b8bbe5f8ff2412c8287bc61f94 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 16 Oct 2023 10:39:15 +1100 Subject: [PATCH 09/14] Allow streams to be cancelled --- misc/futures-bounded/src/stream_map.rs | 43 +++++++++++++++++++++++--- misc/futures-bounded/src/stream_set.rs | 5 ++- 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/misc/futures-bounded/src/stream_map.rs b/misc/futures-bounded/src/stream_map.rs index d42633cc687..087214d60a9 100644 --- a/misc/futures-bounded/src/stream_map.rs +++ b/misc/futures-bounded/src/stream_map.rs @@ -5,7 +5,7 @@ use std::time::Duration; use futures_timer::Delay; use futures_util::stream::{BoxStream, SelectAll}; -use futures_util::{FutureExt, Stream, StreamExt}; +use futures_util::{stream, FutureExt, Stream, StreamExt}; use crate::{PushError, Timeout}; @@ -38,6 +38,7 @@ where impl StreamMap where ID: Clone + PartialEq + Send + Unpin + 'static, + O: Send + 'static, { /// Push a stream into the map. pub fn try_push(&mut self, id: ID, stream: F) -> Result<(), PushError>> @@ -78,6 +79,15 @@ where } } + pub fn try_cancel(&mut self, id: ID) -> Option> { + let tagged = self.inner.iter_mut().find(|s| s.key == id)?; + + let inner = mem::replace(&mut tagged.inner.inner, stream::pending().boxed()); + tagged.exhausted = true; // Setting this will emit `None` on the next poll and ensure `SelectAll` cleans up the resources. + + Some(inner) + } + pub fn is_empty(&self) -> bool { self.inner.is_empty() } @@ -133,7 +143,7 @@ struct TaggedStream { key: K, inner: S, - reported_none: bool, + exhausted: bool, } impl TaggedStream { @@ -141,7 +151,7 @@ impl TaggedStream { Self { key, inner, - reported_none: false, + exhausted: false, } } } @@ -154,14 +164,14 @@ where type Item = (K, Option); fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.reported_none { + if self.exhausted { return Poll::Ready(None); } match futures_util::ready!(self.inner.poll_next_unpin(cx)) { Some(item) => Poll::Ready(Some((self.key.clone(), Some(item)))), None => { - self.reported_none = true; + self.exhausted = true; Poll::Ready(Some((self.key.clone(), None))) } @@ -211,6 +221,29 @@ mod tests { assert!(result.unwrap().is_err()) } + #[test] + fn cancelled_stream_does_not_emit_anything() { + let mut streams = StreamMap::new(Duration::from_millis(100), 1); + + let _ = streams.try_push("ID", stream::once(ready(()))); + + { + let cancelled_stream = streams.try_cancel("ID"); + assert!(cancelled_stream.is_some()); + } + + let poll = streams.poll_next_unpin(&mut Context::from_waker( + futures_util::task::noop_waker_ref(), + )); + + assert!(poll.is_pending()); + assert_eq!( + streams.inner.len(), + 0, + "resources of cancelled streams are cleaned up properly" + ); + } + // Each stream emits 1 item with delay, `Task` only has a capacity of 1, meaning they must be processed in sequence. // We stop after NUM_STREAMS tasks, meaning the overall execution must at least take DELAY * NUM_FUTURES. #[tokio::test] diff --git a/misc/futures-bounded/src/stream_set.rs b/misc/futures-bounded/src/stream_set.rs index e17b27a2a85..332a09767df 100644 --- a/misc/futures-bounded/src/stream_set.rs +++ b/misc/futures-bounded/src/stream_set.rs @@ -22,7 +22,10 @@ impl StreamSet { } } -impl StreamSet { +impl StreamSet +where + O: Send + 'static, +{ /// Push a stream into the list. /// /// This method adds the given stream to the list. From da2102f8e1b4cdb1dc88ebde4f77c04d5af7a584 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 16 Oct 2023 10:40:52 +1100 Subject: [PATCH 10/14] Rebrand to `remove` --- misc/futures-bounded/src/stream_map.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/misc/futures-bounded/src/stream_map.rs b/misc/futures-bounded/src/stream_map.rs index 087214d60a9..d570dbfe313 100644 --- a/misc/futures-bounded/src/stream_map.rs +++ b/misc/futures-bounded/src/stream_map.rs @@ -79,7 +79,7 @@ where } } - pub fn try_cancel(&mut self, id: ID) -> Option> { + pub fn remove(&mut self, id: ID) -> Option> { let tagged = self.inner.iter_mut().find(|s| s.key == id)?; let inner = mem::replace(&mut tagged.inner.inner, stream::pending().boxed()); @@ -222,13 +222,13 @@ mod tests { } #[test] - fn cancelled_stream_does_not_emit_anything() { + fn removing_stream() { let mut streams = StreamMap::new(Duration::from_millis(100), 1); let _ = streams.try_push("ID", stream::once(ready(()))); { - let cancelled_stream = streams.try_cancel("ID"); + let cancelled_stream = streams.remove("ID"); assert!(cancelled_stream.is_some()); } From 7a1736f97d280861488e20d44e5117ea51ea33d4 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 16 Oct 2023 13:00:24 +1100 Subject: [PATCH 11/14] Rename type variable --- misc/futures-bounded/src/stream_map.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/misc/futures-bounded/src/stream_map.rs b/misc/futures-bounded/src/stream_map.rs index d570dbfe313..701ca7d8b5b 100644 --- a/misc/futures-bounded/src/stream_map.rs +++ b/misc/futures-bounded/src/stream_map.rs @@ -119,8 +119,8 @@ where } } -struct TimeoutStream { - inner: F, +struct TimeoutStream { + inner: S, timeout: Delay, } From 0c0349221f3daa697ae871ab6dba5c1f84e84b10 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 16 Oct 2023 13:04:26 +1100 Subject: [PATCH 12/14] Remove streams once they time out --- misc/futures-bounded/src/stream_map.rs | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/misc/futures-bounded/src/stream_map.rs b/misc/futures-bounded/src/stream_map.rs index 701ca7d8b5b..5e130b02c01 100644 --- a/misc/futures-bounded/src/stream_map.rs +++ b/misc/futures-bounded/src/stream_map.rs @@ -113,7 +113,11 @@ where Poll::Pending } Some((id, Some(Ok(output)))) => Poll::Ready((id, Some(Ok(output)))), - Some((id, Some(Err(())))) => Poll::Ready((id, Some(Err(Timeout::new(self.timeout))))), + Some((id, Some(Err(())))) => { + self.remove(id.clone()); // Remove stream, otherwise we keep reporting the timeout. + + Poll::Ready((id, Some(Err(Timeout::new(self.timeout))))) + } Some((id, None)) => Poll::Ready((id, None)), } } @@ -221,6 +225,20 @@ mod tests { assert!(result.unwrap().is_err()) } + #[tokio::test] + async fn timed_out_stream_gets_removed() { + let mut streams = StreamMap::new(Duration::from_millis(100), 1); + + let _ = streams.try_push("ID", pending::<()>()); + Delay::new(Duration::from_millis(150)).await; + poll_fn(|cx| streams.poll_next_unpin(cx)).await; + + let poll = streams.poll_next_unpin(&mut Context::from_waker( + futures_util::task::noop_waker_ref(), + )); + assert!(poll.is_pending()) + } + #[test] fn removing_stream() { let mut streams = StreamMap::new(Duration::from_millis(100), 1); From 39a3cda65fcc4d6a57195050494340b580a16d2c Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 16 Oct 2023 17:13:24 +1100 Subject: [PATCH 13/14] Add changelog entry for `libp2p-relay` --- Cargo.lock | 2 +- Cargo.toml | 2 +- protocols/relay/CHANGELOG.md | 8 ++++++++ protocols/relay/Cargo.toml | 2 +- 4 files changed, 11 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 881cfacdbcb..b49b0552430 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2941,7 +2941,7 @@ dependencies = [ [[package]] name = "libp2p-relay" -version = "0.16.1" +version = "0.16.2" dependencies = [ "asynchronous-codec", "bytes", diff --git a/Cargo.toml b/Cargo.toml index f921395df85..64a0650116e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -96,7 +96,7 @@ libp2p-ping = { version = "0.43.1", path = "protocols/ping" } libp2p-plaintext = { version = "0.40.1", path = "transports/plaintext" } libp2p-pnet = { version = "0.23.0", path = "transports/pnet" } libp2p-quic = { version = "0.9.2", path = "transports/quic" } -libp2p-relay = { version = "0.16.1", path = "protocols/relay" } +libp2p-relay = { version = "0.16.2", path = "protocols/relay" } libp2p-rendezvous = { version = "0.13.0", path = "protocols/rendezvous" } libp2p-upnp = { version = "0.1.1", path = "protocols/upnp" } libp2p-request-response = { version = "0.25.1", path = "protocols/request-response" } diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index 6af89e25d71..33463aa20c6 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -1,3 +1,11 @@ +## 0.16.2 - unreleased + + + ## 0.16.1 - Export `RateLimiter` type. diff --git a/protocols/relay/Cargo.toml b/protocols/relay/Cargo.toml index 03799a8c77c..070c2cbc50d 100644 --- a/protocols/relay/Cargo.toml +++ b/protocols/relay/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-relay" edition = "2021" rust-version = { workspace = true } description = "Communications relaying for libp2p" -version = "0.16.1" +version = "0.16.2" authors = ["Parity Technologies ", "Max Inden "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" From 191d5cd684a4036fe2802265c1dec37bb739a814 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 25 Oct 2023 08:57:37 +1100 Subject: [PATCH 14/14] Apply suggestions from code review Co-authored-by: Max Inden --- misc/futures-bounded/CHANGELOG.md | 2 +- misc/futures-bounded/src/lib.rs | 2 +- misc/futures-bounded/src/stream_map.rs | 2 +- misc/futures-bounded/src/stream_set.rs | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/misc/futures-bounded/CHANGELOG.md b/misc/futures-bounded/CHANGELOG.md index 64dc9d6c1c4..90bd47f2f61 100644 --- a/misc/futures-bounded/CHANGELOG.md +++ b/misc/futures-bounded/CHANGELOG.md @@ -1,4 +1,4 @@ -## 0.2.0 - unreleased +## 0.2.0 - Add `StreamMap` type and remove `Future`-suffix from `PushError::ReplacedFuture` to reuse it for `StreamMap`. See [PR 4616](https://github.com/libp2p/rust-lib2pp/pulls/4616). diff --git a/misc/futures-bounded/src/lib.rs b/misc/futures-bounded/src/lib.rs index 23e67651fe3..6882a96f5e9 100644 --- a/misc/futures-bounded/src/lib.rs +++ b/misc/futures-bounded/src/lib.rs @@ -35,7 +35,7 @@ impl fmt::Display for Timeout { pub enum PushError { /// The length of the set is equal to the capacity BeyondCapacity(T), - /// The set already contained an item with this key. + /// The map already contained an item with this key. /// /// The old item is returned. Replaced(T), diff --git a/misc/futures-bounded/src/stream_map.rs b/misc/futures-bounded/src/stream_map.rs index 5e130b02c01..7fcdd15e132 100644 --- a/misc/futures-bounded/src/stream_map.rs +++ b/misc/futures-bounded/src/stream_map.rs @@ -263,7 +263,7 @@ mod tests { } // Each stream emits 1 item with delay, `Task` only has a capacity of 1, meaning they must be processed in sequence. - // We stop after NUM_STREAMS tasks, meaning the overall execution must at least take DELAY * NUM_FUTURES. + // We stop after NUM_STREAMS tasks, meaning the overall execution must at least take DELAY * NUM_STREAMS. #[tokio::test] async fn backpressure() { const DELAY: Duration = Duration::from_millis(100); diff --git a/misc/futures-bounded/src/stream_set.rs b/misc/futures-bounded/src/stream_set.rs index 332a09767df..4fcb649fd49 100644 --- a/misc/futures-bounded/src/stream_set.rs +++ b/misc/futures-bounded/src/stream_set.rs @@ -5,7 +5,7 @@ use std::time::Duration; use crate::{PushError, StreamMap, Timeout}; -/// Represents a list of [Stream]s. +/// Represents a set of [Stream]s. /// /// Each stream must finish within the specified time and the list never outgrows its capacity. pub struct StreamSet {