Skip to content

Commit

Permalink
Specialize remotecall_pool(remotecall) to wait for the remotecall
Browse files Browse the repository at this point in the history
Otherwise the worker would prematurely be put back into the pool, causing
oversubscription. Also added a warning about oversubscription to the docstring
for `remote_do(f, ::AbstractWorkerPool)`.
  • Loading branch information
JamesWrigley committed Jan 13, 2025
1 parent 0cca4d3 commit 0170b7d
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 0 deletions.
4 changes: 4 additions & 0 deletions docs/src/_changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ This documents notable changes in DistributedNext.jl. The format is based on

### Fixed
- Fixed a cause of potential hangs when exiting the process ([#16]).
- Fixed a subtle bug in `remotecall(f, ::AbstractWorkerPool)`, previously the
implementation would take a worker out of the pool and immediately put it back
in without waiting for the returned [`Future`](@ref). Now it will wait for the
`Future` before putting the worker back in the pool ([#20]).

### Added
- A watcher mechanism has been added to detect when both the Distributed stdlib
Expand Down
26 changes: 26 additions & 0 deletions src/workerpool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,28 @@ function remotecall_pool(rc_f, f, pool::AbstractWorkerPool, args...; kwargs...)
end
end

# Specialization for remotecall. We have to wait for the Future it returns
# before putting the worker back in the pool.
function remotecall_pool(rc_f::typeof(remotecall), f, pool::AbstractWorkerPool, args...; kwargs...)
worker = take!(pool)
local x
try
x = rc_f(f, worker, args...; kwargs...)
catch
put!(pool, worker)
rethrow()
end

t = Threads.@spawn try
wait(x)
finally
put!(pool, worker)
end
errormonitor(t)

return x
end

# Check if pool is local or remote and forward calls if required.
# NOTE: remotecall_fetch does it automatically, but this will be more efficient as
# it avoids the overhead associated with a local remotecall.
Expand Down Expand Up @@ -242,6 +264,10 @@ remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) = remotecall_p
[`WorkerPool`](@ref) variant of `remote_do(f, pid, ....)`. Wait for and take a free worker from `pool` and
perform a `remote_do` on it.
Note that it's not possible to wait for the result of a `remote_do()` to finish
so the worker will immediately be put back in the pool (i.e. potentially causing
oversubscription).
"""
remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) = remotecall_pool(remote_do, f, pool, args...; kwargs...)

Expand Down
13 changes: 13 additions & 0 deletions test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,19 @@ f16091b = () -> 1
@test_throws RemoteException fetch(ref)
end

# Test the behaviour of remotecall(f, ::AbstractWorkerPool), this should
# keep the worker out of the pool until the underlying remotecall has
# finished.
remotechan = RemoteChannel(wrkr1)
pool = WorkerPool([wrkr1])
put_future = remotecall(() -> wait(remotechan), pool)
@test !isready(pool)
put!(remotechan, 1)
wait(put_future)
# The task that waits on the future to put it back into the pool runs
# asynchronously so we use timedwait() to check when the worker is back in.
@test timedwait(() -> isready(pool), 10) === :ok

# Test calling @everywhere from a module not defined on the workers
LocalBar.bar()
for p in procs()
Expand Down

0 comments on commit 0170b7d

Please sign in to comment.