From c78df1832bf56ffad7967f56196dd63913ba3c14 Mon Sep 17 00:00:00 2001 From: Thomas Goyne Date: Wed, 7 Aug 2024 11:39:12 -0700 Subject: [PATCH] Do more precise checking of when all recovered changesets have been uploaded --- CHANGELOG.md | 2 +- src/realm/object-store/shared_realm.cpp | 5 + src/realm/object-store/shared_realm.hpp | 2 + src/realm/sync/network/network.cpp | 5 +- src/realm/sync/noinst/client_history_impl.cpp | 6 +- src/realm/sync/noinst/client_impl_base.cpp | 4 +- src/realm/sync/noinst/client_reset.cpp | 6 + src/realm/sync/noinst/pending_reset_store.cpp | 195 +++++++++------- src/realm/sync/noinst/pending_reset_store.hpp | 22 +- test/object-store/sync/flx_sync.cpp | 212 ++++++++++++------ .../object-store/util/sync/baas_admin_api.hpp | 16 ++ test/test_client_reset.cpp | 29 --- 12 files changed, 307 insertions(+), 197 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 95eb5a1163e..9353f8ba060 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ * ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?) * `Realm::convert()` would sometimes incorrectly throw an exception claiming that there were unuploaded local changes when the source Realm is a synchronized Realm ([#7966](https://github.com/realm/realm-core/issues/7966), since v10.7.0). * Automatic client reset handling now reports download completion as soon as all changes from the newly downloaded file have been applied to the main Realm file rather than at an inconsistent time afterwards ([PR #7921](https://github.com/realm/realm-core/pull/7921)). -* Cycle detection for automatic client reset handling would sometimes consider two client resets in a row to be a cycle even when the first reset did not recover any changes and so could not have triggered the second. ([PR #7921](https://github.com/realm/realm-core/pull/7921)). +* Cycle detection for automatic client reset handling is now more precise. Previously errors which occurred after all recovered changesets were uploaded would sometimes be incorrectly considered a cycle and skip automatic handling. ([PR #7921](https://github.com/realm/realm-core/pull/7921)). ### Breaking changes * None. diff --git a/src/realm/object-store/shared_realm.cpp b/src/realm/object-store/shared_realm.cpp index 791efe2dabd..df6331c6a0f 100644 --- a/src/realm/object-store/shared_realm.cpp +++ b/src/realm/object-store/shared_realm.cpp @@ -1213,6 +1213,11 @@ void Realm::convert(const Config& config, bool merge_into_existing) } } +bool Realm::has_pending_unuploaded_changes() const noexcept +{ + return !m_transaction->get_history()->no_pending_local_changes(m_transaction->get_version()); +} + OwnedBinaryData Realm::write_copy() { verify_thread(); diff --git a/src/realm/object-store/shared_realm.hpp b/src/realm/object-store/shared_realm.hpp index 5e98c96149f..c681f63a963 100644 --- a/src/realm/object-store/shared_realm.hpp +++ b/src/realm/object-store/shared_realm.hpp @@ -321,6 +321,8 @@ class Realm : public std::enable_shared_from_this { // Returns `true` if the Realm is frozen, `false` otherwise. bool is_frozen() const; + bool has_pending_unuploaded_changes() const noexcept; + // Returns true if the Realm is either in a read or frozen transaction bool is_in_read_transaction() const { diff --git a/src/realm/sync/network/network.cpp b/src/realm/sync/network/network.cpp index 0090c4db326..4f08216ca2b 100644 --- a/src/realm/sync/network/network.cpp +++ b/src/realm/sync/network/network.cpp @@ -1414,10 +1414,7 @@ class Service::Impl { bool resolver_thread_started = m_resolver_thread.joinable(); if (resolver_thread_started) return; - auto func = [this]() noexcept { - resolver_thread(); - }; - m_resolver_thread = std::thread{std::move(func)}; + m_resolver_thread = std::thread{&Impl::resolver_thread, this}; } void add_wait_oper(LendersWaitOperPtr op) diff --git a/src/realm/sync/noinst/client_history_impl.cpp b/src/realm/sync/noinst/client_history_impl.cpp index 679d2d414f2..f4799c00a43 100644 --- a/src/realm/sync/noinst/client_history_impl.cpp +++ b/src/realm/sync/noinst/client_history_impl.cpp @@ -91,7 +91,7 @@ void ClientHistory::set_history_adjustments( for (size_t i = 0, size = m_arrays->remote_versions.size(); i < size; ++i) { m_arrays->remote_versions.set(i, server_version.version); version_type version = m_sync_history_base_version + i; - logger.debug("Updating %1: client_version(%2) changeset_size(%3) server_version(%4)", i, version + 1, + logger.debug("Updating %1: client_version(%2) changeset_size(%3) server_version(%4)", i, version, m_arrays->changesets.get(i).size(), server_version.version); } } @@ -956,9 +956,7 @@ void ClientHistory::update_sync_progress(const SyncProgress& progress, Downloada // server. The situation we want to avoid is that a recovery itself causes another reset // which creates a reset cycle. However, at this point, upload progress has been made // and we can remove the cycle detection flag if there is one. - if (PendingResetStore::clear_pending_reset(*m_group)) { - logger.info(util::LogCategory::reset, "Clearing pending reset tracker after upload completion."); - } + PendingResetStore::remove_if_complete(*m_group, progress.upload.client_version, logger); } m_progress_download = progress.download; diff --git a/src/realm/sync/noinst/client_impl_base.cpp b/src/realm/sync/noinst/client_impl_base.cpp index 35d2cafbc7b..06a1b825c1a 100644 --- a/src/realm/sync/noinst/client_impl_base.cpp +++ b/src/realm/sync/noinst/client_impl_base.cpp @@ -978,8 +978,6 @@ void Connection::initiate_write_message(const OutputBuffer& out, Session* sess) if (m_websocket_error_received) return; - m_sending_session = sess; - m_sending = true; m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) { if (sentinel->destroyed) { return; @@ -993,6 +991,8 @@ void Connection::initiate_write_message(const OutputBuffer& out, Session* sess) } handle_write_message(); // Throws }); // Throws + m_sending_session = sess; + m_sending = true; } diff --git a/src/realm/sync/noinst/client_reset.cpp b/src/realm/sync/noinst/client_reset.cpp index e94ac8398ae..4a34d754479 100644 --- a/src/realm/sync/noinst/client_reset.cpp +++ b/src/realm/sync/noinst/client_reset.cpp @@ -552,6 +552,12 @@ bool perform_client_reset_diff(DB& db_local, sync::ClientReset& reset_config, ut "Immediately removing client reset tracker as there are no recovered changesets to upload."); sync::PendingResetStore::clear_pending_reset(*wt_local); } + else { + logger.debug(util::LogCategory::reset, + "Marking %1 as the version which must be uploaded to complete client reset recovery.", + recovered.back().version); + sync::PendingResetStore::set_recovered_version(*wt_local, recovered.back().version); + } wt_local->commit_and_continue_as_read(); diff --git a/src/realm/sync/noinst/pending_reset_store.cpp b/src/realm/sync/noinst/pending_reset_store.cpp index fd722969f1c..fbe6587ef58 100644 --- a/src/realm/sync/noinst/pending_reset_store.cpp +++ b/src/realm/sync/noinst/pending_reset_store.cpp @@ -54,22 +54,117 @@ bool operator==(const sync::PendingReset& lhs, const PendingReset::Action& actio return lhs.action == action; } +namespace { // A table without a "class_" prefix will not generate sync instructions. -constexpr static std::string_view s_meta_reset_table_name("client_reset_metadata"); -constexpr static std::string_view s_version_col_name("core_version"); -constexpr static std::string_view s_timestamp_col_name("time"); -constexpr static std::string_view s_reset_recovery_mode_col_name("mode"); -constexpr static std::string_view s_reset_action_col_name("action"); -constexpr static std::string_view s_reset_error_code_col_name("error_code"); -constexpr static std::string_view s_reset_error_msg_col_name("error_msg"); - -bool PendingResetStore::clear_pending_reset(Group& group) +constexpr std::string_view s_meta_reset_table_name("client_reset_metadata"); +constexpr std::string_view s_core_version_col_name("core_version"); +constexpr std::string_view s_recovered_version_col_name("recovered_version"); +constexpr std::string_view s_timestamp_col_name("time"); +constexpr std::string_view s_reset_recovery_mode_col_name("mode"); +constexpr std::string_view s_reset_action_col_name("action"); +constexpr std::string_view s_reset_error_code_col_name("error_code"); +constexpr std::string_view s_reset_error_msg_col_name("error_msg"); + +int64_t from_reset_action(PendingReset::Action action) +{ + switch (action) { + case PendingReset::Action::ClientReset: + return 1; + case PendingReset::Action::ClientResetNoRecovery: + return 2; + case PendingReset::Action::MigrateToFLX: + return 3; + case PendingReset::Action::RevertToPBS: + return 4; + default: + throw ClientResetFailed(util::format("Unsupported client reset action: %1 for pending reset", action)); + } +} + +PendingReset::Action to_reset_action(int64_t action) +{ + switch (action) { + case 1: + return PendingReset::Action::ClientReset; + case 2: + return PendingReset::Action::ClientResetNoRecovery; + case 3: + return PendingReset::Action::MigrateToFLX; + case 4: + return PendingReset::Action::RevertToPBS; + default: + return PendingReset::Action::NoAction; + } +} + +ClientResyncMode to_resync_mode(int64_t mode) +{ + // Retains compatibility with v1 + // RecoverOrDiscard is treated as Recover and is not stored + switch (mode) { + case 0: // DiscardLocal + return ClientResyncMode::DiscardLocal; + case 1: // Recover + return ClientResyncMode::Recover; + default: + throw ClientResetFailed(util::format("Unsupported client reset resync mode: %1 for pending reset", mode)); + } +} + +int64_t from_resync_mode(ClientResyncMode mode) +{ + // Retains compatibility with v1 + switch (mode) { + case ClientResyncMode::DiscardLocal: + return 0; // DiscardLocal + case ClientResyncMode::RecoverOrDiscard: + [[fallthrough]]; // RecoverOrDiscard is treated as Recover + case ClientResyncMode::Recover: + return 1; // Recover + default: + throw ClientResetFailed(util::format("Unsupported client reset resync mode: %1 for pending reset", mode)); + } +} +} // namespace + +void PendingResetStore::clear_pending_reset(Group& group) { if (auto table = group.get_table(s_meta_reset_table_name); table && !table->is_empty()) { table->clear(); - return true; } - return false; +} + +void PendingResetStore::remove_if_complete(Group& group, version_type version, util::Logger& logger) +{ + auto table = group.get_table(s_meta_reset_table_name); + if (!table || table->is_empty()) + return; + + auto reset_store = PendingResetStore::load_schema(group); + if (!reset_store) { + logger.info(util::LogCategory::reset, "Clearing pending reset tracker created by different core version."); + table->clear(); + return; + } + + auto reset_entry = *table->begin(); + if (reset_entry.get(reset_store->m_core_version) != REALM_VERSION_STRING) { + logger.info(util::LogCategory::reset, "Clearing pending reset tracker created by different core version."); + table->clear(); + return; + } + + auto target_version = version_type(reset_entry.get(reset_store->m_recovered_version)); + if (target_version > version) { + logger.detail(util::LogCategory::reset, "Pending reset not complete: uploaded %1 but need to reach %2", + version, target_version); + return; + } + + logger.info(util::LogCategory::reset, + "Clearing pending reset tracker after upload of version %1 has been acknowledged by server.", + target_version); + table->clear(); } std::optional PendingResetStore::has_pending_reset(const Group& group) @@ -86,7 +181,7 @@ std::optional PendingResetStore::has_pending_reset(const Group& gr return std::nullopt; } auto reset_entry = *table->begin(); - if (reset_entry.get(reset_store->m_version) != REALM_VERSION_STRING) { + if (reset_entry.get(reset_store->m_core_version) != REALM_VERSION_STRING) { // Previous pending reset was written by a different version, so ignore it return std::nullopt; } @@ -113,7 +208,7 @@ void PendingResetStore::track_reset(Group& group, ClientResyncMode mode, Pending REALM_ASSERT(table); table->clear(); table->create_object(null_key, { - {reset_store.m_version, Mixed(REALM_VERSION_STRING)}, + {reset_store.m_core_version, Mixed(REALM_VERSION_STRING)}, {reset_store.m_timestamp, Timestamp(std::chrono::system_clock::now())}, {reset_store.m_recovery_mode, from_resync_mode(mode)}, {reset_store.m_action, from_reset_action(action)}, @@ -122,12 +217,23 @@ void PendingResetStore::track_reset(Group& group, ClientResyncMode mode, Pending }); } +void PendingResetStore::set_recovered_version(Group& group, version_type version) +{ + auto reset_store = PendingResetStore::load_schema(group); + REALM_ASSERT(reset_store); + auto table = group.get_table(reset_store->m_pending_reset_table); + REALM_ASSERT(table); + REALM_ASSERT(!table->is_empty()); + table->begin()->set(reset_store->m_recovered_version, int64_t(version)); +} + PendingResetStore::PendingResetStore(const Group& g) : m_internal_tables{ {&m_pending_reset_table, s_meta_reset_table_name, { - {&m_version, s_version_col_name, type_String}, + {&m_core_version, s_core_version_col_name, type_String}, + {&m_recovered_version, s_recovered_version_col_name, type_Int}, {&m_timestamp, s_timestamp_col_name, type_Timestamp}, {&m_recovery_mode, s_reset_recovery_mode_col_name, type_Int}, {&m_action, s_reset_action_col_name, type_Int}, @@ -164,65 +270,4 @@ PendingResetStore PendingResetStore::load_or_create_schema(Group& group) return reset_store; } -int64_t PendingResetStore::from_reset_action(PendingReset::Action action) -{ - switch (action) { - case PendingReset::Action::ClientReset: - return 1; - case PendingReset::Action::ClientResetNoRecovery: - return 2; - case PendingReset::Action::MigrateToFLX: - return 3; - case PendingReset::Action::RevertToPBS: - return 4; - default: - throw ClientResetFailed(util::format("Unsupported client reset action: %1 for pending reset", action)); - } -} - -PendingReset::Action PendingResetStore::to_reset_action(int64_t action) -{ - switch (action) { - case 1: - return PendingReset::Action::ClientReset; - case 2: - return PendingReset::Action::ClientResetNoRecovery; - case 3: - return PendingReset::Action::MigrateToFLX; - case 4: - return PendingReset::Action::RevertToPBS; - default: - return PendingReset::Action::NoAction; - } -} - -ClientResyncMode PendingResetStore::to_resync_mode(int64_t mode) -{ - // Retains compatibility with v1 - // RecoverOrDiscard is treated as Recover and is not stored - switch (mode) { - case 0: // DiscardLocal - return ClientResyncMode::DiscardLocal; - case 1: // Recover - return ClientResyncMode::Recover; - default: - throw ClientResetFailed(util::format("Unsupported client reset resync mode: %1 for pending reset", mode)); - } -} - -int64_t PendingResetStore::from_resync_mode(ClientResyncMode mode) -{ - // Retains compatibility with v1 - switch (mode) { - case ClientResyncMode::DiscardLocal: - return 0; // DiscardLocal - case ClientResyncMode::RecoverOrDiscard: - [[fallthrough]]; // RecoverOrDiscard is treated as Recover - case ClientResyncMode::Recover: - return 1; // Recover - default: - throw ClientResetFailed(util::format("Unsupported client reset resync mode: %1 for pending reset", mode)); - } -} - } // namespace realm::sync diff --git a/src/realm/sync/noinst/pending_reset_store.hpp b/src/realm/sync/noinst/pending_reset_store.hpp index 3c48d9f33a5..8b287e712cd 100644 --- a/src/realm/sync/noinst/pending_reset_store.hpp +++ b/src/realm/sync/noinst/pending_reset_store.hpp @@ -46,28 +46,30 @@ bool operator==(const sync::PendingReset& lhs, const PendingReset::Action& actio class PendingResetStore { public: - // Store the pending reset tracking information - it is an error if the tracking info already - // exists in the store + // Store the pending reset tracking information. Any pre-existing tracking + // will be deleted and replaced with this. // Requires a writable transaction and changes must be committed manually static void track_reset(Group& group, ClientResyncMode mode, PendingReset::Action action, Status error); + // Record the version of the final recovered changeset that must be uploaded + // for a client reset to be complete. Not called for DiscardLocal or if there + // was nothing to recover. + static void set_recovered_version(Group&, version_type); // Clear the pending reset tracking information, if it exists // Requires a writable transaction and changes must be committed manually - // Returns true if there was anything to remove - static bool clear_pending_reset(Group& group); + static void clear_pending_reset(Group& group); + // Remove the pending reset tracking information if it exists and the version + // set with set_recovered_version() is less than or equal to version. + static void remove_if_complete(Group& group, version_type version, util::Logger&); static std::optional has_pending_reset(const Group& group); - static int64_t from_reset_action(PendingReset::Action action); - static PendingReset::Action to_reset_action(int64_t action); - static ClientResyncMode to_resync_mode(int64_t mode); - static int64_t from_resync_mode(ClientResyncMode mode); - private: // The instantiated class is only used internally PendingResetStore(const Group& group); std::vector m_internal_tables; TableKey m_pending_reset_table; - ColKey m_version; + ColKey m_core_version; + ColKey m_recovered_version; ColKey m_timestamp; ColKey m_recovery_mode; ColKey m_action; diff --git a/test/object-store/sync/flx_sync.cpp b/test/object-store/sync/flx_sync.cpp index a083ded2bf4..d5d64ee125b 100644 --- a/test/object-store/sync/flx_sync.cpp +++ b/test/object-store/sync/flx_sync.cpp @@ -473,78 +473,83 @@ TEST_CASE("app: error handling integration test", "[sync][flx][baas]") { } namespace { -struct DisconnectingWebSocketInterface : sync::WebSocketInterface { - std::unique_ptr m_impl; - std::atomic* disconnect; +// DefaultSocketProvider begins the connection process as a side effect when +// constructed, so we need to be able to defer construction if connect() is +// called while disconnected. async_write_binary() should never be called before +// the websocket observer is notified of a connection completing. +struct WrappedWebSocketInterface : sync::WebSocketInterface { + std::unique_ptr impl; std::string_view get_appservices_request_id() const noexcept override { - return m_impl->get_appservices_request_id(); + return ""; } void async_write_binary(util::Span data, sync::SyncSocketProvider::FunctionHandler&& handler) override { - if (*disconnect) { - handler(Status::OK()); - } - else { - m_impl->async_write_binary(data, std::move(handler)); - } + REALM_ASSERT(impl); + impl->async_write_binary(data, std::move(handler)); } }; -struct DisconnectingWebSocketObserver : sync::WebSocketObserver { - std::unique_ptr m_impl; - std::atomic* disconnect; - - void websocket_connected_handler(const std::string& protocol) override - { - m_impl->websocket_connected_handler(protocol); - } - - void websocket_error_handler() override - { - m_impl->websocket_error_handler(); - } +// A socket provider which can trigger disconnects at arbitrary times and defer +// connection attempts, which lets us pause synchronization at specific times +// without tearing down the session. +struct DisconnectingSocketProvider : sync::websocket::DefaultSocketProvider { + std::mutex mutex; + sync::WebSocketObserver* observer = nullptr; + WrappedWebSocketInterface* deferred_interface = nullptr; + std::unique_ptr deferred_observer; + sync::WebSocketEndpoint deferred_endpoint; + bool block_connections = false; - bool websocket_binary_message_received(util::Span data) override + DisconnectingSocketProvider() + : sync::websocket::DefaultSocketProvider(util::Logger::get_default_logger(), "user agent") { - if (*disconnect) - return true; - return m_impl->websocket_binary_message_received(data); } - bool websocket_closed_handler(bool was_clean, sync::websocket::WebSocketError error_code, - std::string_view message) override + void disconnect() { - return m_impl->websocket_closed_handler(was_clean, error_code, message); + std::lock_guard lock(mutex); + REALM_ASSERT(!block_connections); + block_connections = true; + post([=](Status) { + observer->websocket_closed_handler(true, SocketProviderError::WebSocketError::websocket_write_error, + "fake write error"); + observer = nullptr; + }); } -}; - -// A socket provider which claims to always work, but when `disconnect = true` -// will actually drop all incoming and outgoing messages. This enables testing -// going offline at very specfic points. -struct DisconnectingSocketProvider : sync::websocket::DefaultSocketProvider { - std::atomic disconnect{false}; - DisconnectingSocketProvider() - : sync::websocket::DefaultSocketProvider(util::Logger::get_default_logger(), "user agent") + void allow_reconnect() { + std::lock_guard lock(mutex); + REALM_ASSERT(block_connections); + block_connections = false; + if (!deferred_interface) + return; + + deferred_interface->impl = + DefaultSocketProvider::connect(std::move(deferred_observer), std::move(deferred_endpoint)); + deferred_interface = nullptr; } std::unique_ptr connect(std::unique_ptr observer, sync::WebSocketEndpoint&& endpoint) override { - auto wrapped_observer = std::make_unique(); - wrapped_observer->m_impl = std::move(observer); - wrapped_observer->disconnect = &disconnect; - auto wrapped_interface = std::make_unique(); - wrapped_interface->m_impl = DefaultSocketProvider::connect(std::move(wrapped_observer), std::move(endpoint)); - wrapped_interface->disconnect = &disconnect; - return wrapped_interface; + std::lock_guard lock(mutex); + this->observer = observer.get(); + if (block_connections) { + REALM_ASSERT(!deferred_interface); + auto wrapped_interface = std::make_unique(); + deferred_interface = wrapped_interface.get(); + deferred_observer = std::move(observer); + deferred_endpoint = std::move(endpoint); + return wrapped_interface; + } + return DefaultSocketProvider::connect(std::move(observer), std::move(endpoint)); } }; -} // namespace +} // anonymous namespace TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { std::vector schema{ @@ -1174,8 +1179,7 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { SECTION("DiscardLocal: an error is produced if a previously successful query becomes invalid due to " "server changes across a reset") { // Disable dev mode so non-queryable fields are not automatically added as queryable - const AppSession& app_session = harness.session().app_session(); - app_session.admin_api.set_development_mode_to(app_session.server_app_id, false); + DisableDevelopmentMode disable_development_mode(harness.session().app_session()); config_local.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal; auto&& [error_future, err_handler] = make_error_handler(); config_local.sync_config->error_handler = err_handler; @@ -1405,8 +1409,6 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { return schema; }; SECTION("Recover: additive schema changes are recovered in dev mode") { - const AppSession& app_session = harness.session().app_session(); - app_session.admin_api.set_development_mode_to(app_session.server_app_id, true); seed_realm(config_local, ResetMode::InitiateClientReset); std::vector changed_schema = make_additive_changes(schema); config_local.schema = changed_schema; @@ -1511,15 +1513,9 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { } SECTION("Recover: additive schema changes without dev mode produce an error after client reset") { - const AppSession& app_session = harness.session().app_session(); - app_session.admin_api.set_development_mode_to(app_session.server_app_id, true); seed_realm(config_local, ResetMode::InitiateClientReset); // Disable dev mode so that schema changes are not allowed - app_session.admin_api.set_development_mode_to(app_session.server_app_id, false); - auto cleanup = util::make_scope_exit([&]() noexcept { - const AppSession& app_session = harness.session().app_session(); - app_session.admin_api.set_development_mode_to(app_session.server_app_id, true); - }); + DisableDevelopmentMode disable_development_mode(harness.session().app_session()); std::vector changed_schema = make_additive_changes(schema); config_local.schema = changed_schema; @@ -1527,8 +1523,13 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { (void)setup_reset_handlers_for_schema_validation(config_local, changed_schema); auto&& [error_future, err_handler] = make_error_handler(); config_local.sync_config->error_handler = err_handler; + // Async open completes after applying the client reset diff, but before + // we've uploaded the recovered changeset for the schema change to the + // server, so it's successful auto realm = successfully_async_open_realm(config_local); - // make changes to the new property + // make changes to the new property to verify we actually created the + // new table. The test would pass without this as uploading will fail + // before we get to this changeset realm->begin_transaction(); auto table = realm->read_group().get_table("class_TopLevel"); ColKey new_col = table->get_column_key("added_oid_field"); @@ -1537,6 +1538,7 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { it->set(new_col, ObjectId::gen()); } realm->commit_transaction(); + auto err = error_future.get(); std::string property_err = "Invalid schema change (UPLOAD): non-breaking schema change: adding " "\"ObjectID\" column at field \"added_oid_field\" in schema \"TopLevel\", " @@ -1548,6 +1550,10 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { Catch::Matchers::ContainsSubstring(class_err)); CHECK(before_reset_count == 1); CHECK(after_reset_count == 1); + + // Recovery failed, so there should still be a cycle detection tracker + realm->refresh(); + REQUIRE(sync::PendingResetStore::has_pending_reset(realm->read_group())); } SECTION("Recover: inserts in collections in mixed - collections cleared remotely") { @@ -1621,7 +1627,7 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { GENERATE(ClientResyncMode::Recover, ClientResyncMode::DiscardLocal); seed_realm(config_local, ResetMode::InitiateClientReset); config_local.sync_config->notify_before_client_reset = [&](std::shared_ptr) { - socket_provider->disconnect = true; + socket_provider->disconnect(); }; // Should complete even though the connection was dropped while applying // the client reset as the fresh realm download waited for download completion @@ -1631,7 +1637,7 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { SECTION("DiscardLocal immediately reports upload completion") { config_local.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal; config_local.sync_config->notify_before_client_reset = [&](std::shared_ptr) { - socket_provider->disconnect = true; + socket_provider->disconnect(); }; auto realm = Realm::get_shared_realm(config_local); @@ -1657,7 +1663,7 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { SECTION("Recover reports upload completion after recovered changesets are uploaded") { config_local.sync_config->client_resync_mode = ClientResyncMode::Recover; config_local.sync_config->notify_before_client_reset = [&](std::shared_ptr) { - socket_provider->disconnect = true; + socket_provider->disconnect(); }; auto realm = Realm::get_shared_realm(config_local); @@ -1680,7 +1686,7 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { // Upload completion has not fired because we recovered local changes // that are waiting to be uploaded REQUIRE_FALSE(pf.future.is_ready()); - socket_provider->disconnect = false; + socket_provider->allow_reconnect(); pf.future.get(); } @@ -1688,7 +1694,7 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { config_local.sync_config->client_resync_mode = ClientResyncMode::Recover; seed_realm(config_local, ResetMode::InitiateClientReset); config_local.sync_config->notify_before_client_reset = [&](std::shared_ptr) { - socket_provider->disconnect = true; + socket_provider->disconnect(); }; successfully_async_open_realm(config_local); } @@ -1696,7 +1702,7 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { SECTION("Recover immediately marks client reset as successful if there was nothing to recover") { config_local.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal; config_local.sync_config->notify_before_client_reset = [&](std::shared_ptr) { - socket_provider->disconnect = true; + socket_provider->disconnect(); }; auto realm = Realm::get_shared_realm(config_local); @@ -1711,7 +1717,7 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { SECTION("Recover marks client reset with changes to recover as successful after uploading") { config_local.sync_config->client_resync_mode = ClientResyncMode::Recover; config_local.sync_config->notify_before_client_reset = [&](std::shared_ptr) { - socket_provider->disconnect = true; + socket_provider->disconnect(); }; auto realm = Realm::get_shared_realm(config_local); @@ -1728,20 +1734,82 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") { REQUIRE(sync::PendingResetStore::has_pending_reset(realm->read_group())); SECTION("existing session allowed to reconnect") { - socket_provider->disconnect = false; - wait_for_upload(*realm); - realm->refresh(); + socket_provider->allow_reconnect(); + while (realm->has_pending_unuploaded_changes()) { + REQUIRE(sync::PendingResetStore::has_pending_reset(realm->read_group())); + realm->wait_for_change(); + realm->refresh(); + } REQUIRE_FALSE(sync::PendingResetStore::has_pending_reset(realm->read_group())); } SECTION("new session discovering the tracker when activating") { SyncSession::OnlyForTesting::pause_async(*realm->sync_session()).get(); - socket_provider->disconnect = false; + socket_provider->allow_reconnect(); realm->sync_session()->resume(); - wait_for_upload(*realm); - realm->refresh(); + while (realm->has_pending_unuploaded_changes()) { + REQUIRE(sync::PendingResetStore::has_pending_reset(realm->read_group())); + realm->wait_for_change(); + realm->refresh(); + } REQUIRE_FALSE(sync::PendingResetStore::has_pending_reset(realm->read_group())); } + + SECTION("tracker is removed before commits made after the reset are uploaded") { + // this creates interleaved commits and query updates, which means + // each changeset is sent in a separate UPLOAD and we're very likely + // to see the intermediate state with only some of them having been processed + subscribe_to_and_add_objects(realm, 50); + + socket_provider->allow_reconnect(); + while (sync::PendingResetStore::has_pending_reset(realm->read_group())) { + realm->wait_for_change(); + realm->refresh(); + } + + // Tracker should have been removed before we uploaded everything + REQUIRE(realm->has_pending_unuploaded_changes()); + } + } + + SECTION("Recover marks client reset as complete even if commits added after the client reset are invalid") { + config_local.sync_config->client_resync_mode = ClientResyncMode::Recover; + config_local.sync_config->notify_before_client_reset = [&](std::shared_ptr) { + socket_provider->disconnect(); + }; + auto&& [error_future, err_handler] = make_error_handler(); + config_local.sync_config->error_handler = err_handler; + + auto realm = Realm::get_shared_realm(config_local); + subscribe_to_and_add_objects(realm, 1); + wait_for_upload(*realm); + realm->sync_session()->pause(); + + DisableDevelopmentMode disable_development_mode(harness.session().app_session()); + + subscribe_to_and_add_objects(realm, 5); + reset_utils::trigger_client_reset(harness.session().app_session(), realm); + realm->sync_session()->resume(); + wait_for_download(*realm); // i.e. wait for the reset to complete + REQUIRE_FALSE(error_future.is_ready()); + + // Ensure that there's a query change before the bad changeset so that + // it doesn't get batched together with the recovered changesets + subscribe_to_and_add_objects(realm, 5); + // Make a schema change that the server will reject due to developer mode + // being disabled + realm->update_schema(make_additive_changes(schema)); + + REQUIRE(sync::PendingResetStore::has_pending_reset(realm->read_group())); + + socket_provider->allow_reconnect(); + wait_for_future(std::move(error_future)).get(); + + // Cycle detector should be removed because we successfully uploaded + // all of the recovered changesets and the next client reset will be + // a "new" one + realm->refresh(); + REQUIRE_FALSE(sync::PendingResetStore::has_pending_reset(realm->read_group())); } } diff --git a/test/object-store/util/sync/baas_admin_api.hpp b/test/object-store/util/sync/baas_admin_api.hpp index f1d075323ea..a1a1cc0c180 100644 --- a/test/object-store/util/sync/baas_admin_api.hpp +++ b/test/object-store/util/sync/baas_admin_api.hpp @@ -293,6 +293,22 @@ inline app::AppConfig get_config(Factory factory, const AppSession& app_session) "A device version", "A framework name", "A framework version", "A bundle id"}}; } +class DisableDevelopmentMode { +public: + DisableDevelopmentMode(const AppSession& session) + : m_session(session) + { + session.admin_api.set_development_mode_to(session.server_app_id, false); + } + ~DisableDevelopmentMode() + { + m_session.admin_api.set_development_mode_to(m_session.server_app_id, true); + } + +private: + const AppSession& m_session; +}; + } // namespace realm #endif // REALM_ENABLE_AUTH_TESTS diff --git a/test/test_client_reset.cpp b/test/test_client_reset.cpp index 24c71a87836..e7bcc0490b7 100644 --- a/test/test_client_reset.cpp +++ b/test/test_client_reset.cpp @@ -937,35 +937,6 @@ void expect_reset(unit_test::TestContext& test_context, DBRef& target, DBRef& fr } } -TEST(ClientReset_ConvertResyncMode) -{ - CHECK(PendingResetStore::to_resync_mode(0) == ClientResyncMode::DiscardLocal); - CHECK(PendingResetStore::to_resync_mode(1) == ClientResyncMode::Recover); - CHECK_THROW(PendingResetStore::to_resync_mode(2), sync::ClientResetFailed); - - CHECK(PendingResetStore::from_resync_mode(ClientResyncMode::DiscardLocal) == 0); - CHECK(PendingResetStore::from_resync_mode(ClientResyncMode::RecoverOrDiscard) == 1); - CHECK(PendingResetStore::from_resync_mode(ClientResyncMode::Recover) == 1); - CHECK_THROW(PendingResetStore::from_resync_mode(ClientResyncMode::Manual), sync::ClientResetFailed); -} - -TEST(ClientReset_ConvertResetAction) -{ - CHECK(PendingResetStore::to_reset_action(0) == sync::ProtocolErrorInfo::Action::NoAction); - CHECK(PendingResetStore::to_reset_action(1) == sync::ProtocolErrorInfo::Action::ClientReset); - CHECK(PendingResetStore::to_reset_action(2) == sync::ProtocolErrorInfo::Action::ClientResetNoRecovery); - CHECK(PendingResetStore::to_reset_action(3) == sync::ProtocolErrorInfo::Action::MigrateToFLX); - CHECK(PendingResetStore::to_reset_action(4) == sync::ProtocolErrorInfo::Action::RevertToPBS); - CHECK(PendingResetStore::to_reset_action(5) == sync::ProtocolErrorInfo::Action::NoAction); - - CHECK(PendingResetStore::from_reset_action(sync::ProtocolErrorInfo::Action::ClientReset) == 1); - CHECK(PendingResetStore::from_reset_action(sync::ProtocolErrorInfo::Action::ClientResetNoRecovery) == 2); - CHECK(PendingResetStore::from_reset_action(sync::ProtocolErrorInfo::Action::MigrateToFLX) == 3); - CHECK(PendingResetStore::from_reset_action(sync::ProtocolErrorInfo::Action::RevertToPBS) == 4); - CHECK_THROW(PendingResetStore::from_reset_action(sync::ProtocolErrorInfo::Action::MigrateSchema), - sync::ClientResetFailed); -} - TEST_TYPES( ClientReset_TrackReset, std::integral_constant,