Skip to content

Commit

Permalink
Version 0.1.4 (#64)
Browse files Browse the repository at this point in the history
  • Loading branch information
David-Haim authored Dec 20, 2021
1 parent 93890e3 commit 580f430
Show file tree
Hide file tree
Showing 55 changed files with 2,077 additions and 451 deletions.
17 changes: 9 additions & 8 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ jobs:
strategy:
matrix:
conf:
- name: Ubuntu (Clang 11 - TSAN)
- name: Ubuntu (Clang 12 - TSAN)
os: ubuntu-20.04
cc: clang-11
cxx: clang++-11
cc: clang-12
cxx: clang++-12
tsan: YES

- name: Ubuntu (Clang 11 - no TSAN)
- name: Ubuntu (Clang 12 - no TSAN)
os: ubuntu-20.04
cc: clang-11
cxx: clang++-11
cc: clang-12
cxx: clang++-12
tsan: NO

- name: macOS (Clang 11 - no TSAN)
Expand Down Expand Up @@ -79,10 +79,11 @@ jobs:
"${{ steps.tools.outputs.ninja }}"
${{ steps.cores.outputs.plus_one }}]==])
- name: Install clang 11
- name: Install clang 12
working-directory: ${{ env.HOME }}
run: |
sudo apt-get update
sudo apt-get install clang-11 libc++-11-dev libc++abi-11-dev
sudo apt-get install clang-12 libc++-12-dev libc++abi-12-dev
if: ${{ startsWith(matrix.conf.os, 'ubuntu') }}

- name: Build examples
Expand Down
5 changes: 4 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cmake_minimum_required(VERSION 3.16)

project(concurrencpp
VERSION 0.1.3
VERSION 0.1.4
LANGUAGES CXX)

