From 345fffa2f109e82f364ec621afc33868296422c3 Mon Sep 17 00:00:00 2001 From: Vadim Sadokhov Date: Thu, 8 Aug 2024 17:00:08 +0300 Subject: [PATCH 01/10] add wait queue --- builtin-functions/kphp-light/functions.txt | 11 +++ .../memory-resource/resource_allocator.h | 2 +- runtime-light/coroutine/awaitable.h | 39 ++++++++ runtime-light/stdlib/fork/fork-api.cpp | 33 +++++++ runtime-light/stdlib/fork/fork-api.h | 9 +- .../stdlib/fork/wait-queue-context.cpp | 29 ++++++ .../stdlib/fork/wait-queue-context.h | 37 +++++++ runtime-light/stdlib/fork/wait-queue.cpp | 18 ++++ runtime-light/stdlib/fork/wait-queue.h | 96 +++++++++++++++++++ runtime-light/stdlib/interface.cpp | 3 +- runtime-light/stdlib/stdlib.cmake | 2 + tests/phpt/fork/001_basic.php | 8 +- 12 files changed, 280 insertions(+), 7 deletions(-) create mode 100644 runtime-light/stdlib/fork/wait-queue-context.cpp create mode 100644 runtime-light/stdlib/fork/wait-queue-context.h create mode 100644 runtime-light/stdlib/fork/wait-queue.cpp create mode 100644 runtime-light/stdlib/fork/wait-queue.h diff --git a/builtin-functions/kphp-light/functions.txt b/builtin-functions/kphp-light/functions.txt index e2e776dce8..300247bd52 100644 --- a/builtin-functions/kphp-light/functions.txt +++ b/builtin-functions/kphp-light/functions.txt @@ -76,6 +76,8 @@ function get_hash_of_class (object $klass) ::: int; function strlen ($str ::: string) ::: int; +function rand() ::: int; + // === Fork ======================================================================================= /** @kphp-extern-func-info interruptible cpp_template_call */ @@ -87,6 +89,15 @@ function sched_yield() ::: void; /** @kphp-extern-func-info interruptible */ function sched_yield_sleep($timeout_ns ::: int) ::: void; +function wait_queue_create (array | false> $request_ids = []) ::: future_queue<^1[*][*]>; + +function wait_queue_push (future_queue &$queue_id, future | false $request_ids) ::: void; + +function wait_queue_empty (future_queue $queue_id) ::: bool; + +/** @kphp-extern-func-info interruptible */ +function wait_queue_next (future_queue $queue_id, $timeout ::: float = -1.0) ::: future<^1[*]> | false; + // === Rpc ======================================================================================== /** @kphp-tl-class */ diff --git a/runtime-core/memory-resource/resource_allocator.h b/runtime-core/memory-resource/resource_allocator.h index eca8f3b1a1..ce4b341960 100644 --- a/runtime-core/memory-resource/resource_allocator.h +++ b/runtime-core/memory-resource/resource_allocator.h @@ -46,7 +46,7 @@ class resource_allocator { } static constexpr size_t max_value_type_size() { - return 128U; + return 256U; } friend inline bool operator==(const resource_allocator &lhs, const resource_allocator &rhs) noexcept { diff --git a/runtime-light/coroutine/awaitable.h b/runtime-light/coroutine/awaitable.h index c27ab80eb3..84c75f5936 100644 --- a/runtime-light/coroutine/awaitable.h +++ b/runtime-light/coroutine/awaitable.h @@ -21,6 +21,7 @@ #include "runtime-light/stdlib/fork/fork-context.h" #include "runtime-light/stdlib/fork/fork.h" #include "runtime-light/utils/context.h" +#include "runtime-light/stdlib/fork/wait-queue-context.h" template concept Awaitable = requires(T awaitable, std::coroutine_handle<> coro) { @@ -434,5 +435,43 @@ class wait_with_timeout_t { } }; +// ================================================================================================ + +class wait_queue_next_t { + int64_t queue_id; + wait_for_timer_t timer_awaiter; + +public: + explicit wait_queue_next_t(int64_t queue_id_, std::chrono::nanoseconds timeout_) noexcept + : queue_id(queue_id_) + , timer_awaiter(timeout_) {} + + bool await_ready() const noexcept { + if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value()) { + return queue.val()->empty() || queue.val()->has_ready_fork(); + } + return true; + } + + void await_suspend(std::coroutine_handle<> coro) noexcept { + auto queue = WaitQueueContext::get().get_queue(queue_id); + queue.val()->push_coro_handle(coro); + timer_awaiter.await_suspend(coro); + } + + Optional await_resume() noexcept { + auto queue = WaitQueueContext::get().get_queue(queue_id); + queue.val()->pop_coro_handle(); + auto ready_fork = queue.val()->pop_fork(); + if (ready_fork.has_value()) { + timer_awaiter.cancel(); + // todo set here info about prev fork_id + return ForkComponentContext::get().push_fork(std::move(ready_fork.val().second)); + } else { + return {}; + } + } +}; + template wait_with_timeout_t(T &&, std::chrono::nanoseconds) -> wait_with_timeout_t; diff --git a/runtime-light/stdlib/fork/fork-api.cpp b/runtime-light/stdlib/fork/fork-api.cpp index b8e68746ce..8bb49e744a 100644 --- a/runtime-light/stdlib/fork/fork-api.cpp +++ b/runtime-light/stdlib/fork/fork-api.cpp @@ -10,6 +10,7 @@ #include "runtime-core/utils/kphp-assert-core.h" #include "runtime-light/coroutine/awaitable.h" #include "runtime-light/coroutine/task.h" +#include "runtime-light/stdlib/fork/wait-queue-context.h" task_t f$sched_yield() noexcept { co_await wait_for_reschedule_t{}; @@ -22,3 +23,35 @@ task_t f$sched_yield_sleep(int64_t duration_ns) noexcept { } co_await wait_for_timer_t{std::chrono::nanoseconds{static_cast(duration_ns)}}; } + +int64_t f$wait_queue_create(array> fork_ids) noexcept { + return WaitQueueContext::get().create_queue(fork_ids); +} + +void f$wait_queue_push(int64_t queue_id, Optional fork_id) noexcept { + if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value() && fork_id.has_value()) { + auto task = ForkComponentContext::get().pop_fork(fork_id.val()); + if (task.has_value()) { + php_debug("push fork %ld, to queue %ld", fork_id.val(), queue_id); + queue.val()->push_fork(fork_id.val(), std::move(task.val())); + } + } +} + +bool f$wait_queue_empty(int64_t queue_id) noexcept { + if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value()) { + return queue.val()->empty(); + } + return false; +} + +task_t> f$wait_queue_next(int64_t queue_id, double timeout) noexcept { + if (WaitQueueContext::get().get_queue(queue_id).has_value()) { + if (timeout < 0.0) { + timeout = fork_api_impl_::WAIT_FORK_MAX_TIMEOUT; + } + const auto timeout_ns{std::chrono::duration_cast(std::chrono::duration{timeout})}; + co_return co_await wait_queue_next_t{queue_id, timeout_ns}; + } + co_return Optional{}; +} diff --git a/runtime-light/stdlib/fork/fork-api.h b/runtime-light/stdlib/fork/fork-api.h index 99b3fa2167..a5a58c2171 100644 --- a/runtime-light/stdlib/fork/fork-api.h +++ b/runtime-light/stdlib/fork/fork-api.h @@ -11,7 +11,6 @@ #include "runtime-core/utils/kphp-assert-core.h" #include "runtime-light/coroutine/awaitable.h" #include "runtime-light/coroutine/task.h" -#include "runtime-light/stdlib/fork/fork-context.h" namespace fork_api_impl_ { @@ -45,3 +44,11 @@ requires(is_optional::value) task_t f$wait(Optional fork_id_opt, task_t f$sched_yield() noexcept; task_t f$sched_yield_sleep(int64_t duration_ns) noexcept; + +int64_t f$wait_queue_create(array> fork_ids) noexcept; + +void f$wait_queue_push(int64_t queue, Optional fork_id) noexcept; + +bool f$wait_queue_empty(int64_t queue_id) noexcept; + +task_t> f$wait_queue_next(int64_t queue, double timeout = -1.0) noexcept; diff --git a/runtime-light/stdlib/fork/wait-queue-context.cpp b/runtime-light/stdlib/fork/wait-queue-context.cpp new file mode 100644 index 0000000000..864764b8ba --- /dev/null +++ b/runtime-light/stdlib/fork/wait-queue-context.cpp @@ -0,0 +1,29 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#include "runtime-light/stdlib/fork/wait-queue-context.h" + +#include "runtime-light/component/component.h" +#include "runtime-light/stdlib/fork/fork-context.h" + +WaitQueueContext &WaitQueueContext::get() noexcept { + return ForkComponentContext::get().wait_queue_context; +} + +int64_t WaitQueueContext::create_queue(const array> &fork_ids) noexcept { + auto &memory_resource{get_component_context()->runtime_allocator.memory_resource}; + unordered_map> forks(unordered_map>::allocator_type{memory_resource}); + std::for_each(fork_ids.begin(), fork_ids.end(), [&forks](const auto &it) { + Optional fork_id = it.get_value(); + if (fork_id.has_value()) { + if (auto task = ForkComponentContext::get().pop_fork(fork_id.val()); task.has_value()) { + forks[fork_id.val()] = std::move(task.val()); + } + } + }); + int64_t queue_id{++next_wait_queue_id}; + wait_queues.emplace(queue_id, WaitQueue(memory_resource, std::move(forks))); + php_debug("WaitQueueContext: create queue %ld with %ld forks", queue_id, fork_ids.size().size); + return queue_id; +} diff --git a/runtime-light/stdlib/fork/wait-queue-context.h b/runtime-light/stdlib/fork/wait-queue-context.h new file mode 100644 index 0000000000..89c98dd708 --- /dev/null +++ b/runtime-light/stdlib/fork/wait-queue-context.h @@ -0,0 +1,37 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#pragma once + +#include + +#include "runtime-core/core-types/decl/optional.h" +#include "runtime-core/memory-resource/resource_allocator.h" +#include "runtime-core/memory-resource/unsynchronized_pool_resource.h" +#include "runtime-light/stdlib/fork/wait-queue.h" +#include "runtime-light/utils/concepts.h" + +class WaitQueueContext { + template + using unordered_map = memory_resource::stl::unordered_map; + + unordered_map wait_queues; + static constexpr auto WAIT_QUEUE_INIT_ID = 0; + int64_t next_wait_queue_id{WAIT_QUEUE_INIT_ID}; + +public: + explicit WaitQueueContext(memory_resource::unsynchronized_pool_resource &memory_resource) noexcept + : wait_queues(unordered_map::allocator_type{memory_resource}) {} + + static WaitQueueContext &get() noexcept; + + int64_t create_queue(const array> &fork_ids) noexcept; + + Optional get_queue(int64_t queue_id) noexcept { + if (auto it = wait_queues.find(queue_id); it != wait_queues.end()) { + return &it->second; + } + return {}; + } +}; diff --git a/runtime-light/stdlib/fork/wait-queue.cpp b/runtime-light/stdlib/fork/wait-queue.cpp new file mode 100644 index 0000000000..0fb8b6e850 --- /dev/null +++ b/runtime-light/stdlib/fork/wait-queue.cpp @@ -0,0 +1,18 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#include "runtime-light/stdlib/fork/wait-queue.h" + +#include "runtime-light/component/component.h" + +void WaitQueue::resume_awaited_handles_if_empty() { + if (forks.empty()) { + while (!awaited_handles.empty()) { + auto handle = awaited_handles.front(); + awaited_handles.pop_front(); + CoroutineScheduler::get().suspend({handle, WaitEvent::Rechedule{}}); + } + } +} + diff --git a/runtime-light/stdlib/fork/wait-queue.h b/runtime-light/stdlib/fork/wait-queue.h new file mode 100644 index 0000000000..8116a5f633 --- /dev/null +++ b/runtime-light/stdlib/fork/wait-queue.h @@ -0,0 +1,96 @@ +// Compiler for PHP (aka KPHP) +// Copyright (c) 2024 LLC «V Kontakte» +// Distributed under the GPL v3 License, see LICENSE.notice.txt + +#pragma once + +#include +#include +#include + +#include "runtime-core/memory-resource/resource_allocator.h" +#include "runtime-core/memory-resource/unsynchronized_pool_resource.h" +#include "runtime-light/coroutine/task.h" +#include "runtime-light/stdlib/fork/fork.h" +#include "runtime-light/utils/concepts.h" + +class WaitQueue { + template + using unordered_map = memory_resource::stl::unordered_map; + + template + using deque = memory_resource::stl::deque; + + unordered_map> forks; + unordered_map::awaiter_t> fork_awaiters; + deque> awaited_handles; + + void resume_awaited_handles_if_empty(); + +public: + WaitQueue(const WaitQueue &) = delete; + WaitQueue &operator=(const WaitQueue &) = delete; + WaitQueue &operator=(WaitQueue &&) = delete; + + WaitQueue(WaitQueue &&other) noexcept + : forks(std::move(other.forks)) + , fork_awaiters(std::move(other.fork_awaiters)) + , awaited_handles(std::move(other.awaited_handles)) {} + + explicit WaitQueue(memory_resource::unsynchronized_pool_resource &memory_resource, unordered_map> &&forks_) noexcept + : forks(std::move(forks_)) + , fork_awaiters(unordered_map::awaiter_t>::allocator_type{memory_resource}) + , awaited_handles(deque>::allocator_type{memory_resource}) { + for (auto &fork : forks) { + fork_awaiters.emplace(fork.first, std::addressof(fork.second)); + } + } + + void push_fork(int64_t fork_id, task_t &&fork) noexcept { + auto [fork_it, fork_success] = forks.emplace(fork_id, std::move(fork)); + auto [awaiter_id, awaiter_success] = fork_awaiters.emplace(fork_id, std::addressof(fork_it->second)); + if (!awaited_handles.empty()) { + awaiter_id->second.await_suspend(awaited_handles.front()); + } + } + + Optional>> pop_fork() noexcept { + auto it = std::find_if(forks.begin(), forks.end(), [](std::pair> &fork) { return fork.second.done(); }); + if (it != forks.end()) { + auto fork = std::move(*it); + forks.erase(fork.first); + fork_awaiters.erase(fork.first); + resume_awaited_handles_if_empty(); + return fork; + } else { + return {}; + } + } + + void push_coro_handle(std::coroutine_handle<> coro) noexcept { + if (awaited_handles.empty()) { + std::for_each(fork_awaiters.begin(), fork_awaiters.end(), [coro](auto &awaiter) { awaiter.second.await_suspend(coro); }); + } + awaited_handles.push_back(coro); + } + + void pop_coro_handle() noexcept { + if (!awaited_handles.empty()) { + awaited_handles.pop_front(); + } + std::coroutine_handle<> next_awaiter = awaited_handles.empty() ? std::noop_coroutine() : awaited_handles.front(); + std::for_each(fork_awaiters.begin(), fork_awaiters.end(), [&](auto &awaiter) { awaiter.second.await_suspend(next_awaiter); }); + } + + bool has_ready_fork() const noexcept { + return std::any_of(forks.begin(), forks.end(), [](const auto &fork) { return fork.second.done(); }); + } + + size_t size() const noexcept { + return forks.size(); + } + + bool empty() const noexcept { + return forks.empty(); + } +}; diff --git a/runtime-light/stdlib/interface.cpp b/runtime-light/stdlib/interface.cpp index f99c7ad954..c1e07d1c38 100644 --- a/runtime-light/stdlib/interface.cpp +++ b/runtime-light/stdlib/interface.cpp @@ -14,6 +14,7 @@ int64_t f$rand() { std::random_device rd; - int64_t dice_roll = rd(); + std::uniform_int_distribution<> dis(0, rd.max()); + int64_t dice_roll = dis(rd); return dice_roll; } diff --git a/runtime-light/stdlib/stdlib.cmake b/runtime-light/stdlib/stdlib.cmake index d7956932a3..8a19981539 100644 --- a/runtime-light/stdlib/stdlib.cmake +++ b/runtime-light/stdlib/stdlib.cmake @@ -9,6 +9,8 @@ prepend( superglobals.cpp fork/fork-api.cpp fork/fork-context.cpp + fork/wait-queue-context.cpp + fork/wait-queue.cpp rpc/rpc-api.cpp rpc/rpc-context.cpp rpc/rpc-extra-headers.cpp diff --git a/tests/phpt/fork/001_basic.php b/tests/phpt/fork/001_basic.php index 65e245dc70..546c67bae7 100644 --- a/tests/phpt/fork/001_basic.php +++ b/tests/phpt/fork/001_basic.php @@ -1,4 +1,4 @@ -@ok k2_skip +@ok Date: Wed, 14 Aug 2024 16:48:38 +0300 Subject: [PATCH 02/10] version 2 --- runtime-light/coroutine/awaitable.h | 39 ------ runtime-light/stdlib/fork/fork-api.cpp | 37 ++---- runtime-light/stdlib/fork/fork-api.h | 17 ++- runtime-light/stdlib/fork/fork-context.h | 40 +++++- .../stdlib/fork/wait-queue-context.cpp | 11 +- .../stdlib/fork/wait-queue-context.h | 13 +- runtime-light/stdlib/fork/wait-queue.cpp | 60 +++++++-- runtime-light/stdlib/fork/wait-queue.h | 116 ++++++++---------- 8 files changed, 173 insertions(+), 160 deletions(-) diff --git a/runtime-light/coroutine/awaitable.h b/runtime-light/coroutine/awaitable.h index 84c75f5936..c27ab80eb3 100644 --- a/runtime-light/coroutine/awaitable.h +++ b/runtime-light/coroutine/awaitable.h @@ -21,7 +21,6 @@ #include "runtime-light/stdlib/fork/fork-context.h" #include "runtime-light/stdlib/fork/fork.h" #include "runtime-light/utils/context.h" -#include "runtime-light/stdlib/fork/wait-queue-context.h" template concept Awaitable = requires(T awaitable, std::coroutine_handle<> coro) { @@ -435,43 +434,5 @@ class wait_with_timeout_t { } }; -// ================================================================================================ - -class wait_queue_next_t { - int64_t queue_id; - wait_for_timer_t timer_awaiter; - -public: - explicit wait_queue_next_t(int64_t queue_id_, std::chrono::nanoseconds timeout_) noexcept - : queue_id(queue_id_) - , timer_awaiter(timeout_) {} - - bool await_ready() const noexcept { - if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value()) { - return queue.val()->empty() || queue.val()->has_ready_fork(); - } - return true; - } - - void await_suspend(std::coroutine_handle<> coro) noexcept { - auto queue = WaitQueueContext::get().get_queue(queue_id); - queue.val()->push_coro_handle(coro); - timer_awaiter.await_suspend(coro); - } - - Optional await_resume() noexcept { - auto queue = WaitQueueContext::get().get_queue(queue_id); - queue.val()->pop_coro_handle(); - auto ready_fork = queue.val()->pop_fork(); - if (ready_fork.has_value()) { - timer_awaiter.cancel(); - // todo set here info about prev fork_id - return ForkComponentContext::get().push_fork(std::move(ready_fork.val().second)); - } else { - return {}; - } - } -}; - template wait_with_timeout_t(T &&, std::chrono::nanoseconds) -> wait_with_timeout_t; diff --git a/runtime-light/stdlib/fork/fork-api.cpp b/runtime-light/stdlib/fork/fork-api.cpp index 8bb49e744a..55160350c8 100644 --- a/runtime-light/stdlib/fork/fork-api.cpp +++ b/runtime-light/stdlib/fork/fork-api.cpp @@ -24,34 +24,15 @@ task_t f$sched_yield_sleep(int64_t duration_ns) noexcept { co_await wait_for_timer_t{std::chrono::nanoseconds{static_cast(duration_ns)}}; } -int64_t f$wait_queue_create(array> fork_ids) noexcept { - return WaitQueueContext::get().create_queue(fork_ids); -} - -void f$wait_queue_push(int64_t queue_id, Optional fork_id) noexcept { - if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value() && fork_id.has_value()) { - auto task = ForkComponentContext::get().pop_fork(fork_id.val()); - if (task.has_value()) { - php_debug("push fork %ld, to queue %ld", fork_id.val(), queue_id); - queue.val()->push_fork(fork_id.val(), std::move(task.val())); - } - } -} - -bool f$wait_queue_empty(int64_t queue_id) noexcept { - if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value()) { - return queue.val()->empty(); - } - return false; -} - task_t> f$wait_queue_next(int64_t queue_id, double timeout) noexcept { - if (WaitQueueContext::get().get_queue(queue_id).has_value()) { - if (timeout < 0.0) { - timeout = fork_api_impl_::WAIT_FORK_MAX_TIMEOUT; - } - const auto timeout_ns{std::chrono::duration_cast(std::chrono::duration{timeout})}; - co_return co_await wait_queue_next_t{queue_id, timeout_ns}; + static_assert(CancellableAwaitable); + auto queue = WaitQueueContext::get().get_queue(queue_id); + if (!queue.has_value()) { + co_return Optional{}; } - co_return Optional{}; + const auto timeout_ns{timeout > 0 && timeout <= fork_api_impl_::MAX_TIMEOUT_S + ? std::chrono::duration_cast(std::chrono::duration{timeout}) + : fork_api_impl_::DEFAULT_TIMEOUT_NS}; + auto result_opt{co_await wait_with_timeout_t{queue.val()->pop(), timeout_ns}}; + co_return result_opt.has_value() ? result_opt.value() : Optional{}; } diff --git a/runtime-light/stdlib/fork/fork-api.h b/runtime-light/stdlib/fork/fork-api.h index a5a58c2171..2c3cc87dea 100644 --- a/runtime-light/stdlib/fork/fork-api.h +++ b/runtime-light/stdlib/fork/fork-api.h @@ -45,10 +45,21 @@ task_t f$sched_yield() noexcept; task_t f$sched_yield_sleep(int64_t duration_ns) noexcept; -int64_t f$wait_queue_create(array> fork_ids) noexcept; +inline int64_t f$wait_queue_create(array> fork_ids) noexcept { + return WaitQueueContext::get().create_queue(fork_ids); +} -void f$wait_queue_push(int64_t queue, Optional fork_id) noexcept; +inline void f$wait_queue_push(int64_t queue_id, Optional fork_id) noexcept { + if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value() && fork_id.has_value()) { + queue.val()->push(fork_id.val()); + } +} -bool f$wait_queue_empty(int64_t queue_id) noexcept; +inline bool f$wait_queue_empty(int64_t queue_id) noexcept { + if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value()) { + return queue.val()->empty(); + } + return false; +} task_t> f$wait_queue_next(int64_t queue, double timeout = -1.0) noexcept; diff --git a/runtime-light/stdlib/fork/fork-context.h b/runtime-light/stdlib/fork/fork-context.h index 57c67ec1af..6249e2a1af 100644 --- a/runtime-light/stdlib/fork/fork-context.h +++ b/runtime-light/stdlib/fork/fork-context.h @@ -11,21 +11,24 @@ #include "runtime-core/utils/kphp-assert-core.h" #include "runtime-light/coroutine/task.h" #include "runtime-light/stdlib/fork/fork.h" +#include "runtime-light/stdlib/fork/wait-queue-context.h" #include "runtime-light/utils/concepts.h" constexpr int64_t INVALID_FORK_ID = -1; class ForkComponentContext { + enum class ForkStatus { available, reserved }; + template using unordered_map = memory_resource::stl::unordered_map; static constexpr auto FORK_ID_INIT = 0; - unordered_map> forks; + unordered_map, ForkStatus>> forks; int64_t next_fork_id{FORK_ID_INIT + 1}; int64_t push_fork(task_t &&task) noexcept { - return forks.emplace(next_fork_id, std::move(task)), next_fork_id++; + return forks.emplace(next_fork_id, std::make_pair(std::move(task), ForkStatus::available)), next_fork_id++; } task_t pop_fork(int64_t fork_id) noexcept { @@ -34,21 +37,48 @@ class ForkComponentContext { php_critical_error("can't find fork %" PRId64, fork_id); } auto fork{std::move(it_fork->second)}; + php_assert(fork.second == ForkStatus::available); forks.erase(it_fork); - return fork; + return std::move(fork.first); + } + + void reserve_fork(int64_t fork_id) noexcept { + if (const auto it = forks.find(fork_id); it != forks.end()) { + it->second.second = ForkStatus::reserved; + } + } + + void unreserve_fork(int64_t fork_id) noexcept { + if (const auto it = forks.find(fork_id); it != forks.end()) { + it->second.second = ForkStatus::available; + } } friend class start_fork_t; template friend class wait_fork_t; + friend class wait_queue_t; public: + WaitQueueContext wait_queue_context; + explicit ForkComponentContext(memory_resource::unsynchronized_pool_resource &memory_resource) noexcept - : forks(unordered_map>::allocator_type{memory_resource}) {} + : forks(unordered_map, ForkStatus>>::allocator_type{memory_resource}) + , wait_queue_context(memory_resource) {} static ForkComponentContext &get() noexcept; bool contains(int64_t fork_id) const noexcept { - return forks.contains(fork_id); + if (const auto it = forks.find(fork_id); it != forks.cend()) { + return it->second.second == ForkStatus::available; + } + return false; + } + + bool is_ready(int64_t fork_id) const noexcept { + if (const auto it = forks.find(fork_id); it != forks.cend()) { + return it->second.first.done(); + } + return false; } }; diff --git a/runtime-light/stdlib/fork/wait-queue-context.cpp b/runtime-light/stdlib/fork/wait-queue-context.cpp index 864764b8ba..bf805e07c1 100644 --- a/runtime-light/stdlib/fork/wait-queue-context.cpp +++ b/runtime-light/stdlib/fork/wait-queue-context.cpp @@ -5,6 +5,7 @@ #include "runtime-light/stdlib/fork/wait-queue-context.h" #include "runtime-light/component/component.h" +#include "runtime-light/coroutine/awaitable.h" #include "runtime-light/stdlib/fork/fork-context.h" WaitQueueContext &WaitQueueContext::get() noexcept { @@ -13,17 +14,15 @@ WaitQueueContext &WaitQueueContext::get() noexcept { int64_t WaitQueueContext::create_queue(const array> &fork_ids) noexcept { auto &memory_resource{get_component_context()->runtime_allocator.memory_resource}; - unordered_map> forks(unordered_map>::allocator_type{memory_resource}); - std::for_each(fork_ids.begin(), fork_ids.end(), [&forks](const auto &it) { + unordered_set forks_ids(unordered_set::allocator_type{memory_resource}); + std::for_each(fork_ids.begin(), fork_ids.end(), [&forks_ids](const auto &it) { Optional fork_id = it.get_value(); if (fork_id.has_value()) { - if (auto task = ForkComponentContext::get().pop_fork(fork_id.val()); task.has_value()) { - forks[fork_id.val()] = std::move(task.val()); - } + forks_ids.insert(fork_id.val()); } }); int64_t queue_id{++next_wait_queue_id}; - wait_queues.emplace(queue_id, WaitQueue(memory_resource, std::move(forks))); + wait_queues.emplace(queue_id, wait_queue_t(memory_resource, std::move(forks_ids))); php_debug("WaitQueueContext: create queue %ld with %ld forks", queue_id, fork_ids.size().size); return queue_id; } diff --git a/runtime-light/stdlib/fork/wait-queue-context.h b/runtime-light/stdlib/fork/wait-queue-context.h index 89c98dd708..f8485c6437 100644 --- a/runtime-light/stdlib/fork/wait-queue-context.h +++ b/runtime-light/stdlib/fork/wait-queue-context.h @@ -9,28 +9,31 @@ #include "runtime-core/core-types/decl/optional.h" #include "runtime-core/memory-resource/resource_allocator.h" #include "runtime-core/memory-resource/unsynchronized_pool_resource.h" -#include "runtime-light/stdlib/fork/wait-queue.h" #include "runtime-light/utils/concepts.h" +#include "runtime-light/stdlib/fork/wait-queue.h" class WaitQueueContext { template using unordered_map = memory_resource::stl::unordered_map; - unordered_map wait_queues; + template + using unordered_set = memory_resource::stl::unordered_set; + + unordered_map wait_queues; static constexpr auto WAIT_QUEUE_INIT_ID = 0; int64_t next_wait_queue_id{WAIT_QUEUE_INIT_ID}; public: explicit WaitQueueContext(memory_resource::unsynchronized_pool_resource &memory_resource) noexcept - : wait_queues(unordered_map::allocator_type{memory_resource}) {} + : wait_queues(unordered_map::allocator_type{memory_resource}) {} static WaitQueueContext &get() noexcept; int64_t create_queue(const array> &fork_ids) noexcept; - Optional get_queue(int64_t queue_id) noexcept { + Optional get_queue(int64_t queue_id) noexcept { if (auto it = wait_queues.find(queue_id); it != wait_queues.end()) { - return &it->second; + return std::addressof(it->second); } return {}; } diff --git a/runtime-light/stdlib/fork/wait-queue.cpp b/runtime-light/stdlib/fork/wait-queue.cpp index 0fb8b6e850..d53aa87fe7 100644 --- a/runtime-light/stdlib/fork/wait-queue.cpp +++ b/runtime-light/stdlib/fork/wait-queue.cpp @@ -4,15 +4,57 @@ #include "runtime-light/stdlib/fork/wait-queue.h" -#include "runtime-light/component/component.h" - -void WaitQueue::resume_awaited_handles_if_empty() { - if (forks.empty()) { - while (!awaited_handles.empty()) { - auto handle = awaited_handles.front(); - awaited_handles.pop_front(); - CoroutineScheduler::get().suspend({handle, WaitEvent::Rechedule{}}); - } +#include "runtime-light/stdlib/fork/fork-context.h" + +wait_queue_t::wait_queue_t(memory_resource::unsynchronized_pool_resource &memory_resource, unordered_set &&forks_ids_) noexcept + : forks_ids(std::move(forks_ids_)) + , awaiters(deque::allocator_type{memory_resource}) { + std::for_each(forks_ids.begin(), forks_ids.end(), [](int64_t fork_id) { ForkComponentContext::get().reserve_fork(fork_id); }); +} + +void wait_queue_t::push(int64_t fork_id) noexcept { + forks_ids.insert(fork_id); + ForkComponentContext::get().reserve_fork(fork_id); + if (!awaiters.empty()) { + auto &task = ForkComponentContext::get().forks[fork_id].first; + task_t::awaiter_t awaiter{std::addressof(task)}; + awaiter.await_suspend(awaiters.front().awaited_handle); } } +void wait_queue_t::insert_awaiter(awaiter_t awaiter) noexcept { + if (awaiters.empty()) { + std::for_each(forks_ids.begin(), forks_ids.end(), [&](int64_t fork_id) { + task_t::awaiter_t task_awaiter{std::addressof(ForkComponentContext::get().forks[fork_id].first)}; + task_awaiter.await_suspend(awaiter.awaited_handle); + }); + } + awaiters.push_back(awaiter); +} + +void wait_queue_t::erase_awaiter(awaiter_t awaiter) noexcept { + auto it = std::find(awaiters.begin(), awaiters.end(), awaiter); + if (it != awaiters.end()) { + awaiters.erase(it); + } + std::coroutine_handle<> next_awaiter = awaiters.empty() ? std::noop_coroutine() : awaiters.front().awaited_handle; + std::for_each(forks_ids.begin(), forks_ids.end(), [&](int64_t fork_id) { + task_t::awaiter_t task_awaiter{std::addressof(ForkComponentContext::get().forks[fork_id].first)}; + task_awaiter.await_suspend(next_awaiter); + }); +} + +int64_t wait_queue_t::pop_ready_fork() noexcept { + auto it = std::find_if(forks_ids.begin(), forks_ids.end(), [](int64_t fork_id) { return ForkComponentContext::get().is_ready(fork_id); }); + if (it == forks_ids.end()) { + php_critical_error("there is no fork to pop from queue"); + } + int64_t ready_fork = *it; + forks_ids.erase(it); + ForkComponentContext::get().unreserve_fork(ready_fork); + return ready_fork; +} + +bool wait_queue_t::has_ready_fork() const noexcept { + return std::any_of(forks_ids.begin(), forks_ids.end(), [](int64_t fork_id) { return ForkComponentContext::get().is_ready(fork_id); }); +} diff --git a/runtime-light/stdlib/fork/wait-queue.h b/runtime-light/stdlib/fork/wait-queue.h index 8116a5f633..3c6e71b445 100644 --- a/runtime-light/stdlib/fork/wait-queue.h +++ b/runtime-light/stdlib/fork/wait-queue.h @@ -4,93 +4,79 @@ #pragma once -#include #include -#include -#include "runtime-core/memory-resource/resource_allocator.h" -#include "runtime-core/memory-resource/unsynchronized_pool_resource.h" -#include "runtime-light/coroutine/task.h" -#include "runtime-light/stdlib/fork/fork.h" +#include "runtime-light/allocator/allocator.h" #include "runtime-light/utils/concepts.h" -class WaitQueue { - template - using unordered_map = memory_resource::stl::unordered_map; +class wait_queue_t { + template + using unordered_set = memory_resource::stl::unordered_set; template using deque = memory_resource::stl::deque; - unordered_map> forks; - unordered_map::awaiter_t> fork_awaiters; - deque> awaited_handles; - - void resume_awaited_handles_if_empty(); - public: - WaitQueue(const WaitQueue &) = delete; - WaitQueue &operator=(const WaitQueue &) = delete; - WaitQueue &operator=(WaitQueue &&) = delete; - - WaitQueue(WaitQueue &&other) noexcept - : forks(std::move(other.forks)) - , fork_awaiters(std::move(other.fork_awaiters)) - , awaited_handles(std::move(other.awaited_handles)) {} - - explicit WaitQueue(memory_resource::unsynchronized_pool_resource &memory_resource, unordered_map> &&forks_) noexcept - : forks(std::move(forks_)) - , fork_awaiters(unordered_map::awaiter_t>::allocator_type{memory_resource}) - , awaited_handles(deque>::allocator_type{memory_resource}) { - for (auto &fork : forks) { - fork_awaiters.emplace(fork.first, std::addressof(fork.second)); + struct awaiter_t { + wait_queue_t *wait_queue; + std::coroutine_handle<> awaited_handle; + explicit awaiter_t(wait_queue_t *wait_queue_) noexcept + : wait_queue(wait_queue_) + , awaited_handle(std::noop_coroutine()) {} + + bool await_ready() const noexcept { + return wait_queue->has_ready_fork(); } - } - void push_fork(int64_t fork_id, task_t &&fork) noexcept { - auto [fork_it, fork_success] = forks.emplace(fork_id, std::move(fork)); - auto [awaiter_id, awaiter_success] = fork_awaiters.emplace(fork_id, std::addressof(fork_it->second)); - if (!awaited_handles.empty()) { - awaiter_id->second.await_suspend(awaited_handles.front()); + void await_suspend(std::coroutine_handle<> coro) noexcept { + awaited_handle = coro; + wait_queue->insert_awaiter(*this); } - } - Optional>> pop_fork() noexcept { - auto it = std::find_if(forks.begin(), forks.end(), [](std::pair> &fork) { return fork.second.done(); }); - if (it != forks.end()) { - auto fork = std::move(*it); - forks.erase(fork.first); - fork_awaiters.erase(fork.first); - resume_awaited_handles_if_empty(); - return fork; - } else { - return {}; + int64_t await_resume() noexcept { + wait_queue->erase_awaiter(*this); + return wait_queue->pop_ready_fork(); } - } - void push_coro_handle(std::coroutine_handle<> coro) noexcept { - if (awaited_handles.empty()) { - std::for_each(fork_awaiters.begin(), fork_awaiters.end(), [coro](auto &awaiter) { awaiter.second.await_suspend(coro); }); + bool resumable() noexcept { + return wait_queue->has_ready_fork(); } - awaited_handles.push_back(coro); - } - void pop_coro_handle() noexcept { - if (!awaited_handles.empty()) { - awaited_handles.pop_front(); + void cancel() noexcept { + wait_queue->erase_awaiter(*this); } - std::coroutine_handle<> next_awaiter = awaited_handles.empty() ? std::noop_coroutine() : awaited_handles.front(); - std::for_each(fork_awaiters.begin(), fork_awaiters.end(), [&](auto &awaiter) { awaiter.second.await_suspend(next_awaiter); }); - } - bool has_ready_fork() const noexcept { - return std::any_of(forks.begin(), forks.end(), [](const auto &fork) { return fork.second.done(); }); - } + bool operator==(const awaiter_t & other) const { + return awaited_handle == other.awaited_handle; + } + }; + + wait_queue_t(const wait_queue_t &) = delete; + wait_queue_t &operator=(const wait_queue_t &) = delete; + wait_queue_t &operator=(wait_queue_t &&) = delete; + + wait_queue_t(wait_queue_t &&other) noexcept + : forks_ids(std::move(other.forks_ids)) + , awaiters(std::move(other.awaiters)) {} + + explicit wait_queue_t(memory_resource::unsynchronized_pool_resource &memory_resource, unordered_set &&forks_ids_) noexcept; - size_t size() const noexcept { - return forks.size(); + void push(int64_t fork_id) noexcept; + + awaiter_t pop() noexcept { + return awaiter_t{this}; } bool empty() const noexcept { - return forks.empty(); + return forks_ids.empty(); } + +private: + unordered_set forks_ids; + deque awaiters; + + void insert_awaiter(awaiter_t awaiter) noexcept; + void erase_awaiter(awaiter_t awaiter) noexcept; + int64_t pop_ready_fork() noexcept; + bool has_ready_fork() const noexcept; }; From 945e92dfdccc22ec903fce2ecece41e9c010bb36 Mon Sep 17 00:00:00 2001 From: Vadim Sadokhov Date: Thu, 15 Aug 2024 11:18:04 +0300 Subject: [PATCH 03/10] add include --- runtime-light/stdlib/fork/fork-api.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/runtime-light/stdlib/fork/fork-api.h b/runtime-light/stdlib/fork/fork-api.h index 2c3cc87dea..32c2f0c5a1 100644 --- a/runtime-light/stdlib/fork/fork-api.h +++ b/runtime-light/stdlib/fork/fork-api.h @@ -11,6 +11,8 @@ #include "runtime-core/utils/kphp-assert-core.h" #include "runtime-light/coroutine/awaitable.h" #include "runtime-light/coroutine/task.h" +#include "runtime-light/stdlib/fork/fork-context.h" +#include "runtime-light/stdlib/fork/wait-queue-context.h" namespace fork_api_impl_ { From 806e581242406f2a0a0cdde2178d08fa07a0df6d Mon Sep 17 00:00:00 2001 From: Vadim Sadokhov Date: Thu, 15 Aug 2024 11:19:02 +0300 Subject: [PATCH 04/10] small fix --- runtime-light/stdlib/fork/fork-api.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime-light/stdlib/fork/fork-api.h b/runtime-light/stdlib/fork/fork-api.h index 32c2f0c5a1..f22e949997 100644 --- a/runtime-light/stdlib/fork/fork-api.h +++ b/runtime-light/stdlib/fork/fork-api.h @@ -61,7 +61,7 @@ inline bool f$wait_queue_empty(int64_t queue_id) noexcept { if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value()) { return queue.val()->empty(); } - return false; + return true; } task_t> f$wait_queue_next(int64_t queue, double timeout = -1.0) noexcept; From 080eac3ae80dba238aa269a668162f0cb8a0c846 Mon Sep 17 00:00:00 2001 From: Vadim Sadokhov Date: Thu, 15 Aug 2024 12:21:07 +0300 Subject: [PATCH 05/10] add const ref --- runtime-light/stdlib/fork/wait-queue.cpp | 4 ++-- runtime-light/stdlib/fork/wait-queue.h | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/runtime-light/stdlib/fork/wait-queue.cpp b/runtime-light/stdlib/fork/wait-queue.cpp index d53aa87fe7..21d4c600d1 100644 --- a/runtime-light/stdlib/fork/wait-queue.cpp +++ b/runtime-light/stdlib/fork/wait-queue.cpp @@ -22,7 +22,7 @@ void wait_queue_t::push(int64_t fork_id) noexcept { } } -void wait_queue_t::insert_awaiter(awaiter_t awaiter) noexcept { +void wait_queue_t::insert_awaiter(const awaiter_t &awaiter) noexcept { if (awaiters.empty()) { std::for_each(forks_ids.begin(), forks_ids.end(), [&](int64_t fork_id) { task_t::awaiter_t task_awaiter{std::addressof(ForkComponentContext::get().forks[fork_id].first)}; @@ -32,7 +32,7 @@ void wait_queue_t::insert_awaiter(awaiter_t awaiter) noexcept { awaiters.push_back(awaiter); } -void wait_queue_t::erase_awaiter(awaiter_t awaiter) noexcept { +void wait_queue_t::erase_awaiter(const awaiter_t &awaiter) noexcept { auto it = std::find(awaiters.begin(), awaiters.end(), awaiter); if (it != awaiters.end()) { awaiters.erase(it); diff --git a/runtime-light/stdlib/fork/wait-queue.h b/runtime-light/stdlib/fork/wait-queue.h index 3c6e71b445..78d556e371 100644 --- a/runtime-light/stdlib/fork/wait-queue.h +++ b/runtime-light/stdlib/fork/wait-queue.h @@ -46,7 +46,7 @@ class wait_queue_t { wait_queue->erase_awaiter(*this); } - bool operator==(const awaiter_t & other) const { + bool operator==(const awaiter_t &other) const { return awaited_handle == other.awaited_handle; } }; @@ -75,8 +75,8 @@ class wait_queue_t { unordered_set forks_ids; deque awaiters; - void insert_awaiter(awaiter_t awaiter) noexcept; - void erase_awaiter(awaiter_t awaiter) noexcept; + void insert_awaiter(const awaiter_t &awaiter) noexcept; + void erase_awaiter(const awaiter_t &awaiter) noexcept; int64_t pop_ready_fork() noexcept; bool has_ready_fork() const noexcept; }; From 4232a271ada72f79c7d837e70ff4b7e3cb57658e Mon Sep 17 00:00:00 2001 From: Vadim Sadokhov Date: Thu, 15 Aug 2024 12:33:44 +0300 Subject: [PATCH 06/10] add timeout in tests --- tests/phpt/fork/001_basic.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/phpt/fork/001_basic.php b/tests/phpt/fork/001_basic.php index 546c67bae7..99b6e9e19b 100644 --- a/tests/phpt/fork/001_basic.php +++ b/tests/phpt/fork/001_basic.php @@ -46,7 +46,7 @@ function hash3($n) { } $q = wait_queue_create($ids); while (true) { - $t = wait_queue_next ($q); + $t = wait_queue_next ($q, 0.1); if (!$t) { break; } @@ -100,7 +100,7 @@ function hash4($n) { # $id3 = fork(hash4(100)); $qid = hash4(10); while (true) { - $t = wait_queue_next ($qid); + $t = wait_queue_next ($qid, 0.1); if (!$t) { break; } From 784a97eba7a3eba57d83b35cf4718271edab1cce Mon Sep 17 00:00:00 2001 From: Vadim Sadokhov Date: Thu, 15 Aug 2024 12:35:43 +0300 Subject: [PATCH 07/10] delete move constructor for wait_queue --- runtime-light/stdlib/fork/wait-queue-context.cpp | 2 +- runtime-light/stdlib/fork/wait-queue.h | 5 +---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/runtime-light/stdlib/fork/wait-queue-context.cpp b/runtime-light/stdlib/fork/wait-queue-context.cpp index bf805e07c1..6c25d9aa0d 100644 --- a/runtime-light/stdlib/fork/wait-queue-context.cpp +++ b/runtime-light/stdlib/fork/wait-queue-context.cpp @@ -22,7 +22,7 @@ int64_t WaitQueueContext::create_queue(const array> &fork_ids) } }); int64_t queue_id{++next_wait_queue_id}; - wait_queues.emplace(queue_id, wait_queue_t(memory_resource, std::move(forks_ids))); + wait_queues.emplace(std::piecewise_construct, std::forward_as_tuple(queue_id), std::forward_as_tuple(memory_resource, std::move(forks_ids))); php_debug("WaitQueueContext: create queue %ld with %ld forks", queue_id, fork_ids.size().size); return queue_id; } diff --git a/runtime-light/stdlib/fork/wait-queue.h b/runtime-light/stdlib/fork/wait-queue.h index 78d556e371..2940dbd690 100644 --- a/runtime-light/stdlib/fork/wait-queue.h +++ b/runtime-light/stdlib/fork/wait-queue.h @@ -52,13 +52,10 @@ class wait_queue_t { }; wait_queue_t(const wait_queue_t &) = delete; + wait_queue_t(wait_queue_t &&other) = delete; wait_queue_t &operator=(const wait_queue_t &) = delete; wait_queue_t &operator=(wait_queue_t &&) = delete; - wait_queue_t(wait_queue_t &&other) noexcept - : forks_ids(std::move(other.forks_ids)) - , awaiters(std::move(other.awaiters)) {} - explicit wait_queue_t(memory_resource::unsynchronized_pool_resource &memory_resource, unordered_set &&forks_ids_) noexcept; void push(int64_t fork_id) noexcept; From b8db5f65048526e03e73b26c0cebe66290b2025f Mon Sep 17 00:00:00 2001 From: Vadim Sadokhov Date: Thu, 15 Aug 2024 12:58:39 +0300 Subject: [PATCH 08/10] add #include --- runtime-light/stdlib/fork/wait-queue.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/runtime-light/stdlib/fork/wait-queue.cpp b/runtime-light/stdlib/fork/wait-queue.cpp index 21d4c600d1..2a24eb712f 100644 --- a/runtime-light/stdlib/fork/wait-queue.cpp +++ b/runtime-light/stdlib/fork/wait-queue.cpp @@ -2,8 +2,11 @@ // Copyright (c) 2024 LLC «V Kontakte» // Distributed under the GPL v3 License, see LICENSE.notice.txt +#include + #include "runtime-light/stdlib/fork/wait-queue.h" +#include "runtime-core/utils/kphp-assert-core.h" #include "runtime-light/stdlib/fork/fork-context.h" wait_queue_t::wait_queue_t(memory_resource::unsynchronized_pool_resource &memory_resource, unordered_set &&forks_ids_) noexcept From bd4a3ad7982b2565fffb8709413aa2743b6ebf52 Mon Sep 17 00:00:00 2001 From: Vadim Sadokhov Date: Thu, 15 Aug 2024 12:59:12 +0300 Subject: [PATCH 09/10] add #include --- runtime-light/stdlib/fork/wait-queue.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/runtime-light/stdlib/fork/wait-queue.cpp b/runtime-light/stdlib/fork/wait-queue.cpp index 2a24eb712f..c610b3a010 100644 --- a/runtime-light/stdlib/fork/wait-queue.cpp +++ b/runtime-light/stdlib/fork/wait-queue.cpp @@ -3,6 +3,7 @@ // Distributed under the GPL v3 License, see LICENSE.notice.txt #include +#include #include "runtime-light/stdlib/fork/wait-queue.h" From 7419da908c0d7a1ced17e178d4bf3a639025b771 Mon Sep 17 00:00:00 2001 From: Vadim Sadokhov Date: Thu, 15 Aug 2024 13:02:24 +0300 Subject: [PATCH 10/10] small improvements --- runtime-light/stdlib/fork/wait-queue.h | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/runtime-light/stdlib/fork/wait-queue.h b/runtime-light/stdlib/fork/wait-queue.h index 2940dbd690..58f3e3d3cc 100644 --- a/runtime-light/stdlib/fork/wait-queue.h +++ b/runtime-light/stdlib/fork/wait-queue.h @@ -17,6 +17,13 @@ class wait_queue_t { using deque = memory_resource::stl::deque; public: + wait_queue_t(const wait_queue_t &) = delete; + wait_queue_t(wait_queue_t &&other) = delete; + wait_queue_t &operator=(const wait_queue_t &) = delete; + wait_queue_t &operator=(wait_queue_t &&) = delete; + + explicit wait_queue_t(memory_resource::unsynchronized_pool_resource &memory_resource, unordered_set &&forks_ids_) noexcept; + struct awaiter_t { wait_queue_t *wait_queue; std::coroutine_handle<> awaited_handle; @@ -51,13 +58,6 @@ class wait_queue_t { } }; - wait_queue_t(const wait_queue_t &) = delete; - wait_queue_t(wait_queue_t &&other) = delete; - wait_queue_t &operator=(const wait_queue_t &) = delete; - wait_queue_t &operator=(wait_queue_t &&) = delete; - - explicit wait_queue_t(memory_resource::unsynchronized_pool_resource &memory_resource, unordered_set &&forks_ids_) noexcept; - void push(int64_t fork_id) noexcept; awaiter_t pop() noexcept {