Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel subscription notifications #7073

Merged
merged 4 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,12 @@ void SyncSession::become_dying(util::CheckedUniqueLock lock)
m_state_mutex.unlock(lock);
}

void SyncSession::become_inactive(util::CheckedUniqueLock lock, Status status)
void SyncSession::become_inactive(util::CheckedUniqueLock lock, Status status, bool cancel_subscription_notifications)
{
REALM_ASSERT(m_state != State::Inactive);
m_state = State::Inactive;

do_become_inactive(std::move(lock), status);
do_become_inactive(std::move(lock), status, cancel_subscription_notifications);
}

void SyncSession::become_paused(util::CheckedUniqueLock lock)
Expand All @@ -172,7 +172,7 @@ void SyncSession::become_paused(util::CheckedUniqueLock lock)
return;
}

do_become_inactive(std::move(lock), Status::OK());
do_become_inactive(std::move(lock), Status::OK(), true);
}

void SyncSession::do_restart_session(util::CheckedUniqueLock)
Expand All @@ -198,7 +198,8 @@ void SyncSession::do_restart_session(util::CheckedUniqueLock)
become_active();
}

void SyncSession::do_become_inactive(util::CheckedUniqueLock lock, Status status)
void SyncSession::do_become_inactive(util::CheckedUniqueLock lock, Status status,
bool cancel_subscription_notifications)
{
// Manually set the disconnected state. Sync would also do this, but
// since the underlying SyncSession object already have been destroyed,
Expand All @@ -216,6 +217,7 @@ void SyncSession::do_become_inactive(util::CheckedUniqueLock lock, Status status
m_sync_manager->unregister_session(m_db->get_path());
}

auto subscription_store = m_flx_subscription_store;
m_state_mutex.unlock(lock);

// Send notifications after releasing the lock to prevent deadlocks in the callback.
Expand All @@ -226,6 +228,10 @@ void SyncSession::do_become_inactive(util::CheckedUniqueLock lock, Status status
if (status.is_ok())
status = Status(ErrorCodes::OperationAborted, "Sync session became inactive");

if (subscription_store && cancel_subscription_notifications) {
subscription_store->notify_all_state_change_notifications(status);
}

// Inform any queued-up completion handlers that they were cancelled.
for (auto& [id, callback] : waits)
callback.second(status);
Expand Down Expand Up @@ -610,7 +616,9 @@ void SyncSession::handle_fresh_realm_downloaded(DBRef db, Status status,
else
m_completion_callbacks.merge(std::move(callbacks));
});
become_inactive(std::move(lock)); // unlocks the lock
// Do not cancel the notifications on subscriptions.
bool cancel_subscription_notifications = false;
become_inactive(std::move(lock), Status::OK(), cancel_subscription_notifications); // unlocks the lock

// Once the session is inactive, update sync config and subscription store after migration.
if (server_requests_action == sync::ProtocolErrorInfo::Action::MigrateToFLX ||
Expand Down Expand Up @@ -651,7 +659,6 @@ void SyncSession::handle_error(sync::SessionErrorInfo error)
case sync::ProtocolErrorInfo::Action::ApplicationBug:
[[fallthrough]];
case sync::ProtocolErrorInfo::Action::ProtocolViolation:
next_state = NextStateAfterError::inactive;
danieltabacaru marked this conversation as resolved.
Show resolved Hide resolved
break;
case sync::ProtocolErrorInfo::Action::Warning:
break; // not fatal, but should be bubbled up to the user below.
Expand Down Expand Up @@ -772,17 +779,16 @@ void SyncSession::handle_error(sync::SessionErrorInfo error)
}
}

