Skip to content

Commit

Permalink
Version 0.1.2 (#46)
Browse files Browse the repository at this point in the history
*  deprecation of await_via and resume_via
*  addition of lazy_result
*  addition of resume_on
*  when_all + when_any rewrite
*  shared_result optimizations
*  examples rewrite

Co-authored-by: Zakhar Karlin <[email protected]>
  • Loading branch information
David-Haim and Ladence authored Aug 27, 2021
1 parent c9ae7d0 commit cbfc4a1
Show file tree
Hide file tree
Showing 81 changed files with 2,650 additions and 3,896 deletions.
6 changes: 5 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cmake_minimum_required(VERSION 3.16)

project(concurrencpp
VERSION 0.1.1
VERSION 0.1.2
LANGUAGES CXX)

include(cmake/coroutineOptions.cmake)
Expand Down Expand Up @@ -52,15 +52,19 @@ set(concurrencpp_headers
include/concurrencpp/results/impl/producer_context.h
include/concurrencpp/results/impl/result_state.h
include/concurrencpp/results/impl/shared_result_state.h
include/concurrencpp/results/impl/lazy_result_state.h
include/concurrencpp/results/constants.h
include/concurrencpp/results/make_result.h
include/concurrencpp/results/promises.h
include/concurrencpp/results/result.h
include/concurrencpp/results/lazy_result.h
include/concurrencpp/results/lazy_result_awaitable.h
include/concurrencpp/results/shared_result.h
include/concurrencpp/results/result_awaitable.h
include/concurrencpp/results/shared_result_awaitable.h
include/concurrencpp/results/result_fwd_declerations.h
include/concurrencpp/results/when_result.h
include/concurrencpp/results/resume_on.h
include/concurrencpp/runtime/constants.h
include/concurrencpp/runtime/runtime.h
include/concurrencpp/threads/thread.h
Expand Down
277 changes: 161 additions & 116 deletions README.md

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions example/10_regular_timer/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
cmake_minimum_required(VERSION 3.16)

project(10_regular_timer LANGUAGES CXX)

include(FetchContent)
FetchContent_Declare(concurrencpp SOURCE_DIR "${CMAKE_CURRENT_LIST_DIR}/../..")
FetchContent_MakeAvailable(concurrencpp)

include(../../cmake/coroutineOptions.cmake)

add_executable(10_regular_timer source/main.cpp)

target_compile_features(10_regular_timer PRIVATE cxx_std_20)

target_link_libraries(10_regular_timer PRIVATE concurrencpp::concurrencpp)

target_coroutine_options(10_regular_timer)
27 changes: 27 additions & 0 deletions example/10_regular_timer/source/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#include "concurrencpp/concurrencpp.h"

#include <iostream>

using namespace std::chrono_literals;

int main() {
concurrencpp::runtime runtime;
std::atomic_size_t counter = 1;
concurrencpp::timer timer = runtime.timer_queue()->make_timer(1500ms, 2000ms, runtime.thread_pool_executor(), [&] {
const auto c = counter.fetch_add(1);
std::cout << "timer was invoked for the " << c << "th time" << std::endl;
});

std::cout << "timer due time (ms): " << timer.get_due_time().count() << std::endl;
std::cout << "timer frequency (ms): " << timer.get_frequency().count() << std::endl;
std::cout << "timer-associated executor : " << timer.get_executor()->name << std::endl;

std::this_thread::sleep_for(20s);

std::cout << "main thread cancelling timer" << std::endl;
timer.cancel();

std::this_thread::sleep_for(10s);

return 0;
}
17 changes: 17 additions & 0 deletions example/11_oneshot_timer/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
cmake_minimum_required(VERSION 3.16)

project(11_oneshot_time LANGUAGES CXX)

include(FetchContent)
FetchContent_Declare(concurrencpp SOURCE_DIR "${CMAKE_CURRENT_LIST_DIR}/../..")
FetchContent_MakeAvailable(concurrencpp)

include(../../cmake/coroutineOptions.cmake)

add_executable(11_oneshot_time source/main.cpp)

target_compile_features(11_oneshot_time PRIVATE cxx_std_20)

target_link_libraries(11_oneshot_time PRIVATE concurrencpp::concurrencpp)

target_coroutine_options(11_oneshot_time)
19 changes: 19 additions & 0 deletions example/11_oneshot_timer/source/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#include "concurrencpp/concurrencpp.h"

#include <iostream>

using namespace std::chrono_literals;

int main() {
concurrencpp::runtime runtime;
concurrencpp::timer timer = runtime.timer_queue()->make_one_shot_timer(3s, runtime.thread_executor(), [] {
std::cout << "hello and goodbye" << std::endl;
});

std::cout << "timer due time (ms): " << timer.get_due_time().count() << std::endl;
std::cout << "timer frequency (ms): " << timer.get_frequency().count() << std::endl;
std::cout << "timer-associated executor : " << timer.get_executor()->name << std::endl;

std::this_thread::sleep_for(4s);
return 0;
}
17 changes: 17 additions & 0 deletions example/12_delay_object/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
cmake_minimum_required(VERSION 3.16)

project(12_delay_object LANGUAGES CXX)

include(FetchContent)
FetchContent_Declare(concurrencpp SOURCE_DIR "${CMAKE_CURRENT_LIST_DIR}/../..")
FetchContent_MakeAvailable(concurrencpp)

include(../../cmake/coroutineOptions.cmake)

add_executable(12_delay_object source/main.cpp)

target_compile_features(12_delay_object PRIVATE cxx_std_20)

target_link_libraries(12_delay_object PRIVATE concurrencpp::concurrencpp)

target_coroutine_options(12_delay_object)
25 changes: 25 additions & 0 deletions example/12_delay_object/source/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#include <iostream>

#include "concurrencpp/concurrencpp.h"

using namespace std::chrono_literals;

concurrencpp::null_result delayed_task(std::shared_ptr<concurrencpp::timer_queue> tq,
std::shared_ptr<concurrencpp::thread_pool_executor> ex) {
size_t counter = 1;

while (true) {
std::cout << "task was invoked " << counter << " times." << std::endl;
counter++;

co_await tq->make_delay_object(1500ms, ex);
}
}

int main() {
concurrencpp::runtime runtime;
delayed_task(runtime.timer_queue(), runtime.thread_pool_executor());

std::this_thread::sleep_for(10s);
return 0;
}
17 changes: 17 additions & 0 deletions example/1_hello_world/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
cmake_minimum_required(VERSION 3.16)

project(1_hello_world LANGUAGES CXX)

include(FetchContent)
FetchContent_Declare(concurrencpp SOURCE_DIR "${CMAKE_CURRENT_LIST_DIR}/../..")
FetchContent_MakeAvailable(concurrencpp)

include(../../cmake/coroutineOptions.cmake)

add_executable(1_hello_world source/main.cpp)

target_compile_features(1_hello_world PRIVATE cxx_std_20)

target_link_libraries(1_hello_world PRIVATE concurrencpp::concurrencpp)

target_coroutine_options(1_hello_world)
12 changes: 12 additions & 0 deletions example/1_hello_world/source/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#include "concurrencpp/concurrencpp.h"
#include <iostream>

int main() {
concurrencpp::runtime runtime;
auto result = runtime.thread_executor()->submit([] {
std::cout << "hello world" << std::endl;
});

result.get();
return 0;
}
17 changes: 17 additions & 0 deletions example/2_concurrent_even_number_counting/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
cmake_minimum_required(VERSION 3.16)

project(2_concurrent_even_number_counting LANGUAGES CXX)

include(FetchContent)
FetchContent_Declare(concurrencpp SOURCE_DIR "${CMAKE_CURRENT_LIST_DIR}/../..")
FetchContent_MakeAvailable(concurrencpp)

include(../../cmake/coroutineOptions.cmake)

add_executable(2_concurrent_even_number_counting source/main.cpp)

target_compile_features(2_concurrent_even_number_counting PRIVATE cxx_std_20)

target_link_libraries(2_concurrent_even_number_counting PRIVATE concurrencpp::concurrencpp)

target_coroutine_options(2_concurrent_even_number_counting)
57 changes: 57 additions & 0 deletions example/2_concurrent_even_number_counting/source/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#include "concurrencpp/concurrencpp.h"

#include <iostream>
#include <vector>
#include <algorithm>

#include <ctime>

using namespace concurrencpp;

std::vector<int> make_random_vector() {
std::vector<int> vec(64 * 1'024);

std::srand(std::time(nullptr));
for (auto& i : vec) {
i = ::rand();
}

return vec;
}

result<size_t> count_even(std::shared_ptr<thread_pool_executor> tpe, const std::vector<int>& vector) {
const auto vecor_size = vector.size();
const auto concurrency_level = tpe->max_concurrency_level();
const auto chunk_size = vecor_size / concurrency_level;

std::vector<result<size_t>> chunk_count;

for (auto i = 0; i < concurrency_level; i++) {
const auto chunk_begin = i * chunk_size;
const auto chunk_end = chunk_begin + chunk_size;
auto result = tpe->submit([&vector, chunk_begin, chunk_end]() -> size_t {
return std::count_if(vector.begin() + chunk_begin, vector.begin() + chunk_end, [](auto i) {
return i % 2 == 0;
});
});

chunk_count.emplace_back(std::move(result));
}

size_t total_count = 0;

for (auto& result : chunk_count) {
total_count += co_await result;
}

co_return total_count;
}

int main() {
concurrencpp::runtime runtime;
const auto vector = make_random_vector();
auto result = count_even(runtime.thread_pool_executor(), vector);
const auto total_count = result.get();
std::cout << "there are " << total_count << " even numbers in the vector" << std::endl;
return 0;
}
17 changes: 17 additions & 0 deletions example/3_async_file_processing/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
cmake_minimum_required(VERSION 3.16)

project(3_async_file_processing LANGUAGES CXX)

include(FetchContent)
FetchContent_Declare(concurrencpp SOURCE_DIR "${CMAKE_CURRENT_LIST_DIR}/../..")
FetchContent_MakeAvailable(concurrencpp)

include(../../cmake/coroutineOptions.cmake)

add_executable(3_async_file_processing source/main.cpp)

target_compile_features(3_async_file_processing PRIVATE cxx_std_20)

target_link_libraries(3_async_file_processing PRIVATE concurrencpp::concurrencpp)

target_coroutine_options(3_async_file_processing)
96 changes: 96 additions & 0 deletions example/3_async_file_processing/source/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
In this example, we will use concurrencpp executors to process a file asynchronously.
This application will iterate the file characters (we assume
ASCII) and replace a character with another. The application gets three
parameters through the command line arguments:
argv[0] - the path to the binary that created this process (not used in this example)
argv[1] - a file path
argv[2] - the character to replace
argv[3] - the character to replace with.
Since standard file streams are blocking, we would like to execute
file-io operations using the background_executor, which its job is to execute
relatively short-blocking tasks (like file-io).
Processing the file content is a cpu-bound task (iterating over a binary
buffer and potentially changing characters), so after reading the file we
will resume execution in the thread_pool_executor,
After the content has been modified, it is ready to be re-written back to
the file. we will again schedule a blocking write operation to the
background_executor.
*/

#include <iostream>
#include <vector>
#include <fstream>

#include "concurrencpp/concurrencpp.h"

concurrencpp::result<void> replace_chars_in_file(std::shared_ptr<concurrencpp::thread_pool_executor> background_executor,
std::shared_ptr<concurrencpp::thread_pool_executor> threadpool_executor,
const std::string file_path,
char from,
char to) {

// tell the background executor to read a whole file to a buffer and return it
auto file_content = co_await background_executor->submit([file_path] {
std::ifstream input;
input.exceptions(std::ifstream::badbit);
input.open(file_path, std::ios::binary);
std::vector<char> buffer(std::istreambuf_iterator<char>(input), {});
return buffer;
});

// tell the threadpool executor to process the file
auto processed_file_content = co_await threadpool_executor->submit([file_content = std::move(file_content), from, to]() mutable {
for (auto& c : file_content) {
if (c == from) {
c = to;
}
}

return std::move(file_content);
});

// schedule the write operation on the background executor and await it to finish.
co_await background_executor->submit([file_path, file_content = std::move(processed_file_content)] {
std::ofstream output;
output.exceptions(std::ofstream::badbit);
output.open(file_path, std::ios::binary);
output.write(file_content.data(), file_content.size());
});

std::cout << "file has been modified successfully" << std::endl;
}

int main(int argc, const char* argv[]) {
if (argc < 4) {
const auto help_msg = "please pass all necessary arguments\n\
argv[1] - the file to process\n\
argv[2] - the character to replace\n\
argv[3] - the character to replace with";

std::cerr << help_msg << std::endl;
return -1;
}

if (std::strlen(argv[2]) != 1 || std::strlen(argv[3]) != 1) {
std::cerr << "argv[2] and argv[3] must be one character only" << std::endl;
return -1;
}

const auto file_path = std::string(argv[1]);
const auto from_char = argv[2][0];
const auto to_char = argv[3][0];

concurrencpp::runtime runtime;

try {
replace_chars_in_file(runtime.background_executor(), runtime.thread_pool_executor(), file_path, from_char, to_char).get();
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
}

return 0;
}
Loading

0 comments on commit cbfc4a1

Please sign in to comment.