diff --git a/CHANGELOG.md b/CHANGELOG.md index b056b319aeb..5ffaad435b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ * Fixed FLX subscriptions not being sent to the server if the session was interrupted during bootstrapping. ([#7077](https://github.com/realm/realm-core/issues/7077), since v11.8.0) * Fixed FLX subscriptions not being sent to the server if an upload message was sent immediately after a subscription was committed but before the sync client checks for new subscriptions via `SubscriptionStore::get_next_pending_version()`. ([#7076](https://github.com/realm/realm-core/issues/7076), since v13.23.1) * Fixed application crash with 'KeyNotFound' exception when subscriptions are marked complete after a client reset. ([#7090](https://github.com/realm/realm-core/issues/7090), since v12.3.0) +* Add thread safety to session wrapper checks to address threading issues/thread sanitizer failures. ([#6844](https://github.com/realm/realm-core/issues/6844), since v13.4.2) ### Breaking changes * None. diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp index 07b8f9590ec..f202002d71a 100644 --- a/src/realm/sync/client.cpp +++ b/src/realm/sync/client.cpp @@ -175,6 +175,10 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener std::shared_ptr m_migration_store; + // @{ + // The following parameters are protected by m_mutex + std::mutex m_mutex; + bool m_initiated = false; // Set to true when this session wrapper is actualized (or when it is @@ -228,6 +232,8 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener // Must only be accessed from the event loop thread. SessionImpl* m_sess = nullptr; + // @} + // These must only be accessed from the event loop thread. std::vector m_upload_completion_handlers; std::vector m_download_completion_handlers; @@ -1184,13 +1190,16 @@ void SessionWrapper::on_flx_sync_version_complete(int64_t version) void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchState batch_state) { - if (!has_flx_subscription_store()) { - return; + { + std::lock_guard lock(m_mutex); + if (!has_flx_subscription_store()) { + return; + } + REALM_ASSERT(!m_finalized); + REALM_ASSERT(new_version >= m_flx_last_seen_version); + REALM_ASSERT(new_version >= m_flx_active_version); + REALM_ASSERT(batch_state != DownloadBatchState::SteadyState); } - REALM_ASSERT(!m_finalized); - REALM_ASSERT(new_version >= m_flx_last_seen_version); - REALM_ASSERT(new_version >= m_flx_active_version); - REALM_ASSERT(batch_state != DownloadBatchState::SteadyState); SubscriptionSet::State new_state = SubscriptionSet::State::Uncommitted; // Initialize to make compiler happy @@ -1228,24 +1237,28 @@ void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchStat SubscriptionStore* SessionWrapper::get_flx_subscription_store() { + std::lock_guard lock(m_mutex); REALM_ASSERT(!m_finalized); return m_flx_subscription_store.get(); } PendingBootstrapStore* SessionWrapper::get_flx_pending_bootstrap_store() { + std::lock_guard lock(m_mutex); REALM_ASSERT(!m_finalized); return m_flx_pending_bootstrap_store.get(); } MigrationStore* SessionWrapper::get_migration_store() { + std::lock_guard lock(m_mutex); REALM_ASSERT(!m_finalized); return m_migration_store.get(); } inline void SessionWrapper::set_progress_handler(util::UniqueFunction handler) { + std::lock_guard lock(m_mutex); REALM_ASSERT(!m_initiated); m_progress_handler = std::move(handler); } @@ -1254,6 +1267,7 @@ inline void SessionWrapper::set_progress_handler(util::UniqueFunction listener) { + std::lock_guard lock(m_mutex); REALM_ASSERT(!m_initiated); m_connection_state_change_listener = std::move(listener); } @@ -1261,10 +1275,13 @@ SessionWrapper::set_connection_state_change_listener(util::UniqueFunctionadd_commit_listener(this); } @@ -1272,10 +1289,13 @@ void SessionWrapper::initiate() void SessionWrapper::on_commit(version_type new_version) { // Thread safety required - REALM_ASSERT(m_initiated); + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(m_initiated); - if (REALM_UNLIKELY(m_finalized || m_force_closed)) { - return; + if (REALM_UNLIKELY(m_finalized || m_force_closed)) { + return; + } } util::bind_ptr self{this}; @@ -1285,9 +1305,12 @@ void SessionWrapper::on_commit(version_type new_version) else if (!status.is_ok()) throw Exception(status); - REALM_ASSERT(self->m_actualized); - if (REALM_UNLIKELY(!self->m_sess)) - return; // Already finalized + { + std::lock_guard lock(self->m_mutex); + REALM_ASSERT(self->m_actualized); + if (REALM_UNLIKELY(!self->m_sess)) + return; // Already finalized + } SessionImpl& sess = *self->m_sess; sess.recognize_sync_version(new_version); // Throws bool only_if_new_uploadable_data = true; @@ -1299,10 +1322,13 @@ void SessionWrapper::on_commit(version_type new_version) void SessionWrapper::cancel_reconnect_delay() { // Thread safety required - REALM_ASSERT(m_initiated); + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(m_initiated); - if (REALM_UNLIKELY(m_finalized || m_force_closed)) { - return; + if (REALM_UNLIKELY(m_finalized || m_force_closed)) { + return; + } } util::bind_ptr self{this}; @@ -1312,9 +1338,12 @@ void SessionWrapper::cancel_reconnect_delay() else if (!status.is_ok()) throw Exception(status); - REALM_ASSERT(self->m_actualized); - if (REALM_UNLIKELY(!self->m_sess)) - return; // Already finalized + { + std::lock_guard lock(self->m_mutex); + REALM_ASSERT(self->m_actualized); + if (REALM_UNLIKELY(!self->m_sess)) + return; // Already finalized + } SessionImpl& sess = *self->m_sess; sess.cancel_resumption_delay(); // Throws ClientImpl::Connection& conn = sess.get_connection(); @@ -1326,8 +1355,12 @@ void SessionWrapper::async_wait_for(bool upload_completion, bool download_comple WaitOperCompletionHandler handler) { REALM_ASSERT(upload_completion || download_completion); - REALM_ASSERT(m_initiated); - REALM_ASSERT(!m_finalized); + + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(m_initiated); + REALM_ASSERT(!m_finalized); + } util::bind_ptr self{this}; m_client.post([self = std::move(self), handler = std::move(handler), upload_completion, @@ -1337,31 +1370,36 @@ void SessionWrapper::async_wait_for(bool upload_completion, bool download_comple else if (!status.is_ok()) throw Exception(status); - REALM_ASSERT(self->m_actualized); - if (REALM_UNLIKELY(!self->m_sess)) { - // Already finalized - handler({ErrorCodes::OperationAborted, "Session finalized before callback could run"}); // Throws - return; - } - if (upload_completion) { - if (download_completion) { - // Wait for upload and download completion - self->m_sync_completion_handlers.push_back(std::move(handler)); // Throws + { + std::unique_lock lock(self->m_mutex); + REALM_ASSERT(self->m_actualized); + if (REALM_UNLIKELY(!self->m_sess)) { + // Already finalized + lock.unlock(); + handler({ErrorCodes::OperationAborted, "Session finalized before callback could run"}); // Throws + return; + } + if (upload_completion) { + if (download_completion) { + // Wait for upload and download completion + self->m_sync_completion_handlers.push_back(std::move(handler)); // Throws + } + else { + // Wait for upload completion only + self->m_upload_completion_handlers.push_back(std::move(handler)); // Throws + } } else { - // Wait for upload completion only - self->m_upload_completion_handlers.push_back(std::move(handler)); // Throws + // Wait for download completion only + self->m_download_completion_handlers.push_back(std::move(handler)); // Throws } + SessionImpl& sess = *self->m_sess; + lock.unlock(); + if (upload_completion) + sess.request_upload_completion_notification(); // Throws + if (download_completion) + sess.request_download_completion_notification(); // Throws } - else { - // Wait for download completion only - self->m_download_completion_handlers.push_back(std::move(handler)); // Throws - } - SessionImpl& sess = *self->m_sess; - if (upload_completion) - sess.request_upload_completion_notification(); // Throws - if (download_completion) - sess.request_download_completion_notification(); // Throws }); // Throws } @@ -1369,8 +1407,11 @@ void SessionWrapper::async_wait_for(bool upload_completion, bool download_comple bool SessionWrapper::wait_for_upload_complete_or_client_stopped() { // Thread safety required - REALM_ASSERT(m_initiated); - REALM_ASSERT(!m_finalized); + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(m_initiated); + REALM_ASSERT(!m_finalized); + } std::int_fast64_t target_mark; { @@ -1385,17 +1426,21 @@ bool SessionWrapper::wait_for_upload_complete_or_client_stopped() else if (!status.is_ok()) throw Exception(status); - REALM_ASSERT(self->m_actualized); - // The session wrapper may already have been finalized. This can only - // happen if it was abandoned, but in that case, the call of - // wait_for_upload_complete_or_client_stopped() must have returned - // already. - if (REALM_UNLIKELY(!self->m_sess)) - return; - if (target_mark > self->m_staged_upload_mark) { - self->m_staged_upload_mark = target_mark; - SessionImpl& sess = *self->m_sess; - sess.request_upload_completion_notification(); // Throws + { + std::unique_lock lock(self->m_mutex); + REALM_ASSERT(self->m_actualized); + // The session wrapper may already have been finalized. This can only + // happen if it was abandoned, but in that case, the call of + // wait_for_upload_complete_or_client_stopped() must have returned + // already. + if (REALM_UNLIKELY(!self->m_sess)) + return; + if (target_mark > self->m_staged_upload_mark) { + self->m_staged_upload_mark = target_mark; + SessionImpl& sess = *self->m_sess; + lock.unlock(); + sess.request_upload_completion_notification(); // Throws + } } }); // Throws @@ -1413,8 +1458,11 @@ bool SessionWrapper::wait_for_upload_complete_or_client_stopped() bool SessionWrapper::wait_for_download_complete_or_client_stopped() { // Thread safety required - REALM_ASSERT(m_initiated); - REALM_ASSERT(!m_finalized); + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(m_initiated); + REALM_ASSERT(!m_finalized); + } std::int_fast64_t target_mark; { @@ -1429,17 +1477,21 @@ bool SessionWrapper::wait_for_download_complete_or_client_stopped() else if (!status.is_ok()) throw Exception(status); - REALM_ASSERT(self->m_actualized); - // The session wrapper may already have been finalized. This can only - // happen if it was abandoned, but in that case, the call of - // wait_for_download_complete_or_client_stopped() must have returned - // already. - if (REALM_UNLIKELY(!self->m_sess)) - return; - if (target_mark > self->m_staged_download_mark) { - self->m_staged_download_mark = target_mark; - SessionImpl& sess = *self->m_sess; - sess.request_download_completion_notification(); // Throws + { + std::unique_lock lock(self->m_mutex); + REALM_ASSERT(self->m_actualized); + // The session wrapper may already have been finalized. This can only + // happen if it was abandoned, but in that case, the call of + // wait_for_download_complete_or_client_stopped() must have returned + // already. + if (REALM_UNLIKELY(!self->m_sess)) + return; + if (target_mark > self->m_staged_download_mark) { + self->m_staged_download_mark = target_mark; + SessionImpl& sess = *self->m_sess; + lock.unlock(); + sess.request_download_completion_notification(); // Throws + } } }); // Throws @@ -1457,8 +1509,11 @@ bool SessionWrapper::wait_for_download_complete_or_client_stopped() void SessionWrapper::refresh(std::string signed_access_token) { // Thread safety required - REALM_ASSERT(m_initiated); - REALM_ASSERT(!m_finalized); + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(m_initiated); + REALM_ASSERT(!m_finalized); + } m_client.post([self = util::bind_ptr(this), token = std::move(signed_access_token)](Status status) { if (status == ErrorCodes::OperationAborted) @@ -1466,9 +1521,12 @@ void SessionWrapper::refresh(std::string signed_access_token) else if (!status.is_ok()) throw Exception(status); - REALM_ASSERT(self->m_actualized); - if (REALM_UNLIKELY(!self->m_sess)) - return; // Already finalized + { + std::unique_lock lock(self->m_mutex); + REALM_ASSERT(self->m_actualized); + if (REALM_UNLIKELY(!self->m_sess)) + return; // Already finalized + } self->m_signed_access_token = std::move(token); SessionImpl& sess = *self->m_sess; ClientImpl::Connection& conn = sess.get_connection(); @@ -1482,25 +1540,31 @@ void SessionWrapper::refresh(std::string signed_access_token) inline void SessionWrapper::abandon(util::bind_ptr wrapper) noexcept { - if (wrapper->m_initiated) { - ClientImpl& client = wrapper->m_client; - client.register_abandoned_session_wrapper(std::move(wrapper)); + { + std::lock_guard lock(wrapper->m_mutex); + if (!wrapper->m_initiated) + return; } + ClientImpl& client = wrapper->m_client; + client.register_abandoned_session_wrapper(std::move(wrapper)); } // Must be called from event loop thread void SessionWrapper::actualize(ServerEndpoint endpoint) { + std::unique_lock lock(m_mutex); REALM_ASSERT(!m_actualized); REALM_ASSERT(!m_sess); // Cannot be actualized if it's already been finalized or force closed REALM_ASSERT(!m_finalized); REALM_ASSERT(!m_force_closed); + try { m_db->claim_sync_agent(); } catch (const MultipleSyncAgents&) { + lock.unlock(); finalize_before_actualization(); throw; } @@ -1511,6 +1575,7 @@ void SessionWrapper::actualize(ServerEndpoint endpoint) std::move(endpoint), m_authorization_header_name, m_custom_http_headers, m_verify_servers_ssl_certificate, m_ssl_trust_certificate_path, m_ssl_verify_callback, m_proxy_config, was_created); // Throws + lock.unlock(); try { // FIXME: This only makes sense when each session uses a separate connection. conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token); // Throws @@ -1526,14 +1591,16 @@ void SessionWrapper::actualize(ServerEndpoint endpoint) catch (...) { if (was_created) m_client.remove_connection(conn); - finalize_before_actualization(); throw; } + lock.lock(); m_actualized = true; + if (was_created) conn.activate(); // Throws + lock.unlock(); if (m_connection_state_change_listener) { ConnectionState state = conn.get_state(); @@ -1550,6 +1617,7 @@ void SessionWrapper::actualize(ServerEndpoint endpoint) void SessionWrapper::force_close() { + std::lock_guard lock(m_mutex); if (m_force_closed || m_finalized) { return; } @@ -1573,31 +1641,34 @@ void SessionWrapper::force_close() // Must be called from event loop thread void SessionWrapper::finalize() { - REALM_ASSERT(m_actualized); + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(m_actualized); - // Already finalized? - if (m_finalized) { - return; - } + // Already finalized? + if (m_finalized) { + return; + } - // Must be before marking as finalized as we expect m_finalized == false in on_change() - m_db->remove_commit_listener(this); + m_finalized = true; - m_finalized = true; + if (!m_force_closed) { + REALM_ASSERT(m_sess); + ClientImpl::Connection& conn = m_sess->get_connection(); + conn.initiate_session_deactivation(m_sess); // Throws - if (!m_force_closed) { - REALM_ASSERT(m_sess); - ClientImpl::Connection& conn = m_sess->get_connection(); - conn.initiate_session_deactivation(m_sess); // Throws - - // Delete the pending bootstrap store since it uses a reference to the logger in m_sess - m_flx_pending_bootstrap_store.reset(); - // Clear the subscription and migration store refs since they are owned by SyncSession - m_flx_subscription_store.reset(); - m_migration_store.reset(); - m_sess = nullptr; + // Delete the pending bootstrap store since it uses a reference to the logger in m_sess + m_flx_pending_bootstrap_store.reset(); + // Clear the subscription and migration store refs since they are owned by SyncSession + m_flx_subscription_store.reset(); + m_migration_store.reset(); + m_sess = nullptr; + } } + // Must be before marking as finalized as we expect m_finalized == false in on_change() + m_db->remove_commit_listener(this); + // The Realm file can be closed now, as no access to the Realm file is // supposed to happen on behalf of a session after initiation of // deactivation. @@ -1630,6 +1701,7 @@ void SessionWrapper::finalize() // Called with a lock on `m_client.m_mutex`. inline void SessionWrapper::finalize_before_actualization() noexcept { + std::lock_guard lock(m_mutex); REALM_ASSERT(!m_sess); m_actualized = true; m_force_closed = true; @@ -1638,15 +1710,21 @@ inline void SessionWrapper::finalize_before_actualization() noexcept inline void SessionWrapper::on_sync_progress() { - REALM_ASSERT(!m_finalized); - m_reliable_download_progress = true; + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(!m_finalized); + m_reliable_download_progress = true; + } report_progress(); // Throws } void SessionWrapper::on_upload_completion() { - REALM_ASSERT(!m_finalized); + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(!m_finalized); + } while (!m_upload_completion_handlers.empty()) { auto handler = std::move(m_upload_completion_handlers.back()); m_upload_completion_handlers.pop_back(); @@ -1667,6 +1745,10 @@ void SessionWrapper::on_upload_completion() void SessionWrapper::on_download_completion() { + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(!m_finalized); + } while (!m_download_completion_handlers.empty()) { auto handler = std::move(m_download_completion_handlers.back()); m_download_completion_handlers.pop_back(); @@ -1697,8 +1779,11 @@ void SessionWrapper::on_download_completion() void SessionWrapper::on_suspended(const SessionErrorInfo& error_info) { - REALM_ASSERT(!m_finalized); - m_suspended = true; + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(!m_finalized); + m_suspended = true; + } if (m_connection_state_change_listener) { m_connection_state_change_listener(ConnectionState::disconnected, error_info); // Throws } @@ -1707,8 +1792,11 @@ void SessionWrapper::on_suspended(const SessionErrorInfo& error_info) void SessionWrapper::on_resumed() { - REALM_ASSERT(!m_finalized); - m_suspended = false; + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(!m_finalized); + m_suspended = false; + } if (m_connection_state_change_listener) { ClientImpl::Connection& conn = m_sess->get_connection(); if (conn.get_state() != ConnectionState::disconnected) { @@ -1723,20 +1811,28 @@ void SessionWrapper::on_resumed() void SessionWrapper::on_connection_state_changed(ConnectionState state, const util::Optional& error_info) { + { + std::lock_guard lock(m_mutex); + if (m_suspended) { + return; + } + } if (m_connection_state_change_listener) { - if (!m_suspended) - m_connection_state_change_listener(state, error_info); // Throws + m_connection_state_change_listener(state, error_info); // Throws } } void SessionWrapper::report_progress(bool only_if_new_uploadable_data) { - REALM_ASSERT(!m_finalized); - REALM_ASSERT(m_sess); + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(!m_finalized); + REALM_ASSERT(m_sess); - if (!m_progress_handler) - return; + if (!m_progress_handler) + return; + } std::uint_fast64_t downloaded_bytes = 0; std::uint_fast64_t downloadable_bytes = 0; @@ -1774,8 +1870,11 @@ void SessionWrapper::report_progress(bool only_if_new_uploadable_data) util::Future SessionWrapper::send_test_command(std::string body) { - if (!m_sess) { - return Status{ErrorCodes::RuntimeError, "session must be activated to send a test command"}; + { + std::lock_guard lock(m_mutex); + if (!m_sess) { + return Status{ErrorCodes::RuntimeError, "session must be activated to send a test command"}; + } } return m_sess->send_test_command(std::move(body)); @@ -1783,7 +1882,10 @@ util::Future SessionWrapper::send_test_command(std::string body) void SessionWrapper::handle_pending_client_reset_acknowledgement() { - REALM_ASSERT(!m_finalized); + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(!m_finalized); + } auto pending_reset = [&] { auto ft = m_db->start_frozen(); @@ -1837,9 +1939,12 @@ void SessionWrapper::update_subscription_version_info() std::string SessionWrapper::get_appservices_connection_id() { - auto pf = util::make_promise_future(); - REALM_ASSERT(m_initiated); + { + std::lock_guard lock(m_mutex); + REALM_ASSERT(m_initiated); + } + auto pf = util::make_promise_future(); util::bind_ptr self(this); get_client().post([self, promise = std::move(pf.promise)](Status status) mutable { if (!status.is_ok()) { diff --git a/test/object-store/sync/flx_migration.cpp b/test/object-store/sync/flx_migration.cpp index 7452b4b2619..295c3064b94 100644 --- a/test/object-store/sync/flx_migration.cpp +++ b/test/object-store/sync/flx_migration.cpp @@ -784,14 +784,16 @@ TEST_CASE("New table is synced after migration", "[sync][flx][flx migration][baa REQUIRE(!wait_for_upload(*realm)); REQUIRE(!wait_for_download(*realm)); - auto table = realm->read_group().get_table("class_Object"); - CHECK(table->size() == 5); - auto table2 = realm->read_group().get_table("class_Object2"); - CHECK(table2->size() == 1); auto sync_session = realm->sync_session(); REQUIRE(sync_session); auto sub_store = sync_session->get_flx_subscription_store(); REQUIRE(sub_store); + sub_store->get_latest().get_state_change_notification(sync::SubscriptionSet::State::Complete).get(); + + auto table = realm->read_group().get_table("class_Object"); + CHECK(table->size() == 5); + auto table2 = realm->read_group().get_table("class_Object2"); + CHECK(table2->size() == 1); auto active_subs = sub_store->get_active(); REQUIRE(active_subs.size() == 2); REQUIRE(active_subs.find("flx_migrated_Object2"));