Skip to content

Commit

Permalink
Extract some duplicated code for sync triggers and timers
Browse files Browse the repository at this point in the history
  • Loading branch information
tgoyne committed Jul 22, 2024
1 parent 6a9a087 commit a3ae265
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 162 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
-----------

### Internals
* None.
* Refactor the implementation of sync triggers and timers to eliminate some duplicated code. ([PR #7912](https://github.com/realm/realm-core/pull/7912))

----------------------------------------------

Expand Down
23 changes: 7 additions & 16 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -502,10 +502,9 @@ void ClientImpl::register_unactualized_session_wrapper(SessionWrapper* wrapper)
return;
}

REALM_ASSERT(m_actualize_and_finalize);
m_unactualized_session_wrappers.push(util::bind_ptr(wrapper));
}
m_actualize_and_finalize->trigger();
m_actualize_and_finalize.trigger();
}


Expand All @@ -514,7 +513,6 @@ void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrappe
// Thread safety required.
{
util::CheckedLockGuard lock{m_mutex};
REALM_ASSERT(m_actualize_and_finalize);
// The wrapper may have already been finalized before being abandoned
// if we were stopped when it was created.
if (wrapper->mark_abandoned())
Expand All @@ -530,7 +528,7 @@ void ClientImpl::register_abandoned_session_wrapper(util::bind_ptr<SessionWrappe
}
m_abandoned_session_wrappers.push(std::move(wrapper));
}
m_actualize_and_finalize->trigger();
m_actualize_and_finalize.trigger();
}


Expand Down Expand Up @@ -1841,23 +1839,12 @@ ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ide
, m_ssl_verify_callback{std::move(ssl_verify_callback)} // DEPRECATED
, m_proxy_config{std::move(proxy_config)} // DEPRECATED
, m_reconnect_info{reconnect_info}
, m_on_idle{m_client, &Connection::on_idle, this}
, m_ident{ident}
, m_server_endpoint{std::move(endpoint)}
, m_authorization_header_name{authorization_header_name} // DEPRECATED
, m_custom_http_headers{custom_http_headers} // DEPRECATED
{
m_on_idle = m_client.create_trigger([this](Status status) {
if (status == ErrorCodes::OperationAborted)
return;
else if (!status.is_ok())
throw Exception(status);

REALM_ASSERT(m_activated);
if (m_state == ConnectionState::disconnected && m_num_active_sessions == 0) {
on_idle(); // Throws
// Connection object may be destroyed now.
}
});
}

inline connection_ident_type ClientImpl::Connection::get_ident() const noexcept
Expand Down Expand Up @@ -1889,6 +1876,10 @@ void ClientImpl::Connection::resume_active_sessions()

void ClientImpl::Connection::on_idle()
{
REALM_ASSERT(m_activated);
if (m_state != ConnectionState::disconnected || m_num_active_sessions != 0)
return;

logger.debug(util::LogCategory::session, "Destroying connection object");
ClientImpl& client = get_client();
client.remove_connection(*this);
Expand Down
104 changes: 26 additions & 78 deletions src/realm/sync/noinst/client_impl_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,8 @@ ClientImpl::ClientImpl(ClientConfig config)
, m_fix_up_object_ids{config.fix_up_object_ids}
, m_roundtrip_time_handler{std::move(config.roundtrip_time_handler)}
, m_socket_provider{std::move(config.socket_provider)}
, m_client_protocol{} // Throws
, m_one_connection_per_session{config.one_connection_per_session}
, m_random{}
, m_actualize_and_finalize{*this, &ClientImpl::actualize_and_finalize_session_wrappers, this}
{
// FIXME: Would be better if seeding was up to the application.
util::seed_prng_nondeterministically(m_random); // Throws
Expand Down Expand Up @@ -220,14 +219,6 @@ ClientImpl::ClientImpl(ClientConfig config)
logger.warn("Testing/debugging feature 'disable_sync_to_disk' enabled - "
"never do this in production");
}

m_actualize_and_finalize = create_trigger([this](Status status) {
if (status == ErrorCodes::OperationAborted)
return;
else if (!status.is_ok())
throw Exception(status);
actualize_and_finalize_session_wrappers(); // Throws
});
}

void ClientImpl::incr_outstanding_posts()
Expand Down Expand Up @@ -297,25 +288,23 @@ void ClientImpl::drain_connections()


