Skip to content

Commit

Permalink
breaking: Fix the EventListener::new() footgun
Browse files Browse the repository at this point in the history
This is a breaking change. It makes `new()` take no parameters in its
signature and `listen()` take a reference to an event in its signature.
This should avoid a footgun where a listener can be waited on without
listening on it.

Closes #91

Signed-off-by: John Nunley <[email protected]>
  • Loading branch information
notgull authored Nov 15, 2023
1 parent e6ec597 commit 21b34bf
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 51 deletions.
4 changes: 2 additions & 2 deletions benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ const COUNT: usize = 8000;
fn bench_events(c: &mut Criterion) {
c.bench_function("notify_and_wait", |b| {
let ev = Event::new();
let mut handles = iter::repeat_with(|| EventListener::new(&ev))
let mut handles = iter::repeat_with(EventListener::new)
.take(COUNT)
.collect::<Vec<_>>();

b.iter(|| {
for handle in &mut handles {
// SAFETY: The handle is not moved out.
let listener = unsafe { Pin::new_unchecked(handle) };
listener.listen();
listener.listen(&ev);
}

ev.notify(COUNT);
Expand Down
114 changes: 70 additions & 44 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,8 @@ impl<T> Event<T> {
/// use event_listener::{Event, EventListener};
///
/// let event = Event::new();
/// let mut listener = Box::pin(EventListener::new(&event));
/// listener.as_mut().listen();
/// let mut listener = Box::pin(EventListener::new());
/// listener.as_mut().listen(&event);
/// ```
///
/// It creates a new listener, pins it to the heap, and inserts it into the linked list
Expand All @@ -280,8 +280,8 @@ impl<T> Event<T> {
/// [`EventListener`] is `listen`ing before waiting on it; panics may occur otherwise.
#[cold]
pub fn listen(&self) -> Pin<Box<EventListener<T>>> {
let mut listener = Box::pin(EventListener::new(self));
listener.as_mut().listen();
let mut listener = Box::pin(EventListener::new());
listener.as_mut().listen(self);
listener
}

Expand Down Expand Up @@ -711,7 +711,7 @@ pin_project_lite::pin_project! {
/// }
/// });
///
/// let listener = EventListener::new(&event);
/// let listener = EventListener::new();
///
/// // Make sure that the event listener is pinned before doing anything else.
/// //
Expand All @@ -731,7 +731,7 @@ pin_project_lite::pin_project! {
/// } else {
/// // We need to insert ourselves into the list. Since this insertion is an atomic
/// // operation, we should check the flag again before waiting.
/// listener.as_mut().listen();
/// listener.as_mut().listen(&event);
/// }
/// }
/// ```
Expand Down Expand Up @@ -764,6 +764,12 @@ unsafe impl<T: Send> Sync for EventListener<T> {}
impl<T> core::panic::UnwindSafe for EventListener<T> {}
impl<T> core::panic::RefUnwindSafe for EventListener<T> {}

impl<T> Default for EventListener<T> {
fn default() -> Self {
Self::new()
}
}

impl<T> fmt::Debug for EventListener<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("EventListener")
Expand All @@ -785,29 +791,34 @@ impl<T> EventListener<T> {
/// use event_listener::{Event, EventListener};
///
/// let event = Event::new();
/// let listener = EventListener::new(&event);
/// let listener = EventListener::new();
///
/// // Make sure that the listener is pinned and listening before doing anything else.
/// let mut listener = Box::pin(listener);
/// listener.as_mut().listen();
/// listener.as_mut().listen(&event);
/// ```
pub fn new(event: &Event<T>) -> Self {
let inner = event.inner();

let listener = Listener {
event: unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) },
listener: None,
};

Self { listener }
pub fn new() -> Self {
Self {
listener: Listener {
event: None,
listener: None,
},
}
}

/// Register this listener into the given [`Event`].
///
/// This method can only be called after the listener has been pinned, and must be called before
/// the listener is polled.
pub fn listen(self: Pin<&mut Self>) {
self.listener().insert();
pub fn listen(mut self: Pin<&mut Self>, event: &Event<T>) {
let inner = {
let inner = event.inner();
unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) }
};

let ListenerProject { event, listener } = self.as_mut().project().listener.project();
let inner = event.insert(inner);
inner.insert(listener);

// Make sure the listener is registered before whatever happens next.
notify::full_fence();
Expand All @@ -821,13 +832,13 @@ impl<T> EventListener<T> {
/// use event_listener::{Event, EventListener};
///
/// let event = Event::new();
/// let mut listener = Box::pin(EventListener::new(&event));
/// let mut listener = Box::pin(EventListener::new());
///
/// // The listener starts off not listening.
/// assert!(!listener.is_listening());
///
/// // After listen() is called, the listener is listening.
/// listener.as_mut().listen();
/// listener.as_mut().listen(&event);
/// assert!(listener.is_listening());
///
/// // Once the future is notified, the listener is no longer listening.
Expand Down Expand Up @@ -922,7 +933,7 @@ impl<T> EventListener<T> {
/// assert!(!listener2.as_mut().discard());
/// ```
pub fn discard(self: Pin<&mut Self>) -> bool {
self.listener().discard()
self.project().listener.discard()
}

/// Returns `true` if this listener listens to the given `Event`.
Expand All @@ -939,7 +950,11 @@ impl<T> EventListener<T> {
/// ```
#[inline]
pub fn listens_to(&self, event: &Event<T>) -> bool {
ptr::eq::<Inner<T>>(&**self.inner(), event.inner.load(Ordering::Acquire))
if let Some(inner) = &self.listener.event {
return ptr::eq::<Inner<T>>(&**inner, event.inner.load(Ordering::Acquire));
}

false
}

/// Returns `true` if both listeners listen to the same `Event`.
Expand All @@ -956,15 +971,19 @@ impl<T> EventListener<T> {
/// assert!(listener1.same_event(&listener2));
/// ```
pub fn same_event(&self, other: &EventListener<T>) -> bool {
ptr::eq::<Inner<T>>(&**self.inner(), &**other.inner())
if let (Some(inner1), Some(inner2)) = (self.inner(), other.inner()) {
return ptr::eq::<Inner<T>>(&**inner1, &**inner2);
}

false
}

fn listener(self: Pin<&mut Self>) -> Pin<&mut Listener<T, Arc<Inner<T>>>> {
self.project().listener
}

fn inner(&self) -> &Arc<Inner<T>> {
&self.listener.event
fn inner(&self) -> Option<&Arc<Inner<T>>> {
self.listener.event.as_ref()
}
}

Expand All @@ -978,14 +997,18 @@ impl<T> Future for EventListener<T> {

pin_project_lite::pin_project! {
#[project(!Unpin)]
#[project = ListenerProject]
struct Listener<T, B: Borrow<Inner<T>>>
where
B: Unpin,
{
// The reference to the original event.
event: B,
event: Option<B>,

// The inner state of the listener.
//
// This is only ever `None` during initialization. After `listen()` has completed, this
// should be `Some`.
#[pin]
listener: Option<sys::Listener<T>>,
}
Expand All @@ -997,9 +1020,9 @@ pin_project_lite::pin_project! {
fn drop(mut this: Pin<&mut Self>) {
// If we're being dropped, we need to remove ourself from the list.
let this = this.project();
let inner = (*this.event).borrow();

inner.remove(this.listener, true);
if let Some(inner) = this.event {
(*inner).borrow().remove(this.listener, true);
}
}
}
}
Expand All @@ -1008,14 +1031,6 @@ unsafe impl<T: Send, B: Borrow<Inner<T>> + Unpin + Send> Send for Listener<T, B>
unsafe impl<T: Send, B: Borrow<Inner<T>> + Unpin + Sync> Sync for Listener<T, B> {}

impl<T, B: Borrow<Inner<T>> + Unpin> Listener<T, B> {
/// Register this listener with the event.
fn insert(self: Pin<&mut Self>) {
let this = self.project();
let inner = (*this.event).borrow();

inner.insert(this.listener);
}

/// Wait until the provided deadline.
#[cfg(all(feature = "std", not(target_family = "wasm")))]
fn wait_internal(mut self: Pin<&mut Self>, deadline: Option<Instant>) -> Option<T> {
Expand Down Expand Up @@ -1059,7 +1074,11 @@ impl<T, B: Borrow<Inner<T>> + Unpin> Listener<T, B> {
unparker: TaskRef<'_>,
) -> Option<T> {
let mut this = self.project();
let inner = (*this.event).borrow();
let inner = (*this
.event
.as_ref()
.expect("must listen() on event listener before waiting"))
.borrow();

// Set the listener's state to `Task`.
if let Some(tag) = inner.register(this.listener.as_mut(), unparker).notified() {
Expand Down Expand Up @@ -1096,17 +1115,24 @@ impl<T, B: Borrow<Inner<T>> + Unpin> Listener<T, B> {
/// active listener.
fn discard(self: Pin<&mut Self>) -> bool {
let this = self.project();
let inner = (*this.event).borrow();

inner
.remove(this.listener, false)
.map_or(false, |state| state.is_notified())
if let Some(inner) = this.event.as_ref() {
(*inner)
.borrow()
.remove(this.listener, false)
.map_or(false, |state| state.is_notified())
} else {
false
}
}

/// Poll this listener for a notification.
fn poll_internal(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
let mut this = self.project();
let inner = (*this.event).borrow();
let inner = match &this.event {
Some(inner) => (*inner).borrow(),
None => panic!(""),
};

// Try to register the listener.
match inner
Expand Down
5 changes: 0 additions & 5 deletions src/std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,11 @@ impl<T> crate::Inner<T> {
}

/// Add a new listener to the list.
///
/// Does nothing is the listener is already registered.
pub(crate) fn insert(&self, mut listener: Pin<&mut Option<Listener<T>>>) {
let mut inner = self.lock();

// SAFETY: We are locked, so we can access the inner `link`.
let entry = unsafe {
if listener.is_some() {
return;
}
listener.as_mut().set(Some(Listener {
link: UnsafeCell::new(Link {
state: Cell::new(State::Created),
Expand Down

0 comments on commit 21b34bf

Please sign in to comment.