diff --git a/src/intrusive.rs b/src/intrusive.rs index 69f460b..7733aa0 100644 --- a/src/intrusive.rs +++ b/src/intrusive.rs @@ -6,7 +6,7 @@ use crate::notify::{GenericNotify, Internal, Notification}; use crate::sync::atomic::Ordering; use crate::sync::cell::{Cell, UnsafeCell}; -use crate::{RegisterResult, State, TaskRef}; +use crate::{RegisterResult, State, TaskRef, Sched}; #[cfg(feature = "critical-section")] use core::cell::RefCell; @@ -42,17 +42,21 @@ struct Inner { /// The number of notified listeners. notified: usize, + + /// Schedule by which the list is organized. + sched: Sched, } impl List { /// Create a new, empty event listener list. - pub(super) fn new() -> Self { + pub(super) fn new(sched: Sched) -> Self { let inner = Inner { head: None, tail: None, next: None, len: 0, notified: 0, + sched, }; #[cfg(feature = "critical-section")] @@ -149,39 +153,9 @@ impl crate::Inner { }) } - /// Add a new listener to the list. - pub(crate) fn insert(&self, mut listener: Pin<&mut Option>>) { - self.with_inner(|inner| { - listener.as_mut().set(Some(Listener { - link: UnsafeCell::new(Link { - state: Cell::new(State::Created), - prev: Cell::new(inner.tail), - next: Cell::new(None), - }), - _pin: PhantomPinned, - })); - let listener = listener.as_pin_mut().unwrap(); - - { - let entry_guard = listener.link.get(); - // SAFETY: We are locked, so we can access the inner `link`. - let entry = unsafe { entry_guard.deref() }; - - // Replace the tail with the new entry. - match mem::replace(&mut inner.tail, Some(entry.into())) { - None => inner.head = Some(entry.into()), - Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) }, - }; - } - - // If there are no unnotified entries, this is the first one. - if inner.next.is_none() { - inner.next = inner.tail; - } - - // Bump the entry count. - inner.len += 1; - }); + /// Adds a listener to the list. + pub(crate) fn insert(&self, listener: Pin<&mut Option>>) { + self.with_inner(|inner| inner.insert(listener)) } /// Remove a listener from the list. @@ -248,6 +222,47 @@ impl crate::Inner { } impl Inner { + fn insert(&mut self, mut listener: Pin<&mut Option>>) { + listener.as_mut().set(Some(Listener { + link: UnsafeCell::new(Link { + state: Cell::new(State::Created), + prev: Cell::new(self.tail.filter(|_| self.sched == Sched::Fifo)), + next: Cell::new(self.head.filter(|_| self.sched == Sched::Lifo)), + }), + _pin: PhantomPinned, + })); + let listener = listener.as_pin_mut().unwrap(); + + { + let entry_guard = listener.link.get(); + // SAFETY: We are locked, so we can access the inner `link`. + let entry = unsafe { entry_guard.deref() }; + + // Replace the head or tail with the new entry. + let replacing = match self.sched { + Sched::Lifo => &mut self.head, + Sched::Fifo => &mut self.tail, + }; + + match mem::replace(replacing, Some(entry.into())) { + None => *replacing = Some(entry.into()), + Some(t) if self.sched == Sched::Lifo => unsafe { t.as_ref().prev.set(Some(entry.into())) }, + Some(t) if self.sched == Sched::Fifo => unsafe { t.as_ref().next.set(Some(entry.into())) }, + Some(_) => unimplemented!("unimplemented scheduling type"), + }; + } + + // If there are no unnotified entries, or if using LIFO schedule, this is the first one. + if self.sched == Sched::Lifo { + self.next = self.head; + } else if self.next.is_none() { + self.next = self.tail; + } + + // Bump the entry count. + self.len += 1; + } + fn remove( &mut self, mut listener: Pin<&mut Option>>, @@ -413,7 +428,7 @@ mod tests { #[test] fn insert() { - let inner = crate::Inner::new(); + let inner = crate::Inner::new(Sched::Fifo); make_listeners!(listen1, listen2, listen3); // Register the listeners. @@ -434,7 +449,7 @@ mod tests { #[test] fn drop_non_notified() { - let inner = crate::Inner::new(); + let inner = crate::Inner::new(Sched::Fifo); make_listeners!(listen1, listen2, listen3); // Register the listeners. diff --git a/src/lib.rs b/src/lib.rs index d6a8e44..f68f27d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -127,6 +127,16 @@ use sync::WithMut; use notify::NotificationPrivate; pub use notify::{IntoNotification, Notification}; +/// Queue schedule of a listener. +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum Sched { + /// First-in-first-out listeners are added to the back of the list. + Fifo, + + /// Last-in-first-out listeners are added to the front of the list. + Lifo, +} + /// Inner state of [`Event`]. struct Inner { /// The number of notified entries, or `usize::MAX` if all of them have been notified. @@ -143,10 +153,10 @@ struct Inner { } impl Inner { - fn new() -> Self { + fn new(sched: Sched) -> Self { Self { notified: AtomicUsize::new(usize::MAX), - list: sys::List::new(), + list: sys::List::new(sched), } } } @@ -177,6 +187,9 @@ pub struct Event { /// is an `Arc` so it's important to keep in mind that it contributes to the [`Arc`]'s /// reference count. inner: AtomicPtr>, + + /// Queue schedule type. + sched: Sched, } unsafe impl Send for Event {} @@ -238,6 +251,7 @@ impl Event { pub const fn with_tag() -> Self { Self { inner: AtomicPtr::new(ptr::null_mut()), + sched: Sched::Fifo, } } #[cfg(all(feature = "std", loom))] @@ -245,6 +259,7 @@ impl Event { pub fn with_tag() -> Self { Self { inner: AtomicPtr::new(ptr::null_mut()), + sched: Sched::Fifo, } } @@ -471,7 +486,7 @@ impl Event { // If this is the first use, initialize the state. if inner.is_null() { // Allocate the state on the heap. - let new = Arc::new(Inner::::new()); + let new = Arc::new(Inner::::new(self.sched)); // Convert the state to a raw pointer. let new = Arc::into_raw(new) as *mut Inner; @@ -556,16 +571,39 @@ impl Event<()> { #[inline] #[cfg(not(loom))] pub const fn new() -> Self { + Self::new_with_sched(Sched::Fifo) + } + + #[inline] + #[cfg(loom)] + pub fn new() -> Self { + Self::new_with_sched(Sched::Fifo) + } + + /// Creates a new [`Event`] with queue schedule type. + /// + /// # Examples + /// + /// ``` + /// use event_listener::{Event, Sched}; + /// + /// let event = Event::new_with_sched(Sched::Fifo); + /// ``` + #[inline] + #[cfg(not(loom))] + pub const fn new_with_sched(sched: Sched) -> Self { Self { inner: AtomicPtr::new(ptr::null_mut()), + sched, } } #[inline] #[cfg(loom)] - pub fn new() -> Self { + pub fn new_with_sched(sched: Sched) -> Self { Self { inner: AtomicPtr::new(ptr::null_mut()), + sched, } }