Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RCORE-2185 Sync client should steal file ident of fresh realm when performing client reset #7850

Merged
merged 30 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
520faad
Initial changes to use the file ident from the fresh realm during cli…
Jun 27, 2024
b1b1dd9
Fixed failing realm_sync_test tests
Jun 27, 2024
5e1e1e9
lint
Jun 27, 2024
73ad369
Merge branch 'master' of github.com:realm/realm-core into mwb/fix-cli…
Jun 28, 2024
f5e1044
Updated changelog
Jun 28, 2024
1d3ca5b
Don't send UPLOAD Messages while downloading fresh realm
Jun 28, 2024
4b02363
Merge branch 'master' of github.com:realm/realm-core into mwb/fix-cli…
Jun 28, 2024
9c38925
Allow sending QUERY bootstrap for fresh download sessions
Jun 28, 2024
3a2af9a
Added SHARED_GROUP_FRESH_PATH to generate path for fresh realm
Jun 29, 2024
aae37b7
Merge branch 'master' of github.com:realm/realm-core into mwb/fix-cli…
Jun 29, 2024
6842197
Removed SHARED_GROUP_FRESH_PATH and used session_reason setting instead
Jun 29, 2024
2ddca98
Some cleanup after tests passing
Jun 29, 2024
86ca88a
Added test to verify no UPLOAD messages are sent during fresh realm d…
Jun 29, 2024
005fea6
Use is_fresh_path to determine if hook event called by client reset f…
Jul 1, 2024
705a114
Merge branch 'master' of github.com:realm/realm-core into mwb/fix-cli…
Jul 1, 2024
59a7b51
Fixed tsan failure around REQUIRE() within hook event callback in flx…
Jul 1, 2024
5072e32
Updates from review and streamlined changes based on recommendations
Jul 1, 2024
1e51055
Merge branch 'master' of github.com:realm/realm-core into mwb/fix-cli…
Jul 1, 2024
427d8fe
Some minor cleanup after self review
Jul 1, 2024
c82d03b
Reverted some test changes that are no longer needed
Jul 1, 2024
bacdd57
More updates from review
Jul 1, 2024
49ec153
Updated logic for when to perform a client reset diff
Jul 2, 2024
f5569c1
Merge branch 'master' of github.com:realm/realm-core into mwb/fix-cli…
Jul 2, 2024
043a470
Updated fresh realm download to update upload progress but not send u…
Jul 2, 2024
f7d915c
Removed has_client_reset_config flag in favor of get_cliet_reset_conf…
Jul 2, 2024
49a5016
Merge branch 'master' of github.com:realm/realm-core into mwb/fix-cli…
Jul 2, 2024
aa50680
Updats from the review - renamed m_allow_uploads to m_delay_uploads
Jul 2, 2024
d88f2e4
Updated assert
Jul 3, 2024
85abf4b
Updated test to start with file ident, added comment about client res…
Jul 3, 2024
6a426a3
Updated comment for m_delay_uploads
Jul 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
### Internals
* Fixed `Table::remove_object_recursive` which wouldn't recursively follow links through a single `Mixed` property. This feature is exposed publicly on `Table` but no SDK currently uses it, so this is considered internal. ([#7829](https://github.com/realm/realm-core/issues/7829), likely since the introduction of Mixed)
* Upload completion is now tracked in a multiprocess-compatible manner ([PR #7796](https://github.com/realm/realm-core/pull/7796)).
* The local realm will assume the the client file ident of the fresh realm during a client reset. ([PR #7850](https://github.com/realm/realm-core/pull/7850))

----------------------------------------------

Expand Down
3 changes: 1 addition & 2 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -912,8 +912,7 @@ void SyncSession::create_sync_session()
session_config.proxy_config = sync_config.proxy_config;
session_config.simulate_integration_error = sync_config.simulate_integration_error;
session_config.flx_bootstrap_batch_size_bytes = sync_config.flx_bootstrap_batch_size_bytes;
session_config.session_reason =
client_reset::is_fresh_path(m_config.path) ? sync::SessionReason::ClientReset : sync::SessionReason::Sync;
session_config.fresh_realm_download = client_reset::is_fresh_path(m_config.path);
session_config.schema_version = m_config.schema_version;

if (sync_config.on_sync_client_event_hook) {
Expand Down
15 changes: 14 additions & 1 deletion src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener

const SessionReason m_session_reason;

// If false, QUERY and MARK messages are allowed but UPLOAD messages will not
// be sent to the server.
const bool m_allow_upload_messages = true;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to assign a default value since you must do it in the constructor


const uint64_t m_schema_version;

std::shared_ptr<SubscriptionStore> m_flx_subscription_store;
Expand Down Expand Up @@ -716,6 +720,13 @@ uint64_t SessionImpl::get_schema_version() noexcept
return m_wrapper.m_schema_version;
}

bool SessionImpl::are_uploads_allowed() noexcept
{
// Can only be called if the session is active or being activated
REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
return m_wrapper.m_allow_upload_messages;
}

void SessionImpl::initiate_integrate_changesets(std::uint_fast64_t downloadable_bytes, DownloadBatchState batch_state,
const SyncProgress& progress, const ReceivedChangesets& changesets)
{
Expand Down Expand Up @@ -1150,7 +1161,9 @@ SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<Sub
, m_progress_handler(std::move(config.progress_handler))
, m_connection_state_change_listener(std::move(config.connection_state_change_listener))
, m_debug_hook(std::move(config.on_sync_client_event_hook))
, m_session_reason(m_client_reset_config ? SessionReason::ClientReset : config.session_reason)
, m_session_reason(m_client_reset_config || config.fresh_realm_download ? SessionReason::ClientReset
: SessionReason::Sync)
, m_allow_upload_messages(!config.fresh_realm_download)
, m_schema_version(config.schema_version)
, m_flx_subscription_store(std::move(flx_sub_store))
, m_migration_store(std::move(migration_store))
Expand Down
6 changes: 3 additions & 3 deletions src/realm/sync/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,9 +420,9 @@ class Session {
/// through Client::run().
util::UniqueFunction<ConnectionStateChangeListener> connection_state_change_listener;

/// The purpose of this sync session. Reported to the server for informational purposes and has no functional
/// effect.
SessionReason session_reason = SessionReason::Sync;
/// Is this session being opened for a realm whose path ends in ".fresh"? If so,
/// it will be downloading a fresh copy of the realm data from the server.
bool fresh_realm_download = false;

/// Schema version
///
Expand Down
1 change: 1 addition & 0 deletions src/realm/sync/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ enum class SyncClientHookEvent {
BindMessageSent,
ClientResetMergeComplete,
BootstrapBatchAboutToProcess,
UploadMessageSent,
};

enum class SyncClientHookAction {
Expand Down
114 changes: 62 additions & 52 deletions src/realm/sync/noinst/client_impl_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1687,12 +1687,11 @@ void Session::activate()

if (REALM_LIKELY(!get_client().is_dry_run())) {
bool file_exists = util::File::exists(get_realm_path());
m_performing_client_reset = get_client_reset_config().has_value();
m_has_client_reset_config = get_client_reset_config().has_value();

logger.info("client_reset_config = %1, Realm exists = %2 ", m_performing_client_reset, file_exists);
if (!m_performing_client_reset) {
get_history().get_status(m_last_version_available, m_client_file_ident, m_progress); // Throws
}
logger.info("client_reset_config = %1, Realm exists = %2, uploads allowed = %3", m_has_client_reset_config,
file_exists, are_uploads_allowed() ? "yes" : "no");
get_history().get_status(m_last_version_available, m_client_file_ident, m_progress); // Throws
}
logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
m_client_file_ident.salt); // Throws
Expand All @@ -1701,7 +1700,7 @@ void Session::activate()
REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version);
init_progress_handler();

logger.debug("last_version_available = %1", m_last_version_available); // Throws
logger.debug("last_version_available = %1", m_last_version_available); // Throws
logger.debug("progress_download_server_version = %1", m_progress.download.server_version); // Throws
logger.debug("progress_download_client_version = %1",
m_progress.download.last_integrated_client_version); // Throws
Expand Down Expand Up @@ -1871,13 +1870,17 @@ void Session::send_message()
return false;
}

// Send QUERY messages when the upload progress client version reaches the snapshot version
// of a pending subscription, or if this is a fresh realm download session, since UPLOAD
// messages are not allowed and the upload progress will not be updated.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the local upload progress is actually updated, but no upload messages are sent

return m_upload_progress.client_version >= m_pending_flx_sub_set->snapshot_version;
};

if (check_pending_flx_version()) {
return send_query_change_message(); // throws
}

// Don't allow UPLOAD messages for client reset fresh realm download sessions
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not really doing anything in that sense to justify the comment

if (m_allow_upload && (m_last_version_available > m_upload_progress.client_version)) {
return send_upload_message(); // Throws
}
Expand All @@ -1889,10 +1892,10 @@ void Session::send_bind_message()
REALM_ASSERT_EX(m_state == Active, m_state);

session_ident_type session_ident = m_ident;
bool need_client_file_ident = !have_client_file_ident();
// Request an ident if we don't already have one and there isn't a pending client reset diff
bool need_client_file_ident = !have_client_file_ident() && !m_has_client_reset_config;
const bool is_subserver = false;


ClientProtocol& protocol = m_conn.get_client_protocol();
int protocol_version = m_conn.get_negotiated_protocol_version();
OutputBuffer& out = m_conn.get_output_buffer();
Expand Down Expand Up @@ -1931,8 +1934,9 @@ void Session::send_bind_message()
m_bind_message_sent = true;
call_debug_hook(SyncClientHookEvent::BindMessageSent);

// Ready to send the IDENT message if the file identifier pair is already
// available.
// If there is a pending client reset diff, process that when the BIND message has
// been sent successfully and wait before sending the IDENT message. Otherwise,
// ready to send the IDENT message if the file identifier pair is already available.
if (!need_client_file_ident)
enlist_to_send(); // Throws
}
Expand All @@ -1945,7 +1949,6 @@ void Session::send_ident_message()
REALM_ASSERT(!m_unbind_message_sent);
REALM_ASSERT(have_client_file_ident());


ClientProtocol& protocol = m_conn.get_client_protocol();
OutputBuffer& out = m_conn.get_output_buffer();
session_ident_type session_ident = m_ident;
Expand Down Expand Up @@ -2050,6 +2053,14 @@ void Session::send_upload_message()
version_type progress_client_version = m_upload_progress.client_version;
version_type progress_server_version = m_upload_progress.last_integrated_server_version;

if (!are_uploads_allowed()) {
logger.debug("UPLOAD not allowed: upload progress(progress_client_version=%1, progress_server_version=%2, "
"locked_server_version=%3, num_changesets=%4)",
progress_client_version, progress_server_version, locked_server_version,
uploadable_changesets.size()); // Throws
return;
}

logger.debug("Sending: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
"locked_server_version=%3, num_changesets=%4)",
progress_client_version, progress_server_version, locked_server_version,
Expand Down Expand Up @@ -2135,6 +2146,8 @@ void Session::send_upload_message()
locked_server_version); // Throws
m_conn.initiate_write_message(out, this); // Throws

call_debug_hook(SyncClientHookEvent::UploadMessageSent);

// Other messages may be waiting to be sent
enlist_to_send(); // Throws
}
Expand Down Expand Up @@ -2236,7 +2249,7 @@ bool Session::client_reset_if_needed()
{
// Regardless of what happens, once we return from this function we will
// no longer be in the middle of a client reset
m_performing_client_reset = false;
m_has_client_reset_config = false;

// Even if we end up not actually performing a client reset, consume the
// config to ensure that the resources it holds are released
Expand All @@ -2245,29 +2258,51 @@ bool Session::client_reset_if_needed()
return false;
}

// Save a copy of the status and action in case an error/exception occurs
Status cr_status = client_reset_config->error;
ProtocolErrorInfo::Action cr_action = client_reset_config->action;

auto on_flx_version_complete = [this](int64_t version) {
this->on_flx_sync_version_complete(version);
};
bool did_reset =
client_reset::perform_client_reset(logger, *get_db(), std::move(*client_reset_config), m_client_file_ident,
get_flx_subscription_store(), on_flx_version_complete);
try {
// The file ident from the fresh realm will be copied over to the local realm
bool did_reset = client_reset::perform_client_reset(logger, *get_db(), std::move(*client_reset_config),
get_flx_subscription_store(), on_flx_version_complete);

call_debug_hook(SyncClientHookEvent::ClientResetMergeComplete);
if (!did_reset) {
call_debug_hook(SyncClientHookEvent::ClientResetMergeComplete);
if (!did_reset) {
return false;
}
}
catch (const std::exception& e) {
auto err_msg = util::format("A fatal error occurred during '%1' client reset diff for %2: '%3'", cr_action,
cr_status, e.what());
logger.error(err_msg.c_str());
SessionErrorInfo err_info(Status{ErrorCodes::AutoClientResetFailed, err_msg}, IsFatal{true});
suspend(err_info);
return false;
}

// The fresh Realm has been used to reset the state
logger.debug("Client reset is completed, path=%1", get_realm_path()); // Throws
logger.debug("Client reset is completed, path = %1", get_realm_path()); // Throws

// Update the version, file ident and progress info after the client reset diff is done
get_history().get_status(m_last_version_available, m_client_file_ident, m_progress); // Throws
// Print the version/progress information before performing the asserts
logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
m_client_file_ident.salt); // Throws
logger.debug("last_version_available = %1", m_last_version_available); // Throws
logger.debug("upload_progress_client_version = %1, upload_progress_server_version = %2",
m_progress.upload.client_version,
m_progress.upload.last_integrated_server_version); // Throws
logger.debug("download_progress_client_version = %1, download_progress_server_version = %2",
m_progress.download.last_integrated_client_version,
m_progress.download.server_version); // Throws

SaltedFileIdent client_file_ident;
get_history().get_status(m_last_version_available, client_file_ident, m_progress); // Throws
REALM_ASSERT_3(m_client_file_ident.ident, ==, client_file_ident.ident);
REALM_ASSERT_3(m_client_file_ident.salt, ==, client_file_ident.salt);
REALM_ASSERT_EX(m_progress.download.last_integrated_client_version == 0,
m_progress.download.last_integrated_client_version);
REALM_ASSERT_EX(m_progress.upload.client_version == 0, m_progress.upload.client_version);
logger.trace("last_version_available = %1", m_last_version_available); // Throws

m_upload_progress = m_progress.upload;
m_download_progress = m_progress.download;
Expand Down Expand Up @@ -2321,35 +2356,10 @@ Status Session::receive_ident_message(SaltedFileIdent client_file_ident)
return Status::OK(); // Success
}

