From d1dbee771e8797e364dccbd4cb6bbf6ae3977908 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 4 Mar 2024 08:42:32 +0000 Subject: [PATCH 1/6] deps: bump rust-embed from 8.2.0 to 8.3.0 Pull-Request: #5199. --- Cargo.lock | 12 ++++++------ examples/browser-webrtc/Cargo.toml | 2 +- interop-tests/Cargo.toml | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b2b90a3a005..507b73c9cfb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4997,9 +4997,9 @@ dependencies = [ [[package]] name = "rust-embed" -version = "8.2.0" +version = "8.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a82c0bbc10308ed323529fd3c1dce8badda635aa319a5ff0e6466f33b8101e3f" +checksum = "fb78f46d0066053d16d4ca7b898e9343bc3530f71c61d5ad84cd404ada068745" dependencies = [ "rust-embed-impl", "rust-embed-utils", @@ -5008,9 +5008,9 @@ dependencies = [ [[package]] name = "rust-embed-impl" -version = "8.2.0" +version = "8.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6227c01b1783cdfee1bcf844eb44594cd16ec71c35305bf1c9fb5aade2735e16" +checksum = "b91ac2a3c6c0520a3fb3dd89321177c3c692937c4eb21893378219da10c44fc8" dependencies = [ "proc-macro2", "quote", @@ -5022,9 +5022,9 @@ dependencies = [ [[package]] name = "rust-embed-utils" -version = "8.2.0" +version = "8.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cb0a25bfbb2d4b4402179c2cf030387d9990857ce08a32592c6238db9fa8665" +checksum = "86f69089032567ffff4eada41c573fc43ff466c7db7c5688b2e7969584345581" dependencies = [ "globset", "sha2 0.10.8", diff --git a/examples/browser-webrtc/Cargo.toml b/examples/browser-webrtc/Cargo.toml index 5e3c8a0c666..a01ebe71e5f 100644 --- a/examples/browser-webrtc/Cargo.toml +++ b/examples/browser-webrtc/Cargo.toml @@ -26,7 +26,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } axum = "0.7.4" libp2p = { path = "../../libp2p", features = [ "ed25519", "macros", "ping", "tokio"] } libp2p-webrtc = { workspace = true, features = ["tokio"] } -rust-embed = { version = "8.2.0", features = ["include-exclude", "interpolate-folder-path"] } +rust-embed = { version = "8.3.0", features = ["include-exclude", "interpolate-folder-path"] } tokio = { version = "1.36", features = ["macros", "net", "rt", "signal"] } tokio-util = { version = "0.7", features = ["compat"] } tower = "0.4" diff --git a/interop-tests/Cargo.toml b/interop-tests/Cargo.toml index b9ecc031f5a..803225f7bb6 100644 --- a/interop-tests/Cargo.toml +++ b/interop-tests/Cargo.toml @@ -31,7 +31,7 @@ mime_guess = "2.0" redis = { version = "0.23.3", default-features = false, features = [ "tokio-comp", ] } -rust-embed = "8.2" +rust-embed = "8.3" serde_json = "1" thirtyfour = "=0.32.0-rc.8" # https://github.com/stevepryde/thirtyfour/issues/169 tokio = { version = "1.36.0", features = ["full"] } From a3d4b375f73ac9359e817f9f9d0a01ec65e1cf2f Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 4 Mar 2024 10:24:26 +0000 Subject: [PATCH 2/6] deps: bump serde from 1.0.196 to 1.0.197 Pull-Request: #5189. --- Cargo.lock | 8 ++++---- hole-punching-tests/Cargo.toml | 2 +- misc/keygen/Cargo.toml | 2 +- misc/server/Cargo.toml | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 507b73c9cfb..006a4108b77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5286,18 +5286,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.196" +version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "870026e60fa08c69f064aa766c10f10b1d62db9ccd4d0abb206472bee0ce3b32" +checksum = "3fb1c873e1b9b056a4dc4c0c198b24c3ffa059243875552b2bd0933b1aee4ce2" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.196" +version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67" +checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ "proc-macro2", "quote", diff --git a/hole-punching-tests/Cargo.toml b/hole-punching-tests/Cargo.toml index 2059f4b6258..c8e6cde98e0 100644 --- a/hole-punching-tests/Cargo.toml +++ b/hole-punching-tests/Cargo.toml @@ -13,6 +13,6 @@ libp2p = { path = "../libp2p", features = ["tokio", "dcutr", "identify", "macros tracing = "0.1.37" redis = { version = "0.23.0", default-features = false, features = ["tokio-comp"] } tokio = { version = "1.36.0", features = ["full"] } -serde = { version = "1.0.196", features = ["derive"] } +serde = { version = "1.0.197", features = ["derive"] } serde_json = "1.0.114" either = "1.9.0" diff --git a/misc/keygen/Cargo.toml b/misc/keygen/Cargo.toml index 96a5fac069b..9fe9e926d76 100644 --- a/misc/keygen/Cargo.toml +++ b/misc/keygen/Cargo.toml @@ -15,7 +15,7 @@ release = false [dependencies] clap = { version = "4.4.16", features = ["derive"] } zeroize = "1" -serde = { version = "1.0.196", features = ["derive"] } +serde = { version = "1.0.197", features = ["derive"] } serde_json = "1.0.114" libp2p-core = { workspace = true } base64 = "0.21.7" diff --git a/misc/server/Cargo.toml b/misc/server/Cargo.toml index 9653ddc5d76..af71a57561a 100644 --- a/misc/server/Cargo.toml +++ b/misc/server/Cargo.toml @@ -18,7 +18,7 @@ futures-timer = "3" hyper = { version = "0.14", features = ["server", "tcp", "http1"] } libp2p = { workspace = true, features = ["autonat", "dns", "tokio", "noise", "tcp", "yamux", "identify", "kad", "ping", "relay", "metrics", "rsa", "macros", "quic", "websocket"] } prometheus-client = { workspace = true } -serde = "1.0.196" +serde = "1.0.197" serde_derive = "1.0.125" serde_json = "1.0" tokio = { version = "1", features = ["rt-multi-thread", "macros"] } From 46ec6c416a9b06c089236a8d6d44e55d0c3f7edd Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 4 Mar 2024 13:15:03 +0100 Subject: [PATCH 3/6] fix(mergify): disable approve rules (#5208) --- .github/mergify.yml | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/.github/mergify.yml b/.github/mergify.yml index c630d23cea0..b464c9aba87 100644 --- a/.github/mergify.yml +++ b/.github/mergify.yml @@ -52,22 +52,22 @@ pull_request_rules: message: Approvals have been dismissed because the PR was updated after the `send-it` label was applied. changes_requested: false - - name: Approve trivial maintainer PRs - conditions: - - base=master - - label=trivial - - author=@libp2p/rust-libp2p-maintainers - actions: - review: + # - name: Approve trivial maintainer PRs + # conditions: + # - base=master + # - label=trivial + # - author=@libp2p/rust-libp2p-maintainers + # actions: + # review: - - name: Approve dependabot PRs of semver-compatible updates - conditions: - - author=dependabot[bot] - - or: - - title~=bump [^\s]+ from ([1-9]+)\..+ to \1\. # For major >= 1 versions, only approve updates with the same major version. - - title~=bump [^\s]+ from 0\.([\d]+)\..+ to 0\.\1\. # For major == 0 versions, only approve updates with the same minor version. - actions: - review: + # - name: Approve dependabot PRs of semver-compatible updates + # conditions: + # - author=dependabot[bot] + # - or: + # - title~=bump [^\s]+ from ([1-9]+)\..+ to \1\. # For major >= 1 versions, only approve updates with the same major version. + # - title~=bump [^\s]+ from 0\.([\d]+)\..+ to 0\.\1\. # For major == 0 versions, only approve updates with the same minor version. + # actions: + # review: queue_rules: - name: default From 75f6ec78827a9475543f792b8693c3b784d43e57 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 4 Mar 2024 22:16:33 +0000 Subject: [PATCH 4/6] deps: bump tempfile from 3.10.0 to 3.10.1 Pull-Request: #5201. --- Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 006a4108b77..2b846c16a4d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5716,9 +5716,9 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.10.0" +version = "3.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a365e8cd18e44762ef95d87f284f4b5cd04107fec2ff3052bd6a3e6069669e67" +checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" dependencies = [ "cfg-if", "fastrand 2.0.1", From 245430ab19586372291a82c4dd7e3b15d658ac76 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 5 Mar 2024 06:19:25 +0000 Subject: [PATCH 5/6] deps: bump thirtyfour from 0.32.0-rc.8 to 0.32.0-rc.10 Pull-Request: #5206. --- Cargo.lock | 166 ++++++++++++--------------------------- interop-tests/Cargo.toml | 2 +- 2 files changed, 50 insertions(+), 118 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2b846c16a4d..85cb63ed083 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -984,17 +984,6 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "795bc6e66a8e340f075fcf6227e417a2dc976b92b91f3cdc778bb858778b6747" -[[package]] -name = "cookie" -version = "0.16.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e859cd57d0710d9e06c381b550c06e76992472a8c6d527aecd2fc673dcc231fb" -dependencies = [ - "percent-encoding", - "time", - "version_check", -] - [[package]] name = "core-foundation" version = "0.9.3" @@ -1498,28 +1487,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "fantoccini" -version = "0.20.0-rc.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5eb32b0001134a1d3b9e16010eb4b119451edf68446963a30a8130a0d056e98" -dependencies = [ - "base64 0.13.1", - "cookie", - "futures-core", - "futures-util", - "http 0.2.9", - "hyper 0.14.27", - "hyper-rustls", - "mime", - "serde", - "serde_json", - "time", - "tokio", - "url", - "webdriver", -] - [[package]] name = "fastrand" version = "1.9.0" @@ -1697,7 +1664,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35bd3cf68c183738046838e300353e4716c674dc5e56890de4826801a6622a28" dependencies = [ "futures-io", - "rustls 0.21.9", + "rustls", ] [[package]] @@ -2183,15 +2150,14 @@ dependencies = [ [[package]] name = "hyper-rustls" -version = "0.23.2" +version = "0.24.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c" +checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ + "futures-util", "http 0.2.9", "hyper 0.14.27", - "log", - "rustls 0.20.8", - "rustls-native-certs", + "rustls", "tokio", "tokio-rustls", ] @@ -3118,7 +3084,7 @@ dependencies = [ "quinn", "rand 0.8.5", "ring 0.16.20", - "rustls 0.21.9", + "rustls", "socket2 0.5.6", "thiserror", "tokio", @@ -3343,7 +3309,7 @@ dependencies = [ "libp2p-yamux", "rcgen", "ring 0.16.20", - "rustls 0.21.9", + "rustls", "rustls-webpki", "thiserror", "tokio", @@ -4566,7 +4532,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash", - "rustls 0.21.9", + "rustls", "thiserror", "tokio", "tracing", @@ -4582,7 +4548,7 @@ dependencies = [ "rand 0.8.5", "ring 0.16.20", "rustc-hash", - "rustls 0.21.9", + "rustls", "slab", "thiserror", "tinyvec", @@ -4850,6 +4816,7 @@ dependencies = [ "http 0.2.9", "http-body 0.4.5", "hyper 0.14.27", + "hyper-rustls", "hyper-tls", "ipnet", "js-sys", @@ -4859,6 +4826,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", + "rustls", "rustls-pemfile", "serde", "serde_json", @@ -4867,11 +4835,13 @@ dependencies = [ "system-configuration", "tokio", "tokio-native-tls", + "tokio-rustls", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", + "webpki-roots", "winreg", ] @@ -5088,18 +5058,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "rustls" -version = "0.20.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f" -dependencies = [ - "log", - "ring 0.16.20", - "sct", - "webpki", -] - [[package]] name = "rustls" version = "0.21.9" @@ -5112,18 +5070,6 @@ dependencies = [ "sct", ] -[[package]] -name = "rustls-native-certs" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" -dependencies = [ - "openssl-probe", - "rustls-pemfile", - "schannel", - "security-framework", -] - [[package]] name = "rustls-pemfile" version = "1.0.3" @@ -5328,9 +5274,9 @@ dependencies = [ [[package]] name = "serde_repr" -version = "0.1.16" +version = "0.1.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8725e1dfadb3a50f7e5ce0b1a540466f6ed3fe7a0fca2ac2b8b831d31316bd00" +checksum = "0b2e6b945e9d3df726b65d6ee24060aff8e3533d431f677a9695db04eff9dfdb" dependencies = [ "proc-macro2", "quote", @@ -5604,6 +5550,28 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "strum" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" +dependencies = [ + "strum_macros", +] + +[[package]] +name = "strum_macros" +version = "0.25.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23dc1fa9ac9c169a78ba62f0b841814b7abae11bdd047b9c58f893439e309ea0" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.51", +] + [[package]] name = "stun" version = "0.5.1" @@ -5737,27 +5705,27 @@ dependencies = [ [[package]] name = "thirtyfour" -version = "0.32.0-rc.8" +version = "0.32.0-rc.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf0fe180d5f1f7dd32bb5f1a8d19231bb63dc9bbb1985e1dbb6f07163b6a8578" +checksum = "bcc8d557d4ac49c0d0bf13722bf7414506ef50b28ac40894400dcd0b8ed25c4d" dependencies = [ "async-trait", "base64 0.21.7", - "cookie", - "fantoccini", "futures", - "http 0.2.9", - "indexmap 1.9.3", - "log", + "http 1.0.0", + "indexmap 2.2.1", "parking_lot", "paste", + "reqwest", "serde", "serde_json", "serde_repr", "stringmatch", + "strum", "thirtyfour-macros", "thiserror", "tokio", + "tracing", "url", ] @@ -5907,13 +5875,12 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.23.4" +version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls 0.20.8", + "rustls", "tokio", - "webpki", ] [[package]] @@ -6215,12 +6182,6 @@ dependencies = [ "tinyvec", ] -[[package]] -name = "unicode-segmentation" -version = "1.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1dd624098567895118886609431a7c3b8f516e41d30e0643f03d94592a147e36" - [[package]] name = "unicode-xid" version = "0.2.4" @@ -6495,35 +6456,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "webdriver" -version = "0.46.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9973cb72c8587d5ad5efdb91e663d36177dc37725e6c90ca86c626b0cc45c93f" -dependencies = [ - "base64 0.13.1", - "bytes", - "cookie", - "http 0.2.9", - "log", - "serde", - "serde_derive", - "serde_json", - "time", - "unicode-segmentation", - "url", -] - -[[package]] -name = "webpki" -version = "0.22.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07ecc0cd7cac091bf682ec5efa18b1cff79d617b84181f38b3951dbe135f607f" -dependencies = [ - "ring 0.16.20", - "untrusted 0.7.1", -] - [[package]] name = "webpki-roots" version = "0.25.2" @@ -6551,7 +6483,7 @@ dependencies = [ "ring 0.16.20", "rtcp", "rtp", - "rustls 0.21.9", + "rustls", "sdp", "serde", "serde_json", @@ -6612,7 +6544,7 @@ dependencies = [ "rand_core 0.6.4", "rcgen", "ring 0.16.20", - "rustls 0.21.9", + "rustls", "sec1", "serde", "sha1", diff --git a/interop-tests/Cargo.toml b/interop-tests/Cargo.toml index 803225f7bb6..410d3dbce2b 100644 --- a/interop-tests/Cargo.toml +++ b/interop-tests/Cargo.toml @@ -33,7 +33,7 @@ redis = { version = "0.23.3", default-features = false, features = [ ] } rust-embed = "8.3" serde_json = "1" -thirtyfour = "=0.32.0-rc.8" # https://github.com/stevepryde/thirtyfour/issues/169 +thirtyfour = "=0.32.0-rc.10" # https://github.com/stevepryde/thirtyfour/issues/169 tokio = { version = "1.36.0", features = ["full"] } tower-http = { version = "0.5", features = ["cors", "fs", "trace"] } tracing = "0.1.37" From 6f9c9e5bedce42f53ab24880c7a827ed5adac81b Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 6 Mar 2024 02:31:22 +1100 Subject: [PATCH 6/6] chore: move `futures-bounded` out of mono-repo I've moved this library to https://github.com/thomaseizinger/rust-futures-bounded to make maintenance easier. Any PRs / fixes etc are of course still welcome. Pull-Request: #5210. --- Cargo.lock | 4 +- Cargo.toml | 3 +- misc/futures-bounded/CHANGELOG.md | 23 -- misc/futures-bounded/Cargo.toml | 24 -- misc/futures-bounded/src/futures_map.rs | 319 --------------- misc/futures-bounded/src/futures_set.rs | 65 ---- misc/futures-bounded/src/futures_tuple_set.rs | 94 ----- misc/futures-bounded/src/lib.rs | 46 --- misc/futures-bounded/src/stream_map.rs | 362 ------------------ misc/futures-bounded/src/stream_set.rs | 64 ---- 10 files changed, 3 insertions(+), 1001 deletions(-) delete mode 100644 misc/futures-bounded/CHANGELOG.md delete mode 100644 misc/futures-bounded/Cargo.toml delete mode 100644 misc/futures-bounded/src/futures_map.rs delete mode 100644 misc/futures-bounded/src/futures_set.rs delete mode 100644 misc/futures-bounded/src/futures_tuple_set.rs delete mode 100644 misc/futures-bounded/src/lib.rs delete mode 100644 misc/futures-bounded/src/stream_map.rs delete mode 100644 misc/futures-bounded/src/stream_set.rs diff --git a/Cargo.lock b/Cargo.lock index 85cb63ed083..f662e0c9fbf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1580,11 +1580,11 @@ dependencies = [ [[package]] name = "futures-bounded" version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1e2774cc104e198ef3d3e1ff4ab40f86fa3245d6cb6a3a46174f21463cee173" dependencies = [ - "futures", "futures-timer", "futures-util", - "tokio", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 23fd4774b55..556919dcae6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,6 @@ members = [ "interop-tests", "misc/allow-block-list", "misc/connection-limits", - "misc/futures-bounded", "misc/keygen", "misc/memory-connection-limits", "misc/metrics", @@ -73,7 +72,7 @@ rust-version = "1.73.0" [workspace.dependencies] asynchronous-codec = { version = "0.7.0" } -futures-bounded = { version = "0.2.3", path = "misc/futures-bounded" } +futures-bounded = { version = "0.2.3" } libp2p = { version = "0.53.2", path = "libp2p" } libp2p-allow-block-list = { version = "0.3.0", path = "misc/allow-block-list" } libp2p-autonat = { version = "0.12.0", path = "protocols/autonat" } diff --git a/misc/futures-bounded/CHANGELOG.md b/misc/futures-bounded/CHANGELOG.md deleted file mode 100644 index 72b0b4f457d..00000000000 --- a/misc/futures-bounded/CHANGELOG.md +++ /dev/null @@ -1,23 +0,0 @@ -## 0.2.3 - -- Introduce `FuturesTupleSet`, holding tuples of a `Future` together with an arbitrary piece of data. - See [PR 4841](https://github.com/libp2p/rust-libp2p/pull/4841). - -## 0.2.2 - -- Fix an issue where `{Futures,Stream}Map` returns `Poll::Pending` despite being ready after an item has been replaced as part of `try_push`. - See [PR 4865](https://github.com/libp2p/rust-libp2p/pull/4865). - -## 0.2.1 - -- Add `.len()` getter to `FuturesMap`, `FuturesSet`, `StreamMap` and `StreamSet`. - See [PR 4745](https://github.com/libp2p/rust-libp2p/pull/4745). - -## 0.2.0 - -- Add `StreamMap` type and remove `Future`-suffix from `PushError::ReplacedFuture` to reuse it for `StreamMap`. - See [PR 4616](https://github.com/libp2p/rust-libp2p/pull/4616). - -## 0.1.0 - -Initial release. diff --git a/misc/futures-bounded/Cargo.toml b/misc/futures-bounded/Cargo.toml deleted file mode 100644 index 1d4340df74f..00000000000 --- a/misc/futures-bounded/Cargo.toml +++ /dev/null @@ -1,24 +0,0 @@ -[package] -name = "futures-bounded" -version = "0.2.3" -edition = "2021" -rust-version.workspace = true -license = "MIT" -repository = "https://github.com/libp2p/rust-libp2p" -keywords = ["futures", "async", "backpressure"] -categories = ["data-structures", "asynchronous"] -description = "Utilities for bounding futures in size and time." -publish = true - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -futures-util = { version = "0.3.30" } -futures-timer = "3.0.3" - -[dev-dependencies] -tokio = { version = "1.36.0", features = ["macros", "rt", "sync"] } -futures = "0.3.30" - -[lints] -workspace = true diff --git a/misc/futures-bounded/src/futures_map.rs b/misc/futures-bounded/src/futures_map.rs deleted file mode 100644 index fba3543f67b..00000000000 --- a/misc/futures-bounded/src/futures_map.rs +++ /dev/null @@ -1,319 +0,0 @@ -use std::future::Future; -use std::hash::Hash; -use std::pin::Pin; -use std::task::{Context, Poll, Waker}; -use std::time::Duration; -use std::{future, mem}; - -use futures_timer::Delay; -use futures_util::future::BoxFuture; -use futures_util::stream::FuturesUnordered; -use futures_util::{FutureExt, StreamExt}; - -use crate::{PushError, Timeout}; - -/// Represents a map of [`Future`]s. -/// -/// Each future must finish within the specified time and the map never outgrows its capacity. -pub struct FuturesMap { - timeout: Duration, - capacity: usize, - inner: FuturesUnordered>>>, - empty_waker: Option, - full_waker: Option, -} - -impl FuturesMap { - pub fn new(timeout: Duration, capacity: usize) -> Self { - Self { - timeout, - capacity, - inner: Default::default(), - empty_waker: None, - full_waker: None, - } - } -} - -impl FuturesMap -where - ID: Clone + Hash + Eq + Send + Unpin + 'static, - O: 'static, -{ - /// Push a future into the map. - /// - /// This method inserts the given future with defined `future_id` to the set. - /// If the length of the map is equal to the capacity, this method returns [PushError::BeyondCapacity], - /// that contains the passed future. In that case, the future is not inserted to the map. - /// If a future with the given `future_id` already exists, then the old future will be replaced by a new one. - /// In that case, the returned error [PushError::Replaced] contains the old future. - pub fn try_push(&mut self, future_id: ID, future: F) -> Result<(), PushError>> - where - F: Future + Send + 'static, - { - if self.inner.len() >= self.capacity { - return Err(PushError::BeyondCapacity(future.boxed())); - } - - if let Some(waker) = self.empty_waker.take() { - waker.wake(); - } - - let old = self.remove(future_id.clone()); - self.inner.push(TaggedFuture { - tag: future_id, - inner: TimeoutFuture { - inner: future.boxed(), - timeout: Delay::new(self.timeout), - cancelled: false, - }, - }); - match old { - None => Ok(()), - Some(old) => Err(PushError::Replaced(old)), - } - } - - pub fn remove(&mut self, id: ID) -> Option> { - let tagged = self.inner.iter_mut().find(|s| s.tag == id)?; - - let inner = mem::replace(&mut tagged.inner.inner, future::pending().boxed()); - tagged.inner.cancelled = true; - - Some(inner) - } - - pub fn len(&self) -> usize { - self.inner.len() - } - - pub fn is_empty(&self) -> bool { - self.inner.is_empty() - } - - #[allow(unknown_lints, clippy::needless_pass_by_ref_mut)] // &mut Context is idiomatic. - pub fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<()> { - if self.inner.len() < self.capacity { - return Poll::Ready(()); - } - - self.full_waker = Some(cx.waker().clone()); - - Poll::Pending - } - - pub fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<(ID, Result)> { - loop { - let maybe_result = futures_util::ready!(self.inner.poll_next_unpin(cx)); - - match maybe_result { - None => { - self.empty_waker = Some(cx.waker().clone()); - return Poll::Pending; - } - Some((id, Ok(output))) => return Poll::Ready((id, Ok(output))), - Some((id, Err(TimeoutError::Timeout))) => { - return Poll::Ready((id, Err(Timeout::new(self.timeout)))) - } - Some((_, Err(TimeoutError::Cancelled))) => continue, - } - } - } -} - -struct TimeoutFuture { - inner: F, - timeout: Delay, - - cancelled: bool, -} - -impl Future for TimeoutFuture -where - F: Future + Unpin, -{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if self.cancelled { - return Poll::Ready(Err(TimeoutError::Cancelled)); - } - - if self.timeout.poll_unpin(cx).is_ready() { - return Poll::Ready(Err(TimeoutError::Timeout)); - } - - self.inner.poll_unpin(cx).map(Ok) - } -} - -enum TimeoutError { - Timeout, - Cancelled, -} - -struct TaggedFuture { - tag: T, - inner: F, -} - -impl Future for TaggedFuture -where - T: Clone + Unpin, - F: Future + Unpin, -{ - type Output = (T, F::Output); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let output = futures_util::ready!(self.inner.poll_unpin(cx)); - - Poll::Ready((self.tag.clone(), output)) - } -} - -#[cfg(test)] -mod tests { - use futures::channel::oneshot; - use futures_util::task::noop_waker_ref; - use std::future::{pending, poll_fn, ready}; - use std::pin::Pin; - use std::time::Instant; - - use super::*; - - #[test] - fn cannot_push_more_than_capacity_tasks() { - let mut futures = FuturesMap::new(Duration::from_secs(10), 1); - - assert!(futures.try_push("ID_1", ready(())).is_ok()); - matches!( - futures.try_push("ID_2", ready(())), - Err(PushError::BeyondCapacity(_)) - ); - } - - #[test] - fn cannot_push_the_same_id_few_times() { - let mut futures = FuturesMap::new(Duration::from_secs(10), 5); - - assert!(futures.try_push("ID", ready(())).is_ok()); - matches!( - futures.try_push("ID", ready(())), - Err(PushError::Replaced(_)) - ); - } - - #[tokio::test] - async fn futures_timeout() { - let mut futures = FuturesMap::new(Duration::from_millis(100), 1); - - let _ = futures.try_push("ID", pending::<()>()); - Delay::new(Duration::from_millis(150)).await; - let (_, result) = poll_fn(|cx| futures.poll_unpin(cx)).await; - - assert!(result.is_err()) - } - - #[test] - fn resources_of_removed_future_are_cleaned_up() { - let mut futures = FuturesMap::new(Duration::from_millis(100), 1); - - let _ = futures.try_push("ID", pending::<()>()); - futures.remove("ID"); - - let poll = futures.poll_unpin(&mut Context::from_waker(noop_waker_ref())); - assert!(poll.is_pending()); - - assert_eq!(futures.len(), 0); - } - - #[tokio::test] - async fn replaced_pending_future_is_polled() { - let mut streams = FuturesMap::new(Duration::from_millis(100), 3); - - let (_tx1, rx1) = oneshot::channel(); - let (tx2, rx2) = oneshot::channel(); - - let _ = streams.try_push("ID1", rx1); - let _ = streams.try_push("ID2", rx2); - - let _ = tx2.send(2); - let (id, res) = poll_fn(|cx| streams.poll_unpin(cx)).await; - assert_eq!(id, "ID2"); - assert_eq!(res.unwrap().unwrap(), 2); - - let (new_tx1, new_rx1) = oneshot::channel(); - let replaced = streams.try_push("ID1", new_rx1); - assert!(matches!(replaced.unwrap_err(), PushError::Replaced(_))); - - let _ = new_tx1.send(4); - let (id, res) = poll_fn(|cx| streams.poll_unpin(cx)).await; - - assert_eq!(id, "ID1"); - assert_eq!(res.unwrap().unwrap(), 4); - } - - // Each future causes a delay, `Task` only has a capacity of 1, meaning they must be processed in sequence. - // We stop after NUM_FUTURES tasks, meaning the overall execution must at least take DELAY * NUM_FUTURES. - #[tokio::test] - async fn backpressure() { - const DELAY: Duration = Duration::from_millis(100); - const NUM_FUTURES: u32 = 10; - - let start = Instant::now(); - Task::new(DELAY, NUM_FUTURES, 1).await; - let duration = start.elapsed(); - - assert!(duration >= DELAY * NUM_FUTURES); - } - - struct Task { - future: Duration, - num_futures: usize, - num_processed: usize, - inner: FuturesMap, - } - - impl Task { - fn new(future: Duration, num_futures: u32, capacity: usize) -> Self { - Self { - future, - num_futures: num_futures as usize, - num_processed: 0, - inner: FuturesMap::new(Duration::from_secs(60), capacity), - } - } - } - - impl Future for Task { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - while this.num_processed < this.num_futures { - if let Poll::Ready((_, result)) = this.inner.poll_unpin(cx) { - if result.is_err() { - panic!("Timeout is great than future delay") - } - - this.num_processed += 1; - continue; - } - - if let Poll::Ready(()) = this.inner.poll_ready_unpin(cx) { - // We push the constant future's ID to prove that user can use the same ID - // if the future was finished - let maybe_future = this.inner.try_push(1u8, Delay::new(this.future)); - assert!(maybe_future.is_ok(), "we polled for readiness"); - - continue; - } - - return Poll::Pending; - } - - Poll::Ready(()) - } - } -} diff --git a/misc/futures-bounded/src/futures_set.rs b/misc/futures-bounded/src/futures_set.rs deleted file mode 100644 index af7cedfcc85..00000000000 --- a/misc/futures-bounded/src/futures_set.rs +++ /dev/null @@ -1,65 +0,0 @@ -use std::future::Future; -use std::task::{ready, Context, Poll}; -use std::time::Duration; - -use futures_util::future::BoxFuture; - -use crate::{FuturesMap, PushError, Timeout}; - -/// Represents a list of [Future]s. -/// -/// Each future must finish within the specified time and the list never outgrows its capacity. -pub struct FuturesSet { - id: u32, - inner: FuturesMap, -} - -impl FuturesSet { - pub fn new(timeout: Duration, capacity: usize) -> Self { - Self { - id: 0, - inner: FuturesMap::new(timeout, capacity), - } - } -} - -impl FuturesSet -where - O: 'static, -{ - /// Push a future into the list. - /// - /// This method adds the given future to the list. - /// If the length of the list is equal to the capacity, this method returns a error that contains the passed future. - /// In that case, the future is not added to the set. - pub fn try_push(&mut self, future: F) -> Result<(), BoxFuture> - where - F: Future + Send + 'static, - { - self.id = self.id.wrapping_add(1); - - match self.inner.try_push(self.id, future) { - Ok(()) => Ok(()), - Err(PushError::BeyondCapacity(w)) => Err(w), - Err(PushError::Replaced(_)) => unreachable!("we never reuse IDs"), - } - } - - pub fn len(&self) -> usize { - self.inner.len() - } - - pub fn is_empty(&self) -> bool { - self.inner.is_empty() - } - - pub fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<()> { - self.inner.poll_ready_unpin(cx) - } - - pub fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll> { - let (_, res) = ready!(self.inner.poll_unpin(cx)); - - Poll::Ready(res) - } -} diff --git a/misc/futures-bounded/src/futures_tuple_set.rs b/misc/futures-bounded/src/futures_tuple_set.rs deleted file mode 100644 index e19b236aaf8..00000000000 --- a/misc/futures-bounded/src/futures_tuple_set.rs +++ /dev/null @@ -1,94 +0,0 @@ -use std::collections::HashMap; -use std::future::Future; -use std::task::{ready, Context, Poll}; -use std::time::Duration; - -use futures_util::future::BoxFuture; - -use crate::{FuturesMap, PushError, Timeout}; - -/// Represents a list of tuples of a [Future] and an associated piece of data. -/// -/// Each future must finish within the specified time and the list never outgrows its capacity. -pub struct FuturesTupleSet { - id: u32, - inner: FuturesMap, - data: HashMap, -} - -impl FuturesTupleSet { - pub fn new(timeout: Duration, capacity: usize) -> Self { - Self { - id: 0, - inner: FuturesMap::new(timeout, capacity), - data: HashMap::new(), - } - } -} - -impl FuturesTupleSet -where - O: 'static, -{ - /// Push a future into the list. - /// - /// This method adds the given future to the list. - /// If the length of the list is equal to the capacity, this method returns a error that contains the passed future. - /// In that case, the future is not added to the set. - pub fn try_push(&mut self, future: F, data: D) -> Result<(), (BoxFuture, D)> - where - F: Future + Send + 'static, - { - self.id = self.id.wrapping_add(1); - - match self.inner.try_push(self.id, future) { - Ok(()) => {} - Err(PushError::BeyondCapacity(w)) => return Err((w, data)), - Err(PushError::Replaced(_)) => unreachable!("we never reuse IDs"), - } - self.data.insert(self.id, data); - - Ok(()) - } - - pub fn len(&self) -> usize { - self.inner.len() - } - - pub fn is_empty(&self) -> bool { - self.inner.is_empty() - } - - pub fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<()> { - self.inner.poll_ready_unpin(cx) - } - - pub fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<(Result, D)> { - let (id, res) = ready!(self.inner.poll_unpin(cx)); - let data = self.data.remove(&id).expect("must have data for future"); - - Poll::Ready((res, data)) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use futures_util::future::poll_fn; - use futures_util::FutureExt; - use std::future::ready; - - #[test] - fn tracks_associated_data_of_future() { - let mut set = FuturesTupleSet::new(Duration::from_secs(10), 10); - - let _ = set.try_push(ready(1), 1); - let _ = set.try_push(ready(2), 2); - - let (res1, data1) = poll_fn(|cx| set.poll_unpin(cx)).now_or_never().unwrap(); - let (res2, data2) = poll_fn(|cx| set.poll_unpin(cx)).now_or_never().unwrap(); - - assert_eq!(res1.unwrap(), data1); - assert_eq!(res2.unwrap(), data2); - } -} diff --git a/misc/futures-bounded/src/lib.rs b/misc/futures-bounded/src/lib.rs deleted file mode 100644 index da8483a595f..00000000000 --- a/misc/futures-bounded/src/lib.rs +++ /dev/null @@ -1,46 +0,0 @@ -mod futures_map; -mod futures_set; -mod futures_tuple_set; -mod stream_map; -mod stream_set; - -pub use futures_map::FuturesMap; -pub use futures_set::FuturesSet; -pub use futures_tuple_set::FuturesTupleSet; -pub use stream_map::StreamMap; -pub use stream_set::StreamSet; - -use std::fmt; -use std::fmt::Formatter; -use std::time::Duration; - -/// A future failed to complete within the given timeout. -#[derive(Debug)] -pub struct Timeout { - limit: Duration, -} - -impl Timeout { - fn new(duration: Duration) -> Self { - Self { limit: duration } - } -} - -impl fmt::Display for Timeout { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - write!(f, "future failed to complete within {:?}", self.limit) - } -} - -/// Error of a future pushing -#[derive(PartialEq, Debug)] -pub enum PushError { - /// The length of the set is equal to the capacity - BeyondCapacity(T), - /// The map already contained an item with this key. - /// - /// The old item is returned. - Replaced(T), -} - -impl std::error::Error for Timeout {} diff --git a/misc/futures-bounded/src/stream_map.rs b/misc/futures-bounded/src/stream_map.rs deleted file mode 100644 index 8464f432d02..00000000000 --- a/misc/futures-bounded/src/stream_map.rs +++ /dev/null @@ -1,362 +0,0 @@ -use std::mem; -use std::pin::Pin; -use std::task::{Context, Poll, Waker}; -use std::time::Duration; - -use futures_timer::Delay; -use futures_util::stream::{BoxStream, SelectAll}; -use futures_util::{stream, FutureExt, Stream, StreamExt}; - -use crate::{PushError, Timeout}; - -/// Represents a map of [`Stream`]s. -/// -/// Each stream must finish within the specified time and the map never outgrows its capacity. -pub struct StreamMap { - timeout: Duration, - capacity: usize, - inner: SelectAll>>>, - empty_waker: Option, - full_waker: Option, -} - -impl StreamMap -where - ID: Clone + Unpin, -{ - pub fn new(timeout: Duration, capacity: usize) -> Self { - Self { - timeout, - capacity, - inner: Default::default(), - empty_waker: None, - full_waker: None, - } - } -} - -impl StreamMap -where - ID: Clone + PartialEq + Send + Unpin + 'static, - O: Send + 'static, -{ - /// Push a stream into the map. - pub fn try_push(&mut self, id: ID, stream: F) -> Result<(), PushError>> - where - F: Stream + Send + 'static, - { - if self.inner.len() >= self.capacity { - return Err(PushError::BeyondCapacity(stream.boxed())); - } - - if let Some(waker) = self.empty_waker.take() { - waker.wake(); - } - - let old = self.remove(id.clone()); - self.inner.push(TaggedStream::new( - id, - TimeoutStream { - inner: stream.boxed(), - timeout: Delay::new(self.timeout), - }, - )); - - match old { - None => Ok(()), - Some(old) => Err(PushError::Replaced(old)), - } - } - - pub fn remove(&mut self, id: ID) -> Option> { - let tagged = self.inner.iter_mut().find(|s| s.key == id)?; - - let inner = mem::replace(&mut tagged.inner.inner, stream::pending().boxed()); - tagged.exhausted = true; // Setting this will emit `None` on the next poll and ensure `SelectAll` cleans up the resources. - - Some(inner) - } - - pub fn len(&self) -> usize { - self.inner.len() - } - - pub fn is_empty(&self) -> bool { - self.inner.is_empty() - } - - #[allow(unknown_lints, clippy::needless_pass_by_ref_mut)] // &mut Context is idiomatic. - pub fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<()> { - if self.inner.len() < self.capacity { - return Poll::Ready(()); - } - - self.full_waker = Some(cx.waker().clone()); - - Poll::Pending - } - - pub fn poll_next_unpin( - &mut self, - cx: &mut Context<'_>, - ) -> Poll<(ID, Option>)> { - match futures_util::ready!(self.inner.poll_next_unpin(cx)) { - None => { - self.empty_waker = Some(cx.waker().clone()); - Poll::Pending - } - Some((id, Some(Ok(output)))) => Poll::Ready((id, Some(Ok(output)))), - Some((id, Some(Err(())))) => { - self.remove(id.clone()); // Remove stream, otherwise we keep reporting the timeout. - - Poll::Ready((id, Some(Err(Timeout::new(self.timeout))))) - } - Some((id, None)) => Poll::Ready((id, None)), - } - } -} - -struct TimeoutStream { - inner: S, - timeout: Delay, -} - -impl Stream for TimeoutStream -where - F: Stream + Unpin, -{ - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.timeout.poll_unpin(cx).is_ready() { - return Poll::Ready(Some(Err(()))); - } - - self.inner.poll_next_unpin(cx).map(|a| a.map(Ok)) - } -} - -struct TaggedStream { - key: K, - inner: S, - - exhausted: bool, -} - -impl TaggedStream { - fn new(key: K, inner: S) -> Self { - Self { - key, - inner, - exhausted: false, - } - } -} - -impl Stream for TaggedStream -where - K: Clone + Unpin, - S: Stream + Unpin, -{ - type Item = (K, Option); - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.exhausted { - return Poll::Ready(None); - } - - match futures_util::ready!(self.inner.poll_next_unpin(cx)) { - Some(item) => Poll::Ready(Some((self.key.clone(), Some(item)))), - None => { - self.exhausted = true; - - Poll::Ready(Some((self.key.clone(), None))) - } - } - } -} - -#[cfg(test)] -mod tests { - use futures::channel::mpsc; - use futures_util::stream::{once, pending}; - use futures_util::SinkExt; - use std::future::{poll_fn, ready, Future}; - use std::pin::Pin; - use std::time::Instant; - - use super::*; - - #[test] - fn cannot_push_more_than_capacity_tasks() { - let mut streams = StreamMap::new(Duration::from_secs(10), 1); - - assert!(streams.try_push("ID_1", once(ready(()))).is_ok()); - matches!( - streams.try_push("ID_2", once(ready(()))), - Err(PushError::BeyondCapacity(_)) - ); - } - - #[test] - fn cannot_push_the_same_id_few_times() { - let mut streams = StreamMap::new(Duration::from_secs(10), 5); - - assert!(streams.try_push("ID", once(ready(()))).is_ok()); - matches!( - streams.try_push("ID", once(ready(()))), - Err(PushError::Replaced(_)) - ); - } - - #[tokio::test] - async fn streams_timeout() { - let mut streams = StreamMap::new(Duration::from_millis(100), 1); - - let _ = streams.try_push("ID", pending::<()>()); - Delay::new(Duration::from_millis(150)).await; - let (_, result) = poll_fn(|cx| streams.poll_next_unpin(cx)).await; - - assert!(result.unwrap().is_err()) - } - - #[tokio::test] - async fn timed_out_stream_gets_removed() { - let mut streams = StreamMap::new(Duration::from_millis(100), 1); - - let _ = streams.try_push("ID", pending::<()>()); - Delay::new(Duration::from_millis(150)).await; - poll_fn(|cx| streams.poll_next_unpin(cx)).await; - - let poll = streams.poll_next_unpin(&mut Context::from_waker( - futures_util::task::noop_waker_ref(), - )); - assert!(poll.is_pending()) - } - - #[test] - fn removing_stream() { - let mut streams = StreamMap::new(Duration::from_millis(100), 1); - - let _ = streams.try_push("ID", stream::once(ready(()))); - - { - let cancelled_stream = streams.remove("ID"); - assert!(cancelled_stream.is_some()); - } - - let poll = streams.poll_next_unpin(&mut Context::from_waker( - futures_util::task::noop_waker_ref(), - )); - - assert!(poll.is_pending()); - assert_eq!( - streams.len(), - 0, - "resources of cancelled streams are cleaned up properly" - ); - } - - #[tokio::test] - async fn replaced_stream_is_still_registered() { - let mut streams = StreamMap::new(Duration::from_millis(100), 3); - - let (mut tx1, rx1) = mpsc::channel(5); - let (mut tx2, rx2) = mpsc::channel(5); - - let _ = streams.try_push("ID1", rx1); - let _ = streams.try_push("ID2", rx2); - - let _ = tx2.send(2).await; - let _ = tx1.send(1).await; - let _ = tx2.send(3).await; - let (id, res) = poll_fn(|cx| streams.poll_next_unpin(cx)).await; - assert_eq!(id, "ID1"); - assert_eq!(res.unwrap().unwrap(), 1); - let (id, res) = poll_fn(|cx| streams.poll_next_unpin(cx)).await; - assert_eq!(id, "ID2"); - assert_eq!(res.unwrap().unwrap(), 2); - let (id, res) = poll_fn(|cx| streams.poll_next_unpin(cx)).await; - assert_eq!(id, "ID2"); - assert_eq!(res.unwrap().unwrap(), 3); - - let (mut new_tx1, new_rx1) = mpsc::channel(5); - let replaced = streams.try_push("ID1", new_rx1); - assert!(matches!(replaced.unwrap_err(), PushError::Replaced(_))); - - let _ = new_tx1.send(4).await; - let (id, res) = poll_fn(|cx| streams.poll_next_unpin(cx)).await; - - assert_eq!(id, "ID1"); - assert_eq!(res.unwrap().unwrap(), 4); - } - - // Each stream emits 1 item with delay, `Task` only has a capacity of 1, meaning they must be processed in sequence. - // We stop after NUM_STREAMS tasks, meaning the overall execution must at least take DELAY * NUM_STREAMS. - #[tokio::test] - async fn backpressure() { - const DELAY: Duration = Duration::from_millis(100); - const NUM_STREAMS: u32 = 10; - - let start = Instant::now(); - Task::new(DELAY, NUM_STREAMS, 1).await; - let duration = start.elapsed(); - - assert!(duration >= DELAY * NUM_STREAMS); - } - - struct Task { - item_delay: Duration, - num_streams: usize, - num_processed: usize, - inner: StreamMap, - } - - impl Task { - fn new(item_delay: Duration, num_streams: u32, capacity: usize) -> Self { - Self { - item_delay, - num_streams: num_streams as usize, - num_processed: 0, - inner: StreamMap::new(Duration::from_secs(60), capacity), - } - } - } - - impl Future for Task { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - while this.num_processed < this.num_streams { - match this.inner.poll_next_unpin(cx) { - Poll::Ready((_, Some(result))) => { - if result.is_err() { - panic!("Timeout is great than item delay") - } - - this.num_processed += 1; - continue; - } - Poll::Ready((_, None)) => { - continue; - } - _ => {} - } - - if let Poll::Ready(()) = this.inner.poll_ready_unpin(cx) { - // We push the constant ID to prove that user can use the same ID if the stream was finished - let maybe_future = this.inner.try_push(1u8, once(Delay::new(this.item_delay))); - assert!(maybe_future.is_ok(), "we polled for readiness"); - - continue; - } - - return Poll::Pending; - } - - Poll::Ready(()) - } - } -} diff --git a/misc/futures-bounded/src/stream_set.rs b/misc/futures-bounded/src/stream_set.rs deleted file mode 100644 index bb32835065f..00000000000 --- a/misc/futures-bounded/src/stream_set.rs +++ /dev/null @@ -1,64 +0,0 @@ -use futures_util::stream::BoxStream; -use futures_util::Stream; -use std::task::{ready, Context, Poll}; -use std::time::Duration; - -use crate::{PushError, StreamMap, Timeout}; - -/// Represents a set of [Stream]s. -/// -/// Each stream must finish within the specified time and the list never outgrows its capacity. -pub struct StreamSet { - id: u32, - inner: StreamMap, -} - -impl StreamSet { - pub fn new(timeout: Duration, capacity: usize) -> Self { - Self { - id: 0, - inner: StreamMap::new(timeout, capacity), - } - } -} - -impl StreamSet -where - O: Send + 'static, -{ - /// Push a stream into the list. - /// - /// This method adds the given stream to the list. - /// If the length of the list is equal to the capacity, this method returns a error that contains the passed stream. - /// In that case, the stream is not added to the set. - pub fn try_push(&mut self, stream: F) -> Result<(), BoxStream> - where - F: Stream + Send + 'static, - { - self.id = self.id.wrapping_add(1); - - match self.inner.try_push(self.id, stream) { - Ok(()) => Ok(()), - Err(PushError::BeyondCapacity(w)) => Err(w), - Err(PushError::Replaced(_)) => unreachable!("we never reuse IDs"), - } - } - - pub fn len(&self) -> usize { - self.inner.len() - } - - pub fn is_empty(&self) -> bool { - self.inner.is_empty() - } - - pub fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<()> { - self.inner.poll_ready_unpin(cx) - } - - pub fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll>> { - let (_, res) = ready!(self.inner.poll_next_unpin(cx)); - - Poll::Ready(res) - } -}