Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Add storage-related profile for cloud-native #54007

Merged
merged 7 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/fs/fs_starlet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "io/throttled_seekable_input_stream.h"
#include "service/staros_worker.h"
#include "storage/olap_common.h"
#include "util/defer_op.h"
#include "util/stopwatch.hpp"
#include "util/string_parser.hpp"

Expand Down Expand Up @@ -249,6 +250,9 @@ class StarletOutputStream : public starrocks::io::OutputStream {
}

Status close() override {
MonotonicStopWatch watch;
watch.start();
DeferOp defer([&]() { IOProfiler::add_sync(watch.elapsed_time()); });
auto stream_st = _file_ptr->stream();
if (!stream_st.ok()) {
return to_status(stream_st.status());
Expand Down
3 changes: 3 additions & 0 deletions be/src/io/direct_s3_output_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ Status DirectS3OutputStream::close() {
}

if (!_upload_id.empty() && !_etags.empty()) {
MonotonicStopWatch watch;
watch.start();
RETURN_IF_ERROR(complete_multipart_upload());
IOProfiler::add_sync(watch.elapsed_time());
}

_client = nullptr;
Expand Down
3 changes: 3 additions & 0 deletions be/src/io/s3_output_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,15 @@ Status S3OutputStream::close() {
return Status::OK();
}

MonotonicStopWatch watch;
watch.start();
if (_upload_id.empty()) {
RETURN_IF_ERROR(singlepart_upload());
} else {
RETURN_IF_ERROR(multipart_upload());
RETURN_IF_ERROR(complete_multipart_upload());
}
IOProfiler::add_sync(watch.elapsed_time());
_client = nullptr;
return Status::OK();
}
Expand Down
131 changes: 118 additions & 13 deletions be/src/runtime/lake_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
#include "serde/protobuf_serde.h"
#include "service/backend_options.h"
#include "storage/lake/async_delta_writer.h"
#include "storage/lake/delta_writer.h"
#include "storage/lake/delta_writer_finish_mode.h"
#include "storage/memtable.h"
#include "storage/memtable_flush_executor.h"
#include "storage/storage_engine.h"
#include "util/bthreads/bthread_shared_mutex.h"
#include "util/compression/block_compression.h"
Expand All @@ -52,6 +54,7 @@ class TabletManager;

class LakeTabletsChannel : public TabletsChannel {
using AsyncDeltaWriter = lake::AsyncDeltaWriter;
using DeltaWriter = lake::DeltaWriter;
using TxnLogPtr = AsyncDeltaWriter::TxnLogPtr;

public:
Expand Down Expand Up @@ -79,9 +82,7 @@ class LakeTabletsChannel : public TabletsChannel {

void abort(const std::vector<int64_t>& tablet_ids, const std::string& reason) override { return abort(); }

void update_profile() override {
// TODO add profile for lake
}
void update_profile() override;

MemTracker* mem_tracker() { return _mem_tracker; }

Expand Down Expand Up @@ -201,6 +202,8 @@ class LakeTabletsChannel : public TabletsChannel {

void _flush_stale_memtables();

void _update_tablet_profile(DeltaWriter* writer, RuntimeProfile* profile);

LoadChannel* _load_channel;
lake::TabletManager* _tablet_manager;

Expand Down Expand Up @@ -240,8 +243,10 @@ class LakeTabletsChannel : public TabletsChannel {
std::map<string, string> _column_to_expr_value;

// Profile counters
// Number of tablets
RuntimeProfile::Counter* _tablets_num = nullptr;
// Number of times that update_profile() is called
RuntimeProfile::Counter* _profile_update_counter = nullptr;
// Accumulated time for update_profile()
RuntimeProfile::Counter* _profile_update_timer = nullptr;
// Number of times that open() is called
RuntimeProfile::Counter* _open_counter = nullptr;
// Accumulated time of open()
Expand All @@ -256,6 +261,9 @@ class LakeTabletsChannel : public TabletsChannel {
RuntimeProfile::Counter* _wait_flush_timer = nullptr;
// Accumulated time to wait for async delta writers in add_chunk()
RuntimeProfile::Counter* _wait_writer_timer = nullptr;

std::atomic<bool> _is_updating_profile{false};
std::unique_ptr<RuntimeProfile> _tablets_profile;
};

LakeTabletsChannel::LakeTabletsChannel(LoadChannel* load_channel, lake::TabletManager* tablet_manager,
Expand All @@ -268,14 +276,16 @@ LakeTabletsChannel::LakeTabletsChannel(LoadChannel* load_channel, lake::TabletMa
_mem_tracker(mem_tracker),
_mem_pool(std::make_unique<MemPool>()) {
_profile = parent_profile->create_child(fmt::format("Index (id={})", key.index_id));
_tablets_num = ADD_COUNTER(_profile, "TabletsNum", TUnit::UNIT);
_open_counter = ADD_COUNTER(_profile, "OpenCount", TUnit::UNIT);
_open_timer = ADD_TIMER(_profile, "OpenTime");
_add_chunk_counter = ADD_COUNTER(_profile, "AddChunkCount", TUnit::UNIT);
_add_chunk_timer = ADD_TIMER(_profile, "AddChunkTime");
_profile_update_counter = ADD_COUNTER(_profile, "ProfileUpdateCount", TUnit::UNIT);
_profile_update_timer = ADD_TIMER(_profile, "ProfileUpdateTime");
_open_counter = ADD_COUNTER(_profile, "OpenRpcCount", TUnit::UNIT);
_open_timer = ADD_TIMER(_profile, "OpenRpcTime");
_add_chunk_counter = ADD_COUNTER(_profile, "AddChunkRpcCount", TUnit::UNIT);
_add_chunk_timer = ADD_TIMER(_profile, "AddChunkRpcTime");
_add_row_num = ADD_COUNTER(_profile, "AddRowNum", TUnit::UNIT);
_wait_flush_timer = ADD_CHILD_TIMER(_profile, "WaitFlushTime", "AddChunkTime");
_wait_writer_timer = ADD_CHILD_TIMER(_profile, "WaitWriterTime", "AddChunkTime");
_wait_flush_timer = ADD_CHILD_TIMER(_profile, "WaitFlushTime", "AddChunkRpcTime");
_wait_writer_timer = ADD_CHILD_TIMER(_profile, "WaitWriteTime", "AddChunkRpcTime");
_tablets_profile = std::make_unique<RuntimeProfile>("TabletsProfile");
}

LakeTabletsChannel::~LakeTabletsChannel() {
Expand Down Expand Up @@ -329,7 +339,6 @@ Status LakeTabletsChannel::open(const PTabletWriterOpenRequest& params, PTabletW
VLOG(2) << "check tablet writer for tablet " << id << ", partition " << writer->partition_id() << ", txn "
<< _txn_id << ", is_immutable " << writer->is_immutable();
}
COUNTER_SET(_tablets_num, (int64_t)_delta_writers.size());

return Status::OK();
}
Expand Down Expand Up @@ -781,6 +790,102 @@ Status LakeTabletsChannel::incremental_open(const PTabletWriterOpenRequest& para
return Status::OK();
}

void LakeTabletsChannel::update_profile() {
if (_profile == nullptr) {
return;
}

bool expect = false;
if (!_is_updating_profile.compare_exchange_strong(expect, true)) {
// skip concurrent update
return;
}
DeferOp defer([this]() { _is_updating_profile.store(false); });
_profile->inc_version();
COUNTER_UPDATE(_profile_update_counter, 1);
SCOPED_TIMER(_profile_update_timer);

std::vector<DeltaWriter*> delta_writers;
{
std::shared_lock<bthreads::BThreadSharedMutex> lk(_rw_mtx);
delta_writers.reserve(_delta_writers.size());
for (auto& [tablet_id, async_writer] : _delta_writers) {
delta_writers.push_back(async_writer->delta_writer());
}
}

std::vector<RuntimeProfile*> tablets_profile;
tablets_profile.reserve(delta_writers.size());
for (auto* writer : delta_writers) {
RuntimeProfile* profile = _tablets_profile->create_child(fmt::format("{}", writer->tablet_id()));
_update_tablet_profile(writer, profile);
tablets_profile.push_back(profile);
}

ObjectPool obj_pool;
if (!tablets_profile.empty()) {
auto* merged_profile = RuntimeProfile::merge_isomorphic_profiles(&obj_pool, tablets_profile);
RuntimeProfile* final_profile = _profile->create_child("PeerReplicas");
auto* tablets_counter = ADD_COUNTER(final_profile, "TabletsNum", TUnit::UNIT);
COUNTER_UPDATE(tablets_counter, tablets_profile.size());
final_profile->copy_all_info_strings_from(merged_profile);
final_profile->copy_all_counters_from(merged_profile);
}
}

// Registers counter `name` of unit `type` on `profile` via ADD_COUNTER and calls update(val) on the result.
#define ADD_AND_UPDATE_COUNTER(profile, name, type, val) (ADD_COUNTER(profile, name, type))->update(val)
// Same as above for timers: registers `name` via ADD_TIMER and calls update(val) on the result.
#define ADD_AND_UPDATE_TIMER(profile, name, val) (ADD_TIMER(profile, name))->update(val)
// Yields `value` when `ptr` is non-null, otherwise `default_value`. `value` is only
// evaluated in the non-null branch, so it may safely dereference `ptr`.
#define DEFAULT_IF_NULL(ptr, value, default_value) ((ptr) ? (value) : (default_value))

// Copies the statistics of a single tablet's delta writer into `profile`.
//
// writer:  source of the statistics (get_writer_stat() and get_flush_stats()).
// profile: destination; every statistic is registered under a fixed name and
//          then update()d with the value read from the writer.
//
// NOTE(review): update_profile() passes a child profile keyed by tablet id. If
// create_child()/ADD_COUNTER return already-existing objects and update() adds
// to the stored value, repeated calls would accumulate -- confirm against
// RuntimeProfile.
void LakeTabletsChannel::_update_tablet_profile(DeltaWriter* writer, RuntimeProfile* profile) {
// Writer-side statistics.
const lake::DeltaWriterStat& writer_stat = writer->get_writer_stat();
ADD_AND_UPDATE_COUNTER(profile, "WriterTaskCount", TUnit::UNIT, writer_stat.task_count);
ADD_AND_UPDATE_TIMER(profile, "WriterTaskPendingTime", writer_stat.pending_time_ns);
ADD_AND_UPDATE_COUNTER(profile, "WriteCount", TUnit::UNIT, writer_stat.write_count);
ADD_AND_UPDATE_COUNTER(profile, "RowCount", TUnit::UNIT, writer_stat.row_count);
ADD_AND_UPDATE_TIMER(profile, "WriteTime", writer_stat.write_time_ns);
ADD_AND_UPDATE_COUNTER(profile, "MemtableFullCount", TUnit::UNIT, writer_stat.memtable_full_count);
ADD_AND_UPDATE_COUNTER(profile, "MemoryExceedCount", TUnit::UNIT, writer_stat.memory_exceed_count);
ADD_AND_UPDATE_TIMER(profile, "WriteWaitFlushTime", writer_stat.write_wait_flush_time_ns);
ADD_AND_UPDATE_TIMER(profile, "CloseTime", writer_stat.close_time_ns);
ADD_AND_UPDATE_TIMER(profile, "FinishTime", writer_stat.finish_time_ns);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's better to add a threshold for these counters, otherwise they can be pretty noisy

ADD_AND_UPDATE_TIMER(profile, "FinishWaitFlushTime", writer_stat.finish_wait_flush_time_ns);
ADD_AND_UPDATE_TIMER(profile, "FinishPrepareTxnLogTime", writer_stat.finish_prepare_txn_log_time_ns);
ADD_AND_UPDATE_TIMER(profile, "FinishPutTxnLogTime", writer_stat.finish_put_txn_log_time_ns);
ADD_AND_UPDATE_TIMER(profile, "FinishPkPreloadTime", writer_stat.finish_pk_preload_time_ns);

// Flush/memtable statistics. get_flush_stats() is treated as nullable here:
// every read goes through DEFAULT_IF_NULL, which reports 0 when it is null.
const FlushStatistic* flush_stat = writer->get_flush_stats();
ADD_AND_UPDATE_COUNTER(profile, "MemtableFlushedCount", TUnit::UNIT,
DEFAULT_IF_NULL(flush_stat, flush_stat->flush_count, 0));
ADD_AND_UPDATE_COUNTER(profile, "MemtableFlushingCount", TUnit::UNIT,
DEFAULT_IF_NULL(flush_stat, flush_stat->cur_flush_count, 0));
ADD_AND_UPDATE_COUNTER(profile, "MemtableQueueCount", TUnit::UNIT,
DEFAULT_IF_NULL(flush_stat, flush_stat->queueing_memtable_num.load(), 0));
ADD_AND_UPDATE_TIMER(profile, "FlushTaskPendingTime", DEFAULT_IF_NULL(flush_stat, flush_stat->pending_time_ns, 0));
ADD_AND_UPDATE_COUNTER(profile, "MemtableInsertCount", TUnit::UNIT,
DEFAULT_IF_NULL(flush_stat, flush_stat->memtable_stats.insert_count.load(), 0));
ADD_AND_UPDATE_TIMER(profile, "MemtableInsertTime",
DEFAULT_IF_NULL(flush_stat, flush_stat->memtable_stats.insert_time_ns.load(), 0));
ADD_AND_UPDATE_TIMER(profile, "MemtableFinalizeTime",
DEFAULT_IF_NULL(flush_stat, flush_stat->memtable_stats.finalize_time_ns.load(), 0));
ADD_AND_UPDATE_COUNTER(profile, "MemtableSortCount", TUnit::UNIT,
DEFAULT_IF_NULL(flush_stat, flush_stat->memtable_stats.sort_count.load(), 0));
ADD_AND_UPDATE_TIMER(profile, "MemtableSortTime",
DEFAULT_IF_NULL(flush_stat, flush_stat->memtable_stats.sort_time_ns.load(), 0));
ADD_AND_UPDATE_COUNTER(profile, "MemtableAggCount", TUnit::UNIT,
DEFAULT_IF_NULL(flush_stat, flush_stat->memtable_stats.agg_count.load(), 0));
ADD_AND_UPDATE_TIMER(profile, "MemtableAggTime",
DEFAULT_IF_NULL(flush_stat, flush_stat->memtable_stats.agg_time_ns.load(), 0));
ADD_AND_UPDATE_TIMER(profile, "MemtableFlushTime",
DEFAULT_IF_NULL(flush_stat, flush_stat->memtable_stats.flush_time_ns.load(), 0));
ADD_AND_UPDATE_TIMER(profile, "MemtableIOTime",
DEFAULT_IF_NULL(flush_stat, flush_stat->memtable_stats.io_time_ns.load(), 0));
// The two size statistics are registered in TUnit::BYTES rather than TUnit::UNIT.
ADD_AND_UPDATE_COUNTER(profile, "MemtableMemorySize", TUnit::BYTES,
DEFAULT_IF_NULL(flush_stat, flush_stat->memtable_stats.flush_memory_size.load(), 0));
ADD_AND_UPDATE_COUNTER(profile, "MemtableDiskSize", TUnit::BYTES,
DEFAULT_IF_NULL(flush_stat, flush_stat->memtable_stats.flush_disk_size.load(), 0));
}

std::shared_ptr<TabletsChannel> new_lake_tablets_channel(LoadChannel* load_channel, lake::TabletManager* tablet_manager,
const TabletsChannelKey& key, MemTracker* mem_tracker,
RuntimeProfile* parent_profile) {
banmoy marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/local_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1063,8 +1063,8 @@ void LocalTabletsChannel::_update_peer_replica_profile(DeltaWriter* writer, Runt
ADD_AND_UPDATE_TIMER(profile, "WriteTime", writer_stat.write_time_ns);
ADD_AND_UPDATE_COUNTER(profile, "MemtableFullCount", TUnit::UNIT, writer_stat.memtable_full_count);
ADD_AND_UPDATE_COUNTER(profile, "MemoryExceedCount", TUnit::UNIT, writer_stat.memory_exceed_count);
ADD_AND_UPDATE_TIMER(profile, "WriteWaitFlushTime", writer_stat.write_wait_flush_tims_ns);
ADD_AND_UPDATE_TIMER(profile, "CloseTime", writer_stat.write_wait_flush_tims_ns);
ADD_AND_UPDATE_TIMER(profile, "WriteWaitFlushTime", writer_stat.write_wait_flush_time_ns);
ADD_AND_UPDATE_TIMER(profile, "CloseTime", writer_stat.close_time_ns);
ADD_AND_UPDATE_TIMER(profile, "CommitTime", writer_stat.commit_time_ns);
ADD_AND_UPDATE_TIMER(profile, "CommitWaitFlushTime", writer_stat.commit_wait_flush_time_ns);
ADD_AND_UPDATE_TIMER(profile, "CommitRowsetBuildTime", writer_stat.commit_rowset_build_time_ns);
Expand All @@ -1076,7 +1076,7 @@ void LocalTabletsChannel::_update_peer_replica_profile(DeltaWriter* writer, Runt
ADD_AND_UPDATE_COUNTER(profile, "MemtableFlushedCount", TUnit::UNIT, flush_stat.flush_count);
ADD_AND_UPDATE_COUNTER(profile, "MemtableFlushingCount", TUnit::UNIT, flush_stat.cur_flush_count);
ADD_AND_UPDATE_COUNTER(profile, "MemtableQueueCount", TUnit::UNIT, flush_stat.queueing_memtable_num);
ADD_AND_UPDATE_COUNTER(profile, "FlushTaskPendingTime", TUnit::UNIT, flush_stat.pending_time_ns);
ADD_AND_UPDATE_TIMER(profile, "FlushTaskPendingTime", flush_stat.pending_time_ns);
auto& memtable_stat = flush_stat.memtable_stats;
ADD_AND_UPDATE_COUNTER(profile, "MemtableInsertCount", TUnit::UNIT, memtable_stat.insert_count);
ADD_AND_UPDATE_TIMER(profile, "MemtableInsertTime", memtable_stat.insert_time_ns);
Expand Down
40 changes: 22 additions & 18 deletions be/src/storage/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

namespace starrocks {

// Atomically adds `value` to `counter` (anything exposing std::atomic-style
// fetch_add) using relaxed memory ordering: the addition itself is atomic but
// imposes no ordering on surrounding loads/stores.
// Both parameters are parenthesized so that expression arguments such as
// `*ptr` or `cond ? a : b` expand with the intended precedence.
#define ADD_COUNTER_RELAXED(counter, value) (counter).fetch_add((value), std::memory_order_relaxed)

StatusOr<std::unique_ptr<DeltaWriter>> DeltaWriter::open(const DeltaWriterOptions& opt, MemTracker* mem_tracker) {
std::unique_ptr<DeltaWriter> writer(new DeltaWriter(opt, mem_tracker, StorageEngine::instance()));
SCOPED_THREAD_LOCAL_MEM_SETTER(mem_tracker, false);
Expand Down Expand Up @@ -417,9 +419,10 @@ Status DeltaWriter::_check_partial_update_with_sort_key(const Chunk& chunk) {
Status DeltaWriter::write(const Chunk& chunk, const uint32_t* indexes, uint32_t from, uint32_t size) {
SCOPED_THREAD_LOCAL_MEM_SETTER(_mem_tracker, false);
RETURN_IF_ERROR(_check_partial_update_with_sort_key(chunk));
_stats.write_count += 1;
_stats.row_count += size;
SCOPED_RAW_TIMER(&_stats.write_time_ns);
ADD_COUNTER_RELAXED(_stats.write_count, 1);
ADD_COUNTER_RELAXED(_stats.row_count, size);
int64_t start_time = MonotonicNanos();
DeferOp defer([&]() { ADD_COUNTER_RELAXED(_stats.write_time_ns, MonotonicNanos() - start_time); });

// Delay the creation memtables until we write data.
// Because for the tablet which doesn't have any written data, we will not use their memtables.
Expand Down Expand Up @@ -468,7 +471,7 @@ Status DeltaWriter::write(const Chunk& chunk, const uint32_t* indexes, uint32_t
} else if (full) {
st = flush_memtable_async();
_reset_mem_table();
_stats.memtable_full_count += 1;
ADD_COUNTER_RELAXED(_stats.memtable_full_count, 1);
}
if (!st.ok()) {
_set_state(kAborted, st);
Expand Down Expand Up @@ -503,11 +506,11 @@ Status DeltaWriter::write_segment(const SegmentPB& segment_pb, butil::IOBuf& dat
auto io_stat = scope.current_scoped_tls_io();
auto io_time_ns = io_stat.write_time_ns + io_stat.sync_time_ns;

_stats.add_segment_count += 1;
_stats.row_count += segment_pb.num_rows();
_stats.add_segment_data_size += segment_pb.data_size();
_stats.add_segment_time_ns += duration_ns;
_stats.add_segment_io_time_ns += io_time_ns;
ADD_COUNTER_RELAXED(_stats.add_segment_count, 1);
ADD_COUNTER_RELAXED(_stats.row_count, segment_pb.num_rows());
ADD_COUNTER_RELAXED(_stats.add_segment_data_size, segment_pb.data_size());
ADD_COUNTER_RELAXED(_stats.add_segment_time_ns, duration_ns);
ADD_COUNTER_RELAXED(_stats.add_segment_io_time_ns, io_time_ns);

StarRocksMetrics::instance()->segment_flush_total.increment(1);
StarRocksMetrics::instance()->segment_flush_duration_us.increment(duration_ns / 1000);
Expand All @@ -520,7 +523,8 @@ Status DeltaWriter::write_segment(const SegmentPB& segment_pb, butil::IOBuf& dat

Status DeltaWriter::close() {
SCOPED_THREAD_LOCAL_MEM_SETTER(_mem_tracker, false);
SCOPED_RAW_TIMER(&_stats.close_time_ns);
int64_t start_time = MonotonicNanos();
DeferOp defer([&]() { ADD_COUNTER_RELAXED(_stats.close_time_ns, MonotonicNanos() - start_time); });
auto state = get_state();
switch (state) {
case kUninitialized:
Expand Down Expand Up @@ -638,8 +642,8 @@ Status DeltaWriter::_flush_memtable() {
watch.start();
Status st = _flush_token->wait();
auto elapsed_time = watch.elapsed_time();
_stats.memory_exceed_count += 1;
_stats.write_wait_flush_tims_ns += elapsed_time;
ADD_COUNTER_RELAXED(_stats.memory_exceed_count, 1);
ADD_COUNTER_RELAXED(_stats.write_wait_flush_time_ns, elapsed_time);
StarRocksMetrics::instance()->delta_writer_wait_flush_duration_us.increment(elapsed_time / 1000);
return st;
}
Expand Down Expand Up @@ -779,12 +783,12 @@ Status DeltaWriter::commit() {
}
}
VLOG(2) << "Closed delta writer. tablet_id: " << _tablet->tablet_id() << ", stats: " << _flush_token->get_stats();
_stats.commit_time_ns += watch.elapsed_time();
_stats.commit_wait_flush_time_ns += flush_ts;
_stats.commit_rowset_build_time_ns += rowset_build_ts - flush_ts;
_stats.commit_finish_pk_time_ns += pk_finish_ts - rowset_build_ts;
_stats.commit_wait_replica_time_ns += replica_ts - pk_finish_ts;
_stats.commit_txn_commit_time_ns += commit_txn_ts - replica_ts;
ADD_COUNTER_RELAXED(_stats.commit_time_ns, watch.elapsed_time());
ADD_COUNTER_RELAXED(_stats.commit_wait_flush_time_ns, flush_ts);
ADD_COUNTER_RELAXED(_stats.commit_rowset_build_time_ns, rowset_build_ts - flush_ts);
ADD_COUNTER_RELAXED(_stats.commit_finish_pk_time_ns, pk_finish_ts - rowset_build_ts);
ADD_COUNTER_RELAXED(_stats.commit_wait_replica_time_ns, replica_ts - pk_finish_ts);
ADD_COUNTER_RELAXED(_stats.commit_txn_commit_time_ns, commit_txn_ts - replica_ts);
StarRocksMetrics::instance()->delta_writer_wait_flush_duration_us.increment(flush_ts / 1000);
StarRocksMetrics::instance()->delta_writer_wait_replica_duration_us.increment((replica_ts - pk_finish_ts) / 1000);
return Status::OK();
Expand Down
Loading
Loading