Skip to content

Commit

Permalink
feat: add async timer primitives based on libuv (#6219)
Browse files Browse the repository at this point in the history
This PR adds support for `Timer` and a event loop thread that gets
requests from another threads and executes it.

---------

Co-authored-by: Markus Himmel <[email protected]>
Co-authored-by: Henrik Böving <[email protected]>
  • Loading branch information
3 people committed Jan 10, 2025
1 parent 50cc599 commit cd71bf0
Show file tree
Hide file tree
Showing 9 changed files with 611 additions and 4 deletions.
72 changes: 72 additions & 0 deletions src/Std/Internal/UV.lean
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,78 @@ opaque alive : BaseIO Bool

end Loop

private opaque TimerImpl : NonemptyType.{0}

/--
`Timer`s are used to generate `IO.Promise`s that resolve after some time.
A `Timer` can be in one of 3 states:
- Right after construction it's initial.
- While it is ticking it's running.
- If it has stopped for some reason it's finished.
This together with whether it was set up as `repeating` with `Timer.new` determines the behavior
of all functions on `Timer`s.
-/
def Timer : Type := TimerImpl.type

instance : Nonempty Timer := TimerImpl.property

namespace Timer

/--
This creates a `Timer` in the initial state and doesn't run it yet.
- If `repeating` is `false` this constructs a timer that resolves once after `durationMs`
milliseconds, counting from when it's run.
- If `repeating` is `true` this constructs a timer that resolves after multiples of `durationMs`
milliseconds, counting from when it's run. Note that this includes the 0th multiple right after
starting the timer. Furthermore a repeating timer will only be freed after `Timer.stop` is called.
-/
@[extern "lean_uv_timer_mk"]
opaque mk (timeout : UInt64) (repeating : Bool) : IO Timer

/--
This function has different behavior depending on the state and configuration of the `Timer`:
- if `repeating` is `false` and:
- it is initial, run it and return a new `IO.Promise` that is set to resolve once `durationMs`
milliseconds have elapsed. After this `IO.Promise` is resolved the `Timer` is finished.
- it is running or finished, return the same `IO.Promise` that the first call to `next` returned.
- if `repeating` is `true` and:
- it is initial, run it and return a new `IO.Promise` that resolves right away
(as it is the 0th multiple of `durationMs`).
- it is running, check whether the last returned `IO.Promise` is already resolved:
- If it is, return a new `IO.Promise` that resolves upon finishing the next cycle
- If it is not, return the last `IO.Promise`
This ensures that the returned `IO.Promise` resolves at the next repetition of the timer.
- if it is finished, return the last `IO.Promise` created by `next`. Notably this could be one
that never resolves if the timer was stopped before fulfilling the last one.
-/
@[extern "lean_uv_timer_next"]
opaque next (timer : @& Timer) : IO (IO.Promise Unit)

/--
This function has different behavior depending on the state and configuration of the `Timer`:
- If it is initial or finished this is a no-op.
- If it is running and `repeating` is `false` this will delay the resolution of the timer until
`durationMs` milliseconds after the call of this function.
- Delay the resolution of the next tick of the timer until `durationMs` milliseconds after the
call of this function, then continue normal ticking behavior from there.
-/
@[extern "lean_uv_timer_reset"]
opaque reset (timer : @& Timer) : IO Unit

/--
This function has different behavior depending on the state of the `Timer`:
- If it is initial or finished this is a no-op.
- If it is running the execution of the timer is stopped and it is put into the finished state.
Note that if the last `IO.Promise` generated by `next` is unresolved and being waited
on this creates a memory leak and the waiting task is not going to be awoken anymore.
-/
@[extern "lean_uv_timer_stop"]
opaque stop (timer : @& Timer) : IO Unit

end Timer

end UV
end Internal
end Std
3 changes: 2 additions & 1 deletion src/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ set(RUNTIME_OBJS debug.cpp thread.cpp mpz.cpp utf8.cpp
object.cpp apply.cpp exception.cpp interrupt.cpp memory.cpp
stackinfo.cpp compact.cpp init_module.cpp load_dynlib.cpp io.cpp hash.cpp
platform.cpp alloc.cpp allocprof.cpp sharecommon.cpp stack_overflow.cpp
process.cpp object_ref.cpp mpn.cpp mutex.cpp libuv.cpp uv/net_addr.cpp uv/event_loop.cpp)
process.cpp object_ref.cpp mpn.cpp mutex.cpp libuv.cpp uv/net_addr.cpp uv/event_loop.cpp
uv/timer.cpp)
add_library(leanrt_initial-exec STATIC ${RUNTIME_OBJS})
set_target_properties(leanrt_initial-exec PROPERTIES
ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
Expand Down
3 changes: 2 additions & 1 deletion src/runtime/libuv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace lean {
#include <uv.h>

extern "C" void initialize_libuv() {
initialize_libuv_timer();
initialize_libuv_loop();

lthread([]() { event_loop_run_loop(&global_ev); });
Expand All @@ -33,4 +34,4 @@ extern "C" LEAN_EXPORT lean_obj_res lean_libuv_version(lean_obj_arg o) {
}

#endif
}
}
3 changes: 2 additions & 1 deletion src/runtime/libuv.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Author: Markus Himmel, Sofia Rodrigues
#pragma once
#include <lean/lean.h>
#include "runtime/uv/event_loop.h"
#include "runtime/uv/timer.h"
#include "runtime/alloc.h"
#include "runtime/io.h"
#include "runtime/utf8.h"
Expand All @@ -26,4 +27,4 @@ extern "C" void initialize_libuv();
// General LibUV functions.
extern "C" LEAN_EXPORT lean_obj_res lean_libuv_version(lean_obj_arg);

}
}
5 changes: 5 additions & 0 deletions src/runtime/object.h
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,11 @@ inline obj_res st_ref_set(b_obj_arg r, obj_arg v, obj_arg w) { return lean_st_re
inline obj_res st_ref_reset(b_obj_arg r, obj_arg w) { return lean_st_ref_reset(r, w); }
inline obj_res st_ref_swap(b_obj_arg r, obj_arg v, obj_arg w) { return lean_st_ref_swap(r, v, w); }


extern "C" LEAN_EXPORT obj_res lean_io_promise_new(obj_arg);
extern "C" LEAN_EXPORT obj_res lean_io_promise_resolve(obj_arg value, b_obj_arg promise, obj_arg);
extern "C" LEAN_EXPORT obj_res lean_io_promise_result(obj_arg promise);

// =======================================
// Module initialization/finalization
void initialize_object();
Expand Down
2 changes: 1 addition & 1 deletion src/runtime/uv/event_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,4 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_event_loop_alive(obj_arg /* w */ ) {

#endif

}
}
254 changes: 254 additions & 0 deletions src/runtime/uv/timer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
/*
Copyright (c) 2024 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Author: Sofia Rodrigues, Henrik Böving
*/
#include "runtime/uv/timer.h"

