Skip to content

Commit

Permalink
feat: Add scheduling options for listener lists.
Browse files Browse the repository at this point in the history
Two schedulers are available:
- FIFO: The original round-robin queueing; listeners are inserted at the back.
- LIFO: The new most-recent queuing; listeners are inserted at the front.

LIFO queuing is beneficial for cache-efficiency with workloads that are
tolerant of starvation. The same listener is repeatedly drawn from the list
until the load dictates additional listeners be drawn from the list. These
listeners expand outward as a "hot set" for optimal reuse of resources rather
than continuously drawing from the coldest resources in a FIFO schedule.

Signed-off-by: Jason Volk <[email protected]>
  • Loading branch information
jevolk committed Dec 25, 2024
1 parent afe606f commit 80957df
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 41 deletions.
89 changes: 52 additions & 37 deletions src/intrusive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use crate::notify::{GenericNotify, Internal, Notification};
use crate::sync::atomic::Ordering;
use crate::sync::cell::{Cell, UnsafeCell};
use crate::{RegisterResult, State, TaskRef};
use crate::{RegisterResult, State, TaskRef, Sched};

#[cfg(feature = "critical-section")]
use core::cell::RefCell;
Expand Down Expand Up @@ -42,17 +42,21 @@ struct Inner<T> {

/// The number of notified listeners.
notified: usize,

/// Schedule by which the list is organized.
sched: Sched,
}

impl<T> List<T> {
/// Create a new, empty event listener list.
pub(super) fn new() -> Self {
pub(super) fn new(sched: Sched) -> Self {
let inner = Inner {
head: None,
tail: None,
next: None,
len: 0,
notified: 0,
sched,
};

#[cfg(feature = "critical-section")]
Expand Down Expand Up @@ -149,39 +153,9 @@ impl<T> crate::Inner<T> {
})
}

/// Add a new listener to the list.
pub(crate) fn insert(&self, mut listener: Pin<&mut Option<Listener<T>>>) {
self.with_inner(|inner| {
listener.as_mut().set(Some(Listener {
link: UnsafeCell::new(Link {
state: Cell::new(State::Created),
prev: Cell::new(inner.tail),
next: Cell::new(None),
}),
_pin: PhantomPinned,
}));
let listener = listener.as_pin_mut().unwrap();

{
let entry_guard = listener.link.get();
// SAFETY: We are locked, so we can access the inner `link`.
let entry = unsafe { entry_guard.deref() };

// Replace the tail with the new entry.
match mem::replace(&mut inner.tail, Some(entry.into())) {
None => inner.head = Some(entry.into()),
Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) },
};
}

// If there are no unnotified entries, this is the first one.
if inner.next.is_none() {
inner.next = inner.tail;
}

// Bump the entry count.
inner.len += 1;
});
/// Adds a listener to the list.
pub(crate) fn insert(&self, listener: Pin<&mut Option<Listener<T>>>) {
self.with_inner(|inner| inner.insert(listener))
}

/// Remove a listener from the list.
Expand Down Expand Up @@ -248,6 +222,47 @@ impl<T> crate::Inner<T> {
}

