Skip to content

Commit

Permalink
Remove dependency on unsound atomic-option crate
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Jun 17, 2022
1 parent eaed299 commit afef3c9
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 6 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ codecov = { repository = "jonhoo/bus", branch = "master", service = "github" }
maintenance = { status = "passively-maintained" }

[dependencies]
atomic-option = "0.1"
num_cpus = "1.6.2"
parking_lot_core = "0.9"
crossbeam-channel = "0.5"
Expand Down
55 changes: 50 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,13 @@
#![deny(missing_docs)]
#![warn(rust_2018_idioms)]

use atomic_option::AtomicOption;
use crossbeam_channel as mpsc;
use parking_lot_core::SpinWait;

use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::ops::Deref;
use std::ptr;
use std::sync::atomic;
use std::sync::mpsc as std_mpsc;
use std::sync::Arc;
Expand Down Expand Up @@ -189,7 +190,7 @@ impl<T: Clone + Sync> Seat<T> {
// we're the last reader, so we may need to notify the writer there's space in the buf.
// can be relaxed, since the acquire at the top already guarantees that we'll see
// updates.
waiting = self.waiting.take(atomic::Ordering::Relaxed);
waiting = self.waiting.take();

// since we're the last reader, no-one else will be cloning this value, so we can
// safely take a mutable reference, and just take the val instead of cloning it.
Expand Down Expand Up @@ -222,7 +223,7 @@ impl<T> Default for Seat<T> {
fn default() -> Self {
Seat {
read: atomic::AtomicUsize::new(0),
waiting: AtomicOption::empty(),
waiting: AtomicOption::new(),
state: MutSeatState(UnsafeCell::new(SeatState { max: 0, val: None })),
}
}
Expand Down Expand Up @@ -380,7 +381,7 @@ impl<T> Bus<T> {
// no, so block by parking and telling readers to notify on last read
self.state.ring[fence]
.waiting
.replace(Some(Box::new(thread::current())), atomic::Ordering::Relaxed);
.swap(Some(Box::new(thread::current())));

// need the atomic fetch_add to ensure reader threads will see the new .waiting
self.state.ring[fence]
Expand Down Expand Up @@ -418,7 +419,7 @@ impl<T> Bus<T> {
let state = unsafe { &mut *next.state.get() };
state.max = readers;
state.val = Some(val);
next.waiting.replace(None, atomic::Ordering::Relaxed);
next.waiting.take();
next.read.store(0, atomic::Ordering::Release);
}
self.rleft[tail] = 0;
Expand Down Expand Up @@ -819,3 +820,47 @@ impl<T: Clone + Sync> Iterator for BusIntoIter<T> {
self.0.recv().ok()
}
}

struct AtomicOption<T> {
ptr: atomic::AtomicPtr<T>,
_marker: PhantomData<Option<Box<T>>>,
}

unsafe impl<T: Send> Send for AtomicOption<T> {}
unsafe impl<T: Send> Sync for AtomicOption<T> {}

impl<T> AtomicOption<T> {
fn new() -> Self {
Self {
ptr: atomic::AtomicPtr::new(ptr::null_mut()),
_marker: PhantomData,
}
}

fn swap(&self, val: Option<Box<T>>) -> Option<Box<T>> {
let new = match val {
Some(val) => Box::into_raw(val),
None => ptr::null_mut(),
};
let old = self.ptr.swap(new, atomic::Ordering::AcqRel);
if old.is_null() {
None
} else {
// SAFETY:
// - AcqRel ensures that it does not read a pointer to potentially invalid memory.
// - We've checked that old is not null.
// - We do not store invalid pointers other than null in self.ptr.
Some(unsafe { Box::from_raw(old) })
}
}

fn take(&self) -> Option<Box<T>> {
self.swap(None)
}
}

impl<T> Drop for AtomicOption<T> {
fn drop(&mut self) {
drop(self.take());
}
}

0 comments on commit afef3c9

Please sign in to comment.