Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement wait queue #1061

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions builtin-functions/kphp-light/functions.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ function get_hash_of_class (object $klass) ::: int;

function strlen ($str ::: string) ::: int;

function rand() ::: int;

// === Fork =======================================================================================

/** @kphp-extern-func-info interruptible cpp_template_call */
Expand All @@ -87,6 +89,15 @@ function sched_yield() ::: void;
/** @kphp-extern-func-info interruptible */
function sched_yield_sleep($timeout_ns ::: int) ::: void;

function wait_queue_create (array<future<any> | false> $request_ids = []) ::: future_queue<^1[*][*]>;

function wait_queue_push (future_queue<any> &$queue_id, future<any> | false $request_ids) ::: void;

function wait_queue_empty (future_queue<any> $queue_id) ::: bool;

/** @kphp-extern-func-info interruptible */
function wait_queue_next (future_queue<any> $queue_id, $timeout ::: float = -1.0) ::: future<^1[*]> | false;

// === Rpc ========================================================================================

/** @kphp-tl-class */
Expand Down
2 changes: 1 addition & 1 deletion runtime-core/memory-resource/resource_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class resource_allocator {
}

static constexpr size_t max_value_type_size() {
return 128U;
return 256U;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it changed?

}

friend inline bool operator==(const resource_allocator &lhs, const resource_allocator &rhs) noexcept {
Expand Down
14 changes: 14 additions & 0 deletions runtime-light/stdlib/fork/fork-api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "runtime-core/utils/kphp-assert-core.h"
#include "runtime-light/coroutine/awaitable.h"
#include "runtime-light/coroutine/task.h"
#include "runtime-light/stdlib/fork/wait-queue-context.h"

task_t<void> f$sched_yield() noexcept {
co_await wait_for_reschedule_t{};
Expand All @@ -22,3 +23,16 @@ task_t<void> f$sched_yield_sleep(int64_t duration_ns) noexcept {
}
co_await wait_for_timer_t{std::chrono::nanoseconds{static_cast<uint64_t>(duration_ns)}};
}

task_t<Optional<int64_t>> f$wait_queue_next(int64_t queue_id, double timeout) noexcept {
static_assert(CancellableAwaitable<wait_queue_t::awaiter_t>);
auto queue = WaitQueueContext::get().get_queue(queue_id);
if (!queue.has_value()) {
co_return Optional<int64_t>{};
}
const auto timeout_ns{timeout > 0 && timeout <= fork_api_impl_::MAX_TIMEOUT_S
? std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::duration<double>{timeout})
: fork_api_impl_::DEFAULT_TIMEOUT_NS};
auto result_opt{co_await wait_with_timeout_t{queue.val()->pop(), timeout_ns}};
co_return result_opt.has_value() ? result_opt.value() : Optional<int64_t>{};
}
20 changes: 20 additions & 0 deletions runtime-light/stdlib/fork/fork-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "runtime-light/coroutine/awaitable.h"
#include "runtime-light/coroutine/task.h"
#include "runtime-light/stdlib/fork/fork-context.h"
#include "runtime-light/stdlib/fork/wait-queue-context.h"

