Skip to content

Commit

Permalink
Merge pull request #825 from CesiumGS/wait-in-main-thread
Browse files Browse the repository at this point in the history
Add AsyncSystem::waitInMainThread
  • Loading branch information
csciguy8 authored Apr 8, 2024
2 parents 623a0cd + afb6670 commit fff19f3
Show file tree
Hide file tree
Showing 7 changed files with 260 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- Added `Uri::getPath` and `Uri::setPath`.
- Added `TileTransform::setTransform`.
- Added `waitInMainThread` method to `Future` and `SharedFuture`.

##### Fixes :wrench:

Expand Down
21 changes: 21 additions & 0 deletions CesiumAsync/include/CesiumAsync/Future.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,32 @@ template <typename T> class Future final {
* deadlock because the main thread tasks will never complete while this
* method is blocking the main thread.
*
* To wait in the main thread, use {@link waitInMainThread} instead.
*
* @return The value if the future resolves successfully.
* @throws An exception if the future rejected.
*/
T wait() { return this->_task.get(); }

/**
* @brief Waits for this future to resolve or reject in the main thread while
* also processing main-thread tasks.
*
* This method must be called from the main thread.
*
* The function does not return until {@link Future::isReady} returns true.
* In the meantime, main-thread tasks are processed as necessary. This method
* does not spin wait; it suspends the calling thread by waiting on a
* condition variable when there is no work to do.
*
* @return The value if the future resolves successfully.
* @throws An exception if the future rejected.
*/
T waitInMainThread() {
return this->_pSchedulers->mainThread.dispatchUntilTaskCompletes(
std::move(this->_task));
}

/**
* @brief Determines if this future is already resolved or rejected.
*
Expand Down
71 changes: 70 additions & 1 deletion CesiumAsync/include/CesiumAsync/Impl/QueuedScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,88 @@
#include "ImmediateScheduler.h"
#include "cesium-async++.h"

#include <atomic>

namespace CesiumAsync {
namespace CesiumImpl {

class QueuedScheduler {
public:
QueuedScheduler();
~QueuedScheduler();

void schedule(async::task_run_handle t);
void dispatchQueuedContinuations();
bool dispatchZeroOrOneContinuation();

template <typename T> T dispatchUntilTaskCompletes(async::task<T>&& task) {
// Set up a continuation to unblock the blocking dispatch when this task
// completes.
//
// We use the `isDone` flag as the loop termination condition to
// avoid a race condition that can lead to a deadlock. If we used
// `unblockTask.ready()` as the termination condition instead, then it's
// possible for events to happen as follows:
//
// 1. The original `task` completes in a worker thread and the `unblockTask`
// continuation is invoked immediately in the same thread.
// 2. The unblockTask continuation calls `unblock`, which terminates the
// `wait` on the condition variable in the main thread.
// 3. The main thread resumes and the while loop in this function spins back
// around and evaluates `unblockTask.ready()`. This returns false because
// the unblockTask continuation has not actually finished running in the
// worker thread yet. The main thread starts waiting on the condition
// variable again.
// 4. The `unblockTask` continuation finally finishes, making
// `unblockTask.ready()` return true, but it's too late. The main thread is
// already waiting on the condition variable.
//
// By setting the atomic `isDone` flag before calling `unblock`, we ensure
// that the loop termination condition is satisfied before the main thread
// is awoken, avoiding the potential deadlock.

std::atomic<bool> isDone = false;
async::task<T> unblockTask =
task.then(async::inline_scheduler(), [this, &isDone](auto&& value) {
isDone = true;
this->unblock();
return std::move(value);
});

while (!isDone) {
this->dispatchInternal(true);
}

return std::move(unblockTask).get();
}

template <typename T>
T dispatchUntilTaskCompletes(const async::shared_task<T>& task) {
// Set up a continuation to unblock the blocking dispatch when this task
// completes. This case is simpler than the one above because a SharedFuture
// supports multiple continuations. We can use readiness of the _original_
// task to terminate the loop while unblocking in a separate continuation
// guaranteed to run only after that termination condition is satisfied.
async::task<void> unblockTask =
task.then(async::inline_scheduler(), [this](const auto&) {
this->unblock();
});

while (!task.ready()) {
this->dispatchInternal(true);
}

return task.get();
}

ImmediateScheduler<QueuedScheduler> immediate{this};

private:
async::fifo_scheduler _scheduler;
bool dispatchInternal(bool blockIfNoTasks);
void unblock();

struct Impl;
std::unique_ptr<Impl> _pImpl;
};

} // namespace CesiumImpl
Expand Down
21 changes: 21 additions & 0 deletions CesiumAsync/include/CesiumAsync/SharedFuture.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ template <typename T> class SharedFuture final {
* deadlock because the main thread tasks will never complete while this
* method is blocking the main thread.
*
* To wait in the main thread, use {@link waitInMainThread} instead.
*
* @return The value if the future resolves successfully.
* @throws An exception if the future rejected.
*/
Expand All @@ -236,6 +238,25 @@ template <typename T> class SharedFuture final {
this->_task.get();
}

/**
* @brief Waits for this future to resolve or reject in the main thread while
* also processing main-thread tasks.
*
* This method must be called from the main thread.
*
* The function does not return until {@link Future::isReady} returns true.
* In the meantime, main-thread tasks are processed as necessary. This method
* does not spin wait; it suspends the calling thread by waiting on a
* condition variable when there is no work to do.
*
* @return The value if the future resolves successfully.
* @throws An exception if the future rejected.
*/
T waitInMainThread() {
return this->_pSchedulers->mainThread.dispatchUntilTaskCompletes(
std::move(this->_task));
}

/**
* @brief Determines if this future is already resolved or rejected.
*
Expand Down
2 changes: 0 additions & 2 deletions CesiumAsync/src/AsyncSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

#include "CesiumAsync/ITaskProcessor.h"

#include <future>

namespace CesiumAsync {
AsyncSystem::AsyncSystem(
const std::shared_ptr<ITaskProcessor>& pTaskProcessor) noexcept
Expand Down
61 changes: 55 additions & 6 deletions CesiumAsync/src/QueuedScheduler.cpp
Original file line number Diff line number Diff line change
@@ -1,17 +1,66 @@
#include "CesiumAsync/Impl/QueuedScheduler.h"

using namespace CesiumAsync::CesiumImpl;
#include <condition_variable>
#include <mutex>

// Hackily use Async++'s internal fifo_queue. We could copy it instead - it's
// not much code - but why create the duplication? However, we are assuming that
// Async++'s source (not just headers) are available while building
// cesium-native. If that's a problem in some context, we'll need to do that
// duplication of fifo_queue after all.
#include <async++/../../src/fifo_queue.h>

namespace CesiumAsync::CesiumImpl {

struct QueuedScheduler::Impl {
async::detail::fifo_queue queue;
std::mutex mutex;
std::condition_variable conditionVariable;
};

QueuedScheduler::QueuedScheduler() : _pImpl(std::make_unique<Impl>()) {}
QueuedScheduler::~QueuedScheduler() = default;

void QueuedScheduler::schedule(async::task_run_handle t) {
this->_scheduler.schedule(std::move(t));
std::unique_lock<std::mutex> guard(this->_pImpl->mutex);
this->_pImpl->queue.push(std::move(t));

// Notify listeners that there is new work.
this->_pImpl->conditionVariable.notify_all();
}

void QueuedScheduler::dispatchQueuedContinuations() {
auto scope = this->immediate.scope();
this->_scheduler.run_all_tasks();
while (this->dispatchZeroOrOneContinuation()) {
}
}

bool QueuedScheduler::dispatchZeroOrOneContinuation() {
auto scope = this->immediate.scope();
return this->_scheduler.try_run_one_task();
return this->dispatchInternal(false);
}

bool QueuedScheduler::dispatchInternal(bool blockIfNoTasks) {
async::task_run_handle t;

{
std::unique_lock<std::mutex> guard(this->_pImpl->mutex);
t = this->_pImpl->queue.pop();
if (blockIfNoTasks && !t) {
this->_pImpl->conditionVariable.wait(guard);
}
}

if (t) {
auto scope = this->immediate.scope();
t.run();
return true;
} else {
return false;
}
}

void QueuedScheduler::unblock() {
std::unique_lock<std::mutex> guard(this->_pImpl->mutex);
this->_pImpl->conditionVariable.notify_all();
}

} // namespace CesiumAsync::CesiumImpl
92 changes: 92 additions & 0 deletions CesiumAsync/test/TestAsyncSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -572,4 +572,96 @@ TEST_CASE("AsyncSystem") {

CHECK(checksCompleted);
}

SECTION("waitInMainThread") {
SECTION("Future returning a value") {
bool called = false;
Future<int> future =
asyncSystem.createResolvedFuture().thenInMainThread([&called]() {
called = true;
return 4;
});
int value = std::move(future).waitInMainThread();
CHECK(called);
CHECK(value == 4);
}

SECTION("Future returning void") {
bool called = false;
Future<void> future = asyncSystem.createResolvedFuture().thenInMainThread(
[&called]() { called = true; });
std::move(future).waitInMainThread();
CHECK(called);
}

SECTION("SharedFuture returning a value") {
bool called = false;
SharedFuture<int> future = asyncSystem.createResolvedFuture()
.thenInMainThread([&called]() {
called = true;
return 4;
})
.share();
int value = future.waitInMainThread();
CHECK(called);
CHECK(value == 4);
}

SECTION("SharedFuture returning void") {
bool called = false;
SharedFuture<void> future =
asyncSystem.createResolvedFuture()
.thenInMainThread([&called]() { called = true; })
.share();
future.waitInMainThread();
CHECK(called);
}

SECTION("Future resolving while main thread is waiting") {
bool called1 = false;
bool called2 = false;
Future<void> future =
asyncSystem.createResolvedFuture()
.thenInWorkerThread([&called1]() {
using namespace std::chrono_literals;
// should be long enough for the main thread to start waiting on
// the conditional, without slowing the test down too much.
std::this_thread::sleep_for(20ms);
called1 = true;
})
.thenInMainThread([&called2]() { called2 = true; });
future.waitInMainThread();
CHECK(called1);
CHECK(called2);
}

SECTION("Future resolving from a worker while main thread is waiting") {
bool called1 = false;
bool called2 = false;
bool called3 = false;
Future<void> future =
asyncSystem.createResolvedFuture()
.thenInWorkerThread([&called1]() {
using namespace std::chrono_literals;
// should be long enough for the main thread to start waiting on
// the conditional, without slowing the test down too much.
std::this_thread::sleep_for(20ms);
called1 = true;
})
.thenInMainThread([&called2]() { called2 = true; })
.thenInWorkerThread([&called3]() {
using namespace std::chrono_literals;
// Sufficient time for the main thread to drop back into waiting
// on the conditional again after it was awakened by the
// scheduling of the main thread continuation above. It should
// awaken again when this continuation completes.
std::this_thread::sleep_for(20ms);
called3 = true;
});
future.waitInMainThread();
CHECK(called1);
CHECK(called2);
CHECK(called3);
}
}
}

0 comments on commit fff19f3

Please sign in to comment.