Skip to content

Commit

Permalink
Calculate client-side progress from the start of the upload/download
Browse files Browse the repository at this point in the history
  • Loading branch information
kiburtse committed Mar 7, 2024
1 parent 1fcdfda commit ad7df00
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 73 deletions.
156 changes: 115 additions & 41 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,22 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener

util::Optional<ProxyConfig> m_proxy_config;

uint_fast64_t m_last_reported_uploadable_bytes = 0;
struct ReportedProgress {
uint64_t snapshot = 0;
uint64_t uploaded = 0;
uint64_t uploadable = 0;
uint64_t downloaded = 0;
uint64_t downloadable = 0;
uint64_t final_uploaded = 0;
uint64_t final_downloaded = 0;
bool compare(const ReportedProgress& o, bool with_snapshot)
{
return (!with_snapshot || snapshot == o.snapshot) && uploaded == o.uploaded &&
uploadable == o.uploadable && final_uploaded == o.final_uploaded && downloaded == o.downloaded &&
downloadable == o.downloadable && final_downloaded == o.final_downloaded;
}
} m_reported_progress;

util::UniqueFunction<ProgressHandler> m_progress_handler;
util::UniqueFunction<ConnectionStateChangeListener> m_connection_state_change_listener;

Expand Down Expand Up @@ -245,7 +260,8 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener
std::int_fast64_t m_staged_upload_mark = 0, m_staged_download_mark = 0;
std::int_fast64_t m_reached_upload_mark = 0, m_reached_download_mark = 0;

void on_sync_progress();
void on_upload_progress(bool completed = false, bool only_if_new_uploadable_data = false);
void on_download_progress(bool completed = false, const std::optional<uint64_t>& transient_bytes = {});
void on_upload_completion();
void on_download_completion();
void on_suspended(const SessionErrorInfo& error_info);
Expand All @@ -255,7 +271,8 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener
void on_flx_sync_error(int64_t version, std::string_view err_msg);
void on_flx_sync_version_complete(int64_t version);

void report_progress(bool only_if_new_uploadable_data = false);
void init_progress_handler();
void report_progress(bool is_download, bool is_completed = false, bool only_if_new_uploadable_data = false);

friend class SessionWrapperStack;
friend class ClientImpl::Session;
Expand Down Expand Up @@ -771,14 +788,14 @@ void SessionImpl::initiate_integrate_changesets(std::uint_fast64_t downloadable_
catch (const IntegrationException& e) {
on_integration_failure(e);
}
notify_download_progress(); // Throws
}


void SessionImpl::on_upload_completion()
{
// Ignore the call if the session is not active
if (m_state == State::Active) {
m_wrapper.on_upload_progress(true);
m_wrapper.on_upload_completion(); // Throws
}
}
Expand All @@ -788,6 +805,7 @@ void SessionImpl::on_download_completion()
{
// Ignore the call if the session is not active
if (m_state == State::Active) {
m_wrapper.on_download_progress(true);
m_wrapper.on_download_completion(); // Throws
}
}
Expand Down Expand Up @@ -876,6 +894,10 @@ bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, Do
notify_download_progress(bootstrap_store->pending_stats().pending_changeset_bytes);
return true;
}
else {
// FIXME this shouldn't be needed anymore when progress is reported separately for every direction
m_wrapper.m_transient_downloaded_bytes.reset();
}

try {
process_pending_flx_bootstrap();
Expand Down Expand Up @@ -965,8 +987,6 @@ void SessionImpl::process_pending_flx_bootstrap()
on_changesets_integrated(new_version.realm_version, progress);

REALM_ASSERT_3(query_version, !=, -1);
update_download_estimate(1.0);
notify_download_progress(); // Throws
on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch);

auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version,
Expand Down Expand Up @@ -1107,6 +1127,22 @@ bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_stat
return false;
}

void SessionImpl::init_progress_handler()
{
if (m_state != State::Unactivated)
return;

m_wrapper.init_progress_handler();
}

