Skip to content

Commit

Permalink
[BugFix] Fix transaction stream load lock leak (StarRocks#53564)
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Dec 5, 2024
1 parent 49fa020 commit 781e125
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 30 deletions.
56 changes: 27 additions & 29 deletions be/src/http/action/transaction_stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <deque>
#include <future>
#include <sstream>
#include <utility>

// use string iequal
#include <event2/buffer.h>
Expand Down Expand Up @@ -144,22 +145,14 @@ void TransactionStreamLoadAction::_send_error_reply(HttpRequest* req, const Stat
}

void TransactionStreamLoadAction::handle(HttpRequest* req) {
StreamLoadContext* ctx = nullptr;
const auto& label = req->header(HTTP_LABEL_KEY);
if (!req->header(HTTP_CHANNEL_ID).empty()) {
int channel_id = std::stoi(req->header(HTTP_CHANNEL_ID));
ctx = _exec_env->stream_context_mgr()->get_channel_context(label, channel_id);
} else {
ctx = _exec_env->stream_context_mgr()->get(label);
if (config::enable_stream_load_verbose_log) {
LOG(INFO) << "transaction streaming load request, handle: " << req->debug_string();
}

StreamLoadContext* ctx = static_cast<StreamLoadContext*>(req->handler_ctx());
if (ctx == nullptr) {
return;
}
DeferOp defer([&] {
if (ctx->unref()) {
delete ctx;
}
});
ctx->last_active_ts = MonotonicNanos();

if (!ctx->status.ok()) {
Expand All @@ -178,14 +171,12 @@ void TransactionStreamLoadAction::handle(HttpRequest* req) {
}

auto resp = _exec_env->transaction_mgr()->_build_reply(TXN_LOAD, ctx);
ctx->lock.unlock();

_send_reply(req, resp);
}

int TransactionStreamLoadAction::on_header(HttpRequest* req) {
if (config::enable_stream_load_verbose_log) {
LOG(INFO) << "transaction streaming load request: " << req->debug_string();
LOG(INFO) << "transaction streaming load request, header: " << req->debug_string();
}

const auto& label = req->header(HTTP_LABEL_KEY);
Expand Down Expand Up @@ -229,6 +220,9 @@ int TransactionStreamLoadAction::on_header(HttpRequest* req) {
_send_error_reply(req, Status::TransactionInProcessing("Transaction in processing, please retry later"));
return -1;
}
// referenced by the http request
ctx->ref();
req->set_handler_ctx(ctx);
ctx->last_active_ts = MonotonicNanos();
ctx->received_data_cost_nanos = 0;
ctx->receive_bytes = 0;
Expand All @@ -242,7 +236,6 @@ int TransactionStreamLoadAction::on_header(HttpRequest* req) {
_exec_env->transaction_mgr()->_rollback_transaction(ctx);
}
auto resp = _exec_env->transaction_mgr()->_build_reply(TXN_LOAD, ctx);
ctx->lock.unlock();
_send_reply(req, resp);
return -1;
}
Expand Down Expand Up @@ -486,22 +479,10 @@ Status TransactionStreamLoadAction::_exec_plan_fragment(HttpRequest* http_req, S
}

void TransactionStreamLoadAction::on_chunk_data(HttpRequest* req) {
StreamLoadContext* ctx = nullptr;
const string& label = req->header(HTTP_LABEL_KEY);
if (!req->header(HTTP_CHANNEL_ID).empty()) {
int channel_id = std::stoi(req->header(HTTP_CHANNEL_ID));
ctx = _exec_env->stream_context_mgr()->get_channel_context(label, channel_id);
} else {
ctx = _exec_env->stream_context_mgr()->get(label);
}
StreamLoadContext* ctx = static_cast<StreamLoadContext*>(req->handler_ctx());
if (ctx == nullptr) {
return;
}
DeferOp defer([&] {
if (ctx->unref()) {
delete ctx;
}
});

SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(ctx->instance_mem_tracker.get());

Expand Down Expand Up @@ -562,6 +543,23 @@ void TransactionStreamLoadAction::on_chunk_data(HttpRequest* req) {
ctx->last_active_ts = MonotonicNanos();
ctx->received_data_cost_nanos += ctx->last_active_ts - start_read_data_time;
ctx->total_received_data_cost_nanos += ctx->last_active_ts - start_read_data_time;
VLOG(1) << "Receive http chunk, " << ctx->brief() << ", total expected bytes: " << ctx->body_bytes
<< ", total received bytes: " << ctx->total_receive_bytes;
}

void TransactionStreamLoadAction::free_handler_ctx(void* param) {
StreamLoadContext* ctx = static_cast<StreamLoadContext*>(param);
if (ctx == nullptr) {
return;
}
DCHECK(!ctx->lock.try_lock());
ctx->lock.unlock();
if (config::enable_stream_load_verbose_log) {
LOG(INFO) << "free handler context, " << ctx->brief();
}
if (ctx->unref()) {
delete ctx;
}
}

} // namespace starrocks
2 changes: 2 additions & 0 deletions be/src/http/action/transaction_stream_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class TransactionStreamLoadAction : public HttpHandler {

void on_chunk_data(HttpRequest* req) override;

void free_handler_ctx(void* ctx) override;

private:
Status _on_header(HttpRequest* http_req, StreamLoadContext* ctx);
Status _channel_on_header(HttpRequest* http_req, StreamLoadContext* ctx);
Expand Down
131 changes: 130 additions & 1 deletion be/test/http/transaction_stream_load_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "runtime/stream_load/load_stream_mgr.h"
#include "runtime/stream_load/stream_load_executor.h"
#include "runtime/stream_load/transaction_mgr.h"
#include "testutil/assert.h"
#include "util/brpc_stub_cache.h"
#include "util/cpu_info.h"

Expand Down Expand Up @@ -89,7 +90,7 @@ class TransactionStreamLoadActionTest : public testing::Test {
}
}

private:
protected:
ExecEnv _env;
evhttp_request* _evhttp_req = nullptr;
};
Expand Down Expand Up @@ -311,6 +312,7 @@ TEST_F(TransactionStreamLoadActionTest, txn_commit_success) {
TransactionStreamLoadAction action(&_env);

HttpRequest request(_evhttp_req);
request.set_handler(&action);

struct evhttp_request ev_req;
ev_req.remote_host = nullptr;
Expand Down Expand Up @@ -361,6 +363,7 @@ TEST_F(TransactionStreamLoadActionTest, txn_prepared_success) {
TransactionStreamLoadAction action(&_env);

HttpRequest request(_evhttp_req);
request.set_handler(&action);

struct evhttp_request ev_req;
ev_req.remote_host = nullptr;
Expand Down Expand Up @@ -411,6 +414,7 @@ TEST_F(TransactionStreamLoadActionTest, txn_put_fail) {
TransactionStreamLoadAction action(&_env);

HttpRequest request(_evhttp_req);
request.set_handler(&action);

struct evhttp_request ev_req;
ev_req.remote_host = nullptr;
Expand Down Expand Up @@ -463,6 +467,7 @@ TEST_F(TransactionStreamLoadActionTest, txn_commit_fe_fail) {
TransactionStreamLoadAction action(&_env);

HttpRequest request(_evhttp_req);
request.set_handler(&action);

struct evhttp_request ev_req;
ev_req.remote_host = nullptr;
Expand Down Expand Up @@ -515,6 +520,7 @@ TEST_F(TransactionStreamLoadActionTest, txn_prepare_fe_fail) {
TransactionStreamLoadAction action(&_env);

HttpRequest request(_evhttp_req);
request.set_handler(&action);

struct evhttp_request ev_req;
ev_req.remote_host = nullptr;
Expand Down Expand Up @@ -586,6 +592,7 @@ TEST_F(TransactionStreamLoadActionTest, txn_plan_fail) {
TransactionStreamLoadAction action(&_env);

HttpRequest request(_evhttp_req);
request.set_handler(&action);

struct evhttp_request ev_req;
ev_req.remote_host = nullptr;
Expand Down Expand Up @@ -657,6 +664,7 @@ TEST_F(TransactionStreamLoadActionTest, txn_idle_timeout) {
TransactionStreamLoadAction action(&_env);

HttpRequest request(_evhttp_req);
request.set_handler(&action);

struct evhttp_request ev_req;
ev_req.remote_host = nullptr;
Expand Down Expand Up @@ -694,6 +702,7 @@ TEST_F(TransactionStreamLoadActionTest, txn_not_same_load) {
TransactionStreamLoadAction action(&_env);
{
HttpRequest request(_evhttp_req);
request.set_handler(&action);

struct evhttp_request ev_req;
ev_req.remote_host = nullptr;
Expand All @@ -714,6 +723,7 @@ TEST_F(TransactionStreamLoadActionTest, txn_not_same_load) {

{
HttpRequest request(_evhttp_req);
request.set_handler(&action);

struct evhttp_request ev_req;
ev_req.remote_host = nullptr;
Expand All @@ -734,6 +744,7 @@ TEST_F(TransactionStreamLoadActionTest, txn_not_same_load) {

{
HttpRequest request(_evhttp_req);
request.set_handler(&action);

struct evhttp_request ev_req;
ev_req.remote_host = nullptr;
Expand All @@ -748,4 +759,122 @@ TEST_F(TransactionStreamLoadActionTest, txn_not_same_load) {
}
}

TEST_F(TransactionStreamLoadActionTest, free_handler_ctx) {
TransactionStreamLoadAction action(&_env);
auto ctx = new StreamLoadContext(&_env);
ctx->ref();
ctx->db = "db";
ctx->table = "tbl";
ctx->label = "free_handler_ctx";
ctx->body_sink = std::make_shared<StreamLoadPipe>();
bool remove_from_stream_context_mgr = false;
DeferOp defer([&]() {
if (remove_from_stream_context_mgr) {
_env.stream_context_mgr()->remove(ctx->label);
}
if (ctx->unref()) {
delete ctx;
}
});
ASSERT_OK((_env.stream_context_mgr())->put(ctx->label, ctx));
remove_from_stream_context_mgr = true;
ASSERT_TRUE(ctx->lock.try_lock());
ctx->lock.unlock();

// normal request
{
k_response_str = "";
HttpRequest request(_evhttp_req);
request.set_handler(&action);
std::string content = "abc";
auto evb = evbuffer_new();
evbuffer_add(evb, content.data(), content.size());
DeferOp free_evb([&]() { evbuffer_free(evb); });
struct evhttp_request ev_req{.remote_host = nullptr, .input_buffer = evb};
request._ev_req = &ev_req;
request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo=");
request._headers.emplace(HttpHeaders::CONTENT_LENGTH, std::to_string(content.length()));
request._headers.emplace(HTTP_DB_KEY, ctx->db);
request._headers.emplace(HTTP_TABLE_KEY, ctx->table);
request._headers.emplace(HTTP_LABEL_KEY, ctx->label);
ASSERT_EQ(0, action.on_header(&request));
StreamLoadContext* req_ctx = (StreamLoadContext*)request.handler_ctx();
ASSERT_EQ(ctx, req_ctx);
ASSERT_EQ(3, ctx->num_refs());
ASSERT_FALSE(ctx->lock.try_lock());
ASSERT_TRUE(k_response_str.empty());
action.on_chunk_data(&request);
ASSERT_EQ(3, ctx->num_refs());
ASSERT_FALSE(ctx->lock.try_lock());
action.handle(&request);
ASSERT_EQ(3, ctx->num_refs());
ASSERT_FALSE(ctx->lock.try_lock());
rapidjson::Document doc;
doc.Parse(k_response_str.c_str());
ASSERT_STREQ("OK", doc["Status"].GetString());
}
ASSERT_EQ(2, ctx->num_refs());
ASSERT_TRUE(ctx->lock.try_lock());
ctx->lock.unlock();

// on_header fail
{
k_response_str = "";
HttpRequest request(_evhttp_req);
request.set_handler(&action);
std::string content = "abc";
auto evb = evbuffer_new();
evbuffer_add(evb, content.data(), content.size());
DeferOp free_evb([&]() { evbuffer_free(evb); });
struct evhttp_request ev_req{.remote_host = nullptr, .input_buffer = evb};
request._ev_req = &ev_req;
request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo=");
request._headers.emplace(HttpHeaders::CONTENT_LENGTH, std::to_string(content.length()));
request._headers.emplace(HTTP_DB_KEY, ctx->db + "x");
request._headers.emplace(HTTP_TABLE_KEY, ctx->table);
request._headers.emplace(HTTP_LABEL_KEY, ctx->label);
ASSERT_EQ(-1, action.on_header(&request));
StreamLoadContext* req_ctx = (StreamLoadContext*)request.handler_ctx();
ASSERT_EQ(nullptr, req_ctx);
ASSERT_EQ(2, ctx->num_refs());
ASSERT_TRUE(ctx->lock.try_lock());
ctx->lock.unlock();
rapidjson::Document doc;
doc.Parse(k_response_str.c_str());
ASSERT_STREQ("INVALID_ARGUMENT", doc["Status"].GetString());
ASSERT_NE(nullptr,
std::strstr(doc["Message"].GetString(), "Request database dbx not equal transaction database db"));
}
ASSERT_EQ(2, ctx->num_refs());
ASSERT_TRUE(ctx->lock.try_lock());
ctx->lock.unlock();

// skip on_chunk_data and handle
{
k_response_str = "";
HttpRequest request(_evhttp_req);
request.set_handler(&action);
std::string content = "abc";
auto evb = evbuffer_new();
evbuffer_add(evb, content.data(), content.size());
DeferOp free_evb([&]() { evbuffer_free(evb); });
struct evhttp_request ev_req{.remote_host = nullptr, .input_buffer = evb};
request._ev_req = &ev_req;
request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo=");
request._headers.emplace(HttpHeaders::CONTENT_LENGTH, std::to_string(content.length()));
request._headers.emplace(HTTP_DB_KEY, ctx->db);
request._headers.emplace(HTTP_TABLE_KEY, ctx->table);
request._headers.emplace(HTTP_LABEL_KEY, ctx->label);
ASSERT_EQ(0, action.on_header(&request));
StreamLoadContext* req_ctx = (StreamLoadContext*)request.handler_ctx();
ASSERT_EQ(ctx, req_ctx);
ASSERT_EQ(3, ctx->num_refs());
ASSERT_FALSE(ctx->lock.try_lock());
ASSERT_TRUE(k_response_str.empty());
}
ASSERT_EQ(2, ctx->num_refs());
ASSERT_TRUE(ctx->lock.try_lock());
ctx->lock.unlock();
}

} // namespace starrocks

0 comments on commit 781e125

Please sign in to comment.