From 034912bd0391448d51c2b3361d6efb2ce7ead124 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Tue, 1 Nov 2022 15:58:12 -0700 Subject: [PATCH 1/5] New, no_std algorithm --- Cargo.toml | 9 +- src/lib.rs | 679 ++++++++++++++---------------------------------- src/listener.rs | 410 +++++++++++++++++++++++++++++ src/queue.rs | 219 ++++++++++++++++ src/sync.rs | 60 +++++ src/util.rs | 83 ++++++ 6 files changed, 981 insertions(+), 479 deletions(-) create mode 100644 src/listener.rs create mode 100644 src/queue.rs create mode 100644 src/sync.rs create mode 100644 src/util.rs diff --git a/Cargo.toml b/Cargo.toml index 38f4cee..28a390f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,8 +14,13 @@ keywords = ["condvar", "eventcount", "wake", "blocking", "park"] categories = ["asynchronous", "concurrency"] exclude = ["/.*"] +[features] +default = ["std"] +std = ["parking"] + [dependencies] -parking = "2.0.0" +crossbeam-utils = { version = "0.8.12", default-features = false } +parking = { version = "2.0.0", optional = true } [dev-dependencies] criterion = "0.3.4" @@ -26,4 +31,4 @@ name = "bench" harness = false [lib] -bench = false \ No newline at end of file +bench = false diff --git a/src/lib.rs b/src/lib.rs index 0290a48..dc796c2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,22 +61,37 @@ //! ``` #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] +#![no_std] -use std::cell::{Cell, UnsafeCell}; -use std::fmt; -use std::future::Future; -use std::mem::{self, ManuallyDrop}; -use std::ops::{Deref, DerefMut}; -use std::panic::{RefUnwindSafe, UnwindSafe}; -use std::pin::Pin; -use std::ptr::{self, NonNull}; -use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex, MutexGuard}; -use std::task::{Context, Poll, Waker}; +extern crate alloc; + +#[cfg(feature = "std")] +extern crate std; + +mod listener; +mod queue; +mod sync; +mod util; + +use alloc::sync::Arc; + +use core::fmt; +use core::future::Future; +use core::pin::Pin; +use core::ptr::{self, NonNull}; +use core::task::{Context, Poll}; +use core::usize; + +use listener::{Listener, Wakeup}; +use queue::{ListenerQueue, Node}; +use sync::atomic::{self, AtomicUsize, Ordering}; +use util::RacyArc; + +#[cfg(feature = "std")] use std::time::{Duration, Instant}; -use std::usize; -use parking::Unparker; +#[cfg(feature = "std")] +use std::panic::{RefUnwindSafe, UnwindSafe}; /// Inner state of [`Event`]. struct Inner { @@ -85,27 +100,11 @@ struct Inner { /// If there are no entries, this value is set to `usize::MAX`. notified: AtomicUsize, - /// A linked list holding registered listeners. - list: Mutex, + /// The number of entries that haven't been orphaned yet. + len: AtomicUsize, - /// A single cached list entry to avoid allocations on the fast path of the insertion. - cache: UnsafeCell, -} - -impl Inner { - /// Locks the list. - fn lock(&self) -> ListGuard<'_> { - ListGuard { - inner: self, - guard: self.list.lock().unwrap(), - } - } - - /// Returns the pointer to the single cached list entry. - #[inline(always)] - fn cache_ptr(&self) -> NonNull { - unsafe { NonNull::new_unchecked(self.cache.get()) } - } + /// The queue that holds all listeners. + queue: ListenerQueue, } /// A synchronization primitive for notifying async tasks and threads. @@ -128,18 +127,15 @@ impl Inner { /// /// Listeners are registered and notified in the first-in first-out fashion, ensuring fairness. pub struct Event { - /// A pointer to heap-allocated inner state. 
- /// - /// This pointer is initially null and gets lazily initialized on first use. Semantically, it - /// is an `Arc` so it's important to keep in mind that it contributes to the [`Arc`]'s - /// reference count. - inner: AtomicPtr, + inner: util::RacyArc, } -unsafe impl Send for Event {} -unsafe impl Sync for Event {} +unsafe impl Send for Inner {} +unsafe impl Sync for Inner {} +#[cfg(feature = "std")] impl UnwindSafe for Event {} +#[cfg(feature = "std")] impl RefUnwindSafe for Event {} impl Event { @@ -155,7 +151,7 @@ impl Event { #[inline] pub const fn new() -> Event { Event { - inner: AtomicPtr::new(ptr::null_mut()), + inner: RacyArc::new(), } } @@ -173,12 +169,23 @@ impl Event { /// ``` #[cold] pub fn listen(&self) -> EventListener { - let inner = self.inner(); + // Initialize the inner state if we haven't already. + let inner = self.inner.get_or_init(|| { + Arc::new(Inner { + notified: AtomicUsize::new(0), + len: AtomicUsize::new(0), + queue: ListenerQueue::new(), + }) + }); + let listener = EventListener { - inner: unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) }, - entry: unsafe { Some((*inner).lock().insert((*inner).cache_ptr())) }, + inner: Arc::clone(&inner), + entry: Some(inner.queue.push(Listener::new())), }; + // Bump the number of listeners. + inner.len.fetch_add(1, Ordering::SeqCst); + // Make sure the listener is registered before whatever happens next. full_fence(); listener @@ -193,6 +200,11 @@ impl Event { /// /// This method emits a `SeqCst` fence before notifying listeners. /// + /// # Behavior + /// + /// If this method is called at the same time as [`notify()`] or [`notify_additional()`] + /// is called on another thread, it may wake more listeners than `n`. + /// /// # Examples /// /// ``` @@ -218,11 +230,12 @@ impl Event { // Make sure the notification comes after whatever triggered it. full_fence(); - if let Some(inner) = self.try_inner() { + if let Some(inner) = self.inner.get() { // Notify if there is at least one unnotified listener and the number of notified // listeners is less than `n`. - if inner.notified.load(Ordering::Acquire) < n { - inner.lock().notify(n); + let notified = inner.notified.load(Ordering::Acquire); + if notified < n { + inner.notify(n - notified, false); } } } @@ -236,6 +249,11 @@ impl Event { /// /// Unlike [`Event::notify()`], this method does not emit a `SeqCst` fence. /// + /// # Behavior + /// + /// If this method is called at the same time as [`notify()`] or [`notify_additional()`] + /// is called on another thread, it may wake more listeners than `n`. + /// /// # Examples /// /// ``` @@ -262,11 +280,12 @@ impl Event { /// ``` #[inline] pub fn notify_relaxed(&self, n: usize) { - if let Some(inner) = self.try_inner() { + if let Some(inner) = self.inner.get() { // Notify if there is at least one unnotified listener and the number of notified // listeners is less than `n`. - if inner.notified.load(Ordering::Acquire) < n { - inner.lock().notify(n); + let notified = inner.notified.load(Ordering::Acquire); + if notified < n { + inner.notify(n - notified, false); } } } @@ -306,10 +325,10 @@ impl Event { // Make sure the notification comes after whatever triggered it. full_fence(); - if let Some(inner) = self.try_inner() { + if let Some(inner) = self.inner.get() { // Notify if there is at least one unnotified listener. 
- if inner.notified.load(Ordering::Acquire) < usize::MAX { - inner.lock().notify_additional(n); + if inner.len.load(Ordering::Acquire) > 0 { + inner.notify(n, true); } } } @@ -351,82 +370,10 @@ impl Event { /// ``` #[inline] pub fn notify_additional_relaxed(&self, n: usize) { - if let Some(inner) = self.try_inner() { + if let Some(inner) = self.inner.get() { // Notify if there is at least one unnotified listener. - if inner.notified.load(Ordering::Acquire) < usize::MAX { - inner.lock().notify_additional(n); - } - } - } - - /// Returns a reference to the inner state if it was initialized. - #[inline] - fn try_inner(&self) -> Option<&Inner> { - let inner = self.inner.load(Ordering::Acquire); - unsafe { inner.as_ref() } - } - - /// Returns a raw pointer to the inner state, initializing it if necessary. - /// - /// This returns a raw pointer instead of reference because `from_raw` - /// requires raw/mut provenance: - fn inner(&self) -> *const Inner { - let mut inner = self.inner.load(Ordering::Acquire); - - // Initialize the state if this is its first use. - if inner.is_null() { - // Allocate on the heap. - let new = Arc::new(Inner { - notified: AtomicUsize::new(usize::MAX), - list: std::sync::Mutex::new(List { - head: None, - tail: None, - start: None, - len: 0, - notified: 0, - cache_used: false, - }), - cache: UnsafeCell::new(Entry { - state: Cell::new(State::Created), - prev: Cell::new(None), - next: Cell::new(None), - }), - }); - // Convert the heap-allocated state into a raw pointer. - let new = Arc::into_raw(new) as *mut Inner; - - // Attempt to replace the null-pointer with the new state pointer. - inner = self - .inner - .compare_exchange(inner, new, Ordering::AcqRel, Ordering::Acquire) - .unwrap_or_else(|x| x); - - // Check if the old pointer value was indeed null. - if inner.is_null() { - // If yes, then use the new state pointer. - inner = new; - } else { - // If not, that means a concurrent operation has initialized the state. - // In that case, use the old pointer and deallocate the new one. - unsafe { - drop(Arc::from_raw(new)); - } - } - } - - inner - } -} - -impl Drop for Event { - #[inline] - fn drop(&mut self) { - let inner: *mut Inner = *self.inner.get_mut(); - - // If the state pointer has been initialized, deallocate it. - if !inner.is_null() { - unsafe { - drop(Arc::from_raw(inner)); + if inner.len.load(Ordering::Acquire) > 0 { + inner.notify(n, true); } } } @@ -444,6 +391,19 @@ impl Default for Event { } } +impl Drop for Event { + fn drop(&mut self) { + // Notify all remaining listeners. + if let Some(inner) = self.inner.get() { + while let Some(entry) = inner.queue.pop() { + unsafe { + entry.as_ref().listener().notify(false); + } + } + } + } +} + /// A guard waiting for a notification from an [`Event`]. /// /// There are two ways for a listener to wait for a notification: @@ -459,15 +419,18 @@ pub struct EventListener { inner: Arc, /// A pointer to this listener's entry in the linked list. - entry: Option>, + entry: Option>, } unsafe impl Send for EventListener {} unsafe impl Sync for EventListener {} +#[cfg(feature = "std")] impl UnwindSafe for EventListener {} +#[cfg(feature = "std")] impl RefUnwindSafe for EventListener {} +#[cfg(feature = "std")] impl EventListener { /// Blocks until a notification is received. /// @@ -529,6 +492,54 @@ impl EventListener { self.wait_internal(Some(deadline)) } + fn wait_internal(mut self, deadline: Option) -> bool { + // Take out the entry pointer and set it to `None`. 
+ let entry = match self.entry.take() { + None => unreachable!("cannot wait twice on an `EventListener`"), + Some(entry) => entry, + }; + let (parker, unparker) = parking::pair(); + + // Wait until a notification is received or the timeout is reached. + loop { + // See if we've been notified. + if unsafe { + entry + .as_ref() + .listener() + .register(|| Wakeup::Thread(unparker.clone())) + } { + // We've been notified, so we're done. + self.decrement_length(); + return true; + } + + match deadline { + None => parker.park(), + + Some(deadline) => { + // Check for timeout. + let now = Instant::now(); + + // Park until the deadline. + let notified = parker.park_timeout( + deadline + .checked_duration_since(now) + .unwrap_or_else(|| Duration::from_secs(0)), + ); + + if !notified { + // We timed out, so we're done. + self.decrement_length(); + return false; + } + } + } + } + } +} + +impl EventListener { /// Drops this listener and discards its notification (if any) without notifying another /// active listener. /// @@ -548,15 +559,7 @@ impl EventListener { /// assert!(!listener2.discard()); /// ``` pub fn discard(mut self) -> bool { - // If this listener has never picked up a notification... - if let Some(entry) = self.entry.take() { - let mut list = self.inner.lock(); - // Remove the listener from the list and return `true` if it was notified. - if let State::Notified(_) = list.remove(entry, self.inner.cache_ptr()) { - return true; - } - } - false + self.orphan().is_some() } /// Returns `true` if this listener listens to the given `Event`. @@ -573,7 +576,7 @@ impl EventListener { /// ``` #[inline] pub fn listens_to(&self, event: &Event) -> bool { - ptr::eq::(&*self.inner, event.inner.load(Ordering::Acquire)) + ptr::eq::(&*self.inner, event.inner.as_ptr()) } /// Returns `true` if both listeners listen to the same `Event`. @@ -593,67 +596,27 @@ impl EventListener { ptr::eq::(&*self.inner, &*other.inner) } - fn wait_internal(mut self, deadline: Option) -> bool { - // Take out the entry pointer and set it to `None`. - let entry = match self.entry.take() { - None => unreachable!("cannot wait twice on an `EventListener`"), - Some(entry) => entry, - }; - let (parker, unparker) = parking::pair(); + /// Orphans this listener and updates counts in the main struct. + /// + /// Returns Some() if the listener was notified, None otherwise. + fn orphan(&mut self) -> Option { + if let Some(node) = self.entry.take() { + self.decrement_length(); + let orphan = unsafe { self.inner.queue.orphan(node) }; - // Set this listener's state to `Waiting`. - { - let mut list = self.inner.lock(); - let e = unsafe { entry.as_ref() }; - - // Do a dummy replace operation in order to take out the state. - match e.state.replace(State::Notified(false)) { - State::Notified(_) => { - // If this listener has been notified, remove it from the list and return. - list.remove(entry, self.inner.cache_ptr()); - return true; - } - // Otherwise, set the state to `Waiting`. - _ => e.state.set(State::Waiting(unparker)), + // If the listener was notified, update the counts. + if let Some(additional) = orphan { + self.inner.notified.fetch_sub(1, Ordering::Release); + return Some(additional); } } - // Wait until a notification is received or the timeout is reached. - loop { - match deadline { - None => parker.park(), - - Some(deadline) => { - // Check for timeout. - let now = Instant::now(); - if now >= deadline { - // Remove the entry and check if notified. 
- return self - .inner - .lock() - .remove(entry, self.inner.cache_ptr()) - .is_notified(); - } - - // Park until the deadline. - parker.park_timeout(deadline - now); - } - } - - let mut list = self.inner.lock(); - let e = unsafe { entry.as_ref() }; + None + } - // Do a dummy replace operation in order to take out the state. - match e.state.replace(State::Notified(false)) { - State::Notified(_) => { - // If this listener has been notified, remove it from the list and return. - list.remove(entry, self.inner.cache_ptr()); - return true; - } - // Otherwise, set the state back to `Waiting`. - state => e.state.set(state), - } - } + /// Decrement the length of the queue. + fn decrement_length(&self) { + self.inner.len.fetch_sub(1, Ordering::Release); } } @@ -666,309 +629,59 @@ impl fmt::Debug for EventListener { impl Future for EventListener { type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut list = self.inner.lock(); - + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let entry = match self.entry { None => unreachable!("cannot poll a completed `EventListener` future"), Some(entry) => entry, }; - let state = unsafe { &entry.as_ref().state }; - - // Do a dummy replace operation in order to take out the state. - match state.replace(State::Notified(false)) { - State::Notified(_) => { - // If this listener has been notified, remove it from the list and return. - list.remove(entry, self.inner.cache_ptr()); - drop(list); - self.entry = None; - return Poll::Ready(()); - } - State::Created => { - // If the listener was just created, put it in the `Polling` state. - state.set(State::Polling(cx.waker().clone())); - } - State::Polling(w) => { - // If the listener was in the `Polling` state, update the waker. - if w.will_wake(cx.waker()) { - state.set(State::Polling(w)); - } else { - state.set(State::Polling(cx.waker().clone())); - } - } - State::Waiting(_) => { - unreachable!("cannot poll and wait on `EventListener` at the same time") - } - } - Poll::Pending + // See if we've been notified. + if unsafe { + entry + .as_ref() + .listener() + .register(|| Wakeup::Waker(cx.waker().clone())) + } { + // We've been notified, so we're done. + // Remove ourselves from the list. + self.get_mut().orphan(); + Poll::Ready(()) + } else { + Poll::Pending + } } } impl Drop for EventListener { fn drop(&mut self) { - // If this listener has never picked up a notification... - if let Some(entry) = self.entry.take() { - let mut list = self.inner.lock(); - - // But if a notification was delivered to it... - if let State::Notified(additional) = list.remove(entry, self.inner.cache_ptr()) { - // Then pass it on to another active listener. - if additional { - list.notify_additional(1); - } else { - list.notify(1); - } + // Orphan the listener. + if let Some(additional) = self.orphan() { + // If a notification was delivered, then pass it on to the next listener. + if additional || self.inner.notified.load(Ordering::Acquire) == 0 { + self.inner.notify(1, additional); } } } } -/// A guard holding the linked list locked. -struct ListGuard<'a> { - /// A reference to [`Event`]'s inner state. - inner: &'a Inner, - - /// The actual guard that acquired the linked list. - guard: MutexGuard<'a, List>, -} - -impl Drop for ListGuard<'_> { - #[inline] - fn drop(&mut self) { - let list = &mut **self; - - // Update the atomic `notified` counter. 
- let notified = if list.notified < list.len { - list.notified - } else { - usize::MAX - }; - self.inner.notified.store(notified, Ordering::Release); - } -} - -impl Deref for ListGuard<'_> { - type Target = List; - - #[inline] - fn deref(&self) -> &List { - &*self.guard - } -} - -impl DerefMut for ListGuard<'_> { - #[inline] - fn deref_mut(&mut self) -> &mut List { - &mut *self.guard - } -} - -/// The state of a listener. -enum State { - /// It has just been created. - Created, - - /// It has received a notification. - /// - /// The `bool` is `true` if this was an "additional" notification. - Notified(bool), - - /// An async task is polling it. - Polling(Waker), - - /// A thread is blocked on it. - Waiting(Unparker), -} - -impl State { - /// Returns `true` if this is the `Notified` state. - #[inline] - fn is_notified(&self) -> bool { - match self { - State::Notified(_) => true, - State::Created | State::Polling(_) | State::Waiting(_) => false, - } - } -} - -/// An entry representing a registered listener. -struct Entry { - /// The state of this listener. - state: Cell, - - /// Previous entry in the linked list. - prev: Cell>>, - - /// Next entry in the linked list. - next: Cell>>, -} - -/// A linked list of entries. -struct List { - /// First entry in the list. - head: Option>, - - /// Last entry in the list. - tail: Option>, - - /// The first unnotified entry in the list. - start: Option>, - - /// Total number of entries in the list. - len: usize, - - /// The number of notified entries in the list. - notified: usize, - - /// Whether the cached entry is used. - cache_used: bool, -} - -impl List { - /// Inserts a new entry into the list. - fn insert(&mut self, cache: NonNull) -> NonNull { - unsafe { - let entry = Entry { - state: Cell::new(State::Created), - prev: Cell::new(self.tail), - next: Cell::new(None), - }; - - let entry = if self.cache_used { - // Allocate an entry that is going to become the new tail. - NonNull::new_unchecked(Box::into_raw(Box::new(entry))) - } else { - // No need to allocate - we can use the cached entry. - self.cache_used = true; - cache.as_ptr().write(entry); - cache - }; - - // Replace the tail with the new entry. - match mem::replace(&mut self.tail, Some(entry)) { - None => self.head = Some(entry), - Some(t) => t.as_ref().next.set(Some(entry)), - } - - // If there were no unnotified entries, this one is the first now. - if self.start.is_none() { - self.start = self.tail; - } - - // Bump the entry count. - self.len += 1; - - entry - } - } - - /// Removes an entry from the list and returns its state. - fn remove(&mut self, entry: NonNull, cache: NonNull) -> State { - unsafe { - let prev = entry.as_ref().prev.get(); - let next = entry.as_ref().next.get(); - - // Unlink from the previous entry. - match prev { - None => self.head = next, - Some(p) => p.as_ref().next.set(next), - } - - // Unlink from the next entry. - match next { - None => self.tail = prev, - Some(n) => n.as_ref().prev.set(prev), - } - - // If this was the first unnotified entry, move the pointer to the next one. - if self.start == Some(entry) { - self.start = next; - } - - // Extract the state. - let state = if ptr::eq(entry.as_ptr(), cache.as_ptr()) { - // Free the cached entry. - self.cache_used = false; - entry.as_ref().state.replace(State::Created) - } else { - // Deallocate the entry. - Box::from_raw(entry.as_ptr()).state.into_inner() - }; - - // Update the counters. - if state.is_notified() { - self.notified -= 1; - } - self.len -= 1; - - state - } - } - - /// Notifies a number of entries. 
+impl Inner { + /// Notify `n` listeners that have not yet been notified. #[cold] - fn notify(&mut self, mut n: usize) { - if n <= self.notified { - return; - } - n -= self.notified; - + fn notify(&self, mut n: usize, additional: bool) { while n > 0 { - n -= 1; - - // Notify the first unnotified entry. - match self.start { + // Try to notify a listener. + let listener = match self.queue.pop() { + Some(listener) => listener, None => break, - Some(e) => { - // Get the entry and move the pointer forward. - let e = unsafe { e.as_ref() }; - self.start = e.next.get(); - - // Set the state of this entry to `Notified` and notify. - match e.state.replace(State::Notified(false)) { - State::Notified(_) => {} - State::Created => {} - State::Polling(w) => w.wake(), - State::Waiting(t) => { - t.unpark(); - } - } - - // Update the counter. - self.notified += 1; - } - } - } - } + }; - /// Notifies a number of additional entries. - #[cold] - fn notify_additional(&mut self, mut n: usize) { - while n > 0 { n -= 1; - // Notify the first unnotified entry. - match self.start { - None => break, - Some(e) => { - // Get the entry and move the pointer forward. - let e = unsafe { e.as_ref() }; - self.start = e.next.get(); - - // Set the state of this entry to `Notified` and notify. - match e.state.replace(State::Notified(true)) { - State::Notified(_) => {} - State::Created => {} - State::Polling(w) => w.wake(), - State::Waiting(t) => { - t.unpark(); - } - } + // Notify the listener. + let notified = unsafe { listener.as_ref().listener().notify(additional) }; - // Update the counter. - self.notified += 1; - } + if notified { + self.notified.fetch_add(1, Ordering::Release); } } } @@ -1001,3 +714,15 @@ fn full_fence() { atomic::fence(Ordering::SeqCst); } } + +#[inline] +fn yield_now() { + #[cfg(not(feature = "std"))] + { + #[allow(deprecated)] + sync::atomic::spin_loop_hint(); + } + + #[cfg(feature = "std")] + std::thread::yield_now(); +} diff --git a/src/listener.rs b/src/listener.rs new file mode 100644 index 0000000..6db180d --- /dev/null +++ b/src/listener.rs @@ -0,0 +1,410 @@ +//! A listener in the queue of listeners. + +use crate::sync::atomic::{AtomicUsize, Ordering}; +use crate::sync::cell::Cell; + +use core::task::Waker; + +#[cfg(feature = "std")] +use parking::Unparker; + +/// A listener in the queue of listeners. +pub(crate) struct Listener { + /// The current state of this listener. + /// + /// This also serves as the refcount: + /// - Ref 1: The listener state is not orphaned. + /// - Ref 2: The listener is in the queue (queue bit is set). + state: AtomicUsize, + + /// The waker or thread handle that will be notified when the listener is + /// ready. + /// + /// This is kept synchronized by the `state` variable; therefore, it's + /// technically `Sync`. + waker: Cell, +} + +impl Listener { + /// Create a new listener. + pub(crate) fn new() -> Self { + Self { + state: AtomicUsize::new( + State { + listen: ListenState::Created, + queued: false, + } + .into(), + ), + waker: Cell::new(Wakeup::None), + } + } + + /// Enqueue this listener. + pub(crate) fn enqueue(&self) { + let mut state = State::from(self.state.load(Ordering::Acquire)); + + // If the listener is already queued, then we don't need to do anything. + loop { + if state.queued { + return; + } + + // Mark the listener as queued. 
+ let new_state = State { + queued: true, + ..state + }; + + match self.state.compare_exchange( + state.into(), + new_state.into(), + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(actual) => state = State::from(actual), + } + } + } + + /// Dequeue this listener. + /// + /// Returns `true` if the listener is also orphaned, and that the caller + /// should drop the listener. + pub(crate) fn dequeue(&self) -> bool { + let mut state = State::from(self.state.load(Ordering::Acquire)); + + // If the listener is not queued, then we don't need to do anything. + loop { + if !state.queued { + return false; + } + + // Mark the listener as not queued. + let new_state = State { + queued: false, + ..state + }; + + match self.state.compare_exchange( + state.into(), + new_state.into(), + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(actual) => state = State::from(actual), + } + } + + // If the listener is orphaned, then we need to drop it. + state.listen == ListenState::Orphaned + } + + /// Orphan this listener, and return `true` if we need to be dropped. + /// + /// This method is called when `EventListener` is dropped. The second result is whether or not + /// the listener was notified (and `true` if the notification was additional.) + pub(crate) fn orphan(&self) -> (bool, Option) { + let mut state = State::from(self.state.load(Ordering::Acquire)); + + loop { + match state.listen { + ListenState::Orphaned => { + // If the listener is already orphaned, then we don't need to do + // anything. + return (false, None); + } + ListenState::SettingWaker => { + // We may be in the middle of being notified. Wait for the + // notification to complete. + crate::yield_now(); + state = State::from(self.state.load(Ordering::Acquire)); + continue; + } + _ => {} + } + + // Mark the listener as orphaned. + let new_state = State { + listen: ListenState::Orphaned, + ..state + }; + + match self.state.compare_exchange( + state.into(), + new_state.into(), + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + break; + } + Err(actual) => state = State::from(actual), + } + } + + // Determine if we were woken before we were orphaned. + let notified = match state.listen { + ListenState::Notified => Some(false), + ListenState::NotifiedAdditional => Some(true), + ListenState::Registered => { + // Make sure to delete the waker. + self.waker.replace(Wakeup::None); + + None + } + _ => None, + }; + + // If the listener is not queued, then we need to drop it. + (!state.queued, notified) + } + + /// Check to see if we have been notified yet. + /// + /// Returns true if we have been notified, and false otherwise. + pub(crate) fn register(&self, init: impl FnOnce() -> Wakeup) -> bool { + let mut state = State::from(self.state.load(Ordering::Acquire)); + + loop { + match state.listen { + ListenState::Orphaned => { + // We've been orphaned, so there is no way we will be notified. + return false; + } + ListenState::Created | ListenState::Registered => { + // We are not yet notified, so we need to register the waker. + let new_state = State { + listen: ListenState::SettingWaker, + ..state + }; + + match self.state.compare_exchange( + state.into(), + new_state.into(), + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // We're the first to register, so we need to set the waker. + self.waker.set(init()); + + // Mark the listener as registered. 
+ let new_state = State { + listen: ListenState::Registered, + ..state + }; + + self.state.store(new_state.into(), Ordering::Release); + + return false; + } + Err(actual) => { + state = State::from(actual); + continue; + } + } + } + ListenState::Notified | ListenState::NotifiedAdditional => { + // We've been notified! + return true; + } + ListenState::SettingWaker => { + // We may be in the middle of being notified. Wait for the + // notification to complete. + crate::yield_now(); + state = State::from(self.state.load(Ordering::Acquire)); + continue; + } + } + } + } + + /// Notify this listener. + /// + /// Returns "true" if the notification did anything. + pub(crate) fn notify(&self, additional: bool) -> bool { + let mut state = State::from(self.state.load(Ordering::Acquire)); + let new_listen = if additional { + ListenState::NotifiedAdditional + } else { + ListenState::Notified + }; + + loop { + // Determine what we want the new state to be. + let new_state = State { + listen: new_listen, + ..state + }; + + match state.listen { + ListenState::Notified | ListenState::NotifiedAdditional | ListenState::Orphaned => { + // We're already either notified or orphaned. + return false; + } + ListenState::Created => { + // They haven't registered a wakeup yet, so register them. + if let Err(actual) = self.state.compare_exchange( + state.into(), + new_state.into(), + Ordering::AcqRel, + Ordering::Acquire, + ) { + // We failed to register them, so we need to try again. + crate::yield_now(); + state = State::from(actual); + continue; + } + + return true; + } + ListenState::Registered => { + let intermediate_state = State { + listen: ListenState::SettingWaker, + ..state + }; + + // We're registered, so we need to wake them up. + if let Err(actual) = self.state.compare_exchange( + state.into(), + intermediate_state.into(), + Ordering::AcqRel, + Ordering::Acquire, + ) { + // We failed to lock the waker, try again. + crate::yield_now(); + state = State::from(actual); + continue; + } + + // If a waker panics, set the state to notified anyways. + let _cleanup = Cleanup(|| { + self.state.store(new_state.into(), Ordering::Release); + }); + + // Wake them up. + self.waker.replace(Wakeup::None).wake(); + + return true; + } + ListenState::SettingWaker => { + // The listener is setting the waker, so we need to wait for them to finish. + crate::yield_now(); + state = State::from(self.state.load(Ordering::Acquire)); + continue; + } + } + } + } +} + +/// The current state of a listener. +/// +/// This is stored in a `usize` in order to allow it to be used atomically. +#[derive(Copy, Clone)] +struct State { + /// The current listener state. + listen: ListenState, + + /// Whether or not we are queued. + queued: bool, +} + +const LISTEN_STATE_MASK: usize = 0b111; +const QUEUED_BIT: usize = 0b1000; + +impl From for State { + fn from(value: usize) -> Self { + // Determine the `ListenState`. + let listen = match value & LISTEN_STATE_MASK { + 0 => ListenState::Created, + 1 => ListenState::Registered, + 2 => ListenState::SettingWaker, + 3 => ListenState::Notified, + 4 => ListenState::NotifiedAdditional, + 5 => ListenState::Orphaned, + _ => unreachable!("invalid state"), + }; + + // Determine if we are queued. + let queued = (value & QUEUED_BIT) != 0; + + Self { listen, queued } + } +} + +impl From for usize { + fn from(state: State) -> Self { + state.listen as usize | (if state.queued { QUEUED_BIT } else { 0 }) + } +} + +/// The current state of a listener. 
+/// +/// This is the portion that matters to the `EventListener` structure. +#[repr(usize)] +#[derive(Copy, Clone, PartialEq, Eq)] +enum ListenState { + /// We've just been created and are not yet registered. + Created = 0, + + /// We've been registered with a task. + /// + /// The `waker` field is initialized when we transition to this state. + Registered = 1, + + /// We are currently editing the `waker` field. + /// + /// When we are in this state, someone is modifying the `waker` field. + /// We shouldn't do anything until they're done. + SettingWaker = 2, + + /// We have been notified, using the `notify()` function. + Notified = 3, + + /// We have been notified, using the `notify_additional()` function. + NotifiedAdditional = 4, + + /// The event listener is no longer paying attention to us. + /// + /// The `EventListener` reference no longer exists. If we are in this state, + /// and the `QUEUED` bit is no longer set, we should drop ourselves. + Orphaned = 5, +} + +/// A waker or thread handle that will be notified when the listener is ready. +pub(crate) enum Wakeup { + /// No waker or thread handle has been set. + None, + + /// A waker has been set. + Waker(Waker), + + /// A thread handle has been set. + #[cfg(feature = "std")] + Thread(Unparker), +} + +impl Wakeup { + fn wake(self) { + match self { + Wakeup::None => {} + Wakeup::Waker(waker) => waker.wake(), + #[cfg(feature = "std")] + Wakeup::Thread(unparker) => { + unparker.unpark(); + } + } + } +} + +struct Cleanup(F); + +impl Drop for Cleanup { + fn drop(&mut self) { + (self.0)(); + } +} diff --git a/src/queue.rs b/src/queue.rs new file mode 100644 index 0000000..d1784f4 --- /dev/null +++ b/src/queue.rs @@ -0,0 +1,219 @@ +//! The atomic queue containing listeners. + +use crate::listener::Listener; +use crate::sync::atomic::{AtomicBool, AtomicPtr, Ordering}; + +use crossbeam_utils::CachePadded; + +use alloc::boxed::Box; + +use core::cell::UnsafeCell; +use core::mem::MaybeUninit; +use core::ptr::{self, NonNull}; + +/// A queue of listeners. +/// +/// The operations on this queue that are defined are: +/// +/// - Insertion at the tail of the list. +/// - Removal from the head of the list. +/// - Removing arbitrary elements from the list. +pub(crate) struct ListenerQueue { + /// The head of the queue. + head: CachePadded>, + + /// The tail of the queue. + tail: CachePadded>, + + /// A single cached node. + /// + /// In theory, this helps the hot case where only a single listener is + /// registered. In practice, I don't see much of a difference in benchmarks + /// with or without this. + cache: CachedNode, +} + +// Ensure the bottom bits of the pointers are not used. +#[repr(align(4))] +pub(crate) struct Node { + /// The listener in this node. + listener: Listener, + + /// The next node in the queue. + next: AtomicPtr, +} + +/// A single cached node. +struct CachedNode { + /// Whether or not the cache is occupied. + occupied: AtomicBool, + + /// The cached node. + node: UnsafeCell>, +} + +impl ListenerQueue { + /// Creates a new, empty queue. + pub(crate) fn new() -> Self { + Self { + head: CachePadded::new(AtomicPtr::new(ptr::null_mut())), + tail: CachePadded::new(AtomicPtr::new(ptr::null_mut())), + cache: CachedNode { + occupied: AtomicBool::new(false), + node: UnsafeCell::new(MaybeUninit::uninit()), + }, + } + } + + /// Push a new node onto the queue. + /// + /// Returns a node pointer. + pub(crate) fn push(&self, listener: Listener) -> NonNull { + // Allocate a new node. 
+        let node = self.alloc(listener);
+
+        loop {
+            // Get the pointer to the node.
+            let tail = self.tail.load(Ordering::Acquire);
+
+            // If the tail is null, the list has to be empty.
+            let next_ptr = if tail.is_null() {
+                &self.head
+            } else {
+                unsafe { &(*tail).next }
+            };
+
+            // Try to set the next pointer.
+            let next = next_ptr
+                .compare_exchange(
+                    ptr::null_mut(),
+                    node.as_ptr(),
+                    Ordering::AcqRel,
+                    Ordering::Acquire,
+                )
+                .unwrap_or_else(|x| x);
+
+            // See if the operation succeeded.
+            if next.is_null() {
+                // The node has been added, update the current tail.
+                // If this fails, another thread has already installed its own node as the
+                // tail in the meantime, so the tail pointer is still correct.
+                self.tail
+                    .compare_exchange(tail, node.as_ptr(), Ordering::AcqRel, Ordering::Acquire)
+                    .ok();
+
+                // Make sure the node knows it's enqueued.
+                unsafe {
+                    node.as_ref().listener.enqueue();
+                }
+
+                return node;
+            } else {
+                // The node has not been added, try again.
+                continue;
+            }
+        }
+    }
+
+    /// Pop a node from the queue.
+    pub(crate) fn pop(&self) -> Option<NonNull<Node>> {
+        loop {
+            // Get the head of the queue.
+            let head = self.head.load(Ordering::Acquire);
+
+            // If the head is null, the queue is empty.
+            if head.is_null() {
+                return None;
+            }
+
+            // Get the next pointer.
+            let next = unsafe { (*head).next.load(Ordering::Acquire) };
+
+            // Try to set the head to the next pointer.
+            if self
+                .head
+                .compare_exchange(head, next, Ordering::AcqRel, Ordering::Acquire)
+                .is_ok()
+            {
+                // The head has been updated. Dequeue the old head.
+                let head = unsafe { NonNull::new_unchecked(head) };
+                if unsafe { head.as_ref().listener.dequeue() } {
+                    // The EventListener for this node has been dropped, free the node.
+                    unsafe { self.dealloc(head) };
+                } else {
+                    // The event is still in use, return the node.
+                    return Some(head);
+                }
+            }
+        }
+    }
+
+    /// Orphan a node in the queue.
+    ///
+    /// The return value is `Some` if the node was notified, and `true` if the notification
+    /// was an additional notification.
+    pub(crate) unsafe fn orphan(&self, node: NonNull<Node>) -> Option<bool> {
+        let (needs_drop, notified) = unsafe { node.as_ref().listener.orphan() };
+
+        // If we need to deallocate the node, do so.
+        if needs_drop {
+            unsafe { self.dealloc(node) };
+        }
+
+        notified
+    }
+
+    /// Allocate a new node for a listener.
+    fn alloc(&self, listener: Listener) -> NonNull<Node> {
+        // Try to install a cached node.
+        if !self.cache.occupied.swap(true, Ordering::Acquire) {
+            // We can now initialize the node.
+            let node_ptr = self.cache.node.get() as *mut Node;
+
+            unsafe {
+                // Initialize the node.
+                node_ptr.write(Node::new(listener));
+
+                // Return the node.
+                NonNull::new_unchecked(node_ptr)
+            }
+        } else {
+            // We failed to install a cached node, so we need to allocate a new
+            // one.
+            unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(Node::new(listener)))) }
+        }
+    }
+
+    /// Deallocate a node.
+    unsafe fn dealloc(&self, node: NonNull<Node>) {
+        // Is this node the current cache node?
+        if ptr::eq(self.cache.node.get() as *const Node, node.as_ptr()) {
+            // We can now clear the cache.
+            unsafe { (self.cache.node.get() as *mut Node).drop_in_place() };
+            self.cache.occupied.store(false, Ordering::Release);
+        } else {
+            // We need to deallocate the node on the heap.
+ unsafe { Box::from_raw(node.as_ptr()) }; + } + } +} + +impl Node { + fn new(listener: Listener) -> Self { + Self { + listener, + next: AtomicPtr::new(ptr::null_mut()), + } + } + + pub(crate) fn listener(&self) -> &Listener { + &self.listener + } +} + +impl Drop for ListenerQueue { + fn drop(&mut self) { + // Drain the queue. + while self.pop().is_some() {} + } +} diff --git a/src/sync.rs b/src/sync.rs new file mode 100644 index 0000000..5b35d80 --- /dev/null +++ b/src/sync.rs @@ -0,0 +1,60 @@ +//! The current implementation of synchronization primitives. + +// TODO: Add implementations for portable_atomic and loom. + +mod sync_impl { + pub(crate) use alloc::sync::Arc; + pub(crate) use core::cell; + pub(crate) use core::sync::atomic; + + pub(crate) trait AtomicWithMut { + type Target; + + fn with_mut(&mut self, f: F) -> R + where + F: FnOnce(&mut Self::Target) -> R; + } + + impl AtomicWithMut for atomic::AtomicPtr { + type Target = *mut T; + + fn with_mut(&mut self, f: F) -> R + where + F: FnOnce(&mut Self::Target) -> R, + { + f(self.get_mut()) + } + } + + impl AtomicWithMut for atomic::AtomicUsize { + type Target = usize; + + fn with_mut(&mut self, f: F) -> R + where + F: FnOnce(&mut Self::Target) -> R, + { + f(self.get_mut()) + } + } + + pub(crate) trait CellWithMut { + type Target; + + fn with_mut(&self, f: F) -> R + where + F: FnOnce(*mut Self::Target) -> R; + } + + impl CellWithMut for cell::UnsafeCell { + type Target = T; + + fn with_mut(&self, f: F) -> R + where + F: FnOnce(*mut Self::Target) -> R, + { + f(self.get()) + } + } +} + +pub(crate) use sync_impl::*; diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 0000000..6cbc571 --- /dev/null +++ b/src/util.rs @@ -0,0 +1,83 @@ +//! Various utility structures. + +use crate::sync::atomic::{AtomicPtr, Ordering}; +use crate::sync::{Arc, AtomicWithMut}; + +use core::mem::ManuallyDrop; +use core::ptr; + +/// A wrapper around an `Arc` that is initialized atomically in a racy manner. +#[derive(Default)] +pub(crate) struct RacyArc { + ptr: AtomicPtr, +} + +unsafe impl Send for RacyArc {} +unsafe impl Sync for RacyArc {} + +impl RacyArc { + /// Create a new, empty `RacyArc`. + pub(crate) const fn new() -> Self { + Self { + ptr: AtomicPtr::new(ptr::null_mut()), + } + } + + /// Try to load the inner `T` or return `None` if it is not initialized. + pub(crate) fn get(&self) -> Option<&T> { + let ptr = self.ptr.load(Ordering::Acquire); + + // SAFETY: `ptr` is either a valid `T` reference or `null_mut()`. + unsafe { ptr.as_ref() } + } + + /// Load the inner `T`, initializing it using the given closure if it is not + /// initialized. + pub(crate) fn get_or_init(&self, init: impl FnOnce() -> Arc) -> ManuallyDrop> { + let mut ptr = self.ptr.load(Ordering::Acquire); + + if ptr.is_null() { + // Initialize the `Arc` using the given closure. + let data = init(); + + // Convert it to a pointer. + let new = Arc::into_raw(data) as *mut T; + + // Try to swap the pointer. + ptr = self + .ptr + .compare_exchange(ptr, new, Ordering::AcqRel, Ordering::Acquire) + .unwrap_or_else(|x| x); + + // Check if the pointer we tried to replace is null. + if ptr.is_null() { + // If it is, we successfully initialized the `RacyArc`. + ptr = new; + } else { + // If it is not, we failed to initialize the `RacyArc` and we + // need to drop the `Arc` we created. + drop(unsafe { Arc::from_raw(new) }); + } + } + + // SAFETY: `ptr` is now a valid T reference. 
+ ManuallyDrop::new(unsafe { Arc::from_raw(ptr) }) + } + + pub(crate) fn as_ptr(&self) -> *const T { + self.ptr.load(Ordering::Acquire) + } +} + +impl Drop for RacyArc { + fn drop(&mut self) { + // SAFETY: `ptr` is either `null` or a valid `Arc` reference. + self.ptr.with_mut(|ptr| { + if !ptr.is_null() { + unsafe { + Arc::from_raw(*ptr); + } + } + }); + } +} From 3c69b09b2d51cd53d52b031e8f03139e0fb53d85 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Tue, 1 Nov 2022 16:16:49 -0700 Subject: [PATCH 2/5] Fix a bug where the queue tail isn't set --- src/lib.rs | 8 +++++++- src/queue.rs | 5 +++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index dc796c2..13bec04 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -511,6 +511,7 @@ impl EventListener { } { // We've been notified, so we're done. self.decrement_length(); + self.decrement_notified(); return true; } @@ -606,7 +607,7 @@ impl EventListener { // If the listener was notified, update the counts. if let Some(additional) = orphan { - self.inner.notified.fetch_sub(1, Ordering::Release); + self.decrement_notified(); return Some(additional); } } @@ -618,6 +619,11 @@ impl EventListener { fn decrement_length(&self) { self.inner.len.fetch_sub(1, Ordering::Release); } + + /// Decrement the number of notified listeners. + fn decrement_notified(&self) { + self.inner.notified.fetch_sub(1, Ordering::Release); + } } impl fmt::Debug for EventListener { diff --git a/src/queue.rs b/src/queue.rs index d1784f4..1e16e32 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -135,6 +135,11 @@ impl ListenerQueue { .compare_exchange(head, next, Ordering::AcqRel, Ordering::Acquire) .is_ok() { + // If we just set the head to zero, set the tail to zero as well. + if next.is_null() { + self.tail.store(ptr::null_mut(), Ordering::SeqCst); + } + // The head has been updated. Dequeue the old head. let head = unsafe { NonNull::new_unchecked(head) }; if unsafe { head.as_ref().listener.dequeue() } { From bc799073b947cd00bc1a190b005f8d47f5334788 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Tue, 1 Nov 2022 17:53:18 -0700 Subject: [PATCH 3/5] Adjust lints --- src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lib.rs b/src/lib.rs index 13bec04..07ac37c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -60,6 +60,7 @@ //! } //! ``` +#![allow(unused_unsafe)] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] #![no_std] From 3c81f04bbc6902e534f7e046e4a66440c6ca3634 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Wed, 2 Nov 2022 08:41:55 -0700 Subject: [PATCH 4/5] Move queue methods to CachedNode --- src/queue.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/queue.rs b/src/queue.rs index 1e16e32..17ef0b2 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -70,7 +70,7 @@ impl ListenerQueue { /// Returns a node pointer. pub(crate) fn push(&self, listener: Listener) -> NonNull { // Allocate a new node. - let node = self.alloc(listener); + let node = self.cache.alloc(listener); loop { // Get the pointer to the node. @@ -144,7 +144,7 @@ impl ListenerQueue { let head = unsafe { NonNull::new_unchecked(head) }; if unsafe { head.as_ref().listener.dequeue() } { // The EventListener for this node has been dropped, free the node. - unsafe { self.dealloc(head) }; + unsafe { self.cache.dealloc(head) }; } else { // The event is still in use, return the node. return Some(head); @@ -162,18 +162,20 @@ impl ListenerQueue { // If we need to deallocate the node, do so. 
         if needs_drop {
-            unsafe { self.dealloc(node) };
+            unsafe { self.cache.dealloc(node) };
         }
 
         notified
     }
+}
 
+impl CachedNode {
     /// Allocate a new node for a listener.
     fn alloc(&self, listener: Listener) -> NonNull<Node> {
         // Try to install a cached node.
-        if !self.cache.occupied.swap(true, Ordering::Acquire) {
+        if !self.occupied.swap(true, Ordering::Acquire) {
             // We can now initialize the node.
-            let node_ptr = self.cache.node.get() as *mut Node;
+            let node_ptr = self.node.get() as *mut Node;
 
             unsafe {
                 // Initialize the node.
@@ -192,10 +194,10 @@ impl ListenerQueue {
     /// Deallocate a node.
     unsafe fn dealloc(&self, node: NonNull<Node>) {
         // Is this node the current cache node?
-        if ptr::eq(self.cache.node.get() as *const Node, node.as_ptr()) {
+        if ptr::eq(self.node.get() as *const Node, node.as_ptr()) {
             // We can now clear the cache.
-            unsafe { (self.cache.node.get() as *mut Node).drop_in_place() };
-            self.cache.occupied.store(false, Ordering::Release);
+            unsafe { (self.node.get() as *mut Node).drop_in_place() };
+            self.occupied.store(false, Ordering::Release);
         } else {
             // We need to deallocate the node on the heap.
             unsafe { Box::from_raw(node.as_ptr()) };

From 1de1adca612ae2474c803204cee2e42ae8addc2b Mon Sep 17 00:00:00 2001
From: jtnunley
Date: Wed, 2 Nov 2022 18:32:03 -0700
Subject: [PATCH 5/5] Fix MaybeUninit usage

---
 src/queue.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/queue.rs b/src/queue.rs
index 17ef0b2..b36eb53 100644
--- a/src/queue.rs
+++ b/src/queue.rs
@@ -175,7 +175,7 @@ impl CachedNode {
         // Try to install a cached node.
         if !self.occupied.swap(true, Ordering::Acquire) {
             // We can now initialize the node.
-            let node_ptr = self.node.get() as *mut Node;
+            let node_ptr = unsafe { (*self.node.get()).as_mut_ptr() };
 
             unsafe {
                 // Initialize the node.
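
Not part of the series, but a quick way to sanity-check it locally: the public API is unchanged by the rewrite, so existing usage of the crate still applies. The sketch below is a minimal example, assuming the default `std` feature (which re-enables the `parking`-based blocking path); under `no_std` only the `Future` side of `EventListener` is available.

```rust
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use event_listener::Event;

fn main() {
    // `Event::new()` is still `const`; the inner `RacyArc<Inner>` and the
    // lock-free listener queue are only allocated on first use.
    let event = Arc::new(Event::new());

    // Register a listener on this thread before notifying.
    let listener = event.listen();

    // Register a second listener on another thread and block on it.
    let waiter = {
        let event = Arc::clone(&event);
        thread::spawn(move || {
            let listener = event.listen();
            // Blocking wait is only available with the `std` feature.
            listener.wait();
        })
    };

    // Give the spawned thread a moment to register, so the "listener is
    // already parked" path gets exercised (not required for correctness).
    thread::sleep(Duration::from_millis(10));

    // Wake one listener, then one additional listener, so both return.
    event.notify(1);
    event.notify_additional(1);

    listener.wait();
    waiter.join().unwrap();
}
```

In async code the same `EventListener` is awaited (`listener.await`) instead of `wait()`ed on, which is the only path when the `std` feature is disabled.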