Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Fix transaction stream load lock leak #53564

Merged
merged 2 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 27 additions & 29 deletions be/src/http/action/transaction_stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <deque>
#include <future>
#include <sstream>
#include <utility>

// use string iequal
#include <event2/buffer.h>
Expand Down Expand Up @@ -144,22 +145,14 @@ void TransactionStreamLoadAction::_send_error_reply(HttpRequest* req, const Stat
}

void TransactionStreamLoadAction::handle(HttpRequest* req) {
StreamLoadContext* ctx = nullptr;
const auto& label = req->header(HTTP_LABEL_KEY);
if (!req->header(HTTP_CHANNEL_ID).empty()) {
int channel_id = std::stoi(req->header(HTTP_CHANNEL_ID));
ctx = _exec_env->stream_context_mgr()->get_channel_context(label, channel_id);
} else {
ctx = _exec_env->stream_context_mgr()->get(label);
if (config::enable_stream_load_verbose_log) {
LOG(INFO) << "transaction streaming load request, handle: " << req->debug_string();
}

StreamLoadContext* ctx = static_cast<StreamLoadContext*>(req->handler_ctx());
if (ctx == nullptr) {
return;
}
DeferOp defer([&] {
if (ctx->unref()) {
delete ctx;
}
});
ctx->last_active_ts = MonotonicNanos();

if (!ctx->status.ok()) {
Expand All @@ -179,14 +172,12 @@ void TransactionStreamLoadAction::handle(HttpRequest* req) {
}

auto resp = _exec_env->transaction_mgr()->_build_reply(TXN_LOAD, ctx);
ctx->lock.unlock();

_send_reply(req, resp);
}

int TransactionStreamLoadAction::on_header(HttpRequest* req) {
if (config::enable_stream_load_verbose_log) {
LOG(INFO) << "transaction streaming load request: " << req->debug_string();
LOG(INFO) << "transaction streaming load request, header: " << req->debug_string();
}

const auto& label = req->header(HTTP_LABEL_KEY);
Expand Down Expand Up @@ -230,6 +221,9 @@ int TransactionStreamLoadAction::on_header(HttpRequest* req) {
_send_error_reply(req, Status::TransactionInProcessing("Transaction in processing, please retry later"));
return -1;
}
// referenced by the http request
ctx->ref();
req->set_handler_ctx(ctx);
ctx->last_active_ts = MonotonicNanos();
ctx->received_data_cost_nanos = 0;
ctx->receive_bytes = 0;
Expand All @@ -243,7 +237,6 @@ int TransactionStreamLoadAction::on_header(HttpRequest* req) {
(void)_exec_env->transaction_mgr()->_rollback_transaction(ctx);
}
auto resp = _exec_env->transaction_mgr()->_build_reply(TXN_LOAD, ctx);
ctx->lock.unlock();
_send_reply(req, resp);
return -1;
}
Expand Down Expand Up @@ -501,22 +494,10 @@ Status TransactionStreamLoadAction::_exec_plan_fragment(HttpRequest* http_req, S
}

void TransactionStreamLoadAction::on_chunk_data(HttpRequest* req) {
StreamLoadContext* ctx = nullptr;
const string& label = req->header(HTTP_LABEL_KEY);
if (!req->header(HTTP_CHANNEL_ID).empty()) {
int channel_id = std::stoi(req->header(HTTP_CHANNEL_ID));
ctx = _exec_env->stream_context_mgr()->get_channel_context(label, channel_id);
} else {
ctx = _exec_env->stream_context_mgr()->get(label);
}
StreamLoadContext* ctx = static_cast<StreamLoadContext*>(req->handler_ctx());
if (ctx == nullptr) {
return;
}
DeferOp defer([&] {
if (ctx->unref()) {
delete ctx;
}
});

SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(ctx->instance_mem_tracker.get());

Expand Down Expand Up @@ -584,6 +565,23 @@ void TransactionStreamLoadAction::on_chunk_data(HttpRequest* req) {
ctx->last_active_ts = MonotonicNanos();
ctx->received_data_cost_nanos += ctx->last_active_ts - start_read_data_time;
ctx->total_received_data_cost_nanos += ctx->last_active_ts - start_read_data_time;
VLOG(1) << "Receive http chunk, " << ctx->brief() << ", total expected bytes: " << ctx->body_bytes
<< ", total received bytes: " << ctx->total_receive_bytes;
}

void TransactionStreamLoadAction::free_handler_ctx(void* param) {
StreamLoadContext* ctx = static_cast<StreamLoadContext*>(param);
if (ctx == nullptr) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall check nullptr against param before convert to StreamLoadContext?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

static_cast can be applied to nullptr so it's safe to check after the cast

return;
}
DCHECK(!ctx->lock.try_lock());
ctx->lock.unlock();
if (config::enable_stream_load_verbose_log) {
LOG(INFO) << "free handler context, " << ctx->brief();
}
if (ctx->unref()) {
delete ctx;
}
}

} // namespace starrocks
banmoy marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions be/src/http/action/transaction_stream_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class TransactionStreamLoadAction : public HttpHandler {

void on_chunk_data(HttpRequest* req) override;

void free_handler_ctx(void* ctx) override;

private:
Status _on_header(HttpRequest* http_req, StreamLoadContext* ctx);
Status _channel_on_header(HttpRequest* http_req, StreamLoadContext* ctx);
Expand Down
Loading
Loading