Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Specialize remotecall_pool(remotecall) to wait for the remotecall #120

Merged
merged 1 commit into from
Jan 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/workerpool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,28 @@
end
end

# Specialization for remotecall. We have to wait for the Future it returns
# before putting the worker back in the pool.
function remotecall_pool(rc_f::typeof(remotecall), f, pool::AbstractWorkerPool, args...; kwargs...)
worker = take!(pool)
local x
try
x = rc_f(f, worker, args...; kwargs...)
catch
put!(pool, worker)
rethrow()

Check warning on line 147 in src/workerpool.jl

View check run for this annotation

Codecov / codecov/patch

src/workerpool.jl#L146-L147

Added lines #L146 - L147 were not covered by tests
end

t = Threads.@spawn Threads.threadpool() try
wait(x)
finally
put!(pool, worker)
end
errormonitor(t)

return x
end

# Check if pool is local or remote and forward calls if required.
# NOTE: remotecall_fetch does it automatically, but this will be more efficient as
# it avoids the overhead associated with a local remotecall.
Expand Down Expand Up @@ -242,6 +264,10 @@

[`WorkerPool`](@ref) variant of `remote_do(f, pid, ....)`. Wait for and take a free worker from `pool` and
perform a `remote_do` on it.

Note that it's not possible to wait for the result of a `remote_do()` to finish
so the worker will immediately be put back in the pool (i.e. potentially causing
oversubscription).
"""
remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) = remotecall_pool(remote_do, f, pool, args...; kwargs...)

Expand Down
15 changes: 15 additions & 0 deletions test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1089,6 +1089,21 @@ let
@test_throws RemoteException fetch(ref)
end

# Test the behaviour of remotecall(f, ::AbstractWorkerPool), this should
# keep the worker out of the pool until the underlying remotecall has
# finished.
let
remotechan = RemoteChannel(wrkr1)
pool = WorkerPool([wrkr1])
put_future = remotecall(() -> wait(remotechan), pool)
@test !isready(pool)
put!(remotechan, 1)
wait(put_future)
# The task that waits on the future to put it back into the pool runs
# asynchronously so we use timedwait() to check when the worker is back in.
@test timedwait(() -> isready(pool), 10) === :ok
end

# Test calling @everywhere from a module not defined on the workers
module LocalBar
using Distributed
Expand Down
Loading