Skip to content

Commit

Permalink
Merge branch 'main' into to-globe-rectangle
Browse files Browse the repository at this point in the history
  • Loading branch information
j9liu authored Apr 9, 2024
2 parents b90d028 + fff19f3 commit 377c95f
Show file tree
Hide file tree
Showing 18 changed files with 586 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- Added `Uri::getPath` and `Uri::setPath`.
- Added `TileTransform::setTransform`.
- Added `BoundingRegionBuilder::toGlobeRectangle`.
- Added `waitInMainThread` method to `Future` and `SharedFuture`.

##### Fixes :wrench:

Expand Down
21 changes: 21 additions & 0 deletions CesiumAsync/include/CesiumAsync/Future.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,32 @@ template <typename T> class Future final {
* deadlock because the main thread tasks will never complete while this
* method is blocking the main thread.
*
* To wait in the main thread, use {@link waitInMainThread} instead.
*
* @return The value if the future resolves successfully.
* @throws An exception if the future rejected.
*/
T wait() { return this->_task.get(); }

/**
* @brief Waits for this future to resolve or reject in the main thread while
* also processing main-thread tasks.
*
* This method must be called from the main thread.
*
* The function does not return until {@link Future::isReady} returns true.
* In the meantime, main-thread tasks are processed as necessary. This method
* does not spin wait; it suspends the calling thread by waiting on a
* condition variable when there is no work to do.
*
* @return The value if the future resolves successfully.
* @throws An exception if the future rejected.
*/
T waitInMainThread() {
return this->_pSchedulers->mainThread.dispatchUntilTaskCompletes(
std::move(this->_task));
}

/**
* @brief Determines if this future is already resolved or rejected.
*
Expand Down
71 changes: 70 additions & 1 deletion CesiumAsync/include/CesiumAsync/Impl/QueuedScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,88 @@
#include "ImmediateScheduler.h"
#include "cesium-async++.h"

#include <atomic>

namespace CesiumAsync {
namespace CesiumImpl {

class QueuedScheduler {
public:
QueuedScheduler();
~QueuedScheduler();

void schedule(async::task_run_handle t);
void dispatchQueuedContinuations();
bool dispatchZeroOrOneContinuation();

template <typename T> T dispatchUntilTaskCompletes(async::task<T>&& task) {
// Set up a continuation to unblock the blocking dispatch when this task
// completes.
//
// We use the `isDone` flag as the loop termination condition to
// avoid a race condition that can lead to a deadlock. If we used
// `unblockTask.ready()` as the termination condition instead, then it's
// possible for events to happen as follows:
//
// 1. The original `task` completes in a worker thread and the `unblockTask`
// continuation is invoked immediately in the same thread.
// 2. The unblockTask continuation calls `unblock`, which terminates the
// `wait` on the condition variable in the main thread.
// 3. The main thread resumes and the while loop in this function spins back
// around and evaluates `unblockTask.ready()`. This returns false because
// the unblockTask continuation has not actually finished running in the
// worker thread yet. The main thread starts waiting on the condition
// variable again.
// 4. The `unblockTask` continuation finally finishes, making
// `unblockTask.ready()` return true, but it's too late. The main thread is
// already waiting on the condition variable.
//
// By setting the atomic `isDone` flag before calling `unblock`, we ensure
// that the loop termination condition is satisfied before the main thread
// is awoken, avoiding the potential deadlock.

std::atomic<bool> isDone = false;
async::task<T> unblockTask =
task.then(async::inline_scheduler(), [this, &isDone](auto&& value) {
isDone = true;
this->unblock();
return std::move(value);
});

while (!isDone) {
this->dispatchInternal(true);
}

return std::move(unblockTask).get();
}

template <typename T>
T dispatchUntilTaskCompletes(const async::shared_task<T>& task) {
// Set up a continuation to unblock the blocking dispatch when this task
// completes. This case is simpler than the one above because a SharedFuture
// supports multiple continuations. We can use readiness of the _original_
// task to terminate the loop while unblocking in a separate continuation
// guaranteed to run only after that termination condition is satisfied.
async::task<void> unblockTask =
task.then(async::inline_scheduler(), [this](const auto&) {
this->unblock();
});

while (!task.ready()) {
this->dispatchInternal(true);
}

return task.get();
}

ImmediateScheduler<QueuedScheduler> immediate{this};

private:
async::fifo_scheduler _scheduler;
bool dispatchInternal(bool blockIfNoTasks);
void unblock();

struct Impl;
std::unique_ptr<Impl> _pImpl;
};

} // namespace CesiumImpl
Expand Down
21 changes: 21 additions & 0 deletions CesiumAsync/include/CesiumAsync/SharedFuture.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ template <typename T> class SharedFuture final {
* deadlock because the main thread tasks will never complete while this
* method is blocking the main thread.
*
* To wait in the main thread, use {@link waitInMainThread} instead.
*
* @return The value if the future resolves successfully.
* @throws An exception if the future rejected.
*/
Expand All @@ -236,6 +238,25 @@ template <typename T> class SharedFuture final {
this->_task.get();
}

/**
* @brief Waits for this future to resolve or reject in the main thread while
* also processing main-thread tasks.
*
* This method must be called from the main thread.
*
* The function does not return until {@link Future::isReady} returns true.
* In the meantime, main-thread tasks are processed as necessary. This method
* does not spin wait; it suspends the calling thread by waiting on a
* condition variable when there is no work to do.
*
* @return The value if the future resolves successfully.
* @throws An exception if the future rejected.
*/
T waitInMainThread() {
return this->_pSchedulers->mainThread.dispatchUntilTaskCompletes(
std::move(this->_task));
}

/**
* @brief Determines if this future is already resolved or rejected.
*
Expand Down
2 changes: 0 additions & 2 deletions CesiumAsync/src/AsyncSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

#include "CesiumAsync/ITaskProcessor.h"

#include <future>

namespace CesiumAsync {
AsyncSystem::AsyncSystem(
const std::shared_ptr<ITaskProcessor>& pTaskProcessor) noexcept
Expand Down
61 changes: 55 additions & 6 deletions CesiumAsync/src/QueuedScheduler.cpp
Original file line number Diff line number Diff line change
@@ -1,17 +1,66 @@
#include "CesiumAsync/Impl/QueuedScheduler.h"

using namespace CesiumAsync::CesiumImpl;
#include <condition_variable>
#include <mutex>

// Hackily use Async++'s internal fifo_queue. We could copy it instead - it's
// not much code - but why create the duplication? However, we are assuming that
// Async++'s source (not just headers) are available while building
// cesium-native. If that's a problem in some context, we'll need to do that
// duplication of fifo_queue after all.
#include <async++/../../src/fifo_queue.h>

namespace CesiumAsync::CesiumImpl {

struct QueuedScheduler::Impl {
async::detail::fifo_queue queue;
std::mutex mutex;
std::condition_variable conditionVariable;
};

QueuedScheduler::QueuedScheduler() : _pImpl(std::make_unique<Impl>()) {}
QueuedScheduler::~QueuedScheduler() = default;

void QueuedScheduler::schedule(async::task_run_handle t) {
this->_scheduler.schedule(std::move(t));
std::unique_lock<std::mutex> guard(this->_pImpl->mutex);
this->_pImpl->queue.push(std::move(t));

// Notify listeners that there is new work.
this->_pImpl->conditionVariable.notify_all();
}

void QueuedScheduler::dispatchQueuedContinuations() {
auto scope = this->immediate.scope();
this->_scheduler.run_all_tasks();
while (this->dispatchZeroOrOneContinuation()) {
}
}

bool QueuedScheduler::dispatchZeroOrOneContinuation() {
auto scope = this->immediate.scope();
return this->_scheduler.try_run_one_task();
return this->dispatchInternal(false);
}

bool QueuedScheduler::dispatchInternal(bool blockIfNoTasks) {
async::task_run_handle t;

{
std::unique_lock<std::mutex> guard(this->_pImpl->mutex);
t = this->_pImpl->queue.pop();
if (blockIfNoTasks && !t) {
this->_pImpl->conditionVariable.wait(guard);
}
}

if (t) {
auto scope = this->immediate.scope();
t.run();
return true;
} else {
return false;
}
}

void QueuedScheduler::unblock() {
std::unique_lock<std::mutex> guard(this->_pImpl->mutex);
this->_pImpl->conditionVariable.notify_all();
}

} // namespace CesiumAsync::CesiumImpl
92 changes: 92 additions & 0 deletions CesiumAsync/test/TestAsyncSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -572,4 +572,96 @@ TEST_CASE("AsyncSystem") {

CHECK(checksCompleted);
}

SECTION("waitInMainThread") {
SECTION("Future returning a value") {
bool called = false;
Future<int> future =
asyncSystem.createResolvedFuture().thenInMainThread([&called]() {
called = true;
return 4;
});
int value = std::move(future).waitInMainThread();
CHECK(called);
CHECK(value == 4);
}

SECTION("Future returning void") {
bool called = false;
Future<void> future = asyncSystem.createResolvedFuture().thenInMainThread(
[&called]() { called = true; });
std::move(future).waitInMainThread();
CHECK(called);
}

SECTION("SharedFuture returning a value") {
bool called = false;
SharedFuture<int> future = asyncSystem.createResolvedFuture()
.thenInMainThread([&called]() {
called = true;
return 4;
})
.share();
int value = future.waitInMainThread();
CHECK(called);
CHECK(value == 4);
}

SECTION("SharedFuture returning void") {
bool called = false;
SharedFuture<void> future =
asyncSystem.createResolvedFuture()
.thenInMainThread([&called]() { called = true; })
.share();
future.waitInMainThread();
CHECK(called);
}

SECTION("Future resolving while main thread is waiting") {
bool called1 = false;
bool called2 = false;
Future<void> future =
asyncSystem.createResolvedFuture()
.thenInWorkerThread([&called1]() {
using namespace std::chrono_literals;
// should be long enough for the main thread to start waiting on
// the conditional, without slowing the test down too much.
std::this_thread::sleep_for(20ms);
called1 = true;
})
.thenInMainThread([&called2]() { called2 = true; });
future.waitInMainThread();
CHECK(called1);
CHECK(called2);
}

SECTION("Future resolving from a worker while main thread is waiting") {
bool called1 = false;
bool called2 = false;
bool called3 = false;
Future<void> future =
asyncSystem.createResolvedFuture()
.thenInWorkerThread([&called1]() {
using namespace std::chrono_literals;
// should be long enough for the main thread to start waiting on
// the conditional, without slowing the test down too much.
std::this_thread::sleep_for(20ms);
called1 = true;
})
.thenInMainThread([&called2]() { called2 = true; })
.thenInWorkerThread([&called3]() {
using namespace std::chrono_literals;
// Sufficient time for the main thread to drop back into waiting
// on the conditional again after it was awakened by the
// scheduling of the main thread continuation above. It should
// awaken again when this continuation completes.
std::this_thread::sleep_for(20ms);
called3 = true;
});
future.waitInMainThread();
CHECK(called1);
CHECK(called2);
CHECK(called3);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// This file was generated by generate-classes.
// DO NOT EDIT THIS FILE!
#pragma once

#include "CesiumGltf/Library.h"

#include <CesiumUtility/ExtensibleObject.h>

#include <cstdint>

namespace CesiumGltf {
/**
* @brief glTF extension for indicating that some edges of a primitive's
* triangles should be outlined.
*/
struct CESIUMGLTF_API ExtensionCesiumPrimitiveOutline final
: public CesiumUtility::ExtensibleObject {
static inline constexpr const char* TypeName =
"ExtensionCesiumPrimitiveOutline";
static inline constexpr const char* ExtensionName =
"CESIUM_primitive_outline";

/**
* @brief The index of the accessor providing the list of highlighted lines at
* the edge of this primitive's triangles.
*/
int32_t indices = -1;
};
} // namespace CesiumGltf
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

namespace CesiumGltf {
/**
* @brief KHR_draco_mesh_compression extension
* @brief KHR_draco_mesh_compression glTF Mesh Primitive Extension
*/
struct CESIUMGLTF_API ExtensionKhrDracoMeshCompression final
: public CesiumUtility::ExtensibleObject {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

namespace CesiumGltf {
/**
* @brief KHR_materials_variants mesh primitive extension
* @brief KHR_materials_variants glTF Mesh Primitive Extension
*/
struct CESIUMGLTF_API ExtensionMeshPrimitiveKhrMaterialsVariants final
: public CesiumUtility::ExtensibleObject {
Expand Down
Loading

0 comments on commit 377c95f

Please sign in to comment.