Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Support remove immutable memtable for cloud native pindex (backport #54178) #54408

Merged
merged 1 commit into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 6 additions & 29 deletions be/src/storage/lake/lake_persistent_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ Status LakePersistentIndex::minor_compact() {
}
ASSIGN_OR_RETURN(auto wf, fs::new_writable_file(wopts, location));
uint64_t filesize = 0;
RETURN_IF_ERROR(_immutable_memtable->flush(wf.get(), &filesize));
RETURN_IF_ERROR(_memtable->flush(wf.get(), &filesize));
RETURN_IF_ERROR(wf->close());

auto sstable = std::make_unique<PersistentIndexSstable>();
Expand All @@ -206,7 +206,7 @@ Status LakePersistentIndex::minor_compact() {
PersistentIndexSstablePB sstable_pb;
sstable_pb.set_filename(filename);
sstable_pb.set_filesize(filesize);
sstable_pb.set_max_rss_rowid(_immutable_memtable->max_rss_rowid());
sstable_pb.set_max_rss_rowid(_memtable->max_rss_rowid());
sstable_pb.set_encryption_meta(encryption_meta);
auto* block_cache = _tablet_mgr->update_mgr()->block_cache();
if (block_cache == nullptr) {
Expand All @@ -219,11 +219,10 @@ Status LakePersistentIndex::minor_compact() {
}

Status LakePersistentIndex::flush_memtable() {
if (_immutable_memtable != nullptr) {
RETURN_IF_ERROR(minor_compact());
}
_immutable_memtable = std::make_unique<PersistentIndexMemtable>(_memtable->max_rss_rowid());
_memtable.swap(_immutable_memtable);
RETURN_IF_ERROR(minor_compact());
auto max_rss_rowid = _memtable->max_rss_rowid();
_memtable.reset();
_memtable = std::make_unique<PersistentIndexMemtable>(max_rss_rowid);
return Status::OK();
}

Expand All @@ -243,24 +242,11 @@ Status LakePersistentIndex::get_from_sstables(size_t n, const Slice* keys, Index
return Status::OK();
}

Status LakePersistentIndex::get_from_immutable_memtable(const Slice* keys, IndexValue* values,
const KeyIndexSet& key_indexes, KeyIndexSet* found_key_indexes,
int64_t version) const {
if (_immutable_memtable == nullptr || key_indexes.empty()) {
return Status::OK();
}
RETURN_IF_ERROR(_immutable_memtable->get(keys, values, key_indexes, found_key_indexes, version));
return Status::OK();
}

Status LakePersistentIndex::get(size_t n, const Slice* keys, IndexValue* values) {
KeyIndexSet not_founds;
// Assuming we always want the latest value now
RETURN_IF_ERROR(_memtable->get(n, keys, values, &not_founds, -1));
KeyIndexSet& key_indexes = not_founds;
KeyIndexSet found_key_indexes;
RETURN_IF_ERROR(get_from_immutable_memtable(keys, values, key_indexes, &found_key_indexes, -1));
set_difference(&key_indexes, found_key_indexes);
RETURN_IF_ERROR(get_from_sstables(n, keys, values, &key_indexes, -1));
return Status::OK();
}
Expand All @@ -271,9 +257,6 @@ Status LakePersistentIndex::upsert(size_t n, const Slice* keys, const IndexValue
size_t num_found;
RETURN_IF_ERROR(_memtable->upsert(n, keys, values, old_values, &not_founds, &num_found, _version.major_number()));
KeyIndexSet& key_indexes = not_founds;
KeyIndexSet found_key_indexes;
RETURN_IF_ERROR(get_from_immutable_memtable(keys, old_values, key_indexes, &found_key_indexes, -1));
set_difference(&key_indexes, found_key_indexes);
RETURN_IF_ERROR(get_from_sstables(n, keys, old_values, &key_indexes, -1));
if (is_memtable_full()) {
return flush_memtable();
Expand Down Expand Up @@ -307,9 +290,6 @@ Status LakePersistentIndex::erase(size_t n, const Slice* keys, IndexValue* old_v
size_t num_found;
RETURN_IF_ERROR(_memtable->erase(n, keys, old_values, &not_founds, &num_found, _version.major_number(), rowset_id));
KeyIndexSet& key_indexes = not_founds;
KeyIndexSet found_key_indexes;
RETURN_IF_ERROR(get_from_immutable_memtable(keys, old_values, key_indexes, &found_key_indexes, -1));
set_difference(&key_indexes, found_key_indexes);
RETURN_IF_ERROR(get_from_sstables(n, keys, old_values, &key_indexes, -1));
if (is_memtable_full()) {
return flush_memtable();
Expand Down Expand Up @@ -776,9 +756,6 @@ size_t LakePersistentIndex::memory_usage() const {
if (_memtable != nullptr) {
mem_usage += _memtable->memory_usage();
}
if (_immutable_memtable != nullptr) {
mem_usage += _immutable_memtable->memory_usage();
}
for (const auto& sst_ptr : _sstables) {
if (sst_ptr != nullptr) {
mem_usage += sst_ptr->memory_usage();
Expand Down
10 changes: 0 additions & 10 deletions be/src/storage/lake/lake_persistent_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,15 +157,6 @@ class LakePersistentIndex : public PersistentIndex {

bool is_memtable_full() const;

// batch get
// |keys|: key array as raw buffer
// |values|: value array
// |key_indexes|: the indexes of keys.
// |found_key_indexes|: founded indexes of keys
// |version|: version of values
Status get_from_immutable_memtable(const Slice* keys, IndexValue* values, const KeyIndexSet& key_indexes,
KeyIndexSet* found_key_indexes, int64_t version) const;

// batch get
// |n|: size of key/value array
// |keys|: key array as raw buffer
Expand All @@ -191,7 +182,6 @@ class LakePersistentIndex : public PersistentIndex {

private:
std::unique_ptr<PersistentIndexMemtable> _memtable;
std::unique_ptr<PersistentIndexMemtable> _immutable_memtable{nullptr};
TabletManager* _tablet_mgr{nullptr};
int64_t _tablet_id{0};
// The size of sstables is not expected to be too large.
Expand Down
2 changes: 1 addition & 1 deletion be/test/storage/lake/primary_key_compaction_task_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1391,7 +1391,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_major_compaction) {
version++;
ASSERT_EQ(kChunkSize * N, read(version));
ASSIGN_OR_ABORT(new_tablet_metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version));
EXPECT_EQ(N - 1, new_tablet_metadata->orphan_files_size());
EXPECT_EQ(N, new_tablet_metadata->orphan_files_size());

config::l0_max_mem_usage = l0_max_mem_usage;
}
Expand Down
Loading