namespace fork_api_impl_ {

Expand Down Expand Up @@ -45,3 +46,22 @@ requires(is_optional<T>::value) task_t<T> f$wait(Optional<int64_t> fork_id_opt,
task_t<void> f$sched_yield() noexcept;

task_t<void> f$sched_yield_sleep(int64_t duration_ns) noexcept;

inline int64_t f$wait_queue_create(array<Optional<int64_t>> fork_ids) noexcept {
return WaitQueueContext::get().create_queue(fork_ids);
}

inline void f$wait_queue_push(int64_t queue_id, Optional<int64_t> fork_id) noexcept {
if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value() && fork_id.has_value()) {
queue.val()->push(fork_id.val());
}
}

inline bool f$wait_queue_empty(int64_t queue_id) noexcept {
if (auto queue = WaitQueueContext::get().get_queue(queue_id); queue.has_value()) {
return queue.val()->empty();
}
return true;
}

task_t<Optional<int64_t>> f$wait_queue_next(int64_t queue, double timeout = -1.0) noexcept;
40 changes: 35 additions & 5 deletions runtime-light/stdlib/fork/fork-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,24 @@
#include "runtime-core/utils/kphp-assert-core.h"
#include "runtime-light/coroutine/task.h"
#include "runtime-light/stdlib/fork/fork.h"
#include "runtime-light/stdlib/fork/wait-queue-context.h"
#include "runtime-light/utils/concepts.h"

constexpr int64_t INVALID_FORK_ID = -1;

class ForkComponentContext {
enum class ForkStatus { available, reserved };

template<hashable Key, typename Value>
using unordered_map = memory_resource::stl::unordered_map<Key, Value, memory_resource::unsynchronized_pool_resource>;

static constexpr auto FORK_ID_INIT = 0;

unordered_map<int64_t, task_t<fork_result>> forks;
unordered_map<int64_t, std::pair<task_t<fork_result>, ForkStatus>> forks;
int64_t next_fork_id{FORK_ID_INIT + 1};

int64_t push_fork(task_t<fork_result> &&task) noexcept {
return forks.emplace(next_fork_id, std::move(task)), next_fork_id++;
return forks.emplace(next_fork_id, std::make_pair(std::move(task), ForkStatus::available)), next_fork_id++;
}

task_t<fork_result> pop_fork(int64_t fork_id) noexcept {
Expand All @@ -34,21 +37,48 @@ class ForkComponentContext {
php_critical_error("can't find fork %" PRId64, fork_id);
}
auto fork{std::move(it_fork->second)};
php_assert(fork.second == ForkStatus::available);
forks.erase(it_fork);
return fork;
return std::move(fork.first);
}

void reserve_fork(int64_t fork_id) noexcept {
if (const auto it = forks.find(fork_id); it != forks.end()) {
it->second.second = ForkStatus::reserved;
}
}

void unreserve_fork(int64_t fork_id) noexcept {
if (const auto it = forks.find(fork_id); it != forks.end()) {
it->second.second = ForkStatus::available;
}
}

friend class start_fork_t;
template<typename>
friend class wait_fork_t;
friend class wait_queue_t;

public:
WaitQueueContext wait_queue_context;

explicit ForkComponentContext(memory_resource::unsynchronized_pool_resource &memory_resource) noexcept
: forks(unordered_map<int64_t, task_t<fork_result>>::allocator_type{memory_resource}) {}
: forks(unordered_map<int64_t, std::pair<task_t<fork_result>, ForkStatus>>::allocator_type{memory_resource})
, wait_queue_context(memory_resource) {}

static ForkComponentContext &get() noexcept;

bool contains(int64_t fork_id) const noexcept {
return forks.contains(fork_id);
if (const auto it = forks.find(fork_id); it != forks.cend()) {
return it->second.second == ForkStatus::available;
}
return false;
}

bool is_ready(int64_t fork_id) const noexcept {
if (const auto it = forks.find(fork_id); it != forks.cend()) {
return it->second.first.done();
}
return false;
}
};
28 changes: 28 additions & 0 deletions runtime-light/stdlib/fork/wait-queue-context.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Compiler for PHP (aka KPHP)
// Copyright (c) 2024 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#include "runtime-light/stdlib/fork/wait-queue-context.h"

#include "runtime-light/component/component.h"
#include "runtime-light/coroutine/awaitable.h"
#include "runtime-light/stdlib/fork/fork-context.h"

WaitQueueContext &WaitQueueContext::get() noexcept {
return ForkComponentContext::get().wait_queue_context;
}

int64_t WaitQueueContext::create_queue(const array<Optional<int64_t>> &fork_ids) noexcept {
auto &memory_resource{get_component_context()->runtime_allocator.memory_resource};
unordered_set<int64_t> forks_ids(unordered_set<int64_t>::allocator_type{memory_resource});
std::for_each(fork_ids.begin(), fork_ids.end(), [&forks_ids](const auto &it) {
Optional<int64_t> fork_id = it.get_value();
if (fork_id.has_value()) {
forks_ids.insert(fork_id.val());
}
});
int64_t queue_id{++next_wait_queue_id};
wait_queues.emplace(std::piecewise_construct, std::forward_as_tuple(queue_id), std::forward_as_tuple(memory_resource, std::move(forks_ids)));
php_debug("WaitQueueContext: create queue %ld with %ld forks", queue_id, fork_ids.size().size);
return queue_id;
}
40 changes: 40 additions & 0 deletions runtime-light/stdlib/fork/wait-queue-context.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Compiler for PHP (aka KPHP)
// Copyright (c) 2024 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#pragma once

#include <cstdint>

#include "runtime-core/core-types/decl/optional.h"
#include "runtime-core/memory-resource/resource_allocator.h"
#include "runtime-core/memory-resource/unsynchronized_pool_resource.h"
#include "runtime-light/utils/concepts.h"
#include "runtime-light/stdlib/fork/wait-queue.h"

class WaitQueueContext {
template<hashable Key, typename Value>
using unordered_map = memory_resource::stl::unordered_map<Key, Value, memory_resource::unsynchronized_pool_resource>;

template<hashable T>
using unordered_set = memory_resource::stl::unordered_set<T, memory_resource::unsynchronized_pool_resource>;

unordered_map<int64_t, wait_queue_t> wait_queues;
static constexpr auto WAIT_QUEUE_INIT_ID = 0;
int64_t next_wait_queue_id{WAIT_QUEUE_INIT_ID};

public:
explicit WaitQueueContext(memory_resource::unsynchronized_pool_resource &memory_resource) noexcept
: wait_queues(unordered_map<int64_t, wait_queue_t>::allocator_type{memory_resource}) {}

static WaitQueueContext &get() noexcept;

int64_t create_queue(const array<Optional<int64_t>> &fork_ids) noexcept;

Optional<wait_queue_t *> get_queue(int64_t queue_id) noexcept {
if (auto it = wait_queues.find(queue_id); it != wait_queues.end()) {
return std::addressof(it->second);
}
return {};
}
};
64 changes: 64 additions & 0 deletions runtime-light/stdlib/fork/wait-queue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Compiler for PHP (aka KPHP)
// Copyright (c) 2024 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#include <algorithm>
#include <coroutine>

#include "runtime-light/stdlib/fork/wait-queue.h"

#include "runtime-core/utils/kphp-assert-core.h"
#include "runtime-light/stdlib/fork/fork-context.h"

wait_queue_t::wait_queue_t(memory_resource::unsynchronized_pool_resource &memory_resource, unordered_set<int64_t> &&forks_ids_) noexcept
: forks_ids(std::move(forks_ids_))
, awaiters(deque<awaiter_t>::allocator_type{memory_resource}) {
std::for_each(forks_ids.begin(), forks_ids.end(), [](int64_t fork_id) { ForkComponentContext::get().reserve_fork(fork_id); });
}

void wait_queue_t::push(int64_t fork_id) noexcept {
forks_ids.insert(fork_id);
ForkComponentContext::get().reserve_fork(fork_id);
if (!awaiters.empty()) {
auto &task = ForkComponentContext::get().forks[fork_id].first;
task_t<fork_result>::awaiter_t awaiter{std::addressof(task)};
awaiter.await_suspend(awaiters.front().awaited_handle);
}
}

void wait_queue_t::insert_awaiter(const awaiter_t &awaiter) noexcept {
if (awaiters.empty()) {
std::for_each(forks_ids.begin(), forks_ids.end(), [&](int64_t fork_id) {
task_t<fork_result>::awaiter_t task_awaiter{std::addressof(ForkComponentContext::get().forks[fork_id].first)};
task_awaiter.await_suspend(awaiter.awaited_handle);
});
}
awaiters.push_back(awaiter);
}

void wait_queue_t::erase_awaiter(const awaiter_t &awaiter) noexcept {
auto it = std::find(awaiters.begin(), awaiters.end(), awaiter);
if (it != awaiters.end()) {
awaiters.erase(it);
}
std::coroutine_handle<> next_awaiter = awaiters.empty() ? std::noop_coroutine() : awaiters.front().awaited_handle;
std::for_each(forks_ids.begin(), forks_ids.end(), [&](int64_t fork_id) {
task_t<fork_result>::awaiter_t task_awaiter{std::addressof(ForkComponentContext::get().forks[fork_id].first)};
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this part. I think should be method in ForkComponentContext for suspend coroutine handle on task_t

task_awaiter.await_suspend(next_awaiter);
});
}

int64_t wait_queue_t::pop_ready_fork() noexcept {
auto it = std::find_if(forks_ids.begin(), forks_ids.end(), [](int64_t fork_id) { return ForkComponentContext::get().is_ready(fork_id); });
if (it == forks_ids.end()) {
php_critical_error("there is no fork to pop from queue");
}
int64_t ready_fork = *it;
forks_ids.erase(it);
ForkComponentContext::get().unreserve_fork(ready_fork);
return ready_fork;
}

bool wait_queue_t::has_ready_fork() const noexcept {
return std::any_of(forks_ids.begin(), forks_ids.end(), [](int64_t fork_id) { return ForkComponentContext::get().is_ready(fork_id); });
}
79 changes: 79 additions & 0 deletions runtime-light/stdlib/fork/wait-queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Compiler for PHP (aka KPHP)
// Copyright (c) 2024 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#pragma once

#include <coroutine>

#include "runtime-light/allocator/allocator.h"
#include "runtime-light/utils/concepts.h"

class wait_queue_t {
template<hashable T>
using unordered_set = memory_resource::stl::unordered_set<T, memory_resource::unsynchronized_pool_resource>;

template<typename T>
using deque = memory_resource::stl::deque<T, memory_resource::unsynchronized_pool_resource>;

public:
wait_queue_t(const wait_queue_t &) = delete;
wait_queue_t(wait_queue_t &&other) = delete;
wait_queue_t &operator=(const wait_queue_t &) = delete;
wait_queue_t &operator=(wait_queue_t &&) = delete;

explicit wait_queue_t(memory_resource::unsynchronized_pool_resource &memory_resource, unordered_set<int64_t> &&forks_ids_) noexcept;

struct awaiter_t {
wait_queue_t *wait_queue;
std::coroutine_handle<> awaited_handle;
explicit awaiter_t(wait_queue_t *wait_queue_) noexcept
: wait_queue(wait_queue_)
, awaited_handle(std::noop_coroutine()) {}

bool await_ready() const noexcept {
return wait_queue->has_ready_fork();
}

void await_suspend(std::coroutine_handle<> coro) noexcept {
awaited_handle = coro;
wait_queue->insert_awaiter(*this);
}

int64_t await_resume() noexcept {
wait_queue->erase_awaiter(*this);
return wait_queue->pop_ready_fork();
}

bool resumable() noexcept {
return wait_queue->has_ready_fork();
}

void cancel() noexcept {
wait_queue->erase_awaiter(*this);
}

bool operator==(const awaiter_t &other) const {
return awaited_handle == other.awaited_handle;
}
};

void push(int64_t fork_id) noexcept;

awaiter_t pop() noexcept {
return awaiter_t{this};
}

bool empty() const noexcept {
return forks_ids.empty();
}

private:
unordered_set<int64_t> forks_ids;
deque<awaiter_t> awaiters;

void insert_awaiter(const awaiter_t &awaiter) noexcept;
void erase_awaiter(const awaiter_t &awaiter) noexcept;
int64_t pop_ready_fork() noexcept;
bool has_ready_fork() const noexcept;
};
3 changes: 2 additions & 1 deletion runtime-light/stdlib/interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

int64_t f$rand() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it defined in that file?

std::random_device rd;
int64_t dice_roll = rd();
std::uniform_int_distribution<> dis(0, rd.max());
int64_t dice_roll = dis(rd);
return dice_roll;
}
Loading
Loading