void SessionImpl::notify_upload_progress()
{
if (m_state != State::Active)
return;

m_wrapper.on_upload_progress();
}

void SessionImpl::update_download_estimate(double download_estimate)
{
if (m_state != State::Active)
Expand All @@ -1120,8 +1156,7 @@ void SessionImpl::notify_download_progress(const std::optional<uint64_t>& transi
if (m_state != State::Active)
return;

m_wrapper.m_transient_downloaded_bytes = transient_bytes;
m_wrapper.on_sync_progress(); // Throws
m_wrapper.on_download_progress(false, transient_bytes); // Throws
}

util::Future<std::string> SessionImpl::send_test_command(std::string body)
Expand Down Expand Up @@ -1347,8 +1382,7 @@ void SessionWrapper::on_commit(version_type new_version)
return; // Already finalized
SessionImpl& sess = *self->m_sess;
sess.recognize_sync_version(new_version); // Throws
bool only_if_new_uploadable_data = true;
self->report_progress(only_if_new_uploadable_data); // Throws
self->on_upload_progress(/* is_completed = */ false, /* only_if_new_uploadable_data = */ true); // Throws
});
}

Expand Down Expand Up @@ -1609,8 +1643,10 @@ void SessionWrapper::actualize(ServerEndpoint endpoint)
}
}

if (!m_client_reset_config)
report_progress(); // Throws
if (!m_client_reset_config && m_reliable_download_progress) {
// FIXME looks like noop, could be removed without an issue?
on_upload_progress(); // Throws
}
}

void SessionWrapper::force_close()
Expand Down Expand Up @@ -1700,12 +1736,22 @@ inline void SessionWrapper::finalize_before_actualization() noexcept
m_force_closed = true;
}

inline void SessionWrapper::on_upload_progress(bool is_completed, bool only_if_new_uploadable_data)
{
REALM_ASSERT(!m_finalized);

if (!only_if_new_uploadable_data)
m_reliable_download_progress = true;

inline void SessionWrapper::on_sync_progress()
report_progress(/* is_download = */ false, is_completed, only_if_new_uploadable_data); // Throws
}

inline void SessionWrapper::on_download_progress(bool is_completed, const std::optional<uint64_t>& transient_bytes)
{
REALM_ASSERT(!m_finalized);
m_reliable_download_progress = true;
report_progress(); // Throws
m_transient_downloaded_bytes = transient_bytes;
report_progress(/* is_download = */ true, is_completed); // Throws
}


Expand All @@ -1732,9 +1778,6 @@ void SessionWrapper::on_upload_completion()

