Skip to content

Commit

Permalink
Adjust EventListener::new and listen calls
Browse files Browse the repository at this point in the history
  • Loading branch information
Jules-Bertholet committed Nov 21, 2023
1 parent eb1d978 commit cc3efbb
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 26 deletions.
6 changes: 3 additions & 3 deletions src/barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl Barrier {
BarrierWait::_new(BarrierWaitInner {
barrier: self,
lock: Some(self.state.lock()),
evl: EventListener::new(&self.event),
evl: EventListener::new(),
state: WaitState::Initial,
})
}
Expand Down Expand Up @@ -200,7 +200,7 @@ impl EventListenerFuture for BarrierWaitInner<'_> {

if state.count < this.barrier.n {
// We need to wait for the event.
this.evl.as_mut().listen();
this.evl.as_mut().listen(&this.barrier.event);
*this.state = WaitState::Waiting { local_gen };
} else {
// We are the last one.
Expand Down Expand Up @@ -233,7 +233,7 @@ impl EventListenerFuture for BarrierWaitInner<'_> {

if *local_gen == state.generation_id && state.count < this.barrier.n {
// We need to wait for the event again.
this.evl.as_mut().listen();
this.evl.as_mut().listen(&this.barrier.event);
*this.state = WaitState::Waiting {
local_gen: *local_gen,
};
Expand Down
9 changes: 3 additions & 6 deletions src/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,10 +478,7 @@ impl<T: ?Sized, B: Borrow<Mutex<T>>> AcquireSlow<B, T> {
#[cold]
fn new(mutex: B) -> Self {
// Create a new instance of the listener.
let listener = {
let mutex = Borrow::<Mutex<T>>::borrow(&mutex);
EventListener::new(&mutex.lock_ops)
};
let listener = { EventListener::new() };

AcquireSlow {
mutex: Some(mutex),
Expand Down Expand Up @@ -532,7 +529,7 @@ impl<T: ?Sized, B: Unpin + Borrow<Mutex<T>>> EventListenerFuture for AcquireSlow
loop {
// Start listening for events.
if !this.listener.is_listening() {
this.listener.as_mut().listen();
this.listener.as_mut().listen(&mutex.lock_ops);

// Try locking if nobody is being starved.
match mutex
Expand Down Expand Up @@ -596,7 +593,7 @@ impl<T: ?Sized, B: Unpin + Borrow<Mutex<T>>> EventListenerFuture for AcquireSlow
loop {
if !this.listener.is_listening() {
// Start listening for events.
this.listener.as_mut().listen();
this.listener.as_mut().listen(&mutex.lock_ops);

// Try locking if nobody else is being starved.
match mutex
Expand Down
12 changes: 6 additions & 6 deletions src/once_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,9 @@ impl<T> OnceCell<T> {
}

// Slow path: wait for the value to be initialized.
let listener = EventListener::new(&self.passive_waiters);
let listener = EventListener::new();
pin!(listener);
listener.as_mut().listen();
listener.as_mut().listen(&self.passive_waiters);

// Try again.
if let Some(value) = self.get() {
Expand Down Expand Up @@ -329,9 +329,9 @@ impl<T> OnceCell<T> {
}

// Slow path: wait for the value to be initialized.
let listener = EventListener::new(&self.passive_waiters);
let listener = EventListener::new();
pin!(listener);
listener.as_mut().listen();
listener.as_mut().listen(&self.passive_waiters);

// Try again.
if let Some(value) = self.get() {
Expand Down Expand Up @@ -591,7 +591,7 @@ impl<T> OnceCell<T> {
strategy: &mut impl for<'a> Strategy<'a>,
) -> Result<(), E> {
// The event listener we're currently waiting on.
let event_listener = EventListener::new(&self.active_initializers);
let event_listener = EventListener::new();
pin!(event_listener);

let mut closure = Some(closure);
Expand All @@ -614,7 +614,7 @@ impl<T> OnceCell<T> {
if event_listener.is_listening() {
strategy.wait(event_listener.as_mut()).await;
} else {
event_listener.as_mut().listen();
event_listener.as_mut().listen(&self.active_initializers);
}
}
State::Uninitialized => {
Expand Down
14 changes: 7 additions & 7 deletions src/rwlock/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl RawRwLock {
RawRead {
lock: self,
state: self.state.load(Ordering::Acquire),
listener: EventListener::new(&self.no_writer),
listener: EventListener::new(),
}
}

Expand Down Expand Up @@ -161,7 +161,7 @@ impl RawRwLock {
pub(super) fn write(&self) -> RawWrite<'_> {
RawWrite {
lock: self,
no_readers: EventListener::new(&self.no_readers),
no_readers: EventListener::new(),
state: WriteState::Acquiring {
lock: self.mutex.lock(),
},
Expand Down Expand Up @@ -193,7 +193,7 @@ impl RawRwLock {

RawUpgrade {
lock: Some(self),
listener: EventListener::new(&self.no_readers),
listener: EventListener::new(),
}
}

Expand Down Expand Up @@ -328,7 +328,7 @@ impl<'a> EventListenerFuture for RawRead<'a> {
} else {
// Start listening for "no writer" events.
let load_ordering = if !this.listener.is_listening() {
this.listener.as_mut().listen();
this.listener.as_mut().listen(&this.lock.no_writer);

// Make sure there really is no writer.
Ordering::SeqCst
Expand Down Expand Up @@ -473,7 +473,7 @@ impl<'a> EventListenerFuture for RawWrite<'a> {
}

// Start waiting for the readers to finish.
this.no_readers.as_mut().listen();
this.no_readers.as_mut().listen(&this.lock.no_readers);
this.state.as_mut().set(WriteState::WaitingReaders);
}

Expand All @@ -494,7 +494,7 @@ impl<'a> EventListenerFuture for RawWrite<'a> {
// Wait for the readers to finish.
if !this.no_readers.is_listening() {
// Register a listener.
this.no_readers.as_mut().listen();
this.no_readers.as_mut().listen(&this.lock.no_readers);
} else {
// Wait for the readers to finish.
ready!(strategy.poll(this.no_readers.as_mut(), cx));
Expand Down Expand Up @@ -559,7 +559,7 @@ impl<'a> EventListenerFuture for RawUpgrade<'a> {
// If there are readers, wait for them to finish.
if !this.listener.is_listening() {
// Start listening for "no readers" events.
this.listener.as_mut().listen();
this.listener.as_mut().listen(&lock.no_readers);
} else {
// Wait for the readers to finish.
ready!(strategy.poll(this.listener.as_mut(), cx));
Expand Down
8 changes: 4 additions & 4 deletions src/semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl Semaphore {
pub fn acquire(&self) -> Acquire<'_> {
Acquire::_new(AcquireInner {
semaphore: self,
listener: EventListener::new(&self.event),
listener: EventListener::new(),
})
}

Expand Down Expand Up @@ -176,7 +176,7 @@ impl Semaphore {
pub fn acquire_arc(self: &Arc<Self>) -> AcquireArc {
AcquireArc {
semaphore: self.clone(),
listener: EventListener::new(&self.event),
listener: EventListener::new(),
}
}

Expand Down Expand Up @@ -245,7 +245,7 @@ impl<'a> EventListenerFuture for AcquireInner<'a> {
None => {
// Wait on the listener.
if !this.listener.is_listening() {
this.listener.as_mut().listen();
this.listener.as_mut().listen(&this.semaphore.event);
} else {
ready!(strategy.poll(this.listener.as_mut(), cx));
}
Expand Down Expand Up @@ -285,7 +285,7 @@ impl Future for AcquireArc {
None => {
// Wait on the listener.
if !this.listener.is_listening() {
this.listener.as_mut().listen();
this.listener.as_mut().listen(&this.semaphore.event);
} else {
ready!(this.listener.as_mut().poll(cx));
}
Expand Down

0 comments on commit cc3efbb

Please sign in to comment.