diff --git a/rayon-core/Cargo.toml b/rayon-core/Cargo.toml index e17d7e422..9b54ed660 100644 --- a/rayon-core/Cargo.toml +++ b/rayon-core/Cargo.toml @@ -55,3 +55,6 @@ path = "tests/simple_panic.rs" [[test]] name = "scoped_threadpool" path = "tests/scoped_threadpool.rs" + +[build-dependencies] +autocfg = "0.1.5" diff --git a/rayon-core/build.rs b/rayon-core/build.rs index 8771b63fc..34943056f 100644 --- a/rayon-core/build.rs +++ b/rayon-core/build.rs @@ -1,7 +1,12 @@ +extern crate autocfg; + // We need a build script to use `link = "rayon-core"`. But we're not // *actually* linking to anything, just making sure that we're the only // rayon-core in use. fn main() { + let ac = autocfg::new(); + ac.emit_path_cfg("std::future::Future", "has_future"); + // we don't need to rebuild for anything else - println!("cargo:rerun-if-changed=build.rs"); + autocfg::rerun_path("build.rs"); } diff --git a/rayon-core/src/future.rs b/rayon-core/src/future.rs new file mode 100644 index 000000000..18086e5c6 --- /dev/null +++ b/rayon-core/src/future.rs @@ -0,0 +1,139 @@ +#![allow(missing_docs)] + +use crate::ThreadPool; +use crate::{spawn, spawn_fifo}; +use crate::{Scope, ScopeFifo}; + +use std::future::Future; +use std::mem; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll, Waker}; + +use job::JobResult; +use unwind; + +struct RayonFuture { + state: Arc>>, +} + +struct RayonFutureJob { + state: Arc>>, +} + +struct State { + result: JobResult, + waker: Option, +} + +fn new() -> (RayonFuture, RayonFutureJob) { + let state = Arc::new(Mutex::new(State { + result: JobResult::None, + waker: None, + })); + ( + RayonFuture { + state: state.clone(), + }, + RayonFutureJob { state }, + ) +} + +impl Future for RayonFuture { + type Output = T; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let mut guard = self.state.lock().expect("rayon future lock"); + match mem::replace(&mut guard.result, JobResult::None) { + JobResult::None => { + guard.waker = Some(cx.waker().clone()); + Poll::Pending + } + JobResult::Ok(x) => Poll::Ready(x), + JobResult::Panic(p) => { + drop(guard); // don't poison the lock + unwind::resume_unwinding(p); + } + } + } +} + +impl RayonFutureJob { + fn execute(self, func: impl FnOnce() -> T) { + let result = unwind::halt_unwinding(func); + let mut guard = self.state.lock().expect("rayon future lock"); + guard.result = match result { + Ok(x) => JobResult::Ok(x), + Err(p) => JobResult::Panic(p), + }; + if let Some(waker) = guard.waker.take() { + waker.wake(); + } + } +} + +pub fn spawn_future(func: F) -> impl Future +where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, +{ + let (future, job) = new(); + spawn(move || job.execute(func)); + future +} + +pub fn spawn_fifo_future(func: F) -> impl Future +where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, +{ + let (future, job) = new(); + spawn_fifo(move || job.execute(func)); + future +} + +impl ThreadPool { + pub fn spawn_future(&self, func: F) -> impl Future + where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, + { + let (future, job) = new(); + self.spawn(move || job.execute(func)); + future + } + + pub fn spawn_fifo_future(&self, func: F) -> impl Future + where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, + { + let (future, job) = new(); + self.spawn_fifo(move || job.execute(func)); + future + } +} + +impl<'scope> Scope<'scope> { + pub fn spawn_future(&self, func: F) -> impl Future + where + F: FnOnce(&Self) -> T + Send + 'scope, + T: Send + 'scope, + { + let (future, job) = new(); + self.spawn(|scope| job.execute(move || func(scope))); + future + } +} + +impl<'scope> ScopeFifo<'scope> { + pub fn spawn_fifo_future(&self, func: F) -> impl Future + where + F: FnOnce(&Self) -> T + Send + 'scope, + T: Send + 'scope, + { + let (future, job) = new(); + self.spawn_fifo(|scope| job.execute(move || func(scope))); + future + } +} diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs index 0eb8026a6..581d4c7eb 100644 --- a/rayon-core/src/lib.rs +++ b/rayon-core/src/lib.rs @@ -61,6 +61,9 @@ mod thread_pool; mod unwind; mod util; +#[cfg(has_future)] +mod future; + mod compile_fail; mod test; @@ -75,6 +78,9 @@ pub use thread_pool::current_thread_has_pending_tasks; pub use thread_pool::current_thread_index; pub use thread_pool::ThreadPool; +#[cfg(has_future)] +pub use future::{spawn_future, spawn_fifo_future}; + use registry::{CustomSpawn, DefaultSpawn, ThreadSpawn}; /// Returns the number of threads in the current registry. If this