Skip to content

Commit

Permalink
remove completable future
Browse files Browse the repository at this point in the history
  • Loading branch information
jdonszelmann committed May 24, 2024
1 parent 8ca3d1c commit c5e3ae9
Showing 1 changed file with 11 additions and 220 deletions.
231 changes: 11 additions & 220 deletions scopegraphs/examples/records.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use self::completable_future::{CompletableFuture, CompletableFutureSignal};
use crate::ast::{Expr, Program, StructDef, Type};
use crate::resolve::{resolve_lexical_ref, resolve_member_ref, resolve_record_ref};
use async_recursion::async_recursion;
Expand All @@ -14,6 +13,8 @@ use std::fmt::{Debug, Formatter};
use std::fs::File;
use std::future::Future;
use std::rc::Rc;
use smol::channel::{bounded, Sender};


#[derive(Debug, Label, Copy, Clone, Hash, PartialEq, Eq)]
enum SgLabel {
Expand Down Expand Up @@ -91,7 +92,7 @@ pub struct UnionFind {
/// A vec of signals for each type variable.
/// Whenever type variable 0 is unified with anything, we go through
/// the list at index 0 and notify each.
callbacks: Vec<Vec<CompletableFutureSignal<PartialType>>>,
callbacks: Vec<Vec<Sender<PartialType>>>,
}

impl Debug for UnionFind {
Expand Down Expand Up @@ -137,8 +138,8 @@ impl UnionFind {
// FIXME: use rank heuristic in case right is a variable?
*self.get(left) = right.clone();
if self.callbacks.len() > left.0 {
for fut in std::mem::take(&mut self.callbacks[left.0]) {
fut.complete(right.clone());
for fut in self.callbacks[left.0].drain(..) {
let _ = fut.send_blocking(right.clone());
}
}
}
Expand Down Expand Up @@ -211,15 +212,17 @@ impl UnionFind {

/// Wait for when tv is unified with something.
fn wait_for_unification(&mut self, tv: TypeVar) -> impl Future<Output = PartialType> {
let future = CompletableFuture::<PartialType>::new();
let callbacks = &mut self.callbacks;
for _ in callbacks.len()..=tv.0 {
callbacks.push(vec![]);
}

callbacks[tv.0].push(future.signal());
let (tx, rx) = bounded(1);
callbacks[tv.0].push(tx);

future
async move {
rx.recv().await.expect("sender dropped")
}
}
}

Expand Down Expand Up @@ -814,216 +817,4 @@ in a.b.a.x;
println!("Type of example is: {:?}", typecheck(&example));

Ok(())
}

#[allow(unused)]
mod completable_future {
//! Copied and adapted from https://crates.io/crates/completable_future (due to dependency mismatch)
//!
//! # Completable Future
//!
//! Similar to Java's CompletableFuture, this crate provides a simple
//! future that can be completed and properly notified from elsewhere other
//! than the executor of the future. It is sutable for some blocking
//! tasks that could block the executor if we use a future directly in
//! an executor.
//!
//! A CompletableFuture is still a future and has all the combinators that
//! you can use to chain logic working on the result or the error. Also,
//! unlike Java and inherited from Rust's poll model future, some executor
//! needs to execute the CompletableFuture in order to get the result; the
//! thread or code that completes (or errors) the future will not execute
//! the logic chained after the future.
//!
//! The CompletableFuture uses Arc and Mutex to synchronize poll and completion,
//! so there's overhead for using it.
//!
//! # Example
//! ```
//! extern crate futures;
//! extern crate completable_future;
//!
//! use futures::prelude::*;
//! use futures::executor::block_on;
//! use std::thread::spawn;
//! use std::thread::sleep;
//! use std::time::Duration;
//! use completable_future::CompletableFuture;
//!
//! fn main() {
//! let fut1 = CompletableFuture::<String, ()>::new();
//! // we will give the signal to some worker for it to complete
//! let mut signal = fut1.signal();
//! let fut2 = fut1.and_then(|s| {
//! // this will come from whoever completes the future
//! println!("in fut2: {}", s);
//! Ok("this comes from fut2".to_string())
//! });
//!
//! let j = spawn(move || {
//! println!("waiter thread: I'm going to block on fut2");
//! let ret = block_on(fut2).unwrap();
//! println!("waiter thread: fut2 completed with message -- {}", ret);
//! });
//!
//! spawn(move || {
//! println!("worker thread: going to block for 1000 ms");
//! sleep(Duration::from_millis(1000));
//! signal.complete("this comes from fut1".to_string());
//! println!("worker thread: completed fut1");
//! });
//!
//! j.join().unwrap();
//! }
//! ```
use futures::future::Future;
use futures::task::{AtomicWaker, Context, Waker};
use std::mem;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::Poll;

enum WakerWrapper {
Registered(AtomicWaker),
NotRegistered,
}

impl WakerWrapper {
fn register(&mut self, waker: &Waker) {
match self {
WakerWrapper::Registered(_dont_care) => (),
WakerWrapper::NotRegistered => {
let w = AtomicWaker::new();
w.register(waker);
*self = WakerWrapper::Registered(w)
}
}
}

fn wake(&self) {
match *self {
WakerWrapper::Registered(ref w) => w.wake(),
WakerWrapper::NotRegistered => (),
};
}
}

enum FutureState<V> {
Pending,
Completed(V),
Taken,
}

impl<V> FutureState<V> {
fn swap(&mut self, new_val: FutureState<V>) -> FutureState<V> {
mem::replace(self, new_val)
}

fn unwrap_val(&mut self) -> V {
match self.swap(FutureState::Taken) {
FutureState::Completed(val) => val,
_ => panic!("cannot unwrap because my state is not completed"),
}
}
}

/// the state of the future; reference counted
struct SignalInternal<V> {
waker: WakerWrapper,
state: FutureState<V>,
}

/// A handle to the future state. When you create a completable future,
/// you should also create a signal that somebody can use to complete
/// the future.
#[derive(Clone)]
pub struct CompletableFutureSignal<V> {
internal: Arc<Mutex<SignalInternal<V>>>,
}

impl<V> CompletableFutureSignal<V> {
fn mutate_self(&self, new_state: FutureState<V>) -> bool {
let mut internal = self.internal.lock().unwrap();
match internal.state {
FutureState::Pending => {
internal.state.swap(new_state);
internal.waker.wake();
true
}
_ => false,
}
}

/// Complete the associated CompletableFuture. This method
/// can be called safely across multiple threads multiple times,
/// but only the winning call would mutate the future; other calls
/// will be rendered noop.
///
/// Returns whether the call successfully mutates the future.
pub fn complete(&self, value: V) -> bool {
self.mutate_self(FutureState::Completed(value))
}
}

/// A CompletableFuture is a future that you can expect a result (or error)
/// from and chain logic on. You will need some executor to actively poll
/// the result. Executors provided by the futures crate are usually good
/// enough for common situations.
///
/// If you use a custom executor, be careful that don't poll the CompletableFuture
/// after it has already completed (or errored) in previous polls. Doing so
/// will panic your executor.
pub struct CompletableFuture<V> {
internal: Arc<Mutex<SignalInternal<V>>>,
}

impl<V> CompletableFuture<V> {
/// Construct a CompletableFuture.
pub fn new() -> CompletableFuture<V> {
CompletableFuture {
internal: Arc::new(Mutex::new(SignalInternal {
waker: WakerWrapper::NotRegistered,
state: FutureState::Pending,
})),
}
}

/// Construct a CompletableFuture that's already completed
/// with the value provided.
pub fn completed(val: V) -> CompletableFuture<V> {
CompletableFuture {
internal: Arc::new(Mutex::new(SignalInternal {
waker: WakerWrapper::NotRegistered,
state: FutureState::Completed(val),
})),
}
}

/// Get a CompletableFutureSignal that can be used to complete
/// or error this CompletableFuture.
pub fn signal(&self) -> CompletableFutureSignal<V> {
CompletableFutureSignal {
internal: self.internal.clone(),
}
}
}

impl<V> Future for CompletableFuture<V> {
type Output = V;

fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
let mut signal = self.internal.lock().unwrap();
signal.waker.register(ctx.waker());

let state = &mut signal.state;
match state {
FutureState::Pending => Poll::Pending,
FutureState::Taken => {
panic!("bug: the value has been taken, yet I'm still polled again")
}
FutureState::Completed(_) => Poll::Ready(state.unwrap_val()),
}
}
}
}
}

0 comments on commit c5e3ae9

Please sign in to comment.