From 1e15c3d0b482a36089639f1957d1df7923930cb5 Mon Sep 17 00:00:00 2001 From: Andrew Chang Date: Tue, 21 Jan 2025 14:58:07 -0800 Subject: [PATCH] input iterators --- db/db_impl/db_impl.cc | 2 +- db/db_impl/db_impl.h | 7 +++-- db/db_impl/db_impl_compaction_flush.cc | 39 +++++++++++++------------- db/internal_stats.cc | 22 +++++++-------- db/internal_stats.h | 4 +-- include/rocksdb/db.h | 2 +- 6 files changed, 39 insertions(+), 37 deletions(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 588ac6fcb4e..db37f9b4970 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -216,7 +216,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, bg_bottom_compaction_scheduled_(0), bg_compaction_scheduled_(0), num_running_compactions_(0), - num_running_compaction_iterators_(0), + num_running_compaction_input_iterators_(0), bg_flush_scheduled_(0), num_running_flushes_(0), bg_purge_scheduled_(0), diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index f00e690a1e7..3bde8681c67 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -2430,7 +2430,7 @@ class DBImpl : public DB { const std::vector& inputs, bool* sfm_bookkeeping, LogBuffer* log_buffer); - int GetNumberCompactionIterators(Compaction* c); + size_t GetNumberCompactionInputIterators(Compaction* c); // Request compaction tasks token from compaction thread limiter. // It always succeeds if force = true or limiter is disable. @@ -2965,8 +2965,9 @@ class DBImpl : public DB { // stores the number of compactions are currently running int num_running_compactions_; - // stores the number of iterators required for currently running compactions - int num_running_compaction_iterators_; + // stores the number of input iterators required for currently running + // compactions + int num_running_compaction_input_iterators_; // number of background memtable flush jobs, submitted to the HIGH pool int bg_flush_scheduled_; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index ac2d824d2df..f058b6f7d34 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -61,18 +61,15 @@ bool DBImpl::EnoughRoomForCompaction( return enough_room; } -int DBImpl::GetNumberCompactionIterators(Compaction* c) { +size_t DBImpl::GetNumberCompactionInputIterators(Compaction* c) { assert(c); - int num_l0_files = 0; - int num_non_l0_levels = 0; - for (auto& each_level : *c->inputs()) { - if (each_level.level == 0) { - num_l0_files += each_level.files.size(); - } else { - num_non_l0_levels++; - } + if (c->start_level() == 0) { + assert(0 < c->num_input_levels()); + size_t num_l0_files = c->num_input_files(0); + size_t num_non_l0_levels = c->num_input_levels() - 1; + return num_l0_files + num_non_l0_levels; } - return num_l0_files + num_non_l0_levels; + return c->num_input_levels(); } bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force, @@ -3424,7 +3421,7 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, InstrumentedMutexLock l(&mutex_); num_running_compactions_++; - int num_compaction_iterators = 0; + int num_compaction_input_iterators = 0; std::unique_ptr::iterator> pending_outputs_inserted_elem(new std::list::iterator( @@ -3433,10 +3430,9 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, assert((bg_thread_pri == Env::Priority::BOTTOM && bg_bottom_compaction_scheduled_) || (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_)); - Status s = BackgroundCompaction(&made_progress, num_compaction_iterators, - &job_context, &log_buffer, - prepicked_compaction, bg_thread_pri); - num_running_compaction_iterators_ += num_compaction_iterators; + Status s = BackgroundCompaction( + &made_progress, num_compaction_input_iterators, &job_context, + &log_buffer, prepicked_compaction, bg_thread_pri); TEST_SYNC_POINT("BackgroundCallCompaction:1"); if (s.IsBusy()) { bg_cv_.SignalAll(); // In case a waiter can proceed despite the error @@ -3501,8 +3497,9 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, assert(num_running_compactions_ > 0); num_running_compactions_--; - assert(num_running_compaction_iterators_ >= num_compaction_iterators); - num_running_compaction_iterators_ -= num_compaction_iterators; + assert(num_running_compaction_input_iterators_ >= + num_compaction_input_iterators); + num_running_compaction_input_iterators_ -= num_compaction_input_iterators; if (bg_thread_pri == Env::Priority::LOW) { bg_compaction_scheduled_--; @@ -3542,7 +3539,7 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, } Status DBImpl::BackgroundCompaction(bool* made_progress, - int& num_compaction_iterators, + int& num_compaction_input_iterators, JobContext* job_context, LogBuffer* log_buffer, PrepickedCompaction* prepicked_compaction, @@ -3740,7 +3737,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, num_files += each_level.files.size(); } RecordInHistogram(stats_, NUM_FILES_IN_SINGLE_COMPACTION, num_files); - num_compaction_iterators = GetNumberCompactionIterators(c.get()); + num_compaction_input_iterators = + static_cast(GetNumberCompactionInputIterators(c.get())); + assert(num_compaction_input_iterators >= 0); + num_running_compaction_input_iterators_ += + num_compaction_input_iterators; // There are three things that can change compaction score: // 1) When flush or compaction finish. This case is covered by diff --git a/db/internal_stats.cc b/db/internal_stats.cc index 1636c34613e..ce212b52f50 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -301,8 +301,8 @@ static const std::string aggregated_table_properties = static const std::string aggregated_table_properties_at_level = aggregated_table_properties + "-at-level"; static const std::string num_running_compactions = "num-running-compactions"; -static const std::string num_running_compaction_iterators = - "num-running-compaction-iterators"; +static const std::string num_running_compaction_input_iterators = + "num-running-compaction-input-iterators"; static const std::string num_running_flushes = "num-running-flushes"; static const std::string actual_delayed_write_rate = "actual-delayed-write-rate"; @@ -353,8 +353,8 @@ const std::string DB::Properties::kCompactionPending = rocksdb_prefix + compaction_pending; const std::string DB::Properties::kNumRunningCompactions = rocksdb_prefix + num_running_compactions; -const std::string DB::Properties::kNumRunningCompactionIterators = - rocksdb_prefix + num_running_compaction_iterators; +const std::string DB::Properties::kNumRunningCompactionInputIterators = + rocksdb_prefix + num_running_compaction_input_iterators; const std::string DB::Properties::kNumRunningFlushes = rocksdb_prefix + num_running_flushes; const std::string DB::Properties::kBackgroundErrors = @@ -584,9 +584,10 @@ const UnorderedMap {DB::Properties::kNumRunningCompactions, {false, nullptr, &InternalStats::HandleNumRunningCompactions, nullptr, nullptr}}, - {DB::Properties::kNumRunningCompactionIterators, - {false, nullptr, &InternalStats::HandleNumRunningCompactionIterators, - nullptr, nullptr}}, + {DB::Properties::kNumRunningCompactionInputIterators, + {false, nullptr, + &InternalStats::HandleNumRunningCompactionInputIterators, nullptr, + nullptr}}, {DB::Properties::kActualDelayedWriteRate, {false, nullptr, &InternalStats::HandleActualDelayedWriteRate, nullptr, nullptr}}, @@ -1272,10 +1273,9 @@ bool InternalStats::HandleNumRunningCompactions(uint64_t* value, DBImpl* db, return true; } -bool InternalStats::HandleNumRunningCompactionIterators(uint64_t* value, - DBImpl* db, - Version* /*version*/) { - *value = db->num_running_compaction_iterators_; +bool InternalStats::HandleNumRunningCompactionInputIterators( + uint64_t* value, DBImpl* db, Version* /*version*/) { + *value = db->num_running_compaction_input_iterators_; return true; } diff --git a/db/internal_stats.h b/db/internal_stats.h index 2bae3d11a01..417f6eada8c 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -847,8 +847,8 @@ class InternalStats { bool HandleCompactionPending(uint64_t* value, DBImpl* db, Version* version); bool HandleNumRunningCompactions(uint64_t* value, DBImpl* db, Version* version); - bool HandleNumRunningCompactionIterators(uint64_t* value, DBImpl* db, - Version* version); + bool HandleNumRunningCompactionInputIterators(uint64_t* value, DBImpl* db, + Version* version); bool HandleBackgroundErrors(uint64_t* value, DBImpl* db, Version* version); bool HandleCurSizeActiveMemTable(uint64_t* value, DBImpl* db, Version* version); diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index f7969093530..9f78e38cf45 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -1212,7 +1212,7 @@ class DB { // "rocksdb.num-running-compaction-iterators" - returns the number of // iterators required for currently running compactions. - static const std::string kNumRunningCompactionIterators; + static const std::string kNumRunningCompactionInputIterators; // "rocksdb.background-errors" - returns accumulated number of background // errors.