void SessionWrapper::on_download_completion()
{
m_download_estimate.reset();
m_transient_downloaded_bytes.reset();

while (!m_download_completion_handlers.empty()) {
auto handler = std::move(m_download_completion_handlers.back());
m_download_completion_handlers.pop_back();
Expand Down Expand Up @@ -1795,8 +1838,14 @@ void SessionWrapper::on_connection_state_changed(ConnectionState state,
}
}

void SessionWrapper::init_progress_handler()
{
uint64_t unused = 0;
ClientHistory::get_upload_download_bytes(m_db.get(), m_reported_progress.final_downloaded, unused,
m_reported_progress.final_uploaded, unused, unused);
}

void SessionWrapper::report_progress(bool only_if_new_uploadable_data)
void SessionWrapper::report_progress(bool is_download, bool is_completed, bool only_if_new_uploadable_data)
{
REALM_ASSERT(!m_finalized);
REALM_ASSERT(m_sess);
Expand All @@ -1808,46 +1857,71 @@ void SessionWrapper::report_progress(bool only_if_new_uploadable_data)
if (!m_reliable_download_progress)
return;

std::uint_fast64_t downloaded_bytes = 0;
std::uint_fast64_t downloadable_bytes = 0;
std::uint_fast64_t uploaded_bytes = 0;
std::uint_fast64_t uploadable_bytes = 0;
std::uint_fast64_t snapshot_version = 0;
ClientHistory::get_upload_download_bytes(m_db.get(), downloaded_bytes, downloadable_bytes, uploaded_bytes,
uploadable_bytes, snapshot_version);
ReportedProgress p = m_reported_progress;
ClientHistory::get_upload_download_bytes(m_db.get(), p.downloaded, p.downloadable, p.uploaded, p.uploadable,
p.snapshot);

// If this progress notification was triggered by a commit being made we
// only want to send it if the uploadable bytes has actually increased,
// and not if it was an empty commit.
if (only_if_new_uploadable_data && m_last_reported_uploadable_bytes == uploadable_bytes)
if (only_if_new_uploadable_data && m_reported_progress.uploadable == p.uploadable)
return;
m_last_reported_uploadable_bytes = uploadable_bytes;

// uploadable_bytes is uploaded + remaining to upload, while downloadable_bytes
// is only the remaining to download. This is confusing, so make them use
// the same units.
downloadable_bytes += downloaded_bytes;
p.downloadable += p.downloaded;

m_sess->logger.debug("Progress handler called, downloaded = %1, "
"downloadable = %2, uploaded = %3, "
"uploadable = %4, snapshot version = %5",
downloaded_bytes, downloadable_bytes, uploaded_bytes, uploadable_bytes, snapshot_version);
double upload_estimate = 1.0, download_estimate = 1.0;

// FIXME take into account first initial reported value in this batch
double upload_estimate = uploadable_bytes > 0 ? (uploaded_bytes / double(uploadable_bytes)) : 1;
double download_estimate = downloadable_bytes > 0 ? (downloaded_bytes / double(downloadable_bytes)) : 1;
if ((!is_completed || is_download) && p.uploaded != p.uploadable) {
if (p.uploadable > p.final_uploaded)
upload_estimate = (p.uploaded - p.final_uploaded) / double(p.uploadable - p.final_uploaded);
}

// download estimate only known for flx
if (m_download_estimate) {
download_estimate = m_download_estimate.value();
download_estimate = *m_download_estimate;

// ... and transient bytes are only relevant with bootstrap store
if (m_transient_downloaded_bytes)
downloaded_bytes += *m_transient_downloaded_bytes;
p.downloaded += *m_transient_downloaded_bytes;

// FIXME add some estimate to this
downloadable_bytes = downloaded_bytes;
// FIXME for flx with download estimate these bytest are not known
// provide some sensible value for non-streaming version of object-store callbackas
// until these field are completely removed from the api after pbs deprecation
p.downloadable = p.downloaded;
if (0.01 <= download_estimate && download_estimate <= 0.99)
if (p.downloaded > p.final_downloaded)
p.downloadable = p.final_downloaded + (p.downloaded - p.final_downloaded) / download_estimate;
}
else {
if ((!is_completed || !is_download) && p.downloaded != p.downloadable)
if (p.downloadable > p.final_downloaded)
download_estimate = (p.downloaded - p.final_downloaded) / double(p.downloadable - p.final_downloaded);
}

// exclude upload notification to not report completed sync with empty commits
bool may_skip = m_reported_progress.compare(p, is_download);

if (is_completed) {
if (is_download)
p.final_downloaded = p.downloaded;
else
p.final_uploaded = p.uploaded;
}

m_reported_progress = p;
if (may_skip)
return;

m_sess->logger.debug("Progress handler called, downloaded = %1, downloadable = %2, estimate = %3, "
"uploaded = %4, uploadable = %5, estimate = %6, snapshot version = %7",
p.downloaded, p.downloadable, download_estimate, p.uploaded, p.uploadable, upload_estimate,
p.snapshot);

m_progress_handler(downloaded_bytes, downloadable_bytes, uploaded_bytes, uploadable_bytes, snapshot_version,
download_estimate, upload_estimate);
m_progress_handler(p.downloaded, p.downloadable, p.uploaded, p.uploadable, p.snapshot, download_estimate,
upload_estimate);
}

util::Future<std::string> SessionWrapper::send_test_command(std::string body)
Expand Down
37 changes: 25 additions & 12 deletions src/realm/sync/noinst/client_impl_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1596,21 +1596,31 @@ void Session::on_changesets_integrated(version_type client_version, const SyncPr
{
REALM_ASSERT_EX(m_state == Active, m_state);
REALM_ASSERT_3(progress.download.server_version, >=, m_download_progress.server_version);
m_download_progress = progress.download;
bool upload_progressed = (progress.upload.client_version > m_progress.upload.client_version);
bool download_progressed =
progress.download.server_version > m_progress.download.server_version &&
progress.download.last_integrated_client_version == m_progress.download.last_integrated_client_version;

m_download_progress = progress.download;
m_progress = progress;

if (upload_progressed) {
if (progress.upload.client_version > m_last_version_selected_for_upload) {
if (progress.upload.client_version > m_upload_progress.client_version)
m_upload_progress = progress.upload;
m_last_version_selected_for_upload = progress.upload.client_version;
}

check_for_upload_completion();
bool is_completed = check_for_upload_completion();
if (!is_completed)
notify_upload_progress();
}

do_recognize_sync_version(client_version); // Allows upload process to resume
check_for_download_completion(); // Throws

bool is_completed = check_for_download_completion(); // Throws
if (!is_completed && download_progressed)
notify_download_progress();

// If the client migrated from PBS to FLX, create subscriptions when new tables are received from server.
if (auto migration_store = get_migration_store(); migration_store && m_is_flx_sync_session) {
Expand Down Expand Up @@ -1665,6 +1675,7 @@ void Session::activate()
m_last_version_selected_for_upload = m_upload_progress.client_version;
m_download_progress = m_progress.download;
REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version);
init_progress_handler();

logger.debug("last_version_available = %1", m_last_version_available); // Throws
logger.debug("progress_download_server_version = %1", m_progress.download.server_version); // Throws
Expand Down Expand Up @@ -2723,44 +2734,45 @@ Status Session::check_received_sync_progress(const SyncProgress& progress) noexc
}


void Session::check_for_upload_completion()
bool Session::check_for_upload_completion()
{
REALM_ASSERT_EX(m_state == Active, m_state);
if (!m_upload_completion_notification_requested) {
return;
return false;
}

// during an ongoing client reset operation, we never upload anything
if (m_performing_client_reset)
return;
return false;

// Upload process must have reached end of history
REALM_ASSERT_3(m_upload_progress.client_version, <=, m_last_version_available);
bool scan_complete = (m_upload_progress.client_version == m_last_version_available);
if (!scan_complete)
return;
return false;

// All uploaded changesets must have been acknowledged by the server
REALM_ASSERT_3(m_progress.upload.client_version, <=, m_last_version_selected_for_upload);
bool all_uploads_accepted = (m_progress.upload.client_version == m_last_version_selected_for_upload);
if (!all_uploads_accepted)
return;
return false;

m_upload_completion_notification_requested = false;
on_upload_completion(); // Throws
return true;
}


void Session::check_for_download_completion()
bool Session::check_for_download_completion()
{
REALM_ASSERT_3(m_target_download_mark, >=, m_last_download_mark_received);
REALM_ASSERT_3(m_last_download_mark_received, >=, m_last_triggering_download_mark);
if (m_last_download_mark_received == m_last_triggering_download_mark)
return;
return false;
if (m_last_download_mark_received < m_target_download_mark)
return;
return false;
if (m_download_progress.server_version < m_server_version_at_last_download_mark)
return;
return false;
m_last_triggering_download_mark = m_target_download_mark;
if (REALM_UNLIKELY(!m_allow_upload)) {
// Activate the upload process now, and enable immediate reactivation
Expand All @@ -2769,4 +2781,5 @@ void Session::check_for_download_completion()
ensure_enlisted_to_send(); // Throws
}
on_download_completion(); // Throws
return true;
}
Loading

0 comments on commit ad7df00

Please sign in to comment.