namespace lean {
#ifndef LEAN_EMSCRIPTEN

using namespace std;

// The finalizer of the `Timer`.
void lean_uv_timer_finalizer(void* ptr) {
lean_uv_timer_object * timer = (lean_uv_timer_object*) ptr;

if (timer->m_promise != NULL) {
lean_dec(timer->m_promise);
}

event_loop_lock(&global_ev);

uv_close((uv_handle_t*)timer->m_uv_timer, [](uv_handle_t* handle) {
free(handle);
});

event_loop_unlock(&global_ev);

free(timer);
}

void initialize_libuv_timer() {
g_uv_timer_external_class = lean_register_external_class(lean_uv_timer_finalizer, [](void* obj, lean_object* f) {
if (((lean_uv_timer_object*)obj)->m_promise != NULL) {
lean_inc(f);
lean_apply_1(f, ((lean_uv_timer_object*)obj)->m_promise);
}
});
}

void handle_timer_event(uv_timer_t* handle) {
lean_object * obj = (lean_object*)handle->data;
lean_uv_timer_object * timer = lean_to_uv_timer(obj);
// handle_timer_event may only be called while the timer is running, this means the promise must
// not be NULL.
lean_assert(timer->m_state == TIMER_STATE_RUNNING);
lean_assert(timer->m_promise != NULL);

if (timer->m_repeating) {
if (lean_io_get_task_state_core(timer->m_promise) != 2) {
lean_object* res = lean_io_promise_resolve(lean_box(0), timer->m_promise, lean_io_mk_world());
lean_dec(res);
}
} else {
lean_assert(lean_io_get_task_state_core(timer->m_promise) != 2);
uv_timer_stop(timer->m_uv_timer);
timer->m_state = TIMER_STATE_FINISHED;

lean_object* res = lean_io_promise_resolve(lean_box(0), timer->m_promise, lean_io_mk_world());
lean_dec(res);

// The loop does not need to keep the timer alive anymore.
lean_dec(obj);
}
}

/* Std.Internal.UV.Timer.mk (timeout : UInt64) (repeating : Bool) : IO Timer */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_timer_mk(uint64_t timeout, uint8_t repeating, obj_arg /* w */) {
lean_uv_timer_object * timer = (lean_uv_timer_object*)malloc(sizeof(lean_uv_timer_object));
timer->m_timeout = timeout;
timer->m_repeating = repeating;
timer->m_state = TIMER_STATE_INITIAL;
timer->m_promise = NULL;

uv_timer_t * uv_timer = (uv_timer_t*)malloc(sizeof(uv_timer_t));

event_loop_lock(&global_ev);
int result = uv_timer_init(global_ev.loop, uv_timer);
event_loop_unlock(&global_ev);

if (result != 0) {
free(uv_timer);
free(timer);
std::string err = std::string("failed to initialize timer: ") + uv_strerror(result);
return io_result_mk_error(err.c_str());
}

timer->m_uv_timer = uv_timer;

lean_object * obj = lean_uv_timer_new(timer);
lean_mark_mt(obj);
timer->m_uv_timer->data = obj;

return lean_io_result_mk_ok(obj);
}

/* Std.Internal.UV.Timer.next (timer : @& Timer) : IO (IO.Promise Unit) */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_timer_next(b_obj_arg obj, obj_arg /* w */ ) {
lean_uv_timer_object * timer = lean_to_uv_timer(obj);

auto create_promise = []() {
lean_object * prom_res = lean_io_promise_new(lean_io_mk_world());
lean_object * promise = lean_ctor_get(prom_res, 0);
lean_inc(promise);
lean_dec(prom_res);

return promise;
};

auto setup_timer = [create_promise, obj, timer]() {
lean_assert(timer->m_promise == NULL);
timer->m_promise = create_promise();
timer->m_state = TIMER_STATE_RUNNING;

// The event loop must keep the timer alive for the duration of the run time.
lean_inc(obj);

event_loop_lock(&global_ev);

int result = uv_timer_start(
timer->m_uv_timer,
handle_timer_event,
timer->m_repeating ? 0 : timer->m_timeout,
timer->m_repeating ? timer->m_timeout : 0
);

event_loop_unlock(&global_ev);

if (result != 0) {
lean_dec(obj);
std::string err = std::string("failed to initialize timer: ") + uv_strerror(result);
return io_result_mk_error(err.c_str());
} else {
lean_inc(timer->m_promise);
return lean_io_result_mk_ok(timer->m_promise);
}
};

if (timer->m_repeating) {
switch (timer->m_state) {
case TIMER_STATE_INITIAL:
{
return setup_timer();
}
case TIMER_STATE_RUNNING:
{
lean_assert(timer->m_promise != NULL);
// 2 indicates finished
if (lean_io_get_task_state_core(timer->m_promise) == 2) {
lean_dec(timer->m_promise);
timer->m_promise = create_promise();
lean_inc(timer->m_promise);
return lean_io_result_mk_ok(timer->m_promise);
} else {
lean_inc(timer->m_promise);
return lean_io_result_mk_ok(timer->m_promise);
}
}
case TIMER_STATE_FINISHED:
{
lean_assert(timer->m_promise != NULL);
lean_inc(timer->m_promise);
return lean_io_result_mk_ok(timer->m_promise);
}
}
} else {
if (timer->m_state == TIMER_STATE_INITIAL) {
return setup_timer();
} else {
lean_assert(timer->m_promise != NULL);

lean_inc(timer->m_promise);
return lean_io_result_mk_ok(timer->m_promise);
}
}
}

/* Std.Internal.UV.Timer.reset (timer : @& Timer) : IO Unit */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_timer_reset(b_obj_arg obj, obj_arg /* w */ ) {
lean_uv_timer_object * timer = lean_to_uv_timer(obj);

if (timer->m_state == TIMER_STATE_RUNNING) {
lean_assert(timer->m_promise != NULL);

event_loop_lock(&global_ev);

uv_timer_stop(timer->m_uv_timer);

int result = uv_timer_start(
timer->m_uv_timer,
handle_timer_event,
timer->m_timeout,
timer->m_repeating ? timer->m_timeout : 0
);

event_loop_unlock(&global_ev);

if (result != 0) {
return io_result_mk_error("failed to restart uv_timer");
} else {
return lean_io_result_mk_ok(lean_box(0));
}
} else {
return lean_io_result_mk_ok(lean_box(0));
}
}

/* Std.Internal.UV.Timer.stop (timer : @& Timer) : IO Unit */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_timer_stop(b_obj_arg obj, obj_arg /* w */) {
lean_uv_timer_object * timer = lean_to_uv_timer(obj);

if (timer->m_state == TIMER_STATE_RUNNING) {
lean_assert(timer->m_promise != NULL);

event_loop_lock(&global_ev);

uv_timer_stop(timer->m_uv_timer);

event_loop_unlock(&global_ev);

timer->m_state = TIMER_STATE_FINISHED;

// The loop does not need to keep the timer alive anymore.
lean_dec(obj);

return lean_io_result_mk_ok(lean_box(0));
} else {
return lean_io_result_mk_ok(lean_box(0));
}
}

#else

void lean_uv_timer_finalizer(void* ptr);

extern "C" LEAN_EXPORT lean_obj_res lean_uv_timer_mk(uint64_t timeout, uint8_t repeating, obj_arg /* w */) {
return io_result_mk_error("lean_uv_timer_mk is not supported");
}

extern "C" LEAN_EXPORT lean_obj_res lean_uv_timer_next(b_obj_arg timer, obj_arg /* w */ ) {
return io_result_mk_error("lean_uv_timer_next is not supported");
}

extern "C" LEAN_EXPORT lean_obj_res lean_uv_timer_reset(b_obj_arg timer, obj_arg /* w */ ) {
return io_result_mk_error("lean_uv_timer_reset is not supported");
}

extern "C" LEAN_EXPORT lean_obj_res lean_uv_timer_stop(b_obj_arg timer, obj_arg /* w */ ) {
return io_result_mk_error("lean_uv_timer_stop is not supported");
}

#endif
}
Loading

0 comments on commit cd71bf0

Please sign in to comment.