diff --git a/tests/bounded.rs b/tests/bounded.rs index 2841c21..6f402b7 100644 --- a/tests/bounded.rs +++ b/tests/bounded.rs @@ -279,11 +279,93 @@ fn linearizable() { let q = ConcurrentQueue::bounded(THREADS); Parallel::new() - .each(0..THREADS, |_| { + .each(0..THREADS / 2, |_| { for _ in 0..COUNT { while q.push(0).is_err() {} q.pop().unwrap(); } }) + .each(0..THREADS / 2, |_| { + for _ in 0..COUNT { + if q.force_push(0).unwrap().is_none() { + q.pop().unwrap(); + } + } + }) + .run(); +} + +#[cfg(not(target_family = "wasm"))] +#[test] +fn spsc_ring_buffer() { + const COUNT: usize = if cfg!(miri) { 200 } else { 100_000 }; + + let t = AtomicUsize::new(1); + let q = ConcurrentQueue::<usize>::bounded(3); + let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>(); + + Parallel::new() + .add(|| loop { + match t.load(Ordering::SeqCst) { + 0 if q.is_empty() => break, + + _ => { + while let Ok(n) = q.pop() { + v[n].fetch_add(1, Ordering::SeqCst); + } + } + } + }) + .add(|| { + for i in 0..COUNT { + if let Ok(Some(n)) = q.force_push(i) { + v[n].fetch_add(1, Ordering::SeqCst); + } + } + + t.fetch_sub(1, Ordering::SeqCst); + }) + .run(); + + for c in v { + assert_eq!(c.load(Ordering::SeqCst), 1); + } +} + +#[cfg(not(target_family = "wasm"))] +#[test] +fn mpmc_ring_buffer() { + const COUNT: usize = if cfg!(miri) { 100 } else { 25_000 }; + const THREADS: usize = 4; + + let t = AtomicUsize::new(THREADS); + let q = ConcurrentQueue::<usize>::bounded(3); + let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>(); + + Parallel::new() + .each(0..THREADS, |_| loop { + match t.load(Ordering::SeqCst) { + 0 if q.is_empty() => break, + + _ => { + while let Ok(n) = q.pop() { + v[n].fetch_add(1, Ordering::SeqCst); + } + } + } + }) + .each(0..THREADS, |_| { + for i in 0..COUNT { + if let Ok(Some(n)) = q.force_push(i) { + v[n].fetch_add(1, Ordering::SeqCst); + } + } + + t.fetch_sub(1, Ordering::SeqCst); + }) .run(); + + for c in v { + assert_eq!(c.load(Ordering::SeqCst), THREADS); + } } diff --git a/tests/loom.rs b/tests/loom.rs index 793b597..77f99d4 100644 --- a/tests/loom.rs +++ b/tests/loom.rs @@ -1,6 +1,6 @@ #![cfg(loom)] -use concurrent_queue::{ConcurrentQueue, PopError, PushError}; +use concurrent_queue::{ConcurrentQueue, ForcePushError, PopError, PushError}; use loom::sync::atomic::{AtomicUsize, Ordering}; use loom::sync::{Arc, Condvar, Mutex}; use loom::thread; @@ -115,9 +115,26 @@ impl<T> Sender<T> { } } } + + /// Send a value forcefully. + fn force_send(&self, value: T) -> Result<Option<T>, T> { + match self.channel.queue.force_push(value) { + Ok(bumped) => { + self.channel.push_event.signal(); + Ok(bumped) + } + + Err(ForcePushError(val)) => Err(val), + } + } } impl<T> Receiver<T> { + /// Channel capacity. + fn capacity(&self) -> Option<usize> { + self.channel.queue.capacity() + } + /// Receive a value. /// /// Returns an error if the channel is closed. @@ -248,3 +265,43 @@ fn spsc() { handle.join().unwrap(); }); } + +#[test] +fn spsc_force() { + run_test(|q, limit| { + // Create a new pair of senders/receivers. + let (tx, rx) = pair(q); + + // Push each onto a thread and run them. + let handle = thread::spawn(move || { + for i in 0..limit { + if tx.force_send(i).is_err() { + break; + } + } + }); + + let mut recv_values = vec![]; + + loop { + match rx.recv() { + Ok(value) => recv_values.push(value), + Err(()) => break, + } + } + + // Values may not be in order. + recv_values.sort_unstable(); + let cap = rx.capacity().unwrap_or(usize::MAX); + for (left, right) in (0..limit) + .rev() + .take(cap) + .zip(recv_values.into_iter().rev()) + { + assert_eq!(left, right); + } + + // Join the handle before we exit. + handle.join().unwrap(); + }); +} diff --git a/tests/single.rs b/tests/single.rs index 8d2a0d6..ec4b912 100644 --- a/tests/single.rs +++ b/tests/single.rs @@ -197,11 +197,93 @@ fn linearizable() { let q = ConcurrentQueue::bounded(1); Parallel::new() - .each(0..THREADS, |_| { + .each(0..THREADS / 2, |_| { for _ in 0..COUNT { while q.push(0).is_err() {} q.pop().unwrap(); } }) + .each(0..THREADS / 2, |_| { + for _ in 0..COUNT { + if q.force_push(0).unwrap().is_none() { + q.pop().unwrap(); + } + } + }) + .run(); +} + +#[cfg(not(target_family = "wasm"))] +#[test] +fn spsc_ring_buffer() { + const COUNT: usize = if cfg!(miri) { 200 } else { 100_000 }; + + let t = AtomicUsize::new(1); + let q = ConcurrentQueue::<usize>::bounded(1); + let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>(); + + Parallel::new() + .add(|| loop { + match t.load(Ordering::SeqCst) { + 0 if q.is_empty() => break, + + _ => { + while let Ok(n) = q.pop() { + v[n].fetch_add(1, Ordering::SeqCst); + } + } + } + }) + .add(|| { + for i in 0..COUNT { + if let Ok(Some(n)) = q.force_push(i) { + v[n].fetch_add(1, Ordering::SeqCst); + } + } + + t.fetch_sub(1, Ordering::SeqCst); + }) + .run(); + + for c in v { + assert_eq!(c.load(Ordering::SeqCst), 1); + } +} + +#[cfg(not(target_family = "wasm"))] +#[test] +fn mpmc_ring_buffer() { + const COUNT: usize = if cfg!(miri) { 100 } else { 25_000 }; + const THREADS: usize = 4; + + let t = AtomicUsize::new(THREADS); + let q = ConcurrentQueue::<usize>::bounded(1); + let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>(); + + Parallel::new() + .each(0..THREADS, |_| loop { + match t.load(Ordering::SeqCst) { + 0 if q.is_empty() => break, + + _ => { + while let Ok(n) = q.pop() { + v[n].fetch_add(1, Ordering::SeqCst); + } + } + } + }) + .each(0..THREADS, |_| { + for i in 0..COUNT { + if let Ok(Some(n)) = q.force_push(i) { + v[n].fetch_add(1, Ordering::SeqCst); + } + } + + t.fetch_sub(1, Ordering::SeqCst); + }) .run(); + + for c in v { + assert_eq!(c.load(Ordering::SeqCst), THREADS); + } }