From 7e20b9f9068a009a511330174ea5737df57e0054 Mon Sep 17 00:00:00 2001 From: Thomas Goyne Date: Tue, 23 Jul 2024 12:24:52 -0700 Subject: [PATCH] Treat completing a client reset as receiving a MARK message --- CHANGELOG.md | 3 +- src/realm/sync/client.cpp | 59 +---- src/realm/sync/network/default_socket.cpp | 2 - src/realm/sync/noinst/client_history_impl.cpp | 28 ++- src/realm/sync/noinst/client_history_impl.hpp | 5 +- src/realm/sync/noinst/client_impl_base.cpp | 27 +-- src/realm/sync/noinst/client_impl_base.hpp | 1 - src/realm/sync/noinst/client_reset.cpp | 8 + src/realm/sync/noinst/migration_store.cpp | 4 +- .../sync/noinst/pending_bootstrap_store.cpp | 4 +- src/realm/sync/noinst/pending_reset_store.cpp | 4 +- src/realm/sync/noinst/pending_reset_store.hpp | 3 +- .../sync/noinst/sync_metadata_schema.cpp | 31 +-- .../sync/noinst/sync_metadata_schema.hpp | 6 +- src/realm/sync/subscriptions.cpp | 4 +- test/object-store/benchmarks/client_reset.cpp | 2 +- test/object-store/sync/client_reset.cpp | 210 +++++----------- test/object-store/sync/flx_sync.cpp | 226 +++++++++++++++++- .../util/sync/sync_test_utils.cpp | 51 +++- .../util/sync/sync_test_utils.hpp | 6 +- test/test_client_reset.cpp | 44 ++-- test/test_sync_subscriptions.cpp | 27 ++- 22 files changed, 435 insertions(+), 320 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b24470d2676..676d95ba3ea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,8 @@ ### Fixed * ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?) -* None. +* Automatic client reset handling now reports download completion as soon as all changes from the newly downloaded file have been applied to the main Realm file rather than at an inconsistent time afterwards ([PR #7921](https://github.com/realm/realm-core/pull/7921)). +* Cycle detection for automatic client reset handling would sometimes consider two client resets in a row to be a cycle even when the first reset did not recover any changes and so could not have triggered the second. ([PR #7921](https://github.com/realm/realm-core/pull/7921)). ### Breaking changes * None. diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp index 214a938969c..b201085fbec 100644 --- a/src/realm/sync/client.cpp +++ b/src/realm/sync/client.cpp @@ -102,8 +102,6 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener // Can be called from any thread. util::Future send_test_command(std::string body); - void handle_pending_client_reset_acknowledgement(); - // Can be called from any thread. std::string get_appservices_connection_id(); @@ -779,14 +777,6 @@ void SessionImpl::on_resumed() } } -void SessionImpl::handle_pending_client_reset_acknowledgement() -{ - // Ignore the call if the session is not active - if (m_state == State::Active) { - m_wrapper.handle_pending_client_reset_acknowledgement(); - } -} - bool SessionImpl::process_flx_bootstrap_message(const DownloadMessage& message) { // Ignore the message if the session is not active or a steady state message @@ -1354,8 +1344,12 @@ void SessionWrapper::actualize() } } - if (!m_client_reset_config) + if (!m_client_reset_config) { check_progress(); // Throws + if (auto pending_reset = PendingResetStore::has_pending_reset(*m_db->start_frozen())) { + m_sess->logger.info(util::LogCategory::reset, "Found pending client reset tracker: %1", *pending_reset); + } + } } void SessionWrapper::force_close() @@ -1651,49 +1645,6 @@ util::Future SessionWrapper::send_test_command(std::string body) return m_sess->send_test_command(std::move(body)); } -void SessionWrapper::handle_pending_client_reset_acknowledgement() -{ - REALM_ASSERT(!m_finalized); - - auto has_pending_reset = PendingResetStore::has_pending_reset(*m_db->start_frozen()); - if (!has_pending_reset) { - return; // nothing to do - } - - m_sess->logger.info(util::LogCategory::reset, "Tracking %1", *has_pending_reset); - - // Now that the client reset merge is complete, wait for the changes to synchronize with the server - async_wait_for( - true, true, [self = util::bind_ptr(this), pending_reset = std::move(*has_pending_reset)](Status status) { - if (status == ErrorCodes::OperationAborted) { - return; - } - auto& logger = self->m_sess->logger; - if (!status.is_ok()) { - logger.error(util::LogCategory::reset, "Error while tracking client reset acknowledgement: %1", - status); - return; - } - - logger.debug(util::LogCategory::reset, "Server has acknowledged %1", pending_reset); - - auto tr = self->m_db->start_write(); - auto cur_pending_reset = PendingResetStore::has_pending_reset(*tr); - if (!cur_pending_reset) { - logger.debug(util::LogCategory::reset, "Client reset cycle detection tracker already removed."); - return; - } - if (*cur_pending_reset == pending_reset) { - logger.debug(util::LogCategory::reset, "Removing client reset cycle detection tracker."); - } - else { - logger.info(util::LogCategory::reset, "Found new %1", cur_pending_reset); - } - PendingResetStore::clear_pending_reset(*tr); - tr->commit(); - }); -} - std::string SessionWrapper::get_appservices_connection_id() { auto pf = util::make_promise_future(); diff --git a/src/realm/sync/network/default_socket.cpp b/src/realm/sync/network/default_socket.cpp index c2b69d64d25..56561ef8c5c 100644 --- a/src/realm/sync/network/default_socket.cpp +++ b/src/realm/sync/network/default_socket.cpp @@ -32,8 +32,6 @@ class DefaultWebSocketImpl final : public DefaultWebSocket, public Config { initiate_resolve(); } - virtual ~DefaultWebSocketImpl() = default; - void async_write_binary(util::Span data, SyncSocketProvider::FunctionHandler&& handler) override { m_websocket.async_write_binary(data.data(), data.size(), diff --git a/src/realm/sync/noinst/client_history_impl.cpp b/src/realm/sync/noinst/client_history_impl.cpp index 022e381fa04..98b2e819169 100644 --- a/src/realm/sync/noinst/client_history_impl.cpp +++ b/src/realm/sync/noinst/client_history_impl.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -335,16 +336,15 @@ void ClientHistory::set_client_file_ident(SaltedFileIdent client_file_ident, boo } -// Overriding member function in realm::sync::ClientHistoryBase void ClientHistory::set_sync_progress(const SyncProgress& progress, DownloadableProgress downloadable_bytes, - VersionInfo& version_info) + VersionInfo& version_info, util::Logger& logger) { TransactionRef wt = m_db->start_write(); // Throws version_type local_version = wt->get_version(); ensure_updated(local_version); // Throws prepare_for_write(); // Throws - update_sync_progress(progress, downloadable_bytes); // Throws + update_sync_progress(progress, downloadable_bytes, logger); // Throws // Note: This transaction produces an empty changeset. Empty changesets are // not uploaded to the server. @@ -489,17 +489,17 @@ void ClientHistory::integrate_server_changesets( // During the bootstrap phase in flexible sync, the server sends multiple download messages with the same // synthetic server version that represents synthetic changesets generated from state on the server. if (batch_state == DownloadBatchState::LastInBatch && changesets_to_integrate.empty()) { - update_sync_progress(progress, downloadable_bytes); // Throws + update_sync_progress(progress, downloadable_bytes, logger); // Throws } // Always update progress for download messages from steady state. else if (batch_state == DownloadBatchState::SteadyState && !changesets_to_integrate.empty()) { auto partial_progress = progress; partial_progress.download.server_version = last_changeset.remote_version; partial_progress.download.last_integrated_client_version = last_changeset.last_integrated_local_version; - update_sync_progress(partial_progress, downloadable_bytes); // Throws + update_sync_progress(partial_progress, downloadable_bytes, logger); // Throws } else if (batch_state == DownloadBatchState::SteadyState && changesets_to_integrate.empty()) { - update_sync_progress(progress, downloadable_bytes); // Throws + update_sync_progress(progress, downloadable_bytes, logger); // Throws } if (run_in_write_tr) { run_in_write_tr(*transact, changesets_for_cb); @@ -876,7 +876,8 @@ void ClientHistory::add_sync_history_entry(const HistoryEntry& entry) } -void ClientHistory::update_sync_progress(const SyncProgress& progress, DownloadableProgress downloadable_bytes) +void ClientHistory::update_sync_progress(const SyncProgress& progress, DownloadableProgress downloadable_bytes, + util::Logger& logger) { Array& root = m_arrays->root; @@ -947,6 +948,19 @@ void ClientHistory::update_sync_progress(const SyncProgress& progress, Downloada root.set(s_progress_uploaded_bytes_iip, RefOrTagged::make_tagged(uploaded_bytes)); // Throws + if (previous_upload_client_version < progress.upload.client_version) { + // This is part of the client reset cycle detection. + // A client reset operation will write a flag to an internal table indicating that + // the changes there are a result of a successful reset. However, it is not possible to + // know if a recovery has been successful until the changes have been acknowledged by the + // server. The situation we want to avoid is that a recovery itself causes another reset + // which creates a reset cycle. However, at this point, upload progress has been made + // and we can remove the cycle detection flag if there is one. + if (PendingResetStore::clear_pending_reset(*m_group)) { + logger.info(util::LogCategory::reset, "Clearing pending reset tracker after upload completion."); + } + } + m_progress_download = progress.download; trim_sync_history(); // Throws diff --git a/src/realm/sync/noinst/client_history_impl.hpp b/src/realm/sync/noinst/client_history_impl.hpp index b1f74a1641b..8dfaeb3b58b 100644 --- a/src/realm/sync/noinst/client_history_impl.hpp +++ b/src/realm/sync/noinst/client_history_impl.hpp @@ -175,7 +175,8 @@ class ClientHistory final : public _impl::History, public TransformHistory { /// \param downloadable_bytes If specified, and if the implementation cares /// about byte-level progress, this function updates the persistent record /// of the estimate of the number of remaining bytes to be downloaded. - void set_sync_progress(const SyncProgress& progress, DownloadableProgress downloadable_bytes, VersionInfo&); + void set_sync_progress(const SyncProgress& progress, DownloadableProgress downloadable_bytes, VersionInfo&, + util::Logger& logger); /// \brief Scan through the history for changesets to be uploaded. /// @@ -421,7 +422,7 @@ class ClientHistory final : public _impl::History, public TransformHistory { void prepare_for_write(); Replication::version_type add_changeset(BinaryData changeset, BinaryData sync_changeset); void add_sync_history_entry(const HistoryEntry&); - void update_sync_progress(const SyncProgress&, DownloadableProgress downloadable_bytes); + void update_sync_progress(const SyncProgress&, DownloadableProgress downloadable_bytes, util::Logger& logger); void trim_ct_history(); void trim_sync_history(); void do_trim_sync_history(std::size_t n); diff --git a/src/realm/sync/noinst/client_impl_base.cpp b/src/realm/sync/noinst/client_impl_base.cpp index eb57eafd45c..35d2cafbc7b 100644 --- a/src/realm/sync/noinst/client_impl_base.cpp +++ b/src/realm/sync/noinst/client_impl_base.cpp @@ -978,6 +978,8 @@ void Connection::initiate_write_message(const OutputBuffer& out, Session* sess) if (m_websocket_error_received) return; + m_sending_session = sess; + m_sending = true; m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) { if (sentinel->destroyed) { return; @@ -991,8 +993,6 @@ void Connection::initiate_write_message(const OutputBuffer& out, Session* sess) } handle_write_message(); // Throws }); // Throws - m_sending_session = sess; - m_sending = true; } @@ -1571,7 +1571,7 @@ void Session::integrate_changesets(const SyncProgress& progress, std::uint_fast6 "received empty download message that was not the last in batch", ProtocolError::bad_progress); } - history.set_sync_progress(progress, downloadable_bytes, version_info); // Throws + history.set_sync_progress(progress, downloadable_bytes, version_info, logger); // Throws return; } @@ -1718,9 +1718,6 @@ void Session::activate() catch (...) { on_integration_failure(IntegrationException(exception_to_status())); } - - // Checks if there is a pending client reset - handle_pending_client_reset_acknowledgement(); } @@ -2270,16 +2267,18 @@ bool Session::client_reset_if_needed() m_progress.download.last_integrated_client_version); REALM_ASSERT_EX(m_progress.upload.client_version == 0, m_progress.upload.client_version); - m_upload_progress = m_progress.upload; - m_download_progress = m_progress.download; + // Reset the cached values which are used to calculate progress since the + // last time sync completed init_progress_handler(); - // In recovery mode, there may be new changesets to upload and nothing left to download. - // In FLX DiscardLocal mode, there may be new commits due to subscription handling. - // For both, we want to allow uploads again without needing external changes to download first. - m_delay_uploads = false; - // Checks if there is a pending client reset - handle_pending_client_reset_acknowledgement(); + // Update the download progress to match what it would have been if we'd + // received a MARK message from the server (as the fresh Realm which we used + // as the source data for the reset did). + m_upload_progress = m_progress.upload; + m_download_progress = m_progress.download; + m_server_version_at_last_download_mark = m_progress.download.server_version; + m_last_download_mark_received = m_last_download_mark_sent = m_target_download_mark; + check_for_download_completion(); // If a migration or rollback is in progress, mark it complete when client reset is completed. if (auto migration_store = get_migration_store()) { diff --git a/src/realm/sync/noinst/client_impl_base.hpp b/src/realm/sync/noinst/client_impl_base.hpp index c6dd8f97ca3..134259ce750 100644 --- a/src/realm/sync/noinst/client_impl_base.hpp +++ b/src/realm/sync/noinst/client_impl_base.hpp @@ -934,7 +934,6 @@ class ClientImpl::Session { void process_pending_flx_bootstrap(); bool client_reset_if_needed(); - void handle_pending_client_reset_acknowledgement(); void gather_pending_compensating_writes(util::Span changesets, std::vector* out); diff --git a/src/realm/sync/noinst/client_reset.cpp b/src/realm/sync/noinst/client_reset.cpp index 6a9e875e345..e94ac8398ae 100644 --- a/src/realm/sync/noinst/client_reset.cpp +++ b/src/realm/sync/noinst/client_reset.cpp @@ -545,6 +545,14 @@ bool perform_client_reset_diff(DB& db_local, sync::ClientReset& reset_config, ut } } + // If there was nothing to recover or recovery was disabled then immediately + // mark the client reset as successfully complete + if (recovered.empty()) { + logger.info(util::LogCategory::reset, + "Immediately removing client reset tracker as there are no recovered changesets to upload."); + sync::PendingResetStore::clear_pending_reset(*wt_local); + } + wt_local->commit_and_continue_as_read(); VersionID new_version_local = wt_local->get_version_of_current_transaction(); diff --git a/src/realm/sync/noinst/migration_store.cpp b/src/realm/sync/noinst/migration_store.cpp index 6f8e3d10d99..608e7fe9d0c 100644 --- a/src/realm/sync/noinst/migration_store.cpp +++ b/src/realm/sync/noinst/migration_store.cpp @@ -53,9 +53,9 @@ bool MigrationStore::load_data(bool read_only) auto tr = m_db->start_read(); // Start with a reader so it doesn't try to write until we are ready - SyncMetadataSchemaVersionsReader schema_versions_reader(tr); + SyncMetadataSchemaVersionsReader schema_versions_reader(*tr); if (auto schema_version = - schema_versions_reader.get_version_for(tr, internal_schema_groups::c_flx_migration_store)) { + schema_versions_reader.get_version_for(*tr, internal_schema_groups::c_flx_migration_store)) { if (*schema_version != c_schema_version) { throw RuntimeError(ErrorCodes::UnsupportedFileFormatVersion, "Invalid schema version for flexible sync migration store metadata"); diff --git a/src/realm/sync/noinst/pending_bootstrap_store.cpp b/src/realm/sync/noinst/pending_bootstrap_store.cpp index 187de9b58fc..aa46b833ded 100644 --- a/src/realm/sync/noinst/pending_bootstrap_store.cpp +++ b/src/realm/sync/noinst/pending_bootstrap_store.cpp @@ -102,9 +102,9 @@ PendingBootstrapStore::PendingBootstrapStore(DBRef db, util::Logger& logger, auto tr = m_db->start_read(); // Start with a reader so it doesn't try to write until we are ready - SyncMetadataSchemaVersionsReader schema_versions_reader(tr); + SyncMetadataSchemaVersionsReader schema_versions_reader(*tr); if (auto schema_version = - schema_versions_reader.get_version_for(tr, internal_schema_groups::c_pending_bootstraps)) { + schema_versions_reader.get_version_for(*tr, internal_schema_groups::c_pending_bootstraps)) { if (*schema_version != c_schema_version) { throw RuntimeError(ErrorCodes::SchemaVersionMismatch, "Invalid schema version for FLX sync pending bootstrap table group"); diff --git a/src/realm/sync/noinst/pending_reset_store.cpp b/src/realm/sync/noinst/pending_reset_store.cpp index 7afa7083713..fd722969f1c 100644 --- a/src/realm/sync/noinst/pending_reset_store.cpp +++ b/src/realm/sync/noinst/pending_reset_store.cpp @@ -63,11 +63,13 @@ constexpr static std::string_view s_reset_action_col_name("action"); constexpr static std::string_view s_reset_error_code_col_name("error_code"); constexpr static std::string_view s_reset_error_msg_col_name("error_msg"); -void PendingResetStore::clear_pending_reset(Group& group) +bool PendingResetStore::clear_pending_reset(Group& group) { if (auto table = group.get_table(s_meta_reset_table_name); table && !table->is_empty()) { table->clear(); + return true; } + return false; } std::optional PendingResetStore::has_pending_reset(const Group& group) diff --git a/src/realm/sync/noinst/pending_reset_store.hpp b/src/realm/sync/noinst/pending_reset_store.hpp index b8e06ab84b9..3c48d9f33a5 100644 --- a/src/realm/sync/noinst/pending_reset_store.hpp +++ b/src/realm/sync/noinst/pending_reset_store.hpp @@ -52,7 +52,8 @@ class PendingResetStore { static void track_reset(Group& group, ClientResyncMode mode, PendingReset::Action action, Status error); // Clear the pending reset tracking information, if it exists // Requires a writable transaction and changes must be committed manually - static void clear_pending_reset(Group& group); + // Returns true if there was anything to remove + static bool clear_pending_reset(Group& group); static std::optional has_pending_reset(const Group& group); static int64_t from_reset_action(PendingReset::Action action); diff --git a/src/realm/sync/noinst/sync_metadata_schema.cpp b/src/realm/sync/noinst/sync_metadata_schema.cpp index 90b101041bb..91fc2cfc0f7 100644 --- a/src/realm/sync/noinst/sync_metadata_schema.cpp +++ b/src/realm/sync/noinst/sync_metadata_schema.cpp @@ -171,7 +171,7 @@ Status try_load_sync_metadata_schema(const Group& g, std::vector unified_schema_version_table_def{ {&m_table, @@ -179,23 +179,21 @@ SyncMetadataSchemaVersionsReader::SyncMetadataSchemaVersionsReader(const Transac {&m_schema_group_field, c_meta_schema_schema_group_field, type_String}, {{&m_version_field, c_meta_schema_version_field, type_Int}}}}; - // Any type of transaction is allowed, including frozen and write, as long as it supports reading - REALM_ASSERT_EX(tr->get_transact_stage() != DB::transact_Ready, tr->get_transact_stage()); // If the legacy_meta_table exists, then this table hasn't been converted and // the metadata schema versions information has not been upgraded/not accurate - if (tr->has_table(c_flx_metadata_table)) { + if (g.has_table(c_flx_metadata_table)) { return; } - if (tr->has_table(c_sync_internal_schemas_table)) { + if (g.has_table(c_sync_internal_schemas_table)) { // Load m_table with the table/schema information - load_sync_metadata_schema(*tr, &unified_schema_version_table_def); + load_sync_metadata_schema(g, &unified_schema_version_table_def); } } -std::optional SyncMetadataSchemaVersionsReader::get_legacy_version(const TransactionRef& tr) +std::optional SyncMetadataSchemaVersionsReader::get_legacy_version(const Group& g) { - if (!tr->has_table(c_flx_metadata_table)) { + if (!g.has_table(c_flx_metadata_table)) { return std::nullopt; } @@ -205,10 +203,9 @@ std::optional SyncMetadataSchemaVersionsReader::get_legacy_version(cons {&legacy_table_key, c_flx_metadata_table, {{&legacy_version_key, c_meta_schema_version_field, type_Int}}}}; // Convert the legacy table to the regular schema versions table if it exists - load_sync_metadata_schema(*tr, &legacy_table_def); + load_sync_metadata_schema(g, &legacy_table_def); - if (auto legacy_meta_table = tr->get_table(legacy_table_key); - legacy_meta_table && legacy_meta_table->size() > 0) { + if (auto legacy_meta_table = g.get_table(legacy_table_key); legacy_meta_table && legacy_meta_table->size() > 0) { auto legacy_obj = legacy_meta_table->get_object(0); return legacy_obj.get(legacy_version_key); } @@ -216,20 +213,20 @@ std::optional SyncMetadataSchemaVersionsReader::get_legacy_version(cons return std::nullopt; } -std::optional SyncMetadataSchemaVersionsReader::get_version_for(const TransactionRef& tr, +std::optional SyncMetadataSchemaVersionsReader::get_version_for(const Group& g, std::string_view schema_group_name) { if (!m_table) { // The legacy version only applies to the subscription store, don't query otherwise if (schema_group_name == internal_schema_groups::c_flx_subscription_store) { - if (auto legacy_version = get_legacy_version(tr)) { + if (auto legacy_version = get_legacy_version(g)) { return legacy_version; } } return util::none; } - auto schema_versions = tr->get_table(m_table); + auto schema_versions = g.get_table(m_table); auto obj_key = schema_versions->find_primary_key(Mixed{StringData(schema_group_name)}); if (!obj_key) { return util::none; @@ -243,7 +240,7 @@ std::optional SyncMetadataSchemaVersionsReader::get_version_for(const T } SyncMetadataSchemaVersions::SyncMetadataSchemaVersions(const TransactionRef& tr) - : SyncMetadataSchemaVersionsReader(tr) + : SyncMetadataSchemaVersionsReader(*tr) { std::vector unified_schema_version_table_def{ {&m_table, @@ -274,7 +271,7 @@ SyncMetadataSchemaVersions::SyncMetadataSchemaVersions(const TransactionRef& tr) } } - if (auto legacy_version = get_legacy_version(tr)) { + if (auto legacy_version = get_legacy_version(*tr)) { // Migrate from just having a subscription store metadata table to having multiple table groups with multiple // versions. if (tr->get_transact_stage() != DB::transact_Writing) { @@ -291,8 +288,6 @@ SyncMetadataSchemaVersions::SyncMetadataSchemaVersions(const TransactionRef& tr) // Commit and revert to the original transact stage if (orig == DB::transact_Reading) tr->commit_and_continue_as_read(); - else - tr->commit_and_continue_writing(); } void SyncMetadataSchemaVersions::set_version_for(const TransactionRef& tr, std::string_view schema_group_name, diff --git a/src/realm/sync/noinst/sync_metadata_schema.hpp b/src/realm/sync/noinst/sync_metadata_schema.hpp index 35f7d547231..e90c602c99b 100644 --- a/src/realm/sync/noinst/sync_metadata_schema.hpp +++ b/src/realm/sync/noinst/sync_metadata_schema.hpp @@ -129,11 +129,11 @@ Status try_load_sync_metadata_schema(const Group& g, std::vector get_version_for(const TransactionRef& tr, std::string_view schema_group_name); + std::optional get_version_for(const Group& g, std::string_view schema_group_name); - std::optional get_legacy_version(const TransactionRef& tr); + std::optional get_legacy_version(const Group& g); protected: TableKey m_table; diff --git a/src/realm/sync/subscriptions.cpp b/src/realm/sync/subscriptions.cpp index 35f2901557f..1420dfb8052 100644 --- a/src/realm/sync/subscriptions.cpp +++ b/src/realm/sync/subscriptions.cpp @@ -658,10 +658,10 @@ SubscriptionStore::SubscriptionStore(Private, DBRef db) auto tr = m_db->start_read(); // Start with a reader so it doesn't try to write until we are ready - SyncMetadataSchemaVersionsReader schema_versions_reader(tr); + SyncMetadataSchemaVersionsReader schema_versions_reader(*tr); if (auto schema_version = - schema_versions_reader.get_version_for(tr, internal_schema_groups::c_flx_subscription_store)) { + schema_versions_reader.get_version_for(*tr, internal_schema_groups::c_flx_subscription_store)) { if (*schema_version != c_flx_schema_version) { throw RuntimeError(ErrorCodes::UnsupportedFileFormatVersion, "Invalid schema version for flexible sync metadata"); diff --git a/test/object-store/benchmarks/client_reset.cpp b/test/object-store/benchmarks/client_reset.cpp index 8d8c6368a7d..4754e17402e 100644 --- a/test/object-store/benchmarks/client_reset.cpp +++ b/test/object-store/benchmarks/client_reset.cpp @@ -80,7 +80,7 @@ struct BenchmarkLocalClientReset : public reset_utils::TestClientReset { progress.upload.client_version = current_version; progress.upload.last_integrated_server_version = current_version; sync::VersionInfo info_out; - history_local->set_sync_progress(progress, 0, info_out); + history_local->set_sync_progress(progress, 0, info_out, *util::Logger::get_default_logger()); constexpr int64_t shared_pk = -42; { diff --git a/test/object-store/sync/client_reset.cpp b/test/object-store/sync/client_reset.cpp index b1eda09ff7d..d4097cb98f3 100644 --- a/test/object-store/sync/client_reset.cpp +++ b/test/object-store/sync/client_reset.cpp @@ -204,7 +204,8 @@ TEST_CASE("sync: large reset with recovery is restartable", "[sync][pbs][client REQUIRE(expected_obj_ids == found_object_ids); } -TEST_CASE("sync: pending client resets are cleared when downloads are complete", "[sync][pbs][client reset][baas]") { +TEST_CASE("sync: pending client resets are cleared immediately when there's nothing to recover", + "[sync][pbs][client reset][baas]") { const reset_utils::Partition partition{"realm_id", random_string(20)}; Property partition_prop = {partition.property_name, PropertyType::String | PropertyType::Nullable}; Schema schema{ @@ -718,12 +719,10 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { Property::IsPrimary{true}}; const std::string table_name = "new table"; const std::string prop_name = "new_property"; - ThreadSafeSyncError err; - local_config.sync_config->error_handler = [&](std::shared_ptr, SyncError error) { - err = error; - }; + std::optional err; make_reset(local_config, remote_config) ->set_development_mode(true) + ->expect_reset_error(err) ->make_local_changes([&](SharedRealm local) { local->update_schema( { @@ -749,14 +748,11 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { 0, nullptr, nullptr, true); }) ->on_post_reset([&](SharedRealm realm) { - util::EventLoop::main().run_until([&] { - return bool(err); - }); REQUIRE_NOTHROW(realm->refresh()); }) ->run(); REQUIRE(err); - REQUIRE(err.value()->is_client_reset_requested()); + REQUIRE(err->is_client_reset_requested()); REQUIRE(before_callback_invocations == 1); REQUIRE(after_callback_invocations == 0); } @@ -1107,10 +1103,6 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { } SECTION("invalid files at the fresh copy path are cleaned up") { - ThreadSafeSyncError err; - local_config.sync_config->error_handler = [&](std::shared_ptr, SyncError error) { - err = error; - }; std::string fresh_path = realm::_impl::client_reset::get_fresh_path_for(local_config.path); { util::File f(fresh_path, util::File::Mode::mode_Write); @@ -1118,31 +1110,20 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { } make_reset(local_config, remote_config)->run(); - REQUIRE(!err); REQUIRE(before_callback_invocations == 1); REQUIRE(after_callback_invocations == 1); } SECTION("failing to download a fresh copy results in an error") { - ThreadSafeSyncError err; - local_config.sync_config->error_handler = [&](std::shared_ptr, SyncError error) { - err = error; - }; std::string fresh_path = realm::_impl::client_reset::get_fresh_path_for(local_config.path); // create a non-empty directory that we'll fail to delete util::make_dir(fresh_path); util::File(util::File::resolve("file", fresh_path), util::File::mode_Write); - REQUIRE(!err); - make_reset(local_config, remote_config) - ->on_post_reset([&](SharedRealm) { - util::EventLoop::main().run_until([&] { - return bool(err); - }); - }) - ->run(); + std::optional err; + make_reset(local_config, remote_config)->expect_reset_error(err)->run(); REQUIRE(err); - REQUIRE(err.value()->is_client_reset_requested()); + REQUIRE(err->is_client_reset_requested()); } SECTION("should honor encryption key for downloaded Realm") { @@ -1290,12 +1271,10 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { } SECTION("extra local table creates a client reset error") { - ThreadSafeSyncError err; - local_config.sync_config->error_handler = [&](std::shared_ptr, SyncError error) { - err = error; - }; + std::optional err; make_reset(local_config, remote_config) ->set_development_mode(true) + ->expect_reset_error(err) ->make_local_changes([&](SharedRealm local) { local->update_schema( { @@ -1310,25 +1289,20 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { create_object(*local, "object2", ObjectId::gen(), partition); }) ->on_post_reset([&](SharedRealm realm) { - util::EventLoop::main().run_until([&] { - return bool(err); - }); REQUIRE_NOTHROW(realm->refresh()); }) ->run(); REQUIRE(err); - REQUIRE(err.value()->is_client_reset_requested()); + REQUIRE(err->is_client_reset_requested()); REQUIRE(before_callback_invocations == 1); REQUIRE(after_callback_invocations == 0); } SECTION("extra local column creates a client reset error") { - ThreadSafeSyncError err; - local_config.sync_config->error_handler = [&](std::shared_ptr, SyncError error) { - err = error; - }; + std::optional err; make_reset(local_config, remote_config) ->set_development_mode(true) + ->expect_reset_error(err) ->make_local_changes([](SharedRealm local) { local->update_schema( { @@ -1346,15 +1320,12 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { table->begin()->set(table->get_column_key("value2"), 123); }) ->on_post_reset([&](SharedRealm realm) { - util::EventLoop::main().run_until([&] { - return bool(err); - }); REQUIRE_NOTHROW(realm->refresh()); }) ->run(); REQUIRE(err); - REQUIRE(err.value()->is_client_reset_requested()); + REQUIRE(err->is_client_reset_requested()); REQUIRE(before_callback_invocations == 1); REQUIRE(after_callback_invocations == 0); } @@ -1407,12 +1378,10 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { } SECTION("incompatible schema changes in remote and local transactions") { - ThreadSafeSyncError err; - local_config.sync_config->error_handler = [&](std::shared_ptr, SyncError error) { - err = error; - }; + std::optional err; make_reset(local_config, remote_config) ->set_development_mode(true) + ->expect_reset_error(err) ->make_local_changes([](SharedRealm local) { local->update_schema( { @@ -1438,24 +1407,18 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { 0, nullptr, nullptr, true); }) ->on_post_reset([&](SharedRealm realm) { - util::EventLoop::main().run_until([&] { - return bool(err); - }); REQUIRE_NOTHROW(realm->refresh()); }) ->run(); REQUIRE(err); - REQUIRE(err.value()->is_client_reset_requested()); + REQUIRE(err->is_client_reset_requested()); } SECTION("primary key type cannot be changed") { - ThreadSafeSyncError err; - local_config.sync_config->error_handler = [&](std::shared_ptr, SyncError error) { - err = error; - }; - + std::optional err; make_reset(local_config, remote_config) ->set_development_mode(true) + ->expect_reset_error(err) ->make_local_changes([](SharedRealm local) { local->update_schema( { @@ -1479,14 +1442,11 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { 0, nullptr, nullptr, true); }) ->on_post_reset([&](SharedRealm realm) { - util::EventLoop::main().run_until([&] { - return bool(err); - }); REQUIRE_NOTHROW(realm->refresh()); }) ->run(); REQUIRE(err); - REQUIRE(err.value()->is_client_reset_requested()); + REQUIRE(err->is_client_reset_requested()); } SECTION("list operations") { @@ -1681,15 +1641,8 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { SECTION("cycle detection") { auto has_reset_cycle_flag = [](SharedRealm realm) -> util::Optional { - auto db = TestHelper::get_db(realm); - auto rd_tr = db->start_frozen(); - return sync::PendingResetStore::has_pending_reset(*rd_tr); - }; - auto logger = util::Logger::get_default_logger(); - ThreadSafeSyncError err; - local_config.sync_config->error_handler = [&](std::shared_ptr, SyncError error) { - logger->error("Detected cycle detection error: %1", error.status); - err = error; + realm->refresh(); + return sync::PendingResetStore::has_pending_reset(realm->read_group()); }; auto make_fake_previous_reset = [&local_config](ClientResyncMode mode, sync::ProtocolErrorInfo::Action action = @@ -1702,7 +1655,7 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { wr_tr->commit(); }; }; - SECTION("a normal reset adds and removes a cycle detection flag") { + SECTION("a successful reset adds and removes a cycle detection flag") { local_config.sync_config->client_resync_mode = ClientResyncMode::RecoverOrDiscard; local_config.sync_config->notify_before_client_reset = [&](SharedRealm realm) { REQUIRE_FALSE(has_reset_cycle_flag(realm)); @@ -1713,7 +1666,7 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { bool did_recover) { SharedRealm realm = Realm::get_shared_realm(std::move(realm_ref), util::Scheduler::make_dummy()); auto flag = has_reset_cycle_flag(realm); - REQUIRE(bool(flag)); + REQUIRE(flag); REQUIRE(flag->mode == ClientResyncMode::Recover); REQUIRE(did_recover); std::lock_guard lock(mtx); @@ -1724,19 +1677,21 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { REQUIRE_FALSE(has_reset_cycle_flag(realm)); }) ->run(); - REQUIRE(!err); REQUIRE(before_callback_invocations == 1); REQUIRE(after_callback_invocations == 1); } - SECTION("a failed reset leaves a cycle detection flag") { + SECTION("failed recovery leaves a cycle detection flag") { local_config.sync_config->client_resync_mode = ClientResyncMode::Recover; + std::optional error; make_reset(local_config, remote_config) ->make_local_changes([](SharedRealm realm) { auto table = realm->read_group().get_table("class_object"); table->remove_column(table->add_column(type_Int, "new col")); }) + ->expect_reset_error(error) ->run(); + REQUIRE(error); local_config.sync_config.reset(); local_config.force_sync_history = true; auto realm = Realm::get_shared_realm(local_config); @@ -1745,42 +1700,30 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { CHECK(flag->mode == ClientResyncMode::Recover); } - SECTION("In DiscardLocal mode: a previous failed discard reset is detected and generates an error") { - local_config.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal; + SECTION("failed discard leaves a cycle detection flag") { + // DiscardLocal can only fail due to resource exhaustion, an internal + // bug, or the process being killed, so there isn't a clear way to test this + } + + SECTION("a previous failed discard reset is detected and generates an error") { + local_config.sync_config->client_resync_mode = GENERATE( + ClientResyncMode::DiscardLocal, ClientResyncMode::Recover, ClientResyncMode::RecoverOrDiscard); make_fake_previous_reset(ClientResyncMode::DiscardLocal); - make_reset(local_config, remote_config)->run(); - timed_sleeping_wait_for([&]() -> bool { - return !!err; - }); - REQUIRE(err.value()->is_client_reset_requested()); + std::optional error; + make_reset(local_config, remote_config)->expect_reset_error(error)->run(); + REQUIRE(error); + REQUIRE(error->is_client_reset_requested()); } + SECTION("In Recover mode: a previous failed recover reset is detected and generates an error") { local_config.sync_config->client_resync_mode = ClientResyncMode::Recover; make_fake_previous_reset(ClientResyncMode::Recover); - make_reset(local_config, remote_config)->run(); - timed_sleeping_wait_for([&]() -> bool { - return !!err; - }); - REQUIRE(err.value()->is_client_reset_requested()); - } - SECTION("In Recover mode: a previous failed discard reset is detected and generates an error") { - local_config.sync_config->client_resync_mode = ClientResyncMode::Recover; - make_fake_previous_reset(ClientResyncMode::DiscardLocal); - make_reset(local_config, remote_config)->run(); - timed_sleeping_wait_for([&]() -> bool { - return !!err; - }); - REQUIRE(err.value()->is_client_reset_requested()); - } - SECTION("In RecoverOrDiscard mode: a previous failed discard reset is detected and generates an error") { - local_config.sync_config->client_resync_mode = ClientResyncMode::RecoverOrDiscard; - make_fake_previous_reset(ClientResyncMode::DiscardLocal); - make_reset(local_config, remote_config)->run(); - timed_sleeping_wait_for([&]() -> bool { - return !!err; - }); - REQUIRE(err.value()->is_client_reset_requested()); + std::optional error; + make_reset(local_config, remote_config)->expect_reset_error(error)->run(); + REQUIRE(error); + REQUIRE(error->is_client_reset_requested()); } + const ObjectId added_pk = ObjectId::gen(); auto has_added_object = [&](SharedRealm realm) -> bool { REQUIRE_NOTHROW(realm->refresh()); @@ -1789,37 +1732,11 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { ObjKey key = table->find_primary_key(added_pk); return !!key; }; - SECTION( - "In RecoverOrDiscard mode: a previous failed recovery is detected and triggers a DiscardLocal reset") { - local_config.sync_config->client_resync_mode = ClientResyncMode::RecoverOrDiscard; - make_fake_previous_reset(ClientResyncMode::Recover); - local_config.sync_config->notify_after_client_reset = - [&](SharedRealm before, ThreadSafeReference after_ref, bool did_recover) { - SharedRealm after = Realm::get_shared_realm(std::move(after_ref), util::Scheduler::make_dummy()); - REQUIRE(!did_recover); - REQUIRE(has_added_object(before)); - REQUIRE(!has_added_object(after)); // discarded insert due to fallback to DiscardLocal mode - std::lock_guard lock(mtx); - ++after_callback_invocations; - }; - make_reset(local_config, remote_config) - ->make_local_changes([&](SharedRealm realm) { - auto table = get_table(*realm, "object"); - REQUIRE(table); - create_object(*realm, "object", {added_pk}, partition); - }) - ->run(); - timed_sleeping_wait_for( - [&]() -> bool { - std::lock_guard lock(mtx); - return after_callback_invocations > 0 || err; - }, - std::chrono::seconds(120)); - REQUIRE(!err); - } - SECTION("In DiscardLocal mode: a previous failed recovery does not cause an error") { - local_config.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal; + SECTION("In DiscardLocal or RecoverOrDiscard mode: a previous failed recovery is detected and triggers a " + "DiscardLocal reset") { + local_config.sync_config->client_resync_mode = + GENERATE(ClientResyncMode::RecoverOrDiscard, ClientResyncMode::DiscardLocal); make_fake_previous_reset(ClientResyncMode::Recover); local_config.sync_config->notify_after_client_reset = [&](SharedRealm before, ThreadSafeReference after_ref, bool did_recover) { @@ -1827,9 +1744,7 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { REQUIRE(!did_recover); REQUIRE(has_added_object(before)); - REQUIRE(!has_added_object(after)); // not recovered - std::lock_guard lock(mtx); - ++after_callback_invocations; + REQUIRE(!has_added_object(after)); // discarded insert due to fallback to DiscardLocal mode }; make_reset(local_config, remote_config) ->make_local_changes([&](SharedRealm realm) { @@ -1838,15 +1753,9 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { create_object(*realm, "object", {added_pk}, partition); }) ->run(); - timed_sleeping_wait_for( - [&]() -> bool { - std::lock_guard lock(mtx); - return after_callback_invocations > 0 || err; - }, - std::chrono::seconds(120)); - REQUIRE(!err); } } // end cycle detection + SECTION("The server can prohibit recovery") { const realm::AppSession& app_session = test_app_session.app_session(); auto sync_service = app_session.admin_api.get_sync_service(app_session.server_app_id); @@ -1860,19 +1769,10 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { SECTION("In Recover mode, a manual client reset is triggered") { local_config.sync_config->client_resync_mode = ClientResyncMode::Recover; - ThreadSafeSyncError err; - local_config.sync_config->error_handler = [&](std::shared_ptr, SyncError error) { - err = error; - }; - make_reset(local_config, remote_config) - ->on_post_reset([&](SharedRealm) { - util::EventLoop::main().run_until([&] { - return bool(err); - }); - }) - ->run(); + std::optional err; + make_reset(local_config, remote_config)->expect_reset_error(err)->run(); REQUIRE(err); - SyncError error = *err.value(); + SyncError error = *err; REQUIRE(error.is_client_reset_requested()); REQUIRE(error.user_info.size() >= 2); REQUIRE(error.user_info.count(SyncError::c_original_file_path_key) == 1); diff --git a/test/object-store/sync/flx_sync.cpp b/test/object-store/sync/flx_sync.cpp index 9f648c44d7a..a083ded2bf4 100644 --- a/test/object-store/sync/flx_sync.cpp +++ b/test/object-store/sync/flx_sync.cpp @@ -46,6 +46,7 @@ #include #include #include +#include #include #include #include @@ -471,6 +472,79 @@ TEST_CASE("app: error handling integration test", "[sync][flx][baas]") { } } +namespace { +struct DisconnectingWebSocketInterface : sync::WebSocketInterface { + std::unique_ptr m_impl; + std::atomic* disconnect; + + std::string_view get_appservices_request_id() const noexcept override + { + return m_impl->get_appservices_request_id(); + } + + void async_write_binary(util::Span data, sync::SyncSocketProvider::FunctionHandler&& handler) override + { + if (*disconnect) { + handler(Status::OK()); + } + else { + m_impl->async_write_binary(data, std::move(handler)); + } + } +}; + +struct DisconnectingWebSocketObserver : sync::WebSocketObserver { + std::unique_ptr m_impl; + std::atomic* disconnect; + + void websocket_connected_handler(const std::string& protocol) override + { + m_impl->websocket_connected_handler(protocol); + } + + void websocket_error_handler() override + { + m_impl->websocket_error_handler(); + } + + bool websocket_binary_message_received(util::Span data) override + { + if (*disconnect) + return true; + return m_impl->websocket_binary_message_received(data); + } + + bool websocket_closed_handler(bool was_clean, sync::websocket::WebSocketError error_code, + std::string_view message) override + { + return m_impl->websocket_closed_handler(was_clean, error_code, message); + } +}; + +// A socket provider which claims to always work, but when `disconnect = true` +// will actually drop all incoming and outgoing messages. This enables testing +// going offline at very specfic points. +struct DisconnectingSocketProvider : sync::websocket::DefaultSocketProvider { + std::atomic disconnect{false}; + + DisconnectingSocketProvider() + : sync::websocket::DefaultSocketProvider(util::Logger::get_default_logger(), "user agent") + { + } + + std::unique_ptr connect(std::unique_ptr observer, + sync::WebSocketEndpoint&& endpoint) override + { + auto wrapped_observer = std::make_unique(); + wrapped_observer->m_impl = std::move(observer); + wrapped_observer->disconnect = &disconnect; + auto wrapped_interface = std::make_unique(); + wrapped_interface->m_impl = DefaultSocketProvider::connect(std::move(wrapped_observer), std::move(endpoint)); + wrapped_interface->disconnect = &disconnect; + return wrapped_interface; + } +}; +} // namespace TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { std::vector schema{ @@ -494,8 +568,10 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { // some of these tests make additive schema changes which is only allowed in dev mode constexpr bool dev_mode = true; + const auto socket_provider = std::make_shared(); FLXSyncTestHarness harness("flx_client_reset", - {schema, {"queryable_str_field", "queryable_int_field"}, {}, dev_mode}); + {schema, {"queryable_str_field", "queryable_int_field"}, {}, dev_mode}, + instance_of, socket_provider); auto add_object = [](SharedRealm realm, std::string str_field, int64_t int_field, ObjectId oid = ObjectId::gen()) { @@ -636,11 +712,6 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { }) ->make_remote_changes([&](SharedRealm remote_realm) { add_subscription_for_new_object(remote_realm, str_field_value, remote_added_int); - sync::SubscriptionSet::State actual = - remote_realm->get_latest_subscription_set() - .get_state_change_notification(sync::SubscriptionSet::State::Complete) - .get(); - REQUIRE(actual == sync::SubscriptionSet::State::Complete); }) ->on_post_reset([&, client_reset_future = std::move(reset_future)](SharedRealm local_realm) { wait_for_advance(*local_realm); @@ -1161,20 +1232,20 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { SECTION("DiscardLocal: open realm after client reset failure") { config_local.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal; - auto&& [error_future, error_handler] = make_error_handler(); - config_local.sync_config->error_handler = error_handler; std::string fresh_path = realm::_impl::client_reset::get_fresh_path_for(config_local.path); // create a non-empty directory that we'll fail to delete util::make_dir(fresh_path); util::File(util::File::resolve("file", fresh_path), util::File::mode_Write); - auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session()); - test_reset->run(); + std::optional sync_error; + reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session()) + ->expect_reset_error(sync_error) + ->run(); // Client reset fails due to sync client not being able to create the fresh realm. - auto sync_error = wait_for_future(std::move(error_future)).get(); - REQUIRE(sync_error.status == ErrorCodes::AutoClientResetFailed); + REQUIRE(sync_error); + REQUIRE(sync_error->status == ErrorCodes::AutoClientResetFailed); // Open the realm again. This should not crash. auto&& [err_future, err_handler] = make_error_handler(); @@ -1182,7 +1253,7 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { auto realm_post_reset = Realm::get_shared_realm(config_local); sync_error = wait_for_future(std::move(err_future)).get(); - REQUIRE(sync_error.status == ErrorCodes::AutoClientResetFailed); + REQUIRE(sync_error->status == ErrorCodes::AutoClientResetFailed); } enum class ResetMode { NoReset, InitiateClientReset }; @@ -1196,6 +1267,7 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { auto subs = realm->get_latest_subscription_set(); auto result = subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get(); CHECK(result == sync::SubscriptionSet::State::Complete); + SyncSession::OnlyForTesting::pause_async(*realm->sync_session()).get(); if (reset_mode == ResetMode::InitiateClientReset) { reset_utils::trigger_client_reset(harness.session().app_session(), realm); } @@ -1543,6 +1615,134 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { }) ->run(); } + + SECTION("client reset immediately reports download completion") { + config_local.sync_config->client_resync_mode = + GENERATE(ClientResyncMode::Recover, ClientResyncMode::DiscardLocal); + seed_realm(config_local, ResetMode::InitiateClientReset); + config_local.sync_config->notify_before_client_reset = [&](std::shared_ptr) { + socket_provider->disconnect = true; + }; + // Should complete even though the connection was dropped while applying + // the client reset as the fresh realm download waited for download completion + successfully_async_open_realm(config_local); + } + + SECTION("DiscardLocal immediately reports upload completion") { + config_local.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal; + config_local.sync_config->notify_before_client_reset = [&](std::shared_ptr) { + socket_provider->disconnect = true; + }; + + auto realm = Realm::get_shared_realm(config_local); + subscribe_to_and_add_objects(realm, 1); + wait_for_upload(*realm); + realm->sync_session()->pause(); + + auto pf = util::make_promise_future(); + realm->sync_session()->wait_for_upload_completion([&](Status status) { + if (status.is_ok()) + pf.promise.emplace_value(); + else + pf.promise.set_error(status); + }); + subscribe_to_and_add_objects(realm, 5); + reset_utils::trigger_client_reset(harness.session().app_session(), realm); + realm->sync_session()->resume(); + // Upload completion is fired even though we're offline because the things + // we were waiting to upload were discarded instead + pf.future.get(); + } + + SECTION("Recover reports upload completion after recovered changesets are uploaded") { + config_local.sync_config->client_resync_mode = ClientResyncMode::Recover; + config_local.sync_config->notify_before_client_reset = [&](std::shared_ptr) { + socket_provider->disconnect = true; + }; + + auto realm = Realm::get_shared_realm(config_local); + subscribe_to_and_add_objects(realm, 1); + wait_for_upload(*realm); + realm->sync_session()->pause(); + + auto pf = util::make_promise_future(); + realm->sync_session()->wait_for_upload_completion([&](Status status) { + if (status.is_ok()) + pf.promise.emplace_value(); + else + pf.promise.set_error(status); + }); + subscribe_to_and_add_objects(realm, 5); + reset_utils::trigger_client_reset(harness.session().app_session(), realm); + realm->sync_session()->resume(); + wait_for_download(*realm); // i.e. wait for the reset to complete + + // Upload completion has not fired because we recovered local changes + // that are waiting to be uploaded + REQUIRE_FALSE(pf.future.is_ready()); + socket_provider->disconnect = false; + pf.future.get(); + } + + SECTION("DiscardLocal immediately marks client reset as successful") { + config_local.sync_config->client_resync_mode = ClientResyncMode::Recover; + seed_realm(config_local, ResetMode::InitiateClientReset); + config_local.sync_config->notify_before_client_reset = [&](std::shared_ptr) { + socket_provider->disconnect = true; + }; + successfully_async_open_realm(config_local); + } + + SECTION("Recover immediately marks client reset as successful if there was nothing to recover") { + config_local.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal; + config_local.sync_config->notify_before_client_reset = [&](std::shared_ptr) { + socket_provider->disconnect = true; + }; + + auto realm = Realm::get_shared_realm(config_local); + subscribe_to_and_add_objects(realm, 1); + wait_for_upload(*realm); + reset_utils::trigger_client_reset(harness.session().app_session(), realm); + wait_for_download(*realm); // i.e. wait for the reset to complete + + REQUIRE_FALSE(sync::PendingResetStore::has_pending_reset(realm->read_group())); + } + + SECTION("Recover marks client reset with changes to recover as successful after uploading") { + config_local.sync_config->client_resync_mode = ClientResyncMode::Recover; + config_local.sync_config->notify_before_client_reset = [&](std::shared_ptr) { + socket_provider->disconnect = true; + }; + + auto realm = Realm::get_shared_realm(config_local); + subscribe_to_and_add_objects(realm, 1); + wait_for_upload(*realm); + realm->sync_session()->pause(); + + subscribe_to_and_add_objects(realm, 5); + reset_utils::trigger_client_reset(harness.session().app_session(), realm); + realm->sync_session()->resume(); + wait_for_download(*realm); // i.e. wait for the reset to complete + + realm->refresh(); + REQUIRE(sync::PendingResetStore::has_pending_reset(realm->read_group())); + + SECTION("existing session allowed to reconnect") { + socket_provider->disconnect = false; + wait_for_upload(*realm); + realm->refresh(); + REQUIRE_FALSE(sync::PendingResetStore::has_pending_reset(realm->read_group())); + } + + SECTION("new session discovering the tracker when activating") { + SyncSession::OnlyForTesting::pause_async(*realm->sync_session()).get(); + socket_provider->disconnect = false; + realm->sync_session()->resume(); + wait_for_upload(*realm); + realm->refresh(); + REQUIRE_FALSE(sync::PendingResetStore::has_pending_reset(realm->read_group())); + } + } } TEST_CASE("flx: creating an object on a class with no subscription throws", "[sync][flx][subscription][baas]") { diff --git a/test/object-store/util/sync/sync_test_utils.cpp b/test/object-store/util/sync/sync_test_utils.cpp index 80191b355b1..ae9d2a07084 100644 --- a/test/object-store/util/sync/sync_test_utils.cpp +++ b/test/object-store/util/sync/sync_test_utils.cpp @@ -308,6 +308,9 @@ StatusWith> async_open_realm(const Realm::Config& config) std::shared_ptr successfully_async_open_realm(const Realm::Config& config) { auto status = async_open_realm(config); + if (!status.is_ok()) { + FAIL(status.get_status().reason()); + } REQUIRE(status.is_ok()); return status.get_value(); } @@ -386,7 +389,7 @@ struct FakeLocalClientReset : public TestClientReset { progress.upload.client_version = current_version; progress.upload.last_integrated_server_version = current_version; sync::VersionInfo info_out; - history_local->set_sync_progress(progress, 0, info_out); + history_local->set_sync_progress(progress, 0, info_out, *util::Logger::get_default_logger()); } { local_realm->begin_transaction(); @@ -573,8 +576,16 @@ struct BaasClientReset : public TestClientReset { }, std::chrono::seconds(30), std::chrono::seconds(1)); + std::atomic got_error{false}; + if (m_error) { + m_local_config.sync_config->error_handler = [&](std::shared_ptr, SyncError error) { + *m_error = error; + got_error = true; + }; + } + auto realm = Realm::get_shared_realm(m_local_config); - auto session = sync_manager->get_existing_session(realm->config().path); + auto session = realm->sync_session(); const std::string object_schema_name = "object"; { wait_for_download(*realm); @@ -652,10 +663,14 @@ struct BaasClientReset : public TestClientReset { if (m_on_post_local) { m_on_post_local(realm); } - if (!m_wait_for_reset_completion) { - return; + if (m_error) { + timed_wait_for([&] { + return got_error.load(); + }); + } + else if (m_wait_for_reset_completion) { + wait_for_upload(*realm); } - wait_for_upload(*realm); if (m_on_post_reset) { m_on_post_reset(realm); } @@ -688,6 +703,14 @@ struct BaasFLXClientReset : public TestClientReset { m_did_run = true; const AppSession& app_session = m_test_app_session.app_session(); + std::atomic got_error{false}; + if (m_error) { + m_local_config.sync_config->error_handler = [&](std::shared_ptr, SyncError error) { + *m_error = error; + got_error = true; + }; + } + auto realm = Realm::get_shared_realm(m_local_config); auto session = realm->sync_session(); if (m_on_setup) { @@ -750,7 +773,14 @@ struct BaasFLXClientReset : public TestClientReset { if (m_on_post_local) { m_on_post_local(realm); } - wait_for_download(*realm); + if (m_error) { + timed_wait_for([&] { + return got_error.load(); + }); + } + else if (m_wait_for_reset_completion) { + wait_for_upload(*realm); + } if (m_on_post_reset) { m_on_post_reset(realm); } @@ -873,9 +903,16 @@ ObjectId TestClientReset::get_pk_of_object_driving_reset() const return m_pk_driving_reset; } -void TestClientReset::disable_wait_for_reset_completion() +TestClientReset* TestClientReset::disable_wait_for_reset_completion() { m_wait_for_reset_completion = false; + return this; +} + +TestClientReset* TestClientReset::expect_reset_error(std::optional& err) +{ + m_error = &err; + return this; } std::unique_ptr make_fake_local_client_reset(const Realm::Config& local_config, diff --git a/test/object-store/util/sync/sync_test_utils.hpp b/test/object-store/util/sync/sync_test_utils.hpp index b4c5637ee35..a291af5c7c1 100644 --- a/test/object-store/util/sync/sync_test_utils.hpp +++ b/test/object-store/util/sync/sync_test_utils.hpp @@ -328,7 +328,8 @@ struct TestClientReset { TestClientReset* on_post_reset(Callback&& post_reset); void set_pk_of_object_driving_reset(const ObjectId& pk); ObjectId get_pk_of_object_driving_reset() const; - void disable_wait_for_reset_completion(); + TestClientReset* disable_wait_for_reset_completion(); + TestClientReset* expect_reset_error(std::optional&); virtual TestClientReset* set_development_mode(bool enable = true); virtual void run() = 0; @@ -343,8 +344,9 @@ struct TestClientReset { Callback m_make_remote_changes; Callback m_on_post_local; Callback m_on_post_reset; - bool m_did_run = false; ObjectId m_pk_driving_reset = ObjectId::gen(); + std::optional* m_error = nullptr; + bool m_did_run = false; bool m_wait_for_reset_completion = true; }; diff --git a/test/test_client_reset.cpp b/test/test_client_reset.cpp index fc297d68f56..24c71a87836 100644 --- a/test/test_client_reset.cpp +++ b/test/test_client_reset.cpp @@ -854,7 +854,7 @@ TEST(ClientReset_PinnedVersion) #endif // !REALM_MOBILE void expect_reset(unit_test::TestContext& test_context, DBRef& target, DBRef& fresh, ClientResyncMode mode, - SubscriptionStore* sub_store = nullptr, bool allow_recovery = true) + bool expect_recovered_changes, SubscriptionStore* sub_store = nullptr, bool allow_recovery = true) { CHECK(target); CHECK(fresh); @@ -917,9 +917,9 @@ void expect_reset(unit_test::TestContext& test_context, DBRef& target, DBRef& fr CHECK_EQUAL(file_ident.salt, fresh_client_id.salt); } - // Client resets aren't marked as complete until the server has acknowledged - // sync completion to avoid reset cycles - { + // Client resets which recover changes aren't marked as complete until the + // server has acknowledged sync completion to avoid reset cycles + if (expect_recovered_changes) { auto tr = target->start_read(); auto pending_reset = PendingResetStore::has_pending_reset(*tr); CHECK(pending_reset); @@ -931,6 +931,10 @@ void expect_reset(unit_test::TestContext& test_context, DBRef& target, DBRef& fr tr->commit_and_continue_as_read(); CHECK_NOT(PendingResetStore::has_pending_reset(*tr)); } + else { + auto pending_reset = PendingResetStore::has_pending_reset(*target->start_read()); + CHECK_NOT(pending_reset); + } } TEST(ClientReset_ConvertResyncMode) @@ -1047,7 +1051,8 @@ void mark_as_synchronized(DB& db) progress.upload.client_version = current_version; progress.upload.last_integrated_server_version = current_version; sync::VersionInfo info_out; - history.set_sync_progress(progress, 0, info_out); + util::NullLogger logger; + history.set_sync_progress(progress, 0, info_out, logger); history.set_client_file_ident({1, 0}, false); } @@ -1125,7 +1130,7 @@ TEST(ClientReset_NoChanges) // one, which shouldn't result in any changes regardless of mode db->write_copy(path_fresh, nullptr); auto db_fresh = DB::create(make_client_replication(), path_fresh); - expect_reset(test_context, db, db_fresh, mode); + expect_reset(test_context, db, db_fresh, mode, false); // End state should exactly match the pre-reset state CHECK_OR_RETURN(compare_groups(*db->start_read(), *backup_db->start_read())); @@ -1167,7 +1172,7 @@ TEST(ClientReset_SimpleNonconflictingChanges) wt->commit(); } - expect_reset(test_context, db, db_fresh, mode, nullptr, allow_recovery); + expect_reset(test_context, db, db_fresh, mode, allow_recovery, nullptr, allow_recovery); if (allow_recovery) { // Should have both the objects created locally and from the reset realm @@ -1224,7 +1229,7 @@ TEST(ClientReset_SimpleConflictingWrites) wt->commit(); } - expect_reset(test_context, db, db_fresh, mode, nullptr, allow_recovery); + expect_reset(test_context, db, db_fresh, mode, allow_recovery, nullptr, allow_recovery); auto tr = db->start_read(); auto table = tr->get_table("class_table"); @@ -1292,7 +1297,7 @@ TEST(ClientReset_Recover_ModificationsOnDeletedObject) wt->commit(); } - expect_reset(test_context, db, db_fresh, ClientResyncMode::Recover); + expect_reset(test_context, db, db_fresh, ClientResyncMode::Recover, true); auto tr = db->start_read(); auto table = tr->get_table("class_table"); @@ -1332,7 +1337,7 @@ TEST(ClientReset_DiscardLocal_DiscardsPendingSubscriptions) pending_sets.push_back(std::move(set)); } - expect_reset(test_context, db, db_fresh, ClientResyncMode::DiscardLocal, sub_store.get()); + expect_reset(test_context, db, db_fresh, ClientResyncMode::DiscardLocal, false, sub_store.get()); CHECK(sub_store->get_pending_subscriptions().empty()); auto subs = sub_store->get_latest(); @@ -1366,7 +1371,7 @@ TEST_TYPES(ClientReset_DiscardLocal_MakesAwaitingMarkActiveSubscriptionsComplete auto set = add_subscription(*sub_store, "complete", query, SubscriptionSet::State::AwaitingMark); auto future = set.get_state_change_notification(SubscriptionSet::State::Complete); - expect_reset(test_context, db, db_fresh, TEST_TYPE::value, sub_store.get()); + expect_reset(test_context, db, db_fresh, TEST_TYPE::value, false, sub_store.get()); CHECK_EQUAL(future.get(), SubscriptionSet::State::Complete); CHECK_EQUAL(set.state(), SubscriptionSet::State::AwaitingMark); @@ -1394,7 +1399,7 @@ TEST(ClientReset_Recover_DoesNotCompletePendingSubscriptions) futures.push_back(subs.get_state_change_notification(SubscriptionSet::State::Complete)); } - expect_reset(test_context, db, db_fresh, ClientResyncMode::Recover, sub_store.get()); + expect_reset(test_context, db, db_fresh, ClientResyncMode::Recover, false, sub_store.get()); for (auto& fut : futures) { CHECK_NOT(fut.is_ready()); @@ -1442,10 +1447,11 @@ TEST(ClientReset_Recover_UpdatesRemoteServerVersions) sync::VersionInfo info_out; auto& history = static_cast(db_fresh->get_replication())->get_history(); - history.set_sync_progress(progress, 0, info_out); + util::NullLogger logger; + history.set_sync_progress(progress, 0, info_out, logger); } - expect_reset(test_context, db, db_fresh, ClientResyncMode::Recover, nullptr); + expect_reset(test_context, db, db_fresh, ClientResyncMode::Recover, true); auto& history = static_cast(db->get_replication())->get_history(); history.ensure_updated(db->get_version_of_latest_snapshot()); @@ -1508,7 +1514,7 @@ TEST(ClientReset_Recover_UploadableBytes) pre_reset_uploadable_bytes, unused, unused_version); CHECK_GREATER(pre_reset_uploadable_bytes, 0); - expect_reset(test_context, db, db_fresh, ClientResyncMode::Recover, nullptr); + expect_reset(test_context, db, db_fresh, ClientResyncMode::Recover, true); uint_fast64_t post_reset_uploadable_bytes; history.get_upload_download_state(*db->start_read(), db->get_alloc(), unused, unused_progress, unused, @@ -1555,7 +1561,7 @@ TEST(ClientReset_Recover_ListsAreOnlyCopiedOnce) wt->commit(); } - expect_reset(test_context, db, db_fresh, ClientResyncMode::Recover, nullptr); + expect_reset(test_context, db, db_fresh, ClientResyncMode::Recover, true); // List should match the pre-reset local state auto rt = db->start_read(); @@ -1612,7 +1618,7 @@ TEST(ClientReset_Recover_RecoverableChangesOnListsAfterUnrecoverableAreNotDuplic wt->commit(); } - expect_reset(test_context, db, db_fresh, ClientResyncMode::Recover, sub_store.get()); + expect_reset(test_context, db, db_fresh, ClientResyncMode::Recover, true, sub_store.get()); // List should match the pre-reset local state auto rt = db->start_read(); @@ -1708,7 +1714,7 @@ TEST(ClientReset_Recover_ReciprocalListChanges) // shouldn't modify the group. However, if it reapplied the original changesets // and not the reciprocal history, it'd result in the list being // [0, 1, 2, 11, 10, 21, 12, 31, 20, 41, 22, 30, 32, 40, 42] - expect_reset(test_context, db, db_fresh, ClientResyncMode::Recover, nullptr); + expect_reset(test_context, db, db_fresh, ClientResyncMode::Recover, true); auto rt = db->start_read(); auto list = rt->get_table("class_table")->begin()->get_list("list"); @@ -1767,7 +1773,7 @@ TEST(ClientReset_Recover_UpdatesReciprocalHistory) // client reset will discard the recovered array insertion as the object // doesn't exist, but keep the object creation - expect_reset(test_context, db, db_fresh, ClientResyncMode::Recover, nullptr); + expect_reset(test_context, db, db_fresh, ClientResyncMode::Recover, true); // Recreate the object and add a different value to the list { diff --git a/test/test_sync_subscriptions.cpp b/test/test_sync_subscriptions.cpp index 784b8d55af5..bcb05c282ae 100644 --- a/test/test_sync_subscriptions.cpp +++ b/test/test_sync_subscriptions.cpp @@ -576,11 +576,12 @@ TEST_IF(Sync_SubscriptionStoreInternalSchemaMigration, REALM_MAX_BPNODE_SIZE == auto tr = fixture.db->start_read(); SyncMetadataSchemaVersions versions(tr); - auto flx_sub_store_version = versions.get_version_for(tr, sync::internal_schema_groups::c_flx_subscription_store); + auto flx_sub_store_version = + versions.get_version_for(*tr, sync::internal_schema_groups::c_flx_subscription_store); CHECK(flx_sub_store_version); CHECK_EQUAL(*flx_sub_store_version, 2); - CHECK(!versions.get_version_for(tr, "non_existent_table")); + CHECK(!versions.get_version_for(*tr, "non_existent_table")); } TEST(Sync_SubscriptionStoreNextPendingVersion) @@ -843,8 +844,8 @@ TEST(Sync_SyncMetadataSchemaVersionsReader) { auto tr = db->start_read(); // Verify opening a reader on an unitialized versions table returns uninitialized - SyncMetadataSchemaVersionsReader reader(tr); - auto schema_version = reader.get_version_for(tr, schema_group_name); + SyncMetadataSchemaVersionsReader reader(*tr); + auto schema_version = reader.get_version_for(*tr, schema_group_name); CHECK(!schema_version); } @@ -855,7 +856,7 @@ TEST(Sync_SyncMetadataSchemaVersionsReader) tr->promote_to_write(); schema_versions.set_version_for(tr, schema_group_name, version); tr->commit_and_continue_as_read(); - auto schema_version = schema_versions.get_version_for(tr, schema_group_name); + auto schema_version = schema_versions.get_version_for(*tr, schema_group_name); CHECK(schema_version); CHECK(*schema_version == version); } @@ -863,8 +864,8 @@ TEST(Sync_SyncMetadataSchemaVersionsReader) { auto tr = db->start_read(); // Verify opening a reader on an initialized versions table returns initialized - SyncMetadataSchemaVersionsReader reader(tr); - auto schema_version = reader.get_version_for(tr, schema_group_name); + SyncMetadataSchemaVersionsReader reader(*tr); + auto schema_version = reader.get_version_for(*tr, schema_group_name); CHECK(schema_version); CHECK(*schema_version == version); } @@ -874,8 +875,8 @@ TEST(Sync_SyncMetadataSchemaVersionsReader) { auto tr = db->start_read(); // Verify opening a reader with legacy data returns uninitialized - SyncMetadataSchemaVersionsReader reader(tr); - auto schema_version = reader.get_version_for(tr, schema_group_name); + SyncMetadataSchemaVersionsReader reader(*tr); + auto schema_version = reader.get_version_for(*tr, schema_group_name); CHECK(!schema_version); } @@ -884,7 +885,7 @@ TEST(Sync_SyncMetadataSchemaVersionsReader) auto tr = db->start_read(); // Initialize the schema versions table and verify the converted flx subscription store version SyncMetadataSchemaVersions schema_versions(tr); - auto schema_version = schema_versions.get_version_for(tr, internal_schema_groups::c_flx_subscription_store); + auto schema_version = schema_versions.get_version_for(*tr, internal_schema_groups::c_flx_subscription_store); CHECK(schema_version); CHECK(*schema_version == legacy_version); // Verify the legacy table has been deleted after the conversion @@ -903,7 +904,7 @@ TEST(Sync_SyncMetadataSchemaVersions) auto check_version = [this, &db](SyncMetadataSchemaVersionsReader& schema_versions, const std::string_view& group_name, int64_t expected_version) { auto tr = db->start_read(); - auto schema_version = schema_versions.get_version_for(tr, group_name); + auto schema_version = schema_versions.get_version_for(*tr, group_name); CHECK(schema_version); CHECK(*schema_version == expected_version); }; @@ -953,7 +954,7 @@ TEST(Sync_SyncMetadataSchemaVersions) { // Re-read the data and verify the new values with a reader auto tr = db->start_read(); - SyncMetadataSchemaVersionsReader schema_versions(tr); + SyncMetadataSchemaVersionsReader schema_versions(*tr); check_version(schema_versions, internal_schema_groups::c_flx_subscription_store, flx_version2); check_version(schema_versions, internal_schema_groups::c_pending_bootstraps, btstrp_version2); @@ -973,7 +974,7 @@ TEST(Sync_SyncMetadataSchemaVersions_LegacyTable) auto tr = db->start_read(); // Converts the legacy table to the unified table SyncMetadataSchemaVersions schema_versions(tr); - auto schema_version = schema_versions.get_version_for(tr, internal_schema_groups::c_flx_subscription_store); + auto schema_version = schema_versions.get_version_for(*tr, internal_schema_groups::c_flx_subscription_store); CHECK(schema_version); CHECK(*schema_version == version); // Verify the legacy table has been deleted after the conversion