From 481939b9611cf7b8eabded9b7f325f56d84fa4e8 Mon Sep 17 00:00:00 2001
From: Ralf Biedert
Date: Tue, 2 Jul 2019 08:26:19 +0200
Subject: [PATCH 1/9] Ignore CLion files.

---
 .gitignore | 1 +
 1 file changed, 1 insertion(+)

diff --git a/.gitignore b/.gitignore
index 58706af7c..274bf181a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,4 @@ target
 *~
 TAGS
 *.bk
+.idea
\ No newline at end of file

From 55f235fdf10f49d21265bca2108a425c0f7775b6 Mon Sep 17 00:00:00 2001
From: Ralf Biedert
Date: Tue, 2 Jul 2019 08:27:09 +0200
Subject: [PATCH 2/9] Make StackJob take a &mut latch.

---
 rayon-core/src/job.rs      | 10 +++++-----
 rayon-core/src/join/mod.rs |  9 ++++-----
 rayon-core/src/registry.rs | 12 +++++++-----
 3 files changed, 16 insertions(+), 15 deletions(-)

diff --git a/rayon-core/src/job.rs b/rayon-core/src/job.rs
index a47f4558d..c2f48b84f 100644
--- a/rayon-core/src/job.rs
+++ b/rayon-core/src/job.rs
@@ -64,24 +64,24 @@ impl JobRef {
 /// executes it need not free any heap data, the cleanup occurs when
 /// the stack frame is later popped. The function parameter indicates
 /// `true` if the job was stolen -- executed on a different thread.
-pub(super) struct StackJob<L, F, R>
+pub(super) struct StackJob<'a, L, F, R>
 where
     L: Latch + Sync,
     F: FnOnce(bool) -> R + Send,
     R: Send,
 {
-    pub(super) latch: L,
+    pub(super) latch: &'a mut L,
     func: UnsafeCell<Option<F>>,
     result: UnsafeCell<JobResult<R>>,
 }
 
-impl<L, F, R> StackJob<L, F, R>
+impl<'a, L, F, R> StackJob<'a, L, F, R>
 where
     L: Latch + Sync,
     F: FnOnce(bool) -> R + Send,
     R: Send,
 {
-    pub(super) fn new(func: F, latch: L) -> StackJob<L, F, R> {
+    pub(super) fn new(func: F, latch: &'a mut L) -> StackJob<'a, L, F, R> {
         StackJob {
             latch,
             func: UnsafeCell::new(Some(func)),
@@ -102,7 +102,7 @@ where
     }
 }
 
-impl<L, F, R> Job for StackJob<L, F, R>
+impl<'a, L, F, R> Job for StackJob<'a, L, F, R>
 where
     L: Latch + Sync,
     F: FnOnce(bool) -> R + Send,
diff --git a/rayon-core/src/join/mod.rs b/rayon-core/src/join/mod.rs
index b5ec331f6..5b98884f8 100644
--- a/rayon-core/src/join/mod.rs
+++ b/rayon-core/src/join/mod.rs
@@ -120,13 +120,12 @@ where
             worker: worker_thread.index()
         });
 
+        let mut latch = SpinLatch::new();
+
         // Create virtual wrapper for task b; this all has to be
         // done here so that the stack frame can keep it all live
         // long enough.
-        let job_b = StackJob::new(
-            |migrated| oper_b(FnContext::new(migrated)),
-            SpinLatch::new(),
-        );
+        let job_b = StackJob::new(|migrated| oper_b(FnContext::new(migrated)), &mut latch);
         let job_b_ref = job_b.as_job_ref();
         worker_thread.push(job_b_ref);
 
@@ -165,7 +164,7 @@ where
                     log!(LostJob {
                         worker: worker_thread.index()
                     });
-                    worker_thread.wait_until(&job_b.latch);
+                    worker_thread.wait_until(job_b.latch);
                     debug_assert!(job_b.latch.probe());
                     break;
                 }
diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs
index 76567c370..45830359d 100644
--- a/rayon-core/src/registry.rs
+++ b/rayon-core/src/registry.rs
@@ -9,7 +9,7 @@ use latch::{CountLatch, Latch, LatchProbe, LockLatch, SpinLatch, TickleLatch};
 use log::Event::*;
 use sleep::Sleep;
 use std::any::Any;
-use std::cell::Cell;
+use std::cell::{Cell, RefCell};
 use std::collections::hash_map::DefaultHasher;
 use std::fmt;
 use std::hash::Hasher;
@@ -486,6 +486,8 @@ impl Registry {
         OP: FnOnce(&WorkerThread, bool) -> R + Send,
         R: Send,
     {
+        let mut latch = LockLatch::new();
+
         // This thread isn't a member of *any* thread pool, so just block.
         debug_assert!(WorkerThread::current().is_null());
         let job = StackJob::new(
@@ -494,7 +496,7 @@ impl Registry {
                 assert!(injected && !worker_thread.is_null());
                 op(&*worker_thread, true)
             },
-            LockLatch::new(),
+            &mut latch,
         );
         self.inject(&[job.as_job_ref()]);
         job.latch.wait();
@@ -510,17 +512,17 @@ impl Registry {
         // This thread is a member of a different pool, so let it process
         // other work while waiting for this `op` to complete.
         debug_assert!(current_thread.registry().id() != self.id());
-        let latch = TickleLatch::new(SpinLatch::new(), &current_thread.registry().sleep);
+        let mut latch = TickleLatch::new(SpinLatch::new(), &current_thread.registry().sleep);
         let job = StackJob::new(
             |injected| {
                 let worker_thread = WorkerThread::current();
                 assert!(injected && !worker_thread.is_null());
                 op(&*worker_thread, true)
             },
-            latch,
+            &mut latch,
         );
         self.inject(&[job.as_job_ref()]);
-        current_thread.wait_until(&job.latch);
+        current_thread.wait_until(job.latch);
         job.into_result()
     }
 

From bfc072f96bb9c5e8f0b68353c6ada755b8de8274 Mon Sep 17 00:00:00 2001
From: Ralf Biedert
Date: Tue, 2 Jul 2019 09:00:02 +0200
Subject: [PATCH 3/9] Added TLS LockLatch with a reset() method.

---
 rayon-core/src/latch.rs    |  5 +++++
 rayon-core/src/registry.rs | 38 +++++++++++++++++++++++---------------
 2 files changed, 28 insertions(+), 15 deletions(-)

diff --git a/rayon-core/src/latch.rs b/rayon-core/src/latch.rs
index ef93fec03..62a3eff69 100644
--- a/rayon-core/src/latch.rs
+++ b/rayon-core/src/latch.rs
@@ -86,6 +86,11 @@ impl LockLatch {
         }
     }
 
+    /// Resets this lock latch so it can be reused again.
+    pub(super) fn reset(&mut self) {
+        *self.m.lock().unwrap() = false;
+    }
+
     /// Block until latch is set.
     pub(super) fn wait(&self) {
         let mut guard = self.m.lock().unwrap();
diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs
index 45830359d..dc89b2c08 100644
--- a/rayon-core/src/registry.rs
+++ b/rayon-core/src/registry.rs
@@ -15,6 +15,7 @@ use std::fmt;
 use std::hash::Hasher;
 use std::io;
 use std::mem;
+use std::ops::DerefMut;
 use std::ptr;
 #[allow(deprecated)]
 use std::sync::atomic::ATOMIC_USIZE_INIT;
@@ -486,21 +487,26 @@ impl Registry {
         OP: FnOnce(&WorkerThread, bool) -> R + Send,
         R: Send,
     {
-        let mut latch = LockLatch::new();
-
-        // This thread isn't a member of *any* thread pool, so just block.
-        debug_assert!(WorkerThread::current().is_null());
-        let job = StackJob::new(
-            |injected| {
-                let worker_thread = WorkerThread::current();
-                assert!(injected && !worker_thread.is_null());
-                op(&*worker_thread, true)
-            },
-            &mut latch,
-        );
-        self.inject(&[job.as_job_ref()]);
-        job.latch.wait();
-        job.into_result()
+        LOCK_LATCH.with(|l| {
+            // This should not panic since the latch is thread local and we are in the `cold` path.
+            // If `op` were to call another `ThreadPool::install` it would not end up here.
+            let mut latch = l.borrow_mut();
+
+            // This thread isn't a member of *any* thread pool, so just block.
+            debug_assert!(WorkerThread::current().is_null());
+            let job = StackJob::new(
+                |injected| {
+                    let worker_thread = WorkerThread::current();
+                    assert!(injected && !worker_thread.is_null());
+                    op(&*worker_thread, true)
+                },
+                latch.deref_mut(),
+            );
+            self.inject(&[job.as_job_ref()]);
+            job.latch.wait();
+            job.latch.reset(); // Makes sure we can use the same latch again next time.
+            job.into_result()
+        })
     }
@@ -613,6 +619,8 @@ pub(super) struct WorkerThread {
 // for a RefCell etc.
 thread_local! {
     static WORKER_THREAD_STATE: Cell<*const WorkerThread> = Cell::new(ptr::null());
+
+    static LOCK_LATCH: RefCell<LockLatch> = RefCell::new(LockLatch::new());
 }
 
 impl Drop for WorkerThread {

From 228dbf7f7c6fecd4063e4d37f59486da1bc8fe43 Mon Sep 17 00:00:00 2001
From: Ralf Biedert
Date: Tue, 2 Jul 2019 11:16:17 +0200
Subject: [PATCH 4/9] Added lifetime bound.

---
 rayon-core/src/job.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/rayon-core/src/job.rs b/rayon-core/src/job.rs
index c2f48b84f..f4020234e 100644
--- a/rayon-core/src/job.rs
+++ b/rayon-core/src/job.rs
@@ -64,7 +64,7 @@ impl JobRef {
 /// executes it need not free any heap data, the cleanup occurs when
 /// the stack frame is later popped. The function parameter indicates
 /// `true` if the job was stolen -- executed on a different thread.
-pub(super) struct StackJob<'a, L, F, R>
+pub(super) struct StackJob<'a, L: 'a, F, R>
 where
     L: Latch + Sync,
     F: FnOnce(bool) -> R + Send,

From 073b79b22f23dc78769521555388d18897b48b26 Mon Sep 17 00:00:00 2001
From: Ralf Biedert
Date: Sat, 13 Jul 2019 13:29:42 +0200
Subject: [PATCH 5/9] Impl `LatchProbe` and `Latch` for &L where L: Latch.

---
 rayon-core/src/job.rs      | 10 +++++-----
 rayon-core/src/join/mod.rs |  4 ++--
 rayon-core/src/latch.rs    | 14 +++++++++++++-
 rayon-core/src/registry.rs | 13 +++++++------
 4 files changed, 27 insertions(+), 14 deletions(-)

diff --git a/rayon-core/src/job.rs b/rayon-core/src/job.rs
index f4020234e..a47f4558d 100644
--- a/rayon-core/src/job.rs
+++ b/rayon-core/src/job.rs
@@ -64,24 +64,24 @@ impl JobRef {
 /// executes it need not free any heap data, the cleanup occurs when
 /// the stack frame is later popped. The function parameter indicates
 /// `true` if the job was stolen -- executed on a different thread.
-pub(super) struct StackJob<'a, L: 'a, F, R>
+pub(super) struct StackJob<L, F, R>
 where
     L: Latch + Sync,
     F: FnOnce(bool) -> R + Send,
     R: Send,
 {
-    pub(super) latch: &'a mut L,
+    pub(super) latch: L,
     func: UnsafeCell<Option<F>>,
     result: UnsafeCell<JobResult<R>>,
 }
 
-impl<'a, L, F, R> StackJob<'a, L, F, R>
+impl<L, F, R> StackJob<L, F, R>
 where
     L: Latch + Sync,
     F: FnOnce(bool) -> R + Send,
     R: Send,
 {
-    pub(super) fn new(func: F, latch: &'a mut L) -> StackJob<'a, L, F, R> {
+    pub(super) fn new(func: F, latch: L) -> StackJob<L, F, R> {
         StackJob {
             latch,
             func: UnsafeCell::new(Some(func)),
@@ -102,7 +102,7 @@ where
     }
 }
 
-impl<'a, L, F, R> Job for StackJob<'a, L, F, R>
+impl<L, F, R> Job for StackJob<L, F, R>
 where
     L: Latch + Sync,
     F: FnOnce(bool) -> R + Send,
diff --git a/rayon-core/src/join/mod.rs b/rayon-core/src/join/mod.rs
index 5b98884f8..19c5935b8 100644
--- a/rayon-core/src/join/mod.rs
+++ b/rayon-core/src/join/mod.rs
@@ -120,12 +120,12 @@ where
             worker: worker_thread.index()
         });
 
-        let mut latch = SpinLatch::new();
+        let latch = SpinLatch::new();
 
         // Create virtual wrapper for task b; this all has to be
         // done here so that the stack frame can keep it all live
         // long enough.
-        let job_b = StackJob::new(|migrated| oper_b(FnContext::new(migrated)), &mut latch);
+        let job_b = StackJob::new(|migrated| oper_b(FnContext::new(migrated)), &latch);
         let job_b_ref = job_b.as_job_ref();
         worker_thread.push(job_b_ref);
diff --git a/rayon-core/src/latch.rs b/rayon-core/src/latch.rs
index 62a3eff69..08ee4ecd4 100644
--- a/rayon-core/src/latch.rs
+++ b/rayon-core/src/latch.rs
@@ -87,7 +87,7 @@ impl LockLatch {
     }
 
     /// Resets this lock latch so it can be reused again.
-    pub(super) fn reset(&mut self) {
+    pub(super) fn reset(&self) {
         *self.m.lock().unwrap() = false;
     }
@@ -191,3 +191,15 @@ impl<'a, L: Latch> Latch for TickleLatch<'a, L> {
         self.sleep.tickle(usize::MAX);
     }
 }
+
+impl<'a, L> LatchProbe for &'a L where L: LatchProbe {
+    fn probe(&self) -> bool {
+        unimplemented!()
+    }
+}
+
+impl<'a, L> Latch for &'a L where L: Latch {
+    fn set(&self) {
+        unimplemented!()
+    }
+}
diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs
index dc89b2c08..645acd43d 100644
--- a/rayon-core/src/registry.rs
+++ b/rayon-core/src/registry.rs
@@ -15,7 +15,7 @@ use std::fmt;
 use std::hash::Hasher;
 use std::io;
 use std::mem;
-use std::ops::DerefMut;
+use std::ops::{Deref};
 use std::ptr;
 #[allow(deprecated)]
 use std::sync::atomic::ATOMIC_USIZE_INIT;
@@ -480,7 +480,8 @@ impl Registry {
             }
         }
     }
-
+
+
     #[cold]
     unsafe fn in_worker_cold<OP, R>(&self, op: OP) -> R
     where
         OP: FnOnce(&WorkerThread, bool) -> R + Send,
         R: Send,
     {
         LOCK_LATCH.with(|l| {
             // This should not panic since the latch is thread local and we are in the `cold` path.
             // If `op` were to call another `ThreadPool::install` it would not end up here.
-            let mut latch = l.borrow_mut();
+            let latch = l.borrow();
 
             // This thread isn't a member of *any* thread pool, so just block.
             debug_assert!(WorkerThread::current().is_null());
             let job = StackJob::new(
                 |injected| {
                     let worker_thread = WorkerThread::current();
                     assert!(injected && !worker_thread.is_null());
                     op(&*worker_thread, true)
                 },
-                latch.deref_mut(),
+                latch.deref(),
             );
             self.inject(&[job.as_job_ref()]);
             job.latch.wait();
@@ -518,14 +519,14 @@ impl Registry {
         // This thread is a member of a different pool, so let it process
         // other work while waiting for this `op` to complete.
         debug_assert!(current_thread.registry().id() != self.id());
-        let mut latch = TickleLatch::new(SpinLatch::new(), &current_thread.registry().sleep);
+        let latch = TickleLatch::new(SpinLatch::new(), &current_thread.registry().sleep);
         let job = StackJob::new(
             |injected| {
                 let worker_thread = WorkerThread::current();
                 assert!(injected && !worker_thread.is_null());
                 op(&*worker_thread, true)
             },
-            &mut latch,
+            &latch,
         );
         self.inject(&[job.as_job_ref()]);
         current_thread.wait_until(job.latch);
         job.into_result()
     }

From 5045a24127898a17ce2c24caa4df6f3fc91bade7 Mon Sep 17 00:00:00 2001
From: Ralf Biedert
Date: Sat, 13 Jul 2019 13:41:48 +0200
Subject: [PATCH 6/9] Actually impl `LatchProbe` and `Latch` for &L where L: Latch.

---
 rayon-core/src/latch.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/rayon-core/src/latch.rs b/rayon-core/src/latch.rs
index 08ee4ecd4..df9c2f454 100644
--- a/rayon-core/src/latch.rs
+++ b/rayon-core/src/latch.rs
@@ -194,12 +194,12 @@ impl<'a, L: Latch> Latch for TickleLatch<'a, L> {
 
 impl<'a, L> LatchProbe for &'a L where L: LatchProbe {
     fn probe(&self) -> bool {
-        unimplemented!()
+        L::probe(&self)
     }
 }
 
 impl<'a, L> Latch for &'a L where L: Latch {
     fn set(&self) {
-        unimplemented!()
+        L::set(&self);
     }
 }

From d9bc1fcc9d7db5dbfd87da1a6b37692365fd47f5 Mon Sep 17 00:00:00 2001
From: Ralf Biedert
Date: Sat, 13 Jul 2019 13:44:35 +0200
Subject: [PATCH 7/9] We don't need these.
---
 rayon-core/src/latch.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/rayon-core/src/latch.rs b/rayon-core/src/latch.rs
index df9c2f454..98a6c4fc8 100644
--- a/rayon-core/src/latch.rs
+++ b/rayon-core/src/latch.rs
@@ -194,12 +194,12 @@ impl<'a, L: Latch> Latch for TickleLatch<'a, L> {
 
 impl<'a, L> LatchProbe for &'a L where L: LatchProbe {
     fn probe(&self) -> bool {
-        L::probe(&self)
+        L::probe(self)
     }
 }
 
 impl<'a, L> Latch for &'a L where L: Latch {
     fn set(&self) {
-        L::set(&self);
+        L::set(self);
     }
 }

From 20c526e3ca9962043a92d2400c6a0e8f05600994 Mon Sep 17 00:00:00 2001
From: Ralf Biedert
Date: Fri, 19 Jul 2019 10:43:09 +0200
Subject: [PATCH 8/9] Removed RefCell, refs, moved thread_local, applied cargo fmt.

---
 rayon-core/src/join/mod.rs |  4 ++--
 rayon-core/src/latch.rs    | 20 +++++++++++++-----
 rayon-core/src/registry.rs | 19 ++++++-------------
 3 files changed, 23 insertions(+), 20 deletions(-)

diff --git a/rayon-core/src/join/mod.rs b/rayon-core/src/join/mod.rs
index 19c5935b8..ceb268d44 100644
--- a/rayon-core/src/join/mod.rs
+++ b/rayon-core/src/join/mod.rs
@@ -125,7 +125,7 @@ where
         // Create virtual wrapper for task b; this all has to be
         // done here so that the stack frame can keep it all live
         // long enough.
-        let job_b = StackJob::new(|migrated| oper_b(FnContext::new(migrated)), &latch);
+        let job_b = StackJob::new(|migrated| oper_b(FnContext::new(migrated)), latch);
         let job_b_ref = job_b.as_job_ref();
         worker_thread.push(job_b_ref);
 
@@ -164,7 +164,7 @@ where
                     log!(LostJob {
                         worker: worker_thread.index()
                     });
-                    worker_thread.wait_until(job_b.latch);
+                    worker_thread.wait_until(&job_b.latch);
                     debug_assert!(job_b.latch.probe());
                     break;
                 }
diff --git a/rayon-core/src/latch.rs b/rayon-core/src/latch.rs
index 98a6c4fc8..db97224f0 100644
--- a/rayon-core/src/latch.rs
+++ b/rayon-core/src/latch.rs
@@ -86,9 +86,13 @@ impl LockLatch {
         }
     }
 
-    /// Resets this lock latch so it can be reused again.
-    pub(super) fn reset(&self) {
-        *self.m.lock().unwrap() = false;
+    /// Block until latch is set, then resets this lock latch so it can be reused again.
+    pub(super) fn wait_and_reset(&self) {
+        let mut guard = self.m.lock().unwrap();
+        while !*guard {
+            guard = self.v.wait(guard).unwrap();
+        }
+        *guard = false;
     }
 
     /// Block until latch is set.
     pub(super) fn wait(&self) {
         let mut guard = self.m.lock().unwrap();
@@ -192,13 +196,19 @@ impl<'a, L: Latch> Latch for TickleLatch<'a, L> {
     }
 }
 
-impl<'a, L> LatchProbe for &'a L where L: LatchProbe {
+impl<'a, L> LatchProbe for &'a L
+where
+    L: LatchProbe,
+{
     fn probe(&self) -> bool {
         L::probe(self)
     }
 }
 
-impl<'a, L> Latch for &'a L where L: Latch {
+impl<'a, L> Latch for &'a L
+where
+    L: Latch,
+{
     fn set(&self) {
         L::set(self);
     }
 }
diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs
index 645acd43d..b76e83e92 100644
--- a/rayon-core/src/registry.rs
+++ b/rayon-core/src/registry.rs
@@ -9,13 +9,12 @@ use latch::{CountLatch, Latch, LatchProbe, LockLatch, SpinLatch, TickleLatch};
 use log::Event::*;
 use sleep::Sleep;
 use std::any::Any;
-use std::cell::{Cell, RefCell};
+use std::cell::Cell;
 use std::collections::hash_map::DefaultHasher;
 use std::fmt;
 use std::hash::Hasher;
 use std::io;
 use std::mem;
-use std::ops::{Deref};
 use std::ptr;
 #[allow(deprecated)]
 use std::sync::atomic::ATOMIC_USIZE_INIT;
@@ -480,19 +479,16 @@ impl Registry {
             }
         }
     }
-
-
+
     #[cold]
     unsafe fn in_worker_cold<OP, R>(&self, op: OP) -> R
     where
         OP: FnOnce(&WorkerThread, bool) -> R + Send,
         R: Send,
     {
-        LOCK_LATCH.with(|l| {
-            // This should not panic since the latch is thread local and we are in the `cold` path.
-            // If `op` were to call another `ThreadPool::install` it would not end up here.
-            let latch = l.borrow();
+        thread_local!(static LOCK_LATCH: LockLatch = LockLatch::new());
 
+        LOCK_LATCH.with(|l| {
             // This thread isn't a member of *any* thread pool, so just block.
             debug_assert!(WorkerThread::current().is_null());
             let job = StackJob::new(
                 |injected| {
                     let worker_thread = WorkerThread::current();
                     assert!(injected && !worker_thread.is_null());
                     op(&*worker_thread, true)
                 },
-                latch.deref(),
+                l,
             );
             self.inject(&[job.as_job_ref()]);
-            job.latch.wait();
-            job.latch.reset(); // Makes sure we can use the same latch again next time.
+            job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.
             job.into_result()
         })
     }
@@ -613,8 +615,6 @@ pub(super) struct WorkerThread {
 // for a RefCell etc.
 thread_local! {
     static WORKER_THREAD_STATE: Cell<*const WorkerThread> = Cell::new(ptr::null());
-
-    static LOCK_LATCH: RefCell<LockLatch> = RefCell::new(LockLatch::new());
 }
 
 impl Drop for WorkerThread {

From 9619a16127e1e279a5550b68759b1e8dd01f513e Mon Sep 17 00:00:00 2001
From: Josh Stone
Date: Mon, 22 Jul 2019 10:01:49 -0700
Subject: [PATCH 9/9] in_worker_cross doesn't need an indirect latch

---
 rayon-core/src/registry.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs
index b76e83e92..92956d57c 100644
--- a/rayon-core/src/registry.rs
+++ b/rayon-core/src/registry.rs
@@ -521,10 +521,10 @@ impl Registry {
                 assert!(injected && !worker_thread.is_null());
                 op(&*worker_thread, true)
             },
-            &latch,
+            latch,
         );
         self.inject(&[job.as_job_ref()]);
-        current_thread.wait_until(job.latch);
+        current_thread.wait_until(&job.latch);
         job.into_result()
     }
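
Note on the latch-reuse pattern the series converges on: a LockLatch is a Mutex<bool> plus a Condvar, wait_and_reset() blocks until the latch is set and then clears it, and a single thread_local! latch can therefore serve every in_worker_cold call made from the same thread. The standalone sketch below distills that end state and compiles on its own; the field names m and v mirror the patches above, while set(), main(), and the helper thread are assumptions added here so the example runs to completion.

use std::sync::{Condvar, Mutex};

struct LockLatch {
    m: Mutex<bool>,
    v: Condvar,
}

impl LockLatch {
    fn new() -> LockLatch {
        LockLatch {
            m: Mutex::new(false),
            v: Condvar::new(),
        }
    }

    // Mark the latch as set and wake any waiter.
    fn set(&self) {
        let mut guard = self.m.lock().unwrap();
        *guard = true;
        self.v.notify_all();
    }

    // Block until the latch is set, then clear it again so the same
    // latch can serve the next call on this thread (PATCH 8's design).
    fn wait_and_reset(&self) {
        let mut guard = self.m.lock().unwrap();
        while !*guard {
            guard = self.v.wait(guard).unwrap();
        }
        *guard = false;
    }
}

fn main() {
    // One latch per OS thread, created lazily and then reused, mirroring
    // the `thread_local!` that PATCH 8 moves inside `in_worker_cold`.
    thread_local!(static LOCK_LATCH: LockLatch = LockLatch::new());

    LOCK_LATCH.with(|latch| {
        // A real pool would inject a job and have a worker call `set()` on
        // completion; a scoped helper thread keeps this example runnable.
        std::thread::scope(|s| {
            s.spawn(|| latch.set());
            latch.wait_and_reset();
        });
    });

    // The latch is clear again and can be reused immediately.
    LOCK_LATCH.with(|latch| {
        latch.set();
        latch.wait_and_reset();
    });
}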
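
Likewise, the forwarding impls introduced in PATCH 5 and finished in PATCHes 6 and 7 are easiest to see in isolation: once Latch is implemented for &'a L, a consumer that takes its latch by value (as StackJob does again after PATCH 5) can be handed either an owned latch or a plain reference, and in the latter case the caller keeps ownership. A minimal sketch under that assumption follows; the SpinFlag type, run_with_latch(), and main() are invented for the demonstration and are not part of rayon-core.

use std::sync::atomic::{AtomicBool, Ordering};

trait LatchProbe {
    fn probe(&self) -> bool;
}

trait Latch: LatchProbe {
    fn set(&self);
}

// Forwarding impls in the shape PATCH 7 lands on: `self` here is `&&L`,
// and deref coercion turns it back into `&L` for the inner call, which is
// why the explicit `&` removed in PATCH 7 was never needed.
impl<'a, L> LatchProbe for &'a L
where
    L: LatchProbe,
{
    fn probe(&self) -> bool {
        L::probe(self)
    }
}

impl<'a, L> Latch for &'a L
where
    L: Latch,
{
    fn set(&self) {
        L::set(self);
    }
}

// A toy latch: a single atomic flag.
struct SpinFlag(AtomicBool);

impl LatchProbe for SpinFlag {
    fn probe(&self) -> bool {
        self.0.load(Ordering::Acquire)
    }
}

impl Latch for SpinFlag {
    fn set(&self) {
        self.0.store(true, Ordering::Release);
    }
}

// A consumer that owns its latch, in the spirit of StackJob after PATCH 5.
fn run_with_latch<L: Latch>(latch: L) {
    latch.set();
}

fn main() {
    let flag = SpinFlag(AtomicBool::new(false));
    // `&SpinFlag` is itself a `Latch`, so the caller keeps ownership of `flag`.
    run_with_latch(&flag);
    assert!(flag.probe());
}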