Skip to content

Commit

Permalink
Merge pull request #6530 from realm/tg/notifier-memory
Browse files Browse the repository at this point in the history
Use fewer redundant Transactions for newly registered notifiers
  • Loading branch information
tgoyne authored Apr 27, 2023
2 parents b1f9c74 + 99db86b commit 44e1793
Show file tree
Hide file tree
Showing 37 changed files with 161 additions and 341 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
### Fixed
* <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
* Performing a query like "{1, 2, 3, ...} IN list" where the array is longer than 8 and all elements are smaller than some values in list, the program would crash ([#1183](https://github.com/realm/realm-kotlin/issues/1183), v12.5.0)
* Performing a large number of queries without ever performing a write resulted in steadily increasing memory usage, some of which was never fully freed due to an unbounded cache ([Swift #7978](https://github.com/realm/realm-swift/issues/7978), since v12.0.0)

### Breaking changes
* None.
Expand All @@ -20,6 +21,7 @@
* Clear out SubscriptionStore and cancel pending notifications upon rollback to PBS after client migration to FLX. ([#6389](https://github.com/realm/realm-core/issues/6389))
* Simplify the non-sync replication log by emitting the same instruction type for all three types of collections rather than different instructions per collection type. This has no functional effect but eliminates some duplicated code. ([PR #6513](https://github.com/realm/realm-core/pull/6513))
* Remove TransactionChangeInfo::track_all, which was only ever used by the global notifier. ([PR #6513](https://github.com/realm/realm-core/pull/6513))
* Delete util::InputStream and rename util::NoCopyInputStream to util::InputStream.

----------------------------------------------

Expand Down
2 changes: 1 addition & 1 deletion src/realm/chunked_binary.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class ChunkedBinaryData {
BinaryIterator m_begin;
};

class ChunkedBinaryInputStream : public util::NoCopyInputStream {
class ChunkedBinaryInputStream : public util::InputStream {
public:
explicit ChunkedBinaryInputStream(const ChunkedBinaryData& chunks)
: m_it(chunks.iterator())
Expand Down
2 changes: 1 addition & 1 deletion src/realm/exec/realm_trawler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,7 @@ void RealmFile::changes() const
auto changesets = m_group->get_changesets();

for (auto c : changesets) {
realm::util::SimpleNoCopyInputStream stream(c);
realm::util::SimpleInputStream stream(c);
parser.parse(stream, logger);
std::cout << "--------------------------------------------" << std::endl;
}
Expand Down
71 changes: 4 additions & 67 deletions src/realm/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1272,7 +1272,8 @@ size_t Group::get_used_space() const noexcept
}


class Group::TransactAdvancer {
namespace {
class TransactAdvancer : public _impl::NullInstructionObserver {
public:
TransactAdvancer(Group&, bool& schema_changed)
: m_schema_changed(schema_changed)
Expand All @@ -1297,41 +1298,6 @@ class Group::TransactAdvancer {
return true;
}

bool select_table(TableKey) noexcept
{
return true;
}

bool create_object(ObjKey) noexcept
{
return true;
}

bool remove_object(ObjKey) noexcept
{
return true;
}

bool modify_object(ColKey, ObjKey) noexcept
{
return true; // No-op
}

bool collection_set(size_t)
{
return true;
}

bool collection_insert(size_t)
{
return true;
}

bool enumerate_string_column(ColKey)
{
return true; // No-op
}

bool insert_column(ColKey)
{
m_schema_changed = true;
Expand All @@ -1350,39 +1316,10 @@ class Group::TransactAdvancer {
return true; // No-op
}

bool set_link_type(ColKey) noexcept
{
return true; // No-op
}

bool select_collection(ColKey, ObjKey) noexcept
{
return true; // No-op
}

bool collection_move(size_t, size_t) noexcept
{
return true; // No-op
}

bool collection_erase(size_t) noexcept
{
return true; // No-op
}

bool collection_clear(size_t) noexcept
{
return true; // No-op
}

bool typed_link_change(ColKey, TableKey)
{
return true; // No-op
}

private:
bool& m_schema_changed;
};
} // anonymous namespace


void Group::update_allocator_wrappers(bool writable)
Expand Down Expand Up @@ -1445,7 +1382,7 @@ void Group::refresh_dirty_accessors()
}


void Group::advance_transact(ref_type new_top_ref, util::NoCopyInputStream* in, bool writable)
void Group::advance_transact(ref_type new_top_ref, util::InputStream* in, bool writable)
{
REALM_ASSERT(is_attached());
// Exception safety: If this function throws, the group accessor and all of
Expand Down
5 changes: 1 addition & 4 deletions src/realm/group.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ class TableKeys;

namespace _impl {
class GroupFriend;
class TransactLogParser;
} // namespace _impl


Expand Down Expand Up @@ -693,10 +692,9 @@ class Group : public ArrayParent {
std::shared_ptr<metrics::Metrics> get_metrics() const noexcept;
void set_metrics(std::shared_ptr<metrics::Metrics> other) noexcept;
void update_num_objects();
class TransactAdvancer;
/// Memory mappings must have been updated to reflect any growth in filesize before
/// calling advance_transact()
void advance_transact(ref_type new_top_ref, util::NoCopyInputStream*, bool writable);
void advance_transact(ref_type new_top_ref, util::InputStream*, bool writable);
void refresh_dirty_accessors();
void flush_accessors_for_commit();

Expand Down Expand Up @@ -837,7 +835,6 @@ class Group : public ArrayParent {
friend class GroupWriter;
friend class DB;
friend class _impl::GroupFriend;
friend class _impl::TransactLogParser;
friend class metrics::QueryInfo;
friend class metrics::Metrics;
friend class Transaction;
Expand Down
4 changes: 2 additions & 2 deletions src/realm/history.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ class InRealmHistory : public _impl::History {
private:
Allocator* m_alloc = nullptr;
/// Version on which the first changeset in the history is based, or if the
/// history is empty, the version associatede with currently bound
/// snapshot. In general, the version associatede with currently bound
/// history is empty, the version associated with currently bound
/// snapshot. In general, the version associated with currently bound
/// snapshot is equal to `m_base_version + m_size`, but after
/// add_changeset() is called, it is equal to one minus that.
version_type m_base_version = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/realm/impl/changeset_input_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include <realm/impl/cont_transact_hist.hpp>

namespace realm::_impl {
class ChangesetInputStream : public util::NoCopyInputStream {
class ChangesetInputStream : public util::InputStream {
public:
using version_type = History::version_type;
static constexpr unsigned NB_BUFFERS = 8;
Expand Down
36 changes: 11 additions & 25 deletions src/realm/impl/transact_log.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,22 +327,16 @@ class TransactLogEncoder {

class TransactLogParser {
public:
TransactLogParser();
~TransactLogParser() noexcept;

/// See `TransactLogEncoder` for a list of methods that the `InstructionHandler` must define.
template <class InstructionHandler>
void parse(util::InputStream&, InstructionHandler&);

template <class InstructionHandler>
void parse(util::NoCopyInputStream&, InstructionHandler&);

private:
util::Buffer<char> m_input_buffer;
util::Buffer<char> m_input_buffer{1024};

// The input stream is assumed to consist of chunks of memory organised such that
// every instruction resides in a single chunk only.
util::NoCopyInputStream* m_input;
util::InputStream* m_input;
// pointer into transaction log, each instruction is parsed from m_input_begin and onwards.
// Each instruction are assumed to be contiguous in memory.
const char* m_input_begin;
Expand Down Expand Up @@ -668,17 +662,8 @@ inline bool TransactLogEncoder::typed_link_change(ColKey col, TableKey dest)
}


inline TransactLogParser::TransactLogParser()
: m_input_buffer(1024) // Throws
{
}


inline TransactLogParser::~TransactLogParser() noexcept {}


template <class InstructionHandler>
void TransactLogParser::parse(util::NoCopyInputStream& in, InstructionHandler& handler)
void TransactLogParser::parse(util::InputStream& in, InstructionHandler& handler)
{
m_input = &in;
m_input_begin = m_input_end = nullptr;
Expand All @@ -687,13 +672,6 @@ void TransactLogParser::parse(util::NoCopyInputStream& in, InstructionHandler& h
parse_one(handler); // Throws
}

template <class InstructionHandler>
void TransactLogParser::parse(util::InputStream& in, InstructionHandler& handler)
{
util::NoCopyInputStreamAdaptor in_2(in, m_input_buffer);
parse(in_2, handler); // Throws
}

inline bool TransactLogParser::has_next() noexcept
{
return m_input_begin != m_input_end || next_input_buffer();
Expand Down Expand Up @@ -952,6 +930,14 @@ inline bool TransactLogParser::read_char(char& c)
return true;
}

template <typename Handler>
void parse_transact_log(util::InputStream& is, Handler& handler)
{
TransactLogParser parser;
parser.parse(is, handler);
handler.parse_complete();
}

} // namespace _impl
} // namespace realm

Expand Down
15 changes: 14 additions & 1 deletion src/realm/object-store/impl/collection_notifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ bool CollectionNotifier::all_callbacks_filtered() const noexcept

CollectionNotifier::CollectionNotifier(std::shared_ptr<Realm> realm)
: m_realm(std::move(realm))
, m_transaction(m_realm->duplicate())
, m_transaction(Realm::Internal::get_transaction_ref(*m_realm))
{
}

Expand Down Expand Up @@ -374,6 +374,19 @@ void CollectionNotifier::for_each_callback(Fn&& fn)
m_callback_index = npos;
}

void CollectionNotifier::set_initial_transaction(
const std::vector<std::shared_ptr<CollectionNotifier>>& other_notifiers)
{
for (auto& other : other_notifiers) {
if (version() == other->version()) {
attach_to(other->m_transaction);
return;
}
}
attach_to(m_transaction->duplicate());
}


void CollectionNotifier::attach_to(std::shared_ptr<Transaction> tr)
{
REALM_ASSERT(!m_has_run);
Expand Down
6 changes: 6 additions & 0 deletions src/realm/object-store/impl/collection_notifier.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ class CollectionNotifier {
return m_has_run;
}

// Detach from the source Realm's transaction and attach to either an existing
// transaction from another notifier (if any are the correct version) or a
// new one.
// precondition: RealmCoordinator::m_notifier_mutex is locked
void set_initial_transaction(const std::vector<std::shared_ptr<CollectionNotifier>>& other_notifiers);

// Discard the notifier's Transaction and move the local data over to the
// given Transaction. Must be called before the notifier is ever run.
// precondition: RealmCoordinator::m_notifier_mutex is locked
Expand Down
2 changes: 1 addition & 1 deletion src/realm/object-store/impl/list_notifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ void ListNotifier::run()

m_prev_size = m_list->size();

if (m_type == PropertyType::Object) {
if (m_info && m_type == PropertyType::Object) {
auto object_did_change = get_modification_checker(*m_info, m_list->get_target_table());
for (size_t i = 0; i < m_prev_size; ++i) {
if (m_change.modifications.contains(i))
Expand Down
2 changes: 1 addition & 1 deletion src/realm/object-store/impl/object_notifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ bool ObjectNotifier::do_add_required_change_info(TransactionChangeInfo& info)

void ObjectNotifier::run()
{
if (!m_table)
if (!m_table || !m_info)
return;

auto it = m_info->tables.find(m_table->get_key());
Expand Down
2 changes: 1 addition & 1 deletion src/realm/object-store/impl/object_notifier.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class ObjectNotifier : public CollectionNotifier {
private:
TableRef m_table;
ObjKey m_obj_key;
TransactionChangeInfo* m_info;
TransactionChangeInfo* m_info = nullptr;

void run() override REQUIRES(!m_callback_mutex);
void reattach() override;
Expand Down
8 changes: 5 additions & 3 deletions src/realm/object-store/impl/realm_coordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,7 @@ void RealmCoordinator::register_notifier(std::shared_ptr<CollectionNotifier> not
auto& self = Realm::Internal::get_coordinator(*notifier->get_realm());
{
util::CheckedLockGuard lock(self.m_notifier_mutex);
notifier->set_initial_transaction(self.m_new_notifiers);
self.m_new_notifiers.push_back(std::move(notifier));
}
}
Expand Down Expand Up @@ -908,9 +909,12 @@ void RealmCoordinator::run_async_notifiers()
if (!new_notifiers.empty()) {
new_notifier_change_info.reserve(new_notifiers.size());
for (auto& notifier : new_notifiers) {
if (notifier->version() == version)
continue;
new_notifier_change_info.emplace_back();
notifier->add_required_change_info(new_notifier_change_info.back());
transaction::advance(notifier->transaction(), new_notifier_change_info.back(), version);
transaction::parse(*newest_transaction, new_notifier_change_info.back(), notifier->version().version,
version.version);
}
}

Expand Down Expand Up @@ -967,8 +971,6 @@ void RealmCoordinator::run_async_notifiers()
// Now that they're at the same version, switch the new notifiers over to
// the main Transaction used for background work rather than the temporary one
for (auto& notifier : new_notifiers) {
REALM_ASSERT(m_notifier_transaction->get_version_of_current_transaction() ==
notifier->transaction().get_version_of_current_transaction());
notifier->attach_to(m_notifier_transaction);
notifier->run();
}
Expand Down
6 changes: 3 additions & 3 deletions src/realm/object-store/impl/results_notifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ bool ResultsNotifier::do_add_required_change_info(TransactionChangeInfo& info)
// when key path filters are set hence we need to recalculate every time the callbacks are changed.
util::CheckedLockGuard lock(m_callback_mutex);
if (m_did_modify_callbacks) {
update_related_tables(*(m_query->get_table()));
update_related_tables(*m_query->get_table());
}

return m_query->get_table() && has_run() && have_callbacks();
Expand Down Expand Up @@ -141,7 +141,7 @@ void ResultsNotifier::calculate_changes()

void ResultsNotifier::run()
{
REALM_ASSERT(m_info);
REALM_ASSERT(m_info || !has_run());

// Table's been deleted, so report all objects as deleted
if (!m_query->get_table()) {
Expand Down Expand Up @@ -288,7 +288,7 @@ bool ListResultsNotifier::do_add_required_change_info(TransactionChangeInfo& inf

bool ListResultsNotifier::need_to_run()
{
REALM_ASSERT(m_info);
REALM_ASSERT(m_info || !has_run());

// Don't run the query if the results aren't actually going to be used
if (!is_alive() || (!have_callbacks() && !m_results_were_used))
Expand Down
9 changes: 9 additions & 0 deletions src/realm/object-store/impl/transact_log_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,15 @@ void advance(Transaction& tr, TransactionChangeInfo& info, VersionID version)
}
}

void parse(Transaction& tr, TransactionChangeInfo& info, VersionID::version_type initial_version,
VersionID::version_type end_version)
{
if (!info.tables.empty() || !info.collections.empty()) {
TransactLogObserver o(info);
tr.parse_history(o, initial_version, end_version);
}
}

} // namespace transaction
} // namespace _impl
} // namespace realm
Loading

0 comments on commit 44e1793

Please sign in to comment.