Skip to content

Commit

Permalink
Memtable sampling for mempurge heuristic. (#8628)
Browse files Browse the repository at this point in the history
Summary:
Changes the API of the MemPurge process: the `bool experimental_allow_mempurge` and `experimental_mempurge_policy` flags have been replaced by a `double experimental_mempurge_threshold` option.
This change of API reflects another major change introduced in this PR: the MemPurgeDecider() function now works by sampling the memtables being flushed to estimate the overall amount of useful payload (payload minus the garbage), and then comparing this useful payload estimate with the `double experimental_mempurge_threshold` value.
Therefore, when the value of this flag is `0.0` (default value), mempurge is simply deactivated. On the other hand, a value of `DBL_MAX` would be equivalent to always going through a mempurge regardless of the garbage ratio estimate.
At the moment, a `double experimental_mempurge_threshold` value other than 0.0 or `DBL_MAX` is only supported with the `SkipList` memtable representation.
Regarding the sampling, this PR includes the introduction of a `MemTable::UniqueRandomSample` function that collects (approximately) random entries from the memtable by using the new `SkipList::Iterator::RandomSeek()` under the hood, or by iterating through each memtable entry, depending on the target sample size and the total number of entries.
The unit tests have been readapted to support this new API.

Pull Request resolved: facebook/rocksdb#8628

Reviewed By: pdillinger

Differential Revision: D30149315

Pulled By: bjlemaire

fbshipit-source-id: 1feef5390c95db6f4480ab4434716533d3947f27
  • Loading branch information
bjlemaire authored and facebook-github-bot committed Aug 11, 2021
1 parent f63331e commit e3a96c4
Show file tree
Hide file tree
Showing 20 changed files with 338 additions and 120 deletions.
6 changes: 3 additions & 3 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3029,9 +3029,9 @@ unsigned char rocksdb_options_get_advise_random_on_open(
return opt->rep.advise_random_on_open;
}

void rocksdb_options_set_experimental_allow_mempurge(rocksdb_options_t* opt,
unsigned char v) {
opt->rep.experimental_allow_mempurge = v;
// C API setter: stores `v` in opt->rep.experimental_mempurge_threshold.
// Elsewhere in this change the mempurge code paths are only taken when this
// value is strictly greater than 0.0.
void rocksdb_options_set_experimental_mempurge_threshold(rocksdb_options_t* opt,
double v) {
opt->rep.experimental_mempurge_threshold = v;
}

void rocksdb_options_set_access_hint_on_compaction_start(
Expand Down
20 changes: 11 additions & 9 deletions db/db_flush_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <atomic>
#include <limits>

#include "db/db_impl/db_impl.h"
#include "db/db_test_util.h"
Expand Down Expand Up @@ -694,8 +695,8 @@ TEST_F(DBFlushTest, MemPurgeBasic) {
// Enforce size of a single MemTable to 1MB (1MB = 1048576 bytes).
options.write_buffer_size = 1 << 20;
// Activate the MemPurge prototype.
options.experimental_allow_mempurge = true;
options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
options.experimental_mempurge_threshold =
1.0; // std::numeric_limits<double>::max();
ASSERT_OK(TryReopen(options));
uint32_t mempurge_count = 0;
uint32_t sst_count = 0;
Expand Down Expand Up @@ -842,8 +843,8 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) {
// Enforce size of a single MemTable to 1MB (1MB = 1048576 bytes).
options.write_buffer_size = 1 << 20;
// Activate the MemPurge prototype.
options.experimental_allow_mempurge = true;
options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
options.experimental_mempurge_threshold =
1.0; // std::numeric_limits<double>::max();
ASSERT_OK(TryReopen(options));

uint32_t mempurge_count = 0;
Expand Down Expand Up @@ -1046,8 +1047,8 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) {
// Enforce size of a single MemTable to 1MB (1MB = 1048576 bytes).
options.write_buffer_size = 1 << 20;
// Activate the MemPurge prototype.
options.experimental_allow_mempurge = true;
options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
options.experimental_mempurge_threshold =
1.0; // std::numeric_limits<double>::max();
ASSERT_OK(TryReopen(options));

uint32_t mempurge_count = 0;
Expand Down Expand Up @@ -1122,8 +1123,8 @@ TEST_F(DBFlushTest, MemPurgeWALSupport) {
// Enforce size of a single MemTable to 128KB.
options.write_buffer_size = 128 << 10;
// Activate the MemPurge prototype.
options.experimental_allow_mempurge = true;
options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
options.experimental_mempurge_threshold =
1.0; // std::numeric_limits<double>::max();
ASSERT_OK(TryReopen(options));

const size_t KVSIZE = 10;
Expand Down Expand Up @@ -1239,7 +1240,8 @@ TEST_F(DBFlushTest, MemPurgeWALSupport) {
const uint32_t EXPECTED_SST_COUNT = 0;

EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT);
if (options.experimental_mempurge_policy == MemPurgePolicy::kAlways) {
if (options.experimental_mempurge_threshold ==
std::numeric_limits<double>::max()) {
EXPECT_EQ(sst_count, EXPECTED_SST_COUNT);
}

Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ Status DBImpl::CloseHelper() {
// flushing (but need to implement something
// else than imm()->IsFlushPending() because the output
// memtables added to imm() dont trigger flushes).
if (immutable_db_options_.experimental_allow_mempurge) {
if (immutable_db_options_.experimental_mempurge_threshold > 0.0) {
Status flush_ret;
mutex_.Unlock();
for (ColumnFamilyData* cf : *versions_->GetColumnFamilySet()) {
Expand Down
4 changes: 2 additions & 2 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2410,7 +2410,7 @@ void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req,
// future changes. Therefore, we add the following if
// statement - note that calling it twice (or more)
// doesn't break anything.
if (immutable_db_options_.experimental_allow_mempurge) {
if (immutable_db_options_.experimental_mempurge_threshold > 0.0) {
// If imm() contains silent memtables,
// requesting a flush will mark the imm_needed as true.
cfd->imm()->FlushRequested();
Expand Down Expand Up @@ -2556,7 +2556,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,

for (const auto& iter : flush_req) {
ColumnFamilyData* cfd = iter.first;
if (immutable_db_options_.experimental_allow_mempurge) {
if (immutable_db_options_.experimental_mempurge_threshold > 0.0) {
// If imm() contains silent memtables,
// requesting a flush will mark the imm_needed as true.
cfd->imm()->FlushRequested();
Expand Down
137 changes: 124 additions & 13 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ void FlushJob::PickMemTable() {
// If mempurge feature is activated, keep track of any potential
// memtables coming from a previous mempurge operation.
// Used for mempurge policy.
if (db_options_.experimental_allow_mempurge) {
if (db_options_.experimental_mempurge_threshold > 0.0) {
contains_mempurge_outcome_ = false;
for (MemTable* mt : mems_) {
if (cfd_->imm()->IsMemPurgeOutput(mt->GetID())) {
Expand Down Expand Up @@ -241,7 +241,7 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
}
Status mempurge_s = Status::NotFound("No MemPurge.");
if (db_options_.experimental_allow_mempurge &&
if ((db_options_.experimental_mempurge_threshold > 0.0) &&
(cfd_->GetFlushReason() == FlushReason::kWriteBufferFull) &&
(!mems_.empty()) && MemPurgeDecider()) {
mempurge_s = MemPurge();
Expand Down Expand Up @@ -580,8 +580,6 @@ Status FlushJob::MemPurge() {
// This addition will not trigger another flush, because
// we do not call SchedulePendingFlush().
cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free);
new_mem_capacity = (new_mem->ApproximateMemoryUsage()) * 1.0 /
mutable_cf_options_.write_buffer_size;
new_mem->Ref();
db_mutex_->Unlock();
} else {
Expand Down Expand Up @@ -622,16 +620,129 @@ Status FlushJob::MemPurge() {
}

// Decides whether the memtables picked for this flush job should go through
// MemPurge (returns true) instead of being flushed to storage (returns false).
// The decision compares an estimate of the useful (non-garbage) payload of the
// memtables, expressed as a fraction of write_buffer_size, with
// db_options_.experimental_mempurge_threshold.
bool FlushJob::MemPurgeDecider() {
MemPurgePolicy policy = db_options_.experimental_mempurge_policy;
if (policy == MemPurgePolicy::kAlways) {
double threshold = db_options_.experimental_mempurge_threshold;
// Never trigger mempurge if threshold is not a strictly positive value.
if (!(threshold > 0.0)) {
return false;
}
// A threshold larger than the number of memtables being flushed always
// results in a mempurge; no sampling is performed in that case.
if (threshold > (1.0 * mems_.size())) {
return true;
} else if (policy == MemPurgePolicy::kAlternate) {
// Note: if at least one of the flushed memtables is
// an output of a previous mempurge process, then flush
// to storage.
return !(contains_mempurge_outcome_);
}
return false;
// Payload and useful_payload (in bytes).
// The useful payload ratio of a given MemTable
// is estimated to be useful_payload/payload.
// NOTE(review): both counters are declared outside the per-memtable loop and
// are never reset, so the ratio computed for a later memtable also includes
// the samples taken from earlier ones - confirm this is intended.
uint64_t payload = 0, useful_payload = 0;
// If estimated_useful_payload / write_buffer_size is >= threshold,
// then flush to storage, else MemPurge.
double estimated_useful_payload = 0.0;
// Cochran formula for determining sample size.
// 95% confidence interval, 7% precision.
// n0 = (1.96*1.96)*0.25/(0.07*0.07) = 196.0
double n0 = 196.0;
ReadOptions ro;
ro.total_order_seek = true;

// Iterate over each memtable of the set.
for (MemTable* mt : mems_) {
// If the memtable is the output of a previous mempurge,
// its approximate useful payload ratio is already calculated.
if (cfd_->imm()->IsMemPurgeOutput(mt->GetID())) {
// We make the assumption that this memtable is already
// free of garbage (garbage underestimation).
estimated_useful_payload += mt->ApproximateMemoryUsage();
} else {
// Else sample from the table.
uint64_t nentries = mt->num_entries();
// Corrected Cochran formula for small populations
// (converges to n0 for large populations).
// NOTE(review): nentries is used as a divisor here; confirm that an empty
// memtable (nentries == 0) behaves as intended.
uint64_t target_sample_size =
static_cast<uint64_t>(ceil(n0 / (1.0 + (n0 / nentries))));
std::unordered_set<const char*> sentries = {};
// Populate sample entries set.
mt->UniqueRandomSample(target_sample_size, &sentries);

// Estimate the garbage ratio by comparing if
// each sample corresponds to a valid entry.
for (const char* ss : sentries) {
ParsedInternalKey res;
Slice entry_slice = GetLengthPrefixedSlice(ss);
Status parse_s =
ParseInternalKey(entry_slice, &res, true /*log_err_key*/);
// NOTE(review): a parse failure is only logged; `res` is still read below.
// Confirm that is safe. Also, entry_slice.data() is passed to a "%s"
// format; presumably the slice is not guaranteed to be NUL-terminated -
// verify.
if (!parse_s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Memtable Decider: ParseInternalKey did not parse "
"entry_slice %s"
"successfully.",
entry_slice.data());
}
LookupKey lkey(res.user_key, kMaxSequenceNumber);
std::string vget;
Status s;
MergeContext merge_context;
SequenceNumber max_covering_tombstone_seq = 0, sqno = 0;

// Pick the oldest existing snapshot that is more recent
// than the sequence number of the sampled entry.
SequenceNumber min_seqno_snapshot = kMaxSequenceNumber;
SnapshotImpl min_snapshot;
for (SequenceNumber seq_num : existing_snapshots_) {
if (seq_num > res.sequence && seq_num < min_seqno_snapshot) {
min_seqno_snapshot = seq_num;
}
}
min_snapshot.number_ = min_seqno_snapshot;
// When no such snapshot exists, read without a snapshot.
ro.snapshot =
min_seqno_snapshot < kMaxSequenceNumber ? &min_snapshot : nullptr;

// Estimate if the sample entry is valid or not.
bool gres = mt->Get(lkey, &vget, nullptr, &s, &merge_context,
&max_covering_tombstone_seq, &sqno, ro);
if (!gres) {
ROCKS_LOG_WARN(
db_options_.info_log,
"Memtable Get returned false when Get(sampled entry). "
"Yet each sample entry should exist somewhere in the memtable, "
"unrelated to whether it has been deleted or not.");
}
// Every sampled entry counts towards the total sampled payload.
payload += entry_slice.size();

// TODO(bjlemaire): evaluate typeMerge.
// This is where the sampled entry is estimated to be
// garbage or not. Note that this is a garbage *estimation*
// because we do not include certain items such as
// CompactionFilters triggered at flush, or if the same delete
// has been inserted twice or more in the memtable.
// A value entry counts as useful when Get() succeeds and reports the
// sampled entry's own sequence number; a (single) deletion counts as
// useful when Get() reports NotFound.
if (res.type == kTypeValue && gres && s.ok() && sqno == res.sequence) {
useful_payload += entry_slice.size();
} else if (((res.type == kTypeDeletion) ||
(res.type == kTypeSingleDeletion)) &&
s.IsNotFound() && gres) {
useful_payload += entry_slice.size();
}
}
if (payload > 0) {
// We use the estimated useful payload ratio
// to evaluate how much of the total memtable is useful bytes.
estimated_useful_payload +=
(mt->ApproximateMemoryUsage()) * (useful_payload * 1.0 / payload);
ROCKS_LOG_INFO(
db_options_.info_log,
"Mempurge sampling - found garbage ratio from sampling: %f.\n",
(payload - useful_payload) * 1.0 / payload);
} else {
ROCKS_LOG_WARN(
db_options_.info_log,
"Mempurge kSampling policy: null payload measured, and collected "
"sample size is %zu\n.",
sentries.size());
}
}
}
// We convert the total number of useful payload bytes
// into the proportion of memtable necessary to store all these bytes.
// We compare this proportion with the threshold value.
return (estimated_useful_payload / mutable_cf_options_.write_buffer_size) <
threshold;
}

Status FlushJob::WriteLevel0Table() {
Expand Down Expand Up @@ -843,7 +954,7 @@ Status FlushJob::WriteLevel0Table() {

stats.num_output_files_blob = static_cast<int>(blobs.size());

if (db_options_.experimental_allow_mempurge && s.ok()) {
if ((db_options_.experimental_mempurge_threshold > 0.0) && s.ok()) {
// The db_mutex is held at this point.
for (MemTable* mt : mems_) {
// Note: if m is not a previous mempurge output memtable,
Expand Down
6 changes: 3 additions & 3 deletions db/flush_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ class FlushJob {
// of development. At the moment it is only compatible with the Get, Put,
// Delete operations as well as Iterators and CompactionFilters.
// For this early version, "MemPurge" is called by setting the
// options.experimental_allow_mempurge flag as "true". When this is
// options.experimental_mempurge_threshold value as >0.0. When this is
// the case, ALL automatic flush operations (kWriteBufferFull) will
// first go through the MemPurge process. herefore, we strongly
// first go through the MemPurge process. Therefore, we strongly
// recommend all users not to set this option to a value > 0.0 given that the
// MemPurge process has not matured yet.
Status MemPurge();
Expand Down Expand Up @@ -192,7 +192,7 @@ class FlushJob {
const std::string full_history_ts_low_;
BlobFileCompletionCallback* blob_callback_;

// Used when experimental_allow_mempurge set to true.
// Used when experimental_mempurge_threshold > 0.0.
bool contains_mempurge_outcome_;
};

Expand Down
21 changes: 21 additions & 0 deletions db/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "db/dbformat.h"
Expand Down Expand Up @@ -145,6 +146,26 @@ class MemTable {
return approximate_memory_usage_.load(std::memory_order_relaxed);
}

// Populates the set 'entries' with unique random memtable entries, aiming
// for 'target_sample_size' of them. Nothing is returned; the sample is
// delivered through 'entries'.
//
// Note: the entries are stored in the unordered_set as length-prefixed keys,
// hence their representation in the set as "const char*".
// Note2: the size of the output set 'entries' is not enforced to be strictly
// equal to 'target_sample_size'. Its final size might be slightly
// greater or slightly less than 'target_sample_size'
//
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable (unless this Memtable is immutable).
// REQUIRES: SkipList memtable representation. This function is not
// implemented for any other type of memtable representation (vectorrep,
// hashskiplist,...).
void UniqueRandomSample(const uint64_t& target_sample_size,
std::unordered_set<const char*>* entries) {
// TODO(bjlemaire): at the moment, only supported by skiplistrep.
// Extend it to all other memtable representations.
// Delegates to the underlying representation, passing the current number
// of entries along with the requested sample size.
table_->UniqueRandomSample(num_entries(), target_sample_size, entries);
}

// This method heuristically determines if the memtable should continue to
// host more data.
bool ShouldScheduleFlush() const {
Expand Down
6 changes: 1 addition & 5 deletions db/memtable_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -390,11 +390,7 @@ class MemTableList {
// not freed, but put into a vector for future deref and reclamation.
void RemoveOldMemTables(uint64_t log_number,
autovector<MemTable*>* to_delete);
void AddMemPurgeOutputID(uint64_t mid) {
if (mempurged_ids_.find(mid) == mempurged_ids_.end()) {
mempurged_ids_.insert(mid);
}
}
// Records `mid` in mempurged_ids_. The id is inserted unconditionally;
// presumably the container ignores duplicates - confirm against its type.
void AddMemPurgeOutputID(uint64_t mid) { mempurged_ids_.insert(mid); }

void RemoveMemPurgeOutputID(uint64_t mid) {
if (mempurged_ids_.find(mid) != mempurged_ids_.end()) {
Expand Down
15 changes: 1 addition & 14 deletions db_stress_tool/db_stress_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@ DECLARE_uint64(subcompactions);
DECLARE_uint64(periodic_compaction_seconds);
DECLARE_uint64(compaction_ttl);
DECLARE_bool(allow_concurrent_memtable_write);
DECLARE_bool(experimental_allow_mempurge);
DECLARE_string(experimental_mempurge_policy);
DECLARE_double(experimental_mempurge_threshold);
DECLARE_bool(enable_write_thread_adaptive_yield);
DECLARE_int32(reopen);
DECLARE_double(bloom_bits);
Expand Down Expand Up @@ -341,18 +340,6 @@ inline enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType(
return ret_compression_type;
}

// Maps a policy name (compared case-insensitively) to its MemPurgePolicy
// value. An unrecognized name is reported on stderr and falls back to
// kAlternate.
inline enum ROCKSDB_NAMESPACE::MemPurgePolicy StringToMemPurgePolicy(
    const char* mpolicy) {
  assert(mpolicy);
  if (strcasecmp(mpolicy, "kAlways") == 0) {
    return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlways;
  }
  if (strcasecmp(mpolicy, "kAlternate") == 0) {
    return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate;
  }
  // Unknown name: warn, then use the same fallback as the explicit
  // "kAlternate" spelling.
  fprintf(stderr, "Cannot parse mempurge policy: '%s'\n", mpolicy);
  return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate;
}

inline enum ROCKSDB_NAMESPACE::ChecksumType StringToChecksumType(
const char* ctype) {
assert(ctype);
Expand Down
8 changes: 3 additions & 5 deletions db_stress_tool/db_stress_gflags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -326,11 +326,9 @@ DEFINE_uint64(compaction_ttl, 1000,
DEFINE_bool(allow_concurrent_memtable_write, false,
"Allow multi-writers to update mem tables in parallel.");

DEFINE_bool(experimental_allow_mempurge, false,
"Allow mempurge process to collect memtable garbage bytes.");

DEFINE_string(experimental_mempurge_policy, "kAlternate",
"Set mempurge (MemTable Garbage Collection) policy.");
DEFINE_double(experimental_mempurge_threshold, 0.0,
"Maximum estimated useful payload that triggers a "
"mempurge process to collect memtable garbage bytes.");

DEFINE_bool(enable_write_thread_adaptive_yield, true,
"Use a yielding spin loop for brief writer thread waits.");
Expand Down
Loading

0 comments on commit e3a96c4

Please sign in to comment.