Skip to content

Commit

Permalink
input iterators
Browse files Browse the repository at this point in the history
  • Loading branch information
archang19 committed Jan 21, 2025
1 parent 2c2ba95 commit 1e15c3d
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 37 deletions.
2 changes: 1 addition & 1 deletion db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
bg_bottom_compaction_scheduled_(0),
bg_compaction_scheduled_(0),
num_running_compactions_(0),
num_running_compaction_iterators_(0),
num_running_compaction_input_iterators_(0),
bg_flush_scheduled_(0),
num_running_flushes_(0),
bg_purge_scheduled_(0),
Expand Down
7 changes: 4 additions & 3 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2430,7 +2430,7 @@ class DBImpl : public DB {
const std::vector<CompactionInputFiles>& inputs,
bool* sfm_bookkeeping, LogBuffer* log_buffer);

int GetNumberCompactionIterators(Compaction* c);
size_t GetNumberCompactionInputIterators(Compaction* c);

// Request compaction tasks token from compaction thread limiter.
// It always succeeds if force = true or limiter is disable.
Expand Down Expand Up @@ -2965,8 +2965,9 @@ class DBImpl : public DB {
// stores the number of compactions are currently running
int num_running_compactions_;

// stores the number of iterators required for currently running compactions
int num_running_compaction_iterators_;
// stores the number of input iterators required for currently running
// compactions
int num_running_compaction_input_iterators_;

// number of background memtable flush jobs, submitted to the HIGH pool
int bg_flush_scheduled_;
Expand Down
39 changes: 20 additions & 19 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,15 @@ bool DBImpl::EnoughRoomForCompaction(
return enough_room;
}

int DBImpl::GetNumberCompactionIterators(Compaction* c) {
size_t DBImpl::GetNumberCompactionInputIterators(Compaction* c) {
assert(c);
int num_l0_files = 0;
int num_non_l0_levels = 0;
for (auto& each_level : *c->inputs()) {
if (each_level.level == 0) {
num_l0_files += each_level.files.size();
} else {
num_non_l0_levels++;
}
if (c->start_level() == 0) {
assert(0 < c->num_input_levels());
size_t num_l0_files = c->num_input_files(0);
size_t num_non_l0_levels = c->num_input_levels() - 1;
return num_l0_files + num_non_l0_levels;
}
return num_l0_files + num_non_l0_levels;
return c->num_input_levels();
}

bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force,
Expand Down Expand Up @@ -3424,7 +3421,7 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
InstrumentedMutexLock l(&mutex_);

num_running_compactions_++;
int num_compaction_iterators = 0;
int num_compaction_input_iterators = 0;

std::unique_ptr<std::list<uint64_t>::iterator>
pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
Expand All @@ -3433,10 +3430,9 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
assert((bg_thread_pri == Env::Priority::BOTTOM &&
bg_bottom_compaction_scheduled_) ||
(bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
Status s = BackgroundCompaction(&made_progress, num_compaction_iterators,
&job_context, &log_buffer,
prepicked_compaction, bg_thread_pri);
num_running_compaction_iterators_ += num_compaction_iterators;
Status s = BackgroundCompaction(
&made_progress, num_compaction_input_iterators, &job_context,
&log_buffer, prepicked_compaction, bg_thread_pri);
TEST_SYNC_POINT("BackgroundCallCompaction:1");
if (s.IsBusy()) {
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
Expand Down Expand Up @@ -3501,8 +3497,9 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,

assert(num_running_compactions_ > 0);
num_running_compactions_--;
assert(num_running_compaction_iterators_ >= num_compaction_iterators);
num_running_compaction_iterators_ -= num_compaction_iterators;
assert(num_running_compaction_input_iterators_ >=
num_compaction_input_iterators);
num_running_compaction_input_iterators_ -= num_compaction_input_iterators;

if (bg_thread_pri == Env::Priority::LOW) {
bg_compaction_scheduled_--;
Expand Down Expand Up @@ -3542,7 +3539,7 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
}

Status DBImpl::BackgroundCompaction(bool* made_progress,
int& num_compaction_iterators,
int& num_compaction_input_iterators,
JobContext* job_context,
LogBuffer* log_buffer,
PrepickedCompaction* prepicked_compaction,
Expand Down Expand Up @@ -3740,7 +3737,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
num_files += each_level.files.size();
}
RecordInHistogram(stats_, NUM_FILES_IN_SINGLE_COMPACTION, num_files);
num_compaction_iterators = GetNumberCompactionIterators(c.get());
num_compaction_input_iterators =
static_cast<int>(GetNumberCompactionInputIterators(c.get()));
assert(num_compaction_input_iterators >= 0);
num_running_compaction_input_iterators_ +=
num_compaction_input_iterators;

// There are three things that can change compaction score:
// 1) When flush or compaction finish. This case is covered by
Expand Down
22 changes: 11 additions & 11 deletions db/internal_stats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,8 @@ static const std::string aggregated_table_properties =
static const std::string aggregated_table_properties_at_level =
aggregated_table_properties + "-at-level";
static const std::string num_running_compactions = "num-running-compactions";
static const std::string num_running_compaction_iterators =
"num-running-compaction-iterators";
static const std::string num_running_compaction_input_iterators =
"num-running-compaction-input-iterators";
static const std::string num_running_flushes = "num-running-flushes";
static const std::string actual_delayed_write_rate =
"actual-delayed-write-rate";
Expand Down Expand Up @@ -353,8 +353,8 @@ const std::string DB::Properties::kCompactionPending =
rocksdb_prefix + compaction_pending;
const std::string DB::Properties::kNumRunningCompactions =
rocksdb_prefix + num_running_compactions;
const std::string DB::Properties::kNumRunningCompactionIterators =
rocksdb_prefix + num_running_compaction_iterators;
const std::string DB::Properties::kNumRunningCompactionInputIterators =
rocksdb_prefix + num_running_compaction_input_iterators;
const std::string DB::Properties::kNumRunningFlushes =
rocksdb_prefix + num_running_flushes;
const std::string DB::Properties::kBackgroundErrors =
Expand Down Expand Up @@ -584,9 +584,10 @@ const UnorderedMap<std::string, DBPropertyInfo>
{DB::Properties::kNumRunningCompactions,
{false, nullptr, &InternalStats::HandleNumRunningCompactions, nullptr,
nullptr}},
{DB::Properties::kNumRunningCompactionIterators,
{false, nullptr, &InternalStats::HandleNumRunningCompactionIterators,
nullptr, nullptr}},
{DB::Properties::kNumRunningCompactionInputIterators,
{false, nullptr,
&InternalStats::HandleNumRunningCompactionInputIterators, nullptr,
nullptr}},
{DB::Properties::kActualDelayedWriteRate,
{false, nullptr, &InternalStats::HandleActualDelayedWriteRate, nullptr,
nullptr}},
Expand Down Expand Up @@ -1272,10 +1273,9 @@ bool InternalStats::HandleNumRunningCompactions(uint64_t* value, DBImpl* db,
return true;
}

bool InternalStats::HandleNumRunningCompactionIterators(uint64_t* value,
DBImpl* db,
Version* /*version*/) {
*value = db->num_running_compaction_iterators_;
bool InternalStats::HandleNumRunningCompactionInputIterators(
uint64_t* value, DBImpl* db, Version* /*version*/) {
*value = db->num_running_compaction_input_iterators_;
return true;
}

Expand Down
4 changes: 2 additions & 2 deletions db/internal_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -847,8 +847,8 @@ class InternalStats {
bool HandleCompactionPending(uint64_t* value, DBImpl* db, Version* version);
bool HandleNumRunningCompactions(uint64_t* value, DBImpl* db,
Version* version);
bool HandleNumRunningCompactionIterators(uint64_t* value, DBImpl* db,
Version* version);
bool HandleNumRunningCompactionInputIterators(uint64_t* value, DBImpl* db,
Version* version);
bool HandleBackgroundErrors(uint64_t* value, DBImpl* db, Version* version);
bool HandleCurSizeActiveMemTable(uint64_t* value, DBImpl* db,
Version* version);
Expand Down
2 changes: 1 addition & 1 deletion include/rocksdb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -1212,7 +1212,7 @@ class DB {

// "rocksdb.num-running-compaction-iterators" - returns the number of
// iterators required for currently running compactions.
static const std::string kNumRunningCompactionIterators;
static const std::string kNumRunningCompactionInputIterators;

// "rocksdb.background-errors" - returns accumulated number of background
// errors.
Expand Down

0 comments on commit 1e15c3d

Please sign in to comment.