Skip to content

Commit

Permalink
[Enhancement] improve cloud native index memtable memory usage and statistic (StarRocks#54358)
Browse files Browse the repository at this point in the history

Signed-off-by: luohaha <[email protected]>
  • Loading branch information
luohaha authored and magzhu committed Jan 6, 2025
1 parent 5feabd2 commit 6fb5107
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 56 deletions.
2 changes: 0 additions & 2 deletions be/src/storage/lake/lake_persistent_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ class PersistentIndexMemtable;
class PersistentIndexSstable;
class TabletManager;

using IndexValueWithVer = std::pair<int64_t, IndexValue>;

class KeyValueMerger {
public:
explicit KeyValueMerger(const std::string& key, uint64_t max_rss_rowid, sstable::TableBuilder* builder,
Expand Down
61 changes: 25 additions & 36 deletions be/src/storage/lake/persistent_index_memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@

namespace starrocks::lake {

void PersistentIndexMemtable::update_index_value(std::list<IndexValueWithVer>* index_value_infos, int64_t version,
void PersistentIndexMemtable::update_index_value(IndexValueWithVer* index_value_info, int64_t version,
const IndexValue& value) {
std::list<IndexValueWithVer> t;
t.emplace_front(version, value);
index_value_infos->swap(t);
index_value_info->first = version;
index_value_info->second = value;
}

Status PersistentIndexMemtable::upsert(size_t n, const Slice* keys, const IndexValue* values, IndexValue* old_values,
Expand All @@ -33,17 +32,15 @@ Status PersistentIndexMemtable::upsert(size_t n, const Slice* keys, const IndexV
for (size_t i = 0; i < n; ++i) {
auto key = keys[i].to_string();
const auto value = values[i];
std::list<IndexValueWithVer> index_value_vers;
index_value_vers.emplace_front(version, value);
if (auto [it, inserted] = _map.emplace(key, index_value_vers); inserted) {
if (auto [it, inserted] = _map.emplace(key, std::make_pair(version, value)); inserted) {
not_founds->insert(i);
_keys_size += key.capacity() + sizeof(std::string);
} else {
auto& old_index_value_vers = it->second;
auto old_value = old_index_value_vers.front().second;
auto& old_index_value_ver = it->second;
auto old_value = old_index_value_ver.second;
old_values[i] = old_value;
nfound += old_value.get_value() != NullIndexValue;
update_index_value(&old_index_value_vers, version, value);
update_index_value(&old_index_value_ver, version, value);
}
_max_rss_rowid = std::max(_max_rss_rowid, value.get_value());
}
Expand All @@ -57,13 +54,11 @@ Status PersistentIndexMemtable::insert(size_t n, const Slice* keys, const IndexV
auto key = keys[i].to_string();
auto size = keys[i].get_size();
const auto value = values[i];
std::list<IndexValueWithVer> index_value_vers;
index_value_vers.emplace_front(version, value);
if (auto [it, inserted] = _map.emplace(key, index_value_vers); inserted) {
if (auto [it, inserted] = _map.emplace(key, std::make_pair(version, value)); inserted) {
_keys_size += key.capacity() + sizeof(std::string);
} else {
auto& old_index_value_vers = it->second;
auto old_index_value = old_index_value_vers.front().second;
auto& old_index_value_ver = it->second;
auto old_index_value = old_index_value_ver.second;
if (old_index_value.get_value() != NullIndexValue) {
// shouldn't happen
std::string msg = strings::Substitute("PersistentIndexMemtable<$0> insert found duplicate key $1", size,
Expand All @@ -72,11 +67,11 @@ Status PersistentIndexMemtable::insert(size_t n, const Slice* keys, const IndexV
if (!config::experimental_lake_ignore_pk_consistency_check) {
return Status::AlreadyExist(msg);
} else {
update_index_value(&old_index_value_vers, version, value);
update_index_value(&old_index_value_ver, version, value);
}
} else {
// cover delete operation.
update_index_value(&old_index_value_vers, version, value);
update_index_value(&old_index_value_ver, version, value);
}
}
_max_rss_rowid = std::max(_max_rss_rowid, value.get_value());
Expand All @@ -90,18 +85,16 @@ Status PersistentIndexMemtable::erase(size_t n, const Slice* keys, IndexValue* o
size_t nfound = 0;
for (size_t i = 0; i < n; ++i) {
auto key = keys[i].to_string();
std::list<IndexValueWithVer> index_value_vers;
index_value_vers.emplace_front(version, IndexValue(NullIndexValue));
if (auto [it, inserted] = _map.emplace(key, index_value_vers); inserted) {
if (auto [it, inserted] = _map.emplace(key, std::make_pair(version, IndexValue(NullIndexValue))); inserted) {
old_values[i] = NullIndexValue;
not_founds->insert(i);
_keys_size += key.capacity() + sizeof(std::string);
} else {
auto& old_index_value_vers = it->second;
auto old_index_value = old_index_value_vers.front().second;
auto& old_index_value_ver = it->second;
auto old_index_value = old_index_value_ver.second;
old_values[i] = old_index_value;
nfound += old_index_value.get_value() != NullIndexValue;
update_index_value(&old_index_value_vers, version, IndexValue(NullIndexValue));
update_index_value(&old_index_value_ver, version, IndexValue(NullIndexValue));
}
}
// Delete is after upsert, so using UINT32_MAX as its rowid
Expand All @@ -118,13 +111,11 @@ Status PersistentIndexMemtable::erase_with_filter(size_t n, const Slice* keys, c
continue;
}
auto key = keys[i].to_string();
std::list<IndexValueWithVer> index_value_vers;
index_value_vers.emplace_front(version, IndexValue(NullIndexValue));
if (auto [it, inserted] = _map.emplace(key, index_value_vers); inserted) {
if (auto [it, inserted] = _map.emplace(key, std::make_pair(version, IndexValue(NullIndexValue))); inserted) {
_keys_size += key.capacity() + sizeof(std::string);
} else {
auto& old_index_value_vers = it->second;
update_index_value(&old_index_value_vers, version, IndexValue(NullIndexValue));
auto& old_index_value_ver = it->second;
update_index_value(&old_index_value_ver, version, IndexValue(NullIndexValue));
}
}
// Delete is after upsert, so using UINT32_MAX as its rowid
Expand All @@ -138,9 +129,7 @@ Status PersistentIndexMemtable::replace(const Slice* keys, const IndexValue* val
for (unsigned long idx : replace_idxes) {
auto key = keys[idx].to_string();
const auto value = values[idx];
std::list<IndexValueWithVer> index_value_vers;
index_value_vers.emplace_front(version, value);
if (auto [it, inserted] = _map.emplace(key, index_value_vers); !inserted) {
if (auto [it, inserted] = _map.emplace(key, std::make_pair(version, value)); !inserted) {
update_index_value(&it->second, version, value);
} else {
_keys_size += key.capacity() + sizeof(std::string);
Expand All @@ -161,8 +150,8 @@ Status PersistentIndexMemtable::get(size_t n, const Slice* keys, IndexValue* val
not_founds->insert(i);
} else {
// Assuming we want the latest (first) value due to emplace_front in updates/inserts
auto& index_value_vers = it->second;
auto index_value = index_value_vers.front().second;
auto& index_value_ver = it->second;
auto index_value = index_value_ver.second;
values[i] = index_value;
}
}
Expand All @@ -177,8 +166,8 @@ Status PersistentIndexMemtable::get(const Slice* keys, IndexValue* values, const
auto it = _map.find(key);
if (it != _map.end()) {
// Assuming we want the latest (first) value due to emplace_front in updates/inserts
auto& index_value_vers = it->second;
auto& index_value = index_value_vers.front().second;
auto& index_value_ver = it->second;
auto& index_value = index_value_ver.second;
values[key_index] = index_value.get_value();
found_key_indexes->insert(key_index);
}
Expand All @@ -187,7 +176,7 @@ Status PersistentIndexMemtable::get(const Slice* keys, IndexValue* values, const
}

size_t PersistentIndexMemtable::memory_usage() const {
return _keys_size + _map.size() * sizeof(IndexValueWithVer);
return _map.bytes_used();
}

Status PersistentIndexMemtable::flush(WritableFile* wf, uint64_t* filesize) {
Expand Down
6 changes: 2 additions & 4 deletions be/src/storage/lake/persistent_index_memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ namespace starrocks::lake {

using KeyIndex = size_t;
using KeyIndexSet = std::set<KeyIndex>;
using IndexValueWithVer = std::pair<int64_t, IndexValue>;

class PersistentIndexMemtable {
public:
Expand Down Expand Up @@ -72,12 +71,11 @@ class PersistentIndexMemtable {
const uint64_t max_rss_rowid() const { return _max_rss_rowid; }

private:
static void update_index_value(std::list<IndexValueWithVer>* index_value_info, int64_t version,
const IndexValue& value);
static void update_index_value(IndexValueWithVer* index_value_info, int64_t version, const IndexValue& value);

private:
// The size can be up to 230K. The performance of std::map may be poor.
phmap::btree_map<std::string, std::list<IndexValueWithVer>, std::less<>> _map;
phmap::btree_map<std::string, IndexValueWithVer, std::less<>> _map;
int64_t _keys_size{0};
uint64_t _max_rss_rowid{0};
};
Expand Down
15 changes: 6 additions & 9 deletions be/src/storage/lake/persistent_index_sstable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,19 @@ Status PersistentIndexSstable::init(std::unique_ptr<RandomAccessFile> rf, const
return Status::OK();
}

Status PersistentIndexSstable::build_sstable(
const phmap::btree_map<std::string, std::list<IndexValueWithVer>, std::less<>>& map, WritableFile* wf,
uint64_t* filesz) {
Status PersistentIndexSstable::build_sstable(const phmap::btree_map<std::string, IndexValueWithVer, std::less<>>& map,
WritableFile* wf, uint64_t* filesz) {
std::unique_ptr<sstable::FilterPolicy> filter_policy;
filter_policy.reset(const_cast<sstable::FilterPolicy*>(sstable::NewBloomFilterPolicy(10)));
sstable::Options options;
options.filter_policy = filter_policy.get();
sstable::TableBuilder builder(options, wf);
for (const auto& [k, v] : map) {
IndexValuesWithVerPB index_value_pb;
for (const auto& index_value_with_ver : v) {
auto* value = index_value_pb.add_values();
value->set_version(index_value_with_ver.first);
value->set_rssid(index_value_with_ver.second.get_rssid());
value->set_rowid(index_value_with_ver.second.get_rowid());
}
auto* value = index_value_pb.add_values();
value->set_version(v.first);
value->set_rssid(v.second.get_rssid());
value->set_rowid(v.second.get_rowid());
builder.Add(Slice(k), Slice(index_value_pb.SerializeAsString()));
}
RETURN_IF_ERROR(builder.Finish());
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/lake/persistent_index_sstable.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class PersistentIndexSstable {
Status init(std::unique_ptr<RandomAccessFile> rf, const PersistentIndexSstablePB& sstable_pb, Cache* cache,
bool need_filter = true);

static Status build_sstable(const phmap::btree_map<std::string, std::list<IndexValueWithVer>, std::less<>>& map,
static Status build_sstable(const phmap::btree_map<std::string, IndexValueWithVer, std::less<>>& map,
WritableFile* wf, uint64_t* filesz);

// multi_get can get multiple keys at once
Expand Down
2 changes: 2 additions & 0 deletions be/src/storage/persistent_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ struct IndexValue {
void operator=(uint64_t rhs) { return UNALIGNED_STORE64(v, rhs); }
};

using IndexValueWithVer = std::pair<int64_t, IndexValue>;

static constexpr size_t kIndexValueSize = 8;
static_assert(sizeof(IndexValue) == kIndexValueSize);
constexpr static size_t kSliceMaxFixLength = 64;
Expand Down
1 change: 1 addition & 0 deletions be/src/util/phmap/btree.h
Original file line number Diff line number Diff line change
Expand Up @@ -3114,6 +3114,7 @@ class btree_container {
size_type size() const { return tree_.size(); }
size_type max_size() const { return tree_.max_size(); }
bool empty() const { return tree_.empty(); }
size_type bytes_used() const { return tree_.bytes_used(); }

friend bool operator==(const btree_container& x, const btree_container& y) {
if (x.size() != y.size()) return false;
Expand Down
21 changes: 21 additions & 0 deletions be/test/storage/lake/persistent_index_memtable_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,25 @@ TEST(PersistentIndexMemtableTest, test_replace) {
}
}

// Inserts N fixed-width (uint64_t) keys into a fresh memtable and checks that
// memory_usage() reports a non-zero footprint that stays below 100000 bytes.
TEST(PersistentIndexMemtableTest, test_memory_usage) {
    auto memtable = std::make_unique<PersistentIndexMemtable>();
    {
        using Key = uint64_t;
        const int N = 1000;
        vector<Key> keys;
        vector<Slice> key_slices;
        vector<IndexValue> values;
        // Reserve everything up front. Each slice is built from a pointer into
        // `keys`, so `keys` must not reallocate while the slices are in use.
        keys.reserve(N);
        key_slices.reserve(N);
        values.reserve(N);
        for (int i = 0; i < N; i++) {
            keys.emplace_back(i);
            values.emplace_back(i * 2);
            key_slices.emplace_back(reinterpret_cast<uint8_t*>(&keys[i]), sizeof(Key));
        }
        ASSERT_OK(memtable->insert(N, key_slices.data(), values.data(), -1));
    }
    // Read the value once and assert each bound separately so that a failure
    // prints the actual usage instead of a bare "false".
    const size_t usage = memtable->memory_usage();
    ASSERT_GT(usage, 0U);
    ASSERT_LT(usage, 100000U);
}

} // namespace starrocks::lake
6 changes: 2 additions & 4 deletions be/test/storage/lake/persistent_index_sstable_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,9 @@ TEST_F(PersistentIndexSstableTest, test_persistent_index_sstable) {
// 1. build sstable
const std::string filename = "test_persistent_index_sstable_1.sst";
ASSIGN_OR_ABORT(auto file, fs::new_writable_file(lake::join_path(kTestDir, filename)));
phmap::btree_map<std::string, std::list<IndexValueWithVer>, std::less<>> map;
phmap::btree_map<std::string, IndexValueWithVer, std::less<>> map;
for (int i = 0; i < N; i++) {
std::list<IndexValueWithVer> index_value_vers;
index_value_vers.emplace_front(100, i);
map.insert({fmt::format("test_key_{:016X}", i), index_value_vers});
map.emplace(fmt::format("test_key_{:016X}", i), std::make_pair(100, IndexValue(i)));
}
uint64_t filesize = 0;
ASSERT_OK(PersistentIndexSstable::build_sstable(map, file.get(), &filesize));
Expand Down

0 comments on commit 6fb5107

Please sign in to comment.