Skip to content

Commit

Permalink
[BugFix] Fix transaction stream load lock leak (#53564)
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
(cherry picked from commit 82fffc3)

# Conflicts:
#	be/test/http/transaction_stream_load_test.cpp
  • Loading branch information
banmoy authored and mergify[bot] committed Dec 5, 2024
1 parent bf676fd commit 109e3f5
Show file tree
Hide file tree
Showing 3 changed files with 262 additions and 30 deletions.
56 changes: 27 additions & 29 deletions be/src/http/action/transaction_stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <deque>
#include <future>
#include <sstream>
#include <utility>

// use string iequal
#include <event2/buffer.h>
Expand Down Expand Up @@ -144,22 +145,14 @@ void TransactionStreamLoadAction::_send_error_reply(HttpRequest* req, const Stat
}

void TransactionStreamLoadAction::handle(HttpRequest* req) {
StreamLoadContext* ctx = nullptr;
const auto& label = req->header(HTTP_LABEL_KEY);
if (!req->header(HTTP_CHANNEL_ID).empty()) {
int channel_id = std::stoi(req->header(HTTP_CHANNEL_ID));
ctx = _exec_env->stream_context_mgr()->get_channel_context(label, channel_id);
} else {
ctx = _exec_env->stream_context_mgr()->get(label);
if (config::enable_stream_load_verbose_log) {
LOG(INFO) << "transaction streaming load request, handle: " << req->debug_string();
}

StreamLoadContext* ctx = static_cast<StreamLoadContext*>(req->handler_ctx());
if (ctx == nullptr) {
return;
}
DeferOp defer([&] {
if (ctx->unref()) {
delete ctx;
}
});
ctx->last_active_ts = MonotonicNanos();

if (!ctx->status.ok()) {
Expand All @@ -178,14 +171,12 @@ void TransactionStreamLoadAction::handle(HttpRequest* req) {
}

auto resp = _exec_env->transaction_mgr()->_build_reply(TXN_LOAD, ctx);
ctx->lock.unlock();

_send_reply(req, resp);
}

int TransactionStreamLoadAction::on_header(HttpRequest* req) {
if (config::enable_stream_load_verbose_log) {
LOG(INFO) << "transaction streaming load request: " << req->debug_string();
LOG(INFO) << "transaction streaming load request, header: " << req->debug_string();
}

const auto& label = req->header(HTTP_LABEL_KEY);
Expand Down Expand Up @@ -229,6 +220,9 @@ int TransactionStreamLoadAction::on_header(HttpRequest* req) {
_send_error_reply(req, Status::TransactionInProcessing("Transaction in processing, please retry later"));
return -1;
}
// referenced by the http request
ctx->ref();
req->set_handler_ctx(ctx);
ctx->last_active_ts = MonotonicNanos();
ctx->received_data_cost_nanos = 0;
ctx->receive_bytes = 0;
Expand All @@ -242,7 +236,6 @@ int TransactionStreamLoadAction::on_header(HttpRequest* req) {
_exec_env->transaction_mgr()->_rollback_transaction(ctx);
}
auto resp = _exec_env->transaction_mgr()->_build_reply(TXN_LOAD, ctx);
ctx->lock.unlock();
_send_reply(req, resp);
return -1;
}
Expand Down Expand Up @@ -496,22 +489,10 @@ Status TransactionStreamLoadAction::_exec_plan_fragment(HttpRequest* http_req, S
}

void TransactionStreamLoadAction::on_chunk_data(HttpRequest* req) {
StreamLoadContext* ctx = nullptr;
const string& label = req->header(HTTP_LABEL_KEY);
if (!req->header(HTTP_CHANNEL_ID).empty()) {
int channel_id = std::stoi(req->header(HTTP_CHANNEL_ID));
ctx = _exec_env->stream_context_mgr()->get_channel_context(label, channel_id);
} else {
ctx = _exec_env->stream_context_mgr()->get(label);
}
StreamLoadContext* ctx = static_cast<StreamLoadContext*>(req->handler_ctx());
if (ctx == nullptr) {
return;
}
DeferOp defer([&] {
if (ctx->unref()) {
delete ctx;
}
});

SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(ctx->instance_mem_tracker.get());

Expand Down Expand Up @@ -572,6 +553,23 @@ void TransactionStreamLoadAction::on_chunk_data(HttpRequest* req) {
ctx->last_active_ts = MonotonicNanos();
ctx->received_data_cost_nanos += ctx->last_active_ts - start_read_data_time;
ctx->total_received_data_cost_nanos += ctx->last_active_ts - start_read_data_time;
VLOG(1) << "Receive http chunk, " << ctx->brief() << ", total expected bytes: " << ctx->body_bytes
<< ", total received bytes: " << ctx->total_receive_bytes;
}

void TransactionStreamLoadAction::free_handler_ctx(void* param) {
StreamLoadContext* ctx = static_cast<StreamLoadContext*>(param);
if (ctx == nullptr) {
return;
}
DCHECK(!ctx->lock.try_lock());
ctx->lock.unlock();
if (config::enable_stream_load_verbose_log) {
LOG(INFO) << "free handler context, " << ctx->brief();
}
if (ctx->unref()) {
delete ctx;
}
}

} // namespace starrocks
2 changes: 2 additions & 0 deletions be/src/http/action/transaction_stream_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class TransactionStreamLoadAction : public HttpHandler {

void on_chunk_data(HttpRequest* req) override;

void free_handler_ctx(void* ctx) override;

private:
Status _on_header(HttpRequest* http_req, StreamLoadContext* ctx);
Status _channel_on_header(HttpRequest* http_req, StreamLoadContext* ctx);
Expand Down
Loading

0 comments on commit 109e3f5

Please sign in to comment.