diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 194aa35ff9d..c383824abea 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -3300,11 +3300,14 @@ TEST_P(DBCompactionTestWithParam, CompressLevelCompaction) { GenerateNewFile(&rnd, &key_idx); ASSERT_EQ("1,4,8", FilesPerLevel(0)); - ASSERT_EQ(matches, 12); + // 12 of the matches come from GetNumberCompactionSortedRuns which calls + // IsTrivialMove(), which then calls InputCompressionMatchesOutput() + ASSERT_EQ(matches, 12 + 12); // Currently, the test relies on the number of calls to // InputCompressionMatchesOutput() per compaction. const int kCallsToInputCompressionMatch = 2; - ASSERT_EQ(didnt_match, 8 * kCallsToInputCompressionMatch); + // Similarly, 8 of the didnt_match come from GetNumberCompactionSortedRuns + ASSERT_EQ(didnt_match, 8 + 8 * kCallsToInputCompressionMatch); ASSERT_EQ(trivial_move, 12); ASSERT_EQ(non_trivial, 8); @@ -5856,6 +5859,18 @@ TEST_F(DBCompactionTest, CompactionStatsTest) { options.listeners.emplace_back(collector); DestroyAndReopen(options); + // Verify that the internal statistics for num_running_compactions and + // num_running_compaction_sorted_runs start and end at valid states + uint64_t num_running_compactions = 0; + ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumRunningCompactions, + &num_running_compactions)); + ASSERT_EQ(num_running_compactions, 0); + uint64_t num_running_compaction_sorted_runs = 0; + ASSERT_TRUE( + db_->GetIntProperty(DB::Properties::kNumRunningCompactionSortedRuns, + &num_running_compaction_sorted_runs)); + ASSERT_EQ(num_running_compaction_sorted_runs, 0); + for (int i = 0; i < 32; i++) { for (int j = 0; j < 5000; j++) { ASSERT_OK(Put(std::to_string(j), std::string(1, 'A'))); @@ -5869,6 +5884,15 @@ TEST_F(DBCompactionTest, CompactionStatsTest) { ColumnFamilyData* cfd = cfh->cfd(); VerifyCompactionStats(*cfd, *collector); + // There should be no more running compactions, and thus no more input + // sorted runs + 
ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumRunningCompactions, + &num_running_compactions)); + ASSERT_EQ(num_running_compactions, 0); + ASSERT_TRUE( + db_->GetIntProperty(DB::Properties::kNumRunningCompactionSortedRuns, + &num_running_compaction_sorted_runs)); + ASSERT_EQ(num_running_compaction_sorted_runs, 0); } TEST_F(DBCompactionTest, SubcompactionEvent) { diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 48aa47c27f1..b67ff2aeb9b 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -216,6 +216,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, bg_bottom_compaction_scheduled_(0), bg_compaction_scheduled_(0), num_running_compactions_(0), + num_running_compaction_sorted_runs_(0), bg_flush_scheduled_(0), num_running_flushes_(0), bg_purge_scheduled_(0), diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 1e1af5eb780..3e473797365 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -950,13 +950,6 @@ class DBImpl : public DB { return num_running_flushes_; } - // Returns the number of currently running compactions. - // REQUIREMENT: mutex_ must be held when calling this function. - int num_running_compactions() { - mutex_.AssertHeld(); - return num_running_compactions_; - } - const WriteController& write_controller() { return write_controller_; } // hollow transactions shell used for recovery. 
@@ -2420,7 +2413,8 @@ class DBImpl : public DB { Status BackgroundCompaction(bool* madeProgress, JobContext* job_context, LogBuffer* log_buffer, PrepickedCompaction* prepicked_compaction, - Env::Priority thread_pri); + Env::Priority thread_pri, + int& num_compaction_sorted_runs_added); Status BackgroundFlush(bool* madeProgress, JobContext* job_context, LogBuffer* log_buffer, FlushReason* reason, bool* flush_rescheduled_to_retain_udt, @@ -2430,6 +2424,8 @@ class DBImpl : public DB { const std::vector& inputs, bool* sfm_bookkeeping, LogBuffer* log_buffer); + size_t GetNumberCompactionSortedRuns(Compaction* c); + // Request compaction tasks token from compaction thread limiter. // It always succeeds if force = true or limiter is disable. bool RequestCompactionToken(ColumnFamilyData* cfd, bool force, @@ -2963,6 +2959,10 @@ class DBImpl : public DB { // stores the number of compactions are currently running int num_running_compactions_; + // stores the number of sorted runs being processed by currently running + // compactions + int num_running_compaction_sorted_runs_; + // number of background memtable flush jobs, submitted to the HIGH pool int bg_flush_scheduled_; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 63e6f46ffef..b5433265342 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -61,6 +61,21 @@ bool DBImpl::EnoughRoomForCompaction( return enough_room; } +size_t DBImpl::GetNumberCompactionSortedRuns(Compaction* c) { + assert(c); + if (c->IsTrivialMove() || c->deletion_compaction()) { + return 0; + } + if (c->start_level() == 0) { + assert(0 < c->num_input_levels()); + assert(c->level(0) == 0); + size_t num_l0_files = c->num_input_files(0); + size_t num_non_l0_levels = c->num_input_levels() - 1; + return num_l0_files + num_non_l0_levels; + } + return c->num_input_levels(); +} + bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force, std::unique_ptr* 
token, LogBuffer* log_buffer) { @@ -3418,8 +3433,14 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, assert((bg_thread_pri == Env::Priority::BOTTOM && bg_bottom_compaction_scheduled_) || (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_)); + + // BackgroundCompaction will update the + // num_running_compaction_sorted_runs_ total and later we will subtract + // what was added + int num_compaction_sorted_runs_added = 0; Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer, - prepicked_compaction, bg_thread_pri); + prepicked_compaction, bg_thread_pri, + num_compaction_sorted_runs_added); TEST_SYNC_POINT("BackgroundCallCompaction:1"); if (s.IsBusy()) { bg_cv_.SignalAll(); // In case a waiter can proceed despite the error @@ -3484,6 +3505,10 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, assert(num_running_compactions_ > 0); num_running_compactions_--; + assert(num_running_compaction_sorted_runs_ >= 0); + assert(num_running_compaction_sorted_runs_ >= + num_compaction_sorted_runs_added); + num_running_compaction_sorted_runs_ -= num_compaction_sorted_runs_added; if (bg_thread_pri == Env::Priority::LOW) { bg_compaction_scheduled_--; @@ -3522,11 +3547,13 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, } } +// Precondition: mutex_ must be held when calling this function. Status DBImpl::BackgroundCompaction(bool* made_progress, JobContext* job_context, LogBuffer* log_buffer, PrepickedCompaction* prepicked_compaction, - Env::Priority thread_pri) { + Env::Priority thread_pri, + int& num_compaction_sorted_runs_added) { ManualCompactionState* manual_compaction = prepicked_compaction == nullptr ? 
nullptr @@ -3720,6 +3747,12 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, num_files += each_level.files.size(); } RecordInHistogram(stats_, NUM_FILES_IN_SINGLE_COMPACTION, num_files); + num_compaction_sorted_runs_added = + static_cast<int>(GetNumberCompactionSortedRuns(c.get())); + assert(num_compaction_sorted_runs_added >= 0); + assert(num_running_compaction_sorted_runs_ >= 0); + num_running_compaction_sorted_runs_ += + num_compaction_sorted_runs_added; // There are three things that can change compaction score: // 1) When flush or compaction finish. This case is covered by diff --git a/db/internal_stats.cc b/db/internal_stats.cc index bfc66731a1b..57c2a2dce84 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -301,6 +301,8 @@ static const std::string aggregated_table_properties = static const std::string aggregated_table_properties_at_level = aggregated_table_properties + "-at-level"; static const std::string num_running_compactions = "num-running-compactions"; +static const std::string num_running_compaction_sorted_runs = + "num-running-compaction-sorted-runs"; static const std::string num_running_flushes = "num-running-flushes"; static const std::string actual_delayed_write_rate = "actual-delayed-write-rate"; @@ -351,6 +353,8 @@ const std::string DB::Properties::kCompactionPending = rocksdb_prefix + compaction_pending; const std::string DB::Properties::kNumRunningCompactions = rocksdb_prefix + num_running_compactions; +const std::string DB::Properties::kNumRunningCompactionSortedRuns = + rocksdb_prefix + num_running_compaction_sorted_runs; const std::string DB::Properties::kNumRunningFlushes = rocksdb_prefix + num_running_flushes; const std::string DB::Properties::kBackgroundErrors = @@ -580,6 +584,9 @@ const UnorderedMap {DB::Properties::kNumRunningCompactions, {false, nullptr, &InternalStats::HandleNumRunningCompactions, nullptr, nullptr}}, + {DB::Properties::kNumRunningCompactionSortedRuns, + {false, nullptr, 
&InternalStats::HandleNumRunningCompactionSortedRuns, + nullptr, nullptr}}, {DB::Properties::kActualDelayedWriteRate, {false, nullptr, &InternalStats::HandleActualDelayedWriteRate, nullptr, nullptr}}, @@ -1265,6 +1272,13 @@ bool InternalStats::HandleNumRunningCompactions(uint64_t* value, DBImpl* db, return true; } +bool InternalStats::HandleNumRunningCompactionSortedRuns(uint64_t* value, + DBImpl* db, + Version* /*version*/) { + *value = db->num_running_compaction_sorted_runs_; + return true; +} + bool InternalStats::HandleBackgroundErrors(uint64_t* value, DBImpl* /*db*/, Version* /*version*/) { // Accumulated number of errors in background flushes or compactions. diff --git a/db/internal_stats.h b/db/internal_stats.h index c1695308ebd..db162a589ea 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -847,6 +847,8 @@ class InternalStats { bool HandleCompactionPending(uint64_t* value, DBImpl* db, Version* version); bool HandleNumRunningCompactions(uint64_t* value, DBImpl* db, Version* version); + bool HandleNumRunningCompactionSortedRuns(uint64_t* value, DBImpl* db, + Version* version); bool HandleBackgroundErrors(uint64_t* value, DBImpl* db, Version* version); bool HandleCurSizeActiveMemTable(uint64_t* value, DBImpl* db, Version* version); diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index a647e3045a4..4b86983c9d5 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -1210,6 +1210,10 @@ class DB { // running compactions. static const std::string kNumRunningCompactions; + // "rocksdb.num-running-compaction-sorted-runs" - returns the number of + // sorted runs being processed by currently running compactions. + static const std::string kNumRunningCompactionSortedRuns; + // "rocksdb.background-errors" - returns accumulated number of background // errors. 
static const std::string kBackgroundErrors; diff --git a/unreleased_history/new_features/num_running_compaction_sorted_runs.md b/unreleased_history/new_features/num_running_compaction_sorted_runs.md new file mode 100644 index 00000000000..9b29ca4eba8 --- /dev/null +++ b/unreleased_history/new_features/num_running_compaction_sorted_runs.md @@ -0,0 +1 @@ +Add a new DB property `rocksdb.num-running-compaction-sorted-runs` (`DB::Properties::kNumRunningCompactionSortedRuns`) that reports the number of sorted runs being processed by currently running compactions.