Skip to content

Commit

Permalink
sketch the stream/async-iter split
Browse files Browse the repository at this point in the history
  • Loading branch information
nikomatsakis committed Apr 19, 2024
1 parent 63b4989 commit aafa215
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 0 deletions.
77 changes: 77 additions & 0 deletions src/async_iter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use crate::Scope;

pub trait AsyncIterator {
type Item;

// fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
// -> std::task::Poll<Option<Self::Item>>;
//
// how do you do this?
//
// fn next(&mut self) -> impl Future<Output = Option<Self::Item>> {
// pin!(self);
// std::future::poll_fn(|cx| this.poll_next(cx))
// }

async fn next(&mut self) -> Option<Self::Item>;

fn filter(
self,
op: impl async FnMut(&Self::Item) -> bool,
) -> impl AsyncIterator<Item = Self::Item>
where
Self: Sized,
{
Filter {
iter: self,
filter_op: op,
}
}
}

pub trait IntoAsyncIter {
type Item;

// FIXME: The Scope type needs to not carry R.
fn into_async_iter<R: Send>(
self,
scope: &Scope<'_, '_, R>,
) -> impl AsyncIterator<Item = Self::Item>;
}

impl<T: AsyncIterator> IntoAsyncIter for T {
type Item = T::Item;

fn into_async_iter<R: Send>(
self,
_scope: &Scope<'_, '_, R>,
) -> impl AsyncIterator<Item = Self::Item> {
self
}
}

struct Filter<I, O>
where
I: AsyncIterator,
O: async FnMut(&I::Item) -> bool,
{
iter: I,
filter_op: O,
}

impl<I, O> AsyncIterator for Filter<I, O>
where
I: AsyncIterator,
O: async FnMut(&I::Item) -> bool,
{
type Item = I::Item;

async fn next(&mut self) -> Option<Self::Item> {
loop {
let item = self.iter.next().await?;
if (self.filter_op)(&item).await {
return Some(item);
}
}
}
}
6 changes: 6 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
#![feature(async_closure)]
#![feature(async_fn_traits)]
#![feature(unboxed_closures)]
#![allow(async_fn_in_trait)]

use std::ops::AsyncFnOnce;

#[macro_use]
mod macros;

mod async_iter;
mod body;
pub mod prelude;
mod result_ext;
mod scope;
mod scope_body;
mod spawned;
mod stream;

pub use async_iter::{AsyncIterator, IntoAsyncIter};
pub use stream::Stream;

/// Creates an async scope within which you can spawn jobs.
/// This works much like the stdlib's
Expand Down
65 changes: 65 additions & 0 deletions src/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use crate::{AsyncIterator, IntoAsyncIter, Scope};

pub trait Stream: IntoAsyncIter {
fn filter(self, op: impl async FnMut(&Self::Item) -> bool) -> impl Stream<Item = Self::Item>
where
Self: Sized,
{
Filter {
stream: self,
filter_op: op,
}
}

async fn for_each(&mut self, mut op: impl async FnMut(Self::Item))
where
Self: Sized,
{
self.fold((), async |(), item| op(item).await).await
}

async fn fold<R>(&mut self, start: R, op: impl async FnMut(R, Self::Item) -> R) -> R;
}

struct Filter<S, O>
where
S: Stream,
O: async FnMut(&S::Item) -> bool,
{
stream: S,
filter_op: O,
}

impl<S, O> Stream for Filter<S, O>
where
S: Stream,
O: async FnMut(&S::Item) -> bool,
{
async fn fold<R>(&mut self, start: R, mut op: impl async FnMut(R, Self::Item) -> R) -> R {
self.stream
.fold(start, async |acc, item| {
if (self.filter_op)(&item).await {
op(acc, item).await
} else {
acc
}
})
.await
}
}

impl<S, O> IntoAsyncIter for Filter<S, O>
where
S: Stream,
O: async FnMut(&S::Item) -> bool,
{
type Item = S::Item;

fn into_async_iter<R: Send>(
self,
scope: &Scope<'_, '_, R>,
) -> impl AsyncIterator<Item = Self::Item> {
let iter = self.stream.into_async_iter(scope);
iter.filter(self.filter_op)
}
}

0 comments on commit aafa215

Please sign in to comment.