diff --git a/NEWS.md b/NEWS.md index 71014e1e57695..3aa74034ead54 100644 --- a/NEWS.md +++ b/NEWS.md @@ -23,6 +23,8 @@ New language features - actual running time for the task (`Base.Experimental.task_running_time_ns`), and - wall-time for the task (`Base.Experimental.task_wall_time_ns`). - Support for Unicode 16 ([#56925]). +- `Threads.@spawn` now takes a `:samepool` argument to specify the same threadpool as the caller. + `Threads.@spawn :samepool foo()` which is shorthand for `Threads.@spawn Threads.threadpool() foo()` ([#57109]) Language changes ---------------- diff --git a/base/condition.jl b/base/condition.jl index 90c53b7ad310d..fd771c9be346a 100644 --- a/base/condition.jl +++ b/base/condition.jl @@ -125,104 +125,20 @@ proceeding. """ function wait end -# wait with timeout -# -# The behavior of wait changes if a timeout is specified. There are -# three concurrent entities that can interact: -# 1. Task W: the task that calls wait w/timeout. -# 2. Task T: the task created to handle a timeout. -# 3. Task N: the task that notifies the Condition being waited on. -# -# Typical flow: -# - W enters the Condition's wait queue. -# - W creates T and stops running (calls wait()). -# - T, when scheduled, waits on a Timer. -# - Two common outcomes: -# - N notifies the Condition. -# - W starts running, closes the Timer, sets waiter_left and returns -# the notify'ed value. -# - The closed Timer throws an EOFError to T which simply ends. -# - The Timer expires. -# - T starts running and locks the Condition. -# - T confirms that waiter_left is unset and that W is still in the -# Condition's wait queue; it then removes W from the wait queue, -# sets dosched to true and unlocks the Condition. -# - If dosched is true, T schedules W with the special :timed_out -# value. -# - T ends. -# - W runs and returns :timed_out. -# -# Some possible interleavings: -# - N notifies the Condition but the Timer expires and T starts running -# before W: -# - W closing the expired Timer is benign. -# - T will find that W is no longer in the Condition's wait queue -# (which is protected by a lock) and will not schedule W. -# - N notifies the Condition; W runs and calls wait on the Condition -# again before the Timer expires: -# - W sets waiter_left before leaving. When T runs, it will find that -# waiter_left is set and will not schedule W. -# -# The lock on the Condition's wait queue and waiter_left together -# ensure proper synchronization and behavior of the tasks involved. - """ - wait(c::GenericCondition; first::Bool=false, timeout::Real=0.0) + wait(c::GenericCondition; first::Bool=false) Wait for [`notify`](@ref) on `c` and return the `val` parameter passed to `notify`. If the keyword `first` is set to `true`, the waiter will be put _first_ in line to wake up on `notify`. Otherwise, `wait` has first-in-first-out (FIFO) behavior. - -If `timeout` is specified, cancel the `wait` when it expires and return -`:timed_out`. The minimum value for `timeout` is 0.001 seconds, i.e. 1 -millisecond. """ -function wait(c::GenericCondition; first::Bool=false, timeout::Real=0.0) - timeout == 0.0 || timeout ≥ 1e-3 || throw(ArgumentError("timeout must be ≥ 1 millisecond")) - +function wait(c::GenericCondition; first::Bool=false) ct = current_task() _wait2(c, ct, first) token = unlockall(c.lock) - - timer::Union{Timer, Nothing} = nothing - waiter_left::Union{Threads.Atomic{Bool}, Nothing} = nothing - if timeout > 0.0 - timer = Timer(timeout) - waiter_left = Threads.Atomic{Bool}(false) - # start a task to wait on the timer - t = Task() do - try - wait(timer) - catch e - # if the timer was closed, the waiting task has been scheduled; do nothing - e isa EOFError && return - end - dosched = false - lock(c.lock) - # Confirm that the waiting task is still in the wait queue and remove it. If - # the task is not in the wait queue, it must have been notified already so we - # don't do anything here. - if !waiter_left[] && ct.queue == c.waitq - dosched = true - Base.list_deletefirst!(c.waitq, ct) - end - unlock(c.lock) - # send the waiting task a timeout - dosched && schedule(ct, :timed_out) - end - t.sticky = false - Threads._spawn_set_thrpool(t, :interactive) - schedule(t) - end - try - res = wait() - if timer !== nothing - close(timer) - waiter_left[] = true - end - return res + return wait() catch q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct) rethrow() diff --git a/base/experimental.jl b/base/experimental.jl index 17871b4f346d6..e35e920298c3d 100644 --- a/base/experimental.jl +++ b/base/experimental.jl @@ -10,6 +10,7 @@ module Experimental using Base: Threads, sync_varname, is_function_def, @propagate_inbounds +using Base: GenericCondition using Base.Meta """ @@ -577,4 +578,112 @@ function task_wall_time_ns(t::Task=current_task()) return end_at - start_at end +# wait_with_timeout +# +# A version of `wait(c::Condition)` that additionally allows the +# specification of a timeout. This is experimental as it will likely +# be dropped when a cancellation framework is added. +# +# The parallel behavior of wait_with_timeout is specified here. There +# are three concurrent entities that can interact: +# 1. Task W: the task that calls wait_with_timeout. +# 2. Task T: the task created to handle a timeout. +# 3. Task N: the task that notifies the Condition being waited on. +# +# Typical flow: +# - W enters the Condition's wait queue. +# - W creates T and stops running (calls wait()). +# - T, when scheduled, waits on a Timer. +# - Two common outcomes: +# - N notifies the Condition. +# - W starts running, closes the Timer, sets waiter_left and returns +# the notify'ed value. +# - The closed Timer throws an EOFError to T which simply ends. +# - The Timer expires. +# - T starts running and locks the Condition. +# - T confirms that waiter_left is unset and that W is still in the +# Condition's wait queue; it then removes W from the wait queue, +# sets dosched to true and unlocks the Condition. +# - If dosched is true, T schedules W with the special :timed_out +# value. +# - T ends. +# - W runs and returns :timed_out. +# +# Some possible interleavings: +# - N notifies the Condition but the Timer expires and T starts running +# before W: +# - W closing the expired Timer is benign. +# - T will find that W is no longer in the Condition's wait queue +# (which is protected by a lock) and will not schedule W. +# - N notifies the Condition; W runs and calls wait on the Condition +# again before the Timer expires: +# - W sets waiter_left before leaving. When T runs, it will find that +# waiter_left is set and will not schedule W. +# +# The lock on the Condition's wait queue and waiter_left together +# ensure proper synchronization and behavior of the tasks involved. + +""" + wait_with_timeout(c::GenericCondition; first::Bool=false, timeout::Real=0.0) + +Wait for [`notify`](@ref) on `c` and return the `val` parameter passed to `notify`. + +If the keyword `first` is set to `true`, the waiter will be put _first_ +in line to wake up on `notify`. Otherwise, `wait` has first-in-first-out (FIFO) behavior. + +If `timeout` is specified, cancel the `wait` when it expires and return +`:timed_out`. The minimum value for `timeout` is 0.001 seconds, i.e. 1 +millisecond. +""" +function wait_with_timeout(c::GenericCondition; first::Bool=false, timeout::Real=0.0) + ct = current_task() + Base._wait2(c, ct, first) + token = Base.unlockall(c.lock) + + timer::Union{Timer, Nothing} = nothing + waiter_left::Union{Threads.Atomic{Bool}, Nothing} = nothing + if timeout > 0.0 + timer = Timer(timeout) + waiter_left = Threads.Atomic{Bool}(false) + # start a task to wait on the timer + t = Task() do + try + wait(timer) + catch e + # if the timer was closed, the waiting task has been scheduled; do nothing + e isa EOFError && return + end + dosched = false + lock(c.lock) + # Confirm that the waiting task is still in the wait queue and remove it. If + # the task is not in the wait queue, it must have been notified already so we + # don't do anything here. + if !waiter_left[] && ct.queue == c.waitq + dosched = true + Base.list_deletefirst!(c.waitq, ct) + end + unlock(c.lock) + # send the waiting task a timeout + dosched && schedule(ct, :timed_out) + end + t.sticky = false + Threads._spawn_set_thrpool(t, :interactive) + schedule(t) + end + + try + res = wait() + if timer !== nothing + close(timer) + waiter_left[] = true + end + return res + catch + q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct) + rethrow() + finally + Base.relockall(c.lock, token) + end +end + end # module diff --git a/base/threadingconstructs.jl b/base/threadingconstructs.jl index 3d86e203ef72e..b83f47ef7c8cd 100644 --- a/base/threadingconstructs.jl +++ b/base/threadingconstructs.jl @@ -440,10 +440,11 @@ function _spawn_set_thrpool(t::Task, tp::Symbol) end """ - Threads.@spawn [:default|:interactive] expr + Threads.@spawn [:default|:interactive|:samepool] expr Create a [`Task`](@ref) and [`schedule`](@ref) it to run on any available -thread in the specified threadpool (`:default` if unspecified). The task is +thread in the specified threadpool: `:default`, `:interactive`, or `:samepool` +to use the same as the caller. `:default` is used if unspecified. The task is allocated to a thread once one becomes available. To wait for the task to finish, call [`wait`](@ref) on the result of this macro, or call [`fetch`](@ref) to wait and then obtain its return value. @@ -468,6 +469,9 @@ the variable's value in the current task. !!! compat "Julia 1.9" A threadpool may be specified as of Julia 1.9. +!!! compat "Julia 1.12" + The same threadpool may be specified as of Julia 1.12. + # Examples ```julia-repl julia> t() = println("Hello from ", Threads.threadid()); @@ -486,7 +490,7 @@ macro spawn(args...) ttype, ex = args if ttype isa QuoteNode ttype = ttype.value - if ttype !== :interactive && ttype !== :default + if !in(ttype, (:interactive, :default, :samepool)) throw(ArgumentError(LazyString("unsupported threadpool in @spawn: ", ttype))) end tp = QuoteNode(ttype) @@ -507,7 +511,11 @@ macro spawn(args...) let $(letargs...) local task = Task($thunk) task.sticky = false - _spawn_set_thrpool(task, $(esc(tp))) + local tp = $(esc(tp)) + if tp == :samepool + tp = Threads.threadpool() + end + _spawn_set_thrpool(task, tp) if $(Expr(:islocal, var)) put!($var, task) end diff --git a/contrib/refresh_checksums.mk b/contrib/refresh_checksums.mk index 5a787b0b67cb1..fa7cddc705958 100644 --- a/contrib/refresh_checksums.mk +++ b/contrib/refresh_checksums.mk @@ -24,7 +24,7 @@ CLANG_TRIPLETS=$(filter %-darwin %-freebsd,$(TRIPLETS)) NON_CLANG_TRIPLETS=$(filter-out %-darwin %-freebsd,$(TRIPLETS)) # These are the projects currently using BinaryBuilder; both GCC-expanded and non-GCC-expanded: -BB_PROJECTS=openssl libssh2 nghttp2 mpfr curl libgit2 pcre libuv unwind llvmunwind dsfmt objconv p7zip zlib libsuitesparse openlibm blastrampoline libtracyclient +BB_PROJECTS=openssl libssh2 nghttp2 mpfr curl libgit2 pcre libuv unwind llvmunwind dsfmt objconv p7zip zlib libsuitesparse openlibm blastrampoline libtracyclient mmtk_julia BB_GCC_EXPANDED_PROJECTS=openblas csl BB_CXX_EXPANDED_PROJECTS=gmp llvm clang llvm-tools lld # These are non-BB source-only deps diff --git a/deps/checksums/Distributed-8890288f01a3b4c2b64c87d98409bc9d865f506e.tar.gz/md5 b/deps/checksums/Distributed-8890288f01a3b4c2b64c87d98409bc9d865f506e.tar.gz/md5 new file mode 100644 index 0000000000000..3b640d12f3344 --- /dev/null +++ b/deps/checksums/Distributed-8890288f01a3b4c2b64c87d98409bc9d865f506e.tar.gz/md5 @@ -0,0 +1 @@ +7405afe10033da0431c8fd920a8cbbbf diff --git a/deps/checksums/Distributed-8890288f01a3b4c2b64c87d98409bc9d865f506e.tar.gz/sha512 b/deps/checksums/Distributed-8890288f01a3b4c2b64c87d98409bc9d865f506e.tar.gz/sha512 new file mode 100644 index 0000000000000..e9003e31edcba --- /dev/null +++ b/deps/checksums/Distributed-8890288f01a3b4c2b64c87d98409bc9d865f506e.tar.gz/sha512 @@ -0,0 +1 @@ +ad3498cfee95bcd088e47c15eb2707f47ced9493881ec356cbeb22f66207406d23a3e3b27e70a00be7c2c755c6651f54f5378ef42bf4d1312c84d589010aab7b diff --git a/deps/checksums/Distributed-c6136853451677f1957bec20ecce13419cde3a12.tar.gz/md5 b/deps/checksums/Distributed-c6136853451677f1957bec20ecce13419cde3a12.tar.gz/md5 deleted file mode 100644 index e1c0f9e87b7c7..0000000000000 --- a/deps/checksums/Distributed-c6136853451677f1957bec20ecce13419cde3a12.tar.gz/md5 +++ /dev/null @@ -1 +0,0 @@ -98b8b8bc0ea4bf24c4b2986a5b7ae3e9 diff --git a/deps/checksums/Distributed-c6136853451677f1957bec20ecce13419cde3a12.tar.gz/sha512 b/deps/checksums/Distributed-c6136853451677f1957bec20ecce13419cde3a12.tar.gz/sha512 deleted file mode 100644 index ed816ebc21e97..0000000000000 --- a/deps/checksums/Distributed-c6136853451677f1957bec20ecce13419cde3a12.tar.gz/sha512 +++ /dev/null @@ -1 +0,0 @@ -4043933825bf716f2733f8e90632de34a95a437f3b31cda92edd510ffee208f8e374ec3c5922c8142342ae21b4ec4cbd1ecd4036b9057056a12c86169632ac7b diff --git a/deps/checksums/mmtk_julia b/deps/checksums/mmtk_julia index 979ab79e52207..098937aea1991 100644 --- a/deps/checksums/mmtk_julia +++ b/deps/checksums/mmtk_julia @@ -4,3 +4,7 @@ mmtk_julia-f07d66aafc86af84ea988b35335acc9bbc770fa1.tar.gz/md5/38afb5db6d8c55413 mmtk_julia-f07d66aafc86af84ea988b35335acc9bbc770fa1.tar.gz/sha512/78525582a46a6baf8d33df7b622e55cf244439afcd7192ba55489c1bc18393d1237d2903d517c610484bf9e2a7338ad31435a9cbf70889d6bcf87c40cec829e5 mmtk_julia.v0.30.3+1.x86_64-linux-gnu.tar.gz/md5/631b204574da7062802dac501a4b711f mmtk_julia.v0.30.3+1.x86_64-linux-gnu.tar.gz/sha512/daaed59d08fc49621479ed638dea0aac0cba123986e486571447e8e21e9a098776ce2e87fbd92ddea276782fc44621f23d40fa213296b28e1d4480553c7de4f7 +mmtk_julia-c9e046baf3a0d52fe75d6c8b28f6afd69b045d95.tar.gz/md5/73a8fbea71edce30a39a30f31969dd8e +mmtk_julia-c9e046baf3a0d52fe75d6c8b28f6afd69b045d95.tar.gz/sha512/374848b7696b565dea66daa208830581f92c1fcb0138e7a7ab88564402e94bc79c54b6ed370ec68473e31e2bd411bf82c97793796c31d39aafbbfffea9c05588 +mmtk_julia.v0.30.4+0.x86_64-linux-gnu.tar.gz/md5/8cdeb14fd69945f64308be49f6912f9c +mmtk_julia.v0.30.4+0.x86_64-linux-gnu.tar.gz/sha512/3692502f65dec8c0971b56b9bf8178641892b390d520cbcd69880d75b7500e6341534d87882246e68998f590f824ec54c18f4b8fb4aa09b8f313de065c48450e diff --git a/deps/mmtk_julia.version b/deps/mmtk_julia.version index cb1e8064f9825..684197bbe3e4e 100644 --- a/deps/mmtk_julia.version +++ b/deps/mmtk_julia.version @@ -1,6 +1,6 @@ MMTK_JULIA_BRANCH = master -MMTK_JULIA_SHA1 = f07d66aafc86af84ea988b35335acc9bbc770fa1 +MMTK_JULIA_SHA1 = c9e046baf3a0d52fe75d6c8b28f6afd69b045d95 MMTK_JULIA_GIT_URL := https://github.com/mmtk/mmtk-julia.git -MMTK_JULIA_TAR_URL = https://github.com/mmtk/mmtk-julia/archive/refs/tags/v0.30.3.tar.gz -MMTK_JULIA_JLL_VER := 0.30.3+1 +MMTK_JULIA_TAR_URL = https://github.com/mmtk/mmtk-julia/archive/refs/tags/v0.30.4.tar.gz +MMTK_JULIA_JLL_VER := 0.30.4+0 MMTK_JULIA_JLL_NAME := mmtk_julia diff --git a/src/gc-mmtk.c b/src/gc-mmtk.c index 5ec1e34cc1acd..2f261a2e8e2fd 100644 --- a/src/gc-mmtk.c +++ b/src/gc-mmtk.c @@ -64,11 +64,37 @@ void jl_gc_init(void) { arraylist_new(&to_finalize, 0); arraylist_new(&finalizer_list_marked, 0); - + gc_num.interval = default_collect_interval; gc_num.allocd = 0; gc_num.max_pause = 0; gc_num.max_memory = 0; + // Necessary if we want to use Julia heap resizing heuristics + uint64_t mem_reserve = 250*1024*1024; // LLVM + other libraries need some amount of memory + uint64_t min_heap_size_hint = mem_reserve + 1*1024*1024; + uint64_t hint = jl_options.heap_size_hint; + + // check if heap size specified on command line + if (jl_options.heap_size_hint == 0) { + char *cp = getenv(HEAP_SIZE_HINT); + if (cp) + hint = parse_heap_size_hint(cp, "JULIA_HEAP_SIZE_HINT=\"[]\""); + } +#ifdef _P64 + if (hint == 0) { + uint64_t constrained_mem = uv_get_constrained_memory(); + if (constrained_mem > 0 && constrained_mem < uv_get_total_memory()) + hint = constrained_mem; + } +#endif + if (hint) { + if (hint < min_heap_size_hint) + hint = min_heap_size_hint; + jl_gc_set_max_memory(hint - mem_reserve); + } + + // MMTK supports setting the heap size using the + // MMTK_MIN_HSIZE and MMTK_MAX_HSIZE environment variables long long min_heap_size; long long max_heap_size; char* min_size_def = getenv("MMTK_MIN_HSIZE"); @@ -77,7 +103,8 @@ void jl_gc_init(void) { char* max_size_def = getenv("MMTK_MAX_HSIZE"); char* max_size_gb = getenv("MMTK_MAX_HSIZE_G"); - // default min heap currently set as Julia's default_collect_interval + // If min and max values are not specified, set them to 0 here + // and use stock heuristics as defined in the binding if (min_size_def != NULL) { char *p; double min_size = strtod(min_size_def, &p); @@ -87,10 +114,9 @@ void jl_gc_init(void) { double min_size = strtod(min_size_gb, &p); min_heap_size = (long) 1024 * 1024 * 1024 * min_size; } else { - min_heap_size = default_collect_interval; + min_heap_size = 0; } - // default max heap currently set as 70% the free memory in the system if (max_size_def != NULL) { char *p; double max_size = strtod(max_size_def, &p); @@ -100,7 +126,7 @@ void jl_gc_init(void) { double max_size = strtod(max_size_gb, &p); max_heap_size = (long) 1024 * 1024 * 1024 * max_size; } else { - max_heap_size = uv_get_free_memory() * 70 / 100; + max_heap_size = 0; } // Assert that the number of stock GC threads is 0; MMTK uses the number of threads in jl_options.ngcthreads @@ -159,7 +185,17 @@ void jl_free_thread_gc_state(struct _jl_tls_states_t *ptls) { } JL_DLLEXPORT void jl_gc_set_max_memory(uint64_t max_mem) { - // MMTk currently does not allow setting the heap size at runtime +#ifdef _P32 + max_mem = max_mem < MAX32HEAP ? max_mem : MAX32HEAP; +#endif + max_total_memory = max_mem; +} + +JL_DLLEXPORT uint64_t jl_gc_get_max_memory(void) +{ + // FIXME: We should return the max heap size set in MMTk + // when not using Julia's heap resizing heuristics + return max_total_memory; } STATIC_INLINE void maybe_collect(jl_ptls_t ptls) @@ -415,12 +451,6 @@ JL_DLLEXPORT void jl_gc_get_total_bytes(int64_t *bytes) JL_NOTSAFEPOINT *bytes = (num.total_allocd + num.deferred_alloc + num.allocd); } -JL_DLLEXPORT uint64_t jl_gc_get_max_memory(void) -{ - // FIXME: should probably return MMTk's heap size - return max_total_memory; -} - // These are needed to collect MMTk statistics from a Julia program using ccall JL_DLLEXPORT void (jl_mmtk_harness_begin)(void) { diff --git a/stdlib/Distributed.version b/stdlib/Distributed.version index 4a7ab49defed2..13d49b7a093de 100644 --- a/stdlib/Distributed.version +++ b/stdlib/Distributed.version @@ -1,4 +1,4 @@ DISTRIBUTED_BRANCH = master -DISTRIBUTED_SHA1 = c6136853451677f1957bec20ecce13419cde3a12 +DISTRIBUTED_SHA1 = 8890288f01a3b4c2b64c87d98409bc9d865f506e DISTRIBUTED_GIT_URL := https://github.com/JuliaLang/Distributed.jl DISTRIBUTED_TAR_URL = https://api.github.com/repos/JuliaLang/Distributed.jl/tarball/$1 diff --git a/test/channels.jl b/test/channels.jl index 6e74a2079234c..d654bc63be586 100644 --- a/test/channels.jl +++ b/test/channels.jl @@ -40,16 +40,15 @@ end @test fetch(t) == "finished" end -@testset "timed wait on Condition" begin +@testset "wait_with_timeout on Condition" begin a = Threads.Condition() - @test_throws ArgumentError @lock a wait(a; timeout=0.0005) - @test @lock a wait(a; timeout=0.1)==:timed_out + @test @lock a Experimental.wait_with_timeout(a; timeout=0.1)==:timed_out lock(a) @spawn begin @lock a notify(a) end @test try - wait(a; timeout=2) + Experimental.wait_with_timeout(a; timeout=2) true finally unlock(a) diff --git a/test/threadpool_use.jl b/test/threadpool_use.jl index 7523991fdf6a7..e76d50c7a3fd1 100644 --- a/test/threadpool_use.jl +++ b/test/threadpool_use.jl @@ -9,6 +9,15 @@ using Base.Threads @test fetch(Threads.@spawn Threads.threadpool()) === :default @test fetch(Threads.@spawn :default Threads.threadpool()) === :default @test fetch(Threads.@spawn :interactive Threads.threadpool()) === :interactive +@test fetch(Threads.@spawn :samepool Threads.threadpool()) === Threads.threadpool() +@sync for tp in [:interactive, :default] + Threads.@spawn tp begin + @test fetch(Threads.@spawn :samepool Threads.threadpool()) === Threads.threadpool() + end +end +wait(Threads.@spawn :interactive begin + @test fetch(Threads.@spawn :samepool Threads.threadpool()) === Threads.threadpool() +end) tp = :default @test fetch(Threads.@spawn tp Threads.threadpool()) === :default tp = :interactive