From aafa21566537cd71db666c5e5b27eb69bda6b937 Mon Sep 17 00:00:00 2001 From: Niko Matsakis Date: Fri, 19 Apr 2024 08:57:29 -0400 Subject: [PATCH] sketch the stream/async-iter split --- src/async_iter.rs | 77 +++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 6 ++++ src/stream.rs | 65 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 148 insertions(+) create mode 100644 src/async_iter.rs create mode 100644 src/stream.rs diff --git a/src/async_iter.rs b/src/async_iter.rs new file mode 100644 index 0000000..3fc0178 --- /dev/null +++ b/src/async_iter.rs @@ -0,0 +1,77 @@ +use crate::Scope; + +pub trait AsyncIterator { + type Item; + + // fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) + // -> std::task::Poll>; + // + // how do you do this? + // + // fn next(&mut self) -> impl Future> { + // pin!(self); + // std::future::poll_fn(|cx| this.poll_next(cx)) + // } + + async fn next(&mut self) -> Option; + + fn filter( + self, + op: impl async FnMut(&Self::Item) -> bool, + ) -> impl AsyncIterator + where + Self: Sized, + { + Filter { + iter: self, + filter_op: op, + } + } +} + +pub trait IntoAsyncIter { + type Item; + + // FIXME: The Scope type needs to not carry R. + fn into_async_iter( + self, + scope: &Scope<'_, '_, R>, + ) -> impl AsyncIterator; +} + +impl IntoAsyncIter for T { + type Item = T::Item; + + fn into_async_iter( + self, + _scope: &Scope<'_, '_, R>, + ) -> impl AsyncIterator { + self + } +} + +struct Filter +where + I: AsyncIterator, + O: async FnMut(&I::Item) -> bool, +{ + iter: I, + filter_op: O, +} + +impl AsyncIterator for Filter +where + I: AsyncIterator, + O: async FnMut(&I::Item) -> bool, +{ + type Item = I::Item; + + async fn next(&mut self) -> Option { + loop { + let item = self.iter.next().await?; + if (self.filter_op)(&item).await { + return Some(item); + } + } + } +} diff --git a/src/lib.rs b/src/lib.rs index a61f103..63a7438 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,18 +1,24 @@ #![feature(async_closure)] #![feature(async_fn_traits)] #![feature(unboxed_closures)] +#![allow(async_fn_in_trait)] use std::ops::AsyncFnOnce; #[macro_use] mod macros; +mod async_iter; mod body; pub mod prelude; mod result_ext; mod scope; mod scope_body; mod spawned; +mod stream; + +pub use async_iter::{AsyncIterator, IntoAsyncIter}; +pub use stream::Stream; /// Creates an async scope within which you can spawn jobs. /// This works much like the stdlib's diff --git a/src/stream.rs b/src/stream.rs new file mode 100644 index 0000000..f23b771 --- /dev/null +++ b/src/stream.rs @@ -0,0 +1,65 @@ +use crate::{AsyncIterator, IntoAsyncIter, Scope}; + +pub trait Stream: IntoAsyncIter { + fn filter(self, op: impl async FnMut(&Self::Item) -> bool) -> impl Stream + where + Self: Sized, + { + Filter { + stream: self, + filter_op: op, + } + } + + async fn for_each(&mut self, mut op: impl async FnMut(Self::Item)) + where + Self: Sized, + { + self.fold((), async |(), item| op(item).await).await + } + + async fn fold(&mut self, start: R, op: impl async FnMut(R, Self::Item) -> R) -> R; +} + +struct Filter +where + S: Stream, + O: async FnMut(&S::Item) -> bool, +{ + stream: S, + filter_op: O, +} + +impl Stream for Filter +where + S: Stream, + O: async FnMut(&S::Item) -> bool, +{ + async fn fold(&mut self, start: R, mut op: impl async FnMut(R, Self::Item) -> R) -> R { + self.stream + .fold(start, async |acc, item| { + if (self.filter_op)(&item).await { + op(acc, item).await + } else { + acc + } + }) + .await + } +} + +impl IntoAsyncIter for Filter +where + S: Stream, + O: async FnMut(&S::Item) -> bool, +{ + type Item = S::Item; + + fn into_async_iter( + self, + scope: &Scope<'_, '_, R>, + ) -> impl AsyncIterator { + let iter = self.stream.into_async_iter(scope); + iter.filter(self.filter_op) + } +}