Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(futures-bounded): add support for streams #4616

Merged
merged 21 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ resolver = "2"
rust-version = "1.73.0"

[workspace.dependencies]
futures-bounded = { version = "0.1.0", path = "misc/futures-bounded" }
futures-bounded = { version = "0.2.0", path = "misc/futures-bounded" }
libp2p = { version = "0.53.0", path = "libp2p" }
libp2p-allow-block-list = { version = "0.3.0", path = "misc/allow-block-list" }
libp2p-autonat = { version = "0.12.0", path = "protocols/autonat" }
Expand Down
5 changes: 5 additions & 0 deletions misc/futures-bounded/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.2.0

- Add `StreamMap` type and remove `Future`-suffix from `PushError::ReplacedFuture` to reuse it for `StreamMap`.
See [PR 4616](https://github.com/libp2p/rust-lib2pp/pulls/4616).

## 0.1.0

Initial release.
2 changes: 1 addition & 1 deletion misc/futures-bounded/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "futures-bounded"
version = "0.1.0"
version = "0.2.0"
edition = "2021"
rust-version.workspace = true
license = "MIT"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use futures_util::future::BoxFuture;
use futures_util::stream::FuturesUnordered;
use futures_util::{FutureExt, StreamExt};

use crate::Timeout;
use crate::{PushError, Timeout};

/// Represents a map of [`Future`]s.
///
Expand All @@ -23,15 +23,6 @@ pub struct FuturesMap<ID, O> {
full_waker: Option<Waker>,
}

/// Error of a future pushing
#[derive(PartialEq, Debug)]
pub enum PushError<F> {
/// The length of the set is equal to the capacity
BeyondCapacity(F),
/// The set already contains the given future's ID
ReplacedFuture(F),
}

impl<ID, O> FuturesMap<ID, O> {
pub fn new(timeout: Duration, capacity: usize) -> Self {
Self {
Expand All @@ -54,7 +45,7 @@ where
/// If the length of the map is equal to the capacity, this method returns [PushError::BeyondCapacity],
/// that contains the passed future. In that case, the future is not inserted to the map.
/// If a future with the given `future_id` already exists, then the old future will be replaced by a new one.
/// In that case, the returned error [PushError::ReplacedFuture] contains the old future.
/// In that case, the returned error [PushError::Replaced] contains the old future.
pub fn try_push<F>(&mut self, future_id: ID, future: F) -> Result<(), PushError<BoxFuture<O>>>
where
F: Future<Output = O> + Send + 'static,
Expand Down Expand Up @@ -88,7 +79,7 @@ where
},
);

Err(PushError::ReplacedFuture(old_future.inner))
Err(PushError::Replaced(old_future.inner))
}
}
}
Expand Down Expand Up @@ -187,7 +178,7 @@ mod tests {
assert!(futures.try_push("ID", ready(())).is_ok());
matches!(
futures.try_push("ID", ready(())),
Err(PushError::ReplacedFuture(_))
Err(PushError::Replaced(_))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl<O> FuturesSet<O> {
match self.inner.try_push(self.id, future) {
Ok(()) => Ok(()),
Err(PushError::BeyondCapacity(w)) => Err(w),
Err(PushError::ReplacedFuture(_)) => unreachable!("we never reuse IDs"),
Err(PushError::Replaced(_)) => unreachable!("we never reuse IDs"),
}
}

Expand Down
24 changes: 20 additions & 4 deletions misc/futures-bounded/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
mod map;
mod set;
mod futures_map;
mod futures_set;
mod stream_map;
mod stream_set;

pub use futures_map::FuturesMap;
pub use futures_set::FuturesSet;
pub use stream_map::StreamMap;
pub use stream_set::StreamSet;

pub use map::{FuturesMap, PushError};
pub use set::FuturesSet;
use std::fmt;
use std::fmt::Formatter;
use std::time::Duration;
Expand All @@ -25,4 +30,15 @@ impl fmt::Display for Timeout {
}
}

/// Error of a future pushing
#[derive(PartialEq, Debug)]
pub enum PushError<T> {
/// The length of the set is equal to the capacity
BeyondCapacity(T),
/// The map already contained an item with this key.
///
/// The old item is returned.
Replaced(T),
}

impl std::error::Error for Timeout {}
Loading