From 3b2010491b5006fc2b20a03cb3b721fc6b46c740 Mon Sep 17 00:00:00 2001 From: Daniel Tabacaru <96778637+danieltabacaru@users.noreply.github.com> Date: Thu, 9 Nov 2023 09:52:27 +0100 Subject: [PATCH] Handle client resets during schema migrations + integration tests (#7106) * Add option to use draft deployments when creating a schema for baas tests * Track sync schema migration between sessions + minor refactoring * Integration tests * Use Transaction refs instead of shared pointers --- src/realm/exceptions.cpp | 1 + src/realm/exceptions.hpp | 8 + .../object-store/sync/async_open_task.cpp | 42 +- .../object-store/sync/async_open_task.hpp | 1 - src/realm/object-store/sync/sync_session.cpp | 42 +- src/realm/object-store/sync/sync_session.hpp | 8 +- src/realm/sync/CMakeLists.txt | 2 + src/realm/sync/noinst/client_impl_base.cpp | 5 +- src/realm/sync/noinst/protocol_codec.hpp | 10 + .../sync/noinst/sync_schema_migration.cpp | 109 +++ .../sync/noinst/sync_schema_migration.hpp | 33 + src/realm/sync/protocol.hpp | 1 + test/object-store/CMakeLists.txt | 1 + .../sync/flx_schema_migration.cpp | 887 ++++++++++++++++++ .../object-store/util/sync/baas_admin_api.cpp | 109 ++- .../object-store/util/sync/baas_admin_api.hpp | 1 + .../util/sync/flx_sync_harness.hpp | 12 +- test/object-store/util/test_file.cpp | 2 +- 18 files changed, 1198 insertions(+), 76 deletions(-) create mode 100644 src/realm/sync/noinst/sync_schema_migration.cpp create mode 100644 src/realm/sync/noinst/sync_schema_migration.hpp create mode 100644 test/object-store/sync/flx_schema_migration.cpp diff --git a/src/realm/exceptions.cpp b/src/realm/exceptions.cpp index 57528c79dac..966572fe3f5 100644 --- a/src/realm/exceptions.cpp +++ b/src/realm/exceptions.cpp @@ -166,6 +166,7 @@ InvalidTableRef::~InvalidTableRef() noexcept = default; SerializationError::~SerializationError() noexcept = default; NotImplemented::~NotImplemented() noexcept = default; MigrationFailed::~MigrationFailed() noexcept = default; +SyncSchemaMigrationFailed::~SyncSchemaMigrationFailed() noexcept = default; ObjectAlreadyExists::~ObjectAlreadyExists() noexcept = default; CrossTableLinkTarget::~CrossTableLinkTarget() noexcept = default; SystemError::~SystemError() noexcept = default; diff --git a/src/realm/exceptions.hpp b/src/realm/exceptions.hpp index 7521c40e77c..3ac43305e6f 100644 --- a/src/realm/exceptions.hpp +++ b/src/realm/exceptions.hpp @@ -306,6 +306,14 @@ struct MigrationFailed : LogicError { ~MigrationFailed() noexcept override; }; +struct SyncSchemaMigrationFailed : LogicError { + SyncSchemaMigrationFailed(std::string_view msg) + : LogicError(ErrorCodes::SyncSchemaMigrationError, msg) + { + } + ~SyncSchemaMigrationFailed() noexcept override; +}; + struct ObjectAlreadyExists : RuntimeError { template ObjectAlreadyExists(const U& object_type, T pk_val) diff --git a/src/realm/object-store/sync/async_open_task.cpp b/src/realm/object-store/sync/async_open_task.cpp index 5ff79181c1f..6c6f459e8be 100644 --- a/src/realm/object-store/sync/async_open_task.cpp +++ b/src/realm/object-store/sync/async_open_task.cpp @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -61,15 +62,6 @@ void AsyncOpenTask::start(AsyncOpenCallback callback) self->migrate_schema_or_complete(std::move(callback), coordinator, status); }); - // The callback does not extend the lifetime of the task if it's never invoked. - SyncSession::Internal::set_sync_schema_migration_callback(*session, [weak_self = weak_from_this(), this]() { - if (auto self = weak_self.lock()) { - util::CheckedLockGuard lock(m_mutex); - if (!m_session) - return; - m_sync_schema_migration_required = true; - } - }); session->revive_if_needed(); } @@ -128,7 +120,13 @@ void AsyncOpenTask::attach_to_subscription_initializer(AsyncOpenCallback&& callb // If the same Realm file is already opened, there is the possibility that this code may wait on a subscription // that was not committed by init_subscription_initializer. - auto shared_realm = coordinator->get_realm(nullptr, m_db_first_open); + SharedRealm shared_realm; + try { + shared_realm = coordinator->get_realm(nullptr, m_db_first_open); + } + catch (...) { + return callback({}, std::current_exception()); + } const auto init_subscription = shared_realm->get_latest_subscription_set(); const auto sub_state = init_subscription.state(); @@ -181,18 +179,32 @@ void AsyncOpenTask::migrate_schema_or_complete(AsyncOpenCallback&& callback, if (!m_session) return; auto session = m_session; - auto migrate_schema = m_sync_schema_migration_required; lock.unlock(); - if (!migrate_schema) { + auto pending_migration = [&] { + auto rt = coordinator->begin_read(); + return _impl::sync_schema_migration::has_pending_migration(*rt); + }(); + + if (!pending_migration) { wait_for_bootstrap_or_complete(std::move(callback), coordinator, status); return; } + // Sync schema migrations require setting a subscription initializer callback to bootstrap the data. The + // subscriptions in the current realm file may not be compatible with the new schema so cannot rely on them. + auto config = coordinator->get_config(); + if (!config.sync_config->subscription_initializer) { + status = Status(ErrorCodes::SyncSchemaMigrationError, + "Sync schema migrations must provide a subscription initializer callback in the sync config"); + async_open_complete(std::move(callback), coordinator, status); + return; + } + // Migrate the schema. // * First upload the changes at the old schema version - // * Then delete the realm, reopen it, and rebootstrap at new schema version - // The lifetime of the task is extended until the bootstrap completes. + // * Then delete the realm, reopen it, and bootstrap at new schema version + // The lifetime of the task is extended until bootstrap completes. std::shared_ptr self(shared_from_this()); session->wait_for_upload_completion([callback = std::move(callback), coordinator, session, self, this](Status status) mutable { @@ -240,7 +252,7 @@ void AsyncOpenTask::wait_for_bootstrap_or_complete(AsyncOpenCallback&& callback, { auto config = coordinator->get_config(); if (config.sync_config && config.sync_config->flx_sync_requested && - config.sync_config->subscription_initializer) { + config.sync_config->subscription_initializer && status.is_ok()) { const bool rerun_on_launch = config.sync_config->rerun_init_subscription_on_open; attach_to_subscription_initializer(std::move(callback), coordinator, rerun_on_launch); } diff --git a/src/realm/object-store/sync/async_open_task.hpp b/src/realm/object-store/sync/async_open_task.hpp index 99df1e0e797..31f9798cd88 100644 --- a/src/realm/object-store/sync/async_open_task.hpp +++ b/src/realm/object-store/sync/async_open_task.hpp @@ -72,7 +72,6 @@ class AsyncOpenTask : public std::enable_shared_from_this { std::vector m_registered_callbacks GUARDED_BY(m_mutex); mutable util::CheckedMutex m_mutex; const bool m_db_first_open; - bool m_sync_schema_migration_required GUARDED_BY(m_mutex) = false; }; } // namespace realm diff --git a/src/realm/object-store/sync/sync_session.cpp b/src/realm/object-store/sync/sync_session.cpp index d28227e1be2..e0d613f98a9 100644 --- a/src/realm/object-store/sync/sync_session.cpp +++ b/src/realm/object-store/sync/sync_session.cpp @@ -36,6 +36,7 @@ #include #include #include +#include #include using namespace realm; @@ -366,6 +367,7 @@ SyncSession::SyncSession(SyncClient& client, std::shared_ptr db, const Realm , m_migration_store{sync::MigrationStore::create(m_db)} , m_client(client) , m_sync_manager(sync_manager) + , m_previous_schema_version(_impl::sync_schema_migration::has_pending_migration(*m_db->start_read())) { REALM_ASSERT(m_config.sync_config); // we don't want the following configs enabled during a client reset @@ -497,6 +499,10 @@ void SyncSession::download_fresh_realm(sync::ProtocolErrorInfo::Action server_re // deep copy the sync config so we don't modify the live session's config config.sync_config = std::make_shared(fresh_config); config.sync_config->client_resync_mode = ClientResyncMode::Manual; + // Do not run the subscription initializer on fresh realms used in client resets. + config.sync_config->rerun_init_subscription_on_open = false; + config.sync_config->subscription_initializer = nullptr; + config.schema_version = m_previous_schema_version.value_or(m_config.schema_version); fresh_sync_session = m_sync_manager->get_session(db, config); auto& history = static_cast(*db->get_replication()); // the fresh Realm may apply writes to this db after it has outlived its sync session @@ -703,7 +709,7 @@ void SyncSession::handle_error(sync::SessionErrorInfo error) [[fallthrough]]; case ClientResyncMode::Recover: download_fresh_realm(error.server_requests_action); - return; // do not propgate the error to the user at this point + return; // do not propagate the error to the user at this point } break; case sync::ProtocolErrorInfo::Action::MigrateToFLX: @@ -747,15 +753,11 @@ void SyncSession::handle_error(sync::SessionErrorInfo error) log_out_user = true; break; case sync::ProtocolErrorInfo::Action::MigrateSchema: - std::function callback; - { - util::CheckedLockGuard l(m_state_mutex); - callback = std::move(m_sync_schema_migration_callback); - } - if (callback) { - callback(); - } - return; // do not propgate the error to the user at this point + util::CheckedUniqueLock lock(m_state_mutex); + // Should only be received for FLX sync. + REALM_ASSERT(m_original_sync_config->flx_sync_requested); + m_previous_schema_version = error.previous_schema_version; + return; // do not propagate the error to the user at this point } } else { @@ -837,7 +839,8 @@ void SyncSession::handle_progress_update(uint64_t downloaded, uint64_t downloada static sync::Session::Config::ClientReset make_client_reset_config(const RealmConfig& base_config, const std::shared_ptr& sync_config, - DBRef&& fresh_copy, bool recovery_is_allowed) + DBRef&& fresh_copy, bool recovery_is_allowed, + bool schema_migration_detected) { REALM_ASSERT(sync_config->client_resync_mode != ClientResyncMode::Manual); @@ -852,6 +855,11 @@ static sync::Session::Config::ClientReset make_client_reset_config(const RealmCo if (!sync_config->notify_before_client_reset && !sync_config->notify_after_client_reset) return config; + // Do not run the client reset callbacks in case of a sync schema migration. + // (opening the realm with the new schema will result in a crash due to breaking changes) + if (schema_migration_detected) + return config; + RealmConfig realm_config = base_config; realm_config.sync_config = std::make_shared(*sync_config); // deep copy realm_config.scheduler = util::Scheduler::make_dummy(); @@ -942,8 +950,10 @@ void SyncSession::create_sync_session() m_server_requests_action == sync::ProtocolErrorInfo::Action::MigrateToFLX || m_server_requests_action == sync::ProtocolErrorInfo::Action::RevertToPBS; // Use the original sync config, not the updated one from the migration store - session_config.client_reset_config = make_client_reset_config( - m_config, m_original_sync_config, std::move(m_client_reset_fresh_copy), allowed_to_recover); + session_config.client_reset_config = + make_client_reset_config(m_config, m_original_sync_config, std::move(m_client_reset_fresh_copy), + allowed_to_recover, m_previous_schema_version.has_value()); + session_config.schema_version = m_previous_schema_version.value_or(m_config.schema_version); m_server_requests_action = sync::ProtocolErrorInfo::Action::NoAction; } @@ -998,12 +1008,6 @@ void SyncSession::create_sync_session() }); } -void SyncSession::set_sync_schema_migration_callback(std::function&& callback) -{ - util::CheckedLockGuard l(m_state_mutex); - m_sync_schema_migration_callback = std::move(callback); -} - void SyncSession::nonsync_transact_notify(sync::version_type version) { m_progress_notifier.set_local_version(version); diff --git a/src/realm/object-store/sync/sync_session.hpp b/src/realm/object-store/sync/sync_session.hpp index 573d4ade22d..b266e710158 100644 --- a/src/realm/object-store/sync/sync_session.hpp +++ b/src/realm/object-store/sync/sync_session.hpp @@ -286,11 +286,6 @@ class SyncSession : public std::enable_shared_from_this { return session.m_db; } - static void set_sync_schema_migration_callback(SyncSession& session, std::function&& callback) - { - session.set_sync_schema_migration_callback(std::move(callback)); - } - static util::Future pause_async(SyncSession& session); }; @@ -504,6 +499,9 @@ class SyncSession : public std::enable_shared_from_this { mutable util::CheckedMutex m_external_reference_mutex; class ExternalReference; std::weak_ptr m_external_reference GUARDED_BY(m_external_reference_mutex); + + // Set if ProtocolError::schema_version_changed error is received from the server. + std::optional m_previous_schema_version GUARDED_BY(m_state_mutex); }; } // namespace realm diff --git a/src/realm/sync/CMakeLists.txt b/src/realm/sync/CMakeLists.txt index 38d0f98065a..afa711d9e04 100644 --- a/src/realm/sync/CMakeLists.txt +++ b/src/realm/sync/CMakeLists.txt @@ -11,6 +11,7 @@ set(SYNC_SOURCES noinst/pending_bootstrap_store.cpp noinst/protocol_codec.cpp noinst/sync_metadata_schema.cpp + noinst/sync_schema_migration.cpp changeset_encoder.cpp changeset_parser.cpp changeset.cpp @@ -79,6 +80,7 @@ set(NOINST_HEADERS noinst/protocol_codec.hpp noinst/root_certs.hpp noinst/sync_metadata_schema.hpp + noinst/sync_schema_migration.hpp ) set(SYNC_HEADERS ${IMPL_INSTALL_HEADESR} diff --git a/src/realm/sync/noinst/client_impl_base.cpp b/src/realm/sync/noinst/client_impl_base.cpp index 59cb883cd10..cd62e2ed84b 100644 --- a/src/realm/sync/noinst/client_impl_base.cpp +++ b/src/realm/sync/noinst/client_impl_base.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -2544,7 +2545,9 @@ Status Session::receive_error_message(const ProtocolErrorInfo& info) if (protocol_error == ProtocolError::schema_version_changed) { // Enable upload immediately if the session is still active. if (m_state == Active) { - m_allow_upload = true; + auto wt = get_db()->start_write(); + _impl::sync_schema_migration::track_sync_schema_migration(*wt, *info.previous_schema_version); + wt->commit(); // Notify SyncSession a schema migration is required. on_connection_state_changed(m_conn.get_state(), SessionErrorInfo{info}); } diff --git a/src/realm/sync/noinst/protocol_codec.hpp b/src/realm/sync/noinst/protocol_codec.hpp index f60818ffd70..9913aa58bf2 100644 --- a/src/realm/sync/noinst/protocol_codec.hpp +++ b/src/realm/sync/noinst/protocol_codec.hpp @@ -284,6 +284,16 @@ class ClientProtocol { info.migration_query_string.emplace(query_string->get()); } + if (info.raw_error_code == static_cast(sync::ProtocolError::schema_version_changed)) { + auto schema_version = json.find("previousSchemaVersion"); + if (schema_version == json.end() || !schema_version->is_number_unsigned()) { + return report_error( + "Missing/invalid previous schema version in schema migration error response"); + } + + info.previous_schema_version.emplace(schema_version->get()); + } + if (auto rejected_updates = json.find("rejectedUpdates"); rejected_updates != json.end()) { if (!rejected_updates->is_array()) { return report_error( diff --git a/src/realm/sync/noinst/sync_schema_migration.cpp b/src/realm/sync/noinst/sync_schema_migration.cpp new file mode 100644 index 00000000000..423c609a5bc --- /dev/null +++ b/src/realm/sync/noinst/sync_schema_migration.cpp @@ -0,0 +1,109 @@ +/************************************************************************* + * + * Copyright 2023 Realm, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **************************************************************************/ + +#include + +#include + +#include + +using namespace realm; +using namespace _impl; + +namespace realm::_impl::sync_schema_migration { + +// A table without a "class_" prefix will not generate sync instructions. +constexpr static std::string_view s_meta_schema_migration_table_name("schema_migration_metadata"); +constexpr static std::string_view s_pk_col_name("id"); +constexpr static std::string_view s_version_column_name("version"); +constexpr static std::string_view s_timestamp_col_name("event_time"); +constexpr static std::string_view s_previous_schema_version_col_name("previous_schema_version"); +constexpr int64_t metadata_version = 1; + +std::optional has_pending_migration(const Transaction& rt) +{ + ConstTableRef table = rt.get_table(s_meta_schema_migration_table_name); + if (!table || table->size() == 0) { + return none; + } + ColKey timestamp_col = table->get_column_key(s_timestamp_col_name); + ColKey version_col = table->get_column_key(s_version_column_name); + ColKey previous_schema_version_col = table->get_column_key(s_previous_schema_version_col_name); + REALM_ASSERT(timestamp_col); + REALM_ASSERT(version_col); + REALM_ASSERT(previous_schema_version_col); + + Obj first = *table->begin(); + REALM_ASSERT(first); + auto version = first.get(version_col); + auto time = first.get(timestamp_col); + if (version != metadata_version) { + throw SyncSchemaMigrationFailed( + util::format("Unsupported sync schema migration metadata version: %1 vs %2, from %3", version, + metadata_version, time)); + } + return first.get(previous_schema_version_col); +} + +void track_sync_schema_migration(Transaction& wt, uint64_t previous_schema_version) +{ + TableRef table = wt.get_table(s_meta_schema_migration_table_name); + ColKey version_col, timestamp_col, previous_schema_version_col; + if (!table) { + table = wt.add_table_with_primary_key(s_meta_schema_migration_table_name, type_ObjectId, s_pk_col_name); + REALM_ASSERT(table); + version_col = table->add_column(type_Int, s_version_column_name); + timestamp_col = table->add_column(type_Timestamp, s_timestamp_col_name); + previous_schema_version_col = table->add_column(type_Int, s_previous_schema_version_col_name); + } + else { + version_col = table->get_column_key(s_version_column_name); + timestamp_col = table->get_column_key(s_timestamp_col_name); + previous_schema_version_col = table->get_column_key(s_previous_schema_version_col_name); + } + REALM_ASSERT(version_col); + REALM_ASSERT(timestamp_col); + REALM_ASSERT(previous_schema_version_col); + + // A migration object may exist if the migration was started in a previous session. + if (table->is_empty()) { + table->create_object_with_primary_key(ObjectId::gen(), + {{version_col, metadata_version}, + {timestamp_col, Timestamp(std::chrono::system_clock::now())}, + {previous_schema_version_col, int64_t(previous_schema_version)}}); + } + else { + auto first = *table->begin(); + auto version = first.get(version_col); + auto time = first.get(timestamp_col); + if (version != metadata_version) { + throw SyncSchemaMigrationFailed( + util::format("Unsupported sync schema migration metadata version: %1 vs %2, from %3", version, + metadata_version, time)); + } + uint64_t schema_version = first.get(previous_schema_version_col); + if (schema_version != previous_schema_version) { + throw SyncSchemaMigrationFailed( + util::format("Cannot continue sync schema migration with different previous schema version (existing " + "previous_schema_version=%1, new previous_schema_version=%2)", + schema_version, previous_schema_version)); + } + } +} + +} // namespace realm::_impl::sync_schema_migration \ No newline at end of file diff --git a/src/realm/sync/noinst/sync_schema_migration.hpp b/src/realm/sync/noinst/sync_schema_migration.hpp new file mode 100644 index 00000000000..8ae60e385b4 --- /dev/null +++ b/src/realm/sync/noinst/sync_schema_migration.hpp @@ -0,0 +1,33 @@ +/************************************************************************* + * + * Copyright 2023 Realm, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **************************************************************************/ + +#pragma once + +#include + +#include + +namespace realm { +namespace _impl::sync_schema_migration { + +std::optional has_pending_migration(const Transaction& rt); + +void track_sync_schema_migration(Transaction& wt, uint64_t previous_schema_version); + +} // namespace _impl::sync_schema_migration +} // namespace realm \ No newline at end of file diff --git a/src/realm/sync/protocol.hpp b/src/realm/sync/protocol.hpp index 7f4009079c1..7f261f36903 100644 --- a/src/realm/sync/protocol.hpp +++ b/src/realm/sync/protocol.hpp @@ -296,6 +296,7 @@ struct ProtocolErrorInfo { std::optional resumption_delay_interval; Action server_requests_action; std::optional migration_query_string; + std::optional previous_schema_version; }; diff --git a/test/object-store/CMakeLists.txt b/test/object-store/CMakeLists.txt index d29e960c645..aac511f7428 100644 --- a/test/object-store/CMakeLists.txt +++ b/test/object-store/CMakeLists.txt @@ -60,6 +60,7 @@ if(REALM_ENABLE_SYNC) sync/client_reset.cpp sync/file.cpp sync/flx_migration.cpp + sync/flx_schema_migration.cpp sync/flx_sync.cpp sync/metadata.cpp sync/migration_store_test.cpp diff --git a/test/object-store/sync/flx_schema_migration.cpp b/test/object-store/sync/flx_schema_migration.cpp new file mode 100644 index 00000000000..868331dbc08 --- /dev/null +++ b/test/object-store/sync/flx_schema_migration.cpp @@ -0,0 +1,887 @@ +//////////////////////////////////////////////////////////////////////////// +// +// Copyright 2023 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// + +#if REALM_ENABLE_SYNC +#if REALM_ENABLE_AUTH_TESTS + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include + +#include + +#include + +#include + +using namespace std::string_literals; +using namespace realm::sync; + +namespace realm::app { + +namespace { + +void create_schema(const AppSession& app_session, std::shared_ptr user, Schema target_schema, + int64_t target_schema_version) +{ + auto create_config = app_session.config; + create_config.schema = target_schema; + app_session.admin_api.create_schema(app_session.server_app_id, create_config); + + auto remote_client = user->mongo_client("BackingDB"); + auto db = remote_client.db("app"); + auto settings = db["schema_history"]; + + timed_sleeping_wait_for( + [&] { + bson::BsonDocument filter_doc{{"app_id", ObjectId(app_session.server_app_id)}, + {"version", target_schema_version - 1}}; + bool found = false; + settings.find_one(filter_doc, + [&](util::Optional document, util::Optional error) { + REQUIRE_FALSE(error); + found = document.has_value(); + }); + return found; + }, + std::chrono::minutes(5), std::chrono::milliseconds(500)); +} + +std::pair async_open_realm(const Realm::Config& config) +{ + std::mutex mutex; + ThreadSafeReference realm_ref; + std::exception_ptr error; + auto task = Realm::get_synchronized_realm(config); + task->start([&](ThreadSafeReference&& ref, std::exception_ptr e) { + std::lock_guard lock(mutex); + realm_ref = std::move(ref); + error = e; + }); + util::EventLoop::main().run_until([&] { + std::lock_guard lock(mutex); + return realm_ref || error; + }); + auto realm = error ? nullptr : Realm::get_shared_realm(std::move(realm_ref)); + return std::pair(realm, error); +} + +std::vector get_schema_v0() +{ + return { + {"TopLevel", + {{"_id", PropertyType::ObjectId, Property::IsPrimary{true}}, + {"queryable_str_field", PropertyType::String | PropertyType::Nullable}, + {"queryable_int_field", PropertyType::Int | PropertyType::Nullable}, + {"non_queryable_field", PropertyType::String | PropertyType::Nullable}, + {"non_queryable_field2", PropertyType::String}}}, + {"TopLevel2", + {{"_id", PropertyType::ObjectId, Property::IsPrimary{true}}, + {"queryable_str_field", PropertyType::String | PropertyType::Nullable}, + {"queryable_int_field", PropertyType::Int | PropertyType::Nullable}, + {"non_queryable_field", PropertyType::String | PropertyType::Nullable}}}, + {"TopLevel3", + {{"_id", PropertyType::ObjectId, Property::IsPrimary{true}}, {"queryable_int_field", PropertyType::Int}}}, + }; +} + +auto get_subscription_initializer_callback_for_schema_v0() +{ + return [](std::shared_ptr realm) mutable { + REQUIRE(realm); + auto table = realm->read_group().get_table("class_TopLevel"); + auto col_key = table->get_column_key("queryable_int_field"); + auto query = Query(table).greater_equal(col_key, int64_t(0)); + auto table2 = realm->read_group().get_table("class_TopLevel2"); + Query query2(table2); + table = realm->read_group().get_table("class_TopLevel3"); + col_key = table->get_column_key("queryable_int_field"); + auto query3 = Query(table).greater_equal(col_key, int64_t(0)); + auto subs = realm->get_latest_subscription_set().make_mutable_copy(); + subs.clear(); + subs.insert_or_assign(query); + subs.insert_or_assign(query2); + subs.insert_or_assign(query3); + subs.commit(); + }; +} + +// The following breaking changes are applied to schema at v0: +// * Table 'TopLevel2' is removed +// * Field 'queryable_str_field' in table 'TopLevel' is removed (the user does not query on it) +// * Field 'non_queryable_field' in table 'TopLevel' is marked required +// * Field 'non_queryable_field2' in table 'TopLevel' is marked optional +// * Filed 'queryable_int_field' in table 'TopLevel3' is removed (the user queries on it) +std::vector get_schema_v1() +{ + return { + {"TopLevel", + {{"_id", PropertyType::ObjectId, Property::IsPrimary{true}}, + {"queryable_int_field", PropertyType::Int | PropertyType::Nullable}, + {"non_queryable_field", PropertyType::String}, + {"non_queryable_field2", PropertyType::String | PropertyType::Nullable}}}, + {"TopLevel3", {{"_id", PropertyType::ObjectId, Property::IsPrimary{true}}}}, + }; +} + +auto get_subscription_initializer_callback_for_schema_v1() +{ + return [](std::shared_ptr realm) mutable { + REQUIRE(realm); + auto table = realm->read_group().get_table("class_TopLevel"); + Query query(table); + table = realm->read_group().get_table("class_TopLevel3"); + Query query2(table); + auto subs = realm->get_latest_subscription_set().make_mutable_copy(); + subs.clear(); + subs.insert_or_assign(query); + subs.insert_or_assign(query2); + subs.commit(); + }; +} + +// The following breaking changes are applied to schema at v1: +// * Field 'queryable_int_field' in table 'TopLevel' is marked required +std::vector get_schema_v2() +{ + return { + {"TopLevel", + {{"_id", PropertyType::ObjectId, Property::IsPrimary{true}}, + {"queryable_int_field", PropertyType::Int}, + {"non_queryable_field", PropertyType::String}, + {"non_queryable_field2", PropertyType::String | PropertyType::Nullable}}}, + {"TopLevel3", {{"_id", PropertyType::ObjectId, Property::IsPrimary{true}}}}, + }; +} + +auto get_subscription_initializer_callback_for_schema_v2() +{ + return [](std::shared_ptr realm) mutable { + REQUIRE(realm); + auto table = realm->read_group().get_table("class_TopLevel"); + auto col_key = table->get_column_key("queryable_int_field"); + auto query = Query(table).greater_equal(col_key, int64_t(5)); + table = realm->read_group().get_table("class_TopLevel3"); + Query query2(table); + auto subs = realm->get_latest_subscription_set().make_mutable_copy(); + subs.clear(); + subs.insert_or_assign(query); + subs.insert_or_assign(query2); + subs.commit(); + }; +} + +// Sort 'computed_properties' and 'persisted_properties'. +ObjectSchema sort_schema_properties(const ObjectSchema& schema) +{ + ObjectSchema target_schema = schema; + auto predicate = [](const Property& a, const Property& b) { + return a.name < b.name; + }; + std::vector persisted_properties = schema.persisted_properties; + std::sort(std::begin(persisted_properties), std::end(persisted_properties), predicate); + target_schema.persisted_properties = persisted_properties; + std::vector computed_properties = schema.computed_properties; + std::sort(std::begin(computed_properties), std::end(computed_properties), predicate); + target_schema.computed_properties = computed_properties; + return target_schema; +} + +// Check realm's schema and target_schema match. +void check_realm_schema(SharedRealm& realm, const std::vector& target_schema, + uint64_t target_schema_version) +{ + auto realm_schema = ObjectStore::schema_from_group(realm->read_group()); + CHECK(realm->schema_version() == target_schema_version); + CHECK(realm_schema.size() == target_schema.size()); + + for (auto& object : target_schema) { + auto it = realm_schema.find(object); + CHECK(it != realm_schema.end()); + auto target_object_schema = sort_schema_properties(object); + auto realm_object_schema = sort_schema_properties(*it); + CHECK(target_object_schema == realm_object_schema); + } +} + +} // namespace + +TEST_CASE("Sync schema migrations don't not work with sync open", "[sync][flx][flx schema migration][baas]") { + auto schema_v0 = get_schema_v0(); + FLXSyncTestHarness harness("flx_sync_schema_migration", + {schema_v0, {"queryable_str_field", "queryable_int_field"}}); + SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{}); + + // First open the realm at schema version 0. + { + auto realm = Realm::get_shared_realm(config); + subscribe_to_all_and_bootstrap(*realm); + wait_for_upload(*realm); + } + + const AppSession& app_session = harness.session().app_session(); + + // Bump the schema version. + config.schema_version = 1; + auto schema_v1 = schema_v0; + + SECTION("Breaking change detected by client") { + // Make field 'non_queryable_field2' of table 'TopLevel' optional. + schema_v1[0].persisted_properties.back() = {"non_queryable_field2", + PropertyType::String | PropertyType::Nullable}; + config.schema = schema_v1; + create_schema(app_session, harness.app()->current_user(), *config.schema, config.schema_version); + + REQUIRE_THROWS_AS(Realm::get_shared_realm(config), InvalidAdditiveSchemaChangeException); + } + + SECTION("Breaking change detected by server") { + // Remove table 'TopLevel2'. + schema_v1.pop_back(); + config.schema = schema_v1; + create_schema(app_session, harness.app()->current_user(), *config.schema, config.schema_version); + + config.sync_config->on_sync_client_event_hook = [&](std::weak_ptr weak_session, + const SyncClientHookData& data) mutable { + if (data.event != SyncClientHookEvent::ErrorMessageReceived) { + return SyncClientHookAction::NoAction; + } + auto session = weak_session.lock(); + REQUIRE(session); + + auto error_code = sync::ProtocolError(data.error_info->raw_error_code); + if (error_code == sync::ProtocolError::initial_sync_not_completed) { + return SyncClientHookAction::NoAction; + } + CHECK(error_code == sync::ProtocolError::schema_version_changed); + return SyncClientHookAction::NoAction; + }; + auto realm = Realm::get_shared_realm(config); + wait_for_download(*realm); + wait_for_upload(*realm); + + auto table = realm->read_group().get_table("class_TopLevel2"); + // Migration did not succeed because table 'TopLevel2' still exists (but there is no error). + CHECK(table); + } +} + +TEST_CASE("Cannot migrate schema to unknown version", "[sync][flx][flx schema migration][baas]") { + auto schema_v0 = get_schema_v0(); + FLXSyncTestHarness harness("flx_sync_schema_migration", + {schema_v0, {"queryable_str_field", "queryable_int_field"}}); + SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{}); + + const AppSession& app_session = harness.session().app_session(); + auto schema_v1 = get_schema_v1(); + + SECTION("Fresh realm") { + SECTION("No schema versions") { + } + + SECTION("Schema versions") { + create_schema(app_session, harness.app()->current_user(), schema_v1, 1); + } + } + + SECTION("Existing realm") { + // First open the realm at schema version 0. + { + auto realm = Realm::get_shared_realm(config); + subscribe_to_all_and_bootstrap(*realm); + wait_for_upload(*realm); + } + + SECTION("No schema versions") { + } + + SECTION("Schema versions") { + create_schema(app_session, harness.app()->current_user(), schema_v1, 1); + } + } + + // Bump the schema to a version the server does not know about. + config.schema_version = 42; + config.schema = schema_v0; + config.sync_config->error_handler = nullptr; + config.sync_config->on_sync_client_event_hook = [](std::weak_ptr weak_session, + const SyncClientHookData& data) mutable { + if (data.event != SyncClientHookEvent::ErrorMessageReceived) { + return SyncClientHookAction::NoAction; + } + auto session = weak_session.lock(); + REQUIRE(session); + + auto error_code = sync::ProtocolError(data.error_info->raw_error_code); + if (error_code == sync::ProtocolError::initial_sync_not_completed) { + return SyncClientHookAction::NoAction; + } + + CHECK(error_code == sync::ProtocolError::bad_schema_version); + session->force_close(); + return SyncClientHookAction::NoAction; + }; + + auto [realm, error] = async_open_realm(config); + REQUIRE_FALSE(realm); + REQUIRE(error); + + // Update schema version to 0 and try again (the version now matches the actual schema). + config.schema_version = 0; + config.sync_config->on_sync_client_event_hook = nullptr; + std::tie(realm, error) = async_open_realm(config); + REQUIRE(realm); + REQUIRE_FALSE(error); + check_realm_schema(realm, schema_v0, 0); +} + +TEST_CASE("Schema version mismatch between client and server", "[sync][flx][flx schema migration][baas]") { + auto schema_v0 = get_schema_v0(); + FLXSyncTestHarness harness("flx_sync_schema_migration", + {schema_v0, {"queryable_str_field", "queryable_int_field"}}); + SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{}); + + const AppSession& app_session = harness.session().app_session(); + auto schema_v1 = get_schema_v1(); + create_schema(app_session, harness.app()->current_user(), schema_v1, 1); + + bool schema_migration_expected = false; + + SECTION("Fresh realm") { + schema_migration_expected = false; + } + + SECTION("Existing realm") { + auto realm = Realm::get_shared_realm(config); + subscribe_to_all_and_bootstrap(*realm); + wait_for_upload(*realm); + + schema_migration_expected = true; + + SECTION("Realm already on the latest schema version") { + DBOptions options; + options.encryption_key = test_util::crypt_key(); + auto db = DB::create(sync::make_client_replication(), config.path, options); + auto tr = db->start_write(); + ObjectStore::set_schema_version(*tr, 1); + tr->commit(); + auto schema_version = ObjectStore::get_schema_version(*db->start_read()); + CHECK(schema_version == 1); + } + SECTION("Open realm with the lastest schema version for the first time") { + } + } + + config.schema_version = 1; + config.schema = schema_v0; + + auto schema_migration_required = false; + config.sync_config->subscription_initializer = get_subscription_initializer_callback_for_schema_v0(); + config.sync_config->error_handler = nullptr; + config.sync_config->on_sync_client_event_hook = + [&schema_migration_required](std::weak_ptr weak_session, + const SyncClientHookData& data) mutable { + if (data.event != SyncClientHookEvent::ErrorMessageReceived) { + return SyncClientHookAction::NoAction; + } + auto session = weak_session.lock(); + REQUIRE(session); + + auto error_code = sync::ProtocolError(data.error_info->raw_error_code); + if (error_code != sync::ProtocolError::schema_version_changed) { + return SyncClientHookAction::NoAction; + } + schema_migration_required = true; + return SyncClientHookAction::NoAction; + }; + + auto [realm, error] = async_open_realm(config); + REQUIRE_FALSE(realm); + REQUIRE(error); + REQUIRE(schema_migration_expected == schema_migration_required); +} + +TEST_CASE("Fresh realm does not require schema migration", "[sync][flx][flx schema migration][baas]") { + auto schema_v0 = get_schema_v0(); + FLXSyncTestHarness harness("flx_sync_schema_migration", + {schema_v0, {"queryable_str_field", "queryable_int_field"}}); + SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{}); + + const AppSession& app_session = harness.session().app_session(); + auto schema_v1 = get_schema_v1(); + create_schema(app_session, harness.app()->current_user(), schema_v1, 1); + + config.schema_version = 1; + config.schema = schema_v1; + config.sync_config->subscription_initializer = get_subscription_initializer_callback_for_schema_v1(); + config.sync_config->on_sync_client_event_hook = [](std::weak_ptr weak_session, + const SyncClientHookData& data) mutable { + if (data.event != SyncClientHookEvent::ErrorMessageReceived) { + return SyncClientHookAction::NoAction; + } + auto session = weak_session.lock(); + REQUIRE(session); + + auto error_code = sync::ProtocolError(data.error_info->raw_error_code); + CHECK(error_code == sync::ProtocolError::initial_sync_not_completed); + return SyncClientHookAction::NoAction; + }; + + auto [realm, error] = async_open_realm(config); + REQUIRE(realm); + REQUIRE_FALSE(error); + check_realm_schema(realm, schema_v1, 1); +} + +TEST_CASE("Upgrade schema version (with recovery) then downgrade", "[sync][flx][flx schema migration][baas]") { + auto schema_v0 = get_schema_v0(); + FLXSyncTestHarness harness("flx_sync_schema_migration", + {schema_v0, {"queryable_str_field", "queryable_int_field"}}); + SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{}); + + { + config.sync_config->subscription_initializer = get_subscription_initializer_callback_for_schema_v0(); + auto realm = Realm::get_shared_realm(config); + wait_for_download(*realm); + wait_for_upload(*realm); + check_realm_schema(realm, schema_v0, 0); + + realm->sync_session()->pause(); + + // Subscription to recover when upgrading the schema. + auto subs = realm->get_latest_subscription_set().make_mutable_copy(); + CHECK(subs.erase_by_class_name("TopLevel2")); + auto table = realm->read_group().get_table("class_TopLevel2"); + auto col_key = table->get_column_key("queryable_int_field"); + auto query = Query(table).greater_equal(col_key, int64_t(0)); + subs.insert_or_assign(query); + subs.commit(); + + // Object to recover when upgrading the schema. + realm->begin_transaction(); + CppContext c(realm); + Object::create(c, realm, "TopLevel", + std::any(AnyDict{{"_id", ObjectId::gen()}, + {"queryable_str_field", "biz"s}, + {"queryable_int_field", static_cast(15)}, + {"non_queryable_field2", "non queryable 33"s}})); + realm->commit_transaction(); + // This server drops this object because the client is querying on a removed field. + realm->begin_transaction(); + Object::create( + c, realm, "TopLevel3", + std::any(AnyDict{{"_id", ObjectId::gen()}, {"queryable_int_field", static_cast(42)}})); + + realm->close(); + } + + auto obj3_id = ObjectId::gen(); + harness.load_initial_data([&](SharedRealm realm) { + CppContext c(realm); + Object::create(c, realm, "TopLevel", + std::any(AnyDict{{"_id", ObjectId::gen()}, + {"queryable_str_field", "foo"s}, + {"queryable_int_field", static_cast(5)}, + {"non_queryable_field", "non queryable 1"s}, + {"non_queryable_field2", "non queryable 11"s}})); + Object::create(c, realm, "TopLevel", + std::any(AnyDict{{"_id", ObjectId::gen()}, + {"queryable_str_field", "bar"s}, + {"queryable_int_field", static_cast(10)}, + {"non_queryable_field", "non queryable 2"s}, + {"non_queryable_field2", "non queryable 22"s}})); + Object::create(c, realm, "TopLevel2", + std::any(AnyDict{{"_id", ObjectId::gen()}, + {"queryable_str_field", "foo2"s}, + {"queryable_int_field", static_cast(10)}, + {"non_queryable_field", "non queryable 2"s}})); + Object::create(c, realm, "TopLevel3", + std::any(AnyDict{{"_id", obj3_id}, {"queryable_int_field", static_cast(10000)}})); + }); + + const AppSession& app_session = harness.session().app_session(); + auto schema_v1 = get_schema_v1(); + create_schema(app_session, harness.app()->current_user(), schema_v1, 1); + auto schema_v2 = get_schema_v2(); + create_schema(app_session, harness.app()->current_user(), schema_v2, 2); + + // First schema upgrade. + { + // Upgrade the schema version + config.schema_version = 1; + config.schema = schema_v1; + config.sync_config->subscription_initializer = nullptr; + + // Cannot migrate the schema without setting the subscription initializer callback. + auto [realm, error] = async_open_realm(config); + REQUIRE_FALSE(realm); + REQUIRE(error); + + // Retry migration with subscription initializer callback set. + config.sync_config->subscription_initializer = get_subscription_initializer_callback_for_schema_v1(); + std::tie(realm, error) = async_open_realm(config); + REQUIRE(realm); + REQUIRE_FALSE(error); + check_realm_schema(realm, schema_v1, 1); + + auto table = realm->read_group().get_table("class_TopLevel"); + CHECK(table->size() == 3); + table = realm->read_group().get_table("class_TopLevel3"); + CHECK(table->size() == 1); + CHECK(table->get_object_with_primary_key(obj3_id)); + + realm->begin_transaction(); + CppContext c(realm); + Object::create(c, realm, "TopLevel", + std::any(AnyDict{{"_id", ObjectId::gen()}, + {"queryable_int_field", static_cast(15)}, + {"non_queryable_field", "non queryable 4"s}, + {"non_queryable_field2", "non queryable 44"s}})); + realm->commit_transaction(); + + wait_for_upload(*realm); + wait_for_download(*realm); + } + + // Second schema upgrade. + { + config.schema_version = 2; + config.schema = schema_v2; + config.sync_config->subscription_initializer = get_subscription_initializer_callback_for_schema_v2(); + + auto [realm, error] = async_open_realm(config); + REQUIRE(realm); + REQUIRE_FALSE(error); + check_realm_schema(realm, schema_v2, 2); + + auto table = realm->read_group().get_table("class_TopLevel"); + CHECK(table->size() == 4); + table = realm->read_group().get_table("class_TopLevel3"); + CHECK(table->size() == 1); + CHECK(table->get_object_with_primary_key(obj3_id)); + } + + // First schema downgrade. + { + config.schema_version = 1; + config.schema = schema_v1; + config.sync_config->subscription_initializer = get_subscription_initializer_callback_for_schema_v1(); + + auto [realm, error] = async_open_realm(config); + REQUIRE(realm); + REQUIRE_FALSE(error); + check_realm_schema(realm, schema_v1, 1); + + auto table = realm->read_group().get_table("class_TopLevel"); + CHECK(table->size() == 4); + table = realm->read_group().get_table("class_TopLevel3"); + CHECK(table->size() == 1); + CHECK(table->get_object_with_primary_key(obj3_id)); + } + + // Second schema downgrade. + { + config.schema_version = 0; + config.schema = schema_v0; + config.sync_config->subscription_initializer = get_subscription_initializer_callback_for_schema_v0(); + + auto [realm, error] = async_open_realm(config); + REQUIRE(realm); + REQUIRE_FALSE(error); + check_realm_schema(realm, schema_v0, 0); + + auto table = realm->read_group().get_table("class_TopLevel"); + CHECK(table->size() == 4); + table = realm->read_group().get_table("class_TopLevel2"); + CHECK(table->is_empty()); + auto table3 = realm->read_group().get_table("class_TopLevel3"); + CHECK(table3->is_empty()); + + // The existing subscription for 'TopLevel3' is on a removed field (in version 1), so data cannot be sync'd. + // Update subscription so data can be sync'd. + auto subs = realm->get_latest_subscription_set().make_mutable_copy(); + CHECK(subs.erase_by_class_name("TopLevel3")); + subs.insert_or_assign(Query(table3)); + auto new_subs = subs.commit(); + new_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get(); + realm->refresh(); + CHECK(table3->size() == 1); + CHECK(table3->get_object_with_primary_key(obj3_id)); + } +} + +TEST_CASE("An interrupted schema migration can recover on the next session", + "[sync][flx][flx schema migration][baas]") { + auto schema_v0 = get_schema_v0(); + FLXSyncTestHarness harness("flx_sync_schema_migration", + {schema_v0, {"queryable_str_field", "queryable_int_field"}}); + SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{}); + + { + config.sync_config->subscription_initializer = get_subscription_initializer_callback_for_schema_v0(); + auto realm = Realm::get_shared_realm(config); + wait_for_download(*realm); + wait_for_upload(*realm); + check_realm_schema(realm, schema_v0, 0); + } + + const AppSession& app_session = harness.session().app_session(); + auto schema_v1 = get_schema_v1(); + create_schema(app_session, harness.app()->current_user(), schema_v1, 1); + + config.schema_version = 1; + config.schema = schema_v1; + auto bad_schema_version_count = 0; + config.sync_config->subscription_initializer = get_subscription_initializer_callback_for_schema_v1(); + config.sync_config->on_sync_client_event_hook = + [&bad_schema_version_count](std::weak_ptr weak_session, const SyncClientHookData& data) mutable { + if (data.event != SyncClientHookEvent::ErrorMessageReceived) { + return SyncClientHookAction::NoAction; + } + auto session = weak_session.lock(); + REQUIRE(session); + + auto error_code = sync::ProtocolError(data.error_info->raw_error_code); + if (error_code == sync::ProtocolError::initial_sync_not_completed) { + return SyncClientHookAction::NoAction; + } + + CHECK(error_code == sync::ProtocolError::schema_version_changed); + // Pause and resume the session the first time the a schema migration is required. + if (++bad_schema_version_count == 1) { + session->force_close(); + } + return SyncClientHookAction::NoAction; + }; + + auto [realm, error] = async_open_realm(config); + REQUIRE_FALSE(realm); + REQUIRE(error); + + // Retry the migration. + std::tie(realm, error) = async_open_realm(config); + REQUIRE(realm); + REQUIRE_FALSE(error); + REQUIRE(bad_schema_version_count == 2); + check_realm_schema(realm, schema_v1, 1); +} + +TEST_CASE("Migrate to new schema version with a schema subset", "[sync][flx][flx schema migration][baas]") { + auto schema_v0 = get_schema_v0(); + FLXSyncTestHarness harness("flx_sync_schema_migration", + {schema_v0, {"queryable_str_field", "queryable_int_field"}}); + SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{}); + + { + config.sync_config->subscription_initializer = get_subscription_initializer_callback_for_schema_v0(); + auto realm = Realm::get_shared_realm(config); + wait_for_download(*realm); + wait_for_upload(*realm); + check_realm_schema(realm, schema_v0, 0); + } + + const AppSession& app_session = harness.session().app_session(); + auto schema_v1 = get_schema_v1(); + create_schema(app_session, harness.app()->current_user(), schema_v1, 1); + + config.schema_version = 1; + auto schema_subset = schema_v1; + // One of the columns in 'TopLevel' is not needed by the user. + schema_subset[0].persisted_properties.pop_back(); + config.schema = schema_subset; + config.sync_config->subscription_initializer = get_subscription_initializer_callback_for_schema_v1(); + + auto [realm, error] = async_open_realm(config); + REQUIRE(realm); + REQUIRE_FALSE(error); + check_realm_schema(realm, schema_v1, 1); +} + +TEST_CASE("Client reset during schema migration", "[sync][flx][flx schema migration][baas]") { + auto schema_v0 = get_schema_v0(); + FLXSyncTestHarness harness("flx_sync_schema_migration", + {schema_v0, {"queryable_str_field", "queryable_int_field"}}); + SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{}); + + { + config.sync_config->subscription_initializer = get_subscription_initializer_callback_for_schema_v0(); + auto realm = Realm::get_shared_realm(config); + wait_for_download(*realm); + wait_for_upload(*realm); + check_realm_schema(realm, schema_v0, 0); + + realm->sync_session()->pause(); + + realm->begin_transaction(); + CppContext c(realm); + Object::create(c, realm, "TopLevel", + std::any(AnyDict{{"_id", ObjectId::gen()}, + {"queryable_str_field", "foo"s}, + {"queryable_int_field", static_cast(15)}, + {"non_queryable_field2", "non queryable 11"s}})); + Object::create( + c, realm, "TopLevel3", + std::any(AnyDict{{"_id", ObjectId::gen()}, {"queryable_int_field", static_cast(42)}})); + realm->commit_transaction(); + realm->close(); + } + + const AppSession& app_session = harness.session().app_session(); + auto schema_v1 = get_schema_v1(); + create_schema(app_session, harness.app()->current_user(), schema_v1, 1); + + config.schema_version = 1; + config.schema = schema_v1; + config.sync_config->subscription_initializer = get_subscription_initializer_callback_for_schema_v1(); + config.sync_config->client_resync_mode = ClientResyncMode::Recover; + config.sync_config->on_sync_client_event_hook = [&harness, schema_version_changed_count = + 0](std::weak_ptr weak_session, + const SyncClientHookData& data) mutable { + if (data.event != SyncClientHookEvent::ErrorMessageReceived) { + return SyncClientHookAction::NoAction; + } + auto session = weak_session.lock(); + REQUIRE(session); + + auto error_code = sync::ProtocolError(data.error_info->raw_error_code); + if (error_code == sync::ProtocolError::initial_sync_not_completed) { + return SyncClientHookAction::NoAction; + } + + if (error_code == sync::ProtocolError::schema_version_changed) { + ++schema_version_changed_count; + if (schema_version_changed_count == 1) { + reset_utils::trigger_client_reset(harness.session().app_session()); + } + } + + return SyncClientHookAction::NoAction; + }; + size_t before_reset_count = 0; + size_t after_reset_count = 0; + config.sync_config->notify_before_client_reset = [&before_reset_count](SharedRealm) { + ++before_reset_count; + }; + config.sync_config->notify_after_client_reset = [&after_reset_count](SharedRealm, ThreadSafeReference, bool) { + ++after_reset_count; + }; + + auto [realm, error] = async_open_realm(config); + REQUIRE(realm); + REQUIRE_FALSE(error); + REQUIRE(before_reset_count == 0); + REQUIRE(after_reset_count == 0); + check_realm_schema(realm, schema_v1, 1); + + auto table = realm->read_group().get_table("class_TopLevel"); + CHECK(table->size() == 1); + table = realm->read_group().get_table("class_TopLevel3"); + CHECK(table->is_empty()); +} + +TEST_CASE("Migrate to new schema version after migration to intermediate version is interrupted", + "[sync][flx][flx schema migration][baas]") { + auto schema_v0 = get_schema_v0(); + FLXSyncTestHarness harness("flx_sync_schema_migration", + {schema_v0, {"queryable_str_field", "queryable_int_field"}}); + SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{}); + + { + config.sync_config->subscription_initializer = get_subscription_initializer_callback_for_schema_v0(); + auto realm = Realm::get_shared_realm(config); + wait_for_download(*realm); + wait_for_upload(*realm); + check_realm_schema(realm, schema_v0, 0); + + realm->sync_session()->pause(); + + realm->begin_transaction(); + CppContext c(realm); + Object::create(c, realm, "TopLevel", + std::any(AnyDict{{"_id", ObjectId::gen()}, + {"queryable_str_field", "foo"s}, + {"queryable_int_field", static_cast(15)}, + {"non_queryable_field2", "non queryable 11"s}})); + Object::create( + c, realm, "TopLevel3", + std::any(AnyDict{{"_id", ObjectId::gen()}, {"queryable_int_field", static_cast(42)}})); + realm->commit_transaction(); + realm->close(); + } + + const AppSession& app_session = harness.session().app_session(); + auto schema_v1 = get_schema_v1(); + create_schema(app_session, harness.app()->current_user(), schema_v1, 1); + auto schema_v2 = get_schema_v2(); + create_schema(app_session, harness.app()->current_user(), schema_v2, 2); + + config.schema_version = 1; + config.schema = schema_v1; + config.sync_config->subscription_initializer = get_subscription_initializer_callback_for_schema_v1(); + config.sync_config->client_resync_mode = ClientResyncMode::Recover; + config.sync_config->on_sync_client_event_hook = [](std::weak_ptr weak_session, + const SyncClientHookData& data) mutable { + if (data.event != SyncClientHookEvent::ErrorMessageReceived) { + return SyncClientHookAction::NoAction; + } + auto session = weak_session.lock(); + REQUIRE(session); + + auto error_code = sync::ProtocolError(data.error_info->raw_error_code); + if (error_code != sync::ProtocolError::schema_version_changed) { + return SyncClientHookAction::NoAction; + } + // Close the session once the first migration is requested by the server. + session->force_close(); + return SyncClientHookAction::NoAction; + }; + + // Migration to v1 is interrupted. + auto [realm, error] = async_open_realm(config); + REQUIRE_FALSE(realm); + REQUIRE(error); + + // Migrate to v2. + config.schema_version = 2; + config.schema = schema_v2; + config.sync_config->subscription_initializer = get_subscription_initializer_callback_for_schema_v2(); + config.sync_config->on_sync_client_event_hook = nullptr; + std::tie(realm, error) = async_open_realm(config); + REQUIRE(realm); + REQUIRE_FALSE(error); + check_realm_schema(realm, schema_v2, 2); + + auto table = realm->read_group().get_table("class_TopLevel"); + CHECK(table->size() == 1); + table = realm->read_group().get_table("class_TopLevel3"); + CHECK(table->is_empty()); +} + +} // namespace realm::app + +#endif // REALM_ENABLE_AUTH_TESTS +#endif // REALM_ENABLE_SYNC \ No newline at end of file diff --git a/test/object-store/util/sync/baas_admin_api.cpp b/test/object-store/util/sync/baas_admin_api.cpp index 0b8a0074bfb..1cf1896e269 100644 --- a/test/object-store/util/sync/baas_admin_api.cpp +++ b/test/object-store/util/sync/baas_admin_api.cpp @@ -349,10 +349,10 @@ AdminAPIEndpoint AdminAPIEndpoint::operator[](StringData name) const app::Response AdminAPIEndpoint::do_request(app::Request request) const { if (request.url.find('?') == std::string::npos) { - request.url = util::format("%1?bypass_service_change=DestructiveSyncProtocolVersionIncrease", request.url); + request.url = util::format("%1?bypass_service_change=SyncSchemaVersionIncrease", request.url); } else { - request.url = util::format("%1&bypass_service_change=DestructiveSyncProtocolVersionIncrease", request.url); + request.url = util::format("%1&bypass_service_change=SyncSchemaVersionIncrease", request.url); } request.headers["Content-Type"] = "application/json;charset=utf-8"; request.headers["Accept"] = "application/json"; @@ -592,6 +592,79 @@ void AdminAPISession::migrate_to_flx(const std::string& app_id, const std::strin endpoint.put_json(nlohmann::json{{"serviceId", service_id}, {"action", migrate_to_flx ? "start" : "rollback"}}); } +// Each breaking change bumps the schema version, so you can create a new version for each breaking change if +// 'use_draft' is false. Set 'use_draft' to true if you want all changes to the schema to be deployed at once +// resulting in only one schema version. +void AdminAPISession::create_schema(const std::string& app_id, const AppCreateConfig& config, bool use_draft) const +{ + static const std::string mongo_service_name = "BackingDB"; + + auto drafts = apps()[app_id]["drafts"]; + std::string draft_id; + if (use_draft) { + auto draft_create_resp = drafts.post_json({}); + draft_id = draft_create_resp["_id"]; + } + + auto schemas = apps()[app_id]["schemas"]; + auto current_schema = schemas.get_json(); + auto target_schema = config.schema; + + std::unordered_map current_schema_tables; + for (const auto& schema : current_schema) { + current_schema_tables[schema["metadata"]["collection"]] = schema["_id"]; + } + + // Add new tables + + auto pk_and_queryable_only = [&](const Property& prop) { + if (config.flx_sync_config) { + const auto& queryable_fields = config.flx_sync_config->queryable_fields; + + if (std::find(queryable_fields.begin(), queryable_fields.end(), prop.name) != queryable_fields.end()) { + return true; + } + } + return prop.name == "_id" || prop.name == config.partition_key.name; + }; + + // Create the schemas in two passes: first populate just the primary key and + // partition key, then add the rest of the properties. This ensures that the + // targets of links exist before adding the links. + std::vector> object_schema_to_create; + BaasRuleBuilder rule_builder(target_schema, config.partition_key, mongo_service_name, config.mongo_dbname, + static_cast(config.flx_sync_config)); + for (const auto& obj_schema : target_schema) { + auto it = current_schema_tables.find(obj_schema.name); + if (it != current_schema_tables.end()) { + object_schema_to_create.push_back({it->second, &obj_schema}); + continue; + } + + auto schema_to_create = rule_builder.object_schema_to_baas_schema(obj_schema, pk_and_queryable_only); + auto schema_create_resp = schemas.post_json(schema_to_create); + object_schema_to_create.push_back({schema_create_resp["_id"], &obj_schema}); + } + + // Update existing tables (including the ones just created) + for (const auto& [id, obj_schema] : object_schema_to_create) { + auto schema_to_create = rule_builder.object_schema_to_baas_schema(*obj_schema, nullptr); + schema_to_create["_id"] = id; + schemas[id].put_json(schema_to_create); + } + + // Delete removed tables + for (const auto& table : current_schema_tables) { + if (target_schema.find(table.first) == target_schema.end()) { + schemas[table.second].del(); + } + } + + if (use_draft) { + drafts[draft_id]["deployment"].post_json({}); + } +} + static nlohmann::json convert_config(AdminAPISession::ServiceConfig config) { if (config.mode == AdminAPISession::ServiceConfig::SyncMode::Flexible) { @@ -1058,30 +1131,6 @@ AppSession create_app(const AppCreateConfig& config) auto create_mongo_service_resp = services.post_json(std::move(mongo_service_def)); std::string mongo_service_id = create_mongo_service_resp["_id"]; - auto schemas = app["schemas"]; - - auto pk_and_queryable_only = [&](const Property& prop) { - if (config.flx_sync_config) { - const auto& queryable_fields = config.flx_sync_config->queryable_fields; - - if (std::find(queryable_fields.begin(), queryable_fields.end(), prop.name) != queryable_fields.end()) { - return true; - } - } - return prop.name == "_id" || prop.name == config.partition_key.name; - }; - - // Create the schemas in two passes: first populate just the primary key and - // partition key, then add the rest of the properties. This ensures that the - // targets of links exist before adding the links. - std::vector> object_schema_to_create; - BaasRuleBuilder rule_builder(config.schema, config.partition_key, mongo_service_name, config.mongo_dbname, - static_cast(config.flx_sync_config)); - for (const auto& obj_schema : config.schema) { - auto schema_to_create = rule_builder.object_schema_to_baas_schema(obj_schema, pk_and_queryable_only); - auto schema_create_resp = schemas.post_json(schema_to_create); - object_schema_to_create.push_back({schema_create_resp["_id"], &obj_schema}); - } auto default_rule = services[mongo_service_id]["default_rule"]; auto service_roles = nlohmann::json::array(); @@ -1120,11 +1169,9 @@ AppSession create_app(const AppCreateConfig& config) default_rule.post_json({{"roles", service_roles}}); - for (const auto& [id, obj_schema] : object_schema_to_create) { - auto schema_to_create = rule_builder.object_schema_to_baas_schema(*obj_schema, nullptr); - schema_to_create["_id"] = id; - schemas[id].put_json(schema_to_create); - } + // No need for a draft because there are no breaking changes in the initial schema when the app is created. + bool use_draft = false; + session.create_schema(app_id, config, use_draft); // For PBS, enable sync after schema is created. if (!config.flx_sync_config) { diff --git a/test/object-store/util/sync/baas_admin_api.hpp b/test/object-store/util/sync/baas_admin_api.hpp index fcf3d5e3ab4..d083193e40a 100644 --- a/test/object-store/util/sync/baas_admin_api.hpp +++ b/test/object-store/util/sync/baas_admin_api.hpp @@ -83,6 +83,7 @@ class AdminAPISession { void delete_app(const std::string& app_id) const; void trigger_client_reset(const std::string& app_id, int64_t file_ident) const; void migrate_to_flx(const std::string& app_id, const std::string& service_id, bool migrate_to_flx) const; + void create_schema(const std::string& app_id, const AppCreateConfig& config, bool use_draft = true) const; struct Service { std::string id; diff --git a/test/object-store/util/sync/flx_sync_harness.hpp b/test/object-store/util/sync/flx_sync_harness.hpp index 5c4b0701268..7d08c861616 100644 --- a/test/object-store/util/sync/flx_sync_harness.hpp +++ b/test/object-store/util/sync/flx_sync_harness.hpp @@ -49,8 +49,8 @@ class FLXSyncTestHarness { return ServerSchema{std::move(schema), {"queryable_str_field", "queryable_int_field"}}; } - static AppSession make_app_from_server_schema(const std::string& test_name, - const FLXSyncTestHarness::ServerSchema& server_schema) + static AppCreateConfig make_config_from_server_schema(const std::string& test_name, + const FLXSyncTestHarness::ServerSchema& server_schema) { auto server_app_config = minimal_app_config(test_name, server_schema.schema); server_app_config.dev_mode_enabled = server_schema.dev_mode_enabled; @@ -60,7 +60,13 @@ class FLXSyncTestHarness { server_app_config.flx_sync_config = std::move(flx_config); server_app_config.service_roles = server_schema.service_roles; - return create_app(server_app_config); + return server_app_config; + } + + static AppSession make_app_from_server_schema(const std::string& test_name, + const FLXSyncTestHarness::ServerSchema& server_schema) + { + return create_app(make_config_from_server_schema(test_name, server_schema)); } struct Config { diff --git a/test/object-store/util/test_file.cpp b/test/object-store/util/test_file.cpp index 40ee67f286a..694e5b044c8 100644 --- a/test/object-store/util/test_file.cpp +++ b/test/object-store/util/test_file.cpp @@ -181,7 +181,7 @@ SyncTestFile::SyncTestFile(std::shared_ptr user, realm::Schema error.status, session->path()); abort(); }; - schema_version = 1; + schema_version = 0; schema = _schema; schema_mode = SchemaMode::AdditiveExplicit; }