diff --git a/include/oneapi/tbb/detail/_config.h b/include/oneapi/tbb/detail/_config.h
index e676b1558b..bd7c50e7b4 100644
--- a/include/oneapi/tbb/detail/_config.h
+++ b/include/oneapi/tbb/detail/_config.h
@@ -534,4 +534,8 @@
 #define __TBB_PREVIEW_TASK_GROUP_EXTENSIONS 1
 #endif

+#if TBB_PREVIEW_PARALLEL_PHASE || __TBB_BUILD
+#define __TBB_PREVIEW_PARALLEL_PHASE 1
+#endif
+
 #endif // __TBB_detail__config_H
diff --git a/include/oneapi/tbb/task_arena.h b/include/oneapi/tbb/task_arena.h
index 5ce41d99c9..3eb007dcc8 100644
--- a/include/oneapi/tbb/task_arena.h
+++ b/include/oneapi/tbb/task_arena.h
@@ -1,5 +1,5 @@
 /*
-    Copyright (c) 2005-2023 Intel Corporation
+    Copyright (c) 2005-2024 Intel Corporation

     Licensed under the Apache License, Version 2.0 (the "License");
     you may not use this file except in compliance with the License.
@@ -95,6 +95,11 @@ TBB_EXPORT void __TBB_EXPORTED_FUNC isolate_within_arena(d1::delegate_base& d, s
 TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_arena_base*);
 TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_group_context&, d1::task_arena_base*);
 TBB_EXPORT void __TBB_EXPORTED_FUNC submit(d1::task&, d1::task_group_context&, arena*, std::uintptr_t);
+
+#if __TBB_PREVIEW_PARALLEL_PHASE
+TBB_EXPORT void __TBB_EXPORTED_FUNC register_parallel_phase(d1::task_arena_base*, std::uintptr_t);
+TBB_EXPORT void __TBB_EXPORTED_FUNC unregister_parallel_phase(d1::task_arena_base*, std::uintptr_t);
+#endif
 } // namespace r1

 namespace d2 {
@@ -122,6 +127,14 @@ class task_arena_base {
         normal = 2 * priority_stride,
         high = 3 * priority_stride
     };
+
+#if __TBB_PREVIEW_PARALLEL_PHASE
+    enum class leave_policy : int {
+        automatic = 0,
+        fast = 1
+    };
+#endif
+
 #if __TBB_ARENA_BINDING
     using constraints = tbb::detail::d1::constraints;
 #endif /*__TBB_ARENA_BINDING*/
@@ -162,13 +175,37 @@ class task_arena_base {
         return (my_version_and_traits & core_type_support_flag) == core_type_support_flag ? my_max_threads_per_core : automatic;
     }

+#if __TBB_PREVIEW_PARALLEL_PHASE
+    leave_policy get_leave_policy() const {
+        bool fast_policy_set = (my_version_and_traits & fast_leave_policy_flag) == fast_leave_policy_flag;
+        return fast_policy_set ? leave_policy::fast : leave_policy::automatic;
+    }
+
+    int leave_policy_to_traits(leave_policy lp) const {
+        return lp == leave_policy::fast ? fast_leave_policy_flag : 0;
+    }
+
+    void set_leave_policy(leave_policy lp) {
+        my_version_and_traits |= leave_policy_to_traits(lp);
+    }
+#endif
+
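The leave policy is surfaced as an optional trailing constructor argument that exists only when the preview macro is defined. A minimal usage sketch (not part of the patch; it assumes a oneTBB installation that carries this change):

    // The preview macro must be defined before including the header,
    // otherwise the extra constructor parameter does not exist.
    #define TBB_PREVIEW_PARALLEL_PHASE 1
    #include "oneapi/tbb/task_arena.h"

    int main() {
        // leave_policy::fast asks idle workers to leave the arena promptly;
        // leave_policy::automatic keeps the platform-dependent default.
        tbb::task_arena arena{
            tbb::task_arena::automatic, /*reserved_for_masters=*/1,
            tbb::task_arena::priority::normal,
            tbb::task_arena::leave_policy::fast
        };
        arena.execute([] { /* parallel work */ });
    }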
     enum {
-        default_flags = 0
-        , core_type_support_flag = 1
+        default_flags = 0,
+        core_type_support_flag = 1,
+        fast_leave_policy_flag = 1 << 1
     };

-    task_arena_base(int max_concurrency, unsigned reserved_for_masters, priority a_priority)
-        : my_version_and_traits(default_flags | core_type_support_flag)
+    task_arena_base(int max_concurrency, unsigned reserved_for_masters, priority a_priority
+#if __TBB_PREVIEW_PARALLEL_PHASE
+                    , leave_policy lp
+#endif
+    )
+        : my_version_and_traits(default_flags | core_type_support_flag
+#if __TBB_PREVIEW_PARALLEL_PHASE
+                                | leave_policy_to_traits(lp)
+#endif
+        )
         , my_initialization_state(do_once_state::uninitialized)
         , my_arena(nullptr)
         , my_max_concurrency(max_concurrency)
@@ -180,8 +217,16 @@ class task_arena_base {
     {}

 #if __TBB_ARENA_BINDING
-    task_arena_base(const constraints& constraints_, unsigned reserved_for_masters, priority a_priority)
-        : my_version_and_traits(default_flags | core_type_support_flag)
+    task_arena_base(const constraints& constraints_, unsigned reserved_for_masters, priority a_priority
+#if __TBB_PREVIEW_PARALLEL_PHASE
+                    , leave_policy lp
+#endif
+    )
+        : my_version_and_traits(default_flags | core_type_support_flag
+#if __TBB_PREVIEW_PARALLEL_PHASE
+                                | leave_policy_to_traits(lp)
+#endif
+        )
         , my_initialization_state(do_once_state::uninitialized)
         , my_arena(nullptr)
         , my_max_concurrency(constraints_.max_concurrency)
@@ -259,31 +304,58 @@ class task_arena : public task_arena_base {
      *  Value of 1 is default and reflects behavior of implicit arenas.
     **/
     task_arena(int max_concurrency_ = automatic, unsigned reserved_for_masters = 1,
-               priority a_priority = priority::normal)
-        : task_arena_base(max_concurrency_, reserved_for_masters, a_priority)
+               priority a_priority = priority::normal
+#if __TBB_PREVIEW_PARALLEL_PHASE
+               , leave_policy lp = leave_policy::automatic
+#endif
+    )
+        : task_arena_base(max_concurrency_, reserved_for_masters, a_priority
+#if __TBB_PREVIEW_PARALLEL_PHASE
+                          , lp
+#endif
+        )
     {}

 #if __TBB_ARENA_BINDING
     //! Creates task arena pinned to certain NUMA node
     task_arena(const constraints& constraints_, unsigned reserved_for_masters = 1,
-               priority a_priority = priority::normal)
-        : task_arena_base(constraints_, reserved_for_masters, a_priority)
+               priority a_priority = priority::normal
+#if __TBB_PREVIEW_PARALLEL_PHASE
+               , leave_policy lp = leave_policy::automatic
+#endif
+    )
+        : task_arena_base(constraints_, reserved_for_masters, a_priority
+#if __TBB_PREVIEW_PARALLEL_PHASE
+                          , lp
+#endif
+        )
     {}

     //! Copies settings from another task_arena
-    task_arena(const task_arena &s) // copy settings but not the reference or instance
+    task_arena(const task_arena &a) // copy settings but not the reference or instance
         : task_arena_base(
             constraints{}
-                .set_numa_id(s.my_numa_id)
-                .set_max_concurrency(s.my_max_concurrency)
-                .set_core_type(s.my_core_type)
-                .set_max_threads_per_core(s.my_max_threads_per_core)
-            , s.my_num_reserved_slots, s.my_priority)
+                .set_numa_id(a.my_numa_id)
+                .set_max_concurrency(a.my_max_concurrency)
+                .set_core_type(a.my_core_type)
+                .set_max_threads_per_core(a.my_max_threads_per_core)
+            , a.my_num_reserved_slots, a.my_priority
+#if __TBB_PREVIEW_PARALLEL_PHASE
+            , a.get_leave_policy()
+#endif
+        )
     {}
 #else
     //! Copies settings from another task_arena
     task_arena(const task_arena& a) // copy settings but not the reference or instance
-        : task_arena_base(a.my_max_concurrency, a.my_num_reserved_slots, a.my_priority)
+        : task_arena_base(a.my_max_concurrency,
+                          a.my_num_reserved_slots,
+                          a.my_priority
+#if __TBB_PREVIEW_PARALLEL_PHASE
+                          , a.get_leave_policy()
+#endif
+        )
     {}
 #endif /*__TBB_ARENA_BINDING*/
@@ -292,7 +364,11 @@ class task_arena : public task_arena_base {

     //! Creates an instance of task_arena attached to the current arena of the thread
     explicit task_arena( attach )
-        : task_arena_base(automatic, 1, priority::normal) // use default settings if attach fails
+        : task_arena_base(automatic, 1, priority::normal
+#if __TBB_PREVIEW_PARALLEL_PHASE
+                          , leave_policy::automatic
+#endif
+        ) // use default settings if attach fails
     {
         if (r1::attach(*this)) {
             mark_initialized();
@@ -311,13 +387,20 @@ class task_arena : public task_arena_base {

     //! Overrides concurrency level and forces initialization of internal representation
     void initialize(int max_concurrency_, unsigned reserved_for_masters = 1,
-                    priority a_priority = priority::normal)
+                    priority a_priority = priority::normal
+#if __TBB_PREVIEW_PARALLEL_PHASE
+                    , leave_policy lp = leave_policy::automatic
+#endif
+    )
     {
         __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
         if( !is_active() ) {
             my_max_concurrency = max_concurrency_;
             my_num_reserved_slots = reserved_for_masters;
             my_priority = a_priority;
+#if __TBB_PREVIEW_PARALLEL_PHASE
+            set_leave_policy(lp);
+#endif
             r1::initialize(*this);
             mark_initialized();
         }
@@ -325,7 +408,11 @@ class task_arena : public task_arena_base {

 #if __TBB_ARENA_BINDING
     void initialize(constraints constraints_, unsigned reserved_for_masters = 1,
-                    priority a_priority = priority::normal)
+                    priority a_priority = priority::normal
+#if __TBB_PREVIEW_PARALLEL_PHASE
+                    , leave_policy lp = leave_policy::automatic
+#endif
+    )
     {
         __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
         if( !is_active() ) {
@@ -335,6 +422,9 @@ class task_arena : public task_arena_base {
             my_max_threads_per_core = constraints_.max_threads_per_core;
             my_num_reserved_slots = reserved_for_masters;
             my_priority = a_priority;
+#if __TBB_PREVIEW_PARALLEL_PHASE
+            set_leave_policy(lp);
+#endif
             r1::initialize(*this);
             mark_initialized();
         }
@@ -404,6 +494,32 @@ class task_arena : public task_arena_base {
         return execute_impl(f);
     }

+#if __TBB_PREVIEW_PARALLEL_PHASE
+    void start_parallel_phase() {
+        initialize();
+        r1::register_parallel_phase(this, /*reserved*/0);
+    }
+    void end_parallel_phase(bool with_fast_leave = false) {
+        __TBB_ASSERT(my_initialization_state.load(std::memory_order_relaxed) == do_once_state::initialized, nullptr);
+        // The standard guarantees that converting a bool to an integral type yields either 0 or 1
+        r1::unregister_parallel_phase(this, static_cast<std::uintptr_t>(with_fast_leave));
+    }
+
+    class scoped_parallel_phase {
+        task_arena& arena;
+        bool one_time_fast_leave;
+    public:
+        scoped_parallel_phase(task_arena& ta, bool with_fast_leave = false)
+            : arena(ta), one_time_fast_leave(with_fast_leave)
+        {
+            arena.start_parallel_phase();
+        }
+        ~scoped_parallel_phase() {
+            arena.end_parallel_phase(one_time_fast_leave);
+        }
+    };
+#endif
+
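The scoped helper above brackets a region of repeated parallel work so that workers are retained between bursts. A short sketch of the intended use (not part of the patch; it mirrors the new test added below and assumes the oneTBB headers are included):

    tbb::task_arena arena{};
    {
        // Registers the phase on construction; unregisters on destruction,
        // optionally requesting a one-time fast leave for the workers.
        tbb::task_arena::scoped_parallel_phase phase{arena, /*with_fast_leave=*/true};
        for (int i = 0; i < 100; ++i) {
            arena.execute([] {
                tbb::parallel_for(0, 1000, [](int) { /* work */ });
            });
        }
    } // phase ends here; workers may leave quickly once, then the default returns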
 #if __TBB_EXTRA_DEBUG
     //! Returns my_num_reserved_slots
     int debug_reserved_slots() const {
@@ -472,6 +588,17 @@ inline void enqueue(F&& f) {
     enqueue_impl(std::forward<F>(f), nullptr);
 }

+#if __TBB_PREVIEW_PARALLEL_PHASE
+inline void start_parallel_phase() {
+    r1::register_parallel_phase(nullptr, /*reserved*/0);
+}
+
+inline void end_parallel_phase(bool with_fast_leave) {
+    // The standard guarantees that converting a bool to an integral type yields either 0 or 1
+    r1::unregister_parallel_phase(nullptr, static_cast<std::uintptr_t>(with_fast_leave));
+}
+#endif
+
 using r1::submit;
 } // namespace d1
@@ -491,6 +618,11 @@ using detail::d1::max_concurrency;
 using detail::d1::isolate;

 using detail::d1::enqueue;
+
+#if __TBB_PREVIEW_PARALLEL_PHASE
+using detail::d1::start_parallel_phase;
+using detail::d1::end_parallel_phase;
+#endif
 } // namespace this_task_arena
 } // inline namespace v1
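For code that runs inside an arena it does not own, the same phase controls are exposed as free functions that apply to the calling thread's current arena. A sketch (not part of the patch):

    tbb::this_task_arena::start_parallel_phase();
    tbb::parallel_for(0, 1000, [](int) { /* work */ });
    tbb::this_task_arena::end_parallel_phase(/*with_fast_leave=*/true);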
diff --git a/src/tbb/arena.cpp b/src/tbb/arena.cpp
index 6ca062d02f..0791869bf5 100644
--- a/src/tbb/arena.cpp
+++ b/src/tbb/arena.cpp
@@ -241,7 +241,12 @@ void arena::process(thread_data& tls) {
     __TBB_ASSERT(tls.my_arena == this, "my_arena is used as a hint when searching the arena to join");
 }

-arena::arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level) {
+arena::arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level
+#if __TBB_PREVIEW_PARALLEL_PHASE
+             , tbb::task_arena::leave_policy lp
+#endif
+)
+{
     __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
     __TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size()==0, "arena::slot size not multiple of cache line size" );
     __TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" );
@@ -276,10 +281,18 @@ arena::arena(threading_control* control, unsigned num_slots, unsigned num_reserv
     my_critical_task_stream.initialize(my_num_slots);
 #endif
     my_mandatory_requests = 0;
+
+#if __TBB_PREVIEW_PARALLEL_PHASE
+    my_thread_leave.set_initial_state(lp);
+#endif
 }

 arena& arena::allocate_arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots,
-                             unsigned priority_level)
+                             unsigned priority_level
+#if __TBB_PREVIEW_PARALLEL_PHASE
+                             , tbb::task_arena::leave_policy lp
+#endif
+)
 {
     __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
     __TBB_ASSERT( sizeof(base_type) % cache_line_size() == 0, "arena slots area misaligned: wrong padding" );
@@ -290,7 +303,11 @@ arena& arena::allocate_arena(threading_control* control, unsigned num_slots, uns
     std::memset( storage, 0, n );
     return *new( storage + num_arena_slots(num_slots, num_reserved_slots) * sizeof(mail_outbox) )
-        arena(control, num_slots, num_reserved_slots, priority_level);
+        arena(control, num_slots, num_reserved_slots, priority_level
+#if __TBB_PREVIEW_PARALLEL_PHASE
+              , lp
+#endif
+        );
 }

 void arena::free_arena () {
@@ -340,6 +357,9 @@ bool arena::has_enqueued_tasks() {
 }

 void arena::request_workers(int mandatory_delta, int workers_delta, bool wakeup_threads) {
+#if __TBB_PREVIEW_PARALLEL_PHASE
+    my_thread_leave.restore_default_policy_if_needed();
+#endif
     my_threading_control->adjust_demand(my_tc_client, mandatory_delta, workers_delta);

     if (wakeup_threads) {
@@ -443,11 +463,21 @@ void arena::enqueue_task(d1::task& t, d1::task_group_context& ctx, thread_data&
     advertise_new_work<work_enqueued>();
 }

-arena& arena::create(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned arena_priority_level, d1::constraints constraints) {
+arena& arena::create(threading_control* control, unsigned num_slots, unsigned num_reserved_slots,
+                     unsigned arena_priority_level, d1::constraints constraints
+#if __TBB_PREVIEW_PARALLEL_PHASE
+                     , tbb::task_arena::leave_policy lp
+#endif
+) {
     __TBB_ASSERT(num_slots > 0, NULL);
     __TBB_ASSERT(num_reserved_slots <= num_slots, NULL);
     // Add public market reference for an external thread/task_arena (that adds an internal reference in exchange).
-    arena& a = arena::allocate_arena(control, num_slots, num_reserved_slots, arena_priority_level);
+    arena& a = arena::allocate_arena(control, num_slots, num_reserved_slots, arena_priority_level
+#if __TBB_PREVIEW_PARALLEL_PHASE
+                                     , lp
+#endif
+    );
     a.my_tc_client = control->create_client(a);
     // We should not publish arena until all fields are initialized
     control->publish_client(a.my_tc_client, constraints);
@@ -500,6 +530,8 @@ struct task_arena_impl {
     static int max_concurrency(const d1::task_arena_base*);
     static void enqueue(d1::task&, d1::task_group_context*, d1::task_arena_base*);
     static d1::slot_id execution_slot(const d1::task_arena_base&);
+    static void register_parallel_phase(d1::task_arena_base*, std::uintptr_t);
+    static void unregister_parallel_phase(d1::task_arena_base*, std::uintptr_t);
 };

 void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) {
@@ -534,6 +566,14 @@ d1::slot_id __TBB_EXPORTED_FUNC execution_slot(const d1::task_arena_base& arena)
     return task_arena_impl::execution_slot(arena);
 }

+void __TBB_EXPORTED_FUNC register_parallel_phase(d1::task_arena_base* ta, std::uintptr_t flags) {
+    task_arena_impl::register_parallel_phase(ta, flags);
+}
+
+void __TBB_EXPORTED_FUNC unregister_parallel_phase(d1::task_arena_base* ta, std::uintptr_t flags) {
+    task_arena_impl::unregister_parallel_phase(ta, flags);
+}
+
 void task_arena_impl::initialize(d1::task_arena_base& ta) {
     // Enforce global market initialization to properly initialize soft limit
     (void)governor::get_thread_data();
@@ -568,7 +608,12 @@ void task_arena_impl::initialize(d1::task_arena_base& ta) {
     __TBB_ASSERT(ta.my_arena.load(std::memory_order_relaxed) == nullptr, "Arena already initialized");
     unsigned priority_level = arena_priority_level(ta.my_priority);
     threading_control* thr_control = threading_control::register_public_reference();
-    arena& a = arena::create(thr_control, unsigned(ta.my_max_concurrency), ta.my_num_reserved_slots, priority_level, arena_constraints);
+    arena& a = arena::create(thr_control, unsigned(ta.my_max_concurrency), ta.my_num_reserved_slots,
+                             priority_level, arena_constraints
+#if __TBB_PREVIEW_PARALLEL_PHASE
+                             , ta.get_leave_policy()
+#endif
+    );
     ta.my_arena.store(&a, std::memory_order_release);

 #if __TBB_CPUBIND_PRESENT
@@ -875,6 +920,21 @@ int task_arena_impl::max_concurrency(const d1::task_arena_base *ta) {
     return int(governor::default_num_threads());
 }

+#if __TBB_PREVIEW_PARALLEL_PHASE
+void task_arena_impl::register_parallel_phase(d1::task_arena_base* ta, std::uintptr_t /*reserved*/) {
+    arena* a = ta ? ta->my_arena.load(std::memory_order_relaxed) : governor::get_thread_data()->my_arena;
+    __TBB_ASSERT(a, nullptr);
+    a->my_thread_leave.register_parallel_phase();
+    a->advertise_new_work<arena::wakeup>();
+}
+
+void task_arena_impl::unregister_parallel_phase(d1::task_arena_base* ta, std::uintptr_t flags) {
+    arena* a = ta ? ta->my_arena.load(std::memory_order_relaxed) : governor::get_thread_data()->my_arena;
+    __TBB_ASSERT(a, nullptr);
+    a->my_thread_leave.unregister_parallel_phase(/*enable_fast_leave=*/static_cast<bool>(flags));
+}
+#endif
+
 void isolate_within_arena(d1::delegate_base& d, std::intptr_t isolation) {
     // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
     thread_data* tls = governor::get_thread_data();
diff --git a/src/tbb/arena.h b/src/tbb/arena.h
index 1e95f117b2..72cfc7feda 100644
--- a/src/tbb/arena.h
+++ b/src/tbb/arena.h
@@ -179,6 +179,81 @@ class atomic_flag {
     }
 };

+#if __TBB_PREVIEW_PARALLEL_PHASE
+class thread_leave_manager {
+    static const std::uint64_t FAST_LEAVE = 1;
+    static const std::uint64_t ONE_TIME_FAST_LEAVE = 1 << 1;
+    static const std::uint64_t DELAYED_LEAVE = 1 << 2;
+    static const std::uint64_t PARALLEL_PHASE = 1 << 3;
+
+    std::atomic<std::uint64_t> my_state{0};
+public:
+    void set_initial_state(tbb::task_arena::leave_policy lp) {
+        if (lp == tbb::task_arena::leave_policy::automatic) {
+            std::uint64_t platform_policy = governor::hybrid_cpu() ? FAST_LEAVE : DELAYED_LEAVE;
+            my_state.store(platform_policy, std::memory_order_relaxed);
+        } else {
+            __TBB_ASSERT(lp == tbb::task_arena::leave_policy::fast,
+                         "Was a new value introduced for leave_policy?");
+            my_state.store(FAST_LEAVE, std::memory_order_relaxed);
+        }
+    }
+
+    void restore_default_policy_if_needed() {
+        std::uint64_t curr = ONE_TIME_FAST_LEAVE;
+        if (my_state.load(std::memory_order_relaxed) == curr) {
+            // This can override the decision of a parallel phase from a future epoch,
+            // but that is acceptable: it does not violate correctness
+            my_state.compare_exchange_strong(curr, DELAYED_LEAVE);
+        }
+    }
+
+    // Indicate the start of a parallel phase in the state machine
+    void register_parallel_phase() {
+        std::uint64_t prev = my_state.load(std::memory_order_relaxed);
+        __TBB_ASSERT(prev != 0, "The initial state was not set");
+
+        std::uint64_t desired{};
+        do {
+            // Take a reference for this start of a parallel phase, preserving the leave
+            // policy, except when a one-time fast leave was requested at the end of the
+            // previous phase.
+            desired = prev;
+            if (prev == ONE_TIME_FAST_LEAVE) {
+                // The state was previously transitioned to "one-time fast leave"; the start
+                // of a new parallel phase transitions it back to "delayed leave"
+                desired = DELAYED_LEAVE;
+            }
+            __TBB_ASSERT(desired + PARALLEL_PHASE > desired, "Overflow detected");
+            desired += PARALLEL_PHASE; // Account for this start of a parallel phase
+        } while (!my_state.compare_exchange_strong(prev, desired));
+    }
+
+    // Indicate the end of a parallel phase in the state machine
+    void unregister_parallel_phase(bool enable_fast_leave) {
+        std::uint64_t prev = my_state.load(std::memory_order_relaxed);
+        __TBB_ASSERT(prev != 0, "The initial state was not set");
+
+        std::uint64_t desired{};
+        do {
+            __TBB_ASSERT(prev - PARALLEL_PHASE < prev,
+                         "A call to unregister without its register complement");
+            desired = prev - PARALLEL_PHASE; // Mark the end of this phase in the reference counter
+            if (enable_fast_leave && /*this was the last parallel phase*/desired == DELAYED_LEAVE) {
+                desired = ONE_TIME_FAST_LEAVE;
+            }
+        } while (!my_state.compare_exchange_strong(prev, desired));
+    }
+
+    bool is_retention_allowed() {
+        std::uint64_t curr = my_state.load(std::memory_order_relaxed);
+        __TBB_ASSERT(curr != 0, "The initial state was not set");
+
+        return curr != FAST_LEAVE && curr != ONE_TIME_FAST_LEAVE;
+    }
+};
+#endif /* __TBB_PREVIEW_PARALLEL_PHASE */
+
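The state encoding packs the leave policy into the three low bits (FAST_LEAVE = 0b1, ONE_TIME_FAST_LEAVE = 0b10, DELAYED_LEAVE = 0b100) and uses the remaining bits, in units of PARALLEL_PHASE = 0b1000, as a reference count of currently open phases. An illustrative trace (not part of the patch), assuming DELAYED_LEAVE was chosen as the initial state on a non-hybrid CPU:

    // state = DELAYED_LEAVE                      -> 0b00100
    // register_parallel_phase()                  -> 0b01100  (one phase open)
    // register_parallel_phase()                  -> 0b10100  (two phases open)
    // unregister_parallel_phase(false)           -> 0b01100
    // unregister_parallel_phase(true)            -> the count would return to
    //     DELAYED_LEAVE, so the state becomes ONE_TIME_FAST_LEAVE (0b00010)
    // arena::request_workers() later calls restore_default_policy_if_needed(),
    //     which flips ONE_TIME_FAST_LEAVE back to DELAYED_LEAVE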
 //! The structure of an arena, except the array of slots.
 /** Separated in order to simplify padding.
     Intrusive list node base class is used by market to form a list of arenas. **/
@@ -245,6 +320,11 @@ struct arena_base : padded<intrusive_list_node> {
     //! Waiting object for external threads that cannot join the arena.
     concurrent_monitor my_exit_monitors;

+#if __TBB_PREVIEW_PARALLEL_PHASE
+    //! Manages the thread-leave state machine
+    thread_leave_manager my_thread_leave;
+#endif
+
     //! Coroutines (task_dispatchers) cache buffer
     arena_co_cache my_co_cache;

@@ -281,13 +361,27 @@ class arena: public padded<arena_base>
     };

     //! Constructor
-    arena(threading_control* control, unsigned max_num_workers, unsigned num_reserved_slots, unsigned priority_level);
+    arena(threading_control* control, unsigned max_num_workers, unsigned num_reserved_slots, unsigned priority_level
+#if __TBB_PREVIEW_PARALLEL_PHASE
+          , tbb::task_arena::leave_policy lp
+#endif
+    );

     //! Allocate an instance of arena.
     static arena& allocate_arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots,
-                                 unsigned priority_level);
+                                 unsigned priority_level
+#if __TBB_PREVIEW_PARALLEL_PHASE
+                                 , tbb::task_arena::leave_policy lp
+#endif
+    );

-    static arena& create(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned arena_priority_level, d1::constraints constraints = d1::constraints{});
+    static arena& create(threading_control* control, unsigned num_slots, unsigned num_reserved_slots,
+                         unsigned arena_priority_level,
+                         d1::constraints constraints = d1::constraints{}
+#if __TBB_PREVIEW_PARALLEL_PHASE
+                         , tbb::task_arena::leave_policy lp = tbb::task_arena::leave_policy::automatic
+#endif
+    );

     static unsigned num_arena_slots ( unsigned num_slots, unsigned num_reserved_slots ) {
         return num_reserved_slots == 0 ? num_slots : max(2u, num_slots);
diff --git a/src/tbb/def/lin32-tbb.def b/src/tbb/def/lin32-tbb.def
index 737e8ec2af..b71e08e5bb 100644
--- a/src/tbb/def/lin32-tbb.def
+++ b/src/tbb/def/lin32-tbb.def
@@ -107,6 +107,8 @@ _ZN3tbb6detail2r17enqueueERNS0_2d14taskEPNS2_15task_arena_baseE;
 _ZN3tbb6detail2r17enqueueERNS0_2d14taskERNS2_18task_group_contextEPNS2_15task_arena_baseE;
 _ZN3tbb6detail2r14waitERNS0_2d115task_arena_baseE;
 _ZN3tbb6detail2r114execution_slotERKNS0_2d115task_arena_baseE;
+_ZN3tbb6detail2r123register_parallel_phaseEPNS0_2d115task_arena_baseEj;
+_ZN3tbb6detail2r125unregister_parallel_phaseEPNS0_2d115task_arena_baseEj;

 /* System topology parsing and threads pinning (governor.cpp) */
 _ZN3tbb6detail2r115numa_node_countEv;
diff --git a/src/tbb/def/lin64-tbb.def b/src/tbb/def/lin64-tbb.def
index 41aca2e932..71be72b678 100644
--- a/src/tbb/def/lin64-tbb.def
+++ b/src/tbb/def/lin64-tbb.def
@@ -107,6 +107,8 @@ _ZN3tbb6detail2r17enqueueERNS0_2d14taskEPNS2_15task_arena_baseE;
 _ZN3tbb6detail2r17enqueueERNS0_2d14taskERNS2_18task_group_contextEPNS2_15task_arena_baseE;
 _ZN3tbb6detail2r14waitERNS0_2d115task_arena_baseE;
 _ZN3tbb6detail2r114execution_slotERKNS0_2d115task_arena_baseE;
+_ZN3tbb6detail2r123register_parallel_phaseEPNS0_2d115task_arena_baseEm;
+_ZN3tbb6detail2r125unregister_parallel_phaseEPNS0_2d115task_arena_baseEm;

 /* System topology parsing and threads pinning (governor.cpp) */
 _ZN3tbb6detail2r115numa_node_countEv;
diff --git a/src/tbb/def/mac64-tbb.def b/src/tbb/def/mac64-tbb.def
index 38bc48d30e..1895dbdbbb 100644
--- a/src/tbb/def/mac64-tbb.def
+++ b/src/tbb/def/mac64-tbb.def
@@ -109,6 +109,8 @@ __ZN3tbb6detail2r17enqueueERNS0_2d14taskEPNS2_15task_arena_baseE
 __ZN3tbb6detail2r17enqueueERNS0_2d14taskERNS2_18task_group_contextEPNS2_15task_arena_baseE
 __ZN3tbb6detail2r14waitERNS0_2d115task_arena_baseE
 __ZN3tbb6detail2r114execution_slotERKNS0_2d115task_arena_baseE
+__ZN3tbb6detail2r123register_parallel_phaseEPNS0_2d115task_arena_baseEm
+__ZN3tbb6detail2r125unregister_parallel_phaseEPNS0_2d115task_arena_baseEm

 # System topology parsing and threads pinning (governor.cpp)
 __ZN3tbb6detail2r115numa_node_countEv
diff --git a/src/tbb/def/win32-tbb.def b/src/tbb/def/win32-tbb.def
index 94b5441701..4813495fa5 100644
--- a/src/tbb/def/win32-tbb.def
+++ b/src/tbb/def/win32-tbb.def
@@ -101,6 +101,8 @@ EXPORTS
 ?wait@r1@detail@tbb@@YAXAAVtask_arena_base@d1@23@@Z
 ?enqueue@r1@detail@tbb@@YAXAAVtask@d1@23@AAVtask_group_context@523@PAVtask_arena_base@523@@Z
 ?execution_slot@r1@detail@tbb@@YAGABVtask_arena_base@d1@23@@Z
+?register_parallel_phase@r1@detail@tbb@@YAXPAVtask_arena_base@d1@23@I@Z
+?unregister_parallel_phase@r1@detail@tbb@@YAXPAVtask_arena_base@d1@23@I@Z

 ; System topology parsing and threads pinning (governor.cpp)
 ?numa_node_count@r1@detail@tbb@@YAIXZ
diff --git a/src/tbb/def/win64-tbb.def b/src/tbb/def/win64-tbb.def
index 96bafc0163..5f417b49da 100644
--- a/src/tbb/def/win64-tbb.def
+++ b/src/tbb/def/win64-tbb.def
@@ -101,6 +101,8 @@ EXPORTS
 ?enqueue@r1@detail@tbb@@YAXAEAVtask@d1@23@PEAVtask_arena_base@523@@Z
 ?enqueue@r1@detail@tbb@@YAXAEAVtask@d1@23@AEAVtask_group_context@523@PEAVtask_arena_base@523@@Z
 ?execution_slot@r1@detail@tbb@@YAGAEBVtask_arena_base@d1@23@@Z
+?register_parallel_phase@r1@detail@tbb@@YAXPEAVtask_arena_base@d1@23@_K@Z
+?unregister_parallel_phase@r1@detail@tbb@@YAXPEAVtask_arena_base@d1@23@_K@Z

 ; System topology parsing and threads pinning (governor.cpp)
 ?numa_node_count@r1@detail@tbb@@YAIXZ
diff --git a/src/tbb/waiters.h b/src/tbb/waiters.h
index 8ed431f857..e3248bb77f 100644
--- a/src/tbb/waiters.h
+++ b/src/tbb/waiters.h
@@ -54,13 +54,14 @@ class outermost_worker_waiter : public waiter_base {
 public:
     using waiter_base::waiter_base;

-    bool continue_execution(arena_slot& slot, d1::task*& t) const {
+    bool continue_execution(arena_slot& slot, d1::task*& t) {
         __TBB_ASSERT(t == nullptr, nullptr);

         if (is_worker_should_leave(slot)) {
-            if (!governor::hybrid_cpu()) {
+            if (is_delayed_leave_enabled()) {
                 static constexpr std::chrono::microseconds worker_wait_leave_duration(1000);
-                static_assert(worker_wait_leave_duration > std::chrono::steady_clock::duration(1), "Clock resolution is not enough for measured interval.");
+                static_assert(worker_wait_leave_duration > std::chrono::steady_clock::duration(1),
+                              "Clock resolution is not enough for measured interval.");

                 for (auto t1 = std::chrono::steady_clock::now(), t2 = t1;
                      std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1) < worker_wait_leave_duration;
@@ -70,7 +71,9 @@ class outermost_worker_waiter : public waiter_base {
                         return true;
                     }

-                    if (my_arena.my_threading_control->is_any_other_client_active()) {
+                    if (!my_arena.my_thread_leave.is_retention_allowed() ||
+                        my_arena.my_threading_control->is_any_other_client_active())
+                    {
                         break;
                     }
                     d0::yield();
@@ -100,6 +103,14 @@ class outermost_worker_waiter : public waiter_base {
 private:
     using base_type = waiter_base;

+    bool is_delayed_leave_enabled() {
+#if __TBB_PREVIEW_PARALLEL_PHASE
+        return my_arena.my_thread_leave.is_retention_allowed();
+#else
+        return !governor::hybrid_cpu();
+#endif
+    }
+
     bool is_worker_should_leave(arena_slot& slot) const {
         bool is_top_priority_arena = my_arena.is_top_priority();
         bool is_task_pool_empty = slot.task_pool.load(std::memory_order_relaxed) == EmptyTaskPool;
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 0ab4d7e8c8..d308dc2367 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -416,6 +416,7 @@ if (TARGET TBB::tbb)
     tbb_add_test(SUBDIR tbb NAME test_task_group DEPENDENCIES TBB::tbb)
     tbb_add_test(SUBDIR tbb NAME test_concurrent_hash_map DEPENDENCIES TBB::tbb)
     tbb_add_test(SUBDIR tbb NAME test_task_arena DEPENDENCIES TBB::tbb)
+    tbb_add_test(SUBDIR tbb NAME test_parallel_phase DEPENDENCIES TBB::tbb)
     tbb_add_test(SUBDIR tbb NAME test_enumerable_thread_specific DEPENDENCIES TBB::tbb)
     tbb_add_test(SUBDIR tbb NAME test_concurrent_queue DEPENDENCIES TBB::tbb)
     tbb_add_test(SUBDIR tbb NAME test_resumable_tasks DEPENDENCIES TBB::tbb)
diff --git a/test/common/config.h b/test/common/config.h
index c7ff8ba63a..7ae67a6224 100644
--- a/test/common/config.h
+++ b/test/common/config.h
@@ -39,6 +39,9 @@
 #ifndef TBB_PREVIEW_ISOLATED_TASK_GROUP
 #define TBB_PREVIEW_ISOLATED_TASK_GROUP 1
 #endif
+#ifndef TBB_PREVIEW_PARALLEL_PHASE
+#define TBB_PREVIEW_PARALLEL_PHASE 1
+#endif
 #endif

 #include "oneapi/tbb/detail/_config.h"
diff --git a/test/tbb/test_parallel_phase.cpp b/test/tbb/test_parallel_phase.cpp
new file mode 100644
index 0000000000..47350ce101
--- /dev/null
+++ b/test/tbb/test_parallel_phase.cpp
@@ -0,0 +1,288 @@
+#define TBB_PREVIEW_PARALLEL_PHASE 1
+
+#include "common/test.h"
+#include "common/utils.h"
+#include "common/utils_concurrency_limit.h"
+#include "common/spin_barrier.h"
+
+#include "tbb/task_arena.h"
+#include "tbb/parallel_for.h"
+
+struct dummy_func {
+    void operator()() const {
+    }
+};
+
+template <typename F1 = dummy_func, typename F2 = dummy_func>
+std::size_t measure_median_start_time(tbb::task_arena* ta, const F1& start = F1{}, const F2& end = F2{}) {
+    std::size_t num_threads = ta ? ta->max_concurrency() : tbb::this_task_arena::max_concurrency();
+    std::size_t num_runs = 1000;
+    std::vector<std::size_t> longest_start_times;
+    longest_start_times.reserve(num_runs);
+
+    std::vector<std::chrono::steady_clock::time_point> start_times(num_threads);
+    utils::SpinBarrier barrier(num_threads);
+    auto measure_start_time = [&] (std::size_t) {
+        start_times[tbb::this_task_arena::current_thread_index()] = std::chrono::steady_clock::now();
+        barrier.wait();
+    };
+
+    auto get_longest_start = [&] (std::chrono::steady_clock::time_point start_time) {
+        std::size_t longest_time = 0;
+        for (auto& time : start_times) {
+            auto diff = std::chrono::duration_cast<std::chrono::microseconds>(time - start_time);
+            longest_time = std::max(longest_time, std::size_t(diff.count()));
+        }
+        return longest_time;
+    };
+
+    auto work = [&] {
+        auto start_time = std::chrono::steady_clock::now();
+        start();
+        tbb::parallel_for(std::size_t(0), num_threads, measure_start_time, tbb::static_partitioner{});
+        end();
+        longest_start_times.push_back(get_longest_start(start_time));
+    };
+
+    for (std::size_t i = 0; i < num_runs; ++i) {
+        if (ta) {
+            ta->execute(work);
+        } else {
+            work();
+        }
+        utils::doDummyWork(i*250);
+    }
+    return utils::median(longest_start_times.begin(), longest_start_times.end());
+}
+
+template <typename Impl>
+class start_time_collection_base {
+    friend Impl;
+public:
+    start_time_collection_base(tbb::task_arena& ta, std::size_t ntrials) :
+        arena(&ta), num_trials(ntrials), start_times(ntrials) {}
+
+    explicit start_time_collection_base(std::size_t ntrials) :
+        arena(nullptr), num_trials(ntrials), start_times(ntrials) {}
+
+    std::vector<std::size_t> measure() {
+        for (std::size_t i = 0; i < num_trials; ++i) {
+            std::size_t median_start_time = static_cast<Impl*>(this)->measure_impl();
+            start_times[i] = median_start_time;
+        }
+        return start_times;
+    }
+protected:
+    tbb::task_arena* arena;
+    std::size_t num_trials;
+    std::vector<std::size_t> start_times;
+};
+
+class start_time_collection : public start_time_collection_base<start_time_collection> {
+    using base = start_time_collection_base<start_time_collection>;
+    using base::base;
+    friend base;
+
+    std::size_t measure_impl() {
+        return measure_median_start_time(arena);
+    }
+};
+
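The CRTP base above factors out the trial loop; each derived collector only supplies measure_impl(). Typical use, as in the test cases below (sketch, assuming the declarations from this test file):

    tbb::task_arena arena{};
    start_time_collection collector{arena, /*num_trials=*/10};
    std::vector<std::size_t> times = collector.measure();
    auto median = utils::median(times.begin(), times.end());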
+class start_time_collection_phase_wrapped
+    : public start_time_collection_base<start_time_collection_phase_wrapped>
+{
+    using base = start_time_collection_base<start_time_collection_phase_wrapped>;
+    using base::base;
+    friend base;
+
+    std::size_t measure_impl() {
+        arena->start_parallel_phase();
+        auto median_start_time = measure_median_start_time(arena);
+        arena->end_parallel_phase(/*with_fast_leave=*/true);
+        return median_start_time;
+    }
+};
+
+class start_time_collection_scoped_phase_wrapped
+    : public start_time_collection_base<start_time_collection_scoped_phase_wrapped>
+{
+    using base = start_time_collection_base<start_time_collection_scoped_phase_wrapped>;
+    using base::base;
+    friend base;
+
+    std::size_t measure_impl() {
+        tbb::task_arena::scoped_parallel_phase phase{*arena};
+        auto median_start_time = measure_median_start_time(arena);
+        return median_start_time;
+    }
+};
+
+class start_time_collection_sequenced_phases
+    : public start_time_collection_base<start_time_collection_sequenced_phases>
+{
+    using base = start_time_collection_base<start_time_collection_sequenced_phases>;
+    friend base;
+
+    bool with_fast_leave;
+
+    std::size_t measure_impl() {
+        std::size_t median_start_time;
+        if (arena) {
+            median_start_time = measure_median_start_time(arena,
+                [this] { arena->start_parallel_phase(); },
+                [this] { arena->end_parallel_phase(with_fast_leave); });
+        } else {
+            median_start_time = measure_median_start_time(arena,
+                [] { tbb::this_task_arena::start_parallel_phase(); },
+                [this] { tbb::this_task_arena::end_parallel_phase(with_fast_leave); });
+        }
+        return median_start_time;
+    }
+
+public:
+    start_time_collection_sequenced_phases(tbb::task_arena& ta, std::size_t ntrials, bool fast_leave = false) :
+        base(ta, ntrials), with_fast_leave(fast_leave)
+    {}
+
+    explicit start_time_collection_sequenced_phases(std::size_t ntrials, bool fast_leave = false) :
+        base(ntrials), with_fast_leave(fast_leave)
+    {}
+};
+
+class start_time_collection_sequenced_scoped_phases
+    : public start_time_collection_base<start_time_collection_sequenced_scoped_phases>
+{
+    using base = start_time_collection_base<start_time_collection_sequenced_scoped_phases>;
+    friend base;
+
+    bool with_fast_leave;
+
+    std::size_t measure_impl() {
+        tbb::task_arena::scoped_parallel_phase* phase = nullptr;
+        auto median_start_time = measure_median_start_time(arena,
+            [this, &phase] {
+                phase = new tbb::task_arena::scoped_parallel_phase{*arena, with_fast_leave};
+            },
+            [&phase] {
+                delete phase;
+            });
+        return median_start_time;
+    }
+
+public:
+    start_time_collection_sequenced_scoped_phases(tbb::task_arena& ta, std::size_t ntrials, bool fast_leave = false) :
+        base(ta, ntrials), with_fast_leave(fast_leave)
+    {}
+
+    explicit start_time_collection_sequenced_scoped_phases(std::size_t ntrials, bool fast_leave = false) :
+        base(ntrials), with_fast_leave(fast_leave)
+    {}
+};
+
+//! \brief \ref interface \ref requirement
+TEST_CASE("Check that workers leave faster with leave_policy::fast") {
+    // The test measures worker start times, so there is no point in
+    // running it with a workerless arena
+    if (utils::get_platform_max_threads() < 2) {
+        return;
+    }
+    tbb::task_arena ta_automatic_leave {
+        tbb::task_arena::automatic, 1,
+        tbb::task_arena::priority::normal,
+        tbb::task_arena::leave_policy::automatic
+    };
+    tbb::task_arena ta_fast_leave {
+        tbb::task_arena::automatic, 1,
+        tbb::task_arena::priority::normal,
+        tbb::task_arena::leave_policy::fast
+    };
+    start_time_collection st_collector1{ta_automatic_leave, /*num_trials=*/10};
+    start_time_collection st_collector2{ta_fast_leave, /*num_trials=*/10};
+
+    auto times_automatic = st_collector1.measure();
+    auto times_fast = st_collector2.measure();
+
+    auto median_automatic = utils::median(times_automatic.begin(), times_automatic.end());
+    auto median_fast = utils::median(times_fast.begin(), times_fast.end());
+
+    WARN_MESSAGE(median_automatic < median_fast,
+        "Expected workers to start new work more slowly with the fast leave policy");
+}
+
+//! \brief \ref interface \ref requirement
+TEST_CASE("Parallel Phase retains workers in task_arena") {
+    if (utils::get_platform_max_threads() < 2) {
+        return;
+    }
+    tbb::task_arena ta_fast1 {
+        tbb::task_arena::automatic, 1,
+        tbb::task_arena::priority::normal,
+        tbb::task_arena::leave_policy::fast
+    };
+    tbb::task_arena ta_fast2 {
+        tbb::task_arena::automatic, 1,
+        tbb::task_arena::priority::normal,
+        tbb::task_arena::leave_policy::fast
+    };
+    start_time_collection_phase_wrapped st_collector1{ta_fast1, /*num_trials=*/10};
+    start_time_collection_scoped_phase_wrapped st_collector_scoped{ta_fast1, /*num_trials=*/10};
+    start_time_collection st_collector2{ta_fast2, /*num_trials=*/10};
+
+    auto times1 = st_collector1.measure();
+    auto times2 = st_collector2.measure();
+    auto times_scoped = st_collector_scoped.measure();
+
+    auto median1 = utils::median(times1.begin(), times1.end());
+    auto median2 = utils::median(times2.begin(), times2.end());
+    auto median_scoped = utils::median(times_scoped.begin(), times_scoped.end());
+
+    WARN_MESSAGE(median1 < median2,
+        "Expected workers to start new work faster when using parallel_phase");
+
+    WARN_MESSAGE(median_scoped < median2,
+        "Expected workers to start new work faster when using scoped parallel_phase");
+}
+
+//! \brief \ref interface \ref requirement
+TEST_CASE("Test one-time fast leave") {
+    if (utils::get_platform_max_threads() < 2) {
+        return;
+    }
+    tbb::task_arena ta1{};
+    tbb::task_arena ta2{};
+    start_time_collection_sequenced_phases st_collector1{ta1, /*num_trials=*/10};
+    start_time_collection_sequenced_phases st_collector2{ta2, /*num_trials=*/10, /*fast_leave=*/true};
+    start_time_collection_sequenced_scoped_phases st_collector_scoped{ta2, /*num_trials=*/10, /*fast_leave=*/true};
+
+    auto times1 = st_collector1.measure();
+    auto times2 = st_collector2.measure();
+    auto times_scoped = st_collector_scoped.measure();
+
+    auto median1 = utils::median(times1.begin(), times1.end());
+    auto median2 = utils::median(times2.begin(), times2.end());
+    auto median_scoped = utils::median(times_scoped.begin(), times_scoped.end());
+
+    WARN_MESSAGE(median1 < median2,
+        "Expected the one-time fast leave setting to delay workers starting new work");
+
+    WARN_MESSAGE(median1 < median_scoped,
+        "Expected the one-time fast leave setting to delay workers starting new work");
+}
+
+//! \brief \ref interface \ref requirement
+TEST_CASE("Test parallel phase with this_task_arena") {
+    if (utils::get_platform_max_threads() < 2) {
+        return;
+    }
+    start_time_collection_sequenced_phases st_collector1{/*num_trials=*/10};
+    start_time_collection_sequenced_phases st_collector2{/*num_trials=*/10, /*fast_leave=*/true};
+
+    auto times1 = st_collector1.measure();
+    auto times2 = st_collector2.measure();
+
+    auto median1 = utils::median(times1.begin(), times1.end());
+    auto median2 = utils::median(times2.begin(), times2.end());
+
+    WARN_MESSAGE(median1 < median2,
+        "Expected the one-time fast leave setting to delay workers starting new work");
+}