From 037ab888d6eeb1725e4efc49076f69d474ad4835 Mon Sep 17 00:00:00 2001 From: Harold Bruintjes Date: Wed, 27 Mar 2024 14:41:35 +0100 Subject: [PATCH] Add BoundLocalPool and BoundLocalSpawner Adds derivatives of LocalPool and BoundLocalSpawner with a generic lifetime parameter that replaces the previously used 'static, and aliases both types with a 'static parameter to the originals. Add a BoundLocalSpawn trait in a similar fashion. At the same time simplify the implementation of Spawn, LocalSpawn and BoundLocalSpawn to just work on any type implementing Deref to a type implementing a spawn trait. --- futures-executor/src/lib.rs | 5 +- futures-executor/src/local_pool.rs | 69 ++++++++--- futures-task/src/lib.rs | 2 +- futures-task/src/spawn.rs | 117 +++++------------- .../src/stream/futures_unordered/mod.rs | 9 +- futures-util/src/task/mod.rs | 6 +- futures-util/src/task/spawn.rs | 75 ++++++++++- futures/src/lib.rs | 4 +- 8 files changed, 175 insertions(+), 112 deletions(-) diff --git a/futures-executor/src/lib.rs b/futures-executor/src/lib.rs index ec3a5a4011..f8b1703931 100644 --- a/futures-executor/src/lib.rs +++ b/futures-executor/src/lib.rs @@ -54,7 +54,10 @@ extern crate std; #[cfg(feature = "std")] mod local_pool; #[cfg(feature = "std")] -pub use crate::local_pool::{block_on, block_on_stream, BlockingStream, LocalPool, LocalSpawner}; +pub use crate::local_pool::{ + block_on, block_on_stream, BlockingStream, BoundLocalPool, BoundLocalSpawner, LocalPool, + LocalSpawner, +}; #[cfg(feature = "thread-pool")] #[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))] diff --git a/futures-executor/src/local_pool.rs b/futures-executor/src/local_pool.rs index 90c2a41520..d0d7ed4171 100644 --- a/futures-executor/src/local_pool.rs +++ b/futures-executor/src/local_pool.rs @@ -2,7 +2,7 @@ use crate::enter; use futures_core::future::Future; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; -use futures_task::{waker_ref, ArcWake}; +use futures_task::{waker_ref, ArcWake, BoundLocalSpawn}; use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; use futures_util::pin_mut; use futures_util::stream::FuturesUnordered; @@ -17,6 +17,36 @@ use std::sync::{ use std::thread::{self, Thread}; use std::vec::Vec; +/// A single-threaded task pool with bound lifetime for polling futures to +/// completion. +/// +/// This executor allows you to multiplex any number of tasks onto a single +/// thread. It's appropriate to poll strictly I/O-bound futures that do very +/// little work in between I/O actions. The lifetime of the executor is bound by +/// a generic parameter. Futures associated with the executor need only outlive +/// this lifetime. That uncompleted futures are dropped when the lifetime of the +/// executor expires. +/// +/// To get a handle to the pool that implements [`Spawn`](futures_task::Spawn), +/// use the [`spawner()`](BoundLocalPool::spawner) method. Because the executor +/// is single-threaded, it supports a special form of task spawning for +/// non-`Send` futures, via +/// [`spawn_local_obj`](futures_task::LocalSpawn::spawn_local_obj). +/// Additionally, tasks with a limited lifetime can be spawned via +/// [`spawn_bound_local_obj`](futures_task::BoundLocalSpawn::spawn_bound_local_obj). +#[derive(Debug)] +pub struct BoundLocalPool<'a> { + pool: FuturesUnordered>, + incoming: Rc>, +} + +/// A handle to a [`BoundLocalPool`] that implements +/// [`BoundLocalSpawn`](futures_task::BoundLocalSpawn). +#[derive(Clone, Debug)] +pub struct BoundLocalSpawner<'a> { + incoming: Weak>, +} + /// A single-threaded task pool for polling futures to completion. /// /// This executor allows you to multiplex any number of tasks onto a single @@ -28,19 +58,13 @@ use std::vec::Vec; /// [`spawner()`](LocalPool::spawner) method. Because the executor is /// single-threaded, it supports a special form of task spawning for non-`Send` /// futures, via [`spawn_local_obj`](futures_task::LocalSpawn::spawn_local_obj). -#[derive(Debug)] -pub struct LocalPool { - pool: FuturesUnordered>, - incoming: Rc, -} +pub type LocalPool = BoundLocalPool<'static>; -/// A handle to a [`LocalPool`] that implements [`Spawn`](futures_task::Spawn). -#[derive(Clone, Debug)] -pub struct LocalSpawner { - incoming: Weak, -} +/// A handle to a [`LocalPool`] that implements [`Spawn`](futures_task::Spawn) +/// and [`LocalSpawn`](futures_task::LocalSpawn). +pub type LocalSpawner = BoundLocalSpawner<'static>; -type Incoming = RefCell>>; +type Incoming<'a> = RefCell>>; pub(crate) struct ThreadNotify { /// The (single) executor thread. @@ -107,15 +131,15 @@ fn woken() -> bool { CURRENT_THREAD_NOTIFY.with(|thread_notify| thread_notify.unparked.load(Ordering::Acquire)) } -impl LocalPool { +impl<'a> BoundLocalPool<'a> { /// Create a new, empty pool of tasks. pub fn new() -> Self { Self { pool: FuturesUnordered::new(), incoming: Default::default() } } /// Get a clonable handle to the pool as a [`Spawn`]. - pub fn spawner(&self) -> LocalSpawner { - LocalSpawner { incoming: Rc::downgrade(&self.incoming) } + pub fn spawner(&self) -> BoundLocalSpawner<'a> { + BoundLocalSpawner { incoming: Rc::downgrade(&self.incoming) } } /// Run all tasks in the pool to completion. @@ -362,7 +386,7 @@ impl Iterator for BlockingStream { } } -impl Spawn for LocalSpawner { +impl Spawn for BoundLocalSpawner<'_> { fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { if let Some(incoming) = self.incoming.upgrade() { incoming.borrow_mut().push(future.into()); @@ -381,7 +405,7 @@ impl Spawn for LocalSpawner { } } -impl LocalSpawn for LocalSpawner { +impl LocalSpawn for BoundLocalSpawner<'_> { fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { if let Some(incoming) = self.incoming.upgrade() { incoming.borrow_mut().push(future); @@ -399,3 +423,14 @@ impl LocalSpawn for LocalSpawner { } } } + +impl<'a> BoundLocalSpawn<'a> for BoundLocalSpawner<'a> { + fn spawn_bound_local_obj(&self, future: LocalFutureObj<'a, ()>) -> Result<(), SpawnError> { + if let Some(incoming) = self.incoming.upgrade() { + incoming.borrow_mut().push(future); + Ok(()) + } else { + Err(SpawnError::shutdown()) + } + } +} diff --git a/futures-task/src/lib.rs b/futures-task/src/lib.rs index c119b6b1e4..6e73d16d9b 100644 --- a/futures-task/src/lib.rs +++ b/futures-task/src/lib.rs @@ -16,7 +16,7 @@ extern crate alloc; extern crate std; mod spawn; -pub use crate::spawn::{LocalSpawn, Spawn, SpawnError}; +pub use crate::spawn::{BoundLocalSpawn, LocalSpawn, Spawn, SpawnError}; #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] diff --git a/futures-task/src/spawn.rs b/futures-task/src/spawn.rs index 4a9a45a446..70286e8208 100644 --- a/futures-task/src/spawn.rs +++ b/futures-task/src/spawn.rs @@ -51,6 +51,22 @@ pub trait LocalSpawn { } } +/// The `BoundLocalSpawn` is similar to [`LocalSpawn`], but allows spawning +/// futures that don't implement `Send` and have a lifetime that only needs to +/// exceed that of the associated executor. +pub trait BoundLocalSpawn<'a> { + /// Spawns a future that will be run to completion or until the executor is + /// dropped. + /// + /// # Errors + /// + /// The executor may be unable to spawn tasks. Spawn errors should + /// represent relatively rare scenarios, such as the executor + /// having been shut down so that it is no longer able to accept + /// tasks. + fn spawn_bound_local_obj(&self, future: LocalFutureObj<'a, ()>) -> Result<(), SpawnError>; +} + /// An error that occurred during spawning. pub struct SpawnError { _priv: (), @@ -83,17 +99,10 @@ impl SpawnError { } } -impl Spawn for &Sp { - fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { - Sp::spawn_obj(self, future) - } - - fn status(&self) -> Result<(), SpawnError> { - Sp::status(self) - } -} - -impl Spawn for &mut Sp { +impl Spawn for T +where + T: core::ops::Deref, +{ fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { Sp::spawn_obj(self, future) } @@ -103,17 +112,10 @@ impl Spawn for &mut Sp { } } -impl LocalSpawn for &Sp { - fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { - Sp::spawn_local_obj(self, future) - } - - fn status_local(&self) -> Result<(), SpawnError> { - Sp::status_local(self) - } -} - -impl LocalSpawn for &mut Sp { +impl LocalSpawn for T +where + T: core::ops::Deref, +{ fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { Sp::spawn_local_obj(self, future) } @@ -123,70 +125,11 @@ impl LocalSpawn for &mut Sp { } } -#[cfg(feature = "alloc")] -mod if_alloc { - use super::*; - use alloc::{boxed::Box, rc::Rc}; - - impl Spawn for Box { - fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { - (**self).spawn_obj(future) - } - - fn status(&self) -> Result<(), SpawnError> { - (**self).status() - } - } - - impl LocalSpawn for Box { - fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { - (**self).spawn_local_obj(future) - } - - fn status_local(&self) -> Result<(), SpawnError> { - (**self).status_local() - } - } - - impl Spawn for Rc { - fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { - (**self).spawn_obj(future) - } - - fn status(&self) -> Result<(), SpawnError> { - (**self).status() - } - } - - impl LocalSpawn for Rc { - fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { - (**self).spawn_local_obj(future) - } - - fn status_local(&self) -> Result<(), SpawnError> { - (**self).status_local() - } - } - - #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] - impl Spawn for alloc::sync::Arc { - fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { - (**self).spawn_obj(future) - } - - fn status(&self) -> Result<(), SpawnError> { - (**self).status() - } - } - - #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] - impl LocalSpawn for alloc::sync::Arc { - fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { - (**self).spawn_local_obj(future) - } - - fn status_local(&self) -> Result<(), SpawnError> { - (**self).status_local() - } +impl<'a, T, Sp: ?Sized + BoundLocalSpawn<'a>> BoundLocalSpawn<'a> for T +where + T: core::ops::Deref, +{ + fn spawn_bound_local_obj(&self, future: LocalFutureObj<'a, ()>) -> Result<(), SpawnError> { + Sp::spawn_bound_local_obj(self, future) } } diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 2d4f15158f..60f39a8c94 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -17,7 +17,7 @@ use core::sync::atomic::{AtomicBool, AtomicPtr}; use futures_core::future::Future; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; +use futures_task::{BoundLocalSpawn, FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; mod abort; @@ -78,6 +78,13 @@ impl LocalSpawn for FuturesUnordered> { } } +impl<'a> BoundLocalSpawn<'a> for FuturesUnordered> { + fn spawn_bound_local_obj(&self, future_obj: LocalFutureObj<'a, ()>) -> Result<(), SpawnError> { + self.push(future_obj); + Ok(()) + } +} + // FuturesUnordered is implemented using two linked lists. One which links all // futures managed by a `FuturesUnordered` and one that tracks futures that have // been scheduled for polling. The first linked list allows for thread safe diff --git a/futures-util/src/task/mod.rs b/futures-util/src/task/mod.rs index 7a9e993e5e..16570dd108 100644 --- a/futures-util/src/task/mod.rs +++ b/futures-util/src/task/mod.rs @@ -13,7 +13,9 @@ #[doc(no_inline)] pub use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; -pub use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError, UnsafeFutureObj}; +pub use futures_task::{ + BoundLocalSpawn, FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError, UnsafeFutureObj, +}; pub use futures_task::noop_waker; pub use futures_task::noop_waker_ref; @@ -37,4 +39,4 @@ pub use futures_task::{waker_ref, WakerRef}; pub use futures_core::task::__internal::AtomicWaker; mod spawn; -pub use self::spawn::{LocalSpawnExt, SpawnExt}; +pub use self::spawn::{BoundLocalSpawnExt, LocalSpawnExt, SpawnExt}; diff --git a/futures-util/src/task/spawn.rs b/futures-util/src/task/spawn.rs index d9e9985309..8aa366d296 100644 --- a/futures-util/src/task/spawn.rs +++ b/futures-util/src/task/spawn.rs @@ -1,4 +1,4 @@ -use futures_task::{LocalSpawn, Spawn}; +use futures_task::{BoundLocalSpawn, LocalSpawn, Spawn}; #[cfg(feature = "compat")] use crate::compat::Compat; @@ -15,6 +15,7 @@ use futures_task::{FutureObj, LocalFutureObj, SpawnError}; impl SpawnExt for Sp where Sp: Spawn {} impl LocalSpawnExt for Sp where Sp: LocalSpawn {} +impl<'a, Sp: ?Sized> BoundLocalSpawnExt<'a> for Sp where Sp: BoundLocalSpawn<'a> {} /// Extension trait for `Spawn`. pub trait SpawnExt: Spawn { @@ -167,3 +168,75 @@ pub trait LocalSpawnExt: LocalSpawn { Ok(handle) } } + +/// Extension trait for `BoundLocalSpawn`. +pub trait BoundLocalSpawnExt<'a>: BoundLocalSpawn<'a> { + /// Spawns a task that polls the given future with output `()` to + /// completion or until the bounded lifetime expires. + /// + /// This method returns a [`Result`] that contains a [`SpawnError`] if + /// spawning fails. + /// + /// You can use + /// [`spawn_bound_local_with_handle`](BoundLocalSpawnExt::spawn_bound_local_with_handle) + /// if you want to spawn a future with output other than `()` or if you want + /// to be able to await its completion. + /// + /// Note this method will eventually be replaced with the upcoming + /// `Spawn::spawn` method which will take a `dyn Future` as input. + /// Technical limitations prevent `Spawn::spawn` from being implemented + /// today. Feel free to use this method in the meantime. + /// + /// ``` + /// use futures::executor::BoundLocalPool; + /// use futures::task::BoundLocalSpawnExt; + /// + /// let executor = BoundLocalPool::new(); + /// let spawner = executor.spawner(); + /// + /// let future = async { /* ... */ }; + /// spawner.spawn_bound_local(future).unwrap(); + /// ``` + #[cfg(feature = "alloc")] + fn spawn_bound_local(&self, future: Fut) -> Result<(), SpawnError> + where + Fut: Future + 'a, + { + self.spawn_bound_local_obj(LocalFutureObj::new(Box::new(future))) + } + + /// Spawns a task that polls the given future to completion or until the + /// bounded lifetime expires, and returns a future that resolves to the + /// spawned future's output. + /// + /// This method returns a [`Result`] that contains a + /// [`RemoteHandle`](crate::future::RemoteHandle), or, if spawning fails, a + /// [`SpawnError`]. [`RemoteHandle`](crate::future::RemoteHandle) is a + /// future that resolves to the output of the spawned future. + /// + /// ``` + /// use futures::executor::BoundLocalPool; + /// use futures::task::BoundLocalSpawnExt; + /// + /// let mut executor = BoundLocalPool::new(); + /// let spawner = executor.spawner(); + /// + /// let future = async { 1 }; + /// let join_handle_fut = spawner.spawn_bound_local_with_handle(future).unwrap(); + /// assert_eq!(executor.run_until(join_handle_fut), 1); + /// ``` + #[cfg(feature = "channel")] + #[cfg_attr(docsrs, doc(cfg(feature = "channel")))] + #[cfg(feature = "std")] + fn spawn_bound_local_with_handle( + &self, + future: Fut, + ) -> Result, SpawnError> + where + Fut: Future + 'a, + { + let (future, handle) = future.remote_handle(); + self.spawn_bound_local(future)?; + Ok(handle) + } +} diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 839a0a1f07..b49ff2089e 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -189,8 +189,8 @@ pub mod executor { //! [`spawn_local_obj`]: https://docs.rs/futures/0.3/futures/task/trait.LocalSpawn.html#tymethod.spawn_local_obj pub use futures_executor::{ - block_on, block_on_stream, enter, BlockingStream, Enter, EnterError, LocalPool, - LocalSpawner, + block_on, block_on_stream, enter, BlockingStream, BoundLocalPool, BoundLocalSpawner, Enter, + EnterError, LocalPool, LocalSpawner, }; #[cfg(feature = "thread-pool")]