From e6a643770f1e93e9ab2e3dc8be5987d326c83437 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20B=C3=B6ving?= Date: Mon, 13 Jan 2025 19:11:04 +0100 Subject: [PATCH] feat: implement basic async IO with timers (#6505) This PR implements a basic async framework as well as asynchronously running timers using libuv. --------- Co-authored-by: Sofia Rodrigues Co-authored-by: Markus Himmel Co-authored-by: Markus Himmel --- src/Std/Internal.lean | 2 + src/Std/Internal/Async.lean | 8 + src/Std/Internal/Async/Basic.lean | 115 +++++++++++ src/Std/Internal/Async/Timer.lean | 139 +++++++++++++ src/Std/Internal/UV.lean | 119 +++++++++++ src/runtime/CMakeLists.txt | 3 +- src/runtime/init_module.cpp | 2 + src/runtime/libuv.cpp | 19 +- src/runtime/libuv.h | 22 +- src/runtime/object.h | 5 + src/runtime/uv/event_loop.cpp | 143 +++++++++++++ src/runtime/uv/event_loop.h | 47 +++++ src/runtime/uv/timer.cpp | 254 ++++++++++++++++++++++++ src/runtime/uv/timer.h | 54 +++++ tests/lean/run/async_sleep.lean | 219 ++++++++++++++++++++ tests/lean/run/async_surface_sleep.lean | 99 +++++++++ 16 files changed, 1246 insertions(+), 4 deletions(-) create mode 100644 src/Std/Internal/Async.lean create mode 100644 src/Std/Internal/Async/Basic.lean create mode 100644 src/Std/Internal/Async/Timer.lean create mode 100644 src/Std/Internal/UV.lean create mode 100644 src/runtime/uv/event_loop.cpp create mode 100644 src/runtime/uv/event_loop.h create mode 100644 src/runtime/uv/timer.cpp create mode 100644 src/runtime/uv/timer.h create mode 100644 tests/lean/run/async_sleep.lean create mode 100644 tests/lean/run/async_surface_sleep.lean diff --git a/src/Std/Internal.lean b/src/Std/Internal.lean index 44d41356f01a..bc065b5dfef9 100644 --- a/src/Std/Internal.lean +++ b/src/Std/Internal.lean @@ -4,7 +4,9 @@ Released under Apache 2.0 license as described in the file LICENSE. Authors: Henrik Böving -/ prelude +import Std.Internal.Async import Std.Internal.Parsec +import Std.Internal.UV /-! This directory is used for components of the standard library that are either considered diff --git a/src/Std/Internal/Async.lean b/src/Std/Internal/Async.lean new file mode 100644 index 000000000000..8d8e7db3e919 --- /dev/null +++ b/src/Std/Internal/Async.lean @@ -0,0 +1,8 @@ +/- +Copyright (c) 2024 Lean FRO, LLC. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. +Authors: Henrik Böving +-/ +prelude +import Std.Internal.Async.Basic +import Std.Internal.Async.Timer diff --git a/src/Std/Internal/Async/Basic.lean b/src/Std/Internal/Async/Basic.lean new file mode 100644 index 000000000000..cbab3ed68d11 --- /dev/null +++ b/src/Std/Internal/Async/Basic.lean @@ -0,0 +1,115 @@ +/- +Copyright (c) 2024 Lean FRO, LLC. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. +Authors: Henrik Böving +-/ +prelude +import Init.Core +import Init.System.IO +import Init.System.Promise + +namespace Std +namespace Internal +namespace IO +namespace Async + +/-- +A `Task` that may resolve to a value or an `IO.Error`. +-/ +def AsyncTask (α : Type u) : Type u := Task (Except IO.Error α) + +namespace AsyncTask + +/-- +Construct an `AsyncTask` that is already resolved with value `x`. +-/ +@[inline] +protected def pure (x : α) : AsyncTask α := Task.pure <| .ok x + +instance : Pure AsyncTask where + pure := AsyncTask.pure + +/-- +Create a new `AsyncTask` that will run after `x` has finished. +If `x`: +- errors, return an `AsyncTask` that resolves to the error. +- succeeds, run `f` on the result of `x` and return the `AsyncTask` produced by `f`. +-/ +@[inline] +protected def bind (x : AsyncTask α) (f : α → AsyncTask β) : AsyncTask β := + Task.bind x fun r => + match r with + | .ok a => f a + | .error e => Task.pure <| .error e + +/-- +Create a new `AsyncTask` that will run after `x` has finished. +If `x`: +- errors, return an `AsyncTask` that reolves to the error. +- succeeds, return an `AsyncTask` that resolves to `f x`. +-/ +@[inline] +def map (f : α → β) (x : AsyncTask α) : AsyncTask β := + Task.map (x := x) fun r => + match r with + | .ok a => .ok (f a) + | .error e => .error e + +/-- +Similar to `bind`, however `f` has access to the `IO` monad. If `f` throws an error, the returned +`AsyncTask` resolves to that error. +-/ +@[inline] +def bindIO (x : AsyncTask α) (f : α → IO (AsyncTask β)) : BaseIO (AsyncTask β) := + IO.bindTask x fun r => + match r with + | .ok a => f a + | .error e => .error e + +/-- +Similar to `bind`, however `f` has access to the `IO` monad. If `f` throws an error, the returned +`AsyncTask` resolves to that error. +-/ +@[inline] +def mapIO (f : α → IO β) (x : AsyncTask α) : BaseIO (AsyncTask β) := + IO.mapTask (t := x) fun r => + match r with + | .ok a => f a + | .error e => .error e + +/-- +Block until the `AsyncTask` in `x` finishes. +-/ +def block (x : AsyncTask α) : IO α := do + let res := x.get + match res with + | .ok a => return a + | .error e => .error e + +/-- +Create an `AsyncTask` that resolves to the value of `x`. +-/ +@[inline] +def ofPromise (x : IO.Promise (Except IO.Error α)) : AsyncTask α := + x.result + +/-- +Create an `AsyncTask` that resolves to the value of `x`. +-/ +@[inline] +def ofPurePromise (x : IO.Promise α) : AsyncTask α := + x.result.map pure + +/-- +Obtain the `IO.TaskState` of `x`. +-/ +@[inline] +def getState (x : AsyncTask α) : BaseIO IO.TaskState := + IO.getTaskState x + +end AsyncTask + +end Async +end IO +end Internal +end Std diff --git a/src/Std/Internal/Async/Timer.lean b/src/Std/Internal/Async/Timer.lean new file mode 100644 index 000000000000..8a6133658e99 --- /dev/null +++ b/src/Std/Internal/Async/Timer.lean @@ -0,0 +1,139 @@ +/- +Copyright (c) 2024 Lean FRO, LLC. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. +Authors: Henrik Böving +-/ +prelude +import Std.Time +import Std.Internal.UV +import Std.Internal.Async.Basic + + +namespace Std +namespace Internal +namespace IO +namespace Async + +/-- +`Sleep` can be used to sleep for some duration once. +The underlying timer has millisecond resolution. +-/ +structure Sleep where + private ofNative :: + native : Internal.UV.Timer + +namespace Sleep + +/-- +Set up a `Sleep` that waits for `duration` milliseconds. +This function only initializes but does not yet start the timer. +-/ +@[inline] +def mk (duration : Std.Time.Millisecond.Offset) : IO Sleep := do + let native ← Internal.UV.Timer.mk duration.toInt.toNat.toUInt64 false + return ofNative native + +/-- +If: +- `s` is not yet running start it and return an `AsyncTask` that will resolve once the previously + configured `duration` has run out. +- `s` is already or not anymore running return the same `AsyncTask` as the first call to `wait`. +-/ +@[inline] +def wait (s : Sleep) : IO (AsyncTask Unit) := do + let promise ← s.native.next + return .ofPurePromise promise + +/-- +If: +- `s` is still running the timer restarts counting from now and finishes after `duration` + milliseconds. +- `s` is not yet or not anymore running this is a no-op. +-/ +@[inline] +def reset (s : Sleep) : IO Unit := + s.native.reset + +/-- +If: +- `s` is still running this stops `s` without resolving any remaining `AsyncTask`s that were created + through `wait`. Note that if another `AsyncTask` is binding on any of these it is going hang + forever without further intervention. +- `s` is not yet or not anymore running this is a no-op. +-/ +@[inline] +def stop (s : Sleep) : IO Unit := + s.native.stop + +end Sleep + +/-- +Return an `AsyncTask` that resolves after `duration`. +-/ +def sleep (duration : Std.Time.Millisecond.Offset) : IO (AsyncTask Unit) := do + let sleeper ← Sleep.mk duration + sleeper.wait + +/-- +`Interval` can be used to repeatedly wait for some duration like a clock. +The underlying timer has millisecond resolution. +-/ +structure Interval where + private ofNative :: + native : Internal.UV.Timer + + +namespace Interval + +/-- +Setup up an `Interval` that waits for `duration` milliseconds. +This function only initializes but does not yet start the timer. +-/ +@[inline] +def mk (duration : Std.Time.Millisecond.Offset) (_ : 0 < duration := by decide) : IO Interval := do + let native ← Internal.UV.Timer.mk duration.toInt.toNat.toUInt64 true + return ofNative native + +/-- +If: +- `i` is not yet running start it and return an `AsyncTask` that resolves right away as the 0th + multiple of `duration` has elapsed. +- `i` is already running and: + - the tick from the last call of `i` has not yet finished return the same `AsyncTask` as the last + call + - the tick frrom the last call of `i` has finished return a new `AsyncTask` that waits for the + closest next tick from the time of calling this function. +- `i` is not running aymore this is a no-op. +-/ +@[inline] +def tick (i : Interval) : IO (AsyncTask Unit) := do + let promise ← i.native.next + return .ofPurePromise promise + +/-- +If: +- `Interval.tick` was called on `i` before the timer restarts counting from now and the next tick + happens in `duration`. +- `i` is not yet or not anymore running this is a no-op. +-/ +@[inline] +def reset (i : Interval) : IO Unit := + i.native.reset + +/-- +If: +- `i` is still running this stops `i` without resolving any remaing `AsyncTask` that were created + through `tick`. Note that if another `AsyncTask` is binding on any of these it is going hang + forever without further intervention. +- `i` is not yet or not anymore running this is a no-op. +-/ +@[inline] +def stop (i : Interval) : IO Unit := + i.native.stop + +end Interval + +end Async +end IO +end Internal +end Std diff --git a/src/Std/Internal/UV.lean b/src/Std/Internal/UV.lean new file mode 100644 index 000000000000..0720747c6a42 --- /dev/null +++ b/src/Std/Internal/UV.lean @@ -0,0 +1,119 @@ +/- +Copyright (c) 2024 Lean FRO, LLC. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. +Authors: Sofia Rodrigues +-/ +prelude +import Init.System.IO +import Init.System.Promise + +namespace Std +namespace Internal +namespace UV + +namespace Loop + +/-- +Options for configuring the event loop behavior. +-/ +structure Loop.Options where + /-- + Accumulate the amount of idle time the event loop spends in the event provider. + -/ + accumulateIdleTime : Bool := False + + /-- + Block a SIGPROF signal when polling for new events. It's commonly used for unnecessary wakeups + when using a sampling profiler. + -/ + blockSigProfSignal : Bool := False + +/-- +Configures the event loop with the specified options. +-/ +@[extern "lean_uv_event_loop_configure"] +opaque configure (options : Loop.Options) : BaseIO Unit + +/-- +Checks if the event loop is still active and processing events. +-/ +@[extern "lean_uv_event_loop_alive"] +opaque alive : BaseIO Bool + +end Loop + +private opaque TimerImpl : NonemptyType.{0} + +/-- +`Timer`s are used to generate `IO.Promise`s that resolve after some time. + +A `Timer` can be in one of 3 states: +- Right after construction it's initial. +- While it is ticking it's running. +- If it has stopped for some reason it's finished. + +This together with whether it was set up as `repeating` with `Timer.new` determines the behavior +of all functions on `Timer`s. +-/ +def Timer : Type := TimerImpl.type + +instance : Nonempty Timer := TimerImpl.property + +namespace Timer + +/-- +This creates a `Timer` in the initial state and doesn't run it yet. +- If `repeating` is `false` this constructs a timer that resolves once after `durationMs` + milliseconds, counting from when it's run. +- If `repeating` is `true` this constructs a timer that resolves after multiples of `durationMs` + milliseconds, counting from when it's run. Note that this includes the 0th multiple right after + starting the timer. Furthermore a repeating timer will only be freed after `Timer.stop` is called. +-/ +@[extern "lean_uv_timer_mk"] +opaque mk (timeout : UInt64) (repeating : Bool) : IO Timer + +/-- +This function has different behavior depending on the state and configuration of the `Timer`: +- if `repeating` is `false` and: + - it is initial, run it and return a new `IO.Promise` that is set to resolve once `durationMs` + milliseconds have elapsed. After this `IO.Promise` is resolved the `Timer` is finished. + - it is running or finished, return the same `IO.Promise` that the first call to `next` returned. +- if `repeating` is `true` and: + - it is initial, run it and return a new `IO.Promise` that resolves right away + (as it is the 0th multiple of `durationMs`). + - it is running, check whether the last returned `IO.Promise` is already resolved: + - If it is, return a new `IO.Promise` that resolves upon finishing the next cycle + - If it is not, return the last `IO.Promise` + This ensures that the returned `IO.Promise` resolves at the next repetition of the timer. + - if it is finished, return the last `IO.Promise` created by `next`. Notably this could be one + that never resolves if the timer was stopped before fulfilling the last one. +-/ +@[extern "lean_uv_timer_next"] +opaque next (timer : @& Timer) : IO (IO.Promise Unit) + +/-- +This function has different behavior depending on the state and configuration of the `Timer`: +- If it is initial or finished this is a no-op. +- If it is running and `repeating` is `false` this will delay the resolution of the timer until + `durationMs` milliseconds after the call of this function. +- Delay the resolution of the next tick of the timer until `durationMs` milliseconds after the + call of this function, then continue normal ticking behavior from there. +-/ +@[extern "lean_uv_timer_reset"] +opaque reset (timer : @& Timer) : IO Unit + +/-- +This function has different behavior depending on the state of the `Timer`: +- If it is initial or finished this is a no-op. +- If it is running the execution of the timer is stopped and it is put into the finished state. + Note that if the last `IO.Promise` generated by `next` is unresolved and being waited + on this creates a memory leak and the waiting task is not going to be awoken anymore. +-/ +@[extern "lean_uv_timer_stop"] +opaque stop (timer : @& Timer) : IO Unit + +end Timer + +end UV +end Internal +end Std diff --git a/src/runtime/CMakeLists.txt b/src/runtime/CMakeLists.txt index d6d8b50b395d..fc1e41f41693 100644 --- a/src/runtime/CMakeLists.txt +++ b/src/runtime/CMakeLists.txt @@ -2,7 +2,8 @@ set(RUNTIME_OBJS debug.cpp thread.cpp mpz.cpp utf8.cpp object.cpp apply.cpp exception.cpp interrupt.cpp memory.cpp stackinfo.cpp compact.cpp init_module.cpp load_dynlib.cpp io.cpp hash.cpp platform.cpp alloc.cpp allocprof.cpp sharecommon.cpp stack_overflow.cpp -process.cpp object_ref.cpp mpn.cpp mutex.cpp libuv.cpp uv/net_addr.cpp) +process.cpp object_ref.cpp mpn.cpp mutex.cpp libuv.cpp uv/net_addr.cpp uv/event_loop.cpp +uv/timer.cpp) add_library(leanrt_initial-exec STATIC ${RUNTIME_OBJS}) set_target_properties(leanrt_initial-exec PROPERTIES ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) diff --git a/src/runtime/init_module.cpp b/src/runtime/init_module.cpp index 8bf949d8bd5d..9ae9afb76390 100644 --- a/src/runtime/init_module.cpp +++ b/src/runtime/init_module.cpp @@ -13,6 +13,7 @@ Author: Leonardo de Moura #include "runtime/process.h" #include "runtime/mutex.h" #include "runtime/init_module.h" +#include "runtime/libuv.h" namespace lean { extern "C" LEAN_EXPORT void lean_initialize_runtime_module() { @@ -24,6 +25,7 @@ extern "C" LEAN_EXPORT void lean_initialize_runtime_module() { initialize_mutex(); initialize_process(); initialize_stack_overflow(); + initialize_libuv(); } void initialize_runtime_module() { lean_initialize_runtime_module(); diff --git a/src/runtime/libuv.cpp b/src/runtime/libuv.cpp index 627eb848d3ef..f625c115950f 100644 --- a/src/runtime/libuv.cpp +++ b/src/runtime/libuv.cpp @@ -2,21 +2,36 @@ Copyright (c) 2024 Lean FRO, LLC. All rights reserved. Released under Apache 2.0 license as described in the file LICENSE. -Author: Markus Himmel -*/ +Author: Markus Himmel, Sofia Rodrigues + */ +#include #include "runtime/libuv.h" +#include "runtime/object.h" + +namespace lean { #ifndef LEAN_EMSCRIPTEN #include +extern "C" void initialize_libuv() { + initialize_libuv_timer(); + initialize_libuv_loop(); + + lthread([]() { event_loop_run_loop(&global_ev); }); +} + +/* Lean.libUVVersionFn : Unit → Nat */ extern "C" LEAN_EXPORT lean_obj_res lean_libuv_version(lean_obj_arg o) { return lean_unsigned_to_nat(uv_version()); } #else +extern "C" void initialize_libuv() {} + extern "C" LEAN_EXPORT lean_obj_res lean_libuv_version(lean_obj_arg o) { return lean_box(0); } #endif +} \ No newline at end of file diff --git a/src/runtime/libuv.h b/src/runtime/libuv.h index 4c53786a59d6..94afa55aa2b0 100644 --- a/src/runtime/libuv.h +++ b/src/runtime/libuv.h @@ -2,9 +2,29 @@ Copyright (c) 2024 Lean FRO, LLC. All rights reserved. Released under Apache 2.0 license as described in the file LICENSE. -Author: Markus Himmel +Author: Markus Himmel, Sofia Rodrigues */ #pragma once #include +#include "runtime/uv/event_loop.h" + #include "runtime/uv/timer.h" +#include "runtime/alloc.h" +#include "runtime/io.h" +#include "runtime/utf8.h" +#include "runtime/object.h" +#include "runtime/thread.h" +#include "runtime/allocprof.h" +#include "runtime/object.h" +namespace lean { +#ifndef LEAN_EMSCRIPTEN +#include +#endif + +extern "C" void initialize_libuv(); + +// ======================================= +// General LibUV functions. extern "C" LEAN_EXPORT lean_obj_res lean_libuv_version(lean_obj_arg); + +} \ No newline at end of file diff --git a/src/runtime/object.h b/src/runtime/object.h index f94b32e95482..4f7fcc782a7c 100644 --- a/src/runtime/object.h +++ b/src/runtime/object.h @@ -467,6 +467,11 @@ inline obj_res st_ref_set(b_obj_arg r, obj_arg v, obj_arg w) { return lean_st_re inline obj_res st_ref_reset(b_obj_arg r, obj_arg w) { return lean_st_ref_reset(r, w); } inline obj_res st_ref_swap(b_obj_arg r, obj_arg v, obj_arg w) { return lean_st_ref_swap(r, v, w); } + +extern "C" LEAN_EXPORT obj_res lean_io_promise_new(obj_arg); +extern "C" LEAN_EXPORT obj_res lean_io_promise_resolve(obj_arg value, b_obj_arg promise, obj_arg); +extern "C" LEAN_EXPORT obj_res lean_io_promise_result(obj_arg promise); + // ======================================= // Module initialization/finalization void initialize_object(); diff --git a/src/runtime/uv/event_loop.cpp b/src/runtime/uv/event_loop.cpp new file mode 100644 index 000000000000..0afd59534ee4 --- /dev/null +++ b/src/runtime/uv/event_loop.cpp @@ -0,0 +1,143 @@ +/* +Copyright (c) 2024 Lean FRO, LLC. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. + +Author: Sofia Rodrigues, Henrik Böving +*/ +#include "runtime/uv/event_loop.h" + + +/* +This file builds a thread safe event loop on top of the thread unsafe libuv event loop. +We achieve this by always having a `uv_async_t` associated with the libuv event loop. +As `uv_async_t` are a thread safe primitive it is safe to send a notification to it from another +thread. Once this notification arrives the event loop suspends its own execution and unlocks a mutex +that protects it. This mutex can then be taken by another thread that wants to work with the event +loop. After that work is done it signals a condition variable that the event loop is waiting on +to continue its execution. +*/ + +namespace lean { +#ifndef LEAN_EMSCRIPTEN +using namespace std; + +event_loop_t global_ev; + +// Utility function for error checking. This function is only used inside the +// initializition of the event loop. +static void check_uv(int result, const char * msg) { + if (result != 0) { + std::string err_message = std::string(msg) + ": " + uv_strerror(result); + lean_internal_panic(err_message.c_str()); + } +} + +// The callback that stops the loop when it's called. +void async_callback(uv_async_t * handle) { + uv_stop(handle->loop); +} + +// Interrupts the event loop and stops it so it can receive future requests. +void event_loop_interrupt(event_loop_t * event_loop) { + int result = uv_async_send(&event_loop->async); + (void)result; + lean_assert(result == 0); +} + +// Initializes the event loop +void event_loop_init(event_loop_t * event_loop) { + event_loop->loop = uv_default_loop(); + check_uv(uv_mutex_init_recursive(&event_loop->mutex), "Failed to initialize mutex"); + check_uv(uv_cond_init(&event_loop->cond_var), "Failed to initialize condition variable"); + check_uv(uv_async_init(event_loop->loop, &event_loop->async, NULL), "Failed to initialize async"); + event_loop->n_waiters = 0; +} + +// Locks the event loop for the side of the requesters. +void event_loop_lock(event_loop_t * event_loop) { + if (uv_mutex_trylock(&event_loop->mutex) != 0) { + event_loop->n_waiters++; + event_loop_interrupt(event_loop); + uv_mutex_lock(&event_loop->mutex); + event_loop->n_waiters--; + } +} + +// Unlock event loop +void event_loop_unlock(event_loop_t * event_loop) { + if (event_loop->n_waiters == 0) { + uv_cond_signal(&event_loop->cond_var); + } + uv_mutex_unlock(&event_loop->mutex); +} + +// Runs the loop and stops when it needs to register new requests. +void event_loop_run_loop(event_loop_t * event_loop) { + while (uv_loop_alive(event_loop->loop)) { + uv_mutex_lock(&event_loop->mutex); + + while (event_loop->n_waiters != 0) { + uv_cond_wait(&event_loop->cond_var, &event_loop->mutex); + } + + uv_run(event_loop->loop, UV_RUN_ONCE); + /* + * We leave `uv_run` only when `uv_stop` is called as there is always the `uv_async_t` so + * we can never run out of things to wait on. `uv_stop` is only called from `async_callback` + * when another thread wants to work with the event loop so we need to give up the mutex. + */ + + uv_mutex_unlock(&event_loop->mutex); + } +} + +/* Std.Internal.UV.Loop.configure (options : Loop.Options) : BaseIO Unit */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_event_loop_configure(b_obj_arg options, obj_arg /* w */ ) { + bool accum = lean_ctor_get_uint8(options, 0); + bool block = lean_ctor_get_uint8(options, 1); + + event_loop_lock(&global_ev); + + if (accum && uv_loop_configure(global_ev.loop, UV_METRICS_IDLE_TIME) != 0) { + return io_result_mk_error("failed to configure global_ev.loop with UV_METRICS_IDLE_TIME"); + } + + #if!defined(WIN32) && !defined(_WIN32) + if (block && uv_loop_configure(global_ev.loop, UV_LOOP_BLOCK_SIGNAL, SIGPROF) != 0) { + return io_result_mk_error("failed to configure global_ev.loop with UV_LOOP_BLOCK_SIGNAL"); + } + #endif + + event_loop_unlock(&global_ev); + + return lean_io_result_mk_ok(lean_box(0)); +} + +/* Std.Internal.UV.Loop.alive : BaseIO UInt64 */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_event_loop_alive(obj_arg /* w */ ) { + event_loop_lock(&global_ev); + int is_alive = uv_loop_alive(global_ev.loop); + event_loop_unlock(&global_ev); + + return lean_io_result_mk_ok(lean_box(is_alive)); +} + +void initialize_libuv_loop() { + event_loop_init(&global_ev); +} + +#else + +/* Std.Internal.UV.Loop.configure (options : Loop.Options) : BaseIO Unit */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_event_loop_configure(b_obj_arg options, obj_arg /* w */ ) { + return io_result_mk_error("lean_uv_event_loop_configure is not supported"); +} + +/* Std.Internal.UV.Loop.alive : BaseIO UInt64 */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_event_loop_alive(obj_arg /* w */ ) { + return io_result_mk_error("lean_uv_event_loop_alive is not supported"); +} + +#endif + +} \ No newline at end of file diff --git a/src/runtime/uv/event_loop.h b/src/runtime/uv/event_loop.h new file mode 100644 index 000000000000..42dba397e3fa --- /dev/null +++ b/src/runtime/uv/event_loop.h @@ -0,0 +1,47 @@ +/* +Copyright (c) 2024 Lean FRO, LLC. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. + +Author: Sofia Rodrigues +*/ +#pragma once +#include +#include "runtime/io.h" +#include "runtime/object.h" + +namespace lean { + +void initialize_libuv_loop(); + +#ifndef LEAN_EMSCRIPTEN +using namespace std; +#include + +// Event loop structure for managing asynchronous events and synchronization across multiple threads. +typedef struct { + uv_loop_t * loop; // The libuv event loop. + uv_mutex_t mutex; // Mutex for protecting `loop`. + uv_cond_t cond_var; // Condition variable for signaling that `loop` is free. + uv_async_t async; // Async handle to interrupt `loop`. + _Atomic(int) n_waiters; // Atomic counter for managing waiters for `loop`. +} event_loop_t; + +// The multithreaded event loop object for all tasks in the task manager. +extern event_loop_t global_ev; + +// ======================================= +// Event loop manipulation functions. +void event_loop_init(event_loop_t *event_loop); +void event_loop_cleanup(event_loop_t *event_loop); +void event_loop_lock(event_loop_t *event_loop); +void event_loop_unlock(event_loop_t *event_loop); +void event_loop_run_loop(event_loop_t *event_loop); + +#endif + +// ======================================= +// Global event loop manipulation functions +extern "C" LEAN_EXPORT lean_obj_res lean_uv_event_loop_configure(b_obj_arg options, obj_arg /* w */ ); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_event_loop_alive(obj_arg /* w */ ); + +} diff --git a/src/runtime/uv/timer.cpp b/src/runtime/uv/timer.cpp new file mode 100644 index 000000000000..c34443c08b94 --- /dev/null +++ b/src/runtime/uv/timer.cpp @@ -0,0 +1,254 @@ +/* +Copyright (c) 2024 Lean FRO, LLC. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. + +Author: Sofia Rodrigues, Henrik Böving +*/ +#include "runtime/uv/timer.h" + +namespace lean { +#ifndef LEAN_EMSCRIPTEN + +using namespace std; + +// The finalizer of the `Timer`. +void lean_uv_timer_finalizer(void* ptr) { + lean_uv_timer_object * timer = (lean_uv_timer_object*) ptr; + + if (timer->m_promise != NULL) { + lean_dec(timer->m_promise); + } + + event_loop_lock(&global_ev); + + uv_close((uv_handle_t*)timer->m_uv_timer, [](uv_handle_t* handle) { + free(handle); + }); + + event_loop_unlock(&global_ev); + + free(timer); +} + +void initialize_libuv_timer() { + g_uv_timer_external_class = lean_register_external_class(lean_uv_timer_finalizer, [](void* obj, lean_object* f) { + if (((lean_uv_timer_object*)obj)->m_promise != NULL) { + lean_inc(f); + lean_apply_1(f, ((lean_uv_timer_object*)obj)->m_promise); + } + }); +} + +void handle_timer_event(uv_timer_t* handle) { + lean_object * obj = (lean_object*)handle->data; + lean_uv_timer_object * timer = lean_to_uv_timer(obj); + // handle_timer_event may only be called while the timer is running, this means the promise must + // not be NULL. + lean_assert(timer->m_state == TIMER_STATE_RUNNING); + lean_assert(timer->m_promise != NULL); + + if (timer->m_repeating) { + if (lean_io_get_task_state_core(timer->m_promise) != 2) { + lean_object* res = lean_io_promise_resolve(lean_box(0), timer->m_promise, lean_io_mk_world()); + lean_dec(res); + } + } else { + lean_assert(lean_io_get_task_state_core(timer->m_promise) != 2); + uv_timer_stop(timer->m_uv_timer); + timer->m_state = TIMER_STATE_FINISHED; + + lean_object* res = lean_io_promise_resolve(lean_box(0), timer->m_promise, lean_io_mk_world()); + lean_dec(res); + + // The loop does not need to keep the timer alive anymore. + lean_dec(obj); + } +} + +/* Std.Internal.UV.Timer.mk (timeout : UInt64) (repeating : Bool) : IO Timer */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_timer_mk(uint64_t timeout, uint8_t repeating, obj_arg /* w */) { + lean_uv_timer_object * timer = (lean_uv_timer_object*)malloc(sizeof(lean_uv_timer_object)); + timer->m_timeout = timeout; + timer->m_repeating = repeating; + timer->m_state = TIMER_STATE_INITIAL; + timer->m_promise = NULL; + + uv_timer_t * uv_timer = (uv_timer_t*)malloc(sizeof(uv_timer_t)); + + event_loop_lock(&global_ev); + int result = uv_timer_init(global_ev.loop, uv_timer); + event_loop_unlock(&global_ev); + + if (result != 0) { + free(uv_timer); + free(timer); + std::string err = std::string("failed to initialize timer: ") + uv_strerror(result); + return io_result_mk_error(err.c_str()); + } + + timer->m_uv_timer = uv_timer; + + lean_object * obj = lean_uv_timer_new(timer); + lean_mark_mt(obj); + timer->m_uv_timer->data = obj; + + return lean_io_result_mk_ok(obj); +} + +/* Std.Internal.UV.Timer.next (timer : @& Timer) : IO (IO.Promise Unit) */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_timer_next(b_obj_arg obj, obj_arg /* w */ ) { + lean_uv_timer_object * timer = lean_to_uv_timer(obj); + + auto create_promise = []() { + lean_object * prom_res = lean_io_promise_new(lean_io_mk_world()); + lean_object * promise = lean_ctor_get(prom_res, 0); + lean_inc(promise); + lean_dec(prom_res); + + return promise; + }; + + auto setup_timer = [create_promise, obj, timer]() { + lean_assert(timer->m_promise == NULL); + timer->m_promise = create_promise(); + timer->m_state = TIMER_STATE_RUNNING; + + // The event loop must keep the timer alive for the duration of the run time. + lean_inc(obj); + + event_loop_lock(&global_ev); + + int result = uv_timer_start( + timer->m_uv_timer, + handle_timer_event, + timer->m_repeating ? 0 : timer->m_timeout, + timer->m_repeating ? timer->m_timeout : 0 + ); + + event_loop_unlock(&global_ev); + + if (result != 0) { + lean_dec(obj); + std::string err = std::string("failed to initialize timer: ") + uv_strerror(result); + return io_result_mk_error(err.c_str()); + } else { + lean_inc(timer->m_promise); + return lean_io_result_mk_ok(timer->m_promise); + } + }; + + if (timer->m_repeating) { + switch (timer->m_state) { + case TIMER_STATE_INITIAL: + { + return setup_timer(); + } + case TIMER_STATE_RUNNING: + { + lean_assert(timer->m_promise != NULL); + // 2 indicates finished + if (lean_io_get_task_state_core(timer->m_promise) == 2) { + lean_dec(timer->m_promise); + timer->m_promise = create_promise(); + lean_inc(timer->m_promise); + return lean_io_result_mk_ok(timer->m_promise); + } else { + lean_inc(timer->m_promise); + return lean_io_result_mk_ok(timer->m_promise); + } + } + case TIMER_STATE_FINISHED: + { + lean_assert(timer->m_promise != NULL); + lean_inc(timer->m_promise); + return lean_io_result_mk_ok(timer->m_promise); + } + } + } else { + if (timer->m_state == TIMER_STATE_INITIAL) { + return setup_timer(); + } else { + lean_assert(timer->m_promise != NULL); + + lean_inc(timer->m_promise); + return lean_io_result_mk_ok(timer->m_promise); + } + } +} + +/* Std.Internal.UV.Timer.reset (timer : @& Timer) : IO Unit */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_timer_reset(b_obj_arg obj, obj_arg /* w */ ) { + lean_uv_timer_object * timer = lean_to_uv_timer(obj); + + if (timer->m_state == TIMER_STATE_RUNNING) { + lean_assert(timer->m_promise != NULL); + + event_loop_lock(&global_ev); + + uv_timer_stop(timer->m_uv_timer); + + int result = uv_timer_start( + timer->m_uv_timer, + handle_timer_event, + timer->m_timeout, + timer->m_repeating ? timer->m_timeout : 0 + ); + + event_loop_unlock(&global_ev); + + if (result != 0) { + return io_result_mk_error("failed to restart uv_timer"); + } else { + return lean_io_result_mk_ok(lean_box(0)); + } + } else { + return lean_io_result_mk_ok(lean_box(0)); + } +} + +/* Std.Internal.UV.Timer.stop (timer : @& Timer) : IO Unit */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_timer_stop(b_obj_arg obj, obj_arg /* w */) { + lean_uv_timer_object * timer = lean_to_uv_timer(obj); + + if (timer->m_state == TIMER_STATE_RUNNING) { + lean_assert(timer->m_promise != NULL); + + event_loop_lock(&global_ev); + + uv_timer_stop(timer->m_uv_timer); + + event_loop_unlock(&global_ev); + + timer->m_state = TIMER_STATE_FINISHED; + + // The loop does not need to keep the timer alive anymore. + lean_dec(obj); + + return lean_io_result_mk_ok(lean_box(0)); + } else { + return lean_io_result_mk_ok(lean_box(0)); + } +} + +#else + +void lean_uv_timer_finalizer(void* ptr); + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_timer_mk(uint64_t timeout, uint8_t repeating, obj_arg /* w */) { + return io_result_mk_error("lean_uv_timer_mk is not supported"); +} + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_timer_next(b_obj_arg timer, obj_arg /* w */ ) { + return io_result_mk_error("lean_uv_timer_next is not supported"); +} + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_timer_reset(b_obj_arg timer, obj_arg /* w */ ) { + return io_result_mk_error("lean_uv_timer_reset is not supported"); +} + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_timer_stop(b_obj_arg timer, obj_arg /* w */ ) { + return io_result_mk_error("lean_uv_timer_stop is not supported"); +} + +#endif +} diff --git a/src/runtime/uv/timer.h b/src/runtime/uv/timer.h new file mode 100644 index 000000000000..cda80d34fc70 --- /dev/null +++ b/src/runtime/uv/timer.h @@ -0,0 +1,54 @@ +/* +Copyright (c) 2024 Lean FRO, LLC. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. + +Author: Sofia Rodrigues, Henrik Böving +*/ +#pragma once +#include +#include "runtime/uv/event_loop.h" + +namespace lean { + +static lean_external_class * g_uv_timer_external_class = NULL; +void initialize_libuv_timer(); + +#ifndef LEAN_EMSCRIPTEN +using namespace std; +#include + +enum uv_timer_state { + TIMER_STATE_INITIAL, + TIMER_STATE_RUNNING, + TIMER_STATE_FINISHED, +}; + +// Structure for managing a single UV timer object, including promise handling, timeout, and +// repeating behavior. +typedef struct { + uv_timer_t * m_uv_timer; // LibUV timer handle. + lean_object * m_promise; // The associated promise for asynchronous results. + uint64_t m_timeout; // Timeout duration in milliseconds. + bool m_repeating; // Flag indicating if the timer is repeating. + uv_timer_state m_state; // The state of the timer. Beyond the API description on the Lean + // side this state has the invariant: + // `m_state != TIMER_STATE_INITIAL` -> `m_promise != NULL` +} lean_uv_timer_object; + +// ======================================= +// Timer object manipulation functions. +static inline lean_object* lean_uv_timer_new(lean_uv_timer_object * s) { return lean_alloc_external(g_uv_timer_external_class, s); } +static inline lean_uv_timer_object* lean_to_uv_timer(lean_object * o) { return (lean_uv_timer_object*)(lean_get_external_data(o)); } + +#else + +// ======================================= +// Timer manipulation functions +extern "C" LEAN_EXPORT lean_obj_res lean_uv_timer_mk(uint64_t timeout, uint8_t repeating, obj_arg /* w */); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_timer_next(b_obj_arg timer, obj_arg /* w */); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_timer_reset(b_obj_arg timer, obj_arg /* w */); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_timer_stop(b_obj_arg timer, obj_arg /* w */); + +#endif + +} diff --git a/tests/lean/run/async_sleep.lean b/tests/lean/run/async_sleep.lean new file mode 100644 index 000000000000..b8a438515235 --- /dev/null +++ b/tests/lean/run/async_sleep.lean @@ -0,0 +1,219 @@ +import Std.Internal.UV +open Std.Internal.UV + +def assertElapsed (t1 t2 : Nat) (should : Nat) (eps : Nat) : IO Unit := do + let dur := t2 - t1 + if (Int.ofNat dur - Int.ofNat should).natAbs > eps then + throw <| .userError s!"elapsed time was too different, measured {dur}, should: {should}, tolerance: {eps}" + +def assertDuration (should : Nat) (eps : Nat) (x : IO α) : IO α := do + let t1 ← IO.monoMsNow + let res ← x + let t2 ← IO.monoMsNow + assertElapsed t1 t2 should eps + return res + + +def BASE_DURATION : Nat := 1000 + +-- generous tolerance for slow CI systems +def EPS : Nat := 150 + +def await (x : Task α) : IO α := pure x.get + +namespace SleepTest + +def oneShotSleep : IO Unit := do + assertDuration BASE_DURATION EPS do + let timer ← Timer.mk BASE_DURATION.toUInt64 false + let p ← timer.next + await p.result + +def promiseBehavior1 : IO Unit := do + let timer ← Timer.mk BASE_DURATION.toUInt64 false + let p ← timer.next + let r := p.result + assert! (← IO.getTaskState r) != .finished + IO.sleep (BASE_DURATION + EPS).toUInt32 + assert! (← IO.getTaskState r) == .finished + +def promiseBehavior2 : IO Unit := do + let timer ← Timer.mk BASE_DURATION.toUInt64 false + let p1 ← timer.next + let p2 ← timer.next + assert! (← IO.getTaskState p1.result) != .finished + assert! (← IO.getTaskState p2.result) != .finished + IO.sleep (BASE_DURATION + EPS).toUInt32 + assert! (← IO.getTaskState p1.result) == .finished + assert! (← IO.getTaskState p2.result) == .finished + +def promiseBehavior3 : IO Unit := do + let timer ← Timer.mk BASE_DURATION.toUInt64 false + let p1 ← timer.next + assert! (← IO.getTaskState p1.result) != .finished + IO.sleep (BASE_DURATION + EPS).toUInt32 + assert! (← IO.getTaskState p1.result) == .finished + let p3 ← timer.next + assert! (← IO.getTaskState p3.result) == .finished + +def resetBehavior : IO Unit := do + let timer ← Timer.mk BASE_DURATION.toUInt64 false + let p ← timer.next + assert! (← IO.getTaskState p.result) != .finished + + IO.sleep (BASE_DURATION / 2).toUInt32 + assert! (← IO.getTaskState p.result) != .finished + timer.reset + + IO.sleep (BASE_DURATION / 2).toUInt32 + assert! (← IO.getTaskState p.result) != .finished + + IO.sleep ((BASE_DURATION / 2) + EPS).toUInt32 + assert! (← IO.getTaskState p.result) == .finished + +#eval oneShotSleep +#eval promiseBehavior1 +#eval promiseBehavior2 +#eval promiseBehavior3 +#eval resetBehavior +#eval oneShotSleep + +end SleepTest + +namespace IntervalTest + +def sleepFirst : IO Unit := do + assertDuration 0 EPS go +where + go : IO Unit := do + let timer ← Timer.mk BASE_DURATION.toUInt64 true + let prom ← timer.next + await prom.result + timer.stop + +def sleepSecond : IO Unit := do + discard <| assertDuration BASE_DURATION EPS go +where + go : IO Unit := do + let timer ← Timer.mk BASE_DURATION.toUInt64 true + + let task ← + IO.bindTask (← timer.next).result fun _ => do + IO.bindTask (← timer.next).result fun _ => pure (Task.pure (.ok 2)) + + discard <| await task + timer.stop + +def promiseBehavior1 : IO Unit := do + let timer ← Timer.mk BASE_DURATION.toUInt64 true + let p1 ← timer.next + IO.sleep EPS.toUInt32 + assert! (← IO.getTaskState p1.result) == .finished + let p2 ← timer.next + assert! (← IO.getTaskState p2.result) != .finished + IO.sleep (BASE_DURATION + EPS).toUInt32 + assert! (← IO.getTaskState p2.result) == .finished + timer.stop + +def promiseBehavior2 : IO Unit := do + let timer ← Timer.mk BASE_DURATION.toUInt64 true + let p1 ← timer.next + IO.sleep EPS.toUInt32 + assert! (← IO.getTaskState p1.result) == .finished + + let prom1 ← timer.next + let prom2 ← timer.next + assert! (← IO.getTaskState prom1.result) != .finished + assert! (← IO.getTaskState prom2.result) != .finished + IO.sleep (BASE_DURATION + EPS).toUInt32 + assert! (← IO.getTaskState prom1.result) == .finished + assert! (← IO.getTaskState prom2.result) == .finished + timer.stop + +def promiseBehavior3 : IO Unit := do + let timer ← Timer.mk BASE_DURATION.toUInt64 true + let p1 ← timer.next + IO.sleep EPS.toUInt32 + assert! (← IO.getTaskState p1.result) == .finished + + let prom1 ← timer.next + assert! (← IO.getTaskState prom1.result) != .finished + IO.sleep (BASE_DURATION + EPS).toUInt32 + assert! (← IO.getTaskState prom1.result) == .finished + let prom2 ← timer.next + assert! (← IO.getTaskState prom2.result) != .finished + IO.sleep (BASE_DURATION + EPS).toUInt32 + assert! (← IO.getTaskState prom2.result) == .finished + timer.stop + +def delayedTickBehavior : IO Unit := do + let timer ← Timer.mk BASE_DURATION.toUInt64 true + let p1 ← timer.next + IO.sleep EPS.toUInt32 + assert! (← IO.getTaskState p1.result) == .finished + + IO.sleep (BASE_DURATION / 2).toUInt32 + let p2 ← timer.next + assert! (← IO.getTaskState p2.result) != .finished + IO.sleep ((BASE_DURATION / 2) + EPS).toUInt32 + assert! (← IO.getTaskState p2.result) == .finished + timer.stop + +def skippedTickBehavior : IO Unit := do + let timer ← Timer.mk BASE_DURATION.toUInt64 true + let p1 ← timer.next + IO.sleep EPS.toUInt32 + assert! (← IO.getTaskState p1.result) == .finished + + IO.sleep (2 * BASE_DURATION + BASE_DURATION / 2).toUInt32 + let p2 ← timer.next + assert! (← IO.getTaskState p2.result) != .finished + IO.sleep ((BASE_DURATION / 2) + EPS).toUInt32 + assert! (← IO.getTaskState p2.result) == .finished + timer.stop + +def resetBehavior : IO Unit := do + let timer ← Timer.mk BASE_DURATION.toUInt64 true + let p1 ← timer.next + IO.sleep EPS.toUInt32 + assert! (← IO.getTaskState p1.result) == .finished + + let prom ← timer.next + assert! (← IO.getTaskState prom.result) != .finished + + IO.sleep (BASE_DURATION / 2).toUInt32 + assert! (← IO.getTaskState prom.result) != .finished + timer.reset + + IO.sleep (BASE_DURATION / 2).toUInt32 + assert! (← IO.getTaskState prom.result) != .finished + + IO.sleep ((BASE_DURATION / 2) + EPS).toUInt32 + assert! (← IO.getTaskState prom.result) == .finished + timer.stop + +def sequentialSleep : IO Unit := do + discard <| assertDuration BASE_DURATION EPS go +where + go : IO Unit := do + let timer ← Timer.mk (BASE_DURATION / 2).toUInt64 true + -- 0th interval ticks instantly + let task ← + IO.bindTask (← timer.next).result fun _ => do + IO.bindTask (← timer.next).result fun _ => do + IO.bindTask (← timer.next).result fun _ => pure (Task.pure (.ok 2)) + + discard <| await task + timer.stop + +#eval sleepFirst +#eval sleepSecond +#eval promiseBehavior1 +#eval promiseBehavior2 +#eval promiseBehavior3 +#eval delayedTickBehavior +#eval skippedTickBehavior +#eval resetBehavior +#eval sequentialSleep + +end IntervalTest diff --git a/tests/lean/run/async_surface_sleep.lean b/tests/lean/run/async_surface_sleep.lean new file mode 100644 index 000000000000..b548fd058f2c --- /dev/null +++ b/tests/lean/run/async_surface_sleep.lean @@ -0,0 +1,99 @@ +import Std.Internal.Async.Timer + +/- +these tests are just some preliminary ones as `async_sleep.lean` already contains extensive tests +for the entire timer state machine and `Async.Timer` is merely a light wrapper around it. +-/ + +open Std.Internal.IO.Async + +def BASE_DURATION : Std.Time.Millisecond.Offset := 10 + +namespace SleepTest + +def oneSleep : IO Unit := do + let task ← go + assert! (← task.block) == 37 +where + go : IO (AsyncTask Nat) := do + let sleep ← Sleep.mk BASE_DURATION + (← sleep.wait).mapIO fun _ => + return 37 + +def doubleSleep : IO Unit := do + let task ← go + assert! (← task.block) == 37 +where + go : IO (AsyncTask Nat) := do + let sleep ← Sleep.mk BASE_DURATION + (← sleep.wait).bindIO fun _ => do + (← sleep.wait).mapIO fun _ => + return 37 + +def resetSleep : IO Unit := do + let task ← go + assert! (← task.block) == 37 +where + go : IO (AsyncTask Nat) := do + let sleep ← Sleep.mk BASE_DURATION + let waiter ← sleep.wait + sleep.reset + waiter.mapIO fun _ => + return 37 + +def simpleSleep : IO Unit := do + let task ← go + assert! (← task.block) == 37 +where + go : IO (AsyncTask Nat) := do + (← sleep BASE_DURATION).mapIO fun _ => + return 37 + +#eval oneSleep +#eval doubleSleep +#eval resetSleep +#eval simpleSleep + +end SleepTest + +namespace IntervalTest + +def oneSleep : IO Unit := do + let task ← go + assert! (← task.block) == 37 +where + go : IO (AsyncTask Nat) := do + let interval ← Interval.mk BASE_DURATION + (← interval.tick).mapIO fun _ => do + interval.stop + return 37 + +def doubleSleep : IO Unit := do + let task ← go + assert! (← task.block) == 37 +where + go : IO (AsyncTask Nat) := do + let interval ← Interval.mk BASE_DURATION + (← interval.tick).bindIO fun _ => do + (← interval.tick).mapIO fun _ => do + interval.stop + return 37 + +def resetSleep : IO Unit := do + let task ← go + assert! (← task.block) == 37 +where + go : IO (AsyncTask Nat) := do + let interval ← Interval.mk BASE_DURATION + (← interval.tick).bindIO fun _ => do + let waiter ← interval.tick + interval.reset + waiter.mapIO fun _ => do + interval.stop + return 37 + +#eval oneSleep +#eval doubleSleep +#eval resetSleep + +end IntervalTest