diff --git a/Cargo.toml b/Cargo.toml index d98f878..1a0d66e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ optional = true [dev-dependencies] futures-lite = "2.0.0" +try-lock = "0.2.5" waker-fn = "1" [dev-dependencies.criterion] diff --git a/benches/bench.rs b/benches/bench.rs index 55557a6..d9e0db1 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -1,31 +1,22 @@ use std::iter; -use std::pin::Pin; use criterion::{criterion_group, criterion_main, Criterion}; -use event_listener::{Event, EventListener}; +use event_listener::{Event, Listener}; const COUNT: usize = 8000; fn bench_events(c: &mut Criterion) { c.bench_function("notify_and_wait", |b| { let ev = Event::new(); - let mut handles = iter::repeat_with(EventListener::new) - .take(COUNT) - .collect::>(); + let mut handles = Vec::with_capacity(COUNT); b.iter(|| { - for handle in &mut handles { - // SAFETY: The handle is not moved out. - let listener = unsafe { Pin::new_unchecked(handle) }; - listener.listen(&ev); - } + handles.extend(iter::repeat_with(|| ev.listen()).take(COUNT)); ev.notify(COUNT); - for handle in &mut handles { - // SAFETY: The handle is not moved out. - let listener = unsafe { Pin::new_unchecked(handle) }; - listener.wait(); + for handle in handles.drain(..) { + handle.wait(); } }); }); diff --git a/examples/mutex.rs b/examples/mutex.rs index a2b543c..30fbe66 100644 --- a/examples/mutex.rs +++ b/examples/mutex.rs @@ -6,25 +6,21 @@ mod example { #![allow(dead_code)] - use std::cell::UnsafeCell; use std::ops::{Deref, DerefMut}; - use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{mpsc, Arc}; use std::thread; use std::time::{Duration, Instant}; - use event_listener::Event; + use event_listener::{listener, Event, Listener}; + use try_lock::{Locked, TryLock}; /// A simple mutex. struct Mutex { - /// Set to `true` when the mutex is locked. - locked: AtomicBool, - /// Blocked lock operations. lock_ops: Event, - /// The inner protected data. - data: UnsafeCell, + /// The inner non-blocking mutex. + data: TryLock, } unsafe impl Send for Mutex {} @@ -34,49 +30,40 @@ mod example { /// Creates a mutex. fn new(t: T) -> Mutex { Mutex { - locked: AtomicBool::new(false), lock_ops: Event::new(), - data: UnsafeCell::new(t), + data: TryLock::new(t), } } /// Attempts to acquire a lock. fn try_lock(&self) -> Option> { - if !self.locked.swap(true, Ordering::Acquire) { - Some(MutexGuard(self)) - } else { - None - } + self.data.try_lock().map(MutexGuard) } /// Blocks until a lock is acquired. fn lock(&self) -> MutexGuard<'_, T> { - let mut listener = None; - loop { // Attempt grabbing a lock. if let Some(guard) = self.try_lock() { return guard; } - // Set up an event listener or wait for a notification. - match listener.take() { - None => { - // Start listening and then try locking again. - listener = Some(self.lock_ops.listen()); - } - Some(mut l) => { - // Wait until a notification is received. - l.as_mut().wait(); - } + // Set up an event listener. + listener!(self.lock_ops => listener); + + // Try again. + if let Some(guard) = self.try_lock() { + return guard; } + + // Wait for a notification. + listener.wait(); } } /// Blocks until a lock is acquired or the timeout is reached. fn lock_timeout(&self, timeout: Duration) -> Option> { let deadline = Instant::now() + timeout; - let mut listener = None; loop { // Attempt grabbing a lock. @@ -84,69 +71,55 @@ mod example { return Some(guard); } - // Set up an event listener or wait for an event. - match listener.take() { - None => { - // Start listening and then try locking again. - listener = Some(self.lock_ops.listen()); - } - Some(mut l) => { - // Wait until a notification is received. - l.as_mut().wait_deadline(deadline)?; - } + // Set up an event listener. + listener!(self.lock_ops => listener); + + // Try again. + if let Some(guard) = self.try_lock() { + return Some(guard); } + + // Wait until a notification is received. + listener.wait_deadline(deadline)?; } } /// Acquires a lock asynchronously. async fn lock_async(&self) -> MutexGuard<'_, T> { - let mut listener = None; - loop { // Attempt grabbing a lock. if let Some(guard) = self.try_lock() { return guard; } - // Set up an event listener or wait for an event. - match listener.take() { - None => { - // Start listening and then try locking again. - listener = Some(self.lock_ops.listen()); - } - Some(l) => { - // Wait until a notification is received. - l.await; - } + // Set up an event listener. + listener!(self.lock_ops => listener); + + // Try again. + if let Some(guard) = self.try_lock() { + return guard; } + + // Wait until a notification is received. + listener.await; } } } /// A guard holding a lock. - struct MutexGuard<'a, T>(&'a Mutex); - - unsafe impl Send for MutexGuard<'_, T> {} - unsafe impl Sync for MutexGuard<'_, T> {} - - impl Drop for MutexGuard<'_, T> { - fn drop(&mut self) { - self.0.locked.store(false, Ordering::Release); - self.0.lock_ops.notify(1); - } - } + struct MutexGuard<'a, T>(Locked<'a, T>); impl Deref for MutexGuard<'_, T> { type Target = T; fn deref(&self) -> &T { - unsafe { &*self.0.data.get() } + &self.0 } } impl DerefMut for MutexGuard<'_, T> { fn deref_mut(&mut self) -> &mut T { - unsafe { &mut *self.0.data.get() } + &mut self.0 } } diff --git a/src/lib.rs b/src/lib.rs index 969f807..7ac0016 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,7 +19,7 @@ //! use std::thread; //! use std::time::Duration; //! use std::usize; -//! use event_listener::Event; +//! use event_listener::{Event, Listener}; //! //! let flag = Arc::new(AtomicBool::new(false)); //! let event = Arc::new(Event::new()); @@ -56,7 +56,7 @@ //! } //! //! // Wait for a notification and continue the loop. -//! listener.as_mut().wait(); +//! listener.wait(); //! } //! ``` //! @@ -106,11 +106,6 @@ use sync::{Arc, WithMut}; use notify::{Internal, NotificationPrivate}; pub use notify::{IntoNotification, Notification}; -/// Useful traits for notifications. -pub mod prelude { - pub use crate::{IntoNotification, Notification}; -} - /// Inner state of [`Event`]. struct Inner { /// The number of notified entries, or `usize::MAX` if all of them have been notified. @@ -230,7 +225,7 @@ impl Event { /// # Examples /// /// ``` - /// use event_listener::Event; + /// use event_listener::{Event, Listener}; /// /// let event = Event::new(); /// let listener = event.listen(); @@ -264,12 +259,12 @@ impl Event { /// /// The above example is equivalent to this code: /// - /// ``` + /// ```no_compile /// use event_listener::{Event, EventListener}; /// /// let event = Event::new(); /// let mut listener = Box::pin(EventListener::new()); - /// listener.as_mut().listen(&event); + /// listener.listen(&event); /// ``` /// /// It creates a new listener, pins it to the heap, and inserts it into the linked list @@ -279,10 +274,18 @@ impl Event { /// allocated. However, users of this `new` method must be careful to ensure that the /// [`EventListener`] is `listen`ing before waiting on it; panics may occur otherwise. #[cold] - pub fn listen(&self) -> Pin>> { - let mut listener = Box::pin(EventListener::new()); - listener.as_mut().listen(self); - listener + pub fn listen(&self) -> EventListener { + let inner = ManuallyDrop::new(unsafe { Arc::from_raw(self.inner()) }); + + // Allocate the listener on the heap and insert it. + let mut listener = Box::pin(InnerListener { + event: Arc::clone(&inner), + listener: None, + }); + listener.as_mut().listen(); + + // Return the listener. + EventListener { listener } } /// Notifies a number of active listeners. @@ -338,7 +341,7 @@ impl Event { /// [`relaxed`]: IntoNotification::relaxed /// /// ``` - /// use event_listener::{prelude::*, Event}; + /// use event_listener::{IntoNotification, Event}; /// use std::sync::atomic::{self, Ordering}; /// /// let event = Event::new(); @@ -367,7 +370,7 @@ impl Event { /// [`additional`]: IntoNotification::additional /// /// ``` - /// use event_listener::{prelude::*, Event}; + /// use event_listener::{IntoNotification, Event}; /// /// let event = Event::new(); /// @@ -390,7 +393,7 @@ impl Event { /// equivalent to calling [`Event::notify_additional_relaxed()`]. /// /// ``` - /// use event_listener::{prelude::*, Event}; + /// use event_listener::{IntoNotification, Event}; /// use std::sync::atomic::{self, Ordering}; /// /// let event = Event::new(); @@ -555,7 +558,7 @@ impl Event<()> { /// use [`Event::notify()`] like so: /// /// ``` - /// use event_listener::{prelude::*, Event}; + /// use event_listener::{IntoNotification, Event}; /// let event = Event::new(); /// /// // Old way: @@ -568,7 +571,7 @@ impl Event<()> { /// # Examples /// /// ``` - /// use event_listener::Event; + /// use event_listener::{Event, IntoNotification}; /// use std::sync::atomic::{self, Ordering}; /// /// let event = Event::new(); @@ -607,7 +610,7 @@ impl Event<()> { /// use [`Event::notify()`] like so: /// /// ``` - /// use event_listener::{prelude::*, Event}; + /// use event_listener::{IntoNotification, Event}; /// let event = Event::new(); /// /// // Old way: @@ -657,7 +660,7 @@ impl Event<()> { /// use [`Event::notify()`] like so: /// /// ``` - /// use event_listener::{prelude::*, Event}; + /// use event_listener::{IntoNotification, Event}; /// let event = Event::new(); /// /// // Old way: @@ -712,205 +715,17 @@ impl Drop for Event { } } -pin_project_lite::pin_project! { - /// A guard waiting for a notification from an [`Event`]. - /// - /// There are two ways for a listener to wait for a notification: - /// - /// 1. In an asynchronous manner using `.await`. - /// 2. In a blocking manner by calling [`EventListener::wait()`] on it. - /// - /// If a notified listener is dropped without receiving a notification, dropping will notify - /// another active listener. Whether one *additional* listener will be notified depends on what - /// kind of notification was delivered. - /// - /// The listener is not registered into the linked list inside of the [`Event`] by default if - /// it is created via the `new()` method. It needs to be pinned first before being inserted - /// using the `listen()` method. After the listener has begun `listen`ing, the user can - /// `await` it like a future or call `wait()` to block the current thread until it is notified. - /// - /// ## Examples - /// - /// ``` - /// use event_listener::{Event, EventListener}; - /// use std::sync::{Arc, atomic::{AtomicBool, Ordering}}; - /// use std::thread; - /// use std::time::Duration; - /// - /// // Some flag to wait on. - /// let flag = Arc::new(AtomicBool::new(false)); - /// - /// // Create an event to wait on. - /// let event = Arc::new(Event::new()); - /// - /// thread::spawn({ - /// let flag = flag.clone(); - /// let event = event.clone(); - /// move || { - /// thread::sleep(Duration::from_secs(2)); - /// flag.store(true, Ordering::SeqCst); - /// - /// // Wake up the listener. - /// event.notify_additional(std::usize::MAX); - /// } - /// }); - /// - /// let listener = EventListener::new(); - /// - /// // Make sure that the event listener is pinned before doing anything else. - /// // - /// // We pin the listener to the stack here, as it lets us avoid a heap allocation. - /// futures_lite::pin!(listener); - /// - /// // Wait for the flag to become ready. - /// loop { - /// if flag.load(Ordering::Acquire) { - /// // We are done. - /// break; - /// } - /// - /// if listener.is_listening() { - /// // We are inserted into the linked list and we can now wait. - /// listener.as_mut().wait(); - /// } else { - /// // We need to insert ourselves into the list. Since this insertion is an atomic - /// // operation, we should check the flag again before waiting. - /// listener.as_mut().listen(&event); - /// } - /// } - /// ``` - /// - /// The above example is equivalent to the one provided in the crate level example. However, - /// it has some advantages. By directly creating the listener with `EventListener::new()`, - /// we have control over how the listener is handled in memory. We take advantage of this by - /// pinning the `listener` variable to the stack using the [`futures_lite::pin`] macro. In - /// contrast, `Event::listen` binds the listener to the heap. - /// - /// However, this additional power comes with additional responsibility. By default, the - /// event listener is created in an "uninserted" state. This property means that any - /// notifications delivered to the [`Event`] by default will not wake up this listener. - /// Before any notifications can be received, the `listen()` method must be called on - /// `EventListener` to insert it into the list of listeners. After a `.await` or a `wait()` - /// call has completed, `listen()` must be called again if the user is still interested in - /// any events. - /// - /// [`futures_lite::pin`]: https://docs.rs/futures-lite/latest/futures_lite/macro.pin.html - #[project(!Unpin)] // implied by Listener, but can generate better docs - pub struct EventListener { - #[pin] - listener: Listener>>, - } -} - -unsafe impl Send for EventListener {} -unsafe impl Sync for EventListener {} - -impl core::panic::UnwindSafe for EventListener {} -impl core::panic::RefUnwindSafe for EventListener {} - -impl Default for EventListener { - fn default() -> Self { - Self::new() - } -} - -impl fmt::Debug for EventListener { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("EventListener") - .field("listening", &self.is_listening()) - .finish() - } -} - -impl EventListener { - /// Create a new `EventListener` that will wait for a notification from the given [`Event`]. - /// - /// This function does not register the `EventListener` into the linked list of listeners - /// contained within the [`Event`]. Make sure to call `listen` before `await`ing on - /// this future or calling `wait()`. - /// - /// ## Examples - /// - /// ``` - /// use event_listener::{Event, EventListener}; - /// - /// let event = Event::new(); - /// let listener = EventListener::new(); - /// - /// // Make sure that the listener is pinned and listening before doing anything else. - /// let mut listener = Box::pin(listener); - /// listener.as_mut().listen(&event); - /// ``` - pub fn new() -> Self { - Self { - listener: Listener { - event: None, - listener: None, - }, - } - } - - /// Register this listener into the given [`Event`]. - /// - /// This method can only be called after the listener has been pinned, and must be called before - /// the listener is polled. - /// - /// Notifications that exist when this function is called will be discarded. - pub fn listen(mut self: Pin<&mut Self>, event: &Event) { - let inner = { - let inner = event.inner(); - unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) } - }; - - let ListenerProject { - event, - mut listener, - } = self.as_mut().project().listener.project(); - - // If an event is already registered, make sure to remove it. - if let Some(current_event) = event.as_ref() { - current_event.remove(listener.as_mut(), false); - } - - let inner = event.insert(inner); - inner.insert(listener); - - // Make sure the listener is registered before whatever happens next. - notify::full_fence(); - } - - /// Tell if this [`EventListener`] is currently listening for a notification. - /// - /// # Examples - /// - /// ``` - /// use event_listener::{Event, EventListener}; - /// - /// let event = Event::new(); - /// let mut listener = Box::pin(EventListener::new()); - /// - /// // The listener starts off not listening. - /// assert!(!listener.is_listening()); - /// - /// // After listen() is called, the listener is listening. - /// listener.as_mut().listen(&event); - /// assert!(listener.is_listening()); - /// - /// // Once the future is notified, the listener is no longer listening. - /// event.notify(1); - /// listener.as_mut().wait(); - /// assert!(!listener.is_listening()); - /// ``` - pub fn is_listening(&self) -> bool { - self.listener.listener.is_some() - } - +/// A handle that is listening to an [`Event`]. +/// +/// This trait represents a type waiting for a notification from an [`Event`]. See the +/// [`EventListener`] type for more documentation on this trait's usage. +pub trait Listener: Future + __sealed::Sealed { /// Blocks until a notification is received. /// /// # Examples /// /// ``` - /// use event_listener::Event; + /// use event_listener::{Event, Listener}; /// /// let event = Event::new(); /// let mut listener = event.listen(); @@ -919,12 +734,10 @@ impl EventListener { /// event.notify(1); /// /// // Receive the notification. - /// listener.as_mut().wait(); + /// listener.wait(); /// ``` #[cfg(all(feature = "std", not(target_family = "wasm")))] - pub fn wait(self: Pin<&mut Self>) -> T { - self.listener().wait_internal(None).unwrap() - } + fn wait(self) -> T; /// Blocks until a notification is received or a timeout is reached. /// @@ -934,19 +747,16 @@ impl EventListener { /// /// ``` /// use std::time::Duration; - /// use event_listener::Event; + /// use event_listener::{Event, Listener}; /// /// let event = Event::new(); /// let mut listener = event.listen(); /// /// // There are no notification so this times out. - /// assert!(listener.as_mut().wait_timeout(Duration::from_secs(1)).is_none()); + /// assert!(listener.wait_timeout(Duration::from_secs(1)).is_none()); /// ``` #[cfg(all(feature = "std", not(target_family = "wasm")))] - pub fn wait_timeout(self: Pin<&mut Self>, timeout: Duration) -> Option { - self.listener() - .wait_internal(Instant::now().checked_add(timeout)) - } + fn wait_timeout(self, timeout: Duration) -> Option; /// Blocks until a notification is received or a deadline is reached. /// @@ -956,18 +766,16 @@ impl EventListener { /// /// ``` /// use std::time::{Duration, Instant}; - /// use event_listener::Event; + /// use event_listener::{Event, Listener}; /// /// let event = Event::new(); /// let mut listener = event.listen(); /// /// // There are no notification so this times out. - /// assert!(listener.as_mut().wait_deadline(Instant::now() + Duration::from_secs(1)).is_none()); + /// assert!(listener.wait_deadline(Instant::now() + Duration::from_secs(1)).is_none()); /// ``` #[cfg(all(feature = "std", not(target_family = "wasm")))] - pub fn wait_deadline(self: Pin<&mut Self>, deadline: Instant) -> Option { - self.listener().wait_internal(Some(deadline)) - } + fn wait_deadline(self, deadline: Instant) -> Option; /// Drops this listener and discards its notification (if any) without notifying another /// active listener. @@ -975,8 +783,9 @@ impl EventListener { /// Returns `true` if a notification was discarded. /// /// # Examples + /// /// ``` - /// use event_listener::Event; + /// use event_listener::{Event, Listener}; /// /// let event = Event::new(); /// let mut listener1 = event.listen(); @@ -984,40 +793,31 @@ impl EventListener { /// /// event.notify(1); /// - /// assert!(listener1.as_mut().discard()); - /// assert!(!listener2.as_mut().discard()); + /// assert!(listener1.discard()); + /// assert!(!listener2.discard()); /// ``` - pub fn discard(self: Pin<&mut Self>) -> bool { - self.project().listener.discard() - } + fn discard(self) -> bool; /// Returns `true` if this listener listens to the given `Event`. /// /// # Examples /// /// ``` - /// use event_listener::Event; + /// use event_listener::{Event, Listener}; /// /// let event = Event::new(); /// let listener = event.listen(); /// /// assert!(listener.listens_to(&event)); /// ``` - #[inline] - pub fn listens_to(&self, event: &Event) -> bool { - if let Some(inner) = &self.listener.event { - return ptr::eq::>(&**inner, event.inner.load(Ordering::Acquire)); - } - - false - } + fn listens_to(&self, event: &Event) -> bool; /// Returns `true` if both listeners listen to the same `Event`. /// /// # Examples /// /// ``` - /// use event_listener::Event; + /// use event_listener::{Event, Listener}; /// /// let event = Event::new(); /// let listener1 = event.listen(); @@ -1025,40 +825,213 @@ impl EventListener { /// /// assert!(listener1.same_event(&listener2)); /// ``` - pub fn same_event(&self, other: &EventListener) -> bool { - if let (Some(inner1), Some(inner2)) = (self.inner(), other.inner()) { - return ptr::eq::>(&**inner1, &**inner2); + fn same_event(&self, other: &Self) -> bool; +} + +/// Implement the `Listener` trait using the underlying `InnerListener`. +macro_rules! forward_impl_to_listener { + ($gen:ident => $ty:ty) => { + impl<$gen> crate::Listener<$gen> for $ty { + #[cfg(all(feature = "std", not(target_family = "wasm")))] + fn wait(mut self) -> $gen { + self.listener_mut().wait_internal(None).unwrap() + } + + #[cfg(all(feature = "std", not(target_family = "wasm")))] + fn wait_timeout(mut self, timeout: std::time::Duration) -> Option<$gen> { + self.listener_mut() + .wait_internal(std::time::Instant::now().checked_add(timeout)) + } + + #[cfg(all(feature = "std", not(target_family = "wasm")))] + fn wait_deadline(mut self, deadline: std::time::Instant) -> Option<$gen> { + self.listener_mut().wait_internal(Some(deadline)) + } + + fn discard(mut self) -> bool { + self.listener_mut().discard() + } + + #[inline] + fn listens_to(&self, event: &Event<$gen>) -> bool { + core::ptr::eq::>( + &*self.listener().event, + event.inner.load(core::sync::atomic::Ordering::Acquire), + ) + } + + #[inline] + fn same_event(&self, other: &$ty) -> bool { + core::ptr::eq::>(&*self.listener().event, &*other.listener().event) + } } - false + impl<$gen> Future for $ty { + type Output = $gen; + + #[inline] + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<$gen> { + self.listener_mut().poll_internal(cx) + } + } + }; +} + +/// A guard waiting for a notification from an [`Event`]. +/// +/// There are two ways for a listener to wait for a notification: +/// +/// 1. In an asynchronous manner using `.await`. +/// 2. In a blocking manner by calling [`EventListener::wait()`] on it. +/// +/// If a notified listener is dropped without receiving a notification, dropping will notify +/// another active listener. Whether one *additional* listener will be notified depends on what +/// kind of notification was delivered. +/// +/// See the [`Listener`] trait for the functionality exposed by this type. +/// +/// This structure allocates the listener on the heap. +pub struct EventListener { + listener: Pin>>>>, +} + +unsafe impl Send for EventListener {} +unsafe impl Sync for EventListener {} + +impl core::panic::UnwindSafe for EventListener {} +impl core::panic::RefUnwindSafe for EventListener {} +impl Unpin for EventListener {} + +impl fmt::Debug for EventListener { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("EventListener").finish_non_exhaustive() } +} - fn listener(self: Pin<&mut Self>) -> Pin<&mut Listener>>> { - self.project().listener +impl EventListener { + #[inline] + fn listener(&self) -> &InnerListener>> { + &self.listener } - fn inner(&self) -> Option<&Arc>> { - self.listener.event.as_ref() + #[inline] + fn listener_mut(&mut self) -> Pin<&mut InnerListener>>> { + self.listener.as_mut() } } -impl Future for EventListener { - type Output = T; +forward_impl_to_listener! { T => EventListener } - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.listener().poll_internal(cx) - } +/// Create a stack-based event listener for an [`Event`]. +/// +/// [`EventListener`] allocates the listener on the heap. While this works for most use cases, in +/// practice this heap allocation can be expensive for repeated uses. This method allows for +/// allocating the listener on the stack instead. +/// +/// There are limitations to using this macro instead of the [`EventListener`] type, however. +/// Firstly, it is significantly less flexible. The listener is locked to the current stack +/// frame, meaning that it can't be returned or put into a place where it would go out of +/// scope. For instance, this will not work: +/// +/// ```compile_fail +/// use event_listener::{Event, Listener, listener}; +/// +/// fn get_listener(event: &Event) -> impl Listener { +/// listener!(event => cant_return_this); +/// cant_return_this +/// } +/// ``` +/// +/// In addition, the types involved in creating this listener are not able to be named. Therefore +/// it cannot be used in hand-rolled futures or similar structures. +/// +/// The type created by this macro implements [`Listener`], allowing it to be used in cases where +/// [`EventListener`] would normally be used. +/// +/// ## Example +/// +/// To use this macro, replace cases where you would normally use this... +/// +/// ```no_compile +/// let listener = event.listen(); +/// ``` +/// +/// ...with this: +/// +/// ```no_compile +/// listener!(event => listener); +/// ``` +/// +/// Here is the top level example from this crate's documentation, but using [`listener`] instead +/// of [`EventListener`]. +/// +/// ``` +/// use std::sync::atomic::{AtomicBool, Ordering}; +/// use std::sync::Arc; +/// use std::thread; +/// use std::time::Duration; +/// use std::usize; +/// use event_listener::{Event, listener, IntoNotification, Listener}; +/// +/// let flag = Arc::new(AtomicBool::new(false)); +/// let event = Arc::new(Event::new()); +/// +/// // Spawn a thread that will set the flag after 1 second. +/// thread::spawn({ +/// let flag = flag.clone(); +/// let event = event.clone(); +/// move || { +/// // Wait for a second. +/// thread::sleep(Duration::from_secs(1)); +/// +/// // Set the flag. +/// flag.store(true, Ordering::SeqCst); +/// +/// // Notify all listeners that the flag has been set. +/// event.notify(usize::MAX); +/// } +/// }); +/// +/// // Wait until the flag is set. +/// loop { +/// // Check the flag. +/// if flag.load(Ordering::SeqCst) { +/// break; +/// } +/// +/// // Start listening for events. +/// // NEW: Changed to a stack-based listener. +/// listener!(event => listener); +/// +/// // Check the flag again after creating the listener. +/// if flag.load(Ordering::SeqCst) { +/// break; +/// } +/// +/// // Wait for a notification and continue the loop. +/// listener.wait(); +/// } +/// ``` +#[macro_export] +macro_rules! listener { + ($event:expr => $listener:ident) => { + let mut $listener = $crate::__private::StackSlot::new(&$event); + // SAFETY: We shadow $listener so it can't be moved after. + let mut $listener = unsafe { $crate::__private::Pin::new_unchecked(&mut $listener) }; + #[allow(unused_mut)] + let mut $listener = $listener.listen(); + }; } pin_project_lite::pin_project! { #[project(!Unpin)] #[project = ListenerProject] - struct Listener>> + struct InnerListener>> where B: Unpin, { // The reference to the original event. - event: Option, + event: B, // The inner state of the listener. // @@ -1068,24 +1041,29 @@ pin_project_lite::pin_project! { listener: Option>, } - impl>> PinnedDrop for Listener + impl>> PinnedDrop for InnerListener where B: Unpin, { fn drop(mut this: Pin<&mut Self>) { // If we're being dropped, we need to remove ourself from the list. let this = this.project(); - if let Some(inner) = this.event { - (*inner).borrow().remove(this.listener, true); - } + (*this.event).borrow().remove(this.listener, true); } } } -unsafe impl> + Unpin + Send> Send for Listener {} -unsafe impl> + Unpin + Sync> Sync for Listener {} +unsafe impl> + Unpin + Send> Send for InnerListener {} +unsafe impl> + Unpin + Sync> Sync for InnerListener {} + +impl> + Unpin> InnerListener { + /// Insert this listener into the linked list. + #[inline] + fn listen(self: Pin<&mut Self>) { + let this = self.project(); + (*this.event).borrow().insert(this.listener); + } -impl> + Unpin> Listener { /// Wait until the provided deadline. #[cfg(all(feature = "std", not(target_family = "wasm")))] fn wait_internal(mut self: Pin<&mut Self>, deadline: Option) -> Option { @@ -1110,7 +1088,8 @@ impl> + Unpin> Listener { // If the pair isn't accessible, we may be being called in a destructor. // Just create a new pair. let (parker, unparker) = parking::pair(); - self.wait_with_parker(deadline, &parker, TaskRef::Unparker(&unparker)) + self.as_mut() + .wait_with_parker(deadline, &parker, TaskRef::Unparker(&unparker)) }) } @@ -1123,11 +1102,7 @@ impl> + Unpin> Listener { unparker: TaskRef<'_>, ) -> Option { let mut this = self.project(); - let inner = (*this - .event - .as_ref() - .expect("must listen() on event listener before waiting")) - .borrow(); + let inner = (*this.event).borrow(); // Set the listener's state to `Task`. if let Some(tag) = inner.register(this.listener.as_mut(), unparker).notified() { @@ -1146,7 +1121,7 @@ impl> + Unpin> Listener { if now >= deadline { // Remove our entry and check if we were notified. return inner - .remove(this.listener, false) + .remove(this.listener.as_mut(), false) .expect("We never removed ourself from the list") .notified(); } @@ -1165,28 +1140,20 @@ impl> + Unpin> Listener { /// active listener. fn discard(self: Pin<&mut Self>) -> bool { let this = self.project(); - - if let Some(inner) = this.event.as_ref() { - (*inner) - .borrow() - .remove(this.listener, false) - .map_or(false, |state| state.is_notified()) - } else { - false - } + (*this.event) + .borrow() + .remove(this.listener, false) + .map_or(false, |state| state.is_notified()) } /// Poll this listener for a notification. fn poll_internal(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); - let inner = match &this.event { - Some(inner) => (*inner).borrow(), - None => panic!(""), - }; + let this = self.project(); + let inner = (*this.event).borrow(); // Try to register the listener. match inner - .register(this.listener.as_mut(), TaskRef::Waker(cx.waker())) + .register(this.listener, TaskRef::Waker(cx.waker())) .notified() { Some(tag) => { @@ -1402,3 +1369,100 @@ fn __test_send_and_sync() { _assert_send::>(); _assert_sync::>(); } + +#[doc(hidden)] +mod __sealed { + use super::{EventListener, __private::StackListener}; + + pub trait Sealed {} + impl Sealed for EventListener {} + impl Sealed for StackListener<'_, '_, T> {} +} + +/// Semver exempt module. +#[doc(hidden)] +pub mod __private { + pub use core::pin::Pin; + + use super::{Event, Inner, InnerListener}; + use core::fmt; + use core::future::Future; + use core::task::{Context, Poll}; + + pin_project_lite::pin_project! { + /// Space on the stack where a stack-based listener can be allocated. + #[doc(hidden)] + #[project(!Unpin)] + pub struct StackSlot<'ev, T> { + #[pin] + listener: InnerListener> + } + } + + impl fmt::Debug for StackSlot<'_, T> { + #[inline] + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("StackSlot").finish_non_exhaustive() + } + } + + impl core::panic::UnwindSafe for StackSlot<'_, T> {} + impl core::panic::RefUnwindSafe for StackSlot<'_, T> {} + + impl<'ev, T> StackSlot<'ev, T> { + /// Create a new `StackSlot` on the stack. + #[inline] + #[doc(hidden)] + pub fn new(event: &'ev Event) -> Self { + let inner = unsafe { &*event.inner() }; + Self { + listener: InnerListener { + event: inner, + listener: None, + }, + } + } + + /// Start listening on this `StackSlot`. + #[inline] + #[doc(hidden)] + pub fn listen(mut self: Pin<&mut Self>) -> StackListener<'ev, '_, T> { + // Insert ourselves into the list. + self.as_mut().project().listener.listen(); + + // We are now listening. + StackListener { slot: self } + } + } + + /// A stack-based `EventListener`. + #[doc(hidden)] + pub struct StackListener<'ev, 'stack, T> { + slot: Pin<&'stack mut StackSlot<'ev, T>>, + } + + impl core::panic::UnwindSafe for StackListener<'_, '_, T> {} + impl core::panic::RefUnwindSafe for StackListener<'_, '_, T> {} + impl Unpin for StackListener<'_, '_, T> {} + + impl fmt::Debug for StackListener<'_, '_, T> { + #[inline] + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("StackListener").finish_non_exhaustive() + } + } + + impl<'ev, T> StackListener<'ev, '_, T> { + #[inline] + fn listener(&self) -> &InnerListener> { + &self.slot.listener + } + + #[inline] + fn listener_mut(&mut self) -> Pin<&mut InnerListener>> { + self.slot.as_mut().project().listener + } + } + + forward_impl_to_listener! { T => StackListener<'_, '_, T> } +} diff --git a/src/notify.rs b/src/notify.rs index 9484668..61a9b59 100644 --- a/src/notify.rs +++ b/src/notify.rs @@ -46,7 +46,7 @@ pub trait NotificationPrivate { /// # Example /// /// ``` -/// use event_listener::{Event, prelude::*}; +/// use event_listener::{Event, IntoNotification, Notification}; /// /// fn notify(ev: &Event, notify: impl Notification) { /// ev.notify(notify); @@ -342,7 +342,7 @@ impl T> TagProducer for F { /// into this: /// /// ``` -/// use event_listener::{Event, prelude::*}; +/// use event_listener::{Event, IntoNotification, Listener}; /// /// let event = Event::new(); /// @@ -380,7 +380,7 @@ pub trait IntoNotification: __private::Sealed { /// # Examples /// /// ``` - /// use event_listener::prelude::*; + /// use event_listener::IntoNotification; /// /// let _ = 3.into_notification(); /// ``` @@ -406,7 +406,7 @@ pub trait IntoNotification: __private::Sealed { /// # Examples /// /// ``` - /// use event_listener::{Event, prelude::*}; + /// use event_listener::{Event, IntoNotification, Listener}; /// /// let event = Event::new(); /// @@ -442,7 +442,7 @@ pub trait IntoNotification: __private::Sealed { /// # Examples /// /// ``` - /// use event_listener::{Event, prelude::*}; + /// use event_listener::{Event, IntoNotification, Listener}; /// use std::sync::atomic::{self, Ordering}; /// /// let event = Event::new(); @@ -483,7 +483,7 @@ pub trait IntoNotification: __private::Sealed { /// # Examples /// /// ``` - /// use event_listener::{prelude::*, Event}; + /// use event_listener::{IntoNotification, Listener, Event}; /// /// let event = Event::::with_tag(); /// @@ -494,8 +494,8 @@ pub trait IntoNotification: __private::Sealed { /// event.notify(1.additional().tag(true)); /// event.notify(1.additional().tag(false)); /// - /// assert_eq!(listener1.as_mut().wait(), true); - /// assert_eq!(listener2.as_mut().wait(), false); + /// assert_eq!(listener1.wait(), true); + /// assert_eq!(listener2.wait(), false); /// ``` #[cfg(feature = "std")] fn tag(self, tag: T) -> Tag @@ -517,7 +517,7 @@ pub trait IntoNotification: __private::Sealed { /// # Examples /// /// ``` - /// use event_listener::{prelude::*, Event}; + /// use event_listener::{IntoNotification, Listener, Event}; /// /// let event = Event::::with_tag(); /// @@ -528,8 +528,8 @@ pub trait IntoNotification: __private::Sealed { /// event.notify(1.additional().tag_with(|| true)); /// event.notify(1.additional().tag_with(|| false)); /// - /// assert_eq!(listener1.as_mut().wait(), true); - /// assert_eq!(listener2.as_mut().wait(), false); + /// assert_eq!(listener1.wait(), true); + /// assert_eq!(listener2.wait(), false); /// ``` #[cfg(feature = "std")] fn tag_with(self, tag: F) -> TagWith diff --git a/tests/notify.rs b/tests/notify.rs index 319b2b6..490a492 100644 --- a/tests/notify.rs +++ b/tests/notify.rs @@ -10,9 +10,11 @@ use waker_fn::waker_fn; #[cfg(target_family = "wasm")] use wasm_bindgen_test::wasm_bindgen_test as test; -fn is_notified(listener: Pin<&mut EventListener>) -> bool { +fn is_notified(listener: &mut EventListener) -> bool { let waker = waker_fn(|| ()); - listener.poll(&mut Context::from_waker(&waker)).is_ready() + Pin::new(listener) + .poll(&mut Context::from_waker(&waker)) + .is_ready() } #[test] @@ -23,16 +25,16 @@ fn notify() { let mut l2 = event.listen(); let mut l3 = event.listen(); - assert!(!is_notified(l1.as_mut())); - assert!(!is_notified(l2.as_mut())); - assert!(!is_notified(l3.as_mut())); + assert!(!is_notified(&mut l1)); + assert!(!is_notified(&mut l2)); + assert!(!is_notified(&mut l3)); assert_eq!(event.notify(2), 2); assert_eq!(event.notify(1), 0); - assert!(is_notified(l1.as_mut())); - assert!(is_notified(l2.as_mut())); - assert!(!is_notified(l3.as_mut())); + assert!(is_notified(&mut l1)); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l3)); } #[test] @@ -47,9 +49,9 @@ fn notify_additional() { assert_eq!(event.notify(1), 0); assert_eq!(event.notify_additional(1), 1); - assert!(is_notified(l1.as_mut())); - assert!(is_notified(l2.as_mut())); - assert!(!is_notified(l3.as_mut())); + assert!(is_notified(&mut l1)); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l3)); } #[test] @@ -59,15 +61,15 @@ fn notify_one() { let mut l1 = event.listen(); let mut l2 = event.listen(); - assert!(!is_notified(l1.as_mut())); - assert!(!is_notified(l2.as_mut())); + assert!(!is_notified(&mut l1)); + assert!(!is_notified(&mut l2)); assert_eq!(event.notify(1), 1); - assert!(is_notified(l1.as_mut())); - assert!(!is_notified(l2.as_mut())); + assert!(is_notified(&mut l1)); + assert!(!is_notified(&mut l2)); assert_eq!(event.notify(1), 1); - assert!(is_notified(l2.as_mut())); + assert!(is_notified(&mut l2)); } #[test] @@ -77,12 +79,12 @@ fn notify_all() { let mut l1 = event.listen(); let mut l2 = event.listen(); - assert!(!is_notified(l1.as_mut())); - assert!(!is_notified(l2.as_mut())); + assert!(!is_notified(&mut l1)); + assert!(!is_notified(&mut l2)); assert_eq!(event.notify(usize::MAX), 2); - assert!(is_notified(l1.as_mut())); - assert!(is_notified(l2.as_mut())); + assert!(is_notified(&mut l1)); + assert!(is_notified(&mut l2)); } #[test] @@ -95,8 +97,8 @@ fn drop_notified() { assert_eq!(event.notify(1), 1); drop(l1); - assert!(is_notified(l2.as_mut())); - assert!(!is_notified(l3.as_mut())); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l3)); } #[test] @@ -109,8 +111,8 @@ fn drop_notified2() { assert_eq!(event.notify(2), 2); drop(l1); - assert!(is_notified(l2.as_mut())); - assert!(!is_notified(l3.as_mut())); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l3)); } #[test] @@ -125,9 +127,9 @@ fn drop_notified_additional() { assert_eq!(event.notify_additional(1), 1); assert_eq!(event.notify(2), 1); drop(l1); - assert!(is_notified(l2.as_mut())); - assert!(is_notified(l3.as_mut())); - assert!(!is_notified(l4.as_mut())); + assert!(is_notified(&mut l2)); + assert!(is_notified(&mut l3)); + assert!(!is_notified(&mut l4)); } #[test] @@ -140,8 +142,8 @@ fn drop_non_notified() { assert_eq!(event.notify(1), 1); drop(l3); - assert!(is_notified(l1.as_mut())); - assert!(!is_notified(l2.as_mut())); + assert!(is_notified(&mut l1)); + assert!(!is_notified(&mut l2)); } #[test] @@ -189,16 +191,3 @@ fn notify_all_fair() { .poll(&mut Context::from_waker(&waker3)) .is_ready()); } - -#[test] -fn more_than_one_event() { - let event = Event::new(); - let event2 = Event::new(); - - let mut listener = Box::pin(EventListener::<()>::new()); - listener.as_mut().listen(&event); - listener.as_mut().listen(&event2); - - drop(listener); - event.notify(1); -}