Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify sync::Transformer and eliminate the abstract interface #7099

Merged
merged 3 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* Add support for building against the musl library. ([PR #7067](https://github.com/realm/realm-core/pull/7067))
* Remove ArrayWithFind's ability to use a templated callback parameter. The QueryStateBase consumers now use an index and the array leaf to get the actual value if needed. This allows certain queries such as count() to not do as many lookups to the actual values and results in a small performance gain. Also remove `find_action_pattern()` which was unused for a long time. This reduction in templating throughout the query system produces a small (~100k) binary size reduction. ([#7095](https://github.com/realm/realm-core/pull/7095))
* Rework the implemenatation of the set algrebra functions on Set<T> to reduce the compiled size.
* Rework the internal interface for sync Transformers to simplify it and reduce the compiled size ([PR #7098](https://github.com/realm/realm-core/pull/7098)).

----------------------------------------------

Expand Down
11 changes: 5 additions & 6 deletions src/realm/sync/noinst/client_history_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,6 @@ size_t ClientHistory::transform_and_apply_server_changesets(util::Span<Changeset
changeset.last_integrated_remote_version = m_sync_history_base_version;
}

Transformer& transformer = get_transformer(); // Throws
constexpr std::size_t commit_byte_size_limit = 102400; // 100 KB

auto changeset_applier = [&](const Changeset* transformed_changeset) -> bool {
Expand All @@ -551,12 +550,12 @@ size_t ClientHistory::transform_and_apply_server_changesets(util::Span<Changeset
}
downloaded_bytes += transformed_changeset->original_changeset_size;

return !(m_db->other_writers_waiting_for_lock() &&
transact->get_commit_size() >= commit_byte_size_limit && allow_lock_release);
return !(allow_lock_release && m_db->other_writers_waiting_for_lock() &&
transact->get_commit_size() >= commit_byte_size_limit);
};
auto changesets_transformed_count =
transformer.transform_remote_changesets(*this, sync_file_id, local_version, changesets_to_integrate,
std::move(changeset_applier), logger); // Throws
sync::Transformer transformer;
auto changesets_transformed_count = transformer.transform_remote_changesets(
*this, sync_file_id, local_version, changesets_to_integrate, changeset_applier, logger); // Throws
return changesets_transformed_count;
}
catch (const BadChangesetError& e) {
Expand Down
13 changes: 0 additions & 13 deletions src/realm/sync/noinst/client_history_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ class IntegrationException : public RuntimeError {
class ClientHistory final : public _impl::History, public TransformHistory {
public:
using version_type = sync::version_type;
using RemoteChangeset = Transformer::RemoteChangeset;

struct UploadChangeset {
timestamp_type origin_timestamp;
Expand Down Expand Up @@ -273,10 +272,6 @@ class ClientHistory final : public _impl::History, public TransformHistory {
ClientReplication& m_replication;
DB* m_db = nullptr;

// FIXME: All history objects belonging to a particular client object
// (sync::Client) should use a single shared transformer object.
std::unique_ptr<Transformer> m_transformer;

/// The version on which the first changeset in the continuous transactions
/// history is based, or if that history is empty, the version associated
/// with currently bound snapshot. In general, `m_ct_history_base_version +
Expand Down Expand Up @@ -413,7 +408,6 @@ class ClientHistory final : public _impl::History, public TransformHistory {
void trim_sync_history();
void do_trim_sync_history(std::size_t n);
void clamp_sync_version_range(version_type& begin, version_type& end) const noexcept;
Transformer& get_transformer();
void fix_up_client_file_ident_in_stored_changesets(Transaction&, file_ident_type);
void record_current_schema_version();
static void record_current_schema_version(Array& schema_versions, version_type snapshot_version);
Expand Down Expand Up @@ -536,13 +530,6 @@ inline void ClientHistory::clamp_sync_version_range(version_type& begin, version
}
}

inline auto ClientHistory::get_transformer() -> Transformer&
{
if (!m_transformer)
m_transformer = make_transformer(); // Throws
return *m_transformer;
}


/// \brief Create a "sync history" implementation of the realm::Replication
/// interface.
Expand Down
2 changes: 1 addition & 1 deletion src/realm/sync/noinst/client_impl_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2379,7 +2379,7 @@ Status Session::receive_download_message(const SyncProgress& progress, std::uint

version_type server_version = m_progress.download.server_version;
version_type last_integrated_client_version = m_progress.download.last_integrated_client_version;
for (const Transformer::RemoteChangeset& changeset : received_changesets) {
for (const RemoteChangeset& changeset : received_changesets) {
// Check that per-changeset server version is strictly increasing, except in FLX sync where the server
// version must be increasing, but can stay the same during bootstraps.
bool good_server_version = m_is_flx_sync_session ? (changeset.remote_version >= server_version)
Expand Down
2 changes: 1 addition & 1 deletion src/realm/sync/noinst/pending_bootstrap_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ PendingBootstrapStore::PendingBatch PendingBootstrapStore::peek_pending(size_t l
}
REALM_ASSERT_3(ec, ==, std::error_code{});

Transformer::RemoteChangeset parsed_changeset;
RemoteChangeset parsed_changeset;
parsed_changeset.original_changeset_size =
static_cast<size_t>(cur_changeset.get<int64_t>(m_changeset_original_changeset_size));
parsed_changeset.origin_timestamp = cur_changeset.get<int64_t>(m_changeset_origin_timestamp);
Expand Down
4 changes: 2 additions & 2 deletions src/realm/sync/noinst/pending_bootstrap_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class PendingBootstrapStore {

struct PendingBatch {
int64_t query_version = 0;
std::vector<Transformer::RemoteChangeset> changesets;
std::vector<RemoteChangeset> changesets;
std::vector<util::AppendBuffer<char>> changeset_data;
util::Optional<SyncProgress> progress;
size_t remaining_changesets = 0;
Expand All @@ -79,7 +79,7 @@ class PendingBootstrapStore {

// Adds a set of changesets to the store.
void add_batch(int64_t query_version, util::Optional<SyncProgress> progress,
const std::vector<Transformer::RemoteChangeset>& changesets, bool* created_new_batch);
const std::vector<RemoteChangeset>& changesets, bool* created_new_batch);

void clear();

Expand Down
4 changes: 2 additions & 2 deletions src/realm/sync/noinst/protocol_codec.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class ClientProtocol {
// clang-format on

using OutputBuffer = util::ResettableExpandableBufferOutputStream;
using RemoteChangeset = sync::Transformer::RemoteChangeset;
using RemoteChangeset = sync::RemoteChangeset;
using ReceivedChangesets = std::vector<RemoteChangeset>;

/// Messages sent by the client.
Expand Down Expand Up @@ -422,7 +422,7 @@ class ClientProtocol {

// Loop through the body and find the changesets.
while (!msg.at_end()) {
realm::sync::Transformer::RemoteChangeset cur_changeset;
RemoteChangeset cur_changeset;
cur_changeset.remote_version = msg.read_next<version_type>();
cur_changeset.last_integrated_local_version = msg.read_next<version_type>();
cur_changeset.origin_timestamp = msg.read_next<timestamp_type>();
Expand Down
34 changes: 0 additions & 34 deletions src/realm/sync/noinst/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -714,14 +714,10 @@ class Worker : public ServerHistory::Context {

// Overriding members of ServerHistory::Context
std::mt19937_64& server_history_get_random() noexcept override final;
sync::Transformer& get_transformer() override final;
util::Buffer<char>& get_transform_buffer() override final;

private:
ServerImpl& m_server;
std::mt19937_64 m_random;
const std::unique_ptr<Transformer> m_transformer;
util::Buffer<char> m_transform_buffer;
ServerFileAccessCache m_file_access_cache;

util::Mutex m_mutex;
Expand Down Expand Up @@ -976,8 +972,6 @@ class ServerImpl : public ServerImplBase, public ServerHistory::Context {

// Overriding member functions in _impl::ServerHistory::Context
std::mt19937_64& server_history_get_random() noexcept override final;
Transformer& get_transformer() noexcept override final;
util::Buffer<char>& get_transform_buffer() noexcept override final;

private:
Server::Config m_config;
Expand Down Expand Up @@ -1014,8 +1008,6 @@ class ServerImpl : public ServerImplBase, public ServerHistory::Context {
ServerProtocol m_server_protocol;
compression::CompressMemoryArena m_compress_memory_arena;
MiscBuffers m_misc_buffers;
std::unique_ptr<Transformer> m_transformer;
util::Buffer<char> m_transform_buffer;
int_fast64_t m_current_server_session_ident;
Optional<network::DeadlineTimer> m_connection_reaper_timer;
bool m_allow_load_balancing = false;
Expand Down Expand Up @@ -3756,7 +3748,6 @@ void ServerFile::finalize_work_stage_2()
Worker::Worker(ServerImpl& server)
: logger{"Worker: ", server.logger_ptr} // Throws
, m_server{server}
, m_transformer{make_transformer()} // Throws
, m_file_access_cache{server.get_config().max_open_files, logger, *this, server.get_config().encryption_key}
{
util::seed_prng_nondeterministically(m_random); // Throws
Expand All @@ -3777,17 +3768,6 @@ std::mt19937_64& Worker::server_history_get_random() noexcept
}


sync::Transformer& Worker::get_transformer()
{
return *m_transformer;
}


util::Buffer<char>& Worker::get_transform_buffer()
{
return m_transform_buffer;
}

void Worker::run()
{
for (;;) {
Expand Down Expand Up @@ -3913,8 +3893,6 @@ void ServerImpl::start()
logger.info("Connection soft close timeout: %1 ms", m_config.soft_close_timeout); // Throws
logger.debug("Authorization header name: %1", m_config.authorization_header_name); // Throws

m_transformer = make_transformer(); // Throws

m_realm_names = _impl::find_realm_files(m_root_dir); // Throws

initiate_connection_reaper_timer(m_config.connection_reaper_interval); // Throws
Expand Down Expand Up @@ -3985,18 +3963,6 @@ std::mt19937_64& ServerImpl::server_history_get_random() noexcept
}


Transformer& ServerImpl::get_transformer() noexcept
{
return *m_transformer;
}


util::Buffer<char>& ServerImpl::get_transform_buffer() noexcept
{
return m_transform_buffer;
}


void ServerImpl::listen()
{
network::Resolver resolver{get_service()};
Expand Down
17 changes: 2 additions & 15 deletions src/realm/sync/noinst/server/server_history.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1200,10 +1200,9 @@ bool ServerHistory::integrate_remote_changesets(file_ident_type remote_file_iden
// Merge with causally unrelated changesets, and resolve the
// conflicts if there are any.
TransformHistoryImpl transform_hist{remote_file_ident, *this, recip_hist};
Transformer& transformer = m_context.get_transformer(); // Throws
Transformer transformer;
transformer.transform_remote_changesets(transform_hist, m_local_file_ident, current_server_version,
parsed_transformed_changesets, std::move(apply),
logger); // Throws
parsed_transformed_changesets, apply, logger); // Throws

for (std::size_t i = 0; i < num_changesets; ++i) {
REALM_ASSERT(get_instruction_encoder().buffer().size() == 0);
Expand Down Expand Up @@ -2247,18 +2246,6 @@ void ServerHistory::record_current_schema_version(Array& schema_versions, versio
}


Transformer& ServerHistory::Context::get_transformer()
{
throw util::runtime_error("Not supported");
}


util::Buffer<char>& ServerHistory::Context::get_transform_buffer()
{
throw util::runtime_error("Not supported");
}


std::ostream& _impl::operator<<(std::ostream& out, const ServerHistory::HistoryContents& hc)
{
out << "client files:\n";
Expand Down
12 changes: 1 addition & 11 deletions src/realm/sync/noinst/server/server_history.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class ServerHistory : public sync::SyncReplication,
using UploadCursor = sync::UploadCursor;
using SyncProgress = sync::SyncProgress;
using HistoryEntry = sync::HistoryEntry;
using RemoteChangeset = sync::Transformer::RemoteChangeset;
using RemoteChangeset = sync::RemoteChangeset;
// clang-format on

enum class BootstrapError {
Expand Down Expand Up @@ -742,16 +742,6 @@ class ServerHistory::Context {
public:
virtual std::mt19937_64& server_history_get_random() noexcept = 0;

// @{
/// These are guaranteed to not be called until a remote changeset needs to
/// be integrated into the history.
///
/// The default implementations throw std::runtime_error, with a message
/// saying "Not supported".
virtual sync::Transformer& get_transformer();
virtual util::Buffer<char>& get_transform_buffer();
// @}

protected:
Context() noexcept = default;
};
Expand Down
4 changes: 2 additions & 2 deletions src/realm/sync/tools/apply_to_state_command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct DownloadMessage {
int64_t query_version;

Buffer<char> uncompressed_body_buffer;
std::vector<realm::sync::Transformer::RemoteChangeset> changesets;
std::vector<realm::sync::RemoteChangeset> changesets;

static DownloadMessage parse(HeaderLineParser& msg, Logger& logger, bool is_flx_sync);
};
Expand Down Expand Up @@ -135,7 +135,7 @@ DownloadMessage DownloadMessage::parse(HeaderLineParser& msg, Logger& logger, bo

HeaderLineParser body(body_str);
while (!body.at_end()) {
realm::sync::Transformer::RemoteChangeset cur_changeset;
realm::sync::RemoteChangeset cur_changeset;
cur_changeset.remote_version = body.read_next<sync::version_type>();
cur_changeset.last_integrated_local_version = body.read_next<sync::version_type>();
cur_changeset.origin_timestamp = body.read_next<sync::timestamp_type>();
Expand Down
Loading