Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RCORE-2185 Sync client should steal file ident of fresh realm when performing client reset #7850

Merged
merged 30 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
520faad
Initial changes to use the file ident from the fresh realm during cli…
Jun 27, 2024
b1b1dd9
Fixed failing realm_sync_test tests
Jun 27, 2024
5e1e1e9
lint
Jun 27, 2024
73ad369
Merge branch 'master' of github.com:realm/realm-core into mwb/fix-cli…
Jun 28, 2024
f5e1044
Updated changelog
Jun 28, 2024
1d3ca5b
Don't send UPLOAD Messages while downloading fresh realm
Jun 28, 2024
4b02363
Merge branch 'master' of github.com:realm/realm-core into mwb/fix-cli…
Jun 28, 2024
9c38925
Allow sending QUERY bootstrap for fresh download sessions
Jun 28, 2024
3a2af9a
Added SHARED_GROUP_FRESH_PATH to generate path for fresh realm
Jun 29, 2024
aae37b7
Merge branch 'master' of github.com:realm/realm-core into mwb/fix-cli…
Jun 29, 2024
6842197
Removed SHARED_GROUP_FRESH_PATH and used session_reason setting instead
Jun 29, 2024
2ddca98
Some cleanup after tests passing
Jun 29, 2024
86ca88a
Added test to verify no UPLOAD messages are sent during fresh realm d…
Jun 29, 2024
005fea6
Use is_fresh_path to determine if hook event called by client reset f…
Jul 1, 2024
705a114
Merge branch 'master' of github.com:realm/realm-core into mwb/fix-cli…
Jul 1, 2024
59a7b51
Fixed tsan failure around REQUIRE() within hook event callback in flx…
Jul 1, 2024
5072e32
Updates from review and streamlined changes based on recommendations
Jul 1, 2024
1e51055
Merge branch 'master' of github.com:realm/realm-core into mwb/fix-cli…
Jul 1, 2024
427d8fe
Some minor cleanup after self review
Jul 1, 2024
c82d03b
Reverted some test changes that are no longer needed
Jul 1, 2024
bacdd57
More updates from review
Jul 1, 2024
49ec153
Updated logic for when to perform a client reset diff
Jul 2, 2024
f5569c1
Merge branch 'master' of github.com:realm/realm-core into mwb/fix-cli…
Jul 2, 2024
043a470
Updated fresh realm download to update upload progress but not send u…
Jul 2, 2024
f7d915c
Removed has_client_reset_config flag in favor of get_cliet_reset_conf…
Jul 2, 2024
49a5016
Merge branch 'master' of github.com:realm/realm-core into mwb/fix-cli…
Jul 2, 2024
aa50680
Updats from the review - renamed m_allow_uploads to m_delay_uploads
Jul 2, 2024
d88f2e4
Updated assert
Jul 3, 2024
85abf4b
Updated test to start with file ident, added comment about client res…
Jul 3, 2024
6a426a3
Updated comment for m_delay_uploads
Jul 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
### Internals
* Fixed `Table::remove_object_recursive` which wouldn't recursively follow links through a single `Mixed` property. This feature is exposed publicly on `Table` but no SDK currently uses it, so this is considered internal. ([#7829](https://github.com/realm/realm-core/issues/7829), likely since the introduction of Mixed)
* Upload completion is now tracked in a multiprocess-compatible manner ([PR #7796](https://github.com/realm/realm-core/pull/7796)).
* The local realm will assume the the client file ident of the fresh realm during a client reset. ([PR #7850](https://github.com/realm/realm-core/pull/7850))

----------------------------------------------

Expand Down
3 changes: 1 addition & 2 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -912,8 +912,7 @@ void SyncSession::create_sync_session()
session_config.proxy_config = sync_config.proxy_config;
session_config.simulate_integration_error = sync_config.simulate_integration_error;
session_config.flx_bootstrap_batch_size_bytes = sync_config.flx_bootstrap_batch_size_bytes;
session_config.session_reason =
client_reset::is_fresh_path(m_config.path) ? sync::SessionReason::ClientReset : sync::SessionReason::Sync;
session_config.fresh_realm_download = client_reset::is_fresh_path(m_config.path);
session_config.schema_version = m_config.schema_version;

if (sync_config.on_sync_client_event_hook) {
Expand Down
15 changes: 14 additions & 1 deletion src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener

const SessionReason m_session_reason;

// If false, QUERY and MARK messages are allowed but UPLOAD messages will not
// be sent to the server.
const bool m_allow_upload_messages;

const uint64_t m_schema_version;

std::shared_ptr<SubscriptionStore> m_flx_subscription_store;
Expand Down Expand Up @@ -716,6 +720,13 @@ uint64_t SessionImpl::get_schema_version() noexcept
return m_wrapper.m_schema_version;
}

bool SessionImpl::upload_messages_allowed() noexcept
{
// Can only be called if the session is active or being activated
REALM_ASSERT_EX(m_state == State::Active || m_state == State::Unactivated, m_state);
return m_wrapper.m_allow_upload_messages;
}

void SessionImpl::initiate_integrate_changesets(std::uint_fast64_t downloadable_bytes, DownloadBatchState batch_state,
const SyncProgress& progress, const ReceivedChangesets& changesets)
{
Expand Down Expand Up @@ -1150,7 +1161,9 @@ SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr<Sub
, m_progress_handler(std::move(config.progress_handler))
, m_connection_state_change_listener(std::move(config.connection_state_change_listener))
, m_debug_hook(std::move(config.on_sync_client_event_hook))
, m_session_reason(m_client_reset_config ? SessionReason::ClientReset : config.session_reason)
, m_session_reason(m_client_reset_config || config.fresh_realm_download ? SessionReason::ClientReset
: SessionReason::Sync)
, m_allow_upload_messages(!config.fresh_realm_download)
, m_schema_version(config.schema_version)
, m_flx_subscription_store(std::move(flx_sub_store))
, m_migration_store(std::move(migration_store))
Expand Down
6 changes: 3 additions & 3 deletions src/realm/sync/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,9 +420,9 @@ class Session {
/// through Client::run().
util::UniqueFunction<ConnectionStateChangeListener> connection_state_change_listener;

/// The purpose of this sync session. Reported to the server for informational purposes and has no functional
/// effect.
SessionReason session_reason = SessionReason::Sync;
/// Is this session being opened for a realm whose path ends in ".fresh"? If so,
/// it will be downloading a fresh copy of the realm data from the server.
bool fresh_realm_download = false;

/// Schema version
///
Expand Down
1 change: 1 addition & 0 deletions src/realm/sync/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ enum class SyncClientHookEvent {
BindMessageSent,
ClientResetMergeComplete,
BootstrapBatchAboutToProcess,
UploadMessageSent,
};

enum class SyncClientHookAction {
Expand Down
126 changes: 66 additions & 60 deletions src/realm/sync/noinst/client_impl_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1687,12 +1687,10 @@ void Session::activate()

if (REALM_LIKELY(!get_client().is_dry_run())) {
bool file_exists = util::File::exists(get_realm_path());
m_performing_client_reset = get_client_reset_config().has_value();

logger.info("client_reset_config = %1, Realm exists = %2 ", m_performing_client_reset, file_exists);
if (!m_performing_client_reset) {
get_history().get_status(m_last_version_available, m_client_file_ident, m_progress); // Throws
}
logger.info("client_reset_config = %1, Realm exists = %2, upload messages allowed = %3",
get_client_reset_config().has_value(), file_exists, upload_messages_allowed() ? "yes" : "no");
get_history().get_status(m_last_version_available, m_client_file_ident, m_progress); // Throws
}
logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
m_client_file_ident.salt); // Throws
Expand All @@ -1701,7 +1699,7 @@ void Session::activate()
REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version);
init_progress_handler();

logger.debug("last_version_available = %1", m_last_version_available); // Throws
logger.debug("last_version_available = %1", m_last_version_available); // Throws
logger.debug("progress_download_server_version = %1", m_progress.download.server_version); // Throws
logger.debug("progress_download_client_version = %1",
m_progress.download.last_integrated_client_version); // Throws
Expand Down Expand Up @@ -1861,7 +1859,7 @@ void Session::send_message()
return false;
}

if (!m_allow_upload) {
if (m_delay_uploads) {
return false;
}

Expand All @@ -1871,14 +1869,16 @@ void Session::send_message()
return false;
}

// Send QUERY messages when the upload progress client version reaches the snapshot version
// of a pending subscription
return m_upload_progress.client_version >= m_pending_flx_sub_set->snapshot_version;
};

if (check_pending_flx_version()) {
return send_query_change_message(); // throws
}

if (m_allow_upload && (m_last_version_available > m_upload_progress.client_version)) {
if (!m_delay_uploads && (m_last_version_available > m_upload_progress.client_version)) {
return send_upload_message(); // Throws
}
}
Expand All @@ -1889,10 +1889,13 @@ void Session::send_bind_message()
REALM_ASSERT_EX(m_state == Active, m_state);

session_ident_type session_ident = m_ident;
bool need_client_file_ident = !have_client_file_ident();
// Request an ident if we don't already have one and there isn't a pending client reset diff
// The file ident can be 0 when a client reset is being performed if a brand new local realm
// has been opened (or using Async open) and a FLX/PBS migration occurs when first connecting
// to the server.
bool need_client_file_ident = !have_client_file_ident() && !get_client_reset_config();
danieltabacaru marked this conversation as resolved.
Show resolved Hide resolved
const bool is_subserver = false;


ClientProtocol& protocol = m_conn.get_client_protocol();
int protocol_version = m_conn.get_negotiated_protocol_version();
OutputBuffer& out = m_conn.get_output_buffer();
Expand Down Expand Up @@ -1931,8 +1934,9 @@ void Session::send_bind_message()
m_bind_message_sent = true;
call_debug_hook(SyncClientHookEvent::BindMessageSent);

// Ready to send the IDENT message if the file identifier pair is already
// available.
// If there is a pending client reset diff, process that when the BIND message has
// been sent successfully and wait before sending the IDENT message. Otherwise,
// ready to send the IDENT message if the file identifier pair is already available.
if (!need_client_file_ident)
enlist_to_send(); // Throws
}
Expand All @@ -1945,7 +1949,6 @@ void Session::send_ident_message()
REALM_ASSERT(!m_unbind_message_sent);
REALM_ASSERT(have_client_file_ident());


ClientProtocol& protocol = m_conn.get_client_protocol();
OutputBuffer& out = m_conn.get_output_buffer();
session_ident_type session_ident = m_ident;
Expand Down Expand Up @@ -2050,6 +2053,14 @@ void Session::send_upload_message()
version_type progress_client_version = m_upload_progress.client_version;
version_type progress_server_version = m_upload_progress.last_integrated_server_version;

if (!upload_messages_allowed()) {
logger.debug("UPLOAD not allowed: upload progress(progress_client_version=%1, progress_server_version=%2, "
"locked_server_version=%3, num_changesets=%4)",
progress_client_version, progress_server_version, locked_server_version,
uploadable_changesets.size()); // Throws
return;
}

logger.debug("Sending: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
"locked_server_version=%3, num_changesets=%4)",
progress_client_version, progress_server_version, locked_server_version,
Expand Down Expand Up @@ -2135,6 +2146,8 @@ void Session::send_upload_message()
locked_server_version); // Throws
m_conn.initiate_write_message(out, this); // Throws

call_debug_hook(SyncClientHookEvent::UploadMessageSent);

// Other messages may be waiting to be sent
enlist_to_send(); // Throws
}
Expand Down Expand Up @@ -2234,48 +2247,66 @@ void Session::send_test_command_message()

bool Session::client_reset_if_needed()
{
// Regardless of what happens, once we return from this function we will
// no longer be in the middle of a client reset
m_performing_client_reset = false;

// Even if we end up not actually performing a client reset, consume the
// config to ensure that the resources it holds are released
auto client_reset_config = std::exchange(get_client_reset_config(), std::nullopt);
if (!client_reset_config) {
return false;
}

// Save a copy of the status and action in case an error/exception occurs
Status cr_status = client_reset_config->error;
ProtocolErrorInfo::Action cr_action = client_reset_config->action;

auto on_flx_version_complete = [this](int64_t version) {
this->on_flx_sync_version_complete(version);
};
bool did_reset =
client_reset::perform_client_reset(logger, *get_db(), std::move(*client_reset_config), m_client_file_ident,
get_flx_subscription_store(), on_flx_version_complete);
try {
// The file ident from the fresh realm will be copied over to the local realm
bool did_reset = client_reset::perform_client_reset(logger, *get_db(), std::move(*client_reset_config),
get_flx_subscription_store(), on_flx_version_complete);

call_debug_hook(SyncClientHookEvent::ClientResetMergeComplete);
if (!did_reset) {
call_debug_hook(SyncClientHookEvent::ClientResetMergeComplete);
if (!did_reset) {
return false;
}
}
catch (const std::exception& e) {
auto err_msg = util::format("A fatal error occurred during '%1' client reset diff for %2: '%3'", cr_action,
cr_status, e.what());
logger.error(err_msg.c_str());
SessionErrorInfo err_info(Status{ErrorCodes::AutoClientResetFailed, err_msg}, IsFatal{true});
suspend(err_info);
return false;
}

// The fresh Realm has been used to reset the state
logger.debug("Client reset is completed, path=%1", get_realm_path()); // Throws
logger.debug("Client reset is completed, path = %1", get_realm_path()); // Throws

// Update the version, file ident and progress info after the client reset diff is done
get_history().get_status(m_last_version_available, m_client_file_ident, m_progress); // Throws
// Print the version/progress information before performing the asserts
logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
m_client_file_ident.salt); // Throws
logger.debug("last_version_available = %1", m_last_version_available); // Throws
logger.debug("upload_progress_client_version = %1, upload_progress_server_version = %2",
m_progress.upload.client_version,
m_progress.upload.last_integrated_server_version); // Throws
logger.debug("download_progress_client_version = %1, download_progress_server_version = %2",
m_progress.download.last_integrated_client_version,
m_progress.download.server_version); // Throws

SaltedFileIdent client_file_ident;
get_history().get_status(m_last_version_available, client_file_ident, m_progress); // Throws
REALM_ASSERT_3(m_client_file_ident.ident, ==, client_file_ident.ident);
REALM_ASSERT_3(m_client_file_ident.salt, ==, client_file_ident.salt);
REALM_ASSERT_EX(m_progress.download.last_integrated_client_version == 0,
m_progress.download.last_integrated_client_version);
REALM_ASSERT_EX(m_progress.upload.client_version == 0, m_progress.upload.client_version);
logger.trace("last_version_available = %1", m_last_version_available); // Throws

m_upload_progress = m_progress.upload;
m_download_progress = m_progress.download;
init_progress_handler();
// In recovery mode, there may be new changesets to upload and nothing left to download.
// In FLX DiscardLocal mode, there may be new commits due to subscription handling.
// For both, we want to allow uploads again without needing external changes to download first.
m_allow_upload = true;
m_delay_uploads = false;

// Checks if there is a pending client reset
handle_pending_client_reset_acknowledgement();
Expand Down Expand Up @@ -2321,35 +2352,10 @@ Status Session::receive_ident_message(SaltedFileIdent client_file_ident)
return Status::OK(); // Success
}

// if a client reset happens, it will take care of setting the file ident
// and if not, we do it here
bool did_client_reset = false;

// Save some of the client reset info for reporting to the client if an error occurs.
Status cr_status(Status::OK()); // Start with no client reset
ProtocolErrorInfo::Action cr_action = ProtocolErrorInfo::Action::NoAction;
if (auto& cr_config = get_client_reset_config()) {
cr_status = cr_config->error;
cr_action = cr_config->action;
}

try {
did_client_reset = client_reset_if_needed();
}
catch (const std::exception& e) {
auto err_msg = util::format("A fatal error occurred during '%1' client reset for %2: '%3'", cr_action,
cr_status, e.what());
logger.error(err_msg.c_str());
SessionErrorInfo err_info(Status{ErrorCodes::AutoClientResetFailed, err_msg}, IsFatal{true});
suspend(err_info);
return Status::OK();
}
if (!did_client_reset) {
get_history().set_client_file_ident(client_file_ident,
m_fix_up_object_ids); // Throws
m_progress.download.last_integrated_client_version = 0;
m_progress.upload.client_version = 0;
}
get_history().set_client_file_ident(client_file_ident,
m_fix_up_object_ids); // Throws
m_progress.download.last_integrated_client_version = 0;
m_progress.upload.client_version = 0;

// Ready to send the IDENT message
ensure_enlisted_to_send(); // Throws
Expand Down Expand Up @@ -2772,10 +2778,10 @@ void Session::check_for_download_completion()
if (m_download_progress.server_version < m_server_version_at_last_download_mark)
return;
m_last_triggering_download_mark = m_target_download_mark;
if (REALM_UNLIKELY(!m_allow_upload)) {
if (REALM_UNLIKELY(m_delay_uploads)) {
// Activate the upload process now, and enable immediate reactivation
// after a subsequent fast reconnect.
m_allow_upload = true;
m_delay_uploads = false;
ensure_enlisted_to_send(); // Throws
}
on_download_completion(); // Throws
Expand Down
25 changes: 17 additions & 8 deletions src/realm/sync/noinst/client_impl_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,10 @@ class ClientImpl::Session {
/// Returns the schema version the synchronization session connects with to the server.
uint64_t get_schema_version() noexcept;

// Returns false if this session is not allowed to send UPLOAD messages to the server to
// update the cursor info, such as during a client reset fresh realm download
bool upload_messages_allowed() noexcept;

/// \brief Initiate the integration of downloaded changesets.
///
/// This function must provide for the passed changesets (if any) to
Expand Down Expand Up @@ -952,7 +956,7 @@ class ClientImpl::Session {
// Set to true when download completion is reached. Set to false after a
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you need to update the comment

// slow reconnect, such that the upload process will become suspended until
// download completion is reached again.
bool m_allow_upload = false;
bool m_delay_uploads = true;

bool m_is_flx_sync_session = false;

Expand Down Expand Up @@ -980,9 +984,6 @@ class ClientImpl::Session {
// `ident == 0` means unassigned.
SaltedFileIdent m_client_file_ident = {0, 0};

// True while this session is in the process of performing a client reset.
bool m_performing_client_reset = false;

// The latest sync progress reported by the server via a DOWNLOAD
// message. See struct SyncProgress for a description. The values stored in
// `m_progress` either are persisted, or are about to be.
Expand Down Expand Up @@ -1320,7 +1321,7 @@ inline void ClientImpl::Session::recognize_sync_version(version_type version)
// Since the deactivation process has not been initiated, the UNBIND
// message cannot have been sent unless the session was suspended due to
// an error.
REALM_ASSERT(m_suspended || !m_unbind_message_sent);
REALM_ASSERT_3(m_suspended, ||, !m_unbind_message_sent);
if (m_ident_message_sent && !m_suspended)
ensure_enlisted_to_send(); // Throws
}
Expand Down Expand Up @@ -1356,7 +1357,7 @@ inline ClientImpl::Session::Session(SessionWrapper& wrapper, Connection& conn, s
, m_wrapper{wrapper}
{
if (get_client().m_disable_upload_activation_delay)
m_allow_upload = true;
m_delay_uploads = false;
}

inline bool ClientImpl::Session::do_recognize_sync_version(version_type version) noexcept
Expand Down Expand Up @@ -1385,10 +1386,10 @@ inline void ClientImpl::Session::connection_established(bool fast_reconnect)
if (!fast_reconnect && !get_client().m_disable_upload_activation_delay) {
// Disallow immediate activation of the upload process, even if download
// completion was reached during an earlier period of connectivity.
m_allow_upload = false;
m_delay_uploads = true;
}

if (!m_allow_upload) {
if (m_delay_uploads) {
// Request download completion notification
++m_target_download_mark;
}
Expand Down Expand Up @@ -1427,6 +1428,14 @@ inline void ClientImpl::Session::message_sent()
// No message will be sent after the UNBIND message
REALM_ASSERT(!m_unbind_message_send_complete);

// If the client reset config structure is populated, then try to perform
// the client reset diff once the BIND message has been sent successfully
if (m_bind_message_sent && m_state == Active && get_client_reset_config()) {
client_reset_if_needed();
// Ready to send the IDENT message
ensure_enlisted_to_send(); // Throws
}

if (m_unbind_message_sent) {
REALM_ASSERT(!m_enlisted_to_send);

Expand Down
Loading
Loading