Skip to content

Commit

Permalink
Remove transaction logic from thread_monitor
Browse files Browse the repository at this point in the history
Signed-off-by: Alexei Katranov <[email protected]>
  • Loading branch information
alexey-katranov committed Jan 31, 2022
1 parent 78ff937 commit c0b2ea2
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 78 deletions.
23 changes: 7 additions & 16 deletions python/rml/ipc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -562,26 +562,21 @@ void ipc_worker::run() {
// complications in handle management on Windows.

::rml::job& j = *my_client.create_one_job();
// TODO: reconsider if memory_order_acquire is required here and other places
state_t state = my_state.load(std::memory_order_acquire);
while( state!=st_quit && state!=st_stop ) {
if( my_server.my_slack>=0 ) {
my_client.process(j);
} else {
// Prepare to wait
my_thread_monitor.prepare_wait();
// Check/set the invariant for sleeping
state = my_state.load(std::memory_order_seq_cst);
if( state!=st_quit && state!=st_stop && my_server.try_insert_in_asleep_list(*this) ) {
if( my_server.my_n_thread > 1 ) my_server.release_active_thread();
my_thread_monitor.commit_wait();
my_thread_monitor.wait();
my_server.propagate_chain_reaction();
} else {
// Invariant broken
my_thread_monitor.cancel_wait();
}
}
state = my_state.load(std::memory_order_acquire);
// memory_order_seq_cst to be strictly ordered after thread_monitor::wait
state = my_state.load(std::memory_order_seq_cst);
}
my_client.cleanup(j);

Expand Down Expand Up @@ -663,7 +658,8 @@ void ipc_waker::run() {
// which would create race with the launching thread and
// complications in handle management on Windows.

while( my_state.load(std::memory_order_acquire)!=st_quit ) {
// memory_order_seq_cst to be strictly ordered after thread_monitor::wait on the next iteration
while( my_state.load(std::memory_order_seq_cst)!=st_quit ) {
bool have_to_sleep = false;
if( my_server.my_slack.load(std::memory_order_acquire)>0 ) {
if( my_server.wait_active_thread() ) {
Expand All @@ -678,14 +674,9 @@ void ipc_waker::run() {
have_to_sleep = true;
}
if( have_to_sleep ) {
// Prepare to wait
my_thread_monitor.prepare_wait();
// Check/set the invariant for sleeping
if( my_state.load(std::memory_order_seq_cst)!=st_quit && my_server.my_slack.load(std::memory_order_acquire)<0 ) {
my_thread_monitor.commit_wait();
} else {
// Invariant broken
my_thread_monitor.cancel_wait();
if( my_server.my_slack.load(std::memory_order_acquire)<0 ) {
my_thread_monitor.wait();
}
}
}
Expand Down
36 changes: 14 additions & 22 deletions src/tbb/private_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ void private_worker::release_handle(thread_handle handle, bool join) {
}

void private_worker::start_shutdown() {
__TBB_ASSERT(my_state.load(std::memory_order_relaxed) != st_quit, "The thread has alredy been requested to quit");

// memory_order_acquire to acquire my_handle
state_t prev_state = my_state.exchange(st_quit, std::memory_order_acquire);

Expand Down Expand Up @@ -262,23 +264,14 @@ void private_worker::run() noexcept {
// complications in handle management on Windows.

::rml::job& j = *my_client.create_one_job();
while( my_state.load(std::memory_order_relaxed)!=st_quit ) {
// memory_order_seq_cst to be strictly ordered after thread_monitor::wait on the next iteration
while( my_state.load(std::memory_order_seq_cst)!=st_quit ) {
if( my_server.my_slack.load(std::memory_order_acquire)>=0 ) {
my_client.process(j);
} else {
// Prepare to wait
my_thread_monitor.prepare_wait();
// Check/set the invariant for sleeping
// We need memory_order_seq_cst to enforce ordering with prepare_wait
// (note that a store in prepare_wait should be with memory_order_seq_cst as well)
if( my_state.load(std::memory_order_seq_cst)!=st_quit && my_server.try_insert_in_asleep_list(*this) ) {
my_thread_monitor.commit_wait();
__TBB_ASSERT( my_state==st_quit || !my_next, "Thread monitor missed a spurious wakeup?" );
my_server.propagate_chain_reaction();
} else {
// Invariant broken
my_thread_monitor.cancel_wait();
}
} else if( my_server.try_insert_in_asleep_list(*this) ) {
my_thread_monitor.wait();
__TBB_ASSERT(my_state.load(std::memory_order_relaxed) == st_quit || !my_next, "Thread monitor missed a spurious wakeup?" );
my_server.propagate_chain_reaction();
}
}
my_client.cleanup(j);
Expand Down Expand Up @@ -386,17 +379,16 @@ void private_server::wake_some( int additional_slack ) {
}
done:

if (allotted_slack)
{
if (allotted_slack) {
asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);

while( my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+2 && allotted_slack) {
auto root = my_asleep_list_root.load(std::memory_order_relaxed);
while( root && w<wakee+2 && allotted_slack) {
--allotted_slack;
// Pop sleeping worker to combine with claimed unit of slack
auto old = my_asleep_list_root.load(std::memory_order_relaxed);
my_asleep_list_root.store(old->my_next, std::memory_order_relaxed);
*w++ = old;
*w++ = root;
root = root->my_next;
}
my_asleep_list_root.store(root, std::memory_order_relaxed);
if(allotted_slack) {
// Contribute our unused slack to my_slack.
my_slack += allotted_slack;
Expand Down
52 changes: 12 additions & 40 deletions src/tbb/rml_thread_monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,12 @@ class thread_monitor {
}
~thread_monitor() {}

//! If a thread is waiting or started a two-phase wait, notify it.
//! Notify waiting thread
/** Can be called by any thread. */
void notify();

//! Begin two-phase wait.
/** Should only be called by thread that owns the monitor.
The caller must either complete the wait or cancel it. */
void prepare_wait();

//! Complete a two-phase wait and wait until notification occurs after the earlier prepare_wait.
void commit_wait();

//! Cancel a two-phase wait.
void cancel_wait();
//! Wait for notification
void wait();

#if __TBB_USE_WINAPI
typedef HANDLE handle_type;
Expand All @@ -123,8 +115,8 @@ class thread_monitor {
//! Detach thread
static void detach_thread(handle_type handle);
private:
std::atomic<bool> in_wait{false};
bool skipped_wakeup{false};
// The protection from double notification of the binary semaphore
std::atomic<bool> my_notified{ false };
binary_semaphore my_sema;
#if __TBB_USE_POSIX
static void check( int error_code, const char* routine );
Expand Down Expand Up @@ -215,37 +207,17 @@ void thread_monitor::detach_thread(handle_type handle) {
#endif /* __TBB_USE_POSIX */

inline void thread_monitor::notify() {
bool do_signal = in_wait.exchange( false, std::memory_order_release);
if( do_signal ) {
// Check that the semaphore is not notified twice
if (!my_notified.exchange(true, std::memory_order_release)) {
my_sema.V();
}
}

inline void thread_monitor::prepare_wait() {
if( skipped_wakeup ) {
// Lazily consume a signal that was skipped due to cancel_wait
skipped_wakeup = false;
my_sema.P(); // does not really wait on the semaphore
}
// std::memory_order_seq_cst is required because prepare_wait
// should be ordered before further operations (that are suppose to
// use memory_order_seq_cst where required)
in_wait.store( true, std::memory_order_seq_cst );
}

inline void thread_monitor::commit_wait() {
bool do_it = in_wait.load(std::memory_order_acquire);
if( do_it ) {
my_sema.P();
} else {
skipped_wakeup = true;
}
}

inline void thread_monitor::cancel_wait() {
// if not in_wait, then some thread has sent us a signal;
// it will be consumed by the next prepare_wait call
skipped_wakeup = ! in_wait.exchange( false, std::memory_order_relaxed);
inline void thread_monitor::wait() {
my_sema.P();
// memory_order_seq_cst is required here to be ordered with
// futher loads checking shutdown state
my_notified.store(false, std::memory_order_seq_cst);
}

} // namespace internal
Expand Down

0 comments on commit c0b2ea2

Please sign in to comment.