SyncSocketProvider::SyncTimer ClientImpl::create_timer(std::chrono::milliseconds delay,
SyncSocketProvider::FunctionHandler&& handler)
util::UniqueFunction<void()>&& handler)
{
REALM_ASSERT(m_socket_provider);
incr_outstanding_posts();
return m_socket_provider->create_timer(delay, [handler = std::move(handler), this](Status status) {
auto decr_guard = util::make_scope_exit([&]() noexcept {
ScopeExit decr_guard([&]() noexcept {
decr_outstanding_posts();
});
handler(status);
if (status == ErrorCodes::OperationAborted)
return;
if (!status.is_ok())
throw Exception(status);
handler();
});
}


ClientImpl::SyncTrigger ClientImpl::create_trigger(SyncSocketProvider::FunctionHandler&& handler)
{
REALM_ASSERT(m_socket_provider);
return std::make_unique<Trigger<ClientImpl>>(this, std::move(handler));
}

Connection::~Connection()
{
if (m_websocket_sentinel) {
Expand All @@ -326,10 +315,9 @@ Connection::~Connection()

void Connection::activate()
{
REALM_ASSERT(m_on_idle);
m_activated = true;
if (m_num_active_sessions == 0)
m_on_idle->trigger();
m_on_idle.trigger();
// We cannot in general connect immediately, because a prior failure to
// connect may require a delay before reconnecting (see `m_reconnect_info`).
initiate_reconnect_wait(); // Throws
Expand Down Expand Up @@ -371,7 +359,7 @@ void Connection::initiate_session_deactivation(Session* sess)
}
if (REALM_UNLIKELY(--m_num_active_sessions == 0)) {
if (m_activated && m_state == ConnectionState::disconnected)
m_on_idle->trigger();
m_on_idle.trigger();
}
}

Expand Down Expand Up @@ -695,22 +683,14 @@ void Connection::initiate_reconnect_wait()
// We create a timer for the reconnect_disconnect timer even if the delay is zero because
// we need it to be cancelable in case the connection is terminated before the timer
// callback is run.
m_reconnect_disconnect_timer = m_client.create_timer(delay, [this](Status status) {
// If the operation is aborted, the connection object may have been
// destroyed.
if (status != ErrorCodes::OperationAborted)
handle_reconnect_wait(status); // Throws
}); // Throws
m_reconnect_disconnect_timer = m_client.create_timer(delay, [this] {
handle_reconnect_wait(); // Throws
}); // Throws
}


void Connection::handle_reconnect_wait(Status status)
void Connection::handle_reconnect_wait()
{
if (!status.is_ok()) {
REALM_ASSERT(status != ErrorCodes::OperationAborted);
throw Exception(status);
}

REALM_ASSERT(m_reconnect_delay_in_progress);
m_reconnect_delay_in_progress = false;

Expand Down Expand Up @@ -822,24 +802,15 @@ void Connection::initiate_connect_wait()
// fully establish the connection (including SSL and WebSocket
// handshakes). Without such a watchdog, connect operations could take very
// long, or even indefinite time.
milliseconds_type time = m_client.m_connect_timeout;

m_connect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
// If the operation is aborted, the connection object may have been
// destroyed.
if (status != ErrorCodes::OperationAborted)
handle_connect_wait(status); // Throws
}); // Throws
std::chrono::milliseconds time(m_client.m_connect_timeout);
m_connect_timer = m_client.create_timer(time, [this] {
handle_connect_wait(); // Throws
}); // Throws
}


void Connection::handle_connect_wait(Status status)
void Connection::handle_connect_wait()
{
if (!status.is_ok()) {
REALM_ASSERT(status != ErrorCodes::OperationAborted);
throw Exception(status);
}

REALM_ASSERT_EX(m_state == ConnectionState::connecting, m_state);
logger.info("Connect timeout"); // Throws
SessionErrorInfo error_info({ErrorCodes::SyncConnectTimeout, "Sync connection was not fully established in time"},
Expand Down Expand Up @@ -933,12 +904,7 @@ void Connection::initiate_ping_delay(milliseconds_type now)

m_ping_delay_in_progress = true;

m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), [this](Status status) {
if (status == ErrorCodes::OperationAborted)
return;
else if (!status.is_ok())
throw Exception(status);

m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), [this] {
handle_ping_delay(); // Throws
}); // Throws
logger.debug("Will emit a ping in %1 milliseconds", delay); // Throws
Expand Down Expand Up @@ -968,12 +934,7 @@ void Connection::initiate_pong_timeout()
m_pong_wait_started_at = monotonic_clock_now();

