diff --git a/CHANGELOG.md b/CHANGELOG.md index 244de9a13af..ca658386d6f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ ### Fixed * Fixed a crash or exception when doing a fulltext search for multiple keywords when the intersection of results is not equal. ([#6465](https://github.com/realm/realm-core/issues/6465) since v13.2.0). * Fixed issue where build would not succeed when consuming core as an installed dependancy due to missing install headers ([#6479](https://github.com/realm/realm-core/pull/6479) since v13.4.1). +* Fixed a deadlock when closing a Transaction with a cancelled asynchronous write scheduled while another Transaction holds the write lock ([PR #6486](https://github.com/realm/realm-core/pull/6486), since v11.10.0) ### Breaking changes * None. diff --git a/src/realm/CMakeLists.txt b/src/realm/CMakeLists.txt index 3896b6409a6..85b8f61eddc 100644 --- a/src/realm/CMakeLists.txt +++ b/src/realm/CMakeLists.txt @@ -240,6 +240,7 @@ set(REALM_INSTALL_HEADERS util/random.hpp util/safe_int_ops.hpp util/scope_exit.hpp + util/semaphore.hpp util/serializer.hpp util/sha_crypto.hpp util/span.hpp diff --git a/src/realm/db.cpp b/src/realm/db.cpp index 70240c0d4c6..2a91a427092 100644 --- a/src/realm/db.cpp +++ b/src/realm/db.cpp @@ -1862,27 +1862,35 @@ class DB::AsyncCommitHelper { ~AsyncCommitHelper() { { - std::unique_lock lg(m_mutex); - if (!m_running) { + util::CheckedLockGuard lg(m_mutex); + if (!std::exchange(m_running, false)) { return; } - m_running = false; - m_cv_worker.notify_one(); } + m_cv_worker.notify_one(); m_thread.join(); } - void begin_write(util::UniqueFunction fn) + void begin_write(Transaction* tr) REQUIRES(!m_mutex) { - std::unique_lock lg(m_mutex); - start_thread(); - m_pending_writes.emplace_back(std::move(fn)); - m_cv_worker.notify_one(); + util::CheckedLockGuard lg(m_mutex); + m_pending_writes.push_back(tr); + wake_up_thread(); + } + + bool cancel_begin_write(Transaction* tr) REQUIRES(!m_mutex) + { + util::CheckedLockGuard lg(m_mutex); + if (auto it = std::find(m_pending_writes.begin(), m_pending_writes.end(), tr); it != m_pending_writes.end()) { + m_pending_writes.erase(it); + return true; + } + return false; } - void blocking_begin_write() + void blocking_begin_write() REQUIRES(!m_mutex) { - std::unique_lock lg(m_mutex); + util::CheckedUniqueLock lg(m_mutex); // If we support unlocking InterprocessMutex from a different thread // than it was locked on, we can sometimes just begin the write on @@ -1890,8 +1898,7 @@ class DB::AsyncCommitHelper { // for the worker thread to acquire the write lock, as we'll deadlock // if we try to async commit while the worker is waiting for the lock.
bool can_lock_on_caller = - !InterprocessMutex::is_thread_confined && (!m_owns_write_mutex && m_pending_writes.empty() && - m_write_lock_claim_ticket == m_write_lock_claim_fulfilled); + !InterprocessMutex::is_thread_confined && (!m_owns_write_mutex && !has_pending_write_requests()); // If we support cross-thread unlocking and m_running is false, // can_lock_on_caller should always be true or we forgot to launch the thread @@ -1911,17 +1918,16 @@ class DB::AsyncCommitHelper { // Otherwise we have to ask the worker thread to acquire it and wait // for that - start_thread(); size_t ticket = ++m_write_lock_claim_ticket; - m_cv_worker.notify_one(); - m_cv_callers.wait(lg, [this, ticket] { + wake_up_thread(); + m_cv_callers.wait(lg.native_handle(), [this, ticket]() REQUIRES(m_mutex) { return ticket == m_write_lock_claim_fulfilled; }); } - void end_write() + void end_write() REQUIRES(!m_mutex) { - std::unique_lock lg(m_mutex); + util::CheckedLockGuard lg(m_mutex); REALM_ASSERT(m_has_write_mutex); REALM_ASSERT(m_owns_write_mutex || !InterprocessMutex::is_thread_confined); @@ -1937,9 +1943,9 @@ class DB::AsyncCommitHelper { } } - bool blocking_end_write() + bool blocking_end_write() REQUIRES(!m_mutex) { - std::unique_lock lg(m_mutex); + util::CheckedUniqueLock lg(m_mutex); if (!m_has_write_mutex) { return false; } @@ -1950,7 +1956,7 @@ class DB::AsyncCommitHelper { if (m_owns_write_mutex) { m_pending_mx_release = true; m_cv_worker.notify_one(); - m_cv_callers.wait(lg, [this] { + m_cv_callers.wait(lg.native_handle(), [this]() REQUIRES(m_mutex) { return !m_pending_mx_release; }); } @@ -1969,46 +1975,64 @@ class DB::AsyncCommitHelper { } - void sync_to_disk(util::UniqueFunction fn) + void sync_to_disk(Transaction* tr) REQUIRES(!m_mutex) { - REALM_ASSERT(fn); - std::unique_lock lg(m_mutex); + REALM_ASSERT(tr); + util::CheckedLockGuard lg(m_mutex); REALM_ASSERT(!m_pending_sync); - start_thread(); - m_pending_sync = std::move(fn); - m_cv_worker.notify_one(); + REALM_ASSERT(m_has_write_mutex); + m_pending_sync = tr; + wake_up_thread(); } private: DB* m_db; std::thread m_thread; - std::mutex m_mutex; + // A mutex which guards most of the members in this class + util::CheckedMutex m_mutex; + // CV which the worker thread waits on to await work std::condition_variable m_cv_worker; + // CV which other threads wait on to await the worker thread completing work std::condition_variable m_cv_callers; - std::deque> m_pending_writes; - util::UniqueFunction m_pending_sync; - size_t m_write_lock_claim_ticket = 0; - size_t m_write_lock_claim_fulfilled = 0; - bool m_pending_mx_release = false; + // Transactions which are waiting for their turn to write. These are non-owning + // pointers to avoid a retain cycle. Weak pointers would result in the + // Transaction sometimes being destroyed on the worker thread, which results + // in complicated teardown. The non-owning pointers are safe because + // Transaction will unregister itself and/or wait for operations to complete + // before closing. + std::deque m_pending_writes GUARDED_BY(m_mutex); + // The Transaction which has commits to write to disk. + Transaction* m_pending_sync GUARDED_BY(m_mutex) = nullptr; + // Ticketing system for blocking write transactions. Blocking writes increment + // claim_ticket and then wait on m_cv_callers until claim_fulfilled is equal + // to the value of claim_ticket they saw at the start of the wait. 
The worker + // thread increments claim_fulfilled instead of calling an async callback + // if it is below claim_ticket, as sync writes take priority over async. + size_t m_write_lock_claim_ticket GUARDED_BY(m_mutex) = 0; + size_t m_write_lock_claim_fulfilled GUARDED_BY(m_mutex) = 0; + bool m_pending_mx_release GUARDED_BY(m_mutex) = false; bool m_running = false; - bool m_has_write_mutex = false; - bool m_owns_write_mutex = false; - bool m_waiting_for_write_mutex = false; + bool m_has_write_mutex GUARDED_BY(m_mutex) = false; + // True if the worker thread specifically owns the write mutex. May be false + // while `m_has_write_mutex` is true if the write mutex was acquired via + // a blocking begin and the mutex supports cross-thread unlocks. + bool m_owns_write_mutex GUARDED_BY(m_mutex) = false; + bool m_waiting_for_write_mutex GUARDED_BY(m_mutex) = false; - void main(); + void main() REQUIRES(!m_mutex); - void start_thread() + void wake_up_thread() REQUIRES(m_mutex) { - if (m_running) { + if (std::exchange(m_running, true)) { + m_cv_worker.notify_one(); return; } - m_running = true; m_thread = std::thread([this]() { main(); }); } - bool has_pending_write_requests() + bool has_pending_write_requests() REQUIRES(m_mutex) { return m_write_lock_claim_fulfilled < m_write_lock_claim_ticket || !m_pending_writes.empty(); } @@ -2016,20 +2040,22 @@ class DB::AsyncCommitHelper { void DB::AsyncCommitHelper::main() { - std::unique_lock lg(m_mutex); + util::CheckedUniqueLock lg(m_mutex); while (m_running) { #if 0 // Enable for testing purposes std::this_thread::sleep_for(std::chrono::milliseconds(10)); #endif if (m_has_write_mutex) { - if (auto cb = std::move(m_pending_sync)) { + if (auto tr = m_pending_sync) { + m_pending_sync = nullptr; // Only one of sync_to_disk(), end_write(), or blocking_end_write() // should be called, so we should never have both a pending sync // and pending release. REALM_ASSERT(!m_pending_mx_release); lg.unlock(); - cb(); - cb = nullptr; // Release things captured by the callback before reacquiring the lock + // Release the lock while calling this as it performs the actual + // i/o and we need to not block other threads. + tr->sync_async_commit(); lg.lock(); m_pending_mx_release = true; } @@ -2069,18 +2095,27 @@ void DB::AsyncCommitHelper::main() continue; } - REALM_ASSERT(!m_pending_writes.empty()); - auto callback = std::move(m_pending_writes.front()); + // The request could have been cancelled while we were waiting + // for the lock + if (m_pending_writes.empty()) { + m_pending_mx_release = true; + continue; + } + + auto writer = m_pending_writes.front(); m_pending_writes.pop_front(); - lg.unlock(); - callback(); - // Release things captured by the callback before reacquiring the lock - callback = nullptr; - lg.lock(); + // Call with lock held: this is safe to do because the function + // does very little work before dispatching the main work to the + // scheduler (and importantly will never call back into us), and + // it's required for Transaction teardown to be safe. We don't + // hold a strong reference to the Transaction to ensure that we + // will never destroy the DB on this thread, so we have to block + // closing the Transaction until we're done with the unowned pointer. 
+ writer->async_write_began(); continue; } } - m_cv_worker.wait(lg); + m_cv_worker.wait(lg.native_handle()); } if (m_has_write_mutex && m_owns_write_mutex) { m_db->do_end_write(); @@ -2088,10 +2123,16 @@ void DB::AsyncCommitHelper::main() } -void DB::async_begin_write(util::UniqueFunction fn) +void DB::async_begin_write(Transaction* tr) { REALM_ASSERT(m_commit_helper); - m_commit_helper->begin_write(std::move(fn)); + m_commit_helper->begin_write(tr); +} + +bool DB::cancel_async_begin_write(Transaction* tr) +{ + REALM_ASSERT(m_commit_helper); + return m_commit_helper->cancel_begin_write(tr); } void DB::async_end_write() @@ -2100,10 +2141,10 @@ void DB::async_end_write() m_commit_helper->end_write(); } -void DB::async_sync_to_disk(util::UniqueFunction fn) +void DB::async_sync_to_disk(Transaction* tr) { REALM_ASSERT(m_commit_helper); - m_commit_helper->sync_to_disk(std::move(fn)); + m_commit_helper->sync_to_disk(tr); } bool DB::has_changed(TransactionRef& tr) @@ -2687,44 +2728,6 @@ TransactionRef DB::start_write(bool nonblocking) return tr; } -void DB::async_request_write_mutex(TransactionRef& tr, util::UniqueFunction&& when_acquired) -{ - { - util::CheckedLockGuard lck(tr->m_async_mutex); - REALM_ASSERT(tr->m_async_stage == Transaction::AsyncState::Idle); - tr->m_async_stage = Transaction::AsyncState::Requesting; - tr->m_request_time_point = std::chrono::steady_clock::now(); - if (tr->db->m_logger) { - tr->db->m_logger->log(util::Logger::Level::trace, "Async request write lock"); - } - } - std::weak_ptr weak_tr = tr; - async_begin_write([weak_tr, cb = std::move(when_acquired)]() { - if (auto tr = weak_tr.lock()) { - util::CheckedLockGuard lck(tr->m_async_mutex); - // If a synchronous transaction happened while we were pending - // we may be in HasCommits - if (tr->m_async_stage == Transaction::AsyncState::Requesting) { - tr->m_async_stage = Transaction::AsyncState::HasLock; - } - if (tr->db->m_logger) { - auto t2 = std::chrono::steady_clock::now(); - tr->db->m_logger->log( - util::Logger::Level::trace, "Got write lock in %1 us", - std::chrono::duration_cast(t2 - tr->m_request_time_point).count()); - } - if (tr->m_waiting_for_write_lock) { - tr->m_waiting_for_write_lock = false; - tr->m_async_cv.notify_one(); - } - else if (cb) { - cb(); - } - tr.reset(); // Release pointer while lock is held - } - }); -} - inline DB::DB(const DBOptions& options) : m_upgrade_callback(std::move(options.upgrade_callback)) { @@ -2818,7 +2821,6 @@ void DB::do_begin_possibly_async_write() void DB::end_write_on_correct_thread() noexcept { - // m_local_write_mutex.unlock(); if (!m_commit_helper || !m_commit_helper->blocking_end_write()) { do_end_write(); } diff --git a/src/realm/db.hpp b/src/realm/db.hpp index 03b06b011d4..55db6456382 100644 --- a/src/realm/db.hpp +++ b/src/realm/db.hpp @@ -258,11 +258,6 @@ class DB : public std::enable_shared_from_this { // an invalid TransactionRef is returned. TransactionRef start_write(bool nonblocking = false) REQUIRES(!m_mutex); - // ask for write mutex. Callback takes place when mutex has been acquired. - // callback may occur on ANOTHER THREAD. Must not be called if write mutex - // has already been acquired. - void async_request_write_mutex(TransactionRef& tr, util::UniqueFunction&& when_acquired); - // report statistics of last commit done on THIS DB. // The free space reported is what can be expected to be freed // by compact(). 
This may not correspond to the space which is free @@ -612,9 +607,10 @@ class DB : public std::enable_shared_from_this { void close_internal(std::unique_lock, bool allow_open_read_transactions) REQUIRES(!m_mutex); - void async_begin_write(util::UniqueFunction fn); + void async_begin_write(Transaction* tr); + bool cancel_async_begin_write(Transaction* tr); void async_end_write(); - void async_sync_to_disk(util::UniqueFunction fn); + void async_sync_to_disk(Transaction*); friend class SlabAlloc; friend class Transaction; diff --git a/src/realm/object-store/impl/realm_coordinator.cpp b/src/realm/object-store/impl/realm_coordinator.cpp index c3a8a12760f..2089725c8a8 100644 --- a/src/realm/object-store/impl/realm_coordinator.cpp +++ b/src/realm/object-store/impl/realm_coordinator.cpp @@ -1306,14 +1306,3 @@ void RealmCoordinator::write_copy(StringData path, const char* key) { m_db->write_copy(path, key); } - -void RealmCoordinator::async_request_write_mutex(Realm& realm) -{ - auto tr = Realm::Internal::get_transaction_ref(realm); - m_db->async_request_write_mutex(tr, [realm = realm.shared_from_this()]() mutable { - auto& scheduler = *realm->scheduler(); - scheduler.invoke([realm = std::move(realm)] { - Realm::Internal::run_writes(*realm); - }); - }); -} diff --git a/src/realm/object-store/impl/realm_coordinator.hpp b/src/realm/object-store/impl/realm_coordinator.hpp index 1ec0169ebe7..5e0ca61bfbe 100644 --- a/src/realm/object-store/impl/realm_coordinator.hpp +++ b/src/realm/object-store/impl/realm_coordinator.hpp @@ -218,8 +218,6 @@ class RealmCoordinator : public std::enable_shared_from_this { return std::move(util::CheckedUniqueLock(m_running_notifiers_mutex).native_handle()); } - void async_request_write_mutex(Realm& realm); - AuditInterface* audit_context() const noexcept { return m_audit_context.get(); diff --git a/src/realm/object-store/shared_realm.cpp b/src/realm/object-store/shared_realm.cpp index 43fc4e860b4..8b58e2c66c9 100644 --- a/src/realm/object-store/shared_realm.cpp +++ b/src/realm/object-store/shared_realm.cpp @@ -750,11 +750,20 @@ void Realm::check_pending_write_requests() run_writes_on_proper_thread(); } else { - m_coordinator->async_request_write_mutex(*this); + async_request_write_mutex(); } } } +void Realm::async_request_write_mutex() +{ + transaction().async_request_write_mutex([scheduler = m_scheduler, realm = shared_from_this()]() mutable { + scheduler->invoke([realm = std::move(realm)] { + realm->run_writes(); + }); + }); +} + void Realm::end_current_write(bool check_pending) { if (!m_transaction) { @@ -872,7 +881,7 @@ auto Realm::async_begin_transaction(util::UniqueFunction&& the_write_blo if (!m_is_running_async_writes && !m_transaction->is_async() && m_transaction->get_transact_stage() != DB::transact_Writing) { - m_coordinator->async_request_write_mutex(*this); + async_request_write_mutex(); } return handle; } @@ -951,6 +960,9 @@ bool Realm::async_cancel_transaction(AsyncHandle handle) auto it1 = std::find_if(m_async_write_q.begin(), m_async_write_q.end(), compare); if (it1 != m_async_write_q.end()) { m_async_write_q.erase(it1); + if (m_async_write_q.empty()) { + m_transaction->cancel_async_request_write_mutex(); + } return true; } auto it2 = std::find_if(m_async_commit_q.begin(), m_async_commit_q.end(), compare); diff --git a/src/realm/object-store/shared_realm.hpp b/src/realm/object-store/shared_realm.hpp index f75bfa147ba..c4a408c2d7c 100644 --- a/src/realm/object-store/shared_realm.hpp +++ b/src/realm/object-store/shared_realm.hpp @@ -467,11 +467,6 @@ class 
Realm : public std::enable_shared_from_this { return realm.transaction_ref(); } - static void run_writes(Realm& realm) - { - realm.run_writes(); - } - static void copy_schema(Realm& target_realm, const Realm& source_realm) { target_realm.copy_schema_from(source_realm); @@ -568,6 +563,7 @@ class Realm : public std::enable_shared_from_this { void call_completion_callbacks(); void run_writes(); void run_async_completions(); + void async_request_write_mutex(); public: std::unique_ptr m_binding_context; diff --git a/src/realm/object-store/util/scheduler.hpp b/src/realm/object-store/util/scheduler.hpp index be43450a75d..43128786188 100644 --- a/src/realm/object-store/util/scheduler.hpp +++ b/src/realm/object-store/util/scheduler.hpp @@ -68,7 +68,7 @@ class Scheduler { // caching may occur. virtual bool is_same_as(const Scheduler* other) const noexcept = 0; - // Check if this scehduler actually can support invoke(). Invoking may be + // Check if this scheduler actually can support invoke(). Invoking may be // either not implemented, not applicable to a scheduler type, or simply not // be possible currently (e.g. if the associated event loop is not actually // running). @@ -129,7 +129,7 @@ class Scheduler { static void set_default_factory(util::UniqueFunction()>); }; -// A thread-safe queue of functions to invoke, used in the implemenation of +// A thread-safe queue of functions to invoke, used in the implementation of // some of the schedulers class InvocationQueue { public: diff --git a/src/realm/transaction.cpp b/src/realm/transaction.cpp index d2de3c0adf0..d242f1968e5 100644 --- a/src/realm/transaction.cpp +++ b/src/realm/transaction.cpp @@ -17,13 +17,14 @@ **************************************************************************/ #include -#include "impl/copy_replication.hpp" + +#include +#include +#include #include #include -#include #include -#include - +#include namespace { using namespace realm; @@ -256,6 +257,7 @@ VersionID Transaction::commit_and_continue_as_read(bool commit_to_disk) REALM_ASSERT(!m_oldest_version_not_persisted || m_read_lock.m_version != m_oldest_version_not_persisted->m_version); + bool end_write = false; { util::CheckedLockGuard lock(m_async_mutex); REALM_ASSERT(m_async_stage != AsyncState::Syncing); @@ -264,7 +266,7 @@ VersionID Transaction::commit_and_continue_as_read(bool commit_to_disk) m_async_stage = AsyncState::HasLock; } else { - db->end_write_on_correct_thread(); + end_write = true; m_async_stage = AsyncState::Idle; } } @@ -272,6 +274,9 @@ VersionID Transaction::commit_and_continue_as_read(bool commit_to_disk) m_async_stage = AsyncState::HasCommits; } } + if (end_write) { + db->end_write_on_correct_thread(); + } // Remap file if it has grown, and update refs in underlying node structure. 
remap_and_update_refs(m_read_lock.m_top_ref, m_read_lock.m_file_size, false); // Throws @@ -731,50 +736,56 @@ void Transaction::complete_async_commit() } } -void Transaction::async_complete_writes(util::UniqueFunction when_synchronized) +void Transaction::sync_async_commit() { + complete_async_commit(); util::CheckedLockGuard lck(m_async_mutex); + m_async_stage = AsyncState::Idle; + m_sync_completion(); + m_sync_completion = nullptr; +} + +void Transaction::async_complete_writes(util::UniqueFunction when_synchronized) +{ + util::CheckedUniqueLock lck(m_async_mutex); if (m_async_stage == AsyncState::HasLock) { // Nothing to commit to disk - just release write lock m_async_stage = AsyncState::Idle; + lck.unlock(); db->async_end_write(); } else if (m_async_stage == AsyncState::HasCommits) { m_async_stage = AsyncState::Syncing; m_commit_exception = std::exception_ptr(); - // get a callback on the helper thread, in which to sync to disk - db->async_sync_to_disk([this, cb = std::move(when_synchronized)]() noexcept { - complete_async_commit(); - util::CheckedLockGuard lck(m_async_mutex); - m_async_stage = AsyncState::Idle; - if (m_waiting_for_sync) { - m_waiting_for_sync = false; - m_async_cv.notify_all(); - } - else { - cb(); - } - }); + m_sync_completion = std::move(when_synchronized); + lck.unlock(); + db->async_sync_to_disk(this); + } + else { + REALM_ASSERT(m_async_stage == AsyncState::Idle); } } void Transaction::prepare_for_close() { - util::CheckedLockGuard lck(m_async_mutex); - switch (m_async_stage) { - case AsyncState::Idle: - break; + util::CheckedUniqueLock lck(m_async_mutex); + if (m_async_stage == AsyncState::Requesting) { + REALM_ASSERT(m_transact_stage == DB::transact_Reading); + REALM_ASSERT(!m_oldest_version_not_persisted); + lck.unlock(); + db->cancel_async_begin_write(this); + // While we released the lock, the stage may have changed from Requesting + // to HasLock on the background thread if we happened to acquire the + // write mutex at the same time as this is happening. No other + // transitions are possible. + lck.lock(); + } + auto stage = m_async_stage; + m_async_stage = AsyncState::Idle; + switch (stage) { + case AsyncState::Idle: case AsyncState::Requesting: - // We don't have the ability to cancel a wait on the write lock, so - // unfortunately we have to wait for it to be acquired. 
- REALM_ASSERT(m_transact_stage == DB::transact_Reading); - REALM_ASSERT(!m_oldest_version_not_persisted); - m_waiting_for_write_lock = true; - m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) { - return !m_waiting_for_write_lock; - }); - db->end_write_on_correct_thread(); break; case AsyncState::HasLock: @@ -787,6 +798,7 @@ void Transaction::prepare_for_close() if (m_oldest_version_not_persisted) { complete_async_commit(); } + lck.unlock(); db->end_write_on_correct_thread(); break; @@ -794,50 +806,94 @@ void Transaction::prepare_for_close() // We have commits which need to be synced to disk, so do that REALM_ASSERT(m_transact_stage == DB::transact_Reading); complete_async_commit(); + lck.unlock(); db->end_write_on_correct_thread(); break; case AsyncState::Syncing: // The worker thread is currently writing, so wait for it to complete REALM_ASSERT(m_transact_stage == DB::transact_Reading); - m_waiting_for_sync = true; - m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) { - return !m_waiting_for_sync; - }); + wait_for_async_completion(lck.native_handle(), m_sync_completion); break; } - m_async_stage = AsyncState::Idle; +} + +void Transaction::wait_for_async_completion(std::unique_lock& lock, + util::UniqueFunction& completion) +{ + util::BinarySemaphore sem(0); + completion = [&] { + sem.release(); + }; + lock.unlock(); + sem.acquire(); + lock.lock(); + completion = nullptr; } void Transaction::acquire_write_lock() { - util::CheckedUniqueLock lck(m_async_mutex); - switch (m_async_stage) { - case AsyncState::Idle: - lck.unlock(); - db->do_begin_possibly_async_write(); - return; + { + util::CheckedUniqueLock lck(m_async_mutex); + switch (m_async_stage) { + case AsyncState::Idle: + break; - case AsyncState::Requesting: - m_waiting_for_write_lock = true; - m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) { - return !m_waiting_for_write_lock; - }); - return; + case AsyncState::Requesting: + wait_for_async_completion(lck.native_handle(), m_got_write_completion); + return; - case AsyncState::HasLock: - case AsyncState::HasCommits: - return; + case AsyncState::HasLock: + case AsyncState::HasCommits: + return; - case AsyncState::Syncing: - m_waiting_for_sync = true; - m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) { - return !m_waiting_for_sync; - }); - lck.unlock(); - db->do_begin_possibly_async_write(); - break; + case AsyncState::Syncing: + wait_for_async_completion(lck.native_handle(), m_sync_completion); + break; + } + } + db->do_begin_possibly_async_write(); +} + +void Transaction::async_request_write_mutex(util::UniqueFunction&& when_acquired) +{ + { + util::CheckedLockGuard lck(m_async_mutex); + if (db->m_logger && db->m_logger->would_log(util::Logger::Level::trace)) { + m_request_start_time = std::chrono::steady_clock::now(); + db->m_logger->log(util::Logger::Level::trace, "Async request write lock"); + } + + REALM_ASSERT(m_async_stage == AsyncState::Idle); + m_async_stage = Transaction::AsyncState::Requesting; + m_got_write_completion = std::move(when_acquired); + } + db->async_begin_write(this); +} + +void Transaction::cancel_async_request_write_mutex() +{ + if (db->cancel_async_begin_write(this)) { + util::CheckedLockGuard lck(m_async_mutex); + REALM_ASSERT(m_async_stage == AsyncState::Requesting); + m_async_stage = Transaction::AsyncState::Idle; + } +} + +void Transaction::async_write_began() +{ + util::CheckedLockGuard lck(m_async_mutex); + // If a synchronous transaction happened while we were pending 
+ // we may be in HasCommits + if (m_async_stage == Transaction::AsyncState::Requesting) { + m_async_stage = Transaction::AsyncState::HasLock; + } + if (db->m_logger) { + auto t2 = std::chrono::steady_clock::now(); + db->m_logger->log(util::Logger::Level::trace, "Got write lock in %1 us", + std::chrono::duration_cast(t2 - m_request_start_time).count()); } + m_got_write_completion(); } void Transaction::do_end_read() noexcept diff --git a/src/realm/transaction.hpp b/src/realm/transaction.hpp index 7af3064aa73..76fbea2c133 100644 --- a/src/realm/transaction.hpp +++ b/src/realm/transaction.hpp @@ -139,6 +139,12 @@ class Transaction : public Group { // The write mutex is released after full synchronization. void async_complete_writes(util::UniqueFunction when_synchronized = nullptr) REQUIRES(!m_async_mutex); + // ask for write mutex. Callback takes place when mutex has been acquired. + // callback may occur on ANOTHER THREAD. Must not be called if write mutex + // has already been acquired. + void async_request_write_mutex(util::UniqueFunction&& when_acquired) REQUIRES(!m_async_mutex); + void cancel_async_request_write_mutex() REQUIRES(!m_async_mutex); + // Complete all pending async work and return once the async stage is Idle. // If currently in an async write transaction that transaction is cancelled, // and any async writes which were committed are synchronized. @@ -186,7 +192,11 @@ class Transaction : public Group { void replicate(Transaction* dest, Replication& repl) const; void complete_async_commit(); + void sync_async_commit() REQUIRES(!m_async_mutex); void acquire_write_lock() REQUIRES(!m_async_mutex); + void async_write_began() REQUIRES(!m_async_mutex); + void wait_for_async_completion(std::unique_lock&, util::UniqueFunction&) + REQUIRES(m_async_mutex); void cow_outliers(std::vector& progress, size_t evac_limit, size_t work_limit); void close_read_with_lock() REQUIRES(!m_async_mutex, db->m_mutex); @@ -202,11 +212,10 @@ class Transaction : public Group { // Mutex is protecting access to members just below util::CheckedMutex m_async_mutex; - std::condition_variable m_async_cv GUARDED_BY(m_async_mutex); AsyncState m_async_stage GUARDED_BY(m_async_mutex) = AsyncState::Idle; - std::chrono::steady_clock::time_point m_request_time_point; - bool m_waiting_for_write_lock GUARDED_BY(m_async_mutex) = false; - bool m_waiting_for_sync GUARDED_BY(m_async_mutex) = false; + std::chrono::steady_clock::time_point m_request_start_time GUARDED_BY(m_async_mutex); + util::UniqueFunction m_got_write_completion GUARDED_BY(m_async_mutex); + util::UniqueFunction m_sync_completion GUARDED_BY(m_async_mutex); DB::TransactStage m_transact_stage = DB::transact_Ready; @@ -362,8 +371,8 @@ inline bool Transaction::promote_to_write(O* observer, bool nonblocking) if (!holds_write_mutex()) { if (nonblocking) { - bool succes = db->do_try_begin_write(); - if (!succes) { + bool success = db->do_try_begin_write(); + if (!success) { return false; } } diff --git a/src/realm/util/semaphore.hpp b/src/realm/util/semaphore.hpp new file mode 100644 index 00000000000..38ef678883e --- /dev/null +++ b/src/realm/util/semaphore.hpp @@ -0,0 +1,183 @@ +/************************************************************************* + * + * Copyright 2023 Realm Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **************************************************************************/ + +// An implementation of C++20's counting_semaphore +// See https://en.cppreference.com/w/cpp/thread/counting_semaphore for documentation +// on how to use this. + +#ifndef REALM_UTIL_SEMAPHORE_HPP +#define REALM_UTIL_SEMAPHORE_HPP + +#include + +#if __has_include() +#include +#endif + +// C++20 semaphores require iOS 14, so don't use them on Apple platforms +// even when building in C++20 mode. +#if defined(__cpp_lib_semaphore) && !REALM_PLATFORM_APPLE +#include + +namespace realm::util { +using CountingSemaphore = std::counting_semaphore; +using BinarySemaphore = std::binary_semaphore; +} // namespace realm::util +#elif REALM_PLATFORM_APPLE +#include + +namespace realm::util { +template ::max()> +class CountingSemaphore { +public: + static constexpr ptrdiff_t max() noexcept + { + return LeastMaxValue; + } + + constexpr explicit CountingSemaphore(ptrdiff_t count) + : m_semaphore(dispatch_semaphore_create(count)) + { + } + ~CountingSemaphore() + { + dispatch_release(m_semaphore); + } + + CountingSemaphore(const CountingSemaphore&) = delete; + CountingSemaphore& operator=(const CountingSemaphore&) = delete; + + void release(ptrdiff_t update = 1) + { + while (update--) + dispatch_semaphore_signal(m_semaphore); + } + void acquire() + { + dispatch_semaphore_wait(m_semaphore, DISPATCH_TIME_FOREVER); + } + template + bool try_acquire_for(std::chrono::duration const& rel_time) + { + auto ns = std::chrono::duration_cast(rel_time); + return dispatch_semaphore_wait(m_semaphore, dispatch_time(DISPATCH_TIME_NOW, ns)) != 0; + } + bool try_acquire() + { + dispatch_semaphore_wait(m_semaphore, DISPATCH_TIME_NOW); + } + template + bool try_acquire_until(std::chrono::time_point const& abs_time) + { + auto const current = Clock::now(); + if (current >= abs_time) + return try_acquire(); + return try_acquire_for(abs_time - current); + } + +private: + dispatch_semaphore_t m_semaphore; +}; + +using BinarySemaphore = CountingSemaphore<1>; +} // namespace realm::util + +#else // !__has_include() && !REALM_PLATFORM_APPLE + +#include +#include + +namespace realm::util { +template ::max()> +class CountingSemaphore { +public: + static constexpr ptrdiff_t max() noexcept + { + return LeastMaxValue; + } + + constexpr explicit CountingSemaphore(ptrdiff_t count) + : m_count(count) + { + } + + CountingSemaphore(const CountingSemaphore&) = delete; + CountingSemaphore& operator=(const CountingSemaphore&) = delete; + + void release(ptrdiff_t update = 1) + { + { + std::lock_guard lock(m_mutex); + m_count += update; + } + m_cv.notify_all(); + } + void acquire() + { + std::unique_lock lock(m_mutex); + m_cv.wait(lock, [this] { + return m_count > 0; + }); + --m_count; + } + template + bool try_acquire_for(std::chrono::duration const& rel_time) + { + std::unique_lock lock(m_mutex); + if (m_cv.wait_for(lock, rel_time, [this] { + return m_count > 0; + })) { + --m_count; + return true; + } + return false; + } + bool try_acquire() + { + std::unique_lock lock(m_mutex); + if (m_count > 0) { + --m_count; + return true; + } + return false; + 
} + template + bool try_acquire_until(std::chrono::time_point const& abs_time) + { + std::unique_lock lock(m_mutex); + if (m_cv.wait_until(lock, abs_time, [this] { + return m_count > 0; + })) { + --m_count; + return true; + } + return false; + } + +private: + std::mutex m_mutex; + std::condition_variable m_cv; + ptrdiff_t m_count; +}; + +using BinarySemaphore = CountingSemaphore<1>; +} // namespace realm::util + +#endif + +#endif // REALM_UTIL_SEMAPHORE_HPP diff --git a/test/object-store/realm.cpp b/test/object-store/realm.cpp index 1b00f7938bc..0f691de22a7 100644 --- a/test/object-store/realm.cpp +++ b/test/object-store/realm.cpp @@ -1596,41 +1596,74 @@ TEST_CASE("SharedRealm: async writes") { for (int i = 0; i < 2; ++i) { SECTION(close_function_names[i]) { bool persisted = false; - SECTION("before write lock is acquired") { + SECTION("while another DB instance holds lock") { DBOptions options; options.encryption_key = config.encryption_key.data(); // Acquire the write lock with a different DB instance so that we'll // be stuck in the Requesting stage - realm::test_util::BowlOfStonesSemaphore sema; + util::BinarySemaphore sema1(0); + util::BinarySemaphore sema2(0); + auto db = DB::create(make_in_realm_history(), config.path, options); JoiningThread thread([&] { - auto db = DB::create(make_in_realm_history(), config.path, options); auto write = db->start_write(); - sema.add_stone(); + sema1.release(); // Wait until the main thread is waiting for the lock. - while (!db->other_writers_waiting_for_lock()) { - millisleep(1); - } + timed_sleeping_wait_for([&] { + return db->other_writers_waiting_for_lock(); + }); + // We're trying to unlock it while the main thread is waiting + // in close(), so don't unlock too quickly. There isn't a + // deterministic way to do this. + sema2.acquire(); + std::this_thread::sleep_for(std::chrono::milliseconds(20)); write->close(); }); // Wait for the background thread to have acquired the lock - sema.get_stone(); + sema1.acquire(); auto scheduler = realm->scheduler(); realm->async_begin_transaction([&] { // We should never get here as the realm is closed FAIL(); }); + timed_sleeping_wait_for([&] { + return db->other_writers_waiting_for_lock(); + }); // close() should block until we can acquire the write lock + sema2.release(); std::invoke(close_functions[i], *realm); - { - // Verify that we released the write lock - auto db = DB::create(make_in_realm_history(), config.path, options); - REQUIRE(db->start_write(/* nonblocking */ true)); - } + thread.join(); + + // We may not have released the write lock yet when close() + // returns, but it should happen promptly. 
+ timed_wait_for([&] { + return db->start_write(/* nonblocking */ true) != nullptr; + }); + + // Verify that the transaction callback never got enqueued + scheduler->invoke([&] { + done = true; + }); + wait_for_done(); + } + SECTION("while another Realm instance holds lock") { + auto realm2 = Realm::get_shared_realm(config); + realm2->begin_transaction(); + + auto scheduler = realm->scheduler(); + realm->async_begin_transaction([&] { + // We should never get here as the realm is closed + FAIL(); + }); + + // Doesn't have to wait for the write lock because the DB + // instance already holds it and we were just waiting for our + // turn with it rather than waiting to acquire it + std::invoke(close_functions[i], *realm); // Verify that the transaction callback never got enqueued scheduler->invoke([&] { diff --git a/test/object-store/sync/sync_test_utils.cpp b/test/object-store/sync/sync_test_utils.cpp index dece34092f4..a5300bfd9ce 100644 --- a/test/object-store/sync/sync_test_utils.cpp +++ b/test/object-store/sync/sync_test_utils.cpp @@ -87,29 +87,6 @@ bool ReturnsTrueWithinTimeLimit::match(util::FunctionRef condition) cons return predicate_returned_true; } -void timed_wait_for(util::FunctionRef condition, std::chrono::milliseconds max_ms) -{ - const auto wait_start = std::chrono::steady_clock::now(); - util::EventLoop::main().run_until([&] { - if (std::chrono::steady_clock::now() - wait_start > max_ms) { - throw std::runtime_error(util::format("timed_wait_for exceeded %1 ms", max_ms.count())); - } - return condition(); - }); -} - -void timed_sleeping_wait_for(util::FunctionRef condition, std::chrono::milliseconds max_ms, - std::chrono::milliseconds sleep_ms) -{ - const auto wait_start = std::chrono::steady_clock::now(); - while (!condition()) { - if (std::chrono::steady_clock::now() - wait_start > max_ms) { - throw std::runtime_error(util::format("timed_sleeping_wait_for exceeded %1 ms", max_ms.count())); - } - std::this_thread::sleep_for(sleep_ms); - } -} - auto do_hash = [](const std::string& name) -> std::string { std::array hash; util::sha256(name.data(), name.size(), hash.data()); diff --git a/test/object-store/sync/sync_test_utils.hpp b/test/object-store/sync/sync_test_utils.hpp index 4e2a318c2a3..a29b73a8165 100644 --- a/test/object-store/sync/sync_test_utils.hpp +++ b/test/object-store/sync/sync_test_utils.hpp @@ -47,13 +47,6 @@ bool results_contains_user(SyncUserMetadataResults& results, const std::string& const std::string& auth_server); bool results_contains_original_name(SyncFileActionMetadataResults& results, const std::string& original_name); -void timed_wait_for(util::FunctionRef condition, - std::chrono::milliseconds max_ms = std::chrono::milliseconds(5000)); - -void timed_sleeping_wait_for(util::FunctionRef condition, - std::chrono::milliseconds max_ms = std::chrono::seconds(30), - std::chrono::milliseconds sleep_ms = std::chrono::milliseconds(1)); - class ReturnsTrueWithinTimeLimit : public Catch::Matchers::MatcherGenericBase { public: ReturnsTrueWithinTimeLimit(std::chrono::milliseconds max_ms = std::chrono::milliseconds(5000)) diff --git a/test/object-store/util/test_utils.cpp b/test/object-store/util/test_utils.cpp index 4cd18d5769a..3ebb747188d 100644 --- a/test/object-store/util/test_utils.cpp +++ b/test/object-store/util/test_utils.cpp @@ -16,13 +16,13 @@ // //////////////////////////////////////////////////////////////////////////// -#include #include "test_utils.hpp" +#include +#include #include #include #include -#include #include @@ -230,4 +230,27 @@ void 
chmod(const std::string& path, int permissions) #endif } +void timed_wait_for(util::FunctionRef condition, std::chrono::milliseconds max_ms) +{ + const auto wait_start = std::chrono::steady_clock::now(); + util::EventLoop::main().run_until([&] { + if (std::chrono::steady_clock::now() - wait_start > max_ms) { + throw std::runtime_error(util::format("timed_wait_for exceeded %1 ms", max_ms.count())); + } + return condition(); + }); +} + +void timed_sleeping_wait_for(util::FunctionRef condition, std::chrono::milliseconds max_ms, + std::chrono::milliseconds sleep_ms) +{ + const auto wait_start = std::chrono::steady_clock::now(); + while (!condition()) { + if (std::chrono::steady_clock::now() - wait_start > max_ms) { + throw std::runtime_error(util::format("timed_sleeping_wait_for exceeded %1 ms", max_ms.count())); + } + std::this_thread::sleep_for(sleep_ms); + } +} + } // namespace realm diff --git a/test/object-store/util/test_utils.hpp b/test/object-store/util/test_utils.hpp index 38f3e8f1ef3..973e03abcc5 100644 --- a/test/object-store/util/test_utils.hpp +++ b/test/object-store/util/test_utils.hpp @@ -19,11 +19,13 @@ #ifndef REALM_TEST_UTILS_HPP #define REALM_TEST_UTILS_HPP -#include -#include +#include "util/event_loop.hpp" + #include #include +#include +#include #include #include namespace fs = std::filesystem; @@ -145,6 +147,12 @@ bool chmod_supported(const std::string& path); int get_permissions(const std::string& path); void chmod(const std::string& path, int permissions); +void timed_wait_for(util::FunctionRef condition, + std::chrono::milliseconds max_ms = std::chrono::milliseconds(5000)); + +void timed_sleeping_wait_for(util::FunctionRef condition, + std::chrono::milliseconds max_ms = std::chrono::seconds(30), + std::chrono::milliseconds sleep_ms = std::chrono::milliseconds(1)); } // namespace realm #define REQUIRE_DIR_EXISTS(macro_path) \ diff --git a/test/test_client_reset.cpp b/test/test_client_reset.cpp index 8481506d4a2..2d0b8c26185 100644 --- a/test/test_client_reset.cpp +++ b/test/test_client_reset.cpp @@ -141,12 +141,12 @@ TEST(ClientReset_NoLocalChanges) // Start the server from dir_2 and connect with the client 2. // We expect an error of type 209, "Bad server version". { + BowlOfStonesSemaphore bowl; ClientServerFixture fixture(dir_2, test_context); fixture.start(); // The session that receives an error. { - BowlOfStonesSemaphore bowl; auto listener = [&](ConnectionState state, util::Optional error_info) { if (state != ConnectionState::disconnected) return; @@ -552,6 +552,7 @@ TEST(ClientReset_ThreeClients) { // client 1 and 2 will receive session errors. + BowlOfStonesSemaphore bowl; ClientServerFixture fixture(dir_2, test_context); fixture.start(); @@ -595,7 +596,6 @@ TEST(ClientReset_ThreeClients) // The clients get session errors. 
{ - BowlOfStonesSemaphore bowl; auto listener = [&](ConnectionState state, util::Optional error_info) { if (state != ConnectionState::disconnected) return; @@ -721,6 +721,7 @@ TEST(ClientReset_DoNotRecoverSchema) const std::string server_path_1 = "/data_1"; const std::string server_path_2 = "/data_2"; + BowlOfStonesSemaphore bowl; ClientServerFixture fixture(dir, test_context); fixture.start(); @@ -768,7 +769,6 @@ TEST(ClientReset_DoNotRecoverSchema) session_config.client_reset_config = std::move(client_reset_config); } Session session = fixture.make_session(path_1, server_path_2, std::move(session_config)); - BowlOfStonesSemaphore bowl; session.set_connection_state_change_listener( [&](ConnectionState state, util::Optional error_info) { if (state != ConnectionState::disconnected) diff --git a/test/test_sync.cpp b/test/test_sync.cpp index a6572563bbf..df0016f62b6 100644 --- a/test/test_sync.cpp +++ b/test/test_sync.cpp @@ -178,13 +178,12 @@ TEST(Sync_AsyncWaitForUploadCompletion) { TEST_DIR(dir); TEST_CLIENT_DB(db); + BowlOfStonesSemaphore bowl; ClientServerFixture fixture(dir, test_context); fixture.start(); Session session = fixture.make_bound_session(db, "/test"); - auto wait = [&] { - BowlOfStonesSemaphore bowl; auto handler = [&](std::error_code ec) { if (CHECK_NOT(ec)) bowl.add_stone(); @@ -242,11 +241,11 @@ TEST(Sync_AsyncWaitForDownloadCompletion) TEST_DIR(dir); TEST_CLIENT_DB(db_1); TEST_CLIENT_DB(db_2); + BowlOfStonesSemaphore bowl; ClientServerFixture fixture(dir, test_context); fixture.start(); auto wait = [&](Session& session) { - BowlOfStonesSemaphore bowl; auto handler = [&](std::error_code ec) { if (CHECK_NOT(ec)) bowl.add_stone(); @@ -304,11 +303,11 @@ TEST(Sync_AsyncWaitForSyncCompletion) TEST_DIR(dir); TEST_CLIENT_DB(db_1); TEST_CLIENT_DB(db_2); + BowlOfStonesSemaphore bowl; ClientServerFixture fixture(dir, test_context); fixture.start(); auto wait = [&](Session& session) { - BowlOfStonesSemaphore bowl; auto handler = [&](std::error_code ec) { if (CHECK_NOT(ec)) bowl.add_stone(); @@ -352,9 +351,9 @@ TEST(Sync_AsyncWaitCancellation) { TEST_DIR(dir); TEST_CLIENT_DB(db); + BowlOfStonesSemaphore bowl; ClientServerFixture fixture(dir, test_context); - BowlOfStonesSemaphore bowl; auto upload_completion_handler = [&](std::error_code ec) { CHECK_EQUAL(util::error::operation_aborted, ec); bowl.add_stone(); @@ -3822,6 +3821,7 @@ TEST(Sync_CancelReconnectDelay) TEST_CLIENT_DB(db); TEST_CLIENT_DB(db_x); + BowlOfStonesSemaphore bowl; ClientServerFixture::Config fixture_config; fixture_config.one_connection_per_session = false; @@ -3830,7 +3830,6 @@ TEST(Sync_CancelReconnectDelay) ClientServerFixture fixture{server_dir, test_context, std::move(fixture_config)}; fixture.start(); - BowlOfStonesSemaphore bowl; auto handler = [&](const SessionErrorInfo& info) { if (CHECK_EQUAL(info.error_code, ProtocolError::connection_closed)) bowl.add_stone(); @@ -3847,12 +3846,11 @@ TEST(Sync_CancelReconnectDelay) } // After connection-level error, and at client-level while connection - // object exists (ConnectionImpl in clinet.cpp). + // object exists (ConnectionImpl in client.cpp). 
{ ClientServerFixture fixture{server_dir, test_context, std::move(fixture_config)}; fixture.start(); - BowlOfStonesSemaphore bowl; auto handler = [&](const SessionErrorInfo& info) { if (CHECK_EQUAL(info.error_code, ProtocolError::connection_closed)) bowl.add_stone(); @@ -3875,7 +3873,6 @@ TEST(Sync_CancelReconnectDelay) fixture.start(); { - BowlOfStonesSemaphore bowl; auto handler = [&](const SessionErrorInfo& info) { if (CHECK_EQUAL(info.error_code, ProtocolError::connection_closed)) bowl.add_stone(); @@ -3911,7 +3908,6 @@ TEST(Sync_CancelReconnectDelay) Session session_x = fixture.make_bound_session(db_x, "/x"); session_x.wait_for_download_complete_or_client_stopped(); - BowlOfStonesSemaphore bowl; auto handler = [&](const SessionErrorInfo& info) { if (CHECK_EQUAL(info.error_code, ProtocolError::illegal_realm_path)) bowl.add_stone(); @@ -3934,7 +3930,6 @@ TEST(Sync_CancelReconnectDelay) Session session_x = fixture.make_bound_session(db_x, "/x"); session_x.wait_for_download_complete_or_client_stopped(); - BowlOfStonesSemaphore bowl; auto handler = [&](const SessionErrorInfo& info) { if (CHECK_EQUAL(info.error_code, ProtocolError::illegal_realm_path)) bowl.add_stone(); @@ -4466,9 +4461,8 @@ TEST(Sync_ReconnectAfterPingTimeout) config.client_ping_period = 0; // send ping immediately config.client_pong_timeout = 0; // time out immediately - ClientServerFixture fixture(dir, test_context, std::move(config)); - BowlOfStonesSemaphore bowl; + ClientServerFixture fixture(dir, test_context, std::move(config)); auto error_handler = [&](std::error_code ec, bool, const std::string&) { if (CHECK_EQUAL(Client::Error::pong_timeout, ec)) bowl.add_stone(); @@ -4519,9 +4513,9 @@ TEST(Sync_ServerDiscardDeadConnections) ClientServerFixture::Config config; config.server_connection_reaper_interval = 1; // discard dead connections quickly, FIXME: 0 will not work here :( + BowlOfStonesSemaphore bowl; ClientServerFixture fixture(dir, test_context, std::move(config)); - BowlOfStonesSemaphore bowl; auto error_handler = [&](std::error_code ec, bool, const std::string&) { bool valid_error = ec == sync::websocket::WebSocketError::websocket_read_error; CHECK(valid_error); @@ -4899,10 +4893,11 @@ TEST(Sync_ConnectionStateChange) std::vector states_1, states_2; { + BowlOfStonesSemaphore bowl_1, bowl_2; + ClientServerFixture fixture(dir, test_context); fixture.start(); - BowlOfStonesSemaphore bowl_1, bowl_2; auto listener_1 = [&](ConnectionState state, util::Optional error_info) { CHECK_EQUAL(state == ConnectionState::disconnected, bool(error_info)); states_1.push_back(state); @@ -4941,10 +4936,10 @@ TEST(Sync_ClientErrorHandler) { TEST_DIR(dir); TEST_CLIENT_DB(db); + BowlOfStonesSemaphore bowl; ClientServerFixture fixture(dir, test_context); fixture.start(); - BowlOfStonesSemaphore bowl; auto handler = [&](const SessionErrorInfo&) { bowl.add_stone(); }; diff --git a/test/util/semaphore.hpp b/test/util/semaphore.hpp index c7d69c8dc8b..99153561424 100644 --- a/test/util/semaphore.hpp +++ b/test/util/semaphore.hpp @@ -19,38 +19,29 @@ #ifndef REALM_TEST_UTIL_SEMAPHORE_HPP #define REALM_TEST_UTIL_SEMAPHORE_HPP -#include +#include -namespace realm { -namespace test_util { +namespace realm::test_util { class BowlOfStonesSemaphore { public: BowlOfStonesSemaphore(int initial_number_of_stones = 0) - : m_num_stones(initial_number_of_stones) + : m_semaphore(initial_number_of_stones) { } void get_stone() { - util::LockGuard lock(m_mutex); - while (m_num_stones == 0) - m_cond_var.wait(lock); - --m_num_stones; + m_semaphore.acquire(); } 
void add_stone() { - util::LockGuard lock(m_mutex); - ++m_num_stones; - m_cond_var.notify(); + m_semaphore.release(); } private: - util::Mutex m_mutex; - int m_num_stones; - util::CondVar m_cond_var; + util::CountingSemaphore<> m_semaphore; }; -} // namespace test_util -} // namespace realm +} // namespace realm::test_util #endif // REALM_TEST_UTIL_SEMAPHORE_HPP
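
A minimal, self-contained sketch of the completion-wait pattern this patch introduces (Transaction::wait_for_async_completion installs a completion that releases a util::BinarySemaphore and then blocks on it). The include path, the worker thread, and the sleep below are assumptions for illustration only, not part of the patch:

#include <realm/util/semaphore.hpp> // assumed install path of the new header

#include <chrono>
#include <thread>

void wait_for_worker_sketch()
{
    realm::util::BinarySemaphore sem(0); // starts unavailable, matching the patch's sem(0)

    // Stand-in for the completion the waiting thread hands to the worker.
    auto completion = [&] {
        sem.release(); // wake the waiter once the async work has finished
    };

    std::thread worker([&] {
        // Pretend to do the async work (e.g. syncing commits to disk).
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        completion();
    });

    sem.acquire(); // block until the worker runs the completion
    worker.join();
}

In the patch this pattern replaces the m_async_cv / m_waiting_for_write_lock / m_waiting_for_sync members; the waiter drops m_async_mutex before acquire(), so the completion can run on the worker thread without deadlocking.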