Skip to content

Commit

Permalink
feat: implement basic async IO with timers (#6505)
Browse files Browse the repository at this point in the history
This PR implements a basic async framework as well as asynchronously
running timers using libuv.

---------

Co-authored-by: Sofia Rodrigues <[email protected]>
Co-authored-by: Markus Himmel <[email protected]>
Co-authored-by: Markus Himmel <[email protected]>
  • Loading branch information
4 people authored Jan 13, 2025
1 parent 30ba383 commit e6a6437
Show file tree
Hide file tree
Showing 16 changed files with 1,246 additions and 4 deletions.
2 changes: 2 additions & 0 deletions src/Std/Internal.lean
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ Released under Apache 2.0 license as described in the file LICENSE.
Authors: Henrik Böving
-/
prelude
import Std.Internal.Async
import Std.Internal.Parsec
import Std.Internal.UV

/-!
This directory is used for components of the standard library that are either considered
Expand Down
8 changes: 8 additions & 0 deletions src/Std/Internal/Async.lean
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/-
Copyright (c) 2024 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Henrik Böving
-/
prelude
import Std.Internal.Async.Basic
import Std.Internal.Async.Timer
115 changes: 115 additions & 0 deletions src/Std/Internal/Async/Basic.lean
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/-
Copyright (c) 2024 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Henrik Böving
-/
prelude
import Init.Core
import Init.System.IO
import Init.System.Promise

namespace Std
namespace Internal
namespace IO
namespace Async

/--
A `Task` that may resolve to a value or an `IO.Error`.
-/
def AsyncTask (α : Type u) : Type u := Task (Except IO.Error α)

namespace AsyncTask

/--
Construct an `AsyncTask` that is already resolved with value `x`.
-/
@[inline]
protected def pure (x : α) : AsyncTask α := Task.pure <| .ok x

instance : Pure AsyncTask where
pure := AsyncTask.pure

/--
Create a new `AsyncTask` that will run after `x` has finished.
If `x`:
- errors, return an `AsyncTask` that resolves to the error.
- succeeds, run `f` on the result of `x` and return the `AsyncTask` produced by `f`.
-/
@[inline]
protected def bind (x : AsyncTask α) (f : α → AsyncTask β) : AsyncTask β :=
Task.bind x fun r =>
match r with
| .ok a => f a
| .error e => Task.pure <| .error e

/--
Create a new `AsyncTask` that will run after `x` has finished.
If `x`:
- errors, return an `AsyncTask` that reolves to the error.
- succeeds, return an `AsyncTask` that resolves to `f x`.
-/
@[inline]
def map (f : α → β) (x : AsyncTask α) : AsyncTask β :=
Task.map (x := x) fun r =>
match r with
| .ok a => .ok (f a)
| .error e => .error e

/--
Similar to `bind`, however `f` has access to the `IO` monad. If `f` throws an error, the returned
`AsyncTask` resolves to that error.
-/
@[inline]
def bindIO (x : AsyncTask α) (f : α → IO (AsyncTask β)) : BaseIO (AsyncTask β) :=
IO.bindTask x fun r =>
match r with
| .ok a => f a
| .error e => .error e

/--
Similar to `bind`, however `f` has access to the `IO` monad. If `f` throws an error, the returned
`AsyncTask` resolves to that error.
-/
@[inline]
def mapIO (f : α → IO β) (x : AsyncTask α) : BaseIO (AsyncTask β) :=
IO.mapTask (t := x) fun r =>
match r with
| .ok a => f a
| .error e => .error e

/--
Block until the `AsyncTask` in `x` finishes.
-/
def block (x : AsyncTask α) : IO α := do
let res := x.get
match res with
| .ok a => return a
| .error e => .error e

/--
Create an `AsyncTask` that resolves to the value of `x`.
-/
@[inline]
def ofPromise (x : IO.Promise (Except IO.Error α)) : AsyncTask α :=
x.result

/--
Create an `AsyncTask` that resolves to the value of `x`.
-/
@[inline]
def ofPurePromise (x : IO.Promise α) : AsyncTask α :=
x.result.map pure

/--
Obtain the `IO.TaskState` of `x`.
-/
@[inline]
def getState (x : AsyncTask α) : BaseIO IO.TaskState :=
IO.getTaskState x

end AsyncTask

end Async
end IO
end Internal
end Std
139 changes: 139 additions & 0 deletions src/Std/Internal/Async/Timer.lean
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/-
Copyright (c) 2024 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Henrik Böving
-/
prelude
import Std.Time
import Std.Internal.UV
import Std.Internal.Async.Basic


namespace Std
namespace Internal
namespace IO
namespace Async

/--
`Sleep` can be used to sleep for some duration once.
The underlying timer has millisecond resolution.
-/
structure Sleep where
private ofNative ::
native : Internal.UV.Timer

namespace Sleep

/--
Set up a `Sleep` that waits for `duration` milliseconds.
This function only initializes but does not yet start the timer.
-/
@[inline]
def mk (duration : Std.Time.Millisecond.Offset) : IO Sleep := do
let native ← Internal.UV.Timer.mk duration.toInt.toNat.toUInt64 false
return ofNative native

/--
If:
- `s` is not yet running start it and return an `AsyncTask` that will resolve once the previously
configured `duration` has run out.
- `s` is already or not anymore running return the same `AsyncTask` as the first call to `wait`.
-/
@[inline]
def wait (s : Sleep) : IO (AsyncTask Unit) := do
let promise ← s.native.next
return .ofPurePromise promise

/--
If:
- `s` is still running the timer restarts counting from now and finishes after `duration`
milliseconds.
- `s` is not yet or not anymore running this is a no-op.
-/
@[inline]
def reset (s : Sleep) : IO Unit :=
s.native.reset

/--
If:
- `s` is still running this stops `s` without resolving any remaining `AsyncTask`s that were created
through `wait`. Note that if another `AsyncTask` is binding on any of these it is going hang
forever without further intervention.
- `s` is not yet or not anymore running this is a no-op.
-/
@[inline]
def stop (s : Sleep) : IO Unit :=
s.native.stop

end Sleep

/--
Return an `AsyncTask` that resolves after `duration`.
-/
def sleep (duration : Std.Time.Millisecond.Offset) : IO (AsyncTask Unit) := do
let sleeper ← Sleep.mk duration
sleeper.wait

/--
`Interval` can be used to repeatedly wait for some duration like a clock.
The underlying timer has millisecond resolution.
-/
structure Interval where
private ofNative ::
native : Internal.UV.Timer


namespace Interval

/--
Setup up an `Interval` that waits for `duration` milliseconds.
This function only initializes but does not yet start the timer.
-/
@[inline]
def mk (duration : Std.Time.Millisecond.Offset) (_ : 0 < duration := by decide) : IO Interval := do
let native ← Internal.UV.Timer.mk duration.toInt.toNat.toUInt64 true
return ofNative native

/--
If:
- `i` is not yet running start it and return an `AsyncTask` that resolves right away as the 0th
multiple of `duration` has elapsed.
- `i` is already running and:
- the tick from the last call of `i` has not yet finished return the same `AsyncTask` as the last
call
- the tick frrom the last call of `i` has finished return a new `AsyncTask` that waits for the
closest next tick from the time of calling this function.
- `i` is not running aymore this is a no-op.
-/
@[inline]
def tick (i : Interval) : IO (AsyncTask Unit) := do
let promise ← i.native.next
return .ofPurePromise promise

/--
If:
- `Interval.tick` was called on `i` before the timer restarts counting from now and the next tick
happens in `duration`.
- `i` is not yet or not anymore running this is a no-op.
-/
@[inline]
def reset (i : Interval) : IO Unit :=
i.native.reset

/--
If:
- `i` is still running this stops `i` without resolving any remaing `AsyncTask` that were created
through `tick`. Note that if another `AsyncTask` is binding on any of these it is going hang
forever without further intervention.
- `i` is not yet or not anymore running this is a no-op.
-/
@[inline]
def stop (i : Interval) : IO Unit :=
i.native.stop

end Interval

end Async
end IO
end Internal
end Std
119 changes: 119 additions & 0 deletions src/Std/Internal/UV.lean
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/-
Copyright (c) 2024 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Sofia Rodrigues
-/
prelude
import Init.System.IO
import Init.System.Promise

namespace Std
namespace Internal
namespace UV

namespace Loop

/--
Options for configuring the event loop behavior.
-/
structure Loop.Options where
/--
Accumulate the amount of idle time the event loop spends in the event provider.
-/
accumulateIdleTime : Bool := False

/--
Block a SIGPROF signal when polling for new events. It's commonly used for unnecessary wakeups
when using a sampling profiler.
-/
blockSigProfSignal : Bool := False

/--
Configures the event loop with the specified options.
-/
@[extern "lean_uv_event_loop_configure"]
opaque configure (options : Loop.Options) : BaseIO Unit

/--
Checks if the event loop is still active and processing events.
-/
@[extern "lean_uv_event_loop_alive"]
opaque alive : BaseIO Bool

end Loop

private opaque TimerImpl : NonemptyType.{0}

/--
`Timer`s are used to generate `IO.Promise`s that resolve after some time.
A `Timer` can be in one of 3 states:
- Right after construction it's initial.
- While it is ticking it's running.
- If it has stopped for some reason it's finished.
This together with whether it was set up as `repeating` with `Timer.new` determines the behavior
of all functions on `Timer`s.
-/
def Timer : Type := TimerImpl.type

instance : Nonempty Timer := TimerImpl.property

namespace Timer

/--
This creates a `Timer` in the initial state and doesn't run it yet.
- If `repeating` is `false` this constructs a timer that resolves once after `durationMs`
milliseconds, counting from when it's run.
- If `repeating` is `true` this constructs a timer that resolves after multiples of `durationMs`
milliseconds, counting from when it's run. Note that this includes the 0th multiple right after
starting the timer. Furthermore a repeating timer will only be freed after `Timer.stop` is called.
-/
@[extern "lean_uv_timer_mk"]
opaque mk (timeout : UInt64) (repeating : Bool) : IO Timer

/--
This function has different behavior depending on the state and configuration of the `Timer`:
- if `repeating` is `false` and:
- it is initial, run it and return a new `IO.Promise` that is set to resolve once `durationMs`
milliseconds have elapsed. After this `IO.Promise` is resolved the `Timer` is finished.
- it is running or finished, return the same `IO.Promise` that the first call to `next` returned.
- if `repeating` is `true` and:
- it is initial, run it and return a new `IO.Promise` that resolves right away
(as it is the 0th multiple of `durationMs`).
- it is running, check whether the last returned `IO.Promise` is already resolved:
- If it is, return a new `IO.Promise` that resolves upon finishing the next cycle
- If it is not, return the last `IO.Promise`
This ensures that the returned `IO.Promise` resolves at the next repetition of the timer.
- if it is finished, return the last `IO.Promise` created by `next`. Notably this could be one
that never resolves if the timer was stopped before fulfilling the last one.
-/
@[extern "lean_uv_timer_next"]
opaque next (timer : @& Timer) : IO (IO.Promise Unit)

/--
This function has different behavior depending on the state and configuration of the `Timer`:
- If it is initial or finished this is a no-op.
- If it is running and `repeating` is `false` this will delay the resolution of the timer until
`durationMs` milliseconds after the call of this function.
- Delay the resolution of the next tick of the timer until `durationMs` milliseconds after the
call of this function, then continue normal ticking behavior from there.
-/
@[extern "lean_uv_timer_reset"]
opaque reset (timer : @& Timer) : IO Unit

/--
This function has different behavior depending on the state of the `Timer`:
- If it is initial or finished this is a no-op.
- If it is running the execution of the timer is stopped and it is put into the finished state.
Note that if the last `IO.Promise` generated by `next` is unresolved and being waited
on this creates a memory leak and the waiting task is not going to be awoken anymore.
-/
@[extern "lean_uv_timer_stop"]
opaque stop (timer : @& Timer) : IO Unit

end Timer

end UV
end Internal
end Std
Loading

0 comments on commit e6a6437

Please sign in to comment.