Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Experimental.wait_with_timeout #57148

Merged
merged 4 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 3 additions & 87 deletions base/condition.jl
Original file line number Diff line number Diff line change
Expand Up @@ -125,104 +125,20 @@ proceeding.
"""
function wait end

# wait with timeout
#
# The behavior of wait changes if a timeout is specified. There are
# three concurrent entities that can interact:
# 1. Task W: the task that calls wait w/timeout.
# 2. Task T: the task created to handle a timeout.
# 3. Task N: the task that notifies the Condition being waited on.
#
# Typical flow:
# - W enters the Condition's wait queue.
# - W creates T and stops running (calls wait()).
# - T, when scheduled, waits on a Timer.
# - Two common outcomes:
# - N notifies the Condition.
# - W starts running, closes the Timer, sets waiter_left and returns
# the notify'ed value.
# - The closed Timer throws an EOFError to T which simply ends.
# - The Timer expires.
# - T starts running and locks the Condition.
# - T confirms that waiter_left is unset and that W is still in the
# Condition's wait queue; it then removes W from the wait queue,
# sets dosched to true and unlocks the Condition.
# - If dosched is true, T schedules W with the special :timed_out
# value.
# - T ends.
# - W runs and returns :timed_out.
#
# Some possible interleavings:
# - N notifies the Condition but the Timer expires and T starts running
# before W:
# - W closing the expired Timer is benign.
# - T will find that W is no longer in the Condition's wait queue
# (which is protected by a lock) and will not schedule W.
# - N notifies the Condition; W runs and calls wait on the Condition
# again before the Timer expires:
# - W sets waiter_left before leaving. When T runs, it will find that
# waiter_left is set and will not schedule W.
#
# The lock on the Condition's wait queue and waiter_left together
# ensure proper synchronization and behavior of the tasks involved.

"""
wait(c::GenericCondition; first::Bool=false, timeout::Real=0.0)
wait(c::GenericCondition; first::Bool=false)

Wait for [`notify`](@ref) on `c` and return the `val` parameter passed to `notify`.

If the keyword `first` is set to `true`, the waiter will be put _first_
in line to wake up on `notify`. Otherwise, `wait` has first-in-first-out (FIFO) behavior.

If `timeout` is specified, cancel the `wait` when it expires and return
`:timed_out`. The minimum value for `timeout` is 0.001 seconds, i.e. 1
millisecond.
"""
function wait(c::GenericCondition; first::Bool=false, timeout::Real=0.0)
timeout == 0.0 || timeout ≥ 1e-3 || throw(ArgumentError("timeout must be ≥ 1 millisecond"))

function wait(c::GenericCondition; first::Bool=false)
ct = current_task()
_wait2(c, ct, first)
token = unlockall(c.lock)

timer::Union{Timer, Nothing} = nothing
waiter_left::Union{Threads.Atomic{Bool}, Nothing} = nothing
if timeout > 0.0
timer = Timer(timeout)
waiter_left = Threads.Atomic{Bool}(false)
# start a task to wait on the timer
t = Task() do
try
wait(timer)
catch e
# if the timer was closed, the waiting task has been scheduled; do nothing
e isa EOFError && return
end
dosched = false
lock(c.lock)
# Confirm that the waiting task is still in the wait queue and remove it. If
# the task is not in the wait queue, it must have been notified already so we
# don't do anything here.
if !waiter_left[] && ct.queue == c.waitq
dosched = true
Base.list_deletefirst!(c.waitq, ct)
end
unlock(c.lock)
# send the waiting task a timeout
dosched && schedule(ct, :timed_out)
end
t.sticky = false
Threads._spawn_set_thrpool(t, :interactive)
schedule(t)
end

try
res = wait()
if timer !== nothing
close(timer)
waiter_left[] = true
end
return res
return wait()
catch
q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
rethrow()
Expand Down
111 changes: 111 additions & 0 deletions base/experimental.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
module Experimental

using Base: Threads, sync_varname, is_function_def, @propagate_inbounds
using Base: GenericCondition
using Base.Meta

"""
Expand Down Expand Up @@ -577,4 +578,114 @@ function task_wall_time_ns(t::Task=current_task())
return end_at - start_at
end