// if a client reset happens, it will take care of setting the file ident
// and if not, we do it here
bool did_client_reset = false;

// Save some of the client reset info for reporting to the client if an error occurs.
Status cr_status(Status::OK()); // Start with no client reset
ProtocolErrorInfo::Action cr_action = ProtocolErrorInfo::Action::NoAction;
if (auto& cr_config = get_client_reset_config()) {
cr_status = cr_config->error;
cr_action = cr_config->action;
}

try {
did_client_reset = client_reset_if_needed();
}
catch (const std::exception& e) {
auto err_msg = util::format("A fatal error occurred during '%1' client reset for %2: '%3'", cr_action,
cr_status, e.what());
logger.error(err_msg.c_str());
SessionErrorInfo err_info(Status{ErrorCodes::AutoClientResetFailed, err_msg}, IsFatal{true});
suspend(err_info);
return Status::OK();
}
if (!did_client_reset) {
get_history().set_client_file_ident(client_file_ident,
m_fix_up_object_ids); // Throws
m_progress.download.last_integrated_client_version = 0;
m_progress.upload.client_version = 0;
}
get_history().set_client_file_ident(client_file_ident,
m_fix_up_object_ids); // Throws
m_progress.download.last_integrated_client_version = 0;
m_progress.upload.client_version = 0;

