diff --git a/CHANGELOG.md b/CHANGELOG.md index 2092f71ded4..b92cce59ae1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,7 @@ # NEXT RELEASE ### Enhancements -* (PR [#????](https://github.com/realm/realm-core/pull/????)) -* None. +* Introduce sync 'progress_estimate' parameter (value from 0.0 to 1.0) for existing sync 'ProgressNotifierCallback' api to report sync progress on current batch of upload/download until completion ([#7450](https://github.com/realm/realm-core/issues/7450)) ### Fixed * ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?) diff --git a/bindgen/spec.yml b/bindgen/spec.yml index 46cb5b55e23..26b056f79ec 100644 --- a/bindgen/spec.yml +++ b/bindgen/spec.yml @@ -1344,7 +1344,7 @@ classes: methods: start: '(callback: AsyncCallback<(realm: Nullable, error: Nullable) off_thread>)' cancel: () - register_download_progress_notifier: '(callback: (transferred_bytes: uint64_t, transferrable_bytes: uint64_t) off_thread) -> uint64_t' + register_download_progress_notifier: '(callback: (transferred_bytes: uint64_t, transferrable_bytes: uint64_t, progress_estimate: double) off_thread) -> uint64_t' unregister_download_progress_notifier: '(token: uint64_t)' SyncSession: @@ -1359,7 +1359,7 @@ classes: methods: wait_for_upload_completion: '(callback: AsyncCallback<(err: Status) off_thread>)' wait_for_download_completion: '(callback: AsyncCallback<(err: Status) off_thread>)' - register_progress_notifier: '(callback: (transferred_bytes: uint64_t, transferrable_bytes: uint64_t) off_thread, direction: ProgressDirection, is_streaming: bool) -> uint64_t' + register_progress_notifier: '(callback: (transferred_bytes: uint64_t, transferrable_bytes: uint64_t, progress_estimate: double) off_thread, direction: ProgressDirection, is_streaming: bool) -> uint64_t' unregister_progress_notifier: '(token: uint64_t)' register_connection_change_callback: '(callback: (old_state: SyncSessionConnectionState, new_state: SyncSessionConnectionState) off_thread) -> uint64_t' unregister_connection_change_callback: '(token: uint64_t)' diff --git a/src/realm.h b/src/realm.h index fb9cd347e92..4cedcd7824c 100644 --- a/src/realm.h +++ b/src/realm.h @@ -3570,7 +3570,7 @@ typedef void (*realm_sync_connection_state_changed_func_t)(realm_userdata_t user realm_sync_connection_state_e old_state, realm_sync_connection_state_e new_state); typedef void (*realm_sync_progress_func_t)(realm_userdata_t userdata, uint64_t transferred_bytes, - uint64_t total_bytes); + uint64_t total_bytes, double progress_estimate); typedef void (*realm_sync_error_handler_func_t)(realm_userdata_t userdata, realm_sync_session_t*, const realm_sync_error_t); typedef bool (*realm_sync_ssl_verify_func_t)(realm_userdata_t userdata, const char* server_address, short server_port, diff --git a/src/realm/object-store/c_api/sync.cpp b/src/realm/object-store/c_api/sync.cpp index 8038dc3f997..8a34d87f66e 100644 --- a/src/realm/object-store/c_api/sync.cpp +++ b/src/realm/object-store/c_api/sync.cpp @@ -730,9 +730,9 @@ realm_async_open_task_register_download_progress_notifier(realm_async_open_task_ realm_userdata_t userdata, realm_free_userdata_func_t userdata_free) noexcept { - auto cb = [notifier, userdata = SharedUserdata(userdata, FreeUserdata(userdata_free))](uint64_t transferred, - uint64_t transferrable) { - notifier(userdata.get(), transferred, transferrable); + auto cb = [notifier, userdata = SharedUserdata(userdata, FreeUserdata(userdata_free))]( + uint64_t transferred, uint64_t transferrable, double progress_estimate) { + notifier(userdata.get(), transferred, transferrable, progress_estimate); }; auto token = (*task)->register_download_progress_notifier(std::move(cb)); return new realm_async_open_task_progress_notification_token_t{(*task), token}; @@ -819,9 +819,9 @@ RLM_API realm_sync_session_connection_state_notification_token_t* realm_sync_ses bool is_streaming, realm_userdata_t userdata, realm_free_userdata_func_t userdata_free) noexcept { std::function cb = - [notifier, userdata = SharedUserdata(userdata, FreeUserdata(userdata_free))](uint64_t transferred, - uint64_t transferrable) { - notifier(userdata.get(), transferred, transferrable); + [notifier, userdata = SharedUserdata(userdata, FreeUserdata(userdata_free))]( + uint64_t transferred, uint64_t transferrable, double progress_estimate) { + notifier(userdata.get(), transferred, transferrable, progress_estimate); }; auto token = (*session)->register_progress_notifier(std::move(cb), SyncSession::ProgressDirection(direction), is_streaming); diff --git a/src/realm/object-store/sync/async_open_task.cpp b/src/realm/object-store/sync/async_open_task.cpp index da7568c212f..2f76a0880ab 100644 --- a/src/realm/object-store/sync/async_open_task.cpp +++ b/src/realm/object-store/sync/async_open_task.cpp @@ -90,13 +90,12 @@ void AsyncOpenTask::cancel() } } -uint64_t -AsyncOpenTask::register_download_progress_notifier(std::function&& callback) +uint64_t AsyncOpenTask::register_download_progress_notifier(std::function&& callback) { util::CheckedLockGuard lock(m_mutex); if (m_session) { auto token = m_session->register_progress_notifier(std::move(callback), - SyncSession::ProgressDirection::download, false); + SyncSession::ProgressDirection::download, true); m_registered_callbacks.emplace_back(token); return token; } diff --git a/src/realm/object-store/sync/async_open_task.hpp b/src/realm/object-store/sync/async_open_task.hpp index e1f28add434..b3498d613f4 100644 --- a/src/realm/object-store/sync/async_open_task.hpp +++ b/src/realm/object-store/sync/async_open_task.hpp @@ -57,8 +57,10 @@ class AsyncOpenTask : public std::enable_shared_from_this { // Cancels the download and stops the session. No further functions should be called on this class. void cancel() REQUIRES(!m_mutex); - uint64_t register_download_progress_notifier( - std::function&& callback) REQUIRES(!m_mutex); + using ProgressNotifierCallback = void(uint64_t transferred_bytes, uint64_t transferrable_bytes, + double progress_estimate); + uint64_t register_download_progress_notifier(std::function&& callback) + REQUIRES(!m_mutex); void unregister_download_progress_notifier(uint64_t token) REQUIRES(!m_mutex); private: diff --git a/src/realm/object-store/sync/sync_session.cpp b/src/realm/object-store/sync/sync_session.cpp index 5560a7c61ce..a85c13472fc 100644 --- a/src/realm/object-store/sync/sync_session.cpp +++ b/src/realm/object-store/sync/sync_session.cpp @@ -832,9 +832,11 @@ void SyncSession::cancel_pending_waits(util::CheckedUniqueLock lock, Status erro } void SyncSession::handle_progress_update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded, - uint64_t uploadable, uint64_t download_version, uint64_t snapshot_version) + uint64_t uploadable, uint64_t snapshot_version, double download_estimate, + double upload_estimate) { - m_progress_notifier.update(downloaded, downloadable, uploaded, uploadable, download_version, snapshot_version); + m_progress_notifier.update(downloaded, downloadable, uploaded, uploadable, snapshot_version, download_estimate, + upload_estimate); } static sync::Session::Config::ClientReset make_client_reset_config(const RealmConfig& base_config, @@ -971,10 +973,11 @@ void SyncSession::create_sync_session() // Set up the wrapped progress handler callback m_session->set_progress_handler([weak_self](uint_fast64_t downloaded, uint_fast64_t downloadable, uint_fast64_t uploaded, uint_fast64_t uploadable, - uint_fast64_t progress_version, uint_fast64_t snapshot_version) { + uint_fast64_t snapshot_version, double download_estimate, + double upload_estimate) { if (auto self = weak_self.lock()) { - self->handle_progress_update(downloaded, downloadable, uploaded, uploadable, progress_version, - snapshot_version); + self->handle_progress_update(downloaded, downloadable, uploaded, uploadable, snapshot_version, + download_estimate, upload_estimate); } }); @@ -1225,10 +1228,9 @@ void SyncSession::initiate_access_token_refresh() } } -void SyncSession::add_completion_callback(util::UniqueFunction callback, - _impl::SyncProgressNotifier::NotifierType direction) +void SyncSession::add_completion_callback(util::UniqueFunction callback, ProgressDirection direction) { - bool is_download = (direction == _impl::SyncProgressNotifier::NotifierType::download); + bool is_download = (direction == ProgressDirection::download); m_completion_request_counter++; m_completion_callbacks.emplace_hint(m_completion_callbacks.end(), m_completion_request_counter, @@ -1556,16 +1558,13 @@ void SyncProgressNotifier::unregister_callback(uint64_t token) } void SyncProgressNotifier::update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded, uint64_t uploadable, - uint64_t download_version, uint64_t snapshot_version) + uint64_t snapshot_version, double download_estimate, double upload_estimate) { - // Ignore progress messages from before we first receive a DOWNLOAD message - if (download_version == 0) - return; - std::vector> invocations; { std::lock_guard lock(m_mutex); - m_current_progress = Progress{uploadable, downloadable, uploaded, downloaded, snapshot_version}; + m_current_progress = Progress{uploadable, downloadable, uploaded, downloaded, + upload_estimate, download_estimate, snapshot_version}; for (auto it = m_packages.begin(); it != m_packages.end();) { bool should_delete = false; @@ -1589,13 +1588,15 @@ SyncProgressNotifier::NotifierPackage::create_invocation(Progress const& current { uint64_t transferred = is_download ? current_progress.downloaded : current_progress.uploaded; uint64_t transferrable = is_download ? current_progress.downloadable : current_progress.uploadable; - if (!is_streaming) { - // If the sync client has not yet processed all of the local - // transactions then the uploadable data is incorrect and we should - // not invoke the callback - if (!is_download && snapshot_version > current_progress.snapshot_version) - return [] {}; + double progress_estimate = is_download ? current_progress.download_estimate : current_progress.upload_estimate; + // If the sync client has not yet processed all of the local + // transactions then the uploadable data is incorrect and we should + // not invoke the callback + if (!is_download && snapshot_version > current_progress.snapshot_version) + return [] {}; + + if (!is_streaming) { // The initial download size we get from the server is the uncompacted // size, and so the download may complete before we actually receive // that much data. When that happens, transferrable will drop and we @@ -1609,7 +1610,7 @@ SyncProgressNotifier::NotifierPackage::create_invocation(Progress const& current // as were originally considered transferrable. is_expired = !is_streaming && transferred >= transferrable; return [=, notifier = notifier] { - notifier(transferred, transferrable); + notifier(transferred, transferrable, progress_estimate); }; } diff --git a/src/realm/object-store/sync/sync_session.hpp b/src/realm/object-store/sync/sync_session.hpp index ac564b06ab1..57b429c24dc 100644 --- a/src/realm/object-store/sync/sync_session.hpp +++ b/src/realm/object-store/sync/sync_session.hpp @@ -52,14 +52,15 @@ struct SyncClient; class SyncProgressNotifier { public: enum class NotifierType { upload, download }; - using ProgressNotifierCallback = void(uint64_t transferred_bytes, uint64_t transferrable_bytes); + using ProgressNotifierCallback = void(uint64_t transferred_bytes, uint64_t transferrable_bytes, + double progress_estimate); uint64_t register_callback(std::function, NotifierType direction, bool is_streaming); void unregister_callback(uint64_t); void set_local_version(uint64_t); - void update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded, uint64_t uploadable, uint64_t, - uint64_t); + void update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded, uint64_t uploadable, + uint64_t snapshot_version, double download_estimate = 1.0, double upload_estimate = 1.0); private: mutable std::mutex m_mutex; @@ -70,6 +71,8 @@ class SyncProgressNotifier { uint64_t downloadable; uint64_t uploaded; uint64_t downloaded; + double upload_estimate; + double download_estimate; uint64_t snapshot_version; }; @@ -415,7 +418,7 @@ class SyncSession : public std::enable_shared_from_this { void cancel_pending_waits(util::CheckedUniqueLock, Status) RELEASE(m_state_mutex); enum class ShouldBackup { yes, no }; void update_error_and_mark_file_for_deletion(SyncError&, ShouldBackup) REQUIRES(m_state_mutex, !m_config_mutex); - void handle_progress_update(uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t); + void handle_progress_update(uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, double, double); void handle_new_flx_sync_query(int64_t version); void nonsync_transact_notify(VersionID::version_type) REQUIRES(!m_state_mutex); diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp index 2a35c746aab..99f9113791e 100644 --- a/src/realm/sync/client.cpp +++ b/src/realm/sync/client.cpp @@ -171,7 +171,16 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener util::Optional m_proxy_config; - uint_fast64_t m_last_reported_uploadable_bytes = 0; + struct ReportedProgress { + uint64_t snapshot = 0; + uint64_t uploaded = 0; + uint64_t uploadable = 0; + uint64_t downloaded = 0; + uint64_t downloadable = 0; + uint64_t final_uploaded = 0; + uint64_t final_downloaded = 0; + } m_reported_progress; + util::UniqueFunction m_progress_handler; util::UniqueFunction m_connection_state_change_listener; @@ -224,6 +233,9 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener // the download progress is likely completely out of date. bool m_reliable_download_progress = false; + std::optional m_download_estimate; + std::optional m_bootstrap_store_bytes; + // Set to point to an activated session object during actualization of the // session wrapper. Set to null during finalization of the session // wrapper. Both modifications are guaranteed to be performed by the event @@ -257,7 +269,8 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener std::int_fast64_t m_staged_upload_mark = 0, m_staged_download_mark = 0; std::int_fast64_t m_reached_upload_mark = 0, m_reached_download_mark = 0; - void on_sync_progress(); + void on_upload_progress(bool only_if_new_uploadable_data = false); + void on_download_progress(const std::optional& bootstrap_store_bytes = {}); void on_upload_completion(); void on_download_completion(); void on_suspended(const SessionErrorInfo& error_info); @@ -267,7 +280,9 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener void on_flx_sync_error(int64_t version, std::string_view err_msg); void on_flx_sync_version_complete(int64_t version); - void report_progress(bool only_if_new_uploadable_data = false); + void init_progress_handler(); + // only_if_new_uploadable_data can be true only if is_download is false + void report_progress(bool is_download, bool only_if_new_uploadable_data = false); friend class SessionWrapperStack; friend class ClientImpl::Session; @@ -762,12 +777,11 @@ void SessionImpl::initiate_integrate_changesets(std::uint_fast64_t downloadable_ // Fake it for "dry run" mode client_version = m_last_version_available + 1; } - on_changesets_integrated(client_version, progress); // Throws + on_changesets_integrated(client_version, progress, !changesets.empty()); // Throws } catch (const IntegrationException& e) { on_integration_failure(e); } - m_wrapper.on_sync_progress(); // Throws } @@ -869,8 +883,17 @@ bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, Do REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action); if (batch_state == DownloadBatchState::MoreToCome) { + notify_download_progress(bootstrap_store->pending_stats().pending_changeset_bytes); return true; } + else { + // FIXME (#7451) this variable is not needed in principle, and bootstrap store bytes could be passed just + // through notify_download_progress, but since it is needed in report_progress, and it is also called on + // upload progress for now until progress is reported separately. As soon as we understand here that there + // are no more changesets for bootstrap store, and we want to process bootstrap, we don't need to notify + // intermediate progress - so reset these bytes to not accidentally double report them. + m_wrapper.m_bootstrap_store_bytes.reset(); + } try { process_pending_flx_bootstrap(); @@ -957,10 +980,9 @@ void SessionImpl::process_pending_flx_bootstrap() std::chrono::duration_cast(duration).count(), pending_batch.remaining_changesets); } - on_changesets_integrated(new_version.realm_version, progress); + on_changesets_integrated(new_version.realm_version, progress, changesets_processed > 0); REALM_ASSERT_3(query_version, !=, -1); - m_wrapper.on_sync_progress(); on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch); auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version, @@ -1101,6 +1123,38 @@ bool SessionImpl::is_steady_state_download_message(DownloadBatchState batch_stat return false; } +void SessionImpl::init_progress_handler() +{ + if (m_state != State::Unactivated && m_state != State::Active) + return; + + m_wrapper.init_progress_handler(); +} + +void SessionImpl::notify_upload_progress() +{ + if (m_state != State::Active) + return; + + m_wrapper.on_upload_progress(); +} + +void SessionImpl::update_download_estimate(double download_estimate) +{ + if (m_state != State::Active) + return; + + m_wrapper.m_download_estimate = download_estimate; +} + +void SessionImpl::notify_download_progress(const std::optional& bootstrap_store_bytes) +{ + if (m_state != State::Active) + return; + + m_wrapper.on_download_progress(bootstrap_store_bytes); // Throws +} + util::Future SessionImpl::send_test_command(std::string body) { if (m_state != State::Active) { @@ -1339,8 +1393,7 @@ void SessionWrapper::on_commit(version_type new_version) return; // Already finalized SessionImpl& sess = *self->m_sess; sess.recognize_sync_version(new_version); // Throws - bool only_if_new_uploadable_data = true; - self->report_progress(only_if_new_uploadable_data); // Throws + self->on_upload_progress(/* only_if_new_uploadable_data = */ true); // Throws }); } @@ -1604,7 +1657,7 @@ void SessionWrapper::actualize(ServerEndpoint endpoint) } if (!m_client_reset_config) - report_progress(); // Throws + on_upload_progress(/* only_if_new_uploadable_data = */ true); // Throws } void SessionWrapper::force_close() @@ -1695,12 +1748,25 @@ inline void SessionWrapper::finalize_before_actualization() noexcept m_force_closed = true; } +inline void SessionWrapper::on_upload_progress(bool only_if_new_uploadable_data) +{ + REALM_ASSERT(!m_finalized); + + // don't set the flag in case of the progress change of local origin + // progress should be delayed until first DOWNLOAD message received + // since uploads are not allowed before that and can't progress + if (!only_if_new_uploadable_data) + m_reliable_download_progress = true; -inline void SessionWrapper::on_sync_progress() + report_progress(/* is_download = */ false, only_if_new_uploadable_data); // Throws +} + +inline void SessionWrapper::on_download_progress(const std::optional& bootstrap_store_bytes) { REALM_ASSERT(!m_finalized); m_reliable_download_progress = true; - report_progress(); // Throws + m_bootstrap_store_bytes = bootstrap_store_bytes; + report_progress(/* is_download = */ true); // Throws } @@ -1787,47 +1853,121 @@ void SessionWrapper::on_connection_state_changed(ConnectionState state, } } +void SessionWrapper::init_progress_handler() +{ + uint64_t unused = 0; + ClientHistory::get_upload_download_bytes(m_db.get(), m_reported_progress.final_downloaded, unused, + m_reported_progress.final_uploaded, unused, unused); +} -void SessionWrapper::report_progress(bool only_if_new_uploadable_data) +void SessionWrapper::report_progress(bool is_download, bool only_if_new_uploadable_data) { REALM_ASSERT(!m_finalized); REALM_ASSERT(m_sess); + REALM_ASSERT(!(only_if_new_uploadable_data && is_download)); if (!m_progress_handler) return; - std::uint_fast64_t downloaded_bytes = 0; - std::uint_fast64_t downloadable_bytes = 0; - std::uint_fast64_t uploaded_bytes = 0; - std::uint_fast64_t uploadable_bytes = 0; - std::uint_fast64_t snapshot_version = 0; - ClientHistory::get_upload_download_bytes(m_db.get(), downloaded_bytes, downloadable_bytes, uploaded_bytes, - uploadable_bytes, snapshot_version); + // Ignore progress messages from before we first receive a DOWNLOAD message + if (!m_reliable_download_progress) + return; + + ReportedProgress p = m_reported_progress; + ClientHistory::get_upload_download_bytes(m_db.get(), p.downloaded, p.downloadable, p.uploaded, p.uploadable, + p.snapshot); // If this progress notification was triggered by a commit being made we // only want to send it if the uploadable bytes has actually increased, // and not if it was an empty commit. - if (only_if_new_uploadable_data && m_last_reported_uploadable_bytes == uploadable_bytes) + if (only_if_new_uploadable_data && m_reported_progress.uploadable == p.uploadable) return; - m_last_reported_uploadable_bytes = uploadable_bytes; // uploadable_bytes is uploaded + remaining to upload, while downloadable_bytes // is only the remaining to download. This is confusing, so make them use // the same units. - std::uint_fast64_t total_bytes = downloaded_bytes + downloadable_bytes; - - m_sess->logger.debug("Progress handler called, downloaded = %1, " - "downloadable(total) = %2, uploaded = %3, " - "uploadable = %4, reliable_download_progress = %5, " - "snapshot version = %6", - downloaded_bytes, total_bytes, uploaded_bytes, uploadable_bytes, - m_reliable_download_progress, snapshot_version); - - // FIXME: Why is this boolean status communicated to the application as - // a 64-bit integer? Also, the name `progress_version` is confusing. - std::uint_fast64_t progress_version = (m_reliable_download_progress ? 1 : 0); - m_progress_handler(downloaded_bytes, total_bytes, uploaded_bytes, uploadable_bytes, progress_version, - snapshot_version); + p.downloadable += p.downloaded; + + bool is_completed = false; + if (is_download) { + if (m_download_estimate) + is_completed = *m_download_estimate >= 1.0; + else + is_completed = p.downloaded == p.downloadable; + } + else { + is_completed = p.uploaded == p.uploadable; + } + + auto calculate_progress = [](uint64_t transferred, uint64_t transferable, uint64_t final_transferred) { + REALM_ASSERT_DEBUG_EX(final_transferred <= transferred, final_transferred, transferred, transferable); + REALM_ASSERT_DEBUG_EX(transferred <= transferable, final_transferred, transferred, transferable); + + // The effect of this calculation is that if new bytes are added for download/upload, + // the progress estimate doesn't go back to zero, but it goes back to some non-zero percentage. + // This calculation allows a clean progression from 0 to 1.0 even if the new data is added for the sync + // before progress has reached 1.0. + // Then once it is at 1.0 the next batch of changes will restart the estimate at 0. + // Example for upload progress reported: + // 0 -> 1.0 -> new data added -> 0.0 -> 0.1 ...sync... -> 0.4 -> new data added -> 0.3 ...sync.. -> 1.0 + + double progress_estimate = 1.0; + if (final_transferred < transferable && transferred < transferable) + progress_estimate = (transferred - final_transferred) / double(transferable - final_transferred); + return progress_estimate; + }; + + double upload_estimate = 1.0, download_estimate = 1.0; + + // calculate estimate for both download/upload since the progress is reported all at once + if (!is_completed || is_download) + upload_estimate = calculate_progress(p.uploaded, p.uploadable, p.final_uploaded); + + // download estimate only known for flx + if (m_download_estimate) { + download_estimate = *m_download_estimate; + + // ... bootstrap store bytes should be null after initial sync when every changeset integrated immediately + if (m_bootstrap_store_bytes) + p.downloaded += *m_bootstrap_store_bytes; + + // FIXME for flx with download estimate these bytes are not known + // provide some sensible value for non-streaming version of object-store callbacks + // until these field are completely removed from the api after pbs deprecation + p.downloadable = p.downloaded; + if (0.01 <= download_estimate && download_estimate <= 0.99) + if (p.downloaded > p.final_downloaded) + p.downloadable = p.final_downloaded + (p.downloaded - p.final_downloaded) / download_estimate; + } + else { + if (!is_completed || !is_download) + download_estimate = calculate_progress(p.downloaded, p.downloadable, p.final_downloaded); + } + + if (is_completed) { + if (is_download) + p.final_downloaded = p.downloaded; + else + p.final_uploaded = p.uploaded; + } + + m_reported_progress = p; + + if (m_sess->logger.would_log(Logger::Level::debug)) { + auto to_str = [](double d) { + std::ostringstream ss; + // progress estimate string in the DOWNLOAD message isn't expected to have more than 4 digits of precision + ss << std::fixed << std::setprecision(4) << d; + return ss.str(); + }; + m_sess->logger.debug("Progress handler called, downloaded = %1, downloadable = %2, estimate = %3, " + "uploaded = %4, uploadable = %5, estimate = %6, snapshot version = %7", + p.downloaded, p.downloadable, to_str(download_estimate), p.uploaded, p.uploadable, + to_str(upload_estimate), p.snapshot); + } + + m_progress_handler(p.downloaded, p.downloadable, p.uploaded, p.uploadable, p.snapshot, download_estimate, + upload_estimate); } util::Future SessionWrapper::send_test_command(std::string body) diff --git a/src/realm/sync/client.hpp b/src/realm/sync/client.hpp index 15667eeb11f..c9bffdbe8bd 100644 --- a/src/realm/sync/client.hpp +++ b/src/realm/sync/client.hpp @@ -161,7 +161,8 @@ class Session { using SyncTransactCallback = void(VersionID old_version, VersionID new_version); using ProgressHandler = void(std::uint_fast64_t downloaded_bytes, std::uint_fast64_t downloadable_bytes, std::uint_fast64_t uploaded_bytes, std::uint_fast64_t uploadable_bytes, - std::uint_fast64_t progress_version, std::uint_fast64_t snapshot_version); + std::uint_fast64_t snapshot_version, double download_estimate, + double upload_estimate); using WaitOperCompletionHandler = util::UniqueFunction; using SSLVerifyCallback = bool(const std::string& server_address, port_type server_port, const char* pem_data, size_t pem_size, int preverify_ok, int depth); diff --git a/src/realm/sync/noinst/client_impl_base.cpp b/src/realm/sync/noinst/client_impl_base.cpp index 1d73f893368..dd2c1931b96 100644 --- a/src/realm/sync/noinst/client_impl_base.cpp +++ b/src/realm/sync/noinst/client_impl_base.cpp @@ -396,7 +396,7 @@ void Connection::cancel_reconnect_delay() // soon as there are any sessions that are both active and unsuspended. } -void ClientImpl::Connection::finish_session_deactivation(Session* sess) +void Connection::finish_session_deactivation(Session* sess) { REALM_ASSERT(sess->m_state == Session::Deactivated); auto ident = sess->m_ident; @@ -1372,19 +1372,14 @@ void Connection::receive_ident_message(session_ident_type session_ident, SaltedF close_due_to_protocol_error(std::move(status)); // Throws } -void Connection::receive_download_message(session_ident_type session_ident, const SyncProgress& progress, - std::uint_fast64_t downloadable_bytes, int64_t query_version, - DownloadBatchState batch_state, - const ReceivedChangesets& received_changesets) +void Connection::receive_download_message(session_ident_type session_ident, const DownloadMessage& message) { Session* sess = find_and_validate_session(session_ident, "DOWNLOAD"); if (REALM_UNLIKELY(!sess)) { return; } - if (auto status = sess->receive_download_message(progress, downloadable_bytes, batch_state, query_version, - received_changesets); - !status.is_ok()) { + if (auto status = sess->receive_download_message(message); !status.is_ok()) { close_due_to_protocol_error(std::move(status)); } } @@ -1619,13 +1614,16 @@ void Session::on_integration_failure(const IntegrationException& error) } } -void Session::on_changesets_integrated(version_type client_version, const SyncProgress& progress) +void Session::on_changesets_integrated(version_type client_version, const SyncProgress& progress, + bool changesets_integrated) { REALM_ASSERT_EX(m_state == Active, m_state); REALM_ASSERT_3(progress.download.server_version, >=, m_download_progress.server_version); - m_download_progress = progress.download; bool upload_progressed = (progress.upload.client_version > m_progress.upload.client_version); + + m_download_progress = progress.download; m_progress = progress; + if (upload_progressed) { if (progress.upload.client_version > m_last_version_selected_for_upload) { if (progress.upload.client_version > m_upload_progress.client_version) @@ -1633,11 +1631,18 @@ void Session::on_changesets_integrated(version_type client_version, const SyncPr m_last_version_selected_for_upload = progress.upload.client_version; } + notify_upload_progress(); check_for_upload_completion(); } - do_recognize_sync_version(client_version); // Allows upload process to resume - check_for_download_completion(); // Throws + bool resume_upload = do_recognize_sync_version(client_version); // Allows upload process to resume + + // notify also when final DOWNLOAD received with no changesets + bool download_progressed = changesets_integrated || (!upload_progressed && resume_upload); + if (download_progressed) + notify_download_progress(); + + check_for_download_completion(); // Throws // If the client migrated from PBS to FLX, create subscriptions when new tables are received from server. if (auto migration_store = get_migration_store(); migration_store && m_is_flx_sync_session) { @@ -1692,6 +1697,7 @@ void Session::activate() m_last_version_selected_for_upload = m_upload_progress.client_version; m_download_progress = m_progress.download; REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version); + init_progress_handler(); logger.debug("last_version_available = %1", m_last_version_available); // Throws logger.debug("progress_download_server_version = %1", m_progress.download.server_version); // Throws @@ -2272,6 +2278,7 @@ bool Session::client_reset_if_needed() m_upload_progress = m_progress.upload; m_download_progress = m_progress.download; + init_progress_handler(); // In recovery mode, there may be new changesets to upload and nothing left to download. // In FLX DiscardLocal mode, there may be new commits due to subscription handling. // For both, we want to allow uploads again without needing external changes to download first. @@ -2349,29 +2356,44 @@ Status Session::receive_ident_message(SaltedFileIdent client_file_ident) return Status::OK(); // Success } -Status Session::receive_download_message(const SyncProgress& progress, std::uint_fast64_t downloadable_bytes, - DownloadBatchState batch_state, int64_t query_version, - const ReceivedChangesets& received_changesets) +Status Session::receive_download_message(const DownloadMessage& message) { - REALM_ASSERT_EX(query_version >= 0, query_version); // Ignore the message if the deactivation process has been initiated, // because in that case, the associated Realm and SessionWrapper must // not be accessed any longer. if (m_state != Active) return Status::OK(); - if (is_steady_state_download_message(batch_state, query_version)) { + bool is_flx = m_conn.is_flx_sync_connection(); + int64_t query_version = is_flx ? message.query_version.value() : 0; + + // If this is a PBS connection, then every download message is its own complete batch. + bool last_in_batch = is_flx ? message.last_in_batch.value() : true; + auto batch_state = last_in_batch ? sync::DownloadBatchState::LastInBatch : sync::DownloadBatchState::MoreToCome; + if (is_steady_state_download_message(batch_state, query_version)) batch_state = DownloadBatchState::SteadyState; - } - logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, " - "latest_server_version=%3, latest_server_version_salt=%4, " - "upload_client_version=%5, upload_server_version=%6, downloadable_bytes=%7, " - "last_in_batch=%8, query_version=%9, num_changesets=%10, ...)", - progress.download.server_version, progress.download.last_integrated_client_version, - progress.latest_server_version.version, progress.latest_server_version.salt, - progress.upload.client_version, progress.upload.last_integrated_server_version, downloadable_bytes, - batch_state != DownloadBatchState::MoreToCome, query_version, received_changesets.size()); // Throws + auto&& progress = message.progress; + if (is_flx) { + logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, " + "latest_server_version=%3, latest_server_version_salt=%4, " + "upload_client_version=%5, upload_server_version=%6, progress_estimate=%7, " + "last_in_batch=%8, query_version=%9, num_changesets=%10, ...)", + progress.download.server_version, progress.download.last_integrated_client_version, + progress.latest_server_version.version, progress.latest_server_version.salt, + progress.upload.client_version, progress.upload.last_integrated_server_version, + message.progress_estimate, last_in_batch, query_version, message.changesets.size()); // Throws + } + else { + logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, " + "latest_server_version=%3, latest_server_version_salt=%4, " + "upload_client_version=%5, upload_server_version=%6, " + "downloadable_bytes=%7, num_changesets=%8, ...)", + progress.download.server_version, progress.download.last_integrated_client_version, + progress.latest_server_version.version, progress.latest_server_version.salt, + progress.upload.client_version, progress.upload.last_integrated_server_version, + message.downloadable_bytes, message.changesets.size()); // Throws + } // Ignore download messages when the client detects an error. This is to prevent transforming the same bad // changeset over and over again. @@ -2391,7 +2413,7 @@ Status Session::receive_download_message(const SyncProgress& progress, std::uint version_type server_version = m_progress.download.server_version; version_type last_integrated_client_version = m_progress.download.last_integrated_client_version; - for (const RemoteChangeset& changeset : received_changesets) { + for (const RemoteChangeset& changeset : message.changesets) { // Check that per-changeset server version is strictly increasing, except in FLX sync where the server // version must be increasing, but can stay the same during bootstraps. bool good_server_version = m_is_flx_sync_session ? (changeset.remote_version >= server_version) @@ -2429,21 +2451,25 @@ Status Session::receive_download_message(const SyncProgress& progress, std::uint } auto hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageReceived, progress, query_version, - batch_state, received_changesets.size()); + batch_state, message.changesets.size()); if (hook_action == SyncClientHookAction::EarlyReturn) { return Status::OK(); } REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action); - if (process_flx_bootstrap_message(progress, batch_state, query_version, received_changesets)) { + if (is_flx) + update_download_estimate(message.progress_estimate); + + if (process_flx_bootstrap_message(progress, batch_state, query_version, message.changesets)) { clear_resumption_delay_state(); return Status::OK(); } - initiate_integrate_changesets(downloadable_bytes, batch_state, progress, received_changesets); // Throws + uint64_t downloadable_bytes = is_flx ? 0 : message.downloadable_bytes; + initiate_integrate_changesets(downloadable_bytes, batch_state, progress, message.changesets); // Throws hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version, - batch_state, received_changesets.size()); + batch_state, message.changesets.size()); if (hook_action == SyncClientHookAction::EarlyReturn) { return Status::OK(); } @@ -2678,7 +2704,7 @@ void Session::clear_resumption_delay_state() } } -Status ClientImpl::Session::check_received_sync_progress(const SyncProgress& progress) noexcept +Status Session::check_received_sync_progress(const SyncProgress& progress) noexcept { const SyncProgress& a = m_progress; const SyncProgress& b = progress; diff --git a/src/realm/sync/noinst/client_impl_base.hpp b/src/realm/sync/noinst/client_impl_base.hpp index a354e74761a..b7a6eda3198 100644 --- a/src/realm/sync/noinst/client_impl_base.hpp +++ b/src/realm/sync/noinst/client_impl_base.hpp @@ -389,7 +389,6 @@ enum class ClientImpl::ConnectionTerminationReason { missing_protocol_feature, }; - /// All use of connection objects, including construction and destruction, must /// occur on behalf of the event loop thread of the associated client object. @@ -400,6 +399,7 @@ class ClientImpl::Connection { using SSLVerifyCallback = SyncConfig::SSLVerifyCallback; using ProxyConfig = SyncConfig::ProxyConfig; using ReconnectInfo = ClientImpl::ReconnectInfo; + using DownloadMessage = ClientProtocol::DownloadMessage; std::shared_ptr logger_ptr; util::Logger& logger; @@ -563,8 +563,8 @@ class ClientImpl::Connection { void receive_query_error_message(int error_code, std::string_view message, int64_t query_version, session_ident_type); void receive_ident_message(session_ident_type, SaltedFileIdent); - void receive_download_message(session_ident_type, const SyncProgress&, std::uint_fast64_t downloadable_bytes, - int64_t query_version, DownloadBatchState batch_state, const ReceivedChangesets&); + void receive_download_message(session_ident_type, const DownloadMessage& message); + void receive_mark_message(session_ident_type, request_ident_type); void receive_unbound_message(session_ident_type); void receive_test_command_response(session_ident_type, request_ident_type, std::string_view body); @@ -711,6 +711,7 @@ class ClientImpl::Connection { class ClientImpl::Session { public: using ReceivedChangesets = ClientProtocol::ReceivedChangesets; + using DownloadMessage = ClientProtocol::DownloadMessage; std::shared_ptr logger_ptr; util::Logger& logger; @@ -847,7 +848,8 @@ class ClientImpl::Session { /// It is an error to call this function before activation of the session /// (Connection::activate_session()), or after initiation of deactivation /// (Connection::initiate_session_deactivation()). - void on_changesets_integrated(version_type client_version, const SyncProgress& progress); + void on_changesets_integrated(version_type client_version, const SyncProgress& progress, + bool changesets_integrated); void on_integration_failure(const IntegrationException& e); @@ -1175,9 +1177,7 @@ class ClientImpl::Session { void send_json_error_message(); void send_test_command_message(); Status receive_ident_message(SaltedFileIdent); - Status receive_download_message(const SyncProgress&, std::uint_fast64_t downloadable_bytes, - DownloadBatchState last_in_batch, int64_t query_version, - const ReceivedChangesets&); + Status receive_download_message(const DownloadMessage& message); Status receive_mark_message(request_ident_type); Status receive_unbound_message(); Status receive_error_message(const ProtocolErrorInfo& info); @@ -1199,6 +1199,11 @@ class ClientImpl::Session { bool is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version); + void init_progress_handler(); + void notify_upload_progress(); + void update_download_estimate(double download_estimate); + void notify_download_progress(const std::optional& bootstrap_store_bytes = {}); + friend class Connection; }; diff --git a/src/realm/sync/noinst/protocol_codec.hpp b/src/realm/sync/noinst/protocol_codec.hpp index 9119bdf8c24..9810d205ba5 100644 --- a/src/realm/sync/noinst/protocol_codec.hpp +++ b/src/realm/sync/noinst/protocol_codec.hpp @@ -87,7 +87,8 @@ class HeaderLineParser { std::pair peek_token_impl() const { // We currently only support numeric, string, and boolean values in header lines. - static_assert(std::is_integral_v || is_any_v); + static_assert(std::is_integral_v || std::is_floating_point_v || + is_any_v); if (at_end()) { throw ProtocolCodecException("reached end of header line prematurely"); } @@ -121,6 +122,30 @@ class HeaderLineParser { return {(cur_arg != 0), m_sv.substr(parse_res.ptr - m_sv.data())}; } + else if constexpr (std::is_floating_point_v) { + // Currently all double are in the middle of the string delimited by a space. + auto delim_at = m_sv.find(' '); + if (delim_at == std::string_view::npos) + throw ProtocolCodecException("reached end of header line prematurely for double value parsing"); + + // FIXME use std::from_chars one day when it's availiable in every std lib + T val = {}; + try { + std::string str(m_sv.substr(0, delim_at)); + if constexpr (std::is_same_v) + val = std::stof(str); + else if constexpr (std::is_same_v) + val = std::stod(str); + else if constexpr (std::is_same_v) + val = std::stold(str); + } + catch (const std::exception& err) { + throw ProtocolCodecException( + util::format("error parsing floating-point number in header line: %1", err.what())); + } + + return {val, m_sv.substr(delim_at)}; + } } std::string_view m_sv; @@ -376,10 +401,23 @@ class ClientProtocol { } } + struct DownloadMessage { + SyncProgress progress; + std::optional query_version; + std::optional last_in_batch; + union { + uint64_t downloadable_bytes = 0; + double progress_estimate; + }; + ReceivedChangesets changesets; + }; + private: template void parse_download_message(Connection& connection, HeaderLineParser& msg) { + bool is_flx = connection.is_flx_sync_connection(); + util::Logger& logger = connection.logger; auto report_error = [&](ErrorCodes::Error code, const auto fmt, auto&&... args) { auto msg = util::format(fmt, std::forward(args)...); @@ -388,18 +426,32 @@ class ClientProtocol { auto msg_with_header = msg.remaining(); auto session_ident = msg.read_next(); - SyncProgress progress; + + DownloadMessage message; + auto&& progress = message.progress; progress.download.server_version = msg.read_next(); progress.download.last_integrated_client_version = msg.read_next(); progress.latest_server_version.version = msg.read_next(); progress.latest_server_version.salt = msg.read_next(); progress.upload.client_version = msg.read_next(); progress.upload.last_integrated_server_version = msg.read_next(); - auto query_version = connection.is_flx_sync_connection() ? msg.read_next() : 0; - // If this is a PBS connection, then every download message is its own complete batch. - auto last_in_batch = connection.is_flx_sync_connection() ? msg.read_next() : true; - auto downloadable_bytes = msg.read_next(); + if (is_flx) { + message.query_version = msg.read_next(); + if (message.query_version < 0) + return report_error(ErrorCodes::SyncProtocolInvariantFailed, "Bad query version", + message.query_version); + + message.last_in_batch = msg.read_next(); + + message.progress_estimate = msg.read_next(); + if (message.progress_estimate < 0 || message.progress_estimate > 1) + return report_error(ErrorCodes::SyncProtocolInvariantFailed, "Bad progress value: %1", + message.progress_estimate); + } + else + message.downloadable_bytes = msg.read_next(); + auto is_body_compressed = msg.read_next(); auto uncompressed_body_size = msg.read_next(); auto compressed_body_size = msg.read_next('\n'); @@ -429,8 +481,6 @@ class ClientProtocol { "compressed_body_size=%3, uncompressed_body_size=%4", session_ident, is_body_compressed, compressed_body_size, uncompressed_body_size); - ReceivedChangesets received_changesets; - // Loop through the body and find the changesets. while (!msg.at_end()) { RemoteChangeset cur_changeset; @@ -477,13 +527,10 @@ class ClientProtocol { } cur_changeset.data = changeset_data; - received_changesets.push_back(std::move(cur_changeset)); // Throws + message.changesets.push_back(std::move(cur_changeset)); // Throws } - auto batch_state = - last_in_batch ? sync::DownloadBatchState::LastInBatch : sync::DownloadBatchState::MoreToCome; - connection.receive_download_message(session_ident, progress, downloadable_bytes, query_version, batch_state, - received_changesets); // Throws + connection.receive_download_message(session_ident, message); // Throws } static sync::ProtocolErrorInfo::Action string_to_action(const std::string& action_string) diff --git a/src/realm/sync/protocol.hpp b/src/realm/sync/protocol.hpp index 32455019ee0..4142281fdab 100644 --- a/src/realm/sync/protocol.hpp +++ b/src/realm/sync/protocol.hpp @@ -53,6 +53,10 @@ namespace sync { // Update JSON_ERROR message to read the previous schema version sent by // the server // +// 12 Support for estimated progress in DOWNLOAD message for FLX +// Server replaces 'downloadable_bytes' (which was always zero prior this version) +// with an estimated progress value (double from 0.0 to 1.0) for flx sessions +// // XX Changes: // - TBD // @@ -60,7 +64,7 @@ constexpr int get_current_protocol_version() noexcept { // Also update the current protocol version test in flx_sync.cpp when // updating this value - return 11; + return 12; } constexpr std::string_view get_pbs_websocket_protocol_prefix() noexcept diff --git a/test/object-store/realm.cpp b/test/object-store/realm.cpp index 93f10916b31..a9a2a0ad43e 100644 --- a/test/object-store/realm.cpp +++ b/test/object-store/realm.cpp @@ -1054,12 +1054,12 @@ TEST_CASE("Get Realm using Async Open", "[sync][pbs][async open]") { std::shared_ptr task2 = Realm::get_synchronized_realm(config); REQUIRE(task); REQUIRE(task2); - task->register_download_progress_notifier([&](uint64_t, uint64_t) { + task->register_download_progress_notifier([&](uint64_t, uint64_t, double) { std::lock_guard guard(mutex); REQUIRE(!task1_completed); progress_notifier1_called = true; }); - task2->register_download_progress_notifier([&](uint64_t, uint64_t) { + task2->register_download_progress_notifier([&](uint64_t, uint64_t, double) { std::lock_guard guard(mutex); REQUIRE(!task2_completed); progress_notifier2_called = true; diff --git a/test/object-store/sync/flx_sync.cpp b/test/object-store/sync/flx_sync.cpp index ab1fadde4ed..9ba6e1688ef 100644 --- a/test/object-store/sync/flx_sync.cpp +++ b/test/object-store/sync/flx_sync.cpp @@ -2581,7 +2581,7 @@ TEST_CASE("flx: writes work without waiting for sync", "[sync][flx][baas]") { TEST_CASE("flx: verify websocket protocol number and prefixes", "[sync][protocol]") { // Update the expected value whenever the protocol version is updated - this ensures // that the current protocol version does not change unexpectedly. - REQUIRE(11 == sync::get_current_protocol_version()); + REQUIRE(12 == sync::get_current_protocol_version()); // This was updated in Protocol V8 to use '#' instead of '/' to support the Web SDK REQUIRE("com.mongodb.realm-sync#" == sync::get_pbs_websocket_protocol_prefix()); REQUIRE("com.mongodb.realm-query-sync#" == sync::get_flx_websocket_protocol_prefix()); diff --git a/test/object-store/sync/session/progress_notifications.cpp b/test/object-store/sync/session/progress_notifications.cpp index 97cdb2c0eda..60766ffa665 100644 --- a/test/object-store/sync/session/progress_notifications.cpp +++ b/test/object-store/sync/session/progress_notifications.cpp @@ -20,58 +20,79 @@ #include +#if REALM_ENABLE_AUTH_TESTS +#include "util/test_file.hpp" +#include "util/sync/flx_sync_harness.hpp" +#include "util/sync/sync_test_utils.hpp" + +#include +#include +#include + +using namespace realm::app; +#endif + #include +#include +using namespace Catch::Matchers; using namespace realm; +using NotifierType = SyncSession::ProgressDirection; + TEST_CASE("progress notification", "[sync][session][progress]") { - using NotifierType = SyncSession::ProgressDirection; _impl::SyncProgressNotifier progress; + uint64_t transferred = 0; + uint64_t transferrable = 0; + double progress_estimate = 0; + bool callback_was_called = false; + + auto default_callback = [&](uint64_t xferred, uint64_t xferable, double p) { + transferred = xferred; + transferrable = xferable; + progress_estimate = p; + callback_was_called = true; + }; + + auto register_default_callback = [&](NotifierType type, bool is_streaming = false) { + return progress.register_callback(default_callback, type, is_streaming); + }; + auto register_default_upload_callback = [&](bool is_streaming = false) { + return register_default_callback(NotifierType::upload, is_streaming); + }; + auto register_default_download_callback = [&](bool is_streaming = false) { + return register_default_callback(NotifierType::download, is_streaming); + }; SECTION("callback is not called prior to first update") { - bool callback_was_called = false; - progress.register_callback( - [&](auto, auto) { - callback_was_called = true; - }, - NotifierType::upload, false); - progress.register_callback( - [&](auto, auto) { - callback_was_called = true; - }, - NotifierType::download, false); + register_default_upload_callback(); + register_default_download_callback(); REQUIRE_FALSE(callback_was_called); } SECTION("callback is invoked immediately when a progress update has already occurred") { progress.set_local_version(1); - progress.update(0, 0, 0, 0, 1, 1); + progress.update(0, 0, 0, 0, 1); - bool callback_was_called = false; SECTION("for upload notifications, with no data transfer ongoing") { - progress.register_callback( - [&](auto, auto) { - callback_was_called = true; - }, - NotifierType::upload, false); + REQUIRE_FALSE(callback_was_called); + register_default_upload_callback(); REQUIRE(callback_was_called); } SECTION("for download notifications, with no data transfer ongoing") { - progress.register_callback( - [&](auto, auto) { - callback_was_called = true; - }, - NotifierType::download, false); + REQUIRE_FALSE(callback_was_called); + register_default_download_callback(); + REQUIRE(callback_was_called); } SECTION("can register another notifier while in the initial notification without deadlock") { int counter = 0; progress.register_callback( - [&](auto, auto) { + [&](auto, auto, auto) { counter++; progress.register_callback( - [&](auto, auto) { + [&](auto, auto, auto) { counter++; }, NotifierType::upload, false); @@ -82,29 +103,20 @@ TEST_CASE("progress notification", "[sync][session][progress]") { } SECTION("callback is invoked after each update for streaming notifiers") { - progress.update(0, 0, 0, 0, 1, 1); + progress.update(0, 0, 0, 0, 1); - bool callback_was_called = false; - uint64_t transferred = 0; - uint64_t transferrable = 0; uint64_t current_transferred = 0; uint64_t current_transferrable = 0; SECTION("for upload notifications") { - progress.register_callback( - [&](auto xferred, auto xferable) { - transferred = xferred; - transferrable = xferable; - callback_was_called = true; - }, - NotifierType::upload, true); + register_default_upload_callback(true); REQUIRE(callback_was_called); // Now manually call the notifier handler a few times. callback_was_called = false; current_transferred = 60; current_transferrable = 912; - progress.update(25, 26, current_transferred, current_transferrable, 1, 1); + progress.update(25, 26, current_transferred, current_transferrable, 1); CHECK(callback_was_called); CHECK(transferred == current_transferred); CHECK(transferrable == current_transferrable); @@ -113,7 +125,7 @@ TEST_CASE("progress notification", "[sync][session][progress]") { callback_was_called = false; current_transferred = 79; current_transferrable = 1021; - progress.update(68, 191, current_transferred, current_transferrable, 1, 1); + progress.update(68, 191, current_transferred, current_transferrable, 1); CHECK(callback_was_called); CHECK(transferred == current_transferred); CHECK(transferrable == current_transferrable); @@ -122,27 +134,21 @@ TEST_CASE("progress notification", "[sync][session][progress]") { callback_was_called = false; current_transferred = 150; current_transferrable = 1228; - progress.update(199, 591, current_transferred, current_transferrable, 1, 1); + progress.update(199, 591, current_transferred, current_transferrable, 1); CHECK(callback_was_called); CHECK(transferred == current_transferred); CHECK(transferrable == current_transferrable); } SECTION("for download notifications") { - progress.register_callback( - [&](auto xferred, auto xferable) { - transferred = xferred; - transferrable = xferable; - callback_was_called = true; - }, - NotifierType::download, true); + register_default_download_callback(true); REQUIRE(callback_was_called); // Now manually call the notifier handler a few times. callback_was_called = false; current_transferred = 60; current_transferrable = 912; - progress.update(current_transferred, current_transferrable, 25, 26, 1, 1); + progress.update(current_transferred, current_transferrable, 25, 26, 1); CHECK(callback_was_called); CHECK(transferred == current_transferred); CHECK(transferrable == current_transferrable); @@ -151,7 +157,7 @@ TEST_CASE("progress notification", "[sync][session][progress]") { callback_was_called = false; current_transferred = 79; current_transferrable = 1021; - progress.update(current_transferred, current_transferrable, 68, 191, 1, 1); + progress.update(current_transferred, current_transferrable, 68, 191, 1); CHECK(callback_was_called); CHECK(transferred == current_transferred); CHECK(transferrable == current_transferrable); @@ -160,27 +166,21 @@ TEST_CASE("progress notification", "[sync][session][progress]") { callback_was_called = false; current_transferred = 150; current_transferrable = 1228; - progress.update(current_transferred, current_transferrable, 199, 591, 1, 1); + progress.update(current_transferred, current_transferrable, 199, 591, 1); CHECK(callback_was_called); CHECK(transferred == current_transferred); CHECK(transferrable == current_transferrable); } SECTION("token unregistration works") { - uint64_t token = progress.register_callback( - [&](auto xferred, auto xferable) { - transferred = xferred; - transferrable = xferable; - callback_was_called = true; - }, - NotifierType::download, true); + uint64_t token = register_default_download_callback(true); REQUIRE(callback_was_called); // Now manually call the notifier handler a few times. callback_was_called = false; current_transferred = 60; current_transferrable = 912; - progress.update(current_transferred, current_transferrable, 25, 26, 1, 1); + progress.update(current_transferred, current_transferrable, 25, 26, 1); CHECK(callback_was_called); CHECK(transferred == current_transferred); CHECK(transferrable == current_transferrable); @@ -192,28 +192,24 @@ TEST_CASE("progress notification", "[sync][session][progress]") { callback_was_called = false; current_transferred = 150; current_transferrable = 1228; - progress.update(current_transferred, current_transferrable, 199, 591, 1, 1); + progress.update(current_transferred, current_transferrable, 199, 591, 1); CHECK(!callback_was_called); } SECTION("for multiple notifiers") { - progress.register_callback( - [&](auto xferred, auto xferable) { - transferred = xferred; - transferrable = xferable; - callback_was_called = true; - }, - NotifierType::download, true); + register_default_download_callback(true); REQUIRE(callback_was_called); // Register a second notifier. bool callback_was_called_2 = false; uint64_t transferred_2 = 0; uint64_t transferrable_2 = 0; + double progress_estimate_2 = 0; progress.register_callback( - [&](auto xferred, auto xferable) { + [&](auto xferred, auto xferable, auto p) { transferred_2 = xferred; transferrable_2 = xferable; + progress_estimate_2 = p; callback_was_called_2 = true; }, NotifierType::upload, true); @@ -226,7 +222,7 @@ TEST_CASE("progress notification", "[sync][session][progress]") { uint64_t current_uploadable = 201; uint64_t current_downloaded = 68; uint64_t current_downloadable = 182; - progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1); + progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1); CHECK(callback_was_called); CHECK(transferred == current_downloaded); CHECK(transferrable == current_downloadable); @@ -241,7 +237,7 @@ TEST_CASE("progress notification", "[sync][session][progress]") { current_uploadable = 329; current_downloaded = 76; current_downloadable = 191; - progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1); + progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1); CHECK(callback_was_called); CHECK(transferred == current_downloaded); CHECK(transferrable == current_downloadable); @@ -252,9 +248,6 @@ TEST_CASE("progress notification", "[sync][session][progress]") { } SECTION("properly runs for non-streaming notifiers") { - bool callback_was_called = false; - uint64_t transferred = 0; - uint64_t transferrable = 0; uint64_t current_transferred = 0; uint64_t current_transferrable = 0; @@ -263,15 +256,9 @@ TEST_CASE("progress notification", "[sync][session][progress]") { current_transferred = 60; current_transferrable = 501; const uint64_t original_transferrable = current_transferrable; - progress.update(21, 26, current_transferred, current_transferrable, 1, 1); + progress.update(21, 26, current_transferred, current_transferrable, 1); - progress.register_callback( - [&](auto xferred, auto xferable) { - transferred = xferred; - transferrable = xferable; - callback_was_called = true; - }, - NotifierType::upload, false); + register_default_upload_callback(); // Wait for the initial callback. REQUIRE(callback_was_called); @@ -279,7 +266,7 @@ TEST_CASE("progress notification", "[sync][session][progress]") { callback_was_called = false; current_transferred = 66; current_transferrable = 582; - progress.update(25, 26, current_transferred, current_transferrable, 1, 1); + progress.update(25, 26, current_transferred, current_transferrable, 1); CHECK(callback_was_called); CHECK(transferred == current_transferred); CHECK(transferrable == original_transferrable); @@ -288,7 +275,7 @@ TEST_CASE("progress notification", "[sync][session][progress]") { callback_was_called = false; current_transferred = original_transferrable + 100; current_transferrable = 1021; - progress.update(68, 191, current_transferred, current_transferrable, 1, 1); + progress.update(68, 191, current_transferred, current_transferrable, 1); CHECK(callback_was_called); CHECK(transferred == current_transferred); CHECK(transferrable == original_transferrable); @@ -297,30 +284,24 @@ TEST_CASE("progress notification", "[sync][session][progress]") { callback_was_called = false; current_transferred = original_transferrable + 250; current_transferrable = 1228; - progress.update(199, 591, current_transferred, current_transferrable, 1, 1); + progress.update(199, 591, current_transferred, current_transferrable, 1); CHECK(!callback_was_called); } SECTION("upload notifications are not sent until all local changesets have been processed") { progress.set_local_version(4); - progress.register_callback( - [&](auto xferred, auto xferable) { - transferred = xferred; - transferrable = xferable; - callback_was_called = true; - }, - NotifierType::upload, false); + register_default_upload_callback(); REQUIRE_FALSE(callback_was_called); current_transferred = 66; current_transferrable = 582; - progress.update(0, 0, current_transferred, current_transferrable, 1, 3); + progress.update(0, 0, current_transferred, current_transferrable, 3); REQUIRE_FALSE(callback_was_called); current_transferred = 77; current_transferrable = 1021; - progress.update(0, 0, current_transferred, current_transferrable, 1, 4); + progress.update(0, 0, current_transferred, current_transferrable, 4); REQUIRE(callback_was_called); CHECK(transferred == current_transferred); // should not have captured transferrable from the first update @@ -332,15 +313,9 @@ TEST_CASE("progress notification", "[sync][session][progress]") { current_transferred = 60; current_transferrable = 501; const uint64_t original_transferrable = current_transferrable; - progress.update(current_transferred, current_transferrable, 21, 26, 1, 1); + progress.update(current_transferred, current_transferrable, 21, 26, 1); - progress.register_callback( - [&](auto xferred, auto xferable) { - transferred = xferred; - transferrable = xferable; - callback_was_called = true; - }, - NotifierType::download, false); + register_default_download_callback(); // Wait for the initial callback. REQUIRE(callback_was_called); @@ -348,7 +323,7 @@ TEST_CASE("progress notification", "[sync][session][progress]") { callback_was_called = false; current_transferred = 66; current_transferrable = 582; - progress.update(current_transferred, current_transferrable, 25, 26, 1, 1); + progress.update(current_transferred, current_transferrable, 25, 26, 1); CHECK(callback_was_called); CHECK(transferred == current_transferred); CHECK(transferrable == original_transferrable); @@ -357,7 +332,7 @@ TEST_CASE("progress notification", "[sync][session][progress]") { callback_was_called = false; current_transferred = original_transferrable + 100; current_transferrable = 1021; - progress.update(current_transferred, current_transferrable, 68, 191, 1, 1); + progress.update(current_transferred, current_transferrable, 68, 191, 1); CHECK(callback_was_called); CHECK(transferred == current_transferred); CHECK(transferrable == original_transferrable); @@ -366,51 +341,31 @@ TEST_CASE("progress notification", "[sync][session][progress]") { callback_was_called = false; current_transferred = original_transferrable + 250; current_transferrable = 1228; - progress.update(current_transferred, current_transferrable, 199, 591, 1, 1); + progress.update(current_transferred, current_transferrable, 199, 591, 1); CHECK(!callback_was_called); } SECTION("download notifications are not sent until a DOWNLOAD message has been received") { - _impl::SyncProgressNotifier progress; - - progress.register_callback( - [&](auto xferred, auto xferable) { - transferred = xferred; - transferrable = xferable; - callback_was_called = true; - }, - NotifierType::download, false); + register_default_download_callback(); current_transferred = 100; current_transferrable = 100; - // Last time we ran we downloaded everything, so sync will send us an - // update reporting that - progress.update(current_transferred, current_transferrable, 0, 0, 0, 1); - REQUIRE_FALSE(callback_was_called); - current_transferred = 100; - current_transferrable = 200; // Next we get a DOWNLOAD message telling us there's more to download - progress.update(current_transferred, current_transferrable, 0, 0, 1, 1); + progress.update(current_transferred, current_transferrable, 0, 0, 1); REQUIRE(callback_was_called); REQUIRE(current_transferrable == transferrable); REQUIRE(current_transferred == transferred); current_transferred = 200; - progress.update(current_transferred, current_transferrable, 0, 0, 1, 1); + progress.update(current_transferred, current_transferrable, 0, 0, 1); // After the download has completed, new notifications complete immediately transferred = 0; transferrable = 0; callback_was_called = false; - progress.register_callback( - [&](auto xferred, auto xferable) { - transferred = xferred; - transferrable = xferable; - callback_was_called = true; - }, - NotifierType::download, false); + register_default_download_callback(); REQUIRE(callback_was_called); REQUIRE(current_transferrable == transferrable); @@ -422,15 +377,9 @@ TEST_CASE("progress notification", "[sync][session][progress]") { current_transferred = 60; current_transferrable = 501; const uint64_t original_transferrable = current_transferrable; - progress.update(21, 26, current_transferred, current_transferrable, 1, 1); + progress.update(21, 26, current_transferred, current_transferrable, 1); - uint64_t token = progress.register_callback( - [&](auto xferred, auto xferable) { - transferred = xferred; - transferrable = xferable; - callback_was_called = true; - }, - NotifierType::upload, false); + uint64_t token = register_default_upload_callback(); // Wait for the initial callback. REQUIRE(callback_was_called); @@ -438,7 +387,7 @@ TEST_CASE("progress notification", "[sync][session][progress]") { callback_was_called = false; current_transferred = 66; current_transferrable = 912; - progress.update(25, 26, current_transferred, current_transferrable, 1, 1); + progress.update(25, 26, current_transferred, current_transferrable, 1); CHECK(callback_was_called); CHECK(transferred == current_transferred); CHECK(transferrable == original_transferrable); @@ -450,7 +399,7 @@ TEST_CASE("progress notification", "[sync][session][progress]") { callback_was_called = false; current_transferred = 67; current_transferrable = 1228; - progress.update(199, 591, current_transferred, current_transferrable, 1, 1); + progress.update(199, 591, current_transferred, current_transferrable, 1); CHECK(!callback_was_called); } @@ -462,25 +411,21 @@ TEST_CASE("progress notification", "[sync][session][progress]") { uint64_t current_downloadable = 182; const uint64_t original_uploadable = current_uploadable; const uint64_t original_downloadable = current_downloadable; - progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1); + progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1); - progress.register_callback( - [&](auto xferred, auto xferable) { - transferred = xferred; - transferrable = xferable; - callback_was_called = true; - }, - NotifierType::upload, false); + register_default_upload_callback(); REQUIRE(callback_was_called); // Register a second notifier. bool callback_was_called_2 = false; uint64_t downloaded = 0; uint64_t downloadable = 0; + double download_progress = 0; progress.register_callback( - [&](auto xferred, auto xferable) { + [&](auto xferred, auto xferable, auto p) { downloaded = xferred; downloadable = xferable; + download_progress = p; callback_was_called_2 = true; }, NotifierType::download, false); @@ -493,7 +438,7 @@ TEST_CASE("progress notification", "[sync][session][progress]") { current_uploadable = 310; current_downloaded = 171; current_downloadable = 185; - progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1); + progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1); CHECK(callback_was_called); CHECK(transferred == current_uploaded); CHECK(transferrable == original_uploadable); @@ -508,7 +453,7 @@ TEST_CASE("progress notification", "[sync][session][progress]") { current_uploadable = 310; current_downloaded = 174; current_downloadable = 190; - progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1); + progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1); CHECK(callback_was_called); CHECK(transferred == current_uploaded); CHECK(transferrable == original_uploadable); @@ -523,7 +468,7 @@ TEST_CASE("progress notification", "[sync][session][progress]") { current_uploadable = 310; current_downloaded = 182; current_downloadable = 196; - progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1); + progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1); CHECK(!callback_was_called); CHECK(callback_was_called_2); CHECK(downloaded == current_downloaded); @@ -535,7 +480,7 @@ TEST_CASE("progress notification", "[sync][session][progress]") { current_uploadable = 410; current_downloaded = 192; current_downloadable = 591; - progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1); + progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1); CHECK(!callback_was_called); CHECK(!callback_was_called_2); } @@ -547,15 +492,9 @@ TEST_CASE("progress notification", "[sync][session][progress]") { uint64_t current_downloaded = 68; uint64_t current_downloadable = 182; const uint64_t original_downloadable = current_downloadable; - progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1); + progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1); - progress.register_callback( - [&](auto xferred, auto xferable) { - transferred = xferred; - transferrable = xferable; - callback_was_called = true; - }, - NotifierType::download, false); + register_default_download_callback(); REQUIRE(callback_was_called); // Now manually call the notifier handler a few times. @@ -564,7 +503,7 @@ TEST_CASE("progress notification", "[sync][session][progress]") { current_uploadable = 310; current_downloaded = 171; current_downloadable = 185; - progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1); + progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1); CHECK(callback_was_called); CHECK(transferred == current_downloaded); CHECK(transferrable == original_downloadable); @@ -573,11 +512,13 @@ TEST_CASE("progress notification", "[sync][session][progress]") { bool callback_was_called_2 = false; uint64_t downloaded = 0; uint64_t downloadable = 0; + double download_progress = 0; const uint64_t original_downloadable_2 = current_downloadable; progress.register_callback( - [&](auto xferred, auto xferable) { + [&](auto xferred, auto xferable, auto p) { downloaded = xferred; downloadable = xferable; + download_progress = p; callback_was_called_2 = true; }, NotifierType::download, false); @@ -591,7 +532,7 @@ TEST_CASE("progress notification", "[sync][session][progress]") { current_uploadable = 310; current_downloaded = 182; current_downloadable = 190; - progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1); + progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1); CHECK(callback_was_called); CHECK(transferred == current_downloaded); CHECK(transferrable == original_downloadable); @@ -606,7 +547,7 @@ TEST_CASE("progress notification", "[sync][session][progress]") { current_uploadable = 310; current_downloaded = 189; current_downloadable = 250; - progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1); + progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1); CHECK(!callback_was_called); CHECK(callback_was_called_2); CHECK(downloaded == current_downloaded); @@ -618,7 +559,7 @@ TEST_CASE("progress notification", "[sync][session][progress]") { current_uploadable = 310; current_downloaded = 201; current_downloadable = 289; - progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1, 1); + progress.update(current_downloaded, current_downloadable, current_uploaded, current_uploadable, 1); CHECK(!callback_was_called_2); } @@ -627,15 +568,9 @@ TEST_CASE("progress notification", "[sync][session][progress]") { current_transferred = 60; current_transferrable = 501; const uint64_t original_transferrable = current_transferrable; - progress.update(current_transferred, current_transferrable, 21, 26, 1, 1); + progress.update(current_transferred, current_transferrable, 21, 26, 1); - progress.register_callback( - [&](auto xferred, auto xferable) { - transferred = xferred; - transferrable = xferable; - callback_was_called = true; - }, - NotifierType::download, false); + register_default_download_callback(); // Wait for the initial callback. REQUIRE(callback_was_called); @@ -644,7 +579,7 @@ TEST_CASE("progress notification", "[sync][session][progress]") { callback_was_called = false; current_transferred = 160; current_transferrable = 451; - progress.update(current_transferred, current_transferrable, 25, 26, 1, 1); + progress.update(current_transferred, current_transferrable, 25, 26, 1); CHECK(callback_was_called); CHECK(transferred == current_transferred); CHECK(transferrable == current_transferrable); @@ -653,7 +588,7 @@ TEST_CASE("progress notification", "[sync][session][progress]") { const uint64_t previous_transferrable = current_transferrable; callback_was_called = false; current_transferrable = 1000; - progress.update(current_transferred, current_transferrable, 68, 191, 1, 1); + progress.update(current_transferred, current_transferrable, 68, 191, 1); CHECK(callback_was_called); CHECK(transferred == current_transferred); CHECK(transferrable == previous_transferrable); @@ -663,7 +598,7 @@ TEST_CASE("progress notification", "[sync][session][progress]") { callback_was_called = false; current_transferred = 200; current_transferrable = current_transferred; - progress.update(current_transferred, current_transferrable, 191, 192, 1, 1); + progress.update(current_transferred, current_transferrable, 191, 192, 1); CHECK(callback_was_called); CHECK(transferred == current_transferred); CHECK(transferrable == current_transferred); @@ -672,8 +607,388 @@ TEST_CASE("progress notification", "[sync][session][progress]") { callback_was_called = false; current_transferred = original_transferrable + 250; current_transferrable = 1228; - progress.update(current_transferred, current_transferrable, 199, 591, 1, 1); + progress.update(current_transferred, current_transferrable, 199, 591, 1); CHECK(!callback_was_called); } } } + +#if REALM_ENABLE_AUTH_TESTS + +struct TestSetup { + TableRef get_table(const SharedRealm& r) + { + return r->read_group().get_table("class_" + table_name); + } + + size_t add_objects(SharedRealm& r, int num = 5) + { + CppContext ctx(r); + for (int i = 0; i < num; ++i) { + // use specifically separate transactions for a bit of history + r->begin_transaction(); + Object::create(ctx, r, StringData(table_name), std::any(make_one(i))); + r->commit_transaction(); + } + return get_table(r)->size(); + } + + virtual SyncTestFile make_config() = 0; + virtual AnyDict make_one(int64_t idx) = 0; + + std::string table_name; +}; + +struct PBS : TestSetup { + PBS() + { + table_name = "Dog"; + } + + SyncTestFile make_config() override + { + const auto schema = get_default_schema(); + return SyncTestFile(session.app(), partition, schema); + } + + AnyDict make_one(int64_t /* idx */) override + { + return AnyDict{{"_id", std::any(ObjectId::gen())}, + {"breed", std::string("bulldog")}, + {"name", random_string(1024 * 1024)}}; + } + + TestAppSession session; + const std::string partition = random_string(100); +}; + +struct FLX : TestSetup { + FLX(const std::string& app_id = "flx_sync_progress") + : harness(app_id) + { + table_name = (*harness.schema().begin()).name; + } + + SyncTestFile make_config() override + { + auto config = harness.make_test_file(); + add_subscription(*config.sync_config); + return config; + } + + void add_subscription(SyncConfig& config) + { + config.rerun_init_subscription_on_open = true; + config.subscription_initializer = [&](SharedRealm&& realm) { + add_subscription(realm); + }; + } + + void add_subscription(SharedRealm& realm) + { + auto sub = realm->get_latest_subscription_set().make_mutable_copy(); + sub.insert_or_assign(Query(get_table(realm))); + sub.commit(); + } + + AnyDict make_one(int64_t idx) override + { + return AnyDict{{"_id", ObjectId::gen()}, + {"queryable_int_field", idx}, + {"queryable_str_field", random_string(1024 * 1024)}}; + } + + FLXSyncTestHarness harness; +}; + +/* + * This test runs a few scenarios for synchronizing changes between two separate realm files for the same app, + * and verifies high-level consistency in reported progress notification's values. + * + * It doesn't try to check for particular reported values: these are checked in sync impl tests, + * and specific combinations of updates verified directly in SyncProgressNotifier tests. + * + * First, test adds a few objects into one realm, verifies that the progress is reported until upload completion. + * Then it checks how this exact changes are downloaded into the second realm file (this essentially checks + * how progress is reported with bootstrap store for flx). + * + * Next subtests, are here to check how continuous sync reports progress. It reuses the same two realm files + * with synchronized objects in them both. Test adds more objects into the second realm to sync more changes + * the other way around: from second realm to the first one, and check if also upload progress correct for + * the second realm, and download progress for the first realm after its initial upload. + * - first by reusing the same realm instance for the second realm + * - second by closing and reopening second realm file with new SharedRealm instance + * + * Separately, AsyncOpenTask is checked twice: with initial empty third realm file, and with subsequent second opening + * with more changes to download from the server. The progress reported through task interface should behave in the + * same way as with cases tested above. + */ +TEMPLATE_TEST_CASE("sync progress notifications", "[sync][baas][progress]", PBS, FLX) +{ + TestType setup; + constexpr bool is_flx = std::is_same_v; + size_t expected_count = 0; + +#define VERIFY_REALM(realm_1, realm_2, expected) \ + { \ + REQUIRE(expected > 0); \ + REQUIRE(realm_1); \ + REQUIRE(realm_2); \ + REQUIRE(realm_1 != realm_2); \ + auto table1 = setup.get_table(realm_1); \ + auto table2 = setup.get_table(realm_2); \ + REQUIRE(table1); \ + REQUIRE(table2); \ + REQUIRE(table1->size() == expected); \ + REQUIRE(table2->size() == expected); \ + } + + struct Progress { + uint64_t xferred, xferable; + double estimate; + }; + typedef std::vector> ReportedProgress; + std::mutex progress_mutex; + + // register set of 4 callbacks to put values in predefined places in reported progress list: + // idx 0: non-streaming/download, 1: non-streaming/upload, 2: streaming/download, 3: streaming/upload + auto add_callbacks = [&](SharedRealm& realm, ReportedProgress& progress) { + std::lock_guard lock(progress_mutex); + size_t idx = progress.size(); + progress.resize(idx + 4); + for (auto&& stream : {false, true}) + for (auto&& direction : {NotifierType::download, NotifierType::upload}) + realm->sync_session()->register_progress_notifier( + [&, i = idx++](uint64_t xferred, uint64_t xferable, double estimate) { + progress[i].emplace_back(Progress{xferred, xferable, estimate}); + }, + direction, stream); + }; + + auto dump = [](const ReportedProgress& progress, size_t begin = 0, size_t end = -1) { + std::ostringstream out; + for (size_t i = begin, e = std::min(end, progress.size()); i < e; ++i) { + out << (i > begin ? "\n" : "") << i << " [" << progress[i].size() << "]: "; + for (auto&& p : progress[i]) + out << "(" << p.xferred << ", " << p.xferable << ", " << std::setprecision(4) << p.estimate << "), "; + } + return out.str(); + }; + + auto clear = [&](ReportedProgress& progress) { + std::lock_guard lock(progress_mutex); + for (auto&& values : progress) + values.clear(); + }; + +#define VERIFY_PROGRESS_EMPTY(progress, begin, end) \ + { \ + std::lock_guard lock(progress_mutex); \ + for (size_t i = begin; i < end; ++i) { \ + INFO(util::format("i = %1, %2", i, dump(progress, i, i + 1))); \ + auto&& values = progress[i]; \ + CHECK(values.size() == 0); \ + } \ + } + +#define VERIFY_PROGRESS_CONSISTENCY_ONE(progress, i, expected_download_stages, is_download, is_streaming) \ + { \ + INFO(i); \ + REQUIRE(expected_download_stages > 0); \ + REQUIRE(i < progress.size()); \ + auto&& values = progress[i]; \ + \ + REQUIRE(values.size() > 0); \ + int progress_stages = expected_download_stages; \ + \ + for (size_t j = 0; j < values.size(); ++j) { \ + auto&& p = values[j]; \ + INFO(util::format("Fail index i: %1, j: %2 | Reported progress:\n%3", i, j, dump(progress))); \ + \ + CHECK(0 <= p.xferred); \ + CHECK(p.xferred <= p.xferable); \ + CHECK(0 <= p.estimate); \ + CHECK(p.estimate <= 1.0); \ + \ + if (j <= 0) \ + continue; \ + \ + auto&& prev = values[j - 1]; \ + CHECK(prev.xferred <= p.xferred); \ + \ + /* downloadable may fluctuate by design: \ + * pbs: downloadable from the DOWNLOAD message is added to downloaded so far \ + * always after the changeset integration, commit is always a bit smaller, \ + * hence downloadable always gets a bit smaller than previous value \ + * flx: downloadable is always as good as an estimate from the server, fluctuates both ways */ \ + if (!is_download) \ + CHECK(prev.xferable <= p.xferable); \ + \ + if (is_download && is_streaming && prev.estimate > p.estimate) { \ + CHECK(prev.estimate == 1.0); \ + CHECK(progress_stages >= 1); \ + --progress_stages; \ + } \ + else { \ + CHECK(prev.estimate <= p.estimate); \ + } \ + } \ + /* FIXME with non-streaming download last estimate isn't necessarily 1.0 \ + * notification is emitted immediately upon registration and for download the state of remaining \ + * changesets for get is not known before first DOWNLOAD message, so until first update happened \ + * xferred == xferable and that concludes notifier calls for this callback immediately \ + * see #7452 for details for how this could be solved sensibly */ \ + if (!(is_download && !is_streaming && values.size() <= 1)) { \ + auto&& last = values.back(); \ + CHECK(last.estimate == 1.0); \ + CHECK(last.xferred == last.xferable); \ + } \ + } + +#define VERIFY_PROGRESS_CONSISTENCY_EX(progress, begin, end, expected_download_stages) \ + { \ + REQUIRE(begin < end); \ + REQUIRE(end <= progress.size()); \ + \ + std::lock_guard lock(progress_mutex); \ + for (size_t i = begin; i < end; ++i) { \ + /* from add_callbacks: odd sequence number: upload, even: download */ \ + bool is_download = i % 2 == 0; \ + /* first two lists are for non-streaming, next streaming callbacks */ \ + bool is_streaming = i % 4 >= 1; \ + VERIFY_PROGRESS_CONSISTENCY_ONE(progress, i, expected_download_stages, is_download, is_streaming); \ + } \ + } + +#define VERIFY_PROGRESS_CONSISTENCY(progress, begin, end) VERIFY_PROGRESS_CONSISTENCY_EX(progress, begin, end, 1) + + auto wait_for_sync = [](SharedRealm& realm) { + realm->sync_session()->resume(); + wait_for_upload(*realm); + wait_for_download(*realm); + realm->sync_session()->pause(); + realm->refresh(); + }; + + auto config_1 = setup.make_config(); + auto realm_1 = Realm::get_shared_realm(config_1); + realm_1->sync_session()->pause(); + + expected_count = setup.add_objects(realm_1); + ReportedProgress progress_1; + add_callbacks(realm_1, progress_1); + + wait_for_sync(realm_1); + VERIFY_PROGRESS_CONSISTENCY(progress_1, 0, 4); + clear(progress_1); + + SECTION("progress from second realm") { + auto config2 = setup.make_config(); + auto realm_2 = Realm::get_shared_realm(config2); + + ReportedProgress progress_2; + add_callbacks(realm_2, progress_2); + wait_for_sync(realm_2); + VERIFY_REALM(realm_1, realm_2, expected_count); + + int expected_download_stages = is_flx ? 2 : 1; // + query version 0 progress + VERIFY_PROGRESS_CONSISTENCY_EX(progress_2, 0, 4, expected_download_stages); + clear(progress_2); + + VERIFY_PROGRESS_EMPTY(progress_1, 0, progress_1.size()); + + SECTION("continuous sync with existing instances") { + expected_count = setup.add_objects(realm_2); + add_callbacks(realm_2, progress_2); + wait_for_sync(realm_2); + + add_callbacks(realm_1, progress_1); + wait_for_sync(realm_1); + VERIFY_REALM(realm_1, realm_2, expected_count); + + // initially registered non-streaming callbacks should stay empty + VERIFY_PROGRESS_EMPTY(progress_1, 0, 2); + VERIFY_PROGRESS_EMPTY(progress_2, 0, 2); + // old streaming and newly registered should be reported + VERIFY_PROGRESS_CONSISTENCY(progress_1, 2, 8); + VERIFY_PROGRESS_CONSISTENCY(progress_2, 2, 8); + } + + SECTION("reopen and sync existing realm") { + realm_2.reset(); + expected_count = setup.add_objects(realm_1); + wait_for_sync(realm_1); + + realm_2 = Realm::get_shared_realm(config2); + add_callbacks(realm_2, progress_2); + wait_for_sync(realm_2); + VERIFY_REALM(realm_1, realm_2, expected_count); + + VERIFY_PROGRESS_EMPTY(progress_1, 0, 2); + VERIFY_PROGRESS_CONSISTENCY(progress_1, 2, 4); + VERIFY_PROGRESS_EMPTY(progress_2, 0, 4); + VERIFY_PROGRESS_CONSISTENCY(progress_2, 4, 8); + } + + clear(progress_1); + clear(progress_2); + } + + SECTION("progress through async open task on a new realm") { + auto config_3 = setup.make_config(); + ReportedProgress progress; + + // FIXME hits no_sessions assert in SyncManager due to issue with libuv scheduler and notifications + config_3.scheduler = util::Scheduler::make_dummy(); + config_3.automatic_change_notifications = false; + + // 0: open and sync fresh realm - should be equal to the realm_1 + // 1: add more objects to sync through realm_1 and try async open again + for (int i = 0; i < 2; ++i) { + auto task = Realm::get_synchronized_realm(config_3); + REQUIRE(task); + + auto progress_index = progress.size(); + progress.resize(progress.size() + 1); + + task->register_download_progress_notifier([&](uint64_t xferred, uint64_t xferable, double estimate) { + std::lock_guard lock(progress_mutex); + progress[progress_index].emplace_back(Progress{xferred, xferable, estimate}); + }); + + std::atomic finished = false; + ThreadSafeReference ref; + std::exception_ptr err = nullptr; + task->start([&](ThreadSafeReference r, std::exception_ptr e) { + ref = std::move(r); + err = e; + finished = true; + }); + + util::EventLoop::main().run_until([&] { + return finished.load(); + }); + + CHECK_FALSE(err); + REQUIRE(ref); + auto realm_3 = Realm::get_shared_realm(std::move(ref), util::Scheduler::make_dummy()); + VERIFY_REALM(realm_1, realm_3, expected_count); + realm_3.reset(); + + VERIFY_PROGRESS_CONSISTENCY_ONE(progress, progress_index, 1, true, false); + VERIFY_PROGRESS_EMPTY(progress, 0, progress_index); // previous (from i = 0) should be empty + clear(progress); + + // add more objects through realm_1 and reopen existing realm on second iteration + if (i == 0) { + expected_count = setup.add_objects(realm_1); + add_callbacks(realm_1, progress_1); + wait_for_sync(realm_1); + VERIFY_PROGRESS_EMPTY(progress_1, 0, 2); + VERIFY_PROGRESS_CONSISTENCY(progress_1, 2, 8); + clear(progress_1); + } + } + } +} +#endif diff --git a/test/object-store/util/test_file.hpp b/test/object-store/util/test_file.hpp index 560d45d3dfb..cf9692c179d 100644 --- a/test/object-store/util/test_file.hpp +++ b/test/object-store/util/test_file.hpp @@ -76,6 +76,8 @@ struct TestFile : realm::Realm::Config { TestFile(const TestFile&) = delete; TestFile& operator=(const TestFile&) = delete; + TestFile(TestFile&&) = default; + TestFile& operator=(TestFile&&) = default; // The file should outlive the object, ie. should not be deleted in destructor void persist() diff --git a/test/test_sync.cpp b/test/test_sync.cpp index 03565b1514f..22a29ff18af 100644 --- a/test/test_sync.cpp +++ b/test/test_sync.cpp @@ -107,11 +107,11 @@ class TestServerHistoryContext : public _impl::ServerHistory::Context { auto name = DB::create(make_client_replication(), name##_path); template -void write_transaction(DBRef db, Function&& function) +DB::version_type write_transaction(DBRef db, Function&& function) { WriteTransaction wt(db); function(wt); - wt.commit(); + return wt.commit(); } ClientReplication& get_replication(DBRef db) @@ -2986,8 +2986,11 @@ TEST(Sync_UploadDownloadProgress_1) TEST_DIR(server_dir); TEST_CLIENT_DB(db); - uint_fast64_t global_snapshot_version; - + std::atomic downloaded_bytes; + std::atomic downloadable_bytes; + std::atomic uploaded_bytes; + std::atomic uploadable_bytes; + std::atomic snapshot_version; { int handler_entry = 0; @@ -2995,56 +2998,40 @@ TEST(Sync_UploadDownloadProgress_1) std::mutex mutex; std::condition_variable cond_var; - std::atomic downloaded_bytes; - std::atomic downloadable_bytes; - std::atomic uploaded_bytes; - std::atomic uploadable_bytes; - std::atomic progress_version; - std::atomic snapshot_version; - ClientServerFixture fixture(server_dir, test_context); fixture.start(); Session session = fixture.make_session(db, "/test"); auto progress_handler = [&](uint_fast64_t downloaded, uint_fast64_t downloadable, uint_fast64_t uploaded, - uint_fast64_t uploadable, uint_fast64_t progress, uint_fast64_t snapshot) { + uint_fast64_t uploadable, uint_fast64_t snapshot, double, double) { downloaded_bytes = downloaded; downloadable_bytes = downloadable; uploaded_bytes = uploaded; uploadable_bytes = uploadable; - progress_version = progress; snapshot_version = snapshot; + ++handler_entry; + }; - if (handler_entry == 0) { + std::unique_lock lock(mutex); + session.set_progress_handler(progress_handler); + session.set_connection_state_change_listener([&](ConnectionState state, util::Optional) { + if (state == ConnectionState::connected) { std::unique_lock lock(mutex); cond_var_signaled = true; lock.unlock(); cond_var.notify_one(); } - ++handler_entry; - }; - - std::unique_lock lock(mutex); - session.set_progress_handler(progress_handler); + }); session.bind(); cond_var.wait(lock, [&] { return cond_var_signaled; }); + CHECK_EQUAL(handler_entry, 0); - CHECK_EQUAL(downloaded_bytes, uint_fast64_t(0)); - CHECK_EQUAL(downloadable_bytes, uint_fast64_t(0)); - CHECK_EQUAL(uploaded_bytes, uint_fast64_t(0)); - CHECK_EQUAL(uploadable_bytes, uint_fast64_t(0)); - CHECK_GREATER_EQUAL(snapshot_version, uint_fast64_t(1)); - - uint_fast64_t commit_version; - { - WriteTransaction wt{db}; - TableRef tr = wt.get_group().add_table_with_primary_key("class_table", type_Int, "id"); - tr->add_column(type_Int, "integer column"); - commit_version = wt.commit(); - } + auto commit_version = write_transaction(db, [](WriteTransaction& wt) { + wt.get_group().add_table_with_primary_key("class_table", type_Int, "id"); + }); session.wait_for_upload_complete_or_client_stopped(); session.wait_for_download_complete_or_client_stopped(); @@ -3053,15 +3040,12 @@ TEST(Sync_UploadDownloadProgress_1) CHECK_EQUAL(downloadable_bytes, uint_fast64_t(0)); CHECK_NOT_EQUAL(uploaded_bytes, uint_fast64_t(0)); CHECK_NOT_EQUAL(uploadable_bytes, uint_fast64_t(0)); - CHECK_GREATER(progress_version, uint_fast64_t(0)); CHECK_GREATER_EQUAL(snapshot_version, commit_version); - { - WriteTransaction wt{db}; - TableRef tr = wt.get_table("class_table"); - tr->create_object_with_primary_key(1).set("integer column", 42); - commit_version = wt.commit(); - } + + commit_version = write_transaction(db, [](WriteTransaction& wt) { + wt.get_table("class_table")->create_object_with_primary_key(1); + }); session.wait_for_upload_complete_or_client_stopped(); session.wait_for_download_complete_or_client_stopped(); @@ -3071,8 +3055,6 @@ TEST(Sync_UploadDownloadProgress_1) CHECK_NOT_EQUAL(uploaded_bytes, uint_fast64_t(0)); CHECK_NOT_EQUAL(uploadable_bytes, uint_fast64_t(0)); CHECK_GREATER_EQUAL(snapshot_version, commit_version); - - global_snapshot_version = snapshot_version; } { @@ -3085,32 +3067,19 @@ TEST(Sync_UploadDownloadProgress_1) std::mutex mutex; std::condition_variable cond_var; - Client::Config config; - config.logger = std::make_shared("Client: ", test_context.logger); - auto socket_provider = std::make_shared(config.logger, ""); - config.socket_provider = socket_provider; - config.reconnect_mode = ReconnectMode::testing; - Client client(config); - - Session::Config sess_config; - sess_config.server_address = "no server"; - sess_config.server_port = 8000; - sess_config.realm_identifier = "/test"; - sess_config.signed_user_token = g_signed_test_user_token; - - Session session(client, db, nullptr, nullptr, std::move(sess_config)); + ClientServerFixture fixture(server_dir, test_context); + fixture.start(); + Session session = fixture.make_session(db, "/test"); int number_of_handler_calls = 0; - auto progress_handler = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, - uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t progress_version, uint_fast64_t snapshot_version) { - CHECK_EQUAL(downloaded_bytes, 0); - CHECK_EQUAL(downloadable_bytes, 0); - CHECK_NOT_EQUAL(uploaded_bytes, 0); - CHECK_NOT_EQUAL(uploadable_bytes, 0); - CHECK_EQUAL(progress_version, 0); - CHECK_EQUAL(snapshot_version, global_snapshot_version); + auto progress_handler = [&](uint_fast64_t downloaded, uint_fast64_t downloadable, uint_fast64_t uploaded, + uint_fast64_t uploadable, uint_fast64_t snapshot, double, double) { + CHECK_EQUAL(downloaded, downloaded_bytes); + CHECK_EQUAL(downloadable, downloaded_bytes); + CHECK_EQUAL(uploaded, uploaded_bytes); + CHECK_GREATER(uploadable, uploaded_bytes); + CHECK_GREATER(snapshot, snapshot_version); number_of_handler_calls++; std::unique_lock lock(mutex); @@ -3122,11 +3091,13 @@ TEST(Sync_UploadDownloadProgress_1) std::unique_lock lock(mutex); session.set_progress_handler(progress_handler); session.bind(); + write_transaction(db, [](WriteTransaction& wt) { + wt.get_table("class_table")->create_object_with_primary_key(2); + }); cond_var.wait(lock, [&] { return cond_var_signaled; }); - client.shutdown(); CHECK_EQUAL(number_of_handler_calls, 1); } } @@ -3154,17 +3125,15 @@ TEST(Sync_UploadDownloadProgress_2) uint_fast64_t downloadable_bytes_1 = 123; uint_fast64_t uploaded_bytes_1 = 123; uint_fast64_t uploadable_bytes_1 = 123; - uint_fast64_t progress_version_1 = 123; uint_fast64_t snapshot_version_1 = 0; auto progress_handler_1 = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t progress_version, uint_fast64_t snapshot_version) { + uint_fast64_t snapshot_version, double, double) { downloaded_bytes_1 = downloaded_bytes; downloadable_bytes_1 = downloadable_bytes; uploaded_bytes_1 = uploaded_bytes; uploadable_bytes_1 = uploadable_bytes; - progress_version_1 = progress_version; snapshot_version_1 = snapshot_version; }; @@ -3174,17 +3143,15 @@ TEST(Sync_UploadDownloadProgress_2) uint_fast64_t downloadable_bytes_2 = 123; uint_fast64_t uploaded_bytes_2 = 123; uint_fast64_t uploadable_bytes_2 = 123; - uint_fast64_t progress_version_2 = 123; uint_fast64_t snapshot_version_2 = 0; auto progress_handler_2 = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t progress_version, uint_fast64_t snapshot_version) { + uint_fast64_t snapshot_version, double, double) { downloaded_bytes_2 = downloaded_bytes; downloadable_bytes_2 = downloadable_bytes; uploaded_bytes_2 = uploaded_bytes; uploadable_bytes_2 = uploadable_bytes; - progress_version_2 = progress_version; snapshot_version_2 = snapshot_version; }; @@ -3202,7 +3169,6 @@ TEST(Sync_UploadDownloadProgress_2) CHECK_EQUAL(downloaded_bytes_2, downloadable_bytes_2); CHECK_EQUAL(downloaded_bytes_1, downloaded_bytes_2); CHECK_EQUAL(downloadable_bytes_1, 0); - CHECK_GREATER(progress_version_1, 0); CHECK_GREATER(snapshot_version_1, 0); CHECK_EQUAL(uploaded_bytes_1, 0); @@ -3210,7 +3176,6 @@ TEST(Sync_UploadDownloadProgress_2) CHECK_EQUAL(uploaded_bytes_2, 0); CHECK_EQUAL(uploadable_bytes_2, 0); - CHECK_GREATER(progress_version_2, 0); CHECK_GREATER(snapshot_version_2, 0); write_transaction(db_1, [](WriteTransaction& wt) { @@ -3355,7 +3320,6 @@ TEST(Sync_UploadDownloadProgress_3) wt.commit(); } - Client::Config client_config; client_config.logger = std::make_shared("Client: ", test_context.logger); auto socket_provider = std::make_shared(client_config.logger, ""); @@ -3384,18 +3348,16 @@ TEST(Sync_UploadDownloadProgress_3) uint_fast64_t downloadable_bytes_1 = 123; uint_fast64_t uploaded_bytes_1 = 123; uint_fast64_t uploadable_bytes_1 = 123; - uint_fast64_t progress_version_1 = 123; uint_fast64_t snapshot_version_1 = 0; auto progress_handler = [&, entry = int(0), promise = util::CopyablePromiseHolder(std::move(signal_pf.promise))]( uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t progress_version, uint_fast64_t snapshot_version) mutable { + uint_fast64_t snapshot_version, double, double) mutable { downloaded_bytes_1 = downloaded_bytes; downloadable_bytes_1 = downloadable_bytes; uploaded_bytes_1 = uploaded_bytes; uploadable_bytes_1 = uploadable_bytes; - progress_version_1 = progress_version; snapshot_version_1 = snapshot_version; if (entry == 0) { @@ -3403,13 +3365,7 @@ TEST(Sync_UploadDownloadProgress_3) CHECK_EQUAL(downloadable_bytes, 0); CHECK_EQUAL(uploaded_bytes, 0); CHECK_NOT_EQUAL(uploadable_bytes, 0); - CHECK_EQUAL(snapshot_version, 2); - } - - if (entry == 0) { - server_thread.start([&] { - server.run(); - }); + CHECK_EQUAL(snapshot_version, 4); } if (should_signal_cond_var) { @@ -3421,6 +3377,10 @@ TEST(Sync_UploadDownloadProgress_3) session.set_progress_handler(progress_handler); + server_thread.start([&] { + server.run(); + }); + session.bind(); session.wait_for_upload_complete_or_client_stopped(); @@ -3432,7 +3392,6 @@ TEST(Sync_UploadDownloadProgress_3) CHECK_EQUAL(downloadable_bytes_1, 0); CHECK_NOT_EQUAL(uploaded_bytes_1, 0); CHECK_NOT_EQUAL(uploadable_bytes_1, 0); - CHECK_GREATER(progress_version_1, 0); CHECK_GREATER_EQUAL(snapshot_version_1, 2); server.stop(); @@ -3506,39 +3465,29 @@ TEST(Sync_UploadDownloadProgress_4) auto progress_handler_1 = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t progress_version, uint_fast64_t snapshot_version) { + uint_fast64_t snapshot_version, double, double) { CHECK_EQUAL(downloaded_bytes, 0); CHECK_EQUAL(downloadable_bytes, 0); CHECK_NOT_EQUAL(uploadable_bytes, 0); switch (entry_1) { case 0: - // Session is bound and initial state is reported - CHECK_EQUAL(progress_version, 0); - CHECK_EQUAL(uploaded_bytes, 0); - CHECK_EQUAL(snapshot_version, 3); - break; - - case 1: // We've received the empty DOWNLOAD message and now have reliable // download progress - CHECK_EQUAL(progress_version, 1); CHECK_EQUAL(uploaded_bytes, 0); CHECK_EQUAL(snapshot_version, 5); break; - case 2: + case 1: // First UPLOAD is complete, but we still have more to upload // because the changesets are too large to batch into a single upload - CHECK_EQUAL(progress_version, 1); CHECK_GREATER(uploaded_bytes, 0); CHECK_LESS(uploaded_bytes, uploadable_bytes); CHECK_EQUAL(snapshot_version, 6); break; - case 3: + case 2: // Second UPLOAD is complete and we're done uploading - CHECK_EQUAL(progress_version, 1); CHECK_EQUAL(uploaded_bytes, uploadable_bytes); CHECK_EQUAL(snapshot_version, 7); break; @@ -3554,7 +3503,7 @@ TEST(Sync_UploadDownloadProgress_4) session_1.wait_for_upload_complete_or_client_stopped(); session_1.wait_for_download_complete_or_client_stopped(); - CHECK_EQUAL(entry_1, 4); + CHECK_EQUAL(entry_1, 3); Session session_2 = fixture.make_session(db_2, "/test"); @@ -3562,32 +3511,22 @@ TEST(Sync_UploadDownloadProgress_4) auto progress_handler_2 = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t progress_version, uint_fast64_t snapshot_version) { + uint_fast64_t snapshot_version, double, double) { CHECK_EQUAL(uploaded_bytes, 0); CHECK_EQUAL(uploadable_bytes, 0); switch (entry_2) { case 0: - // Session is bound and initial state is reported - CHECK_EQUAL(progress_version, 0); - CHECK_EQUAL(downloaded_bytes, 0); - CHECK_EQUAL(downloadable_bytes, 0); - CHECK_EQUAL(snapshot_version, 1); - break; - - case 1: // First DOWNLOAD message received. Some data is downloaded, but // download isn't compelte - CHECK_EQUAL(progress_version, 1); CHECK_GREATER(downloaded_bytes, 0); CHECK_GREATER(downloadable_bytes, 0); CHECK_LESS(downloaded_bytes, downloadable_bytes); CHECK_EQUAL(snapshot_version, 3); break; - case 2: + case 1: // Second DOWNLOAD message received. Download is now complete. - CHECK_EQUAL(progress_version, 1); CHECK_GREATER(downloaded_bytes, 0); CHECK_GREATER(downloadable_bytes, 0); CHECK_EQUAL(downloaded_bytes, downloadable_bytes); @@ -3603,7 +3542,7 @@ TEST(Sync_UploadDownloadProgress_4) session_2.wait_for_upload_complete_or_client_stopped(); session_2.wait_for_download_complete_or_client_stopped(); - CHECK_EQUAL(entry_2, 3); + CHECK_EQUAL(entry_2, 2); } @@ -3615,32 +3554,39 @@ TEST(Sync_UploadDownloadProgress_5) TEST_DIR(server_dir); TEST_CLIENT_DB(db); - auto [progress_handled_promise, progress_handled] = util::make_promise_future(); + std::mutex mutex; + std::condition_variable session_cv; + bool signaled = false; ClientServerFixture fixture(server_dir, test_context); fixture.start(); Session session = fixture.make_session(db, "/test"); - auto progress_handler = [&, promise = util::CopyablePromiseHolder(std::move(progress_handled_promise))]( - uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, + auto progress_handler = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t progress_version, uint_fast64_t snapshot_version) mutable { + uint_fast64_t snapshot_version, double, double) mutable { CHECK_EQUAL(downloaded_bytes, 0); CHECK_EQUAL(downloadable_bytes, 0); CHECK_EQUAL(uploaded_bytes, 0); CHECK_EQUAL(uploadable_bytes, 0); - - if (progress_version > 0) { - CHECK_EQUAL(snapshot_version, 3); - promise.get_promise().emplace_value(); - } + CHECK_EQUAL(snapshot_version, 3); + std::lock_guard lock{mutex}; + signaled = true; + session_cv.notify_one(); }; session.set_progress_handler(progress_handler); - session.bind(); - progress_handled.get(); + { + std::unique_lock lock{mutex}; + session.bind(); + // Wait until the progress handler is called on the session before tearing down the client + session_cv.wait_for(lock, std::chrono::seconds(5), [&]() { + return signaled; + }); + } + CHECK(signaled); // The check is that we reach this point. } @@ -3678,27 +3624,35 @@ TEST(Sync_UploadDownloadProgress_6) client_config.one_connection_per_session = false; Client client(client_config); + util::ScopeExit cleanup([&]() noexcept { + client.shutdown_and_wait(); + server.stop(); + server_thread.join(); + }); + Session::Config session_config; session_config.server_address = "localhost"; session_config.server_port = server_port; session_config.realm_identifier = "/test"; + session_config.service_identifier = "/realm-sync"; session_config.signed_user_token = g_signed_test_user_token; std::mutex mutex; std::condition_variable session_cv; + bool signaled = false; auto session = std::make_unique(client, db, nullptr, nullptr, std::move(session_config)); auto progress_handler = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t progress_version, uint_fast64_t snapshot_version) { + uint_fast64_t snapshot_version, double, double) { CHECK_EQUAL(downloaded_bytes, 0); CHECK_EQUAL(downloadable_bytes, 0); CHECK_EQUAL(uploaded_bytes, 0); CHECK_EQUAL(uploadable_bytes, 0); - CHECK_EQUAL(progress_version, 0); - CHECK_EQUAL(snapshot_version, 1); + CHECK_EQUAL(snapshot_version, 3); std::lock_guard lock{mutex}; session.reset(); + signaled = true; session_cv.notify_one(); }; @@ -3708,14 +3662,12 @@ TEST(Sync_UploadDownloadProgress_6) std::unique_lock lock{mutex}; session->bind(); // Wait until the progress handler is called on the session before tearing down the client - session_cv.wait_for(lock, std::chrono::seconds(30), [&session]() { - return !bool(session); + session_cv.wait_for(lock, std::chrono::seconds(5), [&]() { + return signaled; }); } - - client.shutdown_and_wait(); - server.stop(); - server_thread.join(); + CHECK(signaled); + CHECK(!(session)); // The check is that we reach this point without deadlocking or throwing an assert while tearing // down the active session @@ -3790,7 +3742,7 @@ TEST(Sync_UploadProgress_EmptyCommits) std::atomic entry = 0; session.set_progress_handler( - [&](uint_fast64_t, uint_fast64_t, uint_fast64_t, uint_fast64_t, uint_fast64_t, uint_fast64_t) { + [&](uint_fast64_t, uint_fast64_t, uint_fast64_t, uint_fast64_t, uint_fast64_t, double, double) { ++entry; }); session.bind(); @@ -3801,9 +3753,9 @@ TEST(Sync_UploadProgress_EmptyCommits) session.wait_for_upload_complete_or_client_stopped(); session.wait_for_upload_complete_or_client_stopped(); - // Binding produces three notifications: the initial state, one after receiving + // Binding produces two notifications: one after receiving // the DOWNLOAD message, and one after uploading the schema - CHECK_EQUAL(entry, 3); + CHECK_EQUAL(entry, 2); // No notification sent because an empty commit doesn't change uploadable_bytes { @@ -3812,7 +3764,7 @@ TEST(Sync_UploadProgress_EmptyCommits) } session.wait_for_upload_complete_or_client_stopped(); session.wait_for_upload_complete_or_client_stopped(); - CHECK_EQUAL(entry, 3); + CHECK_EQUAL(entry, 2); // Both the external and local commits are empty, so again no change in // uploadable_bytes @@ -3825,7 +3777,7 @@ TEST(Sync_UploadProgress_EmptyCommits) } session.wait_for_upload_complete_or_client_stopped(); session.wait_for_upload_complete_or_client_stopped(); - CHECK_EQUAL(entry, 3); + CHECK_EQUAL(entry, 2); // Local commit is empty, but the changeset created by the external write // is discovered after the local write, resulting in two notifications (one @@ -3840,7 +3792,7 @@ TEST(Sync_UploadProgress_EmptyCommits) } session.wait_for_upload_complete_or_client_stopped(); session.wait_for_upload_complete_or_client_stopped(); - CHECK_EQUAL(entry, 5); + CHECK_EQUAL(entry, 4); } TEST(Sync_MultipleSyncAgentsNotAllowed) @@ -4088,7 +4040,7 @@ TEST_IF(Sync_MergeLargeBinary, !(REALM_ARCHITECTURE_X86_32)) auto progress_handler_1 = [&](std::uint_fast64_t downloaded_bytes, std::uint_fast64_t downloadable_bytes, std::uint_fast64_t uploaded_bytes, std::uint_fast64_t uploadable_bytes, - std::uint_fast64_t, std::uint_fast64_t) { + std::uint_fast64_t, double, double) { downloaded_bytes_1 = downloaded_bytes; downloadable_bytes_1 = downloadable_bytes; uploaded_bytes_1 = uploaded_bytes; @@ -4101,8 +4053,8 @@ TEST_IF(Sync_MergeLargeBinary, !(REALM_ARCHITECTURE_X86_32)) std::uint_fast64_t uploadable_bytes_2 = 0; auto progress_handler_2 = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, - uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, uint_fast64_t, - uint_fast64_t) { + uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, uint_fast64_t, double, + double) { downloaded_bytes_2 = downloaded_bytes; downloadable_bytes_2 = downloadable_bytes; uploaded_bytes_2 = uploaded_bytes; @@ -4242,7 +4194,7 @@ TEST(Sync_MergeLargeBinaryReducedMemory) auto progress_handler_1 = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t /* progress_version */, uint_fast64_t /* snapshot_version */) { + uint_fast64_t /* snapshot_version */, double, double) { downloaded_bytes_1 = downloaded_bytes; downloadable_bytes_1 = downloadable_bytes; uploaded_bytes_1 = uploaded_bytes; @@ -4256,7 +4208,7 @@ TEST(Sync_MergeLargeBinaryReducedMemory) auto progress_handler_2 = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t /* progress_version */, uint_fast64_t /* snapshot_version */) { + uint_fast64_t /* snapshot_version */, double, double) { downloaded_bytes_2 = downloaded_bytes; downloadable_bytes_2 = downloadable_bytes; uploaded_bytes_2 = uploaded_bytes; @@ -4677,7 +4629,7 @@ TEST(Sync_BatchedUploadMessages) auto progress_handler = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t progress_version, uint_fast64_t snapshot_version) { + uint_fast64_t snapshot_version, double, double) { CHECK_GREATER(uploadable_bytes, 1000); // This is the important check. If the changesets were not batched, @@ -4686,7 +4638,6 @@ TEST(Sync_BatchedUploadMessages) CHECK(uploaded_bytes == 0 || uploaded_bytes == uploadable_bytes); CHECK_EQUAL(0, downloaded_bytes); CHECK_EQUAL(0, downloadable_bytes); - static_cast(progress_version); static_cast(snapshot_version); }; @@ -4730,13 +4681,12 @@ TEST(Sync_UploadLogCompactionEnabled) auto progress_handler = [&](uint_fast64_t downloaded_bytes, uint_fast64_t downloadable_bytes, uint_fast64_t uploaded_bytes, uint_fast64_t uploadable_bytes, - uint_fast64_t progress_version, uint_fast64_t snapshot_version) { + uint_fast64_t snapshot_version, double, double) { CHECK_EQUAL(downloaded_bytes, downloadable_bytes); CHECK_EQUAL(0, uploaded_bytes); CHECK_EQUAL(0, uploadable_bytes); static_cast(snapshot_version); - if (progress_version > 0) - CHECK_NOT_EQUAL(downloadable_bytes, 0); + CHECK_NOT_EQUAL(downloadable_bytes, 0); }; session_2.set_progress_handler(progress_handler); @@ -4789,13 +4739,12 @@ TEST(Sync_UploadLogCompactionDisabled) auto progress_handler = [&](std::uint_fast64_t downloaded_bytes, std::uint_fast64_t downloadable_bytes, std::uint_fast64_t uploaded_bytes, std::uint_fast64_t uploadable_bytes, - std::uint_fast64_t progress_version, std::uint_fast64_t snapshot_version) { + std::uint_fast64_t snapshot_version, double, double) { CHECK_EQUAL(downloaded_bytes, downloadable_bytes); CHECK_EQUAL(0, uploaded_bytes); CHECK_EQUAL(0, uploadable_bytes); static_cast(snapshot_version); - if (progress_version > 0) - CHECK_NOT_EQUAL(0, downloadable_bytes); + CHECK_NOT_EQUAL(0, downloadable_bytes); }; Session session_2 = fixture.make_session(db_2, "/test"); diff --git a/test/tsan.suppress b/test/tsan.suppress index e1a579a4a9f..4ae53f615b6 100644 --- a/test/tsan.suppress +++ b/test/tsan.suppress @@ -19,3 +19,8 @@ deadlock:realm::sync::MigrationStore::create_subscriptions # mktime, timegm, gmtime modify global time zone env var, but the race is harmless race:adjtime + +# libuv scheduler is only supposed to be used on main thread but constructed +# on the sync thread for async open task, should be harmless to suppress, +# but ultimately needs to be fixed: #7083 +race:uv_async_init