Skip to content

Commit

Permalink
Add shared mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
Clcanny committed Feb 7, 2025
1 parent fdd2c23 commit 12fb10d
Show file tree
Hide file tree
Showing 5 changed files with 559 additions and 0 deletions.
109 changes: 109 additions & 0 deletions examples/async_shared_mutex.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <unifex/async_mutex.hpp>
#include <unifex/async_shared_mutex.hpp>
#include <unifex/sync_wait.hpp>

#include <unifex/coroutine.hpp>

#include <atomic>

#if !UNIFEX_NO_COROUTINES

# include <unifex/scheduler_concepts.hpp>
# include <unifex/single_thread_context.hpp>
# include <unifex/task.hpp>
# include <unifex/when_all.hpp>

# include <cstdio>

using namespace unifex;

int main() {
  async_shared_mutex mutex;

  // Mutated only while the unique (writer) lock is held -- except for the
  // deliberate "stealing" below, which shared owners use to detect a writer
  // running concurrently with readers.
  int uniqueState = 0;
  std::atomic<int> stolenUniqueState = 0;
  std::atomic<int> sharedState = 0;

  // Two tasks of each kind, each doing kIterations lock/unlock cycles.
  constexpr int kIterations = 100'000;
  constexpr int kExpected = 2 * kIterations;

  auto makeUniqueTask =
      [&](manual_event_loop::scheduler scheduler) -> task<int> {
    for (int i = 0; i < kIterations; ++i) {
      co_await mutex.async_lock();
      // Reschedule while holding the lock to widen the critical section.
      co_await schedule(scheduler);
      ++uniqueState;
      mutex.unlock();
    }
    co_return 0;
  };
  auto makeSharedTask =
      [&](manual_event_loop::scheduler scheduler) -> task<int> {
    for (int i = 0; i < kIterations; ++i) {
      co_await mutex.async_lock_shared();
      co_await schedule(scheduler);
      // Temporarily "steal" uniqueState and restore it after a reschedule.
      // If a writer ran concurrently with this reader, its increment would
      // be lost and the final uniqueState check below would fail.
      int expected = 0;
      if (uniqueState != 0 &&
          stolenUniqueState.compare_exchange_strong(expected, uniqueState)) {
        uniqueState = 0;
        co_await schedule(scheduler);
        uniqueState = stolenUniqueState.exchange(0);
      }
      ++sharedState;
      mutex.unlock_shared();
    }
    co_return 0;
  };

  single_thread_context ctx1;
  single_thread_context ctx2;
  single_thread_context ctx3;
  single_thread_context ctx4;

  sync_wait(when_all(
      makeUniqueTask(ctx1.get_scheduler()),
      makeUniqueTask(ctx2.get_scheduler()),
      makeSharedTask(ctx3.get_scheduler()),
      makeSharedTask(ctx4.get_scheduler())));

  // Fixed: the messages previously claimed "expected 2000000" while the
  // checks compare against 200'000; report the actual expected value.
  if (uniqueState != kExpected) {
    std::printf(
        "error: incorrect result %i, expected %i\n", uniqueState, kExpected);
    return 1;
  }
  if (sharedState != kExpected) {
    std::printf(
        "error: incorrect result %i, expected %i\n",
        sharedState.load(),
        kExpected);
    return 1;
  }

  return 0;
}

#else // UNIFEX_NO_COROUTINES

# include <cstdio>

int main() {
  // Very simple usage of async_shared_mutex: acquire the unique lock
  // synchronously via sync_wait, then release it.
  // (Fixed comment: this uses async_shared_mutex, not async_mutex.)

  unifex::async_shared_mutex m;
  unifex::sync_wait(m.async_lock());
  m.unlock();

  return 0;
}

#endif // UNIFEX_NO_COROUTINES
248 changes: 248 additions & 0 deletions include/unifex/async_shared_mutex.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <unifex/async_mutex.hpp>
#include <unifex/receiver_concepts.hpp>
#include <unifex/sender_concepts.hpp>
#include <unifex/sync_wait.hpp>
#include <unifex/tag_invoke.hpp>
#include <unifex/detail/prologue.hpp>

#include <list>
#include <utility>