include(cmake/coroutineOptions.cmake)
Expand Down Expand Up @@ -55,6 +55,7 @@ set(concurrencpp_headers
include/concurrencpp/results/impl/result_state.h
include/concurrencpp/results/impl/shared_result_state.h
include/concurrencpp/results/impl/lazy_result_state.h
include/concurrencpp/results/impl/generator_state.h
include/concurrencpp/results/constants.h
include/concurrencpp/results/make_result.h
include/concurrencpp/results/promises.h
Expand All @@ -67,10 +68,12 @@ set(concurrencpp_headers
include/concurrencpp/results/result_fwd_declarations.h
include/concurrencpp/results/when_result.h
include/concurrencpp/results/resume_on.h
include/concurrencpp/results/generator.h
include/concurrencpp/runtime/constants.h
include/concurrencpp/runtime/runtime.h
include/concurrencpp/threads/binary_semaphore.h
include/concurrencpp/threads/thread.h
include/concurrencpp/threads/cache_line.h
include/concurrencpp/timers/constants.h
include/concurrencpp/timers/timer.h
include/concurrencpp/timers/timer_queue.h
Expand Down
629 changes: 459 additions & 170 deletions README.md

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions example/13_generator/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
cmake_minimum_required(VERSION 3.16)

project(13_generator LANGUAGES CXX)

include(FetchContent)
FetchContent_Declare(concurrencpp SOURCE_DIR "${CMAKE_CURRENT_LIST_DIR}/../..")
FetchContent_MakeAvailable(concurrencpp)

include(../../cmake/coroutineOptions.cmake)

add_executable(13_generator source/main.cpp)

target_compile_features(13_generator PRIVATE cxx_std_20)

target_link_libraries(13_generator PRIVATE concurrencpp::concurrencpp)

target_coroutine_options(13_generator)
54 changes: 54 additions & 0 deletions example/13_generator/source/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#include <fstream>
#include <iostream>
#include <filesystem>

#include "concurrencpp/concurrencpp.h"

concurrencpp::generator<std::string_view> read_lines(std::string_view text) {
std::string_view::size_type pos = 0;
std::string_view::size_type prev = 0;

while ((pos = text.find('\n', prev)) != std::string::npos) {
co_yield text.substr(prev, pos - prev);
prev = pos + 1;
}

co_yield text.substr(prev);
}

concurrencpp::result<void> read_file_lines(const std::filesystem::path& path,
std::shared_ptr<concurrencpp::thread_pool_executor> background_executor,
std::shared_ptr<concurrencpp::thread_pool_executor> thread_pool_executor) {
// make sure we don't block in a thread that is used for cpu-processing
co_await concurrencpp::resume_on(background_executor);

std::ifstream stream(path.c_str(), std::ios::binary | std::ios::in);
std::string file_content(std::istreambuf_iterator<char>(stream), {});

// make sure we don't process cpu-bound tasks on the background executor
co_await concurrencpp::resume_on(thread_pool_executor);

for (const auto& line : read_lines(file_content)) {
std::cout << "read a new line. size: " << line.size() << std::endl;
std::cout << "line: " << std::endl;
std::cout << line;
std::cout << "\n==============" << std::endl;
}
}

int main(const int argc, const char* argv[]) {
if (argc < 2) {
const auto help_msg = "please pass all necessary arguments\n argv[1] - the file to be read\n";
std::cerr << help_msg << std::endl;
return -1;
}

const auto file_path = std::string(argv[1]);

concurrencpp::runtime runtime;
const auto thread_pool_executor = runtime.thread_pool_executor();
const auto background_executor = runtime.background_executor();

read_file_lines(file_path, thread_pool_executor, background_executor).get();
return 0;
}
2 changes: 1 addition & 1 deletion example/5_prime_number_finder/source/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ concurrencpp::result<std::vector<int>> find_prime_numbers(std::shared_ptr<concur
found_primes_in_range.emplace_back(std::move(result));
}

auto all_done = co_await concurrencpp::when_all(found_primes_in_range.begin(), found_primes_in_range.end());
auto all_done = co_await concurrencpp::when_all(executor, found_primes_in_range.begin(), found_primes_in_range.end());

std::vector<int> found_primes;
for (auto& done_result : all_done) {
Expand Down
8 changes: 5 additions & 3 deletions example/7_when_all/source/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ int example_job(int task_num, int dummy_value, int sleeping_time_ms) {
return dummy_value;
}

concurrencpp::result<void> consume_all_tasks(std::vector<concurrencpp::result<int>> results) {
auto all_done = co_await concurrencpp::when_all(results.begin(), results.end());
concurrencpp::result<void> consume_all_tasks(std::shared_ptr<concurrencpp::thread_pool_executor> resume_executor,
std::vector<concurrencpp::result<int>> results) {
auto all_done = co_await concurrencpp::when_all(resume_executor, results.begin(), results.end());

for (auto& done_result : all_done) {
std::cout << co_await done_result << std::endl;
Expand All @@ -32,6 +33,7 @@ concurrencpp::result<void> consume_all_tasks(std::vector<concurrencpp::result<in
int main(int argc, const char* argv[]) {
concurrencpp::runtime runtime;
auto background_executor = runtime.background_executor();
auto thread_pool_executor = runtime.thread_pool_executor();
std::vector<concurrencpp::result<int>> results;

std::srand(static_cast<unsigned>(std::time(nullptr)));
Expand All @@ -41,6 +43,6 @@ int main(int argc, const char* argv[]) {
results.emplace_back(background_executor->submit(example_job, i, i * 15, sleeping_time_ms));
}

consume_all_tasks(std::move(results)).get();
consume_all_tasks(thread_pool_executor, std::move(results)).get();
return 0;
}
8 changes: 5 additions & 3 deletions example/8_when_any/source/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ int example_job(int task_num, int dummy_value, int sleeping_time_ms) {
return dummy_value;
}

concurrencpp::result<void> consume_tasks_as_they_finish(std::vector<concurrencpp::result<int>> results) {
concurrencpp::result<void> consume_tasks_as_they_finish(std::shared_ptr<concurrencpp::thread_pool_executor> resume_executor,
std::vector<concurrencpp::result<int>> results) {
while (!results.empty()) {
auto when_any = co_await concurrencpp::when_any(results.begin(), results.end());
auto when_any = co_await concurrencpp::when_any(resume_executor, results.begin(), results.end());
auto finished_task = std::move(when_any.results[when_any.index]);

const auto done_value = co_await finished_task;
Expand All @@ -39,6 +40,7 @@ concurrencpp::result<void> consume_tasks_as_they_finish(std::vector<concurrencpp
int main(int argc, const char* argv[]) {
concurrencpp::runtime runtime;
auto background_executor = runtime.background_executor();
auto thread_pool_executor = runtime.thread_pool_executor();
std::vector<concurrencpp::result<int>> results;

std::srand(static_cast<unsigned>(std::time(nullptr)));
Expand All @@ -48,6 +50,6 @@ int main(int argc, const char* argv[]) {
results.emplace_back(background_executor->submit(example_job, i, i * 15, sleeping_time_ms));
}

consume_tasks_as_they_finish(std::move(results)).get();
consume_tasks_as_they_finish(thread_pool_executor, std::move(results)).get();
return 0;
}
1 change: 1 addition & 0 deletions example/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ foreach(example IN ITEMS
10_regular_timer
11_oneshot_timer
12_delay_object
13_generator
)
add_subdirectory("${CMAKE_CURRENT_SOURCE_DIR}/${example}"
"${CMAKE_CURRENT_BINARY_DIR}/${example}")
Expand Down
1 change: 1 addition & 0 deletions include/concurrencpp/concurrencpp.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "concurrencpp/results/shared_result_awaitable.h"
#include "concurrencpp/results/promises.h"
#include "concurrencpp/results/resume_on.h"
#include "concurrencpp/results/generator.h"
#include "concurrencpp/executors/executor_all.h"

#endif
4 changes: 4 additions & 0 deletions include/concurrencpp/errors.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ namespace concurrencpp::errors {
using empty_object::empty_object;
};

struct empty_generator : public empty_object {
using empty_object::empty_object;
};

struct broken_task : public std::runtime_error {
using runtime_error::runtime_error;
};
Expand Down
4 changes: 2 additions & 2 deletions include/concurrencpp/executors/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ namespace concurrencpp {

virtual int max_concurrency_level() const noexcept = 0;

virtual bool shutdown_requested() const noexcept = 0;
virtual void shutdown() noexcept = 0;
virtual bool shutdown_requested() const = 0;
virtual void shutdown() = 0;

template<class callable_type, class... argument_types>
void post(callable_type&& callable, argument_types&&... arguments) {
Expand Down
4 changes: 2 additions & 2 deletions include/concurrencpp/executors/inline_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ namespace concurrencpp {
return details::consts::k_inline_executor_max_concurrency_level;
}

void shutdown() noexcept override {
void shutdown() override {
m_abort.store(true, std::memory_order_relaxed);
}

bool shutdown_requested() const noexcept override {
bool shutdown_requested() const override {
return m_abort.load(std::memory_order_relaxed);
}
};
Expand Down
15 changes: 8 additions & 7 deletions include/concurrencpp/executors/manual_executor.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
#ifndef CONCURRENCPP_MANUAL_EXECUTOR_H
#define CONCURRENCPP_MANUAL_EXECUTOR_H

#include "concurrencpp/threads/cache_line.h"
#include "concurrencpp/executors/derivable_executor.h"

#include <deque>
#include <chrono>

namespace concurrencpp {
class alignas(64) manual_executor final : public derivable_executor<manual_executor> {
class alignas(CRCPP_CACHE_LINE_ALIGNMENT) manual_executor final : public derivable_executor<manual_executor> {

private:
mutable std::mutex m_lock;
Expand All @@ -18,13 +19,13 @@ namespace concurrencpp {

template<class clock_type, class duration_type>
static std::chrono::system_clock::time_point to_system_time_point(
std::chrono::time_point<clock_type, duration_type> time_point) {
std::chrono::time_point<clock_type, duration_type> time_point) noexcept(noexcept(clock_type::now())) {
const auto src_now = clock_type::now();
const auto dst_now = std::chrono::system_clock::now();
return dst_now + std::chrono::duration_cast<std::chrono::milliseconds>(time_point - src_now);
}

static std::chrono::system_clock::time_point time_point_from_now(std::chrono::milliseconds ms) {
static std::chrono::system_clock::time_point time_point_from_now(std::chrono::milliseconds ms) noexcept {
return std::chrono::system_clock::now() + ms;
}

Expand All @@ -42,11 +43,11 @@ namespace concurrencpp {

int max_concurrency_level() const noexcept override;

void shutdown() noexcept override;
bool shutdown_requested() const noexcept override;
void shutdown() override;
bool shutdown_requested() const override;

size_t size() const noexcept;
bool empty() const noexcept;
size_t size() const;
bool empty() const;

size_t clear();

Expand Down
7 changes: 4 additions & 3 deletions include/concurrencpp/executors/thread_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define CONCURRENCPP_THREAD_EXECUTOR_H

#include "concurrencpp/threads/thread.h"
#include "concurrencpp/threads/cache_line.h"
#include "concurrencpp/executors/derivable_executor.h"

#include <list>
Expand All @@ -10,7 +11,7 @@
#include <condition_variable>

namespace concurrencpp {
class alignas(64) thread_executor final : public derivable_executor<thread_executor> {
class alignas(CRCPP_CACHE_LINE_ALIGNMENT) thread_executor final : public derivable_executor<thread_executor> {

private:
std::mutex m_lock;
Expand All @@ -32,8 +33,8 @@ namespace concurrencpp {

int max_concurrency_level() const noexcept override;

bool shutdown_requested() const noexcept override;
void shutdown() noexcept override;
bool shutdown_requested() const override;
void shutdown() override;
};
} // namespace concurrencpp

Expand Down
Loading

0 comments on commit 580f430

Please sign in to comment.