diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b8774a9c..a7eccc31 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -42,8 +42,8 @@ jobs: cxx: clang++ tsan: NO - - name: Windows (Visual Studio Enterprise 2019) - os: windows-latest + - name: Windows (Visual Studio Enterprise 2022) + os: windows-2022 cc: cl cxx: cl tsan: NO diff --git a/CMakeLists.txt b/CMakeLists.txt index 29cf5f30..22dc9c68 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,7 +1,7 @@ cmake_minimum_required(VERSION 3.16) project(concurrencpp - VERSION 0.1.4 + VERSION 0.1.5 LANGUAGES CXX) include(cmake/coroutineOptions.cmake) @@ -29,6 +29,7 @@ set(concurrencpp_sources source/results/impl/shared_result_state.cpp source/results/promises.cpp source/runtime/runtime.cpp + source/threads/async_lock.cpp source/threads/binary_semaphore.cpp source/threads/thread.cpp source/timers/timer.cpp @@ -71,6 +72,7 @@ set(concurrencpp_headers include/concurrencpp/results/generator.h include/concurrencpp/runtime/constants.h include/concurrencpp/runtime/runtime.h + include/concurrencpp/threads/async_lock.h include/concurrencpp/threads/binary_semaphore.h include/concurrencpp/threads/thread.h include/concurrencpp/threads/cache_line.h @@ -91,6 +93,9 @@ target_compile_features(concurrencpp PUBLIC cxx_std_20) target_coroutine_options(concurrencpp) +find_package(Threads REQUIRED) +target_link_libraries(concurrencpp PUBLIC Threads::Threads) + find_library(LIBRT NAMES rt DOC "Path to the Real Time shared library") target_link_libraries(concurrencpp PUBLIC "$<$:${LIBRT}>") diff --git a/README.md b/README.md index 91d166ec..15cfd482 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,3 @@ - - # concurrencpp, the C++ concurrency library ![Latest Release](https://img.shields.io/github/v/release/David-Haim/concurrencpp.svg) [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) @@ -57,6 +55,9 @@ concurrencpp main advantages are: * [Delay object example](#delay-object-example) * [Generators](#generators) * [`generator` API](#generator-api) +* [Asynchronous locks](#asynchronous-locks) + * [`async_lock` API](#async_lock-api) + * [`scoped_async_lock` API](#scoped_async_lock-api) * [The runtime object](#the-runtime-object) * [`runtime` API](#runtime-api) * [Creating user-defined executors](#creating-user-defined-executors) @@ -766,14 +767,16 @@ concurrencpp also provides parallel coroutines, which start to run inside a give Every parallel coroutine must meet the following preconditions: 1. Returns any of `result` / `null_result` . -1. Gets `executor_tag` as its first argument . -1. Gets any of `type*` / `type&` / `std::shared_ptr`, where `type` is a concrete class of `executor` as its second argument. -1. Contains any of `co_await` or `co_return` in its body. +2. Gets `executor_tag` as its first argument . +3. Gets any of `type*` / `type&` / `std::shared_ptr`, where `type` is a concrete class of `executor` as its second argument. +4. Contains any of `co_await` or `co_return` in its body. +5. Is not a member function or a lambda function If all the above applies, the function is a parallel coroutine: concurrencpp will start the coroutine suspended and immediately reschedule it to run in the provided executor. `concurrencpp::executor_tag` is a dummy placeholder to tell the concurrencpp runtime that this function is not a regular function, it needs to start running inside the given executor. 
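For illustration, a minimal parallel coroutine might look as follows (a sketch only; the function name and returned value are placeholders, not part of the library):

```cpp
#include "concurrencpp/concurrencpp.h"

// Satisfies the preconditions above: returns result<int>, takes executor_tag as its first
// argument, a concrete executor type as its second, and contains co_return in its body.
concurrencpp::result<int> answer(concurrencpp::executor_tag,
                                 std::shared_ptr<concurrencpp::thread_pool_executor> executor) {
    co_return 42;  // by this point the coroutine is already running inside the given executor
}

int main() {
    concurrencpp::runtime runtime;
    auto result = answer({}, runtime.thread_pool_executor());
    return result.get() == 42 ? 0 : 1;  // result::get blocks until the coroutine finishes
}
```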
-Applications can then consume the result of the parallel coroutine by using the returned result object.
+If the executor passed to the parallel coroutine is null, the coroutine will not start to run and an `std::invalid_argument` exception will be thrown synchronously.
+If all preconditions are met, applications can consume the result of the parallel coroutine by using the returned result object.
 #### *Parallel Fibonacci example:*
 ```cpp
@@ -1650,6 +1653,210 @@ class generator_iterator {
 };
+### Asynchronous locks
+Regular synchronous locks cannot be used safely inside coroutines, for a number of reasons:
+
+ - Synchronous locks, such as `std::mutex`, are expected to be locked and unlocked in the same thread of execution. Unlocking a synchronous lock in a thread which had not locked it is undefined behavior. Since tasks can be suspended and resumed in any thread of execution, synchronous locks will break when used inside tasks.
+ - Synchronous locks were created to work with *threads* and not with *coroutines*. If a synchronous lock is already locked by one thread, then when another thread tries to lock it, the entire thread of execution is blocked and is only unblocked when the lock is released. This mechanism works well for traditional multi-threading paradigms but not for coroutines: with coroutines, we want *tasks* to be *suspended and resumed* without blocking or interfering with the execution of the underlying threads and executors.
+
+ `concurrencpp::async_lock` solves those issues by providing an API similar to `std::mutex`, with the main difference that calls to `concurrencpp::async_lock` return a lazy-result that can be `co_awaited` safely inside tasks. If a task tries to lock an async-lock and fails, the task is suspended, and is resumed when the lock is unlocked and acquired by the suspended task. This allows executors to process a huge number of tasks waiting to acquire a lock without expensive context switching and expensive kernel calls.
+
+Similar to how `std::mutex` works, only one task can acquire `async_lock` at any given time, and a *read barrier* is placed at the moment of acquiring. Releasing an async lock places a *write barrier* and allows the next task to acquire it, creating a chain of modifiers, one at a time, each of which sees the changes made by the previous modifiers and publishes its own modifications for the next modifiers to see.
+
+Like `std::mutex`, `concurrencpp::async_lock` ***is not recursive***. Extra attention must be given when acquiring such a lock: a lock must not be acquired again in a task that has been spawned by another task which has already acquired the lock. In such a case, an unavoidable deadlock will occur. Unlike other objects in concurrencpp, `async_lock` is neither copyable nor movable.
+
+Like standard locks, `concurrencpp::async_lock` is meant to be used with scoped wrappers which leverage the C++ RAII idiom to ensure locks are always unlocked upon function return or thrown exception. `async_lock::lock` returns a lazy-result of a scoped wrapper that calls `async_lock::unlock` on destruction. Raw use of `async_lock::unlock` is discouraged. `concurrencpp::scoped_async_lock` acts as the scoped wrapper and provides an API which is almost identical to `std::unique_lock`. `concurrencpp::scoped_async_lock` is movable, but not copyable.
+
+`async_lock::lock` and `scoped_async_lock::lock` require a resume-executor as their parameter.
+Upon calling those methods, if the lock is available for locking, it is locked and the current task is resumed immediately. If not, the current task is suspended, and will be resumed inside the given resume-executor when the lock is finally acquired by it.
+
+`concurrencpp::scoped_async_lock` wraps an `async_lock` and ensures it is properly unlocked. Like `std::unique_lock`, there are cases in which it does not wrap any lock, and in that case it is considered empty. A `scoped_async_lock` can be empty when it is default-constructed, moved-from, or after its `release` method has been called. An empty scoped-async-lock will not unlock any lock on destruction.
+
+Even if a scoped-async-lock is not empty, it does not necessarily own the underlying async-lock and unlock it on destruction. Non-empty, non-owning scoped-async-locks can occur if `scoped_async_lock::unlock` was called or if the scoped-async-lock was constructed using the `scoped_async_lock(async_lock&, std::defer_lock_t)` constructor.
+
+#### `async_lock` *example:*
+
+```cpp
+#include "concurrencpp/concurrencpp.h"
+
+#include <algorithm>
+#include <iostream>
+#include <vector>
+
+std::vector<size_t> numbers;
+concurrencpp::async_lock lock;
+
+concurrencpp::result<void> add_numbers(concurrencpp::executor_tag,
+                                       std::shared_ptr<concurrencpp::thread_pool_executor> executor,
+                                       size_t begin,
+                                       size_t end) {
+    for (auto i = begin; i < end; i++) {
+        concurrencpp::scoped_async_lock raii_wrapper = co_await lock.lock(executor);
+        numbers.push_back(i);
+    }
+}
+
+int main() {
+    concurrencpp::runtime runtime;
+    constexpr size_t range = 10'000'000;
+    constexpr size_t sections = 4;
+    concurrencpp::result<void> results[sections];
+
+    for (size_t i = 0; i < 4; i++) {
+        const auto range_start = i * range / sections;
+        const auto range_end = (i + 1) * range / sections;
+
+        results[i] = add_numbers({}, runtime.thread_pool_executor(), range_start, range_end);
+    }
+
+    for (auto& result : results) {
+        result.get();
+    }
+
+    std::cout << "vector size is " << numbers.size() << std::endl;
+
+    // make sure the vector state has not been corrupted by unprotected concurrent accesses
+    std::sort(numbers.begin(), numbers.end());
+    for (size_t i = 0; i < range; i++) {
+        if (numbers[i] != i) {
+            std::cerr << "vector state is corrupted." << std::endl;
+            return -1;
+        }
+    }
+
+    std::cout << "succeeded pushing range [0 - 10,000,000] concurrently to the vector!" << std::endl;
+    return 0;
+}
+```
+#### `async_lock` API
+```cpp
+class async_lock {
+    /*
+        Constructs an async lock object.
+    */
+    async_lock() noexcept;
+
+    /*
+        Destructs an async lock object.
+        *this is not automatically unlocked at the moment of destruction.
+    */
+    ~async_lock() noexcept;
+
+    /*
+        Asynchronously acquires the async lock.
+        If *this has already been locked by another non-parent task, the current task will be suspended
+        and will be resumed when *this is acquired, inside resume_executor.
+        If *this has not been locked by another task, then *this will be acquired and the current task will be resumed
+        immediately in the calling thread of execution.
+        If *this has already been locked by a parent task, then an unavoidable deadlock will occur.
+        Throws std::invalid_argument if resume_executor is null.
+        Throws std::system_error if one of the underlying synchronization primitives throws.
+    */
+    lazy_result<scoped_async_lock> lock(std::shared_ptr<executor> resume_executor);
+
+    /*
+        Tries to acquire *this in the calling thread of execution.
+        Returns true if *this is acquired, false otherwise.
+        In any case, the current task is resumed immediately in the calling thread of execution.
+        Throws std::system_error if one of the underlying synchronization primitives throws.
+    */
+    lazy_result<bool> try_lock();
+
+    /*
+        Releases *this and allows other tasks (including suspended tasks waiting for *this) to acquire it.
+        Throws std::system_error if *this is not locked at the moment of calling this method.
+        Throws std::system_error if one of the underlying synchronization primitives throws.
+    */
+    void unlock();
+};
+```
+#### `scoped_async_lock` API
+
+```cpp
+class scoped_async_lock {
+    /*
+        Constructs an async lock wrapper that does not wrap any async lock.
+    */
+    scoped_async_lock() noexcept = default;
+
+    /*
+        If *this owns a wrapped async_lock, this method unlocks it.
+    */
+    ~scoped_async_lock() noexcept;
+
+    /*
+        Moves rhs to *this.
+        After this call, rhs does not wrap any async lock.
+    */
+    scoped_async_lock(scoped_async_lock&& rhs) noexcept;
+
+    /*
+        Wraps an unlocked lock.
+        lock must not be in the acquired state when calling this constructor.
+    */
+    scoped_async_lock(async_lock& lock, std::defer_lock_t) noexcept;
+
+    /*
+        Wraps a locked lock.
+        lock must already be acquired when calling this constructor.
+    */
+    scoped_async_lock(async_lock& lock, std::adopt_lock_t) noexcept;
+
+    /*
+        Calls async_lock::lock on the wrapped lock, using resume_executor as a parameter.
+        Throws std::invalid_argument if resume_executor is null.
+        Throws std::system_error if *this does not wrap any lock.
+        Throws std::system_error if the wrapped lock is already locked.
+        Throws any exception that async_lock::lock throws.
+    */
+    lazy_result<void> lock(std::shared_ptr<executor> resume_executor);
+
+    /*
+        Calls async_lock::try_lock on the wrapped lock.
+        Throws std::system_error if *this does not wrap any lock.
+        Throws std::system_error if the wrapped lock is already locked.
+        Throws any exception that async_lock::try_lock throws.
+    */
+    lazy_result<bool> try_lock();
+
+    /*
+        Calls async_lock::unlock on the wrapped lock.
+        If *this does not wrap any lock, this method does nothing.
+        Throws std::system_error if *this wraps a lock and it is not locked.
+    */
+    void unlock();
+
+    /*
+        Checks whether *this wraps a locked async_lock or not.
+        Returns true if the wrapped lock is in its acquired state, false otherwise.
+    */
+    bool owns_lock() const noexcept;
+
+    /*
+        Equivalent to owns_lock.
+    */
+    explicit operator bool() const noexcept;
+
+    /*
+        Swaps the contents of *this and rhs.
+    */
+    void swap(scoped_async_lock& rhs) noexcept;
+
+    /*
+        Empties *this and returns a pointer to the previously wrapped lock.
+        After a call to this method, *this doesn't wrap any lock.
+        The previously wrapped lock is not released;
+        it must be released either by unlocking it manually through the returned pointer or by
+        capturing the pointer with another scoped_async_lock which will take ownership over it.
+    */
+    async_lock* release() noexcept;
+
+    /*
+        Returns a pointer to the wrapped async_lock, or a null pointer if there is no wrapped async_lock.
+    */
+    async_lock* mutex() const noexcept;
+};
+
+```
+
 ### The runtime object
 The concurrencpp runtime object is the agent used to acquire, store and create new executors.
@@ -1988,14 +2195,17 @@
 $ cd build/test
 $ ctest .
-V ``` -##### Via vcpkg on Windows and *nix platforms +##### Via package managers on Windows and *nix platforms -Alternatively to building and installing the library manually, developers may get stable releases of concurrencpp as [vcpkg](https://vcpkg.io/) packages: +Alternatively to building and installing the library manually, developers may get stable releases of concurrencpp via the [vcpkg](https://vcpkg.io/) and [Conan](https://conan.io/) package managers: +vcpkg: ```shell $ vcpkg install concurrencpp ``` +Conan: [concurrencpp on ConanCenter](https://conan.io/center/concurrencpp) + ##### Experimenting with the built-in sandbox concurrencpp comes with a built-in sandbox program which developers can modify and experiment, without having to install or link the compiled library to a different code-base. In order to play with the sandbox, developers can modify `sandbox/main.cpp` and compile the application using the following commands: @@ -2012,4 +2222,4 @@ $ cmake -S sandbox -B build/sandbox #for release mode: cmake -DCMAKE_BUILD_TYPE=Release -S sandbox -B build/sandbox $ cmake --build build/sandbox $ ./build/sandbox #runs the sandbox -``` +``` \ No newline at end of file diff --git a/cmake/concurrencppConfig.cmake b/cmake/concurrencppConfig.cmake index dfa304f7..742d18ea 100644 --- a/cmake/concurrencppConfig.cmake +++ b/cmake/concurrencppConfig.cmake @@ -1 +1,4 @@ +include(CMakeFindDependencyMacro) +find_dependency(Threads) + include("${CMAKE_CURRENT_LIST_DIR}/concurrencppTargets.cmake") diff --git a/cmake/coroutineOptions.cmake b/cmake/coroutineOptions.cmake index 14a60c36..e6aef1cc 100644 --- a/cmake/coroutineOptions.cmake +++ b/cmake/coroutineOptions.cmake @@ -4,13 +4,7 @@ function(target_coroutine_options TARGET) if(MSVC) target_compile_options(${TARGET} PUBLIC /std:c++latest /permissive-) - return() - endif() - - find_package(Threads REQUIRED) - target_link_libraries(${TARGET} PRIVATE Threads::Threads) - - if(CMAKE_CXX_COMPILER_ID MATCHES "Clang") + elseif(CMAKE_CXX_COMPILER_ID MATCHES "Clang") target_compile_options(${TARGET} PUBLIC -stdlib=libc++ -fcoroutines-ts) target_link_options(${TARGET} PUBLIC -stdlib=libc++) set_target_properties(${TARGET} PROPERTIES CXX_EXTENSIONS NO) diff --git a/cmake/setCiVars.cmake b/cmake/setCiVars.cmake index 6699772d..5c5d91b6 100644 --- a/cmake/setCiVars.cmake +++ b/cmake/setCiVars.cmake @@ -1,6 +1,6 @@ if (os MATCHES "^windows") execute_process( - COMMAND "C:/Program Files (x86)/Microsoft Visual Studio/2019/Enterprise/VC/Auxiliary/Build/vcvars64.bat" && set + COMMAND "C:/Program Files/Microsoft Visual Studio/2022/Enterprise/VC/Auxiliary/Build/vcvars64.bat" && set OUTPUT_FILE environment_script_output.txt ) file(STRINGS environment_script_output.txt output_lines) diff --git a/include/concurrencpp/concurrencpp.h b/include/concurrencpp/concurrencpp.h index 318490f1..02dd2fb6 100644 --- a/include/concurrencpp/concurrencpp.h +++ b/include/concurrencpp/concurrencpp.h @@ -17,5 +17,6 @@ #include "concurrencpp/results/resume_on.h" #include "concurrencpp/results/generator.h" #include "concurrencpp/executors/executor_all.h" +#include "concurrencpp/threads/async_lock.h" #endif diff --git a/include/concurrencpp/coroutines/coroutine.h b/include/concurrencpp/coroutines/coroutine.h index 91c2bf3b..b4befd83 100644 --- a/include/concurrencpp/coroutines/coroutine.h +++ b/include/concurrencpp/coroutines/coroutine.h @@ -3,28 +3,23 @@ #include "../platform_defs.h" -#ifdef CRCPP_MSVC_COMPILER +#if !__has_include() && __has_include() -# include +# include +# define 
CRCPP_COROUTINE_NAMESPACE std::experimental -namespace concurrencpp::details { - template - using coroutine_handle = std::coroutine_handle; - using suspend_never = std::suspend_never; - using suspend_always = std::suspend_always; -} // namespace concurrencpp::details +#else -#elif defined(CRCPP_CLANG_COMPILER) +# include +# define CRCPP_COROUTINE_NAMESPACE std -# include +#endif namespace concurrencpp::details { template - using coroutine_handle = std::experimental::coroutine_handle; - using suspend_never = std::experimental::suspend_never; - using suspend_always = std::experimental::suspend_always; + using coroutine_handle = CRCPP_COROUTINE_NAMESPACE::coroutine_handle; + using suspend_never = CRCPP_COROUTINE_NAMESPACE::suspend_never; + using suspend_always = CRCPP_COROUTINE_NAMESPACE::suspend_always; } // namespace concurrencpp::details -#endif - #endif \ No newline at end of file diff --git a/include/concurrencpp/forward_declarations.h b/include/concurrencpp/forward_declarations.h index 039d2ec8..cf9f0335 100644 --- a/include/concurrencpp/forward_declarations.h +++ b/include/concurrencpp/forward_declarations.h @@ -30,6 +30,8 @@ namespace concurrencpp { template class generator; + + class async_lock; } // namespace concurrencpp #endif // FORWARD_DECLARATIONS_H diff --git a/include/concurrencpp/results/constants.h b/include/concurrencpp/results/constants.h index e4b4e145..4a588d37 100644 --- a/include/concurrencpp/results/constants.h +++ b/include/concurrencpp/results/constants.h @@ -99,6 +99,12 @@ namespace concurrencpp::details::consts { * generator */ inline const char* k_empty_generator_begin_err_msg = "generator::begin - generator is empty."; + + /* + * parallel-coroutine + */ + inline const char* k_parallel_coroutine_null_exception_err_msg = "parallel-coroutine - given executor is null."; + } // namespace concurrencpp::details::consts #endif diff --git a/include/concurrencpp/results/impl/consumer_context.h b/include/concurrencpp/results/impl/consumer_context.h index 0a6f861f..b82b6cf2 100644 --- a/include/concurrencpp/results/impl/consumer_context.h +++ b/include/concurrencpp/results/impl/consumer_context.h @@ -8,28 +8,14 @@ #include namespace concurrencpp::details { - class await_context { - - private: - coroutine_handle m_caller_handle; - std::exception_ptr m_interrupt_exception; - - public: - void resume() noexcept; - - void set_coro_handle(coroutine_handle coro_handle) noexcept; - void set_interrupt(const std::exception_ptr& interrupt) noexcept; - - void throw_if_interrupted() const; - }; - class await_via_functor { private: - await_context* m_ctx; + coroutine_handle m_caller_handle; + bool* m_interrupted; public: - await_via_functor(await_context* ctx) noexcept; + await_via_functor(coroutine_handle caller_handle, bool* interrupted) noexcept; await_via_functor(await_via_functor&& rhs) noexcept; ~await_via_functor() noexcept; @@ -51,27 +37,33 @@ namespace concurrencpp::details { }; class when_any_context { + private: - std::atomic_bool m_fulfilled = false; - result_state_base* m_completed_result = nullptr; + std::atomic m_status; coroutine_handle m_coro_handle; + static const result_state_base* k_processing; + static const result_state_base* k_done_processing; + public: when_any_context(coroutine_handle coro_handle) noexcept; - bool fulfilled() const noexcept; - result_state_base* completed_result() const noexcept; - void try_resume(result_state_base* completed_result) noexcept; + bool any_result_finished() const noexcept; + bool finish_processing() noexcept; + const 
result_state_base* completed_result() const noexcept; + + void try_resume(result_state_base& completed_result) noexcept; + bool resume_inline(result_state_base& completed_result) noexcept; }; class consumer_context { private: - enum class consumer_status { idle, await, wait, when_any }; + enum class consumer_status { idle, await, wait_for, when_any }; union storage { coroutine_handle caller_handle; - std::shared_ptr wait_ctx; + std::shared_ptr wait_for_ctx; std::shared_ptr when_any_ctx; template @@ -92,14 +84,16 @@ namespace concurrencpp::details { consumer_status m_status = consumer_status::idle; storage m_storage; + void destroy() noexcept; + public: ~consumer_context() noexcept; void clear() noexcept; - void resume_consumer(result_state_base* self) const; + void resume_consumer(result_state_base& self) const; void set_await_handle(coroutine_handle caller_handle) noexcept; - void set_wait_context(const std::shared_ptr& wait_ctx) noexcept; + void set_wait_for_context(const std::shared_ptr& wait_ctx) noexcept; void set_when_any_context(const std::shared_ptr& when_any_ctx) noexcept; }; } // namespace concurrencpp::details diff --git a/include/concurrencpp/results/impl/result_state.h b/include/concurrencpp/results/impl/result_state.h index 8aa9043e..3b9f5a3b 100644 --- a/include/concurrencpp/results/impl/result_state.h +++ b/include/concurrencpp/results/impl/result_state.h @@ -13,7 +13,7 @@ namespace concurrencpp::details { class result_state_base { public: - enum class pc_state { idle, consumer_set, consumer_done, producer_done }; + enum class pc_state { idle, consumer_set, consumer_waiting, consumer_done, producer_done }; protected: std::atomic m_pc_state {pc_state::idle}; @@ -36,7 +36,8 @@ namespace concurrencpp::details { private: producer_context m_producer; - static void delete_self(coroutine_handle done_handle, const result_state* state) noexcept { + static void delete_self(result_state* state) noexcept { + auto done_handle = state->m_done_handle; if (static_cast(done_handle)) { assert(done_handle.done()); return done_handle.destroy(); @@ -46,13 +47,13 @@ namespace concurrencpp::details { } template - void from_callable(std::true_type /* is_void_type*/, callable_type&& callable) { + void from_callable(std::true_type /*is_void_type*/, callable_type&& callable) { callable(); set_result(); } template - void from_callable(std::false_type /* is_void_type */, callable_type&& callable) { + void from_callable(std::false_type /*is_void_type*/, callable_type&& callable) { set_result(callable()); } @@ -87,11 +88,13 @@ namespace concurrencpp::details { } const auto wait_ctx = std::make_shared(); - m_consumer.set_wait_context(wait_ctx); + m_consumer.set_wait_for_context(wait_ctx); auto expected_idle_state = pc_state::idle; - const auto idle_0 = - m_pc_state.compare_exchange_strong(expected_idle_state, pc_state::consumer_set, std::memory_order_acq_rel); + const auto idle_0 = m_pc_state.compare_exchange_strong(expected_idle_state, + pc_state::consumer_set, + std::memory_order_acq_rel, + std::memory_order_acquire); if (!idle_0) { assert_done(); @@ -114,7 +117,10 @@ namespace concurrencpp::details { producer will not try to access the consumer if the flag doesn't say so. 
*/ auto expected_consumer_state = pc_state::consumer_set; - const auto idle_1 = m_pc_state.compare_exchange_strong(expected_consumer_state, pc_state::idle, std::memory_order_acq_rel); + const auto idle_1 = m_pc_state.compare_exchange_strong(expected_consumer_state, + pc_state::idle, + std::memory_order_acq_rel, + std::memory_order_acquire); if (!idle_1) { assert_done(); @@ -141,10 +147,6 @@ namespace concurrencpp::details { return m_producer.get(); } - void initialize_producer_from(producer_context& producer_ctx) noexcept { - producer_ctx = std::move(m_producer); - } - template void from_callable(callable_type&& callable) { using is_void = std::is_same; @@ -156,7 +158,7 @@ namespace concurrencpp::details { } } - void complete_producer(result_state_base* self /*for when_any*/, coroutine_handle done_handle = {}) { + void complete_producer(coroutine_handle done_handle = {}) { m_done_handle = done_handle; const auto state_before = this->m_pc_state.exchange(pc_state::producer_done, std::memory_order_acq_rel); @@ -164,16 +166,19 @@ namespace concurrencpp::details { switch (state_before) { case pc_state::consumer_set: { - m_consumer.resume_consumer(self); - return; + return m_consumer.resume_consumer(*this); } case pc_state::idle: { return; } + case pc_state::consumer_waiting: { + return m_pc_state.notify_one(); + } + case pc_state::consumer_done: { - return delete_self(done_handle, this); + return delete_self(this); } default: { @@ -187,18 +192,23 @@ namespace concurrencpp::details { void complete_consumer() noexcept { const auto pc_state = this->m_pc_state.load(std::memory_order_acquire); if (pc_state == pc_state::producer_done) { - return delete_self(m_done_handle, this); + return delete_self(this); } const auto pc_state1 = this->m_pc_state.exchange(pc_state::consumer_done, std::memory_order_acq_rel); assert(pc_state1 != pc_state::consumer_set); if (pc_state1 == pc_state::producer_done) { - return delete_self(m_done_handle, this); + return delete_self(this); } assert(pc_state1 == pc_state::idle); } + + void complete_joined_consumer() noexcept { + assert_done(); + delete_self(this); + } }; template @@ -209,17 +219,28 @@ namespace concurrencpp::details { } }; + template + struct joined_consumer_result_state_deleter { + void operator()(result_state* state_ptr) const noexcept { + assert(state_ptr != nullptr); + state_ptr->complete_joined_consumer(); + } + }; + template struct producer_result_state_deleter { void operator()(result_state* state_ptr) const { assert(state_ptr != nullptr); - state_ptr->complete_producer(state_ptr); + state_ptr->complete_producer(); } }; template using consumer_result_state_ptr = std::unique_ptr, consumer_result_state_deleter>; + template + using joined_consumer_result_state_ptr = std::unique_ptr, joined_consumer_result_state_deleter>; + template using producer_result_state_ptr = std::unique_ptr, producer_result_state_deleter>; } // namespace concurrencpp::details diff --git a/include/concurrencpp/results/impl/shared_result_state.h b/include/concurrencpp/results/impl/shared_result_state.h index 6e87661a..0a134d87 100644 --- a/include/concurrencpp/results/impl/shared_result_state.h +++ b/include/concurrencpp/results/impl/shared_result_state.h @@ -154,7 +154,7 @@ namespace concurrencpp::details { struct shared_result_tag {}; } // namespace concurrencpp::details -namespace std::experimental { +namespace CRCPP_COROUTINE_NAMESPACE { // No executor + No result template struct coroutine_traits<::concurrencpp::shared_result, @@ -162,6 +162,6 @@ namespace std::experimental { 
concurrencpp::result> { using promise_type = concurrencpp::details::shared_result_promise; }; -} // namespace std::experimental +} // namespace CRCPP_COROUTINE_NAMESPACE #endif diff --git a/include/concurrencpp/results/promises.h b/include/concurrencpp/results/promises.h index 5e27197c..d21850e9 100644 --- a/include/concurrencpp/results/promises.h +++ b/include/concurrencpp/results/promises.h @@ -9,64 +9,43 @@ #include +#include "constants.h" +#include "concurrencpp/errors.h" + namespace concurrencpp::details { struct coroutine_per_thread_data { - executor* executor = nullptr; std::vector* accumulator = nullptr; static thread_local coroutine_per_thread_data s_tl_per_thread_data; }; - template - class initial_scheduling_awaiter : public suspend_always { - - private: - await_context m_await_context; - - public: - void await_suspend(coroutine_handle handle) { - m_await_context.set_coro_handle(handle); - - auto& per_thread_data = coroutine_per_thread_data::s_tl_per_thread_data; - auto executor_base_ptr = std::exchange(per_thread_data.executor, nullptr); - - assert(executor_base_ptr != nullptr); - assert(dynamic_cast(executor_base_ptr) != nullptr); - - auto& executor = *static_cast(executor_base_ptr); - executor.template post(&m_await_context); - } - - void await_resume() const { - m_await_context.throw_if_interrupted(); - } - }; - - template<> - struct initial_scheduling_awaiter : public suspend_never {}; - class initial_accumulating_awaiter : public suspend_always { - private: - await_context m_await_context; + bool m_interrupted = false; public: void await_suspend(coroutine_handle handle) noexcept; - void await_resume(); + void await_resume() const; }; template - struct initialy_rescheduled_promise { + class initialy_rescheduled_promise { + + protected: + static thread_local executor_type* s_tl_initial_executor; static_assert( std::is_base_of_v, "concurrencpp::initialy_rescheduled_promise<> - <> isn't driven from concurrencpp::executor."); + public: template initialy_rescheduled_promise(executor_tag, executor_type* executor_ptr, argument_types&&...) { - assert(executor_ptr != nullptr); - assert(coroutine_per_thread_data::s_tl_per_thread_data.executor == nullptr); - coroutine_per_thread_data::s_tl_per_thread_data.executor = executor_ptr; + if (executor_ptr == nullptr) { + throw std::invalid_argument(consts::k_parallel_coroutine_null_exception_err_msg); + } + + s_tl_initial_executor = executor_ptr; } template @@ -77,11 +56,36 @@ namespace concurrencpp::details { initialy_rescheduled_promise(executor_tag, executor_type& executor, argument_types&&... args) : initialy_rescheduled_promise(executor_tag {}, std::addressof(executor), std::forward(args)...) {} - initial_scheduling_awaiter initial_suspend() const noexcept { + template + initialy_rescheduled_promise(class_type&&, executor_tag, std::shared_ptr executor, argument_types&&... args) : + initialy_rescheduled_promise(executor_tag {}, executor.get(), std::forward(args)...) 
{} + + class initial_scheduling_awaiter : public suspend_always { + + private: + bool m_interrupted = false; + + public: + void await_suspend(coroutine_handle handle) { + auto executor = std::exchange(s_tl_initial_executor, nullptr); + executor->post(await_via_functor {handle, &m_interrupted}); + } + + void await_resume() const { + if (m_interrupted) { + throw errors::broken_task(consts::k_broken_task_exception_error_msg); + } + } + }; + + initial_scheduling_awaiter initial_suspend() const noexcept { return {}; } }; + template + thread_local executor_type* initialy_rescheduled_promise::s_tl_initial_executor = nullptr; + struct initialy_resumed_promise { suspend_never initial_suspend() const noexcept { return {}; @@ -141,7 +145,7 @@ namespace concurrencpp::details { } void complete_producer(coroutine_handle done_handle) noexcept { - this->m_result_state.complete_producer(&m_result_state, done_handle); + this->m_result_state.complete_producer(done_handle); } result_publisher final_suspend() const noexcept { @@ -179,7 +183,7 @@ namespace concurrencpp::details { }; } // namespace concurrencpp::details -namespace std::experimental { +namespace CRCPP_COROUTINE_NAMESPACE { // No executor + No result template struct coroutine_traits<::concurrencpp::null_result, arguments...> { @@ -248,6 +252,6 @@ namespace std::experimental { using promise_type = concurrencpp::details::lazy_promise; }; -} // namespace std::experimental +} // namespace CRCPP_COROUTINE_NAMESPACE #endif diff --git a/include/concurrencpp/results/result.h b/include/concurrencpp/results/result.h index 75d384a0..e9320299 100644 --- a/include/concurrencpp/results/result.h +++ b/include/concurrencpp/results/result.h @@ -75,8 +75,9 @@ namespace concurrencpp { type get() { throw_if_empty(details::consts::k_result_get_error_msg); - auto state = std::move(m_state); - state->wait(); + m_state->wait(); + + details::joined_consumer_result_state_ptr state(m_state.release()); return state->get(); } diff --git a/include/concurrencpp/results/result_awaitable.h b/include/concurrencpp/results/result_awaitable.h index f955028c..0d5b21a0 100644 --- a/include/concurrencpp/results/result_awaitable.h +++ b/include/concurrencpp/results/result_awaitable.h @@ -31,7 +31,7 @@ namespace concurrencpp { } type await_resume() { - auto state = std::move(this->m_state); + details::joined_consumer_result_state_ptr state(this->m_state.release()); return state->get(); } }; diff --git a/include/concurrencpp/results/resume_on.h b/include/concurrencpp/results/resume_on.h index 2a517278..6bae6c2f 100644 --- a/include/concurrencpp/results/resume_on.h +++ b/include/concurrencpp/results/resume_on.h @@ -11,8 +11,8 @@ namespace concurrencpp::details { class resume_on_awaitable : public suspend_always { private: - await_context m_await_ctx; executor_type& m_executor; + bool m_interrupted = false; public: resume_on_awaitable(executor_type& executor) noexcept : m_executor(executor) {} @@ -24,17 +24,17 @@ namespace concurrencpp::details { resume_on_awaitable& operator=(resume_on_awaitable&&) = delete; void await_suspend(coroutine_handle handle) { - m_await_ctx.set_coro_handle(handle); - try { - m_executor.template post(&m_await_ctx); + m_executor.post(await_via_functor {handle, &m_interrupted}); } catch (...) { // the exception caused the enqeueud task to be broken and resumed with an interrupt, no need to do anything here. 
} } void await_resume() const { - m_await_ctx.throw_if_interrupted(); + if (m_interrupted) { + throw errors::broken_task(consts::k_broken_task_exception_error_msg); + } } }; } // namespace concurrencpp::details diff --git a/include/concurrencpp/results/when_result.h b/include/concurrencpp/results/when_result.h index a2344069..a2ff0515 100644 --- a/include/concurrencpp/results/when_result.h +++ b/include/concurrencpp/results/when_result.h @@ -5,8 +5,8 @@ #include "concurrencpp/results/resume_on.h" #include "concurrencpp/results/lazy_result.h" -#include #include +#include #include namespace concurrencpp::details { @@ -31,19 +31,21 @@ namespace concurrencpp::details { } template - static result_state_base* get_state_base(result& result) noexcept { - return result.m_state.get(); + static result_state_base& get_state_base(result& result) noexcept { + assert(static_cast(result.m_state)); + return *result.m_state; } template - static result_state_base* at_impl(std::index_sequence, tuple_type& tuple, size_t n) noexcept { - result_state_base* bases[] = {get_state_base(std::get(tuple))...}; - return bases[n]; + static result_state_base& at_impl(std::index_sequence, tuple_type& tuple, size_t n) noexcept { + result_state_base* bases[] = {(&get_state_base(std::get(tuple)))...}; + assert(bases[n] != nullptr); + return *bases[n]; } public: template - static result_state_base* at(tuple_type& tuple, size_t n) noexcept { + static result_state_base& at(tuple_type& tuple, size_t n) noexcept { auto seq = std::make_index_sequence::value>(); return at_impl(seq, tuple, n); } @@ -87,7 +89,7 @@ namespace concurrencpp::details { result_types& m_results; template - static result_state_base* get_at(std::vector& vector, size_t i) noexcept { + static result_state_base& get_at(std::vector& vector, size_t i) noexcept { return get_state_base(vector[i]); } @@ -97,7 +99,7 @@ namespace concurrencpp::details { } template - static result_state_base* get_at(std::tuple& tuple, size_t i) noexcept { + static result_state_base& get_at(std::tuple& tuple, size_t i) noexcept { return at(tuple, i); } @@ -113,22 +115,23 @@ namespace concurrencpp::details { return false; } - void await_suspend(coroutine_handle coro_handle) { + bool await_suspend(coroutine_handle coro_handle) { m_promise = std::make_shared(coro_handle); const auto range_length = size(m_results); for (size_t i = 0; i < range_length; i++) { - if (m_promise->fulfilled()) { - return; + if (m_promise->any_result_finished()) { + return false; } - auto state_ptr = get_at(m_results, i); - const auto status = state_ptr->when_any(m_promise); + auto& state_ref = get_at(m_results, i); + const auto status = state_ref.when_any(m_promise); if (status == result_state_base::pc_state::producer_done) { - m_promise->try_resume(state_ptr); - return; + return m_promise->resume_inline(state_ref); } } + + return m_promise->finish_processing(); } size_t await_resume() noexcept { @@ -137,9 +140,9 @@ namespace concurrencpp::details { const auto range_length = size(m_results); for (size_t i = 0; i < range_length; i++) { - auto state_ptr = get_at(m_results, i); - state_ptr->try_rewind_consumer(); - if (completed_result_state == state_ptr) { + auto& state_ref = get_at(m_results, i); + state_ref.try_rewind_consumer(); + if (completed_result_state == &state_ref) { completed_result_index = i; } } @@ -177,8 +180,8 @@ namespace concurrencpp::details { template lazy_result when_all_impl(std::shared_ptr resume_executor, tuple_type tuple) { for (size_t i = 0; i < std::tuple_size_v; i++) { - auto state_ptr = 
when_result_helper::at(tuple, i); - co_await when_result_helper::when_all_awaitable {*state_ptr}; + auto& state_ref = when_result_helper::at(tuple, i); + co_await when_result_helper::when_all_awaitable {state_ref}; } co_await resume_on(resume_executor); diff --git a/include/concurrencpp/runtime/constants.h b/include/concurrencpp/runtime/constants.h index bcb5977e..b87269cf 100644 --- a/include/concurrencpp/runtime/constants.h +++ b/include/concurrencpp/runtime/constants.h @@ -12,7 +12,7 @@ namespace concurrencpp::details::consts { constexpr static unsigned int k_concurrencpp_version_major = 0; constexpr static unsigned int k_concurrencpp_version_minor = 1; - constexpr static unsigned int k_concurrencpp_version_revision = 4; + constexpr static unsigned int k_concurrencpp_version_revision = 5; } // namespace concurrencpp::details::consts #endif diff --git a/include/concurrencpp/threads/async_lock.h b/include/concurrencpp/threads/async_lock.h new file mode 100644 index 00000000..cb77d870 --- /dev/null +++ b/include/concurrencpp/threads/async_lock.h @@ -0,0 +1,71 @@ +#ifndef CONCURRENCPP_ASYNC_LOCK_H +#define CONCURRENCPP_ASYNC_LOCK_H + +#include "concurrencpp/platform_defs.h" +#include "concurrencpp/results/lazy_result.h" +#include "concurrencpp/forward_declarations.h" + +namespace concurrencpp::details { + class async_lock_awaiter; +} // namespace concurrencpp::details + +namespace concurrencpp { + class scoped_async_lock; + + class async_lock { + + friend class scoped_async_lock; + friend class details::async_lock_awaiter; + + private: + std::mutex m_awaiter_lock; + details::async_lock_awaiter* m_head = nullptr; + details::async_lock_awaiter* m_tail = nullptr; + bool m_locked = false; + +#ifdef CRCPP_DEBUG_MODE + std::atomic_intptr_t m_thread_count_in_critical_section {0}; +#endif + + void enqueue_awaiter(std::unique_lock& lock, details::async_lock_awaiter& awaiter_node) noexcept; + details::async_lock_awaiter* try_dequeue_awaiter(std::unique_lock& lock) noexcept; + + lazy_result lock_impl(std::shared_ptr resume_executor, bool with_raii_guard); + + public: + ~async_lock() noexcept; + + lazy_result lock(std::shared_ptr resume_executor); + lazy_result try_lock(); + void unlock(); + }; + + class scoped_async_lock { + + private: + async_lock* m_lock = nullptr; + bool m_owns = false; + + public: + scoped_async_lock() noexcept = default; + scoped_async_lock(scoped_async_lock&& rhs) noexcept; + + scoped_async_lock(async_lock& lock, std::defer_lock_t) noexcept; + scoped_async_lock(async_lock& lock, std::adopt_lock_t) noexcept; + + ~scoped_async_lock() noexcept; + + lazy_result lock(std::shared_ptr resume_executor); + lazy_result try_lock(); + void unlock(); + + bool owns_lock() const noexcept; + explicit operator bool() const noexcept; + + void swap(scoped_async_lock& rhs) noexcept; + async_lock* release() noexcept; + async_lock* mutex() const noexcept; + }; +} // namespace concurrencpp + +#endif \ No newline at end of file diff --git a/include/concurrencpp/threads/constants.h b/include/concurrencpp/threads/constants.h new file mode 100644 index 00000000..92063072 --- /dev/null +++ b/include/concurrencpp/threads/constants.h @@ -0,0 +1,25 @@ +#ifndef CONCURRENCPP_THREAD_CONSTS_H +#define CONCURRENCPP_THREAD_CONSTS_H + +namespace concurrencpp::details::consts { + inline const char* k_async_lock_null_resume_executor_err_msg = "async_lock::lock() - given resume executor is null."; + inline const char* k_async_lock_unlock_invalid_lock_err_msg = "async_lock::unlock() - trying to unlock an unowned lock."; 
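// Illustration only, not part of this patch: a sketch of how the scoped_async_lock deferred-locking
// constructor and owns_lock() are meant to be combined inside a task. The executor parameter, the
// lock and the counter below are hypothetical names chosen for the example.
#include "concurrencpp/concurrencpp.h"
#include <cassert>
#include <mutex>  // std::defer_lock

concurrencpp::async_lock s_lock;
int s_counter = 0;

concurrencpp::result<void> increment(std::shared_ptr<concurrencpp::executor> resume_executor) {
    concurrencpp::scoped_async_lock guard(s_lock, std::defer_lock);  // wraps s_lock but does not own it yet
    assert(!guard.owns_lock());

    co_await guard.lock(resume_executor);  // suspends if s_lock is contended, resumes inside resume_executor
    assert(guard.owns_lock());
    ++s_counter;                           // protected by s_lock
}                                          // guard unlocks s_lock on destruction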
+ + inline const char* k_scoped_async_lock_null_resume_executor_err_msg = "scoped_async_lock::lock() - given resume executor is null."; + + inline const char* k_scoped_async_lock_lock_deadlock_err_msg = "scoped_async_lock::lock() - *this is already locked."; + + inline const char* k_scoped_async_lock_lock_no_mutex_err_msg = + "scoped_async_lock::lock() - *this doesn't reference any async_lock."; + + inline const char* k_scoped_async_lock_try_lock_deadlock_err_msg = "scoped_async_lock::try_lock() - *this is already locked."; + + inline const char* k_scoped_async_lock_try_lock_no_mutex_err_msg = + "scoped_async_lock::try_lock() - *this doesn't reference any async_lock."; + + inline const char* k_scoped_async_lock_unlock_invalid_lock_err_msg = + "scoped_async_lock::unlock() - trying to unlock an unowned lock."; + +} // namespace concurrencpp::details::consts + +#endif \ No newline at end of file diff --git a/source/executors/thread_pool_executor.cpp b/source/executors/thread_pool_executor.cpp index 57cc2178..390b7e3b 100644 --- a/source/executors/thread_pool_executor.cpp +++ b/source/executors/thread_pool_executor.cpp @@ -33,7 +33,7 @@ void idle_worker_set::set_idle(size_t idle_thread) noexcept { return; } - m_approx_size.fetch_add(1, std::memory_order_release); + m_approx_size.fetch_add(1, std::memory_order_relaxed); } void idle_worker_set::set_active(size_t idle_thread) noexcept { @@ -42,7 +42,7 @@ void idle_worker_set::set_active(size_t idle_thread) noexcept { return; } - m_approx_size.fetch_sub(1, std::memory_order_release); + m_approx_size.fetch_sub(1, std::memory_order_relaxed); } bool idle_worker_set::try_acquire_flag(size_t index) noexcept { @@ -406,7 +406,7 @@ void thread_pool_worker::enqueue_local(std::span tasks) { } void thread_pool_worker::shutdown() { - assert(m_atomic_abort.load(std::memory_order_relaxed) == false); + assert(!m_atomic_abort.load(std::memory_order_relaxed)); m_atomic_abort.store(true, std::memory_order_relaxed); { @@ -414,7 +414,7 @@ void thread_pool_worker::shutdown() { m_abort = true; } - m_task_found_or_abort.store(true, std::memory_order_release); // make sure the store is finished before notifying the worker. + m_task_found_or_abort.store(true, std::memory_order_relaxed); // make sure the store is finished before notifying the worker. 
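    // NOTE (assumption, not stated in the patch): relaxed ordering is presumably sufficient here because
    // the semaphore release below synchronizes-with the worker's acquire, making this store visible to the
    // woken worker; the flag itself only serves as a wake-up hint.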
m_semaphore.release(); diff --git a/source/results/impl/consumer_context.cpp b/source/results/impl/consumer_context.cpp index a1c4dd87..cfbf7116 100644 --- a/source/results/impl/consumer_context.cpp +++ b/source/results/impl/consumer_context.cpp @@ -2,64 +2,39 @@ #include "concurrencpp/executors/executor.h" -using concurrencpp::details::await_context; using concurrencpp::details::wait_context; using concurrencpp::details::when_any_context; using concurrencpp::details::consumer_context; using concurrencpp::details::await_via_functor; - -/* - * await_context - */ - -void await_context::resume() noexcept { - assert(static_cast(m_caller_handle)); - assert(!m_caller_handle.done()); - m_caller_handle(); -} - -void await_context::set_coro_handle(coroutine_handle coro_handle) noexcept { - assert(!static_cast(m_caller_handle)); - assert(static_cast(coro_handle)); - assert(!coro_handle.done()); - m_caller_handle = coro_handle; -} - -void await_context::set_interrupt(const std::exception_ptr& interrupt) noexcept { - assert(m_interrupt_exception == nullptr); - assert(static_cast(interrupt)); - m_interrupt_exception = interrupt; -} - -void await_context::throw_if_interrupted() const { - if (m_interrupt_exception != nullptr) { - std::rethrow_exception(m_interrupt_exception); - } -} +using concurrencpp::details::result_state_base; /* * await_via_functor */ -await_via_functor::await_via_functor(await_context* ctx) noexcept : m_ctx(ctx) {} - -await_via_functor::await_via_functor(await_via_functor&& rhs) noexcept : m_ctx(rhs.m_ctx) { - rhs.m_ctx = nullptr; +await_via_functor::await_via_functor(coroutine_handle caller_handle, bool* interrupted) noexcept : + m_caller_handle(caller_handle), m_interrupted(interrupted) { + assert(static_cast(caller_handle)); + assert(!caller_handle.done()); + assert(interrupted != nullptr); } +await_via_functor::await_via_functor(await_via_functor&& rhs) noexcept : + m_caller_handle(std::exchange(rhs.m_caller_handle, {})), m_interrupted(std::exchange(rhs.m_interrupted, nullptr)) {} + await_via_functor ::~await_via_functor() noexcept { - if (m_ctx == nullptr) { + if (m_interrupted == nullptr) { return; } - m_ctx->set_interrupt(std::make_exception_ptr(errors::broken_task(consts::k_broken_task_exception_error_msg))); - m_ctx->resume(); + *m_interrupted = true; + m_caller_handle(); } void await_via_functor::operator()() noexcept { - assert(m_ctx != nullptr); - const auto await_context = std::exchange(m_ctx, nullptr); - await_context->resume(); + assert(m_interrupted != nullptr); + m_interrupted = nullptr; + m_caller_handle(); } /* @@ -93,30 +68,92 @@ void wait_context::notify() { * when_any_context */ -when_any_context::when_any_context(coroutine_handle coro_handle) noexcept : m_coro_handle(coro_handle) {} +/* + * k_processing -> k_done_processing -> (completed) result_state_base* + * | ^ + * | | + * ---------------------------------------------- + */ + +const result_state_base* when_any_context::k_processing = reinterpret_cast(-1); +const result_state_base* when_any_context::k_done_processing = nullptr; -void when_any_context::try_resume(result_state_base* completed_result) noexcept { - assert(completed_result != nullptr); +when_any_context::when_any_context(coroutine_handle coro_handle) noexcept : m_status(k_processing), m_coro_handle(coro_handle) { + assert(static_cast(coro_handle)); + assert(!coro_handle.done()); +} - const auto already_resumed = m_fulfilled.exchange(true, std::memory_order_acq_rel); - if (already_resumed) { - return; - } +bool 
when_any_context::any_result_finished() const noexcept { + const auto status = m_status.load(std::memory_order_acquire); + assert(status != k_done_processing); + return status != k_processing; +} - assert(m_completed_result == nullptr); - m_completed_result = completed_result; +bool when_any_context::finish_processing() noexcept { + assert(m_status.load(std::memory_order_relaxed) != k_done_processing); - assert(static_cast(m_coro_handle)); - m_coro_handle(); + // tries to turn k_processing -> k_done_processing. + auto expected_state = k_processing; + const auto res = m_status.compare_exchange_strong(expected_state, k_done_processing, std::memory_order_acq_rel); + return res; // if k_processing -> k_done_processing, then no result finished before the CAS, suspend. } -bool when_any_context::fulfilled() const noexcept { - return m_fulfilled.load(std::memory_order_acquire); +void when_any_context::try_resume(result_state_base& completed_result) noexcept { + /* + * tries to turn m_status into the completed_result ptr + * if m_status == k_processing, we just leave the pointer and bail out, the processor thread will pick + * the pointer up and resume from there + * if m_status == k_done_processing AND we were able to CAS it into the completed result + * then we were the first ones to complete, processing is done for all input-results + * and we resume the caller + */ + + while (true) { + auto status = m_status.load(std::memory_order_acquire); + if (status != k_processing && status != k_done_processing) { + return; // another task finished before us, bail out + } + + if (status == k_done_processing) { + const auto swapped = m_status.compare_exchange_strong(status, &completed_result, std::memory_order_acq_rel); + + if (!swapped) { + return; // another task finished before us, bail out + } + + // k_done_processing -> result_state_base ptr, we are the first to finish and CAS the status + m_coro_handle(); + return; + } + + assert(status == k_processing); + const auto res = m_status.compare_exchange_strong(status, &completed_result, std::memory_order_acq_rel); + + if (res) { // k_processing -> completed result_state_base* + return; + } + + // either another result raced us, either m_status is now k_done_processing, retry and act accordingly + } } -concurrencpp::details::result_state_base* when_any_context::completed_result() const noexcept { - assert(m_completed_result != nullptr); - return m_completed_result; +bool when_any_context::resume_inline(result_state_base& completed_result) noexcept { + auto status = m_status.load(std::memory_order_acquire); + assert(status != k_done_processing); + + if (status != k_processing) { + return false; + } + + // either we succeed turning k_processing to &completed_result, then we can resume inline, either we failed + // meaning another thread had turned k_processing -> &completed_result, either way, testing if the cas succeeded + // is redundant as we need to resume inline. 
+ m_status.compare_exchange_strong(status, &completed_result, std::memory_order_acq_rel); + return false; +} + +const result_state_base* when_any_context::completed_result() const noexcept { + return m_status.load(std::memory_order_acquire); } /* @@ -124,46 +161,46 @@ concurrencpp::details::result_state_base* when_any_context::completed_result() c */ consumer_context::~consumer_context() noexcept { - clear(); + destroy(); } -void consumer_context::clear() noexcept { - const auto status = std::exchange(m_status, consumer_status::idle); - - switch (status) { +void consumer_context::destroy() noexcept { + switch (m_status) { case consumer_status::idle: { return; } case consumer_status::await: { - storage::destroy(m_storage.caller_handle); - return; + return storage::destroy(m_storage.caller_handle); } - case consumer_status::wait: { - storage::destroy(m_storage.wait_ctx); - return; + case consumer_status::wait_for: { + return storage::destroy(m_storage.wait_for_ctx); } case consumer_status::when_any: { - storage::destroy(m_storage.when_any_ctx); - return; + return storage::destroy(m_storage.when_any_ctx); } } assert(false); } +void consumer_context::clear() noexcept { + destroy(); + m_status = consumer_status::idle; +} + void consumer_context::set_await_handle(coroutine_handle caller_handle) noexcept { assert(m_status == consumer_status::idle); m_status = consumer_status::await; storage::build(m_storage.caller_handle, caller_handle); } -void consumer_context::set_wait_context(const std::shared_ptr& wait_ctx) noexcept { +void consumer_context::set_wait_for_context(const std::shared_ptr& wait_ctx) noexcept { assert(m_status == consumer_status::idle); - m_status = consumer_status::wait; - storage::build(m_storage.wait_ctx, wait_ctx); + m_status = consumer_status::wait_for; + storage::build(m_storage.wait_for_ctx, wait_ctx); } void consumer_context::set_when_any_context(const std::shared_ptr& when_any_ctx) noexcept { @@ -172,7 +209,7 @@ void consumer_context::set_when_any_context(const std::shared_ptr(wait_ctx)); return wait_ctx->notify(); } diff --git a/source/results/impl/result_state.cpp b/source/results/impl/result_state.cpp index cf3ccccc..320dddd0 100644 --- a/source/results/impl/result_state.cpp +++ b/source/results/impl/result_state.cpp @@ -13,18 +13,25 @@ void result_state_base::wait() { return; } - const auto wait_ctx = std::make_shared(); - m_consumer.set_wait_context(wait_ctx); - auto expected_state = pc_state::idle; - const auto idle = m_pc_state.compare_exchange_strong(expected_state, pc_state::consumer_set, std::memory_order_acq_rel); + const auto idle = m_pc_state.compare_exchange_strong(expected_state, + pc_state::consumer_waiting, + std::memory_order_acq_rel, + std::memory_order_acquire); if (!idle) { assert_done(); return; } - wait_ctx->wait(); + while (true) { + if (m_pc_state.load(std::memory_order_acquire) == pc_state::producer_done) { + break; + } + + m_pc_state.wait(pc_state::consumer_waiting, std::memory_order_acquire); + } + assert_done(); } @@ -37,7 +44,10 @@ bool result_state_base::await(coroutine_handle caller_handle) noexcept { m_consumer.set_await_handle(caller_handle); auto expected_state = pc_state::idle; - const auto idle = m_pc_state.compare_exchange_strong(expected_state, pc_state::consumer_set, std::memory_order_acq_rel); + const auto idle = m_pc_state.compare_exchange_strong(expected_state, + pc_state::consumer_set, + std::memory_order_acq_rel, + std::memory_order_acquire); if (!idle) { assert_done(); @@ -55,7 +65,10 @@ result_state_base::pc_state 
result_state_base::when_any(const std::shared_ptr handle) noexcept { - m_await_context.set_coro_handle(handle); - auto& per_thread_data = coroutine_per_thread_data::s_tl_per_thread_data; auto accumulator = std::exchange(per_thread_data.accumulator, nullptr); assert(accumulator != nullptr); assert(accumulator->capacity() > accumulator->size()); // so it's always noexcept - accumulator->emplace_back(await_via_functor {&m_await_context}); + accumulator->emplace_back(await_via_functor {handle, &m_interrupted}); } -void concurrencpp::details::initial_accumulating_awaiter::await_resume() { - m_await_context.throw_if_interrupted(); +void concurrencpp::details::initial_accumulating_awaiter::await_resume() const { + if (m_interrupted) { + throw errors::broken_task(consts::k_broken_task_exception_error_msg); + } } diff --git a/source/runtime/runtime.cpp b/source/runtime/runtime.cpp index f18ab198..8bf47d18 100644 --- a/source/runtime/runtime.cpp +++ b/source/runtime/runtime.cpp @@ -10,6 +10,8 @@ #include "concurrencpp/timers/timer_queue.h" +#include + namespace concurrencpp::details { size_t default_max_cpu_workers() noexcept { return static_cast(thread::hardware_concurrency() * consts::k_cpu_threadpool_worker_count_factor); diff --git a/source/threads/async_lock.cpp b/source/threads/async_lock.cpp new file mode 100644 index 00000000..8253edbe --- /dev/null +++ b/source/threads/async_lock.cpp @@ -0,0 +1,269 @@ +#include "concurrencpp/results/resume_on.h" +#include "concurrencpp/threads/constants.h" +#include "concurrencpp/threads/async_lock.h" +#include "concurrencpp/executors/executor.h" + +namespace concurrencpp::details { + class async_lock_awaiter { + + friend class concurrencpp::async_lock; + + private: + async_lock& m_parent; + std::unique_lock m_lock; + async_lock_awaiter* m_next = nullptr; + coroutine_handle m_resume_handle; + + public: + async_lock_awaiter(async_lock& parent, std::unique_lock& lock) noexcept : + m_parent(parent), m_lock(std::move(lock)) {} + + static bool await_ready() noexcept { + return false; + } + + void await_suspend(coroutine_handle handle) { + assert(static_cast(handle)); + assert(!handle.done()); + assert(!static_cast(m_resume_handle)); + assert(m_lock.owns_lock()); + + m_resume_handle = handle; + m_parent.enqueue_awaiter(m_lock, *this); + + auto lock = std::move(m_lock); // will unlock underlying lock + } + + static void await_resume() noexcept {} + + void retry() noexcept { + m_resume_handle.resume(); + } + }; +} // namespace concurrencpp::details + +using concurrencpp::async_lock; +using concurrencpp::scoped_async_lock; +using concurrencpp::details::async_lock_awaiter; + +async_lock::~async_lock() noexcept { +#ifdef CRCPP_DEBUG_MODE + std::unique_lock lock(m_awaiter_lock); + assert(!m_locked && "async_lock is dstroyed while it's locked."); +#endif +} + +void async_lock::enqueue_awaiter(std::unique_lock& lock, async_lock_awaiter& awaiter_node) noexcept { + assert(lock.owns_lock()); + + if (m_head == nullptr) { + assert(m_tail == nullptr); + m_head = m_tail = &awaiter_node; + return; + } + + m_tail->m_next = &awaiter_node; + m_tail = &awaiter_node; +} + +async_lock_awaiter* async_lock::try_dequeue_awaiter(std::unique_lock& lock) noexcept { + assert(lock.owns_lock()); + + const auto node = m_head; + if (node == nullptr) { + return nullptr; + } + + m_head = m_head->m_next; + if (m_head == nullptr) { + m_tail = nullptr; + } + + return node; +} + +concurrencpp::lazy_result async_lock::lock_impl(std::shared_ptr resume_executor, bool with_raii_guard) { + auto 
resume_synchronously = true; // indicates if the locking coroutine managed to lock the lock on first attempt + + while (true) { + std::unique_lock lock(m_awaiter_lock); + if (!m_locked) { + m_locked = true; + lock.unlock(); + break; + } + + co_await async_lock_awaiter(*this, lock); + + resume_synchronously = + false; // if we haven't managed to lock the lock on first attempt, we need to resume using resume_executor + } + + if (!resume_synchronously) { + try { + co_await resume_on(resume_executor); + } catch (...) { + std::unique_lock lock(m_awaiter_lock); + assert(m_locked); + m_locked = false; + const auto awaiter = try_dequeue_awaiter(lock); + lock.unlock(); + + if (awaiter != nullptr) { + awaiter->retry(); + } + + throw; + } + } + +#ifdef CRCPP_DEBUG_MODE + const auto current_count = m_thread_count_in_critical_section.fetch_add(1, std::memory_order_relaxed); + assert(current_count == 0); +#endif + + if (with_raii_guard) { + co_return scoped_async_lock(*this, std::adopt_lock); + } + + co_return scoped_async_lock(*this, std::defer_lock); +} + +concurrencpp::lazy_result async_lock::lock(std::shared_ptr resume_executor) { + if (!static_cast(resume_executor)) { + throw std::invalid_argument(details::consts::k_async_lock_null_resume_executor_err_msg); + } + + return lock_impl(std::move(resume_executor), true); +} + +concurrencpp::lazy_result async_lock::try_lock() { + auto res = false; + + std::unique_lock lock(m_awaiter_lock); + if (!m_locked) { + m_locked = true; + lock.unlock(); + res = true; + } else { + lock.unlock(); + } + +#ifdef CRCPP_DEBUG_MODE + if (res) { + const auto current_count = m_thread_count_in_critical_section.fetch_add(1, std::memory_order_relaxed); + assert(current_count == 0); + } +#endif + + co_return res; +} + +void async_lock::unlock() { + std::unique_lock lock(m_awaiter_lock); + if (!m_locked) { // trying to unlocked non-owned mutex + lock.unlock(); + throw std::system_error(static_cast(std::errc::operation_not_permitted), + std::system_category(), + details::consts::k_async_lock_unlock_invalid_lock_err_msg); + } + + m_locked = false; + +#ifdef CRCPP_DEBUG_MODE + const auto current_count = m_thread_count_in_critical_section.fetch_sub(1, std::memory_order_relaxed); + assert(current_count == 1); +#endif + + const auto awaiter = try_dequeue_awaiter(lock); + lock.unlock(); + + if (awaiter != nullptr) { + awaiter->retry(); + } +} + +/* + * scoped_async_lock + */ + +scoped_async_lock::scoped_async_lock(scoped_async_lock&& rhs) noexcept : + m_lock(std::exchange(rhs.m_lock, nullptr)), m_owns(std::exchange(rhs.m_owns, false)) {} + +scoped_async_lock::scoped_async_lock(async_lock& lock, std::defer_lock_t) noexcept : m_lock(&lock), m_owns(false) {} + +scoped_async_lock::scoped_async_lock(async_lock& lock, std::adopt_lock_t) noexcept : m_lock(&lock), m_owns(true) {} + +scoped_async_lock::~scoped_async_lock() noexcept { + if (m_owns && m_lock != nullptr) { + m_lock->unlock(); + } +} + +concurrencpp::lazy_result scoped_async_lock::lock(std::shared_ptr resume_executor) { + if (!static_cast(resume_executor)) { + throw std::invalid_argument(details::consts::k_scoped_async_lock_null_resume_executor_err_msg); + } + + if (m_lock == nullptr) { + throw std::system_error(static_cast(std::errc::operation_not_permitted), + std::system_category(), + details::consts::k_scoped_async_lock_lock_no_mutex_err_msg); + } else if (m_owns) { + throw std::system_error(static_cast(std::errc::resource_deadlock_would_occur), + std::system_category(), + 
details::consts::k_scoped_async_lock_lock_deadlock_err_msg); + } else { + co_await m_lock->lock_impl(std::move(resume_executor), false); + m_owns = true; + } +} + +concurrencpp::lazy_result scoped_async_lock::try_lock() { + if (m_lock == nullptr) { + throw std::system_error(static_cast(std::errc::operation_not_permitted), + std::system_category(), + concurrencpp::details::consts::k_scoped_async_lock_try_lock_no_mutex_err_msg); + } else if (m_owns) { + throw std::system_error(static_cast(std::errc::resource_deadlock_would_occur), + std::system_category(), + concurrencpp::details::consts::k_scoped_async_lock_try_lock_deadlock_err_msg); + } else { + m_owns = co_await m_lock->try_lock(); + } + + co_return m_owns; +} + +void scoped_async_lock::unlock() { + if (!m_owns) { + throw std::system_error(static_cast(std::errc::operation_not_permitted), + std::system_category(), + concurrencpp::details::consts::k_scoped_async_lock_unlock_invalid_lock_err_msg); + } else if (m_lock != nullptr) { + m_lock->unlock(); + m_owns = false; + } +} + +bool scoped_async_lock::owns_lock() const noexcept { + return m_owns; +} + +scoped_async_lock::operator bool() const noexcept { + return owns_lock(); +} + +void scoped_async_lock::swap(scoped_async_lock& rhs) noexcept { + std::swap(m_lock, rhs.m_lock); + std::swap(m_owns, rhs.m_owns); +} + +async_lock* scoped_async_lock::release() noexcept { + m_owns = false; + return std::exchange(m_lock, nullptr); +} + +async_lock* scoped_async_lock::mutex() const noexcept { + return m_lock; +} diff --git a/source/timers/timer_queue.cpp b/source/timers/timer_queue.cpp index e68b7f20..649db20e 100644 --- a/source/timers/timer_queue.cpp +++ b/source/timers/timer_queue.cpp @@ -220,7 +220,7 @@ void timer_queue::work_loop() { } bool timer_queue::shutdown_requested() const noexcept { - return m_atomic_abort.load(std::memory_order_acquire); + return m_atomic_abort.load(std::memory_order_relaxed); } void timer_queue::shutdown() { @@ -268,7 +268,7 @@ concurrencpp::lazy_result timer_queue::make_delay_object_impl(std::chrono: const size_t m_due_time_ms; timer_queue& m_parent_queue; std::shared_ptr m_executor; - details::await_context m_await_context; + bool m_interrupted = false; public: delay_object_awaitable(size_t due_time_ms, @@ -278,14 +278,12 @@ concurrencpp::lazy_result timer_queue::make_delay_object_impl(std::chrono: m_parent_queue(parent_queue), m_executor(std::move(executor)) {} void await_suspend(details::coroutine_handle coro_handle) noexcept { - m_await_context.set_coro_handle(coro_handle); - try { m_parent_queue.make_timer_impl(m_due_time_ms, 0, std::move(m_executor), true, - details::await_via_functor {&m_await_context}); + details::await_via_functor {coro_handle, &m_interrupted}); } catch (...) 
{ // if an exception is thrown, await_via_functor d.tor will set an interrupt and resume the coro @@ -294,7 +292,9 @@ concurrencpp::lazy_result timer_queue::make_delay_object_impl(std::chrono: } void await_resume() const { - m_await_context.throw_if_interrupted(); + if (m_interrupted) { + throw errors::broken_task(details::consts::k_broken_task_exception_error_msg); + } } }; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 1fe9f032..6d03670d 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -48,6 +48,16 @@ set(test_headers include/utils/test_ready_result.h include/utils/test_ready_lazy_result.h) +add_library(concurrencpp_test_infra ${test_headers} ${test_sources}) + +target_compile_features(concurrencpp_test_infra PRIVATE cxx_std_20) +target_include_directories(concurrencpp_test_infra PRIVATE "${PROJECT_SOURCE_DIR}/../include") +target_include_directories(concurrencpp_test_infra PRIVATE "${PROJECT_SOURCE_DIR}/include") + +target_link_libraries(concurrencpp_test_infra PRIVATE concurrencpp::concurrencpp) + +target_coroutine_options(concurrencpp_test_infra) + # add_test(NAME PATH [PROPERTIES ...]) # # Add test with the name with the source file at @@ -64,9 +74,10 @@ function(add_test) set(target "${TEST_NAME}") set(test_name "${TEST_NAME}") - add_executable(${target} ${TEST_PATH} ${test_headers} ${test_sources}) + add_executable(${target} ${TEST_PATH}) target_link_libraries(${target} PRIVATE concurrencpp::concurrencpp) + target_link_libraries(${target} PRIVATE concurrencpp_test_infra) target_compile_features(${target} PRIVATE cxx_std_20) @@ -114,6 +125,9 @@ add_test(NAME generator_tests PATH source/tests/result_tests/generator_tests.cpp add_test(NAME coroutine_promise_tests PATH source/tests/coroutine_tests/coroutine_promise_tests.cpp) add_test(NAME coroutine_tests PATH source/tests/coroutine_tests/coroutine_tests.cpp) +add_test(NAME async_lock_tests PATH source/tests/async_lock_tests.cpp) +add_test(NAME scoped_async_lock_tests PATH source/tests/scoped_async_lock_tests.cpp) + add_test(NAME timer_queue_tests PATH source/tests/timer_tests/timer_queue_tests.cpp) add_test(NAME timer_tests PATH source/tests/timer_tests/timer_tests.cpp) @@ -129,4 +143,5 @@ add_test(NAME tsan_when_any_tests PATH source/thread_sanitizer/when_any.cpp) add_test(NAME tsan_fibonacci PATH source/thread_sanitizer/fibonacci.cpp) add_test(NAME tsan_lazy_fibonacci PATH source/thread_sanitizer/lazy_fibonacci.cpp) add_test(NAME tsan_quick_sort PATH source/thread_sanitizer/quick_sort.cpp) -add_test(NAME tsan_matrix_multiplication PATH source/thread_sanitizer/matrix_multiplication.cpp) \ No newline at end of file +add_test(NAME tsan_matrix_multiplication PATH source/thread_sanitizer/matrix_multiplication.cpp) +add_test(NAME tsan_async_lock PATH source/thread_sanitizer/async_lock.cpp) diff --git a/test/include/infra/assertions.h b/test/include/infra/assertions.h index 5fb52feb..89d0b491 100644 --- a/test/include/infra/assertions.h +++ b/test/include/infra/assertions.h @@ -120,6 +120,20 @@ namespace concurrencpp::tests { assert_false(true); } + template + void assert_throws_contains_error_message(task_type&& task, std::string_view error_msg) { + try { + task(); + } catch (const exception_type& e) { + const auto pos = std::string(e.what()).find(error_msg); + assert_not_equal(pos, std::string::npos); + return; + } catch (...) 
{ + } + + assert_false(true); + } + } // namespace concurrencpp::tests #endif // CONCURRENCPP_ASSERTIONS_H diff --git a/test/source/tests/async_lock_tests.cpp b/test/source/tests/async_lock_tests.cpp new file mode 100644 index 00000000..20f06eaa --- /dev/null +++ b/test/source/tests/async_lock_tests.cpp @@ -0,0 +1,249 @@ +#include "concurrencpp/concurrencpp.h" + +#include "infra/tester.h" +#include "infra/assertions.h" +#include "utils/executor_shutdowner.h" + +#include "concurrencpp/threads/constants.h" + +namespace concurrencpp::tests { + void test_async_lock_lock_null_resume_executor(); + void test_async_lock_lock_resumption(); + void test_async_lock_lock(); + + void test_async_lock_try_lock(); + + void test_async_lock_unlock_resumption_fails(); + void test_async_lock_unlock(); + + void test_async_lock_mini_load_test1(); + void test_async_lock_mini_load_test2(); + void test_async_lock_lock_unlock(); + + result incremenet(executor_tag, std::shared_ptr ex, async_lock& lock, size_t& counter, size_t cycles) { + for (size_t i = 0; i < cycles; i++) { + auto lk = co_await lock.lock(ex); + ++counter; + } + } + + result insert(executor_tag, + std::shared_ptr ex, + async_lock& lock, + std::vector& vec, + size_t range_begin, + size_t range_end) { + for (size_t i = range_begin; i < range_end; i++) { + auto lk = co_await lock.lock(ex); + vec.emplace_back(i); + } + } + + result> get_thread_ids(async_lock& lock, std::shared_ptr executor) { + const auto before_id = concurrencpp::details::thread::get_current_virtual_id(); + auto lock_guard = co_await lock.lock(executor); + const auto after_id = concurrencpp::details::thread::get_current_virtual_id(); + co_return std::make_pair(before_id, after_id); + } +} // namespace concurrencpp::tests + +void concurrencpp::tests::test_async_lock_lock_null_resume_executor() { + assert_throws_with_error_message( + [] { + async_lock lock; + lock.lock(std::shared_ptr {}); + }, + concurrencpp::details::consts::k_async_lock_null_resume_executor_err_msg); +} + +void concurrencpp::tests::test_async_lock_lock_resumption() { + async_lock lock; + const auto worker_thread = std::make_shared(); + const auto thread_executor = std::make_shared(); + executor_shutdowner es0(worker_thread), es1(thread_executor); + + // if the async_lock is available for acquiring, the coroutine resumes inline + { + const auto [before, after] = get_thread_ids(lock, worker_thread).get(); + assert_equal(before, after); + } + + // if the async_lock is not available for acquiring, the coroutine resumes inside resume_executor + { + auto guard = lock.lock(worker_thread).run().get(); + auto ids = thread_executor->submit(get_thread_ids, std::ref(lock), worker_thread).get(); + + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + guard.unlock(); + const auto [before, after] = ids.get(); + assert_not_equal(before, after); + } +} + +void concurrencpp::tests::test_async_lock_lock() { + test_async_lock_lock_null_resume_executor(); + test_async_lock_lock_resumption(); +} + +void concurrencpp::tests::test_async_lock_try_lock() { + async_lock lock; + assert_true(lock.try_lock().run().get()); + assert_false(lock.try_lock().run().get()); + + lock.unlock(); + assert_true(lock.try_lock().run().get()); + + lock.unlock(); +} + +namespace concurrencpp::tests { + result lock_coro(async_lock& lock, std::shared_ptr ex) { + auto g = co_await lock.lock(ex); + } +} // namespace concurrencpp::tests + +void concurrencpp::tests::test_async_lock_unlock_resumption_fails() { + /* let's say that one coroutine tried to lock a lock
and failed because the lock is already locked. + * that coroutine was queued for resumption for when async_lock::unlock is called. + * when unlock is called, the coroutine manages to lock the lock but executor::shutdown is called + * on the resume-executor. + * In this case, another coroutine must be resumed instead, otherwise the chain of locking/unlocking will be lost. + */ + + runtime runtime; + std::shared_ptr executors[5]; + std::shared_ptr working_executor = runtime.make_worker_thread_executor(); + + result results[5]; + result result; + + for (auto& executor : executors) { + executor = runtime.make_worker_thread_executor(); + } + + async_lock lock; + + auto g = lock.lock(runtime.thread_pool_executor()).run().get(); + + for (size_t i = 0; i < std::size(executors); i++) { + results[i] = lock_coro(lock, executors[i]); + } + + result = lock_coro(lock, working_executor); + + for (auto& executor : executors) { + executor->shutdown(); + } + + g.unlock(); + + for (auto& err_result : results) { + assert_throws([&err_result] { + err_result.get(); + }); + } + + result.get(); // make sure nothing is thrown +} + +void concurrencpp::tests::test_async_lock_unlock() { + // unlocking an un-owned lock throws + assert_throws_contains_error_message( + [] { + async_lock lock; + lock.unlock(); + }, + concurrencpp::details::consts::k_async_lock_unlock_invalid_lock_err_msg); + + test_async_lock_unlock_resumption_fails(); +} + +void concurrencpp::tests::test_async_lock_mini_load_test1() { + async_lock mtx; + size_t counter = 0; + + const size_t worker_count = concurrencpp::details::thread::hardware_concurrency(); + constexpr size_t cycles = 100'000; + + std::vector> workers(worker_count); + for (auto& worker : workers) { + worker = std::make_shared(); + } + + std::vector> results(worker_count); + + for (size_t i = 0; i < worker_count; i++) { + results[i] = incremenet({}, workers[i], mtx, counter, cycles); + } + + for (size_t i = 0; i < worker_count; i++) { + results[i].get(); + } + + { + auto lock = mtx.lock(workers[0]).run().get(); + assert_equal(counter, cycles * worker_count); + } + + for (auto& worker : workers) { + worker->shutdown(); + } +} + +void concurrencpp::tests::test_async_lock_mini_load_test2() { + async_lock mtx; + std::vector vector; + + const size_t worker_count = concurrencpp::details::thread::hardware_concurrency(); + constexpr size_t cycles = 100'000; + + std::vector> workers(worker_count); + for (auto& worker : workers) { + worker = std::make_shared(); + } + + std::vector> results(worker_count); + + for (size_t i = 0; i < worker_count; i++) { + results[i] = insert({}, workers[i], mtx, vector, i * cycles, (i + 1) * cycles); + } + + for (size_t i = 0; i < worker_count; i++) { + results[i].get(); + } + + { + auto lock = mtx.lock(workers[0]).run().get(); + assert_equal(vector.size(), cycles * worker_count); + + std::sort(vector.begin(), vector.end()); + + for (size_t i = 0; i < worker_count * cycles; i++) { + assert_equal(vector[i], i); + } + } + + for (auto& worker : workers) { + worker->shutdown(); + } +} + +void concurrencpp::tests::test_async_lock_lock_unlock() { + test_async_lock_mini_load_test1(); + test_async_lock_mini_load_test2(); +} + +using namespace concurrencpp::tests; + +int main() { + tester tester("async_lock test"); + + tester.add_step("lock", test_async_lock_lock); + tester.add_step("try_lock", test_async_lock_try_lock); + tester.add_step("unlock", test_async_lock_unlock); + tester.add_step("lock + unlock", test_async_lock_lock_unlock); + + tester.launch_test(); + return 
0; +} \ No newline at end of file diff --git a/test/source/tests/coroutine_tests/coroutine_promise_tests.cpp b/test/source/tests/coroutine_tests/coroutine_promise_tests.cpp index 4bec5038..d254cbfa 100644 --- a/test/source/tests/coroutine_tests/coroutine_promise_tests.cpp +++ b/test/source/tests/coroutine_tests/coroutine_promise_tests.cpp @@ -266,7 +266,20 @@ void concurrencpp::tests::test_initialy_rescheduled_null_result_promise_exceptio shutdown_workers(workers); } +namespace concurrencpp::tests { + null_result null_executor_null_result_coro(executor_tag, std::shared_ptr ex) { + co_return; + } +} // namespace concurrencpp::tests + void concurrencpp::tests::test_initialy_rescheduled_null_result_promise() { + // null resume executor + assert_throws_with_error_message( + [] { + null_executor_null_result_coro({}, {}); + }, + concurrencpp::details::consts::k_parallel_coroutine_null_exception_err_msg); + test_initialy_rescheduled_null_result_promise_value(); test_initialy_rescheduled_null_result_promise_exception(); } @@ -361,7 +374,20 @@ void concurrencpp::tests::test_initialy_rescheduled_result_promise_exception() { shutdown_workers(workers); } +namespace concurrencpp::tests { + result null_executor_result_coro(executor_tag, std::shared_ptr ex) { + co_return; + } +} // namespace concurrencpp::tests + void concurrencpp::tests::test_initialy_rescheduled_result_promise() { + // null resume executor + assert_throws_with_error_message( + [] { + null_executor_result_coro({}, {}); + }, + concurrencpp::details::consts::k_parallel_coroutine_null_exception_err_msg); + test_initialy_rescheduled_result_promise_value(); test_initialy_rescheduled_result_promise_exception(); } diff --git a/test/source/tests/executor_tests/inline_executor_tests.cpp b/test/source/tests/executor_tests/inline_executor_tests.cpp index c1c1ed9a..f6fc34e6 100644 --- a/test/source/tests/executor_tests/inline_executor_tests.cpp +++ b/test/source/tests/executor_tests/inline_executor_tests.cpp @@ -262,7 +262,7 @@ void concurrencpp::tests::test_inline_executor_bulk_submit_exception() { executor_shutdowner shutdown(executor); constexpr intptr_t id = 12345; - auto thrower = [id] { + auto thrower = [] { throw custom_exception(id); }; diff --git a/test/source/tests/executor_tests/manual_executor_tests.cpp b/test/source/tests/executor_tests/manual_executor_tests.cpp index 96ad2f3a..0c37adc6 100644 --- a/test/source/tests/executor_tests/manual_executor_tests.cpp +++ b/test/source/tests/executor_tests/manual_executor_tests.cpp @@ -451,7 +451,7 @@ void concurrencpp::tests::test_manual_executor_bulk_submit_exception() { executor_shutdowner shutdown(executor); constexpr intptr_t id = 12345; - auto thrower = [id] { + auto thrower = [] { throw custom_exception(id); }; diff --git a/test/source/tests/executor_tests/thread_executor_tests.cpp b/test/source/tests/executor_tests/thread_executor_tests.cpp index 97a57d50..d710f376 100644 --- a/test/source/tests/executor_tests/thread_executor_tests.cpp +++ b/test/source/tests/executor_tests/thread_executor_tests.cpp @@ -299,7 +299,7 @@ void concurrencpp::tests::test_thread_executor_bulk_submit_exception() { executor_shutdowner shutdown(executor); constexpr intptr_t id = 12345; - auto thrower = [id] { + auto thrower = [] { throw custom_exception(id); }; diff --git a/test/source/tests/executor_tests/thread_pool_executor_tests.cpp b/test/source/tests/executor_tests/thread_pool_executor_tests.cpp index 10b46428..859edc77 100644 --- a/test/source/tests/executor_tests/thread_pool_executor_tests.cpp +++ 
b/test/source/tests/executor_tests/thread_pool_executor_tests.cpp @@ -306,7 +306,7 @@ void concurrencpp::tests::test_thread_pool_executor_bulk_submit_exception() { executor_shutdowner shutdown(executor); constexpr intptr_t id = 12345; - auto thrower = [id] { + auto thrower = [] { throw custom_exception(id); }; diff --git a/test/source/tests/executor_tests/worker_thread_executor_tests.cpp b/test/source/tests/executor_tests/worker_thread_executor_tests.cpp index d41265c3..af88da21 100644 --- a/test/source/tests/executor_tests/worker_thread_executor_tests.cpp +++ b/test/source/tests/executor_tests/worker_thread_executor_tests.cpp @@ -292,7 +292,7 @@ void concurrencpp::tests::test_worker_thread_executor_bulk_submit_exception() { executor_shutdowner shutdown(executor); constexpr intptr_t id = 12345; - auto thrower = [id] { + auto thrower = [] { throw custom_exception(id); }; diff --git a/test/source/tests/scoped_async_lock_tests.cpp b/test/source/tests/scoped_async_lock_tests.cpp new file mode 100644 index 00000000..1a2c2564 --- /dev/null +++ b/test/source/tests/scoped_async_lock_tests.cpp @@ -0,0 +1,313 @@ +#include "concurrencpp/concurrencpp.h" + +#include "infra/tester.h" +#include "infra/assertions.h" +#include "utils/executor_shutdowner.h" + +#include "concurrencpp/threads/constants.h" + +namespace concurrencpp::tests { + result test_scoped_async_lock_constructor_impl(std::shared_ptr executor); + void test_scoped_async_lock_constructor(); + + result test_scoped_async_lock_destructor_impl(std::shared_ptr executor); + void test_scoped_async_lock_destructor(); + + void test_scoped_async_lock_lock(); + void test_scoped_async_lock_try_lock(); + void test_scoped_async_lock_unlock(); + + void test_scoped_async_lock_swap(); + void test_scoped_async_lock_release(); + +} // namespace concurrencpp::tests + +concurrencpp::result concurrencpp::tests::test_scoped_async_lock_constructor_impl(std::shared_ptr executor) { + { // default constructor + scoped_async_lock sal; + assert_false(sal.owns_lock()); + assert_false(static_cast(sal)); + assert_equal(sal.mutex(), nullptr); + } + + { // move constructor + async_lock lock; + scoped_async_lock sal = co_await lock.lock(executor); + assert_true(sal.owns_lock()); + assert_true(static_cast(sal)); + assert_equal(sal.mutex(), &lock); + + auto sal1 = std::move(sal); + assert_false(sal.owns_lock()); + assert_false(static_cast(sal)); + assert_equal(sal.mutex(), nullptr); + + assert_true(sal1.owns_lock()); + assert_true(static_cast(sal1)); + assert_equal(sal1.mutex(), &lock); + + // moving an unowned mutex + scoped_async_lock empty; + auto sal2 = std::move(empty); + assert_false(sal2.owns_lock()); + assert_false(static_cast(sal2)); + assert_equal(sal2.mutex(), nullptr); + } + + { + async_lock lock; + scoped_async_lock sal(lock, std::defer_lock); + assert_false(sal.owns_lock()); + assert_false(static_cast(sal)); + assert_equal(sal.mutex(), &lock); + } + + { + async_lock lock; + const auto locked = co_await lock.try_lock(); + assert_true(locked); + scoped_async_lock sal(lock, std::adopt_lock); + assert_true(sal.owns_lock()); + assert_true(static_cast(sal)); + assert_equal(sal.mutex(), &lock); + } +} + +void concurrencpp::tests::test_scoped_async_lock_constructor() { + auto worker = std::make_shared(); + executor_shutdowner es(worker); + test_scoped_async_lock_constructor_impl(worker).get(); +} + +concurrencpp::result concurrencpp::tests::test_scoped_async_lock_destructor_impl(std::shared_ptr executor) { + async_lock lock; + + { // non-owning scoped_async_lock + 
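// a guard constructed with std::defer_lock never owns the lock, so destroying it must not call unlock() +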
scoped_async_lock sal(lock, std::defer_lock); + } + + { // empty scoped_async_lock + auto sal = co_await lock.lock(executor); + auto mtx = sal.release(); + mtx->unlock(); + } +} + +void concurrencpp::tests::test_scoped_async_lock_destructor() { + const auto worker = std::make_shared(); + executor_shutdowner es(worker); + test_scoped_async_lock_destructor_impl(worker); +} + +void concurrencpp::tests::test_scoped_async_lock_lock() { + auto executor = std::make_shared(); + executor_shutdowner es(executor); + + { // null resume_executor + assert_throws_contains_error_message( + [] { + async_lock lock; + scoped_async_lock sal(lock, std::defer_lock); + sal.lock(std::shared_ptr()).run().get(); + }, + concurrencpp::details::consts::k_scoped_async_lock_null_resume_executor_err_msg); + } + + { // empty scoped_async_lock + assert_throws_contains_error_message( + [executor] { + scoped_async_lock sal; + sal.lock(executor).run().get(); + }, + concurrencpp::details::consts::k_scoped_async_lock_lock_no_mutex_err_msg); + } + + { // lock would cause a deadlock + assert_throws_contains_error_message( + [executor] { + async_lock lock; + auto sal = lock.lock(executor).run().get(); + sal.lock(executor).run().get(); + }, + concurrencpp::details::consts::k_scoped_async_lock_lock_deadlock_err_msg); + } + + // valid scenario + async_lock lock; + scoped_async_lock sal(lock, std::defer_lock); + assert_false(sal.owns_lock()); + + sal.lock(executor).run().get(); + assert_true(sal.owns_lock()); +} + +void concurrencpp::tests::test_scoped_async_lock_try_lock() { + { // empty scoped_async_lock + assert_throws_contains_error_message( + [] { + scoped_async_lock sal; + sal.try_lock().run().get(); + }, + concurrencpp::details::consts::k_scoped_async_lock_try_lock_no_mutex_err_msg); + } + + { // lock would cause a deadlock + assert_throws_contains_error_message( + [] { + async_lock lock; + const auto locked = lock.try_lock().run().get(); + assert_true(locked); + + scoped_async_lock sal(lock, std::adopt_lock); + sal.try_lock().run().get(); + }, + concurrencpp::details::consts::k_scoped_async_lock_try_lock_deadlock_err_msg); + } + + // valid scenario + async_lock lock; + scoped_async_lock sal(lock, std::defer_lock); + assert_false(sal.owns_lock()); + + const auto locked = sal.try_lock().run().get(); + assert_true(locked); + + assert_true(sal.owns_lock()); +} + +void concurrencpp::tests::test_scoped_async_lock_unlock() { + async_lock lock; + scoped_async_lock sal(lock, std::defer_lock); + + assert_throws_contains_error_message( + [&sal] { + sal.unlock(); + }, + concurrencpp::details::consts::k_scoped_async_lock_unlock_invalid_lock_err_msg); + + const auto locked = sal.try_lock().run().get(); + assert_true(locked); + + sal.unlock(); + assert_false(sal.owns_lock()); + assert_false(static_cast(sal)); + assert_equal(sal.mutex(), &lock); +} + +void concurrencpp::tests::test_scoped_async_lock_swap() { + { // empty + empty + scoped_async_lock sal0, sal1; + sal0.swap(sal1); + + assert_false(sal0.owns_lock()); + assert_false(static_cast(sal0)); + assert_equal(sal0.mutex(), nullptr); + + assert_false(sal1.owns_lock()); + assert_false(static_cast(sal1)); + assert_equal(sal1.mutex(), nullptr); + } + + { // empty + non-empty + async_lock lock1; + const auto locked = lock1.try_lock().run().get(); + assert_true(locked); + + scoped_async_lock sal0, sal1(lock1, std::adopt_lock); + sal0.swap(sal1); + + assert_true(sal0.owns_lock()); + assert_true(static_cast(sal0)); + assert_equal(sal0.mutex(), &lock1); + + assert_false(sal1.owns_lock()); + 
assert_false(static_cast(sal1)); + assert_equal(sal1.mutex(), nullptr); + } + + { // non-empty + empty + async_lock lock0; + const auto locked = lock0.try_lock().run().get(); + assert_true(locked); + + scoped_async_lock sal0(lock0, std::adopt_lock), sal1; + sal0.swap(sal1); + + assert_false(sal0.owns_lock()); + assert_false(static_cast(sal0)); + assert_equal(sal0.mutex(), nullptr); + + assert_true(sal1.owns_lock()); + assert_true(static_cast(sal1)); + assert_equal(sal1.mutex(), &lock0); + } + + { // non-empty + non-empty + async_lock lock0, lock1; + const auto locked0 = lock0.try_lock().run().get(); + assert_true(locked0); + + const auto locked1 = lock1.try_lock().run().get(); + assert_true(locked1); + + scoped_async_lock sal0(lock0, std::adopt_lock), sal1(lock1, std::adopt_lock); + sal0.swap(sal1); + + assert_true(sal0.owns_lock()); + assert_true(static_cast(sal0)); + assert_equal(sal0.mutex(), &lock1); + + assert_true(sal1.owns_lock()); + assert_true(static_cast(sal1)); + assert_equal(sal1.mutex(), &lock0); + } + + { // swap with self + async_lock lock; + const auto locked = lock.try_lock().run().get(); + assert_true(locked); + + scoped_async_lock sal(lock, std::adopt_lock); + sal.swap(sal); + + assert_true(sal.owns_lock()); + assert_true(static_cast(sal)); + assert_equal(sal.mutex(), &lock); + } +} + +void concurrencpp::tests::test_scoped_async_lock_release() { + // empty: + scoped_async_lock sal; + assert_equal(sal.release(), nullptr); + + async_lock lock; + const auto locked = lock.try_lock().run().get(); + assert_true(locked); + scoped_async_lock sal0(lock, std::adopt_lock); + + const auto lock_ptr = sal0.release(); + assert_equal(lock_ptr, &lock); + lock_ptr->unlock(); + + assert_false(sal0.owns_lock()); + assert_false(static_cast(sal0)); + assert_equal(sal0.mutex(), nullptr); +} + +using namespace concurrencpp::tests; + +int main() { + tester tester("scoped_async_lock test"); + + tester.add_step("constructor", test_scoped_async_lock_constructor); + tester.add_step("destructor", test_scoped_async_lock_destructor); + tester.add_step("lock", test_scoped_async_lock_lock); + tester.add_step("try_lock", test_scoped_async_lock_try_lock); + tester.add_step("unlock", test_scoped_async_lock_unlock); + tester.add_step("swap", test_scoped_async_lock_swap); + tester.add_step("release", test_scoped_async_lock_release); + + tester.launch_test(); + return 0; +} \ No newline at end of file diff --git a/test/source/tests/task_tests.cpp b/test/source/tests/task_tests.cpp index 1359eecf..fde179c6 100644 --- a/test/source/tests/task_tests.cpp +++ b/test/source/tests/task_tests.cpp @@ -143,13 +143,13 @@ namespace concurrencpp::tests::functions { } // namespace concurrencpp::tests::functions -namespace std::experimental { +namespace CRCPP_COROUTINE_NAMESPACE { template struct coroutine_traits, functions::dummy_test_tag, arguments...> { using promise_type = functions::test_promise; }; -} // namespace std::experimental +} // namespace CRCPP_COROUTINE_NAMESPACE namespace concurrencpp::tests::functions { coroutine_handle coro_function(dummy_test_tag) { diff --git a/test/source/thread_sanitizer/async_lock.cpp b/test/source/thread_sanitizer/async_lock.cpp new file mode 100644 index 00000000..ee41e9d1 --- /dev/null +++ b/test/source/thread_sanitizer/async_lock.cpp @@ -0,0 +1,129 @@ +#include "concurrencpp/concurrencpp.h" + +#include + +void async_increment(concurrencpp::runtime& runtime); +void async_insert(concurrencpp::runtime& runtime); + +int main() { + std::cout << "Starting async_lock test" << std::endl; 
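+ // a single runtime instance supplies the worker-thread executors used by async_increment and async_insert below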
+ + concurrencpp::runtime runtime; + + std::cout << "================================" << std::endl; + std::cout << "async increment" << std::endl; + async_increment(runtime); + + std::cout << "================================" << std::endl; + std::cout << "async insert" << std::endl; + async_insert(runtime); + + std::cout << "================================" << std::endl; +} + +using namespace concurrencpp; + +result incremenet(executor_tag, + std::shared_ptr ex, + async_lock& lock, + size_t& counter, + size_t cycles, + std::chrono::time_point tp) { + + std::this_thread::sleep_until(tp); + + for (size_t i = 0; i < cycles; i++) { + auto lk = co_await lock.lock(ex); + ++counter; + } +} + +result insert(executor_tag, + std::shared_ptr ex, + async_lock& lock, + std::vector& vec, + size_t range_begin, + size_t range_end, + std::chrono::time_point tp) { + + std::this_thread::sleep_until(tp); + + for (size_t i = range_begin; i < range_end; i++) { + auto lk = co_await lock.lock(ex); + vec.emplace_back(i); + } +} + +void async_increment(runtime& runtime) { + async_lock mtx; + size_t counter = 0; + + const size_t worker_count = concurrencpp::details::thread::hardware_concurrency(); + constexpr size_t cycles = 500'000; + + std::vector> workers(worker_count); + for (auto& worker : workers) { + worker = runtime.make_worker_thread_executor(); + } + + std::vector> results(worker_count); + + const auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(4); + + for (size_t i = 0; i < worker_count; i++) { + results[i] = incremenet({}, workers[i], mtx, counter, cycles, deadline); + } + + for (size_t i = 0; i < worker_count; i++) { + results[i].get(); + } + + { + auto lock = mtx.lock(workers[0]).run().get(); + if (counter != cycles * worker_count) { + std::cout << "async_lock test failed, counter != cycles * worker_count, " << counter << " " << cycles * worker_count + << std::endl; + } + } +} + +void async_insert(runtime& runtime) { + async_lock mtx; + std::vector vector; + + const size_t worker_count = concurrencpp::details::thread::hardware_concurrency(); + constexpr size_t cycles = 500'000; + + std::vector> workers(worker_count); + for (auto& worker : workers) { + worker = runtime.make_worker_thread_executor(); + } + + std::vector> results(worker_count); + + const auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(4); + + for (size_t i = 0; i < worker_count; i++) { + results[i] = insert({}, workers[i], mtx, vector, i * cycles, (i + 1) * cycles, deadline); + } + + for (size_t i = 0; i < worker_count; i++) { + results[i].get(); + } + + { + auto lock = mtx.lock(workers[0]).run().get(); + if (vector.size() != cycles * worker_count) { + std::cerr << "async_lock test failed, vector.size() != cycles * worker_count, " << vector.size() + << " != " << cycles * worker_count << std::endl; + } + + std::sort(vector.begin(), vector.end()); + + for (size_t i = 0; i < worker_count * cycles; i++) { + if (vector[i] != i) { + std::cerr << "async_lock test failed, vector[i] != i, " << vector[i] << " != " << i << std::endl; + } + } + } +}