Skip to content

Commit

Permalink
Handle client resets during schema migrations + integration tests (#7106
Browse files Browse the repository at this point in the history
)

* Add option to use draft deployments when creating a schema for baas tests

* Track sync schema migration between sessions + minor refactoring

* Integration tests

* Use Transaction refs instead of shared pointers
  • Loading branch information
danieltabacaru committed Jan 8, 2024
1 parent 484f247 commit 3b20104
Show file tree
Hide file tree
Showing 18 changed files with 1,198 additions and 76 deletions.
1 change: 1 addition & 0 deletions src/realm/exceptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ InvalidTableRef::~InvalidTableRef() noexcept = default;
SerializationError::~SerializationError() noexcept = default;
NotImplemented::~NotImplemented() noexcept = default;
MigrationFailed::~MigrationFailed() noexcept = default;
SyncSchemaMigrationFailed::~SyncSchemaMigrationFailed() noexcept = default;
ObjectAlreadyExists::~ObjectAlreadyExists() noexcept = default;
CrossTableLinkTarget::~CrossTableLinkTarget() noexcept = default;
SystemError::~SystemError() noexcept = default;
Expand Down
8 changes: 8 additions & 0 deletions src/realm/exceptions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,14 @@ struct MigrationFailed : LogicError {
~MigrationFailed() noexcept override;
};

struct SyncSchemaMigrationFailed : LogicError {
SyncSchemaMigrationFailed(std::string_view msg)
: LogicError(ErrorCodes::SyncSchemaMigrationError, msg)
{
}
~SyncSchemaMigrationFailed() noexcept override;
};

struct ObjectAlreadyExists : RuntimeError {
template <class T, class U>
ObjectAlreadyExists(const U& object_type, T pk_val)
Expand Down
42 changes: 27 additions & 15 deletions src/realm/object-store/sync/async_open_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <realm/object-store/sync/async_open_task.hpp>

#include <realm/sync/subscriptions.hpp>
#include <realm/sync/noinst/sync_schema_migration.hpp>
#include <realm/object-store/impl/realm_coordinator.hpp>
#include <realm/object-store/sync/sync_manager.hpp>
#include <realm/object-store/sync/sync_session.hpp>
Expand Down Expand Up @@ -61,15 +62,6 @@ void AsyncOpenTask::start(AsyncOpenCallback callback)

self->migrate_schema_or_complete(std::move(callback), coordinator, status);
});
// The callback does not extend the lifetime of the task if it's never invoked.
SyncSession::Internal::set_sync_schema_migration_callback(*session, [weak_self = weak_from_this(), this]() {
if (auto self = weak_self.lock()) {
util::CheckedLockGuard lock(m_mutex);
if (!m_session)
return;
m_sync_schema_migration_required = true;
}
});
session->revive_if_needed();
}

