Skip to content

Commit

Permalink
Version 0.1.5 (#84)
Browse files Browse the repository at this point in the history
* `async_lock` implementation
*  `when_any` data race fix
* parallel coroutine to throw `std::invalid_argument` on null executor
* tests compilation time optimization
* `result` optimizations
  • Loading branch information
David-Haim authored Sep 23, 2022
1 parent 580f430 commit 0c612ae
Show file tree
Hide file tree
Showing 40 changed files with 1,664 additions and 259 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ jobs:
cxx: clang++
tsan: NO

- name: Windows (Visual Studio Enterprise 2019)
os: windows-latest
- name: Windows (Visual Studio Enterprise 2022)
os: windows-2022
cc: cl
cxx: cl
tsan: NO
Expand Down
7 changes: 6 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cmake_minimum_required(VERSION 3.16)

project(concurrencpp
VERSION 0.1.4
VERSION 0.1.5
LANGUAGES CXX)

include(cmake/coroutineOptions.cmake)
Expand Down Expand Up @@ -29,6 +29,7 @@ set(concurrencpp_sources
source/results/impl/shared_result_state.cpp
source/results/promises.cpp
source/runtime/runtime.cpp
source/threads/async_lock.cpp
source/threads/binary_semaphore.cpp
source/threads/thread.cpp
source/timers/timer.cpp
Expand Down Expand Up @@ -71,6 +72,7 @@ set(concurrencpp_headers
include/concurrencpp/results/generator.h
include/concurrencpp/runtime/constants.h
include/concurrencpp/runtime/runtime.h
include/concurrencpp/threads/async_lock.h
include/concurrencpp/threads/binary_semaphore.h
include/concurrencpp/threads/thread.h
include/concurrencpp/threads/cache_line.h
Expand All @@ -91,6 +93,9 @@ target_compile_features(concurrencpp PUBLIC cxx_std_20)

target_coroutine_options(concurrencpp)

find_package(Threads REQUIRED)
target_link_libraries(concurrencpp PUBLIC Threads::Threads)

find_library(LIBRT NAMES rt DOC "Path to the Real Time shared library")
target_link_libraries(concurrencpp PUBLIC "$<$<BOOL:${LIBRT}>:${LIBRT}>")

Expand Down
228 changes: 219 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@


# concurrencpp, the C++ concurrency library

![Latest Release](https://img.shields.io/github/v/release/David-Haim/concurrencpp.svg) [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
Expand Down Expand Up @@ -57,6 +55,9 @@ concurrencpp main advantages are:
* [Delay object example](#delay-object-example)
* [Generators](#generators)
* [`generator` API](#generator-api)
* [Asynchronous locks](#asynchronous-locks)
* [`async_lock` API](#async_lock-api)
* [`scoped_async_lock` API](#scoped_async_lock-api)
* [The runtime object](#the-runtime-object)
* [`runtime` API](#runtime-api)
* [Creating user-defined executors](#creating-user-defined-executors)
Expand Down Expand Up @@ -766,14 +767,16 @@ concurrencpp also provides parallel coroutines, which start to run inside a give
Every parallel coroutine must meet the following preconditions:
1. Returns any of `result` / `null_result` .
1. Gets `executor_tag` as its first argument .
1. Gets any of `type*` / `type&` / `std::shared_ptr<type>`, where `type` is a concrete class of `executor` as its second argument.
1. Contains any of `co_await` or `co_return` in its body.
2. Gets `executor_tag` as its first argument .
3. Gets any of `type*` / `type&` / `std::shared_ptr<type>`, where `type` is a concrete class of `executor` as its second argument.
4. Contains any of `co_await` or `co_return` in its body.
5. Is not a member function or a lambda function
If all the above applies, the function is a parallel coroutine:
concurrencpp will start the coroutine suspended and immediately reschedule it to run in the provided executor.
`concurrencpp::executor_tag` is a dummy placeholder to tell the concurrencpp runtime that this function is not a regular function, it needs to start running inside the given executor.
Applications can then consume the result of the parallel coroutine by using the returned result object.
If the executor passed to the parallel coroutine is null, the coroutine will not start to run and an `std::invalid_argument` exception will be thrown synchronously.
If all preconditions are met, Applications can consume the result of the parallel coroutine by using the returned result object.
#### *Parallel Fibonacci example:*
```cpp
Expand Down Expand Up @@ -1650,6 +1653,210 @@ class generator_iterator {
};
```
### Asynchronous locks
Regular synchronous locks cannot be used safely inside coroutines for a number of reasons:
- Synchronous locks, such as `std::mutex`, are expected to be locked and unlocked in the same thread of execution. Unlocking a synchronous lock in a thread which had not locked it is undefined behavior. Since tasks can be suspended and resumed in any thread of execution, synchronous locks will break when used inside tasks.
- Synchronous locks were created to work with *threads* and not with *coroutines*. If a synchronous lock is already locked by one thread, then when another thread tries to lock it, the entire thread of execution will be blocked and will be unblocked when the lock is released. This mechanism works well for traditional multi-threading paradigms but not for coroutines: with coroutines, we want *tasks* to be *suspended and resumed* without blocking or interfering with the execution of underlying threads and executors.
`concurrencpp::async_lock` solves those issues by providing a similar API to `std::mutex`, with the main difference that calls to `concurrencpp::async_lock` will return a lazy-result that can be `co_awaited` safely inside tasks. If one task tries to lock an async-lock and fails, the task will be suspended, and will be resumed when the lock is unlocked and acquired by the suspended task. This allows executors to process a huge amount of tasks waiting to acquire a lock without expensive context-switching and expensive kernel calls.
Similar to how `std::mutex` works, only one task can acquire `async_lock` at any given time, and a *read barrier* is place at the moment of acquiring. Releasing an async lock places a *write barrier* and allows the next task to acquire it, creating a chain of one-modifier at a time who sees the changes other modifiers had done and posts its modifications for the next modifiers to see.
Like `std::mutex`, `concurrencpp::async_lock` ***is not recursive***. Extra attention must be given when acquiring such lock - A lock must not be acquired again in a task that has been spawned by another task which had already acquired the lock. In such case, an unavoidable dead-lock will occur. Unlike other objects in concurrencpp, `async_lock` is neither copiable nor movable.
Like standard locks, `concurrencpp::async_lock` is meant to be used with scoped wrappers which leverage C++ RAII idiom to ensure locks are always unlocked upon function return or thrown exception. `async_lock::lock` returns a lazy-result of a scoped wrapper that calls `async_lock::unlock` on destruction. Raw uses of `async_lock::unlock` are discouraged. `concurrencpp::scoped_async_lock` acts as the scoped wrapper and provides an API which is almost identical to `std::unique_lock`. `concurrencpp::scoped_async_lock` is movable, but not copiable.
`async_lock::lock` and `scoped_async_lock::lock` require a resume-executor as their parameter. Upon calling those methods, if the lock is available for locking, then it is locked and the current task is resumed immediately. If not, then the current task is suspended, and will be resumed inside the given resume-executor when the lock is finally acquired by it.
`concurrencpp::scoped_async_lock` wraps an `async_lock` and ensure it's properly unlocked. like `std::unique_lock`, there are cases it does not wrap any lock, and in this case it's considered to be empty. An empty `scoped_async_lock` can happen when it's defaultly constructed, moved, or `scoped_async_lock::release` method is called. An empty scoped-async-lock will not unlock any lock on destruction.
Even if the scoped-async-lock is not empty, it does not mean that it owns the underlying async-lock and it will unlock it on destruction. Non-empty and non-owning scoped-async locks can happen if `scoped_async_lock::unlock` was called or the scoped-async-lock was constructed using `scoped_async_lock(async_lock&, std::defer_lock_t)` constructor.
#### `async_lock` *example:*
```cpp
#include "concurrencpp/concurrencpp.h"
#include <vector>
#include <iostream>
std::vector<size_t> numbers;
concurrencpp::async_lock lock;
concurrencpp::result<void> add_numbers(concurrencpp::executor_tag,
std::shared_ptr<concurrencpp::executor> executor,
size_t begin,
size_t end) {
for (auto i = begin; i < end; i++) {
concurrencpp::scoped_async_lock raii_wrapper = co_await lock.lock(executor);
numbers.push_back(i);
}
}
int main() {
concurrencpp::runtime runtime;
constexpr size_t range = 10'000'000;
constexpr size_t sections = 4;
concurrencpp::result<void> results[sections];
for (size_t i = 0; i < 4; i++) {
const auto range_start = i * range / sections;
const auto range_end = (i + 1) * range / sections;
results[i] = add_numbers({}, runtime.thread_pool_executor(), range_start, range_end);
}
for (auto& result : results) {
result.get();
}
std::cout << "vector size is " << numbers.size() << std::endl;
// make sure the vector state has not been corrupted by unprotected concurrent accesses
std::sort(numbers.begin(), numbers.end());
for (size_t i = 0; i < range; i++) {
if (numbers[i] != i) {
std::cerr << "vector state is corrupted." << std::endl;
return -1;
}
}
std::cout << "succeeded pushing range [0 - 10,000,000] concurrently to the vector!" << std::endl;
return 0;
}
```
#### `async_lock` API
```cpp
class async_lock {
/*
Constructs an async lock object.
*/
async_lock() noexcept;

/*
Destructs an async lock object.
*this is not automatically unlocked at the moment of destruction.
*/
~async_lock() noexcept;

/*
Asynchronously acquires the async lock.
If *this has already been locked by another non-parent task, the current task will be suspended
and will be resumed when *this is acquired, inside resume_executor.
If *this has not been locked by another task, then *this will be acquired and the current task will be resumed
immediately in the calling thread of execution.
If *this has already been locked by a parent task, then unavoidable dead-lock will occur.
Throws std::invalid_argument if resume_executor is null.
Throws std::system error if one of the underlying synhchronization primitives throws.
*/
lazy_result<scoped_async_lock> lock(std::shared_ptr<executor> resume_executor);

/*
Tries to acquire *this in the calling thread of execution.
Returns true if *this is acquired, false otherwise.
In any case, the current task is resumed immediately in the calling thread of execution.
Throws std::system error if one of the underlying synhchronization primitives throws.
*/
lazy_result<bool> try_lock();

/*
Releases *this and allows other tasks (including suspended tasks waiting for *this) to acquire it.
Throws std::system error if *this is not locked at the moment of calling this method.
Throws std::system error if one of the underlying synhchronization primitives throws.
*/
void unlock();
};
```
#### `scoped_async_lock` API
```cpp
class scoped_async_lock {
/*
Constructs an async lock wrapper that does not wrap any async lock.
*/
scoped_async_lock() noexcept = default;
/*
If *this wraps async_lock, this method releases the wrapped lock.
*/
~scoped_async_lock() noexcept;
/*
Moves rhs to *this.
After this call, *rhs does not wrap any async lock.
*/
scoped_async_lock(scoped_async_lock&& rhs) noexcept;
/*
Wrapps unlocked lock.
lock must not be in acquired mode when calling this method.
*/
scoped_async_lock(async_lock& lock, std::defer_lock_t) noexcept;
/*
Wrapps locked lock.
lock must be already acquired when calling this method.
*/
scoped_async_lock(async_lock& lock, std::adopt_lock_t) noexcept;
/*
Calls async_lock::lock on the wrapped locked, using resume_executor as a parameter.
Throws std::invalid_argument if resume_executor is nulll.
Throws std::system_error if *this does not wrap any lock.
Throws std::system_error if wrapped lock is already locked.
Throws any exception async_lock::lock throws.
*/
lazy_result<void> lock(std::shared_ptr<executor> resume_executor);
/*
Calls async_lock::try_lock on the wrapped lock.
Throws std::system_error if *this does not wrap any lock.
Throws std::system_error if wrapped lock is already locked.
Throws any exception async_lock::try_lock throws.
*/
lazy_result<bool> try_lock();
/*
Calls async_lock::unlock on the wrapped lock.
If *this does not wrap any lock, this method does nothing.
Throws std::system_error if *this wraps a lock and it is not locked.
*/
void unlock();
/*
Checks whether *this wraps a locked mutex or not.
Returns true if wrapped locked is in acquired state, false otherwise.
*/
bool owns_lock() const noexcept;
/*
Equivalent to owns_lock.
*/
explicit operator bool() const noexcept;
/*
Swaps the contents of *this and rhs.
*/
void swap(scoped_async_lock& rhs) noexcept;
/*
Empties *this and returns a pointer to the previously wrapped lock.
After a call to this method, *this doesn't wrap any lock.
The previously wrapped lock is not released,
it must be released by either unlocking it manually through the returned pointer or by
capturing the pointer with another scoped_async_lock which will take ownerwhip over it.
*/
async_lock* release() noexcept;
/*
Returns a pointer to the wrapped async_lock, or a null pointer if there is no wrapped async_lock.
*/
async_lock* mutex() const noexcept;
};
```

### The runtime object

The concurrencpp runtime object is the agent used to acquire, store and create new executors.
Expand Down Expand Up @@ -1988,14 +2195,17 @@ $ cd build/test
$ ctest . -V
```

##### Via vcpkg on Windows and *nix platforms
##### Via package managers on Windows and *nix platforms

Alternatively to building and installing the library manually, developers may get stable releases of concurrencpp as [vcpkg](https://vcpkg.io/) packages:
Alternatively to building and installing the library manually, developers may get stable releases of concurrencpp via the [vcpkg](https://vcpkg.io/) and [Conan](https://conan.io/) package managers:

vcpkg:
```shell
$ vcpkg install concurrencpp
```

Conan: [concurrencpp on ConanCenter](https://conan.io/center/concurrencpp)

##### Experimenting with the built-in sandbox
concurrencpp comes with a built-in sandbox program which developers can modify and experiment, without having to install or link the compiled library to a different code-base. In order to play with the sandbox, developers can modify `sandbox/main.cpp` and compile the application using the following commands:

Expand All @@ -2012,4 +2222,4 @@ $ cmake -S sandbox -B build/sandbox
#for release mode: cmake -DCMAKE_BUILD_TYPE=Release -S sandbox -B build/sandbox
$ cmake --build build/sandbox
$ ./build/sandbox #runs the sandbox
```
```
3 changes: 3 additions & 0 deletions cmake/concurrencppConfig.cmake
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
include(CMakeFindDependencyMacro)
find_dependency(Threads)

include("${CMAKE_CURRENT_LIST_DIR}/concurrencppTargets.cmake")
8 changes: 1 addition & 7 deletions cmake/coroutineOptions.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,7 @@
function(target_coroutine_options TARGET)
if(MSVC)
target_compile_options(${TARGET} PUBLIC /std:c++latest /permissive-)
return()
endif()

find_package(Threads REQUIRED)
target_link_libraries(${TARGET} PRIVATE Threads::Threads)

if(CMAKE_CXX_COMPILER_ID MATCHES "Clang")
elseif(CMAKE_CXX_COMPILER_ID MATCHES "Clang")
target_compile_options(${TARGET} PUBLIC -stdlib=libc++ -fcoroutines-ts)
target_link_options(${TARGET} PUBLIC -stdlib=libc++)
set_target_properties(${TARGET} PROPERTIES CXX_EXTENSIONS NO)
Expand Down
2 changes: 1 addition & 1 deletion cmake/setCiVars.cmake
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
if (os MATCHES "^windows")
execute_process(
COMMAND "C:/Program Files (x86)/Microsoft Visual Studio/2019/Enterprise/VC/Auxiliary/Build/vcvars64.bat" && set
COMMAND "C:/Program Files/Microsoft Visual Studio/2022/Enterprise/VC/Auxiliary/Build/vcvars64.bat" && set
OUTPUT_FILE environment_script_output.txt
)
file(STRINGS environment_script_output.txt output_lines)
Expand Down
1 change: 1 addition & 0 deletions include/concurrencpp/concurrencpp.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@
#include "concurrencpp/results/resume_on.h"
#include "concurrencpp/results/generator.h"
#include "concurrencpp/executors/executor_all.h"
#include "concurrencpp/threads/async_lock.h"

#endif
25 changes: 10 additions & 15 deletions include/concurrencpp/coroutines/coroutine.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,23 @@

#include "../platform_defs.h"

#ifdef CRCPP_MSVC_COMPILER
#if !__has_include(<coroutine>) && __has_include(<experimental/coroutine>)

# include <coroutine>
# include <experimental/coroutine>
# define CRCPP_COROUTINE_NAMESPACE std::experimental

namespace concurrencpp::details {
template<class promise_type>
using coroutine_handle = std::coroutine_handle<promise_type>;
using suspend_never = std::suspend_never;
using suspend_always = std::suspend_always;
} // namespace concurrencpp::details
#else

#elif defined(CRCPP_CLANG_COMPILER)
# include <coroutine>
# define CRCPP_COROUTINE_NAMESPACE std

# include <experimental/coroutine>
#endif

namespace concurrencpp::details {
template<class promise_type>
using coroutine_handle = std::experimental::coroutine_handle<promise_type>;
using suspend_never = std::experimental::suspend_never;
using suspend_always = std::experimental::suspend_always;
using coroutine_handle = CRCPP_COROUTINE_NAMESPACE::coroutine_handle<promise_type>;
using suspend_never = CRCPP_COROUTINE_NAMESPACE::suspend_never;
using suspend_always = CRCPP_COROUTINE_NAMESPACE::suspend_always;
} // namespace concurrencpp::details

#endif

#endif
Loading

0 comments on commit 0c612ae

Please sign in to comment.