Commit c09e2f0

Fix race condition.

kring committed Apr 5, 2024
1 parent cd232f6
Showing 2 changed files with 35 additions and 7 deletions.
CesiumAsync/include/CesiumAsync/Impl/QueuedScheduler.h (33 additions & 5 deletions)
@@ -3,6 +3,8 @@
 #include "ImmediateScheduler.h"
 #include "cesium-async++.h"
 
+#include <atomic>
+
 namespace CesiumAsync {
 namespace CesiumImpl {

@@ -18,13 +20,38 @@ class QueuedScheduler {
   template <typename T> T dispatchUntilTaskCompletes(async::task<T>&& task) {
     // Set up a continuation to unblock the blocking dispatch when this task
     // completes.
+    //
+    // We use the `isDone` flag as the loop termination condition to
+    // avoid a race condition that can lead to a deadlock. If we used
+    // `unblockTask.ready()` as the termination condition instead, then it's
+    // possible for events to happen as follows:
+    //
+    // 1. The original `task` completes in a worker thread, and the
+    //    `unblockTask` continuation is invoked immediately in the same thread.
+    // 2. The `unblockTask` continuation calls `unblock`, which terminates the
+    //    `wait` on the condition variable in the main thread.
+    // 3. The main thread resumes, and the while loop in this function spins
+    //    back around and evaluates `unblockTask.ready()`. This returns false
+    //    because the `unblockTask` continuation has not actually finished
+    //    running in the worker thread yet. The main thread starts waiting on
+    //    the condition variable again.
+    // 4. The `unblockTask` continuation finally finishes, making
+    //    `unblockTask.ready()` return true, but it's too late. The main
+    //    thread is already waiting on the condition variable.
+    //
+    // By setting the atomic `isDone` flag before calling `unblock`, we ensure
+    // that the loop termination condition is satisfied before the main thread
+    // is awoken, avoiding the potential deadlock.
+    std::atomic<bool> isDone = false;
     async::task<T> unblockTask =
-        task.then(async::inline_scheduler(), [this](auto&& value) {
+        task.then(async::inline_scheduler(), [this, &isDone](auto&& value) {
+          isDone = true;
           this->unblock();
           return std::move(value);
         });
 
-    while (!unblockTask.ready()) {
+    while (!isDone) {
       this->dispatchInternal(true);
     }

@@ -34,9 +61,10 @@
   template <typename T>
   T dispatchUntilTaskCompletes(const async::shared_task<T>& task) {
     // Set up a continuation to unblock the blocking dispatch when this task
-    // completes. Unlike the non-shared future case above, we don't need to pass
-    // the future value through because shared_task supports multiple
-    // continuations.
+    // completes. This case is simpler than the one above because a
+    // SharedFuture supports multiple continuations. We can use the readiness
+    // of the _original_ task to terminate the loop, while unblocking in a
+    // separate continuation guaranteed to run only after that condition holds.
     async::task<void> unblockTask =
         task.then(async::inline_scheduler(), [this](const auto&) {
           this->unblock();
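The fix above is an instance of a general condition-variable idiom, so a minimal, self-contained C++ sketch of it may help. The `WorkQueue` class, `dispatchUntilDone`, and the `main` driver below are hypothetical stand-ins for QueuedScheduler's internals, not cesium-native code; the real dispatch loop runs queued tasks instead of only sleeping on the condition variable.

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>

class WorkQueue {
public:
  // Main-thread loop: block on the condition variable until `isDone` is true.
  // Checking the flag while holding the mutex means a notification cannot
  // slip into the gap between the check and the wait.
  void dispatchUntilDone(const std::atomic<bool>& isDone) {
    std::unique_lock<std::mutex> guard(this->mutex);
    while (!isDone) {
      this->conditionVariable.wait(guard);
    }
  }

  // Worker-thread wake-up: notify while holding the mutex, so the waiter is
  // either still watching the flag or already asleep and thus notified.
  void unblock() {
    std::lock_guard<std::mutex> guard(this->mutex);
    this->conditionVariable.notify_all();
  }

private:
  std::mutex mutex;
  std::condition_variable conditionVariable;
};

int main() {
  WorkQueue queue;
  std::atomic<bool> isDone = false;

  std::thread worker([&]() {
    // ... produce the task's result here ...
    isDone = true;   // satisfy the termination condition first...
    queue.unblock(); // ...then wake the dispatching thread
  });

  queue.dispatchUntilDone(isDone);
  worker.join();
  return 0;
}

The key ordering is in the worker: set the flag that terminates the loop first, then notify under the mutex. The main thread then either observes the flag before it sleeps or is guaranteed to receive the wake-up, which is exactly the property the `unblockTask.ready()` condition lacked.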
CesiumAsync/src/QueuedScheduler.cpp (2 additions & 2 deletions)
@@ -22,7 +22,7 @@ QueuedScheduler::QueuedScheduler() : _pImpl(std::make_unique<Impl>()) {}
 QueuedScheduler::~QueuedScheduler() = default;
 
 void QueuedScheduler::schedule(async::task_run_handle t) {
-  std::lock_guard<std::mutex> guard(this->_pImpl->mutex);
+  std::unique_lock<std::mutex> guard(this->_pImpl->mutex);
   this->_pImpl->queue.push(std::move(t));
 
   // Notify listeners that there is new work.
@@ -59,7 +59,7 @@ bool QueuedScheduler::dispatchInternal(bool blockIfNoTasks) {
 }
 
 void QueuedScheduler::unblock() {
-  std::lock_guard<std::mutex> guard(this->_pImpl->mutex);
+  std::unique_lock<std::mutex> guard(this->_pImpl->mutex);
   this->_pImpl->conditionVariable.notify_all();
 }
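A note on the std::lock_guard to std::unique_lock swap in this file: by itself it does not change behavior, since both types release the mutex when the guard leaves scope. std::unique_lock is simply the more flexible type; for example, it would permit the following hypothetical variant of unblock (an illustration of the type's capability, not what this commit does), which unlocks before notifying so awakened waiters do not immediately contend for the mutex.

#include <condition_variable>
#include <mutex>

// Hypothetical alternative, for illustration only: std::unique_lock can be
// released early via unlock(), a member that std::lock_guard does not have.
void unblockEarlyRelease(std::mutex& mutex, std::condition_variable& cv) {
  std::unique_lock<std::mutex> guard(mutex);
  // ... update any state that waiters re-check under the mutex ...
  guard.unlock();  // release the mutex before notifying
  cv.notify_all(); // waiters wake without immediately blocking on the mutex
}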

