Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Optimize merge commit performance #54251

Merged
merged 2 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1505,6 +1505,12 @@ CONF_mInt32(max_committed_without_schema_rowset, "1000");

CONF_mInt32(apply_version_slow_log_sec, "30");

// The time that stream load pipe waits for the input. The pipe will block the pipeline scan executor
// util the input is available or the timeout is reached. Don't set this value too large to avoid
// blocking the pipeline scan executor for a long time.
CONF_mInt32(merge_commit_stream_load_pipe_block_wait_us, "500");
// The maximum number of bytes that the merge commit stream load pipe can buffer.
CONF_mInt64(merge_commit_stream_load_pipe_max_buffered_bytes, "1073741824");
CONF_Int32(batch_write_thread_pool_num_min, "0");
CONF_Int32(batch_write_thread_pool_num_max, "512");
CONF_Int32(batch_write_thread_pool_queue_size, "4096");
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ class STATUS_ATTRIBUTE Status {

bool is_end_of_file() const { return code() == TStatusCode::END_OF_FILE; }

bool is_internal_error() const { return code() == TStatusCode::INTERNAL_ERROR; }

bool is_ok_or_eof() const { return ok() || is_end_of_file(); }

bool is_not_found() const { return code() == TStatusCode::NOT_FOUND; }
Expand Down
10 changes: 6 additions & 4 deletions be/src/runtime/batch_write/batch_write_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ StatusOr<StreamLoadContext*> BatchWriteMgr::create_and_register_pipe(
const std::map<std::string, std::string>& load_parameters, const std::string& label, long txn_id,
const TUniqueId& load_id, int32_t batch_write_interval_ms) {
std::string pipe_name = fmt::format("txn_{}_label_{}_id_{}", txn_id, label, print_id(load_id));
auto pipe = std::make_shared<TimeBoundedStreamLoadPipe>(pipe_name, batch_write_interval_ms);
auto pipe = std::make_shared<TimeBoundedStreamLoadPipe>(pipe_name, batch_write_interval_ms,
config::merge_commit_stream_load_pipe_block_wait_us,
config::merge_commit_stream_load_pipe_max_buffered_bytes);
RETURN_IF_ERROR(exec_env->load_stream_mgr()->put(load_id, pipe));
StreamLoadContext* ctx = new StreamLoadContext(exec_env, load_id);
ctx->ref();
Expand Down Expand Up @@ -188,11 +190,10 @@ void BatchWriteMgr::receive_stream_load_rpc(ExecEnv* exec_env, brpc::Controller*
}
ctx->timeout_second = timeout_second;
}
std::string remote_host;
butil::ip2hostname(cntl->remote_side().ip, &remote_host);
auto user_ip = butil::ip2str(cntl->remote_side().ip);
ctx->auth.user = request->user();
ctx->auth.passwd = request->passwd();
ctx->auth.user_ip = remote_host;
ctx->auth.user_ip.assign(user_ip.c_str());
ctx->load_parameters = get_load_parameters_from_brpc(parameters);

butil::IOBuf& io_buf = cntl->request_attachment();
Expand Down Expand Up @@ -221,6 +222,7 @@ void BatchWriteMgr::receive_stream_load_rpc(ExecEnv* exec_env, brpc::Controller*
}
ctx->buffer->pos += io_buf.size();
ctx->buffer->flip();
ctx->receive_bytes = io_buf.size();
ctx->status = exec_env->batch_write_mgr()->append_data(ctx);
}

Expand Down
103 changes: 64 additions & 39 deletions be/src/runtime/batch_write/isomorphic_batch_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,12 @@ bool IsomorphicBatchWrite::contain_pipe(StreamLoadContext* pipe_ctx) {
return _dead_stream_load_pipe_ctxs.find(pipe_ctx) != _dead_stream_load_pipe_ctxs.end();
}

bool IsomorphicBatchWrite::is_pipe_alive(starrocks::StreamLoadContext* pipe_ctx) {
std::unique_lock<std::mutex> lock(_mutex);
auto it = _alive_stream_load_pipe_ctxs.find(pipe_ctx);
return it != _alive_stream_load_pipe_ctxs.end();
}

