Skip to content

Commit

Permalink
Fix LevelCompactionPicker caused by marked_for_compaction issues (#212)
Browse files Browse the repository at this point in the history
Problem:
- `LevelCompactionPicker::PickCompaction` always pick files which are marked by `marked_for_compaction`.
- If there are too many SST files have been marked, then the normal compaction would be blocked

Fix:
- Add a `ShouldSkipMarkedForCompaction` function to skip some low score compaction markers.
  • Loading branch information
mm304321141 authored Feb 22, 2022
1 parent 0f1a833 commit fbe63b6
Show file tree
Hide file tree
Showing 5 changed files with 249 additions and 21 deletions.
30 changes: 30 additions & 0 deletions db/compaction_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2085,8 +2085,13 @@ void CompactionPicker::PickFilesMarkedForCompaction(
// If it's being compacted it has nothing to do here.
// If this assert() fails that means that some function marked some
// files as being_compacted, but didn't call ComputeCompactionScore()
if (ShouldSkipMarkedForCompaction(vstorage, level_file.first,
level_file.second)) {
return false;
}
assert(!level_file.second->being_compacted);
*start_level = level_file.first;

if (level_file.second->is_output_to_parent_level()) {
*output_level =
(*start_level == 0) ? vstorage->base_level() : *start_level + 1;
Expand Down Expand Up @@ -2154,6 +2159,27 @@ bool LevelCompactionPicker::NeedsCompaction(
return vstorage->has_space_amplification();
}

bool LevelCompactionPicker::ShouldSkipMarkedForCompaction(
const VersionStorageInfo* vstorage, int level,
const FileMetaData* file_meta) {
assert(file_meta != nullptr);
assert(file_meta->marked_for_compaction);
(void)file_meta;
bool result = false;
if (level != 0) {
int check_level = level == vstorage->base_level() ? 0 : level - 1;
for (int i = 0; i <= vstorage->MaxInputLevel(); ++i) {
if (vstorage->CompactionScoreLevel(i) == check_level) {
result = vstorage->CompactionScore(i) >= 1;
break;
}
}
}
TEST_SYNC_POINT_CALLBACK(
"LevelCompactionPicker:ShouldSkipMarkedForCompaction", &result);
return result;
}

namespace {
// A class to build a leveled compaction step-by-step.
class LevelCompactionBuilder {
Expand Down Expand Up @@ -2304,6 +2330,10 @@ void LevelCompactionBuilder::SetupInitialFiles() {
for (i = 0; i < vstorage_->BottommostFilesMarkedForCompaction().size();
++i) {
auto& level_and_file = vstorage_->BottommostFilesMarkedForCompaction()[i];
if (compaction_picker_->ShouldSkipMarkedForCompaction(
vstorage_, level_and_file.first, level_and_file.second)) {
continue;
}
assert(!level_and_file.second->being_compacted);
start_level_inputs_.level = output_level_ = start_level_ =
level_and_file.first;
Expand Down
9 changes: 9 additions & 0 deletions db/compaction_picker.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,12 @@ class CompactionPicker {

virtual bool NeedsCompaction(const VersionStorageInfo* vstorage) const = 0;

// Returns true if should skip pick files marked_for_compaction
virtual bool ShouldSkipMarkedForCompaction(const VersionStorageInfo* vstorage,
int level, const FileMetaData* f) {
return false;
}

// Sanitize the input set of compaction input files.
// When the input parameters do not describe a valid compaction, the
// function will try to fix the input_files by adding necessary
Expand Down Expand Up @@ -327,6 +333,9 @@ class LevelCompactionPicker : public CompactionPicker {
LogBuffer* log_buffer) override;

bool NeedsCompaction(const VersionStorageInfo* vstorage) const override;

bool ShouldSkipMarkedForCompaction(const VersionStorageInfo* vstorage,
int level, const FileMetaData* f) override;
};

#ifndef ROCKSDB_LITE
Expand Down
185 changes: 183 additions & 2 deletions db/compaction_picker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "db/compaction_picker_universal.h"
#include "rocksdb/terark_namespace.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/testharness.h"
#include "util/testutil.h"

Expand Down Expand Up @@ -64,7 +65,11 @@ class CompactionPickerTest : public testing::Test {
ioptions_.enable_lazy_compaction = false;
}

~CompactionPickerTest() {}
~CompactionPickerTest() {
TERARKDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
TERARKDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({});
TERARKDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}

void NewVersionStorage(int num_levels, CompactionStyle style) {
DeleteVersionStorage();
Expand Down Expand Up @@ -123,9 +128,9 @@ class CompactionPickerTest : public testing::Test {
vstorage_->UpdateNumNonEmptyLevels();
vstorage_->GenerateFileIndexer();
vstorage_->GenerateLevelFilesBrief();
vstorage_->GenerateBottommostFiles();
vstorage_->ComputeCompactionScore(ioptions_, mutable_cf_options_);
vstorage_->GenerateLevel0NonOverlapping();
vstorage_->ComputeFilesMarkedForCompaction();
vstorage_->SetFinalized();
}
};
Expand Down Expand Up @@ -508,6 +513,182 @@ TEST_F(CompactionPickerTest, AllowsTrivialMoveUniversal) {

#endif // ROCKSDB_LITE

// kMarkedFromRangeDeletion is handled by compaction_pri, so SST(4) is the first
// element in VersionStorageInfo::files_by_compaction_pri
TEST_F(CompactionPickerTest, MarkedForCompaction1) {
NewVersionStorage(6, kCompactionStyleLevel);
ioptions_.compaction_pri = kByCompensatedSize;

Add(1, 1U, "100", "150", 1U);
Add(1, 2U, "200", "250", 1U);
Add(1, 3U, "300", "350", 1000000000U);
Add(1, 4U, "400", "450", 1U); // kMarkedFromRangeDeletion
Add(1, 5U, "500", "550", 1U);
Add(2, 6U, "100", "600", 1U);

vstorage_->LevelFiles(1)[3]->marked_for_compaction =
FileMetaData::kMarkedFromRangeDeletion;

UpdateVersionStorageInfo();

std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), {}, &log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_levels());
ASSERT_EQ(1U, compaction->num_input_files(0));
ASSERT_EQ(4U, compaction->input(0, 0)->fd.GetNumber());
}

// kMarkedFromTTL is not handled by compaction_pri, so SST(2) is the first
// element in VersionStorageInfo::files_by_compaction_pri
TEST_F(CompactionPickerTest, MarkedForCompaction2) {
NewVersionStorage(6, kCompactionStyleLevel);
ioptions_.compaction_pri = kByCompensatedSize;

Add(1, 1U, "100", "150", 1U);
Add(1, 2U, "200", "250", 1000000000U);
Add(1, 3U, "300", "350", 1U);
Add(1, 4U, "400", "450", 1U);
Add(1, 5U, "500", "550", 1U); // kMarkedFromTTL
Add(2, 6U, "100", "600", 1U);

vstorage_->LevelFiles(1)[4]->marked_for_compaction =
FileMetaData::kMarkedFromTTL;

UpdateVersionStorageInfo();

std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), {}, &log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_levels());
ASSERT_EQ(1U, compaction->num_input_files(0));
ASSERT_EQ(2U, compaction->input(0, 0)->fd.GetNumber());
}

// kMarkedFromUpdateBlob and kMarkedFromTTL are both not handled by
// compaction_pri, so SST(2) is the first element in
// VersionStorageInfo::files_by_compaction_pri
TEST_F(CompactionPickerTest, MarkedForCompaction3) {
NewVersionStorage(6, kCompactionStyleLevel);
ioptions_.compaction_pri = kByCompensatedSize;

Add(1, 1U, "100", "150", 1U);
Add(1, 2U, "200", "250", 1000000000U); // kMarkedFromUpdateBlob
Add(1, 3U, "300", "350", 1U);
Add(1, 4U, "400", "450", 1U);
Add(1, 5U, "500", "550", 1U); // kMarkedFromTTL
Add(2, 6U, "100", "600", 1U);

vstorage_->LevelFiles(1)[1]->marked_for_compaction =
FileMetaData::kMarkedFromUpdateBlob;
vstorage_->LevelFiles(1)[4]->marked_for_compaction =
FileMetaData::kMarkedFromTTL;

UpdateVersionStorageInfo();

std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), {}, &log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(2U, compaction->num_input_levels());
ASSERT_EQ(1U, compaction->num_input_files(0));
ASSERT_EQ(2U, compaction->input(0, 0)->fd.GetNumber());
}

// Compaction scores are less than 1, so SST(7) should picked by
// marked_for_compaction
TEST_F(CompactionPickerTest, MarkedForCompaction4) {
NewVersionStorage(6, kCompactionStyleLevel);
ioptions_.compaction_pri = kByCompensatedSize;

Add(1, 1U, "100", "150", 1U);
Add(1, 2U, "200", "250", 1U);
Add(1, 3U, "300", "350", 1U);
Add(1, 4U, "400", "450", 1U);
Add(1, 5U, "500", "550", 1U);
Add(2, 6U, "100", "320", 1U);
Add(2, 7U, "330", "600", 1U); // kMarkedFromTTL

vstorage_->LevelFiles(2)[1]->marked_for_compaction =
FileMetaData::kMarkedFromTTL;

UpdateVersionStorageInfo();

std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), {}, &log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1U, compaction->num_input_levels());
ASSERT_EQ(1U, compaction->num_input_files(0));
ASSERT_EQ(2U, compaction->start_level());
ASSERT_EQ(7U, compaction->input(0, 0)->fd.GetNumber());
}

// Level 1 compaction score is greater than 1, and level 1 files are all
// conflict with level 2 files. we should skip marked_for_compaction SSTs in
// level 2, but we disable this login for test
TEST_F(CompactionPickerTest, MarkedForCompaction5) {
NewVersionStorage(6, kCompactionStyleLevel);
ioptions_.compaction_pri = kByCompensatedSize;

Add(1, 1U, "100", "150", 1U);
Add(1, 2U, "200", "250", 1U);
Add(1, 3U, "300", "350", 1000000000U);
Add(1, 4U, "400", "450", 1U);
Add(1, 5U, "500", "550", 1U);
Add(2, 6U, "100", "310", 1U); // being_compacted
Add(2, 7U, "320", "330", 1U); // kMarkedFromTTL
Add(2, 8U, "340", "600", 1U); // being_compacted

vstorage_->LevelFiles(2)[0]->being_compacted = true;
vstorage_->LevelFiles(2)[1]->marked_for_compaction =
FileMetaData::kMarkedFromTTL;
vstorage_->LevelFiles(2)[2]->being_compacted = true;

UpdateVersionStorageInfo();

SyncPoint::GetInstance()->SetCallBack(
"LevelCompactionPicker:ShouldSkipMarkedForCompaction", [&](void* arg) {
bool* result = static_cast<bool*>(arg);
*result = false;
});
TERARKDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), {}, &log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1U, compaction->num_input_levels());
ASSERT_EQ(1U, compaction->num_input_files(0));
ASSERT_EQ(2U, compaction->start_level());
ASSERT_EQ(7U, compaction->input(0, 0)->fd.GetNumber());
}

