From e063d5549b9d8b32ec88f1c961f031803291872d Mon Sep 17 00:00:00 2001 From: Michael Wilkerson-Barker Date: Thu, 19 Oct 2023 00:23:26 -0400 Subject: [PATCH 1/6] Added thread safety to session wrapper checks --- src/realm/sync/client.cpp | 203 ++++++++++++++++++++++++++------------ 1 file changed, 141 insertions(+), 62 deletions(-) diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp index dad5c43be86..316e59cf20e 100644 --- a/src/realm/sync/client.cpp +++ b/src/realm/sync/client.cpp @@ -173,6 +173,10 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener std::shared_ptr m_migration_store; + // (** + // The following parameters are protected by m_mutex + std::mutex m_mutex; + bool m_initiated = false; // Set to true when this session wrapper is actualized (or when it is @@ -226,6 +230,8 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener // Must only be accessed from the event loop thread. SessionImpl* m_sess = nullptr; + // **) + // These must only be accessed from the event loop thread. std::vector m_upload_completion_handlers; std::vector m_download_completion_handlers; @@ -1241,6 +1247,7 @@ MigrationStore* SessionWrapper::get_migration_store() inline void SessionWrapper::set_progress_handler(util::UniqueFunction handler) { + std::lock_guard lock(m_mutex); REALM_ASSERT(!m_initiated); m_progress_handler = std::move(handler); } @@ -1249,6 +1256,7 @@ inline void SessionWrapper::set_progress_handler(util::UniqueFunction listener) { + std::lock_guard lock(m_mutex); REALM_ASSERT(!m_initiated); m_connection_state_change_listener = std::move(listener); } @@ -1256,6 +1264,7 @@ SessionWrapper::set_connection_state_change_listener(util::UniqueFunction self{this}; @@ -1294,10 +1306,13 @@ void SessionWrapper::on_commit(version_type new_version) void SessionWrapper::cancel_reconnect_delay() { // Thread safety required - REALM_ASSERT(m_initiated); + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(m_initiated); - if (REALM_UNLIKELY(m_finalized || m_force_closed)) { - return; + if (REALM_UNLIKELY(m_finalized || m_force_closed)) { + return; + } } util::bind_ptr self{this}; @@ -1321,8 +1336,12 @@ void SessionWrapper::async_wait_for(bool upload_completion, bool download_comple WaitOperCompletionHandler handler) { REALM_ASSERT(upload_completion || download_completion); - REALM_ASSERT(m_initiated); - REALM_ASSERT(!m_finalized); + + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(m_initiated); + REALM_ASSERT(!m_finalized); + } util::bind_ptr self{this}; m_client.post([self = std::move(self), handler = std::move(handler), upload_completion, @@ -1364,8 +1383,11 @@ void SessionWrapper::async_wait_for(bool upload_completion, bool download_comple bool SessionWrapper::wait_for_upload_complete_or_client_stopped() { // Thread safety required - REALM_ASSERT(m_initiated); - REALM_ASSERT(!m_finalized); + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(m_initiated); + REALM_ASSERT(!m_finalized); + } std::int_fast64_t target_mark; { @@ -1408,8 +1430,11 @@ bool SessionWrapper::wait_for_upload_complete_or_client_stopped() bool SessionWrapper::wait_for_download_complete_or_client_stopped() { // Thread safety required - REALM_ASSERT(m_initiated); - REALM_ASSERT(!m_finalized); + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(m_initiated); + REALM_ASSERT(!m_finalized); + } std::int_fast64_t target_mark; { @@ -1452,8 +1477,11 @@ bool SessionWrapper::wait_for_download_complete_or_client_stopped() void SessionWrapper::refresh(std::string signed_access_token) { // Thread safety required - REALM_ASSERT(m_initiated); - REALM_ASSERT(!m_finalized); + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(m_initiated); + REALM_ASSERT(!m_finalized); + } m_client.post([self = util::bind_ptr(this), token = std::move(signed_access_token)](Status status) { if (status == ErrorCodes::OperationAborted) @@ -1477,7 +1505,12 @@ void SessionWrapper::refresh(std::string signed_access_token) inline void SessionWrapper::abandon(util::bind_ptr wrapper) noexcept { - if (wrapper->m_initiated) { + auto is_initiated = [&]() { + std::lock_guard lock(wrapper->m_mutex); + return wrapper->m_initiated; + }; + + if (is_initiated()) { ClientImpl& client = wrapper->m_client; client.register_abandoned_session_wrapper(std::move(wrapper)); } @@ -1487,11 +1520,14 @@ inline void SessionWrapper::abandon(util::bind_ptr wrapper) noex // Must be called from event loop thread void SessionWrapper::actualize(ServerEndpoint endpoint) { - REALM_ASSERT(!m_actualized); - REALM_ASSERT(!m_sess); - // Cannot be actualized if it's already been finalized or force closed - REALM_ASSERT(!m_finalized); - REALM_ASSERT(!m_force_closed); + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(!m_actualized); + REALM_ASSERT(!m_sess); + // Cannot be actualized if it's already been finalized or force closed + REALM_ASSERT(!m_finalized); + REALM_ASSERT(!m_force_closed); + } try { m_db->claim_sync_agent(); } @@ -1526,7 +1562,10 @@ void SessionWrapper::actualize(ServerEndpoint endpoint) throw; } - m_actualized = true; + { + std::lock_guard lock(m_mutex); + m_actualized = true; + } if (was_created) conn.activate(); // Throws @@ -1545,12 +1584,15 @@ void SessionWrapper::actualize(ServerEndpoint endpoint) void SessionWrapper::force_close() { - if (m_force_closed || m_finalized) { - return; + { + std::lock_guard lock(m_mutex); + if (m_force_closed || m_finalized) { + return; + } + REALM_ASSERT(m_actualized); + REALM_ASSERT(m_sess); + m_force_closed = true; } - REALM_ASSERT(m_actualized); - REALM_ASSERT(m_sess); - m_force_closed = true; ClientImpl::Connection& conn = m_sess->get_connection(); conn.initiate_session_deactivation(m_sess); // Throws @@ -1568,29 +1610,32 @@ void SessionWrapper::force_close() // Must be called from event loop thread void SessionWrapper::finalize() { - REALM_ASSERT(m_actualized); + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(m_actualized); - // Already finalized? - if (m_finalized) { - return; - } + // Already finalized? + if (m_finalized) { + return; + } - // Must be before marking as finalized as we expect m_finalized == false in on_change() - m_db->remove_commit_listener(this); + // Must be before marking as finalized as we expect m_finalized == false in on_change() + m_db->remove_commit_listener(this); - m_finalized = true; + m_finalized = true; - if (!m_force_closed) { - REALM_ASSERT(m_sess); - ClientImpl::Connection& conn = m_sess->get_connection(); - conn.initiate_session_deactivation(m_sess); // Throws + if (!m_force_closed) { + REALM_ASSERT(m_sess); + ClientImpl::Connection& conn = m_sess->get_connection(); + conn.initiate_session_deactivation(m_sess); // Throws - // Delete the pending bootstrap store since it uses a reference to the logger in m_sess - m_flx_pending_bootstrap_store.reset(); - // Clear the subscription and migration store refs since they are owned by SyncSession - m_flx_subscription_store.reset(); - m_migration_store.reset(); - m_sess = nullptr; + // Delete the pending bootstrap store since it uses a reference to the logger in m_sess + m_flx_pending_bootstrap_store.reset(); + // Clear the subscription and migration store refs since they are owned by SyncSession + m_flx_subscription_store.reset(); + m_migration_store.reset(); + m_sess = nullptr; + } } // The Realm file can be closed now, as no access to the Realm file is @@ -1625,6 +1670,7 @@ void SessionWrapper::finalize() // Called with a lock on `m_client.m_mutex`. inline void SessionWrapper::finalize_before_actualization() noexcept { + std::lock_guard lock(m_mutex); REALM_ASSERT(!m_sess); m_actualized = true; m_force_closed = true; @@ -1633,15 +1679,21 @@ inline void SessionWrapper::finalize_before_actualization() noexcept inline void SessionWrapper::on_sync_progress() { - REALM_ASSERT(!m_finalized); - m_reliable_download_progress = true; + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(!m_finalized); + m_reliable_download_progress = true; + } report_progress(); // Throws } void SessionWrapper::on_upload_completion() { - REALM_ASSERT(!m_finalized); + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(!m_finalized); + } while (!m_upload_completion_handlers.empty()) { auto handler = std::move(m_upload_completion_handlers.back()); m_upload_completion_handlers.pop_back(); @@ -1662,6 +1714,10 @@ void SessionWrapper::on_upload_completion() void SessionWrapper::on_download_completion() { + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(!m_finalized); + } while (!m_download_completion_handlers.empty()) { auto handler = std::move(m_download_completion_handlers.back()); m_download_completion_handlers.pop_back(); @@ -1692,8 +1748,11 @@ void SessionWrapper::on_download_completion() void SessionWrapper::on_suspended(const SessionErrorInfo& error_info) { - REALM_ASSERT(!m_finalized); - m_suspended = true; + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(!m_finalized); + m_suspended = true; + } if (m_connection_state_change_listener) { m_connection_state_change_listener(ConnectionState::disconnected, error_info); // Throws } @@ -1702,8 +1761,11 @@ void SessionWrapper::on_suspended(const SessionErrorInfo& error_info) void SessionWrapper::on_resumed() { - REALM_ASSERT(!m_finalized); - m_suspended = false; + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(!m_finalized); + m_suspended = false; + } if (m_connection_state_change_listener) { ClientImpl::Connection& conn = m_sess->get_connection(); if (conn.get_state() != ConnectionState::disconnected) { @@ -1718,20 +1780,28 @@ void SessionWrapper::on_resumed() void SessionWrapper::on_connection_state_changed(ConnectionState state, const util::Optional& error_info) { + { + std::lock_guard lock(m_mutex); + if (m_suspended) { + return; + } + } if (m_connection_state_change_listener) { - if (!m_suspended) - m_connection_state_change_listener(state, error_info); // Throws + m_connection_state_change_listener(state, error_info); // Throws } } void SessionWrapper::report_progress(bool only_if_new_uploadable_data) { - REALM_ASSERT(!m_finalized); - REALM_ASSERT(m_sess); + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(!m_finalized); + REALM_ASSERT(m_sess); - if (!m_progress_handler) - return; + if (!m_progress_handler) + return; + } std::uint_fast64_t downloaded_bytes = 0; std::uint_fast64_t downloadable_bytes = 0; @@ -1769,8 +1839,11 @@ void SessionWrapper::report_progress(bool only_if_new_uploadable_data) util::Future SessionWrapper::send_test_command(std::string body) { - if (!m_sess) { - return Status{ErrorCodes::RuntimeError, "session must be activated to send a test command"}; + { + std::lock_guard lock(m_mutex); + if (!m_sess) { + return Status{ErrorCodes::RuntimeError, "session must be activated to send a test command"}; + } } return m_sess->send_test_command(std::move(body)); @@ -1778,7 +1851,10 @@ util::Future SessionWrapper::send_test_command(std::string body) void SessionWrapper::handle_pending_client_reset_acknowledgement() { - REALM_ASSERT(!m_finalized); + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(!m_finalized); + } auto pending_reset = [&] { auto ft = m_db->start_frozen(); @@ -1823,9 +1899,12 @@ void SessionWrapper::handle_pending_client_reset_acknowledgement() std::string SessionWrapper::get_appservices_connection_id() { - auto pf = util::make_promise_future(); - REALM_ASSERT(m_initiated); + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(m_initiated); + } + auto pf = util::make_promise_future(); util::bind_ptr self(this); get_client().post([self, promise = std::move(pf.promise)](Status status) mutable { if (!status.is_ok()) { From 6c89dfdc3381d3a1b14cbab304b812c3b609c810 Mon Sep 17 00:00:00 2001 From: Michael Wilkerson-Barker Date: Thu, 19 Oct 2023 19:30:23 -0400 Subject: [PATCH 2/6] Added some more mutex and updated changelog --- CHANGELOG.md | 2 +- src/realm/sync/client.cpp | 188 ++++++++++++++++++++++---------------- 2 files changed, 108 insertions(+), 82 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8ff5008fec0..534d085ced3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ ### Fixed * ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?) -* None. +* Add thread safety to session wrapper checks to address threading issues/thread sanitizer failures. ([#6844](https://github.com/realm/realm-core/issues/6844), since v13.4.2) ### Breaking changes * None. diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp index 316e59cf20e..65575c655d4 100644 --- a/src/realm/sync/client.cpp +++ b/src/realm/sync/client.cpp @@ -173,7 +173,7 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener std::shared_ptr m_migration_store; - // (** + // @{ // The following parameters are protected by m_mutex std::mutex m_mutex; @@ -230,7 +230,7 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener // Must only be accessed from the event loop thread. SessionImpl* m_sess = nullptr; - // **) + // @} // These must only be accessed from the event loop thread. std::vector m_upload_completion_handlers; @@ -1185,13 +1185,16 @@ void SessionWrapper::on_flx_sync_version_complete(int64_t version) void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state) { - if (!has_flx_subscription_store()) { - return; + { + std::lock_guard lock(m_mutex); + if (!has_flx_subscription_store()) { + return; + } + REALM_ASSERT(!m_finalized); + REALM_ASSERT(new_version >= m_flx_last_seen_version); + REALM_ASSERT(new_version >= m_flx_active_version); + REALM_ASSERT(batch_state != DownloadBatchState::SteadyState); } - REALM_ASSERT(!m_finalized); - REALM_ASSERT(new_version >= m_flx_last_seen_version); - REALM_ASSERT(new_version >= m_flx_active_version); - REALM_ASSERT(batch_state != DownloadBatchState::SteadyState); SubscriptionSet::State new_state = SubscriptionSet::State::Uncommitted; // Initialize to make compiler happy @@ -1229,18 +1232,21 @@ void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchStat SubscriptionStore* SessionWrapper::get_flx_subscription_store() { + std::lock_guard lock(m_mutex); REALM_ASSERT(!m_finalized); return m_flx_subscription_store.get(); } PendingBootstrapStore* SessionWrapper::get_flx_pending_bootstrap_store() { + std::lock_guard lock(m_mutex); REALM_ASSERT(!m_finalized); return m_flx_pending_bootstrap_store.get(); } MigrationStore* SessionWrapper::get_migration_store() { + std::lock_guard lock(m_mutex); REALM_ASSERT(!m_finalized); return m_migration_store.get(); } @@ -1292,9 +1298,12 @@ void SessionWrapper::on_commit(version_type new_version) else if (!status.is_ok()) throw Exception(status); - REALM_ASSERT(self->m_actualized); - if (REALM_UNLIKELY(!self->m_sess)) - return; // Already finalized + { + std::lock_guard lock(self->m_mutex); + REALM_ASSERT(self->m_actualized); + if (REALM_UNLIKELY(!self->m_sess)) + return; // Already finalized + } SessionImpl& sess = *self->m_sess; sess.recognize_sync_version(new_version); // Throws bool only_if_new_uploadable_data = true; @@ -1322,9 +1331,12 @@ void SessionWrapper::cancel_reconnect_delay() else if (!status.is_ok()) throw Exception(status); - REALM_ASSERT(self->m_actualized); - if (REALM_UNLIKELY(!self->m_sess)) - return; // Already finalized + { + std::lock_guard lock(self->m_mutex); + REALM_ASSERT(self->m_actualized); + if (REALM_UNLIKELY(!self->m_sess)) + return; // Already finalized + } SessionImpl& sess = *self->m_sess; sess.cancel_resumption_delay(); // Throws ClientImpl::Connection& conn = sess.get_connection(); @@ -1351,31 +1363,36 @@ void SessionWrapper::async_wait_for(bool upload_completion, bool download_comple else if (!status.is_ok()) throw Exception(status); - REALM_ASSERT(self->m_actualized); - if (REALM_UNLIKELY(!self->m_sess)) { - // Already finalized - handler({ErrorCodes::OperationAborted, "Session finalized before callback could run"}); // Throws - return; - } - if (upload_completion) { - if (download_completion) { - // Wait for upload and download completion - self->m_sync_completion_handlers.push_back(std::move(handler)); // Throws + { + std::unique_lock lock(self->m_mutex); + REALM_ASSERT(self->m_actualized); + if (REALM_UNLIKELY(!self->m_sess)) { + // Already finalized + lock.unlock(); + handler({ErrorCodes::OperationAborted, "Session finalized before callback could run"}); // Throws + return; + } + if (upload_completion) { + if (download_completion) { + // Wait for upload and download completion + self->m_sync_completion_handlers.push_back(std::move(handler)); // Throws + } + else { + // Wait for upload completion only + self->m_upload_completion_handlers.push_back(std::move(handler)); // Throws + } } else { - // Wait for upload completion only - self->m_upload_completion_handlers.push_back(std::move(handler)); // Throws + // Wait for download completion only + self->m_download_completion_handlers.push_back(std::move(handler)); // Throws } + SessionImpl& sess = *self->m_sess; + lock.unlock(); + if (upload_completion) + sess.request_upload_completion_notification(); // Throws + if (download_completion) + sess.request_download_completion_notification(); // Throws } - else { - // Wait for download completion only - self->m_download_completion_handlers.push_back(std::move(handler)); // Throws - } - SessionImpl& sess = *self->m_sess; - if (upload_completion) - sess.request_upload_completion_notification(); // Throws - if (download_completion) - sess.request_download_completion_notification(); // Throws }); // Throws } @@ -1402,17 +1419,21 @@ bool SessionWrapper::wait_for_upload_complete_or_client_stopped() else if (!status.is_ok()) throw Exception(status); - REALM_ASSERT(self->m_actualized); - // The session wrapper may already have been finalized. This can only - // happen if it was abandoned, but in that case, the call of - // wait_for_upload_complete_or_client_stopped() must have returned - // already. - if (REALM_UNLIKELY(!self->m_sess)) - return; - if (target_mark > self->m_staged_upload_mark) { - self->m_staged_upload_mark = target_mark; - SessionImpl& sess = *self->m_sess; - sess.request_upload_completion_notification(); // Throws + { + std::unique_lock lock(self->m_mutex); + REALM_ASSERT(self->m_actualized); + // The session wrapper may already have been finalized. This can only + // happen if it was abandoned, but in that case, the call of + // wait_for_upload_complete_or_client_stopped() must have returned + // already. + if (REALM_UNLIKELY(!self->m_sess)) + return; + if (target_mark > self->m_staged_upload_mark) { + self->m_staged_upload_mark = target_mark; + SessionImpl& sess = *self->m_sess; + lock.unlock(); + sess.request_upload_completion_notification(); // Throws + } } }); // Throws @@ -1449,17 +1470,21 @@ bool SessionWrapper::wait_for_download_complete_or_client_stopped() else if (!status.is_ok()) throw Exception(status); - REALM_ASSERT(self->m_actualized); - // The session wrapper may already have been finalized. This can only - // happen if it was abandoned, but in that case, the call of - // wait_for_download_complete_or_client_stopped() must have returned - // already. - if (REALM_UNLIKELY(!self->m_sess)) - return; - if (target_mark > self->m_staged_download_mark) { - self->m_staged_download_mark = target_mark; - SessionImpl& sess = *self->m_sess; - sess.request_download_completion_notification(); // Throws + { + std::unique_lock lock(self->m_mutex); + REALM_ASSERT(self->m_actualized); + // The session wrapper may already have been finalized. This can only + // happen if it was abandoned, but in that case, the call of + // wait_for_download_complete_or_client_stopped() must have returned + // already. + if (REALM_UNLIKELY(!self->m_sess)) + return; + if (target_mark > self->m_staged_download_mark) { + self->m_staged_download_mark = target_mark; + SessionImpl& sess = *self->m_sess; + lock.unlock(); + sess.request_download_completion_notification(); // Throws + } } }); // Throws @@ -1489,9 +1514,12 @@ void SessionWrapper::refresh(std::string signed_access_token) else if (!status.is_ok()) throw Exception(status); - REALM_ASSERT(self->m_actualized); - if (REALM_UNLIKELY(!self->m_sess)) - return; // Already finalized + { + std::unique_lock lock(self->m_mutex); + REALM_ASSERT(self->m_actualized); + if (REALM_UNLIKELY(!self->m_sess)) + return; // Already finalized + } self->m_signed_access_token = std::move(token); SessionImpl& sess = *self->m_sess; ClientImpl::Connection& conn = sess.get_connection(); @@ -1520,18 +1548,18 @@ inline void SessionWrapper::abandon(util::bind_ptr wrapper) noex // Must be called from event loop thread void SessionWrapper::actualize(ServerEndpoint endpoint) { - { - std::lock_guard lock(m_mutex); - REALM_ASSERT(!m_actualized); - REALM_ASSERT(!m_sess); - // Cannot be actualized if it's already been finalized or force closed - REALM_ASSERT(!m_finalized); - REALM_ASSERT(!m_force_closed); - } + std::unique_lock lock(m_mutex); + REALM_ASSERT(!m_actualized); + REALM_ASSERT(!m_sess); + // Cannot be actualized if it's already been finalized or force closed + REALM_ASSERT(!m_finalized); + REALM_ASSERT(!m_force_closed); + try { m_db->claim_sync_agent(); } catch (const MultipleSyncAgents&) { + lock.unlock(); finalize_before_actualization(); throw; } @@ -1542,6 +1570,7 @@ void SessionWrapper::actualize(ServerEndpoint endpoint) std::move(endpoint), m_authorization_header_name, m_custom_http_headers, m_verify_servers_ssl_certificate, m_ssl_trust_certificate_path, m_ssl_verify_callback, m_proxy_config, was_created); // Throws + lock.unlock(); try { // FIXME: This only makes sense when each session uses a separate connection. conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token); // Throws @@ -1557,17 +1586,16 @@ void SessionWrapper::actualize(ServerEndpoint endpoint) catch (...) { if (was_created) m_client.remove_connection(conn); - finalize_before_actualization(); throw; } - { - std::lock_guard lock(m_mutex); - m_actualized = true; - } + lock.lock(); + m_actualized = true; + if (was_created) conn.activate(); // Throws + lock.unlock(); if (m_connection_state_change_listener) { ConnectionState state = conn.get_state(); @@ -1584,15 +1612,13 @@ void SessionWrapper::actualize(ServerEndpoint endpoint) void SessionWrapper::force_close() { - { - std::lock_guard lock(m_mutex); - if (m_force_closed || m_finalized) { - return; - } - REALM_ASSERT(m_actualized); - REALM_ASSERT(m_sess); - m_force_closed = true; + std::lock_guard lock(m_mutex); + if (m_force_closed || m_finalized) { + return; } + REALM_ASSERT(m_actualized); + REALM_ASSERT(m_sess); + m_force_closed = true; ClientImpl::Connection& conn = m_sess->get_connection(); conn.initiate_session_deactivation(m_sess); // Throws From ad19c3dde86b87a0b324ccd540feecc88611bef8 Mon Sep 17 00:00:00 2001 From: Michael Wilkerson-Barker Date: Thu, 19 Oct 2023 23:06:49 -0400 Subject: [PATCH 3/6] Fix lock inversion --- src/realm/sync/client.cpp | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp index 65575c655d4..e123aa3f12d 100644 --- a/src/realm/sync/client.cpp +++ b/src/realm/sync/client.cpp @@ -1270,11 +1270,13 @@ SessionWrapper::set_connection_state_change_listener(util::UniqueFunctionadd_commit_listener(this); } From 39510806eb846f7b5d18e1a440fb3080d40cf6e2 Mon Sep 17 00:00:00 2001 From: Michael Wilkerson-Barker Date: Fri, 20 Oct 2023 15:42:19 -0400 Subject: [PATCH 4/6] Release SessionWrapper mutex before removing the commit listener --- src/realm/sync/client.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp index e123aa3f12d..fb3c9d0a712 100644 --- a/src/realm/sync/client.cpp +++ b/src/realm/sync/client.cpp @@ -1647,9 +1647,6 @@ void SessionWrapper::finalize() return; } - // Must be before marking as finalized as we expect m_finalized == false in on_change() - m_db->remove_commit_listener(this); - m_finalized = true; if (!m_force_closed) { @@ -1666,6 +1663,9 @@ void SessionWrapper::finalize() } } + // Must be before marking as finalized as we expect m_finalized == false in on_change() + m_db->remove_commit_listener(this); + // The Realm file can be closed now, as no access to the Realm file is // supposed to happen on behalf of a session after initiation of // deactivation. From d45382daede27b75678bd4c65fb792130405e51d Mon Sep 17 00:00:00 2001 From: Michael Wilkerson-Barker Date: Fri, 20 Oct 2023 16:56:26 -0400 Subject: [PATCH 5/6] More tsan fixes --- src/realm/sync/client.cpp | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp index fb3c9d0a712..b48dd462416 100644 --- a/src/realm/sync/client.cpp +++ b/src/realm/sync/client.cpp @@ -1270,13 +1270,13 @@ SessionWrapper::set_connection_state_change_listener(util::UniqueFunctionadd_commit_listener(this); } @@ -1535,15 +1535,13 @@ void SessionWrapper::refresh(std::string signed_access_token) inline void SessionWrapper::abandon(util::bind_ptr wrapper) noexcept { - auto is_initiated = [&]() { + { std::lock_guard lock(wrapper->m_mutex); - return wrapper->m_initiated; - }; - - if (is_initiated()) { - ClientImpl& client = wrapper->m_client; - client.register_abandoned_session_wrapper(std::move(wrapper)); + if (!wrapper->m_initiated) + return; } + ClientImpl& client = wrapper->m_client; + client.register_abandoned_session_wrapper(std::move(wrapper)); } From 63eaef6e5f19f4b652246239c35a14bdb12bcfc5 Mon Sep 17 00:00:00 2001 From: Michael Wilkerson-Barker Date: Wed, 1 Nov 2023 22:12:17 -0400 Subject: [PATCH 6/6] Added fix for New table is synced after migration test --- test/object-store/sync/flx_migration.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/test/object-store/sync/flx_migration.cpp b/test/object-store/sync/flx_migration.cpp index 7452b4b2619..295c3064b94 100644 --- a/test/object-store/sync/flx_migration.cpp +++ b/test/object-store/sync/flx_migration.cpp @@ -784,14 +784,16 @@ TEST_CASE("New table is synced after migration", "[sync][flx][flx migration][baa REQUIRE(!wait_for_upload(*realm)); REQUIRE(!wait_for_download(*realm)); - auto table = realm->read_group().get_table("class_Object"); - CHECK(table->size() == 5); - auto table2 = realm->read_group().get_table("class_Object2"); - CHECK(table2->size() == 1); auto sync_session = realm->sync_session(); REQUIRE(sync_session); auto sub_store = sync_session->get_flx_subscription_store(); REQUIRE(sub_store); + sub_store->get_latest().get_state_change_notification(sync::SubscriptionSet::State::Complete).get(); + + auto table = realm->read_group().get_table("class_Object"); + CHECK(table->size() == 5); + auto table2 = realm->read_group().get_table("class_Object2"); + CHECK(table2->size() == 1); auto active_subs = sub_store->get_active(); REQUIRE(active_subs.size() == 2); REQUIRE(active_subs.find("flx_migrated_Object2"));