impl<T> Inner<T> {
fn insert(&mut self, mut listener: Pin<&mut Option<Listener<T>>>) {
listener.as_mut().set(Some(Listener {
link: UnsafeCell::new(Link {
state: Cell::new(State::Created),
prev: Cell::new(self.tail.filter(|_| self.sched == Sched::Fifo)),
next: Cell::new(self.head.filter(|_| self.sched == Sched::Lifo)),
}),
_pin: PhantomPinned,
}));
let listener = listener.as_pin_mut().unwrap();

{
let entry_guard = listener.link.get();
// SAFETY: We are locked, so we can access the inner `link`.
let entry = unsafe { entry_guard.deref() };

// Replace the head or tail with the new entry.
let replacing = match self.sched {
Sched::Lifo => &mut self.head,
Sched::Fifo => &mut self.tail,
};

match mem::replace(replacing, Some(entry.into())) {
None => *replacing = Some(entry.into()),
Some(t) if self.sched == Sched::Lifo => unsafe { t.as_ref().prev.set(Some(entry.into())) },
Some(t) if self.sched == Sched::Fifo => unsafe { t.as_ref().next.set(Some(entry.into())) },
Some(_) => unimplemented!("unimplemented scheduling type"),
};
}

// If there are no unnotified entries, or if using LIFO schedule, this is the first one.
if self.sched == Sched::Lifo {
self.next = self.head;
} else if self.next.is_none() {
self.next = self.tail;
}

// Bump the entry count.
self.len += 1;
}

fn remove(
&mut self,
mut listener: Pin<&mut Option<Listener<T>>>,
Expand Down Expand Up @@ -413,7 +428,7 @@ mod tests {

#[test]
fn insert() {
let inner = crate::Inner::new();
let inner = crate::Inner::new(Sched::Fifo);
make_listeners!(listen1, listen2, listen3);

// Register the listeners.
Expand All @@ -434,7 +449,7 @@ mod tests {

#[test]
fn drop_non_notified() {
let inner = crate::Inner::new();
let inner = crate::Inner::new(Sched::Fifo);
make_listeners!(listen1, listen2, listen3);

// Register the listeners.
Expand Down
46 changes: 42 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,16 @@ use sync::WithMut;
use notify::NotificationPrivate;
pub use notify::{IntoNotification, Notification};

/// Queue schedule of a listener.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Sched {
/// First-in-first-out listeners are added to the back of the list.
Fifo,

/// Last-in-first-out listeners are added to the front of the list.
Lifo,
}

/// Inner state of [`Event`].
struct Inner<T> {
/// The number of notified entries, or `usize::MAX` if all of them have been notified.
Expand All @@ -143,10 +153,10 @@ struct Inner<T> {
}

impl<T> Inner<T> {
fn new() -> Self {
fn new(sched: Sched) -> Self {
Self {
notified: AtomicUsize::new(usize::MAX),
list: sys::List::new(),
list: sys::List::new(sched),
}
}
}
Expand Down Expand Up @@ -177,6 +187,9 @@ pub struct Event<T = ()> {
/// is an `Arc<Inner>` so it's important to keep in mind that it contributes to the [`Arc`]'s
/// reference count.
inner: AtomicPtr<Inner<T>>,

/// Queue schedule type.
sched: Sched,
}

unsafe impl<T: Send> Send for Event<T> {}
Expand Down Expand Up @@ -238,13 +251,15 @@ impl<T> Event<T> {
pub const fn with_tag() -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
sched: Sched::Fifo,
}
}
#[cfg(all(feature = "std", loom))]
#[inline]
pub fn with_tag() -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
sched: Sched::Fifo,
}
}

Expand Down Expand Up @@ -471,7 +486,7 @@ impl<T> Event<T> {
// If this is the first use, initialize the state.
if inner.is_null() {
// Allocate the state on the heap.
let new = Arc::new(Inner::<T>::new());
let new = Arc::new(Inner::<T>::new(self.sched));

// Convert the state to a raw pointer.
let new = Arc::into_raw(new) as *mut Inner<T>;
Expand Down Expand Up @@ -556,16 +571,39 @@ impl Event<()> {
#[inline]
#[cfg(not(loom))]
pub const fn new() -> Self {
Self::new_with_sched(Sched::Fifo)
}

#[inline]
#[cfg(loom)]
pub fn new() -> Self {
Self::new_with_sched(Sched::Fifo)
}

/// Creates a new [`Event`] with queue schedule type.
///
/// # Examples
///
/// ```
/// use event_listener::{Event, Sched};
///
/// let event = Event::new_with_sched(Sched::Fifo);
/// ```
#[inline]
#[cfg(not(loom))]
pub const fn new_with_sched(sched: Sched) -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
sched,
}
}

#[inline]
#[cfg(loom)]
pub fn new() -> Self {
pub fn new_with_sched(sched: Sched) -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
sched,
}
}

Expand Down

0 comments on commit 80957df

Please sign in to comment.