// Level 1 compaction score is greater than 1, and level 1 files are all
// conflict with level 2 files. we should skip marked_for_compaction SSTs in
// level 2, so we can't pick a compaction
TEST_F(CompactionPickerTest, MarkedForCompaction6) {
NewVersionStorage(6, kCompactionStyleLevel);
ioptions_.compaction_pri = kByCompensatedSize;

Add(1, 1U, "100", "150", 1U);
Add(1, 2U, "200", "250", 1U);
Add(1, 3U, "300", "350", 1000000000U);
Add(1, 4U, "400", "450", 1U);
Add(1, 5U, "500", "550", 1U);
Add(2, 6U, "100", "310", 1U); // being_compacted
Add(2, 7U, "320", "330", 1U); // kMarkedFromTTL
Add(2, 8U, "340", "600", 1U); // being_compacted

vstorage_->LevelFiles(2)[0]->being_compacted = true;
vstorage_->LevelFiles(2)[1]->marked_for_compaction =
FileMetaData::kMarkedFromTTL;
vstorage_->LevelFiles(2)[2]->being_compacted = true;

UpdateVersionStorageInfo();

std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), {}, &log_buffer_));
ASSERT_TRUE(compaction.get() == nullptr);
}

TEST_F(CompactionPickerTest, CompactionPriMinOverlapping1) {
NewVersionStorage(6, kCompactionStyleLevel);
ioptions_.compaction_pri = kMinOverlappingRatio;
Expand Down
7 changes: 7 additions & 0 deletions db/version_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,13 @@ struct FileMetaData {
return (marked_for_compaction & kFlag) != 0;
}

bool is_handle_compaction_pri() const {
constexpr uint8_t kFlag = kMarkedFromUser | kMarkedFromFileSystemHigh |
kMarkedFromRangeDeletion |
kMarkedFromTableBuilder;
return (marked_for_compaction & kFlag) != 0;
}

bool is_gc_forbidden() const {
return gc_status == kGarbageCollectionForbidden;
}
Expand Down
39 changes: 20 additions & 19 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ struct BottommostMarkedFilesComp {
bool operator()(const std::pair<int, FileMetaData*>& l,
const std::pair<int, FileMetaData*>& r) const noexcept {
uint8_t lm = l.second->marked_for_compaction;
uint8_t rm = l.second->marked_for_compaction;
uint8_t rm = r.second->marked_for_compaction;
if (lm != rm) {
return lm > rm;
}
Expand All @@ -357,7 +357,7 @@ struct MarkedFilesComp {
bool operator()(const std::pair<int, FileMetaData*>& l,
const std::pair<int, FileMetaData*>& r) const noexcept {
uint8_t lp = l.second->is_output_to_parent_level();
uint8_t rp = l.second->is_output_to_parent_level();
uint8_t rp = r.second->is_output_to_parent_level();
if (lp != rp) {
return lp > rp;
}
Expand All @@ -369,7 +369,7 @@ struct MarkedFilesComp {
}
}
uint8_t lm = l.second->marked_for_compaction;
uint8_t rm = l.second->marked_for_compaction;
uint8_t rm = r.second->marked_for_compaction;
if (lm != rm) {
return lm > rm;
}
Expand Down Expand Up @@ -2152,25 +2152,26 @@ void VersionStorageInfo::UpdateFilesByCompactionPri(
}
assert(temp.size() == files.size());

#ifdef WITH_ZENFS
// initialize files_by_compaction_pri_
for (size_t i = 0; i < temp.size(); i++) {
if (temp[i].file->marked_for_compaction &
FileMetaData::kMarkedFromFileSystemHigh) {
files_by_compaction_pri.push_back(static_cast<int>(temp[i].index));
}
}
for (size_t i = 0; i < temp.size(); i++) {
if (!(temp[i].file->marked_for_compaction &
FileMetaData::kMarkedFromFileSystemHigh)) {
files_by_compaction_pri.push_back(static_cast<int>(temp[i].index));
}
}
#else
std::stable_sort(
temp.begin(), temp.end(), [&](const Fsize& l, const Fsize& r) {
// lp rp
// 0 0 false
// 0 1 false
// 1 0 true
// 1 1 comp marked_for_compaction
uint8_t lp = l.file->is_handle_compaction_pri();
if (!lp) {
return false;
}
uint8_t rp = r.file->is_handle_compaction_pri();
if (!rp) {
return true;
}
return l.file->marked_for_compaction > r.file->marked_for_compaction;
});
for (size_t i = 0; i < temp.size(); i++) {
files_by_compaction_pri.push_back(static_cast<int>(temp[i].index));
}
#endif
next_file_to_compact_by_size_[level] = 0;
assert(files_[level].size() == files_by_compaction_pri_[level].size());
}
Expand Down

0 comments on commit fbe63b6

Please sign in to comment.