Skip to content

Commit

Permalink
atomic_wait improvements (#163)
Browse files Browse the repository at this point in the history
  • Loading branch information
David-Haim authored Apr 21, 2024
1 parent 33c4da9 commit e4b2cf4
Show file tree
Hide file tree
Showing 14 changed files with 587 additions and 89 deletions.
7 changes: 5 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ set(concurrencpp_sources
source/threads/async_condition_variable.cpp
source/threads/thread.cpp
source/timers/timer.cpp
source/timers/timer_queue.cpp)
source/timers/timer_queue.cpp
source/utils/math_helper.cpp)

set(concurrencpp_headers
include/concurrencpp/concurrencpp.h
Expand Down Expand Up @@ -81,7 +82,9 @@ set(concurrencpp_headers
include/concurrencpp/timers/timer.h
include/concurrencpp/timers/timer_queue.h
include/concurrencpp/utils/bind.h
include/concurrencpp/utils/slist.h)
include/concurrencpp/utils/math_helper.h
include/concurrencpp/utils/slist.h
include/concurrencpp/utils/dlist.h)

add_library(concurrencpp ${concurrencpp_headers} ${concurrencpp_sources})
add_library(concurrencpp::concurrencpp ALIAS concurrencpp)
Expand Down
6 changes: 4 additions & 2 deletions include/concurrencpp/results/impl/result_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace concurrencpp::details {
class CRCPP_API result_state_base {

public:
enum class pc_status : int32_t { idle, consumer_set, consumer_waiting, consumer_done, producer_done };
enum class pc_status : uint32_t { idle, consumer_set, consumer_waiting, consumer_done, producer_done };

protected:
std::atomic<pc_status> m_pc_status {pc_status::idle};
Expand All @@ -42,6 +42,8 @@ namespace concurrencpp::details {
producer_context<type> m_producer;

static void delete_self(result_state<type>* state) noexcept {
assert(state != nullptr);

auto done_handle = state->m_done_handle;
if (static_cast<bool>(done_handle)) {
assert(done_handle.done());
Expand Down Expand Up @@ -151,7 +153,7 @@ namespace concurrencpp::details {
}

case pc_status::consumer_waiting: {
return atomic_notify_all(m_pc_status);
return atomic_notify_one(m_pc_status);
}

case pc_status::consumer_done: {
Expand Down
1 change: 0 additions & 1 deletion include/concurrencpp/results/impl/shared_result_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include "concurrencpp/results/impl/result_state.h"

#include <atomic>
#include <semaphore>

#include <cassert>

Expand Down
2 changes: 1 addition & 1 deletion include/concurrencpp/results/result_fwd_declarations.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace concurrencpp {

struct null_result {};

enum class result_status : int32_t { idle, value, exception };
enum class result_status : uint32_t { idle, value, exception };
} // namespace concurrencpp

namespace concurrencpp::details {
Expand Down
167 changes: 122 additions & 45 deletions include/concurrencpp/threads/atomic_wait.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,61 @@

#include "concurrencpp/platform_defs.h"

#include <thread>
#include <atomic>
#include <chrono>
#include <type_traits>

#include <cassert>

namespace concurrencpp::details {
void CRCPP_API atomic_wait_native(void* atom, int32_t old) noexcept;
void CRCPP_API atomic_wait_for_native(void* atom, int32_t old, std::chrono::milliseconds ms) noexcept;
void CRCPP_API atomic_notify_all_native(void* atom) noexcept;

// Result of a timed wait. In the atomic_wait_for definition visible in this header, <<ok>> is returned once the
// value loaded from the atomic differs from <<old>>; <<timeout>> is the other possible outcome
// (presumably: the requested duration elapsed first - confirm against the full definition).
enum class atomic_wait_status { ok, timeout };

// Waits on <<atom>> for as long as it holds <<old>>; <<order>> is the memory order used when loading <<atom>>.
// Declared here, defined further down in this header.
template<class type>
void atomic_wait(std::atomic<type>& atom, type old, std::memory_order order) noexcept;

// Timed flavour of atomic_wait: same parameters plus <<ms>>, the duration the caller is willing to wait.
// Reports the outcome as an atomic_wait_status.
template<class type>
atomic_wait_status atomic_wait_for(std::atomic<type>& atom,
type old,
std::chrono::milliseconds ms,
std::memory_order order) noexcept;

// Notification counterpart of atomic_wait/atomic_wait_for, aimed at waiters of <<atom>>.
// NOTE(review): presumably wakes a single waiter - confirm in the platform implementation.
template<class type>
void atomic_notify_one(std::atomic<type>& atom) noexcept;

// Notification counterpart of atomic_wait/atomic_wait_for, aimed at waiters of <<atom>>.
// NOTE(review): presumably wakes every waiter - confirm in the platform implementation.
template<class type>
void atomic_notify_all(std::atomic<type>& atom) noexcept;

// Compile-time guard shared by the atomic_wait/atomic_notify helpers.
// Instantiating it for <<type>> fails the build unless all of the following hold:
//  - <<type>> is an integral or enumeration type,
//  - <<type>> is exactly as wide as uint32_t (the wait primitives exchange the value as a uint32_t),
//  - std::atomic<type> is standard-layout (its address is handed around as a plain void*),
//  - std::atomic<type> is always lock free.
// The function body contains nothing but static assertions, so calling it does no work at runtime.
template<class type>
void assert_atomic_type_waitable() noexcept {
    static_assert(std::is_integral_v<type> || std::is_enum_v<type>,
                  "atomic_wait/atomic_notify - <<type>> must be integral or enumeration type");
    static_assert(sizeof(type) == sizeof(uint32_t), "atomic_wait/atomic_notify - <<type>> must be 4 bytes.");
    static_assert(std::is_standard_layout_v<std::atomic<type>>,
                  "atomic_wait/atomic_notify - std::atomic<type> is not standard-layout");
    static_assert(std::atomic<type>::is_always_lock_free, "atomic_wait/atomic_notify - std::atomic<type> is not lock free");
}
} // namespace concurrencpp::details

#if !defined(CRCPP_MAC_OS)

namespace concurrencpp::details {
// Exported primitives that the atomic_* templates of this namespace forward to; only declared here.
// <<atom>> receives the address of the std::atomic object and <<old>> the expected value converted to uint32_t
// (see the call sites below).
// NOTE(review): presumably thin wrappers over the operating system's wait-on-address facility - confirm in the
// implementation file.
void CRCPP_API atomic_wait_native(void* atom, uint32_t old) noexcept;
// Timed variant: the templates below pass the time remaining until their deadline as <<ms>>.
void CRCPP_API atomic_wait_for_native(void* atom, uint32_t old, std::chrono::milliseconds ms) noexcept;
// Notification primitives keyed by the address of the atomic object.
void CRCPP_API atomic_notify_one_native(void* atom) noexcept;
void CRCPP_API atomic_notify_all_native(void* atom) noexcept;

template<class type>
void atomic_wait(std::atomic<type>& atom, type old, std::memory_order order) noexcept {
static_assert(std::is_standard_layout_v<std::atomic<type>>, "atomic_wait - std::atom<type> is not standard-layout");
static_assert(sizeof(type) == sizeof(int32_t), "atomic_wait - <<type>> must be 4 bytes.");
assert_atomic_type_waitable<type>();

while (true) {
const auto val = atom.load(order);
if (val != old) {
return;
}

#if defined(CRCPP_MAC_OS)
atom.wait(old, order);
#else
atomic_wait_native(&atom, static_cast<int32_t>(old));
#endif
atomic_wait_native(&atom, static_cast<uint32_t>(old));
}
}

Expand All @@ -40,14 +66,10 @@ namespace concurrencpp::details {
type old,
std::chrono::milliseconds ms,
std::memory_order order) noexcept {
static_assert(std::is_standard_layout_v<std::atomic<type>>, "atomic_wait_for - std::atom<type> is not standard-layout");
static_assert(sizeof(type) == sizeof(int32_t), "atomic_wait_for - <<type>> must be 4 bytes.");
assert_atomic_type_waitable<type>();

const auto deadline = std::chrono::system_clock::now() + ms;

#if defined(CRCPP_MAC_OS)
size_t polling_cycle = 0;
#endif
while (true) {
if (atom.load(order) != old) {
return atomic_wait_status::ok;
Expand All @@ -62,45 +84,100 @@ namespace concurrencpp::details {
return atomic_wait_status::timeout;
}

#if defined(CRCPP_MAC_OS)
if (polling_cycle < 64) {
std::this_thread::yield();
++polling_cycle;
continue;
}

if (polling_cycle < 5'000) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
++polling_cycle;
continue;
}

if (polling_cycle < 10'000) {
std::this_thread::sleep_for(std::chrono::milliseconds(2));
++polling_cycle;
continue;
}

std::this_thread::sleep_for(std::chrono::milliseconds(5));
++polling_cycle;

#else
const auto time_diff = std::chrono::duration_cast<std::chrono::milliseconds>(deadline - now);
assert(time_diff.count() >= 0);
atomic_wait_for_native(&atom, static_cast<int32_t>(old), time_diff);
#endif
atomic_wait_for_native(&atom, static_cast<uint32_t>(old), time_diff);
}
}

template<class type>
void atomic_notify_one(std::atomic<type>& atom) noexcept {
atomic_notify_one_native(&atom);
}

template<class type>
void atomic_notify_all(std::atomic<type>& atom) noexcept {
#if defined(CRCPP_MAC_OS)
atomic_notify_all_native(&atom);
}
} // namespace concurrencpp::details

atom.notify_all();
#else
atomic_notify_all_native(&atom);
#endif

#include <memory>

namespace concurrencpp::details {
class atomic_wait_bucket;

using atomic_comp_fn = bool (*)(void*, const uint32_t, std::memory_order) noexcept;

// Table of atomic_wait_bucket objects used by the atomic_wait/atomic_notify templates below.
// Every operation receives the address of the atomic object; index_for presumably maps that address to one
// of the <<m_size>> buckets (implementation not visible in this header - confirm).
class CRCPP_API atomic_wait_table {

private:
// NOTE(review): non-static members are initialized in declaration order, so m_buckets is initialized
// before m_size. The constructor must not size the bucket array from m_size - verify in the .cpp file.
std::unique_ptr<atomic_wait_bucket[]> m_buckets;
const size_t m_size;

static size_t calc_table_size() noexcept;
size_t index_for(const void* atom) const noexcept;

public:
atomic_wait_table();

// Type-erased wait. <<comp>> is invoked with (atom, old, order); the predicates built by the templates below
// return true while the atomic still holds <<old>>.
// NOTE(review): wait/wait_for are not declared noexcept but are called from noexcept templates, so an
// exception escaping them would end in std::terminate.
void wait(void* atom, const uint32_t old, std::memory_order order, atomic_comp_fn comp);
// Timed flavour of wait; <<ms>> is the duration supplied by the caller of atomic_wait_for.
atomic_wait_status wait_for(void* atom,
const uint32_t old,
std::chrono::milliseconds ms,
std::memory_order order,
atomic_comp_fn comp);

// Notification entry points, keyed by the address of the atomic object.
void notify_one(const void* atom) noexcept;
void notify_all(const void* atom) noexcept;

// Accessor for the table object used by all atomic_* templates of this header.
// NOTE(review): presumably a process-wide singleton - confirm in the implementation.
static atomic_wait_table& instance();
};

// Table-based atomic_wait: validates <<type>> at compile time, then delegates to atomic_wait_table::wait.
// The table only sees a void* and a uint32_t, so a capture-less predicate that knows the real type is passed
// along; it reports whether the atomic still holds the expected value.
template<class type>
void atomic_wait(std::atomic<type>& atom, type old, std::memory_order order) noexcept {
    assert_atomic_type_waitable<type>();

    const auto still_holds_old = [](void* erased_atom, const uint32_t erased_old, std::memory_order load_order) noexcept -> bool {
        const auto* typed_atom = static_cast<std::atomic<type>*>(erased_atom);
        return typed_atom->load(load_order) == static_cast<type>(erased_old);
    };

    auto& table = atomic_wait_table::instance();
    table.wait(&atom, static_cast<uint32_t>(old), order, still_holds_old);
}

// Table-based atomic_wait_for: validates <<type>> at compile time, then delegates to atomic_wait_table::wait_for
// and returns its status unchanged.
// The table only sees a void* and a uint32_t, so a capture-less predicate that knows the real type is passed
// along; it reports whether the atomic still holds the expected value.
template<class type>
atomic_wait_status atomic_wait_for(std::atomic<type>& atom,
                                   type old,
                                   std::chrono::milliseconds ms,
                                   std::memory_order order) noexcept {
    assert_atomic_type_waitable<type>();

    const auto still_holds_old = [](void* erased_atom, const uint32_t erased_old, std::memory_order load_order) noexcept -> bool {
        const auto* typed_atom = static_cast<std::atomic<type>*>(erased_atom);
        return typed_atom->load(load_order) == static_cast<type>(erased_old);
    };

    auto& table = atomic_wait_table::instance();
    return table.wait_for(&atom, static_cast<uint32_t>(old), ms, order, still_holds_old);
}

// Table-based notify-one: looks up the wait table and forwards the address of <<atom>> to its notify_one.
template<class type>
void atomic_notify_one(std::atomic<type>& atom) noexcept {
    auto& table = atomic_wait_table::instance();
    table.notify_one(&atom);
}

// Table-based notify-all: looks up the wait table and forwards the address of <<atom>> to its notify_all.
template<class type>
void atomic_notify_all(std::atomic<type>& atom) noexcept {
    auto& table = atomic_wait_table::instance();
    table.notify_all(&atom);
}

} // namespace concurrencpp::details

#endif

#endif
62 changes: 62 additions & 0 deletions include/concurrencpp/utils/dlist.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#ifndef CONCURRENCPP_DLIST_H
#define CONCURRENCPP_DLIST_H

#include "concurrencpp/platform_defs.h"

#include <cassert>

namespace concurrencpp::details {
// Minimal intrusive doubly linked list.
//
// The list stores nothing but a pointer to its head: it never allocates, copies or destroys nodes.
// <<node_type>> must expose public members <<next>> and <<prev>> of type node_type*, and the caller keeps
// every linked node alive for as long as it is part of the list.
// The class performs no synchronization of its own.
template<class node_type>
class dlist {

   private:
    node_type* m_head = nullptr;

   public:
    // Links <<new_node>> in as the new head of the list.
    // The node must be detached, i.e. both of its links must be null.
    void push_front(node_type& new_node) noexcept {
        assert(new_node.next == nullptr);
        assert(new_node.prev == nullptr);

        if (m_head != nullptr) {
            m_head->prev = &new_node;
            new_node.next = m_head;
        }

        m_head = &new_node;
    }

    // Unlinks <<old_node>>, which must currently be linked into this list.
    // The node's own <<next>>/<<prev>> members are left as they were; reset them before pushing the node again,
    // since push_front asserts that they are null.
    void remove_node(node_type& old_node) noexcept {
        assert(m_head != nullptr);
        // A node without a predecessor has to be the head. The former precondition
        // (prev != nullptr || next != nullptr) wrongly rejected removing the only node of the list.
        assert(old_node.prev != nullptr || m_head == &old_node);

        if (old_node.prev != nullptr) {
            old_node.prev->next = old_node.next;
        } else {
            m_head = old_node.next;
        }

        if (old_node.next != nullptr) {
            old_node.next->prev = old_node.prev;
        }
    }

    // Returns true when no node is linked into the list.
    bool empty() const noexcept {
        return m_head == nullptr;
    }

    // Visits the nodes from head to tail, calling f(node) for each one.
    // <<f>> returns true to keep iterating and false to stop early.
    // The successor is read after <<f>> returns, through the visited node's <<next>> member.
    template<class functor_type>
    void for_each(functor_type f) {
        auto cursor = m_head;
        while (cursor != nullptr) {
            const bool should_continue_iteration = f(*cursor);
            if (!should_continue_iteration) {
                return;
            }

            cursor = cursor->next;
        }
    }
};
} // namespace concurrencpp::details

#endif
15 changes: 15 additions & 0 deletions include/concurrencpp/utils/math_helper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#ifndef CONCURRENCPP_MATH_H
#define CONCURRENCPP_MATH_H

#include "concurrencpp/platform_defs.h"

#include <cstddef>

namespace concurrencpp::details {
// Groups two static helper functions; the struct has no data members. Both functions are only declared here.
struct CRCPP_API math_helper {
// NOTE(review): presumably reports whether <<n>> is a prime number - confirm against the implementation.
static bool is_prime(size_t n) noexcept;
// NOTE(review): presumably returns a prime derived from <<n>>; whether <<n>> itself qualifies when it is already
// prime is not visible from this header - confirm against the implementation.
static size_t next_prime(size_t n) noexcept;
};
} // namespace concurrencpp::details

#endif
Loading

0 comments on commit e4b2cf4

Please sign in to comment.