Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track the total number of compaction sorted runs #13320

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
bg_bottom_compaction_scheduled_(0),
bg_compaction_scheduled_(0),
num_running_compactions_(0),
num_running_compaction_input_iterators_(0),
bg_flush_scheduled_(0),
num_running_flushes_(0),
bg_purge_scheduled_(0),
Expand Down
11 changes: 9 additions & 2 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2417,8 +2417,9 @@ class DBImpl : public DB {
Env::Priority thread_pri);
void BackgroundCallFlush(Env::Priority thread_pri);
void BackgroundCallPurge();
Status BackgroundCompaction(bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer,
Status BackgroundCompaction(bool* madeProgress,
int& num_compaction_iterators_added,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

C++ style guide suggests that output parameters go at the end

JobContext* job_context, LogBuffer* log_buffer,
PrepickedCompaction* prepicked_compaction,
Env::Priority thread_pri);
Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
Expand All @@ -2430,6 +2431,8 @@ class DBImpl : public DB {
const std::vector<CompactionInputFiles>& inputs,
bool* sfm_bookkeeping, LogBuffer* log_buffer);

size_t GetNumberCompactionInputIterators(Compaction* c);

// Request compaction tasks token from compaction thread limiter.
// It always succeeds if force = true or the limiter is disabled.
bool RequestCompactionToken(ColumnFamilyData* cfd, bool force,
Expand Down Expand Up @@ -2963,6 +2966,10 @@ class DBImpl : public DB {
// stores the number of compactions that are currently running
int num_running_compactions_;

// stores the number of input iterators required for currently running
// compactions
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uint64_t or size_t makes more sense to me since num_running_compaction_input_iterators_ should not be negative. However, I wanted to follow the convention here since everything else is using int

int num_running_compaction_input_iterators_;
Copy link
Contributor Author

@archang19 archang19 Jan 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see we have InstrumentedMutexLock l(&mutex_); protecting num_running_compactions_ as well as num_running_flushes_.

I guess that means we do not need std::atomic<int> for num_running_compaction_input_iterators_


// number of background memtable flush jobs, submitted to the HIGH pool
int bg_flush_scheduled_;

Expand Down
33 changes: 31 additions & 2 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ bool DBImpl::EnoughRoomForCompaction(
return enough_room;
}

// Returns the number of input iterators that compaction `c` needs.
//
// Trivial moves and deletion compactions report 0: per review feedback on this
// change, neither kind has any input iterators, so they are skipped.
//
// Otherwise, when the compaction starts at level 0, every file in the first
// input level counts as one iterator and each remaining input level counts as
// one; for any other start level, each input level counts as one.
//
// NOTE(review): IsTrivialMove() presumably reports only that `c` is eligible
// for a trivial move. If the caller can still run it as a regular compaction,
// this returns 0 for a job that does open iterators -- confirm at the call
// site.
size_t DBImpl::GetNumberCompactionInputIterators(Compaction* c) {
  assert(c);
  if (c->deletion_compaction() || c->IsTrivialMove()) {
    // No input is read for these, so there is nothing to count.
    return 0;
  }
  if (c->start_level() == 0) {
    assert(0 < c->num_input_levels());
    size_t num_l0_files = c->num_input_files(0);
    size_t num_non_l0_levels = c->num_input_levels() - 1;
    return num_l0_files + num_non_l0_levels;
  }
  return c->num_input_levels();
}

bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force,
std::unique_ptr<TaskLimiterToken>* token,
LogBuffer* log_buffer) {
Expand Down Expand Up @@ -3418,8 +3429,14 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
assert((bg_thread_pri == Env::Priority::BOTTOM &&
bg_bottom_compaction_scheduled_) ||
(bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
prepicked_compaction, bg_thread_pri);

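// BackgroundCompaction adds this job's input iterator count to
// num_running_compaction_input_iterators_ and reports the amount through
// num_compaction_input_iterators_added, so that exactly the same amount can
// be subtracted again below once the job is done.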
int num_compaction_input_iterators_added = 0;
Status s = BackgroundCompaction(
&made_progress, num_compaction_input_iterators_added, &job_context,
&log_buffer, prepicked_compaction, bg_thread_pri);
TEST_SYNC_POINT("BackgroundCallCompaction:1");
if (s.IsBusy()) {
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
Expand Down Expand Up @@ -3484,6 +3501,11 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,

assert(num_running_compactions_ > 0);
num_running_compactions_--;
assert(num_running_compaction_input_iterators_ >= 0);
assert(num_running_compaction_input_iterators_ >=
num_compaction_input_iterators_added);
num_running_compaction_input_iterators_ -=
num_compaction_input_iterators_added;

if (bg_thread_pri == Env::Priority::LOW) {
bg_compaction_scheduled_--;
Expand Down Expand Up @@ -3523,6 +3545,7 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
}

Status DBImpl::BackgroundCompaction(bool* made_progress,
int& num_compaction_input_iterators_added,
JobContext* job_context,
LogBuffer* log_buffer,
PrepickedCompaction* prepicked_compaction,
Expand Down Expand Up @@ -3720,6 +3743,12 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
num_files += each_level.files.size();
}
RecordInHistogram(stats_, NUM_FILES_IN_SINGLE_COMPACTION, num_files);
num_compaction_input_iterators_added =
static_cast<int>(GetNumberCompactionInputIterators(c.get()));
assert(num_compaction_input_iterators_added >= 0);
assert(num_running_compaction_input_iterators_ >= 0);
num_running_compaction_input_iterators_ +=
num_compaction_input_iterators_added;

// There are three things that can change compaction score:
// 1) When flush or compaction finish. This case is covered by
Expand Down
14 changes: 14 additions & 0 deletions db/internal_stats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,8 @@ static const std::string aggregated_table_properties =
static const std::string aggregated_table_properties_at_level =
aggregated_table_properties + "-at-level";
static const std::string num_running_compactions = "num-running-compactions";
static const std::string num_running_compaction_input_iterators =
"num-running-compaction-input-iterators";
static const std::string num_running_flushes = "num-running-flushes";
static const std::string actual_delayed_write_rate =
"actual-delayed-write-rate";
Expand Down Expand Up @@ -351,6 +353,8 @@ const std::string DB::Properties::kCompactionPending =
rocksdb_prefix + compaction_pending;
const std::string DB::Properties::kNumRunningCompactions =
rocksdb_prefix + num_running_compactions;
const std::string DB::Properties::kNumRunningCompactionInputIterators =
rocksdb_prefix + num_running_compaction_input_iterators;
const std::string DB::Properties::kNumRunningFlushes =
rocksdb_prefix + num_running_flushes;
const std::string DB::Properties::kBackgroundErrors =
Expand Down Expand Up @@ -580,6 +584,10 @@ const UnorderedMap<std::string, DBPropertyInfo>
{DB::Properties::kNumRunningCompactions,
{false, nullptr, &InternalStats::HandleNumRunningCompactions, nullptr,
nullptr}},
{DB::Properties::kNumRunningCompactionInputIterators,
{false, nullptr,
&InternalStats::HandleNumRunningCompactionInputIterators, nullptr,
nullptr}},
{DB::Properties::kActualDelayedWriteRate,
{false, nullptr, &InternalStats::HandleActualDelayedWriteRate, nullptr,
nullptr}},
Expand Down Expand Up @@ -1265,6 +1273,12 @@ bool InternalStats::HandleNumRunningCompactions(uint64_t* value, DBImpl* db,
return true;
}

// Property handler for the running-compaction input-iterator count: copies
// DBImpl::num_running_compaction_input_iterators_ into `*value`.
// The version argument is unused. Always returns true.
bool InternalStats::HandleNumRunningCompactionInputIterators(
    uint64_t* value, DBImpl* db, Version* /*version*/) {
  // The counter is declared as an int; the cast spells out the same
  // int -> uint64_t conversion a direct assignment would perform.
  const auto running_input_iterators =
      static_cast<uint64_t>(db->num_running_compaction_input_iterators_);
  *value = running_input_iterators;
  return true;
}

bool InternalStats::HandleBackgroundErrors(uint64_t* value, DBImpl* /*db*/,
Version* /*version*/) {
// Accumulated number of errors in background flushes or compactions.
Expand Down
2 changes: 2 additions & 0 deletions db/internal_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,8 @@ class InternalStats {
bool HandleCompactionPending(uint64_t* value, DBImpl* db, Version* version);
bool HandleNumRunningCompactions(uint64_t* value, DBImpl* db,
Version* version);
bool HandleNumRunningCompactionInputIterators(uint64_t* value, DBImpl* db,
Version* version);
bool HandleBackgroundErrors(uint64_t* value, DBImpl* db, Version* version);
bool HandleCurSizeActiveMemTable(uint64_t* value, DBImpl* db,
Version* version);
Expand Down
4 changes: 4 additions & 0 deletions include/rocksdb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -1210,6 +1210,10 @@ class DB {
// running compactions.
static const std::string kNumRunningCompactions;

// "rocksdb.num-running-compaction-input-iterators" - returns the number of
// input iterators required for currently running compactions.
static const std::string kNumRunningCompactionInputIterators;

// "rocksdb.background-errors" - returns accumulated number of background
// errors.
static const std::string kBackgroundErrors;
Expand Down
Loading