diff --git a/Cargo.toml b/Cargo.toml index 6aea0b6..1ce1fd8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ members = [ "rust/task/irq", "rust/task/affinity", "rust/task/wait_queue", + "rust/task/sync", ] exclude = [".arceos"] diff --git a/rust/task/sync/Cargo.toml b/rust/task/sync/Cargo.toml new file mode 100644 index 0000000..86f4999 --- /dev/null +++ b/rust/task/sync/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "arceos-sync" +version = "0.1.0" +edition = "2021" +authors = ["Keyang Hu "] +description = "A simple demo to test the task synchronization mechanisms provided by ArceOS" + +[features] +sched_rr = ["axstd?/sched_rr"] +sched_cfs = ["axstd?/sched_cfs"] + +[dependencies] +axstd = { workspace = true, features = ["alloc", "multitask", "irq"], optional = true } + +rand = { version = "0.8", default-features = false, features = ["small_rng"] } \ No newline at end of file diff --git a/rust/task/sync/expect_info_smp1_fifo.out b/rust/task/sync/expect_info_smp1_fifo.out new file mode 100644 index 0000000..73b3fbf --- /dev/null +++ b/rust/task/sync/expect_info_smp1_fifo.out @@ -0,0 +1,32 @@ +smp = 1 +build_mode = release +log_level = info + +Primary CPU 0 started, +Found physcial memory regions: + .text (READ | EXECUTE | RESERVED) + .rodata (READ | RESERVED) + .data .tdata .tbss .percpu (READ | WRITE | RESERVED) + .percpu (READ | WRITE | RESERVED) + boot stack (READ | WRITE | RESERVED) + .bss (READ | WRITE | RESERVED) + free memory (READ | WRITE | FREE) +Initialize global memory allocator... +Initialize platform devices... +Initialize scheduling... + use FIFO scheduler. +Initialize interrupt handlers... +Primary CPU 0 init OK. +Hello, synchronization mechanisms test for ArceOS! +Mutex test... +Mutex test ok +Condvar test... +Condvar test ok +Barrier test... +Barrier test OK +RwLock test... +RwLock test ok +Semaphore test... +Semaphore test ok +All synchronization mechanisms provided by ArceOS seem to work fine, enjoy! +Shutting down... diff --git a/rust/task/sync/expect_info_smp4_cfs.out b/rust/task/sync/expect_info_smp4_cfs.out new file mode 100644 index 0000000..09b6316 --- /dev/null +++ b/rust/task/sync/expect_info_smp4_cfs.out @@ -0,0 +1,38 @@ +smp = 4 +build_mode = release +log_level = info + +CPU 0 started +Found physcial memory regions: + .text (READ | EXECUTE | RESERVED) + .rodata (READ | RESERVED) + .data .tdata .tbss .percpu (READ | WRITE | RESERVED) + .percpu (READ | WRITE | RESERVED) + boot stack (READ | WRITE | RESERVED) + .bss (READ | WRITE | RESERVED) + free memory (READ | WRITE | FREE) +Initialize global memory allocator... +Initialize platform devices... +Initialize scheduling... + use Completely Fair scheduler. +Initialize interrupt handlers... +CPU 0 init OK +CPU 1 started +CPU 2 started +CPU 3 started +CPU 1 init OK +CPU 2 init OK +CPU 3 init OK +Hello, synchronization mechanisms test for ArceOS! +Mutex test... +Mutex test ok +Condvar test... +Condvar test ok +Barrier test... +Barrier test OK +RwLock test... +RwLock test ok +Semaphore test... +Semaphore test ok +All synchronization mechanisms provided by ArceOS seem to work fine, enjoy! +Shutting down... diff --git a/rust/task/sync/expect_info_smp4_fifo.out b/rust/task/sync/expect_info_smp4_fifo.out new file mode 100644 index 0000000..4a2d129 --- /dev/null +++ b/rust/task/sync/expect_info_smp4_fifo.out @@ -0,0 +1,38 @@ +smp = 4 +build_mode = release +log_level = info + +CPU 0 started +Found physcial memory regions: + .text (READ | EXECUTE | RESERVED) + .rodata (READ | RESERVED) + .data .tdata .tbss .percpu (READ | WRITE | RESERVED) + .percpu (READ | WRITE | RESERVED) + boot stack (READ | WRITE | RESERVED) + .bss (READ | WRITE | RESERVED) + free memory (READ | WRITE | FREE) +Initialize global memory allocator... +Initialize platform devices... +Initialize scheduling... + use FIFO scheduler. +Initialize interrupt handlers... +CPU 0 init OK +CPU 1 started +CPU 2 started +CPU 3 started +CPU 1 init OK +CPU 2 init OK +CPU 3 init OK +Hello, synchronization mechanisms test for ArceOS! +Mutex test... +Mutex test ok +Condvar test... +Condvar test ok +Barrier test... +Barrier test OK +RwLock test... +RwLock test ok +Semaphore test... +Semaphore test ok +All synchronization mechanisms provided by ArceOS seem to work fine, enjoy! +Shutting down... diff --git a/rust/task/sync/expect_info_smp4_rr.out b/rust/task/sync/expect_info_smp4_rr.out new file mode 100644 index 0000000..00d7695 --- /dev/null +++ b/rust/task/sync/expect_info_smp4_rr.out @@ -0,0 +1,38 @@ +smp = 4 +build_mode = release +log_level = info + +CPU 0 started +Found physcial memory regions: + .text (READ | EXECUTE | RESERVED) + .rodata (READ | RESERVED) + .data .tdata .tbss .percpu (READ | WRITE | RESERVED) + .percpu (READ | WRITE | RESERVED) + boot stack (READ | WRITE | RESERVED) + .bss (READ | WRITE | RESERVED) + free memory (READ | WRITE | FREE) +Initialize global memory allocator... +Initialize platform devices... +Initialize scheduling... + use Round-robin scheduler. +Initialize interrupt handlers... +CPU 0 init OK +CPU 1 started +CPU 2 started +CPU 3 started +CPU 1 init OK +CPU 2 init OK +CPU 3 init OK +Hello, synchronization mechanisms test for ArceOS! +Mutex test... +Mutex test ok +Condvar test... +Condvar test ok +Barrier test... +Barrier test OK +RwLock test... +RwLock test ok +Semaphore test... +Semaphore test ok +All synchronization mechanisms provided by ArceOS seem to work fine, enjoy! +Shutting down... diff --git a/rust/task/sync/src/barrier.rs b/rust/task/sync/src/barrier.rs new file mode 100644 index 0000000..d7c01b1 --- /dev/null +++ b/rust/task/sync/src/barrier.rs @@ -0,0 +1,76 @@ +use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::{Arc, Barrier}; +use std::thread; +use std::vec::Vec; + +const NUM_TASKS: u32 = 10; +const NUM_ITERS: u32 = 100; + +fn test_barrier_rendezvous() { + static BARRIER: Barrier = Barrier::new(NUM_TASKS as usize); + + let mut join_handlers = Vec::new(); + + fn rendezvous() { + for _ in 0..NUM_ITERS { + BARRIER.wait(); + } + } + + for _ in 0..NUM_TASKS { + join_handlers.push(thread::spawn(rendezvous)); + } + + // Wait for all threads to finish. + for join_handler in join_handlers { + join_handler.join().unwrap(); + } +} + +fn test_wait_result() { + static LEADER_FOUND: AtomicBool = AtomicBool::new(false); + static FINISHED_TASKS: AtomicUsize = AtomicUsize::new(0); + + let barrier = Arc::new(Barrier::new(NUM_TASKS as _)); + + let mut join_handlers = Vec::new(); + + for _ in 0..NUM_TASKS - 1 { + let c = barrier.clone(); + join_handlers.push(thread::spawn(move || { + let is_leader = c.wait().is_leader(); + if is_leader { + LEADER_FOUND.store(true, Ordering::SeqCst); + } + FINISHED_TASKS.fetch_add(1, Ordering::SeqCst); + })); + } + + // At this point, all spawned threads should be blocked, + // so we shouldn't get a true value from `LEADER_FOUND`. + assert!(!LEADER_FOUND.load(Ordering::Acquire)); + + let leader_found = barrier.wait().is_leader(); + if leader_found { + LEADER_FOUND.store(true, Ordering::SeqCst); + } + + // Wait for all threads to finish. + for join_handler in join_handlers { + join_handler.join().unwrap(); + } + + assert_eq!( + FINISHED_TASKS.load(Ordering::Relaxed), + NUM_TASKS as usize - 1 + ); + // Now, the barrier is cleared and we should get true from `LEADER_FOUND`. + assert!(LEADER_FOUND.load(Ordering::Relaxed)); +} + +pub fn test_barrier() { + println!("Barrier test..."); + thread::spawn(|| test_barrier_rendezvous()).join().unwrap(); + thread::spawn(|| test_wait_result()).join().unwrap(); + println!("Barrier test OK"); +} diff --git a/rust/task/sync/src/condvar.rs b/rust/task/sync/src/condvar.rs new file mode 100644 index 0000000..399b41e --- /dev/null +++ b/rust/task/sync/src/condvar.rs @@ -0,0 +1,190 @@ +use core::sync::atomic::AtomicUsize; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Condvar, Mutex}; +use std::thread; +use std::time::Duration; + +const NUM_TASKS: usize = 100; + +fn notify_one() { + let m = Arc::new(Mutex::new(())); + let m2 = m.clone(); + let c = Arc::new(Condvar::new()); + let c2 = c.clone(); + + let g = m.lock(); + let _t = thread::spawn(move || { + let _g = m2.lock(); + c2.notify_one(); + }); + let g = c.wait(g); + drop(g); +} + +fn notify_all() { + static FINISHED_TASKS: AtomicUsize = AtomicUsize::new(0); + + let data = Arc::new((Mutex::new(0), Condvar::new())); + for _ in 0..NUM_TASKS { + let data = data.clone(); + thread::spawn(move || { + let &(ref lock, ref cond) = &*data; + let mut cnt = lock.lock(); + *cnt += 1; + while *cnt != 0 { + cnt = cond.wait(cnt); + } + FINISHED_TASKS.fetch_add(1, Ordering::SeqCst); + }); + } + + let &(ref lock, ref cond) = &*data; + loop { + let mut cnt = lock.lock(); + if *cnt == NUM_TASKS { + *cnt = 0; + cond.notify_all(); + break; + } + drop(cnt); + // Note: on FIFO scheduler, "preempt" is not enabled, + // yield manually to avoid deadlock. + #[cfg(all(not(feature = "sched_rr"), not(feature = "sched_cfs")))] + thread::yield_now(); + } + + while FINISHED_TASKS.load(Ordering::SeqCst) < NUM_TASKS { + // Note: on FIFO scheduler, "preempt" is not enabled, + // yield manually to avoid deadlock. + #[cfg(all(not(feature = "sched_rr"), not(feature = "sched_cfs")))] + thread::yield_now(); + } +} + +fn wait_while() { + let pair = Arc::new((Mutex::new(false), Condvar::new())); + let pair2 = pair.clone(); + + // Inside of our lock, spawn a new thread, and then wait for it to start. + thread::spawn(move || { + let &(ref lock, ref cvar) = &*pair2; + let mut started = lock.lock(); + *started = true; + // We notify the condvar that the value has changed. + cvar.notify_one(); + }); + + // Wait for the thread to start up. + let &(ref lock, ref cvar) = &*pair; + let guard = cvar.wait_while(lock.lock(), |started| !*started); + assert!(*guard); +} + +fn wait_timeout_wait() { + let m = Arc::new(Mutex::new(())); + let c = Arc::new(Condvar::new()); + + loop { + let g = m.lock(); + let (_g, no_timeout) = c.wait_timeout(g, Duration::from_millis(1)); + // spurious wakeups mean this isn't necessarily true + // so execute test again, if not timeout + if !no_timeout.timed_out() { + continue; + } + + break; + } +} + +fn wait_timeout_while_wait() { + let m = Arc::new(Mutex::new(())); + let c = Arc::new(Condvar::new()); + + let g = m.lock(); + let (_g, wait) = c.wait_timeout_while(g, Duration::from_millis(1), |_| true); + // no spurious wakeups. ensure it timed-out + assert!(wait.timed_out()); +} + +fn wait_timeout_while_instant_satisfy() { + let m = Arc::new(Mutex::new(())); + let c = Arc::new(Condvar::new()); + + let g = m.lock(); + let (_g, wait) = c.wait_timeout_while(g, Duration::from_millis(0), |_| false); + // ensure it didn't time-out even if we were not given any time. + assert!(!wait.timed_out()); +} + +fn wait_timeout_while_wake() { + let pair = Arc::new((Mutex::new(false), Condvar::new())); + let pair_copy = pair.clone(); + + let &(ref m, ref c) = &*pair; + let g = m.lock(); + let _t = thread::spawn(move || { + let &(ref lock, ref cvar) = &*pair_copy; + let mut started = lock.lock(); + thread::sleep(Duration::from_millis(1)); + *started = true; + cvar.notify_one(); + }); + let (g2, wait) = c.wait_timeout_while(g, Duration::from_millis(u64::MAX), |&mut notified| { + !notified + }); + // ensure it didn't time-out even if we were not given any time. + assert!(!wait.timed_out()); + assert!(*g2); +} + +fn wait_timeout_wake() { + let m = Arc::new(Mutex::new(())); + let c = Arc::new(Condvar::new()); + + loop { + let g = m.lock(); + + let c2 = c.clone(); + let m2 = m.clone(); + + let notified = Arc::new(AtomicBool::new(false)); + let notified_copy = notified.clone(); + + let t = thread::spawn(move || { + let _g = m2.lock(); + thread::sleep(Duration::from_millis(1)); + notified_copy.store(true, Ordering::Relaxed); + c2.notify_one(); + }); + let (g, timeout_res) = c.wait_timeout(g, Duration::from_millis(u64::MAX)); + assert!(!timeout_res.timed_out()); + // spurious wakeups mean this isn't necessarily true + // so execute test again, if not notified + if !notified.load(Ordering::Relaxed) { + t.join().unwrap(); + continue; + } + drop(g); + + t.join().unwrap(); + + break; + } +} + +pub fn test_condvar() { + println!("Condvar test..."); + thread::spawn(|| notify_one()).join().unwrap(); + thread::spawn(|| notify_all()).join().unwrap(); + thread::spawn(|| wait_while()).join().unwrap(); + thread::spawn(|| wait_timeout_wait()).join().unwrap(); + thread::spawn(|| wait_timeout_while_wait()).join().unwrap(); + thread::spawn(|| wait_timeout_while_instant_satisfy()) + .join() + .unwrap(); + thread::spawn(|| wait_timeout_while_wake()).join().unwrap(); + thread::spawn(|| wait_timeout_wake()).join().unwrap(); + + println!("Condvar test ok"); +} diff --git a/rust/task/sync/src/main.rs b/rust/task/sync/src/main.rs new file mode 100644 index 0000000..cd3c4b4 --- /dev/null +++ b/rust/task/sync/src/main.rs @@ -0,0 +1,29 @@ +#![cfg_attr(feature = "axstd", no_std)] +#![cfg_attr(feature = "axstd", no_main)] + +#[macro_use] +#[cfg(feature = "axstd")] +extern crate axstd as std; + +use std::thread; + +mod barrier; +mod condvar; +mod mutex; +mod rwlock; +mod semaphore; + +#[cfg_attr(feature = "axstd", no_mangle)] +fn main() { + println!("Hello, synchronization mechanisms test for ArceOS!"); + + thread::spawn(|| mutex::test_mutex()).join().unwrap(); + thread::spawn(|| condvar::test_condvar()).join().unwrap(); + thread::spawn(|| barrier::test_barrier()).join().unwrap(); + thread::spawn(|| rwlock::test_rwlock()).join().unwrap(); + thread::spawn(|| semaphore::test_semaphore()) + .join() + .unwrap(); + + println!("All synchronization mechanisms provided by ArceOS seem to work fine, enjoy!"); +} diff --git a/rust/task/sync/src/mutex.rs b/rust/task/sync/src/mutex.rs new file mode 100644 index 0000000..7e6ffee --- /dev/null +++ b/rust/task/sync/src/mutex.rs @@ -0,0 +1,43 @@ +use std::sync::{Arc, Mutex}; +use std::thread; + +pub fn test_mutex() { + const NUM_ITERS: u32 = 1000; + const NUM_TASKS: u32 = 100; + println!("Mutex test..."); + + let m = Arc::new(Mutex::new(0)); + + fn inc(m: &Mutex, val: u32) { + for _ in 0..NUM_ITERS { + *m.lock() += val; + } + } + + for _ in 0..NUM_TASKS { + let m2 = m.clone(); + thread::spawn(move || { + inc(&m2, 1); + }); + let m2 = m.clone(); + thread::spawn(move || { + inc(&m2, 2); + }); + } + + loop { + let val = m.lock(); + if *val == NUM_ITERS * NUM_TASKS * 3 { + break; + } + drop(val); + + // Note: on FIFO scheduler, "preempt" is not enabled, + // yield manually to avoid deadlock. + #[cfg(all(not(feature = "sched_rr"), not(feature = "sched_cfs")))] + thread::yield_now(); + } + assert_eq!(*m.lock(), NUM_ITERS * NUM_TASKS * 3); + + println!("Mutex test ok"); +} diff --git a/rust/task/sync/src/rwlock.rs b/rust/task/sync/src/rwlock.rs new file mode 100644 index 0000000..d886b8b --- /dev/null +++ b/rust/task/sync/src/rwlock.rs @@ -0,0 +1,93 @@ +use crate::thread; +use core::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, RwLock}; +use std::vec::Vec; + +use rand::{rngs::SmallRng, Rng, SeedableRng}; + +pub(crate) fn test_rng() -> SmallRng { + SmallRng::seed_from_u64(0xdead_beef) +} + +fn frob() { + const N: u32 = 10; + const M: usize = 1000; + + static FINISHED_TASKS: AtomicUsize = AtomicUsize::new(0); + + let r = Arc::new(RwLock::new(())); + + for _ in 0..N { + let r = r.clone(); + thread::spawn(move || { + let mut rng = test_rng(); + for _ in 0..M { + if rng.gen_bool(1.0 / (N as f64)) { + drop(r.write()); + } else { + drop(r.read()); + } + } + FINISHED_TASKS.fetch_add(1, Ordering::SeqCst); + }); + } + + while FINISHED_TASKS.load(Ordering::Acquire) < N as usize { + // Note: on FIFO scheduler, "preempt" is not enabled, + // yield manually to avoid deadlock. + #[cfg(all(not(feature = "sched_rr"), not(feature = "sched_cfs")))] + thread::yield_now(); + } +} + +fn test_rw_arc() { + let arc = Arc::new(RwLock::new(0)); + let arc2 = arc.clone(); + + static WRITER_FINISHED: AtomicBool = AtomicBool::new(false); + + thread::spawn(move || { + let mut lock = arc2.write(); + for _ in 0..10 { + let tmp = *lock; + *lock = -1; + thread::yield_now(); + *lock = tmp + 1; + } + WRITER_FINISHED.store(true, Ordering::Release); + }); + + // Readers try to catch the writer in the act + let mut children = Vec::new(); + for _ in 0..5 { + let arc3 = arc.clone(); + children.push(thread::spawn(move || { + let lock = arc3.read(); + assert!(*lock >= 0); + })); + } + + // Wait for children to pass their asserts + for r in children { + assert!(r.join().is_ok()); + } + + // Wait for writer to finish + while WRITER_FINISHED.load(Ordering::Acquire) == false { + // Note: on FIFO scheduler, "preempt" is not enabled, + // yield manually to avoid deadlock. + #[cfg(all(not(feature = "sched_rr"), not(feature = "sched_cfs")))] + thread::yield_now(); + } + + let lock = arc.read(); + assert_eq!(*lock, 10); +} + +pub fn test_rwlock() { + println!("RwLock test..."); + thread::spawn(|| frob()).join().unwrap(); + thread::spawn(|| test_rw_arc()).join().unwrap(); + println!("RwLock test ok"); +} diff --git a/rust/task/sync/src/semaphore.rs b/rust/task/sync/src/semaphore.rs new file mode 100644 index 0000000..d1074f3 --- /dev/null +++ b/rust/task/sync/src/semaphore.rs @@ -0,0 +1,84 @@ +use std::sync::Arc; +use std::sync::Semaphore; +use std::thread; +use std::vec::Vec; + +const NUM_ITERS: u32 = 1000; +const NUM_TASKS: u32 = 100; + +fn test_sem_as_mutex() { + let s = Arc::new(Semaphore::new(1)); + let s2 = s.clone(); + + let mut join_handlers = Vec::new(); + + for _ in 0..NUM_TASKS { + let s2 = s2.clone(); + join_handlers.push(thread::spawn(move || { + for _ in 0..NUM_ITERS { + let _g = s2.access(); + } + })); + } + + drop(s.access()); + + for join_handler in join_handlers { + join_handler.join().unwrap(); + } +} + +fn test_sem_as_cvar() { + let mut join_handlers = Vec::new(); + + // Child waits and parent signals + let s = Arc::new(Semaphore::new(0)); + + for _ in 0..NUM_TASKS { + let s2 = s.clone(); + join_handlers.push(thread::spawn(move || { + s2.acquire(); + })); + } + + for _ in 0..NUM_TASKS { + s.release(); + // Note: on FIFO scheduler, "preempt" is not enabled, + // just yield manually. + #[cfg(all(not(feature = "sched_rr"), not(feature = "sched_cfs")))] + thread::yield_now(); + } + + // Wait for all child tasks to finish. + while let Some(join_handler) = join_handlers.pop() { + join_handler.join().unwrap(); + } + + // Parent waits and child signals + let s = Arc::new(Semaphore::new(0)); + + for _ in 0..NUM_TASKS { + let s2 = s.clone(); + join_handlers.push(thread::spawn(move || { + s2.release(); + })); + } + + for _ in 0..NUM_TASKS { + s.acquire(); + } + + // Wait for all child tasks to finish. + while let Some(join_handler) = join_handlers.pop() { + join_handler.join().unwrap(); + } +} + +pub fn test_semaphore() { + println!("Semaphore test..."); + + thread::spawn(|| test_sem_as_mutex()).join().unwrap(); + thread::spawn(|| test_sem_as_cvar()).join().unwrap(); + + println!("Semaphore test ok"); +} diff --git a/rust/task/sync/test_cmd b/rust/task/sync/test_cmd new file mode 100644 index 0000000..c341ea6 --- /dev/null +++ b/rust/task/sync/test_cmd @@ -0,0 +1,4 @@ +test_one "LOG=info" "expect_info_smp1_fifo.out" +test_one "SMP=4 LOG=info" "expect_info_smp4_fifo.out" +test_one "SMP=4 LOG=info FEATURES=sched_rr" "expect_info_smp4_rr.out" +test_one "SMP=4 LOG=info FEATURES=sched_cfs" "expect_info_smp4_cfs.out" diff --git a/scripts/app_test.sh b/scripts/app_test.sh index f4a339e..556f4cd 100755 --- a/scripts/app_test.sh +++ b/scripts/app_test.sh @@ -117,6 +117,7 @@ if [ -z "$1" ]; then "rust/task/irq" "rust/task/affinity" "rust/task/wait_queue" + "rust/task/sync" "rust/net/httpclient" "c/helloworld" "c/memtest"