Remove sleeping state. Add mutex synchronization
Signed-off-by: pavelkumbrasev <[email protected]>
pavelkumbrasev committed Jan 26, 2022
1 parent 5731e54 commit a22919b
Showing 1 changed file with 41 additions and 37 deletions.
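
In outline, the change removes the dedicated st_sleeping worker state: the decision to park a thread is now made inline in private_worker::run() via the monitor's prepare_wait/commit_wait/cancel_wait protocol, shutdown is split into start_shutdown() and finish_shutdown(), and request_close_connection() holds my_asleep_list_mutex while every worker is moved to st_quit, so no worker can add itself to the sleep list while the shutdown request is being published.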
78 changes: 41 additions & 37 deletions src/tbb/private_server.cpp
@@ -53,8 +53,6 @@ class private_worker: no_copy {
st_starting,
//! Associated thread is doing normal life sequence.
st_normal,
//! Associated thread is going to sleep on monitor.
st_sleeping,
//! Associated thread has ended normal life sequence and promises to never touch *this again.
st_quit
};
@@ -85,14 +83,14 @@ class private_worker: no_copy {
//! Actions executed by the associated thread
void run() noexcept;

//! Send to sleep thread on associated monitor
void send_to_sleep();

//! Wake up associated thread (or launch a thread if there is none)
void wake_or_launch();

//! Called by a thread (usually not the associated thread) to commence termination.
void start_shutdown();
state_t start_shutdown();

//! Called by a thread (usually not the associated thread) to finish termination.
void finish_shutdown(private_worker::state_t prev_state);

static __RML_DECL_THREAD_ROUTINE thread_routine( void* arg );

@@ -189,9 +187,22 @@ class private_server: public tbb_server, no_copy {
}

void request_close_connection( bool /*exiting*/ ) override {
for( std::size_t i=0; i<my_n_thread; ++i )
my_thread_array[i].start_shutdown();
private_worker::state_t* workers_state = tbb::cache_aligned_allocator<private_worker::state_t>().allocate(my_n_thread);

{
// Lock ability to insert in a sleep list
asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
for (std::size_t i = 0; i < my_n_thread; ++i) {
workers_state[i] = my_thread_array[i].start_shutdown();
}
}

for (std::size_t i = 0; i < my_n_thread; ++i) {
my_thread_array[i].finish_shutdown(workers_state[i]);
}
remove_server_ref();

tbb::cache_aligned_allocator<private_worker::state_t>().deallocate(workers_state, my_n_thread);
}

void yield() override { d0::yield(); }
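
The rewritten request_close_connection() above is a two-phase shutdown: while my_asleep_list_mutex is held, every worker is switched to st_quit and the state it held just before is recorded; the lock is then released and the recorded states drive the wake-ups and handle releases. Below is a minimal, self-contained sketch of this snapshot-under-lock, act-outside-the-lock pattern; the Worker type and close_connection_sketch() are hypothetical stand-ins for the TBB classes, not the actual implementation.

#include <atomic>
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical worker type, for illustration only.
struct Worker {
    enum State { st_init, st_starting, st_normal, st_quit };
    std::atomic<State> state{st_normal};

    // Atomically advance to st_quit and report the state observed just before.
    State start_shutdown() { return state.exchange(st_quit); }

    // In the real code this wakes the thread and/or releases its handle.
    void finish_shutdown(State /*prev*/) {}
};

void close_connection_sketch(std::vector<Worker>& workers, std::mutex& asleep_list_mutex) {
    std::vector<Worker::State> prev(workers.size());
    {
        // Phase 1: with the sleep list locked, no worker can park itself, so
        // every worker observes st_quit before it could go to sleep.
        std::lock_guard<std::mutex> lock(asleep_list_mutex);
        for (std::size_t i = 0; i < workers.size(); ++i)
            prev[i] = workers[i].start_shutdown();
    }
    // Phase 2: wake-ups and joins happen outside the lock, so a waking worker
    // never contends with the shutdown path for the sleep-list mutex.
    for (std::size_t i = 0; i < workers.size(); ++i)
        workers[i].finish_shutdown(prev[i]);
}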
@@ -237,7 +248,7 @@ void private_worker::release_handle(thread_handle handle, bool join) {
thread_monitor::detach_thread(handle);
}

void private_worker::start_shutdown() {
private_worker::state_t private_worker::start_shutdown() {
// The state can be transferred only in one direction: st_init -> st_starting -> st_normal.
// So we do not need more than three CAS attempts.
state_t expected_state = my_state.load(std::memory_order_relaxed);
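
The comment above matters because the compare-exchange loop that follows (continued in the next hunk) pushes the state to st_quit: since a worker's state only ever advances along st_init -> st_starting -> st_normal, each failed attempt observes a strictly later state and the loop needs at most three iterations. Below is a self-contained sketch of that bounded loop; start_shutdown_sketch() is a hypothetical free function standing in for the member function, not the TBB sources.

#include <atomic>

enum state_t { st_init, st_starting, st_normal, st_quit };

// Returns the state the worker held immediately before it was moved to st_quit
// (or st_quit if another thread already shut it down).
state_t start_shutdown_sketch(std::atomic<state_t>& my_state) {
    state_t expected = my_state.load(std::memory_order_relaxed);
    // A failed compare_exchange reloads `expected` with the freshly observed
    // state; because the state only moves forward, at most three iterations
    // can happen before the exchange succeeds.
    while (expected != st_quit &&
           !my_state.compare_exchange_strong(expected, st_quit)) {
    }
    return expected;
}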
@@ -251,45 +262,28 @@ void private_worker::start_shutdown() {
}
}

if (expected_state == st_normal || expected_state == st_sleeping || expected_state == st_starting) {
return expected_state;
}


void private_worker::finish_shutdown(private_worker::state_t prev_state) {
if (prev_state == st_normal || prev_state == st_starting) {
// May have invalidated invariant for sleeping, so wake up the thread.
// Note that the notify() here occurs without maintaining invariants for my_slack.
// It does not matter, because my_state==st_quit overrides checking of my_slack.
if (expected_state == st_sleeping) {
my_thread_monitor.notify();
}
my_thread_monitor.notify();
// Do not need release handle in st_init state,
// because in this case the thread wasn't started yet.
// For st_starting release is done at launch site.
if (expected_state == st_normal || expected_state == st_sleeping) {
if (prev_state == private_worker::st_normal) {
release_handle(my_handle, governor::does_client_join_workers(my_client));
}
} else if( expected_state==st_init ) {
} else if (prev_state == st_init) {
// Perform action that otherwise would be performed by associated thread when it quits.
my_server.remove_server_ref();
}
}

void private_worker::send_to_sleep() {

thread_monitor::cookie c;
my_thread_monitor.prepare_wait(c);

state_t expected = st_normal;
if (my_state.compare_exchange_strong(expected, st_sleeping) && my_server.try_insert_in_asleep_list(*this)) {
my_thread_monitor.commit_wait(c);
__TBB_ASSERT(my_state.load(std::memory_order_relaxed) == st_quit || !my_next, "Thread monitor missed a spurious wakeup?");
my_server.propagate_chain_reaction();
} else {
my_thread_monitor.cancel_wait();
}

expected = my_state.load(std::memory_order_acquire);
if (expected == st_sleeping) {
my_state.compare_exchange_strong(expected, st_normal);
}
}

void private_worker::run() noexcept {
my_server.propagate_chain_reaction();

@@ -302,8 +296,18 @@ void private_worker::run() noexcept {
if( my_server.my_slack.load(std::memory_order_acquire)>=0 ) {
my_client.process(j);
} else {
send_to_sleep();
__TBB_ASSERT(my_state.load(std::memory_order_relaxed) != st_sleeping, nullptr);
thread_monitor::cookie c;
// Prepare to wait
my_thread_monitor.prepare_wait(c);
// Check/set the invariant for sleeping
if( my_state.load(std::memory_order_acquire)!=st_quit && my_server.try_insert_in_asleep_list(*this) ) {
my_thread_monitor.commit_wait(c);
__TBB_ASSERT( my_state==st_quit || !my_next, "Thread monitor missed a spurious wakeup?" );
my_server.propagate_chain_reaction();
} else {
// Invariant broken
my_thread_monitor.cancel_wait();
}
}
}
my_client.cleanup(j);
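
The sleep path that used to live in send_to_sleep() is now inlined above: prepare_wait() snapshots the monitor, the worker re-checks that it is not quitting and can insert itself into the asleep list, and only then does commit_wait() block; otherwise cancel_wait() abandons the wait, so a notify() that races with the check is never lost. Below is a minimal sketch of such a two-phase monitor built on std::condition_variable; the two_phase_monitor class and its epoch counter are illustrative only, not the actual thread_monitor implementation.

#include <condition_variable>
#include <cstdint>
#include <mutex>

// Illustrative two-phase monitor: prepare_wait() snapshots an epoch, the
// caller re-checks its sleep condition, and commit_wait() blocks only if no
// notify() has arrived since the snapshot; cancel_wait() abandons the wait.
class two_phase_monitor {
    std::mutex mtx;
    std::condition_variable cv;
    std::uint64_t epoch = 0;
public:
    using cookie = std::uint64_t;

    void prepare_wait(cookie& c) {
        std::lock_guard<std::mutex> lock(mtx);
        c = epoch;  // remember the epoch before re-checking the sleep condition
    }
    void commit_wait(const cookie& c) {
        std::unique_lock<std::mutex> lock(mtx);
        // A notify() issued after prepare_wait() bumped the epoch, so the wait
        // returns immediately instead of missing the wake-up.
        cv.wait(lock, [&] { return epoch != c; });
    }
    void cancel_wait(const cookie&) {}  // condition changed before sleeping; nothing to undo here
    void notify() {
        {
            std::lock_guard<std::mutex> lock(mtx);
            ++epoch;
        }
        cv.notify_one();
    }
};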
