From 7a88f5401e15e1b64acae70077e40df1a5a9f6bf Mon Sep 17 00:00:00 2001 From: David Haim <59602013+David-Haim@users.noreply.github.com> Date: Sat, 15 Jul 2023 13:41:45 +0300 Subject: [PATCH] Version 0.1.7 (#141) Co-authored-by: friendlyanon <1736896+friendlyanon@users.noreply.github.com> Co-authored-by: friendlyanon Co-authored-by: NN <580536+NN---@users.noreply.github.com> Co-authored-by: NN Co-authored-by: Zakhar Karlin Co-authored-by: chausner <15180557+chausner@users.noreply.github.com> Co-authored-by: autoantwort <41973254+autoantwort@users.noreply.github.com> Co-authored-by: chausner Co-authored-by: R. Andrew Ohana <1442822+ohanar@users.noreply.github.com> Co-authored-by: Mathias Eggert Co-authored-by: Mathias Eggert Co-authored-by: Nick Co-authored-by: gotnone Co-authored-by: Stanley Pinchak --- .github/actions/fetch-clang/action.yml | 3 - .github/actions/fetch-gcc/action.yml | 46 +++ .github/workflows/ci.yml | 12 +- .github/workflows/test_matrix.json | 62 +++-- CMakeLists.txt | 5 +- README.md | 61 +++- cmake/coroutineOptions.cmake | 1 + include/concurrencpp/executors/constants.h | 3 + include/concurrencpp/executors/executor.h | 8 +- .../concurrencpp/executors/thread_executor.h | 5 +- .../executors/thread_pool_executor.h | 6 +- .../executors/worker_thread_executor.h | 8 +- include/concurrencpp/platform_defs.h | 4 +- include/concurrencpp/results/constants.h | 60 ++-- .../results/impl/consumer_context.h | 4 +- .../results/impl/producer_context.h | 29 -- .../concurrencpp/results/impl/result_state.h | 21 ++ .../results/impl/shared_result_state.h | 186 +++++-------- include/concurrencpp/results/promises.h | 6 +- include/concurrencpp/results/result.h | 4 +- include/concurrencpp/results/shared_result.h | 8 +- .../results/shared_result_awaitable.h | 1 + include/concurrencpp/results/when_result.h | 91 +++--- include/concurrencpp/runtime/constants.h | 2 +- include/concurrencpp/runtime/runtime.h | 14 +- include/concurrencpp/threads/cache_line.h | 11 +- include/concurrencpp/threads/constants.h | 20 +- include/concurrencpp/threads/thread.h | 20 +- include/concurrencpp/timers/constants.h | 18 +- include/concurrencpp/timers/timer_queue.h | 6 +- source/executors/thread_executor.cpp | 21 +- source/executors/thread_pool_executor.cpp | 49 +++- source/executors/worker_thread_executor.cpp | 32 ++- source/results/impl/consumer_context.cpp | 20 ++ source/results/impl/result_state.cpp | 22 ++ source/results/impl/shared_result_state.cpp | 91 ++---- source/results/promises.cpp | 2 - source/runtime/runtime.cpp | 15 +- source/threads/thread.cpp | 8 + source/timers/timer_queue.cpp | 23 +- test/CMakeLists.txt | 12 +- test/include/infra/assertions.h | 1 + test/include/utils/test_thread_callbacks.h | 44 +++ .../executor_tests/thread_executor_tests.cpp | 12 + .../thread_pool_executor_tests.cpp | 17 ++ .../worker_thread_executor_tests.cpp | 12 + .../tests/result_tests/result_await_tests.cpp | 223 --------------- .../result_resolve_await_tests.cpp | 263 ++++++++++++++++++ .../result_tests/result_resolve_tests.cpp | 193 ------------- test/source/tests/runtime_tests.cpp | 34 +++ .../tests/timer_tests/timer_queue_tests.cpp | 38 ++- 51 files changed, 1014 insertions(+), 843 deletions(-) create mode 100644 .github/actions/fetch-gcc/action.yml delete mode 100644 source/results/promises.cpp create mode 100644 test/include/utils/test_thread_callbacks.h delete mode 100644 test/source/tests/result_tests/result_await_tests.cpp create mode 100644 test/source/tests/result_tests/result_resolve_await_tests.cpp delete mode 100644 test/source/tests/result_tests/result_resolve_tests.cpp diff --git a/.github/actions/fetch-clang/action.yml b/.github/actions/fetch-clang/action.yml index 730e4b84..1ee41649 100644 --- a/.github/actions/fetch-clang/action.yml +++ b/.github/actions/fetch-clang/action.yml @@ -43,9 +43,6 @@ runs: if (${version} -eq 12) { $pkgs += "libunwind-${version}-dev" } - if (${version} -ge 14) { - $pkgs += "libclang-rt-${version}-dev" - } Invoke-NativeCommand sudo apt-get install -y $pkgs Add-Content "${env:GITHUB_OUTPUT}" "clang=$((Get-Command clang-${version}).Source)" Add-Content "${env:GITHUB_OUTPUT}" "clangxx=$((Get-Command clang++-${version}).Source)" diff --git a/.github/actions/fetch-gcc/action.yml b/.github/actions/fetch-gcc/action.yml new file mode 100644 index 00000000..066a5bb8 --- /dev/null +++ b/.github/actions/fetch-gcc/action.yml @@ -0,0 +1,46 @@ +name: Fetch GCC +description: Puts gcc's path into the output + +inputs: + version: + description: Version of GCC to fetch + required: true +outputs: + gcc: + description: Path of gcc executable + value: ${{ steps.script.outputs.gcc }} + gplusplus: + description: Path of g++ executable + value: ${{ steps.script.outputs.gplusplus }} + +runs: + using: composite + steps: + - id: script + shell: pwsh + run: | + $version = "${{ inputs.version }}" + function Invoke-NativeCommand { + $command = $args[0] + $arguments = $args[1..($args.Length)] + & $command @arguments + if ($LastExitCode -ne 0) { + Write-Error "Exit code $LastExitCode while running $command $arguments" + } + } + if ($IsMacOs) { + Invoke-NativeCommand brew install gcc@${version} + Add-Content "${env:GITHUB_OUTPUT}" "gcc=gcc-${version}" + Add-Content "${env:GITHUB_OUTPUT}" "gplusplus=g++-${version}" + } elseif ($IsLinux) { + # install Homebrew + $env:CI = "1" + Invoke-NativeCommand /bin/bash -c $(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh) + $env:CI = $null + # install gcc + Invoke-NativeCommand /home/linuxbrew/.linuxbrew/bin/brew install gcc@${version} + Add-Content "${env:GITHUB_OUTPUT}" "gcc=/home/linuxbrew/.linuxbrew/bin/gcc-${version}" + Add-Content "${env:GITHUB_OUTPUT}" "gplusplus=/home/linuxbrew/.linuxbrew/bin/g++-${version}" + } elseif ($IsWindows) { + Write-Error "GCC installation on Windows is not supported" + } diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fe8c5bf0..3f058965 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -71,7 +71,7 @@ jobs: id: libstdcxx with: version: ${{ matrix.libstdcxx-version }} - if: ${{ matrix.stdlib == 'libstdc++' }} + if: ${{ matrix.stdlib == 'libstdc++' && matrix.libstdcxx-version }} - uses: ./.github/actions/fetch-clang id: clang @@ -80,11 +80,17 @@ jobs: base-directory: build/tools if: ${{ matrix.clang-version }} + - uses: ./.github/actions/fetch-gcc + id: gcc + with: + version: ${{ matrix.gcc-version }} + if: ${{ matrix.gcc-version }} + - name: Build Examples uses: ./.github/actions/cmake-build continue-on-error: ${{ matrix.os == 'macos-11' }} env: - CXX: ${{ steps.clang.outputs.clangxx }} + CXX: ${{ steps.clang.outputs.clangxx || steps.gcc.outputs.gplusplus }} with: cmake: ${{ steps.cmake.outputs.cmake }} ninja: ${{ steps.ninja.outputs.ninja }} @@ -100,7 +106,7 @@ jobs: uses: ./.github/actions/cmake-build continue-on-error: ${{ matrix.os == 'macos-11' }} env: - CXX: ${{ steps.clang.outputs.clangxx }} + CXX: ${{ steps.clang.outputs.clangxx || steps.gcc.outputs.gplusplus }} with: cmake: ${{ steps.cmake.outputs.cmake }} ninja: ${{ steps.ninja.outputs.ninja }} diff --git a/.github/workflows/test_matrix.json b/.github/workflows/test_matrix.json index 108ac187..5a6ece68 100644 --- a/.github/workflows/test_matrix.json +++ b/.github/workflows/test_matrix.json @@ -177,6 +177,30 @@ "shared": true, "name" : "ubuntu-22.04, clang-15, libstdcxx-12, shared=true, tsan=false" }, + { + "os": "ubuntu-22.04", + "gcc-version": 13, + "stdlib": "libstdc++", + "tsan": false, + "shared": false, + "name" : "ubuntu-22.04, gcc-13, libstdc++-13, shared=false, tsan=false" + }, + { + "os": "ubuntu-22.04", + "gcc-version": 13, + "stdlib": "libstdc++", + "tsan": false, + "shared": true, + "name" : "ubuntu-22.04, gcc-13, libstdc++-13, shared=true, tsan=false" + }, + { + "os": "ubuntu-22.04", + "gcc-version": 13, + "stdlib": "libstdc++", + "tsan": true, + "shared": false, + "name" : "ubuntu-22.04, gcc-13, libstdc++-13, shared=false, tsan=true" + }, { "os": "windows-2019", "msvc-version": 2019, @@ -209,22 +233,6 @@ "shared": true, "name" : "windows-2022, msvc-2022, msvc-stl, shared=true, tsan=false" }, - { - "os": "windows-2022", - "clang-version": 14, - "stdlib": "msvc-stl", - "tsan": false, - "shared": false, - "name" : "windows-2022, clang-14, msvc-stl, shared=false, tsan=false" - }, - { - "os": "windows-2022", - "clang-version": 14, - "stdlib": "msvc-stl", - "tsan": false, - "shared": true, - "name" : "windows-2022, clang-14, msvc-stl, shared=true, tsan=false" - }, { "os": "windows-2022", "clang-version": 15, @@ -246,20 +254,36 @@ "stdlib": "libc++", "tsan": false, "shared": false, - "name" : "macos-12, libc++, shared=false, tsan=false" + "name" : "macos-12, clang, libc++, shared=false, tsan=false" }, { "os": "macos-12", "stdlib": "libc++", "tsan": false, "shared": true, - "name" : "macos-12, libc++, shared=true, tsan=false" + "name" : "macos-12, clang, libc++, shared=true, tsan=false" }, { "os": "macos-12", "stdlib": "libc++", "tsan": true, "shared": false, - "name" : "macos-12, libc++, shared=false, tsan=true" + "name" : "macos-12, clang, libc++, shared=false, tsan=true" + }, + { + "os": "macos-12", + "gcc-version": 13, + "stdlib": "libstdc++", + "tsan": false, + "shared": false, + "name" : "macos-12, gcc-13, libstdc++-13, shared=false, tsan=false" + }, + { + "os": "macos-12", + "gcc-version": 13, + "stdlib": "libstdc++", + "tsan": false, + "shared": true, + "name" : "macos-12, gcc-13, libstdc++-13, shared=true, tsan=false" } ] \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index cb8837a9..d0eed988 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,7 +1,7 @@ cmake_minimum_required(VERSION 3.16) project(concurrencpp - VERSION 0.1.6 + VERSION 0.1.7 LANGUAGES CXX) include(cmake/coroutineOptions.cmake) @@ -26,7 +26,6 @@ set(concurrencpp_sources source/results/impl/consumer_context.cpp source/results/impl/result_state.cpp source/results/impl/shared_result_state.cpp - source/results/promises.cpp source/runtime/runtime.cpp source/threads/async_lock.cpp source/threads/async_condition_variable.cpp @@ -71,6 +70,7 @@ set(concurrencpp_headers include/concurrencpp/results/generator.h include/concurrencpp/runtime/constants.h include/concurrencpp/runtime/runtime.h + include/concurrencpp/threads/constants.h include/concurrencpp/threads/async_lock.h include/concurrencpp/threads/async_condition_variable.h include/concurrencpp/threads/thread.h @@ -126,7 +126,6 @@ install( COMPONENT concurrencpp_Development INCLUDES DESTINATION "${concurrencpp_include_directory}" - COMPONENT concurrencpp_Development LIBRARY DESTINATION "${CMAKE_INSTALL_LIBDIR}" COMPONENT concurrencpp_Runtime NAMELINK_COMPONENT concurrencpp_Development diff --git a/README.md b/README.md index 002b9c76..12ef0f87 100644 --- a/README.md +++ b/README.md @@ -67,6 +67,7 @@ concurrencpp main advantages are: * [`async_condition_variable` example](#async_condition_variable-example) * [The runtime object](#the-runtime-object) * [`runtime` API](#runtime-api) + * [Thread creation and termination monitoring](#thread-creation-and-termination-monitoring) * [Creating user-defined executors](#creating-user-defined-executors) * [`task` objects](#task-objects) * [`task` API](#task-api) @@ -2133,6 +2134,58 @@ class runtime { static std::tuple version() noexcept; }; ``` +#### Thread creation and termination monitoring + +In some cases, applications are interested in monitoring thread creation and termination, for example, some memory allocators require new threads to be registered and unregistered upon their creation and termination. The concurrencpp runtime allows setting a thread creation callback and a thread termination callback. those callbacks will be called whenever one of the concurrencpp workers create a new thread and when that thread is terminating. Those callbacks are always called from inside the created/terminating thread, so `std::this_thread::get_id` will always return the relevant thread ID. The signature of those callbacks is `void callback (std::string_view thread_name)`. `thread_name` is a concurrencpp specific title that is given to the thread and can be observed in some debuggers that present the thread name. The thread name is not guaranteed to be unique and should be used for logging and debugging. + +In order to set a thread-creation callback and/or a thread termination callback, applications can set the `thread_started_callback` and/or `thread_terminated_callback` members of the `runtime_options` which is passed to the runtime constructor. Since those callbacks are copied to each concurrencpp worker that might create threads, those callbacks have to be copiable. + +#### Example: monitoring thread creation and termination + +```cpp +#include "concurrencpp/concurrencpp.h" + +#include + +int main() { + concurrencpp::runtime_options options; + options.thread_started_callback = [](std::string_view thread_name) { + std::cout << "A new thread is starting to run, name: " << thread_name << ", thread id: " << std::this_thread::get_id() + << std::endl; + }; + + options.thread_terminated_callback = [](std::string_view thread_name) { + std::cout << "A thread is terminating, name: " << thread_name << ", thread id: " << std::this_thread::get_id() << std::endl; + }; + + concurrencpp::runtime runtime(options); + const auto timer_queue = runtime.timer_queue(); + const auto thread_pool_executor = runtime.thread_pool_executor(); + + concurrencpp::timer timer = + timer_queue->make_timer(std::chrono::milliseconds(100), std::chrono::milliseconds(500), thread_pool_executor, [] { + std::cout << "A timer callable is executing" << std::endl; + }); + + std::this_thread::sleep_for(std::chrono::seconds(3)); + + return 0; +} +``` +Possible output: + +``` +A new thread is starting to run, name: concurrencpp::timer_queue worker, thread id: 7496 +A new thread is starting to run, name: concurrencpp::thread_pool_executor worker, thread id: 21620 +A timer callable is executing +A timer callable is executing +A timer callable is executing +A timer callable is executing +A timer callable is executing +A timer callable is executing +A thread is terminating, name: concurrencpp::timer_queue worker, thread id: 7496 +A thread is terminating, name: concurrencpp::thread_pool_executor worker, thread id: 21620 +``` #### Creating user-defined executors @@ -2344,8 +2397,8 @@ int main() { ### Supported platforms and tools * **Operating systems:** Linux, macOS, Windows (Windows 10 and above) -* **Compilers:** MSVC (Visual Studio 2019 version 16.8.2 and above), Clang 14+, Clang 11-13 with libc++ -* **Tools:** CMake (3.16 and above) +* **Compilers:** MSVC (Visual Studio 2019 version 16.8.2 and above), Clang 14+, Clang 11-13 with libc++, GCC 13+ +* **Tools:** CMake (3.16 and above) ### Building, installing and testing @@ -2377,7 +2430,7 @@ $ cmake --build build/lib ``` ##### Running the tests on *nix platforms -With clang, it is also possible to run the tests with TSAN (thread sanitizer) support. +With clang and gcc, it is also possible to run the tests with TSAN (thread sanitizer) support. ```cmake $ git clone https://github.com/David-Haim/concurrencpp.git @@ -2423,4 +2476,4 @@ $ cmake -S sandbox -B build/sandbox #for release mode: cmake -DCMAKE_BUILD_TYPE=Release -S sandbox -B build/sandbox $ cmake --build build/sandbox $ ./build/sandbox #runs the sandbox -``` +``` \ No newline at end of file diff --git a/cmake/coroutineOptions.cmake b/cmake/coroutineOptions.cmake index 20f16b2a..edfe3a5b 100644 --- a/cmake/coroutineOptions.cmake +++ b/cmake/coroutineOptions.cmake @@ -6,6 +6,7 @@ function(target_coroutine_options TARGET) target_compile_options(${TARGET} PUBLIC /permissive-) elseif(CMAKE_CXX_COMPILER_ID MATCHES "Clang") set_target_properties(${TARGET} PROPERTIES CXX_EXTENSIONS NO) + elseif(CMAKE_CXX_COMPILER_ID MATCHES "GNU") else() message(FATAL_ERROR "Compiler not supported: ${CMAKE_CXX_COMPILER_ID}") endif() diff --git a/include/concurrencpp/executors/constants.h b/include/concurrencpp/executors/constants.h index c7178319..e8f20782 100644 --- a/include/concurrencpp/executors/constants.h +++ b/include/concurrencpp/executors/constants.h @@ -1,6 +1,7 @@ #ifndef CONCURRENCPP_EXECUTORS_CONSTS_H #define CONCURRENCPP_EXECUTORS_CONSTS_H +#include #include namespace concurrencpp::details::consts { @@ -19,6 +20,8 @@ namespace concurrencpp::details::consts { inline const char* k_manual_executor_name = "concurrencpp::manual_executor"; constexpr int k_manual_executor_max_concurrency_level = std::numeric_limits::max(); + inline const char* k_timer_queue_name = "concurrencpp::timer_queue"; + inline const char* k_executor_shutdown_err_msg = " - shutdown has been called on this executor."; } // namespace concurrencpp::details::consts diff --git a/include/concurrencpp/executors/executor.h b/include/concurrencpp/executors/executor.h index 673fee60..87475af0 100644 --- a/include/concurrencpp/executors/executor.h +++ b/include/concurrencpp/executors/executor.h @@ -11,7 +11,7 @@ namespace concurrencpp::details { [[noreturn]] CRCPP_API void throw_runtime_shutdown_exception(std::string_view executor_name); - std::string make_executor_worker_name(std::string_view executor_name); + CRCPP_API std::string make_executor_worker_name(std::string_view executor_name); } // namespace concurrencpp::details namespace concurrencpp { @@ -34,7 +34,11 @@ namespace concurrencpp { } void await_suspend(details::coroutine_handle coro_handle) noexcept { - accumulator.emplace_back(details::await_via_functor(coro_handle, &m_interrupted)); + try { + accumulator.emplace_back(details::await_via_functor(coro_handle, &m_interrupted)); + } catch (...) { + // do nothing. ~await_via_functor will resume the coroutine and throw an exception. + } } void await_resume() const { diff --git a/include/concurrencpp/executors/thread_executor.h b/include/concurrencpp/executors/thread_executor.h index 52a61a89..e5f10313 100644 --- a/include/concurrencpp/executors/thread_executor.h +++ b/include/concurrencpp/executors/thread_executor.h @@ -20,12 +20,15 @@ namespace concurrencpp { std::list m_last_retired; bool m_abort; std::atomic_bool m_atomic_abort; + const std::function m_thread_started_callback; + const std::function m_thread_terminated_callback; void enqueue_impl(std::unique_lock& lock, task& task); void retire_worker(std::list::iterator it); public: - thread_executor(); + thread_executor(const std::function& thread_started_callback = {}, + const std::function& thread_terminated_callback = {}); ~thread_executor() noexcept; void enqueue(task task) override; diff --git a/include/concurrencpp/executors/thread_pool_executor.h b/include/concurrencpp/executors/thread_pool_executor.h index baf8aaee..13f9ad81 100644 --- a/include/concurrencpp/executors/thread_pool_executor.h +++ b/include/concurrencpp/executors/thread_pool_executor.h @@ -57,7 +57,11 @@ namespace concurrencpp { details::thread_pool_worker& worker_at(size_t index) noexcept; public: - thread_pool_executor(std::string_view pool_name, size_t pool_size, std::chrono::milliseconds max_idle_time); + thread_pool_executor(std::string_view pool_name, + size_t pool_size, + std::chrono::milliseconds max_idle_time, + const std::function& thread_started_callback = {}, + const std::function& thread_terminated_callback = {}); ~thread_pool_executor() override; diff --git a/include/concurrencpp/executors/worker_thread_executor.h b/include/concurrencpp/executors/worker_thread_executor.h index 92a3290a..002a79d2 100644 --- a/include/concurrencpp/executors/worker_thread_executor.h +++ b/include/concurrencpp/executors/worker_thread_executor.h @@ -16,13 +16,16 @@ namespace concurrencpp { private: std::deque m_private_queue; std::atomic_bool m_private_atomic_abort; - details::thread m_thread; alignas(CRCPP_CACHE_LINE_ALIGNMENT) std::mutex m_lock; std::deque m_public_queue; std::binary_semaphore m_semaphore; + details::thread m_thread; std::atomic_bool m_atomic_abort; bool m_abort; + const std::function m_thread_started_callback; + const std::function m_thread_terminated_callback; + void make_os_worker_thread(); bool drain_queue_impl(); bool drain_queue(); void wait_for_task(std::unique_lock& lock); @@ -35,7 +38,8 @@ namespace concurrencpp { void enqueue_foreign(std::span task); public: - worker_thread_executor(); + worker_thread_executor(const std::function& thread_started_callback = {}, + const std::function& thread_terminated_callback = {}); void enqueue(concurrencpp::task task) override; void enqueue(std::span tasks) override; diff --git a/include/concurrencpp/platform_defs.h b/include/concurrencpp/platform_defs.h index 3469b1c6..69a150cd 100644 --- a/include/concurrencpp/platform_defs.h +++ b/include/concurrencpp/platform_defs.h @@ -1,7 +1,9 @@ #ifndef CONCURRENCPP_PLATFORM_DEFS_H #define CONCURRENCPP_PLATFORM_DEFS_H -#if defined(_WIN32) +#if defined(__MINGW32__) +# define CRCPP_MINGW_OS +#elif defined(_WIN32) # define CRCPP_WIN_OS #elif defined(unix) || defined(__unix__) || defined(__unix) # define CRCPP_UNIX_OS diff --git a/include/concurrencpp/results/constants.h b/include/concurrencpp/results/constants.h index 4a588d37..08078ff3 100644 --- a/include/concurrencpp/results/constants.h +++ b/include/concurrencpp/results/constants.h @@ -6,48 +6,48 @@ namespace concurrencpp::details::consts { * result_promise */ - inline const char* k_result_promise_set_result_error_msg = "result_promise::set_result() - empty result_promise."; + inline const char* k_result_promise_set_result_error_msg = "concurrencpp::result_promise::set_result() - empty result_promise."; - inline const char* k_result_promise_set_exception_error_msg = "result_promise::set_exception() - empty result_promise."; + inline const char* k_result_promise_set_exception_error_msg = "concurrencpp::result_promise::set_exception() - empty result_promise."; inline const char* k_result_promise_set_exception_null_exception_error_msg = - "result_promise::set_exception() - exception pointer is null."; + "concurrencpp::result_promise::set_exception() - exception pointer is null."; - inline const char* k_result_promise_set_from_function_error_msg = "result_promise::set_from_function() - empty result_promise."; + inline const char* k_result_promise_set_from_function_error_msg = "concurrencpp::result_promise::set_from_function() - empty result_promise."; - inline const char* k_result_promise_get_result_error_msg = "result_promise::get_result() - empty result_promise."; + inline const char* k_result_promise_get_result_error_msg = "concurrencpp::result_promise::get_result() - empty result_promise."; inline const char* k_result_promise_get_result_already_retrieved_error_msg = - "result_promise::get_result() - result was already retrieved."; + "concurrencpp::result_promise::get_result() - result was already retrieved."; /* * result */ - inline const char* k_result_status_error_msg = "result::status() - result is empty."; + inline const char* k_result_status_error_msg = "concurrencpp::result::status() - result is empty."; - inline const char* k_result_get_error_msg = "result::get() - result is empty."; + inline const char* k_result_get_error_msg = "concurrencpp::result::get() - result is empty."; - inline const char* k_result_wait_error_msg = "result::wait() - result is empty."; + inline const char* k_result_wait_error_msg = "concurrencpp::result::wait() - result is empty."; - inline const char* k_result_wait_for_error_msg = "result::wait_for() - result is empty."; + inline const char* k_result_wait_for_error_msg = "concurrencpp::result::wait_for() - result is empty."; - inline const char* k_result_wait_until_error_msg = "result::wait_until() - result is empty."; + inline const char* k_result_wait_until_error_msg = "concurrencpp::result::wait_until() - result is empty."; - inline const char* k_result_operator_co_await_error_msg = "result::operator co_await() - result is empty."; + inline const char* k_result_operator_co_await_error_msg = "concurrencpp::result::operator co_await() - result is empty."; - inline const char* k_result_resolve_error_msg = "result::resolve() - result is empty."; + inline const char* k_result_resolve_error_msg = "concurrencpp::result::resolve() - result is empty."; inline const char* k_executor_exception_error_msg = - "concurrencpp::result - an exception was thrown while trying to enqueue result continuation."; + "concurrencpp::concurrencpp::result - an exception was thrown while trying to enqueue result continuation."; - inline const char* k_broken_task_exception_error_msg = "concurrencpp::result - Associated task was interrupted abnormally"; + inline const char* k_broken_task_exception_error_msg = "concurrencpp::result - associated task was interrupted abnormally"; /* * when_xxx */ - inline const char* k_make_exceptional_result_exception_null_error_msg = "make_exception_result() - given exception_ptr is null."; + inline const char* k_make_exceptional_result_exception_null_error_msg = "concurrencpp::make_exception_result() - given exception_ptr is null."; inline const char* k_when_all_empty_result_error_msg = "concurrencpp::when_all() - one of the result objects is empty."; @@ -63,47 +63,47 @@ namespace concurrencpp::details::consts { * shared_result */ - inline const char* k_shared_result_status_error_msg = "shared_result::status() - result is empty."; + inline const char* k_shared_result_status_error_msg = "concurrencpp::shared_result::status() - result is empty."; - inline const char* k_shared_result_get_error_msg = "shared_result::get() - result is empty."; + inline const char* k_shared_result_get_error_msg = "concurrencpp::shared_result::get() - result is empty."; - inline const char* k_shared_result_wait_error_msg = "shared_result::wait() - result is empty."; + inline const char* k_shared_result_wait_error_msg = "concurrencpp::shared_result::wait() - result is empty."; - inline const char* k_shared_result_wait_for_error_msg = "shared_result::wait_for() - result is empty."; + inline const char* k_shared_result_wait_for_error_msg = "concurrencpp::shared_result::wait_for() - result is empty."; - inline const char* k_shared_result_wait_until_error_msg = "shared_result::wait_until() - result is empty."; + inline const char* k_shared_result_wait_until_error_msg = "concurrencpp::shared_result::wait_until() - result is empty."; - inline const char* k_shared_result_operator_co_await_error_msg = "shared_result::operator co_await() - result is empty."; + inline const char* k_shared_result_operator_co_await_error_msg = "concurrencpp::shared_result::operator co_await() - result is empty."; - inline const char* k_shared_result_resolve_error_msg = "shared_result::resolve() - result is empty."; + inline const char* k_shared_result_resolve_error_msg = "concurrencpp::shared_result::resolve() - result is empty."; /* * lazy_result */ - inline const char* k_empty_lazy_result_status_err_msg = "lazy_result::status - result is empty."; + inline const char* k_empty_lazy_result_status_err_msg = "concurrencpp::lazy_result::status - result is empty."; - inline const char* k_empty_lazy_result_operator_co_await_err_msg = "lazy_result::operator co_await - result is empty."; + inline const char* k_empty_lazy_result_operator_co_await_err_msg = "concurrencpp::lazy_result::operator co_await - result is empty."; - inline const char* k_empty_lazy_result_resolve_err_msg = "lazy_result::resolve - result is empty."; + inline const char* k_empty_lazy_result_resolve_err_msg = "concurrencpp::lazy_result::resolve - result is empty."; - inline const char* k_empty_lazy_result_run_err_msg = "lazy_result::run - result is empty."; + inline const char* k_empty_lazy_result_run_err_msg = "concurrencpp::lazy_result::run - result is empty."; /* * resume_on */ - inline const char* k_resume_on_null_exception_err_msg = "resume_on - given executor is null."; + inline const char* k_resume_on_null_exception_err_msg = "concurrencpp::resume_on - given executor is null."; /* * generator */ - inline const char* k_empty_generator_begin_err_msg = "generator::begin - generator is empty."; + inline const char* k_empty_generator_begin_err_msg = "concurrencpp::generator::begin - generator is empty."; /* * parallel-coroutine */ - inline const char* k_parallel_coroutine_null_exception_err_msg = "parallel-coroutine - given executor is null."; + inline const char* k_parallel_coroutine_null_exception_err_msg = "concurrencpp::parallel-coroutine - given executor is null."; } // namespace concurrencpp::details::consts diff --git a/include/concurrencpp/results/impl/consumer_context.h b/include/concurrencpp/results/impl/consumer_context.h index 807ec9e8..e15ccdd8 100644 --- a/include/concurrencpp/results/impl/consumer_context.h +++ b/include/concurrencpp/results/impl/consumer_context.h @@ -45,12 +45,13 @@ namespace concurrencpp::details { class CRCPP_API consumer_context { private: - enum class consumer_status { idle, await, wait_for, when_any }; + enum class consumer_status { idle, await, wait_for, when_any, shared }; union storage { coroutine_handle caller_handle; std::shared_ptr wait_for_ctx; std::shared_ptr when_any_ctx; + std::weak_ptr shared_ctx; storage() noexcept {} ~storage() noexcept {} @@ -71,6 +72,7 @@ namespace concurrencpp::details { void set_await_handle(coroutine_handle caller_handle) noexcept; void set_wait_for_context(const std::shared_ptr& wait_ctx) noexcept; void set_when_any_context(const std::shared_ptr& when_any_ctx) noexcept; + void set_shared_context(const std::shared_ptr& shared_ctx) noexcept; }; } // namespace concurrencpp::details diff --git a/include/concurrencpp/results/impl/producer_context.h b/include/concurrencpp/results/impl/producer_context.h index b5235c02..ccef9ef6 100644 --- a/include/concurrencpp/results/impl/producer_context.h +++ b/include/concurrencpp/results/impl/producer_context.h @@ -46,35 +46,6 @@ namespace concurrencpp::details { } } - producer_context& operator=(producer_context&& rhs) noexcept { - assert(m_status == result_status::idle); - m_status = std::exchange(rhs.m_status, result_status::idle); - - switch (m_status) { - case result_status::value: { - new (std::addressof(m_storage.object)) type(std::move(rhs.m_storage.object)); - rhs.m_storage.object.~type(); - break; - } - - case result_status::exception: { - new (std::addressof(m_storage.exception)) std::exception_ptr(rhs.m_storage.exception); - rhs.m_storage.exception.~exception_ptr(); - break; - } - - case result_status::idle: { - break; - } - - default: { - assert(false); - } - } - - return *this; - } - template void build_result(argument_types&&... arguments) noexcept(noexcept(type(std::forward(arguments)...))) { assert(m_status == result_status::idle); diff --git a/include/concurrencpp/results/impl/result_state.h b/include/concurrencpp/results/impl/result_state.h index 0e04b360..3bab1ea4 100644 --- a/include/concurrencpp/results/impl/result_state.h +++ b/include/concurrencpp/results/impl/result_state.h @@ -3,6 +3,7 @@ #include "concurrencpp/results/impl/consumer_context.h" #include "concurrencpp/results/impl/producer_context.h" +#include "concurrencpp/platform_defs.h" #include #include @@ -27,6 +28,8 @@ namespace concurrencpp::details { bool await(coroutine_handle caller_handle) noexcept; pc_state when_any(const std::shared_ptr& when_any_state) noexcept; + void share(const std::shared_ptr& shared_result_state) noexcept; + void try_rewind_consumer() noexcept; }; @@ -43,7 +46,14 @@ namespace concurrencpp::details { return done_handle.destroy(); } +#ifdef CRCPP_GCC_COMPILER +# pragma GCC diagnostic push +# pragma GCC diagnostic ignored "-Wfree-nonheap-object" +#endif delete state; +#ifdef CRCPP_GCC_COMPILER +# pragma GCC diagnostic pop +#endif } template @@ -89,6 +99,8 @@ namespace concurrencpp::details { const auto wait_ctx = std::make_shared(0); m_consumer.set_wait_for_context(wait_ctx); + + std::atomic_thread_fence(std::memory_order_release); auto expected_idle_state = pc_state::idle; const auto idle_0 = m_pc_state.compare_exchange_strong(expected_idle_state, @@ -102,6 +114,10 @@ namespace concurrencpp::details { } if (wait_ctx->try_acquire_for(duration + std::chrono::milliseconds(1))) { + // counting_semaphore isn't required to synchronize non atomic data, + // we'll synchronize it manually using m_pc_state::load(memory_order_acquire) + const auto status = m_pc_state.load(std::memory_order_acquire); + (void)status; assert_done(); return m_producer.status(); } @@ -146,6 +162,11 @@ namespace concurrencpp::details { return m_producer.get(); } + std::add_lvalue_reference_t get_ref() { + assert_done(); + return m_producer.get_ref(); + } + template void from_callable(callable_type&& callable) { using is_void = std::is_same; diff --git a/include/concurrencpp/results/impl/shared_result_state.h b/include/concurrencpp/results/impl/shared_result_state.h index 5c739b0a..6c96f4b2 100644 --- a/include/concurrencpp/results/impl/shared_result_state.h +++ b/include/concurrencpp/results/impl/shared_result_state.h @@ -1,167 +1,125 @@ #ifndef CONCURRENCPP_SHARED_RESULT_STATE_H #define CONCURRENCPP_SHARED_RESULT_STATE_H -#include "concurrencpp/coroutines/coroutine.h" -#include "concurrencpp/forward_declarations.h" -#include "concurrencpp/results/impl/producer_context.h" -#include "concurrencpp/results/impl/return_value_struct.h" +#include "concurrencpp/platform_defs.h" +#include "concurrencpp/results/impl/result_state.h" #include -#include -#include -#include +#include #include namespace concurrencpp::details { - struct shared_await_context { + struct shared_result_helper { + template + static consumer_result_state_ptr get_state(result& result) noexcept { + return std::move(result.m_state); + } + }; + + struct CRCPP_API shared_await_context { shared_await_context* next = nullptr; coroutine_handle caller_handle; }; -} // namespace concurrencpp::details -namespace concurrencpp::details { class CRCPP_API shared_result_state_base { protected: - std::atomic_bool m_ready {false}; - mutable std::mutex m_lock; - shared_await_context* m_awaiters = nullptr; - std::optional m_condition; + std::atomic m_status {result_status::idle}; + std::atomic m_awaiters {nullptr}; + std::counting_semaphore<> m_semaphore {0}; - void await_impl(std::unique_lock& lock, shared_await_context& awaiter) noexcept; - void wait_impl(std::unique_lock& lock); - bool wait_for_impl(std::unique_lock& lock, std::chrono::milliseconds ms); + static shared_await_context* result_ready_constant() noexcept; public: - void complete_producer(); - bool await(shared_await_context& awaiter); - void wait(); - }; + virtual ~shared_result_state_base() noexcept = default; - template - class shared_result_state final : public shared_result_state_base { - - private: - producer_context m_producer; + virtual void on_result_finished() noexcept = 0; - void assert_done() const noexcept { - assert(m_ready.load(std::memory_order_acquire)); - assert(m_producer.status() != result_status::idle); - } + result_status status() const noexcept; - public: - result_status status() const noexcept { - if (!m_ready.load(std::memory_order_acquire)) { - return result_status::idle; - } + void wait() noexcept; - return m_producer.status(); - } + bool await(shared_await_context& awaiter) noexcept; template result_status wait_for(std::chrono::duration duration) { - if (m_ready.load(std::memory_order_acquire)) { - return m_producer.status(); - } - - const auto ms = std::chrono::duration_cast(duration) + std::chrono::milliseconds(1); - - std::unique_lock lock(m_lock); - if (m_ready.load(std::memory_order_acquire)) { - return m_producer.status(); - } - - const auto ready = wait_for_impl(lock, ms); - if (ready) { - assert_done(); - return m_producer.status(); - } - - lock.unlock(); - return result_status::idle; + const auto time_point = std::chrono::system_clock::now() + duration; + return wait_until(time_point); } template result_status wait_until(const std::chrono::time_point& timeout_time) { - const auto now = clock::now(); - if (timeout_time <= now) { - return status(); + while ((status() == result_status::idle) && (clock::now() < timeout_time)) { + const auto res = m_semaphore.try_acquire_until(timeout_time); + (void)res; } - const auto diff = timeout_time - now; - return wait_for(diff); - } - - std::add_lvalue_reference_t get() { - return m_producer.get_ref(); - } - - template - void set_result(argument_types&&... arguments) noexcept(noexcept(type(std::forward(arguments)...))) { - m_producer.build_result(std::forward(arguments)...); - } - - void unhandled_exception() noexcept { - m_producer.build_exception(std::current_exception()); - } - }; -} // namespace concurrencpp::details - -namespace concurrencpp::details { - struct shared_result_publisher : public suspend_always { - template - bool await_suspend(coroutine_handle handle) const noexcept { - // TODO : this can (very rarely) throw, but the standard mandates us to have a noexcept finalizer - handle.promise().complete_producer(); - return false; + return status(); } }; template - class shared_result_promise : public return_value_struct, type> { + class shared_result_state final : public shared_result_state_base { private: - const std::shared_ptr> m_state = std::make_shared>(); + consumer_result_state_ptr m_result_state; public: - template - void set_result(argument_types&&... arguments) noexcept(noexcept(type(std::forward(arguments)...))) { - m_state->set_result(std::forward(arguments)...); + shared_result_state(consumer_result_state_ptr result_state) noexcept : m_result_state(std::move(result_state)) { + assert(static_cast(m_result_state)); } - void unhandled_exception() const noexcept { - m_state->unhandled_exception(); + ~shared_result_state() noexcept { + assert(static_cast(m_result_state)); + m_result_state->try_rewind_consumer(); + m_result_state.reset(); } - shared_result get_return_object() noexcept { - return shared_result {m_state}; + void share(const std::shared_ptr& self) noexcept { + assert(static_cast(m_result_state)); + m_result_state->share(self); } - suspend_never initial_suspend() const noexcept { - return {}; + std::add_lvalue_reference_t get() { + assert(static_cast(m_result_state)); + return m_result_state->get_ref(); } - shared_result_publisher final_suspend() const noexcept { - return {}; - } + void on_result_finished() noexcept override { + m_status.store(m_result_state->status(), std::memory_order_release); + m_status.notify_all(); - void complete_producer() const { - m_state->complete_producer(); - } - }; + /* theoretically buggish, practically there's no way + that we'll have more than max(ptrdiff_t) / 2 waiters. + on 64 bits, that's 2^62 waiters, on 32 bits thats 2^30 waiters. + memory will run out before enough tasks could be created to wait this synchronously + */ + m_semaphore.release(m_semaphore.max() / 2); - struct shared_result_tag {}; -} // namespace concurrencpp::details + auto k_result_ready = result_ready_constant(); + auto awaiters = m_awaiters.exchange(k_result_ready, std::memory_order_acq_rel); -namespace CRCPP_COROUTINE_NAMESPACE { - // No executor + No result - template - struct coroutine_traits<::concurrencpp::shared_result, - concurrencpp::details::shared_result_tag, - concurrencpp::result> { - using promise_type = concurrencpp::details::shared_result_promise; + shared_await_context* current = awaiters; + shared_await_context *prev = nullptr, *next = nullptr; + + while (current != nullptr) { + next = current->next; + current->next = prev; + prev = current; + current = next; + } + + awaiters = prev; + + while (awaiters != nullptr) { + assert(static_cast(awaiters->caller_handle)); + auto caller_handle = awaiters->caller_handle; + awaiters = awaiters->next; + caller_handle(); + } + } }; -} // namespace CRCPP_COROUTINE_NAMESPACE +} // namespace concurrencpp::details -#endif +#endif \ No newline at end of file diff --git a/include/concurrencpp/results/promises.h b/include/concurrencpp/results/promises.h index b894b7b0..b5e0700f 100644 --- a/include/concurrencpp/results/promises.h +++ b/include/concurrencpp/results/promises.h @@ -56,7 +56,11 @@ namespace concurrencpp::details { public: template void await_suspend(coroutine_handle handle) { - handle.promise().m_initial_executor.post(await_via_functor {handle, &m_interrupted}); + try { + handle.promise().m_initial_executor.post(await_via_functor {handle, &m_interrupted}); + } catch (...) { + // do nothing. ~await_via_functor will resume the coroutine and throw an exception. + } } void await_resume() const { diff --git a/include/concurrencpp/results/result.h b/include/concurrencpp/results/result.h index ea487597..72cc25fd 100644 --- a/include/concurrencpp/results/result.h +++ b/include/concurrencpp/results/result.h @@ -15,7 +15,7 @@ namespace concurrencpp { static constexpr auto valid_result_type_v = std::is_same_v || std::is_nothrow_move_constructible_v; - static_assert(valid_result_type_v, "concurrencpp::result - <> should be now-throw-move constructable or void."); + static_assert(valid_result_type_v, "concurrencpp::result - <> should be no-throw-move constructible or void."); friend class details::when_result_helper; friend struct details::shared_result_helper; @@ -100,7 +100,7 @@ namespace concurrencpp { static constexpr auto valid_result_type_v = std::is_same_v || std::is_nothrow_move_constructible_v; static_assert(valid_result_type_v, - "concurrencpp::result_promise - <> should be now-throw-move constructable or void."); + "concurrencpp::result_promise - <> should be no-throw-move constructible or void."); private: details::producer_result_state_ptr m_producer_state; diff --git a/include/concurrencpp/results/shared_result.h b/include/concurrencpp/results/shared_result.h index 0eed9da7..16940549 100644 --- a/include/concurrencpp/results/shared_result.h +++ b/include/concurrencpp/results/shared_result.h @@ -12,10 +12,6 @@ namespace concurrencpp { private: std::shared_ptr> m_state; - static shared_result make_shared_result(details::shared_result_tag, result result) { - co_return co_await result; - } - void throw_if_empty(const char* message) const { if (!static_cast(m_state)) { throw errors::empty_result(message); @@ -33,7 +29,9 @@ namespace concurrencpp { return; } - *this = make_shared_result({}, std::move(rhs)); + auto result_state = details::shared_result_helper::get_state(rhs); + m_state = std::make_shared>(std::move(result_state)); + m_state->share(std::static_pointer_cast(m_state)); } shared_result(const shared_result& rhs) noexcept = default; diff --git a/include/concurrencpp/results/shared_result_awaitable.h b/include/concurrencpp/results/shared_result_awaitable.h index b9d419ea..d1521c5e 100644 --- a/include/concurrencpp/results/shared_result_awaitable.h +++ b/include/concurrencpp/results/shared_result_awaitable.h @@ -6,6 +6,7 @@ namespace concurrencpp::details { template class shared_awaitable_base : public suspend_always { + protected: std::shared_ptr> m_state; diff --git a/include/concurrencpp/results/when_result.h b/include/concurrencpp/results/when_result.h index a2ff0515..ede16fab 100644 --- a/include/concurrencpp/results/when_result.h +++ b/include/concurrencpp/results/when_result.h @@ -13,20 +13,16 @@ namespace concurrencpp::details { class when_result_helper { private: - template - static void throw_if_empty_single(const char* error_message, const result& result) { - if (!static_cast(result)) { - throw errors::empty_result(error_message); - } - } - static void throw_if_empty_impl(const char* error_message) noexcept { (void)error_message; } template static void throw_if_empty_impl(const char* error_message, const result& result, result_types&&... results) { - throw_if_empty_single(error_message, result); + if (!static_cast(result)) { + throw errors::empty_result(error_message); + } + throw_if_empty_impl(error_message, std::forward(results)...); } @@ -50,6 +46,22 @@ namespace concurrencpp::details { return at_impl(seq, tuple, n); } + template + static result_state_base& at(std::vector>& vector, size_t n) noexcept { + assert(n < vector.size()); + return get_state_base(vector[n]); + } + + template + static size_t size(std::tuple& tuple) noexcept { + return std::tuple_size_v>; + } + + template + static size_t size(const std::vector& vector) noexcept { + return vector.size(); + } + template static void throw_if_empty_tuple(const char* error_message, result_types&&... results) { throw_if_empty_impl(error_message, std::forward(results)...); @@ -58,7 +70,9 @@ namespace concurrencpp::details { template static void throw_if_empty_range(const char* error_message, iterator_type begin, iterator_type end) { for (; begin != end; ++begin) { - throw_if_empty_single(error_message, *begin); + if (!static_cast((*begin))) { + throw errors::empty_result(error_message); + } } } @@ -88,26 +102,6 @@ namespace concurrencpp::details { std::shared_ptr m_promise; result_types& m_results; - template - static result_state_base& get_at(std::vector& vector, size_t i) noexcept { - return get_state_base(vector[i]); - } - - template - static size_t size(const std::vector& vector) noexcept { - return vector.size(); - } - - template - static result_state_base& get_at(std::tuple& tuple, size_t i) noexcept { - return at(tuple, i); - } - - template - static size_t size(std::tuple& tuple) noexcept { - return std::tuple_size_v>; - } - public: when_any_awaitable(result_types& results) noexcept : m_results(results) {} @@ -118,13 +112,13 @@ namespace concurrencpp::details { bool await_suspend(coroutine_handle coro_handle) { m_promise = std::make_shared(coro_handle); - const auto range_length = size(m_results); + const auto range_length = when_result_helper::size(m_results); for (size_t i = 0; i < range_length; i++) { if (m_promise->any_result_finished()) { return false; } - auto& state_ref = get_at(m_results, i); + auto& state_ref = when_result_helper::at(m_results, i); const auto status = state_ref.when_any(m_promise); if (status == result_state_base::pc_state::producer_done) { return m_promise->resume_inline(state_ref); @@ -138,9 +132,9 @@ namespace concurrencpp::details { const auto completed_result_state = m_promise->completed_result(); auto completed_result_index = std::numeric_limits::max(); - const auto range_length = size(m_results); + const auto range_length = when_result_helper::size(m_results); for (size_t i = 0; i < range_length; i++) { - auto& state_ref = get_at(m_results, i); + auto& state_ref = when_result_helper::at(m_results, i); state_ref.try_rewind_consumer(); if (completed_result_state == &state_ref) { completed_result_index = i; @@ -172,30 +166,15 @@ namespace concurrencpp { } // namespace concurrencpp namespace concurrencpp::details { - template - lazy_result> when_all_impl(std::shared_ptr resume_executor) { - co_return std::tuple<>(); - } - - template - lazy_result when_all_impl(std::shared_ptr resume_executor, tuple_type tuple) { - for (size_t i = 0; i < std::tuple_size_v; i++) { - auto& state_ref = when_result_helper::at(tuple, i); + template + lazy_result when_all_impl(std::shared_ptr resume_executor, collection_type collection) { + for (size_t i = 0; i < when_result_helper::size(collection); i++) { + auto& state_ref = when_result_helper::at(collection, i); co_await when_result_helper::when_all_awaitable {state_ref}; } co_await resume_on(resume_executor); - co_return std::move(tuple); - } - - template - lazy_result> when_all_impl(std::shared_ptr resume_executor, std::vector vector) { - for (auto& result : vector) { - result = co_await result.resolve(); - } - - co_await resume_on(resume_executor); - co_return std::move(vector); + co_return std::move(collection); } } // namespace concurrencpp::details @@ -206,7 +185,11 @@ namespace concurrencpp { throw std::invalid_argument(details::consts::k_when_all_null_resume_executor_error_msg); } - return details::when_all_impl(resume_executor); + auto make_lazy_result = []() -> lazy_result> { + co_return std::tuple<>{}; + }; + + return make_lazy_result(); } template diff --git a/include/concurrencpp/runtime/constants.h b/include/concurrencpp/runtime/constants.h index 6120a2a7..57d53e15 100644 --- a/include/concurrencpp/runtime/constants.h +++ b/include/concurrencpp/runtime/constants.h @@ -12,7 +12,7 @@ namespace concurrencpp::details::consts { constexpr static unsigned int k_concurrencpp_version_major = 0; constexpr static unsigned int k_concurrencpp_version_minor = 1; - constexpr static unsigned int k_concurrencpp_version_revision = 6; + constexpr static unsigned int k_concurrencpp_version_revision = 7; } // namespace concurrencpp::details::consts #endif diff --git a/include/concurrencpp/runtime/runtime.h b/include/concurrencpp/runtime/runtime.h index 1e820a26..bb51261e 100644 --- a/include/concurrencpp/runtime/runtime.h +++ b/include/concurrencpp/runtime/runtime.h @@ -9,6 +9,7 @@ #include #include #include +#include namespace concurrencpp::details { class CRCPP_API executor_collection { @@ -33,6 +34,9 @@ namespace concurrencpp { std::chrono::milliseconds max_timer_queue_waiting_time; + std::function thread_started_callback; + std::function thread_terminated_callback; + runtime_options() noexcept; runtime_options(const runtime_options&) noexcept = default; @@ -42,14 +46,14 @@ namespace concurrencpp { class CRCPP_API runtime { private: - std::shared_ptr m_inline_executor; - std::shared_ptr m_thread_pool_executor; - std::shared_ptr m_background_executor; - std::shared_ptr m_thread_executor; + std::shared_ptr m_inline_executor; + std::shared_ptr m_thread_pool_executor; + std::shared_ptr m_background_executor; + std::shared_ptr m_thread_executor; details::executor_collection m_registered_executors; - std::shared_ptr m_timer_queue; + std::shared_ptr m_timer_queue; public: runtime(); diff --git a/include/concurrencpp/threads/cache_line.h b/include/concurrencpp/threads/cache_line.h index f2c7fba3..795cfb79 100644 --- a/include/concurrencpp/threads/cache_line.h +++ b/include/concurrencpp/threads/cache_line.h @@ -6,9 +6,16 @@ #include #if !defined(CRCPP_MAC_OS) && defined(__cpp_lib_hardware_interference_size) -# define CRCPP_CACHE_LINE_ALIGNMENT std::hardware_destructive_interference_size +# if defined(CRCPP_GCC_COMPILER) +# pragma GCC diagnostic push +# pragma GCC diagnostic ignored "-Winterference-size" +# endif +constexpr inline std::size_t CRCPP_CACHE_LINE_ALIGNMENT = std::hardware_destructive_interference_size; +# if defined(CRCPP_GCC_COMPILER) +# pragma GCC diagnostic pop +# endif #else -# define CRCPP_CACHE_LINE_ALIGNMENT 64 +constexpr inline std::size_t CRCPP_CACHE_LINE_ALIGNMENT = 64; #endif #endif diff --git a/include/concurrencpp/threads/constants.h b/include/concurrencpp/threads/constants.h index 2fb17acd..5166fc57 100644 --- a/include/concurrencpp/threads/constants.h +++ b/include/concurrencpp/threads/constants.h @@ -2,29 +2,29 @@ #define CONCURRENCPP_THREAD_CONSTS_H namespace concurrencpp::details::consts { - inline const char* k_async_lock_null_resume_executor_err_msg = "async_lock::lock() - given resume executor is null."; - inline const char* k_async_lock_unlock_invalid_lock_err_msg = "async_lock::unlock() - trying to unlock an unowned lock."; + inline const char* k_async_lock_null_resume_executor_err_msg = "concurrencpp::async_lock::lock() - given resume executor is null."; + inline const char* k_async_lock_unlock_invalid_lock_err_msg = "concurrencpp::async_lock::unlock() - trying to unlock an unowned lock."; - inline const char* k_scoped_async_lock_null_resume_executor_err_msg = "scoped_async_lock::lock() - given resume executor is null."; + inline const char* k_scoped_async_lock_null_resume_executor_err_msg = "concurrencpp::scoped_async_lock::lock() - given resume executor is null."; - inline const char* k_scoped_async_lock_lock_deadlock_err_msg = "scoped_async_lock::lock() - *this is already locked."; + inline const char* k_scoped_async_lock_lock_deadlock_err_msg = "concurrencpp::scoped_async_lock::lock() - *this is already locked."; inline const char* k_scoped_async_lock_lock_no_mutex_err_msg = - "scoped_async_lock::lock() - *this doesn't reference any async_lock."; + "concurrencpp::scoped_async_lock::lock() - *this doesn't reference any async_lock."; - inline const char* k_scoped_async_lock_try_lock_deadlock_err_msg = "scoped_async_lock::try_lock() - *this is already locked."; + inline const char* k_scoped_async_lock_try_lock_deadlock_err_msg = "concurrencpp::scoped_async_lock::try_lock() - *this is already locked."; inline const char* k_scoped_async_lock_try_lock_no_mutex_err_msg = - "scoped_async_lock::try_lock() - *this doesn't reference any async_lock."; + "concurrencpp::scoped_async_lock::try_lock() - *this doesn't reference any async_lock."; inline const char* k_scoped_async_lock_unlock_invalid_lock_err_msg = - "scoped_async_lock::unlock() - trying to unlock an unowned lock."; + "concurrencpp::scoped_async_lock::unlock() - trying to unlock an unowned lock."; inline const char* k_async_condition_variable_await_invalid_resume_executor_err_msg = - "async_condition_variable::await() - resume_executor is null."; + "concurrencpp::async_condition_variable::await() - resume_executor is null."; inline const char* k_async_condition_variable_await_lock_unlocked_err_msg = - "async_condition_variable::await() - lock is unlocked."; + "concurrencpp::async_condition_variable::await() - lock is unlocked."; } // namespace concurrencpp::details::consts diff --git a/include/concurrencpp/threads/thread.h b/include/concurrencpp/threads/thread.h index b7d82d27..82ca58b5 100644 --- a/include/concurrencpp/threads/thread.h +++ b/include/concurrencpp/threads/thread.h @@ -3,6 +3,7 @@ #include "concurrencpp/platform_defs.h" +#include #include #include @@ -19,10 +20,25 @@ namespace concurrencpp::details { thread(thread&&) noexcept = default; template - thread(std::string name, callable_type&& callable) { - m_thread = std::thread([name = std::move(name), callable = std::forward(callable)]() mutable { + thread(std::string name, + callable_type&& callable, + std::function thread_started_callback, + std::function thread_terminated_callback) { + m_thread = std::thread([name = std::move(name), + callable = std::forward(callable), + thread_started_callback = std::move(thread_started_callback), + thread_terminated_callback = std::move(thread_terminated_callback)]() mutable { set_name(name); + + if (static_cast(thread_started_callback)) { + thread_started_callback(name); + } + callable(); + + if (static_cast(thread_terminated_callback)) { + thread_terminated_callback(name); + } }); } diff --git a/include/concurrencpp/timers/constants.h b/include/concurrencpp/timers/constants.h index b5bc6ee1..31db61a0 100644 --- a/include/concurrencpp/timers/constants.h +++ b/include/concurrencpp/timers/constants.h @@ -2,17 +2,17 @@ #define CONCURRENCPP_TIMER_CONSTS_H namespace concurrencpp::details::consts { - inline const char* k_timer_empty_get_due_time_err_msg = "timer::get_due_time() - timer is empty."; - inline const char* k_timer_empty_get_frequency_err_msg = "timer::get_frequency() - timer is empty."; - inline const char* k_timer_empty_get_executor_err_msg = "timer::get_executor() - timer is empty."; - inline const char* k_timer_empty_get_timer_queue_err_msg = "timer::get_timer_queue() - timer is empty."; - inline const char* k_timer_empty_set_frequency_err_msg = "timer::set_frequency() - timer is empty."; + inline const char* k_timer_empty_get_due_time_err_msg = "concurrencpp::timer::get_due_time() - timer is empty."; + inline const char* k_timer_empty_get_frequency_err_msg = "concurrencpp::timer::get_frequency() - timer is empty."; + inline const char* k_timer_empty_get_executor_err_msg = "concurrencpp::timer::get_executor() - timer is empty."; + inline const char* k_timer_empty_get_timer_queue_err_msg = "concurrencpp::timer::get_timer_queue() - timer is empty."; + inline const char* k_timer_empty_set_frequency_err_msg = "concurrencpp::timer::set_frequency() - timer is empty."; - inline const char* k_timer_queue_make_timer_executor_null_err_msg = "timer_queue::make_timer() - executor is null."; + inline const char* k_timer_queue_make_timer_executor_null_err_msg = "concurrencpp::timer_queue::make_timer() - executor is null."; inline const char* k_timer_queue_make_oneshot_timer_executor_null_err_msg = - "timer_queue::make_one_shot_timer() - executor is null."; - inline const char* k_timer_queue_make_delay_object_executor_null_err_msg = "timer_queue::make_delay_object() - executor is null."; - inline const char* k_timer_queue_shutdown_err_msg = "timer_queue has been shut down."; + "concurrencpp::timer_queue::make_one_shot_timer() - executor is null."; + inline const char* k_timer_queue_make_delay_object_executor_null_err_msg = "concurrencpp::timer_queue::make_delay_object() - executor is null."; + inline const char* k_timer_queue_shutdown_err_msg = "concurrencpp::timer_queue has been shut down."; } // namespace concurrencpp::details::consts #endif diff --git a/include/concurrencpp/timers/timer_queue.h b/include/concurrencpp/timers/timer_queue.h index 038fd4d5..7d3cb30b 100644 --- a/include/concurrencpp/timers/timer_queue.h +++ b/include/concurrencpp/timers/timer_queue.h @@ -40,6 +40,8 @@ namespace concurrencpp { bool m_abort; bool m_idle; const std::chrono::milliseconds m_max_waiting_time; + const std::function m_thread_started_callback; + const std::function m_thread_terminated_callback; details::thread ensure_worker_thread(std::unique_lock& lock); @@ -79,7 +81,9 @@ namespace concurrencpp { void work_loop(); public: - timer_queue(std::chrono::milliseconds max_waiting_time); + timer_queue(std::chrono::milliseconds max_waiting_time, + const std::function& thread_started_callback = {}, + const std::function& thread_terminated_callback = {}); ~timer_queue() noexcept; void shutdown(); diff --git a/source/executors/thread_executor.cpp b/source/executors/thread_executor.cpp index 7064564f..36879d88 100644 --- a/source/executors/thread_executor.cpp +++ b/source/executors/thread_executor.cpp @@ -3,9 +3,11 @@ using concurrencpp::thread_executor; -thread_executor::thread_executor() : - derivable_executor(details::consts::k_thread_executor_name), m_abort(false), m_atomic_abort(false) { -} +thread_executor::thread_executor(const std::function& thread_started_callback, + const std::function& thread_terminated_callback) : + derivable_executor(details::consts::k_thread_executor_name), + m_abort(false), m_atomic_abort(false), m_thread_started_callback(thread_started_callback), + m_thread_terminated_callback(thread_terminated_callback) {} thread_executor::~thread_executor() noexcept { assert(m_workers.empty()); @@ -16,11 +18,14 @@ void thread_executor::enqueue_impl(std::unique_lock& lock, concurren assert(lock.owns_lock()); auto& new_thread = m_workers.emplace_front(); - new_thread = details::thread(details::make_executor_worker_name(name), - [this, self_it = m_workers.begin(), task = std::move(task)]() mutable { - task(); - retire_worker(self_it); - }); + new_thread = details::thread( + details::make_executor_worker_name(name), + [this, self_it = m_workers.begin(), task = std::move(task)]() mutable { + task(); + retire_worker(self_it); + }, + m_thread_started_callback, + m_thread_terminated_callback); } void thread_executor::enqueue(concurrencpp::task task) { diff --git a/source/executors/thread_pool_executor.cpp b/source/executors/thread_pool_executor.cpp index c640f104..a21bf2b9 100644 --- a/source/executors/thread_pool_executor.cpp +++ b/source/executors/thread_pool_executor.cpp @@ -45,6 +45,8 @@ namespace concurrencpp::details { bool m_abort; std::atomic_bool m_task_found_or_abort; thread m_thread; + const std::function m_thread_started_callback; + const std::function m_thread_terminated_callback; void balance_work(); @@ -57,7 +59,12 @@ namespace concurrencpp::details { void ensure_worker_active(bool first_enqueuer, std::unique_lock& lock); public: - thread_pool_worker(thread_pool_executor& parent_pool, size_t index, size_t pool_size, std::chrono::milliseconds max_idle_time); + thread_pool_worker(thread_pool_executor& parent_pool, + size_t index, + size_t pool_size, + std::chrono::milliseconds max_idle_time, + const std::function& thread_started_callback, + const std::function& thread_terminated_callback); thread_pool_worker(thread_pool_worker&& rhs) noexcept; ~thread_pool_worker() noexcept; @@ -165,11 +172,14 @@ void idle_worker_set::find_idle_workers(size_t caller_index, std::vector thread_pool_worker::thread_pool_worker(thread_pool_executor& parent_pool, size_t index, size_t pool_size, - std::chrono::milliseconds max_idle_time) : + std::chrono::milliseconds max_idle_time, + const std::function& thread_started_callback, + const std::function& thread_terminated_callback) : m_atomic_abort(false), m_parent_pool(parent_pool), m_index(index), m_pool_size(pool_size), m_max_idle_time(max_idle_time), m_worker_name(details::make_executor_worker_name(parent_pool.name)), m_semaphore(0), m_idle(true), m_abort(false), - m_task_found_or_abort(false) { + m_task_found_or_abort(false), m_thread_started_callback(thread_started_callback), + m_thread_terminated_callback(thread_terminated_callback) { m_idle_worker_list.reserve(pool_size); } @@ -357,10 +367,15 @@ void thread_pool_worker::work_loop() { s_tl_thread_pool_data.this_worker = this; s_tl_thread_pool_data.this_thread_index = m_index; - while (true) { - if (!drain_queue()) { - return; + try { + while (true) { + if (!drain_queue()) { + return; + } } + } catch (const errors::runtime_shutdown&) { + std::unique_lock lock(m_lock); + m_idle = true; } } @@ -378,9 +393,13 @@ void thread_pool_worker::ensure_worker_active(bool first_enqueuer, std::unique_l } auto stale_worker = std::move(m_thread); - m_thread = thread(m_worker_name, [this] { - work_loop(); - }); + m_thread = thread( + m_worker_name, + [this] { + work_loop(); + }, + m_thread_started_callback, + m_thread_terminated_callback); m_idle = false; lock.unlock(); @@ -496,13 +515,17 @@ bool thread_pool_worker::appears_empty() const noexcept { return m_private_queue.empty() && !m_task_found_or_abort.load(std::memory_order_relaxed); } -thread_pool_executor::thread_pool_executor(std::string_view pool_name, size_t pool_size, std::chrono::milliseconds max_idle_time) : - derivable_executor(pool_name), m_round_robin_cursor(0), m_idle_workers(pool_size), - m_abort(false) { +thread_pool_executor::thread_pool_executor(std::string_view pool_name, + size_t pool_size, + std::chrono::milliseconds max_idle_time, + const std::function& thread_started_callback, + const std::function& thread_terminated_callback) : + derivable_executor(pool_name), + m_round_robin_cursor(0), m_idle_workers(pool_size), m_abort(false) { m_workers.reserve(pool_size); for (size_t i = 0; i < pool_size; i++) { - m_workers.emplace_back(*this, i, pool_size, max_idle_time); + m_workers.emplace_back(*this, i, pool_size, max_idle_time, thread_started_callback, thread_terminated_callback); } for (size_t i = 0; i < pool_size; i++) { diff --git a/source/executors/worker_thread_executor.cpp b/source/executors/worker_thread_executor.cpp index fb83e180..5f6e8997 100644 --- a/source/executors/worker_thread_executor.cpp +++ b/source/executors/worker_thread_executor.cpp @@ -1,18 +1,27 @@ #include "concurrencpp/executors/worker_thread_executor.h" + #include "concurrencpp/executors/constants.h" namespace concurrencpp::details { static thread_local worker_thread_executor* s_tl_this_worker = nullptr; -} +} // namespace concurrencpp::details using concurrencpp::worker_thread_executor; -worker_thread_executor::worker_thread_executor() : +worker_thread_executor::worker_thread_executor(const std::function& thread_started_callback, + const std::function& thread_terminated_callback) : derivable_executor(details::consts::k_worker_thread_executor_name), - m_private_atomic_abort(false), m_semaphore(0), m_atomic_abort(false), m_abort(false) { - m_thread = details::thread(details::make_executor_worker_name(name), [this] { - work_loop(); - }); + m_private_atomic_abort(false), m_semaphore(0), m_atomic_abort(false), m_abort(false), + m_thread_started_callback(thread_started_callback), m_thread_terminated_callback(thread_terminated_callback) {} + +void concurrencpp::worker_thread_executor::make_os_worker_thread() { + m_thread = details::thread( + details::make_executor_worker_name(name), + [this] { + work_loop(); + }, + m_thread_started_callback, + m_thread_terminated_callback); } bool worker_thread_executor::drain_queue_impl() { @@ -100,6 +109,11 @@ void worker_thread_executor::enqueue_foreign(concurrencpp::task& task) { const auto is_empty = m_public_queue.empty(); m_public_queue.emplace_back(std::move(task)); + + if (!m_thread.joinable()) { + return make_os_worker_thread(); + } + lock.unlock(); if (is_empty) { @@ -115,6 +129,11 @@ void worker_thread_executor::enqueue_foreign(std::span tasks const auto is_empty = m_public_queue.empty(); m_public_queue.insert(m_public_queue.end(), std::make_move_iterator(tasks.begin()), std::make_move_iterator(tasks.end())); + + if (!m_thread.joinable()) { + return make_os_worker_thread(); + } + lock.unlock(); if (is_empty) { @@ -157,6 +176,7 @@ void worker_thread_executor::shutdown() { m_abort = true; } + m_private_atomic_abort.store(true, std::memory_order_relaxed); m_semaphore.release(); if (m_thread.joinable()) { diff --git a/source/results/impl/consumer_context.cpp b/source/results/impl/consumer_context.cpp index a27ea785..04604798 100644 --- a/source/results/impl/consumer_context.cpp +++ b/source/results/impl/consumer_context.cpp @@ -1,6 +1,7 @@ #include "concurrencpp/results/impl/consumer_context.h" #include "concurrencpp/executors/executor.h" +#include "concurrencpp/results/impl/shared_result_state.h" using concurrencpp::details::when_any_context; using concurrencpp::details::consumer_context; @@ -167,6 +168,10 @@ void consumer_context::destroy() noexcept { case consumer_status::when_any: { return details::destroy(m_storage.when_any_ctx); } + + case consumer_status::shared: { + return details::destroy(m_storage.shared_ctx); + } } assert(false); @@ -195,6 +200,12 @@ void consumer_context::set_when_any_context(const std::shared_ptr& shared_ctx) noexcept { + assert(m_status == consumer_status::idle); + m_status = consumer_status::shared; + details::build(m_storage.shared_ctx, shared_ctx); +} + void consumer_context::resume_consumer(result_state_base& self) const { switch (m_status) { case consumer_status::idle: { @@ -218,6 +229,15 @@ void consumer_context::resume_consumer(result_state_base& self) const { const auto when_any_ctx = m_storage.when_any_ctx; return when_any_ctx->try_resume(self); } + + case consumer_status::shared: { + const auto weak_shared_ctx = m_storage.shared_ctx; + const auto shared_ctx = weak_shared_ctx.lock(); + if (static_cast(shared_ctx)) { + shared_ctx->on_result_finished(); + } + return; + } } assert(false); diff --git a/source/results/impl/result_state.cpp b/source/results/impl/result_state.cpp index 320dddd0..8c578a9b 100644 --- a/source/results/impl/result_state.cpp +++ b/source/results/impl/result_state.cpp @@ -77,6 +77,28 @@ result_state_base::pc_state result_state_base::when_any(const std::shared_ptr& shared_result_state) noexcept { + const auto state = m_pc_state.load(std::memory_order_acquire); + if (state == pc_state::producer_done) { + return shared_result_state->on_result_finished(); + } + + m_consumer.set_shared_context(shared_result_state); + + auto expected_state = pc_state::idle; + const auto idle = m_pc_state.compare_exchange_strong(expected_state, + pc_state::consumer_set, + std::memory_order_acq_rel, + std::memory_order_acquire); + + if (idle) { + return; + } + + assert_done(); + shared_result_state->on_result_finished(); +} + void result_state_base::try_rewind_consumer() noexcept { const auto pc_state = m_pc_state.load(std::memory_order_acquire); if (pc_state != pc_state::consumer_set) { diff --git a/source/results/impl/shared_result_state.cpp b/source/results/impl/shared_result_state.cpp index 9aadfdef..2f94a1d8 100644 --- a/source/results/impl/shared_result_state.cpp +++ b/source/results/impl/shared_result_state.cpp @@ -2,90 +2,31 @@ using concurrencpp::details::shared_result_state_base; -void shared_result_state_base::await_impl(std::unique_lock& lock, shared_await_context& awaiter) noexcept { - assert(lock.owns_lock()); - - if (m_awaiters == nullptr) { - m_awaiters = &awaiter; - return; - } - - awaiter.next = m_awaiters; - m_awaiters = &awaiter; -} - -void shared_result_state_base::wait_impl(std::unique_lock& lock) { - assert(lock.owns_lock()); - - if (!m_condition.has_value()) { - m_condition.emplace(); - } - - m_condition.value().wait(lock, [this] { - return m_ready.load(std::memory_order_relaxed); - }); +concurrencpp::details::shared_await_context* shared_result_state_base::result_ready_constant() noexcept { + return reinterpret_cast(-1); } -bool shared_result_state_base::wait_for_impl(std::unique_lock& lock, std::chrono::milliseconds ms) { - assert(lock.owns_lock()); - - if (!m_condition.has_value()) { - m_condition.emplace(); - } - - return m_condition.value().wait_for(lock, ms, [this] { - return m_ready.load(std::memory_order_relaxed); - }); +concurrencpp::result_status concurrencpp::details::shared_result_state_base::status() const noexcept { + return m_status.load(std::memory_order_acquire); } -void shared_result_state_base::complete_producer() { - shared_await_context* awaiters; - - { - std::unique_lock lock(m_lock); - awaiters = std::exchange(m_awaiters, nullptr); - m_ready.store(true, std::memory_order_release); - - if (m_condition.has_value()) { - m_condition.value().notify_all(); - } - } - - while (awaiters != nullptr) { - const auto next = awaiters->next; - awaiters->caller_handle(); - awaiters = next; - } -} - -bool shared_result_state_base::await(shared_await_context& awaiter) { - if (m_ready.load(std::memory_order_acquire)) { - return false; - } - - { - std::unique_lock lock(m_lock); - if (m_ready.load(std::memory_order_acquire)) { +bool shared_result_state_base::await(shared_await_context& awaiter) noexcept { + while (true) { + auto awaiter_before = m_awaiters.load(std::memory_order_acquire); + if (awaiter_before == result_ready_constant()) { return false; } - await_impl(lock, awaiter); + awaiter.next = awaiter_before; + const auto swapped = m_awaiters.compare_exchange_weak(awaiter_before, &awaiter, std::memory_order_acq_rel); + if (swapped) { + return true; + } } - - return true; } -void shared_result_state_base::wait() { - if (m_ready.load(std::memory_order_acquire)) { - return; - } - - { - std::unique_lock lock(m_lock); - if (m_ready.load(std::memory_order_acquire)) { - return; - } - - wait_impl(lock); +void concurrencpp::details::shared_result_state_base::wait() noexcept { + if (status() == result_status::idle) { + m_status.wait(result_status::idle, std::memory_order_acquire); } } diff --git a/source/results/promises.cpp b/source/results/promises.cpp deleted file mode 100644 index 8b979fa1..00000000 --- a/source/results/promises.cpp +++ /dev/null @@ -1,2 +0,0 @@ -#include "concurrencpp/results/promises.h" -#include "concurrencpp/coroutines/coroutine.h" diff --git a/source/runtime/runtime.cpp b/source/runtime/runtime.cpp index ebda121b..7a2b3f9a 100644 --- a/source/runtime/runtime.cpp +++ b/source/runtime/runtime.cpp @@ -70,22 +70,29 @@ runtime_options::runtime_options() noexcept : runtime::runtime() : runtime(runtime_options()) {} runtime::runtime(const runtime_options& options) { - m_timer_queue = std::make_shared<::concurrencpp::timer_queue>(options.max_timer_queue_waiting_time); + m_timer_queue = std::make_shared<::concurrencpp::timer_queue>(options.max_timer_queue_waiting_time, + options.thread_started_callback, + options.thread_terminated_callback); m_inline_executor = std::make_shared<::concurrencpp::inline_executor>(); m_registered_executors.register_executor(m_inline_executor); m_thread_pool_executor = std::make_shared<::concurrencpp::thread_pool_executor>(details::consts::k_thread_pool_executor_name, options.max_cpu_threads, - options.max_thread_pool_executor_waiting_time); + options.max_thread_pool_executor_waiting_time, + options.thread_started_callback, + options.thread_terminated_callback); m_registered_executors.register_executor(m_thread_pool_executor); m_background_executor = std::make_shared<::concurrencpp::thread_pool_executor>(details::consts::k_background_executor_name, options.max_background_threads, - options.max_background_executor_waiting_time); + options.max_background_executor_waiting_time, + options.thread_started_callback, + options.thread_terminated_callback); m_registered_executors.register_executor(m_background_executor); - m_thread_executor = std::make_shared<::concurrencpp::thread_executor>(); + m_thread_executor = + std::make_shared<::concurrencpp::thread_executor>(options.thread_started_callback, options.thread_terminated_callback); m_registered_executors.register_executor(m_thread_executor); } diff --git a/source/threads/thread.cpp b/source/threads/thread.cpp index b300853b..e00a28a4 100644 --- a/source/threads/thread.cpp +++ b/source/threads/thread.cpp @@ -54,6 +54,14 @@ void thread::set_name(std::string_view name) noexcept { ::SetThreadDescription(::GetCurrentThread(), utf16_name.data()); } +#elif defined(CRCPP_MINGW_OS) + +# include + +void thread::set_name(std::string_view name) noexcept { + ::pthread_setname_np(::pthread_self(), name.data()); +} + #elif defined(CRCPP_UNIX_OS) # include diff --git a/source/timers/timer_queue.cpp b/source/timers/timer_queue.cpp index 3835be8f..0afe5a20 100644 --- a/source/timers/timer_queue.cpp +++ b/source/timers/timer_queue.cpp @@ -2,6 +2,8 @@ #include "concurrencpp/timers/timer_queue.h" #include "concurrencpp/coroutines/coroutine.h" +#include "concurrencpp/executors/constants.h" +#include "concurrencpp/executors/executor.h" #include #include @@ -143,8 +145,12 @@ namespace concurrencpp::details { } // namespace } // namespace concurrencpp::details -timer_queue::timer_queue(milliseconds max_waiting_time) : - m_atomic_abort(false), m_abort(false), m_idle(true), m_max_waiting_time(max_waiting_time) {} +timer_queue::timer_queue(milliseconds max_waiting_time, + const std::function& thread_started_callback, + const std::function& thread_terminated_callback) : + m_thread_started_callback(thread_started_callback), + m_thread_terminated_callback(thread_terminated_callback), m_atomic_abort(false), m_abort(false), m_idle(true), + m_max_waiting_time(max_waiting_time) {} timer_queue::~timer_queue() noexcept { shutdown(); @@ -253,9 +259,13 @@ concurrencpp::details::thread timer_queue::ensure_worker_thread(std::unique_lock auto old_worker = std::move(m_worker); - m_worker = details::thread("concurrencpp::timer_queue worker", [this] { - work_loop(); - }); + m_worker = details::thread( + details::make_executor_worker_name(details::consts::k_timer_queue_name), + [this] { + work_loop(); + }, + m_thread_started_callback, + m_thread_terminated_callback); m_idle = false; return old_worker; @@ -288,8 +298,7 @@ concurrencpp::lazy_result timer_queue::make_delay_object_impl(std::chrono: details::await_via_functor {coro_handle, &m_interrupted}); } catch (...) { - // if an exception is thrown, await_via_functor d.tor will set an interrupt and resume the coro - // no need to let the exception propagate. + // do nothing. ~await_via_functor will resume the coroutine and throw an exception. } } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 49e84a69..29f79455 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -16,11 +16,11 @@ endif() # ---- Add root project ---- option(ENABLE_THREAD_SANITIZER "\ -Build concurrencpp with LLVM thread sanitizer. \ -Does not have an effect if the compiler is not Clang based." OFF) +Build concurrencpp with ThreadSanitizer. \ +Does not have an effect if the compiler is not Clang-based or GCC." OFF) string(TOLOWER ${CMAKE_CXX_COMPILER_ID} compiler_id) -if(ENABLE_THREAD_SANITIZER AND compiler_id MATCHES "clang") +if(ENABLE_THREAD_SANITIZER AND (compiler_id MATCHES "clang" OR compiler_id MATCHES "gnu")) # Instead of polluting the lists file, we inject a command definition # override that will apply the sanitizer flag set(CMAKE_PROJECT_concurrencpp_INCLUDE @@ -55,7 +55,8 @@ set(test_headers include/utils/test_generators.h include/utils/throwing_executor.h include/utils/test_ready_result.h - include/utils/test_ready_lazy_result.h) + include/utils/test_ready_lazy_result.h + include/utils/test_thread_callbacks.h) add_library(concurrencpp_test_infra STATIC ${test_headers} ${test_sources}) @@ -111,8 +112,7 @@ add_test(NAME thread_pool_executor_tests PATH source/tests/executor_tests/thread add_test(NAME worker_thread_executor_tests PATH source/tests/executor_tests/worker_thread_executor_tests.cpp) add_test(NAME result_tests PATH source/tests/result_tests/result_tests.cpp) -add_test(NAME result_resolving_tests PATH source/tests/result_tests/result_resolve_tests.cpp) -add_test(NAME result_awaiting_tests PATH source/tests/result_tests/result_await_tests.cpp) +add_test(NAME result_resolve_await_tests PATH source/tests/result_tests/result_resolve_await_tests.cpp) add_test(NAME lazy_result_tests PATH source/tests/result_tests/lazy_result_tests.cpp) diff --git a/test/include/infra/assertions.h b/test/include/infra/assertions.h index 89d0b491..34d674fa 100644 --- a/test/include/infra/assertions.h +++ b/test/include/infra/assertions.h @@ -1,6 +1,7 @@ #ifndef CONCURRENCPP_ASSERTIONS_H #define CONCURRENCPP_ASSERTIONS_H +#include #include #include diff --git a/test/include/utils/test_thread_callbacks.h b/test/include/utils/test_thread_callbacks.h new file mode 100644 index 00000000..3493f236 --- /dev/null +++ b/test/include/utils/test_thread_callbacks.h @@ -0,0 +1,44 @@ +#ifndef CONCURRENCPP_TEST_THREAD_CALLBACKS_H +#define CONCURRENCPP_TEST_THREAD_CALLBACKS_H + +#include "infra/assertions.h" +#include "utils/executor_shutdowner.h" +#include "concurrencpp/executors/executor.h" + +namespace concurrencpp::tests { + template + void test_thread_callbacks(executor_factory_type executor_factory, std::string_view expected_thread_name) { + std::atomic_size_t thread_started_callback_invocations_num = 0; + std::atomic_size_t thread_terminated_callback_invocations_num = 0; + + auto thread_started_callback = [&thread_started_callback_invocations_num, expected_thread_name](std::string_view thread_name) { + ++thread_started_callback_invocations_num; + assert_equal(thread_name, expected_thread_name); + }; + + auto thread_terminated_callback = [&thread_terminated_callback_invocations_num, + expected_thread_name](std::string_view thread_name) { + ++thread_terminated_callback_invocations_num; + assert_equal(thread_name, expected_thread_name); + }; + + std::shared_ptr executor = executor_factory(thread_started_callback, thread_terminated_callback); + executor_shutdowner es(executor); + + assert_equal(thread_started_callback_invocations_num, 0); + assert_equal(thread_terminated_callback_invocations_num, 0); + + executor + ->submit([&thread_started_callback_invocations_num, &thread_terminated_callback_invocations_num]() { + assert_equal(thread_started_callback_invocations_num, 1); + assert_equal(thread_terminated_callback_invocations_num, 0); + }) + .get(); + + executor->shutdown(); + assert_equal(thread_started_callback_invocations_num, 1); + assert_equal(thread_terminated_callback_invocations_num, 1); + } +} // namespace concurrencpp::tests + +#endif diff --git a/test/source/tests/executor_tests/thread_executor_tests.cpp b/test/source/tests/executor_tests/thread_executor_tests.cpp index d710f376..183b1529 100644 --- a/test/source/tests/executor_tests/thread_executor_tests.cpp +++ b/test/source/tests/executor_tests/thread_executor_tests.cpp @@ -6,6 +6,7 @@ #include "utils/test_generators.h" #include "utils/test_ready_result.h" #include "utils/executor_shutdowner.h" +#include "utils/test_thread_callbacks.h" namespace concurrencpp::tests { void test_thread_executor_name(); @@ -37,6 +38,8 @@ namespace concurrencpp::tests { void test_thread_executor_bulk_submit_inline(); void test_thread_executor_bulk_submit(); + void test_thread_executor_thread_callbacks(); + void assert_unique_execution_threads(const std::unordered_map& execution_map, const size_t expected_thread_count) { assert_equal(execution_map.size(), expected_thread_count); @@ -373,6 +376,14 @@ void concurrencpp::tests::test_thread_executor_bulk_submit() { test_thread_executor_bulk_post_inline(); } +void concurrencpp::tests::test_thread_executor_thread_callbacks() { + test_thread_callbacks( + [](auto thread_started_callback, auto thread_terminated_callback) { + return std::make_shared(thread_started_callback, thread_terminated_callback); + }, + concurrencpp::details::make_executor_worker_name(concurrencpp::details::consts::k_thread_executor_name)); +} + using namespace concurrencpp::tests; int main() { @@ -385,6 +396,7 @@ int main() { tester.add_step("submit", test_thread_executor_submit); tester.add_step("bulk_post", test_thread_executor_bulk_post); tester.add_step("bulk_submit", test_thread_executor_bulk_submit); + tester.add_step("thread_callbacks", test_thread_executor_thread_callbacks); tester.launch_test(); return 0; diff --git a/test/source/tests/executor_tests/thread_pool_executor_tests.cpp b/test/source/tests/executor_tests/thread_pool_executor_tests.cpp index aba41a48..464e2a42 100644 --- a/test/source/tests/executor_tests/thread_pool_executor_tests.cpp +++ b/test/source/tests/executor_tests/thread_pool_executor_tests.cpp @@ -6,6 +6,7 @@ #include "utils/test_generators.h" #include "utils/test_ready_result.h" #include "utils/executor_shutdowner.h" +#include "utils/test_thread_callbacks.h" namespace concurrencpp::tests { void test_thread_pool_executor_name(); @@ -37,6 +38,8 @@ namespace concurrencpp::tests { void test_thread_pool_executor_enqueue_algorithm(); void test_thread_pool_executor_dynamic_resizing(); + + void test_thread_pool_executor_thread_callbacks(); } // namespace concurrencpp::tests using concurrencpp::details::thread; @@ -547,6 +550,19 @@ void concurrencpp::tests::test_thread_pool_executor_dynamic_resizing() { } } +void concurrencpp::tests::test_thread_pool_executor_thread_callbacks() { + constexpr std::string_view thread_pool_name = "threadpool"; + test_thread_callbacks( + [thread_pool_name](auto thread_started_callback, auto thread_terminated_callback) { + return std::make_shared(thread_pool_name, + 1, + std::chrono::seconds(10), + thread_started_callback, + thread_terminated_callback); + }, + concurrencpp::details::make_executor_worker_name(thread_pool_name)); +} + using namespace concurrencpp::tests; int main() { @@ -560,6 +576,7 @@ int main() { tester.add_step("bulk_submit", test_thread_pool_executor_bulk_submit); tester.add_step("enqueuing algorithm", test_thread_pool_executor_enqueue_algorithm); tester.add_step("dynamic resizing", test_thread_pool_executor_dynamic_resizing); + tester.add_step("thread_callbacks", test_thread_pool_executor_thread_callbacks); tester.launch_test(); return 0; diff --git a/test/source/tests/executor_tests/worker_thread_executor_tests.cpp b/test/source/tests/executor_tests/worker_thread_executor_tests.cpp index af88da21..92d0819b 100644 --- a/test/source/tests/executor_tests/worker_thread_executor_tests.cpp +++ b/test/source/tests/executor_tests/worker_thread_executor_tests.cpp @@ -6,6 +6,7 @@ #include "utils/test_generators.h" #include "utils/test_ready_result.h" #include "utils/executor_shutdowner.h" +#include "utils/test_thread_callbacks.h" namespace concurrencpp::tests { void test_worker_thread_executor_name(); @@ -37,6 +38,8 @@ namespace concurrencpp::tests { void test_worker_thread_executor_bulk_submit_inline(); void test_worker_thread_executor_bulk_submit(); + void test_worker_thread_executor_thread_callbacks(); + void assert_unique_execution_thread(const std::unordered_map& execution_map) { assert_equal(execution_map.size(), 1); assert_not_equal(execution_map.begin()->first, concurrencpp::details::thread::get_current_virtual_id()); @@ -363,6 +366,14 @@ void concurrencpp::tests::test_worker_thread_executor_bulk_submit() { test_worker_thread_executor_bulk_submit_inline(); } +void concurrencpp::tests::test_worker_thread_executor_thread_callbacks() { + test_thread_callbacks( + [](auto thread_started_callback, auto thread_terminated_callback) { + return std::make_shared(thread_started_callback, thread_terminated_callback); + }, + concurrencpp::details::make_executor_worker_name(concurrencpp::details::consts::k_worker_thread_executor_name)); +} + using namespace concurrencpp::tests; int main() { @@ -375,6 +386,7 @@ int main() { tester.add_step("submit", test_worker_thread_executor_submit); tester.add_step("bulk_post", test_worker_thread_executor_bulk_post); tester.add_step("bulk_submit", test_worker_thread_executor_bulk_submit); + tester.add_step("thread_callbacks", test_worker_thread_executor_thread_callbacks); tester.launch_test(); return 0; diff --git a/test/source/tests/result_tests/result_await_tests.cpp b/test/source/tests/result_tests/result_await_tests.cpp deleted file mode 100644 index 20dafcae..00000000 --- a/test/source/tests/result_tests/result_await_tests.cpp +++ /dev/null @@ -1,223 +0,0 @@ -#include "concurrencpp/concurrencpp.h" - -#include "infra/tester.h" -#include "infra/assertions.h" -#include "utils/object_observer.h" -#include "utils/test_generators.h" -#include "utils/test_ready_result.h" -#include "utils/throwing_executor.h" -#include "utils/executor_shutdowner.h" - -namespace concurrencpp::tests { - template - void test_result_await_impl(); - void test_result_await(); -} // namespace concurrencpp::tests - -using concurrencpp::result; -using concurrencpp::details::thread; - -/* - * In this test suit, we need to check all the possible scenarios that "await" can have. - * Our tests are split into 2 branches: when the result is already ready at the moment of resolving and - * when it's not. - * - * If the result is already ready, the test matrix looks like this: - * status[value, exception] - * Overall 2 scenarios - * - * If the result is not ready, the test matrix looks like this: - * status[value, exception] - * Overall 2 scenarios - * - * These tests are almost identical to result::resolve(_via) tests. If we got here, - * that means that result::resolve(_via) works correctly. so we modify the resolving tests to use regular - * "await" and then continue as a regular resolving test. If any assert fails, it's result::await(_via) fault and not - * result:resolve(_via) - */ - -namespace concurrencpp::tests { - template - struct test_await_ready_result { - result operator()(); - }; - - template - struct test_await_ready_result { - - private: - uintptr_t m_thread_id_0 = 0; - - result proxy_task() { - auto result = result_gen::ready(); - - m_thread_id_0 = thread::get_current_virtual_id(); - - co_return co_await result; - } - - public: - result operator()() { - auto done_result = co_await proxy_task().resolve(); - - const auto thread_id_1 = thread::get_current_virtual_id(); - - assert_equal(m_thread_id_0, thread_id_1); - test_ready_result(std::move(done_result)); - } - }; - - template - struct test_await_ready_result { - - private: - uintptr_t m_thread_id_0 = 0; - - result proxy_task(const size_t id) { - auto result = make_exceptional_result(custom_exception(id)); - - m_thread_id_0 = thread::get_current_virtual_id(); - - co_return co_await result; - } - - public: - result operator()() { - const auto id = 1234567; - auto done_result = co_await proxy_task(id).resolve(); - - const auto thread_id_1 = thread::get_current_virtual_id(); - ; - assert_equal(m_thread_id_0, thread_id_1); - test_ready_result_custom_exception(std::move(done_result), id); - } - }; - - template - struct test_await_not_ready_result { - result operator()(std::shared_ptr executor); - }; - - template - struct test_await_not_ready_result { - - private: - uintptr_t m_setting_thread_id = 0; - uintptr_t m_resuming_thread_id = 0; - - result proxy_task(std::shared_ptr manual_executor) { - auto result = manual_executor->submit([]() -> decltype(auto) { - return value_gen::default_value(); - }); - - co_return co_await result; - } - - result inner_task(std::shared_ptr manual_executor) { - auto done_result = co_await proxy_task(manual_executor).resolve(); - - m_resuming_thread_id = thread::get_current_virtual_id(); - - test_ready_result(std::move(done_result)); - } - - public: - result operator()(std::shared_ptr manual_executor, std::shared_ptr thread_executor) { - assert_true(manual_executor->empty()); - - auto result = inner_task(manual_executor); - - co_await thread_executor->submit([this, manual_executor] { - m_setting_thread_id = concurrencpp::details::thread::get_current_virtual_id(); - assert_true(manual_executor->loop_once()); - }); - - co_await result; - - assert_equal(m_setting_thread_id, m_resuming_thread_id); - } - }; - - template - struct test_await_not_ready_result { - - private: - uintptr_t m_setting_thread_id = 0; - uintptr_t m_resuming_thread_id = 0; - - result proxy_task(std::shared_ptr manual_executor, const size_t id) { - auto result = manual_executor->submit([id]() -> decltype(auto) { - throw custom_exception(id); - return value_gen::default_value(); - }); - - co_return co_await result; - } - - result inner_task(std::shared_ptr manual_executor) { - const auto id = 1234567; - auto done_result = co_await proxy_task(manual_executor, id).resolve(); - - m_resuming_thread_id = concurrencpp::details::thread::get_current_virtual_id(); - - test_ready_result_custom_exception(std::move(done_result), id); - } - - public: - result operator()(std::shared_ptr manual_executor, std::shared_ptr thread_executor) { - assert_true(manual_executor->empty()); - - auto result = inner_task(manual_executor); - - co_await thread_executor->submit([this, manual_executor] { - m_setting_thread_id = concurrencpp::details::thread::get_current_virtual_id(); - assert_true(manual_executor->loop_once()); - }); - - co_await result; - - assert_equal(m_setting_thread_id, m_resuming_thread_id); - } - }; - -} // namespace concurrencpp::tests - -template -void concurrencpp::tests::test_result_await_impl() { - // empty result throws - { - assert_throws_with_error_message( - [] { - result().operator co_await(); - }, - concurrencpp::details::consts::k_result_operator_co_await_error_msg); - } - - auto thread_executor = std::make_shared(); - auto manual_executor = std::make_shared(); - executor_shutdowner es0(thread_executor), es1(manual_executor); - - test_await_ready_result()().get(); - test_await_ready_result()().get(); - test_await_not_ready_result()(manual_executor, thread_executor).get(); - test_await_not_ready_result()(manual_executor, thread_executor).get(); -} - -void concurrencpp::tests::test_result_await() { - test_result_await_impl(); - test_result_await_impl(); - test_result_await_impl(); - test_result_await_impl(); - test_result_await_impl(); -} - -using namespace concurrencpp::tests; - -int main() { - tester tester("result::await"); - - tester.add_step("await", test_result_await); - - tester.launch_test(); - return 0; -} \ No newline at end of file diff --git a/test/source/tests/result_tests/result_resolve_await_tests.cpp b/test/source/tests/result_tests/result_resolve_await_tests.cpp new file mode 100644 index 00000000..7669e83c --- /dev/null +++ b/test/source/tests/result_tests/result_resolve_await_tests.cpp @@ -0,0 +1,263 @@ +#include "concurrencpp/concurrencpp.h" + +#include "infra/tester.h" +#include "infra/assertions.h" +#include "utils/object_observer.h" +#include "utils/test_generators.h" +#include "utils/test_ready_result.h" +#include "utils/executor_shutdowner.h" + +namespace concurrencpp::tests { + template + result test_result_resolve_impl_result_ready_value(); + + template + result test_result_resolve_impl_result_ready_exception(); + + template + result test_result_resolve_impl_result_not_ready_value(std::shared_ptr thread_executor); + + template + result test_result_resolve_impl_result_not_ready_exception(std::shared_ptr thread_executor); + + template + void test_result_resolve_impl(); + void test_result_resolve(); + + template + result test_result_await_impl_result_ready_value(); + + template + result test_result_await_impl_result_ready_exception(); + + template + result test_result_await_impl_result_not_ready_value(std::shared_ptr thread_executor); + + template + result test_result_await_impl_result_not_ready_exception(std::shared_ptr thread_executor); + + template + void test_result_await_impl(); + void test_result_await(); + + template + result wrap_co_await(result result) { + co_return co_await result; + } +} // namespace concurrencpp::tests + +using concurrencpp::result; +using concurrencpp::details::thread; + +/* + Test result statuses [ready. not ready] vs. result outcome [value, exception] +*/ + +template +result concurrencpp::tests::test_result_resolve_impl_result_ready_value() { + auto result = result_gen::ready(); + + const auto thread_id_0 = thread::get_current_virtual_id(); + + auto done_result = co_await result.resolve(); + + const auto thread_id_1 = thread::get_current_virtual_id(); + + assert_false(static_cast(result)); + assert_equal(thread_id_0, thread_id_1); + test_ready_result(std::move(done_result)); +} + +template +result concurrencpp::tests::test_result_resolve_impl_result_ready_exception() { + const auto id = 1234567; + auto result = make_exceptional_result(custom_exception(id)); + + const auto thread_id_0 = thread::get_current_virtual_id(); + + auto done_result = co_await result.resolve(); + + const auto thread_id_1 = thread::get_current_virtual_id(); + + assert_false(static_cast(result)); + assert_equal(thread_id_0, thread_id_1); + test_ready_result_custom_exception(std::move(done_result), id); +} + +template +result concurrencpp::tests::test_result_resolve_impl_result_not_ready_value(std::shared_ptr thread_executor) { + std::atomic_uintptr_t setting_thread_id = 0; + + auto result = thread_executor->submit([&setting_thread_id]() mutable -> type { + setting_thread_id = thread::get_current_virtual_id(); + + std::this_thread::sleep_for(std::chrono::milliseconds(350)); + + return value_gen::default_value(); + }); + + auto done_result = co_await result.resolve(); + test_ready_result(std::move(done_result)); + assert_equal(thread::get_current_virtual_id(), setting_thread_id.load()); +} + +template +result concurrencpp::tests::test_result_resolve_impl_result_not_ready_exception( + std::shared_ptr thread_executor) { + std::atomic_uintptr_t setting_thread_id = 0; + + auto result = thread_executor->submit([&setting_thread_id]() mutable -> type { + setting_thread_id = thread::get_current_virtual_id(); + + std::this_thread::sleep_for(std::chrono::milliseconds(350)); + + return value_gen::throw_ex(); + }); + + auto done_result = co_await result.resolve(); + + assert_equal(done_result.status(), result_status::exception); + assert_throws([&done_result] { + done_result.get(); + }); + + assert_equal(thread::get_current_virtual_id(), setting_thread_id.load()); +} + +template +void concurrencpp::tests::test_result_resolve_impl() { + // empty result throws + { + assert_throws_with_error_message( + [] { + result().resolve(); + }, + concurrencpp::details::consts::k_result_resolve_error_msg); + } + + auto thread_executor = std::make_shared(); + executor_shutdowner es(thread_executor); + + test_result_resolve_impl_result_ready_value().get(); + test_result_resolve_impl_result_ready_exception().get(); + test_result_resolve_impl_result_not_ready_value(thread_executor).get(); + test_result_resolve_impl_result_not_ready_exception(thread_executor).get(); +} + +void concurrencpp::tests::test_result_resolve() { + test_result_resolve_impl(); + test_result_resolve_impl(); + test_result_resolve_impl(); + test_result_resolve_impl(); + test_result_resolve_impl(); +} + +template +result concurrencpp::tests::test_result_await_impl_result_ready_value() { + auto result = result_gen::ready(); + + const auto thread_id_0 = thread::get_current_virtual_id(); + + auto done_result = co_await wrap_co_await(std::move(result)).resolve(); + + const auto thread_id_1 = thread::get_current_virtual_id(); + + assert_false(static_cast(result)); + assert_equal(thread_id_0, thread_id_1); + test_ready_result(std::move(done_result)); +} + +template +result concurrencpp::tests::test_result_await_impl_result_ready_exception() { + const auto id = 1234567; + auto result = make_exceptional_result(custom_exception(id)); + + const auto thread_id_0 = thread::get_current_virtual_id(); + + auto done_result = co_await wrap_co_await(std::move(result)).resolve(); + + const auto thread_id_1 = thread::get_current_virtual_id(); + + assert_false(static_cast(result)); + assert_equal(thread_id_0, thread_id_1); + test_ready_result_custom_exception(std::move(done_result), id); +} + +template +result concurrencpp::tests::test_result_await_impl_result_not_ready_value(std::shared_ptr thread_executor) { + std::atomic_uintptr_t setting_thread_id = 0; + + auto result = thread_executor->submit([&setting_thread_id]() mutable -> type { + setting_thread_id = thread::get_current_virtual_id(); + + std::this_thread::sleep_for(std::chrono::milliseconds(350)); + + return value_gen::default_value(); + }); + + auto done_result = co_await wrap_co_await(std::move(result)).resolve(); + test_ready_result(std::move(done_result)); + assert_equal(thread::get_current_virtual_id(), setting_thread_id.load()); +} + +template +result concurrencpp::tests::test_result_await_impl_result_not_ready_exception(std::shared_ptr thread_executor) { + std::atomic_uintptr_t setting_thread_id = 0; + + auto result = thread_executor->submit([&setting_thread_id]() mutable -> type { + setting_thread_id = thread::get_current_virtual_id(); + + std::this_thread::sleep_for(std::chrono::milliseconds(350)); + + return value_gen::throw_ex(); + }); + + auto done_result = co_await wrap_co_await(std::move(result)).resolve(); + + assert_equal(done_result.status(), result_status::exception); + assert_throws([&done_result] { + done_result.get(); + }); + + assert_equal(thread::get_current_virtual_id(), setting_thread_id.load()); +} + +template +void concurrencpp::tests::test_result_await_impl() { + // empty result throws + { + assert_throws_with_error_message( + [] { + result().operator co_await(); + }, + concurrencpp::details::consts::k_result_operator_co_await_error_msg); + } + + auto thread_executor = std::make_shared(); + executor_shutdowner es(thread_executor); + + test_result_await_impl_result_ready_value().get(); + test_result_await_impl_result_ready_exception().get(); + test_result_await_impl_result_not_ready_value(thread_executor).get(); + test_result_await_impl_result_not_ready_exception(thread_executor).get(); +} + +void concurrencpp::tests::test_result_await() { + test_result_await_impl(); + test_result_await_impl(); + test_result_await_impl(); + test_result_await_impl(); + test_result_await_impl(); +} + +using namespace concurrencpp::tests; + +int main() { + tester tester("result::resolve + result::await"); + + tester.add_step("resolve", test_result_resolve); + tester.add_step("co_await", test_result_await); + + tester.launch_test(); + return 0; +} \ No newline at end of file diff --git a/test/source/tests/result_tests/result_resolve_tests.cpp b/test/source/tests/result_tests/result_resolve_tests.cpp deleted file mode 100644 index 71c5ca31..00000000 --- a/test/source/tests/result_tests/result_resolve_tests.cpp +++ /dev/null @@ -1,193 +0,0 @@ -#include "concurrencpp/concurrencpp.h" - -#include "infra/tester.h" -#include "infra/assertions.h" -#include "utils/object_observer.h" -#include "utils/test_generators.h" -#include "utils/test_ready_result.h" -#include "utils/throwing_executor.h" -#include "utils/executor_shutdowner.h" - -namespace concurrencpp::tests { - template - void test_result_resolve_impl(); - void test_result_resolve(); -} // namespace concurrencpp::tests - -using concurrencpp::result; -using concurrencpp::details::thread; - -/* - * In this test suit, we need to check all the possible scenarios that result::resolve can have. - * Our tests are split into 2 branches: when the result is already ready at the moment of resolving and - * when it's not. - * - * If the result is already ready, the test matrix looks like this: - * status[value, exception] - * Overall 2 scenarios - * - * If the result is not ready, the test matrix looks like this: - * status[value, exception] - * Overall 2 scenarios - */ - -namespace concurrencpp::tests { - template - struct test_await_ready_result { - result operator()(); - }; - - template - struct test_await_ready_result { - result operator()() { - auto result = result_gen::ready(); - - const auto thread_id_0 = thread::get_current_virtual_id(); - - auto done_result = co_await result.resolve(); - - const auto thread_id_1 = thread::get_current_virtual_id(); - - assert_false(static_cast(result)); - assert_equal(thread_id_0, thread_id_1); - test_ready_result(std::move(done_result)); - } - }; - - template - struct test_await_ready_result { - result operator()() { - const auto id = 1234567; - auto result = make_exceptional_result(custom_exception(id)); - - const auto thread_id_0 = concurrencpp::details::thread::get_current_virtual_id(); - - auto done_result = co_await result.resolve(); - - const auto thread_id_1 = concurrencpp::details::thread::get_current_virtual_id(); - - assert_false(static_cast(result)); - assert_equal(thread_id_0, thread_id_1); - test_ready_result_custom_exception(std::move(done_result), id); - } - }; - - template - struct test_await_not_ready_result { - result operator()(std::shared_ptr executor); - }; - - template - struct test_await_not_ready_result { - - private: - uintptr_t m_setting_thread_id = 0; - uintptr_t m_resuming_thread_id = 0; - - result inner_task(std::shared_ptr manual_executor) { - auto result = manual_executor->submit([]() -> decltype(auto) { - return value_gen::default_value(); - }); - - auto done_result = co_await result.resolve(); - - m_resuming_thread_id = thread::get_current_virtual_id(); - - test_ready_result(std::move(done_result)); - } - - public: - result operator()(std::shared_ptr manual_executor, std::shared_ptr thread_executor) { - assert_true(manual_executor->empty()); - - auto result = inner_task(manual_executor); - - co_await thread_executor->submit([this, manual_executor] { - m_setting_thread_id = thread::get_current_virtual_id(); - assert_true(manual_executor->loop_once()); - }); - - co_await result; - - assert_equal(m_setting_thread_id, m_resuming_thread_id); - } - }; - - template - struct test_await_not_ready_result { - - private: - uintptr_t m_setting_thread_id = 0; - uintptr_t m_resuming_thread_id = 0; - - result inner_task(std::shared_ptr manual_executor) { - const auto id = 1234567; - auto result = manual_executor->submit([id]() -> decltype(auto) { - throw custom_exception(id); - return value_gen::default_value(); - }); - - auto done_result = co_await result.resolve(); - - m_resuming_thread_id = thread::get_current_virtual_id(); - - test_ready_result_custom_exception(std::move(done_result), id); - } - - public: - result operator()(std::shared_ptr manual_executor, std::shared_ptr thread_executor) { - assert_true(manual_executor->empty()); - - auto result = inner_task(manual_executor); - - co_await thread_executor->submit([this, manual_executor] { - m_setting_thread_id = concurrencpp::details::thread::get_current_virtual_id(); - assert_true(manual_executor->loop_once()); - }); - - co_await result; - - assert_equal(m_setting_thread_id, m_resuming_thread_id); - } - }; - -} // namespace concurrencpp::tests - -template -void concurrencpp::tests::test_result_resolve_impl() { - // empty result throws - { - assert_throws_with_error_message( - [] { - result().resolve(); - }, - concurrencpp::details::consts::k_result_resolve_error_msg); - } - - auto thread_executor = std::make_shared(); - auto manual_executor = std::make_shared(); - executor_shutdowner es0(thread_executor), es1(manual_executor); - - test_await_ready_result()().get(); - test_await_ready_result()().get(); - test_await_not_ready_result()(manual_executor, thread_executor).get(); - test_await_not_ready_result()(manual_executor, thread_executor).get(); -} - -void concurrencpp::tests::test_result_resolve() { - test_result_resolve_impl(); - test_result_resolve_impl(); - test_result_resolve_impl(); - test_result_resolve_impl(); - test_result_resolve_impl(); -} -using namespace concurrencpp::tests; - -int main() { - tester tester("result::resolve"); - - tester.add_step("resolve", test_result_resolve); - - tester.launch_test(); - return 0; -} \ No newline at end of file diff --git a/test/source/tests/runtime_tests.cpp b/test/source/tests/runtime_tests.cpp index be1c8dc1..745e79ad 100644 --- a/test/source/tests/runtime_tests.cpp +++ b/test/source/tests/runtime_tests.cpp @@ -40,6 +40,19 @@ void concurrencpp::tests::test_runtime_constructor() { opts.max_background_threads = 7; opts.max_background_executor_waiting_time = std::chrono::milliseconds(54321); + std::atomic_size_t thread_started_callback_invocations_num = 0; + std::atomic_size_t thread_terminated_callback_invocations_num = 0; + + opts.thread_started_callback = [&thread_started_callback_invocations_num](std::string_view thread_name) { + assert_false(thread_name.empty()); + ++thread_started_callback_invocations_num; + }; + + opts.thread_terminated_callback = [&thread_terminated_callback_invocations_num](std::string_view thread_name) { + assert_false(thread_name.empty()); + ++thread_terminated_callback_invocations_num; + }; + concurrencpp::runtime runtime(opts); auto dummy_ex = runtime.make_executor("dummy_executor", 1, 4.4f); assert_true(static_cast(runtime.inline_executor())); @@ -58,6 +71,27 @@ void concurrencpp::tests::test_runtime_constructor() { assert_equal(runtime.thread_pool_executor()->max_worker_idle_time(), opts.max_thread_pool_executor_waiting_time); assert_equal(runtime.background_executor()->max_concurrency_level(), opts.max_background_threads); assert_equal(runtime.background_executor()->max_worker_idle_time(), opts.max_background_executor_waiting_time); + + auto test_runtime_executor = [&thread_started_callback_invocations_num, + &thread_terminated_callback_invocations_num](std::shared_ptr executor) { + thread_started_callback_invocations_num = 0; + thread_terminated_callback_invocations_num = 0; + + executor + ->submit([&thread_started_callback_invocations_num, &thread_terminated_callback_invocations_num]() { + assert_equal(thread_started_callback_invocations_num, 1); + assert_equal(thread_terminated_callback_invocations_num, 0); + }) + .get(); + + executor->shutdown(); + assert_equal(thread_started_callback_invocations_num, 1); + assert_equal(thread_terminated_callback_invocations_num, 1); + }; + + test_runtime_executor(runtime.thread_executor()); + test_runtime_executor(runtime.thread_pool_executor()); + test_runtime_executor(runtime.background_executor()); } void concurrencpp::tests::test_runtime_destructor() { diff --git a/test/source/tests/timer_tests/timer_queue_tests.cpp b/test/source/tests/timer_tests/timer_queue_tests.cpp index 75bfd79b..9a4768a5 100644 --- a/test/source/tests/timer_tests/timer_queue_tests.cpp +++ b/test/source/tests/timer_tests/timer_queue_tests.cpp @@ -15,6 +15,7 @@ namespace concurrencpp::tests { void test_timer_queue_make_delay_object(); void test_timer_queue_max_worker_idle_time(); void test_timer_queue_thread_injection(); + void test_timer_queue_thread_callbacks(); } // namespace concurrencpp::tests void concurrencpp::tests::test_timer_queue_make_timer() { @@ -107,6 +108,40 @@ void concurrencpp::tests::test_timer_queue_thread_injection() { } } +void concurrencpp::tests::test_timer_queue_thread_callbacks() { + std::atomic_size_t thread_started_callback_invocations_num = 0; + std::atomic_size_t thread_terminated_callback_invocations_num = 0; + + auto thread_started_callback = [&thread_started_callback_invocations_num](std::string_view thread_name) { + ++thread_started_callback_invocations_num; + assert_equal(thread_name, concurrencpp::details::make_executor_worker_name(concurrencpp::details::consts::k_timer_queue_name)); + }; + + auto thread_terminated_callback = [&thread_terminated_callback_invocations_num](std::string_view thread_name) { + ++thread_terminated_callback_invocations_num; + assert_equal(thread_name, concurrencpp::details::make_executor_worker_name(concurrencpp::details::consts::k_timer_queue_name)); + }; + + auto timer_queue = std::make_shared(50ms, thread_started_callback, thread_terminated_callback); + assert_equal(thread_started_callback_invocations_num, 0); + assert_equal(thread_terminated_callback_invocations_num, 0); + + auto inline_executor = std::make_shared(); + executor_shutdowner es(inline_executor); + + auto timer = + timer_queue->make_one_shot_timer(50ms, + inline_executor, + [&thread_started_callback_invocations_num, &thread_terminated_callback_invocations_num]() { + assert_equal(thread_started_callback_invocations_num, 1); + assert_equal(thread_terminated_callback_invocations_num, 0); + }); + + timer_queue->shutdown(); + assert_equal(thread_started_callback_invocations_num, 1); + assert_equal(thread_terminated_callback_invocations_num, 1); +} + using namespace concurrencpp::tests; int main() { @@ -116,7 +151,8 @@ int main() { test.add_step("make_oneshot_timer", test_timer_queue_make_timer); test.add_step("make_delay_object", test_timer_queue_make_delay_object); test.add_step("max_worker_idle_time", test_timer_queue_max_worker_idle_time); - test.add_step("thread injection", test_timer_queue_thread_injection); + test.add_step("thread_injection", test_timer_queue_thread_injection); + test.add_step("thread_callbacks", test_timer_queue_thread_callbacks); test.launch_test(); return 0;