Skip to content

Commit

Permalink
Fix FLX download progress reporting
Browse files Browse the repository at this point in the history
We need to store the download progress for each batch of a bootstrap and not
just at the end for it to be useful in any way.

The server will sometimes send us DOWNLOAD messages with a non-one estimate
followed by a one estimate where the byte-level information is the same (as the
final message is empty). When this happens we need to report the download
completion to the user, so add the estimate to the fields checked for changes.

A subscription change which doesn't actually change what set of objects is in
view can result in an empty DOWNLOAD message with no changes other than the
query version, and we should report that too.
  • Loading branch information
tgoyne committed Jul 9, 2024
1 parent 3334d38 commit 39d7751
Show file tree
Hide file tree
Showing 17 changed files with 279 additions and 90 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

### Enhancements
* <New feature description> (PR [#????](https://github.com/realm/realm-core/pull/????))
* None.
* FLX download progress was only updated when bootstraps completed, making it always be 0 before the first completion and then forever 1. ([#7869](https://github.com/realm/realm-core/issues/7869), since v14.10.3)

### Fixed
* <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
Expand Down
4 changes: 2 additions & 2 deletions dependencies.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ VERSION: 14.10.3
OPENSSL_VERSION: 3.2.0
ZLIB_VERSION: 1.2.13
# https://github.com/10gen/baas/commits
# 9d1b4d6 is 2024 May 8
BAAS_VERSION: 9d1b4d628babadfb606ebcadb93b1e5cae3c9565
# 998d92f is 2024 Jul 8
BAAS_VERSION: 998d92fdb6cce9df1d3e6255105fa75c1d940ac0
19 changes: 19 additions & 0 deletions src/realm/object-store/sync/async_open_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,25 @@ void AsyncOpenTask::start(AsyncOpenCallback callback)
session->revive_if_needed();
}

util::Future<ThreadSafeReference> AsyncOpenTask::start()
{
auto pf = util::make_promise_future<ThreadSafeReference>();
start([promise = std::move(pf.promise)](ThreadSafeReference&& ref, std::exception_ptr e) mutable {
if (e) {
try {
std::rethrow_exception(e);
}
catch (...) {
promise.set_error(exception_to_status());
}
}
else {
promise.emplace_value(std::move(ref));
}
});
return std::move(pf.future);
}

void AsyncOpenTask::cancel()
{
std::shared_ptr<SyncSession> session;
Expand Down
9 changes: 9 additions & 0 deletions src/realm/object-store/sync/async_open_task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <realm/util/checked_mutex.hpp>
#include <realm/util/functional.hpp>
#include <realm/util/future.hpp>

#include <memory>
#include <vector>
Expand All @@ -47,13 +48,21 @@ class AsyncOpenTask : public std::enable_shared_from_this<AsyncOpenTask> {
std::shared_ptr<realm::SyncSession> session, bool db_open_for_the_first_time);
AsyncOpenTask(const AsyncOpenTask&) = delete;
AsyncOpenTask& operator=(const AsyncOpenTask&) = delete;

// Starts downloading the Realm. The callback will be triggered either when the download completes
// or an error is encountered.
//
// If multiple AsyncOpenTasks all attempt to download the same Realm and one of them is canceled,
// the other tasks will receive a "Cancelled" exception.
void start(AsyncOpenCallback callback) REQUIRES(!m_mutex);

// Starts downloading the Realm. The future will be fulfilled either when the download completes
// or an error is encountered.
//
// If multiple AsyncOpenTasks all attempt to download the same Realm and one of them is canceled,
// the other tasks will receive a cancelled Status
util::Future<ThreadSafeReference> start() REQUIRES(!m_mutex);

// Cancels the download and stops the session. No further functions should be called on this class.
void cancel() REQUIRES(!m_mutex);

Expand Down
37 changes: 21 additions & 16 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,15 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener
uint64_t uploadable;
uint64_t downloaded;
uint64_t downloadable;
int64_t query_version;
double download_estimate;

// Does not check snapshot
bool operator==(const ReportedProgress& p) const noexcept
{
return uploaded == p.uploaded && uploadable == p.uploadable && downloaded == p.downloaded &&
downloadable == p.downloadable;
downloadable == p.downloadable && query_version == p.query_version &&
download_estimate == p.download_estimate;
}
};
std::optional<ReportedProgress> m_reported_progress;
Expand Down Expand Up @@ -801,7 +804,8 @@ void SessionImpl::update_subscription_version_info()
}

bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, DownloadBatchState batch_state,
int64_t query_version, const ReceivedChangesets& received_changesets)
int64_t query_version, DownloadableProgress download_progress,
const ReceivedChangesets& received_changesets)
{
// Ignore the call if the session is not active
if (m_state != State::Active) {
Expand All @@ -820,7 +824,8 @@ bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, Do

bool new_batch = false;
try {
bootstrap_store->add_batch(query_version, std::move(maybe_progress), received_changesets, &new_batch);
bootstrap_store->add_batch(query_version, std::move(maybe_progress), download_progress, received_changesets,
&new_batch);
}
catch (const LogicError& ex) {
if (ex.code() == ErrorCodes::LimitExceeded) {
Expand Down Expand Up @@ -904,7 +909,6 @@ void SessionImpl::process_pending_flx_bootstrap()

auto batch_state =
pending_batch.remaining_changesets > 0 ? DownloadBatchState::MoreToCome : DownloadBatchState::LastInBatch;
uint64_t downloadable_bytes = 0;
query_version = pending_batch.query_version;
bool simulate_integration_error =
(m_wrapper.m_simulate_integration_error && !pending_batch.changesets.empty());
Expand All @@ -916,8 +920,8 @@ void SessionImpl::process_pending_flx_bootstrap()
batch_state, pending_batch.changesets.size());

history.integrate_server_changesets(
*pending_batch.progress, downloadable_bytes, pending_batch.changesets, new_version, batch_state, logger,
transact, [&](const TransactionRef& tr, util::Span<Changeset> changesets_applied) {
*pending_batch.progress, 1.0, pending_batch.changesets, new_version, batch_state, logger, transact,
[&](const TransactionRef& tr, util::Span<Changeset> changesets_applied) {
REALM_ASSERT_3(changesets_applied.size(), <=, pending_batch.changesets.size());
bootstrap_store->pop_front_pending(tr, changesets_applied.size());
});
Expand Down Expand Up @@ -1647,6 +1651,7 @@ void SessionWrapper::check_progress()
DownloadableProgress downloadable;
ClientHistory::get_upload_download_state(*m_db, p.downloaded, downloadable, p.uploaded, p.uploadable, p.snapshot,
uploaded_version);
p.query_version = m_flx_last_seen_version;

report_progress(p, downloadable);
report_upload_completion(uploaded_version);
Expand Down Expand Up @@ -1701,28 +1706,28 @@ void SessionWrapper::report_progress(ReportedProgress& p, DownloadableProgress d
upload_estimate = calculate_progress(p.uploaded, p.uploadable, m_final_uploaded);

bool download_completed = p.downloaded == 0;
double download_estimate = 1.00;
p.download_estimate = 1.00;
if (m_flx_pending_bootstrap_store) {
if (m_flx_pending_bootstrap_store->has_pending()) {
download_estimate = downloadable.as_estimate();
p.download_estimate = downloadable.as_estimate();
p.downloaded += m_flx_pending_bootstrap_store->pending_stats().pending_changeset_bytes;
}
download_completed = download_estimate >= 1.0;
download_completed = p.download_estimate >= 1.0;

// for flx with download estimate these bytes are not known
// provide some sensible value for non-streaming version of object-store callbacks
// until these field are completely removed from the api after pbs deprecation
p.downloadable = p.downloaded;
if (download_estimate > 0 && download_estimate < 1.0 && p.downloaded > m_final_downloaded)
p.downloadable = m_final_downloaded + uint64_t((p.downloaded - m_final_downloaded) / download_estimate);
if (p.download_estimate > 0 && p.download_estimate < 1.0 && p.downloaded > m_final_downloaded)
p.downloadable = m_final_downloaded + uint64_t((p.downloaded - m_final_downloaded) / p.download_estimate);
}
else {
// uploadable_bytes is uploaded + remaining to upload, while downloadable_bytes
// is only the remaining to download. This is confusing, so make them use
// the same units.
p.downloadable = downloadable.as_bytes() + p.downloaded;
if (!download_completed)
download_estimate = calculate_progress(p.downloaded, p.downloadable, m_final_downloaded);
p.download_estimate = calculate_progress(p.downloaded, p.downloadable, m_final_downloaded);
}

if (download_completed)
Expand All @@ -1745,12 +1750,12 @@ void SessionWrapper::report_progress(ReportedProgress& p, DownloadableProgress d
m_sess->logger.debug(
"Progress handler called, downloaded = %1, downloadable = %2, estimate = %3, "
"uploaded = %4, uploadable = %5, estimate = %6, snapshot version = %7, query_version = %8",
p.downloaded, p.downloadable, to_str(download_estimate), p.uploaded, p.uploadable,
to_str(upload_estimate), p.snapshot, m_flx_active_version);
p.downloaded, p.downloadable, to_str(p.download_estimate), p.uploaded, p.uploadable,
to_str(upload_estimate), p.snapshot, p.query_version);
}

m_progress_handler(p.downloaded, p.downloadable, p.uploaded, p.uploadable, p.snapshot, download_estimate,
upload_estimate, m_flx_last_seen_version);
m_progress_handler(p.downloaded, p.downloadable, p.uploaded, p.uploadable, p.snapshot, p.download_estimate,
upload_estimate, p.query_version);
}

util::Future<std::string> SessionWrapper::send_test_command(std::string body)
Expand Down
12 changes: 12 additions & 0 deletions src/realm/sync/noinst/client_history_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,18 @@ void ClientHistory::update_sync_progress(const SyncProgress& progress, Downloada
trim_sync_history(); // Throws
}

void ClientHistory::set_download_progress(Transaction& tr, DownloadableProgress p)
{
using gf = _impl::GroupFriend;
ref_type ref = gf::get_history_ref(tr);
REALM_ASSERT(ref);
Array root(gf::get_alloc(tr));
root.init_from_ref(ref);
gf::set_history_parent(tr, root);
REALM_ASSERT(root.size() > s_progress_uploadable_bytes_iip);
root.set(s_progress_downloadable_bytes_iip,
RefOrTagged::make_tagged(p.as_bytes())); // Throws
}

void ClientHistory::trim_ct_history()
{
Expand Down
9 changes: 9 additions & 0 deletions src/realm/sync/noinst/client_history_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,15 @@ class ClientHistory final : public _impl::History, public TransformHistory {
std::uint_fast64_t&, std::uint_fast64_t&, version_type&);
static void get_upload_download_state(DB*, std::uint_fast64_t&, std::uint_fast64_t&);

/// Record the current download progress.
///
/// This is used when storing FLX bootstraps to make the progress available
/// to other processes which are observing the file. It must be called
/// inside of a write transaction. The data stored here is only meaningful
/// until the next call of integrate_server_changesets(), which will
/// overwrite it.
static void set_download_progress(Transaction& tr, DownloadableProgress);

// Overriding member functions in realm::TransformHistory
version_type find_history_entry(version_type, version_type, HistoryEntry&) const noexcept override;
ChunkedBinaryData get_reciprocal_transform(version_type, bool&) const override;
Expand Down
3 changes: 2 additions & 1 deletion src/realm/sync/noinst/client_impl_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2467,7 +2467,8 @@ Status Session::receive_download_message(const DownloadMessage& message)
}
REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);

if (process_flx_bootstrap_message(progress, batch_state, query_version, message.changesets)) {
if (process_flx_bootstrap_message(progress, batch_state, query_version, message.downloadable,
message.changesets)) {
clear_resumption_delay_state();
return Status::OK();
}
Expand Down
3 changes: 2 additions & 1 deletion src/realm/sync/noinst/client_impl_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,8 @@ class ClientImpl::Session {
// message then this is a noop and will return false. Otherwise this will return true
// and no further processing of the download message should take place.
bool process_flx_bootstrap_message(const SyncProgress& progress, DownloadBatchState batch_state,
int64_t query_version, const ReceivedChangesets& received_changesets);
int64_t query_version, DownloadableProgress download_progress,
const ReceivedChangesets& received_changesets);

// Processes any pending FLX bootstraps, if one exists. Otherwise this is a noop.
void process_pending_flx_bootstrap();
Expand Down
4 changes: 4 additions & 0 deletions src/realm/sync/noinst/pending_bootstrap_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "realm/list.hpp"
#include "realm/query.hpp"
#include "realm/sync/changeset_parser.hpp"
#include "realm/sync/noinst/client_history_impl.hpp"
#include "realm/sync/noinst/protocol_codec.hpp"
#include "realm/sync/noinst/sync_metadata_schema.hpp"
#include "realm/sync/protocol.hpp"
Expand Down Expand Up @@ -127,6 +128,7 @@ PendingBootstrapStore::PendingBootstrapStore(DBRef db, util::Logger& logger)
}

void PendingBootstrapStore::add_batch(int64_t query_version, util::Optional<SyncProgress> progress,
DownloadableProgress download_progress,
const _impl::ClientProtocol::ReceivedChangesets& changesets,
bool* created_new_batch_out)
{
Expand Down Expand Up @@ -176,6 +178,8 @@ void PendingBootstrapStore::add_batch(int64_t query_version, util::Optional<Sync
cur_changeset.set(m_changeset_data, compressed_data);
}

ClientHistory::set_download_progress(*tr, download_progress);

tr->commit();

if (created_new_batch_out) {
Expand Down
3 changes: 2 additions & 1 deletion src/realm/sync/noinst/pending_bootstrap_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ class PendingBootstrapStore {

// Adds a set of changesets to the store.
void add_batch(int64_t query_version, util::Optional<SyncProgress> progress,
const std::vector<RemoteChangeset>& changesets, bool* created_new_batch);
DownloadableProgress download_progress, const std::vector<RemoteChangeset>& changesets,
bool* created_new_batch);

void clear();
void clear(Transaction& wt);
Expand Down
17 changes: 1 addition & 16 deletions test/object-store/sync/app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4471,22 +4471,7 @@ TEST_CASE("app: full-text compatible with sync", "[sync][app][baas]") {
INFO("realm opened with async open");
auto async_open_task = Realm::get_synchronized_realm(config);

auto [realm_promise, realm_future] = util::make_promise_future<ThreadSafeReference>();
async_open_task->start(
[promise = std::move(realm_promise)](ThreadSafeReference ref, std::exception_ptr ouch) mutable {
if (ouch) {
try {
std::rethrow_exception(ouch);
}
catch (...) {
promise.set_error(exception_to_status());
}
}
else {
promise.emplace_value(std::move(ref));
}
});

auto realm_future = async_open_task->start();
realm = Realm::get_shared_realm(std::move(realm_future.get()));
}

Expand Down
16 changes: 2 additions & 14 deletions test/object-store/sync/flx_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4690,20 +4690,8 @@ TEST_CASE("flx sync: Client reset during async open", "[sync][flx][client reset]
};

auto realm_task = Realm::get_synchronized_realm(realm_config);
auto realm_pf = util::make_promise_future<SharedRealm>();
realm_task->start([&](ThreadSafeReference ref, std::exception_ptr ex) {
auto& promise = realm_pf.promise;
try {
if (ex) {
std::rethrow_exception(ex);
}
promise.emplace_value(Realm::get_shared_realm(std::move(ref), util::Scheduler::make_dummy()));
}
catch (...) {
promise.set_error(exception_to_status());
}
});
auto realm = realm_pf.future.get();
auto realm_future = realm_task->start();
auto realm = Realm::get_shared_realm(std::move(realm_future).get(), util::Scheduler::make_dummy());
before_callback_called.future.get();
after_callback_called.future.get();
REQUIRE(subscription_invoked.load());
Expand Down
Loading

0 comments on commit 39d7751

Please sign in to comment.