diff --git a/.clippy.toml b/.clippy.toml new file mode 100644 index 0000000..992016c --- /dev/null +++ b/.clippy.toml @@ -0,0 +1 @@ +msrv = "1.36" diff --git a/.github/dependabot.yml b/.github/dependabot.yml deleted file mode 100644 index 52f7945..0000000 --- a/.github/dependabot.yml +++ /dev/null @@ -1,9 +0,0 @@ -version: 2 -updates: - - package-ecosystem: cargo - directory: / - schedule: - interval: weekly - commit-message: - prefix: '' - labels: [] diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f62cd09..988524f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,111 +1,60 @@ name: CI -permissions: - contents: read - on: pull_request: push: branches: - master schedule: - - cron: '0 2 * * 0' + - cron: '0 2 * * *' env: - CARGO_INCREMENTAL: 0 - CARGO_NET_GIT_FETCH_WITH_CLI: true - CARGO_NET_RETRY: 10 - CARGO_TERM_COLOR: always - RUST_BACKTRACE: 1 RUSTFLAGS: -D warnings - RUSTDOCFLAGS: -D warnings - RUSTUP_MAX_RETRIES: 10 - -defaults: - run: - shell: bash + RUST_BACKTRACE: 1 jobs: test: - runs-on: ubuntu-latest + runs-on: ${{ matrix.os }} strategy: fail-fast: false matrix: - include: - - rust: stable - - rust: beta - - rust: nightly - - rust: nightly - target: i686-unknown-linux-gnu + os: [ubuntu-latest] + rust: [nightly, beta, stable] steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }} - - name: Install cross-compilation tools - uses: taiki-e/setup-cross-toolchain-action@v1 - with: - target: ${{ matrix.target }} - if: matrix.target != '' - run: cargo build --all --all-features --all-targets - - run: cargo test --all - - run: cargo test --all --release - - run: cargo test --no-default-features --tests - - run: cargo test --no-default-features --tests --release - - name: Install cargo-hack - uses: taiki-e/install-action@cargo-hack - - run: rustup target add thumbv7m-none-eabi - name: Run cargo check (without dev-dependencies to catch missing feature flags) - run: cargo hack build --all --no-dev-deps - - run: cargo hack build --all --target thumbv7m-none-eabi --no-default-features --no-dev-deps - - run: cargo hack build --target thumbv7m-none-eabi --no-default-features --no-dev-deps --features portable-atomic - - name: Install wasm-pack - uses: taiki-e/install-action@wasm-pack - - run: wasm-pack test --node - - run: wasm-pack test --node --no-default-features - - run: wasm-pack test --node --no-default-features --features portable-atomic - - name: Clone some dependent crates - run: | - git clone https://github.com/smol-rs/event-listener-strategy.git - git clone https://github.com/smol-rs/async-channel.git - git clone https://github.com/smol-rs/async-lock.git - - name: Patch dependent crates - run: | - echo '[patch.crates-io]' >> event-listener-strategy/Cargo.toml - echo 'event-listener = { path = ".." }' >> event-listener-strategy/Cargo.toml - echo '[patch.crates-io]' >> async-channel/Cargo.toml - echo 'event-listener = { path = ".." }' >> async-channel/Cargo.toml - echo 'event-listener-strategy = { path = "../event-listener-strategy" }' >> async-channel/Cargo.toml - echo '[patch.crates-io]' >> async-lock/Cargo.toml - echo 'event-listener = { path = ".." }' >> async-lock/Cargo.toml - echo 'event-listener-strategy = { path = "../event-listener-strategy" }' >> async-lock/Cargo.toml - echo 'async-channel = { path = "../async-channel" }' >> async-lock/Cargo.toml - - name: Test dependent crates - run: | - cargo test --manifest-path=event-listener-strategy/Cargo.toml - cargo test --manifest-path=async-channel/Cargo.toml - cargo test --manifest-path=async-lock/Cargo.toml + if: startsWith(matrix.rust, 'nightly') + run: cargo check -Z features=dev_dep + - run: cargo test msrv: runs-on: ubuntu-latest + strategy: + matrix: + # When updating this, the reminder to update the minimum supported + # Rust version in Cargo.toml and .clippy.toml. + rust: ['1.36'] steps: - - uses: actions/checkout@v4 - - name: Install cargo-hack - uses: taiki-e/install-action@cargo-hack - - run: cargo hack build --all --rust-version - - run: cargo hack build --all --no-default-features --rust-version + - uses: actions/checkout@v3 + - name: Install Rust + run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }} + - run: cargo build clippy: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update stable - - run: cargo clippy --all --all-features --all-targets + - run: cargo clippy --all-features --all-targets fmt: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update stable - run: cargo fmt --all --check @@ -113,58 +62,18 @@ jobs: miri: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v3 - name: Install Rust run: rustup toolchain install nightly --component miri && rustup default nightly - - run: | - echo "MIRIFLAGS=-Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation" >>"${GITHUB_ENV}" - echo "RUSTFLAGS=${RUSTFLAGS} -Z randomize-layout" >>"${GITHUB_ENV}" - - run: cargo miri test --all - - run: cargo miri test --no-default-features --tests - - run: cargo miri test --no-default-features --features portable-atomic --tests - - name: Clone some dependent crates - run: | - git clone https://github.com/smol-rs/event-listener-strategy.git - git clone https://github.com/smol-rs/async-channel.git - git clone https://github.com/smol-rs/async-lock.git - - name: Patch dependent crates - run: | - echo '[patch.crates-io]' >> event-listener-strategy/Cargo.toml - echo 'event-listener = { path = ".." }' >> event-listener-strategy/Cargo.toml - echo '[patch.crates-io]' >> async-channel/Cargo.toml - echo 'event-listener = { path = ".." }' >> async-channel/Cargo.toml - echo 'event-listener-strategy = { path = "../event-listener-strategy" }' >> async-channel/Cargo.toml - echo '[patch.crates-io]' >> async-lock/Cargo.toml - echo 'event-listener = { path = ".." }' >> async-lock/Cargo.toml - echo 'event-listener-strategy = { path = "../event-listener-strategy" }' >> async-lock/Cargo.toml - echo 'async-channel = { path = "../async-channel" }' >> async-lock/Cargo.toml - - name: Test dependent crates - # async-channel isn't included here as it appears to be broken on MIRI. - # See https://github.com/smol-rs/async-channel/issues/85 - run: | - cargo miri test --manifest-path=event-listener-strategy/Cargo.toml - cargo miri test --manifest-path=async-lock/Cargo.toml + - run: cargo miri test + env: + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation + RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout security_audit: - permissions: - checks: write - contents: read - issues: write runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 - # https://github.com/rustsec/audit-check/issues/2 - - uses: rustsec/audit-check@master + - uses: actions/checkout@v3 + - uses: actions-rs/audit-check@v1 with: token: ${{ secrets.GITHUB_TOKEN }} - - loom: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - name: Install Rust - run: rustup update stable - - name: Loom tests - run: RUSTFLAGS="--cfg=loom" cargo test --release --test loom --features loom - - diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 59fad1f..be57bd1 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -1,8 +1,5 @@ name: Release -permissions: - contents: write - on: push: tags: @@ -13,7 +10,7 @@ jobs: if: github.repository_owner == 'smol-rs' runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v3 - uses: taiki-e/create-gh-release-action@v1 with: changelog: CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md index ef9fdf2..9ef1e71 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,64 +1,3 @@ -# Version 5.3.0 - -- Add a `loom` implementation. This feature is unstable and is not semver-supported. (#126) -- Make the panic message for polling the `EventListener` after it has completed more clear. (#125) - -# Version 5.2.0 - -- Make `StackSlot` `Sync`. (#121) - -# Version 5.1.0 - -- Make `StackSlot` `Send`. (#119) - -# Version 5.0.0 - -- **Breaking:** Rework the API to afford better usage. (#105) - - The heap-based API of the v2.x line is back. - - However, there is a stack-based API as an alternative. -- Add a way to get the total number of listeners. (#114) - -# Version 4.0.3 - -- Relax MSRV to 1.60. (#110) - -# Version 4.0.2 - -- Avoid spinning in `wait_deadline`. (#107) - -# Version 4.0.1 - -- Fix a use-after-move error after an `EventListener` is assigned to listen to - another `Event`. (#101) - -# Version 4.0.0 - -- **Breaking:** Fix a footgun in the `EventListener` type. `EventListener::new()` - now no longer takes an `&Event` as an argument, and `EventListener::listen()` - takes the `&Event` as an argument. Hopefully this should prevent `.await`ing - on a listener without making sure it's listening first. (#94) - -# Version 3.1.0 - -- Implement `UnwindSafe` and `RefUnwindSafe` for `EventListener`. This was unintentionally removed in version 3 (#96). - -# Version 3.0.1 - -- Emphasize that `listen()` must be called on `EventListener` in documentation. (#90) -- Write useful output in `fmt::Debug` implementations. (#86) - -# Version 3.0.0 - -- Use the `parking` crate instead of threading APIs (#27) -- Bump MSRV to 1.59 (#71) -- **Breaking:** Make this crate `no_std`-compatible on `default-features = false`. (#34) -- Create a new `event-listener-strategy` crate for abstracting over blocking/non-blocking operations. (#49) -- **Breaking:** Change the `EventListener` API to be `!Unpin`. (#51) -- Enable a feature for the `portable-atomic` crate. (#53) -- **Breaking:** Add a `Notification` trait which is used to enable tagged events. (#52) -- Add an `is_notified()` method to `Event`. (#48) -- **Breaking:** Make it so `notify()` returns the number of listeners notified. (#57) - # Version 2.5.3 - Fix fence on x86 and miri. diff --git a/Cargo.toml b/Cargo.toml index 3fdb37b..24f1e7e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,11 +2,11 @@ name = "event-listener" # When publishing a new version: # - Update CHANGELOG.md -# - Create "v5.x.y" git tag -version = "5.3.0" +# - Create "v2.x.y" git tag +version = "2.5.3" authors = ["Stjepan Glavina "] -edition = "2021" -rust-version = "1.60" +edition = "2018" +rust-version = "1.36" description = "Notify async tasks or threads" license = "Apache-2.0 OR MIT" repository = "https://github.com/smol-rs/event-listener" @@ -14,45 +14,6 @@ keywords = ["condvar", "eventcount", "wake", "blocking", "park"] categories = ["asynchronous", "concurrency"] exclude = ["/.*"] -[features] -default = ["std"] -std = ["concurrent-queue/std", "parking"] -portable-atomic = ["portable-atomic-util", "portable_atomic_crate"] -loom = ["concurrent-queue/loom", "parking?/loom", "dep:loom"] - -[dependencies] -concurrent-queue = { version = "2.4.0", default-features = false } -pin-project-lite = "0.2.12" -portable-atomic-util = { version = "0.1.4", default-features = false, optional = true, features = ["alloc"] } - -[target.'cfg(not(target_family = "wasm"))'.dependencies] -parking = { version = "2.0.0", optional = true } - -[target.'cfg(loom)'.dependencies] -loom = { version = "0.7", optional = true } - -[dependencies.portable_atomic_crate] -package = "portable-atomic" -version = "1.2.0" -default-features = false -optional = true - [dev-dependencies] -futures-lite = "2.0.0" -try-lock = "0.2.5" +futures = { version = "0.3", default-features = false, features = ["std"] } waker-fn = "1" - -[dev-dependencies.criterion] -version = "0.5" -default-features = false -features = ["cargo_bench_support"] - -[target.'cfg(target_family = "wasm")'.dev-dependencies] -wasm-bindgen-test = "0.3" - -[[bench]] -name = "bench" -harness = false - -[lib] -bench = false diff --git a/README.md b/README.md index 80aaef9..3edf086 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # event-listener -[![Build](https://github.com/smol-rs/event-listener/workflows/CI/badge.svg)]( +[![Build](https://github.com/smol-rs/event-listener/workflows/Build%20and%20test/badge.svg)]( https://github.com/smol-rs/event-listener/actions) [![License](https://img.shields.io/badge/license-Apache--2.0_OR_MIT-blue.svg)]( https://github.com/smol-rs/event-listener) @@ -17,7 +17,7 @@ You can use this crate to turn non-blocking data structures into async or blocki structures. See a [simple mutex] implementation that exposes an async and a blocking interface for acquiring locks. -[eventcounts]: https://www.1024cores.net/home/lock-free-algorithms/eventcounts +[eventcounts]: http://www.1024cores.net/home/lock-free-algorithms/eventcounts [simple mutex]: ./examples/mutex.rs ## Examples @@ -74,8 +74,8 @@ loop { Licensed under either of - * Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or https://www.apache.org/licenses/LICENSE-2.0) - * MIT license ([LICENSE-MIT](LICENSE-MIT) or https://opensource.org/licenses/MIT) + * Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0) + * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) at your option. diff --git a/benches/bench.rs b/benches/bench.rs deleted file mode 100644 index d9e0db1..0000000 --- a/benches/bench.rs +++ /dev/null @@ -1,26 +0,0 @@ -use std::iter; - -use criterion::{criterion_group, criterion_main, Criterion}; -use event_listener::{Event, Listener}; - -const COUNT: usize = 8000; - -fn bench_events(c: &mut Criterion) { - c.bench_function("notify_and_wait", |b| { - let ev = Event::new(); - let mut handles = Vec::with_capacity(COUNT); - - b.iter(|| { - handles.extend(iter::repeat_with(|| ev.listen()).take(COUNT)); - - ev.notify(COUNT); - - for handle in handles.drain(..) { - handle.wait(); - } - }); - }); -} - -criterion_group!(benches, bench_events); -criterion_main!(benches); diff --git a/examples/mutex.rs b/examples/mutex.rs index 30fbe66..0d28e41 100644 --- a/examples/mutex.rs +++ b/examples/mutex.rs @@ -2,169 +2,184 @@ //! //! This mutex exposes both blocking and async methods for acquiring a lock. -#[cfg(not(target_family = "wasm"))] -mod example { - #![allow(dead_code)] +#![allow(dead_code)] - use std::ops::{Deref, DerefMut}; - use std::sync::{mpsc, Arc}; - use std::thread; - use std::time::{Duration, Instant}; +use std::cell::UnsafeCell; +use std::ops::{Deref, DerefMut}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{mpsc, Arc}; +use std::thread; +use std::time::{Duration, Instant}; - use event_listener::{listener, Event, Listener}; - use try_lock::{Locked, TryLock}; +use event_listener::Event; - /// A simple mutex. - struct Mutex { - /// Blocked lock operations. - lock_ops: Event, +/// A simple mutex. +struct Mutex { + /// Set to `true` when the mutex is locked. + locked: AtomicBool, - /// The inner non-blocking mutex. - data: TryLock, - } + /// Blocked lock operations. + lock_ops: Event, - unsafe impl Send for Mutex {} - unsafe impl Sync for Mutex {} + /// The inner protected data. + data: UnsafeCell, +} - impl Mutex { - /// Creates a mutex. - fn new(t: T) -> Mutex { - Mutex { - lock_ops: Event::new(), - data: TryLock::new(t), - } +unsafe impl Send for Mutex {} +unsafe impl Sync for Mutex {} + +impl Mutex { + /// Creates a mutex. + fn new(t: T) -> Mutex { + Mutex { + locked: AtomicBool::new(false), + lock_ops: Event::new(), + data: UnsafeCell::new(t), } + } - /// Attempts to acquire a lock. - fn try_lock(&self) -> Option> { - self.data.try_lock().map(MutexGuard) + /// Attempts to acquire a lock. + fn try_lock(&self) -> Option> { + if !self.locked.swap(true, Ordering::Acquire) { + Some(MutexGuard(self)) + } else { + None } + } - /// Blocks until a lock is acquired. - fn lock(&self) -> MutexGuard<'_, T> { - loop { - // Attempt grabbing a lock. - if let Some(guard) = self.try_lock() { - return guard; - } + /// Blocks until a lock is acquired. + fn lock(&self) -> MutexGuard<'_, T> { + let mut listener = None; - // Set up an event listener. - listener!(self.lock_ops => listener); + loop { + // Attempt grabbing a lock. + if let Some(guard) = self.try_lock() { + return guard; + } - // Try again. - if let Some(guard) = self.try_lock() { - return guard; + // Set up an event listener or wait for a notification. + match listener.take() { + None => { + // Start listening and then try locking again. + listener = Some(self.lock_ops.listen()); + } + Some(l) => { + // Wait until a notification is received. + l.wait(); } - - // Wait for a notification. - listener.wait(); } } + } - /// Blocks until a lock is acquired or the timeout is reached. - fn lock_timeout(&self, timeout: Duration) -> Option> { - let deadline = Instant::now() + timeout; - - loop { - // Attempt grabbing a lock. - if let Some(guard) = self.try_lock() { - return Some(guard); - } + /// Blocks until a lock is acquired or the timeout is reached. + fn lock_timeout(&self, timeout: Duration) -> Option> { + let deadline = Instant::now() + timeout; + let mut listener = None; - // Set up an event listener. - listener!(self.lock_ops => listener); + loop { + // Attempt grabbing a lock. + if let Some(guard) = self.try_lock() { + return Some(guard); + } - // Try again. - if let Some(guard) = self.try_lock() { - return Some(guard); + // Set up an event listener or wait for an event. + match listener.take() { + None => { + // Start listening and then try locking again. + listener = Some(self.lock_ops.listen()); + } + Some(l) => { + // Wait until a notification is received. + if !l.wait_deadline(deadline) { + return None; + } } - - // Wait until a notification is received. - listener.wait_deadline(deadline)?; } } + } - /// Acquires a lock asynchronously. - async fn lock_async(&self) -> MutexGuard<'_, T> { - loop { - // Attempt grabbing a lock. - if let Some(guard) = self.try_lock() { - return guard; - } + /// Acquires a lock asynchronously. + async fn lock_async(&self) -> MutexGuard<'_, T> { + let mut listener = None; - // Set up an event listener. - listener!(self.lock_ops => listener); + loop { + // Attempt grabbing a lock. + if let Some(guard) = self.try_lock() { + return guard; + } - // Try again. - if let Some(guard) = self.try_lock() { - return guard; + // Set up an event listener or wait for an event. + match listener.take() { + None => { + // Start listening and then try locking again. + listener = Some(self.lock_ops.listen()); + } + Some(l) => { + // Wait until a notification is received. + l.await; } - - // Wait until a notification is received. - listener.await; } } } +} - /// A guard holding a lock. - struct MutexGuard<'a, T>(Locked<'a, T>); - - impl Deref for MutexGuard<'_, T> { - type Target = T; +/// A guard holding a lock. +struct MutexGuard<'a, T>(&'a Mutex); - fn deref(&self) -> &T { - &self.0 - } - } +unsafe impl Send for MutexGuard<'_, T> {} +unsafe impl Sync for MutexGuard<'_, T> {} - impl DerefMut for MutexGuard<'_, T> { - fn deref_mut(&mut self) -> &mut T { - &mut self.0 - } +impl Drop for MutexGuard<'_, T> { + fn drop(&mut self) { + self.0.locked.store(false, Ordering::Release); + self.0.lock_ops.notify(1); } +} - pub(super) fn entry() { - const N: usize = 10; +impl Deref for MutexGuard<'_, T> { + type Target = T; - // A shared counter. - let counter = Arc::new(Mutex::new(0)); + fn deref(&self) -> &T { + unsafe { &*self.0.data.get() } + } +} - // A channel that signals when all threads are done. - let (tx, rx) = mpsc::channel(); +impl DerefMut for MutexGuard<'_, T> { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.0.data.get() } + } +} - // Spawn a bunch of threads incrementing the counter. - for _ in 0..N { - let counter = counter.clone(); - let tx = tx.clone(); +fn main() { + const N: usize = 10; - thread::spawn(move || { - let mut counter = counter.lock(); - *counter += 1; + // A shared counter. + let counter = Arc::new(Mutex::new(0)); - // If this is the last increment, signal that we're done. - if *counter == N { - tx.send(()).unwrap(); - } - }); - } + // A channel that signals when all threads are done. + let (tx, rx) = mpsc::channel(); - // Wait until the last thread increments the counter. - rx.recv().unwrap(); + // Spawn a bunch of threads incrementing the counter. + for _ in 0..N { + let counter = counter.clone(); + let tx = tx.clone(); - // The counter must equal the number of threads. - assert_eq!(*counter.lock(), N); + thread::spawn(move || { + let mut counter = counter.lock(); + *counter += 1; - println!("Done!"); + // If this is the last increment, signal that we're done. + if *counter == N { + tx.send(()).unwrap(); + } + }); } -} -#[cfg(target_family = "wasm")] -mod example { - pub(super) fn entry() { - println!("This example is not supported on wasm yet."); - } -} + // Wait until the last thread increments the counter. + rx.recv().unwrap(); -fn main() { - example::entry(); + // The counter must equal the number of threads. + assert_eq!(*counter.lock(), N); + + println!("Done!"); } diff --git a/src/lib.rs b/src/lib.rs index 08d6037..20047be 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,7 +6,7 @@ //! structures. See a [simple mutex] implementation that exposes an async and a blocking interface //! for acquiring locks. //! -//! [eventcounts]: https://www.1024cores.net/home/lock-free-algorithms/eventcounts +//! [eventcounts]: http://www.1024cores.net/home/lock-free-algorithms/eventcounts //! [simple mutex]: https://github.com/smol-rs/event-listener/blob/master/examples/mutex.rs //! //! # Examples @@ -19,7 +19,7 @@ //! use std::thread; //! use std::time::Duration; //! use std::usize; -//! use event_listener::{Event, Listener}; +//! use event_listener::Event; //! //! let flag = Arc::new(AtomicBool::new(false)); //! let event = Arc::new(Event::new()); @@ -48,7 +48,7 @@ //! } //! //! // Start listening for events. -//! let mut listener = event.listen(); +//! let listener = event.listen(); //! //! // Check the flag again after creating the listener. //! if flag.load(Ordering::SeqCst) { @@ -59,82 +59,52 @@ //! listener.wait(); //! } //! ``` -//! -//! # Features -//! -//! - The `portable-atomic` feature enables the use of the [`portable-atomic`] crate to provide -//! atomic operations on platforms that don't support them. -//! -//! [`portable-atomic`]: https://crates.io/crates/portable-atomic -#![cfg_attr(not(feature = "std"), no_std)] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] -#![doc( - html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" -)] -#![doc( - html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" -)] - -#[cfg(not(feature = "std"))] -extern crate alloc; -#[cfg(feature = "std")] -extern crate std as alloc; - -#[cfg_attr(feature = "std", path = "std.rs")] -#[cfg_attr(not(feature = "std"), path = "no_std.rs")] -mod sys; - -mod notify; - -#[cfg(not(feature = "std"))] -use alloc::boxed::Box; - -use core::borrow::Borrow; -use core::fmt; -use core::future::Future; -use core::mem::ManuallyDrop; -use core::pin::Pin; -use core::ptr; -use core::task::{Context, Poll, Waker}; - -#[cfg(all(feature = "std", not(target_family = "wasm")))] -use { - parking::{Parker, Unparker}, - std::time::{Duration, Instant}, -}; - -use sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; -use sync::Arc; - -#[cfg(not(loom))] -use sync::WithMut; - -use notify::{Internal, NotificationPrivate}; -pub use notify::{IntoNotification, Notification}; + +use std::cell::{Cell, UnsafeCell}; +use std::fmt; +use std::future::Future; +use std::mem::{self, ManuallyDrop}; +use std::ops::{Deref, DerefMut}; +use std::panic::{RefUnwindSafe, UnwindSafe}; +use std::pin::Pin; +use std::ptr::{self, NonNull}; +use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex, MutexGuard}; +use std::task::{Context, Poll, Waker}; +use std::thread::{self, Thread}; +use std::time::{Duration, Instant}; +use std::usize; /// Inner state of [`Event`]. -struct Inner { +struct Inner { /// The number of notified entries, or `usize::MAX` if all of them have been notified. /// /// If there are no entries, this value is set to `usize::MAX`. notified: AtomicUsize, - /// Inner queue of event listeners. - /// - /// On `std` platforms, this is an intrusive linked list. On `no_std` platforms, this is a - /// more traditional `Vec` of listeners, with an atomic queue used as a backup for high - /// contention. - list: sys::List, + /// A linked list holding registered listeners. + list: Mutex, + + /// A single cached list entry to avoid allocations on the fast path of the insertion. + cache: UnsafeCell, } -impl Inner { - fn new() -> Self { - Self { - notified: AtomicUsize::new(core::usize::MAX), - list: sys::List::new(), +impl Inner { + /// Locks the list. + fn lock(&self) -> ListGuard<'_> { + ListGuard { + inner: self, + guard: self.list.lock().unwrap(), } } + + /// Returns the pointer to the single cached list entry. + #[inline(always)] + fn cache_ptr(&self) -> NonNull { + unsafe { NonNull::new_unchecked(self.cache.get()) } + } } /// A synchronization primitive for notifying async tasks and threads. @@ -156,109 +126,41 @@ impl Inner { /// kind of notification was delivered. /// /// Listeners are registered and notified in the first-in first-out fashion, ensuring fairness. -pub struct Event { +pub struct Event { /// A pointer to heap-allocated inner state. /// /// This pointer is initially null and gets lazily initialized on first use. Semantically, it /// is an `Arc` so it's important to keep in mind that it contributes to the [`Arc`]'s /// reference count. - inner: AtomicPtr>, + inner: AtomicPtr, } -unsafe impl Send for Event {} -unsafe impl Sync for Event {} - -impl core::panic::UnwindSafe for Event {} -impl core::panic::RefUnwindSafe for Event {} - -impl fmt::Debug for Event { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self.try_inner() { - Some(inner) => { - let notified_count = inner.notified.load(Ordering::Relaxed); - let total_count = match inner.list.try_total_listeners() { - Some(total_count) => total_count, - None => { - return f - .debug_tuple("Event") - .field(&format_args!("")) - .finish() - } - }; - - f.debug_struct("Event") - .field("listeners_notified", ¬ified_count) - .field("listeners_total", &total_count) - .finish() - } - None => f - .debug_tuple("Event") - .field(&format_args!("")) - .finish(), - } - } -} +unsafe impl Send for Event {} +unsafe impl Sync for Event {} -impl Default for Event { - #[inline] - fn default() -> Self { - Self::new() - } -} +impl UnwindSafe for Event {} +impl RefUnwindSafe for Event {} -impl Event { - /// Creates a new `Event` with a tag type. - /// - /// Tagging cannot be implemented efficiently on `no_std`, so this is only available when the - /// `std` feature is enabled. +impl Event { + /// Creates a new [`Event`]. /// /// # Examples /// /// ``` /// use event_listener::Event; /// - /// let event = Event::::with_tag(); + /// let event = Event::new(); /// ``` - #[cfg(all(feature = "std", not(loom)))] #[inline] - pub const fn with_tag() -> Self { - Self { + pub const fn new() -> Event { + Event { inner: AtomicPtr::new(ptr::null_mut()), } } - #[cfg(all(feature = "std", loom))] - #[inline] - pub fn with_tag() -> Self { - Self { - inner: AtomicPtr::new(ptr::null_mut()), - } - } - - /// Tell whether any listeners are currently notified. - /// - /// # Examples - /// - /// ``` - /// use event_listener::{Event, Listener}; - /// - /// let event = Event::new(); - /// let listener = event.listen(); - /// assert!(!event.is_notified()); - /// - /// event.notify(1); - /// assert!(event.is_notified()); - /// ``` - #[inline] - pub fn is_notified(&self) -> bool { - self.try_inner() - .map_or(false, |inner| inner.notified.load(Ordering::Acquire) > 0) - } /// Returns a guard listening for a notification. /// - /// This method emits a `SeqCst` fence after registering a listener. For now, this method - /// is an alias for calling [`EventListener::new()`], pinning it to the heap, and then - /// inserting it into a list. + /// This method emits a `SeqCst` fence after registering a listener. /// /// # Examples /// @@ -268,68 +170,30 @@ impl Event { /// let event = Event::new(); /// let listener = event.listen(); /// ``` - /// - /// # Caveats - /// - /// The above example is equivalent to this code: - /// - /// ```no_compile - /// use event_listener::{Event, EventListener}; - /// - /// let event = Event::new(); - /// let mut listener = Box::pin(EventListener::new()); - /// listener.listen(&event); - /// ``` - /// - /// It creates a new listener, pins it to the heap, and inserts it into the linked list - /// of listeners. While this type of usage is simple, it may be desired to eliminate this - /// heap allocation. In this case, consider using the [`EventListener::new`] constructor - /// directly, which allows for greater control over where the [`EventListener`] is - /// allocated. However, users of this `new` method must be careful to ensure that the - /// [`EventListener`] is `listen`ing before waiting on it; panics may occur otherwise. #[cold] - pub fn listen(&self) -> EventListener { - let inner = ManuallyDrop::new(unsafe { Arc::from_raw(self.inner()) }); - - // Allocate the listener on the heap and insert it. - let mut listener = Box::pin(InnerListener { - event: Arc::clone(&inner), - listener: None, - }); - listener.as_mut().listen(); - - // Return the listener. - EventListener { listener } + pub fn listen(&self) -> EventListener { + let inner = self.inner(); + let listener = EventListener { + inner: unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) }, + entry: unsafe { Some((*inner).lock().insert((*inner).cache_ptr())) }, + }; + + // Make sure the listener is registered before whatever happens next. + full_fence(); + listener } /// Notifies a number of active listeners. /// /// The number is allowed to be zero or exceed the current number of listeners. /// - /// The [`Notification`] trait is used to define what kind of notification is delivered. - /// The default implementation (implemented on `usize`) is a notification that only notifies - /// *at least* the specified number of listeners. - /// - /// In certain cases, this function emits a `SeqCst` fence before notifying listeners. - /// - /// This function returns the number of [`EventListener`]s that were notified by this call. - /// - /// # Caveats - /// - /// If the `std` feature is disabled, the notification will be delayed under high contention, - /// such as when another thread is taking a while to `notify` the event. In this circumstance, - /// this function will return `0` instead of the number of listeners actually notified. Therefore - /// if the `std` feature is disabled the return value of this function should not be relied upon - /// for soundness and should be used only as a hint. + /// In contrast to [`Event::notify_additional()`], this method only makes sure *at least* `n` + /// listeners among the active ones are notified. /// - /// If the `std` feature is enabled, no spurious returns are possible, since the `std` - /// implementation uses system locking primitives to ensure there is no unavoidable - /// contention. + /// This method emits a `SeqCst` fence before notifying listeners. /// /// # Examples /// - /// Use the default notification strategy: - /// /// ``` /// use event_listener::Event; /// @@ -348,20 +212,39 @@ impl Event { /// // get notified here since they start listening before `listener3`. /// event.notify(2); /// ``` + #[inline] + pub fn notify(&self, n: usize) { + // Make sure the notification comes after whatever triggered it. + full_fence(); + + if let Some(inner) = self.try_inner() { + // Notify if there is at least one unnotified listener and the number of notified + // listeners is less than `n`. + if inner.notified.load(Ordering::Acquire) < n { + inner.lock().notify(n); + } + } + } + + /// Notifies a number of active listeners without emitting a `SeqCst` fence. + /// + /// The number is allowed to be zero or exceed the current number of listeners. + /// + /// In contrast to [`Event::notify_additional()`], this method only makes sure *at least* `n` + /// listeners among the active ones are notified. /// - /// Notify without emitting a `SeqCst` fence. This uses the [`relaxed`] notification strategy. - /// This is equivalent to calling [`Event::notify_relaxed()`]. + /// Unlike [`Event::notify()`], this method does not emit a `SeqCst` fence. /// - /// [`relaxed`]: IntoNotification::relaxed + /// # Examples /// /// ``` - /// use event_listener::{IntoNotification, Event}; + /// use event_listener::Event; /// use std::sync::atomic::{self, Ordering}; /// /// let event = Event::new(); /// /// // This notification gets lost because there are no listeners. - /// event.notify(1.relaxed()); + /// event.notify(1); /// /// let listener1 = event.listen(); /// let listener2 = event.listen(); @@ -374,22 +257,37 @@ impl Event { /// // /// // Listener queueing is fair, which means `listener1` and `listener2` /// // get notified here since they start listening before `listener3`. - /// event.notify(2.relaxed()); + /// event.notify(2); /// ``` + #[inline] + pub fn notify_relaxed(&self, n: usize) { + if let Some(inner) = self.try_inner() { + // Notify if there is at least one unnotified listener and the number of notified + // listeners is less than `n`. + if inner.notified.load(Ordering::Acquire) < n { + inner.lock().notify(n); + } + } + } + + /// Notifies a number of active and still unnotified listeners. + /// + /// The number is allowed to be zero or exceed the current number of listeners. + /// + /// In contrast to [`Event::notify()`], this method will notify `n` *additional* listeners that + /// were previously unnotified. /// - /// Notify additional listeners. In contrast to [`Event::notify()`], this method will notify `n` - /// *additional* listeners that were previously unnotified. This uses the [`additional`] - /// notification strategy. This is equivalent to calling [`Event::notify_additional()`]. + /// This method emits a `SeqCst` fence before notifying listeners. /// - /// [`additional`]: IntoNotification::additional + /// # Examples /// /// ``` - /// use event_listener::{IntoNotification, Event}; + /// use event_listener::Event; /// /// let event = Event::new(); /// /// // This notification gets lost because there are no listeners. - /// event.notify(1.additional()); + /// event.notify(1); /// /// let listener1 = event.listen(); /// let listener2 = event.listen(); @@ -399,21 +297,42 @@ impl Event { /// // /// // Listener queueing is fair, which means `listener1` and `listener2` /// // get notified here since they start listening before `listener3`. - /// event.notify(1.additional()); - /// event.notify(1.additional()); + /// event.notify_additional(1); + /// event.notify_additional(1); /// ``` + #[inline] + pub fn notify_additional(&self, n: usize) { + // Make sure the notification comes after whatever triggered it. + full_fence(); + + if let Some(inner) = self.try_inner() { + // Notify if there is at least one unnotified listener. + if inner.notified.load(Ordering::Acquire) < usize::MAX { + inner.lock().notify_additional(n); + } + } + } + + /// Notifies a number of active and still unnotified listeners without emitting a `SeqCst` + /// fence. + /// + /// The number is allowed to be zero or exceed the current number of listeners. + /// + /// In contrast to [`Event::notify()`], this method will notify `n` *additional* listeners that + /// were previously unnotified. + /// + /// Unlike [`Event::notify_additional()`], this method does not emit a `SeqCst` fence. /// - /// Notifies with the [`additional`] and [`relaxed`] strategies at the same time. This is - /// equivalent to calling [`Event::notify_additional_relaxed()`]. + /// # Examples /// /// ``` - /// use event_listener::{IntoNotification, Event}; + /// use event_listener::Event; /// use std::sync::atomic::{self, Ordering}; /// /// let event = Event::new(); /// /// // This notification gets lost because there are no listeners. - /// event.notify(1.additional().relaxed()); + /// event.notify(1); /// /// let listener1 = event.listen(); /// let listener2 = event.listen(); @@ -426,56 +345,56 @@ impl Event { /// // /// // Listener queueing is fair, which means `listener1` and `listener2` /// // get notified here since they start listening before `listener3`. - /// event.notify(1.additional().relaxed()); - /// event.notify(1.additional().relaxed()); + /// event.notify_additional_relaxed(1); + /// event.notify_additional_relaxed(1); /// ``` #[inline] - pub fn notify(&self, notify: impl IntoNotification) -> usize { - let notify = notify.into_notification(); - - // Make sure the notification comes after whatever triggered it. - notify.fence(notify::Internal::new()); - + pub fn notify_additional_relaxed(&self, n: usize) { if let Some(inner) = self.try_inner() { - let limit = if notify.is_additional(Internal::new()) { - core::usize::MAX - } else { - notify.count(Internal::new()) - }; - - // Notify if there is at least one unnotified listener and the number of notified - // listeners is less than `limit`. - if inner.needs_notification(limit) { - return inner.notify(notify); + // Notify if there is at least one unnotified listener. + if inner.notified.load(Ordering::Acquire) < usize::MAX { + inner.lock().notify_additional(n); } } - - 0 } - /// Return a reference to the inner state if it has been initialized. + /// Returns a reference to the inner state if it was initialized. #[inline] - fn try_inner(&self) -> Option<&Inner> { + fn try_inner(&self) -> Option<&Inner> { let inner = self.inner.load(Ordering::Acquire); unsafe { inner.as_ref() } } - /// Returns a raw, initialized pointer to the inner state. + /// Returns a raw pointer to the inner state, initializing it if necessary. /// /// This returns a raw pointer instead of reference because `from_raw` - /// requires raw/mut provenance: . - fn inner(&self) -> *const Inner { + /// requires raw/mut provenance: + fn inner(&self) -> *const Inner { let mut inner = self.inner.load(Ordering::Acquire); - // If this is the first use, initialize the state. + // Initialize the state if this is its first use. if inner.is_null() { - // Allocate the state on the heap. - let new = Arc::new(Inner::::new()); - - // Convert the state to a raw pointer. - let new = Arc::into_raw(new) as *mut Inner; - - // Replace the null pointer with the new state pointer. + // Allocate on the heap. + let new = Arc::new(Inner { + notified: AtomicUsize::new(usize::MAX), + list: std::sync::Mutex::new(List { + head: None, + tail: None, + start: None, + len: 0, + notified: 0, + cache_used: false, + }), + cache: UnsafeCell::new(Entry { + state: Cell::new(State::Created), + prev: Cell::new(None), + next: Cell::new(None), + }), + }); + // Convert the heap-allocated state into a raw pointer. + let new = Arc::into_raw(new) as *mut Inner; + + // Attempt to replace the null-pointer with the new state pointer. inner = self .inner .compare_exchange(inner, new, Ordering::AcqRel, Ordering::Acquire) @@ -496,21 +415,60 @@ impl Event { inner } +} - /// Get the number of listeners currently listening to this [`Event`]. - /// - /// This call returns the number of [`EventListener`]s that are currently listening to - /// this event. It does this by acquiring the internal event lock and reading the listener - /// count. Therefore it is only available for `std`-enabled platforms. - /// - /// # Caveats - /// - /// This function returns just a snapshot of the number of listeners at this point in time. - /// Due to the nature of multi-threaded CPUs, it is possible that this number will be - /// inaccurate by the time that this function returns. - /// - /// It is possible for the actual number to change at any point. Therefore, the number should - /// only ever be used as a hint. +impl Drop for Event { + #[inline] + fn drop(&mut self) { + let inner: *mut Inner = *self.inner.get_mut(); + + // If the state pointer has been initialized, deallocate it. + if !inner.is_null() { + unsafe { + drop(Arc::from_raw(inner)); + } + } + } +} + +impl fmt::Debug for Event { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Event { .. }") + } +} + +impl Default for Event { + fn default() -> Event { + Event::new() + } +} + +/// A guard waiting for a notification from an [`Event`]. +/// +/// There are two ways for a listener to wait for a notification: +/// +/// 1. In an asynchronous manner using `.await`. +/// 2. In a blocking manner by calling [`EventListener::wait()`] on it. +/// +/// If a notified listener is dropped without receiving a notification, dropping will notify +/// another active listener. Whether one *additional* listener will be notified depends on what +/// kind of notification was delivered. +pub struct EventListener { + /// A reference to [`Event`]'s inner state. + inner: Arc, + + /// A pointer to this listener's entry in the linked list. + entry: Option>, +} + +unsafe impl Send for EventListener {} +unsafe impl Sync for EventListener {} + +impl UnwindSafe for EventListener {} +impl RefUnwindSafe for EventListener {} + +impl EventListener { + /// Blocks until a notification is received. /// /// # Examples /// @@ -518,130 +476,106 @@ impl Event { /// use event_listener::Event; /// /// let event = Event::new(); + /// let listener = event.listen(); /// - /// assert_eq!(event.total_listeners(), 0); - /// - /// let listener1 = event.listen(); - /// assert_eq!(event.total_listeners(), 1); - /// - /// let listener2 = event.listen(); - /// assert_eq!(event.total_listeners(), 2); + /// // Notify `listener`. + /// event.notify(1); /// - /// drop(listener1); - /// drop(listener2); - /// assert_eq!(event.total_listeners(), 0); + /// // Receive the notification. + /// listener.wait(); /// ``` - #[cfg(feature = "std")] - #[inline] - pub fn total_listeners(&self) -> usize { - if let Some(inner) = self.try_inner() { - inner.list.total_listeners() - } else { - 0 - } + pub fn wait(self) { + self.wait_internal(None); } -} -impl Event<()> { - /// Creates a new [`Event`]. + /// Blocks until a notification is received or a timeout is reached. + /// + /// Returns `true` if a notification was received. /// /// # Examples /// /// ``` + /// use std::time::Duration; /// use event_listener::Event; /// /// let event = Event::new(); + /// let listener = event.listen(); + /// + /// // There are no notification so this times out. + /// assert!(!listener.wait_timeout(Duration::from_secs(1))); /// ``` - #[inline] - #[cfg(not(loom))] - pub const fn new() -> Self { - Self { - inner: AtomicPtr::new(ptr::null_mut()), - } - } - - #[inline] - #[cfg(loom)] - pub fn new() -> Self { - Self { - inner: AtomicPtr::new(ptr::null_mut()), - } + pub fn wait_timeout(self, timeout: Duration) -> bool { + self.wait_internal(Some(Instant::now() + timeout)) } - /// Notifies a number of active listeners without emitting a `SeqCst` fence. - /// - /// The number is allowed to be zero or exceed the current number of listeners. - /// - /// In contrast to [`Event::notify_additional()`], this method only makes sure *at least* `n` - /// listeners among the active ones are notified. + /// Blocks until a notification is received or a deadline is reached. /// - /// Unlike [`Event::notify()`], this method does not emit a `SeqCst` fence. + /// Returns `true` if a notification was received. /// - /// This method only works for untagged events. In other cases, it is recommended to instead - /// use [`Event::notify()`] like so: + /// # Examples /// /// ``` - /// use event_listener::{IntoNotification, Event}; - /// let event = Event::new(); + /// use std::time::{Duration, Instant}; + /// use event_listener::Event; /// - /// // Old way: - /// event.notify_relaxed(1); + /// let event = Event::new(); + /// let listener = event.listen(); /// - /// // New way: - /// event.notify(1.relaxed()); + /// // There are no notification so this times out. + /// assert!(!listener.wait_deadline(Instant::now() + Duration::from_secs(1))); /// ``` + pub fn wait_deadline(self, deadline: Instant) -> bool { + self.wait_internal(Some(deadline)) + } + + /// Drops this listener and discards its notification (if any) without notifying another + /// active listener. /// - /// # Examples + /// Returns `true` if a notification was discarded. /// + /// # Examples /// ``` - /// use event_listener::{Event, IntoNotification}; - /// use std::sync::atomic::{self, Ordering}; + /// use event_listener::Event; /// /// let event = Event::new(); - /// - /// // This notification gets lost because there are no listeners. - /// event.notify_relaxed(1); - /// /// let listener1 = event.listen(); /// let listener2 = event.listen(); - /// let listener3 = event.listen(); /// - /// // We should emit a fence manually when using relaxed notifications. - /// atomic::fence(Ordering::SeqCst); + /// event.notify(1); /// - /// // Notifies two listeners. - /// // - /// // Listener queueing is fair, which means `listener1` and `listener2` - /// // get notified here since they start listening before `listener3`. - /// event.notify_relaxed(2); + /// assert!(listener1.discard()); + /// assert!(!listener2.discard()); /// ``` - #[inline] - pub fn notify_relaxed(&self, n: usize) -> usize { - self.notify(n.relaxed()) + pub fn discard(mut self) -> bool { + // If this listener has never picked up a notification... + if let Some(entry) = self.entry.take() { + let mut list = self.inner.lock(); + // Remove the listener from the list and return `true` if it was notified. + if let State::Notified(_) = list.remove(entry, self.inner.cache_ptr()) { + return true; + } + } + false } - /// Notifies a number of active and still unnotified listeners. - /// - /// The number is allowed to be zero or exceed the current number of listeners. - /// - /// In contrast to [`Event::notify()`], this method will notify `n` *additional* listeners that - /// were previously unnotified. - /// - /// This method emits a `SeqCst` fence before notifying listeners. + /// Returns `true` if this listener listens to the given `Event`. /// - /// This method only works for untagged events. In other cases, it is recommended to instead - /// use [`Event::notify()`] like so: + /// # Examples /// /// ``` - /// use event_listener::{IntoNotification, Event}; - /// let event = Event::new(); + /// use event_listener::Event; /// - /// // Old way: - /// event.notify_additional(1); + /// let event = Event::new(); + /// let listener = event.listen(); /// - /// // New way: - /// event.notify(1.additional()); + /// assert!(listener.listens_to(&event)); /// ``` + #[inline] + pub fn listens_to(&self, event: &Event) -> bool { + ptr::eq::(&*self.inner, event.inner.load(Ordering::Acquire)) + } + + /// Returns `true` if both listeners listen to the same `Event`. /// /// # Examples /// @@ -649,903 +583,415 @@ impl Event<()> { /// use event_listener::Event; /// /// let event = Event::new(); - /// - /// // This notification gets lost because there are no listeners. - /// event.notify_additional(1); - /// /// let listener1 = event.listen(); /// let listener2 = event.listen(); - /// let listener3 = event.listen(); /// - /// // Notifies two listeners. - /// // - /// // Listener queueing is fair, which means `listener1` and `listener2` - /// // get notified here since they start listening before `listener3`. - /// event.notify_additional(1); - /// event.notify_additional(1); + /// assert!(listener1.same_event(&listener2)); /// ``` - #[inline] - pub fn notify_additional(&self, n: usize) -> usize { - self.notify(n.additional()) + pub fn same_event(&self, other: &EventListener) -> bool { + ptr::eq::(&*self.inner, &*other.inner) } - /// Notifies a number of active and still unnotified listeners without emitting a `SeqCst` - /// fence. - /// - /// The number is allowed to be zero or exceed the current number of listeners. - /// - /// In contrast to [`Event::notify()`], this method will notify `n` *additional* listeners that - /// were previously unnotified. - /// - /// Unlike [`Event::notify_additional()`], this method does not emit a `SeqCst` fence. - /// - /// This method only works for untagged events. In other cases, it is recommended to instead - /// use [`Event::notify()`] like so: - /// - /// ``` - /// use event_listener::{IntoNotification, Event}; - /// let event = Event::new(); - /// - /// // Old way: - /// event.notify_additional_relaxed(1); - /// - /// // New way: - /// event.notify(1.additional().relaxed()); - /// ``` - /// - /// # Examples - /// - /// ``` - /// use event_listener::Event; - /// use std::sync::atomic::{self, Ordering}; - /// - /// let event = Event::new(); - /// - /// // This notification gets lost because there are no listeners. - /// event.notify(1); - /// - /// let listener1 = event.listen(); - /// let listener2 = event.listen(); - /// let listener3 = event.listen(); - /// - /// // We should emit a fence manually when using relaxed notifications. - /// atomic::fence(Ordering::SeqCst); - /// - /// // Notifies two listeners. - /// // - /// // Listener queueing is fair, which means `listener1` and `listener2` - /// // get notified here since they start listening before `listener3`. - /// event.notify_additional_relaxed(1); - /// event.notify_additional_relaxed(1); - /// ``` - #[inline] - pub fn notify_additional_relaxed(&self, n: usize) -> usize { - self.notify(n.additional().relaxed()) - } -} + fn wait_internal(mut self, deadline: Option) -> bool { + // Take out the entry pointer and set it to `None`. + let entry = match self.entry.take() { + None => unreachable!("cannot wait twice on an `EventListener`"), + Some(entry) => entry, + }; -impl Drop for Event { - #[inline] - fn drop(&mut self) { - self.inner.with_mut(|&mut inner| { - // If the state pointer has been initialized, drop it. - if !inner.is_null() { - unsafe { - drop(Arc::from_raw(inner)); + // Set this listener's state to `Waiting`. + { + let mut list = self.inner.lock(); + let e = unsafe { entry.as_ref() }; + + // Do a dummy replace operation in order to take out the state. + match e.state.replace(State::Notified(false)) { + State::Notified(_) => { + // If this listener has been notified, remove it from the list and return. + list.remove(entry, self.inner.cache_ptr()); + return true; } + // Otherwise, set the state to `Waiting`. + _ => e.state.set(State::Waiting(thread::current())), } - }) - } -} - -/// A handle that is listening to an [`Event`]. -/// -/// This trait represents a type waiting for a notification from an [`Event`]. See the -/// [`EventListener`] type for more documentation on this trait's usage. -pub trait Listener: Future + __sealed::Sealed { - /// Blocks until a notification is received. - /// - /// # Examples - /// - /// ``` - /// use event_listener::{Event, Listener}; - /// - /// let event = Event::new(); - /// let mut listener = event.listen(); - /// - /// // Notify `listener`. - /// event.notify(1); - /// - /// // Receive the notification. - /// listener.wait(); - /// ``` - #[cfg(all(feature = "std", not(target_family = "wasm")))] - fn wait(self) -> T; - - /// Blocks until a notification is received or a timeout is reached. - /// - /// Returns `true` if a notification was received. - /// - /// # Examples - /// - /// ``` - /// use std::time::Duration; - /// use event_listener::{Event, Listener}; - /// - /// let event = Event::new(); - /// let mut listener = event.listen(); - /// - /// // There are no notification so this times out. - /// assert!(listener.wait_timeout(Duration::from_secs(1)).is_none()); - /// ``` - #[cfg(all(feature = "std", not(target_family = "wasm")))] - fn wait_timeout(self, timeout: Duration) -> Option; + } - /// Blocks until a notification is received or a deadline is reached. - /// - /// Returns `true` if a notification was received. - /// - /// # Examples - /// - /// ``` - /// use std::time::{Duration, Instant}; - /// use event_listener::{Event, Listener}; - /// - /// let event = Event::new(); - /// let mut listener = event.listen(); - /// - /// // There are no notification so this times out. - /// assert!(listener.wait_deadline(Instant::now() + Duration::from_secs(1)).is_none()); - /// ``` - #[cfg(all(feature = "std", not(target_family = "wasm")))] - fn wait_deadline(self, deadline: Instant) -> Option; + // Wait until a notification is received or the timeout is reached. + loop { + match deadline { + None => thread::park(), - /// Drops this listener and discards its notification (if any) without notifying another - /// active listener. - /// - /// Returns `true` if a notification was discarded. - /// - /// # Examples - /// - /// ``` - /// use event_listener::{Event, Listener}; - /// - /// let event = Event::new(); - /// let mut listener1 = event.listen(); - /// let mut listener2 = event.listen(); - /// - /// event.notify(1); - /// - /// assert!(listener1.discard()); - /// assert!(!listener2.discard()); - /// ``` - fn discard(self) -> bool; + Some(deadline) => { + // Check for timeout. + let now = Instant::now(); + if now >= deadline { + // Remove the entry and check if notified. + return self + .inner + .lock() + .remove(entry, self.inner.cache_ptr()) + .is_notified(); + } - /// Returns `true` if this listener listens to the given `Event`. - /// - /// # Examples - /// - /// ``` - /// use event_listener::{Event, Listener}; - /// - /// let event = Event::new(); - /// let listener = event.listen(); - /// - /// assert!(listener.listens_to(&event)); - /// ``` - fn listens_to(&self, event: &Event) -> bool; + // Park until the deadline. + thread::park_timeout(deadline - now); + } + } - /// Returns `true` if both listeners listen to the same `Event`. - /// - /// # Examples - /// - /// ``` - /// use event_listener::{Event, Listener}; - /// - /// let event = Event::new(); - /// let listener1 = event.listen(); - /// let listener2 = event.listen(); - /// - /// assert!(listener1.same_event(&listener2)); - /// ``` - fn same_event(&self, other: &Self) -> bool; -} + let mut list = self.inner.lock(); + let e = unsafe { entry.as_ref() }; -/// Implement the `Listener` trait using the underlying `InnerListener`. -macro_rules! forward_impl_to_listener { - ($gen:ident => $ty:ty) => { - impl<$gen> crate::Listener<$gen> for $ty { - #[cfg(all(feature = "std", not(target_family = "wasm")))] - fn wait(mut self) -> $gen { - self.listener_mut().wait_internal(None).unwrap() + // Do a dummy replace operation in order to take out the state. + match e.state.replace(State::Notified(false)) { + State::Notified(_) => { + // If this listener has been notified, remove it from the list and return. + list.remove(entry, self.inner.cache_ptr()); + return true; + } + // Otherwise, set the state back to `Waiting`. + state => e.state.set(state), } + } + } +} - #[cfg(all(feature = "std", not(target_family = "wasm")))] - fn wait_timeout(mut self, timeout: std::time::Duration) -> Option<$gen> { - self.listener_mut() - .wait_internal(std::time::Instant::now().checked_add(timeout)) - } +impl fmt::Debug for EventListener { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("EventListener { .. }") + } +} - #[cfg(all(feature = "std", not(target_family = "wasm")))] - fn wait_deadline(mut self, deadline: std::time::Instant) -> Option<$gen> { - self.listener_mut().wait_internal(Some(deadline)) +impl Future for EventListener { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut list = self.inner.lock(); + + let entry = match self.entry { + None => unreachable!("cannot poll a completed `EventListener` future"), + Some(entry) => entry, + }; + let state = unsafe { &entry.as_ref().state }; + + // Do a dummy replace operation in order to take out the state. + match state.replace(State::Notified(false)) { + State::Notified(_) => { + // If this listener has been notified, remove it from the list and return. + list.remove(entry, self.inner.cache_ptr()); + drop(list); + self.entry = None; + return Poll::Ready(()); } - - fn discard(mut self) -> bool { - self.listener_mut().discard() + State::Created => { + // If the listener was just created, put it in the `Polling` state. + state.set(State::Polling(cx.waker().clone())); } - - #[inline] - fn listens_to(&self, event: &Event<$gen>) -> bool { - core::ptr::eq::>( - &*self.listener().event, - event.inner.load(core::sync::atomic::Ordering::Acquire), - ) + State::Polling(w) => { + // If the listener was in the `Polling` state, update the waker. + if w.will_wake(cx.waker()) { + state.set(State::Polling(w)); + } else { + state.set(State::Polling(cx.waker().clone())); + } } - - #[inline] - fn same_event(&self, other: &$ty) -> bool { - core::ptr::eq::>(&*self.listener().event, &*other.listener().event) + State::Waiting(_) => { + unreachable!("cannot poll and wait on `EventListener` at the same time") } } - impl<$gen> Future for $ty { - type Output = $gen; + Poll::Pending + } +} - #[inline] - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<$gen> { - self.listener_mut().poll_internal(cx) +impl Drop for EventListener { + fn drop(&mut self) { + // If this listener has never picked up a notification... + if let Some(entry) = self.entry.take() { + let mut list = self.inner.lock(); + + // But if a notification was delivered to it... + if let State::Notified(additional) = list.remove(entry, self.inner.cache_ptr()) { + // Then pass it on to another active listener. + if additional { + list.notify_additional(1); + } else { + list.notify(1); + } } } - }; -} - -/// A guard waiting for a notification from an [`Event`]. -/// -/// There are two ways for a listener to wait for a notification: -/// -/// 1. In an asynchronous manner using `.await`. -/// 2. In a blocking manner by calling [`EventListener::wait()`] on it. -/// -/// If a notified listener is dropped without receiving a notification, dropping will notify -/// another active listener. Whether one *additional* listener will be notified depends on what -/// kind of notification was delivered. -/// -/// See the [`Listener`] trait for the functionality exposed by this type. -/// -/// This structure allocates the listener on the heap. -pub struct EventListener { - listener: Pin>>>>, + } } -unsafe impl Send for EventListener {} -unsafe impl Sync for EventListener {} +/// A guard holding the linked list locked. +struct ListGuard<'a> { + /// A reference to [`Event`]'s inner state. + inner: &'a Inner, -impl core::panic::UnwindSafe for EventListener {} -impl core::panic::RefUnwindSafe for EventListener {} -impl Unpin for EventListener {} - -impl fmt::Debug for EventListener { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("EventListener").finish_non_exhaustive() - } + /// The actual guard that acquired the linked list. + guard: MutexGuard<'a, List>, } -impl EventListener { +impl Drop for ListGuard<'_> { #[inline] - fn listener(&self) -> &InnerListener>> { - &self.listener - } + fn drop(&mut self) { + let list = &mut **self; - #[inline] - fn listener_mut(&mut self) -> Pin<&mut InnerListener>>> { - self.listener.as_mut() + // Update the atomic `notified` counter. + let notified = if list.notified < list.len { + list.notified + } else { + usize::MAX + }; + self.inner.notified.store(notified, Ordering::Release); } } -forward_impl_to_listener! { T => EventListener } - -/// Create a stack-based event listener for an [`Event`]. -/// -/// [`EventListener`] allocates the listener on the heap. While this works for most use cases, in -/// practice this heap allocation can be expensive for repeated uses. This method allows for -/// allocating the listener on the stack instead. -/// -/// There are limitations to using this macro instead of the [`EventListener`] type, however. -/// Firstly, it is significantly less flexible. The listener is locked to the current stack -/// frame, meaning that it can't be returned or put into a place where it would go out of -/// scope. For instance, this will not work: -/// -/// ```compile_fail -/// use event_listener::{Event, Listener, listener}; -/// -/// fn get_listener(event: &Event) -> impl Listener { -/// listener!(event => cant_return_this); -/// cant_return_this -/// } -/// ``` -/// -/// In addition, the types involved in creating this listener are not able to be named. Therefore -/// it cannot be used in hand-rolled futures or similar structures. -/// -/// The type created by this macro implements [`Listener`], allowing it to be used in cases where -/// [`EventListener`] would normally be used. -/// -/// ## Example -/// -/// To use this macro, replace cases where you would normally use this... -/// -/// ```no_compile -/// let listener = event.listen(); -/// ``` -/// -/// ...with this: -/// -/// ```no_compile -/// listener!(event => listener); -/// ``` -/// -/// Here is the top level example from this crate's documentation, but using [`listener`] instead -/// of [`EventListener`]. -/// -/// ``` -/// use std::sync::atomic::{AtomicBool, Ordering}; -/// use std::sync::Arc; -/// use std::thread; -/// use std::time::Duration; -/// use std::usize; -/// use event_listener::{Event, listener, IntoNotification, Listener}; -/// -/// let flag = Arc::new(AtomicBool::new(false)); -/// let event = Arc::new(Event::new()); -/// -/// // Spawn a thread that will set the flag after 1 second. -/// thread::spawn({ -/// let flag = flag.clone(); -/// let event = event.clone(); -/// move || { -/// // Wait for a second. -/// thread::sleep(Duration::from_secs(1)); -/// -/// // Set the flag. -/// flag.store(true, Ordering::SeqCst); -/// -/// // Notify all listeners that the flag has been set. -/// event.notify(usize::MAX); -/// } -/// }); -/// -/// // Wait until the flag is set. -/// loop { -/// // Check the flag. -/// if flag.load(Ordering::SeqCst) { -/// break; -/// } -/// -/// // Start listening for events. -/// // NEW: Changed to a stack-based listener. -/// listener!(event => listener); -/// -/// // Check the flag again after creating the listener. -/// if flag.load(Ordering::SeqCst) { -/// break; -/// } -/// -/// // Wait for a notification and continue the loop. -/// listener.wait(); -/// } -/// ``` -#[macro_export] -macro_rules! listener { - ($event:expr => $listener:ident) => { - let mut $listener = $crate::__private::StackSlot::new(&$event); - // SAFETY: We shadow $listener so it can't be moved after. - let mut $listener = unsafe { $crate::__private::Pin::new_unchecked(&mut $listener) }; - #[allow(unused_mut)] - let mut $listener = $listener.listen(); - }; -} - -pin_project_lite::pin_project! { - #[project(!Unpin)] - #[project = ListenerProject] - struct InnerListener>> - where - B: Unpin, - { - // The reference to the original event. - event: B, - - // The inner state of the listener. - // - // This is only ever `None` during initialization. After `listen()` has completed, this - // should be `Some`. - #[pin] - listener: Option>, - } +impl Deref for ListGuard<'_> { + type Target = List; - impl>> PinnedDrop for InnerListener - where - B: Unpin, - { - fn drop(mut this: Pin<&mut Self>) { - // If we're being dropped, we need to remove ourself from the list. - let this = this.project(); - (*this.event).borrow().remove(this.listener, true); - } + #[inline] + fn deref(&self) -> &List { + &*self.guard } } -unsafe impl> + Unpin + Send> Send for InnerListener {} -unsafe impl> + Unpin + Sync> Sync for InnerListener {} - -impl> + Unpin> InnerListener { - /// Insert this listener into the linked list. +impl DerefMut for ListGuard<'_> { #[inline] - fn listen(self: Pin<&mut Self>) { - let this = self.project(); - (*this.event).borrow().insert(this.listener); - } - - /// Wait until the provided deadline. - #[cfg(all(feature = "std", not(target_family = "wasm")))] - fn wait_internal(mut self: Pin<&mut Self>, deadline: Option) -> Option { - fn parker_and_task() -> (Parker, Task) { - let parker = Parker::new(); - let unparker = parker.unparker(); - (parker, Task::Unparker(unparker)) - } - - std::thread_local! { - /// Cached thread-local parker/unparker pair. - static PARKER: (Parker, Task) = parker_and_task(); - } - - // Try to borrow the thread-local parker/unparker pair. - PARKER - .try_with({ - let this = self.as_mut(); - |(parker, unparker)| this.wait_with_parker(deadline, parker, unparker.as_task_ref()) - }) - .unwrap_or_else(|_| { - // If the pair isn't accessible, we may be being called in a destructor. - // Just create a new pair. - let (parker, unparker) = parking::pair(); - self.as_mut() - .wait_with_parker(deadline, &parker, TaskRef::Unparker(&unparker)) - }) - } - - /// Wait until the provided deadline using the specified parker/unparker pair. - #[cfg(all(feature = "std", not(target_family = "wasm")))] - fn wait_with_parker( - self: Pin<&mut Self>, - deadline: Option, - parker: &Parker, - unparker: TaskRef<'_>, - ) -> Option { - let mut this = self.project(); - let inner = (*this.event).borrow(); - - // Set the listener's state to `Task`. - if let Some(tag) = inner.register(this.listener.as_mut(), unparker).notified() { - // We were already notified, so we don't need to park. - return Some(tag); - } - - // Wait until a notification is received or the timeout is reached. - loop { - match deadline { - None => parker.park(), - - #[cfg(loom)] - Some(_deadline) => { - panic!("parking does not support timeouts under loom"); - } - - #[cfg(not(loom))] - Some(deadline) => { - // Make sure we're not timed out already. - let now = Instant::now(); - if now >= deadline { - // Remove our entry and check if we were notified. - return inner - .remove(this.listener.as_mut(), false) - .expect("We never removed ourself from the list") - .notified(); - } - parker.park_deadline(deadline); - } - } - - // See if we were notified. - if let Some(tag) = inner.register(this.listener.as_mut(), unparker).notified() { - return Some(tag); - } - } - } - - /// Drops this listener and discards its notification (if any) without notifying another - /// active listener. - fn discard(self: Pin<&mut Self>) -> bool { - let this = self.project(); - (*this.event) - .borrow() - .remove(this.listener, false) - .map_or(false, |state| state.is_notified()) - } - - /// Poll this listener for a notification. - fn poll_internal(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - let inner = (*this.event).borrow(); - - // Try to register the listener. - match inner - .register(this.listener, TaskRef::Waker(cx.waker())) - .notified() - { - Some(tag) => { - // We were already notified, so we don't need to park. - Poll::Ready(tag) - } - - None => { - // We're now waiting for a notification. - Poll::Pending - } - } + fn deref_mut(&mut self) -> &mut List { + &mut *self.guard } } /// The state of a listener. -#[derive(PartialEq)] -enum State { - /// The listener was just created. +enum State { + /// It has just been created. Created, - /// The listener has received a notification. + /// It has received a notification. /// /// The `bool` is `true` if this was an "additional" notification. - Notified { - /// Whether or not this is an "additional" notification. - additional: bool, - - /// The tag associated with the notification. - tag: T, - }, + Notified(bool), - /// A task is waiting for a notification. - Task(Task), + /// An async task is polling it. + Polling(Waker), - /// Empty hole used to replace a notified listener. - NotifiedTaken, -} - -impl fmt::Debug for State { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Created => f.write_str("Created"), - Self::Notified { additional, .. } => f - .debug_struct("Notified") - .field("additional", additional) - .finish(), - Self::Task(_) => f.write_str("Task(_)"), - Self::NotifiedTaken => f.write_str("NotifiedTaken"), - } - } + /// A thread is blocked on it. + Waiting(Thread), } -impl State { +impl State { + /// Returns `true` if this is the `Notified` state. + #[inline] fn is_notified(&self) -> bool { - matches!(self, Self::Notified { .. } | Self::NotifiedTaken) - } - - /// If this state was notified, return the tag associated with the notification. - #[allow(unused)] - fn notified(self) -> Option { match self { - Self::Notified { tag, .. } => Some(tag), - Self::NotifiedTaken => panic!("listener was already notified but taken"), - _ => None, + State::Notified(_) => true, + State::Created | State::Polling(_) | State::Waiting(_) => false, } } } -/// The result of registering a listener. -#[derive(Debug, PartialEq)] -enum RegisterResult { - /// The listener was already notified. - Notified(T), +/// An entry representing a registered listener. +struct Entry { + /// THe state of this listener. + state: Cell, - /// The listener has been registered. - Registered, + /// Previous entry in the linked list. + prev: Cell>>, - /// The listener was never inserted into the list. - NeverInserted, + /// Next entry in the linked list. + next: Cell>>, } -impl RegisterResult { - /// Whether or not the listener was notified. - /// - /// Panics if the listener was never inserted into the list. - fn notified(self) -> Option { - match self { - Self::Notified(tag) => Some(tag), - Self::Registered => None, - Self::NeverInserted => panic!("{}", NEVER_INSERTED_PANIC), - } - } -} +/// A linked list of entries. +struct List { + /// First entry in the list. + head: Option>, -/// A task that can be woken up. -#[derive(Debug, Clone)] -enum Task { - /// A waker that wakes up a future. - Waker(Waker), + /// Last entry in the list. + tail: Option>, - /// An unparker that wakes up a thread. - #[cfg(all(feature = "std", not(target_family = "wasm")))] - Unparker(Unparker), -} + /// The first unnotified entry in the list. + start: Option>, -impl Task { - fn as_task_ref(&self) -> TaskRef<'_> { - match self { - Self::Waker(waker) => TaskRef::Waker(waker), - #[cfg(all(feature = "std", not(target_family = "wasm")))] - Self::Unparker(unparker) => TaskRef::Unparker(unparker), - } - } + /// Total number of entries in the list. + len: usize, - fn wake(self) { - match self { - Self::Waker(waker) => waker.wake(), - #[cfg(all(feature = "std", not(target_family = "wasm")))] - Self::Unparker(unparker) => { - unparker.unpark(); - } - } - } -} + /// The number of notified entries in the list. + notified: usize, -impl PartialEq for Task { - fn eq(&self, other: &Self) -> bool { - self.as_task_ref().will_wake(other.as_task_ref()) - } + /// Whether the cached entry is used. + cache_used: bool, } -/// A reference to a task. -#[derive(Clone, Copy)] -enum TaskRef<'a> { - /// A waker that wakes up a future. - Waker(&'a Waker), +impl List { + /// Inserts a new entry into the list. + fn insert(&mut self, cache: NonNull) -> NonNull { + unsafe { + let entry = Entry { + state: Cell::new(State::Created), + prev: Cell::new(self.tail), + next: Cell::new(None), + }; - /// An unparker that wakes up a thread. - #[cfg(all(feature = "std", not(target_family = "wasm")))] - Unparker(&'a Unparker), -} + let entry = if self.cache_used { + // Allocate an entry that is going to become the new tail. + NonNull::new_unchecked(Box::into_raw(Box::new(entry))) + } else { + // No need to allocate - we can use the cached entry. + self.cache_used = true; + cache.as_ptr().write(entry); + cache + }; -impl TaskRef<'_> { - /// Tells if this task will wake up the other task. - #[allow(unreachable_patterns)] - fn will_wake(self, other: Self) -> bool { - match (self, other) { - (Self::Waker(a), Self::Waker(b)) => a.will_wake(b), - #[cfg(all(feature = "std", not(target_family = "wasm")))] - (Self::Unparker(_), Self::Unparker(_)) => { - // TODO: Use unreleased will_unpark API. - false + // Replace the tail with the new entry. + match mem::replace(&mut self.tail, Some(entry)) { + None => self.head = Some(entry), + Some(t) => t.as_ref().next.set(Some(entry)), } - _ => false, - } - } - - /// Converts this task reference to a task by cloning. - fn into_task(self) -> Task { - match self { - Self::Waker(waker) => Task::Waker(waker.clone()), - #[cfg(all(feature = "std", not(target_family = "wasm")))] - Self::Unparker(unparker) => Task::Unparker(unparker.clone()), - } - } -} - -const NEVER_INSERTED_PANIC: &str = "\ -EventListener was not inserted into the linked list, make sure you're not polling \ -EventListener/listener! after it has finished"; - -#[cfg(not(loom))] -/// Synchronization primitive implementation. -mod sync { - #[cfg(not(feature = "portable-atomic"))] - pub(super) use alloc::sync::Arc; - #[cfg(not(feature = "portable-atomic"))] - pub(super) use core::sync::atomic; - - #[cfg(feature = "portable-atomic")] - pub(super) use portable_atomic_crate as atomic; - #[cfg(feature = "portable-atomic")] - pub(super) use portable_atomic_util::Arc; - #[cfg(all(feature = "std", not(loom)))] - pub(super) use std::sync::{Mutex, MutexGuard}; - - pub(super) trait WithMut { - type Output; - - fn with_mut(&mut self, f: F) -> R - where - F: FnOnce(&mut Self::Output) -> R; - } + // If there were no unnotified entries, this one is the first now. + if self.start.is_none() { + self.start = self.tail; + } - impl WithMut for atomic::AtomicPtr { - type Output = *mut T; + // Bump the entry count. + self.len += 1; - #[inline] - fn with_mut(&mut self, f: F) -> R - where - F: FnOnce(&mut Self::Output) -> R, - { - f(self.get_mut()) + entry } } - pub(crate) mod cell { - pub(crate) use core::cell::Cell; + /// Removes an entry from the list and returns its state. + fn remove(&mut self, entry: NonNull, cache: NonNull) -> State { + unsafe { + let prev = entry.as_ref().prev.get(); + let next = entry.as_ref().next.get(); - /// This newtype around *mut T exists for interoperability with loom::cell::ConstPtr, - /// which works as a guard and performs additional logic to track access scope. - pub(crate) struct ConstPtr(*mut T); - impl ConstPtr { - pub(crate) unsafe fn deref(&self) -> &T { - &*self.0 + // Unlink from the previous entry. + match prev { + None => self.head = next, + Some(p) => p.as_ref().next.set(next), } - #[allow(unused)] // std code does not need this - pub(crate) unsafe fn deref_mut(&mut self) -> &mut T { - &mut *self.0 + // Unlink from the next entry. + match next { + None => self.tail = prev, + Some(n) => n.as_ref().prev.set(prev), } - } - /// This UnsafeCell wrapper exists for interoperability with loom::cell::UnsafeCell, and - /// only contains the interface that is needed for this crate. - #[derive(Debug, Default)] - pub(crate) struct UnsafeCell(core::cell::UnsafeCell); - - impl UnsafeCell { - pub(crate) fn new(data: T) -> UnsafeCell { - UnsafeCell(core::cell::UnsafeCell::new(data)) + // If this was the first unnotified entry, move the pointer to the next one. + if self.start == Some(entry) { + self.start = next; } - pub(crate) fn get(&self) -> ConstPtr { - ConstPtr(self.0.get()) - } + // Extract the state. + let state = if ptr::eq(entry.as_ptr(), cache.as_ptr()) { + // Free the cached entry. + self.cache_used = false; + entry.as_ref().state.replace(State::Created) + } else { + // Deallocate the entry. + Box::from_raw(entry.as_ptr()).state.into_inner() + }; - #[allow(dead_code)] // no_std does not need this - pub(crate) fn into_inner(self) -> T { - self.0.into_inner() + // Update the counters. + if state.is_notified() { + self.notified -= 1; } - } - } -} - -#[cfg(loom)] -/// Synchronization primitive implementation. -mod sync { - pub(super) use loom::cell; - pub(super) use loom::sync::{atomic, Arc, Mutex, MutexGuard}; -} - -fn __test_send_and_sync() { - fn _assert_send() {} - fn _assert_sync() {} - - _assert_send::>(); - _assert_sync::>(); - _assert_send::>(); - _assert_sync::>(); - _assert_send::>(); - _assert_sync::>(); - _assert_send::>(); - _assert_sync::>(); -} - -#[doc(hidden)] -mod __sealed { - use super::{EventListener, __private::StackListener}; + self.len -= 1; - pub trait Sealed {} - impl Sealed for EventListener {} - impl Sealed for StackListener<'_, '_, T> {} -} - -/// Semver exempt module. -#[doc(hidden)] -pub mod __private { - pub use core::pin::Pin; - - use super::{Event, Inner, InnerListener}; - use core::fmt; - use core::future::Future; - use core::task::{Context, Poll}; - - pin_project_lite::pin_project! { - /// Space on the stack where a stack-based listener can be allocated. - #[doc(hidden)] - #[project(!Unpin)] - pub struct StackSlot<'ev, T> { - #[pin] - listener: InnerListener> + state } } - impl fmt::Debug for StackSlot<'_, T> { - #[inline] - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("StackSlot").finish_non_exhaustive() - } - } + /// Notifies a number of entries. + #[cold] + fn notify(&mut self, mut n: usize) { + if n <= self.notified { + return; + } + n -= self.notified; + + while n > 0 { + n -= 1; + + // Notify the first unnotified entry. + match self.start { + None => break, + Some(e) => { + // Get the entry and move the pointer forward. + let e = unsafe { e.as_ref() }; + self.start = e.next.get(); + + // Set the state of this entry to `Notified` and notify. + match e.state.replace(State::Notified(false)) { + State::Notified(_) => {} + State::Created => {} + State::Polling(w) => w.wake(), + State::Waiting(t) => t.unpark(), + } - impl core::panic::UnwindSafe for StackSlot<'_, T> {} - impl core::panic::RefUnwindSafe for StackSlot<'_, T> {} - unsafe impl Send for StackSlot<'_, T> {} - unsafe impl Sync for StackSlot<'_, T> {} - - impl<'ev, T> StackSlot<'ev, T> { - /// Create a new `StackSlot` on the stack. - #[inline] - #[doc(hidden)] - pub fn new(event: &'ev Event) -> Self { - let inner = unsafe { &*event.inner() }; - Self { - listener: InnerListener { - event: inner, - listener: None, - }, + // Update the counter. + self.notified += 1; + } } } - - /// Start listening on this `StackSlot`. - #[inline] - #[doc(hidden)] - pub fn listen(mut self: Pin<&mut Self>) -> StackListener<'ev, '_, T> { - // Insert ourselves into the list. - self.as_mut().project().listener.listen(); - - // We are now listening. - StackListener { slot: self } - } - } - - /// A stack-based `EventListener`. - #[doc(hidden)] - pub struct StackListener<'ev, 'stack, T> { - slot: Pin<&'stack mut StackSlot<'ev, T>>, } - impl core::panic::UnwindSafe for StackListener<'_, '_, T> {} - impl core::panic::RefUnwindSafe for StackListener<'_, '_, T> {} - impl Unpin for StackListener<'_, '_, T> {} + /// Notifies a number of additional entries. + #[cold] + fn notify_additional(&mut self, mut n: usize) { + while n > 0 { + n -= 1; + + // Notify the first unnotified entry. + match self.start { + None => break, + Some(e) => { + // Get the entry and move the pointer forward. + let e = unsafe { e.as_ref() }; + self.start = e.next.get(); + + // Set the state of this entry to `Notified` and notify. + match e.state.replace(State::Notified(true)) { + State::Notified(_) => {} + State::Created => {} + State::Polling(w) => w.wake(), + State::Waiting(t) => t.unpark(), + } - impl fmt::Debug for StackListener<'_, '_, T> { - #[inline] - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("StackListener").finish_non_exhaustive() + // Update the counter. + self.notified += 1; + } + } } } +} - impl<'ev, T> StackListener<'ev, '_, T> { - #[inline] - fn listener(&self) -> &InnerListener> { - &self.slot.listener - } - - #[inline] - fn listener_mut(&mut self) -> Pin<&mut InnerListener>> { - self.slot.as_mut().project().listener - } +/// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster. +#[inline] +fn full_fence() { + if cfg!(all( + any(target_arch = "x86", target_arch = "x86_64"), + not(miri) + )) { + // HACK(stjepang): On x86 architectures there are two different ways of executing + // a `SeqCst` fence. + // + // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction. + // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg` instruction. + // + // Both instructions have the effect of a full barrier, but empirical benchmarks have shown + // that the second one is sometimes a bit faster. + // + // The ideal solution here would be to use inline assembly, but we're instead creating a + // temporary atomic variable and compare-and-exchanging its value. No sane compiler to + // x86 platforms is going to optimize this away. + atomic::compiler_fence(Ordering::SeqCst); + let a = AtomicUsize::new(0); + let _ = a.compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst); + atomic::compiler_fence(Ordering::SeqCst); + } else { + atomic::fence(Ordering::SeqCst); } - - forward_impl_to_listener! { T => StackListener<'_, '_, T> } } diff --git a/src/no_std.rs b/src/no_std.rs deleted file mode 100644 index a2e6dc0..0000000 --- a/src/no_std.rs +++ /dev/null @@ -1,1434 +0,0 @@ -//! Implementation of `event-listener` built exclusively on atomics. -//! -//! On `no_std`, we don't have access to `Mutex`, so we can't use intrusive linked lists like the `std` -//! implementation. Normally, we would use a concurrent atomic queue to store listeners, but benchmarks -//! show that using queues in this way is very slow, especially for the single threaded use-case. -//! -//! We've found that it's easier to assume that the `Event` won't be under high contention in most use -//! cases. Therefore, we use a spinlock that protects a linked list of listeners, and fall back to an -//! atomic queue if the lock is contended. Benchmarks show that this is about 20% slower than the std -//! implementation, but still much faster than using a queue. - -#[path = "no_std/node.rs"] -mod node; - -use node::{Node, NothingProducer, TaskWaiting}; - -use crate::notify::{GenericNotify, Internal, Notification}; -use crate::sync::atomic::{AtomicBool, Ordering}; -use crate::sync::cell::{Cell, ConstPtr, UnsafeCell}; -use crate::sync::Arc; -use crate::{RegisterResult, State, Task, TaskRef}; - -use core::fmt; -use core::marker::PhantomData; -use core::mem; -use core::num::NonZeroUsize; -use core::ops; -use core::pin::Pin; - -use alloc::vec::Vec; - -impl crate::Inner { - /// Locks the list. - fn try_lock(&self) -> Option> { - self.list.inner.try_lock().map(|guard| ListGuard { - inner: self, - guard: Some(guard), - tasks: alloc::vec![], - }) - } - - /// Force a queue update. - fn queue_update(&self) { - // Locking and unlocking the mutex will drain the queue if there is no contention. - drop(self.try_lock()); - } - - pub(crate) fn needs_notification(&self, _limit: usize) -> bool { - // TODO: Figure out a stable way to do this optimization. - true - } - - /// Add a new listener to the list. - /// - /// Does nothing if the list is already registered. - pub(crate) fn insert(&self, mut listener: Pin<&mut Option>>) { - if listener.as_ref().as_pin_ref().is_some() { - // Already inserted. - return; - } - - match self.try_lock() { - Some(mut lock) => { - let key = lock.insert(State::Created); - *listener = Some(Listener::HasNode(key)); - } - - None => { - // Push it to the queue. - let (node, task_waiting) = Node::listener(); - self.list.queue.push(node).unwrap(); - *listener = Some(Listener::Queued(task_waiting)); - - // Force a queue update. - self.queue_update(); - } - } - } - - /// Remove a listener from the list. - pub(crate) fn remove( - &self, - mut listener: Pin<&mut Option>>, - propagate: bool, - ) -> Option> { - loop { - let state = match listener.as_mut().take() { - Some(Listener::HasNode(key)) => { - match self.try_lock() { - Some(mut list) => { - // Fast path removal. - list.remove(key, propagate) - } - - None => { - // Slow path removal. - // This is why intrusive lists don't work on no_std. - let node = Node::RemoveListener { - listener: key, - propagate, - }; - - self.list.queue.push(node).unwrap(); - - // Force a queue update. - self.queue_update(); - - None - } - } - } - - Some(Listener::Queued(tw)) => { - // Make sure it's not added after the queue is drained. - if let Some(key) = tw.cancel() { - // If it was already added, set up our listener and try again. - *listener = Some(Listener::HasNode(key)); - continue; - } - - None - } - - None => None, - - _ => unreachable!(), - }; - - return state; - } - } - - /// Notifies a number of entries. - #[cold] - pub(crate) fn notify(&self, notify: impl Notification) -> usize { - match self.try_lock() { - Some(mut guard) => { - // Notify the listeners. - guard.notify(notify) - } - - None => { - // Push it to the queue. - let node = Node::Notify(GenericNotify::new( - notify.count(Internal::new()), - notify.is_additional(Internal::new()), - NothingProducer::default(), - )); - - self.list.queue.push(node).unwrap(); - - // Force a queue update. - self.queue_update(); - - // We haven't notified anyone yet. - 0 - } - } - } - - /// Register a task to be notified when the event is triggered. - /// - /// Returns `true` if the listener was already notified, and `false` otherwise. If the listener - /// isn't inserted, returns `None`. - pub(crate) fn register( - &self, - mut listener: Pin<&mut Option>>, - task: TaskRef<'_>, - ) -> RegisterResult { - loop { - match listener.as_mut().take() { - Some(Listener::HasNode(key)) => { - *listener = Some(Listener::HasNode(key)); - match self.try_lock() { - Some(mut guard) => { - // Fast path registration. - return guard.register(listener, task); - } - - None => { - // Wait for the lock. - let node = Node::Waiting(task.into_task()); - self.list.queue.push(node).unwrap(); - - // Force a queue update. - self.queue_update(); - - return RegisterResult::Registered; - } - } - } - - Some(Listener::Queued(task_waiting)) => { - // Force a queue update. - self.queue_update(); - - // Are we done yet? - match task_waiting.status() { - Some(key) => { - assert!(key.get() != usize::MAX); - - // We're inserted now, adjust state. - *listener = Some(Listener::HasNode(key)); - } - - None => { - // We're still queued, so register the task. - task_waiting.register(task.into_task()); - *listener = Some(Listener::Queued(task_waiting)); - - // Force a queue update. - self.queue_update(); - - return RegisterResult::Registered; - } - } - } - - None => return RegisterResult::NeverInserted, - - _ => unreachable!(), - } - } - } -} - -#[derive(Debug)] -pub(crate) struct List { - /// The inner list. - inner: Mutex>, - - /// The queue of pending operations. - queue: concurrent_queue::ConcurrentQueue>, -} - -impl List { - pub(super) fn new() -> List { - List { - inner: Mutex::new(ListenerSlab::new()), - queue: concurrent_queue::ConcurrentQueue::unbounded(), - } - } - - /// Try to get the total number of listeners without blocking. - pub(super) fn try_total_listeners(&self) -> Option { - self.inner.try_lock().map(|lock| lock.listeners.len()) - } -} - -/// The guard returned by [`Inner::lock`]. -pub(crate) struct ListGuard<'a, T> { - /// Reference to the inner state. - pub(crate) inner: &'a crate::Inner, - - /// The locked list. - pub(crate) guard: Option>>, - - /// Tasks to wake up once this guard is dropped. - tasks: Vec, -} - -impl ListGuard<'_, T> { - #[cold] - fn process_nodes_slow(&mut self, start_node: Node) { - let guard = self.guard.as_mut().unwrap(); - - // Process the start node. - self.tasks.extend(start_node.apply(guard)); - - // Process all remaining nodes. - while let Ok(node) = self.inner.list.queue.pop() { - self.tasks.extend(node.apply(guard)); - } - } - - #[inline] - fn process_nodes(&mut self) { - // Process every node left in the queue. - if let Ok(start_node) = self.inner.list.queue.pop() { - self.process_nodes_slow(start_node); - } - } -} - -impl ops::Deref for ListGuard<'_, T> { - type Target = ListenerSlab; - - fn deref(&self) -> &Self::Target { - self.guard.as_ref().unwrap() - } -} - -impl ops::DerefMut for ListGuard<'_, T> { - fn deref_mut(&mut self) -> &mut Self::Target { - self.guard.as_mut().unwrap() - } -} - -impl Drop for ListGuard<'_, T> { - fn drop(&mut self) { - while self.guard.is_some() { - // Process every node left in the queue. - self.process_nodes(); - - // Update the atomic `notified` counter. - let list = self.guard.take().unwrap(); - let notified = if list.notified < list.len { - list.notified - } else { - core::usize::MAX - }; - - self.inner.notified.store(notified, Ordering::Release); - - // Drop the actual lock. - drop(list); - - // Wakeup all tasks. - for task in self.tasks.drain(..) { - task.wake(); - } - - // There is a deadlock where a node is pushed to the end of the queue after we've finished - // process_nodes() but before we've finished dropping the lock. This can lead to some - // notifications not being properly delivered, or listeners not being added to the list. - // Therefore check before we finish dropping if there is anything left in the queue, and - // if so, lock it again and force a queue update. - if !self.inner.list.queue.is_empty() { - self.guard = self.inner.list.inner.try_lock(); - } - } - } -} - -/// An entry representing a registered listener. -enum Entry { - /// Contains the listener state. - Listener { - /// The state of the listener. - state: Cell>, - - /// The previous listener in the list. - prev: Cell>, - - /// The next listener in the list. - next: Cell>, - }, - - /// An empty slot that contains the index of the next empty slot. - Empty(NonZeroUsize), - - /// Sentinel value. - Sentinel, -} - -struct TakenState<'a, T> { - slot: &'a Cell>, - state: State, -} - -impl Drop for TakenState<'_, T> { - fn drop(&mut self) { - self.slot - .set(mem::replace(&mut self.state, State::NotifiedTaken)); - } -} - -impl fmt::Debug for TakenState<'_, T> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::Debug::fmt(&self.state, f) - } -} - -impl PartialEq for TakenState<'_, T> { - fn eq(&self, other: &Self) -> bool { - self.state == other.state - } -} - -impl<'a, T> TakenState<'a, T> { - fn new(slot: &'a Cell>) -> Self { - let state = slot.replace(State::NotifiedTaken); - Self { slot, state } - } -} - -impl fmt::Debug for Entry { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Entry::Listener { state, next, prev } => f - .debug_struct("Listener") - .field("state", &TakenState::new(state)) - .field("prev", prev) - .field("next", next) - .finish(), - Entry::Empty(next) => f.debug_tuple("Empty").field(next).finish(), - Entry::Sentinel => f.debug_tuple("Sentinel").finish(), - } - } -} - -impl PartialEq for Entry { - fn eq(&self, other: &Entry) -> bool { - match (self, other) { - ( - Self::Listener { - state: state1, - prev: prev1, - next: next1, - }, - Self::Listener { - state: state2, - prev: prev2, - next: next2, - }, - ) => { - if TakenState::new(state1) != TakenState::new(state2) { - return false; - } - - prev1.get() == prev2.get() && next1.get() == next2.get() - } - (Self::Empty(next1), Self::Empty(next2)) => next1 == next2, - (Self::Sentinel, Self::Sentinel) => true, - _ => false, - } - } -} - -impl Entry { - fn state(&self) -> &Cell> { - match self { - Entry::Listener { state, .. } => state, - _ => unreachable!(), - } - } - - fn prev(&self) -> &Cell> { - match self { - Entry::Listener { prev, .. } => prev, - _ => unreachable!(), - } - } - - fn next(&self) -> &Cell> { - match self { - Entry::Listener { next, .. } => next, - _ => unreachable!(), - } - } -} - -/// A linked list of entries. -pub(crate) struct ListenerSlab { - /// The raw list of entries. - listeners: Vec>, - - /// First entry in the list. - head: Option, - - /// Last entry in the list. - tail: Option, - - /// The first unnotified entry in the list. - start: Option, - - /// The number of notified entries in the list. - notified: usize, - - /// The total number of listeners. - len: usize, - - /// The index of the first `Empty` entry, or the length of the list plus one if there - /// are no empty entries. - first_empty: NonZeroUsize, -} - -impl fmt::Debug for ListenerSlab { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ListenerSlab") - .field("listeners", &self.listeners) - .field("head", &self.head) - .field("tail", &self.tail) - .field("start", &self.start) - .field("notified", &self.notified) - .field("len", &self.len) - .field("first_empty", &self.first_empty) - .finish() - } -} - -impl ListenerSlab { - /// Create a new, empty list. - pub(crate) fn new() -> Self { - Self { - listeners: alloc::vec![Entry::Sentinel], - head: None, - tail: None, - start: None, - notified: 0, - len: 0, - first_empty: unsafe { NonZeroUsize::new_unchecked(1) }, - } - } - - /// Inserts a new entry into the list. - pub(crate) fn insert(&mut self, state: State) -> NonZeroUsize { - // Add the new entry into the list. - let key = { - let entry = Entry::Listener { - state: Cell::new(state), - prev: Cell::new(self.tail), - next: Cell::new(None), - }; - - let key = self.first_empty; - if self.first_empty.get() == self.listeners.len() { - // No empty entries, so add a new entry. - self.listeners.push(entry); - - // SAFETY: Guaranteed to not overflow, since the Vec would have panicked already. - self.first_empty = unsafe { NonZeroUsize::new_unchecked(self.listeners.len()) }; - } else { - // There is an empty entry, so replace it. - let slot = &mut self.listeners[key.get()]; - let next = match mem::replace(slot, entry) { - Entry::Empty(next) => next, - _ => unreachable!(), - }; - - self.first_empty = next; - } - - key - }; - - // Replace the tail with the new entry. - match mem::replace(&mut self.tail, Some(key)) { - None => self.head = Some(key), - Some(tail) => { - let tail = &self.listeners[tail.get()]; - tail.next().set(Some(key)); - } - } - - // If there are no listeners that have been notified, then the new listener is the next - // listener to be notified. - if self.start.is_none() { - self.start = Some(key); - } - - // Increment the length. - self.len += 1; - - key - } - - /// Removes an entry from the list and returns its state. - pub(crate) fn remove(&mut self, key: NonZeroUsize, propagate: bool) -> Option> { - let entry = &self.listeners[key.get()]; - let prev = entry.prev().get(); - let next = entry.next().get(); - - // Unlink from the previous entry. - match prev { - None => self.head = next, - Some(p) => self.listeners[p.get()].next().set(next), - } - - // Unlink from the next entry. - match next { - None => self.tail = prev, - Some(n) => self.listeners[n.get()].prev().set(prev), - } - - // If this was the first unnotified entry, move the pointer to the next one. - if self.start == Some(key) { - self.start = next; - } - - // Extract the state. - let entry = mem::replace( - &mut self.listeners[key.get()], - Entry::Empty(self.first_empty), - ); - self.first_empty = key; - - let mut state = match entry { - Entry::Listener { state, .. } => state.into_inner(), - _ => unreachable!(), - }; - - // Update the counters. - if state.is_notified() { - self.notified = self.notified.saturating_sub(1); - - if propagate { - // Propagate the notification to the next entry. - let state = mem::replace(&mut state, State::NotifiedTaken); - if let State::Notified { tag, additional } = state { - let tags = { - let mut tag = Some(tag); - move || tag.take().expect("called more than once") - }; - - self.notify(GenericNotify::new(1, additional, tags)); - } - } - } - self.len -= 1; - - Some(state) - } - - /// Notifies a number of listeners. - #[cold] - pub(crate) fn notify(&mut self, mut notify: impl Notification) -> usize { - let mut n = notify.count(Internal::new()); - let is_additional = notify.is_additional(Internal::new()); - if !is_additional { - // Make sure we're not notifying more than we have. - if n <= self.notified { - return 0; - } - n -= self.notified; - } - - let original_count = n; - while n > 0 { - n -= 1; - - // Notify the next entry. - match self.start { - None => return original_count - n - 1, - - Some(e) => { - // Get the entry and move the pointer forwards. - let entry = &self.listeners[e.get()]; - self.start = entry.next().get(); - - // Set the state to `Notified` and notify. - let tag = notify.next_tag(Internal::new()); - if let State::Task(task) = entry.state().replace(State::Notified { - tag, - additional: is_additional, - }) { - task.wake(); - } - - // Bump the notified count. - self.notified += 1; - } - } - } - - original_count - n - } - - /// Register a task to be notified when the event is triggered. - /// - /// Returns `true` if the listener was already notified, and `false` otherwise. If the listener - /// isn't inserted, returns `None`. - pub(crate) fn register( - &mut self, - mut listener: Pin<&mut Option>>, - task: TaskRef<'_>, - ) -> RegisterResult { - let key = match *listener { - Some(Listener::HasNode(key)) => key, - _ => return RegisterResult::NeverInserted, - }; - - let entry = &self.listeners[key.get()]; - - // Take the state out and check it. - match entry.state().replace(State::NotifiedTaken) { - State::Notified { tag, .. } => { - // The listener was already notified, so we don't need to do anything. - self.remove(key, false); - *listener = None; - RegisterResult::Notified(tag) - } - - State::Task(other_task) => { - // Only replace the task if it's not the same as the one we're registering. - if task.will_wake(other_task.as_task_ref()) { - entry.state().set(State::Task(other_task)); - } else { - entry.state().set(State::Task(task.into_task())); - } - - RegisterResult::Registered - } - - _ => { - // Register the task. - entry.state().set(State::Task(task.into_task())); - RegisterResult::Registered - } - } - } -} - -pub(crate) enum Listener { - /// The listener has a node inside of the linked list. - HasNode(NonZeroUsize), - - /// The listener has an entry in the queue that may or may not have a task waiting. - Queued(Arc), - - /// Eat the generic type for consistency. - _EatGenericType(PhantomData), -} - -impl fmt::Debug for Listener { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::HasNode(key) => f.debug_tuple("HasNode").field(key).finish(), - Self::Queued(tw) => f.debug_tuple("Queued").field(tw).finish(), - Self::_EatGenericType(_) => unreachable!(), - } - } -} - -impl Unpin for Listener {} - -impl PartialEq for Listener { - fn eq(&self, other: &Self) -> bool { - match (self, other) { - (Self::HasNode(a), Self::HasNode(b)) => a == b, - (Self::Queued(a), Self::Queued(b)) => Arc::ptr_eq(a, b), - _ => false, - } - } -} - -/// A simple mutex type that optimistically assumes that the lock is uncontended. -pub(crate) struct Mutex { - /// The inner value. - value: UnsafeCell, - - /// Whether the mutex is locked. - locked: AtomicBool, -} - -impl fmt::Debug for Mutex { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - if let Some(lock) = self.try_lock() { - f.debug_tuple("Mutex").field(&*lock).finish() - } else { - f.write_str("Mutex { }") - } - } -} - -impl Mutex { - /// Create a new mutex. - pub(crate) fn new(value: T) -> Self { - Self { - value: UnsafeCell::new(value), - locked: AtomicBool::new(false), - } - } - - /// Lock the mutex. - pub(crate) fn try_lock(&self) -> Option> { - // Try to lock the mutex. - if self - .locked - .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) - .is_ok() - { - // We have successfully locked the mutex. - Some(MutexGuard { - mutex: self, - guard: self.value.get(), - }) - } else { - self.try_lock_slow() - } - } - - #[cold] - fn try_lock_slow(&self) -> Option> { - // Assume that the contention is short-term. - // Spin for a while to see if the mutex becomes unlocked. - let mut spins = 100u32; - - loop { - if self - .locked - .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed) - .is_ok() - { - // We have successfully locked the mutex. - return Some(MutexGuard { - mutex: self, - guard: self.value.get(), - }); - } - - // Use atomic loads instead of compare-exchange. - while self.locked.load(Ordering::Relaxed) { - // Return None once we've exhausted the number of spins. - spins = spins.checked_sub(1)?; - } - } - } -} - -pub(crate) struct MutexGuard<'a, T> { - mutex: &'a Mutex, - guard: ConstPtr, -} - -impl<'a, T> Drop for MutexGuard<'a, T> { - fn drop(&mut self) { - self.mutex.locked.store(false, Ordering::Release); - } -} - -impl<'a, T> ops::Deref for MutexGuard<'a, T> { - type Target = T; - - fn deref(&self) -> &T { - unsafe { self.guard.deref() } - } -} - -impl<'a, T> ops::DerefMut for MutexGuard<'a, T> { - fn deref_mut(&mut self) -> &mut T { - unsafe { self.guard.deref_mut() } - } -} - -unsafe impl Send for Mutex {} -unsafe impl Sync for Mutex {} - -#[cfg(test)] -mod tests { - use super::*; - - #[cfg(target_family = "wasm")] - use wasm_bindgen_test::wasm_bindgen_test as test; - - #[test] - fn smoke_mutex() { - let mutex = Mutex::new(0); - - { - let mut guard = mutex.try_lock().unwrap(); - *guard += 1; - } - - { - let mut guard = mutex.try_lock().unwrap(); - *guard += 1; - } - - let guard = mutex.try_lock().unwrap(); - assert_eq!(*guard, 2); - } - - #[test] - fn smoke_listener_slab() { - let mut listeners = ListenerSlab::<()>::new(); - - // Insert a few listeners. - let key1 = listeners.insert(State::Created); - let key2 = listeners.insert(State::Created); - let key3 = listeners.insert(State::Created); - - assert_eq!(listeners.len, 3); - assert_eq!(listeners.notified, 0); - assert_eq!(listeners.tail, Some(key3)); - assert_eq!(listeners.head, Some(key1)); - assert_eq!(listeners.start, Some(key1)); - assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap()); - assert_eq!(listeners.listeners[0], Entry::Sentinel); - assert_eq!( - listeners.listeners[1], - Entry::Listener { - state: Cell::new(State::Created), - prev: Cell::new(None), - next: Cell::new(Some(key2)), - } - ); - assert_eq!( - listeners.listeners[2], - Entry::Listener { - state: Cell::new(State::Created), - prev: Cell::new(Some(key1)), - next: Cell::new(Some(key3)), - } - ); - assert_eq!( - listeners.listeners[3], - Entry::Listener { - state: Cell::new(State::Created), - prev: Cell::new(Some(key2)), - next: Cell::new(None), - } - ); - - // Remove one. - assert_eq!(listeners.remove(key2, false), Some(State::Created)); - - assert_eq!(listeners.len, 2); - assert_eq!(listeners.notified, 0); - assert_eq!(listeners.tail, Some(key3)); - assert_eq!(listeners.head, Some(key1)); - assert_eq!(listeners.start, Some(key1)); - assert_eq!(listeners.first_empty, NonZeroUsize::new(2).unwrap()); - assert_eq!(listeners.listeners[0], Entry::Sentinel); - assert_eq!( - listeners.listeners[1], - Entry::Listener { - state: Cell::new(State::Created), - prev: Cell::new(None), - next: Cell::new(Some(key3)), - } - ); - assert_eq!( - listeners.listeners[2], - Entry::Empty(NonZeroUsize::new(4).unwrap()) - ); - assert_eq!( - listeners.listeners[3], - Entry::Listener { - state: Cell::new(State::Created), - prev: Cell::new(Some(key1)), - next: Cell::new(None), - } - ); - } - - #[test] - fn listener_slab_notify() { - let mut listeners = ListenerSlab::new(); - - // Insert a few listeners. - let key1 = listeners.insert(State::Created); - let key2 = listeners.insert(State::Created); - let key3 = listeners.insert(State::Created); - - // Notify one. - listeners.notify(GenericNotify::new(1, true, || ())); - - assert_eq!(listeners.len, 3); - assert_eq!(listeners.notified, 1); - assert_eq!(listeners.tail, Some(key3)); - assert_eq!(listeners.head, Some(key1)); - assert_eq!(listeners.start, Some(key2)); - assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap()); - assert_eq!(listeners.listeners[0], Entry::Sentinel); - assert_eq!( - listeners.listeners[1], - Entry::Listener { - state: Cell::new(State::Notified { - additional: true, - tag: () - }), - prev: Cell::new(None), - next: Cell::new(Some(key2)), - } - ); - assert_eq!( - listeners.listeners[2], - Entry::Listener { - state: Cell::new(State::Created), - prev: Cell::new(Some(key1)), - next: Cell::new(Some(key3)), - } - ); - assert_eq!( - listeners.listeners[3], - Entry::Listener { - state: Cell::new(State::Created), - prev: Cell::new(Some(key2)), - next: Cell::new(None), - } - ); - - // Remove the notified listener. - assert_eq!( - listeners.remove(key1, false), - Some(State::Notified { - additional: true, - tag: () - }) - ); - - assert_eq!(listeners.len, 2); - assert_eq!(listeners.notified, 0); - assert_eq!(listeners.tail, Some(key3)); - assert_eq!(listeners.head, Some(key2)); - assert_eq!(listeners.start, Some(key2)); - assert_eq!(listeners.first_empty, NonZeroUsize::new(1).unwrap()); - assert_eq!(listeners.listeners[0], Entry::Sentinel); - assert_eq!( - listeners.listeners[1], - Entry::Empty(NonZeroUsize::new(4).unwrap()) - ); - assert_eq!( - listeners.listeners[2], - Entry::Listener { - state: Cell::new(State::Created), - prev: Cell::new(None), - next: Cell::new(Some(key3)), - } - ); - assert_eq!( - listeners.listeners[3], - Entry::Listener { - state: Cell::new(State::Created), - prev: Cell::new(Some(key2)), - next: Cell::new(None), - } - ); - } - - #[test] - fn listener_slab_register() { - let woken = Arc::new(AtomicBool::new(false)); - let waker = waker_fn::waker_fn({ - let woken = woken.clone(); - move || woken.store(true, Ordering::SeqCst) - }); - - let mut listeners = ListenerSlab::new(); - - // Insert a few listeners. - let key1 = listeners.insert(State::Created); - let key2 = listeners.insert(State::Created); - let key3 = listeners.insert(State::Created); - - // Register one. - assert_eq!( - listeners.register( - Pin::new(&mut Some(Listener::HasNode(key2))), - TaskRef::Waker(&waker) - ), - RegisterResult::Registered - ); - - assert_eq!(listeners.len, 3); - assert_eq!(listeners.notified, 0); - assert_eq!(listeners.tail, Some(key3)); - assert_eq!(listeners.head, Some(key1)); - assert_eq!(listeners.start, Some(key1)); - assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap()); - assert_eq!(listeners.listeners[0], Entry::Sentinel); - assert_eq!( - listeners.listeners[1], - Entry::Listener { - state: Cell::new(State::Created), - prev: Cell::new(None), - next: Cell::new(Some(key2)), - } - ); - assert_eq!( - listeners.listeners[2], - Entry::Listener { - state: Cell::new(State::Task(Task::Waker(waker.clone()))), - prev: Cell::new(Some(key1)), - next: Cell::new(Some(key3)), - } - ); - assert_eq!( - listeners.listeners[3], - Entry::Listener { - state: Cell::new(State::Created), - prev: Cell::new(Some(key2)), - next: Cell::new(None), - } - ); - - // Notify the listener. - listeners.notify(GenericNotify::new(2, false, || ())); - - assert_eq!(listeners.len, 3); - assert_eq!(listeners.notified, 2); - assert_eq!(listeners.tail, Some(key3)); - assert_eq!(listeners.head, Some(key1)); - assert_eq!(listeners.start, Some(key3)); - assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap()); - assert_eq!(listeners.listeners[0], Entry::Sentinel); - assert_eq!( - listeners.listeners[1], - Entry::Listener { - state: Cell::new(State::Notified { - additional: false, - tag: (), - }), - prev: Cell::new(None), - next: Cell::new(Some(key2)), - } - ); - assert_eq!( - listeners.listeners[2], - Entry::Listener { - state: Cell::new(State::Notified { - additional: false, - tag: (), - }), - prev: Cell::new(Some(key1)), - next: Cell::new(Some(key3)), - } - ); - assert_eq!( - listeners.listeners[3], - Entry::Listener { - state: Cell::new(State::Created), - prev: Cell::new(Some(key2)), - next: Cell::new(None), - } - ); - - assert!(woken.load(Ordering::SeqCst)); - assert_eq!( - listeners.register( - Pin::new(&mut Some(Listener::HasNode(key2))), - TaskRef::Waker(&waker) - ), - RegisterResult::Notified(()) - ); - } - - #[test] - fn listener_slab_notify_prop() { - let woken = Arc::new(AtomicBool::new(false)); - let waker = waker_fn::waker_fn({ - let woken = woken.clone(); - move || woken.store(true, Ordering::SeqCst) - }); - - let mut listeners = ListenerSlab::new(); - - // Insert a few listeners. - let key1 = listeners.insert(State::Created); - let key2 = listeners.insert(State::Created); - let key3 = listeners.insert(State::Created); - - // Register one. - assert_eq!( - listeners.register( - Pin::new(&mut Some(Listener::HasNode(key2))), - TaskRef::Waker(&waker) - ), - RegisterResult::Registered - ); - - assert_eq!(listeners.len, 3); - assert_eq!(listeners.notified, 0); - assert_eq!(listeners.tail, Some(key3)); - assert_eq!(listeners.head, Some(key1)); - assert_eq!(listeners.start, Some(key1)); - assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap()); - assert_eq!(listeners.listeners[0], Entry::Sentinel); - assert_eq!( - listeners.listeners[1], - Entry::Listener { - state: Cell::new(State::Created), - prev: Cell::new(None), - next: Cell::new(Some(key2)), - } - ); - assert_eq!( - listeners.listeners[2], - Entry::Listener { - state: Cell::new(State::Task(Task::Waker(waker.clone()))), - prev: Cell::new(Some(key1)), - next: Cell::new(Some(key3)), - } - ); - assert_eq!( - listeners.listeners[3], - Entry::Listener { - state: Cell::new(State::Created), - prev: Cell::new(Some(key2)), - next: Cell::new(None), - } - ); - - // Notify the first listener. - listeners.notify(GenericNotify::new(1, false, || ())); - - assert_eq!(listeners.len, 3); - assert_eq!(listeners.notified, 1); - assert_eq!(listeners.tail, Some(key3)); - assert_eq!(listeners.head, Some(key1)); - assert_eq!(listeners.start, Some(key2)); - assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap()); - assert_eq!(listeners.listeners[0], Entry::Sentinel); - assert_eq!( - listeners.listeners[1], - Entry::Listener { - state: Cell::new(State::Notified { - additional: false, - tag: (), - }), - prev: Cell::new(None), - next: Cell::new(Some(key2)), - } - ); - assert_eq!( - listeners.listeners[2], - Entry::Listener { - state: Cell::new(State::Task(Task::Waker(waker.clone()))), - prev: Cell::new(Some(key1)), - next: Cell::new(Some(key3)), - } - ); - assert_eq!( - listeners.listeners[3], - Entry::Listener { - state: Cell::new(State::Created), - prev: Cell::new(Some(key2)), - next: Cell::new(None), - } - ); - - // Calling notify again should not change anything. - listeners.notify(GenericNotify::new(1, false, || ())); - - assert_eq!(listeners.len, 3); - assert_eq!(listeners.notified, 1); - assert_eq!(listeners.tail, Some(key3)); - assert_eq!(listeners.head, Some(key1)); - assert_eq!(listeners.start, Some(key2)); - assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap()); - assert_eq!(listeners.listeners[0], Entry::Sentinel); - assert_eq!( - listeners.listeners[1], - Entry::Listener { - state: Cell::new(State::Notified { - additional: false, - tag: (), - }), - prev: Cell::new(None), - next: Cell::new(Some(key2)), - } - ); - assert_eq!( - listeners.listeners[2], - Entry::Listener { - state: Cell::new(State::Task(Task::Waker(waker.clone()))), - prev: Cell::new(Some(key1)), - next: Cell::new(Some(key3)), - } - ); - assert_eq!( - listeners.listeners[3], - Entry::Listener { - state: Cell::new(State::Created), - prev: Cell::new(Some(key2)), - next: Cell::new(None), - } - ); - - // Remove the first listener. - assert_eq!( - listeners.remove(key1, false), - Some(State::Notified { - additional: false, - tag: () - }) - ); - - assert_eq!(listeners.len, 2); - assert_eq!(listeners.notified, 0); - assert_eq!(listeners.tail, Some(key3)); - assert_eq!(listeners.head, Some(key2)); - assert_eq!(listeners.start, Some(key2)); - assert_eq!(listeners.first_empty, NonZeroUsize::new(1).unwrap()); - assert_eq!(listeners.listeners[0], Entry::Sentinel); - assert_eq!( - listeners.listeners[1], - Entry::Empty(NonZeroUsize::new(4).unwrap()) - ); - assert_eq!(*listeners.listeners[2].prev(), Cell::new(None)); - assert_eq!(*listeners.listeners[2].next(), Cell::new(Some(key3))); - assert_eq!( - listeners.listeners[3], - Entry::Listener { - state: Cell::new(State::Created), - prev: Cell::new(Some(key2)), - next: Cell::new(None), - } - ); - - // Notify the second listener. - listeners.notify(GenericNotify::new(1, false, || ())); - assert!(woken.load(Ordering::SeqCst)); - - assert_eq!(listeners.len, 2); - assert_eq!(listeners.notified, 1); - assert_eq!(listeners.tail, Some(key3)); - assert_eq!(listeners.head, Some(key2)); - assert_eq!(listeners.start, Some(key3)); - assert_eq!(listeners.first_empty, NonZeroUsize::new(1).unwrap()); - assert_eq!(listeners.listeners[0], Entry::Sentinel); - assert_eq!( - listeners.listeners[1], - Entry::Empty(NonZeroUsize::new(4).unwrap()) - ); - assert_eq!( - listeners.listeners[2], - Entry::Listener { - state: Cell::new(State::Notified { - additional: false, - tag: (), - }), - prev: Cell::new(None), - next: Cell::new(Some(key3)), - } - ); - assert_eq!( - listeners.listeners[3], - Entry::Listener { - state: Cell::new(State::Created), - prev: Cell::new(Some(key2)), - next: Cell::new(None), - } - ); - - // Remove and propagate the second listener. - assert_eq!(listeners.remove(key2, true), Some(State::NotifiedTaken)); - - // The third listener should be notified. - assert_eq!(listeners.len, 1); - assert_eq!(listeners.notified, 1); - assert_eq!(listeners.tail, Some(key3)); - assert_eq!(listeners.head, Some(key3)); - assert_eq!(listeners.start, None); - assert_eq!(listeners.first_empty, NonZeroUsize::new(2).unwrap()); - assert_eq!(listeners.listeners[0], Entry::Sentinel); - assert_eq!( - listeners.listeners[1], - Entry::Empty(NonZeroUsize::new(4).unwrap()) - ); - assert_eq!( - listeners.listeners[2], - Entry::Empty(NonZeroUsize::new(1).unwrap()) - ); - assert_eq!( - listeners.listeners[3], - Entry::Listener { - state: Cell::new(State::Notified { - additional: false, - tag: (), - }), - prev: Cell::new(None), - next: Cell::new(None), - } - ); - - // Remove the third listener. - assert_eq!( - listeners.remove(key3, false), - Some(State::Notified { - additional: false, - tag: () - }) - ); - } - - #[test] - fn uncontended_inner() { - let inner = crate::Inner::new(); - - // Register two listeners. - let (mut listener1, mut listener2, mut listener3) = (None, None, None); - inner.insert(Pin::new(&mut listener1)); - inner.insert(Pin::new(&mut listener2)); - inner.insert(Pin::new(&mut listener3)); - - assert_eq!( - listener1, - Some(Listener::HasNode(NonZeroUsize::new(1).unwrap())) - ); - assert_eq!( - listener2, - Some(Listener::HasNode(NonZeroUsize::new(2).unwrap())) - ); - - // Register a waker in the second listener. - let woken = Arc::new(AtomicBool::new(false)); - let waker = waker_fn::waker_fn({ - let woken = woken.clone(); - move || woken.store(true, Ordering::SeqCst) - }); - assert_eq!( - inner.register(Pin::new(&mut listener2), TaskRef::Waker(&waker)), - RegisterResult::Registered - ); - - // Notify the first listener. - inner.notify(GenericNotify::new(1, false, || ())); - assert!(!woken.load(Ordering::SeqCst)); - - // Another notify should do nothing. - inner.notify(GenericNotify::new(1, false, || ())); - assert!(!woken.load(Ordering::SeqCst)); - - // Receive the notification. - assert_eq!( - inner.register(Pin::new(&mut listener1), TaskRef::Waker(&waker)), - RegisterResult::Notified(()) - ); - - // First listener is already removed. - assert!(listener1.is_none()); - - // Notify the second listener. - inner.notify(GenericNotify::new(1, false, || ())); - assert!(woken.load(Ordering::SeqCst)); - - // Remove the second listener and propagate the notification. - assert_eq!( - inner.remove(Pin::new(&mut listener2), true), - Some(State::NotifiedTaken) - ); - - // Second listener is already removed. - assert!(listener2.is_none()); - - // Third listener should be notified. - assert_eq!( - inner.register(Pin::new(&mut listener3), TaskRef::Waker(&waker)), - RegisterResult::Notified(()) - ); - } -} diff --git a/src/no_std/node.rs b/src/no_std/node.rs deleted file mode 100644 index 8901eb2..0000000 --- a/src/no_std/node.rs +++ /dev/null @@ -1,249 +0,0 @@ -//! An operation that can be delayed. - -//! The node that makes up queues. - -use crate::notify::{GenericNotify, Internal, NotificationPrivate, TagProducer}; -use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; -use crate::sync::Arc; -use crate::sys::ListenerSlab; -use crate::{State, Task}; - -use alloc::boxed::Box; - -use core::fmt; -use core::marker::PhantomData; -use core::mem; -use core::num::NonZeroUsize; -use core::ptr; - -pub(crate) struct NothingProducer(PhantomData); - -impl Default for NothingProducer { - fn default() -> Self { - Self(PhantomData) - } -} - -impl fmt::Debug for NothingProducer { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("NothingProducer").finish() - } -} - -impl TagProducer for NothingProducer { - type Tag = T; - - fn next_tag(&mut self) -> Self::Tag { - // This has to be a zero-sized type with no drop handler. - assert_eq!(mem::size_of::(), 0); - assert!(!mem::needs_drop::()); - - // SAFETY: As this is a ZST without a drop handler, zero is valid. - unsafe { mem::zeroed() } - } -} - -/// A node in the backup queue. -pub(crate) enum Node { - /// This node is requesting to add a listener. - // For some reason, the MSRV build says this variant is never constructed. - #[allow(dead_code)] - AddListener { - /// The state of the listener that wants to be added. - task_waiting: Arc, - }, - - /// This node is notifying a listener. - Notify(GenericNotify>), - - /// This node is removing a listener. - RemoveListener { - /// The ID of the listener to remove. - listener: NonZeroUsize, - - /// Whether to propagate notifications to the next listener. - propagate: bool, - }, - - /// We are waiting for the mutex to lock, so they can manipulate it. - Waiting(Task), -} - -impl fmt::Debug for Node { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::AddListener { .. } => f.write_str("AddListener"), - Self::Notify(notify) => f - .debug_struct("Notify") - .field("count", ¬ify.count(Internal::new())) - .field("is_additional", ¬ify.is_additional(Internal::new())) - .finish(), - Self::RemoveListener { - listener, - propagate, - } => f - .debug_struct("RemoveListener") - .field("listener", listener) - .field("propagate", propagate) - .finish(), - Self::Waiting(_) => f.write_str("Waiting"), - } - } -} - -#[derive(Debug)] -pub(crate) struct TaskWaiting { - /// The task that is being waited on. - task: AtomicCell, - - /// The ID of the new entry. - /// - /// This is set to zero when the task is still queued, or usize::MAX when the node should not - /// be added at all. - entry_id: AtomicUsize, -} - -impl Node { - pub(crate) fn listener() -> (Self, Arc) { - // Create a new `TaskWaiting` structure. - let task_waiting = Arc::new(TaskWaiting { - task: AtomicCell::new(), - entry_id: AtomicUsize::new(0), - }); - - ( - Self::AddListener { - task_waiting: task_waiting.clone(), - }, - task_waiting, - ) - } - - /// Apply the node to the list. - pub(super) fn apply(self, list: &mut ListenerSlab) -> Option { - match self { - Node::AddListener { task_waiting } => { - // If we're cancelled, do nothing. - if task_waiting.entry_id.load(Ordering::Relaxed) == usize::MAX { - return task_waiting.task.take().map(|t| *t); - } - - // Add a new entry to the list. - let key = list.insert(State::Created); - assert!(key.get() != usize::MAX); - - // Send the new key to the listener and wake it if necessary. - let old_value = task_waiting.entry_id.swap(key.get(), Ordering::Release); - - // If we're cancelled, remove ourselves from the list. - if old_value == usize::MAX { - list.remove(key, false); - } - - return task_waiting.task.take().map(|t| *t); - } - Node::Notify(notify) => { - // Notify the next `count` listeners. - list.notify(notify); - } - Node::RemoveListener { - listener, - propagate, - } => { - // Remove the listener from the list. - list.remove(listener, propagate); - } - Node::Waiting(task) => { - return Some(task); - } - } - - None - } -} - -impl TaskWaiting { - /// Determine if we are still queued. - /// - /// Returns `Some` with the entry ID if we are no longer queued. - pub(crate) fn status(&self) -> Option { - NonZeroUsize::new(self.entry_id.load(Ordering::Acquire)) - } - - /// Register a listener. - pub(crate) fn register(&self, task: Task) { - // Set the task. - if let Some(task) = self.task.replace(Some(Box::new(task))) { - task.wake(); - } - - // If the entry ID is non-zero, then we are no longer queued. - if self.status().is_some() { - // Wake the task. - if let Some(task) = self.task.take() { - task.wake(); - } - } - } - - /// Mark this listener as cancelled, indicating that it should not be inserted into the list. - /// - /// If this listener was already inserted into the list, returns the entry ID. Otherwise returns - /// `None`. - pub(crate) fn cancel(&self) -> Option { - // Set the entry ID to usize::MAX. - let id = self.entry_id.swap(usize::MAX, Ordering::Release); - - // Wake the task. - if let Some(task) = self.task.take() { - task.wake(); - } - - // Return the entry ID if we were queued. - NonZeroUsize::new(id) - } -} - -/// A shared pointer to a value. -/// -/// The inner value is a `Box`. -#[derive(Debug)] -struct AtomicCell(AtomicPtr); - -impl AtomicCell { - /// Create a new `AtomicCell`. - fn new() -> Self { - Self(AtomicPtr::new(ptr::null_mut())) - } - - /// Swap the value out. - fn replace(&self, value: Option>) -> Option> { - let old_value = match value { - Some(value) => self.0.swap(Box::into_raw(value), Ordering::AcqRel), - // Acquire is needed to synchronize with the store of a non-null ptr, but since a null ptr - // will never be dereferenced, there is no need to synchronize the store of a null ptr. - None => self.0.swap(ptr::null_mut(), Ordering::Acquire), - }; - - if old_value.is_null() { - None - } else { - // SAFETY: - // - AcqRel/Acquire ensures that it does not read a pointer to potentially invalid memory. - // - We've checked that old_value is not null. - // - We do not store invalid pointers other than null in self.0. - Some(unsafe { Box::from_raw(old_value) }) - } - } - - /// Take the value out. - fn take(&self) -> Option> { - self.replace(None) - } -} - -impl Drop for AtomicCell { - fn drop(&mut self) { - self.take(); - } -} diff --git a/src/notify.rs b/src/notify.rs deleted file mode 100644 index 61a9b59..0000000 --- a/src/notify.rs +++ /dev/null @@ -1,622 +0,0 @@ -//! The `Notification` trait for specifying notification. - -use crate::sync::atomic::{self, Ordering}; -#[cfg(feature = "std")] -use core::fmt; - -pub(crate) use __private::Internal; - -/// The type of notification to use with an [`Event`]. -/// -/// This is hidden and sealed to prevent changes to this trait from being breaking. -/// -/// [`Event`]: crate::Event -#[doc(hidden)] -pub trait NotificationPrivate { - /// The tag data associated with a notification. - type Tag; - - /// Emit a fence to ensure that the notification is visible to the listeners. - fn fence(&self, internal: Internal); - - /// Whether or not the number of currently waiting listeners should be subtracted from `count()`. - fn is_additional(&self, internal: Internal) -> bool; - - /// Get the number of listeners to wake. - fn count(&self, internal: Internal) -> usize; - - /// Get a tag to be associated with a notification. - /// - /// This method is expected to be called `count()` times. - fn next_tag(&mut self, internal: Internal) -> Self::Tag; -} - -/// A notification that can be used to notify an [`Event`]. -/// -/// This type is used by the [`Event::notify()`] function to determine how many listeners to wake up, whether -/// or not to subtract additional listeners, and other properties. The actual internal data is hidden in a -/// private trait and is intentionally not exposed. This means that users cannot manually implement the -/// [`Notification`] trait. However, it also means that changing the underlying trait is not a semver breaking -/// change. -/// -/// Users can create types that implement notifications using the combinators on the [`IntoNotification`] type. -/// Typical construction of a [`Notification`] starts with a numeric literal (like `3usize`) and then optionally -/// adding combinators. -/// -/// # Example -/// -/// ``` -/// use event_listener::{Event, IntoNotification, Notification}; -/// -/// fn notify(ev: &Event, notify: impl Notification) { -/// ev.notify(notify); -/// } -/// -/// notify(&Event::new(), 1.additional()); -/// ``` -/// -/// [`Event`]: crate::Event -pub trait Notification: NotificationPrivate {} -impl Notification for N {} - -/// Notify a given number of unnotifed listeners. -#[derive(Debug, Clone)] -#[doc(hidden)] -pub struct Notify(usize); - -impl Notify { - /// Create a new `Notify` with the given number of listeners to notify. - fn new(count: usize) -> Self { - Self(count) - } -} - -impl NotificationPrivate for Notify { - type Tag = (); - - fn is_additional(&self, _: Internal) -> bool { - false - } - - fn fence(&self, _: Internal) { - full_fence(); - } - - fn count(&self, _: Internal) -> usize { - self.0 - } - - fn next_tag(&mut self, _: Internal) -> Self::Tag {} -} - -/// Make the underlying notification additional. -#[derive(Debug, Clone)] -#[doc(hidden)] -pub struct Additional(N); - -impl Additional { - /// Create a new `Additional` with the given notification. - fn new(inner: N) -> Self { - Self(inner) - } -} - -impl NotificationPrivate for Additional -where - N: Notification + ?Sized, -{ - type Tag = N::Tag; - - fn is_additional(&self, _: Internal) -> bool { - true - } - - fn fence(&self, i: Internal) { - self.0.fence(i); - } - - fn count(&self, i: Internal) -> usize { - self.0.count(i) - } - - fn next_tag(&mut self, i: Internal) -> Self::Tag { - self.0.next_tag(i) - } -} - -/// Don't emit a fence for this notification. -#[derive(Debug, Clone)] -#[doc(hidden)] -pub struct Relaxed(N); - -impl Relaxed { - /// Create a new `Relaxed` with the given notification. - fn new(inner: N) -> Self { - Self(inner) - } -} - -impl NotificationPrivate for Relaxed -where - N: Notification + ?Sized, -{ - type Tag = N::Tag; - - fn is_additional(&self, i: Internal) -> bool { - self.0.is_additional(i) - } - - fn fence(&self, _: Internal) { - // Don't emit a fence. - } - - fn count(&self, i: Internal) -> usize { - self.0.count(i) - } - - fn next_tag(&mut self, i: Internal) -> Self::Tag { - self.0.next_tag(i) - } -} - -/// Use a tag to notify listeners. -#[cfg(feature = "std")] -#[derive(Debug, Clone)] -#[doc(hidden)] -pub struct Tag { - tag: T, - inner: N, -} - -#[cfg(feature = "std")] -impl Tag { - /// Create a new `Tag` with the given tag and notification. - fn new(tag: T, inner: N) -> Self - where - N: Sized, - { - Self { tag, inner } - } -} - -#[cfg(feature = "std")] -impl NotificationPrivate for Tag -where - N: Notification + ?Sized, - T: Clone, -{ - type Tag = T; - - fn is_additional(&self, i: Internal) -> bool { - self.inner.is_additional(i) - } - - fn fence(&self, i: Internal) { - self.inner.fence(i); - } - - fn count(&self, i: Internal) -> usize { - self.inner.count(i) - } - - fn next_tag(&mut self, _: Internal) -> Self::Tag { - self.tag.clone() - } -} - -/// Use a function to generate a tag to notify listeners. -#[cfg(feature = "std")] -#[doc(hidden)] -pub struct TagWith { - tag: F, - inner: N, -} - -#[cfg(feature = "std")] -impl fmt::Debug for TagWith { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - struct Ellipses; - - impl fmt::Debug for Ellipses { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("..") - } - } - - f.debug_struct("TagWith") - .field("tag", &Ellipses) - .field("inner", &self.inner) - .finish() - } -} - -#[cfg(feature = "std")] -impl TagWith { - /// Create a new `TagFn` with the given tag function and notification. - fn new(tag: F, inner: N) -> Self { - Self { tag, inner } - } -} - -#[cfg(feature = "std")] -impl NotificationPrivate for TagWith -where - N: Notification + ?Sized, - F: FnMut() -> T, -{ - type Tag = T; - - fn is_additional(&self, i: Internal) -> bool { - self.inner.is_additional(i) - } - - fn fence(&self, i: Internal) { - self.inner.fence(i); - } - - fn count(&self, i: Internal) -> usize { - self.inner.count(i) - } - - fn next_tag(&mut self, _: Internal) -> Self::Tag { - (self.tag)() - } -} - -/// A generic notification. -#[derive(Debug)] -pub(crate) struct GenericNotify { - /// Number of listeners to notify. - count: usize, - - /// Whether this notification is additional. - additional: bool, - - /// Generate tags. - tags: F, -} - -impl> GenericNotify { - pub(crate) fn new(count: usize, additional: bool, tags: F) -> Self { - Self { - count, - additional, - tags, - } - } -} - -impl> NotificationPrivate for GenericNotify { - type Tag = T; - - fn is_additional(&self, _: Internal) -> bool { - self.additional - } - - fn fence(&self, _: Internal) { - // Don't emit a fence. - } - - fn count(&self, _: Internal) -> usize { - self.count - } - - fn next_tag(&mut self, _: Internal) -> Self::Tag { - self.tags.next_tag() - } -} - -/// The producer for a generic notification. -pub(crate) trait TagProducer { - type Tag; - - /// Get the next tag. - fn next_tag(&mut self) -> Self::Tag; -} - -impl T> TagProducer for F { - type Tag = T; - - fn next_tag(&mut self) -> T { - (self)() - } -} - -/// A value that can be converted into a [`Notification`]. -/// -/// This trait adds onto the [`Notification`] trait by providing combinators that can be applied to all -/// notification types as well as numeric literals. This transforms what would normally be: -/// -/// ``` -/// use event_listener::Event; -/// -/// let event = Event::new(); -/// -/// // Note that each use case needs its own function, leading to bloat. -/// event.notify(1); -/// event.notify_additional(3); -/// event.notify_relaxed(5); -/// event.notify_additional_relaxed(2); -/// ``` -/// -/// into this: -/// -/// ``` -/// use event_listener::{Event, IntoNotification, Listener}; -/// -/// let event = Event::new(); -/// -/// event.notify(1); -/// event.notify(3.additional()); -/// event.notify(5.relaxed()); -/// event.notify(2.additional().relaxed()); -/// ``` -/// -/// This trait is implemented for all types that implement [`Notification`], as well as for non-floating-point -/// numeric literals (`usize`, `i32`, etc). -/// -/// This function can be thought of as being analogous to [`std::iter::IntoIterator`], but for [`Notification`]. -pub trait IntoNotification: __private::Sealed { - /// The tag data associated with a notification. - /// - /// By default, most [`Event`]s will use the unit type, `()`. However, this can be used to pass data along to - /// the listener. - type Tag; - - /// The notification type. - /// - /// Tells what kind of underlying type that the [`Notification`] is. You probably don't need to worry about - /// this. - type Notify: Notification; - - /// Convert this value into a notification. - /// - /// This allows the user to convert an [`IntoNotification`] into a [`Notification`]. - /// - /// # Panics - /// - /// This function panics if the value represents a negative number of notifications. - /// - /// # Examples - /// - /// ``` - /// use event_listener::IntoNotification; - /// - /// let _ = 3.into_notification(); - /// ``` - fn into_notification(self) -> Self::Notify; - - /// Convert this value into an additional notification. - /// - /// By default, notifications ignore listeners that are already notified. Generally, this happens when there - /// is an [`EventListener`] that has been woken up, but hasn't been polled to completion or waited on yet. - /// For instance, if you have three notified listeners and you call `event.notify(5)`, only two listeners - /// will be woken up. - /// - /// This default behavior is generally desired. For instance, if you are writing a `Mutex` implementation - /// powered by an [`Event`], you usually only want one consumer to be notified at a time. If you notified - /// a listener when another listener is already notified, you would have unnecessary contention for your - /// lock, as both listeners fight over the lock. Therefore, you would call `event.notify(1)` to make sure - /// *at least* one listener is awake. - /// - /// Sometimes, this behavior is not desired. For instance, if you are writing an MPMC channel, it is desirable - /// for multiple listeners to be reading from the underlying queue at once. In this case, you would instead - /// call `event.notify(1.additional())`. - /// - /// # Examples - /// - /// ``` - /// use event_listener::{Event, IntoNotification, Listener}; - /// - /// let event = Event::new(); - /// - /// let mut l1 = event.listen(); - /// let mut l2 = event.listen(); - /// - /// // This will only wake up the first listener, as the second call observes that there is already a - /// // notified listener. - /// event.notify(1); - /// event.notify(1); - /// - /// // This call wakes up the other listener. - /// event.notify(1.additional()); - /// ``` - fn additional(self) -> Additional - where - Self: Sized, - { - Additional::new(self.into_notification()) - } - - /// Don't emit a fence for this notification. - /// - /// Usually, notifications emit a `SeqCst` atomic fence before any listeners are woken up. This ensures - /// that notification state isn't inconsistent before any wakers are woken up. However, it may be - /// desirable to omit this fence in certain cases. - /// - /// - You are running the [`Event`] on a single thread, where no synchronization needs to occur. - /// - You are emitting the `SeqCst` fence yourself. - /// - /// In these cases, `relaxed()` can be used to avoid emitting the `SeqCst` fence. - /// - /// # Examples - /// - /// ``` - /// use event_listener::{Event, IntoNotification, Listener}; - /// use std::sync::atomic::{self, Ordering}; - /// - /// let event = Event::new(); - /// - /// let listener1 = event.listen(); - /// let listener2 = event.listen(); - /// let listener3 = event.listen(); - /// - /// // We should emit a fence manually when using relaxed notifications. - /// atomic::fence(Ordering::SeqCst); - /// - /// // Notifies two listeners. - /// // - /// // Listener queueing is fair, which means `listener1` and `listener2` - /// // get notified here since they start listening before `listener3`. - /// event.notify(1.relaxed()); - /// event.notify(1.relaxed()); - /// ``` - fn relaxed(self) -> Relaxed - where - Self: Sized, - { - Relaxed::new(self.into_notification()) - } - - /// Use a tag with this notification. - /// - /// In many cases, it is desired to send additional information to the listener of the [`Event`]. For instance, - /// it is possible to optimize a `Mutex` implementation by locking directly on the next listener, without - /// needing to ever unlock the mutex at all. - /// - /// The tag provided is cloned to provide the tag for all listeners. In cases where this is not flexible - /// enough, use [`IntoNotification::with_tag()`] instead. - /// - /// Tagging functions cannot be implemented efficiently for `no_std`, so this is only available - /// when the `std` feature is enabled. - /// - /// # Examples - /// - /// ``` - /// use event_listener::{IntoNotification, Listener, Event}; - /// - /// let event = Event::::with_tag(); - /// - /// let mut listener1 = event.listen(); - /// let mut listener2 = event.listen(); - /// - /// // Notify with `true` then `false`. - /// event.notify(1.additional().tag(true)); - /// event.notify(1.additional().tag(false)); - /// - /// assert_eq!(listener1.wait(), true); - /// assert_eq!(listener2.wait(), false); - /// ``` - #[cfg(feature = "std")] - fn tag(self, tag: T) -> Tag - where - Self: Sized + IntoNotification, - { - Tag::new(tag, self.into_notification()) - } - - /// Use a function to generate a tag with this notification. - /// - /// In many cases, it is desired to send additional information to the listener of the [`Event`]. For instance, - /// it is possible to optimize a `Mutex` implementation by locking directly on the next listener, without - /// needing to ever unlock the mutex at all. - /// - /// Tagging functions cannot be implemented efficiently for `no_std`, so this is only available - /// when the `std` feature is enabled. - /// - /// # Examples - /// - /// ``` - /// use event_listener::{IntoNotification, Listener, Event}; - /// - /// let event = Event::::with_tag(); - /// - /// let mut listener1 = event.listen(); - /// let mut listener2 = event.listen(); - /// - /// // Notify with `true` then `false`. - /// event.notify(1.additional().tag_with(|| true)); - /// event.notify(1.additional().tag_with(|| false)); - /// - /// assert_eq!(listener1.wait(), true); - /// assert_eq!(listener2.wait(), false); - /// ``` - #[cfg(feature = "std")] - fn tag_with(self, tag: F) -> TagWith - where - Self: Sized + IntoNotification, - F: FnMut() -> T, - { - TagWith::new(tag, self.into_notification()) - } -} - -impl IntoNotification for N { - type Tag = N::Tag; - type Notify = N; - - fn into_notification(self) -> Self::Notify { - self - } -} - -macro_rules! impl_for_numeric_types { - ($($ty:ty)*) => {$( - impl IntoNotification for $ty { - type Tag = (); - type Notify = Notify; - - #[allow(unused_comparisons)] - fn into_notification(self) -> Self::Notify { - if self < 0 { - panic!("negative notification count"); - } - - Notify::new(self.try_into().expect("overflow")) - } - } - - impl __private::Sealed for $ty {} - )*}; -} - -impl_for_numeric_types! { usize u8 u16 u32 u64 u128 isize i8 i16 i32 i64 i128 } - -/// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster. -#[inline] -pub(super) fn full_fence() { - #[cfg(all(any(target_arch = "x86", target_arch = "x86_64"), not(miri), not(loom)))] - { - use core::{arch::asm, cell::UnsafeCell}; - // HACK(stjepang): On x86 architectures there are two different ways of executing - // a `SeqCst` fence. - // - // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction. - // 2. A `lock ` instruction. - // - // Both instructions have the effect of a full barrier, but empirical benchmarks have shown - // that the second one is sometimes a bit faster. - let a = UnsafeCell::new(0_usize); - // It is common to use `lock or` here, but when using a local variable, `lock not`, which - // does not change the flag, should be slightly more efficient. - // Refs: https://www.felixcloutier.com/x86/not - unsafe { - #[cfg(target_pointer_width = "64")] - asm!("lock not qword ptr [{0}]", in(reg) a.get(), options(nostack, preserves_flags)); - #[cfg(target_pointer_width = "32")] - asm!("lock not dword ptr [{0:e}]", in(reg) a.get(), options(nostack, preserves_flags)); - } - return; - } - #[allow(unreachable_code)] - { - atomic::fence(Ordering::SeqCst); - } -} - -mod __private { - /// Make sure the NotificationPrivate trait can't be implemented outside of this crate. - #[doc(hidden)] - #[derive(Debug)] - pub struct Internal(()); - - impl Internal { - pub(crate) fn new() -> Self { - Self(()) - } - } - - #[doc(hidden)] - pub trait Sealed {} - impl Sealed for N {} -} diff --git a/src/std.rs b/src/std.rs deleted file mode 100644 index 3e7f76a..0000000 --- a/src/std.rs +++ /dev/null @@ -1,397 +0,0 @@ -//! libstd-based implementation of `event-listener`. -//! -//! This implementation crates an intrusive linked list of listeners. - -use crate::notify::{GenericNotify, Internal, Notification}; -use crate::sync::atomic::Ordering; -use crate::sync::cell::{Cell, UnsafeCell}; -use crate::sync::{Mutex, MutexGuard}; -use crate::{RegisterResult, State, TaskRef}; - -use core::marker::PhantomPinned; -use core::mem; -use core::ops::{Deref, DerefMut}; -use core::pin::Pin; -use core::ptr::NonNull; - -pub(super) struct List(Mutex>); - -struct Inner { - /// The head of the linked list. - head: Option>>, - - /// The tail of the linked list. - tail: Option>>, - - /// The first unnotified listener. - next: Option>>, - - /// Total number of listeners. - len: usize, - - /// The number of notified listeners. - notified: usize, -} - -impl List { - /// Create a new, empty event listener list. - pub(super) fn new() -> Self { - Self(Mutex::new(Inner { - head: None, - tail: None, - next: None, - len: 0, - notified: 0, - })) - } - - /// Get the total number of listeners without blocking. - pub(crate) fn try_total_listeners(&self) -> Option { - self.0.try_lock().ok().map(|list| list.len) - } - - /// Get the total number of listeners with blocking. - pub(crate) fn total_listeners(&self) -> usize { - self.0.lock().unwrap_or_else(|e| e.into_inner()).len - } -} - -impl crate::Inner { - fn lock(&self) -> ListLock<'_, '_, T> { - ListLock { - inner: self, - lock: self.list.0.lock().unwrap_or_else(|e| e.into_inner()), - } - } - - /// Whether or not this number of listeners would lead to a notification. - pub(crate) fn needs_notification(&self, limit: usize) -> bool { - self.notified.load(Ordering::Acquire) < limit - } - - /// Add a new listener to the list. - pub(crate) fn insert(&self, mut listener: Pin<&mut Option>>) { - let mut inner = self.lock(); - - listener.as_mut().set(Some(Listener { - link: UnsafeCell::new(Link { - state: Cell::new(State::Created), - prev: Cell::new(inner.tail), - next: Cell::new(None), - }), - _pin: PhantomPinned, - })); - let listener = listener.as_pin_mut().unwrap(); - - { - let entry_guard = listener.link.get(); - // SAFETY: We are locked, so we can access the inner `link`. - let entry = unsafe { entry_guard.deref() }; - - // Replace the tail with the new entry. - match mem::replace(&mut inner.tail, Some(entry.into())) { - None => inner.head = Some(entry.into()), - Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) }, - }; - } - - // If there are no unnotified entries, this is the first one. - if inner.next.is_none() { - inner.next = inner.tail; - } - - // Bump the entry count. - inner.len += 1; - } - - /// Remove a listener from the list. - pub(crate) fn remove( - &self, - listener: Pin<&mut Option>>, - propagate: bool, - ) -> Option> { - self.lock().remove(listener, propagate) - } - - /// Notifies a number of entries. - #[cold] - pub(crate) fn notify(&self, notify: impl Notification) -> usize { - self.lock().notify(notify) - } - - /// Register a task to be notified when the event is triggered. - /// - /// Returns `true` if the listener was already notified, and `false` otherwise. If the listener - /// isn't inserted, returns `None`. - pub(crate) fn register( - &self, - mut listener: Pin<&mut Option>>, - task: TaskRef<'_>, - ) -> RegisterResult { - let mut inner = self.lock(); - let entry_guard = match listener.as_mut().as_pin_mut() { - Some(listener) => listener.link.get(), - None => return RegisterResult::NeverInserted, - }; - // SAFETY: We are locked, so we can access the inner `link`. - let entry = unsafe { entry_guard.deref() }; - - // Take out the state and check it. - match entry.state.replace(State::NotifiedTaken) { - State::Notified { tag, .. } => { - // We have been notified, remove the listener. - inner.remove(listener, false); - RegisterResult::Notified(tag) - } - - State::Task(other_task) => { - // Only replace the task if it's different. - entry.state.set(State::Task({ - if !task.will_wake(other_task.as_task_ref()) { - task.into_task() - } else { - other_task - } - })); - - RegisterResult::Registered - } - - _ => { - // We have not been notified, register the task. - entry.state.set(State::Task(task.into_task())); - RegisterResult::Registered - } - } - } -} - -impl Inner { - fn remove( - &mut self, - mut listener: Pin<&mut Option>>, - propagate: bool, - ) -> Option> { - let entry_guard = listener.as_mut().as_pin_mut()?.link.get(); - let entry = unsafe { entry_guard.deref() }; - - let prev = entry.prev.get(); - let next = entry.next.get(); - - // Unlink from the previous entry. - match prev { - None => self.head = next, - Some(p) => unsafe { - p.as_ref().next.set(next); - }, - } - - // Unlink from the next entry. - match next { - None => self.tail = prev, - Some(n) => unsafe { - n.as_ref().prev.set(prev); - }, - } - - // If this was the first unnotified entry, update the next pointer. - if self.next == Some(entry.into()) { - self.next = next; - } - - // The entry is now fully unlinked, so we can now take it out safely. - let entry = unsafe { - listener - .get_unchecked_mut() - .take() - .unwrap() - .link - .into_inner() - }; - - // This State::Created is immediately dropped and exists as a workaround for the absence of - // loom::cell::Cell::into_inner. The intent is `let mut state = entry.state.into_inner();` - // - // refs: https://github.com/tokio-rs/loom/pull/341 - let mut state = entry.state.replace(State::Created); - - // Update the notified count. - if state.is_notified() { - self.notified -= 1; - - if propagate { - let state = mem::replace(&mut state, State::NotifiedTaken); - if let State::Notified { additional, tag } = state { - let tags = { - let mut tag = Some(tag); - move || tag.take().expect("tag already taken") - }; - self.notify(GenericNotify::new(1, additional, tags)); - } - } - } - self.len -= 1; - - Some(state) - } - - #[cold] - fn notify(&mut self, mut notify: impl Notification) -> usize { - let mut n = notify.count(Internal::new()); - let is_additional = notify.is_additional(Internal::new()); - - if !is_additional { - if n < self.notified { - return 0; - } - n -= self.notified; - } - - let original_count = n; - while n > 0 { - n -= 1; - - // Notify the next entry. - match self.next { - None => return original_count - n - 1, - - Some(e) => { - // Get the entry and move the pointer forwards. - let entry = unsafe { e.as_ref() }; - self.next = entry.next.get(); - - // Set the state to `Notified` and notify. - let tag = notify.next_tag(Internal::new()); - if let State::Task(task) = entry.state.replace(State::Notified { - additional: is_additional, - tag, - }) { - task.wake(); - } - - // Bump the notified count. - self.notified += 1; - } - } - } - - original_count - n - } -} - -struct ListLock<'a, 'b, T> { - lock: MutexGuard<'a, Inner>, - inner: &'b crate::Inner, -} - -impl Deref for ListLock<'_, '_, T> { - type Target = Inner; - - fn deref(&self) -> &Self::Target { - &self.lock - } -} - -impl DerefMut for ListLock<'_, '_, T> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.lock - } -} - -impl Drop for ListLock<'_, '_, T> { - fn drop(&mut self) { - let list = &mut **self; - - // Update the notified count. - let notified = if list.notified < list.len { - list.notified - } else { - core::usize::MAX - }; - - self.inner.notified.store(notified, Ordering::Release); - } -} - -pub(crate) struct Listener { - /// The inner link in the linked list. - /// - /// # Safety - /// - /// This can only be accessed while the central mutex is locked. - link: UnsafeCell>, - - /// This listener cannot be moved after being pinned. - _pin: PhantomPinned, -} - -struct Link { - /// The current state of the listener. - state: Cell>, - - /// The previous link in the linked list. - prev: Cell>>>, - - /// The next link in the linked list. - next: Cell>>>, -} - -#[cfg(test)] -mod tests { - use super::*; - use futures_lite::pin; - - #[cfg(target_family = "wasm")] - use wasm_bindgen_test::wasm_bindgen_test as test; - - macro_rules! make_listeners { - ($($id:ident),*) => { - $( - let $id = Option::>::None; - pin!($id); - )* - }; - } - - #[test] - fn insert() { - let inner = crate::Inner::new(); - make_listeners!(listen1, listen2, listen3); - - // Register the listeners. - inner.insert(listen1.as_mut()); - inner.insert(listen2.as_mut()); - inner.insert(listen3.as_mut()); - - assert_eq!(inner.lock().len, 3); - - // Remove one. - assert_eq!(inner.remove(listen2, false), Some(State::Created)); - assert_eq!(inner.lock().len, 2); - - // Remove another. - assert_eq!(inner.remove(listen1, false), Some(State::Created)); - assert_eq!(inner.lock().len, 1); - } - - #[test] - fn drop_non_notified() { - let inner = crate::Inner::new(); - make_listeners!(listen1, listen2, listen3); - - // Register the listeners. - inner.insert(listen1.as_mut()); - inner.insert(listen2.as_mut()); - inner.insert(listen3.as_mut()); - - // Notify one. - inner.notify(GenericNotify::new(1, false, || ())); - - // Remove one. - inner.remove(listen3, true); - - // Remove the rest. - inner.remove(listen1, true); - inner.remove(listen2, true); - } -} diff --git a/tests/loom.rs b/tests/loom.rs deleted file mode 100644 index 6ef1d05..0000000 --- a/tests/loom.rs +++ /dev/null @@ -1,212 +0,0 @@ -#![cfg(loom)] -use std::future::Future; -use std::pin::Pin; -use std::sync::{Arc, Mutex}; -use std::task::Context; -use std::usize; - -use event_listener::{Event, EventListener}; -use waker_fn::waker_fn; - -#[cfg(target_family = "wasm")] -use wasm_bindgen_test::wasm_bindgen_test as test; - -fn is_notified(listener: &mut EventListener) -> bool { - let waker = waker_fn(|| ()); - Pin::new(listener) - .poll(&mut Context::from_waker(&waker)) - .is_ready() -} - -#[test] -fn notify() { - loom::model(|| { - let event = Event::new(); - - let mut l1 = event.listen(); - let mut l2 = event.listen(); - let mut l3 = event.listen(); - - assert!(!is_notified(&mut l1)); - assert!(!is_notified(&mut l2)); - assert!(!is_notified(&mut l3)); - - assert_eq!(event.notify(2), 2); - assert_eq!(event.notify(1), 0); - - assert!(is_notified(&mut l1)); - assert!(is_notified(&mut l2)); - assert!(!is_notified(&mut l3)); - }); -} - -#[test] -fn notify_additional() { - loom::model(|| { - let event = Event::new(); - - let mut l1 = event.listen(); - let mut l2 = event.listen(); - let mut l3 = event.listen(); - - assert_eq!(event.notify_additional(1), 1); - assert_eq!(event.notify(1), 0); - assert_eq!(event.notify_additional(1), 1); - - assert!(is_notified(&mut l1)); - assert!(is_notified(&mut l2)); - assert!(!is_notified(&mut l3)); - }) -} - -#[test] -fn notify_one() { - loom::model(|| { - let event = Event::new(); - - let mut l1 = event.listen(); - let mut l2 = event.listen(); - - assert!(!is_notified(&mut l1)); - assert!(!is_notified(&mut l2)); - - assert_eq!(event.notify(1), 1); - assert!(is_notified(&mut l1)); - assert!(!is_notified(&mut l2)); - - assert_eq!(event.notify(1), 1); - assert!(is_notified(&mut l2)); - }); -} - -#[test] -fn notify_all() { - loom::model(|| { - let event = Event::new(); - - let mut l1 = event.listen(); - let mut l2 = event.listen(); - - assert!(!is_notified(&mut l1)); - assert!(!is_notified(&mut l2)); - - assert_eq!(event.notify(usize::MAX), 2); - assert!(is_notified(&mut l1)); - assert!(is_notified(&mut l2)); - }); -} - -#[test] -fn drop_notified() { - loom::model(|| { - let event = Event::new(); - - let l1 = event.listen(); - let mut l2 = event.listen(); - let mut l3 = event.listen(); - - assert_eq!(event.notify(1), 1); - drop(l1); - assert!(is_notified(&mut l2)); - assert!(!is_notified(&mut l3)); - }); -} - -#[test] -fn drop_notified2() { - loom::model(|| { - let event = Event::new(); - - let l1 = event.listen(); - let mut l2 = event.listen(); - let mut l3 = event.listen(); - - assert_eq!(event.notify(2), 2); - drop(l1); - assert!(is_notified(&mut l2)); - assert!(!is_notified(&mut l3)); - }); -} - -#[test] -fn drop_notified_additional() { - loom::model(|| { - let event = Event::new(); - - let l1 = event.listen(); - let mut l2 = event.listen(); - let mut l3 = event.listen(); - let mut l4 = event.listen(); - - assert_eq!(event.notify_additional(1), 1); - assert_eq!(event.notify(2), 1); - drop(l1); - assert!(is_notified(&mut l2)); - assert!(is_notified(&mut l3)); - assert!(!is_notified(&mut l4)); - }); -} - -#[test] -fn drop_non_notified() { - loom::model(|| { - let event = Event::new(); - - let mut l1 = event.listen(); - let mut l2 = event.listen(); - let l3 = event.listen(); - - assert_eq!(event.notify(1), 1); - drop(l3); - assert!(is_notified(&mut l1)); - assert!(!is_notified(&mut l2)); - }) -} - -#[test] -fn notify_all_fair() { - loom::model(|| { - let event = Event::new(); - let v = Arc::new(Mutex::new(vec![])); - - let mut l1 = event.listen(); - let mut l2 = event.listen(); - let mut l3 = event.listen(); - - let waker1 = { - let v = v.clone(); - waker_fn(move || v.lock().unwrap().push(1)) - }; - let waker2 = { - let v = v.clone(); - waker_fn(move || v.lock().unwrap().push(2)) - }; - let waker3 = { - let v = v.clone(); - waker_fn(move || v.lock().unwrap().push(3)) - }; - - assert!(Pin::new(&mut l1) - .poll(&mut Context::from_waker(&waker1)) - .is_pending()); - assert!(Pin::new(&mut l2) - .poll(&mut Context::from_waker(&waker2)) - .is_pending()); - assert!(Pin::new(&mut l3) - .poll(&mut Context::from_waker(&waker3)) - .is_pending()); - - assert_eq!(event.notify(usize::MAX), 3); - assert_eq!(&*v.lock().unwrap(), &[1, 2, 3]); - - assert!(Pin::new(&mut l1) - .poll(&mut Context::from_waker(&waker1)) - .is_ready()); - assert!(Pin::new(&mut l2) - .poll(&mut Context::from_waker(&waker2)) - .is_ready()); - assert!(Pin::new(&mut l3) - .poll(&mut Context::from_waker(&waker3)) - .is_ready()); - }) -} diff --git a/tests/notify.rs b/tests/notify.rs index 490a492..1a77020 100644 --- a/tests/notify.rs +++ b/tests/notify.rs @@ -7,9 +7,6 @@ use std::usize; use event_listener::{Event, EventListener}; use waker_fn::waker_fn; -#[cfg(target_family = "wasm")] -use wasm_bindgen_test::wasm_bindgen_test as test; - fn is_notified(listener: &mut EventListener) -> bool { let waker = waker_fn(|| ()); Pin::new(listener) @@ -29,9 +26,8 @@ fn notify() { assert!(!is_notified(&mut l2)); assert!(!is_notified(&mut l3)); - assert_eq!(event.notify(2), 2); - assert_eq!(event.notify(1), 0); - + event.notify(2); + event.notify(1); assert!(is_notified(&mut l1)); assert!(is_notified(&mut l2)); assert!(!is_notified(&mut l3)); @@ -45,9 +41,9 @@ fn notify_additional() { let mut l2 = event.listen(); let mut l3 = event.listen(); - assert_eq!(event.notify_additional(1), 1); - assert_eq!(event.notify(1), 0); - assert_eq!(event.notify_additional(1), 1); + event.notify_additional(1); + event.notify(1); + event.notify_additional(1); assert!(is_notified(&mut l1)); assert!(is_notified(&mut l2)); @@ -64,11 +60,11 @@ fn notify_one() { assert!(!is_notified(&mut l1)); assert!(!is_notified(&mut l2)); - assert_eq!(event.notify(1), 1); + event.notify(1); assert!(is_notified(&mut l1)); assert!(!is_notified(&mut l2)); - assert_eq!(event.notify(1), 1); + event.notify(1); assert!(is_notified(&mut l2)); } @@ -82,7 +78,7 @@ fn notify_all() { assert!(!is_notified(&mut l1)); assert!(!is_notified(&mut l2)); - assert_eq!(event.notify(usize::MAX), 2); + event.notify(usize::MAX); assert!(is_notified(&mut l1)); assert!(is_notified(&mut l2)); } @@ -95,7 +91,7 @@ fn drop_notified() { let mut l2 = event.listen(); let mut l3 = event.listen(); - assert_eq!(event.notify(1), 1); + event.notify(1); drop(l1); assert!(is_notified(&mut l2)); assert!(!is_notified(&mut l3)); @@ -109,7 +105,7 @@ fn drop_notified2() { let mut l2 = event.listen(); let mut l3 = event.listen(); - assert_eq!(event.notify(2), 2); + event.notify(2); drop(l1); assert!(is_notified(&mut l2)); assert!(!is_notified(&mut l3)); @@ -124,8 +120,8 @@ fn drop_notified_additional() { let mut l3 = event.listen(); let mut l4 = event.listen(); - assert_eq!(event.notify_additional(1), 1); - assert_eq!(event.notify(2), 1); + event.notify_additional(1); + event.notify(2); drop(l1); assert!(is_notified(&mut l2)); assert!(is_notified(&mut l3)); @@ -140,7 +136,7 @@ fn drop_non_notified() { let mut l2 = event.listen(); let l3 = event.listen(); - assert_eq!(event.notify(1), 1); + event.notify(1); drop(l3); assert!(is_notified(&mut l1)); assert!(!is_notified(&mut l2)); @@ -178,7 +174,7 @@ fn notify_all_fair() { .poll(&mut Context::from_waker(&waker3)) .is_pending()); - assert_eq!(event.notify(usize::MAX), 3); + event.notify(usize::MAX); assert_eq!(&*v.lock().unwrap(), &[1, 2, 3]); assert!(Pin::new(&mut l1)