# wait_with_timeout
#
# A version of `wait(c::Condition)` that additionally allows the
# specification of a timeout. This is experimental as it will likely
# be dropped when a cancellation framework is added.
#
# The parallel behavior of wait_with_timeout is specified here. There
# are three concurrent entities that can interact:
# 1. Task W: the task that calls wait_with_timeout.
# 2. Task T: the task created to handle a timeout.
# 3. Task N: the task that notifies the Condition being waited on.
#
# Typical flow:
# - W enters the Condition's wait queue.
# - W creates T and stops running (calls wait()).
# - T, when scheduled, waits on a Timer.
# - Two common outcomes:
# - N notifies the Condition.
# - W starts running, closes the Timer, sets waiter_left and returns
# the notify'ed value.
# - The closed Timer throws an EOFError to T which simply ends.
# - The Timer expires.
# - T starts running and locks the Condition.
# - T confirms that waiter_left is unset and that W is still in the
# Condition's wait queue; it then removes W from the wait queue,
# sets dosched to true and unlocks the Condition.
# - If dosched is true, T schedules W with the special :timed_out
# value.
# - T ends.
# - W runs and returns :timed_out.
#
# Some possible interleavings:
# - N notifies the Condition but the Timer expires and T starts running
# before W:
# - W closing the expired Timer is benign.
# - T will find that W is no longer in the Condition's wait queue
# (which is protected by a lock) and will not schedule W.
# - N notifies the Condition; W runs and calls wait on the Condition
# again before the Timer expires:
# - W sets waiter_left before leaving. When T runs, it will find that
# waiter_left is set and will not schedule W.
#
# The lock on the Condition's wait queue and waiter_left together
# ensure proper synchronization and behavior of the tasks involved.

"""
wait_with_timeout(c::GenericCondition; first::Bool=false, timeout::Real=0.0)

Wait for [`notify`](@ref) on `c` and return the `val` parameter passed to `notify`.

If the keyword `first` is set to `true`, the waiter will be put _first_
in line to wake up on `notify`. Otherwise, `wait` has first-in-first-out (FIFO) behavior.

If `timeout` is specified, cancel the `wait` when it expires and return
`:timed_out`. The minimum value for `timeout` is 0.001 seconds, i.e. 1
millisecond.
"""
function wait_with_timeout(c::GenericCondition; first::Bool=false, timeout::Real=0.0)
timeout == 0.0 || timeout ≥ 1e-3 || throw(ArgumentError("timeout must be ≥ 1 millisecond"))

kpamnany marked this conversation as resolved.
Show resolved Hide resolved
ct = current_task()
Base._wait2(c, ct, first)
token = Base.unlockall(c.lock)

timer::Union{Timer, Nothing} = nothing
waiter_left::Union{Threads.Atomic{Bool}, Nothing} = nothing
if timeout > 0.0
timer = Timer(timeout)
waiter_left = Threads.Atomic{Bool}(false)
# start a task to wait on the timer
t = Task() do
try
wait(timer)
catch e
# if the timer was closed, the waiting task has been scheduled; do nothing
e isa EOFError && return
end
dosched = false
lock(c.lock)
# Confirm that the waiting task is still in the wait queue and remove it. If
# the task is not in the wait queue, it must have been notified already so we
# don't do anything here.
if !waiter_left[] && ct.queue == c.waitq
dosched = true
Base.list_deletefirst!(c.waitq, ct)
end
unlock(c.lock)
# send the waiting task a timeout
dosched && schedule(ct, :timed_out)
end
t.sticky = false
Threads._spawn_set_thrpool(t, :interactive)
schedule(t)
end

try
res = wait()
if timer !== nothing
close(timer)
waiter_left[] = true
end
return res
catch
q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
rethrow()
finally
Base.relockall(c.lock, token)
end
end

end # module
8 changes: 4 additions & 4 deletions test/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,16 @@ end
@test fetch(t) == "finished"
end

@testset "timed wait on Condition" begin
@testset "wait_with_timeout on Condition" begin
a = Threads.Condition()
@test_throws ArgumentError @lock a wait(a; timeout=0.0005)
@test @lock a wait(a; timeout=0.1)==:timed_out
@test_throws ArgumentError @lock a Experimental.wait_with_timeout(a; timeout=0.0005)
kpamnany marked this conversation as resolved.
Show resolved Hide resolved
@test @lock a Experimental.wait_with_timeout(a; timeout=0.1)==:timed_out
lock(a)
@spawn begin
@lock a notify(a)
end
@test try
wait(a; timeout=2)
Experimental.wait_with_timeout(a; timeout=2)
true
finally
unlock(a)
Expand Down
Loading