diff --git a/Cargo.toml b/Cargo.toml index 907bcfe..a1b647a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,6 @@ codecov = { repository = "jonhoo/bus", branch = "master", service = "github" } maintenance = { status = "passively-maintained" } [dependencies] -atomic-option = "0.1" num_cpus = "1.6.2" parking_lot_core = "0.9" crossbeam-channel = "0.5" diff --git a/src/lib.rs b/src/lib.rs index 6471395..0881845 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -99,12 +99,13 @@ #![deny(missing_docs)] #![warn(rust_2018_idioms)] -use atomic_option::AtomicOption; use crossbeam_channel as mpsc; use parking_lot_core::SpinWait; use std::cell::UnsafeCell; +use std::marker::PhantomData; use std::ops::Deref; +use std::ptr; use std::sync::atomic; use std::sync::mpsc as std_mpsc; use std::sync::Arc; @@ -189,7 +190,7 @@ impl Seat { // we're the last reader, so we may need to notify the writer there's space in the buf. // can be relaxed, since the acquire at the top already guarantees that we'll see // updates. - waiting = self.waiting.take(atomic::Ordering::Relaxed); + waiting = self.waiting.take(); // since we're the last reader, no-one else will be cloning this value, so we can // safely take a mutable reference, and just take the val instead of cloning it. @@ -222,7 +223,7 @@ impl Default for Seat { fn default() -> Self { Seat { read: atomic::AtomicUsize::new(0), - waiting: AtomicOption::empty(), + waiting: AtomicOption::new(), state: MutSeatState(UnsafeCell::new(SeatState { max: 0, val: None })), } } @@ -380,7 +381,7 @@ impl Bus { // no, so block by parking and telling readers to notify on last read self.state.ring[fence] .waiting - .replace(Some(Box::new(thread::current())), atomic::Ordering::Relaxed); + .swap(Some(Box::new(thread::current()))); // need the atomic fetch_add to ensure reader threads will see the new .waiting self.state.ring[fence] @@ -418,7 +419,7 @@ impl Bus { let state = unsafe { &mut *next.state.get() }; state.max = readers; state.val = Some(val); - next.waiting.replace(None, atomic::Ordering::Relaxed); + next.waiting.take(); next.read.store(0, atomic::Ordering::Release); } self.rleft[tail] = 0; @@ -819,3 +820,47 @@ impl Iterator for BusIntoIter { self.0.recv().ok() } } + +struct AtomicOption { + ptr: atomic::AtomicPtr, + _marker: PhantomData>>, +} + +unsafe impl Send for AtomicOption {} +unsafe impl Sync for AtomicOption {} + +impl AtomicOption { + fn new() -> Self { + Self { + ptr: atomic::AtomicPtr::new(ptr::null_mut()), + _marker: PhantomData, + } + } + + fn swap(&self, val: Option>) -> Option> { + let new = match val { + Some(val) => Box::into_raw(val), + None => ptr::null_mut(), + }; + let old = self.ptr.swap(new, atomic::Ordering::AcqRel); + if old.is_null() { + None + } else { + // SAFETY: + // - AcqRel ensures that it does not read a pointer to potentially invalid memory. + // - We've checked that old is not null. + // - We do not store invalid pointers other than null in self.ptr. + Some(unsafe { Box::from_raw(old) }) + } + } + + fn take(&self) -> Option> { + self.swap(None) + } +} + +impl Drop for AtomicOption { + fn drop(&mut self) { + drop(self.take()); + } +}