Skip to content

Commit

Permalink
Merge pull request #7897 from realm/feature/role-change
Browse files Browse the repository at this point in the history
* RCORE-1872 Sync client should allow server bootstrapping at any time (#7440)
* First round of changes for server-initiated bootstraps
* Added test for role change bootstraps
* Updated test for handle role bootstraps
* Updated baas/baasaas to use branch with fixes
* Updated test to verify bootstrap actually occurred
* Fixed tsan warning
* Updates from review; added comments to clarify bootstrap detection logic
* Reverted baas branch to master and protocol version to 12
* Added comments to changes needed when merging to master; update baas version to not use master
* Pulled over changes from other branch and tweaking download params
* Refactored tests to validate different bootstrap types
* Updated tests to get passing using the server params
* Updated to support new batch_state protocol changes; updated tests
* Updated role change tests and merged test from separate PR
* Fixed issue with flx query verion 0 not being treated as a bootstrap
* Cleaned up the tests a bit and reworked query version 0 handling
* Updates from review; updated batch_state for schema bootstraps
* Removed extra mutex in favor of state machine's mutex
* Increased timeout when waiting for app initial sync to complete
* Updated role change test to use test commands
* Update resume and ident message handling
* Updated future waits for the pause/resume test command
* Added session connected event for when session multiplexing is disabled
* Added wait_until() to state machine to wait for callback; updated role change test

* RCORE-1973 Add role/permissions tests for new bootstrap feature (#7675)
* Moved role change tests to separate test file
* Fixed building of new flx_role_change.cpp file
* Added local changes w/role bootstrap test - fixed exception in subscription store during server initiated boostrap
* Updated local change test to include valid offline writes during role change
* Added role change test during initial schema bootstrap
* Wrapped up role change during bootstrap tests
* Removed debug statments to fix thread sanitizer
* Updated sub state comments and reverted a minor change
* Refactored role change tests and broke out into 2 separate test cases
* Moved harness from a global to a static var in each test case
* Reverted resetting the bootstrapping subscription state back to Pending
* Updated baas to use protocol v14 and removed the feature flag for role change bootstraps
* Updated baasaas version to be a cached version
* Updated baasaas githash and reordered role change during bootstrap to check for role change bootstrap as first validation step
* Minor updates to reuse the verify_records() fcn

* RCORE-2174 Bootstrap store is not being reset if initial subscription bootstrap is interrupted by role change (#7831)
* Updated pending bootstrap store to be processed (applied or cleared) when the session or connection is restarted without restarting the Sync Session

* RCORE-1974 Add tests for role/permissions changed during client reset (#7840)
* re-applied changes after base branch was merged to feature branch
* Updates to address test failures
* Disable role change check during fresh realm download
* Updated comments for clarity
* Updates from review; added a bunch of comments to test
* Updates to role change tests per review comments
* removed ostream support for SyncClientHookEvent
  • Loading branch information
Michael Wilkerson-Barker authored Jul 19, 2024
2 parents 1f0378a + 30ce24b commit 45f1e85
Show file tree
Hide file tree
Showing 21 changed files with 1,496 additions and 191 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
* Updated the return type of `LogCategory::get_category_names()` from `std::vector<const char*>` to `std::vector<std::string_view>`. ([PR #7879](https://github.com/realm/realm-core/pull/7879))
* Added `realm_get_persisted_schema_version` for reading the version of the schema currently stored locally. (PR [#7873](https://github.com/realm/realm-core/pull/7873))
* Added `realm_app_config_get_sync_client_config()` function to the C_API to get the sync_client_config value in `realm_app_config_t` if REALM_APP_SERVICES is enabled. If REALM_APP_SERVICES is not available, `realm_sync_client_config_new()` is available to create a new `sync_client_config_t` to use when initializing the sync manager. ([PR #7891](https://github.com/realm/realm-core/pull/7891))
* Role and permissions changes no longer require a client reset to update the local realm. ([PR #7440](https://github.com/realm/realm-core/pull/7440))

### Fixed
* <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
* FLX download progress was only updated when bootstraps completed, making it always be 0 before the first completion and then forever 1. ([PR #7869](https://github.com/realm/realm-core/issues/7869), since v14.10.2)
* Sync client can crash if a session is resumed while the session is being suspended. ([#7860](https://github.com/realm/realm-core/issues/7860), since v12.0.0)
* If a sync session is interrupted by a disconnect or restart while downloading a bootstrap, stale data from the previous bootstrap may be included when the session reconnects and downloads the bootstrap. This can lead to objects stored in the database that do not match the actual state of the server and potentially leading to compensating writes. ([#7827](https://github.com/realm/realm-core/issues/7827), since v12.0.0)

### Breaking changes
* None.
Expand All @@ -20,7 +22,8 @@
-----------

### Internals
* None.
* Protocol version has been updated to v14 to support server intiated bootstraps and role change updates without a client reset. ([PR #7440](https://github.com/realm/realm-core/pull/7440))
* Add support for server initiated bootstraps. ([PR #7440](https://github.com/realm/realm-core/pull/7440))

----------------------------------------------

Expand Down
1 change: 1 addition & 0 deletions dependencies.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ ZLIB_VERSION: 1.2.13
# https://github.com/10gen/baas/commits
# 2f308db is 2024 July 10
BAAS_VERSION: 2f308db6f65333728a101d1fecbb792f9659a5ce
BAAS_VERSION_TYPE: githash
3 changes: 2 additions & 1 deletion evergreen/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,9 @@ functions:
shell: bash
env:
BAASAAS_API_KEY: "${baasaas_api_key}"
# BAAS_VERSION and VERSION_TYPE are set by realm-core/dependencies.yml
BAASAAS_REF_SPEC: "${BAAS_VERSION}"
BAASAAS_START_MODE: "githash"
BAASAAS_START_MODE: "${BAAS_VERSION_TYPE|githash}"
script: |-
set -o errexit
set -o verbose
Expand Down
4 changes: 3 additions & 1 deletion src/realm/object-store/sync/sync_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
#include <realm/util/future.hpp>
#include <realm/version_id.hpp>

#include <map>
#include <mutex>
#include <unordered_map>
#include <map>

namespace realm {
class DB;
Expand Down Expand Up @@ -345,6 +345,8 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
return session.get_appservices_connection_id();
}

// Supported commands can be found in `handleTestCommandMessage()`
// in baas/devicesync/server/qbs_client_handler_functions.go
static util::Future<std::string> send_test_command(SyncSession& session, std::string request)
{
return session.send_test_command(std::move(request));
Expand Down
81 changes: 31 additions & 50 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -805,22 +805,16 @@ void SessionImpl::update_subscription_version_info()

bool SessionImpl::process_flx_bootstrap_message(const DownloadMessage& message)
{
// Not a bootstrap message if this isn't a FLX download
if (!message.last_in_batch || !message.query_version) {
// Ignore the message if the session is not active or a steady state message
if (m_state != State::Active || message.batch_state == DownloadBatchState::SteadyState) {
return false;
}

REALM_ASSERT(m_is_flx_sync_session);

// Not a bootstrap message if it's for the already active query version
if (*message.last_in_batch && *message.query_version == m_wrapper.m_flx_active_version) {
return false;
}

auto batch_state = *message.last_in_batch ? DownloadBatchState::LastInBatch : DownloadBatchState::MoreToCome;
auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
std::optional<SyncProgress> maybe_progress;
if (batch_state == DownloadBatchState::LastInBatch) {
if (message.batch_state == DownloadBatchState::LastInBatch) {
maybe_progress = message.progress;
}

Expand All @@ -842,23 +836,23 @@ bool SessionImpl::process_flx_bootstrap_message(const DownloadMessage& message)

// If we've started a new batch and there is more to come, call on_flx_sync_progress to mark the subscription as
// bootstrapping.
if (new_batch && batch_state == DownloadBatchState::MoreToCome) {
if (new_batch && message.batch_state == DownloadBatchState::MoreToCome) {
on_flx_sync_progress(*message.query_version, DownloadBatchState::MoreToCome);
}

auto hook_action = call_debug_hook(SyncClientHookEvent::BootstrapMessageProcessed, message.progress,
*message.query_version, batch_state, message.changesets.size());
*message.query_version, message.batch_state, message.changesets.size());
if (hook_action == SyncClientHookAction::EarlyReturn) {
return true;
}
REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);

if (batch_state == DownloadBatchState::MoreToCome) {
if (message.batch_state == DownloadBatchState::MoreToCome) {
return true;
}

try {
process_pending_flx_bootstrap();
process_pending_flx_bootstrap(); // throws
}
catch (const IntegrationException& e) {
on_integration_failure(e);
Expand All @@ -877,8 +871,6 @@ void SessionImpl::process_pending_flx_bootstrap()
if (!m_is_flx_sync_session || m_state != State::Active) {
return;
}
// Should never be called if session is not active
REALM_ASSERT_EX(m_state == SessionImpl::Active, m_state);
auto bootstrap_store = m_wrapper.get_flx_pending_bootstrap_store();
if (!bootstrap_store->has_pending()) {
return;
Expand All @@ -902,7 +894,7 @@ void SessionImpl::process_pending_flx_bootstrap()
auto pending_batch = bootstrap_store->peek_pending(m_wrapper.m_flx_bootstrap_batch_size_bytes);
if (!pending_batch.progress) {
logger.info("Incomplete pending bootstrap found for query version %1", pending_batch.query_version);
// Close the write transation before clearing the bootstrap store to avoid a deadlock because the
// Close the write transaction before clearing the bootstrap store to avoid a deadlock because the
// bootstrap store requires a write transaction itself.
transact->close();
bootstrap_store->clear();
Expand Down Expand Up @@ -1044,7 +1036,7 @@ SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, con
return call_debug_hook(data);
}

SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo& error_info)
SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo* error_info)
{
if (REALM_LIKELY(!m_wrapper.m_debug_hook)) {
return SyncClientHookAction::NoAction;
Expand All @@ -1058,37 +1050,12 @@ SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, con
data.batch_state = DownloadBatchState::SteadyState;
data.progress = m_progress;
data.num_changesets = 0;
data.query_version = 0;
data.error_info = &error_info;
data.query_version = m_last_sent_flx_query_version;
data.error_info = error_info;

return call_debug_hook(data);
}

SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event)
{
return call_debug_hook(event, m_progress, m_last_sent_flx_query_version, DownloadBatchState::SteadyState, 0);
}

bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version)
{
// Should never be called if session is not active
REALM_ASSERT_EX(m_state == State::Active, m_state);
if (batch_state == DownloadBatchState::SteadyState) {
return true;
}

if (!m_is_flx_sync_session) {
return true;
}

// If this is a steady state DOWNLOAD, no need for special handling.
if (batch_state == DownloadBatchState::LastInBatch && query_version == m_wrapper.m_flx_active_version) {
return true;
}

return false;
}

void SessionImpl::init_progress_handler()
{
REALM_ASSERT_EX(m_state == State::Unactivated || m_state == State::Active, m_state);
Expand Down Expand Up @@ -1233,10 +1200,27 @@ void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchStat
if (!has_flx_subscription_store()) {
return;
}

REALM_ASSERT(!m_finalized);
REALM_ASSERT(new_version >= m_flx_last_seen_version);
REALM_ASSERT(new_version >= m_flx_active_version);
REALM_ASSERT(batch_state != DownloadBatchState::SteadyState);
if (batch_state == DownloadBatchState::SteadyState) {
throw IntegrationException(ErrorCodes::SyncProtocolInvariantFailed,
"Unexpected batch state of SteadyState while downloading bootstrap");
}
// Is this a server-initiated bootstrap? Skip notifying the subscription store
if (new_version == m_flx_active_version) {
return;
}
if (new_version < m_flx_active_version) {
throw IntegrationException(ErrorCodes::SyncProtocolInvariantFailed,
util::format("Bootstrap query version %1 is less than active version %2",
new_version, m_flx_active_version));
}
if (new_version < m_flx_last_seen_version) {
throw IntegrationException(
ErrorCodes::SyncProtocolInvariantFailed,
util::format("Download message query version %1 is less than current bootstrap version %2", new_version,
m_flx_last_seen_version));
}

SubscriptionSet::State new_state = SubscriptionSet::State::Uncommitted; // Initialize to make compiler happy

Expand All @@ -1245,9 +1229,6 @@ void SessionWrapper::on_flx_sync_progress(int64_t new_version, DownloadBatchStat
// Cannot be called with this value.
REALM_UNREACHABLE();
case DownloadBatchState::LastInBatch:
if (m_flx_active_version == new_version) {
return;
}
on_flx_sync_version_complete(new_version);
if (new_version == 0) {
new_state = SubscriptionSet::State::Complete;
Expand Down
32 changes: 21 additions & 11 deletions src/realm/sync/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,18 +123,28 @@ enum class ClientResyncMode : unsigned char {
RecoverOrDiscard,
};

// clang-format off
#define REALM_FOR_EACH_SYNC_CLIENT_HOOK_EVENT(X) \
X(DownloadMessageReceived), \
X(DownloadMessageIntegrated), \
X(BootstrapMessageProcessed), \
X(BootstrapProcessed), \
X(ErrorMessageReceived), \
X(SessionActivating), \
X(SessionSuspended), \
X(SessionConnected), \
X(SessionResumed), \
X(BindMessageSent), \
X(IdentMessageSent), \
X(ClientResetMergeComplete), \
X(BootstrapBatchAboutToProcess), \
X(UploadMessageSent)
// clang-format on

enum class SyncClientHookEvent {
DownloadMessageReceived,
DownloadMessageIntegrated,
BootstrapMessageProcessed,
BootstrapProcessed,
ErrorMessageReceived,
SessionActivating,
SessionSuspended,
BindMessageSent,
ClientResetMergeComplete,
BootstrapBatchAboutToProcess,
UploadMessageSent,
#define REALM_DECLARE_SYNC_CLIENT_HOOK_EVENT(X) X
REALM_FOR_EACH_SYNC_CLIENT_HOOK_EVENT(REALM_DECLARE_SYNC_CLIENT_HOOK_EVENT)
#undef REALM_DECLARE_SYNC_CLIENT_HOOK_EVENT
};

enum class SyncClientHookAction {
Expand Down
47 changes: 27 additions & 20 deletions src/realm/sync/noinst/client_impl_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1527,6 +1527,16 @@ void Session::cancel_resumption_delay()
if (unbind_process_complete())
initiate_rebind(); // Throws

try {
process_pending_flx_bootstrap(); // throws
}
catch (const IntegrationException& error) {
on_integration_failure(error);
}
catch (...) {
on_integration_failure(IntegrationException(exception_to_status()));
}

m_conn.one_more_active_unsuspended_session(); // Throws
if (m_try_again_activation_timer) {
m_try_again_activation_timer.reset();
Expand Down Expand Up @@ -1715,7 +1725,7 @@ void Session::activate()
m_conn.one_more_active_unsuspended_session(); // Throws

try {
process_pending_flx_bootstrap();
process_pending_flx_bootstrap(); // throws
}
catch (const IntegrationException& error) {
on_integration_failure(error);
Expand Down Expand Up @@ -1806,12 +1816,7 @@ void Session::send_message()
if (!m_bind_message_sent)
return send_bind_message(); // Throws

if (!m_ident_message_sent) {
if (have_client_file_ident())
send_ident_message(); // Throws
return;
}

// Pending test commands can be sent any time after the BIND message is sent
const auto has_pending_test_command = std::any_of(m_pending_test_commands.begin(), m_pending_test_commands.end(),
[](const PendingTestCommand& command) {
return command.pending;
Expand All @@ -1820,6 +1825,12 @@ void Session::send_message()
return send_test_command_message();
}

if (!m_ident_message_sent) {
if (have_client_file_ident())
send_ident_message(); // Throws
return;
}

if (m_error_to_send)
return send_json_error_message(); // Throws

Expand Down Expand Up @@ -1979,6 +1990,7 @@ void Session::send_ident_message()
m_conn.initiate_write_message(out, this); // Throws

m_ident_message_sent = true;
call_debug_hook(SyncClientHookEvent::IdentMessageSent);

// Other messages may be waiting to be sent
enlist_to_send(); // Throws
Expand Down Expand Up @@ -2376,22 +2388,16 @@ Status Session::receive_download_message(const DownloadMessage& message)
if (!is_flx || query_version > 0)
enable_progress_notifications();

// If this is a PBS connection, then every download message is its own complete batch.
bool last_in_batch = is_flx ? *message.last_in_batch : true;
auto batch_state = last_in_batch ? sync::DownloadBatchState::LastInBatch : sync::DownloadBatchState::MoreToCome;
if (is_steady_state_download_message(batch_state, query_version))
batch_state = DownloadBatchState::SteadyState;

auto&& progress = message.progress;
if (is_flx) {
logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
"latest_server_version=%3, latest_server_version_salt=%4, "
"upload_client_version=%5, upload_server_version=%6, progress_estimate=%7, "
"last_in_batch=%8, query_version=%9, num_changesets=%10, ...)",
"batch_state=%8, query_version=%9, num_changesets=%10, ...)",
progress.download.server_version, progress.download.last_integrated_client_version,
progress.latest_server_version.version, progress.latest_server_version.salt,
progress.upload.client_version, progress.upload.last_integrated_server_version,
message.downloadable.as_estimate(), last_in_batch, query_version,
message.downloadable.as_estimate(), message.batch_state, query_version,
message.changesets.size()); // Throws
}
else {
Expand Down Expand Up @@ -2436,6 +2442,7 @@ Status Session::receive_download_message(const DownloadMessage& message)
changeset.remote_version, server_version, progress.download.server_version)};
}
server_version = changeset.remote_version;

// Check that per-changeset last integrated client version is "weakly"
// increasing.
bool good_client_version =
Expand All @@ -2461,7 +2468,7 @@ Status Session::receive_download_message(const DownloadMessage& message)
}

auto hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageReceived, progress, query_version,
batch_state, message.changesets.size());
message.batch_state, message.changesets.size());
if (hook_action == SyncClientHookAction::EarlyReturn) {
return Status::OK();
}
Expand All @@ -2472,11 +2479,11 @@ Status Session::receive_download_message(const DownloadMessage& message)
return Status::OK();
}

initiate_integrate_changesets(message.downloadable.as_bytes(), batch_state, progress,
initiate_integrate_changesets(message.downloadable.as_bytes(), message.batch_state, progress,
message.changesets); // Throws

hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
batch_state, message.changesets.size());
message.batch_state, message.changesets.size());
if (hook_action == SyncClientHookAction::EarlyReturn) {
return Status::OK();
}
Expand Down Expand Up @@ -2586,7 +2593,7 @@ Status Session::receive_error_message(const ProtocolErrorInfo& info)
// Can't process debug hook actions once the Session is undergoing deactivation, since
// the SessionWrapper may not be available
if (m_state == Active) {
auto debug_action = call_debug_hook(SyncClientHookEvent::ErrorMessageReceived, info);
auto debug_action = call_debug_hook(SyncClientHookEvent::ErrorMessageReceived, &info);
if (debug_action == SyncClientHookAction::EarlyReturn) {
return Status::OK();
}
Expand Down Expand Up @@ -2646,7 +2653,7 @@ void Session::suspend(const SessionErrorInfo& info)
// Notify the application of the suspension of the session if the session is
// still in the Active state
if (m_state == Active) {
call_debug_hook(SyncClientHookEvent::SessionSuspended, info);
call_debug_hook(SyncClientHookEvent::SessionSuspended, &info);
m_conn.one_less_active_unsuspended_session(); // Throws
on_suspended(info); // Throws
}
Expand Down
Loading

0 comments on commit 45f1e85

Please sign in to comment.