milliseconds_type time = m_client.m_pong_keepalive_timeout;
m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
if (status == ErrorCodes::OperationAborted)
return;
else if (!status.is_ok())
throw Exception(status);

m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this] {
handle_pong_timeout(); // Throws
}); // Throws
}
Expand Down Expand Up @@ -1124,23 +1085,15 @@ void Connection::initiate_disconnect_wait()

milliseconds_type time = m_client.m_connection_linger_time;

m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
// If the operation is aborted, the connection object may have been
// destroyed.
if (status != ErrorCodes::OperationAborted)
handle_disconnect_wait(status); // Throws
}); // Throws
m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this] {
handle_disconnect_wait(); // Throws
}); // Throws
m_disconnect_delay_in_progress = true;
}


void Connection::handle_disconnect_wait(Status status)
void Connection::handle_disconnect_wait()
{
if (!status.is_ok()) {
REALM_ASSERT(status != ErrorCodes::OperationAborted);
throw Exception(status);
}

m_disconnect_delay_in_progress = false;

REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
Expand Down Expand Up @@ -2704,12 +2657,7 @@ void Session::begin_resumption_delay(const ProtocolErrorInfo& error_info)
try_again_interval = std::chrono::milliseconds{1000};
}
logger.debug("Will attempt to resume session after %1 milliseconds", try_again_interval.count());
m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this](Status status) {
if (status == ErrorCodes::OperationAborted)
return;
else if (!status.is_ok())
throw Exception(status);

m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this] {
m_try_again_activation_timer.reset();
cancel_resumption_delay();
});
Expand Down
15 changes: 6 additions & 9 deletions src/realm/sync/noinst/client_impl_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,8 @@ class ClientImpl {
void post(SyncSocketProvider::FunctionHandler&& handler) REQUIRES(!m_drain_mutex);
void post(util::UniqueFunction<void()>&& handler) REQUIRES(!m_drain_mutex);
SyncSocketProvider::SyncTimer create_timer(std::chrono::milliseconds delay,
SyncSocketProvider::FunctionHandler&& handler)
REQUIRES(!m_drain_mutex);
using SyncTrigger = std::unique_ptr<Trigger<ClientImpl>>;
SyncTrigger create_trigger(SyncSocketProvider::FunctionHandler&& handler);
util::UniqueFunction<void()>&& handler) REQUIRES(!m_drain_mutex);
using SyncTrigger = Trigger<ClientImpl>;

RandomEngine& get_random() noexcept;

Expand Down Expand Up @@ -521,10 +519,10 @@ class ClientImpl::Connection {
std::string get_http_request_path() const;

void initiate_reconnect_wait();
void handle_reconnect_wait(Status status);
void handle_reconnect_wait();
void initiate_reconnect();
void initiate_connect_wait();
void handle_connect_wait(Status status);
void handle_connect_wait();

void handle_connection_established();
void schedule_urgent_ping();
Expand All @@ -540,7 +538,7 @@ class ClientImpl::Connection {
void handle_write_ping();
void handle_message_received(util::Span<const char> data);
void initiate_disconnect_wait();
void handle_disconnect_wait(Status status);
void handle_disconnect_wait();
void close_due_to_protocol_error(Status status);
void close_due_to_client_side_error(Status, IsFatal is_fatal, ConnectionTerminationReason reason);
void close_due_to_transient_error(Status status, ConnectionTerminationReason reason);
Expand Down Expand Up @@ -1221,12 +1219,11 @@ inline void ClientImpl::Connection::involuntary_disconnect(const SessionErrorInf

inline void ClientImpl::Connection::change_state_to_disconnected() noexcept
{
REALM_ASSERT(m_on_idle);
REALM_ASSERT(m_state != ConnectionState::disconnected);
m_state = ConnectionState::disconnected;

if (m_num_active_sessions == 0)
m_on_idle->trigger();
m_on_idle.trigger();

REALM_ASSERT(!m_reconnect_delay_in_progress);
if (m_disconnect_delay_in_progress) {
Expand Down
Loading

0 comments on commit a3ae265

Please sign in to comment.