From a4034dca1ccd1a5a29ac2882f2c473b4f6013ec4 Mon Sep 17 00:00:00 2001 From: Thomas Goyne Date: Fri, 13 Oct 2023 14:21:24 -0700 Subject: [PATCH 1/3] Export fewer functions from transform.cpp transform.cpp uses the magic of templates to generate a very, very large number of functions, and many of them were unnecessarily extern. This resulted in the symbol table being over half of transform.o in Release builds. Making more of them private cuts the arm64 Release build from 250 KB to 128 KB. --- src/realm/sync/noinst/client_history_impl.hpp | 1 - src/realm/sync/noinst/client_impl_base.cpp | 2 +- .../sync/noinst/pending_bootstrap_store.cpp | 2 +- .../sync/noinst/pending_bootstrap_store.hpp | 4 +- src/realm/sync/noinst/protocol_codec.hpp | 4 +- .../sync/noinst/server/server_history.hpp | 2 +- .../sync/tools/apply_to_state_command.cpp | 4 +- src/realm/sync/transform.cpp | 140 +++++++----------- src/realm/sync/transform.hpp | 33 +---- test/peer.hpp | 14 +- test/test_sync.cpp | 8 +- test/test_sync_pending_bootstraps.cpp | 4 +- 12 files changed, 81 insertions(+), 137 deletions(-) diff --git a/src/realm/sync/noinst/client_history_impl.hpp b/src/realm/sync/noinst/client_history_impl.hpp index 78ff4bd91da..4223b7de5bc 100644 --- a/src/realm/sync/noinst/client_history_impl.hpp +++ b/src/realm/sync/noinst/client_history_impl.hpp @@ -82,7 +82,6 @@ class IntegrationException : public RuntimeError { class ClientHistory final : public _impl::History, public TransformHistory { public: using version_type = sync::version_type; - using RemoteChangeset = Transformer::RemoteChangeset; struct UploadChangeset { timestamp_type origin_timestamp; diff --git a/src/realm/sync/noinst/client_impl_base.cpp b/src/realm/sync/noinst/client_impl_base.cpp index b98694b7e22..11e48d7cc6d 100644 --- a/src/realm/sync/noinst/client_impl_base.cpp +++ b/src/realm/sync/noinst/client_impl_base.cpp @@ -2379,7 +2379,7 @@ Status Session::receive_download_message(const SyncProgress& progress, std::uint version_type server_version = m_progress.download.server_version; version_type last_integrated_client_version = m_progress.download.last_integrated_client_version; - for (const Transformer::RemoteChangeset& changeset : received_changesets) { + for (const RemoteChangeset& changeset : received_changesets) { // Check that per-changeset server version is strictly increasing, except in FLX sync where the server // version must be increasing, but can stay the same during bootstraps. bool good_server_version = m_is_flx_sync_session ? (changeset.remote_version >= server_version) diff --git a/src/realm/sync/noinst/pending_bootstrap_store.cpp b/src/realm/sync/noinst/pending_bootstrap_store.cpp index 912b86320ad..2529c82f25b 100644 --- a/src/realm/sync/noinst/pending_bootstrap_store.cpp +++ b/src/realm/sync/noinst/pending_bootstrap_store.cpp @@ -245,7 +245,7 @@ PendingBootstrapStore::PendingBatch PendingBootstrapStore::peek_pending(size_t l } REALM_ASSERT_3(ec, ==, std::error_code{}); - Transformer::RemoteChangeset parsed_changeset; + RemoteChangeset parsed_changeset; parsed_changeset.original_changeset_size = static_cast(cur_changeset.get(m_changeset_original_changeset_size)); parsed_changeset.origin_timestamp = cur_changeset.get(m_changeset_origin_timestamp); diff --git a/src/realm/sync/noinst/pending_bootstrap_store.hpp b/src/realm/sync/noinst/pending_bootstrap_store.hpp index 4ac6c6d5f57..0fae7dc3b8a 100644 --- a/src/realm/sync/noinst/pending_bootstrap_store.hpp +++ b/src/realm/sync/noinst/pending_bootstrap_store.hpp @@ -56,7 +56,7 @@ class PendingBootstrapStore { struct PendingBatch { int64_t query_version = 0; - std::vector changesets; + std::vector changesets; std::vector> changeset_data; util::Optional progress; size_t remaining_changesets = 0; @@ -79,7 +79,7 @@ class PendingBootstrapStore { // Adds a set of changesets to the store. void add_batch(int64_t query_version, util::Optional progress, - const std::vector& changesets, bool* created_new_batch); + const std::vector& changesets, bool* created_new_batch); void clear(); diff --git a/src/realm/sync/noinst/protocol_codec.hpp b/src/realm/sync/noinst/protocol_codec.hpp index 0967d513c20..eeaf71c569c 100644 --- a/src/realm/sync/noinst/protocol_codec.hpp +++ b/src/realm/sync/noinst/protocol_codec.hpp @@ -144,7 +144,7 @@ class ClientProtocol { // clang-format on using OutputBuffer = util::ResettableExpandableBufferOutputStream; - using RemoteChangeset = sync::Transformer::RemoteChangeset; + using RemoteChangeset = sync::RemoteChangeset; using ReceivedChangesets = std::vector; /// Messages sent by the client. @@ -422,7 +422,7 @@ class ClientProtocol { // Loop through the body and find the changesets. while (!msg.at_end()) { - realm::sync::Transformer::RemoteChangeset cur_changeset; + RemoteChangeset cur_changeset; cur_changeset.remote_version = msg.read_next(); cur_changeset.last_integrated_local_version = msg.read_next(); cur_changeset.origin_timestamp = msg.read_next(); diff --git a/src/realm/sync/noinst/server/server_history.hpp b/src/realm/sync/noinst/server/server_history.hpp index cfd1b58188d..23c7e0ef991 100644 --- a/src/realm/sync/noinst/server/server_history.hpp +++ b/src/realm/sync/noinst/server/server_history.hpp @@ -119,7 +119,7 @@ class ServerHistory : public sync::SyncReplication, using UploadCursor = sync::UploadCursor; using SyncProgress = sync::SyncProgress; using HistoryEntry = sync::HistoryEntry; - using RemoteChangeset = sync::Transformer::RemoteChangeset; + using RemoteChangeset = sync::RemoteChangeset; // clang-format on enum class BootstrapError { diff --git a/src/realm/sync/tools/apply_to_state_command.cpp b/src/realm/sync/tools/apply_to_state_command.cpp index 0ed12e17849..780210a7df2 100644 --- a/src/realm/sync/tools/apply_to_state_command.cpp +++ b/src/realm/sync/tools/apply_to_state_command.cpp @@ -42,7 +42,7 @@ struct DownloadMessage { int64_t query_version; Buffer uncompressed_body_buffer; - std::vector changesets; + std::vector changesets; static DownloadMessage parse(HeaderLineParser& msg, Logger& logger, bool is_flx_sync); }; @@ -135,7 +135,7 @@ DownloadMessage DownloadMessage::parse(HeaderLineParser& msg, Logger& logger, bo HeaderLineParser body(body_str); while (!body.at_end()) { - realm::sync::Transformer::RemoteChangeset cur_changeset; + realm::sync::RemoteChangeset cur_changeset; cur_changeset.remote_version = body.read_next(); cur_changeset.last_integrated_local_version = body.read_next(); cur_changeset.origin_timestamp = body.read_next(); diff --git a/src/realm/sync/transform.cpp b/src/realm/sync/transform.cpp index 322088c0ab2..3bbae0d3199 100644 --- a/src/realm/sync/transform.cpp +++ b/src/realm/sync/transform.cpp @@ -1,34 +1,17 @@ -#include -#include -#include -#include -#include -#include -#include +#include + +#include +#include #if REALM_DEBUG +#include #include // std::cerr used for debug tracing #include // std::unique_lock used for debug tracing #endif // REALM_DEBUG -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace realm { +using namespace realm; +using namespace realm::sync; +using namespace realm::util; namespace { @@ -52,15 +35,7 @@ namespace { #endif #endif -} // unnamed namespace - -using namespace realm; -using namespace realm::sync; -using namespace realm::util; - -namespace _impl { - -struct TransformerImpl::Discriminant { +struct Discriminant { timestamp_type timestamp; file_ident_type client_file_ident; Discriminant(timestamp_type t, file_ident_type p) @@ -88,8 +63,10 @@ struct TransformerImpl::Discriminant { } }; -struct TransformerImpl::Side { - Transformer& m_transformer; +struct TransformerImpl; + +struct Side { + TransformerImpl& m_transformer; Changeset* m_changeset = nullptr; Discriminant m_discriminant; @@ -97,7 +74,7 @@ struct TransformerImpl::Side { bool was_replaced = false; size_t m_path_len = 0; - Side(Transformer& transformer) + Side(TransformerImpl& transformer) : m_transformer(transformer) , m_discriminant(0, 0) { @@ -184,8 +161,8 @@ struct TransformerImpl::Side { } }; -struct TransformerImpl::MajorSide : TransformerImpl::Side { - MajorSide(Transformer& transformer) +struct MajorSide : Side { + MajorSide(TransformerImpl& transformer) : Side(transformer) { } @@ -237,10 +214,10 @@ struct TransformerImpl::MajorSide : TransformerImpl::Side { Changeset::iterator m_position; }; -struct TransformerImpl::MinorSide : TransformerImpl::Side { +struct MinorSide : Side { using Position = _impl::ChangesetIndex::RangeIterator; - MinorSide(Transformer& transformer) + MinorSide(TransformerImpl& transformer) : Side(transformer) { } @@ -327,7 +304,7 @@ struct TransformerImpl::MinorSide : TransformerImpl::Side { #if defined(REALM_DEBUG) // LCOV_EXCL_START Debug utilities -struct TransformerImpl::MergeTracer { +struct MergeTracer { public: Side& m_minor; Side& m_major; @@ -602,14 +579,13 @@ struct TransformerImpl::MergeTracer { }; #endif // LCOV_EXCL_STOP REALM_DEBUG - -struct TransformerImpl::Transformer { +struct TransformerImpl { MajorSide m_major_side; MinorSide m_minor_side; MinorSide::Position m_minor_end; bool m_trace; - Transformer(bool trace) + TransformerImpl(bool trace) : m_major_side{*this} , m_minor_side{*this} , m_trace{trace} @@ -854,43 +830,38 @@ struct TransformerImpl::Transformer { } void merge_instructions(MajorSide& left, MinorSide& right); - template - void merge_nested(OuterSide& outer, InnerSide& inner); }; -void TransformerImpl::MajorSide::set_next_changeset(Changeset* changeset) noexcept +void MajorSide::set_next_changeset(Changeset* changeset) noexcept { m_transformer.set_next_major_changeset(changeset); } -void TransformerImpl::MajorSide::discard() +void MajorSide::discard() { m_transformer.discard_major(); } -void TransformerImpl::MajorSide::prepend(Instruction operation) +void MajorSide::prepend(Instruction operation) { m_transformer.prepend_major(std::move(operation)); } template -void TransformerImpl::MajorSide::prepend(InputIterator begin, InputIterator end) +void MajorSide::prepend(InputIterator begin, InputIterator end) { m_transformer.prepend_major(std::move(begin), std::move(end)); } -void TransformerImpl::MinorSide::discard() +void MinorSide::discard() { m_transformer.discard_minor(); } -void TransformerImpl::MinorSide::prepend(Instruction operation) +void MinorSide::prepend(Instruction operation) { m_transformer.prepend_minor(std::move(operation)); } template -void TransformerImpl::MinorSide::prepend(InputIterator begin, InputIterator end) +void MinorSide::prepend(InputIterator begin, InputIterator end) { m_transformer.prepend_minor(std::move(begin), std::move(end)); } -} // namespace _impl - -namespace { REALM_NORETURN void throw_bad_merge(std::string msg) { @@ -903,8 +874,7 @@ REALM_NORETURN void bad_merge(const char* msg, Params&&... params) throw_bad_merge(util::format(msg, std::forward(params)...)); } -REALM_NORETURN void bad_merge(_impl::TransformerImpl::Side& side, Instruction::PathInstruction instr, - const std::string& msg) +REALM_NORETURN void bad_merge(Side& side, Instruction::PathInstruction instr, const std::string& msg) { std::stringstream ss; side.m_changeset->print_path(ss, instr.table, instr.object, instr.field, &instr.path); @@ -918,7 +888,7 @@ struct MergeNested; struct MergeUtils { using TransformerImpl = _impl::TransformerImpl; - MergeUtils(TransformerImpl::Side& left_side, TransformerImpl::Side& right_side) + MergeUtils(Side& left_side, Side& right_side) : m_left_side(left_side) , m_right_side(right_side) { @@ -1264,8 +1234,8 @@ struct MergeUtils { } protected: - TransformerImpl::Side& m_left_side; - TransformerImpl::Side& m_right_side; + Side& m_left_side; + Side& m_right_side; }; template @@ -1274,7 +1244,7 @@ struct MergeBase : MergeUtils { static const Instruction::Type B = Instruction::GetInstructionType::value; static_assert(A >= B, "A < B. Please reverse the order of instruction types. :-)"); - MergeBase(TransformerImpl::Side& left_side, TransformerImpl::Side& right_side) + MergeBase(Side& left_side, Side& right_side) : MergeUtils(left_side, right_side) { } @@ -2330,12 +2300,8 @@ DEFINE_MERGE(Instruction::SetErase, Instruction::SetErase) /// END OF MERGE RULES! /// -} // namespace - -namespace _impl { template -void merge_instructions_2(Left& left, Right& right, TransformerImpl::MajorSide& left_side, - TransformerImpl::MinorSide& right_side) +void merge_instructions_2(Left& left, Right& right, MajorSide& left_side, MinorSide& right_side) { Merge::merge(left, right, left_side, right_side); } @@ -2346,7 +2312,17 @@ void merge_nested_2(Outer& outer, Inner& inner, OuterSide& outer_side, InnerSide MergeNested::merge(outer, inner, outer_side, inner_side); } -void TransformerImpl::Transformer::merge_instructions(MajorSide& their_side, MinorSide& our_side) +template +void merge_nested(OuterSide& outer_side, InnerSide& inner_side) +{ + outer_side.get().visit([&](auto& outer) { + inner_side.get().visit([&](auto& inner) { + merge_nested_2(outer, inner, outer_side, inner_side); + }); + }); +} + +void TransformerImpl::merge_instructions(MajorSide& their_side, MinorSide& our_side) { // FIXME: Find a way to avoid heap-copies of the path. Instruction their_before = their_side.get(); @@ -2410,18 +2386,9 @@ void TransformerImpl::Transformer::merge_instructions(MajorSide& their_side, Min } } +} // anonymous namespace -template -void TransformerImpl::Transformer::merge_nested(OuterSide& outer_side, InnerSide& inner_side) -{ - outer_side.get().visit([&](auto& outer) { - inner_side.get().visit([&](auto& inner) { - merge_nested_2(outer, inner, outer_side, inner_side); - }); - }); -} - - +namespace realm::_impl { void TransformerImpl::merge_changesets(file_ident_type local_file_ident, Changeset* their_changesets, size_t their_size, Changeset** our_changesets, size_t our_size, util::Logger& logger) @@ -2439,7 +2406,7 @@ void TransformerImpl::merge_changesets(file_ident_type local_file_ident, Changes l = std::unique_lock{trace_mutex}; } #endif - Transformer transformer{trace}; + ::TransformerImpl transformer{trace}; _impl::ChangesetIndex their_index; size_t their_num_instructions = 0; @@ -2559,7 +2526,7 @@ void TransformerImpl::merge_changesets(file_ident_type local_file_ident, Changes size_t TransformerImpl::transform_remote_changesets(TransformHistory& history, file_ident_type local_file_ident, version_type current_local_version, util::Span parsed_changesets, - util::UniqueFunction changeset_applier, + util::FunctionRef changeset_applier, util::Logger& logger) { REALM_ASSERT(local_file_ident != 0); @@ -2685,16 +2652,16 @@ void TransformerImpl::flush_reciprocal_transform_cache(TransformHistory& history } } -} // namespace _impl +} // namespace realm::_impl -namespace sync { +namespace realm::sync { std::unique_ptr make_transformer() { return std::make_unique<_impl::TransformerImpl>(); // Throws } -void parse_remote_changeset(const Transformer::RemoteChangeset& remote_changeset, Changeset& parsed_changeset) +void parse_remote_changeset(const RemoteChangeset& remote_changeset, Changeset& parsed_changeset) { // origin_file_ident = 0 is currently used to indicate an entry of local // origin. @@ -2711,5 +2678,4 @@ void parse_remote_changeset(const Transformer::RemoteChangeset& remote_changeset parsed_changeset.original_changeset_size = remote_changeset.original_changeset_size; } -} // namespace sync -} // namespace realm +} // namespace realm::sync diff --git a/src/realm/sync/transform.hpp b/src/realm/sync/transform.hpp index 3465785e9e8..22a15e5ecc2 100644 --- a/src/realm/sync/transform.hpp +++ b/src/realm/sync/transform.hpp @@ -2,13 +2,6 @@ #ifndef REALM_SYNC_TRANSFORM_HPP #define REALM_SYNC_TRANSFORM_HPP -#include - -#include -#include -#include -#include -#include #include #include #include @@ -125,8 +118,6 @@ class TransformError; // Exception class Transformer { public: - class RemoteChangeset; - using iterator = util::Span::iterator; /// Produce operationally transformed versions of the specified changesets, @@ -178,7 +169,7 @@ class Transformer { /// TransformError. virtual size_t transform_remote_changesets(TransformHistory&, file_ident_type local_file_ident, version_type current_local_version, util::Span changesets, - util::UniqueFunction changeset_applier, + util::FunctionRef changeset_applier, util::Logger&) = 0; virtual ~Transformer() noexcept {} @@ -188,7 +179,6 @@ std::unique_ptr make_transformer(); } // namespace sync - namespace _impl { class TransformerImpl : public sync::Transformer { @@ -203,11 +193,7 @@ class TransformerImpl : public sync::Transformer { TransformerImpl() = default; size_t transform_remote_changesets(TransformHistory&, file_ident_type, version_type, util::Span, - util::UniqueFunction, util::Logger&) override; - - struct Side; - struct MajorSide; - struct MinorSide; + util::FunctionRef, util::Logger&) override; protected: virtual void merge_changesets(file_ident_type local_file_ident, Changeset* their_changesets, @@ -222,13 +208,6 @@ class TransformerImpl : public sync::Transformer { Changeset& get_reciprocal_transform(TransformHistory&, file_ident_type local_file_ident, version_type version, const HistoryEntry&); void flush_reciprocal_transform_cache(TransformHistory&); - - struct Discriminant; - struct Transformer; - struct MergeTracer; - - template - void merge_instructions(LeftSide&, RightSide&); }; } // namespace _impl @@ -236,7 +215,7 @@ class TransformerImpl : public sync::Transformer { namespace sync { -class Transformer::RemoteChangeset { +class RemoteChangeset { public: /// The version produced on the remote peer by this changeset. /// @@ -281,7 +260,7 @@ class Transformer::RemoteChangeset { }; -void parse_remote_changeset(const Transformer::RemoteChangeset&, Changeset&); +void parse_remote_changeset(const RemoteChangeset&, Changeset&); // Implementation @@ -294,8 +273,8 @@ class TransformError : public std::runtime_error { } }; -inline Transformer::RemoteChangeset::RemoteChangeset(version_type rv, version_type lv, ChunkedBinaryData d, - timestamp_type ot, file_ident_type fi) +inline RemoteChangeset::RemoteChangeset(version_type rv, version_type lv, ChunkedBinaryData d, timestamp_type ot, + file_ident_type fi) : remote_version(rv) , last_integrated_local_version(lv) , data(d) diff --git a/test/peer.hpp b/test/peer.hpp index 63b7f573704..a7ac653da2d 100644 --- a/test/peer.hpp +++ b/test/peer.hpp @@ -46,6 +46,7 @@ namespace realm { namespace test_util { using realm::sync::HistoryEntry; +using realm::sync::RemoteChangeset; using realm::sync::SyncReplication; using realm::sync::Transformer; using realm::sync::TransformHistory; @@ -71,16 +72,15 @@ class ShortCircuitHistory : public SyncReplication { } version_type integrate_remote_changeset(file_ident_type remote_file_ident, DB& sg, - const Transformer::RemoteChangeset& changeset, - util::Logger& replay_logger) + const RemoteChangeset& changeset, util::Logger& replay_logger) { std::size_t num_changesets = 1; return integrate_remote_changesets(remote_file_ident, sg, &changeset, num_changesets, replay_logger); } version_type integrate_remote_changesets(file_ident_type remote_file_ident, DB&, - const Transformer::RemoteChangeset* incoming_changesets, - std::size_t num_changesets, util::Logger& replay_logger); + const RemoteChangeset* incoming_changesets, std::size_t num_changesets, + util::Logger& replay_logger); version_type prepare_changeset(const char* data, std::size_t size, version_type orig_version) override { @@ -462,7 +462,7 @@ class ShortCircuitHistory::TransformerImpl : public _impl::TransformerImpl { inline auto ShortCircuitHistory::integrate_remote_changesets(file_ident_type remote_file_ident, DB& sg, - const Transformer::RemoteChangeset* incoming_changesets, + const RemoteChangeset* incoming_changesets, size_t num_changesets, util::Logger& logger) -> version_type { @@ -625,7 +625,7 @@ class Peer { version_type& last_remote_version = last_remote_versions_integrated[remote.local_file_ident]; if (last_remote_version == 0) last_remote_version = 1; - std::unique_ptr changesets{new Transformer::RemoteChangeset[num_changesets]}; + std::unique_ptr changesets{new RemoteChangeset[num_changesets]}; remote.get_next_changesets_for_remote(local_file_ident, last_remote_version, changesets.get(), num_changesets); /* @@ -670,7 +670,7 @@ class Peer { void get_next_changesets_for_remote(file_ident_type remote_file_ident, version_type last_version_integrated_by_remote, - Transformer::RemoteChangeset* out_changesets, size_t num_changesets) const + RemoteChangeset* out_changesets, size_t num_changesets) const { // At least one transaction can be assumed to have been performed REALM_ASSERT(current_version != 0); diff --git a/test/test_sync.cpp b/test/test_sync.cpp index bd62d10aa5d..6d7e254bafc 100644 --- a/test/test_sync.cpp +++ b/test/test_sync.cpp @@ -6696,7 +6696,7 @@ TEST(Sync_NonIncreasingServerVersions) ++server_changesets.back().version; std::vector encoded; - std::vector server_changesets_encoded; + std::vector server_changesets_encoded; for (const auto& changeset : server_changesets) { encoded.emplace_back(); encode_changeset(changeset, encoded.back()); @@ -6734,7 +6734,7 @@ TEST(Sync_InvalidChangesetFromServer) ChangesetEncoder::Buffer encoded; encode_changeset(changeset, encoded); - Transformer::RemoteChangeset server_changeset; + RemoteChangeset server_changeset; server_changeset.origin_file_ident = 1; server_changeset.remote_version = 1; server_changeset.data = BinaryData(encoded.data(), encoded.size()); @@ -6845,7 +6845,7 @@ TEST(Sync_SetAndGetEmptyReciprocalChangeset) changeset.origin_file_ident = 2; ChangesetEncoder::Buffer encoded; - std::vector server_changesets_encoded; + std::vector server_changesets_encoded; encode_changeset(changeset, encoded); server_changesets_encoded.emplace_back(changeset.version, changeset.last_integrated_remote_version, BinaryData(encoded.data(), encoded.size()), changeset.origin_timestamp, @@ -6988,7 +6988,7 @@ TEST(Sync_ServerVersionsSkippedFromDownloadCursor) server_changeset.origin_file_ident = 1; std::vector encoded; - std::vector server_changesets_encoded; + std::vector server_changesets_encoded; encoded.emplace_back(); encode_changeset(server_changeset, encoded.back()); server_changesets_encoded.emplace_back(server_changeset.version, server_changeset.last_integrated_remote_version, diff --git a/test/test_sync_pending_bootstraps.cpp b/test/test_sync_pending_bootstraps.cpp index c2ebb2f6260..58c3d4417e3 100644 --- a/test/test_sync_pending_bootstraps.cpp +++ b/test/test_sync_pending_bootstraps.cpp @@ -19,7 +19,7 @@ TEST(Sync_PendingBootstrapStoreBatching) sync::PendingBootstrapStore store(db, *test_context.logger); CHECK(!store.has_pending()); - std::vector changesets; + std::vector changesets; std::vector changeset_data; changeset_data.emplace_back(1024, 'a'); @@ -117,7 +117,7 @@ TEST(Sync_PendingBootstrapStoreClear) sync::PendingBootstrapStore store(db, *test_context.logger); CHECK(!store.has_pending()); - std::vector changesets; + std::vector changesets; std::vector changeset_data; changeset_data.emplace_back(1024, 'a'); From 6c66a263db88e449300b97df33664960ca3ad4b5 Mon Sep 17 00:00:00 2001 From: Thomas Goyne Date: Fri, 13 Oct 2023 15:41:50 -0700 Subject: [PATCH 2/3] Simplify sync::Transformer and eliminate the abstract interface Transformer objects used to have some local state which had to be stored between uses and had the implementation hidden in the cpp file. Neither of those are true any more, so the design doesn't make sense either. It can now just be a stack-allocated local created when needed rather than something which has to be stored and reused. --- CHANGELOG.md | 1 + src/realm/sync/noinst/client_history_impl.cpp | 11 +- src/realm/sync/noinst/client_history_impl.hpp | 12 -- src/realm/sync/noinst/server/server.cpp | 34 ---- .../sync/noinst/server/server_history.cpp | 17 +- .../sync/noinst/server/server_history.hpp | 10 -- src/realm/sync/transform.cpp | 163 ++++++++---------- src/realm/sync/transform.hpp | 58 ++----- test/peer.hpp | 85 ++++----- test/test_sync.cpp | 17 +- 10 files changed, 134 insertions(+), 274 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b056b319aeb..3d3b8da4df6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ * Add support for building against the musl library. ([PR #7067](https://github.com/realm/realm-core/pull/7067)) * Remove ArrayWithFind's ability to use a templated callback parameter. The QueryStateBase consumers now use an index and the array leaf to get the actual value if needed. This allows certain queries such as count() to not do as many lookups to the actual values and results in a small performance gain. Also remove `find_action_pattern()` which was unused for a long time. This reduction in templating throughout the query system produces a small (~100k) binary size reduction. ([#7095](https://github.com/realm/realm-core/pull/7095)) * Rework the implemenatation of the set algrebra functions on Set to reduce the compiled size. +* Rework the internal interface for sync Transformers to simplify it and reduce the compiled size ([PR #7098](https://github.com/realm/realm-core/pull/7098)). ---------------------------------------------- diff --git a/src/realm/sync/noinst/client_history_impl.cpp b/src/realm/sync/noinst/client_history_impl.cpp index d295eb8e6ee..324812faf1e 100644 --- a/src/realm/sync/noinst/client_history_impl.cpp +++ b/src/realm/sync/noinst/client_history_impl.cpp @@ -540,7 +540,6 @@ size_t ClientHistory::transform_and_apply_server_changesets(util::Span bool { @@ -551,12 +550,12 @@ size_t ClientHistory::transform_and_apply_server_changesets(util::Spanoriginal_changeset_size; - return !(m_db->other_writers_waiting_for_lock() && - transact->get_commit_size() >= commit_byte_size_limit && allow_lock_release); + return !(allow_lock_release && m_db->other_writers_waiting_for_lock() && + transact->get_commit_size() >= commit_byte_size_limit); }; - auto changesets_transformed_count = - transformer.transform_remote_changesets(*this, sync_file_id, local_version, changesets_to_integrate, - std::move(changeset_applier), logger); // Throws + sync::Transformer transformer; + auto changesets_transformed_count = transformer.transform_remote_changesets( + *this, sync_file_id, local_version, changesets_to_integrate, changeset_applier, logger); // Throws return changesets_transformed_count; } catch (const BadChangesetError& e) { diff --git a/src/realm/sync/noinst/client_history_impl.hpp b/src/realm/sync/noinst/client_history_impl.hpp index 4223b7de5bc..afd19486bf9 100644 --- a/src/realm/sync/noinst/client_history_impl.hpp +++ b/src/realm/sync/noinst/client_history_impl.hpp @@ -272,10 +272,6 @@ class ClientHistory final : public _impl::History, public TransformHistory { ClientReplication& m_replication; DB* m_db = nullptr; - // FIXME: All history objects belonging to a particular client object - // (sync::Client) should use a single shared transformer object. - std::unique_ptr m_transformer; - /// The version on which the first changeset in the continuous transactions /// history is based, or if that history is empty, the version associated /// with currently bound snapshot. In general, `m_ct_history_base_version + @@ -412,7 +408,6 @@ class ClientHistory final : public _impl::History, public TransformHistory { void trim_sync_history(); void do_trim_sync_history(std::size_t n); void clamp_sync_version_range(version_type& begin, version_type& end) const noexcept; - Transformer& get_transformer(); void fix_up_client_file_ident_in_stored_changesets(Transaction&, file_ident_type); void record_current_schema_version(); static void record_current_schema_version(Array& schema_versions, version_type snapshot_version); @@ -535,13 +530,6 @@ inline void ClientHistory::clamp_sync_version_range(version_type& begin, version } } -inline auto ClientHistory::get_transformer() -> Transformer& -{ - if (!m_transformer) - m_transformer = make_transformer(); // Throws - return *m_transformer; -} - /// \brief Create a "sync history" implementation of the realm::Replication /// interface. diff --git a/src/realm/sync/noinst/server/server.cpp b/src/realm/sync/noinst/server/server.cpp index f6a03ae3d28..e55dfbcd126 100644 --- a/src/realm/sync/noinst/server/server.cpp +++ b/src/realm/sync/noinst/server/server.cpp @@ -714,14 +714,10 @@ class Worker : public ServerHistory::Context { // Overriding members of ServerHistory::Context std::mt19937_64& server_history_get_random() noexcept override final; - sync::Transformer& get_transformer() override final; - util::Buffer& get_transform_buffer() override final; private: ServerImpl& m_server; std::mt19937_64 m_random; - const std::unique_ptr m_transformer; - util::Buffer m_transform_buffer; ServerFileAccessCache m_file_access_cache; util::Mutex m_mutex; @@ -976,8 +972,6 @@ class ServerImpl : public ServerImplBase, public ServerHistory::Context { // Overriding member functions in _impl::ServerHistory::Context std::mt19937_64& server_history_get_random() noexcept override final; - Transformer& get_transformer() noexcept override final; - util::Buffer& get_transform_buffer() noexcept override final; private: Server::Config m_config; @@ -1014,8 +1008,6 @@ class ServerImpl : public ServerImplBase, public ServerHistory::Context { ServerProtocol m_server_protocol; compression::CompressMemoryArena m_compress_memory_arena; MiscBuffers m_misc_buffers; - std::unique_ptr m_transformer; - util::Buffer m_transform_buffer; int_fast64_t m_current_server_session_ident; Optional m_connection_reaper_timer; bool m_allow_load_balancing = false; @@ -3756,7 +3748,6 @@ void ServerFile::finalize_work_stage_2() Worker::Worker(ServerImpl& server) : logger{"Worker: ", server.logger_ptr} // Throws , m_server{server} - , m_transformer{make_transformer()} // Throws , m_file_access_cache{server.get_config().max_open_files, logger, *this, server.get_config().encryption_key} { util::seed_prng_nondeterministically(m_random); // Throws @@ -3777,17 +3768,6 @@ std::mt19937_64& Worker::server_history_get_random() noexcept } -sync::Transformer& Worker::get_transformer() -{ - return *m_transformer; -} - - -util::Buffer& Worker::get_transform_buffer() -{ - return m_transform_buffer; -} - void Worker::run() { for (;;) { @@ -3913,8 +3893,6 @@ void ServerImpl::start() logger.info("Connection soft close timeout: %1 ms", m_config.soft_close_timeout); // Throws logger.debug("Authorization header name: %1", m_config.authorization_header_name); // Throws - m_transformer = make_transformer(); // Throws - m_realm_names = _impl::find_realm_files(m_root_dir); // Throws initiate_connection_reaper_timer(m_config.connection_reaper_interval); // Throws @@ -3985,18 +3963,6 @@ std::mt19937_64& ServerImpl::server_history_get_random() noexcept } -Transformer& ServerImpl::get_transformer() noexcept -{ - return *m_transformer; -} - - -util::Buffer& ServerImpl::get_transform_buffer() noexcept -{ - return m_transform_buffer; -} - - void ServerImpl::listen() { network::Resolver resolver{get_service()}; diff --git a/src/realm/sync/noinst/server/server_history.cpp b/src/realm/sync/noinst/server/server_history.cpp index f19aea63198..4f2953d5ff1 100644 --- a/src/realm/sync/noinst/server/server_history.cpp +++ b/src/realm/sync/noinst/server/server_history.cpp @@ -1200,10 +1200,9 @@ bool ServerHistory::integrate_remote_changesets(file_ident_type remote_file_iden // Merge with causally unrelated changesets, and resolve the // conflicts if there are any. TransformHistoryImpl transform_hist{remote_file_ident, *this, recip_hist}; - Transformer& transformer = m_context.get_transformer(); // Throws + Transformer transformer; transformer.transform_remote_changesets(transform_hist, m_local_file_ident, current_server_version, - parsed_transformed_changesets, std::move(apply), - logger); // Throws + parsed_transformed_changesets, apply, logger); // Throws for (std::size_t i = 0; i < num_changesets; ++i) { REALM_ASSERT(get_instruction_encoder().buffer().size() == 0); @@ -2247,18 +2246,6 @@ void ServerHistory::record_current_schema_version(Array& schema_versions, versio } -Transformer& ServerHistory::Context::get_transformer() -{ - throw util::runtime_error("Not supported"); -} - - -util::Buffer& ServerHistory::Context::get_transform_buffer() -{ - throw util::runtime_error("Not supported"); -} - - std::ostream& _impl::operator<<(std::ostream& out, const ServerHistory::HistoryContents& hc) { out << "client files:\n"; diff --git a/src/realm/sync/noinst/server/server_history.hpp b/src/realm/sync/noinst/server/server_history.hpp index 23c7e0ef991..c2a622db4cb 100644 --- a/src/realm/sync/noinst/server/server_history.hpp +++ b/src/realm/sync/noinst/server/server_history.hpp @@ -742,16 +742,6 @@ class ServerHistory::Context { public: virtual std::mt19937_64& server_history_get_random() noexcept = 0; - // @{ - /// These are guaranteed to not be called until a remote changeset needs to - /// be integrated into the history. - /// - /// The default implementations throw std::runtime_error, with a message - /// saying "Not supported". - virtual sync::Transformer& get_transformer(); - virtual util::Buffer& get_transform_buffer(); - // @} - protected: Context() noexcept = default; }; diff --git a/src/realm/sync/transform.cpp b/src/realm/sync/transform.cpp index 3bbae0d3199..f8b9ddc2732 100644 --- a/src/realm/sync/transform.cpp +++ b/src/realm/sync/transform.cpp @@ -887,7 +887,6 @@ template struct MergeNested; struct MergeUtils { - using TransformerImpl = _impl::TransformerImpl; MergeUtils(Side& left_side, Side& right_side) : m_left_side(left_side) , m_right_side(right_side) @@ -2388,13 +2387,12 @@ void TransformerImpl::merge_instructions(MajorSide& their_side, MinorSide& our_s } // anonymous namespace -namespace realm::_impl { -void TransformerImpl::merge_changesets(file_ident_type local_file_ident, Changeset* their_changesets, - size_t their_size, Changeset** our_changesets, size_t our_size, - util::Logger& logger) +namespace realm::sync { +void Transformer::merge_changesets(file_ident_type local_file_ident, util::Span their_changesets, + util::Span our_changesets, util::Logger& logger) { - REALM_ASSERT(their_size != 0); - REALM_ASSERT(our_size != 0); + REALM_ASSERT(our_changesets.size() != 0); + REALM_ASSERT(their_changesets.size() != 0); bool trace = false; #if REALM_DEBUG && !REALM_UWP // FIXME: Not thread-safe (use config parameter instead and confine environment reading to test/test_all.cpp) @@ -2406,7 +2404,7 @@ void TransformerImpl::merge_changesets(file_ident_type local_file_ident, Changes l = std::unique_lock{trace_mutex}; } #endif - ::TransformerImpl transformer{trace}; + TransformerImpl transformer{trace}; _impl::ChangesetIndex their_index; size_t their_num_instructions = 0; @@ -2417,32 +2415,34 @@ void TransformerImpl::merge_changesets(file_ident_type local_file_ident, Changes // on the left side, but which aren't connected on the right side. // FIXME: The conflict groups can be persisted as part of the changeset to // skip this step in the future. - for (size_t i = 0; i < their_size; ++i) { + for (size_t i = 0; i < their_changesets.size(); ++i) { size_t num_instructions = their_changesets[i].size(); their_num_instructions += num_instructions; - logger.trace("Scanning incoming changeset [%1/%2] (%3 instructions)", i + 1, their_size, num_instructions); + logger.trace("Scanning incoming changeset [%1/%2] (%3 instructions)", i + 1, their_changesets.size(), + num_instructions); their_index.scan_changeset(their_changesets[i]); } - for (size_t i = 0; i < our_size; ++i) { + for (size_t i = 0; i < our_changesets.size(); ++i) { Changeset& our_changeset = *our_changesets[i]; size_t num_instructions = our_changeset.size(); our_num_instructions += num_instructions; - logger.trace("Scanning local changeset [%1/%2] (%3 instructions)", i + 1, our_size, num_instructions); + logger.trace("Scanning local changeset [%1/%2] (%3 instructions)", i + 1, our_changesets.size(), + num_instructions); their_index.scan_changeset(our_changeset); } // Build the index. - for (size_t i = 0; i < their_size; ++i) { - logger.trace("Indexing incoming changeset [%1/%2] (%3 instructions)", i + 1, their_size, + for (size_t i = 0; i < their_changesets.size(); ++i) { + logger.trace("Indexing incoming changeset [%1/%2] (%3 instructions)", i + 1, their_changesets.size(), their_changesets[i].size()); their_index.add_changeset(their_changesets[i]); } logger.debug("Finished changeset indexing (incoming: %1 changeset(s) / %2 instructions, local: %3 " "changeset(s) / %4 instructions, conflict group(s): %5)", - their_size, their_num_instructions, our_size, our_num_instructions, + their_changesets.size(), their_num_instructions, our_changesets.size(), our_num_instructions, their_index.get_num_conflict_groups()); #if REALM_DEBUG // LCOV_EXCL_START @@ -2450,24 +2450,24 @@ void TransformerImpl::merge_changesets(file_ident_type local_file_ident, Changes std::cerr << TERM_YELLOW << "\n=> PEER " << std::hex << local_file_ident << " merging " "changeset(s)/from peer(s):\n"; - for (size_t i = 0; i < their_size; ++i) { + for (size_t i = 0; i < their_changesets.size(); ++i) { std::cerr << "Changeset version " << std::dec << their_changesets[i].version << " from peer " << their_changesets[i].origin_file_ident << " at timestamp " << their_changesets[i].origin_timestamp << "\n"; } std::cerr << "Transforming through local changeset(s):\n"; - for (size_t i = 0; i < our_size; ++i) { + for (size_t i = 0; i < our_changesets.size(); ++i) { std::cerr << "Changeset version " << our_changesets[i]->version << " from peer " << our_changesets[i]->origin_file_ident << " at timestamp " << our_changesets[i]->origin_timestamp << "\n"; } - for (size_t i = 0; i < our_size; ++i) { + for (size_t i = 0; i < our_changesets.size(); ++i) { std::cerr << TERM_RED << "\nLOCAL (RECIPROCAL) CHANGESET BEFORE MERGE:\n" << TERM_RESET; our_changesets[i]->print(std::cerr); } - for (size_t i = 0; i < their_size; ++i) { + for (size_t i = 0; i < their_changesets.size(); ++i) { std::cerr << TERM_RED << "\nINCOMING CHANGESET BEFORE MERGE:\n" << TERM_RESET; their_changesets[i].print(std::cerr); } @@ -2485,10 +2485,10 @@ void TransformerImpl::merge_changesets(file_ident_type local_file_ident, Changes static_cast(local_file_ident); #endif // REALM_DEBUG LCOV_EXCL_STOP - for (size_t i = 0; i < our_size; ++i) { + for (size_t i = 0; i < our_changesets.size(); ++i) { logger.trace( "Transforming local changeset [%1/%2] through %3 incoming changeset(s) with %4 conflict group(s)", i + 1, - our_size, their_size, their_index.get_num_conflict_groups()); + our_changesets.size(), their_changesets.size(), their_index.get_num_conflict_groups()); Changeset* our_changeset = our_changesets[i]; transformer.m_major_side.set_next_changeset(our_changeset); @@ -2499,7 +2499,7 @@ void TransformerImpl::merge_changesets(file_ident_type local_file_ident, Changes logger.debug("Finished transforming %1 local changesets through %2 incoming changesets (%3 vs %4 " "instructions, in %5 conflict groups)", - our_size, their_size, our_num_instructions, their_num_instructions, + our_changesets.size(), their_changesets.size(), our_num_instructions, their_num_instructions, their_index.get_num_conflict_groups()); #if REALM_DEBUG // LCOV_EXCL_START @@ -2509,12 +2509,12 @@ void TransformerImpl::merge_changesets(file_ident_type local_file_ident, Changes #if REALM_DEBUG // LCOV_EXCL_START if (trace) { - for (size_t i = 0; i < our_size; ++i) { + for (size_t i = 0; i < our_changesets.size(); ++i) { std::cerr << TERM_CYAN << "\nRECIPROCAL CHANGESET AFTER MERGE:\n" << TERM_RESET; our_changesets[i]->print(std::cerr); std::cerr << '\n'; } - for (size_t i = 0; i < their_size; ++i) { + for (size_t i = 0; i < their_changesets.size(); ++i) { std::cerr << TERM_CYAN << "INCOMING CHANGESET AFTER MERGE:\n" << TERM_RESET; their_changesets[i].print(std::cerr); std::cerr << '\n'; @@ -2523,11 +2523,11 @@ void TransformerImpl::merge_changesets(file_ident_type local_file_ident, Changes #endif // LCOV_EXCL_STOP REALM_DEBUG } -size_t TransformerImpl::transform_remote_changesets(TransformHistory& history, file_ident_type local_file_ident, - version_type current_local_version, - util::Span parsed_changesets, - util::FunctionRef changeset_applier, - util::Logger& logger) +size_t Transformer::transform_remote_changesets(TransformHistory& history, file_ident_type local_file_ident, + version_type current_local_version, + util::Span parsed_changesets, + util::FunctionRef changeset_applier, + util::Logger& logger) { REALM_ASSERT(local_file_ident != 0); @@ -2538,65 +2538,51 @@ size_t TransformerImpl::transform_remote_changesets(TransformHistory& history, f auto p = parsed_changesets.begin(); auto parsed_changesets_end = parsed_changesets.end(); - try { - while (p != parsed_changesets_end) { - // Find the range of incoming changesets that share the same - // last_integrated_local_version, which means we can merge them in one go. - auto same_base_range_end = std::find_if(p + 1, parsed_changesets_end, [&](auto& changeset) { - return p->last_integrated_remote_version != changeset.last_integrated_remote_version; - }); + while (p != parsed_changesets_end) { + // Find the range of incoming changesets that share the same + // last_integrated_local_version, which means we can merge them in one go. + auto same_base_range_end = std::find_if(p + 1, parsed_changesets_end, [&](auto& changeset) { + return p->last_integrated_remote_version != changeset.last_integrated_remote_version; + }); - version_type begin_version = p->last_integrated_remote_version; - version_type end_version = current_local_version; - for (;;) { - HistoryEntry history_entry; - version_type version = history.find_history_entry(begin_version, end_version, history_entry); - if (version == 0) - break; // No more local changesets + version_type begin_version = p->last_integrated_remote_version; + version_type end_version = current_local_version; + for (;;) { + HistoryEntry history_entry; + version_type version = history.find_history_entry(begin_version, end_version, history_entry); + if (version == 0) + break; // No more local changesets - Changeset& our_changeset = get_reciprocal_transform(history, local_file_ident, version, - history_entry); // Throws - our_changesets.push_back(&our_changeset); + Changeset& our_changeset = get_reciprocal_transform(history, local_file_ident, version, + history_entry); // Throws + our_changesets.push_back(&our_changeset); - begin_version = version; - } + begin_version = version; + } - bool must_apply_all = false; + bool must_apply_all = false; - if (!our_changesets.empty()) { - merge_changesets(local_file_ident, &*p, same_base_range_end - p, our_changesets.data(), - our_changesets.size(), logger); // Throws - // We need to apply all transformed changesets if at least one reciprocal changeset was modified - // during OT. - must_apply_all = std::any_of(our_changesets.begin(), our_changesets.end(), [](const Changeset* c) { - return c->is_dirty(); - }); - } - - auto continue_applying = true; - for (; p != same_base_range_end && continue_applying; ++p) { - // It is safe to stop applying the changesets if: - // 1. There are no reciprocal changesets - // 2. No reciprocal changeset was modified - continue_applying = changeset_applier(p) || must_apply_all; - } - if (!continue_applying) { - break; - } + if (!our_changesets.empty()) { + merge_changesets(local_file_ident, {&*p, same_base_range_end}, our_changesets, logger); // Throws + // We need to apply all transformed changesets if at least one reciprocal changeset was modified + // during OT. + must_apply_all = std::any_of(our_changesets.begin(), our_changesets.end(), [](const Changeset* c) { + return c->is_dirty(); + }); + } - our_changesets.clear(); // deliberately not releasing memory + auto continue_applying = true; + for (; p != same_base_range_end && continue_applying; ++p) { + // It is safe to stop applying the changesets if: + // 1. There are no reciprocal changesets + // 2. No reciprocal changeset was modified + continue_applying = changeset_applier(p) || must_apply_all; } - } - catch (...) { - // If an exception was thrown while merging, the transform cache will - // be polluted. This is a problem since the same cache object is reused - // by multiple invocations to transform_remote_changesets(), so we must - // clear the cache before rethrowing. - // - // Note that some valid changesets can still cause exceptions to be - // thrown by the merge algorithm, namely incompatible schema changes. - m_reciprocal_transform_cache.clear(); - throw; + if (!continue_applying) { + break; + } + + our_changesets.clear(); // deliberately not releasing memory } // NOTE: Any exception thrown during flushing *MUST* lead to rollback of @@ -2607,8 +2593,8 @@ size_t TransformerImpl::transform_remote_changesets(TransformHistory& history, f } -Changeset& TransformerImpl::get_reciprocal_transform(TransformHistory& history, file_ident_type local_file_ident, - version_type version, const HistoryEntry& history_entry) +Changeset& Transformer::get_reciprocal_transform(TransformHistory& history, file_ident_type local_file_ident, + version_type version, const HistoryEntry& history_entry) { auto& changeset = m_reciprocal_transform_cache[version]; // Throws if (changeset.empty()) { @@ -2637,7 +2623,7 @@ Changeset& TransformerImpl::get_reciprocal_transform(TransformHistory& history, } -void TransformerImpl::flush_reciprocal_transform_cache(TransformHistory& history) +void Transformer::flush_reciprocal_transform_cache(TransformHistory& history) { auto changesets = std::move(m_reciprocal_transform_cache); m_reciprocal_transform_cache.clear(); @@ -2652,15 +2638,6 @@ void TransformerImpl::flush_reciprocal_transform_cache(TransformHistory& history } } -} // namespace realm::_impl - -namespace realm::sync { -std::unique_ptr make_transformer() -{ - return std::make_unique<_impl::TransformerImpl>(); // Throws -} - - void parse_remote_changeset(const RemoteChangeset& remote_changeset, Changeset& parsed_changeset) { // origin_file_ident = 0 is currently used to indicate an entry of local diff --git a/src/realm/sync/transform.hpp b/src/realm/sync/transform.hpp index 22a15e5ecc2..562811cb340 100644 --- a/src/realm/sync/transform.hpp +++ b/src/realm/sync/transform.hpp @@ -118,6 +118,12 @@ class TransformError; // Exception class Transformer { public: + using Changeset = sync::Changeset; + using file_ident_type = sync::file_ident_type; + using HistoryEntry = sync::HistoryEntry; + using Instruction = sync::Instruction; + using TransformHistory = sync::TransformHistory; + using version_type = sync::version_type; using iterator = util::Span::iterator; /// Produce operationally transformed versions of the specified changesets, @@ -165,42 +171,11 @@ class Transformer { /// \throw TransformError Thrown if operational transformation fails due to /// a problem with the specified changeset. /// - /// FIXME: Consider using std::error_code instead of throwing + /// FIXME: Consider using Status instead of throwing /// TransformError. - virtual size_t transform_remote_changesets(TransformHistory&, file_ident_type local_file_ident, - version_type current_local_version, util::Span changesets, - util::FunctionRef changeset_applier, - util::Logger&) = 0; - - virtual ~Transformer() noexcept {} -}; - -std::unique_ptr make_transformer(); - -} // namespace sync - -namespace _impl { - -class TransformerImpl : public sync::Transformer { -public: - using Changeset = sync::Changeset; - using file_ident_type = sync::file_ident_type; - using HistoryEntry = sync::HistoryEntry; - using Instruction = sync::Instruction; - using TransformHistory = sync::TransformHistory; - using version_type = sync::version_type; - - TransformerImpl() = default; - - size_t transform_remote_changesets(TransformHistory&, file_ident_type, version_type, util::Span, - util::FunctionRef, util::Logger&) override; - -protected: - virtual void merge_changesets(file_ident_type local_file_ident, Changeset* their_changesets, - std::size_t their_size, - // our_changesets is a pointer-pointer because these changesets - // are held by the reciprocal transform cache. - Changeset** our_changesets, std::size_t our_size, util::Logger& logger); + size_t transform_remote_changesets(TransformHistory&, file_ident_type local_file_ident, version_type, + util::Span, + util::FunctionRef changeset_applier, util::Logger&); private: std::map m_reciprocal_transform_cache; @@ -208,12 +183,14 @@ class TransformerImpl : public sync::Transformer { Changeset& get_reciprocal_transform(TransformHistory&, file_ident_type local_file_ident, version_type version, const HistoryEntry&); void flush_reciprocal_transform_cache(TransformHistory&); -}; - -} // namespace _impl - -namespace sync { +protected: + // A hook which is overriden in tests to dump the changesets + virtual void merge_changesets(file_ident_type local_file_ident, util::Span their_changesets, + // our_changesets is a pointer because these changesets are held by the reciprocal + // transform cache which is non-contiguous storage. + util::Span our_changesets, util::Logger& logger); +}; class RemoteChangeset { public: @@ -259,7 +236,6 @@ class RemoteChangeset { RemoteChangeset(version_type rv, version_type lv, ChunkedBinaryData d, timestamp_type ot, file_ident_type fi); }; - void parse_remote_changeset(const RemoteChangeset&, Changeset&); diff --git a/test/peer.hpp b/test/peer.hpp index a7ac653da2d..51a8249c8a8 100644 --- a/test/peer.hpp +++ b/test/peer.hpp @@ -48,7 +48,6 @@ namespace test_util { using realm::sync::HistoryEntry; using realm::sync::RemoteChangeset; using realm::sync::SyncReplication; -using realm::sync::Transformer; using realm::sync::TransformHistory; @@ -357,96 +356,87 @@ class ShortCircuitHistory::TransformHistoryImpl : public TransformHistory { }; -class ShortCircuitHistory::TransformerImpl : public _impl::TransformerImpl { +class ShortCircuitHistory::TransformerImpl final : public sync::Transformer { public: TransformerImpl(TestDirNameGenerator* changeset_dump_dir_gen) - : _impl::TransformerImpl() - , m_changeset_dump_dir_gen{changeset_dump_dir_gen} + : m_changeset_dump_dir_gen{changeset_dump_dir_gen} { } protected: - void merge_changesets(file_ident_type local_file_ident, Changeset* their_changesets, std::size_t their_size, - Changeset** our_changesets, std::size_t our_size, util::Logger& logger) final + void merge_changesets(file_ident_type local_file_ident, util::Span their_changesets, + util::Span our_changesets, util::Logger& logger) final { std::string directory; if (m_changeset_dump_dir_gen) { directory = m_changeset_dump_dir_gen->next(); util::try_make_dir(directory); - encode_changesets(our_changesets, our_size, logger); - write_changesets_to_file(directory + "/ours_original", our_size, logger); + encode_changesets(our_changesets, logger); + write_changesets_to_file(directory + "/ours_original", logger); - encode_changesets(their_changesets, their_size, logger); - write_changesets_to_file(directory + "/theirs_original", their_size, logger); + encode_changesets(their_changesets, logger); + write_changesets_to_file(directory + "/theirs_original", logger); } - _impl::TransformerImpl::merge_changesets(local_file_ident, their_changesets, their_size, our_changesets, - our_size, logger); + sync::Transformer::merge_changesets(local_file_ident, their_changesets, our_changesets, logger); if (m_changeset_dump_dir_gen) { - encode_changesets(our_changesets, our_size, logger); - write_changesets_to_file(directory + "/ours_transformed", our_size, logger); + encode_changesets(our_changesets, logger); + write_changesets_to_file(directory + "/ours_transformed", logger); - encode_changesets(their_changesets, their_size, logger); - write_changesets_to_file(directory + "/theirs_transformed", their_size, logger); + encode_changesets(their_changesets, logger); + write_changesets_to_file(directory + "/theirs_transformed", logger); } } private: using OutputBuffer = util::ResettableExpandableBufferOutputStream; - void encode_changesets(Changeset* changesets, std::size_t num_changesets, util::Logger& logger) + void encode_changesets(util::Span changesets, util::Logger& logger) { sync::ChangesetEncoder::Buffer encode_buffer; - for (size_t i = 0; i < num_changesets; ++i) { - encode_changeset(changesets[i], encode_buffer); // Throws - - HistoryEntry entry; - entry.remote_version = changesets[i].last_integrated_remote_version; - entry.origin_file_ident = changesets[i].origin_file_ident; - entry.origin_timestamp = changesets[i].origin_timestamp; - entry.changeset = BinaryData{encode_buffer.data(), encode_buffer.size()}; - - _impl::ServerProtocol::ChangesetInfo info{changesets[i].version, entry.remote_version, entry, - entry.changeset.size()}; - - m_protocol.insert_single_changeset_download_message(m_history_entries_buffer, info, logger); // Throws - - encode_buffer.clear(); + for (auto& changeset : changesets) { + encode_and_store_changeset(changeset, encode_buffer, logger); } } - void encode_changesets(Changeset** changesets, std::size_t num_changesets, util::Logger& logger) + void encode_changesets(util::Span changesets, util::Logger& logger) { sync::ChangesetEncoder::Buffer encode_buffer; - for (size_t i = 0; i < num_changesets; ++i) { - encode_changeset(*changesets[i], encode_buffer); // Throws + for (auto& changeset : changesets) { + encode_and_store_changeset(*changeset, encode_buffer, logger); + } + } - HistoryEntry entry; - entry.remote_version = changesets[i]->last_integrated_remote_version; - entry.origin_file_ident = changesets[i]->origin_file_ident; - entry.origin_timestamp = changesets[i]->origin_timestamp; - entry.changeset = BinaryData{encode_buffer.data(), encode_buffer.size()}; + void encode_and_store_changeset(const Changeset& changeset, sync::ChangesetEncoder::Buffer& encode_buffer, + util::Logger& logger) + { + encode_changeset(changeset, encode_buffer); // Throws - _impl::ServerProtocol::ChangesetInfo info{changesets[i]->version, entry.remote_version, entry, - entry.changeset.size()}; + HistoryEntry entry; + entry.remote_version = changeset.last_integrated_remote_version; + entry.origin_file_ident = changeset.origin_file_ident; + entry.origin_timestamp = changeset.origin_timestamp; + entry.changeset = BinaryData{encode_buffer.data(), encode_buffer.size()}; - m_protocol.insert_single_changeset_download_message(m_history_entries_buffer, info, logger); // Throws + _impl::ServerProtocol::ChangesetInfo info{changeset.version, entry.remote_version, entry, + entry.changeset.size()}; + m_protocol.insert_single_changeset_download_message(m_history_entries_buffer, info, logger); // Throws - encode_buffer.clear(); - } + encode_buffer.clear(); } - void write_changesets_to_file(const std::string& pathname, std::size_t num_changesets, util::Logger& logger) + void write_changesets_to_file(const std::string& pathname, util::Logger& logger) { m_protocol.make_download_message(sync::get_current_protocol_version(), m_download_message_buffer, file_ident_type(0), version_type(0), version_type(0), version_type(0), 0, - version_type(0), version_type(0), 0, num_changesets, + version_type(0), version_type(0), 0, m_history_entry_count, m_history_entries_buffer.data(), m_history_entries_buffer.size(), 0, false, logger); // Throws m_history_entries_buffer.reset(); + m_history_entry_count = 0; std::ofstream file(pathname, std::ios::binary); file.write(m_download_message_buffer.data(), m_download_message_buffer.size()); @@ -456,6 +446,7 @@ class ShortCircuitHistory::TransformerImpl : public _impl::TransformerImpl { _impl::ServerProtocol m_protocol; TestDirNameGenerator* m_changeset_dump_dir_gen; + size_t m_history_entry_count = 0; OutputBuffer m_history_entries_buffer; OutputBuffer m_download_message_buffer; }; diff --git a/test/test_sync.cpp b/test/test_sync.cpp index 6d7e254bafc..ccaec4b1a13 100644 --- a/test/test_sync.cpp +++ b/test/test_sync.cpp @@ -5320,30 +5320,15 @@ namespace issue2104 { class ServerHistoryContext : public _impl::ServerHistory::Context { public: - ServerHistoryContext() - : m_transformer{make_transformer()} - { - } + ServerHistoryContext() {} std::mt19937_64& server_history_get_random() noexcept override { return m_random; } - sync::Transformer& get_transformer() override - { - return *m_transformer; - } - - util::Buffer& get_transform_buffer() override - { - return m_transform_buffer; - } - private: std::mt19937_64 m_random; - std::unique_ptr m_transformer; - util::Buffer m_transform_buffer; }; } // namespace issue2104 From 003566b6902e1732f8d583384c0bba4ff9b18c2f Mon Sep 17 00:00:00 2001 From: Thomas Goyne Date: Wed, 1 Nov 2023 13:22:14 -0700 Subject: [PATCH 3/3] Delete some obsolete merge helper functions These were required prior to dangling links. --- src/realm/sync/transform.cpp | 62 ------------------------------------ 1 file changed, 62 deletions(-) diff --git a/src/realm/sync/transform.cpp b/src/realm/sync/transform.cpp index f8b9ddc2732..7b0cf417097 100644 --- a/src/realm/sync/transform.cpp +++ b/src/realm/sync/transform.cpp @@ -120,38 +120,6 @@ struct Side { return intern_string(string); } - Instruction::PrimaryKey adopt_key(const Side& other_side, const Instruction::PrimaryKey& other_key) - { - if (auto str = mpark::get_if(&other_key)) { - return adopt_string(other_side, *str); - } - else { - // Non-string keys do not need to be adopted. - return other_key; - } - } - - void adopt_path(Instruction::PathInstruction& instr, const Side& other_side, - const Instruction::PathInstruction& other) - { - instr.table = adopt_string(other_side, other.table); - instr.object = adopt_key(other_side, other.object); - instr.field = adopt_string(other_side, other.field); - instr.path.m_path.clear(); - instr.path.m_path.reserve(other.path.size()); - for (auto& element : other.path.m_path) { - auto push = util::overload{ - [&](uint32_t index) { - instr.path.m_path.push_back(index); - }, - [&](InternString str) { - instr.path.m_path.push_back(adopt_string(other_side, str)); - }, - }; - mpark::visit(push, element); - } - } - protected: void init_with_instruction(const Instruction& instr) noexcept { @@ -1154,36 +1122,6 @@ struct MergeUtils { return false; } - bool value_targets_table(const Instruction::Payload& value, - const Instruction::TableInstruction& right) const noexcept - { - if (value.type == Instruction::Payload::Type::Link) { - StringData target_table = m_left_side.get_string(value.data.link.target_table); - StringData right_table = m_right_side.get_string(right.table); - return target_table == right_table; - } - return false; - } - - bool value_targets_object(const Instruction::Payload& value, - const Instruction::ObjectInstruction& right) const noexcept - { - if (value_targets_table(value, right)) { - return same_key(value.data.link.target, right.object); - } - return false; - } - - bool value_targets_object(const Instruction::Update& left, const Instruction::ObjectInstruction& right) const - { - return value_targets_object(left.value, right); - } - - bool value_targets_object(const Instruction::ArrayInsert& left, const Instruction::ObjectInstruction& right) const - { - return value_targets_object(left.value, right); - } - // When the left side has a shorter path, get the path component on the // right that corresponds to the last component on the left. //