Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify private worker synchronization mechanism #754

Merged
merged 6 commits into from
Feb 7, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 7 additions & 16 deletions python/rml/ipc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -562,26 +562,21 @@ void ipc_worker::run() {
// complications in handle management on Windows.

::rml::job& j = *my_client.create_one_job();
// TODO: reconsider if memory_order_acquire is required here and other places
state_t state = my_state.load(std::memory_order_acquire);
while( state!=st_quit && state!=st_stop ) {
if( my_server.my_slack>=0 ) {
my_client.process(j);
} else {
// Prepare to wait
my_thread_monitor.prepare_wait();
// Check/set the invariant for sleeping
state = my_state.load(std::memory_order_seq_cst);
if( state!=st_quit && state!=st_stop && my_server.try_insert_in_asleep_list(*this) ) {
if( my_server.my_n_thread > 1 ) my_server.release_active_thread();
my_thread_monitor.commit_wait();
my_thread_monitor.wait();
my_server.propagate_chain_reaction();
} else {
// Invariant broken
my_thread_monitor.cancel_wait();
}
}
state = my_state.load(std::memory_order_acquire);
// memory_order_seq_cst to be strictly ordered after thread_monitor::wait
state = my_state.load(std::memory_order_seq_cst);
}
my_client.cleanup(j);

Expand Down Expand Up @@ -663,7 +658,8 @@ void ipc_waker::run() {
// which would create race with the launching thread and
// complications in handle management on Windows.

while( my_state.load(std::memory_order_acquire)!=st_quit ) {
// memory_order_seq_cst to be strictly ordered after thread_monitor::wait on the next iteration
while( my_state.load(std::memory_order_seq_cst)!=st_quit ) {
bool have_to_sleep = false;
if( my_server.my_slack.load(std::memory_order_acquire)>0 ) {
if( my_server.wait_active_thread() ) {
Expand All @@ -678,14 +674,9 @@ void ipc_waker::run() {
have_to_sleep = true;
}
if( have_to_sleep ) {
// Prepare to wait
my_thread_monitor.prepare_wait();
// Check/set the invariant for sleeping
if( my_state.load(std::memory_order_seq_cst)!=st_quit && my_server.my_slack.load(std::memory_order_acquire)<0 ) {
my_thread_monitor.commit_wait();
} else {
// Invariant broken
my_thread_monitor.cancel_wait();
if( my_server.my_slack.load(std::memory_order_acquire)<0 ) {
my_thread_monitor.wait();
}
}
}
Expand Down
36 changes: 14 additions & 22 deletions src/tbb/private_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ void private_worker::release_handle(thread_handle handle, bool join) {
}

void private_worker::start_shutdown() {
__TBB_ASSERT(my_state.load(std::memory_order_relaxed) != st_quit, "The thread has alredy been requested to quit");

// memory_order_acquire to acquire my_handle
state_t prev_state = my_state.exchange(st_quit, std::memory_order_acquire);

Expand Down Expand Up @@ -262,23 +264,14 @@ void private_worker::run() noexcept {
// complications in handle management on Windows.

::rml::job& j = *my_client.create_one_job();
while( my_state.load(std::memory_order_relaxed)!=st_quit ) {
// memory_order_seq_cst to be strictly ordered after thread_monitor::wait on the next iteration
while( my_state.load(std::memory_order_seq_cst)!=st_quit ) {
if( my_server.my_slack.load(std::memory_order_acquire)>=0 ) {
my_client.process(j);
} else {
// Prepare to wait
my_thread_monitor.prepare_wait();
// Check/set the invariant for sleeping
// We need memory_order_seq_cst to enforce ordering with prepare_wait
// (note that a store in prepare_wait should be with memory_order_seq_cst as well)
if( my_state.load(std::memory_order_seq_cst)!=st_quit && my_server.try_insert_in_asleep_list(*this) ) {
my_thread_monitor.commit_wait();
__TBB_ASSERT( my_state==st_quit || !my_next, "Thread monitor missed a spurious wakeup?" );
my_server.propagate_chain_reaction();
} else {
// Invariant broken
my_thread_monitor.cancel_wait();
}
} else if( my_server.try_insert_in_asleep_list(*this) ) {
my_thread_monitor.wait();
__TBB_ASSERT(my_state.load(std::memory_order_relaxed) == st_quit || !my_next, "Thread monitor missed a spurious wakeup?" );
my_server.propagate_chain_reaction();
}
}
my_client.cleanup(j);
Expand Down Expand Up @@ -386,17 +379,16 @@ void private_server::wake_some( int additional_slack ) {
}
done:

if (allotted_slack)
{
if (allotted_slack) {
asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);

while( my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+2 && allotted_slack) {
auto root = my_asleep_list_root.load(std::memory_order_relaxed);
while( root && w<wakee+2 && allotted_slack) {
--allotted_slack;
// Pop sleeping worker to combine with claimed unit of slack
auto old = my_asleep_list_root.load(std::memory_order_relaxed);
my_asleep_list_root.store(old->my_next, std::memory_order_relaxed);
*w++ = old;
*w++ = root;
root = root->my_next;
}
my_asleep_list_root.store(root, std::memory_order_relaxed);
if(allotted_slack) {
// Contribute our unused slack to my_slack.
my_slack += allotted_slack;
Expand Down
52 changes: 12 additions & 40 deletions src/tbb/rml_thread_monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,12 @@ class thread_monitor {
}
~thread_monitor() {}

//! If a thread is waiting or started a two-phase wait, notify it.
//! Notify waiting thread
/** Can be called by any thread. */
void notify();

//! Begin two-phase wait.
/** Should only be called by thread that owns the monitor.
The caller must either complete the wait or cancel it. */
void prepare_wait();

//! Complete a two-phase wait and wait until notification occurs after the earlier prepare_wait.
void commit_wait();

//! Cancel a two-phase wait.
void cancel_wait();
//! Wait for notification
void wait();

#if __TBB_USE_WINAPI
typedef HANDLE handle_type;
Expand All @@ -123,8 +115,8 @@ class thread_monitor {
//! Detach thread
static void detach_thread(handle_type handle);
private:
std::atomic<bool> in_wait{false};
bool skipped_wakeup{false};
// The protection from double notification of the binary semaphore
std::atomic<bool> my_notified{ false };
binary_semaphore my_sema;
#if __TBB_USE_POSIX
static void check( int error_code, const char* routine );
Expand Down Expand Up @@ -215,37 +207,17 @@ void thread_monitor::detach_thread(handle_type handle) {
#endif /* __TBB_USE_POSIX */

inline void thread_monitor::notify() {
bool do_signal = in_wait.exchange( false, std::memory_order_release);
if( do_signal ) {
// Check that the semaphore is not notified twice
if (!my_notified.exchange(true, std::memory_order_release)) {
my_sema.V();
}
}

inline void thread_monitor::prepare_wait() {
if( skipped_wakeup ) {
// Lazily consume a signal that was skipped due to cancel_wait
skipped_wakeup = false;
my_sema.P(); // does not really wait on the semaphore
}
// std::memory_order_seq_cst is required because prepare_wait
// should be ordered before further operations (that are suppose to
// use memory_order_seq_cst where required)
in_wait.store( true, std::memory_order_seq_cst );
}

inline void thread_monitor::commit_wait() {
bool do_it = in_wait.load(std::memory_order_acquire);
if( do_it ) {
my_sema.P();
} else {
skipped_wakeup = true;
}
}

inline void thread_monitor::cancel_wait() {
// if not in_wait, then some thread has sent us a signal;
// it will be consumed by the next prepare_wait call
skipped_wakeup = ! in_wait.exchange( false, std::memory_order_relaxed);
inline void thread_monitor::wait() {
my_sema.P();
// memory_order_seq_cst is required here to be ordered with
// futher loads checking shutdown state
aleksei-fedotov marked this conversation as resolved.
Show resolved Hide resolved
my_notified.store(false, std::memory_order_seq_cst);
}

} // namespace internal
Expand Down