// Ready to send the IDENT message
ensure_enlisted_to_send(); // Throws
Expand Down
14 changes: 13 additions & 1 deletion src/realm/sync/noinst/client_impl_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,10 @@ class ClientImpl::Session {
/// Returns the schema version the synchronization session connects with to the server.
uint64_t get_schema_version() noexcept;

// Returns false if this session is not allowed to send UPLOAD messages to the server to
// update the cursor info, such as during a client reset fresh realm download
bool are_uploads_allowed() noexcept;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a bit odd to have this method while there is also m_allow_upload

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed the function to upload_messages_allowed() and m_allow_uploads to m_delay_uploads to hopefully better clarify their purpose.


/// \brief Initiate the integration of downloaded changesets.
///
/// This function must provide for the passed changesets (if any) to
Expand Down Expand Up @@ -981,7 +985,7 @@ class ClientImpl::Session {
SaltedFileIdent m_client_file_ident = {0, 0};

// True while this session is in the process of performing a client reset.
bool m_performing_client_reset = false;
bool m_has_client_reset_config = false;

// The latest sync progress reported by the server via a DOWNLOAD
// message. See struct SyncProgress for a description. The values stored in
Expand Down Expand Up @@ -1427,6 +1431,14 @@ inline void ClientImpl::Session::message_sent()
// No message will be sent after the UNBIND message
REALM_ASSERT(!m_unbind_message_send_complete);

// If the client reset config structure is populated, then try to perform
// the client reset diff once the BIND message has been sent successfully
if (m_bind_message_sent && m_has_client_reset_config) {
client_reset_if_needed(); // resets m_has_client_reset_config
// Ready to send the IDENT message
ensure_enlisted_to_send(); // Throws
}

if (m_unbind_message_sent) {
REALM_ASSERT(!m_enlisted_to_send);

Expand Down
25 changes: 13 additions & 12 deletions src/realm/sync/noinst/client_reset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,8 @@ ClientResyncMode reset_precheck_guard(const TransactionRef& wt_local, ClientResy
return mode;
}

bool perform_client_reset_diff(DB& db_local, sync::ClientReset& reset_config, sync::SaltedFileIdent client_file_ident,
util::Logger& logger, sync::SubscriptionStore* sub_store,
bool perform_client_reset_diff(DB& db_local, sync::ClientReset& reset_config, util::Logger& logger,
sync::SubscriptionStore* sub_store,
util::FunctionRef<void(int64_t)> on_flx_version_complete)
{
DB& db_remote = *reset_config.fresh_copy;
Expand All @@ -491,13 +491,6 @@ bool perform_client_reset_diff(DB& db_local, sync::ClientReset& reset_config, sy
bool recover_local_changes =
actual_mode == ClientResyncMode::Recover || actual_mode == ClientResyncMode::RecoverOrDiscard;

logger.info(util::LogCategory::reset,
"Client reset: path_local = %1, client_file_ident = (ident: %2, salt: %3), "
"remote_path = %4, requested_mode = %5, action = %6, actual_mode = %7, will_recover = %8, "
"originating_error = %9",
db_local.get_path(), client_file_ident.ident, client_file_ident.salt, db_remote.get_path(),
reset_config.mode, reset_config.action, actual_mode, recover_local_changes, reset_config.error);

auto& repl_local = dynamic_cast<ClientReplication&>(*db_local.get_replication());
auto& history_local = repl_local.get_history();
history_local.ensure_updated(wt_local->get_version());
Expand All @@ -507,14 +500,22 @@ bool perform_client_reset_diff(DB& db_local, sync::ClientReset& reset_config, sy
auto& history_remote = repl_remote.get_history();

sync::SaltedVersion fresh_server_version = {0, 0};
sync::SaltedFileIdent fresh_file_ident = {0, 0};
{
SyncProgress remote_progress;
sync::version_type remote_version_unused;
SaltedFileIdent remote_ident_unused;
history_remote.get_status(remote_version_unused, remote_ident_unused, remote_progress);
history_remote.get_status(remote_version_unused, fresh_file_ident, remote_progress);
fresh_server_version = remote_progress.latest_server_version;
}

logger.info(util::LogCategory::reset,
"Client reset: path_local = %1, fresh_file_ident = (ident: %2, salt: %3), "
"fresh_server_version = (ident: %4, salt: %5), remote_path = %6, requested_mode = %7, action = %8, "
"actual_mode = %9, will_recover = %10, originating_error = %11",
db_local.get_path(), fresh_file_ident.ident, fresh_file_ident.salt, fresh_server_version.version,
fresh_server_version.salt, db_remote.get_path(), reset_config.mode, reset_config.action, actual_mode,
recover_local_changes, reset_config.error);

TransactionRef tr_remote;
std::vector<client_reset::RecoveredChange> recovered;
if (recover_local_changes) {
Expand All @@ -534,7 +535,7 @@ bool perform_client_reset_diff(DB& db_local, sync::ClientReset& reset_config, sy

// now that the state of the fresh and local Realms are identical,
// reset the local sync history and steal the fresh Realm's ident
history_local.set_history_adjustments(logger, wt_local->get_version(), client_file_ident, fresh_server_version,
history_local.set_history_adjustments(logger, wt_local->get_version(), fresh_file_ident, fresh_server_version,
recovered);

int64_t subscription_version = 0;
Expand Down
Loading
Loading