namespace unifex {

// An asynchronous shared ("reader/writer") mutex.
//
// async_lock() returns a sender that completes with set_value() once the
// caller holds exclusive (writer) ownership; async_lock_shared() does the
// same for shared (reader) ownership. unlock()/unlock_shared() release the
// corresponding ownership. Copy and move are deleted because operation
// states hold references to this object.
//
// NOTE(review): the constructor, destructor, unlock(), unlock_shared() and
// try_enqueue() bodies live in async_shared_mutex.cpp, which is not visible
// here -- claims about queueing/resumption order below are hedged
// accordingly.
class async_shared_mutex {
class unique_lock_sender;
class shared_lock_sender;

public:
async_shared_mutex() noexcept;
async_shared_mutex(const async_shared_mutex&) = delete;
async_shared_mutex(async_shared_mutex&&) = delete;
~async_shared_mutex();

async_shared_mutex& operator=(const async_shared_mutex&) = delete;
async_shared_mutex& operator=(async_shared_mutex&&) = delete;

// Non-blocking acquisition attempts; return true on success.
[[nodiscard]] bool try_lock() noexcept;
[[nodiscard]] bool try_lock_shared() noexcept;

// Senders that complete once exclusive/shared ownership is granted.
[[nodiscard]] unique_lock_sender async_lock() noexcept;
[[nodiscard]] shared_lock_sender async_lock_shared() noexcept;

void unlock() noexcept;
void unlock_shared() noexcept;

private:
// Intrusive, type-erased waiter node for suspended lock operations.
// resume_ completes the operation state; next_ is an intrusive link
// (presumably used by the .cpp implementation -- confirm there).
struct waiter_base {
void (*resume_)(waiter_base*) noexcept;
waiter_base* next_;
};

// Sender returned by async_lock(). Completes with set_value() (no values,
// no errors, no done) when exclusive ownership has been granted.
class unique_lock_sender {
public:
template <
template <typename...> class Variant,
template <typename...> class Tuple>
using value_types = Variant<Tuple<>>;

template <template <typename...> class Variant>
using error_types = Variant<>;

static constexpr bool sends_done = false;

// we complete inline if we manage to grab the lock immediately
static constexpr blocking_kind blocking = blocking_kind::maybe;

// if we have to wait for the lock, we'll be resumed on whichever scheduler
// happens to be running the unlock()
static constexpr bool is_always_scheduler_affine = false;

unique_lock_sender(const unique_lock_sender&) = delete;
unique_lock_sender(unique_lock_sender&&) = default;

private:
friend async_shared_mutex;

explicit unique_lock_sender(async_shared_mutex& mutex) noexcept
: mutex_(mutex) {}

// Operation state for connect(unique_lock_sender, Receiver).
template <typename Receiver>
struct _op {
class type : waiter_base {
friend unique_lock_sender;

public:
template <typename Receiver2>
explicit type(async_shared_mutex& mutex, Receiver2&& r) noexcept
: mutex_(mutex)
, receiver_((Receiver2&&)r) {
// Type-erased completion invoked when a later unlock hands this
// waiter the lock.
this->resume_ = [](waiter_base* self) noexcept {
type& op = *static_cast<type*>(self);
unifex::set_value((Receiver&&)op.receiver_);
};
}

type(type&&) = delete;

private:
friend void tag_invoke(tag_t<start>, type& op) noexcept {
if (!op.try_enqueue()) {
// Failed to enqueue because we acquired the lock
// synchronously. Invoke the continuation inline
// without type-erasure here.
set_value((Receiver&&)op.receiver_);
}
}

// true => request exclusive ownership.
bool try_enqueue() noexcept { return mutex_.try_enqueue(this, true); }

async_shared_mutex& mutex_;
Receiver receiver_;
};
};
template <typename Receiver>
using operation = typename _op<remove_cvref_t<Receiver>>::type;

template(typename Receiver) //
(requires receiver<Receiver>) //
friend operation<Receiver> tag_invoke(
tag_t<connect>, unique_lock_sender&& s, Receiver&& r) noexcept {
return operation<Receiver>{s.mutex_, (Receiver&&)r};
}

async_shared_mutex& mutex_;
};

// Sender returned by async_lock_shared(). Same shape as unique_lock_sender
// but requests shared ownership.
class shared_lock_sender {
public:
template <
template <typename...> class Variant,
template <typename...> class Tuple>
using value_types = Variant<Tuple<>>;

template <template <typename...> class Variant>
using error_types = Variant<>;

static constexpr bool sends_done = false;

// we complete inline if we manage to grab the lock immediately
static constexpr blocking_kind blocking = blocking_kind::maybe;

// if we have to wait for the lock, we'll be resumed on whichever scheduler
// happens to be running the unlock()
static constexpr bool is_always_scheduler_affine = false;

shared_lock_sender(const shared_lock_sender&) = delete;
shared_lock_sender(shared_lock_sender&&) = default;

private:
friend async_shared_mutex;

explicit shared_lock_sender(async_shared_mutex& mutex) noexcept
: mutex_(mutex) {}

// Operation state for connect(shared_lock_sender, Receiver).
template <typename Receiver>
struct _op {
class type : waiter_base {
friend shared_lock_sender;

public:
template <typename Receiver2>
explicit type(async_shared_mutex& mutex, Receiver2&& r) noexcept
: mutex_(mutex)
, receiver_((Receiver2&&)r) {
// Type-erased completion invoked when a later unlock hands this
// waiter the lock.
this->resume_ = [](waiter_base* self) noexcept {
type& op = *static_cast<type*>(self);
unifex::set_value((Receiver&&)op.receiver_);
};
}

type(type&&) = delete;

private:
friend void tag_invoke(tag_t<start>, type& op) noexcept {
if (!op.try_enqueue()) {
// Failed to enqueue because we acquired the lock
// synchronously. Invoke the continuation inline
// without type-erasure here.
set_value((Receiver&&)op.receiver_);
}
}

// false => request shared ownership.
bool try_enqueue() noexcept { return mutex_.try_enqueue(this, false); }

async_shared_mutex& mutex_;
Receiver receiver_;
};
};
template <typename Receiver>
using operation = typename _op<remove_cvref_t<Receiver>>::type;

template(typename Receiver) //
(requires receiver<Receiver>) //
friend operation<Receiver> tag_invoke(
tag_t<connect>, shared_lock_sender&& s, Receiver&& r) noexcept {
return operation<Receiver>{s.mutex_, (Receiver&&)r};
}

async_shared_mutex& mutex_;
};

// Attempt to enqueue the waiter object to the queue.
// Returns true if successfully enqueued, false if it was not enqueued because
// the lock was acquired synchronously.
// `unique` selects exclusive (true) vs shared (false) ownership.
bool try_enqueue(waiter_base* waiter, bool unique) noexcept;

// Guards the bookkeeping state below.
async_mutex mutex_;
// Active exclusive owners (expected 0 or 1; see try_lock()).
int activeUniqueCount_;
// Active shared owners.
int activeSharedCount_;
// Waiters that could not acquire synchronously; the bool records whether
// the waiter wants unique (true) or shared (false) ownership. FIFO order
// is implied by try_lock_shared()'s pendingQueue_.empty() check.
std::list<std::pair<waiter_base*, bool>> pendingQueue_;
};

// Non-blocking attempt to take exclusive ownership.
// Returns true iff no unique or shared owner was active at the time.
inline bool async_shared_mutex::try_lock() noexcept {
  // Serialize access to the bookkeeping state via the internal async_mutex.
  unifex::sync_wait(mutex_.async_lock());
  const bool acquired = activeUniqueCount_ == 0 && activeSharedCount_ == 0;
  if (acquired) {
    // Nobody held the lock, so no waiter can be queued behind it.
    UNIFEX_ASSERT(pendingQueue_.empty());
    ++activeUniqueCount_;
  }
  mutex_.unlock();
  return acquired;
}

// Non-blocking attempt to take shared ownership.
// Succeeds only when no writer is active AND no waiter is queued, so a
// try_lock_shared() cannot overtake already-queued waiters.
inline bool async_shared_mutex::try_lock_shared() noexcept {
  // Serialize access to the bookkeeping state via the internal async_mutex.
  unifex::sync_wait(mutex_.async_lock());
  const bool acquired = activeUniqueCount_ == 0 && pendingQueue_.empty();
  if (acquired) {
    ++activeSharedCount_;
  }
  mutex_.unlock();
  return acquired;
}

// Returns a sender that completes once exclusive ownership is granted.
inline async_shared_mutex::unique_lock_sender
async_shared_mutex::async_lock() noexcept {
return unique_lock_sender{*this};
}

// Returns a sender that completes once shared ownership is granted.
inline async_shared_mutex::shared_lock_sender
async_shared_mutex::async_lock_shared() noexcept {
return shared_lock_sender{*this};
}

} // namespace unifex

#include <unifex/detail/epilogue.hpp>
1 change: 1 addition & 0 deletions source/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ target_sources(unifex
async_auto_reset_event.cpp
async_manual_reset_event.cpp
async_mutex.cpp
async_shared_mutex.cpp
async_stack.cpp
exception.cpp
inplace_stop_token.cpp
Expand Down
Loading

0 comments on commit 12fb10d

Please sign in to comment.