Skip to content

Commit

Permalink
Version 0.1.0 (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
David-Haim authored Feb 24, 2021
1 parent 0ef82b4 commit f037e8c
Show file tree
Hide file tree
Showing 56 changed files with 6,583 additions and 1,910 deletions.
6 changes: 5 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cmake_minimum_required(VERSION 3.16)

project(concurrencpp
VERSION 0.0.9
VERSION 0.1.0
LANGUAGES CXX)

include(cmake/coroutineOptions.cmake)
Expand All @@ -25,6 +25,7 @@ set(concurrencpp_sources
source/executors/worker_thread_executor.cpp
source/results/impl/consumer_context.cpp
source/results/impl/result_state.cpp
source/results/impl/shared_result_state.cpp
source/results/promises.cpp
source/runtime/runtime.cpp
source/threads/thread.cpp
Expand All @@ -50,11 +51,14 @@ set(concurrencpp_headers
include/concurrencpp/results/impl/consumer_context.h
include/concurrencpp/results/impl/producer_context.h
include/concurrencpp/results/impl/result_state.h
include/concurrencpp/results/impl/shared_result_state.h
include/concurrencpp/results/constants.h
include/concurrencpp/results/make_result.h
include/concurrencpp/results/promises.h
include/concurrencpp/results/result.h
include/concurrencpp/results/shared_result.h
include/concurrencpp/results/result_awaitable.h
include/concurrencpp/results/shared_result_awaitable.h
include/concurrencpp/results/result_fwd_declerations.h
include/concurrencpp/results/when_result.h
include/concurrencpp/runtime/constants.h
Expand Down
1,850 changes: 1,102 additions & 748 deletions README.md

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion cmake/concurrencppInjectTSAN.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ macro(add_library TARGET)
_add_library(${ARGV})

if("${TARGET}" STREQUAL "concurrencpp")
target_compile_options(concurrencpp PRIVATE -fsanitize=thread)
target_compile_options(concurrencpp PUBLIC -fsanitize=thread)
target_link_options(concurrencpp PUBLIC -fsanitize=thread)
endif()
endmacro()
2 changes: 2 additions & 0 deletions include/concurrencpp/concurrencpp.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include "concurrencpp/results/result.h"
#include "concurrencpp/results/make_result.h"
#include "concurrencpp/results/when_result.h"
#include "concurrencpp/results/shared_result.h"
#include "concurrencpp/results/shared_result_awaitable.h"
#include "concurrencpp/executors/executor_all.h"

#endif
1 change: 1 addition & 0 deletions include/concurrencpp/executors/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include "concurrencpp/task.h"
#include "concurrencpp/results/result.h"
#include "concurrencpp/results/promises.h"

#include <span>
#include <vector>
Expand Down
6 changes: 2 additions & 4 deletions include/concurrencpp/executors/inline_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@ namespace concurrencpp {
std::atomic_bool m_abort;

void throw_if_aborted() const {
if (!m_abort.load(std::memory_order_relaxed)) {
return;
if (m_abort.load(std::memory_order_relaxed)) {
details::throw_executor_shutdown_exception(name);
}

details::throw_executor_shutdown_exception(name);
}

public:
Expand Down
4 changes: 0 additions & 4 deletions include/concurrencpp/executors/manual_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ namespace concurrencpp {
size_t clear();

bool loop_once();

bool loop_once_for(std::chrono::milliseconds max_waiting_time);

template<class clock_type, class duration_type>
Expand All @@ -59,7 +58,6 @@ namespace concurrencpp {
}

size_t loop(size_t max_count);

size_t loop_for(size_t max_count, std::chrono::milliseconds max_waiting_time);

template<class clock_type, class duration_type>
Expand All @@ -68,7 +66,6 @@ namespace concurrencpp {
}

void wait_for_task();

bool wait_for_task_for(std::chrono::milliseconds max_waiting_time);

template<class clock_type, class duration_type>
Expand All @@ -77,7 +74,6 @@ namespace concurrencpp {
}

void wait_for_tasks(size_t count);

size_t wait_for_tasks_for(size_t count, std::chrono::milliseconds max_waiting_time);

template<class clock_type, class duration_type>
Expand Down
2 changes: 1 addition & 1 deletion include/concurrencpp/executors/thread_executor.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#ifndef CONCURRENCPP_THREAD_EXECUTOR_H
#define CONCURRENCPP_THREAD_EXECUTOR_H

#include "concurrencpp/executors/derivable_executor.h"
#include "concurrencpp/threads/thread.h"
#include "concurrencpp/executors/derivable_executor.h"

#include <list>
#include <span>
Expand Down
2 changes: 1 addition & 1 deletion include/concurrencpp/platform_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#elif defined(unix) || defined(__unix__) || defined(__unix)
# define CRCPP_UNIX_OS
#elif defined(__APPLE__) || defined(__MACH__)
# define CRCPP_MACH_OS
# define CRCPP_MAC_OS
#elif defined(__FreeBSD__)
# define CRCPP_FREE_BSD_OS
#elif defined(__ANDROID__)
Expand Down
22 changes: 22 additions & 0 deletions include/concurrencpp/results/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,28 @@ namespace concurrencpp::details::consts {

inline const char* k_broken_task_exception_error_msg = "concurrencpp::result - Associated task was interrupted abnormally";

inline const char* k_shared_result_status_error_msg = "shared_result::status() - result is empty.";

inline const char* k_shared_result_get_error_msg = "shared_result::get() - result is empty.";

inline const char* k_shared_result_wait_error_msg = "shared_result::wait() - result is empty.";

inline const char* k_shared_result_wait_for_error_msg = "shared_result::wait_for() - result is empty.";

inline const char* k_shared_result_wait_until_error_msg = "shared_result::wait_until() - result is empty.";

inline const char* k_shared_result_operator_co_await_error_msg = "shared_result::operator co_await() - result is empty.";

inline const char* k_shared_result_await_via_error_msg = "shared_result::await_via() - result is empty.";

inline const char* k_shared_result_await_via_executor_null_error_msg = "shared_result::await_via() - given executor is null.";

inline const char* k_shared_result_resolve_error_msg = "shared_result::resolve() - result is empty.";

inline const char* k_shared_result_resolve_via_error_msg = "shared_result::resolve_via() - result is empty.";

inline const char* k_shared_result_resolve_via_executor_null_error_msg = "shared_result::resolve_via() - given executor is null.";

} // namespace concurrencpp::details::consts

#endif
73 changes: 45 additions & 28 deletions include/concurrencpp/results/impl/consumer_context.h
Original file line number Diff line number Diff line change
@@ -1,46 +1,64 @@
#ifndef CONCURRENCPP_CONSUMER_CONTEXT_H
#define CONCURRENCPP_CONSUMER_CONTEXT_H

#include "concurrencpp/task.h"
#include "concurrencpp/forward_declerations.h"
#include "concurrencpp/coroutines/coroutine.h"
#include "concurrencpp/results/result_fwd_declerations.h"

#include <mutex>
#include <condition_variable>

namespace concurrencpp::details {
class await_context {
class await_via_functor;

private:
details::coroutine_handle<void> m_handle;
std::exception_ptr m_interrupt_exception;
class await_via_context {

public:
void set_coro_handle(details::coroutine_handle<void> coro_handle) noexcept;
void set_interrupt(const std::exception_ptr& interrupt);
class await_context {

void operator()() noexcept;
private:
coroutine_handle<void> handle;
std::exception_ptr interrupt_exception;

void throw_if_interrupted() const;
public:
void resume() noexcept;

concurrencpp::task to_task() noexcept;
};
void set_coro_handle(coroutine_handle<void> coro_handle) noexcept;
void set_interrupt(const std::exception_ptr& interrupt) noexcept;

class await_via_context {
void throw_if_interrupted() const;
};

private:
await_context m_await_context;
await_context m_await_ctx;
std::shared_ptr<executor> m_executor;

public:
await_via_context() noexcept = default;
await_via_context(std::shared_ptr<executor> executor) noexcept;

void set_coro_handle(details::coroutine_handle<void> coro_handle) noexcept;
await_via_context(const std::shared_ptr<executor>& executor) noexcept;

void operator()() noexcept;

void resume() noexcept;

void set_coro_handle(coroutine_handle<void> coro_handle) noexcept;
void set_interrupt(const std::exception_ptr& interrupt) noexcept;

void throw_if_interrupted() const;

await_via_functor get_functor() noexcept;
};

class await_via_functor {

private:
await_via_context::await_context* m_ctx;

public:
await_via_functor(await_via_context::await_context* ctx) noexcept;
await_via_functor(await_via_functor&& rhs) noexcept;
~await_via_functor() noexcept;

void operator()() noexcept;
};

class wait_context {
Expand All @@ -61,6 +79,7 @@ namespace concurrencpp::details {

protected:
std::atomic_size_t m_counter;
std::recursive_mutex m_lock;

public:
virtual ~when_all_state_base() noexcept = default;
Expand All @@ -86,22 +105,24 @@ namespace concurrencpp::details {

public:
when_any_context(std::shared_ptr<when_any_state_base> when_any_state, size_t index) noexcept;
when_any_context(const when_any_context&) noexcept = default;

void operator()() const noexcept;
};

class consumer_context {

private:
enum class consumer_status { idle, await, await_via, wait, when_all, when_any };
enum class consumer_status { idle, await, await_via, wait, when_all, when_any, shared };

union storage {
int idle;
await_context* await_context;
coroutine_handle<void> caller_handle;
await_via_context* await_via_ctx;
std::shared_ptr<wait_context> wait_ctx;
std::shared_ptr<when_all_state_base> when_all_state;
std::shared_ptr<when_all_state_base> when_all_ctx;
when_any_context when_any_ctx;
std::weak_ptr<shared_result_state_base> shared_ctx;

template<class type, class... argument_type>
static void build(type& o, argument_type&&... arguments) noexcept {
Expand All @@ -126,18 +147,14 @@ namespace concurrencpp::details {
~consumer_context() noexcept;

void clear() noexcept;
void resume_consumer() const noexcept;

void set_await_context(await_context* await_context) noexcept;

void set_await_via_context(await_via_context* await_ctx) noexcept;

void set_await_handle(coroutine_handle<void> caller_handle) noexcept;
void set_await_via_context(await_via_context& await_ctx) noexcept;
void set_wait_context(std::shared_ptr<wait_context> wait_ctx) noexcept;

void set_when_all_context(std::shared_ptr<when_all_state_base> when_all_state) noexcept;

void set_when_any_context(std::shared_ptr<when_any_state_base> when_any_ctx, size_t index) noexcept;

void operator()() noexcept;
void set_shared_context(std::weak_ptr<shared_result_state_base> shared_result_state) noexcept;
};
} // namespace concurrencpp::details

Expand Down
33 changes: 29 additions & 4 deletions include/concurrencpp/results/impl/producer_context.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
#ifndef CONCURRENCPP_PRODUCER_CONTEXT_H
#define CONCURRENCPP_PRODUCER_CONTEXT_H

#include "concurrencpp/results/result_fwd_declerations.h"

#include <optional>
#include <exception>

#include <cassert>

Expand All @@ -20,8 +19,10 @@ namespace concurrencpp::details {
}

public:
producer_context& operator=(producer_context&& rhs) noexcept = default;

template<class... argument_types>
void build_result(argument_types&&... arguments) {
void build_result(argument_types&&... arguments) noexcept(noexcept(type(std::forward<argument_types>(arguments)...))) {
assert(!m_result.has_value());
assert(!static_cast<bool>(m_exception));
m_result.emplace(std::forward<argument_types>(arguments)...);
Expand Down Expand Up @@ -57,6 +58,17 @@ namespace concurrencpp::details {
assert(static_cast<bool>(m_exception));
std::rethrow_exception(m_exception);
}

type& get_ref() {
assert_state();

if (m_result.has_value()) {
return m_result.value();
}

assert(static_cast<bool>(m_exception));
std::rethrow_exception(m_exception);
}
};

template<>
Expand All @@ -71,6 +83,8 @@ namespace concurrencpp::details {
}

public:
producer_context& operator=(producer_context&& rhs) noexcept = default;

void build_result() noexcept {
assert(!m_ready);
assert(!static_cast<bool>(m_exception));
Expand Down Expand Up @@ -107,6 +121,10 @@ namespace concurrencpp::details {
assert(static_cast<bool>(m_exception));
std::rethrow_exception(m_exception);
}

void get_ref() const {
return get();
}
};

template<class type>
Expand All @@ -121,9 +139,12 @@ namespace concurrencpp::details {
}

public:
producer_context& operator=(producer_context&& rhs) noexcept = default;

void build_result(type& reference) noexcept {
assert(m_pointer == nullptr);
assert(!static_cast<bool>(m_exception));
assert(reinterpret_cast<size_t>(std::addressof(reference)) % alignof(type) == 0);
m_pointer = std::addressof(reference);
}

Expand All @@ -147,7 +168,7 @@ namespace concurrencpp::details {
return result_status::idle;
}

type& get() {
type& get() const {
assert_state();

if (m_pointer != nullptr) {
Expand All @@ -158,6 +179,10 @@ namespace concurrencpp::details {
assert(static_cast<bool>(m_exception));
std::rethrow_exception(m_exception);
}

type& get_ref() const {
return get();
}
};
} // namespace concurrencpp::details

Expand Down
Loading

0 comments on commit f037e8c

Please sign in to comment.