void SyncSession::cancel_pending_waits(util::CheckedUniqueLock lock, Status error,
std::optional<Status> subs_notify_error)
void SyncSession::cancel_pending_waits(util::CheckedUniqueLock lock, Status error)
{
CompletionCallbacks callbacks;
std::swap(callbacks, m_completion_callbacks);

// Inform any waiters on pending subscription states that they were cancelled
if (subs_notify_error && m_flx_subscription_store) {
if (m_flx_subscription_store) {
auto subscription_store = m_flx_subscription_store;
m_state_mutex.unlock(lock);
subscription_store->notify_all_state_change_notifications(*subs_notify_error);
subscription_store->notify_all_state_change_notifications(error);
}
else {
m_state_mutex.unlock(lock);
Expand Down
7 changes: 3 additions & 4 deletions src/realm/object-store/sync/sync_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,7 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
REQUIRES(!m_state_mutex, !m_config_mutex);
// If sub_notify_error is set (including Status::OK()), then the pending subscription waiters will
// also be called with the sub_notify_error status value.
void cancel_pending_waits(util::CheckedUniqueLock, Status, std::optional<Status> subs_notify_error = std::nullopt)
RELEASE(m_state_mutex);
void cancel_pending_waits(util::CheckedUniqueLock, Status) RELEASE(m_state_mutex);
enum class ShouldBackup { yes, no };
void update_error_and_mark_file_for_deletion(SyncError&, ShouldBackup) REQUIRES(m_state_mutex, !m_config_mutex);
void handle_progress_update(uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t);
Expand All @@ -398,7 +397,7 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {

void become_active() REQUIRES(m_state_mutex, !m_config_mutex);
void become_dying(util::CheckedUniqueLock) RELEASE(m_state_mutex) REQUIRES(!m_connection_state_mutex);
void become_inactive(util::CheckedUniqueLock, Status ec = Status::OK()) RELEASE(m_state_mutex)
void become_inactive(util::CheckedUniqueLock, Status = Status::OK(), bool = true) RELEASE(m_state_mutex)
REQUIRES(!m_connection_state_mutex);
void become_paused(util::CheckedUniqueLock) RELEASE(m_state_mutex) REQUIRES(!m_connection_state_mutex);
void become_waiting_for_access_token() REQUIRES(m_state_mutex);
Expand All @@ -409,7 +408,7 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {

// do_become_inactive is called from both become_paused()/become_inactive() and does all the steps to
// shutdown and cleanup the sync session besides setting m_state.
void do_become_inactive(util::CheckedUniqueLock, Status) RELEASE(m_state_mutex)
void do_become_inactive(util::CheckedUniqueLock, Status, bool) RELEASE(m_state_mutex)
REQUIRES(!m_connection_state_mutex);
// do_revive is called from both revive_if_needed() and resume(). It does all the steps to transition
// from a state that is not Active to Active.
Expand Down
76 changes: 75 additions & 1 deletion test/object-store/sync/flx_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3472,10 +3472,11 @@ TEST_CASE("flx: bootstraps contain all changes", "[sync][flx][bootstrap][baas]")

interrupted.get();
problem_realm->sync_session()->shutdown_and_wait();
REQUIRE(!sub_complete_future.is_ready());
REQUIRE(sub_complete_future.is_ready());
sub_set.refresh();
REQUIRE(sub_set.state() == sync::SubscriptionSet::State::AwaitingMark);

sub_complete_future = sub_set.get_state_change_notification(sync::SubscriptionSet::State::Complete);
problem_realm->sync_session()->resume();
sub_complete_future.get();
wait_for_advance(*problem_realm);
Expand Down Expand Up @@ -4375,6 +4376,79 @@ TEST_CASE("flx sync: resend pending subscriptions when reconnecting", "[sync][fl
wait_for_download(*realm);
}

TEST_CASE("flx: fatal errors and session becoming inactive cancel pending waits", "[sync][flx][baas]") {
std::vector<ObjectSchema> schema{
{"TopLevel",
{
{"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
{"queryable_int_field", PropertyType::Int | PropertyType::Nullable},
}},
};

FLXSyncTestHarness harness("flx_cancel_pending_waits", {schema, {"queryable_int_field"}});
SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});

auto check_status = [](auto status) {
CHECK(!status.is_ok());
std::string reason = status.get_status().reason();
// Subscription notification is cancelled either because the sync session is inactive, or because a fatal
// error is received from the server.
if (reason.find("Sync session became inactive") == std::string::npos &&
reason.find("Invalid schema change (UPLOAD): non-breaking schema change: adding \"Int\" column at field "
"\"other_col\" in schema \"TopLevel\", schema changes from clients are restricted when "
"developer mode is disabled") == std::string::npos) {
FAIL(reason);
}
};

auto create_subscription = [](auto realm) -> realm::sync::SubscriptionSet {
auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
auto table = realm->read_group().get_table("class_TopLevel");
mut_subs.insert_or_assign(Query(table));
return mut_subs.commit();
};

auto [error_occured_promise, error_occurred] = util::make_promise_future<void>();
config.sync_config->error_handler = [promise = util::CopyablePromiseHolder(std::move(error_occured_promise))](
std::shared_ptr<SyncSession>, SyncError) mutable {
promise.get_promise().emplace_value();
};

auto realm = Realm::get_shared_realm(config);
wait_for_download(*realm);

auto subs = create_subscription(realm);
auto subs_future = subs.get_state_change_notification(sync::SubscriptionSet::State::Complete);

realm->sync_session()->pause();
auto state = subs_future.get_no_throw();
check_status(state);

auto [download_complete_promise, download_complete] = util::make_promise_future<void>();
realm->sync_session()->wait_for_upload_completion([promise = std::move(download_complete_promise)](auto) mutable {
promise.emplace_value();
});
schema[0].persisted_properties.push_back({"other_col", PropertyType::Int | PropertyType::Nullable});
realm->update_schema(schema);

subs = create_subscription(realm);
subs_future = subs.get_state_change_notification(sync::SubscriptionSet::State::Complete);

harness.load_initial_data([&](SharedRealm realm) {
CppContext c(realm);
Object::create(c, realm, "TopLevel",
std::any(AnyDict{{"_id", ObjectId::gen()},
{"queryable_int_field", static_cast<int64_t>(5)},
{"other_col", static_cast<int64_t>(42)}}));
});

realm->sync_session()->resume();
download_complete.get();
error_occurred.get();
state = subs_future.get_no_throw();
check_status(state);
}

} // namespace realm::app

#endif // REALM_ENABLE_AUTH_TESTS
2 changes: 1 addition & 1 deletion test/object-store/sync/session/wait_for_completion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ TEST_CASE("SyncSession: wait_for_download_completion() API", "[sync][pbs][sessio
});
REQUIRE(handler_called == false);
// Now trigger an error
sync::SessionErrorInfo err{err_status, sync::IsFatal{false}};
sync::SessionErrorInfo err{err_status, sync::IsFatal{true}};
danieltabacaru marked this conversation as resolved.
Show resolved Hide resolved
err.server_requests_action = sync::ProtocolErrorInfo::Action::ProtocolViolation;
SyncSession::OnlyForTesting::handle_error(*session, std::move(err));
EventLoop::main().run_until([&] {
Expand Down