Expand Down Expand Up @@ -128,7 +120,13 @@ void AsyncOpenTask::attach_to_subscription_initializer(AsyncOpenCallback&& callb
// If the same Realm file is already opened, there is the possibility that this code may wait on a subscription
// that was not committed by init_subscription_initializer.

auto shared_realm = coordinator->get_realm(nullptr, m_db_first_open);
SharedRealm shared_realm;
try {
shared_realm = coordinator->get_realm(nullptr, m_db_first_open);
}
catch (...) {
return callback({}, std::current_exception());
}
const auto init_subscription = shared_realm->get_latest_subscription_set();
const auto sub_state = init_subscription.state();

Expand Down Expand Up @@ -181,18 +179,32 @@ void AsyncOpenTask::migrate_schema_or_complete(AsyncOpenCallback&& callback,
if (!m_session)
return;
auto session = m_session;
auto migrate_schema = m_sync_schema_migration_required;
lock.unlock();

if (!migrate_schema) {
auto pending_migration = [&] {
auto rt = coordinator->begin_read();
return _impl::sync_schema_migration::has_pending_migration(*rt);
}();

if (!pending_migration) {
wait_for_bootstrap_or_complete(std::move(callback), coordinator, status);
return;
}

// Sync schema migrations require setting a subscription initializer callback to bootstrap the data. The
// subscriptions in the current realm file may not be compatible with the new schema so cannot rely on them.
auto config = coordinator->get_config();
if (!config.sync_config->subscription_initializer) {
status = Status(ErrorCodes::SyncSchemaMigrationError,
"Sync schema migrations must provide a subscription initializer callback in the sync config");
async_open_complete(std::move(callback), coordinator, status);
return;
}

// Migrate the schema.
// * First upload the changes at the old schema version
// * Then delete the realm, reopen it, and rebootstrap at new schema version
// The lifetime of the task is extended until the bootstrap completes.
// * Then delete the realm, reopen it, and bootstrap at new schema version
// The lifetime of the task is extended until bootstrap completes.
std::shared_ptr<AsyncOpenTask> self(shared_from_this());
session->wait_for_upload_completion([callback = std::move(callback), coordinator, session, self,
this](Status status) mutable {
Expand Down Expand Up @@ -240,7 +252,7 @@ void AsyncOpenTask::wait_for_bootstrap_or_complete(AsyncOpenCallback&& callback,
{
auto config = coordinator->get_config();
if (config.sync_config && config.sync_config->flx_sync_requested &&
config.sync_config->subscription_initializer) {
config.sync_config->subscription_initializer && status.is_ok()) {
const bool rerun_on_launch = config.sync_config->rerun_init_subscription_on_open;
attach_to_subscription_initializer(std::move(callback), coordinator, rerun_on_launch);
}
Expand Down
1 change: 0 additions & 1 deletion src/realm/object-store/sync/async_open_task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ class AsyncOpenTask : public std::enable_shared_from_this<AsyncOpenTask> {
std::vector<uint64_t> m_registered_callbacks GUARDED_BY(m_mutex);
mutable util::CheckedMutex m_mutex;
const bool m_db_first_open;
bool m_sync_schema_migration_required GUARDED_BY(m_mutex) = false;
};

} // namespace realm
Expand Down
42 changes: 23 additions & 19 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include <realm/sync/noinst/client_history_impl.hpp>
#include <realm/sync/noinst/client_reset_operation.hpp>
#include <realm/sync/noinst/migration_store.hpp>
#include <realm/sync/noinst/sync_schema_migration.hpp>
#include <realm/sync/protocol.hpp>

using namespace realm;
Expand Down Expand Up @@ -366,6 +367,7 @@ SyncSession::SyncSession(SyncClient& client, std::shared_ptr<DB> db, const Realm
, m_migration_store{sync::MigrationStore::create(m_db)}
, m_client(client)
, m_sync_manager(sync_manager)
, m_previous_schema_version(_impl::sync_schema_migration::has_pending_migration(*m_db->start_read()))
{
REALM_ASSERT(m_config.sync_config);
// we don't want the following configs enabled during a client reset
Expand Down Expand Up @@ -497,6 +499,10 @@ void SyncSession::download_fresh_realm(sync::ProtocolErrorInfo::Action server_re
// deep copy the sync config so we don't modify the live session's config
config.sync_config = std::make_shared<SyncConfig>(fresh_config);
config.sync_config->client_resync_mode = ClientResyncMode::Manual;
// Do not run the subscription initializer on fresh realms used in client resets.
config.sync_config->rerun_init_subscription_on_open = false;
config.sync_config->subscription_initializer = nullptr;
config.schema_version = m_previous_schema_version.value_or(m_config.schema_version);
fresh_sync_session = m_sync_manager->get_session(db, config);
auto& history = static_cast<sync::ClientReplication&>(*db->get_replication());
// the fresh Realm may apply writes to this db after it has outlived its sync session
Expand Down Expand Up @@ -703,7 +709,7 @@ void SyncSession::handle_error(sync::SessionErrorInfo error)
[[fallthrough]];
case ClientResyncMode::Recover:
download_fresh_realm(error.server_requests_action);
return; // do not propgate the error to the user at this point
return; // do not propagate the error to the user at this point
}
break;
case sync::ProtocolErrorInfo::Action::MigrateToFLX:
Expand Down Expand Up @@ -747,15 +753,11 @@ void SyncSession::handle_error(sync::SessionErrorInfo error)
log_out_user = true;
break;
case sync::ProtocolErrorInfo::Action::MigrateSchema:
std::function<void()> callback;
{
util::CheckedLockGuard l(m_state_mutex);
callback = std::move(m_sync_schema_migration_callback);
}
if (callback) {
callback();
}
return; // do not propgate the error to the user at this point
util::CheckedUniqueLock lock(m_state_mutex);
// Should only be received for FLX sync.
REALM_ASSERT(m_original_sync_config->flx_sync_requested);
m_previous_schema_version = error.previous_schema_version;
return; // do not propagate the error to the user at this point
}
}
else {
Expand Down Expand Up @@ -837,7 +839,8 @@ void SyncSession::handle_progress_update(uint64_t downloaded, uint64_t downloada

static sync::Session::Config::ClientReset make_client_reset_config(const RealmConfig& base_config,
const std::shared_ptr<SyncConfig>& sync_config,
DBRef&& fresh_copy, bool recovery_is_allowed)
DBRef&& fresh_copy, bool recovery_is_allowed,
bool schema_migration_detected)
{
REALM_ASSERT(sync_config->client_resync_mode != ClientResyncMode::Manual);

Expand All @@ -852,6 +855,11 @@ static sync::Session::Config::ClientReset make_client_reset_config(const RealmCo
if (!sync_config->notify_before_client_reset && !sync_config->notify_after_client_reset)
return config;

// Do not run the client reset callbacks in case of a sync schema migration.
// (opening the realm with the new schema will result in a crash due to breaking changes)
if (schema_migration_detected)
return config;

RealmConfig realm_config = base_config;
realm_config.sync_config = std::make_shared<SyncConfig>(*sync_config); // deep copy
realm_config.scheduler = util::Scheduler::make_dummy();
Expand Down Expand Up @@ -942,8 +950,10 @@ void SyncSession::create_sync_session()
m_server_requests_action == sync::ProtocolErrorInfo::Action::MigrateToFLX ||
m_server_requests_action == sync::ProtocolErrorInfo::Action::RevertToPBS;
// Use the original sync config, not the updated one from the migration store
session_config.client_reset_config = make_client_reset_config(
m_config, m_original_sync_config, std::move(m_client_reset_fresh_copy), allowed_to_recover);
session_config.client_reset_config =
make_client_reset_config(m_config, m_original_sync_config, std::move(m_client_reset_fresh_copy),
allowed_to_recover, m_previous_schema_version.has_value());
session_config.schema_version = m_previous_schema_version.value_or(m_config.schema_version);
m_server_requests_action = sync::ProtocolErrorInfo::Action::NoAction;
}

Expand Down Expand Up @@ -998,12 +1008,6 @@ void SyncSession::create_sync_session()
});
}

void SyncSession::set_sync_schema_migration_callback(std::function<void()>&& callback)
{
util::CheckedLockGuard l(m_state_mutex);
m_sync_schema_migration_callback = std::move(callback);
}

void SyncSession::nonsync_transact_notify(sync::version_type version)
{
m_progress_notifier.set_local_version(version);
Expand Down
8 changes: 3 additions & 5 deletions src/realm/object-store/sync/sync_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,11 +286,6 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
return session.m_db;
}

static void set_sync_schema_migration_callback(SyncSession& session, std::function<void()>&& callback)
{
session.set_sync_schema_migration_callback(std::move(callback));
}

static util::Future<void> pause_async(SyncSession& session);
};

Expand Down Expand Up @@ -504,6 +499,9 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
mutable util::CheckedMutex m_external_reference_mutex;
class ExternalReference;
std::weak_ptr<ExternalReference> m_external_reference GUARDED_BY(m_external_reference_mutex);

// Set if ProtocolError::schema_version_changed error is received from the server.
std::optional<uint64_t> m_previous_schema_version GUARDED_BY(m_state_mutex);
};

} // namespace realm
Expand Down
2 changes: 2 additions & 0 deletions src/realm/sync/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ set(SYNC_SOURCES
noinst/pending_bootstrap_store.cpp
noinst/protocol_codec.cpp
noinst/sync_metadata_schema.cpp
noinst/sync_schema_migration.cpp
changeset_encoder.cpp
changeset_parser.cpp
changeset.cpp
Expand Down Expand Up @@ -79,6 +80,7 @@ set(NOINST_HEADERS
noinst/protocol_codec.hpp
noinst/root_certs.hpp
noinst/sync_metadata_schema.hpp
noinst/sync_schema_migration.hpp
)

set(SYNC_HEADERS ${IMPL_INSTALL_HEADESR}
Expand Down
5 changes: 4 additions & 1 deletion src/realm/sync/noinst/client_impl_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <realm/sync/noinst/client_history_impl.hpp>
#include <realm/sync/noinst/compact_changesets.hpp>
#include <realm/sync/noinst/client_reset_operation.hpp>
#include <realm/sync/noinst/sync_schema_migration.hpp>
#include <realm/sync/protocol.hpp>
#include <realm/util/assert.hpp>
#include <realm/util/basic_system_errors.hpp>
Expand Down Expand Up @@ -2544,7 +2545,9 @@ Status Session::receive_error_message(const ProtocolErrorInfo& info)
if (protocol_error == ProtocolError::schema_version_changed) {
// Enable upload immediately if the session is still active.
if (m_state == Active) {
m_allow_upload = true;
auto wt = get_db()->start_write();
_impl::sync_schema_migration::track_sync_schema_migration(*wt, *info.previous_schema_version);
wt->commit();
// Notify SyncSession a schema migration is required.
on_connection_state_changed(m_conn.get_state(), SessionErrorInfo{info});
}
Expand Down
10 changes: 10 additions & 0 deletions src/realm/sync/noinst/protocol_codec.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,16 @@ class ClientProtocol {
info.migration_query_string.emplace(query_string->get<std::string_view>());
}

if (info.raw_error_code == static_cast<int>(sync::ProtocolError::schema_version_changed)) {
auto schema_version = json.find("previousSchemaVersion");
if (schema_version == json.end() || !schema_version->is_number_unsigned()) {
return report_error(
"Missing/invalid previous schema version in schema migration error response");
}

info.previous_schema_version.emplace(schema_version->get<uint64_t>());
}

if (auto rejected_updates = json.find("rejectedUpdates"); rejected_updates != json.end()) {
if (!rejected_updates->is_array()) {
return report_error(
Expand Down
Loading

0 comments on commit 3b20104

Please sign in to comment.