Track the total number of compaction sorted runs #13320

Open: wants to merge 14 commits into base: main
21 changes: 21 additions & 0 deletions db/db_compaction_test.cc
@@ -5856,6 +5856,18 @@ TEST_F(DBCompactionTest, CompactionStatsTest) {
options.listeners.emplace_back(collector);
DestroyAndReopen(options);

// Verify that the internal statistics for num_running_compactions and
// num_running_compaction_input_iterators start and end at valid states
uint64_t num_running_compactions = 0;
ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumRunningCompactions,
&num_running_compactions));
ASSERT_EQ(num_running_compactions, 0);
uint64_t num_running_compaction_input_iterators = 0;
ASSERT_TRUE(
db_->GetIntProperty(DB::Properties::kNumRunningCompactionInputIterators,
&num_running_compaction_input_iterators));
ASSERT_EQ(num_running_compaction_input_iterators, 0);

for (int i = 0; i < 32; i++) {
for (int j = 0; j < 5000; j++) {
ASSERT_OK(Put(std::to_string(j), std::string(1, 'A')));
@@ -5869,6 +5881,15 @@ TEST_F(DBCompactionTest, CompactionStatsTest) {
ColumnFamilyData* cfd = cfh->cfd();

VerifyCompactionStats(*cfd, *collector);
// There should be no more running compactions, and thus no more input
// iterators
ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumRunningCompactions,
&num_running_compactions));
ASSERT_EQ(num_running_compactions, 0);
ASSERT_TRUE(
db_->GetIntProperty(DB::Properties::kNumRunningCompactionInputIterators,
&num_running_compaction_input_iterators));
ASSERT_EQ(num_running_compaction_input_iterators, 0);
}

TEST_F(DBCompactionTest, SubcompactionEvent) {
1 change: 1 addition & 0 deletions db/db_impl/db_impl.cc
@@ -216,6 +216,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
bg_bottom_compaction_scheduled_(0),
bg_compaction_scheduled_(0),
num_running_compactions_(0),
num_running_compaction_input_iterators_(0),
bg_flush_scheduled_(0),
num_running_flushes_(0),
bg_purge_scheduled_(0),
18 changes: 16 additions & 2 deletions db/db_impl/db_impl.h
@@ -957,6 +957,13 @@ class DBImpl : public DB {
return num_running_compactions_;
}

// Returns the number of input iterators for currently running compactions.
// REQUIREMENT: mutex_ must be held when calling this function.
int num_running_compaction_input_iterators() {
Contributor Author

I noticed that num_running_compactions did not follow the example of num_running_flushes, where db->num_running_flushes() is called inside HandleNumRunningFlushes. HandleNumRunningCompactions was accessing num_running_compactions_ directly.

mutex_.AssertHeld();
return num_running_compaction_input_iterators_;
}

const WriteController& write_controller() { return write_controller_; }

// hollow transactions shell used for recovery.
@@ -2417,8 +2424,9 @@ class DBImpl : public DB {
Env::Priority thread_pri);
void BackgroundCallFlush(Env::Priority thread_pri);
void BackgroundCallPurge();
- Status BackgroundCompaction(bool* madeProgress, JobContext* job_context,
- LogBuffer* log_buffer,
+ Status BackgroundCompaction(bool* madeProgress,
+ int& num_compaction_iterators_added,

Contributor

The C++ style guide suggests that output parameters go at the end.

+ JobContext* job_context, LogBuffer* log_buffer,
PrepickedCompaction* prepicked_compaction,
Env::Priority thread_pri);
Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
@@ -2430,6 +2438,8 @@ class DBImpl : public DB {
const std::vector<CompactionInputFiles>& inputs,
bool* sfm_bookkeeping, LogBuffer* log_buffer);

size_t GetNumberCompactionInputIterators(Compaction* c);

// Request compaction tasks token from compaction thread limiter.
// It always succeeds if force = true or limiter is disable.
bool RequestCompactionToken(ColumnFamilyData* cfd, bool force,
@@ -2963,6 +2973,10 @@ class DBImpl : public DB {
// stores the number of compactions are currently running
int num_running_compactions_;

// stores the number of input iterators required for currently running
// compactions
Contributor Author

uint64_t or size_t makes more sense to me, since num_running_compaction_input_iterators_ should never be negative. However, I wanted to follow the existing convention here, since everything else uses int.

int num_running_compaction_input_iterators_;
Contributor Author

@archang19 Jan 22, 2025

I see that InstrumentedMutexLock l(&mutex_); protects num_running_compactions_ as well as num_running_flushes_.

I guess that means we do not need std::atomic<int> for num_running_compaction_input_iterators_.


// number of background memtable flush jobs, submitted to the HIGH pool
int bg_flush_scheduled_;

35 changes: 33 additions & 2 deletions db/db_impl/db_impl_compaction_flush.cc
@@ -61,6 +61,18 @@ bool DBImpl::EnoughRoomForCompaction(
return enough_room;
}

size_t DBImpl::GetNumberCompactionInputIterators(Compaction* c) {
Contributor

We should check for and skip trivial moves and deletion compactions, since they won't have any input iterators.

Contributor Author

Ah, good catch. TIL about "deletion compactions".

assert(c);
if (c->start_level() == 0) {
assert(0 < c->num_input_levels());
assert(c->level(0) == 0);
size_t num_l0_files = c->num_input_files(0);
size_t num_non_l0_levels = c->num_input_levels() - 1;
return num_l0_files + num_non_l0_levels;
}
return c->num_input_levels();
}
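
The counting rule in GetNumberCompactionInputIterators, combined with the reviewer's suggestion to skip trivial moves and deletion compactions, can be sketched in isolation. This is a minimal sketch, not the PR's code: `MockCompaction`, its fields, and `CountInputIterators` are hypothetical stand-ins for the real `Compaction` class, and the zero result for trivial moves and deletion compactions is the reviewer's proposal rather than something this diff implements.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for rocksdb::Compaction; only what the count needs.
struct MockCompaction {
  int start_level = 0;
  // files_per_input_level[i] is the number of input files at the i-th
  // input level of this compaction.
  std::vector<std::size_t> files_per_input_level;
  bool is_trivial_move = false;         // assumed flag: files moved, not read
  bool is_deletion_compaction = false;  // assumed flag: files dropped, not read
};

// Mirrors the rule in the diff: for a compaction that starts at L0, count
// one iterator per L0 file plus one per remaining input level; otherwise
// count one iterator per input level.
std::size_t CountInputIterators(const MockCompaction& c) {
  if (c.is_trivial_move || c.is_deletion_compaction) {
    return 0;  // reviewer's suggestion: no input iterators are opened
  }
  assert(!c.files_per_input_level.empty());
  if (c.start_level == 0) {
    std::size_t num_l0_files = c.files_per_input_level[0];
    std::size_t num_non_l0_levels = c.files_per_input_level.size() - 1;
    return num_l0_files + num_non_l0_levels;
  }
  return c.files_per_input_level.size();
}
```

For example, an L0 compaction with four L0 files and one further input level counts 4 + 1 = 5 iterators, while a compaction from L2 with two input levels counts 2.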

bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force,
std::unique_ptr<TaskLimiterToken>* token,
LogBuffer* log_buffer) {
@@ -3418,8 +3430,14 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
assert((bg_thread_pri == Env::Priority::BOTTOM &&
bg_bottom_compaction_scheduled_) ||
(bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
- Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
- prepicked_compaction, bg_thread_pri);

+ // BackgroundCompaction will update the
+ // num_running_compaction_input_iterators_ total and later we will subtract
+ // what was added
+ int num_compaction_input_iterators_added = 0;
+ Status s = BackgroundCompaction(
+ &made_progress, num_compaction_input_iterators_added, &job_context,
+ &log_buffer, prepicked_compaction, bg_thread_pri);
TEST_SYNC_POINT("BackgroundCallCompaction:1");
if (s.IsBusy()) {
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
@@ -3484,6 +3502,11 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,

assert(num_running_compactions_ > 0);
num_running_compactions_--;
assert(num_running_compaction_input_iterators_ >= 0);
assert(num_running_compaction_input_iterators_ >=
num_compaction_input_iterators_added);
num_running_compaction_input_iterators_ -=
num_compaction_input_iterators_added;

if (bg_thread_pri == Env::Priority::LOW) {
bg_compaction_scheduled_--;
@@ -3522,7 +3545,9 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
}
}

// Precondition: mutex_ must be held when calling this function.
Status DBImpl::BackgroundCompaction(bool* made_progress,
int& num_compaction_input_iterators_added,
JobContext* job_context,
LogBuffer* log_buffer,
PrepickedCompaction* prepicked_compaction,
@@ -3720,6 +3745,12 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
num_files += each_level.files.size();
}
RecordInHistogram(stats_, NUM_FILES_IN_SINGLE_COMPACTION, num_files);
num_compaction_input_iterators_added =
static_cast<int>(GetNumberCompactionInputIterators(c.get()));
assert(num_compaction_input_iterators_added >= 0);
assert(num_running_compaction_input_iterators_ >= 0);
num_running_compaction_input_iterators_ +=
num_compaction_input_iterators_added;

// There are three things that can change compaction score:
// 1) When flush or compaction finish. This case is covered by
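
The accounting in this file follows an add-then-subtract pattern: BackgroundCompaction adds a job's iterator count to the running total and reports the amount added, and BackgroundCallCompaction later subtracts exactly that amount. Below is a minimal sketch of the pattern under stated assumptions: `CompactionIteratorTracker` and `RunCompactionJob` are hypothetical names, a plain `std::mutex` stands in for the DB mutex, and the output parameter is placed last as the reviewer suggested.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical tracker; the PR keeps this state directly in DBImpl,
// guarded by mutex_.
class CompactionIteratorTracker {
 public:
  // Adds a job's iterator count to the running total and returns the
  // amount added so the caller can later subtract exactly that amount.
  int Add(int num_iterators) {
    assert(num_iterators >= 0);
    std::lock_guard<std::mutex> lock(mu_);
    total_ += num_iterators;
    return num_iterators;
  }

  // Subtracts what a finished job previously added.
  void Remove(int num_added) {
    std::lock_guard<std::mutex> lock(mu_);
    assert(total_ >= num_added);
    total_ -= num_added;
  }

  int Total() const {
    std::lock_guard<std::mutex> lock(mu_);
    return total_;
  }

 private:
  mutable std::mutex mu_;
  int total_ = 0;
};

// Hypothetical job entry point with the output parameter last.
bool RunCompactionJob(CompactionIteratorTracker& tracker, int iterators_needed,
                      int* num_iterators_added) {
  *num_iterators_added = tracker.Add(iterators_needed);
  return true;
}
```

Reporting the added amount back to the caller keeps the subtraction correct even when a job exits early and never adds anything, because the caller's local value then stays at zero.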
16 changes: 15 additions & 1 deletion db/internal_stats.cc
@@ -301,6 +301,8 @@ static const std::string aggregated_table_properties =
static const std::string aggregated_table_properties_at_level =
aggregated_table_properties + "-at-level";
static const std::string num_running_compactions = "num-running-compactions";
static const std::string num_running_compaction_input_iterators =
"num-running-compaction-input-iterators";
static const std::string num_running_flushes = "num-running-flushes";
static const std::string actual_delayed_write_rate =
"actual-delayed-write-rate";
@@ -351,6 +353,8 @@ const std::string DB::Properties::kCompactionPending =
rocksdb_prefix + compaction_pending;
const std::string DB::Properties::kNumRunningCompactions =
rocksdb_prefix + num_running_compactions;
const std::string DB::Properties::kNumRunningCompactionInputIterators =
rocksdb_prefix + num_running_compaction_input_iterators;
const std::string DB::Properties::kNumRunningFlushes =
rocksdb_prefix + num_running_flushes;
const std::string DB::Properties::kBackgroundErrors =
@@ -580,6 +584,10 @@ const UnorderedMap<std::string, DBPropertyInfo>
{DB::Properties::kNumRunningCompactions,
{false, nullptr, &InternalStats::HandleNumRunningCompactions, nullptr,
nullptr}},
{DB::Properties::kNumRunningCompactionInputIterators,
{false, nullptr,
&InternalStats::HandleNumRunningCompactionInputIterators, nullptr,
nullptr}},
{DB::Properties::kActualDelayedWriteRate,
{false, nullptr, &InternalStats::HandleActualDelayedWriteRate, nullptr,
nullptr}},
@@ -1261,7 +1269,13 @@ bool InternalStats::HandleCompactionPending(uint64_t* value, DBImpl* /*db*/,

bool InternalStats::HandleNumRunningCompactions(uint64_t* value, DBImpl* db,
Version* /*version*/) {
- *value = db->num_running_compactions_;
+ *value = db->num_running_compactions();
Contributor

Doesn't num_running_compactions() assert that the DB mutex is held?

Contributor Author

Yes, it does. I think (?) that is what we want. num_running_flushes() also calls mutex_.AssertHeld(); and is called from HandleNumRunningFlushes.

Contributor Author

OK, I saw your other comment. I will update this PR to just read the values directly.

return true;
}

bool InternalStats::HandleNumRunningCompactionInputIterators(
uint64_t* value, DBImpl* db, Version* /*version*/) {
*value = db->num_running_compaction_input_iterators();
Contributor

We're not holding the mutex here. Since these are stats, it should be OK to read the counter directly even if it's not 100% accurate.

Contributor Author

Should I also get rid of num_running_flushes(), which asserts that the mutex is held?

return true;
}
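
The review thread above weighs a mutex-guarded accessor against reading the counter directly for stats. For comparison only, here is a sketch of a third option touched on earlier in the review, a `std::atomic<int>` counter with relaxed ordering. This is not what the PR does (it keeps a plain `int` under the DB mutex); `RunningIteratorStat` and its methods are hypothetical names.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical stats counter that can be read without holding a mutex.
class RunningIteratorStat {
 public:
  void Add(int n) { count_.fetch_add(n, std::memory_order_relaxed); }
  void Sub(int n) { count_.fetch_sub(n, std::memory_order_relaxed); }

  // The value may be slightly stale relative to concurrent updates, which
  // is acceptable for a statistic. Negative values are clamped to zero
  // before widening to the unsigned type used by int properties.
  std::uint64_t Read() const {
    int v = count_.load(std::memory_order_relaxed);
    return v < 0 ? 0 : static_cast<std::uint64_t>(v);
  }

 private:
  std::atomic<int> count_{0};
};
```

Relaxed ordering is enough here because the reader only needs some recent value of this one counter, not a view that is consistent with other DB state.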

2 changes: 2 additions & 0 deletions db/internal_stats.h
@@ -847,6 +847,8 @@ class InternalStats {
bool HandleCompactionPending(uint64_t* value, DBImpl* db, Version* version);
bool HandleNumRunningCompactions(uint64_t* value, DBImpl* db,
Version* version);
bool HandleNumRunningCompactionInputIterators(uint64_t* value, DBImpl* db,
Version* version);
bool HandleBackgroundErrors(uint64_t* value, DBImpl* db, Version* version);
bool HandleCurSizeActiveMemTable(uint64_t* value, DBImpl* db,
Version* version);
4 changes: 4 additions & 0 deletions include/rocksdb/db.h
@@ -1210,6 +1210,10 @@ class DB {
// running compactions.
static const std::string kNumRunningCompactions;

// "rocksdb.num-running-compaction-input-iterators" - returns the number of
// input iterators required for currently running compactions.
static const std::string kNumRunningCompactionInputIterators;

// "rocksdb.background-errors" - returns accumulated number of background
// errors.
static const std::string kBackgroundErrors;
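
A caller would read the new property the same way the test in this PR does, through GetIntProperty. The sketch below uses a hypothetical `MockDB` so that it compiles without RocksDB: the mock takes a `std::string` key for simplicity, and `SetIntPropertyForTest` and `ReadRunningCompactionInputIterators` are illustrative helpers, not RocksDB APIs. The property name string is the one documented in the comment above.

```cpp
#include <cstdint>
#include <map>
#include <string>

// Hypothetical stand-in for rocksdb::DB; only the int-property lookup.
class MockDB {
 public:
  void SetIntPropertyForTest(const std::string& name, std::uint64_t value) {
    props_[name] = value;
  }

  // Returns false when the property is unknown, like the call used in the
  // test, which checks the boolean result before using the value.
  bool GetIntProperty(const std::string& property, std::uint64_t* value) const {
    auto it = props_.find(property);
    if (it == props_.end()) {
      return false;
    }
    *value = it->second;
    return true;
  }

 private:
  std::map<std::string, std::uint64_t> props_;
};

// Property name as documented in include/rocksdb/db.h in this PR.
const std::string kNumRunningCompactionInputIterators =
    "rocksdb.num-running-compaction-input-iterators";

// Returns the property value, or `fallback` if the DB does not expose it.
std::uint64_t ReadRunningCompactionInputIterators(const MockDB& db,
                                                  std::uint64_t fallback) {
  std::uint64_t value = 0;
  if (!db.GetIntProperty(kNumRunningCompactionInputIterators, &value)) {
    return fallback;
  }
  return value;
}
```

Checking the boolean result matters when the same monitoring code runs against builds that predate the property, since the lookup then fails rather than returning zero.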