Skip to content

Commit

Permalink
Version 0.0.9 (#35)
Browse files Browse the repository at this point in the history
* addition of task objects
* result refactor
* executor refactor + optimizations
* tests were completly re-written
* move to MSVC 18.6.2 and clang 11
* move to standard coroutines on MSVC
* awaitables are uncopiable and unmovable
* when timer is cancelled/destructed, spawned tasks that are not yet executed are cancelled.

Note: CI/CD fail as Clang 11 is still not supported on Github Actions. Tests were run locally on Window, Linux and Mac.
Note: This version breaks ABI if applications implemented their own executors.
  • Loading branch information
David-Haim authored Jan 1, 2021
1 parent 547c557 commit 0ef82b4
Show file tree
Hide file tree
Showing 80 changed files with 6,139 additions and 3,258 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

# Others
.vs
CMakeSettings.json

# Specific directories
build/
Expand Down
13 changes: 9 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cmake_minimum_required(VERSION 3.16)

project(concurrencpp
VERSION 0.0.8
VERSION 0.0.9
LANGUAGES CXX)

include(cmake/coroutineOptions.cmake)
Expand All @@ -17,13 +17,15 @@ endif()
# ---- Declare library ----

set(concurrencpp_sources
source/task.cpp
source/executors/executor.cpp
source/executors/manual_executor.cpp
source/executors/thread_executor.cpp
source/executors/thread_pool_executor.cpp
source/executors/worker_thread_executor.cpp
source/results/impl/consumer_context.cpp
source/results/impl/result_state.cpp
source/results/promises.cpp
source/results/result_core.cpp
source/runtime/runtime.cpp
source/threads/thread.cpp
source/timers/timer.cpp
Expand All @@ -32,8 +34,10 @@ set(concurrencpp_sources
set(concurrencpp_headers
include/concurrencpp/concurrencpp.h
include/concurrencpp/errors.h
include/concurrencpp/task.h
include/concurrencpp/forward_declerations.h
include/concurrencpp/platform_defs.h
include/concurrencpp/coroutines/coroutine.h
include/concurrencpp/executors/constants.h
include/concurrencpp/executors/derivable_executor.h
include/concurrencpp/executors/executor.h
Expand All @@ -43,13 +47,14 @@ set(concurrencpp_headers
include/concurrencpp/executors/thread_executor.h
include/concurrencpp/executors/thread_pool_executor.h
include/concurrencpp/executors/worker_thread_executor.h
include/concurrencpp/results/impl/consumer_context.h
include/concurrencpp/results/impl/producer_context.h
include/concurrencpp/results/impl/result_state.h
include/concurrencpp/results/constants.h
include/concurrencpp/results/executor_exception.h
include/concurrencpp/results/make_result.h
include/concurrencpp/results/promises.h
include/concurrencpp/results/result.h
include/concurrencpp/results/result_awaitable.h
include/concurrencpp/results/result_core.h
include/concurrencpp/results/result_fwd_declerations.h
include/concurrencpp/results/when_result.h
include/concurrencpp/runtime/constants.h
Expand Down
318 changes: 201 additions & 117 deletions README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion cmake/coroutineOptions.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#
function(target_coroutine_options TARGET)
if(MSVC)
target_compile_options(${TARGET} PUBLIC /await /permissive-)
target_compile_options(${TARGET} PUBLIC /std:c++latest /permissive-)
return()
endif()

Expand Down
30 changes: 30 additions & 0 deletions include/concurrencpp/coroutines/coroutine.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#ifndef CONCURRENCPP_COROUTINE_H
#define CONCURRENCPP_COROUTINE_H

#include "../platform_defs.h"

#ifdef CRCPP_MSVC_COMPILER

# include <coroutine>

namespace concurrencpp::details {
template<class promise_type>
using coroutine_handle = std::coroutine_handle<promise_type>;
using suspend_never = std::suspend_never;
using suspend_always = std::suspend_always;
} // namespace concurrencpp::details

#elif defined(CRCPP_CLANG_COMPILER)

# include <experimental/coroutine>

namespace concurrencpp::details {
template<class promise_type>
using coroutine_handle = std::experimental::coroutine_handle<promise_type>;
using suspend_never = std::experimental::suspend_never;
using suspend_always = std::experimental::suspend_always;
} // namespace concurrencpp::details

#endif

#endif
2 changes: 2 additions & 0 deletions include/concurrencpp/executors/constants.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#ifndef CONCURRENCPP_EXECUTORS_CONSTS_H
#define CONCURRENCPP_EXECUTORS_CONSTS_H

#include <numeric>

namespace concurrencpp::details::consts {
inline const char* k_inline_executor_name = "concurrencpp::inline_executor";
constexpr int k_inline_executor_max_concurrency_level = 0;
Expand Down
1 change: 1 addition & 0 deletions include/concurrencpp/executors/derivable_executor.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef CONCURRENCPP_DERIVABLE_EXECUTOR_H
#define CONCURRENCPP_DERIVABLE_EXECUTOR_H

#include "concurrencpp/utils/bind.h"
#include "concurrencpp/executors/executor.h"

namespace concurrencpp {
Expand Down
46 changes: 19 additions & 27 deletions include/concurrencpp/executors/executor.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#ifndef CONCURRENCPP_EXECUTOR_H
#define CONCURRENCPP_EXECUTOR_H

#include "concurrencpp/task.h"
#include "concurrencpp/results/result.h"

#include <span>
#include <vector>
#include <string>
#include <string_view>

Expand All @@ -16,27 +18,13 @@ namespace concurrencpp {
class executor {

private:
template<class executor_type, class callable_type, class... argument_types>
static null_result post_bridge(executor_tag, executor_type*, callable_type callable, argument_types... arguments) {
callable(arguments...);
co_return;
}

template<class callable_type>
static null_result bulk_post_bridge(details::executor_bulk_tag, std::vector<std::experimental::coroutine_handle<>>* accumulator, callable_type callable) {
callable();
co_return;
}

template<class return_type, class executor_type, class callable_type, class... argument_types>
static result<return_type> submit_bridge(executor_tag, executor_type*, callable_type callable, argument_types... arguments) {
co_return callable(arguments...);
}

template<class callable_type, class return_type = typename std::invoke_result_t<callable_type>>
static result<return_type> bulk_submit_bridge(details::executor_bulk_tag,
std::vector<std::experimental::coroutine_handle<>>* accumulator,
callable_type callable) {
static result<return_type> bulk_submit_bridge(details::executor_bulk_tag, std::vector<concurrencpp::task>* accumulator, callable_type callable) {
co_return callable();
}

Expand All @@ -46,7 +34,8 @@ namespace concurrencpp {
static_assert(std::is_invocable_v<callable_type, argument_types...>,
"concurrencpp::executor::post - <<callable_type>> is not invokable with <<argument_types...>>");

post_bridge({}, executor_ptr, std::forward<callable_type>(callable), std::forward<argument_types>(arguments)...);
assert(executor_ptr != nullptr);
executor_ptr->enqueue(details::bind_with_try_catch(std::forward<callable_type>(callable), std::forward<argument_types>(arguments)...));
}

template<class executor_type, class callable_type, class... argument_types>
Expand All @@ -55,37 +44,40 @@ namespace concurrencpp {
"concurrencpp::executor::submit - <<callable_type>> is not invokable with <<argument_types...>>");

using return_type = typename std::invoke_result_t<callable_type, argument_types...>;

return submit_bridge<return_type>({}, executor_ptr, std::forward<callable_type>(callable), std::forward<argument_types>(arguments)...);
}

template<class executor_type, class callable_type>
static void do_bulk_post(executor_type* executor_ptr, std::span<callable_type> callable_list) {
std::vector<std::experimental::coroutine_handle<>> accumulator;
accumulator.reserve(callable_list.size());
assert(executor_ptr != nullptr);
assert(!callable_list.empty());

std::vector<task> tasks;
tasks.reserve(callable_list.size());

for (auto& callable : callable_list) {
bulk_post_bridge<callable_type>({}, &accumulator, std::move(callable));
tasks.emplace_back(details::bind_with_try_catch(std::move(callable)));
}

assert(!accumulator.empty());
executor_ptr->enqueue(accumulator);
std::span<task> span = tasks;
executor_ptr->enqueue(span);
}

template<class executor_type, class callable_type, class return_type = std::invoke_result_t<callable_type>>
static std::vector<concurrencpp::result<return_type>> do_bulk_submit(executor_type* executor_ptr, std::span<callable_type> callable_list) {
std::vector<std::experimental::coroutine_handle<>> accumulator;
std::vector<task> accumulator;
accumulator.reserve(callable_list.size());

std::vector<concurrencpp::result<return_type>> results;
std::vector<result<return_type>> results;
results.reserve(callable_list.size());

for (auto& callable : callable_list) {
results.emplace_back(bulk_submit_bridge<callable_type>({}, &accumulator, std::move(callable)));
}

assert(!accumulator.empty());
executor_ptr->enqueue(accumulator);
std::span<task> span = accumulator;
executor_ptr->enqueue(span);
return results;
}

Expand All @@ -96,8 +88,8 @@ namespace concurrencpp {

const std::string name;

virtual void enqueue(std::experimental::coroutine_handle<> task) = 0;
virtual void enqueue(std::span<std::experimental::coroutine_handle<>> tasks) = 0;
virtual void enqueue(concurrencpp::task task) = 0;
virtual void enqueue(std::span<concurrencpp::task> tasks) = 0;

virtual int max_concurrency_level() const noexcept = 0;

Expand Down
5 changes: 2 additions & 3 deletions include/concurrencpp/executors/inline_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@ namespace concurrencpp {
public:
inline_executor() noexcept : executor(details::consts::k_inline_executor_name), m_abort(false) {}

void enqueue(std::experimental::coroutine_handle<> task) override {
void enqueue(concurrencpp::task task) override {
throw_if_aborted();
task();
}

void enqueue(std::span<std::experimental::coroutine_handle<>> tasks) override {
void enqueue(std::span<concurrencpp::task> tasks) override {
throw_if_aborted();

for (auto& task : tasks) {
task();
}
Expand Down
59 changes: 51 additions & 8 deletions include/concurrencpp/executors/manual_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,42 @@
#define CONCURRENCPP_MANUAL_EXECUTOR_H

#include "concurrencpp/executors/derivable_executor.h"
#include "concurrencpp/executors/constants.h"

#include <deque>
#include <chrono>

namespace concurrencpp {
class alignas(64) manual_executor final : public derivable_executor<manual_executor> {

private:
mutable std::mutex m_lock;
std::deque<std::experimental::coroutine_handle<>> m_tasks;
std::deque<task> m_tasks;
std::condition_variable m_condition;
bool m_abort;
std::atomic_bool m_atomic_abort;

void destroy_tasks(std::unique_lock<std::mutex>& lock) noexcept;
template<class clock_type, class duration_type>
static std::chrono::system_clock::time_point to_system_time_point(std::chrono::time_point<clock_type, duration_type> time_point) {
const auto src_now = clock_type::now();
const auto dst_now = std::chrono::system_clock::now();
return dst_now + std::chrono::duration_cast<std::chrono::milliseconds>(time_point - src_now);
}

static std::chrono::system_clock::time_point time_point_from_now(std::chrono::milliseconds ms) {
return std::chrono::system_clock::now() + ms;
}

size_t loop_impl(size_t max_count);
size_t loop_until_impl(size_t max_count, std::chrono::time_point<std::chrono::system_clock> deadline);

void wait_for_tasks_impl(size_t count);
size_t wait_for_tasks_impl(size_t count, std::chrono::time_point<std::chrono::system_clock> deadline);

public:
manual_executor();

void enqueue(std::experimental::coroutine_handle<> task) override;
void enqueue(std::span<std::experimental::coroutine_handle<>> tasks) override;
void enqueue(task task) override;
void enqueue(std::span<task> tasks) override;

int max_concurrency_level() const noexcept override;

Expand All @@ -32,15 +47,43 @@ namespace concurrencpp {
size_t size() const noexcept;
bool empty() const noexcept;

size_t clear();

bool loop_once();
bool loop_once(std::chrono::milliseconds max_waiting_time);

bool loop_once_for(std::chrono::milliseconds max_waiting_time);

template<class clock_type, class duration_type>
bool loop_once_until(std::chrono::time_point<clock_type, duration_type> timeout_time) {
return loop_until_impl(1, to_system_time_point(timeout_time));
}

size_t loop(size_t max_count);

size_t clear() noexcept;
size_t loop_for(size_t max_count, std::chrono::milliseconds max_waiting_time);

template<class clock_type, class duration_type>
size_t loop_until(size_t max_count, std::chrono::time_point<clock_type, duration_type> timeout_time) {
return loop_until_impl(max_count, to_system_time_point(timeout_time));
}

void wait_for_task();
bool wait_for_task(std::chrono::milliseconds max_waiting_time);

bool wait_for_task_for(std::chrono::milliseconds max_waiting_time);

template<class clock_type, class duration_type>
bool wait_for_task_until(std::chrono::time_point<clock_type, duration_type> timeout_time) {
return wait_for_tasks_impl(1, to_system_time_point(timeout_time)) == 1;
}

void wait_for_tasks(size_t count);

size_t wait_for_tasks_for(size_t count, std::chrono::milliseconds max_waiting_time);

template<class clock_type, class duration_type>
size_t wait_for_tasks_until(size_t count, std::chrono::time_point<clock_type, duration_type> timeout_time) {
return wait_for_tasks_impl(count, to_system_time_point(timeout_time));
}
};
} // namespace concurrencpp

Expand Down
Loading

0 comments on commit 0ef82b4

Please sign in to comment.