Skip to content

Commit

Permalink
[Enhancement] Add storage-related profile for cloud-native (#54007)
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy authored Dec 30, 2024
1 parent e4c7296 commit f73d626
Show file tree
Hide file tree
Showing 15 changed files with 330 additions and 90 deletions.
4 changes: 4 additions & 0 deletions be/src/fs/fs_starlet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "io/throttled_seekable_input_stream.h"
#include "service/staros_worker.h"
#include "storage/olap_common.h"
#include "util/defer_op.h"
#include "util/stopwatch.hpp"
#include "util/string_parser.hpp"

Expand Down Expand Up @@ -249,6 +250,9 @@ class StarletOutputStream : public starrocks::io::OutputStream {
}

Status close() override {
MonotonicStopWatch watch;
watch.start();
DeferOp defer([&]() { IOProfiler::add_sync(watch.elapsed_time()); });
auto stream_st = _file_ptr->stream();
if (!stream_st.ok()) {
return to_status(stream_st.status());
Expand Down
3 changes: 3 additions & 0 deletions be/src/io/direct_s3_output_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ Status DirectS3OutputStream::close() {
}

if (!_upload_id.empty() && !_etags.empty()) {
MonotonicStopWatch watch;
watch.start();
RETURN_IF_ERROR(complete_multipart_upload());
IOProfiler::add_sync(watch.elapsed_time());
}

_client = nullptr;
Expand Down
3 changes: 3 additions & 0 deletions be/src/io/s3_output_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,15 @@ Status S3OutputStream::close() {
return Status::OK();
}

MonotonicStopWatch watch;
watch.start();
if (_upload_id.empty()) {
RETURN_IF_ERROR(singlepart_upload());
} else {
RETURN_IF_ERROR(multipart_upload());
RETURN_IF_ERROR(complete_multipart_upload());
}
IOProfiler::add_sync(watch.elapsed_time());
_client = nullptr;
return Status::OK();
}
Expand Down
131 changes: 118 additions & 13 deletions be/src/runtime/lake_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
#include "serde/protobuf_serde.h"
#include "service/backend_options.h"
#include "storage/lake/async_delta_writer.h"
#include "storage/lake/delta_writer.h"
#include "storage/lake/delta_writer_finish_mode.h"
#include "storage/memtable.h"
#include "storage/memtable_flush_executor.h"
#include "storage/storage_engine.h"
#include "util/bthreads/bthread_shared_mutex.h"
#include "util/compression/block_compression.h"
Expand All @@ -52,6 +54,7 @@ class TabletManager;

class LakeTabletsChannel : public TabletsChannel {
using AsyncDeltaWriter = lake::AsyncDeltaWriter;
using DeltaWriter = lake::DeltaWriter;
using TxnLogPtr = AsyncDeltaWriter::TxnLogPtr;

public:
Expand Down Expand Up @@ -79,9 +82,7 @@ class LakeTabletsChannel : public TabletsChannel {

void abort(const std::vector<int64_t>& tablet_ids, const std::string& reason) override { return abort(); }

void update_profile() override {
// TODO add profile for lake
}
void update_profile() override;

MemTracker* mem_tracker() { return _mem_tracker; }

Expand Down Expand Up @@ -201,6 +202,8 @@ class LakeTabletsChannel : public TabletsChannel {

void _flush_stale_memtables();

void _update_tablet_profile(DeltaWriter* writer, RuntimeProfile* profile);

LoadChannel* _load_channel;
lake::TabletManager* _tablet_manager;

Expand Down Expand Up @@ -240,8 +243,10 @@ class LakeTabletsChannel : public TabletsChannel {
std::map<string, string> _column_to_expr_value;

// Profile counters
// Number of tablets
RuntimeProfile::Counter* _tablets_num = nullptr;
// Number of times that update_profile() is called
RuntimeProfile::Counter* _profile_update_counter = nullptr;
// Accumulated time for update_profile()
RuntimeProfile::Counter* _profile_update_timer = nullptr;
// Number of times that open() is called
RuntimeProfile::Counter* _open_counter = nullptr;
// Accumulated time of open()
Expand All @@ -256,6 +261,9 @@ class LakeTabletsChannel : public TabletsChannel {
RuntimeProfile::Counter* _wait_flush_timer = nullptr;
// Accumulated time to wait for async delta writers in add_chunk()
RuntimeProfile::Counter* _wait_writer_timer = nullptr;

std::atomic<bool> _is_updating_profile{false};
std::unique_ptr<RuntimeProfile> _tablets_profile;
};

LakeTabletsChannel::LakeTabletsChannel(LoadChannel* load_channel, lake::TabletManager* tablet_manager,
Expand All @@ -268,14 +276,16 @@ LakeTabletsChannel::LakeTabletsChannel(LoadChannel* load_channel, lake::TabletMa
_mem_tracker(mem_tracker),
_mem_pool(std::make_unique<MemPool>()) {
_profile = parent_profile->create_child(fmt::format("Index (id={})", key.index_id));
_tablets_num = ADD_COUNTER(_profile, "TabletsNum", TUnit::UNIT);
_open_counter = ADD_COUNTER(_profile, "OpenCount", TUnit::UNIT);
_open_timer = ADD_TIMER(_profile, "OpenTime");
_add_chunk_counter = ADD_COUNTER(_profile, "AddChunkCount", TUnit::UNIT);
_add_chunk_timer = ADD_TIMER(_profile, "AddChunkTime");
_profile_update_counter = ADD_COUNTER(_profile, "ProfileUpdateCount", TUnit::UNIT);
_profile_update_timer = ADD_TIMER(_profile, "ProfileUpdateTime");
_open_counter = ADD_COUNTER(_profile, "OpenRpcCount", TUnit::UNIT);
_open_timer = ADD_TIMER(_profile, "OpenRpcTime");
_add_chunk_counter = ADD_COUNTER(_profile, "AddChunkRpcCount", TUnit::UNIT);
_add_chunk_timer = ADD_TIMER(_profile, "AddChunkRpcTime");
_add_row_num = ADD_COUNTER(_profile, "AddRowNum", TUnit::UNIT);
_wait_flush_timer = ADD_CHILD_TIMER(_profile, "WaitFlushTime", "AddChunkTime");
_wait_writer_timer = ADD_CHILD_TIMER(_profile, "WaitWriterTime", "AddChunkTime");
_wait_flush_timer = ADD_CHILD_TIMER(_profile, "WaitFlushTime", "AddChunkRpcTime");
_wait_writer_timer = ADD_CHILD_TIMER(_profile, "WaitWriteTime", "AddChunkRpcTime");
_tablets_profile = std::make_unique<RuntimeProfile>("TabletsProfile");
}

LakeTabletsChannel::~LakeTabletsChannel() {
Expand Down Expand Up @@ -329,7 +339,6 @@ Status LakeTabletsChannel::open(const PTabletWriterOpenRequest& params, PTabletW
VLOG(2) << "check tablet writer for tablet " << id << ", partition " << writer->partition_id() << ", txn "
<< _txn_id << ", is_immutable " << writer->is_immutable();
}
COUNTER_SET(_tablets_num, (int64_t)_delta_writers.size());

return Status::OK();
}
Expand Down Expand Up @@ -781,6 +790,102 @@ Status LakeTabletsChannel::incremental_open(const PTabletWriterOpenRequest& para
return Status::OK();
}

void LakeTabletsChannel::update_profile() {
if (_profile == nullptr) {
return;
}

bool expect = false;
if (!_is_updating_profile.compare_exchange_strong(expect, true)) {
// skip concurrent update
return;
}
DeferOp defer([this]() { _is_updating_profile.store(false); });
_profile->inc_version();
COUNTER_UPDATE(_profile_update_counter, 1);
SCOPED_TIMER(_profile_update_timer);

std::vector<DeltaWriter*> delta_writers;
{
std::shared_lock<bthreads::BThreadSharedMutex> lk(_rw_mtx);
delta_writers.reserve(_delta_writers.size());
for (auto& [tablet_id, async_writer] : _delta_writers) {
delta_writers.push_back(async_writer->delta_writer());
}
}

std::vector<RuntimeProfile*> tablets_profile;
tablets_profile.reserve(delta_writers.size());
for (auto* writer : delta_writers) {
RuntimeProfile* profile = _tablets_profile->create_child(fmt::format("{}", writer->tablet_id()));
_update_tablet_profile(writer, profile);
tablets_profile.push_back(profile);
}

ObjectPool obj_pool;
if (!tablets_profile.empty()) {
auto* merged_profile = RuntimeProfile::merge_isomorphic_profiles(&obj_pool, tablets_profile);
RuntimeProfile* final_profile = _profile->create_child("PeerReplicas");
auto* tablets_counter = ADD_COUNTER(final_profile, "TabletsNum", TUnit::UNIT);
COUNTER_UPDATE(tablets_counter, tablets_profile.size());
final_profile->copy_all_info_strings_from(merged_profile);
final_profile->copy_all_counters_from(merged_profile);
}
}

#define ADD_AND_UPDATE_COUNTER(profile, name, type, val) (ADD_COUNTER(profile, name, type))->update(val)
#define ADD_AND_UPDATE_TIMER(profile, name, val) (ADD_TIMER(profile, name))->update(val)
#define DEFAULT_IF_NULL(ptr, value, default_value) ((ptr) ? (value) : (default_value))

void LakeTabletsChannel::_update_tablet_profile(DeltaWriter* writer, RuntimeProfile* profile) {
const lake::DeltaWriterStat& writer_stat = writer->get_writer_stat();
ADD_AND_UPDATE_COUNTER(profile, "WriterTaskCount", TUnit::UNIT, writer_stat.task_count);
ADD_AND_UPDATE_TIMER(profile, "WriterTaskPendingTime", writer_stat.pending_time_ns);
ADD_AND_UPDATE_COUNTER(profile, "WriteCount", TUnit::UNIT, writer_stat.write_count);
ADD_AND_UPDATE_COUNTER(profile, "RowCount", TUnit::UNIT, writer_stat.row_count);
ADD_AND_UPDATE_TIMER(profile, "WriteTime", writer_stat.write_time_ns);
ADD_AND_UPDATE_COUNTER(profile, "MemtableFullCount", TUnit::UNIT, writer_stat.memtable_full_count);
ADD_AND_UPDATE_COUNTER(profile, "MemoryExceedCount", TUnit::UNIT, writer_stat.memory_exceed_count);
ADD_AND_UPDATE_TIMER(profile, "WriteWaitFlushTime", writer_stat.write_wait_flush_time_ns);
ADD_AND_UPDATE_TIMER(profile, "CloseTime", writer_stat.close_time_ns);
ADD_AND_UPDATE_TIMER(profile, "FinishTime", writer_stat.finish_time_ns);
ADD_AND_UPDATE_TIMER(profile, "FinishWaitFlushTime", writer_stat.finish_wait_flush_time_ns);
ADD_AND_UPDATE_TIMER(profile, "FinishPrepareTxnLogTime", writer_stat.finish_prepare_txn_log_time_ns);
ADD_AND_UPDATE_TIMER(profile, "FinishPutTxnLogTime", writer_stat.finish_put_txn_log_time_ns);
ADD_AND_UPDATE_TIMER(profile, "FinishPkPreloadTime", writer_stat.finish_pk_preload_time_ns);

const FlushStatistic* flush_stat = writer->get_flush_stats();
ADD_AND_UPDATE_COUNTER(profile, "MemtableFlushedCount", TUnit::UNIT,
DEFAULT_IF_NULL(flush_stat, flush_stat->flush_count, 0));
ADD_AND_UPDATE_COUNTER(profile, "MemtableFlushingCount", TUnit::UNIT,
DEFAULT_IF_NULL(flush_stat, flush_stat->cur_flush_count, 0));
ADD_AND_UPDATE_COUNTER(profile, "MemtableQueueCount", TUnit::UNIT,
DEFAULT_IF_NULL(flush_stat, flush_stat->queueing_memtable_num.load(), 0));
ADD_AND_UPDATE_TIMER(profile, "FlushTaskPendingTime", DEFAULT_IF_NULL(flush_stat, flush_stat->pending_time_ns, 0));
ADD_AND_UPDATE_COUNTER(profile, "MemtableInsertCount", TUnit::UNIT,
DEFAULT_IF_NULL(flush_stat, flush_stat->memtable_stats.insert_count.load(), 0));
ADD_AND_UPDATE_TIMER(profile, "MemtableInsertTime",
DEFAULT_IF_NULL(flush_stat, flush_stat->memtable_stats.insert_time_ns.load(), 0));
ADD_AND_UPDATE_TIMER(profile, "MemtableFinalizeTime",
DEFAULT_IF_NULL(flush_stat, flush_stat->memtable_stats.finalize_time_ns.load(), 0));
ADD_AND_UPDATE_COUNTER(profile, "MemtableSortCount", TUnit::UNIT,
DEFAULT_IF_NULL(flush_stat, flush_stat->memtable_stats.sort_count.load(), 0));
ADD_AND_UPDATE_TIMER(profile, "MemtableSortTime",
DEFAULT_IF_NULL(flush_stat, flush_stat->memtable_stats.sort_time_ns.load(), 0));
ADD_AND_UPDATE_COUNTER(profile, "MemtableAggCount", TUnit::UNIT,
DEFAULT_IF_NULL(flush_stat, flush_stat->memtable_stats.agg_count.load(), 0));
ADD_AND_UPDATE_TIMER(profile, "MemtableAggTime",
DEFAULT_IF_NULL(flush_stat, flush_stat->memtable_stats.agg_time_ns.load(), 0));
ADD_AND_UPDATE_TIMER(profile, "MemtableFlushTime",
DEFAULT_IF_NULL(flush_stat, flush_stat->memtable_stats.flush_time_ns.load(), 0));
ADD_AND_UPDATE_TIMER(profile, "MemtableIOTime",
DEFAULT_IF_NULL(flush_stat, flush_stat->memtable_stats.io_time_ns.load(), 0));
ADD_AND_UPDATE_COUNTER(profile, "MemtableMemorySize", TUnit::BYTES,
DEFAULT_IF_NULL(flush_stat, flush_stat->memtable_stats.flush_memory_size.load(), 0));
ADD_AND_UPDATE_COUNTER(profile, "MemtableDiskSize", TUnit::BYTES,
DEFAULT_IF_NULL(flush_stat, flush_stat->memtable_stats.flush_disk_size.load(), 0));
}

std::shared_ptr<TabletsChannel> new_lake_tablets_channel(LoadChannel* load_channel, lake::TabletManager* tablet_manager,
const TabletsChannelKey& key, MemTracker* mem_tracker,
RuntimeProfile* parent_profile) {
Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/local_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1063,8 +1063,8 @@ void LocalTabletsChannel::_update_peer_replica_profile(DeltaWriter* writer, Runt
ADD_AND_UPDATE_TIMER(profile, "WriteTime", writer_stat.write_time_ns);
ADD_AND_UPDATE_COUNTER(profile, "MemtableFullCount", TUnit::UNIT, writer_stat.memtable_full_count);
ADD_AND_UPDATE_COUNTER(profile, "MemoryExceedCount", TUnit::UNIT, writer_stat.memory_exceed_count);
ADD_AND_UPDATE_TIMER(profile, "WriteWaitFlushTime", writer_stat.write_wait_flush_tims_ns);
ADD_AND_UPDATE_TIMER(profile, "CloseTime", writer_stat.write_wait_flush_tims_ns);
ADD_AND_UPDATE_TIMER(profile, "WriteWaitFlushTime", writer_stat.write_wait_flush_time_ns);
ADD_AND_UPDATE_TIMER(profile, "CloseTime", writer_stat.close_time_ns);
ADD_AND_UPDATE_TIMER(profile, "CommitTime", writer_stat.commit_time_ns);
ADD_AND_UPDATE_TIMER(profile, "CommitWaitFlushTime", writer_stat.commit_wait_flush_time_ns);
ADD_AND_UPDATE_TIMER(profile, "CommitRowsetBuildTime", writer_stat.commit_rowset_build_time_ns);
Expand All @@ -1076,7 +1076,7 @@ void LocalTabletsChannel::_update_peer_replica_profile(DeltaWriter* writer, Runt
ADD_AND_UPDATE_COUNTER(profile, "MemtableFlushedCount", TUnit::UNIT, flush_stat.flush_count);
ADD_AND_UPDATE_COUNTER(profile, "MemtableFlushingCount", TUnit::UNIT, flush_stat.cur_flush_count);
ADD_AND_UPDATE_COUNTER(profile, "MemtableQueueCount", TUnit::UNIT, flush_stat.queueing_memtable_num);
ADD_AND_UPDATE_COUNTER(profile, "FlushTaskPendingTime", TUnit::UNIT, flush_stat.pending_time_ns);
ADD_AND_UPDATE_TIMER(profile, "FlushTaskPendingTime", flush_stat.pending_time_ns);
auto& memtable_stat = flush_stat.memtable_stats;
ADD_AND_UPDATE_COUNTER(profile, "MemtableInsertCount", TUnit::UNIT, memtable_stat.insert_count);
ADD_AND_UPDATE_TIMER(profile, "MemtableInsertTime", memtable_stat.insert_time_ns);
Expand Down
40 changes: 22 additions & 18 deletions be/src/storage/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

namespace starrocks {

#define ADD_COUNTER_RELAXED(counter, value) counter.fetch_add(value, std::memory_order_relaxed)

StatusOr<std::unique_ptr<DeltaWriter>> DeltaWriter::open(const DeltaWriterOptions& opt, MemTracker* mem_tracker) {
std::unique_ptr<DeltaWriter> writer(new DeltaWriter(opt, mem_tracker, StorageEngine::instance()));
SCOPED_THREAD_LOCAL_MEM_SETTER(mem_tracker, false);
Expand Down Expand Up @@ -417,9 +419,10 @@ Status DeltaWriter::_check_partial_update_with_sort_key(const Chunk& chunk) {
Status DeltaWriter::write(const Chunk& chunk, const uint32_t* indexes, uint32_t from, uint32_t size) {
SCOPED_THREAD_LOCAL_MEM_SETTER(_mem_tracker, false);
RETURN_IF_ERROR(_check_partial_update_with_sort_key(chunk));
_stats.write_count += 1;
_stats.row_count += size;
SCOPED_RAW_TIMER(&_stats.write_time_ns);
ADD_COUNTER_RELAXED(_stats.write_count, 1);
ADD_COUNTER_RELAXED(_stats.row_count, size);
int64_t start_time = MonotonicNanos();
DeferOp defer([&]() { ADD_COUNTER_RELAXED(_stats.write_time_ns, MonotonicNanos() - start_time); });

// Delay the creation memtables until we write data.
// Because for the tablet which doesn't have any written data, we will not use their memtables.
Expand Down Expand Up @@ -468,7 +471,7 @@ Status DeltaWriter::write(const Chunk& chunk, const uint32_t* indexes, uint32_t
} else if (full) {
st = flush_memtable_async();
_reset_mem_table();
_stats.memtable_full_count += 1;
ADD_COUNTER_RELAXED(_stats.memtable_full_count, 1);
}
if (!st.ok()) {
_set_state(kAborted, st);
Expand Down Expand Up @@ -503,11 +506,11 @@ Status DeltaWriter::write_segment(const SegmentPB& segment_pb, butil::IOBuf& dat
auto io_stat = scope.current_scoped_tls_io();
auto io_time_ns = io_stat.write_time_ns + io_stat.sync_time_ns;

_stats.add_segment_count += 1;
_stats.row_count += segment_pb.num_rows();
_stats.add_segment_data_size += segment_pb.data_size();
_stats.add_segment_time_ns += duration_ns;
_stats.add_segment_io_time_ns += io_time_ns;
ADD_COUNTER_RELAXED(_stats.add_segment_count, 1);
ADD_COUNTER_RELAXED(_stats.row_count, segment_pb.num_rows());
ADD_COUNTER_RELAXED(_stats.add_segment_data_size, segment_pb.data_size());
ADD_COUNTER_RELAXED(_stats.add_segment_time_ns, duration_ns);
ADD_COUNTER_RELAXED(_stats.add_segment_io_time_ns, io_time_ns);

StarRocksMetrics::instance()->segment_flush_total.increment(1);
StarRocksMetrics::instance()->segment_flush_duration_us.increment(duration_ns / 1000);
Expand All @@ -520,7 +523,8 @@ Status DeltaWriter::write_segment(const SegmentPB& segment_pb, butil::IOBuf& dat

Status DeltaWriter::close() {
SCOPED_THREAD_LOCAL_MEM_SETTER(_mem_tracker, false);
SCOPED_RAW_TIMER(&_stats.close_time_ns);
int64_t start_time = MonotonicNanos();
DeferOp defer([&]() { ADD_COUNTER_RELAXED(_stats.close_time_ns, MonotonicNanos() - start_time); });
auto state = get_state();
switch (state) {
case kUninitialized:
Expand Down Expand Up @@ -638,8 +642,8 @@ Status DeltaWriter::_flush_memtable() {
watch.start();
Status st = _flush_token->wait();
auto elapsed_time = watch.elapsed_time();
_stats.memory_exceed_count += 1;
_stats.write_wait_flush_tims_ns += elapsed_time;
ADD_COUNTER_RELAXED(_stats.memory_exceed_count, 1);
ADD_COUNTER_RELAXED(_stats.write_wait_flush_time_ns, elapsed_time);
StarRocksMetrics::instance()->delta_writer_wait_flush_duration_us.increment(elapsed_time / 1000);
return st;
}
Expand Down Expand Up @@ -779,12 +783,12 @@ Status DeltaWriter::commit() {
}
}
VLOG(2) << "Closed delta writer. tablet_id: " << _tablet->tablet_id() << ", stats: " << _flush_token->get_stats();
_stats.commit_time_ns += watch.elapsed_time();
_stats.commit_wait_flush_time_ns += flush_ts;
_stats.commit_rowset_build_time_ns += rowset_build_ts - flush_ts;
_stats.commit_finish_pk_time_ns += pk_finish_ts - rowset_build_ts;
_stats.commit_wait_replica_time_ns += replica_ts - pk_finish_ts;
_stats.commit_txn_commit_time_ns += commit_txn_ts - replica_ts;
ADD_COUNTER_RELAXED(_stats.commit_time_ns, watch.elapsed_time());
ADD_COUNTER_RELAXED(_stats.commit_wait_flush_time_ns, flush_ts);
ADD_COUNTER_RELAXED(_stats.commit_rowset_build_time_ns, rowset_build_ts - flush_ts);
ADD_COUNTER_RELAXED(_stats.commit_finish_pk_time_ns, pk_finish_ts - rowset_build_ts);
ADD_COUNTER_RELAXED(_stats.commit_wait_replica_time_ns, replica_ts - pk_finish_ts);
ADD_COUNTER_RELAXED(_stats.commit_txn_commit_time_ns, commit_txn_ts - replica_ts);
StarRocksMetrics::instance()->delta_writer_wait_flush_duration_us.increment(flush_ts / 1000);
StarRocksMetrics::instance()->delta_writer_wait_replica_duration_us.increment((replica_ts - pk_finish_ts) / 1000);
return Status::OK();
Expand Down
Loading

0 comments on commit f73d626

Please sign in to comment.