Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] add more metrics to help locate hotspot issues (backport #53490) #55644

Merged
merged 2 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ void calculate_metrics(void* arg_this) {
mem_metrics->clone_mem_bytes.value(), mem_metrics->consistency_mem_bytes.value(), datacache_mem_bytes,
mem_metrics->jit_cache_mem_bytes.value());

StarRocksMetrics::instance()->table_metrics_mgr()->cleanup();
nap_sleep(15, [daemon] { return daemon->stopped(); });
}
}
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ set(EXEC_FILES
pipeline/sort/spillable_partition_sort_sink_operator.cpp
pipeline/sort/local_parallel_merge_sort_source_operator.cpp
pipeline/sort/sort_context.cpp
pipeline/pipeline_metrics.cpp
pipeline/pipeline_driver_executor.cpp
pipeline/pipeline_driver_queue.cpp
pipeline/pipeline_driver_poller.cpp
Expand Down
3 changes: 1 addition & 2 deletions be/src/exec/pipeline/pipeline_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -661,8 +661,7 @@ void PipelineDriver::_try_to_release_buffer(RuntimeState* state, OperatorPtr& op
}
}

void PipelineDriver::finalize(RuntimeState* runtime_state, DriverState state, int64_t schedule_count,
int64_t execution_time) {
void PipelineDriver::finalize(RuntimeState* runtime_state, DriverState state) {
stop_timers();
int64_t time_spent = 0;
// The driver may be destructed after finalizing, so use a temporary driver to record
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/pipeline_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ class PipelineDriver {
void set_morsel_queue(MorselQueue* morsel_queue) { _morsel_queue = morsel_queue; }
Status prepare(RuntimeState* runtime_state);
virtual StatusOr<DriverState> process(RuntimeState* runtime_state, int worker_id);
void finalize(RuntimeState* runtime_state, DriverState state, int64_t schedule_count, int64_t execution_time);
void finalize(RuntimeState* runtime_state, DriverState state);
DriverAcct& driver_acct() { return _driver_acct; }
DriverState driver_state() const { return _state; }

Expand Down
32 changes: 17 additions & 15 deletions be/src/exec/pipeline/pipeline_driver_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <memory>

#include "agent/master_info.h"
#include "exec/pipeline/pipeline_metrics.h"
#include "exec/pipeline/stream_pipeline_driver.h"
#include "exec/workgroup/work_group.h"
#include "gutil/strings/substitute.h"
Expand All @@ -30,14 +31,19 @@
namespace starrocks::pipeline {

GlobalDriverExecutor::GlobalDriverExecutor(const std::string& name, std::unique_ptr<ThreadPool> thread_pool,
bool enable_resource_group, const CpuUtil::CpuIds& cpuids)
bool enable_resource_group, const CpuUtil::CpuIds& cpuids,
PipelineExecutorMetrics* metrics)
: Base("pip_exec_" + name),
_driver_queue(enable_resource_group ? std::unique_ptr<DriverQueue>(std::make_unique<WorkGroupDriverQueue>())
: std::make_unique<QuerySharedDriverQueue>()),
_driver_queue(enable_resource_group
? std::unique_ptr<DriverQueue>(
std::make_unique<WorkGroupDriverQueue>(metrics->get_driver_queue_metrics()))
: std::make_unique<QuerySharedDriverQueue>(metrics->get_driver_queue_metrics())),
_thread_pool(std::move(thread_pool)),
_blocked_driver_poller(new PipelineDriverPoller(name, _driver_queue.get(), cpuids)),
_blocked_driver_poller(
new PipelineDriverPoller(name, _driver_queue.get(), cpuids, metrics->get_poller_metrics())),
_exec_state_reporter(new ExecStateReporter(cpuids)),
_audit_statistics_reporter(new AuditStatisticsReporter()) {}
_audit_statistics_reporter(new AuditStatisticsReporter()),
_metrics(metrics->get_driver_executor_metrics()) {}

void GlobalDriverExecutor::close() {
_driver_queue->close();
Expand All @@ -53,13 +59,6 @@ void GlobalDriverExecutor::initialize(int num_threads) {
}
}

DriverExecutorMetrics GlobalDriverExecutor::metrics() const {
return {.schedule_count = _schedule_count.load(),
.driver_execution_ns = _driver_execution_ns.load(),
.driver_queue_len = static_cast<int64_t>(_driver_queue->size()),
.driver_poller_block_queue_len = static_cast<int64_t>(_blocked_driver_poller->num_drivers())};
}

void GlobalDriverExecutor::change_num_threads(int32_t num_threads) {
int32_t old_num_threads = 0;
if (!_num_threads_setter.adjust_expect_num(num_threads, &old_num_threads)) {
Expand All @@ -74,7 +73,7 @@ void GlobalDriverExecutor::change_num_threads(int32_t num_threads) {

void GlobalDriverExecutor::_finalize_driver(DriverRawPtr driver, RuntimeState* runtime_state, DriverState state) {
DCHECK(driver);
driver->finalize(runtime_state, state, _schedule_count, _driver_execution_ns);
driver->finalize(runtime_state, state);
}

void GlobalDriverExecutor::_worker_thread() {
Expand Down Expand Up @@ -110,7 +109,7 @@ void GlobalDriverExecutor::_worker_thread() {
auto* fragment_ctx = driver->fragment_ctx();

driver->increment_schedule_times();
_schedule_count++;
_metrics->driver_schedule_count.increment(1);

SCOPED_SET_TRACE_INFO(driver->driver_id(), query_ctx->query_id(), fragment_ctx->fragment_instance_id());

Expand Down Expand Up @@ -150,6 +149,7 @@ void GlobalDriverExecutor::_worker_thread() {

StatusOr<DriverState> maybe_state;
int64_t start_time = driver->get_active_time();
_metrics->exec_running_tasks.increment(1);
#ifdef NDEBUG
TRY_CATCH_ALL(maybe_state, driver->process(runtime_state, worker_id));
#else
Expand All @@ -161,7 +161,9 @@ void GlobalDriverExecutor::_worker_thread() {
Status status = maybe_state.status();
this->_driver_queue->update_statistics(driver);
int64_t end_time = driver->get_active_time();
_driver_execution_ns += end_time - start_time;
_metrics->driver_execution_time.increment(end_time - start_time);
_metrics->exec_running_tasks.increment(-1);
_metrics->exec_finished_tasks.increment(1);

// Check big query
if (!driver->is_query_never_expired() && status.ok() && driver->workgroup()) {
Expand Down
15 changes: 4 additions & 11 deletions be/src/exec/pipeline/pipeline_driver_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "exec/pipeline/pipeline_driver_poller.h"
#include "exec/pipeline/pipeline_driver_queue.h"
#include "exec/pipeline/pipeline_fwd.h"
#include "exec/pipeline/pipeline_metrics.h"
#include "exec/pipeline/query_context.h"
#include "runtime/runtime_state.h"
#include "util/factory_method.h"
Expand All @@ -34,12 +35,7 @@ namespace starrocks::pipeline {
class DriverExecutor;
using DriverExecutorPtr = std::shared_ptr<DriverExecutor>;

struct DriverExecutorMetrics {
int64_t schedule_count;
int64_t driver_execution_ns;
int64_t driver_queue_len;
int64_t driver_poller_block_queue_len;
};
class PipelineExecutorMetrics;

class DriverExecutor {
public:
Expand Down Expand Up @@ -72,16 +68,14 @@ class DriverExecutor {

virtual void bind_cpus(const CpuUtil::CpuIds& cpuids, const std::vector<CpuUtil::CpuIds>& borrowed_cpuids) = 0;

virtual DriverExecutorMetrics metrics() const = 0;

protected:
std::string _name;
};

class GlobalDriverExecutor final : public FactoryMethod<DriverExecutor, GlobalDriverExecutor> {
public:
GlobalDriverExecutor(const std::string& name, std::unique_ptr<ThreadPool> thread_pool, bool enable_resource_group,
const CpuUtil::CpuIds& cpuids);
const CpuUtil::CpuIds& cpuids, PipelineExecutorMetrics* metrics);
~GlobalDriverExecutor() override = default;
void initialize(int32_t num_threads) override;
void change_num_threads(int32_t num_threads) override;
Expand Down Expand Up @@ -111,8 +105,6 @@ class GlobalDriverExecutor final : public FactoryMethod<DriverExecutor, GlobalDr

void _finalize_epoch(DriverRawPtr driver, RuntimeState* runtime_state, DriverState state);

DriverExecutorMetrics metrics() const override;

private:
// The maximum duration that a driver could stay in local_driver_queue
static constexpr int64_t LOCAL_MAX_WAIT_TIME_SPENT_NS = 1'000'000L;
Expand All @@ -132,6 +124,7 @@ class GlobalDriverExecutor final : public FactoryMethod<DriverExecutor, GlobalDr
// metrics
std::unique_ptr<UIntGauge> _driver_queue_len;
std::unique_ptr<UIntGauge> _driver_poller_block_queue_len;
DriverExecutorMetrics* _metrics;
};

} // namespace starrocks::pipeline
7 changes: 5 additions & 2 deletions be/src/exec/pipeline/pipeline_driver_poller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

#include <chrono>

#include "exec/pipeline/pipeline_fwd.h"
#include "exec/pipeline/pipeline_metrics.h"
#include "runtime/exec_env.h"
#include "util/time_guard.h"

namespace starrocks::pipeline {
Expand Down Expand Up @@ -187,7 +190,7 @@ void PipelineDriverPoller::run_internal() {
void PipelineDriverPoller::add_blocked_driver(const DriverRawPtr driver) {
std::unique_lock<std::mutex> lock(_global_mutex);
_blocked_drivers.push_back(driver);
_num_drivers++;
_metrics->poller_block_queue_len.increment(1);
driver->_pending_timer_sw->reset();
driver->driver_acct().clean_local_queue_infos();
_cond.notify_one();
Expand Down Expand Up @@ -239,7 +242,7 @@ void PipelineDriverPoller::remove_blocked_driver(DriverList& local_blocked_drive
auto& driver = *driver_it;
driver->_pending_timer->update(driver->_pending_timer_sw->elapsed_time());
local_blocked_drivers.erase(driver_it++);
_num_drivers--;
_metrics->poller_block_queue_len.increment(-1);
}

void PipelineDriverPoller::on_cancel(DriverRawPtr driver, std::vector<DriverRawPtr>& ready_drivers,
Expand Down
11 changes: 5 additions & 6 deletions be/src/exec/pipeline/pipeline_driver_poller.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,19 @@ namespace starrocks::pipeline {

class PipelineDriverPoller;
using PipelineDriverPollerPtr = std::unique_ptr<PipelineDriverPoller>;
class PollerMetrics;

class PipelineDriverPoller {
public:
explicit PipelineDriverPoller(std::string name, DriverQueue* driver_queue, CpuUtil::CpuIds cpuids)
explicit PipelineDriverPoller(std::string name, DriverQueue* driver_queue, CpuUtil::CpuIds cpuids,
PollerMetrics* metrics)
: _name(std::move(name)),
_cpud_ids(std::move(cpuids)),
_driver_queue(driver_queue),
_polling_thread(nullptr),
_is_polling_thread_initialized(false),
_is_shutdown(false),
_num_drivers(0) {}
_metrics(metrics) {}

~PipelineDriverPoller() { shutdown(); }

Expand All @@ -56,9 +58,6 @@ class PipelineDriverPoller {
size_t activate_parked_driver(const ConstDriverPredicator& predicate_func);
size_t calculate_parked_driver(const ConstDriverPredicator& predicate_func) const;

// only used for collecting metrics
size_t num_drivers() const { return _num_drivers; }

void for_each_driver(const ConstDriverConsumer& call) const;

void bind_cpus(const CpuUtil::CpuIds& cpuids);
Expand Down Expand Up @@ -88,6 +87,6 @@ class PipelineDriverPoller {
mutable std::mutex _global_parked_mutex;
DriverList _parked_drivers;

std::atomic<size_t> _num_drivers;
PollerMetrics* _metrics;
};
} // namespace starrocks::pipeline
12 changes: 11 additions & 1 deletion be/src/exec/pipeline/pipeline_driver_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,21 @@

#include "exec/pipeline/pipeline_driver_queue.h"

#include "exec/pipeline/pipeline_metrics.h"
#include "exec/pipeline/source_operator.h"
#include "exec/workgroup/work_group.h"
#include "gutil/strings/substitute.h"

namespace starrocks::pipeline {

/// QuerySharedDriverQueue.
QuerySharedDriverQueue::QuerySharedDriverQueue() {
QuerySharedDriverQueue::QuerySharedDriverQueue(DriverQueueMetrics* metrics) : FactoryMethod(metrics) {
double factor = 1;
for (int i = QUEUE_SIZE - 1; i >= 0; --i) {
// Initialize the factor for every sub-queue.
// Higher-priority queues have more execution time,
// so they have a larger factor.
_queues[i].set_metrics(metrics);
_queues[i].factor_for_normal = factor;
factor *= RATIO_OF_ADJACENT_QUEUE;
}
Expand Down Expand Up @@ -56,6 +58,7 @@ void QuerySharedDriverQueue::put_back(const DriverRawPtr driver) {
_cv.notify_one();
++_num_drivers;
}
_metrics->driver_queue_len.increment(1);
}

void QuerySharedDriverQueue::put_back(const std::vector<DriverRawPtr>& drivers) {
Expand All @@ -73,6 +76,7 @@ void QuerySharedDriverQueue::put_back(const std::vector<DriverRawPtr>& drivers)
_cv.notify_one();
}
_num_drivers += drivers.size();
_metrics->driver_queue_len.increment(drivers.size());
}

void QuerySharedDriverQueue::put_back_from_executor(const DriverRawPtr driver) {
Expand Down Expand Up @@ -120,6 +124,7 @@ StatusOr<DriverRawPtr> QuerySharedDriverQueue::take(const bool block) {
driver_ptr->set_in_ready_queue(false);

--_num_drivers;
_metrics->driver_queue_len.increment(-1);
}
}

Expand Down Expand Up @@ -170,6 +175,7 @@ void SubQuerySharedDriverQueue::put(const DriverRawPtr driver) {
queue.emplace_back(driver);
}
num_drivers++;
_metrics->driver_queue_len.increment(1);
}

void SubQuerySharedDriverQueue::cancel(const DriverRawPtr driver) {
Expand All @@ -187,6 +193,7 @@ DriverRawPtr SubQuerySharedDriverQueue::take(const bool block) {
pending_cancel_queue.pop();
cancelled_set.insert(driver);
--num_drivers;
_metrics->driver_queue_len.increment(-1);
return driver;
}

Expand All @@ -198,6 +205,7 @@ DriverRawPtr SubQuerySharedDriverQueue::take(const bool block) {
cancelled_set.erase(iter);
} else {
--num_drivers;
_metrics->driver_queue_len.increment(-1);
return driver;
}
}
Expand Down Expand Up @@ -282,6 +290,7 @@ StatusOr<DriverRawPtr> WorkGroupDriverQueue::take(const bool block) {
auto maybe_driver = wg_entity->queue()->take(block);
if (maybe_driver.ok() && maybe_driver.value() != nullptr) {
--_num_drivers;
_metrics->driver_queue_len.increment(-1);
}
return maybe_driver;
}
Expand Down Expand Up @@ -351,6 +360,7 @@ void WorkGroupDriverQueue::_put_back(const DriverRawPtr driver) {
}

++_num_drivers;
_metrics->driver_queue_len.increment(1);

_cv.notify_one();
}
Expand Down
9 changes: 8 additions & 1 deletion be/src/exec/pipeline/pipeline_driver_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ namespace starrocks::pipeline {

class DriverQueue;
using DriverQueuePtr = std::unique_ptr<DriverQueue>;
class DriverQueueMetrics;

class DriverQueue {
public:
DriverQueue(DriverQueueMetrics* metrics) : _metrics(metrics) {}
virtual ~DriverQueue() = default;
virtual void close() = 0;

Expand All @@ -46,6 +48,8 @@ class DriverQueue {
bool empty() const { return size() == 0; }

virtual bool should_yield(const DriverRawPtr driver, int64_t unaccounted_runtime_ns) const = 0;

DriverQueueMetrics* _metrics;
};

// SubQuerySharedDriverQueue is used to store the driver waiting to be executed.
Expand Down Expand Up @@ -82,6 +86,7 @@ class SubQuerySharedDriverQueue {
inline bool empty() const { return num_drivers == 0; }

inline size_t size() const { return num_drivers; }
void set_metrics(DriverQueueMetrics* metrics) { _metrics = metrics; }

std::deque<DriverRawPtr> queue;
std::queue<DriverRawPtr> pending_cancel_queue;
Expand All @@ -93,13 +98,14 @@ class SubQuerySharedDriverQueue {

private:
std::atomic<int64_t> _accu_consume_time = 0;
DriverQueueMetrics* _metrics;
};

class QuerySharedDriverQueue : public FactoryMethod<DriverQueue, QuerySharedDriverQueue> {
friend class FactoryMethod<DriverQueue, QuerySharedDriverQueue>;

public:
QuerySharedDriverQueue();
QuerySharedDriverQueue(DriverQueueMetrics* metrics);
~QuerySharedDriverQueue() override = default;
void close() override;
void put_back(const DriverRawPtr driver) override;
Expand Down Expand Up @@ -149,6 +155,7 @@ class WorkGroupDriverQueue : public FactoryMethod<DriverQueue, WorkGroupDriverQu
friend class FactoryMethod<DriverQueue, WorkGroupDriverQueue>;

public:
WorkGroupDriverQueue(DriverQueueMetrics* metrics) : FactoryMethod(metrics) {}
~WorkGroupDriverQueue() override = default;
void close() override;

Expand Down
Loading
Loading