From 435e39001b04f5846d1e9db1b243e6b81982f654 Mon Sep 17 00:00:00 2001 From: Evan Rittenhouse Date: Sun, 12 Jan 2025 03:33:07 -0800 Subject: [PATCH 01/10] sync: fix `sync::broadcast::Sender::closed()` doctest (#7090) The test's previous iteration could sometimes flake since we didn't await the completion of the first task. Since the tasks only existed to `move` the relevant `rx`'s in, to force a drop, we can omit them entirely and drop the `rx`s via `drop()`. This prevents any scheduling-related flakes. --- tokio/src/sync/broadcast.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index d4ebad7d684..e48925b497e 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -825,17 +825,14 @@ impl Sender { /// let (tx, mut rx1) = broadcast::channel::(16); /// let mut rx2 = tx.subscribe(); /// - /// tokio::spawn(async move { - /// assert_eq!(rx1.recv().await.unwrap(), 10); - /// }); - /// /// let _ = tx.send(10); - /// assert!(tx.closed().now_or_never().is_none()); /// - /// let _ = tokio::spawn(async move { - /// assert_eq!(rx2.recv().await.unwrap(), 10); - /// }).await; + /// assert_eq!(rx1.recv().await.unwrap(), 10); + /// drop(rx1); + /// assert!(tx.closed().now_or_never().is_none()); /// + /// assert_eq!(rx2.recv().await.unwrap(), 10); + /// drop(rx2); /// assert!(tx.closed().now_or_never().is_some()); /// } /// ``` From a82bdeebe9560d22a0179ae7ff8ce3986202e24d Mon Sep 17 00:00:00 2001 From: Motoyuki Kimura Date: Tue, 14 Jan 2025 02:36:51 +0900 Subject: [PATCH 02/10] sync: handle panic during mpsc drop (#7094) --- tokio/src/sync/mpsc/chan.rs | 28 ++++++++++++++++++++-- tokio/tests/sync_mpsc.rs | 46 +++++++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index f4cedf0d4dd..1e6eaab1798 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -490,10 +490,34 @@ impl Drop for Rx { self.inner.rx_fields.with_mut(|rx_fields_ptr| { let rx_fields = unsafe { &mut *rx_fields_ptr }; + struct Guard<'a, T, S: Semaphore> { + list: &'a mut list::Rx, + tx: &'a list::Tx, + sem: &'a S, + } + + impl<'a, T, S: Semaphore> Guard<'a, T, S> { + fn drain(&mut self) { + // call T's destructor. + while let Some(Value(_)) = self.list.pop(self.tx) { + self.sem.add_permit(); + } + } + } - while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) { - self.inner.semaphore.add_permit(); + impl<'a, T, S: Semaphore> Drop for Guard<'a, T, S> { + fn drop(&mut self) { + self.drain(); + } } + + let mut guard = Guard { + list: &mut rx_fields.list, + tx: &self.inner.tx, + sem: &self.inner.semaphore, + }; + + guard.drain(); }); } } diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index 638ced588ce..577e9c35faa 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -1454,4 +1454,50 @@ async fn test_is_empty_32_msgs() { } } +#[test] +#[cfg(not(panic = "abort"))] +fn drop_all_elements_during_panic() { + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering::Relaxed; + use tokio::sync::mpsc::UnboundedReceiver; + use tokio::sync::mpsc::UnboundedSender; + + static COUNTER: AtomicUsize = AtomicUsize::new(0); + + struct A(bool); + impl Drop for A { + // cause a panic when inner value is `true`. + fn drop(&mut self) { + COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if self.0 { + panic!("panic!") + } + } + } + + fn func(tx: UnboundedSender, rx: UnboundedReceiver) { + tx.send(A(true)).unwrap(); + tx.send(A(false)).unwrap(); + tx.send(A(false)).unwrap(); + + drop(rx); + + // `mpsc::Rx`'s drop is called and gets panicked while dropping the first value, + // but will keep dropping following elements. + } + + let (tx, rx) = mpsc::unbounded_channel(); + + let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| { + func(tx.clone(), rx); + })); + + // all A's destructor should be called at this point, even before `mpsc::Chan`'s + // drop gets called. + assert_eq!(COUNTER.load(Relaxed), 3); + + drop(tx); + // `mpsc::Chan`'s drop is called, freeing the `Block` memory allocation. +} + fn is_debug(_: &T) {} From 21a13f9eea1723f6ca1b5e106776d0b1d5df80ea Mon Sep 17 00:00:00 2001 From: "M.Amin Rayej" Date: Tue, 21 Jan 2025 17:10:32 +0330 Subject: [PATCH 03/10] runtime: clean up magic number in registration set (#7112) --- tokio/src/runtime/io/registration_set.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tokio/src/runtime/io/registration_set.rs b/tokio/src/runtime/io/registration_set.rs index 9b23732d96a..21f6c873fb1 100644 --- a/tokio/src/runtime/io/registration_set.rs +++ b/tokio/src/runtime/io/registration_set.rs @@ -7,6 +7,9 @@ use std::ptr::NonNull; use std::sync::atomic::Ordering::{Acquire, Release}; use std::sync::Arc; +// Kind of arbitrary, but buffering 16 `ScheduledIo`s doesn't seem like much +const NOTIFY_AFTER: usize = 16; + pub(super) struct RegistrationSet { num_pending_release: AtomicUsize, } @@ -35,7 +38,7 @@ impl RegistrationSet { let synced = Synced { is_shutdown: false, registrations: LinkedList::new(), - pending_release: Vec::with_capacity(16), + pending_release: Vec::with_capacity(NOTIFY_AFTER), }; (set, synced) @@ -69,9 +72,6 @@ impl RegistrationSet { // Returns `true` if the caller should unblock the I/O driver to purge // registrations pending release. pub(super) fn deregister(&self, synced: &mut Synced, registration: &Arc) -> bool { - // Kind of arbitrary, but buffering 16 `ScheduledIo`s doesn't seem like much - const NOTIFY_AFTER: usize = 16; - synced.pending_release.push(registration.clone()); let len = synced.pending_release.len(); From c081dfe3ce0a98a4f189f6e777bd74159f82b5f7 Mon Sep 17 00:00:00 2001 From: Josh McKinney Date: Wed, 22 Jan 2025 01:55:00 -0800 Subject: [PATCH 04/10] macros: characterization tests for ? operator fail (#7069) When a `?` operator is used in a tokio entry point function (wrapped in `#[tokio::main]`), which has a Option or Result return type, but where the function does not actually return that type correctly, currently the compiler returns two errors instead of just one. The first of which is incorrect and only exists due to the macro expanding to an async block. ``` cannot use the `?` operator in an async block that returns `()` ``` This commit is a characterization test for this behavior to help show when it's fixed (or even changed for better / worse) --- .../tests/fail/macros_type_mismatch.rs | 34 ++++++++++ .../tests/fail/macros_type_mismatch.stderr | 65 +++++++++++++++++-- 2 files changed, 95 insertions(+), 4 deletions(-) diff --git a/tests-build/tests/fail/macros_type_mismatch.rs b/tests-build/tests/fail/macros_type_mismatch.rs index c292ee68f66..474b33d755e 100644 --- a/tests-build/tests/fail/macros_type_mismatch.rs +++ b/tests-build/tests/fail/macros_type_mismatch.rs @@ -23,6 +23,40 @@ async fn extra_semicolon() -> Result<(), ()> { Ok(()); } +/// This test is a characterization test for the `?` operator. +/// +/// See for more details. +/// +/// It should fail with a single error message about the return type of the function, but instead +/// if fails with an extra error message due to the `?` operator being used within the async block +/// rather than the original function. +/// +/// ```text +/// 28 | None?; +/// | ^ cannot use the `?` operator in an async block that returns `()` +/// ``` +#[tokio::main] +async fn question_mark_operator_with_invalid_option() -> Option<()> { + None?; +} + +/// This test is a characterization test for the `?` operator. +/// +/// See for more details. +/// +/// It should fail with a single error message about the return type of the function, but instead +/// if fails with an extra error message due to the `?` operator being used within the async block +/// rather than the original function. +/// +/// ```text +/// 33 | Ok(())?; +/// | ^ cannot use the `?` operator in an async block that returns `()` +/// ``` +#[tokio::main] +async fn question_mark_operator_with_invalid_result() -> Result<(), ()> { + Ok(())?; +} + // https://github.com/tokio-rs/tokio/issues/4635 #[allow(redundant_semicolons)] #[rustfmt::skip] diff --git a/tests-build/tests/fail/macros_type_mismatch.stderr b/tests-build/tests/fail/macros_type_mismatch.stderr index 579c241559b..201df9cdd05 100644 --- a/tests-build/tests/fail/macros_type_mismatch.stderr +++ b/tests-build/tests/fail/macros_type_mismatch.stderr @@ -49,11 +49,68 @@ help: try adding an expression at the end of the block 24 + Ok(()) | +error[E0277]: the `?` operator can only be used in an async block that returns `Result` or `Option` (or another type that implements `FromResidual`) + --> tests/fail/macros_type_mismatch.rs:40:9 + | +38 | #[tokio::main] + | -------------- this function should return `Result` or `Option` to accept `?` +39 | async fn question_mark_operator_with_invalid_option() -> Option<()> { +40 | None?; + | ^ cannot use the `?` operator in an async block that returns `()` + | + = help: the trait `FromResidual>` is not implemented for `()` + +error[E0308]: mismatched types + --> tests/fail/macros_type_mismatch.rs:40:5 + | +39 | async fn question_mark_operator_with_invalid_option() -> Option<()> { + | ---------- expected `Option<()>` because of return type +40 | None?; + | ^^^^^^ expected `Option<()>`, found `()` + | + = note: expected enum `Option<()>` + found unit type `()` +help: try adding an expression at the end of the block + | +40 ~ None?;; +41 + None + | +40 ~ None?;; +41 + Some(()) + | + +error[E0277]: the `?` operator can only be used in an async block that returns `Result` or `Option` (or another type that implements `FromResidual`) + --> tests/fail/macros_type_mismatch.rs:57:11 + | +55 | #[tokio::main] + | -------------- this function should return `Result` or `Option` to accept `?` +56 | async fn question_mark_operator_with_invalid_result() -> Result<(), ()> { +57 | Ok(())?; + | ^ cannot use the `?` operator in an async block that returns `()` + | + = help: the trait `FromResidual>` is not implemented for `()` + +error[E0308]: mismatched types + --> tests/fail/macros_type_mismatch.rs:57:5 + | +56 | async fn question_mark_operator_with_invalid_result() -> Result<(), ()> { + | -------------- expected `Result<(), ()>` because of return type +57 | Ok(())?; + | ^^^^^^^^ expected `Result<(), ()>`, found `()` + | + = note: expected enum `Result<(), ()>` + found unit type `()` +help: try adding an expression at the end of the block + | +57 ~ Ok(())?;; +58 + Ok(()) + | + error[E0308]: mismatched types - --> tests/fail/macros_type_mismatch.rs:32:5 + --> tests/fail/macros_type_mismatch.rs:66:5 | -30 | async fn issue_4635() { +64 | async fn issue_4635() { | - help: try adding a return type: `-> i32` -31 | return 1; -32 | ; +65 | return 1; +66 | ; | ^ expected `()`, found integer From ee19b0ed7371b069112b9c9ef9280b81f3438d26 Mon Sep 17 00:00:00 2001 From: "M.Amin Rayej" Date: Wed, 22 Jan 2025 14:18:43 +0330 Subject: [PATCH 05/10] net: fix warnings when building the docs (#7113) --- tokio/src/doc/mod.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tokio/src/doc/mod.rs b/tokio/src/doc/mod.rs index d0b37416711..fa941af1158 100644 --- a/tokio/src/doc/mod.rs +++ b/tokio/src/doc/mod.rs @@ -24,21 +24,21 @@ pub enum NotDefinedHere {} impl mio::event::Source for NotDefinedHere { fn register( &mut self, - registry: &mio::Registry, - token: mio::Token, - interests: mio::Interest, + _registry: &mio::Registry, + _token: mio::Token, + _interests: mio::Interest, ) -> std::io::Result<()> { Ok(()) } fn reregister( &mut self, - registry: &mio::Registry, - token: mio::Token, - interests: mio::Interest, + _registry: &mio::Registry, + _token: mio::Token, + _interests: mio::Interest, ) -> std::io::Result<()> { Ok(()) } - fn deregister(&mut self, registry: &mio::Registry) -> std::io::Result<()> { + fn deregister(&mut self, _registry: &mio::Registry) -> std::io::Result<()> { Ok(()) } } From fb7dec0e952d0d796b98650998c44a21d6775564 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sat, 25 Jan 2025 19:17:37 +0900 Subject: [PATCH 06/10] ci: test AArch64/Armv7hf Linux on ubuntu-22.04-arm runner (#7123) --- .circleci/config.yml | 25 ------------------------- .github/workflows/ci.yml | 12 ++++++++++-- 2 files changed, 10 insertions(+), 27 deletions(-) delete mode 100644 .circleci/config.yml diff --git a/.circleci/config.yml b/.circleci/config.yml deleted file mode 100644 index 4e3d5da8cb5..00000000000 --- a/.circleci/config.yml +++ /dev/null @@ -1,25 +0,0 @@ -version: 2.1 -jobs: - test-arm: - machine: - image: default - resource_class: arm.medium - environment: - # Change to pin rust version - RUST_STABLE: stable - steps: - - checkout - - run: - name: Install Rust - command: | - curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs -o rustup.sh - chmod +x rustup.sh - ./rustup.sh -y --default-toolchain $RUST_STABLE - source "$HOME"/.cargo/env - # Only run Tokio tests - - run: cargo test --all-features -p tokio - -workflows: - ci: - jobs: - - test-arm diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5a3970cbbb9..a867a6e105f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -532,15 +532,19 @@ jobs: cross-test-with-parking_lot: needs: basics - runs-on: ubuntu-latest + runs-on: ${{ matrix.os }} strategy: matrix: include: - target: i686-unknown-linux-gnu + os: ubuntu-latest rustflags: --cfg tokio_taskdump - target: armv5te-unknown-linux-gnueabi + os: ubuntu-latest - target: armv7-unknown-linux-gnueabihf + os: ubuntu-22.04-arm # TODO: update to 24.04 when https://github.com/rust-lang/rust/issues/135867 solved - target: aarch64-unknown-linux-gnu + os: ubuntu-22.04-arm # TODO: update to 24.04 when https://github.com/rust-lang/rust/issues/135867 solved rustflags: --cfg tokio_taskdump steps: - uses: actions/checkout@v4 @@ -572,15 +576,19 @@ jobs: cross-test-without-parking_lot: needs: basics - runs-on: ubuntu-latest + runs-on: ${{ matrix.os }} strategy: matrix: include: - target: i686-unknown-linux-gnu + os: ubuntu-latest rustflags: --cfg tokio_taskdump - target: armv5te-unknown-linux-gnueabi + os: ubuntu-latest - target: armv7-unknown-linux-gnueabihf + os: ubuntu-22.04-arm # TODO: update to 24.04 when https://github.com/rust-lang/rust/issues/135867 solved - target: aarch64-unknown-linux-gnu + os: ubuntu-22.04-arm # TODO: update to 24.04 when https://github.com/rust-lang/rust/issues/135867 solved rustflags: --cfg tokio_taskdump steps: - uses: actions/checkout@v4 From 7f09959b0a13cbea2b0e5c20ec1b7ac3f208dbad Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sun, 26 Jan 2025 01:46:21 +0900 Subject: [PATCH 07/10] chore: use [lints] to address unexpected_cfgs lint (#7124) --- .cirrus.yml | 2 +- .github/workflows/ci.yml | 21 ++------------------- Cargo.toml | 12 ++++++++++++ benches/Cargo.toml | 3 +++ examples/Cargo.toml | 3 +++ examples/dump.rs | 2 -- stress-test/Cargo.toml | 3 +++ tests-build/Cargo.toml | 3 +++ tests-integration/Cargo.toml | 3 +++ tokio-macros/Cargo.toml | 3 +++ tokio-macros/src/lib.rs | 5 ----- tokio-stream/Cargo.toml | 3 +++ tokio-stream/src/lib.rs | 1 - tokio-test/Cargo.toml | 3 +++ tokio-test/src/lib.rs | 1 - tokio-util/Cargo.toml | 3 +++ tokio-util/src/lib.rs | 1 - tokio-util/tests/task_join_map.rs | 1 - tokio/Cargo.toml | 3 +++ tokio/src/lib.rs | 1 - tokio/src/runtime/blocking/pool.rs | 2 +- tokio/tests/_require_full.rs | 2 -- tokio/tests/dump.rs | 1 - tokio/tests/macros_select.rs | 1 - tokio/tests/macros_test.rs | 1 - tokio/tests/rt_basic.rs | 1 - tokio/tests/rt_common.rs | 1 - tokio/tests/rt_handle.rs | 1 - tokio/tests/rt_local.rs | 1 - tokio/tests/rt_metrics.rs | 1 - tokio/tests/rt_threaded.rs | 1 - tokio/tests/rt_threaded_alt.rs | 1 - tokio/tests/rt_unstable_metrics.rs | 1 - tokio/tests/task_builder.rs | 1 - tokio/tests/task_hooks.rs | 1 - tokio/tests/task_id.rs | 1 - tokio/tests/task_join_set.rs | 1 - tokio/tests/task_local_set.rs | 1 - tokio/tests/task_yield_now.rs | 1 - tokio/tests/tracing_sync.rs | 1 - tokio/tests/tracing_task.rs | 1 - tokio/tests/tracing_time.rs | 1 - 42 files changed, 46 insertions(+), 55 deletions(-) diff --git a/.cirrus.yml b/.cirrus.yml index a7ce0d9d456..6a4e7b8e5af 100644 --- a/.cirrus.yml +++ b/.cirrus.yml @@ -4,7 +4,7 @@ freebsd_instance: image_family: freebsd-14-1 env: RUST_STABLE: stable - RUST_NIGHTLY: nightly-2024-05-05 + RUST_NIGHTLY: nightly-2025-01-25 RUSTFLAGS: -D warnings # Test FreeBSD in a full VM on cirrus-ci.com. Test the i686 target too, in the diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a867a6e105f..f7e9102d7ec 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -16,9 +16,9 @@ env: RUSTUP_WINDOWS_PATH_ADD_BIN: 1 # Change to specific Rust release to pin rust_stable: stable - rust_nightly: nightly-2024-05-05 + rust_nightly: nightly-2025-01-25 # Pin a specific miri version - rust_miri_nightly: nightly-2024-10-21 + rust_miri_nightly: nightly-2025-01-25 rust_clippy: '1.77' # When updating this, also update: # - README.md @@ -1086,23 +1086,6 @@ jobs: run: cargo check-external-types --all-features working-directory: tokio - check-unexpected-lints-cfgs: - name: check unexpected lints and cfgs - needs: basics - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - name: Install Rust ${{ env.rust_nightly }} - uses: dtolnay/rust-toolchain@master - with: - toolchain: ${{ env.rust_nightly }} - - name: don't allow warnings - run: sed -i '/#!\[allow(unknown_lints, unexpected_cfgs)\]/d' */src/lib.rs */tests/*.rs - - name: check for unknown lints and cfgs - run: cargo check --all-features --tests - env: - RUSTFLAGS: -Dwarnings --check-cfg=cfg(loom,tokio_unstable,tokio_taskdump,fuzzing,mio_unsupported_force_poll_poll,tokio_internal_mt_counters,fs,tokio_no_parking_lot,tokio_no_tuning_tests) -Funexpected_cfgs -Funknown_lints - check-fuzzing: name: check-fuzzing needs: basics diff --git a/Cargo.toml b/Cargo.toml index 2238deac71c..c215946f421 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,3 +17,15 @@ members = [ [workspace.metadata.spellcheck] config = "spellcheck.toml" + +[workspace.lints.rust] +unexpected_cfgs = { level = "warn", check-cfg = [ + 'cfg(fuzzing)', + 'cfg(loom)', + 'cfg(mio_unsupported_force_poll_poll)', + 'cfg(tokio_internal_mt_counters)', + 'cfg(tokio_no_parking_lot)', + 'cfg(tokio_no_tuning_tests)', + 'cfg(tokio_taskdump)', + 'cfg(tokio_unstable)', +] } diff --git a/benches/Cargo.toml b/benches/Cargo.toml index 44156fcbfb5..de39565b398 100644 --- a/benches/Cargo.toml +++ b/benches/Cargo.toml @@ -95,3 +95,6 @@ harness = false name = "time_timeout" path = "time_timeout.rs" harness = false + +[lints] +workspace = true diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 54f2ecb8a4f..84112c08dab 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -95,3 +95,6 @@ path = "named-pipe-multi-client.rs" [[example]] name = "dump" path = "dump.rs" + +[lints] +workspace = true diff --git a/examples/dump.rs b/examples/dump.rs index 6f744713f7c..c7ece458ff8 100644 --- a/examples/dump.rs +++ b/examples/dump.rs @@ -1,5 +1,3 @@ -#![allow(unknown_lints, unexpected_cfgs)] - //! This example demonstrates tokio's experimental task dumping functionality. //! This application deadlocks. Input CTRL+C to display traces of each task, or //! input CTRL+C twice within 1 second to quit. diff --git a/stress-test/Cargo.toml b/stress-test/Cargo.toml index 60c07e4eabd..79db8ce1da1 100644 --- a/stress-test/Cargo.toml +++ b/stress-test/Cargo.toml @@ -13,3 +13,6 @@ tokio = { version = "1.0.0", path = "../tokio/", features = ["full"] } [dev-dependencies] rand = "0.8" + +[lints] +workspace = true diff --git a/tests-build/Cargo.toml b/tests-build/Cargo.toml index 639dc3d1292..ee27d6b5ab5 100644 --- a/tests-build/Cargo.toml +++ b/tests-build/Cargo.toml @@ -15,3 +15,6 @@ tokio = { version = "1.0.0", path = "../tokio", optional = true } [dev-dependencies] trybuild = "1.0" + +[lints] +workspace = true diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 74724917f15..5b15017b943 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -61,3 +61,6 @@ tokio-test = { version = "0.4", path = "../tokio-test", optional = true } doc-comment = "0.3.1" futures = { version = "0.3.0", features = ["async-await"] } bytes = "1.0.0" + +[lints] +workspace = true diff --git a/tokio-macros/Cargo.toml b/tokio-macros/Cargo.toml index e47e4116049..3305385d94e 100644 --- a/tokio-macros/Cargo.toml +++ b/tokio-macros/Cargo.toml @@ -31,3 +31,6 @@ tokio = { version = "1.0.0", path = "../tokio", features = ["full"] } [package.metadata.docs.rs] all-features = true + +[lints] +workspace = true diff --git a/tokio-macros/src/lib.rs b/tokio-macros/src/lib.rs index 29ea2867cff..32353b3807b 100644 --- a/tokio-macros/src/lib.rs +++ b/tokio-macros/src/lib.rs @@ -1,4 +1,3 @@ -#![allow(unknown_lints, unexpected_cfgs)] #![allow(clippy::needless_doctest_main)] #![warn( missing_debug_implementations, @@ -211,7 +210,6 @@ use proc_macro::TokenStream; /// This option is only compatible with the `current_thread` runtime. /// /// ```no_run -/// # #![allow(unknown_lints, unexpected_cfgs)] /// #[cfg(tokio_unstable)] /// #[tokio::main(flavor = "current_thread", unhandled_panic = "shutdown_runtime")] /// async fn main() { @@ -226,7 +224,6 @@ use proc_macro::TokenStream; /// Equivalent code not using `#[tokio::main]` /// /// ```no_run -/// # #![allow(unknown_lints, unexpected_cfgs)] /// #[cfg(tokio_unstable)] /// fn main() { /// tokio::runtime::Builder::new_current_thread() @@ -480,7 +477,6 @@ pub fn main_rt(args: TokenStream, item: TokenStream) -> TokenStream { /// This option is only compatible with the `current_thread` runtime. /// /// ```no_run -/// # #![allow(unknown_lints, unexpected_cfgs)] /// #[cfg(tokio_unstable)] /// #[tokio::test(flavor = "current_thread", unhandled_panic = "shutdown_runtime")] /// async fn my_test() { @@ -495,7 +491,6 @@ pub fn main_rt(args: TokenStream, item: TokenStream) -> TokenStream { /// Equivalent code not using `#[tokio::test]` /// /// ```no_run -/// # #![allow(unknown_lints, unexpected_cfgs)] /// #[cfg(tokio_unstable)] /// #[test] /// fn my_test() { diff --git a/tokio-stream/Cargo.toml b/tokio-stream/Cargo.toml index 81d9b9d2022..547d7f5deaf 100644 --- a/tokio-stream/Cargo.toml +++ b/tokio-stream/Cargo.toml @@ -56,3 +56,6 @@ rustdoc-args = ["--cfg", "docsrs"] # This should allow `docsrs` to be read across projects, so that `tokio-stream` # can pick up stubbed types exported by `tokio`. rustc-args = ["--cfg", "docsrs"] + +[lints] +workspace = true diff --git a/tokio-stream/src/lib.rs b/tokio-stream/src/lib.rs index f2b463bcb9a..28fa22a2ff6 100644 --- a/tokio-stream/src/lib.rs +++ b/tokio-stream/src/lib.rs @@ -1,4 +1,3 @@ -#![allow(unknown_lints, unexpected_cfgs)] #![allow( clippy::cognitive_complexity, clippy::large_enum_variant, diff --git a/tokio-test/Cargo.toml b/tokio-test/Cargo.toml index 536c5a848e8..c8d998fd368 100644 --- a/tokio-test/Cargo.toml +++ b/tokio-test/Cargo.toml @@ -30,3 +30,6 @@ futures-util = "0.3.0" [package.metadata.docs.rs] all-features = true + +[lints] +workspace = true diff --git a/tokio-test/src/lib.rs b/tokio-test/src/lib.rs index 9f60faf7952..87e63861210 100644 --- a/tokio-test/src/lib.rs +++ b/tokio-test/src/lib.rs @@ -1,4 +1,3 @@ -#![allow(unknown_lints, unexpected_cfgs)] #![warn( missing_debug_implementations, missing_docs, diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index d215590d9f2..b5a93dc3b50 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -68,3 +68,6 @@ rustc-args = ["--cfg", "docsrs", "--cfg", "tokio_unstable"] [package.metadata.playground] features = ["full"] + +[lints] +workspace = true diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index 34f69fd14e3..1df4de1b459 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -1,4 +1,3 @@ -#![allow(unknown_lints, unexpected_cfgs)] #![allow(clippy::needless_doctest_main)] #![warn( missing_debug_implementations, diff --git a/tokio-util/tests/task_join_map.rs b/tokio-util/tests/task_join_map.rs index 8757f8b5c6e..1ab5f9ba832 100644 --- a/tokio-util/tests/task_join_map.rs +++ b/tokio-util/tests/task_join_map.rs @@ -1,4 +1,3 @@ -#![allow(unknown_lints, unexpected_cfgs)] #![warn(rust_2018_idioms)] #![cfg(all(feature = "rt", tokio_unstable))] diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 86017871680..2b0c1127a71 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -173,3 +173,6 @@ allowed_external_types = [ "bytes::buf::buf_mut::BufMut", "tokio_macros::*", ] + +[lints] +workspace = true diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index b85921f31de..a69def93634 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -1,4 +1,3 @@ -#![allow(unknown_lints, unexpected_cfgs)] #![allow( clippy::cognitive_complexity, clippy::large_enum_variant, diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index a5f09d936dd..23180dc5245 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -128,7 +128,7 @@ pub(crate) struct Task { #[derive(PartialEq, Eq)] pub(crate) enum Mandatory { - #[cfg_attr(not(fs), allow(dead_code))] + #[cfg_attr(not(feature = "fs"), allow(dead_code))] Mandatory, NonMandatory, } diff --git a/tokio/tests/_require_full.rs b/tokio/tests/_require_full.rs index 81c25179615..d33943a960d 100644 --- a/tokio/tests/_require_full.rs +++ b/tokio/tests/_require_full.rs @@ -1,5 +1,3 @@ -#![allow(unknown_lints, unexpected_cfgs)] - #[cfg(not(any(feature = "full", target_family = "wasm")))] compile_error!("run main Tokio tests with `--features full`"); diff --git a/tokio/tests/dump.rs b/tokio/tests/dump.rs index 68b53aaf291..c946f38436c 100644 --- a/tokio/tests/dump.rs +++ b/tokio/tests/dump.rs @@ -1,4 +1,3 @@ -#![allow(unknown_lints, unexpected_cfgs)] #![cfg(all( tokio_unstable, tokio_taskdump, diff --git a/tokio/tests/macros_select.rs b/tokio/tests/macros_select.rs index fdf7fde1342..0c5ae6d9ab0 100644 --- a/tokio/tests/macros_select.rs +++ b/tokio/tests/macros_select.rs @@ -1,4 +1,3 @@ -#![allow(unknown_lints, unexpected_cfgs)] #![cfg(feature = "macros")] #![allow(clippy::disallowed_names)] diff --git a/tokio/tests/macros_test.rs b/tokio/tests/macros_test.rs index bed443cf293..bcc230f34fa 100644 --- a/tokio/tests/macros_test.rs +++ b/tokio/tests/macros_test.rs @@ -1,4 +1,3 @@ -#![allow(unknown_lints, unexpected_cfgs)] #![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threading use tokio::test; diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs index f2ec0df9ff6..cedea3811a3 100644 --- a/tokio/tests/rt_basic.rs +++ b/tokio/tests/rt_basic.rs @@ -1,4 +1,3 @@ -#![allow(unknown_lints, unexpected_cfgs)] #![warn(rust_2018_idioms)] #![cfg(feature = "full")] #![cfg(not(miri))] // Possible bug on Miri. diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index 26a2adbadaf..c07e3e9ddb9 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -1,4 +1,3 @@ -#![allow(unknown_lints, unexpected_cfgs)] #![allow(clippy::needless_range_loop)] #![warn(rust_2018_idioms)] #![cfg(feature = "full")] diff --git a/tokio/tests/rt_handle.rs b/tokio/tests/rt_handle.rs index 3773b8972af..bfbeff1b2e4 100644 --- a/tokio/tests/rt_handle.rs +++ b/tokio/tests/rt_handle.rs @@ -1,4 +1,3 @@ -#![allow(unknown_lints, unexpected_cfgs)] #![warn(rust_2018_idioms)] #![cfg(feature = "full")] diff --git a/tokio/tests/rt_local.rs b/tokio/tests/rt_local.rs index 1f14f5444d3..5d276250b34 100644 --- a/tokio/tests/rt_local.rs +++ b/tokio/tests/rt_local.rs @@ -1,4 +1,3 @@ -#![allow(unknown_lints, unexpected_cfgs)] #![warn(rust_2018_idioms)] #![cfg(all(feature = "full", tokio_unstable))] diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs index e9f351007d5..ad3b0e367e0 100644 --- a/tokio/tests/rt_metrics.rs +++ b/tokio/tests/rt_metrics.rs @@ -1,4 +1,3 @@ -#![allow(unknown_lints, unexpected_cfgs)] #![warn(rust_2018_idioms)] #![cfg(all(feature = "full", not(target_os = "wasi"), target_has_atomic = "64"))] diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index 777b0d6a07c..f0ed8443f9c 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -1,4 +1,3 @@ -#![allow(unknown_lints, unexpected_cfgs)] #![warn(rust_2018_idioms)] // Too slow on miri. #![cfg(all(feature = "full", not(target_os = "wasi"), not(miri)))] diff --git a/tokio/tests/rt_threaded_alt.rs b/tokio/tests/rt_threaded_alt.rs index f7e52af83dd..c1dc71dedc1 100644 --- a/tokio/tests/rt_threaded_alt.rs +++ b/tokio/tests/rt_threaded_alt.rs @@ -1,4 +1,3 @@ -#![allow(unknown_lints, unexpected_cfgs)] #![warn(rust_2018_idioms)] #![cfg(all(feature = "full", not(target_os = "wasi")))] #![cfg(tokio_unstable)] diff --git a/tokio/tests/rt_unstable_metrics.rs b/tokio/tests/rt_unstable_metrics.rs index 85eba07d389..60cdc525ff1 100644 --- a/tokio/tests/rt_unstable_metrics.rs +++ b/tokio/tests/rt_unstable_metrics.rs @@ -1,4 +1,3 @@ -#![allow(unknown_lints, unexpected_cfgs)] #![warn(rust_2018_idioms)] #![cfg(all( feature = "full", diff --git a/tokio/tests/task_builder.rs b/tokio/tests/task_builder.rs index 4d1248500ab..c700f229f9f 100644 --- a/tokio/tests/task_builder.rs +++ b/tokio/tests/task_builder.rs @@ -1,4 +1,3 @@ -#![allow(unknown_lints, unexpected_cfgs)] #![cfg(all(tokio_unstable, feature = "tracing"))] use std::rc::Rc; diff --git a/tokio/tests/task_hooks.rs b/tokio/tests/task_hooks.rs index 1e2de7e4b4c..185b9126cca 100644 --- a/tokio/tests/task_hooks.rs +++ b/tokio/tests/task_hooks.rs @@ -1,4 +1,3 @@ -#![allow(unknown_lints, unexpected_cfgs)] #![warn(rust_2018_idioms)] #![cfg(all(feature = "full", tokio_unstable, target_has_atomic = "64"))] diff --git a/tokio/tests/task_id.rs b/tokio/tests/task_id.rs index c0aed66f16f..0cbf80d5ace 100644 --- a/tokio/tests/task_id.rs +++ b/tokio/tests/task_id.rs @@ -1,4 +1,3 @@ -#![allow(unknown_lints, unexpected_cfgs)] #![warn(rust_2018_idioms)] #![cfg(feature = "full")] diff --git a/tokio/tests/task_join_set.rs b/tokio/tests/task_join_set.rs index d07a2824889..f705fa507d7 100644 --- a/tokio/tests/task_join_set.rs +++ b/tokio/tests/task_join_set.rs @@ -1,4 +1,3 @@ -#![allow(unknown_lints, unexpected_cfgs)] #![warn(rust_2018_idioms)] #![cfg(feature = "full")] diff --git a/tokio/tests/task_local_set.rs b/tokio/tests/task_local_set.rs index d910efb8b65..30f20ed0d1d 100644 --- a/tokio/tests/task_local_set.rs +++ b/tokio/tests/task_local_set.rs @@ -1,4 +1,3 @@ -#![allow(unknown_lints, unexpected_cfgs)] #![warn(rust_2018_idioms)] #![cfg(feature = "full")] diff --git a/tokio/tests/task_yield_now.rs b/tokio/tests/task_yield_now.rs index 3b462e92240..e6fe5d2009a 100644 --- a/tokio/tests/task_yield_now.rs +++ b/tokio/tests/task_yield_now.rs @@ -1,4 +1,3 @@ -#![allow(unknown_lints, unexpected_cfgs)] #![cfg(all(feature = "full", not(target_os = "wasi"), tokio_unstable))] use tokio::task; diff --git a/tokio/tests/tracing_sync.rs b/tokio/tests/tracing_sync.rs index 7391cd8b735..7065282c44b 100644 --- a/tokio/tests/tracing_sync.rs +++ b/tokio/tests/tracing_sync.rs @@ -2,7 +2,6 @@ //! //! These tests ensure that the instrumentation for tokio //! synchronization primitives is correct. -#![allow(unknown_lints, unexpected_cfgs)] #![warn(rust_2018_idioms)] #![cfg(all(tokio_unstable, feature = "tracing", target_has_atomic = "64"))] diff --git a/tokio/tests/tracing_task.rs b/tokio/tests/tracing_task.rs index 5466ad960f0..a9317bf5b12 100644 --- a/tokio/tests/tracing_task.rs +++ b/tokio/tests/tracing_task.rs @@ -2,7 +2,6 @@ //! //! These tests ensure that the instrumentation for task spawning and task //! lifecycles is correct. -#![allow(unknown_lints, unexpected_cfgs)] #![warn(rust_2018_idioms)] #![cfg(all(tokio_unstable, feature = "tracing", target_has_atomic = "64"))] diff --git a/tokio/tests/tracing_time.rs b/tokio/tests/tracing_time.rs index a227cde7a73..f251cc780b9 100644 --- a/tokio/tests/tracing_time.rs +++ b/tokio/tests/tracing_time.rs @@ -2,7 +2,6 @@ //! //! These tests ensure that the instrumentation for tokio //! synchronization primitives is correct. -#![allow(unknown_lints, unexpected_cfgs)] #![warn(rust_2018_idioms)] #![cfg(all(tokio_unstable, feature = "tracing", target_has_atomic = "64"))] From 2671ffb55b8f2464c0f1c3c7d99ee70118108755 Mon Sep 17 00:00:00 2001 From: Ariel Ben-Yehuda Date: Mon, 27 Jan 2025 23:09:23 +0200 Subject: [PATCH 08/10] tracing: make the task tracing API unstable piblkc (#6972) * make self-tracing public * address review comments * try to fix doctest * adjust imports to fit standard * more documentation --------- Co-authored-by: Ariel Ben-Yehuda --- tokio/src/runtime/dump.rs | 70 +++++++++++++++++- tokio/src/runtime/handle.rs | 3 + tokio/src/runtime/task/trace/mod.rs | 3 +- tokio/tests/task_trace_self.rs | 107 ++++++++++++++++++++++++++++ 4 files changed, 181 insertions(+), 2 deletions(-) create mode 100644 tokio/tests/task_trace_self.rs diff --git a/tokio/src/runtime/dump.rs b/tokio/src/runtime/dump.rs index d262f3987cc..0a6adf979b7 100644 --- a/tokio/src/runtime/dump.rs +++ b/tokio/src/runtime/dump.rs @@ -3,7 +3,9 @@ //! See [`Handle::dump`][crate::runtime::Handle::dump]. use crate::task::Id; -use std::{fmt, path::Path}; +use std::{fmt, future::Future, path::Path}; + +pub use crate::runtime::task::trace::Root; /// A snapshot of a runtime's state. /// @@ -214,6 +216,72 @@ impl Trace { }) .collect() } + + /// Runs the function `f` in tracing mode, and returns its result along with the resulting [`Trace`]. + /// + /// This is normally called with `f` being the poll function of a future, and will give you a backtrace + /// that tells you what that one future is doing. + /// + /// Use [`Handle::dump`] instead if you want to know what *all the tasks* in your program are doing. + /// Also see [`Handle::dump`] for more documentation about dumps, but unlike [`Handle::dump`], this function + /// should not be much slower than calling `f` directly. + /// + /// Due to the way tracing is implemented, Tokio leaf futures will usually, instead of doing their + /// actual work, do the equivalent of a `yield_now` (returning a `Poll::Pending` and scheduling the + /// current context for execution), which means forward progress will probably not happen unless + /// you eventually call your future outside of `capture`. + /// + /// [`Handle::dump`]: crate::runtime::Handle::dump + /// + /// Example usage: + /// ``` + /// use std::future::Future; + /// use std::task::Poll; + /// use tokio::runtime::dump::Trace; + /// + /// # async fn test_fn() { + /// // some future + /// let mut test_future = std::pin::pin!(async move { tokio::task::yield_now().await; 0 }); + /// + /// // trace it once, see what it's doing + /// let (trace, res) = Trace::root(std::future::poll_fn(|cx| { + /// let (res, trace) = Trace::capture(|| test_future.as_mut().poll(cx)); + /// Poll::Ready((trace, res)) + /// })).await; + /// + /// // await it to let it finish, outside of a `capture` + /// let output = match res { + /// Poll::Ready(output) => output, + /// Poll::Pending => test_future.await, + /// }; + /// + /// println!("{trace}"); + /// # } + /// ``` + /// + /// ### Nested calls + /// + /// Nested calls to `capture` might return partial traces, but will not do any other undesirable behavior (for + /// example, they will not panic). + pub fn capture(f: F) -> (R, Trace) + where + F: FnOnce() -> R, + { + let (res, trace) = super::task::trace::Trace::capture(f); + (res, Trace { inner: trace }) + } + + /// Create a root for stack traces captured using [`Trace::capture`]. Stack frames above + /// the root will not be captured. + /// + /// Nesting multiple [`Root`] futures is fine. Captures will stop at the first root. Not having + /// a [`Root`] is fine as well, but there is no guarantee on where the capture will stop. + pub fn root(f: F) -> Root + where + F: Future, + { + crate::runtime::task::trace::Trace::root(f) + } } impl Dump { diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 752640d75bd..91f13d6c2ed 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -447,6 +447,9 @@ cfg_taskdump! { impl Handle { /// Captures a snapshot of the runtime's state. /// + /// If you only want to capture a snapshot of a single future's state, you can use + /// [`Trace::capture`][crate::runtime::dump::Trace]. + /// /// This functionality is experimental, and comes with a number of /// requirements and limitations. /// diff --git a/tokio/src/runtime/task/trace/mod.rs b/tokio/src/runtime/task/trace/mod.rs index 16cb3477ffc..71aa3b22657 100644 --- a/tokio/src/runtime/task/trace/mod.rs +++ b/tokio/src/runtime/task/trace/mod.rs @@ -56,7 +56,8 @@ pub(crate) struct Trace { pin_project_lite::pin_project! { #[derive(Debug, Clone)] #[must_use = "futures do nothing unless you `.await` or poll them"] - pub(crate) struct Root { + /// A future wrapper that roots traces (captured with [`Trace::capture`]). + pub struct Root { #[pin] future: T, } diff --git a/tokio/tests/task_trace_self.rs b/tokio/tests/task_trace_self.rs new file mode 100644 index 00000000000..a4dc1f37e9c --- /dev/null +++ b/tokio/tests/task_trace_self.rs @@ -0,0 +1,107 @@ +#![allow(unknown_lints, unexpected_cfgs)] +#![cfg(all( + tokio_unstable, + tokio_taskdump, + target_os = "linux", + any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64") +))] + +use std::future::Future; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; + +use tokio::runtime::dump::{Root, Trace}; + +pin_project_lite::pin_project! { + pub struct PrettyFuture { + #[pin] + f: Root, + t_last: State, + logs: Arc>>, + } +} + +enum State { + NotStarted, + Running { since: Instant }, + Alerted, +} + +impl PrettyFuture { + pub fn pretty(f: F, logs: Arc>>) -> Self { + PrettyFuture { + f: Trace::root(f), + t_last: State::NotStarted, + logs, + } + } +} + +impl Future for PrettyFuture { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + let now = Instant::now(); + let t_last = match this.t_last { + State::Running { since } => Some(*since), + State::NotStarted => { + *this.t_last = State::Running { since: now }; + None + } + State::Alerted => { + // don't double-alert for the same future + None + } + }; + if t_last.is_some_and(|t_last| now.duration_since(t_last) > Duration::from_millis(500)) { + let (res, trace) = tokio::runtime::dump::Trace::capture(|| this.f.as_mut().poll(cx)); + this.logs.lock().unwrap().push(trace); + *this.t_last = State::Alerted; + return res; + } + this.f.poll(cx) + } +} + +#[tokio::test] +async fn task_trace_self() { + let log = Arc::new(Mutex::new(vec![])); + let log2 = Arc::new(Mutex::new(vec![])); + let mut good_line = vec![]; + let mut bad_line = vec![]; + PrettyFuture::pretty( + PrettyFuture::pretty( + async { + bad_line.push(line!() + 1); + tokio::task::yield_now().await; + bad_line.push(line!() + 1); + tokio::time::sleep(Duration::from_millis(1)).await; + for _ in 0..100 { + good_line.push(line!() + 1); + tokio::time::sleep(Duration::from_millis(10)).await; + } + }, + log.clone(), + ), + log2.clone(), + ) + .await; + for line in good_line { + let s = format!("{}:{}:", file!(), line); + assert!(log.lock().unwrap().iter().any(|x| { + eprintln!("{}", x); + format!("{}", x).contains(&s) + })); + } + for line in bad_line { + let s = format!("{}:{}:", file!(), line); + assert!(!log + .lock() + .unwrap() + .iter() + .any(|x| format!("{}", x).contains(&s))); + } +} From 5086e56dcb85223df27019d80225c29153d96050 Mon Sep 17 00:00:00 2001 From: Oliver Wangler Date: Tue, 28 Jan 2025 15:28:07 +0100 Subject: [PATCH 09/10] io: implemented `get_ref` and `get_mut` for `SyncIoBridge` (#7128) Co-authored-by: ow --- tokio-util/src/io/sync_bridge.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tokio-util/src/io/sync_bridge.rs b/tokio-util/src/io/sync_bridge.rs index 2402207584c..ce3d598eaa2 100644 --- a/tokio-util/src/io/sync_bridge.rs +++ b/tokio-util/src/io/sync_bridge.rs @@ -154,3 +154,15 @@ impl SyncIoBridge { self.src } } + +impl AsMut for SyncIoBridge { + fn as_mut(&mut self) -> &mut T { + &mut self.src + } +} + +impl AsRef for SyncIoBridge { + fn as_ref(&self) -> &T { + &self.src + } +} From b8ac94ed70df22f885bad7ea3c0ff51c536bad4a Mon Sep 17 00:00:00 2001 From: Jason Gin <67525213+GJason88@users.noreply.github.com> Date: Thu, 30 Jan 2025 22:14:00 +0100 Subject: [PATCH 10/10] rt: add before and after task poll callbacks (#7120) Add callbacks for poll start and stop, enabling users to instrument these points in the runtime's life cycle. --- tokio/src/runtime/builder.rs | 111 +++++++++++++++ tokio/src/runtime/config.rs | 8 ++ .../runtime/scheduler/current_thread/mod.rs | 13 ++ .../runtime/scheduler/multi_thread/worker.rs | 27 +++- .../scheduler/multi_thread_alt/worker.rs | 5 +- tokio/src/runtime/task/mod.rs | 32 +++-- tokio/src/runtime/task_hooks.rs | 40 ++++++ tokio/tests/rt_poll_callbacks.rs | 128 ++++++++++++++++++ 8 files changed, 347 insertions(+), 17 deletions(-) create mode 100644 tokio/tests/rt_poll_callbacks.rs diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index c9a47c3862c..11538a0983b 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -88,6 +88,14 @@ pub struct Builder { /// To run before each task is spawned. pub(super) before_spawn: Option, + /// To run before each poll + #[cfg(tokio_unstable)] + pub(super) before_poll: Option, + + /// To run after each poll + #[cfg(tokio_unstable)] + pub(super) after_poll: Option, + /// To run after each task is terminated. pub(super) after_termination: Option, @@ -306,6 +314,11 @@ impl Builder { before_spawn: None, after_termination: None, + #[cfg(tokio_unstable)] + before_poll: None, + #[cfg(tokio_unstable)] + after_poll: None, + keep_alive: None, // Defaults for these values depend on the scheduler kind, so we get them @@ -743,6 +756,92 @@ impl Builder { self } + /// Executes function `f` just before a task is polled + /// + /// `f` is called within the Tokio context, so functions like + /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being + /// invoked immediately. + /// + /// **Note**: This is an [unstable API][unstable]. The public API of this type + /// may break in 1.x releases. See [the documentation on unstable + /// features][unstable] for details. + /// + /// [unstable]: crate#unstable-features + /// + /// # Examples + /// + /// ``` + /// # use std::sync::{atomic::AtomicUsize, Arc}; + /// # use tokio::task::yield_now; + /// # pub fn main() { + /// let poll_start_counter = Arc::new(AtomicUsize::new(0)); + /// let poll_start = poll_start_counter.clone(); + /// let rt = tokio::runtime::Builder::new_multi_thread() + /// .enable_all() + /// .on_before_task_poll(move |meta| { + /// println!("task {} is about to be polled", meta.id()) + /// }) + /// .build() + /// .unwrap(); + /// let task = rt.spawn(async { + /// yield_now().await; + /// }); + /// let _ = rt.block_on(task); + /// + /// # } + /// ``` + #[cfg(tokio_unstable)] + pub fn on_before_task_poll(&mut self, f: F) -> &mut Self + where + F: Fn(&TaskMeta<'_>) + Send + Sync + 'static, + { + self.before_poll = Some(std::sync::Arc::new(f)); + self + } + + /// Executes function `f` just after a task is polled + /// + /// `f` is called within the Tokio context, so functions like + /// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being + /// invoked immediately. + /// + /// **Note**: This is an [unstable API][unstable]. The public API of this type + /// may break in 1.x releases. See [the documentation on unstable + /// features][unstable] for details. + /// + /// [unstable]: crate#unstable-features + /// + /// # Examples + /// + /// ``` + /// # use std::sync::{atomic::AtomicUsize, Arc}; + /// # use tokio::task::yield_now; + /// # pub fn main() { + /// let poll_stop_counter = Arc::new(AtomicUsize::new(0)); + /// let poll_stop = poll_stop_counter.clone(); + /// let rt = tokio::runtime::Builder::new_multi_thread() + /// .enable_all() + /// .on_after_task_poll(move |meta| { + /// println!("task {} completed polling", meta.id()); + /// }) + /// .build() + /// .unwrap(); + /// let task = rt.spawn(async { + /// yield_now().await; + /// }); + /// let _ = rt.block_on(task); + /// + /// # } + /// ``` + #[cfg(tokio_unstable)] + pub fn on_after_task_poll(&mut self, f: F) -> &mut Self + where + F: Fn(&TaskMeta<'_>) + Send + Sync + 'static, + { + self.after_poll = Some(std::sync::Arc::new(f)); + self + } + /// Executes function `f` just after a task is terminated. /// /// `f` is called within the Tokio context, so functions like @@ -1410,6 +1509,10 @@ impl Builder { before_park: self.before_park.clone(), after_unpark: self.after_unpark.clone(), before_spawn: self.before_spawn.clone(), + #[cfg(tokio_unstable)] + before_poll: self.before_poll.clone(), + #[cfg(tokio_unstable)] + after_poll: self.after_poll.clone(), after_termination: self.after_termination.clone(), global_queue_interval: self.global_queue_interval, event_interval: self.event_interval, @@ -1560,6 +1663,10 @@ cfg_rt_multi_thread! { before_park: self.before_park.clone(), after_unpark: self.after_unpark.clone(), before_spawn: self.before_spawn.clone(), + #[cfg(tokio_unstable)] + before_poll: self.before_poll.clone(), + #[cfg(tokio_unstable)] + after_poll: self.after_poll.clone(), after_termination: self.after_termination.clone(), global_queue_interval: self.global_queue_interval, event_interval: self.event_interval, @@ -1610,6 +1717,10 @@ cfg_rt_multi_thread! { after_unpark: self.after_unpark.clone(), before_spawn: self.before_spawn.clone(), after_termination: self.after_termination.clone(), + #[cfg(tokio_unstable)] + before_poll: self.before_poll.clone(), + #[cfg(tokio_unstable)] + after_poll: self.after_poll.clone(), global_queue_interval: self.global_queue_interval, event_interval: self.event_interval, local_queue_capacity: self.local_queue_capacity, diff --git a/tokio/src/runtime/config.rs b/tokio/src/runtime/config.rs index eb4bf81aa4f..43ce5aebd63 100644 --- a/tokio/src/runtime/config.rs +++ b/tokio/src/runtime/config.rs @@ -27,6 +27,14 @@ pub(crate) struct Config { /// To run after each task is terminated. pub(crate) after_termination: Option, + /// To run before each poll + #[cfg(tokio_unstable)] + pub(crate) before_poll: Option, + + /// To run after each poll + #[cfg(tokio_unstable)] + pub(crate) after_poll: Option, + /// The multi-threaded scheduler includes a per-worker LIFO slot used to /// store the last scheduled task. This can improve certain usage patterns, /// especially message passing between tasks. However, this LIFO slot is not diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index c66635e7bd6..37f37a4e9e3 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -145,6 +145,10 @@ impl CurrentThread { task_hooks: TaskHooks { task_spawn_callback: config.before_spawn.clone(), task_terminate_callback: config.after_termination.clone(), + #[cfg(tokio_unstable)] + before_poll_callback: config.before_poll.clone(), + #[cfg(tokio_unstable)] + after_poll_callback: config.after_poll.clone(), }, shared: Shared { inject: Inject::new(), @@ -766,8 +770,17 @@ impl CoreGuard<'_> { let task = context.handle.shared.owned.assert_owner(task); + #[cfg(tokio_unstable)] + let task_id = task.task_id(); + let (c, ()) = context.run_task(core, || { + #[cfg(tokio_unstable)] + context.handle.task_hooks.poll_start_callback(task_id); + task.run(); + + #[cfg(tokio_unstable)] + context.handle.task_hooks.poll_stop_callback(task_id); }); core = c; diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index ec15106fe1a..8866ea54ba6 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -282,10 +282,7 @@ pub(super) fn create( let remotes_len = remotes.len(); let handle = Arc::new(Handle { - task_hooks: TaskHooks { - task_spawn_callback: config.before_spawn.clone(), - task_terminate_callback: config.after_termination.clone(), - }, + task_hooks: TaskHooks::from_config(&config), shared: Shared { remotes: remotes.into_boxed_slice(), inject, @@ -574,6 +571,9 @@ impl Context { } fn run_task(&self, task: Notified, mut core: Box) -> RunResult { + #[cfg(tokio_unstable)] + let task_id = task.task_id(); + let task = self.worker.handle.shared.owned.assert_owner(task); // Make sure the worker is not in the **searching** state. This enables @@ -593,7 +593,16 @@ impl Context { // Run the task coop::budget(|| { + // Unlike the poll time above, poll start callback is attached to the task id, + // so it is tightly associated with the actual poll invocation. + #[cfg(tokio_unstable)] + self.worker.handle.task_hooks.poll_start_callback(task_id); + task.run(); + + #[cfg(tokio_unstable)] + self.worker.handle.task_hooks.poll_stop_callback(task_id); + let mut lifo_polls = 0; // As long as there is budget remaining and a task exists in the @@ -656,7 +665,17 @@ impl Context { // Run the LIFO task, then loop *self.core.borrow_mut() = Some(core); let task = self.worker.handle.shared.owned.assert_owner(task); + + #[cfg(tokio_unstable)] + let task_id = task.task_id(); + + #[cfg(tokio_unstable)] + self.worker.handle.task_hooks.poll_start_callback(task_id); + task.run(); + + #[cfg(tokio_unstable)] + self.worker.handle.task_hooks.poll_stop_callback(task_id); } }) } diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs index d88eb5e893c..206c9855bda 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs @@ -303,10 +303,7 @@ pub(super) fn create( let (inject, inject_synced) = inject::Shared::new(); let handle = Arc::new(Handle { - task_hooks: TaskHooks { - task_spawn_callback: config.before_spawn.clone(), - task_terminate_callback: config.after_termination.clone(), - }, + task_hooks: TaskHooks::from_config(&config), shared: Shared { remotes: remotes.into_boxed_slice(), inject, diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 15c5a8f4afe..7d314c3b176 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -256,6 +256,13 @@ pub(crate) struct LocalNotified { _not_send: PhantomData<*const ()>, } +impl LocalNotified { + #[cfg(tokio_unstable)] + pub(crate) fn task_id(&self) -> Id { + self.task.id() + } +} + /// A task that is not owned by any `OwnedTasks`. Used for blocking tasks. /// This type holds two ref-counts. pub(crate) struct UnownedTask { @@ -386,6 +393,16 @@ impl Task { self.raw.header_ptr() } + /// Returns a [task ID] that uniquely identifies this task relative to other + /// currently spawned tasks. + /// + /// [task ID]: crate::task::Id + #[cfg(tokio_unstable)] + pub(crate) fn id(&self) -> crate::task::Id { + // Safety: The header pointer is valid. + unsafe { Header::get_id(self.raw.header_ptr()) } + } + cfg_taskdump! { /// Notify the task for task dumping. /// @@ -400,15 +417,6 @@ impl Task { } } - /// Returns a [task ID] that uniquely identifies this task relative to other - /// currently spawned tasks. - /// - /// [task ID]: crate::task::Id - #[cfg(tokio_unstable)] - pub(crate) fn id(&self) -> crate::task::Id { - // Safety: The header pointer is valid. - unsafe { Header::get_id(self.raw.header_ptr()) } - } } } @@ -416,6 +424,12 @@ impl Notified { fn header(&self) -> &Header { self.0.header() } + + #[cfg(tokio_unstable)] + #[allow(dead_code)] + pub(crate) fn task_id(&self) -> crate::task::Id { + self.0.id() + } } impl Notified { diff --git a/tokio/src/runtime/task_hooks.rs b/tokio/src/runtime/task_hooks.rs index 2c884af74be..13865ed515d 100644 --- a/tokio/src/runtime/task_hooks.rs +++ b/tokio/src/runtime/task_hooks.rs @@ -1,17 +1,57 @@ use std::marker::PhantomData; +use super::Config; + impl TaskHooks { pub(crate) fn spawn(&self, meta: &TaskMeta<'_>) { if let Some(f) = self.task_spawn_callback.as_ref() { f(meta) } } + + #[allow(dead_code)] + pub(crate) fn from_config(config: &Config) -> Self { + Self { + task_spawn_callback: config.before_spawn.clone(), + task_terminate_callback: config.after_termination.clone(), + #[cfg(tokio_unstable)] + before_poll_callback: config.before_poll.clone(), + #[cfg(tokio_unstable)] + after_poll_callback: config.after_poll.clone(), + } + } + + #[cfg(tokio_unstable)] + #[inline] + pub(crate) fn poll_start_callback(&self, id: super::task::Id) { + if let Some(poll_start) = &self.before_poll_callback { + (poll_start)(&TaskMeta { + id, + _phantom: std::marker::PhantomData, + }) + } + } + + #[cfg(tokio_unstable)] + #[inline] + pub(crate) fn poll_stop_callback(&self, id: super::task::Id) { + if let Some(poll_stop) = &self.after_poll_callback { + (poll_stop)(&TaskMeta { + id, + _phantom: std::marker::PhantomData, + }) + } + } } #[derive(Clone)] pub(crate) struct TaskHooks { pub(crate) task_spawn_callback: Option, pub(crate) task_terminate_callback: Option, + #[cfg(tokio_unstable)] + pub(crate) before_poll_callback: Option, + #[cfg(tokio_unstable)] + pub(crate) after_poll_callback: Option, } /// Task metadata supplied to user-provided hooks for task events. diff --git a/tokio/tests/rt_poll_callbacks.rs b/tokio/tests/rt_poll_callbacks.rs new file mode 100644 index 00000000000..8ccff385772 --- /dev/null +++ b/tokio/tests/rt_poll_callbacks.rs @@ -0,0 +1,128 @@ +#![allow(unknown_lints, unexpected_cfgs)] +#![cfg(tokio_unstable)] + +use std::sync::{atomic::AtomicUsize, Arc, Mutex}; + +use tokio::task::yield_now; + +#[cfg(not(target_os = "wasi"))] +#[test] +fn callbacks_fire_multi_thread() { + let poll_start_counter = Arc::new(AtomicUsize::new(0)); + let poll_stop_counter = Arc::new(AtomicUsize::new(0)); + let poll_start = poll_start_counter.clone(); + let poll_stop = poll_stop_counter.clone(); + + let before_task_poll_callback_task_id: Arc>> = + Arc::new(Mutex::new(None)); + let after_task_poll_callback_task_id: Arc>> = + Arc::new(Mutex::new(None)); + + let before_task_poll_callback_task_id_ref = Arc::clone(&before_task_poll_callback_task_id); + let after_task_poll_callback_task_id_ref = Arc::clone(&after_task_poll_callback_task_id); + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .on_before_task_poll(move |task_meta| { + before_task_poll_callback_task_id_ref + .lock() + .unwrap() + .replace(task_meta.id()); + poll_start_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + }) + .on_after_task_poll(move |task_meta| { + after_task_poll_callback_task_id_ref + .lock() + .unwrap() + .replace(task_meta.id()); + poll_stop_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + }) + .build() + .unwrap(); + let task = rt.spawn(async { + yield_now().await; + yield_now().await; + yield_now().await; + }); + + let spawned_task_id = task.id(); + + rt.block_on(task).expect("task should succeed"); + // We need to drop the runtime to guarantee the workers have exited (and thus called the callback) + drop(rt); + + assert_eq!( + before_task_poll_callback_task_id.lock().unwrap().unwrap(), + spawned_task_id + ); + assert_eq!( + after_task_poll_callback_task_id.lock().unwrap().unwrap(), + spawned_task_id + ); + let actual_count = 4; + assert_eq!( + poll_start.load(std::sync::atomic::Ordering::Relaxed), + actual_count, + "unexpected number of poll starts" + ); + assert_eq!( + poll_stop.load(std::sync::atomic::Ordering::Relaxed), + actual_count, + "unexpected number of poll stops" + ); +} + +#[test] +fn callbacks_fire_current_thread() { + let poll_start_counter = Arc::new(AtomicUsize::new(0)); + let poll_stop_counter = Arc::new(AtomicUsize::new(0)); + let poll_start = poll_start_counter.clone(); + let poll_stop = poll_stop_counter.clone(); + + let before_task_poll_callback_task_id: Arc>> = + Arc::new(Mutex::new(None)); + let after_task_poll_callback_task_id: Arc>> = + Arc::new(Mutex::new(None)); + + let before_task_poll_callback_task_id_ref = Arc::clone(&before_task_poll_callback_task_id); + let after_task_poll_callback_task_id_ref = Arc::clone(&after_task_poll_callback_task_id); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .on_before_task_poll(move |task_meta| { + before_task_poll_callback_task_id_ref + .lock() + .unwrap() + .replace(task_meta.id()); + poll_start_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + }) + .on_after_task_poll(move |task_meta| { + after_task_poll_callback_task_id_ref + .lock() + .unwrap() + .replace(task_meta.id()); + poll_stop_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + }) + .build() + .unwrap(); + + let task = rt.spawn(async { + yield_now().await; + yield_now().await; + yield_now().await; + }); + + let spawned_task_id = task.id(); + + let _ = rt.block_on(task); + drop(rt); + + assert_eq!( + before_task_poll_callback_task_id.lock().unwrap().unwrap(), + spawned_task_id + ); + assert_eq!( + after_task_poll_callback_task_id.lock().unwrap().unwrap(), + spawned_task_id + ); + assert_eq!(poll_start.load(std::sync::atomic::Ordering::Relaxed), 4); + assert_eq!(poll_stop.load(std::sync::atomic::Ordering::Relaxed), 4); +}