Skip to content

Commit

Permalink
[BugFix] Fix transaction stream load lock leak (backport #53564) (#53608
Browse files Browse the repository at this point in the history
)

Signed-off-by: PengFei Li <[email protected]>
Co-authored-by: PengFei Li <[email protected]>
  • Loading branch information
mergify[bot] and banmoy authored Dec 6, 2024
1 parent aeee511 commit a8ca9f8
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 68 deletions.
56 changes: 27 additions & 29 deletions be/src/http/action/transaction_stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <deque>
#include <future>
#include <sstream>
#include <utility>

// use string iequal
#include <event2/buffer.h>
Expand Down Expand Up @@ -144,22 +145,14 @@ void TransactionStreamLoadAction::_send_error_reply(HttpRequest* req, const Stat
}

void TransactionStreamLoadAction::handle(HttpRequest* req) {
StreamLoadContext* ctx = nullptr;
const auto& label = req->header(HTTP_LABEL_KEY);
if (!req->header(HTTP_CHANNEL_ID).empty()) {
int channel_id = std::stoi(req->header(HTTP_CHANNEL_ID));
ctx = _exec_env->stream_context_mgr()->get_channel_context(label, channel_id);
} else {
ctx = _exec_env->stream_context_mgr()->get(label);
if (config::enable_stream_load_verbose_log) {
LOG(INFO) << "transaction streaming load request, handle: " << req->debug_string();
}

StreamLoadContext* ctx = static_cast<StreamLoadContext*>(req->handler_ctx());
if (ctx == nullptr) {
return;
}
DeferOp defer([&] {
if (ctx->unref()) {
delete ctx;
}
});
ctx->last_active_ts = MonotonicNanos();

if (!ctx->status.ok()) {
Expand All @@ -179,14 +172,12 @@ void TransactionStreamLoadAction::handle(HttpRequest* req) {
}

auto resp = _exec_env->transaction_mgr()->_build_reply(TXN_LOAD, ctx);
ctx->lock.unlock();

_send_reply(req, resp);
}

int TransactionStreamLoadAction::on_header(HttpRequest* req) {
if (config::enable_stream_load_verbose_log) {
LOG(INFO) << "transaction streaming load request: " << req->debug_string();
LOG(INFO) << "transaction streaming load request, header: " << req->debug_string();
}

const auto& label = req->header(HTTP_LABEL_KEY);
Expand Down Expand Up @@ -230,6 +221,9 @@ int TransactionStreamLoadAction::on_header(HttpRequest* req) {
_send_error_reply(req, Status::TransactionInProcessing("Transaction in processing, please retry later"));
return -1;
}
// referenced by the http request
ctx->ref();
req->set_handler_ctx(ctx);
ctx->last_active_ts = MonotonicNanos();
ctx->received_data_cost_nanos = 0;
ctx->receive_bytes = 0;
Expand All @@ -243,7 +237,6 @@ int TransactionStreamLoadAction::on_header(HttpRequest* req) {
(void)_exec_env->transaction_mgr()->_rollback_transaction(ctx);
}
auto resp = _exec_env->transaction_mgr()->_build_reply(TXN_LOAD, ctx);
ctx->lock.unlock();
_send_reply(req, resp);
return -1;
}
Expand Down Expand Up @@ -501,22 +494,10 @@ Status TransactionStreamLoadAction::_exec_plan_fragment(HttpRequest* http_req, S
}

void TransactionStreamLoadAction::on_chunk_data(HttpRequest* req) {
StreamLoadContext* ctx = nullptr;
const string& label = req->header(HTTP_LABEL_KEY);
if (!req->header(HTTP_CHANNEL_ID).empty()) {
int channel_id = std::stoi(req->header(HTTP_CHANNEL_ID));
ctx = _exec_env->stream_context_mgr()->get_channel_context(label, channel_id);
} else {
ctx = _exec_env->stream_context_mgr()->get(label);
}
StreamLoadContext* ctx = static_cast<StreamLoadContext*>(req->handler_ctx());
if (ctx == nullptr) {
return;
}
DeferOp defer([&] {
if (ctx->unref()) {
delete ctx;
}
});

SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(ctx->instance_mem_tracker.get());

Expand Down Expand Up @@ -584,6 +565,23 @@ void TransactionStreamLoadAction::on_chunk_data(HttpRequest* req) {
ctx->last_active_ts = MonotonicNanos();
ctx->received_data_cost_nanos += ctx->last_active_ts - start_read_data_time;
ctx->total_received_data_cost_nanos += ctx->last_active_ts - start_read_data_time;
VLOG(1) << "Receive http chunk, " << ctx->brief() << ", total expected bytes: " << ctx->body_bytes
<< ", total received bytes: " << ctx->total_receive_bytes;
}

void TransactionStreamLoadAction::free_handler_ctx(void* param) {
StreamLoadContext* ctx = static_cast<StreamLoadContext*>(param);
if (ctx == nullptr) {
return;
}
DCHECK(!ctx->lock.try_lock());
ctx->lock.unlock();
if (config::enable_stream_load_verbose_log) {
LOG(INFO) << "free handler context, " << ctx->brief();
}
if (ctx->unref()) {
delete ctx;
}
}

} // namespace starrocks
2 changes: 2 additions & 0 deletions be/src/http/action/transaction_stream_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class TransactionStreamLoadAction : public HttpHandler {

void on_chunk_data(HttpRequest* req) override;

void free_handler_ctx(void* ctx) override;

private:
Status _on_header(HttpRequest* http_req, StreamLoadContext* ctx);
Status _channel_on_header(HttpRequest* http_req, StreamLoadContext* ctx);
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/stream_load/stream_load_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ class StreamLoadContext {
// If unref() returns true, this object should be delete
bool unref() { return _refs.fetch_sub(1) == 1; }

int num_refs() { return _refs.load(); }

bool check_and_set_http_limiter(ConcurrentLimiter* limiter);

public:
Expand Down
Loading

0 comments on commit a8ca9f8

Please sign in to comment.