Status IsomorphicBatchWrite::append_data(StreamLoadContext* data_ctx) {
if (_stopped.load(std::memory_order_acquire)) {
return Status::ServiceUnavailable("Batch write is stopped");
Expand All @@ -231,7 +237,7 @@ Status IsomorphicBatchWrite::append_data(StreamLoadContext* data_ctx) {
async_ctx->latch().wait();
async_ctx->total_cost_ns.store(MonotonicNanos() - async_ctx->create_time_ts);
TRACE_BATCH_WRITE << "wait async finish, " << _batch_write_id << ", user label: " << async_ctx->data_ctx()->label
<< ", data size: " << data_ctx->receive_bytes
<< ", user ip: " << data_ctx->auth.user_ip << ", data size: " << data_ctx->receive_bytes
<< ", total_cost: " << (async_ctx->total_cost_ns / 1000)
<< "us, total_async_cost: " << (async_ctx->total_async_cost_ns / 1000)
<< "us, task_pending_cost: " << (async_ctx->task_pending_cost_ns / 1000)
Expand Down Expand Up @@ -286,68 +292,87 @@ int IsomorphicBatchWrite::_execute_tasks(void* meta, bthread::TaskIterator<Task>
}

Status IsomorphicBatchWrite::_execute_write(AsyncAppendDataContext* async_ctx) {
Status st;
int64_t append_pipe_cost_ns = 0;
int64_t write_data_cost_ns = 0;
int64_t rpc_cost_ns = 0;
int64_t wait_pipe_cost_ns = 0;
int num_retries = 0;
while (num_retries <= config::batch_write_rpc_request_retry_num) {
Status st;
while (true) {
if (_stopped.load(std::memory_order_acquire)) {
return Status::ServiceUnavailable("Batch write is stopped");
st = Status::ServiceUnavailable("Batch write is stopped");
break;
}
int64_t append_ts = MonotonicNanos();
st = _write_data(async_ctx);
int64_t rpc_ts = MonotonicNanos();
append_pipe_cost_ns += rpc_ts - append_ts;
if (st.ok()) {
{
SCOPED_RAW_TIMER(&write_data_cost_ns);
st = _write_data_to_pipe(async_ctx);
}
if (st.ok() || num_retries >= config::batch_write_rpc_request_retry_num) {
break;
}
// TODO check if the error is retryable
st = _send_rpc_request(async_ctx->data_ctx());
int64_t wait_ts = MonotonicNanos();
rpc_cost_ns += wait_ts - rpc_ts;
st = _wait_for_stream_load_pipe();
wait_pipe_cost_ns += MonotonicNanos() - wait_ts;
num_retries += 1;
{
SCOPED_RAW_TIMER(&rpc_cost_ns);
// TODO check if the error is retryable if the return status is not ok
(void)_send_rpc_request(async_ctx->data_ctx());
}
{
SCOPED_RAW_TIMER(&wait_pipe_cost_ns);
std::unique_lock<std::mutex> lock(_mutex);
_cv.wait_for(lock, std::chrono::milliseconds(config::batch_write_rpc_request_retry_interval_ms),
[&]() { return !_alive_stream_load_pipe_ctxs.empty(); });
}
}
async_ctx->append_pipe_cost_ns.store(append_pipe_cost_ns);
async_ctx->append_pipe_cost_ns.store(write_data_cost_ns);
async_ctx->rpc_cost_ns.store(rpc_cost_ns);
async_ctx->wait_pipe_cost_ns.store(wait_pipe_cost_ns);
async_ctx->num_retries.store(num_retries);
if (!st.ok()) {
std::stringstream stream;
stream << "Failed to write data to stream load pipe, num retry: " << num_retries
<< ", write_data: " << (write_data_cost_ns / 1000) << " us, rpc: " << (rpc_cost_ns / 1000)
<< "us, wait_pipe: " << (wait_pipe_cost_ns / 1000) << " us, last error: " << st;
st = Status::InternalError(stream.str());
}
return st;
}

Status IsomorphicBatchWrite::_write_data(AsyncAppendDataContext* async_ctx) {
// TODO write data outside the lock
std::unique_lock<std::mutex> lock(_mutex);
Status st;
Status IsomorphicBatchWrite::_write_data_to_pipe(AsyncAppendDataContext* async_ctx) {
StreamLoadContext* data_ctx = async_ctx->data_ctx();
for (auto it = _alive_stream_load_pipe_ctxs.begin(); it != _alive_stream_load_pipe_ctxs.end();) {
StreamLoadContext* pipe_ctx = *it;
// add reference to the buffer to avoid being released if append fails
while (true) {
StreamLoadContext* pipe_ctx;
{
std::unique_lock<std::mutex> lock(_mutex);
if (!_alive_stream_load_pipe_ctxs.empty()) {
pipe_ctx = *(_alive_stream_load_pipe_ctxs.begin());
// take a reference to avoid being released when appending data to the pipe outside the lock
pipe_ctx->ref();
} else {
return Status::CapacityLimitExceed("No available stream load pipe");
}
}
DeferOp defer([&] { StreamLoadContext::release(pipe_ctx); });
// task a reference to avoid being released by the pipe if append fails
ByteBufferPtr buffer = data_ctx->buffer;
st = pipe_ctx->body_sink->append(std::move(buffer));
Status st = pipe_ctx->body_sink->append(std::move(buffer));
if (st.ok()) {
data_ctx->buffer.reset();
async_ctx->pipe_left_active_ns.store(
static_cast<TimeBoundedStreamLoadPipe*>(pipe_ctx->body_sink.get())->left_active_ns());
async_ctx->set_txn(pipe_ctx->txn_id, pipe_ctx->label);
return st;
return Status::OK();
}
TRACE_BATCH_WRITE << "Fail to append data to stream load pipe, " << _batch_write_id
<< ", user label: " << data_ctx->label << ", txn_id: " << pipe_ctx->txn_id
<< ", label: " << pipe_ctx->label << ", status: " << st;
// if failed, the pipe can't be appended anymore and move it from
// the alive to the dead, and wait for being unregistered
{
std::unique_lock<std::mutex> lock(_mutex);
if (_alive_stream_load_pipe_ctxs.erase(pipe_ctx)) {
_dead_stream_load_pipe_ctxs.emplace(pipe_ctx);
}
}
_dead_stream_load_pipe_ctxs.emplace(pipe_ctx);
it = _alive_stream_load_pipe_ctxs.erase(it);
}
return st.ok() ? Status::CapacityLimitExceed("") : st;
}

Status IsomorphicBatchWrite::_wait_for_stream_load_pipe() {
std::unique_lock<std::mutex> lock(_mutex);
_cv.wait_for(lock, std::chrono::milliseconds(config::batch_write_rpc_request_retry_interval_ms),
[&]() { return !_alive_stream_load_pipe_ctxs.empty(); });
if (!_alive_stream_load_pipe_ctxs.empty()) {
return Status::OK();
}
return Status::TimedOut("");
}

Status IsomorphicBatchWrite::_send_rpc_request(StreamLoadContext* data_ctx) {
banmoy marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/batch_write/isomorphic_batch_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class IsomorphicBatchWrite {
void unregister_stream_load_pipe(StreamLoadContext* pipe_ctx);
// For testing
bool contain_pipe(StreamLoadContext* pipe_ctx);
bool is_pipe_alive(StreamLoadContext* pipe_ctx);

Status append_data(StreamLoadContext* data_ctx);

Expand All @@ -64,8 +65,7 @@ class IsomorphicBatchWrite {
static int _execute_tasks(void* meta, bthread::TaskIterator<Task>& iter);

Status _execute_write(AsyncAppendDataContext* async_ctx);
Status _write_data(AsyncAppendDataContext* data_ctx);
Status _wait_for_stream_load_pipe();
Status _write_data_to_pipe(AsyncAppendDataContext* data_ctx);
Status _send_rpc_request(StreamLoadContext* data_ctx);
Status _wait_for_load_status(StreamLoadContext* data_ctx, int64_t timeout_ns);

Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/stream_load/stream_load_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ StatusOr<ByteBufferPtr> StreamLoadPipe::read() {
StatusOr<ByteBufferPtr> StreamLoadPipe::no_block_read() {
std::unique_lock<std::mutex> l(_lock);

_get_cond.wait_for(l, std::chrono::milliseconds(_non_blocking_wait_ms),
_get_cond.wait_for(l, std::chrono::microseconds(_non_blocking_wait_us),
[&]() { return _cancelled || _finished || !_buf_queue.empty(); });

// cancelled
Expand Down Expand Up @@ -184,7 +184,7 @@ Status StreamLoadPipe::no_block_read(uint8_t* data, size_t* data_size, bool* eof
if (_read_buf == nullptr || !_read_buf->has_remaining()) {
std::unique_lock<std::mutex> l(_lock);

_get_cond.wait_for(l, std::chrono::milliseconds(_non_blocking_wait_ms),
_get_cond.wait_for(l, std::chrono::microseconds(_non_blocking_wait_us),
[&]() { return _cancelled || _finished || !_buf_queue.empty(); });

// cancelled
Expand Down
11 changes: 8 additions & 3 deletions be/src/runtime/stream_load/stream_load_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ class StreamLoadPipe : public MessageBodySink {
size_t min_chunk_size = DEFAULT_STREAM_LOAD_PIPE_CHUNK_SIZE)
: StreamLoadPipe(false, -1, max_buffered_bytes, min_chunk_size) {}

StreamLoadPipe(bool non_blocking_read, int32_t non_blocking_wait_ms, size_t max_buffered_bytes,
StreamLoadPipe(bool non_blocking_read, int32_t non_blocking_wait_us, size_t max_buffered_bytes,
size_t min_chunk_size)
: _non_blocking_read(non_blocking_read),
_non_blocking_wait_ms(non_blocking_wait_ms),
_non_blocking_wait_us(non_blocking_wait_us),
_max_buffered_bytes(max_buffered_bytes),
_min_chunk_size(min_chunk_size) {}

Expand Down Expand Up @@ -90,6 +90,11 @@ class StreamLoadPipe : public MessageBodySink {
// called when producer finished
Status finish() override;

bool finished() {
std::unique_lock<std::mutex> l(_lock);
return _finished;
}

// called when producer/consumer failed
void cancel(const Status& status) override;

Expand All @@ -115,7 +120,7 @@ class StreamLoadPipe : public MessageBodySink {
std::mutex _lock;
size_t _buffered_bytes{0};
bool _non_blocking_read{false};
int32_t _non_blocking_wait_ms;
int32_t _non_blocking_wait_us;
size_t _max_buffered_bytes;
size_t _min_chunk_size;
std::deque<ByteBufferPtr> _buf_queue;
Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/stream_load/time_bounded_stream_load_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@

namespace starrocks {

static constexpr int32_t DEFAULT_STREAM_LOAD_PIPE_NON_BLOCKING_WAIT_MS = 50;
static constexpr int32_t DEFAULT_STREAM_LOAD_PIPE_NON_BLOCKING_WAIT_US = 500;

class TimeBoundedStreamLoadPipe : public StreamLoadPipe {
public:
TimeBoundedStreamLoadPipe(const std::string& name, int32_t active_window_ms,
int32_t non_blocking_wait_ms = DEFAULT_STREAM_LOAD_PIPE_NON_BLOCKING_WAIT_MS,
int32_t non_blocking_wait_us = DEFAULT_STREAM_LOAD_PIPE_NON_BLOCKING_WAIT_US,
size_t max_buffered_bytes = DEFAULT_STREAM_LOAD_PIPE_BUFFERED_BYTES)
: StreamLoadPipe(true, non_blocking_wait_ms, max_buffered_bytes, DEFAULT_STREAM_LOAD_PIPE_CHUNK_SIZE) {
: StreamLoadPipe(true, non_blocking_wait_us, max_buffered_bytes, DEFAULT_STREAM_LOAD_PIPE_CHUNK_SIZE) {
_name = name;
_active_window_ns = active_window_ms * (int64_t)1000000;
_start_time_ns = _get_current_ns();
Expand Down
Loading
Loading