diff --git a/python/rml/ipc_server.cpp b/python/rml/ipc_server.cpp
index 69006a56e7..3c81589afe 100644
--- a/python/rml/ipc_server.cpp
+++ b/python/rml/ipc_server.cpp
@@ -562,26 +562,21 @@ void ipc_worker::run() {
     // complications in handle management on Windows.
 
     ::rml::job& j = *my_client.create_one_job();
-    // TODO: reconsider if memory_order_acquire is required here and other places
     state_t state = my_state.load(std::memory_order_acquire);
     while( state!=st_quit && state!=st_stop ) {
         if( my_server.my_slack>=0 ) {
             my_client.process(j);
         } else {
-            // Prepare to wait
-            my_thread_monitor.prepare_wait();
             // Check/set the invariant for sleeping
             state = my_state.load(std::memory_order_seq_cst);
             if( state!=st_quit && state!=st_stop && my_server.try_insert_in_asleep_list(*this) ) {
                 if( my_server.my_n_thread > 1 ) my_server.release_active_thread();
-                my_thread_monitor.commit_wait();
+                my_thread_monitor.wait();
                 my_server.propagate_chain_reaction();
-            } else {
-                // Invariant broken
-                my_thread_monitor.cancel_wait();
             }
         }
-        state = my_state.load(std::memory_order_acquire);
+        // memory_order_seq_cst to be strictly ordered after thread_monitor::wait
+        state = my_state.load(std::memory_order_seq_cst);
     }
     my_client.cleanup(j);
@@ -663,7 +658,8 @@ void ipc_waker::run() {
     // which would create race with the launching thread and
     // complications in handle management on Windows.
 
-    while( my_state.load(std::memory_order_acquire)!=st_quit ) {
+    // memory_order_seq_cst to be strictly ordered after thread_monitor::wait on the next iteration
+    while( my_state.load(std::memory_order_seq_cst)!=st_quit ) {
         bool have_to_sleep = false;
         if( my_server.my_slack.load(std::memory_order_acquire)>0 ) {
             if( my_server.wait_active_thread() ) {
@@ -678,14 +674,9 @@
            have_to_sleep = true;
        }
        if( have_to_sleep ) {
-            // Prepare to wait
-            my_thread_monitor.prepare_wait();
             // Check/set the invariant for sleeping
-            if( my_state.load(std::memory_order_seq_cst)!=st_quit && my_server.my_slack.load(std::memory_order_acquire)<0 ) {
-                my_thread_monitor.commit_wait();
-            } else {
-                // Invariant broken
-                my_thread_monitor.cancel_wait();
+            if( my_server.my_slack.load(std::memory_order_acquire)<0 ) {
+                my_thread_monitor.wait();
             }
         }
     }
diff --git a/src/tbb/private_server.cpp b/src/tbb/private_server.cpp
index 4b200a4787..16f019cd55 100644
--- a/src/tbb/private_server.cpp
+++ b/src/tbb/private_server.cpp
@@ -234,6 +234,8 @@ void private_worker::release_handle(thread_handle handle, bool join) {
 }
 
 void private_worker::start_shutdown() {
+    __TBB_ASSERT(my_state.load(std::memory_order_relaxed) != st_quit, "The thread has already been requested to quit");
+
     // memory_order_acquire to acquire my_handle
     state_t prev_state = my_state.exchange(st_quit, std::memory_order_acquire);
 
@@ -262,23 +264,14 @@ void private_worker::run() noexcept {
     // complications in handle management on Windows.
 
     ::rml::job& j = *my_client.create_one_job();
-    while( my_state.load(std::memory_order_relaxed)!=st_quit ) {
+    // memory_order_seq_cst to be strictly ordered after thread_monitor::wait on the next iteration
+    while( my_state.load(std::memory_order_seq_cst)!=st_quit ) {
         if( my_server.my_slack.load(std::memory_order_acquire)>=0 ) {
             my_client.process(j);
-        } else {
-            // Prepare to wait
-            my_thread_monitor.prepare_wait();
-            // Check/set the invariant for sleeping
-            // We need memory_order_seq_cst to enforce ordering with prepare_wait
-            // (note that a store in prepare_wait should be with memory_order_seq_cst as well)
-            if( my_state.load(std::memory_order_seq_cst)!=st_quit && my_server.try_insert_in_asleep_list(*this) ) {
-                my_thread_monitor.commit_wait();
-                __TBB_ASSERT( my_state==st_quit || !my_next, "Thread monitor missed a spurious wakeup?" );
-                my_server.propagate_chain_reaction();
-            } else {
-                // Invariant broken
-                my_thread_monitor.cancel_wait();
-            }
+        } else if( my_server.try_insert_in_asleep_list(*this) ) {
+            my_thread_monitor.wait();
+            __TBB_ASSERT(my_state.load(std::memory_order_relaxed) == st_quit || !my_next, "Thread monitor missed a spurious wakeup?" );
+            my_server.propagate_chain_reaction();
         }
     }
     my_client.cleanup(j);
@@ -386,17 +379,16 @@ void private_server::wake_some( int additional_slack ) {
     }
 done:
-    if (allotted_slack)
-    {
+    if (allotted_slack) {
         asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
-
-        while( my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+2 ) {
-            auto old = my_asleep_list_root.load(std::memory_order_relaxed);
-            my_asleep_list_root.store(old->my_next, std::memory_order_relaxed);
-            *w++ = old;
+        private_worker* root = my_asleep_list_root.load(std::memory_order_relaxed);
+        while( root && w<wakee+2 ) {
+            *w++ = root;
+            root = root->my_next;
         }
+        my_asleep_list_root.store(root, std::memory_order_relaxed);
     if(allotted_slack) {
         // Contribute our unused slack to my_slack.
         my_slack += allotted_slack;
diff --git a/src/tbb/rml_thread_monitor.h b/src/tbb/rml_thread_monitor.h
index 545ecc9699..00bed6cf50 100644
--- a/src/tbb/rml_thread_monitor.h
+++ b/src/tbb/rml_thread_monitor.h
@@ -83,20 +83,12 @@ class thread_monitor {
     }
     ~thread_monitor() {}
 
-    //! If a thread is waiting or started a two-phase wait, notify it.
+    //! Notify waiting thread
     /** Can be called by any thread. */
     void notify();
 
-    //! Begin two-phase wait.
-    /** Should only be called by thread that owns the monitor.
-        The caller must either complete the wait or cancel it. */
-    void prepare_wait();
-
-    //! Complete a two-phase wait and wait until notification occurs after the earlier prepare_wait.
-    void commit_wait();
-
-    //! Cancel a two-phase wait.
-    void cancel_wait();
+    //! Wait for notification
+    void wait();
 
 #if __TBB_USE_WINAPI
     typedef HANDLE handle_type;
@@ -123,8 +115,8 @@ class thread_monitor {
     //! Detach thread
     static void detach_thread(handle_type handle);
 private:
-    std::atomic<bool> in_wait{false};
-    bool skipped_wakeup{false};
+    // Protects the binary semaphore from double notification
+    std::atomic<bool> my_notified{ false };
     binary_semaphore my_sema;
 #if __TBB_USE_POSIX
     static void check( int error_code, const char* routine );
 #endif
@@ -215,37 +207,17 @@ void thread_monitor::detach_thread(handle_type handle) {
 #endif /* __TBB_USE_POSIX */
 
 inline void thread_monitor::notify() {
-    bool do_signal = in_wait.exchange( false, std::memory_order_release);
-    if( do_signal ) {
+    // Do not signal the semaphore if it has already been notified
+    if (!my_notified.exchange(true, std::memory_order_release)) {
         my_sema.V();
     }
 }
 
-inline void thread_monitor::prepare_wait() {
-    if( skipped_wakeup ) {
-        // Lazily consume a signal that was skipped due to cancel_wait
-        skipped_wakeup = false;
-        my_sema.P(); // does not really wait on the semaphore
-    }
-    // std::memory_order_seq_cst is required because prepare_wait
-    // should be ordered before further operations (that are suppose to
-    // use memory_order_seq_cst where required)
-    in_wait.store( true, std::memory_order_seq_cst );
-}
-
-inline void thread_monitor::commit_wait() {
-    bool do_it = in_wait.load(std::memory_order_acquire);
-    if( do_it ) {
-        my_sema.P();
-    } else {
-        skipped_wakeup = true;
-    }
-}
-
-inline void thread_monitor::cancel_wait() {
-    // if not in_wait, then some thread has sent us a signal;
-    // it will be consumed by the next prepare_wait call
-    skipped_wakeup = ! in_wait.exchange( false, std::memory_order_relaxed);
+inline void thread_monitor::wait() {
+    my_sema.P();
+    // memory_order_seq_cst is required here to be ordered with
+    // further loads that check the shutdown state
+    my_notified.store(false, std::memory_order_seq_cst);
 }
 
 } // namespace internal
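
Note (not part of the patch): below is a minimal, self-contained sketch of the wait/notify protocol the reworked thread_monitor relies on. It uses C++20 std::binary_semaphore and std::atomic<bool> as stand-ins for the internal binary_semaphore and the my_notified flag; the class, variable names, and the demo loop are illustrative only.

    #include <atomic>
    #include <semaphore>
    #include <thread>
    #include <cstdio>

    // Stand-in for the simplified thread_monitor after this patch.
    class monitor_sketch {
        std::atomic<bool> my_notified{false};  // guards the semaphore against double notification
        std::binary_semaphore my_sema{0};      // stand-in for the internal binary_semaphore
    public:
        // May be called by any thread; only the first notify() after a wait()
        // releases the semaphore, so it is never released twice in a row.
        void notify() {
            if (!my_notified.exchange(true, std::memory_order_release))
                my_sema.release();             // corresponds to my_sema.V()
        }
        // Called only by the owning thread; blocks until a notification arrives,
        // then re-arms the flag so the next notify() releases the semaphore again.
        void wait() {
            my_sema.acquire();                 // corresponds to my_sema.P()
            // seq_cst so that later loads of the shutdown state are ordered after the wait
            my_notified.store(false, std::memory_order_seq_cst);
        }
    };

    int main() {
        monitor_sketch m;
        std::atomic<bool> quit{false};
        std::thread worker([&] {
            // Mirrors the worker loop above: re-check the quit flag after every wait().
            while (!quit.load(std::memory_order_seq_cst)) {
                m.wait();
                std::puts("worker woke up");
            }
        });
        m.notify();                            // wake the worker once
        quit.store(true, std::memory_order_seq_cst);
        m.notify();                            // wake it again so it can observe quit
        worker.join();
    }

The sketch also shows why the double-notification guard matters: a binary semaphore must not be released twice without an intervening acquire, so notify() collapses repeated notifications into one until the owning thread has gone through wait() and cleared the flag.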