From 5a1a7b69db2f8f6f0ca4a96542350abefba75008 Mon Sep 17 00:00:00 2001 From: Thomas Goyne Date: Wed, 31 Jul 2024 11:53:07 -0700 Subject: [PATCH] Only track pending client resets done by the same core version If the previous attempt at performing a client reset was done with a different core version then we should retry the client reset as the new version may have fixed a bug that made the previous attempt fail (or may be a downgrade to a version before when the bug was introduced). This also simplifies the tracking as it means that we don't need to be able to read trackers created by different versions. This also means that we can freely change the schema of the table, which this takes advantage of to drop the unused primary key and make the error required, as we never actually stored null and the code reading it would have crashed if it encountered a null error. --- src/realm/sync/CMakeLists.txt | 1 + src/realm/sync/client.cpp | 6 +- src/realm/sync/noinst/client_reset.cpp | 15 +- src/realm/sync/noinst/client_reset.hpp | 4 - src/realm/sync/noinst/migration_store.cpp | 4 +- .../sync/noinst/pending_bootstrap_store.cpp | 4 +- src/realm/sync/noinst/pending_reset_store.cpp | 189 +++++---------- src/realm/sync/noinst/pending_reset_store.hpp | 20 +- .../sync/noinst/sync_metadata_schema.cpp | 64 +++--- .../sync/noinst/sync_metadata_schema.hpp | 7 +- src/realm/sync/subscriptions.cpp | 4 +- test/object-store/sync/client_reset.cpp | 6 +- test/test_client_reset.cpp | 216 ++++++++---------- test/test_sync_subscriptions.cpp | 2 +- 14 files changed, 209 insertions(+), 333 deletions(-) diff --git a/src/realm/sync/CMakeLists.txt b/src/realm/sync/CMakeLists.txt index 19f49857bea..8896f15b50f 100644 --- a/src/realm/sync/CMakeLists.txt +++ b/src/realm/sync/CMakeLists.txt @@ -76,6 +76,7 @@ set(NOINST_HEADERS noinst/integer_codec.hpp noinst/migration_store.hpp noinst/pending_bootstrap_store.hpp + noinst/pending_reset_store.hpp noinst/protocol_codec.hpp noinst/root_certs.hpp noinst/sync_metadata_schema.hpp diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp index 7e6f7afb45f..214a938969c 100644 --- a/src/realm/sync/client.cpp +++ b/src/realm/sync/client.cpp @@ -1655,7 +1655,7 @@ void SessionWrapper::handle_pending_client_reset_acknowledgement() { REALM_ASSERT(!m_finalized); - auto has_pending_reset = PendingResetStore::has_pending_reset(m_db->start_frozen()); + auto has_pending_reset = PendingResetStore::has_pending_reset(*m_db->start_frozen()); if (!has_pending_reset) { return; // nothing to do } @@ -1678,7 +1678,7 @@ void SessionWrapper::handle_pending_client_reset_acknowledgement() logger.debug(util::LogCategory::reset, "Server has acknowledged %1", pending_reset); auto tr = self->m_db->start_write(); - auto cur_pending_reset = PendingResetStore::has_pending_reset(tr); + auto cur_pending_reset = PendingResetStore::has_pending_reset(*tr); if (!cur_pending_reset) { logger.debug(util::LogCategory::reset, "Client reset cycle detection tracker already removed."); return; @@ -1689,7 +1689,7 @@ void SessionWrapper::handle_pending_client_reset_acknowledgement() else { logger.info(util::LogCategory::reset, "Found new %1", cur_pending_reset); } - PendingResetStore::clear_pending_reset(tr); + PendingResetStore::clear_pending_reset(*tr); tr->commit(); }); } diff --git a/src/realm/sync/noinst/client_reset.cpp b/src/realm/sync/noinst/client_reset.cpp index 8c00fed2d48..6a9e875e345 100644 --- a/src/realm/sync/noinst/client_reset.cpp +++ b/src/realm/sync/noinst/client_reset.cpp @@ -410,18 +410,17 @@ void transfer_group(const Transaction& group_src, Transaction& group_dst, util:: } } -ClientResyncMode reset_precheck_guard(const TransactionRef& wt_local, ClientResyncMode mode, - PendingReset::Action action, const std::optional& error, - util::Logger& logger) +static ClientResyncMode reset_precheck_guard(const TransactionRef& wt_local, ClientResyncMode mode, + PendingReset::Action action, const Status& error, util::Logger& logger) { - if (auto previous_reset = sync::PendingResetStore::has_pending_reset(wt_local)) { + if (auto previous_reset = sync::PendingResetStore::has_pending_reset(*wt_local)) { logger.info(util::LogCategory::reset, "Found a previous %1", *previous_reset); if (action != previous_reset->action) { // IF a different client reset is being performed, cler the pending client reset and start over. logger.info(util::LogCategory::reset, "New '%1' client reset of type: '%2' is incompatible - clearing previous reset", action, mode); - sync::PendingResetStore::clear_pending_reset(wt_local); + sync::PendingResetStore::clear_pending_reset(*wt_local); } else { switch (previous_reset->mode) { @@ -444,10 +443,10 @@ ClientResyncMode reset_precheck_guard(const TransactionRef& wt_local, ClientResy util::LogCategory::reset, "A previous '%1' mode reset from %2 downgrades this mode ('%3') to DiscardLocal", previous_reset->mode, previous_reset->time, mode); - sync::PendingResetStore::clear_pending_reset(wt_local); + sync::PendingResetStore::clear_pending_reset(*wt_local); break; case ClientResyncMode::DiscardLocal: - sync::PendingResetStore::clear_pending_reset(wt_local); + sync::PendingResetStore::clear_pending_reset(*wt_local); // previous mode Recover and this mode is Discard, this is not a cycle yet break; case ClientResyncMode::Manual: @@ -473,7 +472,7 @@ ClientResyncMode reset_precheck_guard(const TransactionRef& wt_local, ClientResy mode = ClientResyncMode::DiscardLocal; } } - sync::PendingResetStore::track_reset(wt_local, mode, action, error); + sync::PendingResetStore::track_reset(*wt_local, mode, action, error); // Ensure we save the tracker object even if we encounter an error and roll // back the client reset later wt_local->commit_and_continue_writing(); diff --git a/src/realm/sync/noinst/client_reset.hpp b/src/realm/sync/noinst/client_reset.hpp index 58cb0f9815d..d28714bb21f 100644 --- a/src/realm/sync/noinst/client_reset.hpp +++ b/src/realm/sync/noinst/client_reset.hpp @@ -62,10 +62,6 @@ namespace _impl::client_reset { void transfer_group(const Transaction& tr_src, Transaction& tr_dst, util::Logger& logger, bool allow_schema_additions); -ClientResyncMode reset_precheck_guard(const TransactionRef& wt_local, ClientResyncMode mode, - sync::ProtocolErrorInfo::Action action, const std::optional& error, - util::Logger& logger); - // preform_client_reset_diff() takes the Realm performs a client reset on // the Realm in 'path_local' given the Realm 'path_fresh' as the source of truth. // If the fresh path is not provided, discard mode is assumed and all data in the local diff --git a/src/realm/sync/noinst/migration_store.cpp b/src/realm/sync/noinst/migration_store.cpp index db27e510141..6f8e3d10d99 100644 --- a/src/realm/sync/noinst/migration_store.cpp +++ b/src/realm/sync/noinst/migration_store.cpp @@ -60,7 +60,7 @@ bool MigrationStore::load_data(bool read_only) throw RuntimeError(ErrorCodes::UnsupportedFileFormatVersion, "Invalid schema version for flexible sync migration store metadata"); } - load_sync_metadata_schema(tr, &internal_tables); + load_sync_metadata_schema(*tr, &internal_tables); } else { if (read_only) { @@ -72,7 +72,7 @@ bool MigrationStore::load_data(bool read_only) SyncMetadataSchemaVersions schema_versions(tr); // Create the metadata schema and set the version (in the same commit) schema_versions.set_version_for(tr, internal_schema_groups::c_flx_migration_store, c_schema_version); - create_sync_metadata_schema(tr, &internal_tables); + create_sync_metadata_schema(*tr, &internal_tables); tr->commit_and_continue_as_read(); } REALM_ASSERT(m_migration_table); diff --git a/src/realm/sync/noinst/pending_bootstrap_store.cpp b/src/realm/sync/noinst/pending_bootstrap_store.cpp index 10fcfed55d2..187de9b58fc 100644 --- a/src/realm/sync/noinst/pending_bootstrap_store.cpp +++ b/src/realm/sync/noinst/pending_bootstrap_store.cpp @@ -109,7 +109,7 @@ PendingBootstrapStore::PendingBootstrapStore(DBRef db, util::Logger& logger, throw RuntimeError(ErrorCodes::SchemaVersionMismatch, "Invalid schema version for FLX sync pending bootstrap table group"); } - load_sync_metadata_schema(tr, &internal_tables); + load_sync_metadata_schema(*tr, &internal_tables); } else { tr->promote_to_write(); @@ -117,7 +117,7 @@ PendingBootstrapStore::PendingBootstrapStore(DBRef db, util::Logger& logger, SyncMetadataSchemaVersions schema_versions(tr); // Create the metadata schema and set the version (in the same commit) schema_versions.set_version_for(tr, internal_schema_groups::c_pending_bootstraps, c_schema_version); - create_sync_metadata_schema(tr, &internal_tables); + create_sync_metadata_schema(*tr, &internal_tables); tr->commit_and_continue_as_read(); } REALM_ASSERT(m_table); diff --git a/src/realm/sync/noinst/pending_reset_store.cpp b/src/realm/sync/noinst/pending_reset_store.cpp index 266dbb36d39..7afa7083713 100644 --- a/src/realm/sync/noinst/pending_reset_store.cpp +++ b/src/realm/sync/noinst/pending_reset_store.cpp @@ -26,7 +26,6 @@ using namespace realm; using namespace _impl; -using namespace sync; namespace realm::sync { @@ -41,9 +40,7 @@ std::ostream& operator<<(std::ostream& os, const sync::PendingReset& pr) else { os << "pending client reset of type: '" << pr.mode << "' at: " << pr.time; } - if (pr.error) { - os << " for error: " << *pr.error; - } + os << " for error: " << pr.error; return os; } @@ -59,196 +56,112 @@ bool operator==(const sync::PendingReset& lhs, const PendingReset::Action& actio // A table without a "class_" prefix will not generate sync instructions. constexpr static std::string_view s_meta_reset_table_name("client_reset_metadata"); -constexpr static std::string_view s_pk_col_name("id"); -constexpr static std::string_view s_timestamp_col_name("reset_time"); -constexpr static std::string_view s_reset_recovery_mode_col_name("reset_mode"); -constexpr static std::string_view s_reset_action_col_name("reset_action"); -constexpr static std::string_view s_reset_error_code_col_name("reset_error_code"); -constexpr static std::string_view s_reset_error_msg_col_name("reset_error_msg"); -constexpr int64_t s_pending_reset_version = 2; - -void PendingResetStore::clear_pending_reset(const TransactionRef& wr_tr) +constexpr static std::string_view s_version_col_name("core_version"); +constexpr static std::string_view s_timestamp_col_name("time"); +constexpr static std::string_view s_reset_recovery_mode_col_name("mode"); +constexpr static std::string_view s_reset_action_col_name("action"); +constexpr static std::string_view s_reset_error_code_col_name("error_code"); +constexpr static std::string_view s_reset_error_msg_col_name("error_msg"); + +void PendingResetStore::clear_pending_reset(Group& group) { - // Write transaction required - REALM_ASSERT(wr_tr->get_transact_stage() == DB::TransactStage::transact_Writing); - auto reset_store = PendingResetStore::load_or_create_schema(wr_tr); - REALM_ASSERT(reset_store.m_pending_reset_table); - // Ensure the pending reset table is empty - if (auto table = wr_tr->get_table(reset_store.m_pending_reset_table); table && !table->is_empty()) { + if (auto table = group.get_table(s_meta_reset_table_name); table && !table->is_empty()) { table->clear(); } - // Don't commit at the end - allow the caller to do it when they are ready } -std::optional PendingResetStore::has_pending_reset(const TransactionRef& rd_tr) +std::optional PendingResetStore::has_pending_reset(const Group& group) { - // Make sure the schema has been loaded and try to read legacy data if it's not found - auto reset_store = PendingResetStore::load_schema(rd_tr); + auto reset_store = PendingResetStore::load_schema(group); if (!reset_store) { - return PendingResetStore::read_legacy_pending_reset(rd_tr); + // Table hasn't been created yet (or has the wrong schema) + return std::nullopt; } - // Otherwise, read the pending reset entry using the schema metadata REALM_ASSERT(reset_store->m_pending_reset_table); - auto table = rd_tr->get_table(reset_store->m_pending_reset_table); + auto table = group.get_table(reset_store->m_pending_reset_table); - if (!table || table->size() == 0) { + if (!table || table->size() != 1) { return std::nullopt; } - if (table->size() > 1) { - // this may happen if a future version of this code changes the format and expectations around reset metadata. - throw ClientResetFailed( - util::format("Previous client resets detected (%1) but only one is expected.", table->size())); - } auto reset_entry = *table->begin(); + if (reset_entry.get(reset_store->m_version) != REALM_VERSION_STRING) { + // Previous pending reset was written by a different version, so ignore it + return std::nullopt; + } + PendingReset pending; pending.time = reset_entry.get(reset_store->m_timestamp); pending.mode = to_resync_mode(reset_entry.get(reset_store->m_recovery_mode)); pending.action = to_reset_action(reset_entry.get(reset_store->m_action)); auto error_code = reset_entry.get(reset_store->m_error_code); - if (error_code > 0) { + if (error_code != 0) { pending.error = Status(static_cast(error_code), reset_entry.get(reset_store->m_error_message)); } return pending; } -void PendingResetStore::track_reset(const TransactionRef& wr_tr, ClientResyncMode mode, PendingReset::Action action, - const std::optional& error) +void PendingResetStore::track_reset(Group& group, ClientResyncMode mode, PendingReset::Action action, Status error) { REALM_ASSERT(mode != ClientResyncMode::Manual); - // Write transaction required - REALM_ASSERT(wr_tr->get_transact_stage() == DB::TransactStage::transact_Writing); - if (auto table = wr_tr->get_table(s_meta_reset_table_name); table && table->size() > 0) { - // this may happen if a future version of this code changes the format and expectations around reset - // metadata. - throw ClientResetFailed( - util::format("Previous client resets detected (%1) but only one is expected.", table->size())); - } - auto reset_store = PendingResetStore::load_or_create_schema(wr_tr); + auto reset_store = PendingResetStore::load_or_create_schema(group); REALM_ASSERT(reset_store.m_pending_reset_table); - auto table = wr_tr->get_table(reset_store.m_pending_reset_table); + auto table = group.get_table(reset_store.m_pending_reset_table); REALM_ASSERT(table); - // Create the new object - auto obj = table->create_object_with_primary_key( - ObjectId::gen(), { - {reset_store.m_timestamp, Timestamp(std::chrono::system_clock::now())}, - {reset_store.m_recovery_mode, from_resync_mode(mode)}, - {reset_store.m_action, from_reset_action(action)}, - }); - // Add the error, if provided - if (error) { - obj.set(reset_store.m_error_code, static_cast(error->code())); - obj.set(reset_store.m_error_message, error->reason()); - } - // Don't commit at the end - allow the caller to do it when they are ready + table->clear(); + table->create_object(null_key, { + {reset_store.m_version, Mixed(REALM_VERSION_STRING)}, + {reset_store.m_timestamp, Timestamp(std::chrono::system_clock::now())}, + {reset_store.m_recovery_mode, from_resync_mode(mode)}, + {reset_store.m_action, from_reset_action(action)}, + {reset_store.m_error_code, static_cast(error.code())}, + {reset_store.m_error_message, error.reason()}, + }); } -PendingResetStore::PendingResetStore(const TransactionRef& rd_tr) +PendingResetStore::PendingResetStore(const Group& g) : m_internal_tables{ {&m_pending_reset_table, s_meta_reset_table_name, - {&m_id, s_pk_col_name, type_ObjectId}, { + {&m_version, s_version_col_name, type_String}, {&m_timestamp, s_timestamp_col_name, type_Timestamp}, {&m_recovery_mode, s_reset_recovery_mode_col_name, type_Int}, {&m_action, s_reset_action_col_name, type_Int}, - {&m_error_code, s_reset_error_code_col_name, type_Int, true}, - {&m_error_message, s_reset_error_msg_col_name, type_String, true}, + {&m_error_code, s_reset_error_code_col_name, type_Int}, + {&m_error_message, s_reset_error_msg_col_name, type_String}, }}, } { - // Works with read, write, and frozen transactions - SyncMetadataSchemaVersionsReader schema_versions(rd_tr); - auto schema_version = schema_versions.get_version_for(rd_tr, internal_schema_groups::c_pending_reset_store); - - // Load the metadata schema info if a schema version was found - if (schema_version) { - if (*schema_version != s_pending_reset_version) { - // Unsupported schema version - throw RuntimeError(ErrorCodes::UnsupportedFileFormatVersion, - "Found invalid schema version for existing client reset cycle tracking metadata"); - } - load_sync_metadata_schema(rd_tr, &m_internal_tables); - if (m_pending_reset_table) { - // If the schema info was read, then store the schema version - m_schema_version = schema_version; - } + if (!try_load_sync_metadata_schema(g, &m_internal_tables).is_ok()) { + m_pending_reset_table = {}; } } -std::optional PendingResetStore::load_schema(const TransactionRef& rd_tr) +std::optional PendingResetStore::load_schema(const Group& group) { - PendingResetStore reset_store(rd_tr); - if (reset_store.m_schema_version) { + if (PendingResetStore reset_store(group); reset_store.m_pending_reset_table) { return reset_store; } return std::nullopt; } -PendingResetStore PendingResetStore::load_or_create_schema(const TransactionRef& wr_tr) +PendingResetStore PendingResetStore::load_or_create_schema(Group& group) { - PendingResetStore reset_store(wr_tr); - if (reset_store.m_schema_version) { - // If the schema metadata was found, return the initialized class - return reset_store; - } - // Otherwise, set it up from scratch - Make sure the transaction is set for writing - if (wr_tr->get_transact_stage() == DB::TransactStage::transact_Reading) { - wr_tr->promote_to_write(); - } - // Ensure writing - all other transaction stages are not allowed - REALM_ASSERT_EX(wr_tr->get_transact_stage() == DB::TransactStage::transact_Writing, wr_tr->get_transact_stage()); + PendingResetStore reset_store(group); + if (!reset_store.m_pending_reset_table) { + // If the table exists but has the wrong schema just drop it + if (group.has_table(s_meta_reset_table_name)) { + group.remove_table(s_meta_reset_table_name); + } - // Drop the old table and any stale pending resets - if (wr_tr->has_table(s_meta_reset_table_name)) { - wr_tr->remove_table(s_meta_reset_table_name); + // Create the table with the correct schema + create_sync_metadata_schema(group, &reset_store.m_internal_tables); } - - // Ensure the schema versions table is initialized (may add its own commit) - SyncMetadataSchemaVersions schema_versions(wr_tr); - // Create the metadata schema and set the version (in the same commit) - schema_versions.set_version_for(wr_tr, internal_schema_groups::c_pending_reset_store, s_pending_reset_version); - create_sync_metadata_schema(wr_tr, &reset_store.m_internal_tables); - REALM_ASSERT(reset_store.m_pending_reset_table); - reset_store.m_schema_version = s_pending_reset_version; - - // Don't commit yet return reset_store; } -std::optional PendingResetStore::read_legacy_pending_reset(const TransactionRef& rd_tr) -{ - // Try to read the pending reset info from v1 of the schema - constexpr static std::string_view s_v1_version_column_name("version"); - constexpr static std::string_view s_v1_timestamp_col_name("event_time"); - constexpr static std::string_view s_v1_reset_mode_col_name("type_of_reset"); - - // Check for pending reset v1 - does not use schema version - TableRef table = rd_tr->get_table(s_meta_reset_table_name); - if (table && table->size() > 0) { - ColKey version_col = table->get_column_key(s_v1_version_column_name); - ColKey timestamp_col = table->get_column_key(s_v1_timestamp_col_name); - ColKey mode_col = table->get_column_key(s_v1_reset_mode_col_name); - Obj reset_entry = *table->begin(); - - if (version_col && reset_entry.get(version_col) == 1LL) { - REALM_ASSERT(timestamp_col); - REALM_ASSERT(mode_col); - PendingReset pending; - pending.time = reset_entry.get(timestamp_col); - pending.mode = to_resync_mode(reset_entry.get(mode_col)); - // Create a fake action depending on the resync mode - pending.action = pending.mode == ClientResyncMode::DiscardLocal - ? sync::ProtocolErrorInfo::Action::ClientResetNoRecovery - : sync::ProtocolErrorInfo::Action::ClientReset; - return pending; - } - } - // Add checking for future schema versions here - return std::nullopt; -} - int64_t PendingResetStore::from_reset_action(PendingReset::Action action) { switch (action) { diff --git a/src/realm/sync/noinst/pending_reset_store.hpp b/src/realm/sync/noinst/pending_reset_store.hpp index a6e0878d8b7..b8e06ab84b9 100644 --- a/src/realm/sync/noinst/pending_reset_store.hpp +++ b/src/realm/sync/noinst/pending_reset_store.hpp @@ -37,7 +37,7 @@ struct PendingReset { Timestamp time; ClientResyncMode mode; Action action = Action::NoAction; - std::optional error; + Status error = Status::OK(); }; std::ostream& operator<<(std::ostream& os, const sync::PendingReset& pr); @@ -49,12 +49,11 @@ class PendingResetStore { // Store the pending reset tracking information - it is an error if the tracking info already // exists in the store // Requires a writable transaction and changes must be committed manually - static void track_reset(const TransactionRef& wr_tr, ClientResyncMode mode, PendingReset::Action action, - const std::optional& error = std::nullopt); + static void track_reset(Group& group, ClientResyncMode mode, PendingReset::Action action, Status error); // Clear the pending reset tracking information, if it exists // Requires a writable transaction and changes must be committed manually - static void clear_pending_reset(const TransactionRef& wr_tr); - static std::optional has_pending_reset(const TransactionRef& rd_tr); + static void clear_pending_reset(Group& group); + static std::optional has_pending_reset(const Group& group); static int64_t from_reset_action(PendingReset::Action action); static PendingReset::Action to_reset_action(int64_t action); @@ -63,27 +62,22 @@ class PendingResetStore { private: // The instantiated class is only used internally - PendingResetStore(const TransactionRef& rd_tr); + PendingResetStore(const Group& group); std::vector m_internal_tables; TableKey m_pending_reset_table; - ColKey m_id; ColKey m_version; ColKey m_timestamp; ColKey m_recovery_mode; ColKey m_action; ColKey m_error_code; ColKey m_error_message; - std::optional m_schema_version = std::nullopt; // Returns true if the schema was loaded - static std::optional load_schema(const TransactionRef& rd_tr); + static std::optional load_schema(const Group& group); // Loads the schema or creates it if it doesn't exist // Requires a writable transaction and changes must be committed manually - static PendingResetStore load_or_create_schema(const TransactionRef& wr_tr); - - // Try to read the pending reset info from v1 of the schema - static std::optional read_legacy_pending_reset(const TransactionRef& rd_tr); + static PendingResetStore load_or_create_schema(Group& group); }; } // namespace realm::sync diff --git a/src/realm/sync/noinst/sync_metadata_schema.cpp b/src/realm/sync/noinst/sync_metadata_schema.cpp index 4b5f2c76617..90b101041bb 100644 --- a/src/realm/sync/noinst/sync_metadata_schema.cpp +++ b/src/realm/sync/noinst/sync_metadata_schema.cpp @@ -33,26 +33,26 @@ constexpr static std::string_view c_meta_schema_schema_group_field("schema_group } // namespace -void create_sync_metadata_schema(const TransactionRef& tr, std::vector* tables) +void create_sync_metadata_schema(Group& g, std::vector* tables) { util::FlatMap found_tables; for (auto& table : *tables) { - if (tr->has_table(table.name)) { + if (g.has_table(table.name)) { throw RuntimeError( ErrorCodes::RuntimeError, util::format("table %1 already existed when creating internal tables for sync", table.name)); } TableRef table_ref; if (table.is_embedded) { - table_ref = tr->add_table(table.name, Table::Type::Embedded); + table_ref = g.add_table(table.name, Table::Type::Embedded); } else if (table.pk_info) { - table_ref = tr->add_table_with_primary_key(table.name, table.pk_info->data_type, table.pk_info->name, - table.pk_info->is_optional); + table_ref = g.add_table_with_primary_key(table.name, table.pk_info->data_type, table.pk_info->name, + table.pk_info->is_optional); *table.pk_info->key_out = table_ref->get_primary_key_column(); } else { - table_ref = tr->add_table(table.name); + table_ref = g.add_table(table.name); } found_tables.insert({table.name, table_ref}); @@ -83,34 +83,41 @@ void create_sync_metadata_schema(const TransactionRef& tr, std::vector* tables) +void load_sync_metadata_schema(const Group& g, std::vector* tables) +{ + if (auto status = try_load_sync_metadata_schema(g, tables); !status.is_ok()) { + throw Exception(std::move(status)); + } +} + +Status try_load_sync_metadata_schema(const Group& g, std::vector* tables) { for (auto& table : *tables) { - auto table_ref = tr->get_table(table.name); + auto table_ref = g.get_table(table.name); if (!table_ref) { - throw RuntimeError(ErrorCodes::RuntimeError, - util::format("could not find internal sync table %1", table.name)); + return Status(ErrorCodes::RuntimeError, + util::format("could not find internal sync table %1", table.name)); } *table.key_out = table_ref->get_key(); if (table.pk_info) { auto pk_col = table_ref->get_primary_key_column(); if (auto pk_name = table_ref->get_column_name(pk_col); pk_name != table.pk_info->name) { - throw RuntimeError( + return Status( ErrorCodes::RuntimeError, util::format( "primary key name of sync internal table %1 does not match (stored: %2, defined: %3)", table.name, pk_name, table.pk_info->name)); } if (auto pk_type = table_ref->get_column_type(pk_col); pk_type != table.pk_info->data_type) { - throw RuntimeError( + return Status( ErrorCodes::RuntimeError, util::format( "primary key type of sync internal table %1 does not match (stored: %2, defined: %3)", table.name, pk_type, table.pk_info->data_type)); } if (auto is_nullable = table_ref->is_nullable(pk_col); is_nullable != table.pk_info->is_optional) { - throw RuntimeError( + return Status( ErrorCodes::RuntimeError, util::format( "primary key nullabilty of sync internal table %1 does not match (stored: %2, defined: %3)", @@ -119,12 +126,12 @@ void load_sync_metadata_schema(const TransactionRef& tr, std::vectorkey_out = pk_col; } else if (table.is_embedded && !table_ref->is_embedded()) { - throw RuntimeError(ErrorCodes::RuntimeError, - util::format("internal sync table %1 should be embedded, but is not", table.name)); + return Status(ErrorCodes::RuntimeError, + util::format("internal sync table %1 should be embedded, but is not", table.name)); } if (table.columns.size() + size_t(table.pk_info ? 1 : 0) != table_ref->get_column_count()) { - throw RuntimeError( + return Status( ErrorCodes::RuntimeError, util::format("sync internal table %1 has a different number of columns than its schema", table.name)); } @@ -132,20 +139,19 @@ void load_sync_metadata_schema(const TransactionRef& tr, std::vectorget_column_key(col.name); if (!col_key) { - throw RuntimeError( - ErrorCodes::RuntimeError, - util::format("column %1 is missing in sync internal table %2", col.name, table.name)); + return Status(ErrorCodes::RuntimeError, + util::format("column %1 is missing in sync internal table %2", col.name, table.name)); } auto found_col_type = table_ref->get_column_type(col_key); if (found_col_type != col.data_type) { - throw RuntimeError( + return Status( ErrorCodes::RuntimeError, util::format("column %1 in sync internal table %2 is the wrong type", col.name, table.name)); } if (col.is_optional != table_ref->is_nullable(col_key)) { - throw RuntimeError( + return Status( ErrorCodes::RuntimeError, util::format("column %1 in sync internal table %2 has different nullabilty than in its schema", col.name, table.name)); @@ -153,14 +159,16 @@ void load_sync_metadata_schema(const TransactionRef& tr, std::vectorget_link_target(col_key)->get_name() != col.target_table) { - RuntimeError(ErrorCodes::RuntimeError, - util::format("column %1 in sync internal table %2 links to the wrong table %3", - col.name, table.name, table_ref->get_link_target(col_key)->get_name())); + return Status(ErrorCodes::RuntimeError, + util::format("column %1 in sync internal table %2 links to the wrong table %3", + col.name, table.name, + table_ref->get_link_target(col_key)->get_name())); } } *col.key_out = col_key; } } + return Status::OK(); } SyncMetadataSchemaVersionsReader::SyncMetadataSchemaVersionsReader(const TransactionRef& tr) @@ -181,7 +189,7 @@ SyncMetadataSchemaVersionsReader::SyncMetadataSchemaVersionsReader(const Transac if (tr->has_table(c_sync_internal_schemas_table)) { // Load m_table with the table/schema information - load_sync_metadata_schema(tr, &unified_schema_version_table_def); + load_sync_metadata_schema(*tr, &unified_schema_version_table_def); } } @@ -197,7 +205,7 @@ std::optional SyncMetadataSchemaVersionsReader::get_legacy_version(cons {&legacy_table_key, c_flx_metadata_table, {{&legacy_version_key, c_meta_schema_version_field, type_Int}}}}; // Convert the legacy table to the regular schema versions table if it exists - load_sync_metadata_schema(tr, &legacy_table_def); + load_sync_metadata_schema(*tr, &legacy_table_def); if (auto legacy_meta_table = tr->get_table(legacy_table_key); legacy_meta_table && legacy_meta_table->size() > 0) { @@ -254,14 +262,14 @@ SyncMetadataSchemaVersions::SyncMetadataSchemaVersions(const TransactionRef& tr) // table should have already been initialized or needs to be created, // but re-initialize in case it isn't (e.g. both unified and legacy tables exist in DB) if (REALM_UNLIKELY(tr->has_table(c_sync_internal_schemas_table))) { - load_sync_metadata_schema(tr, &unified_schema_version_table_def); + load_sync_metadata_schema(*tr, &unified_schema_version_table_def); } else { // Only write the versions table if it doesn't exist if (tr->get_transact_stage() != DB::transact_Writing) { tr->promote_to_write(); } - create_sync_metadata_schema(tr, &unified_schema_version_table_def); + create_sync_metadata_schema(*tr, &unified_schema_version_table_def); modified = true; } } diff --git a/src/realm/sync/noinst/sync_metadata_schema.hpp b/src/realm/sync/noinst/sync_metadata_schema.hpp index fcbaa026b99..35f7d547231 100644 --- a/src/realm/sync/noinst/sync_metadata_schema.hpp +++ b/src/realm/sync/noinst/sync_metadata_schema.hpp @@ -27,6 +27,8 @@ #include namespace realm { +class Group; +class Status; class Transaction; using TransactionRef = std::shared_ptr; } // namespace realm @@ -121,8 +123,9 @@ struct SyncMetadataTable { }; -void create_sync_metadata_schema(const TransactionRef& tr, std::vector* tables); -void load_sync_metadata_schema(const TransactionRef& tr, std::vector* tables); +void create_sync_metadata_schema(Group& g, std::vector* tables); +void load_sync_metadata_schema(const Group& g, std::vector* tables); +Status try_load_sync_metadata_schema(const Group& g, std::vector* tables); class SyncMetadataSchemaVersionsReader { public: diff --git a/src/realm/sync/subscriptions.cpp b/src/realm/sync/subscriptions.cpp index 47682f288a5..35f2901557f 100644 --- a/src/realm/sync/subscriptions.cpp +++ b/src/realm/sync/subscriptions.cpp @@ -666,7 +666,7 @@ SubscriptionStore::SubscriptionStore(Private, DBRef db) throw RuntimeError(ErrorCodes::UnsupportedFileFormatVersion, "Invalid schema version for flexible sync metadata"); } - load_sync_metadata_schema(tr, &internal_tables); + load_sync_metadata_schema(*tr, &internal_tables); } else { tr->promote_to_write(); @@ -674,7 +674,7 @@ SubscriptionStore::SubscriptionStore(Private, DBRef db) SyncMetadataSchemaVersions schema_versions(tr); // Create the metadata schema and set the version (in the same commit) schema_versions.set_version_for(tr, internal_schema_groups::c_flx_subscription_store, c_flx_schema_version); - create_sync_metadata_schema(tr, &internal_tables); + create_sync_metadata_schema(*tr, &internal_tables); tr->commit_and_continue_as_read(); } REALM_ASSERT(m_sub_set_table); diff --git a/test/object-store/sync/client_reset.cpp b/test/object-store/sync/client_reset.cpp index 05352002d39..b1eda09ff7d 100644 --- a/test/object-store/sync/client_reset.cpp +++ b/test/object-store/sync/client_reset.cpp @@ -1683,7 +1683,7 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { auto has_reset_cycle_flag = [](SharedRealm realm) -> util::Optional { auto db = TestHelper::get_db(realm); auto rd_tr = db->start_frozen(); - return sync::PendingResetStore::has_pending_reset(rd_tr); + return sync::PendingResetStore::has_pending_reset(*rd_tr); }; auto logger = util::Logger::get_default_logger(); ThreadSafeSyncError err; @@ -1697,8 +1697,8 @@ TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") { local_config.sync_config->notify_before_client_reset = [mode, action](SharedRealm realm) { auto db = TestHelper::get_db(realm); auto wr_tr = db->start_write(); - sync::PendingResetStore::track_reset( - wr_tr, mode, action, {{ErrorCodes::SyncClientResetRequired, "Bad client file ident"}}); + sync::PendingResetStore::track_reset(*wr_tr, mode, action, + {ErrorCodes::SyncClientResetRequired, "Bad client file ident"}); wr_tr->commit(); }; }; diff --git a/test/test_client_reset.cpp b/test/test_client_reset.cpp index d99c365c028..fc297d68f56 100644 --- a/test/test_client_reset.cpp +++ b/test/test_client_reset.cpp @@ -780,10 +780,9 @@ TEST(ClientReset_DoNotRecoverSchema) CHECK(!compare_groups(rt_1, rt_2)); const Group& group = rt_1.get_group(); - CHECK_EQUAL(group.size(), 3); + CHECK_EQUAL(group.size(), 2); CHECK(group.get_table("class_table1")); CHECK(group.get_table("client_reset_metadata")); - CHECK(group.get_table("sync_internal_schemas")); CHECK_NOT(group.get_table("class_table2")); const Group& group2 = rt_2.get_group(); CHECK_EQUAL(group2.size(), 1); @@ -854,32 +853,11 @@ TEST(ClientReset_PinnedVersion) } #endif // !REALM_MOBILE -void mark_as_synchronized(DB& db) -{ - auto& history = static_cast(db.get_replication())->get_history(); - sync::version_type current_version; - sync::SaltedFileIdent file_ident; - sync::SyncProgress progress; - history.get_status(current_version, file_ident, progress); - progress.download.last_integrated_client_version = current_version; - progress.upload.client_version = current_version; - progress.upload.last_integrated_server_version = current_version; - sync::VersionInfo info_out; - history.set_sync_progress(progress, 0, info_out); - history.set_client_file_ident({1, 0}, false); -} - void expect_reset(unit_test::TestContext& test_context, DBRef& target, DBRef& fresh, ClientResyncMode mode, SubscriptionStore* sub_store = nullptr, bool allow_recovery = true) { CHECK(target); CHECK(fresh); - // Ensure the schema is initialized before starting the test - { - auto wr_tr = target->start_write(); - PendingResetStore::clear_pending_reset(wr_tr); - wr_tr->commit(); - } auto db_version = target->get_version_of_latest_snapshot(); auto fresh_path = fresh->get_path(); @@ -943,31 +921,16 @@ void expect_reset(unit_test::TestContext& test_context, DBRef& target, DBRef& fr // sync completion to avoid reset cycles { auto tr = target->start_read(); - auto pending_reset = PendingResetStore::has_pending_reset(tr); + auto pending_reset = PendingResetStore::has_pending_reset(*tr); CHECK(pending_reset); CHECK(pending_reset->action == action); CHECK(pending_reset->mode == expected_mode); CHECK(pending_reset->error == error); tr->promote_to_write(); - PendingResetStore::clear_pending_reset(tr); + PendingResetStore::clear_pending_reset(*tr); tr->commit_and_continue_as_read(); - CHECK_NOT(PendingResetStore::has_pending_reset(tr)); - } -} - -std::pair prepare_db(const std::string& path, const std::string& copy_path, - util::FunctionRef fn) -{ - DBRef db = DB::create(make_client_replication(), path); - { - auto wt = db->start_write(); - fn(*wt); - wt->commit(); + CHECK_NOT(PendingResetStore::has_pending_reset(*tr)); } - mark_as_synchronized(*db); - db->write_copy(copy_path, nullptr); - auto db_2 = DB::create(make_client_replication(), copy_path); - return {db, db_2}; } TEST(ClientReset_ConvertResyncMode) @@ -999,89 +962,8 @@ TEST(ClientReset_ConvertResetAction) sync::ClientResetFailed); } -DBRef setup_metadata_table_v1(test_util::unit_test::TestContext& test_context, std::string path, Timestamp ts, - int64_t type) -{ - DBRef db = DB::create(make_client_replication(), path); - auto wt = db->start_write(); - auto table = wt->add_table_with_primary_key("client_reset_metadata", type_ObjectId, "id"); - CHECK(table); - auto version_col = table->add_column(type_Int, "version"); - auto timestamp_col = table->add_column(type_Timestamp, "event_time"); - auto type_col = table->add_column(type_Int, "type_of_reset"); - wt->commit_and_continue_writing(); - auto id = ObjectId::gen(); - table->create_object_with_primary_key(id, { - {version_col, 1}, - {timestamp_col, ts}, - {type_col, type}, - }); - wt->commit_and_continue_as_read(); - table = wt->get_table("client_reset_metadata"); - size_t table_size = table->size(); - CHECK(table_size == 1); - return db; -} - -TEST_TYPES(ClientReset_V1Table, std::integral_constant, - std::integral_constant) -{ - SHARED_GROUP_TEST_PATH(path_v1); - auto timestamp = Timestamp(std::chrono::system_clock::now()); - auto reset_type = PendingResetStore::from_resync_mode(TEST_TYPE::value); - DBRef db = setup_metadata_table_v1(test_context, path_v1, timestamp, reset_type); - auto rd_tr = db->start_read(); - auto reset = PendingResetStore::has_pending_reset(rd_tr); - CHECK(reset); - CHECK(reset->time == timestamp); - CHECK(reset->mode == TEST_TYPE::value); - if (TEST_TYPE::value == ClientResyncMode::DiscardLocal) { - CHECK(reset->action == sync::ProtocolErrorInfo::Action::ClientResetNoRecovery); - } - else { - CHECK(reset->action == sync::ProtocolErrorInfo::Action::ClientReset); - } -} - -TEST(ClientReset_TrackReset_V1_EntryExists) -{ - SHARED_GROUP_TEST_PATH(path_v1); - auto timestamp = Timestamp(std::chrono::system_clock::now()); - auto reset_type = PendingResetStore::from_resync_mode(ClientResyncMode::Recover); - // Create a previous v1 entry - DBRef db = setup_metadata_table_v1(test_context, path_v1, timestamp, reset_type); - auto wr_tr = db->start_write(); - // Should throw an exception, since the table isn't empty - CHECK_THROW(PendingResetStore::track_reset(wr_tr, ClientResyncMode::DiscardLocal, - sync::ProtocolErrorInfo::Action::RevertToPBS), - sync::ClientResetFailed); -} - -TEST(ClientReset_TrackReset_Existing_empty_V1_table) -{ - SHARED_GROUP_TEST_PATH(path_v1); - auto timestamp = Timestamp(std::chrono::system_clock::now()); - auto reset_type = PendingResetStore::from_resync_mode(ClientResyncMode::Recover); - Status error{ErrorCodes::SyncClientResetRequired, "Bad client file ident"}; - DBRef db = setup_metadata_table_v1(test_context, path_v1, timestamp, reset_type); - auto wr_tr = db->start_write(); - PendingResetStore::clear_pending_reset(wr_tr); - wr_tr->commit_and_continue_writing(); - PendingResetStore::track_reset(wr_tr, ClientResyncMode::DiscardLocal, - sync::ProtocolErrorInfo::Action::RevertToPBS, error); - wr_tr->commit_and_continue_as_read(); - auto reset = PendingResetStore::has_pending_reset(wr_tr); - CHECK(reset); - CHECK(reset->mode == ClientResyncMode::DiscardLocal); - CHECK(reset->action == sync::ProtocolErrorInfo::Action::RevertToPBS); - CHECK(reset->error == error); - timestamp = Timestamp(std::chrono::system_clock::now()); - // Verify timestamp is at least close to current time - CHECK(abs(reset->time.get_seconds() - timestamp.get_seconds()) < 5); -} - TEST_TYPES( - ClientReset_TrackReset_v2, + ClientReset_TrackReset, std::integral_constant, std::integral_constant, std::integral_constant, @@ -1092,9 +974,9 @@ TEST_TYPES( Status error{ErrorCodes::SyncClientResetRequired, "Bad client file ident"}; sync::ProtocolErrorInfo::Action reset_action = TEST_TYPE::value; auto tr = db->start_write(); - PendingResetStore::track_reset(tr, ClientResyncMode::DiscardLocal, reset_action, error); + PendingResetStore::track_reset(*tr, ClientResyncMode::DiscardLocal, reset_action, error); tr->commit_and_continue_as_read(); - auto reset = PendingResetStore::has_pending_reset(tr); + auto reset = PendingResetStore::has_pending_reset(*tr); CHECK(reset); CHECK(reset->mode == ClientResyncMode::DiscardLocal); CHECK(reset->action == reset_action); @@ -1104,6 +986,86 @@ TEST_TYPES( CHECK((reset->time.get_seconds() - timestamp.get_seconds() < 5)); } +TEST(ClientReset_TrackReset_SchemaMismatches) +{ + SHARED_GROUP_TEST_PATH(test_path); + DBRef db = DB::create(make_client_replication(), test_path); + auto tr = db->start_write(); + + // Table does not exist + CHECK_NOT(PendingResetStore::has_pending_reset(*tr)); + PendingResetStore::track_reset(*tr, ClientResyncMode::DiscardLocal, sync::ProtocolErrorInfo::Action::ClientReset, + Status::OK()); + CHECK(PendingResetStore::has_pending_reset(*tr)); + + // Table exists but has no columns + tr->remove_table("client_reset_metadata"); + tr->add_table("client_reset_metadata"); + CHECK_NOT(PendingResetStore::has_pending_reset(*tr)); + PendingResetStore::track_reset(*tr, ClientResyncMode::DiscardLocal, sync::ProtocolErrorInfo::Action::ClientReset, + Status::OK()); + CHECK(PendingResetStore::has_pending_reset(*tr)); + + // Table has columns but is missing one + auto table = tr->get_table("client_reset_metadata"); + table->remove_column(*table->get_column_keys().begin()); + CHECK_NOT(PendingResetStore::has_pending_reset(*tr)); + PendingResetStore::track_reset(*tr, ClientResyncMode::DiscardLocal, sync::ProtocolErrorInfo::Action::ClientReset, + Status::OK()); + CHECK(PendingResetStore::has_pending_reset(*tr)); + + // Table has too many objects + tr->get_table("client_reset_metadata")->create_object(); + CHECK_NOT(PendingResetStore::has_pending_reset(*tr)); + PendingResetStore::track_reset(*tr, ClientResyncMode::DiscardLocal, sync::ProtocolErrorInfo::Action::ClientReset, + Status::OK()); + CHECK(PendingResetStore::has_pending_reset(*tr)); +} + +TEST(ClientReset_TrackReset_ChecksForMatchingVersion) +{ + SHARED_GROUP_TEST_PATH(test_path); + DBRef db = DB::create(make_client_replication(), test_path); + auto tr = db->start_write(); + PendingResetStore::track_reset(*tr, ClientResyncMode::DiscardLocal, sync::ProtocolErrorInfo::Action::ClientReset, + Status::OK()); + + CHECK(PendingResetStore::has_pending_reset(*tr)); + auto obj = *tr->get_table("client_reset_metadata")->begin(); + obj.set("core_version", "not a valid version"); + CHECK_NOT(PendingResetStore::has_pending_reset(*tr)); +} + +void mark_as_synchronized(DB& db) +{ + auto& history = static_cast(db.get_replication())->get_history(); + sync::version_type current_version; + sync::SaltedFileIdent file_ident; + sync::SyncProgress progress; + history.get_status(current_version, file_ident, progress); + progress.download.last_integrated_client_version = current_version; + progress.upload.client_version = current_version; + progress.upload.last_integrated_server_version = current_version; + sync::VersionInfo info_out; + history.set_sync_progress(progress, 0, info_out); + history.set_client_file_ident({1, 0}, false); +} + +std::pair prepare_db(const std::string& path, const std::string& copy_path, + util::FunctionRef fn) +{ + DBRef db = DB::create(make_client_replication(), path); + { + auto wt = db->start_write(); + fn(*wt); + wt->commit(); + } + mark_as_synchronized(*db); + db->write_copy(copy_path, nullptr); + auto db_2 = DB::create(make_client_replication(), copy_path); + return {db, db_2}; +} + TEST(ClientReset_UninitializedFile) { SHARED_GROUP_TEST_PATH(path_1); @@ -1125,7 +1087,7 @@ TEST(ClientReset_UninitializedFile) _impl::client_reset::perform_client_reset(*test_context.logger, *db_empty, std::move(cr_config), nullptr); CHECK_NOT(did_reset); auto rd_tr = db_empty->start_frozen(); - CHECK_NOT(PendingResetStore::has_pending_reset(rd_tr)); + CHECK_NOT(PendingResetStore::has_pending_reset(*rd_tr)); // Should still have closed and deleted the fresh realm CHECK_NOT(db_fresh->is_attached()); @@ -1297,7 +1259,7 @@ TEST(ClientReset_Recover_RecoveryDisabled) _impl::client_reset::perform_client_reset(*test_context.logger, *dbs.first, std::move(cr_config), nullptr), sync::ClientResetFailed); auto rd_tr = dbs.first->start_frozen(); - CHECK_NOT(PendingResetStore::has_pending_reset(rd_tr)); + CHECK_NOT(PendingResetStore::has_pending_reset(*rd_tr)); } TEST(ClientReset_Recover_ModificationsOnDeletedObject) diff --git a/test/test_sync_subscriptions.cpp b/test/test_sync_subscriptions.cpp index 99d9d831cdc..784b8d55af5 100644 --- a/test/test_sync_subscriptions.cpp +++ b/test/test_sync_subscriptions.cpp @@ -823,7 +823,7 @@ static void create_legacy_metadata_schema(DBRef db, int64_t version) std::vector legacy_table_def{ {&legacy_table_key, c_flx_metadata_table, {{&legacy_version_key, c_meta_schema_version_field, type_Int}}}}; auto tr = db->start_write(); - create_sync_metadata_schema(tr, &legacy_table_def); + create_sync_metadata_schema(*tr, &legacy_table_def); tr->commit_and_continue_writing(); auto legacy_meta_table = tr->get_table(legacy_table_key); auto legacy_object = legacy_meta_table->create_object();