From 14936314e58f8154672d35a1d382ff531f1fb32a Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Thu, 5 Dec 2024 00:51:31 +0800 Subject: [PATCH 1/2] Fix lock leak Signed-off-by: PengFei Li --- .../http/action/transaction_stream_load.cpp | 56 +++-- be/src/http/action/transaction_stream_load.h | 2 + be/test/http/transaction_stream_load_test.cpp | 228 +++++++++++++++--- 3 files changed, 218 insertions(+), 68 deletions(-) diff --git a/be/src/http/action/transaction_stream_load.cpp b/be/src/http/action/transaction_stream_load.cpp index 33e0ff95f0955..5db8bd6afa92f 100644 --- a/be/src/http/action/transaction_stream_load.cpp +++ b/be/src/http/action/transaction_stream_load.cpp @@ -17,6 +17,7 @@ #include #include #include +#include // use string iequal #include @@ -144,22 +145,14 @@ void TransactionStreamLoadAction::_send_error_reply(HttpRequest* req, const Stat } void TransactionStreamLoadAction::handle(HttpRequest* req) { - StreamLoadContext* ctx = nullptr; - const auto& label = req->header(HTTP_LABEL_KEY); - if (!req->header(HTTP_CHANNEL_ID).empty()) { - int channel_id = std::stoi(req->header(HTTP_CHANNEL_ID)); - ctx = _exec_env->stream_context_mgr()->get_channel_context(label, channel_id); - } else { - ctx = _exec_env->stream_context_mgr()->get(label); + if (config::enable_stream_load_verbose_log) { + LOG(INFO) << "transaction streaming load request, handle: " << req->debug_string(); } + + StreamLoadContext* ctx = (StreamLoadContext*)req->handler_ctx(); if (ctx == nullptr) { return; } - DeferOp defer([&] { - if (ctx->unref()) { - delete ctx; - } - }); ctx->last_active_ts = MonotonicNanos(); if (!ctx->status.ok()) { @@ -179,14 +172,12 @@ void TransactionStreamLoadAction::handle(HttpRequest* req) { } auto resp = _exec_env->transaction_mgr()->_build_reply(TXN_LOAD, ctx); - ctx->lock.unlock(); - _send_reply(req, resp); } int TransactionStreamLoadAction::on_header(HttpRequest* req) { if (config::enable_stream_load_verbose_log) { - LOG(INFO) << "transaction streaming load request: " << req->debug_string(); + LOG(INFO) << "transaction streaming load request, header: " << req->debug_string(); } const auto& label = req->header(HTTP_LABEL_KEY); @@ -230,6 +221,9 @@ int TransactionStreamLoadAction::on_header(HttpRequest* req) { _send_error_reply(req, Status::TransactionInProcessing("Transaction in processing, please retry later")); return -1; } + // referenced by the http request + ctx->ref(); + req->set_handler_ctx(ctx); ctx->last_active_ts = MonotonicNanos(); ctx->received_data_cost_nanos = 0; ctx->receive_bytes = 0; @@ -243,7 +237,6 @@ int TransactionStreamLoadAction::on_header(HttpRequest* req) { (void)_exec_env->transaction_mgr()->_rollback_transaction(ctx); } auto resp = _exec_env->transaction_mgr()->_build_reply(TXN_LOAD, ctx); - ctx->lock.unlock(); _send_reply(req, resp); return -1; } @@ -501,22 +494,10 @@ Status TransactionStreamLoadAction::_exec_plan_fragment(HttpRequest* http_req, S } void TransactionStreamLoadAction::on_chunk_data(HttpRequest* req) { - StreamLoadContext* ctx = nullptr; - const string& label = req->header(HTTP_LABEL_KEY); - if (!req->header(HTTP_CHANNEL_ID).empty()) { - int channel_id = std::stoi(req->header(HTTP_CHANNEL_ID)); - ctx = _exec_env->stream_context_mgr()->get_channel_context(label, channel_id); - } else { - ctx = _exec_env->stream_context_mgr()->get(label); - } + StreamLoadContext* ctx = (StreamLoadContext*)req->handler_ctx(); if (ctx == nullptr) { return; } - DeferOp defer([&] { - if (ctx->unref()) { - delete ctx; - } - }); SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(ctx->instance_mem_tracker.get()); @@ -584,6 +565,23 @@ void TransactionStreamLoadAction::on_chunk_data(HttpRequest* req) { ctx->last_active_ts = MonotonicNanos(); ctx->received_data_cost_nanos += ctx->last_active_ts - start_read_data_time; ctx->total_received_data_cost_nanos += ctx->last_active_ts - start_read_data_time; + VLOG(1) << "Receive http chunk, " << ctx->brief() << ", total expected bytes: " << ctx->body_bytes + << ", total received bytes: " << ctx->total_receive_bytes; +} + +void TransactionStreamLoadAction::free_handler_ctx(void* param) { + StreamLoadContext* ctx = (StreamLoadContext*)param; + if (ctx == nullptr) { + return; + } + DCHECK(!ctx->lock.try_lock()); + ctx->lock.unlock(); + if (config::enable_stream_load_verbose_log) { + LOG(INFO) << "free handler context, " << ctx->brief(); + } + if (ctx->unref()) { + delete ctx; + } } } // namespace starrocks diff --git a/be/src/http/action/transaction_stream_load.h b/be/src/http/action/transaction_stream_load.h index 486191ea069b1..7c5c90c7ef7cc 100644 --- a/be/src/http/action/transaction_stream_load.h +++ b/be/src/http/action/transaction_stream_load.h @@ -55,6 +55,8 @@ class TransactionStreamLoadAction : public HttpHandler { void on_chunk_data(HttpRequest* req) override; + void free_handler_ctx(void* ctx) override; + private: Status _on_header(HttpRequest* http_req, StreamLoadContext* ctx); Status _channel_on_header(HttpRequest* http_req, StreamLoadContext* ctx); diff --git a/be/test/http/transaction_stream_load_test.cpp b/be/test/http/transaction_stream_load_test.cpp index 9390c4f8098b3..ab88c31e16293 100644 --- a/be/test/http/transaction_stream_load_test.cpp +++ b/be/test/http/transaction_stream_load_test.cpp @@ -28,6 +28,7 @@ #include "runtime/stream_load/load_stream_mgr.h" #include "runtime/stream_load/stream_load_executor.h" #include "runtime/stream_load/transaction_mgr.h" +#include "testutil/assert.h" #include "testutil/sync_point.h" #include "util/brpc_stub_cache.h" #include "util/cpu_info.h" @@ -89,7 +90,7 @@ class TransactionStreamLoadActionTest : public testing::Test { } } -private: +protected: ExecEnv _env; evhttp_request* _evhttp_req = nullptr; }; @@ -311,6 +312,7 @@ TEST_F(TransactionStreamLoadActionTest, txn_commit_success) { TransactionStreamLoadAction action(&_env); HttpRequest request(_evhttp_req); + request.set_handler(&action); struct evhttp_request ev_req; ev_req.remote_host = nullptr; @@ -361,6 +363,7 @@ TEST_F(TransactionStreamLoadActionTest, txn_prepared_success) { TransactionStreamLoadAction action(&_env); HttpRequest request(_evhttp_req); + request.set_handler(&action); struct evhttp_request ev_req; ev_req.remote_host = nullptr; @@ -411,6 +414,7 @@ TEST_F(TransactionStreamLoadActionTest, txn_put_fail) { TransactionStreamLoadAction action(&_env); HttpRequest request(_evhttp_req); + request.set_handler(&action); struct evhttp_request ev_req; ev_req.remote_host = nullptr; @@ -463,6 +467,7 @@ TEST_F(TransactionStreamLoadActionTest, txn_commit_fe_fail) { TransactionStreamLoadAction action(&_env); HttpRequest request(_evhttp_req); + request.set_handler(&action); struct evhttp_request ev_req; ev_req.remote_host = nullptr; @@ -515,6 +520,7 @@ TEST_F(TransactionStreamLoadActionTest, txn_prepare_fe_fail) { TransactionStreamLoadAction action(&_env); HttpRequest request(_evhttp_req); + request.set_handler(&action); struct evhttp_request ev_req; ev_req.remote_host = nullptr; @@ -589,6 +595,7 @@ TEST_F(TransactionStreamLoadActionTest, txn_plan_fail) { TransactionStreamLoadAction action(&_env); HttpRequest request(_evhttp_req); + request.set_handler(&action); struct evhttp_request ev_req; ev_req.remote_host = nullptr; @@ -662,6 +669,7 @@ TEST_F(TransactionStreamLoadActionTest, txn_idle_timeout) { TransactionStreamLoadAction action(&_env); HttpRequest request(_evhttp_req); + request.set_handler(&action); struct evhttp_request ev_req; ev_req.remote_host = nullptr; @@ -699,6 +707,7 @@ TEST_F(TransactionStreamLoadActionTest, txn_not_same_load) { TransactionStreamLoadAction action(&_env); { HttpRequest request(_evhttp_req); + request.set_handler(&action); struct evhttp_request ev_req; ev_req.remote_host = nullptr; @@ -719,6 +728,7 @@ TEST_F(TransactionStreamLoadActionTest, txn_not_same_load) { { HttpRequest request(_evhttp_req); + request.set_handler(&action); struct evhttp_request ev_req; ev_req.remote_host = nullptr; @@ -739,6 +749,7 @@ TEST_F(TransactionStreamLoadActionTest, txn_not_same_load) { { HttpRequest request(_evhttp_req); + request.set_handler(&action); struct evhttp_request ev_req; ev_req.remote_host = nullptr; @@ -753,52 +764,75 @@ TEST_F(TransactionStreamLoadActionTest, txn_not_same_load) { } } +#define SET_MEMORY_LIMIT_EXCEEDED(stmt) \ + do { \ + DeferOp defer([]() { \ + SyncPoint::GetInstance()->ClearCallBack("ByteBuffer::allocate_with_tracker"); \ + SyncPoint::GetInstance()->DisableProcessing(); \ + }); \ + SyncPoint::GetInstance()->EnableProcessing(); \ + SyncPoint::GetInstance()->SetCallBack("ByteBuffer::allocate_with_tracker", [](void* arg) { \ + *((Status*)arg) = Status::MemoryLimitExceeded("TestFail"); \ + }); \ + { stmt; } \ + } while (0) + TEST_F(TransactionStreamLoadActionTest, huge_malloc) { TransactionStreamLoadAction action(&_env); auto ctx = new StreamLoadContext(&_env); + ctx->db = "db"; + ctx->table = "tbl"; + ctx->label = "huge_malloc"; ctx->ref(); ctx->body_sink = std::make_shared(); + bool remove_from_stream_context_mgr = false; + auto evb = evbuffer_new(); + DeferOp defer([&]() { + if (remove_from_stream_context_mgr) { + _env.stream_context_mgr()->remove(ctx->label); + } + if (ctx->unref()) { + delete ctx; + } + evbuffer_free(evb); + }); + ASSERT_OK((_env.stream_context_mgr())->put(ctx->label, ctx)); + remove_from_stream_context_mgr = true; + HttpRequest request(_evhttp_req); + request.set_handler(&action); std::string content = "abc"; struct evhttp_request ev_req; ev_req.remote_host = nullptr; - auto evb = evbuffer_new(); ev_req.input_buffer = evb; request._ev_req = &ev_req; request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo="); request._headers.emplace(HttpHeaders::CONTENT_LENGTH, "16"); - request._headers.emplace(HTTP_DB_KEY, "db"); - request._headers.emplace(HTTP_LABEL_KEY, "123"); - request._headers.emplace(HTTP_COLUMN_SEPARATOR, "|"); - - (_env._stream_context_mgr)->put("123", ctx); + request._headers.emplace(HTTP_DB_KEY, ctx->db); + request._headers.emplace(HTTP_TABLE_KEY, ctx->table); + request._headers.emplace(HTTP_LABEL_KEY, ctx->label); + ASSERT_EQ(0, action.on_header(&request)); evbuffer_add(evb, content.data(), content.size()); - SyncPoint::GetInstance()->EnableProcessing(); - SyncPoint::GetInstance()->SetCallBack("ByteBuffer::allocate_with_tracker", - [](void* arg) { *((Status*)arg) = Status::MemoryLimitExceeded("TestFail"); }); - ctx->status = Status::OK(); - action.on_chunk_data(&request); - ASSERT_TRUE(ctx->status.is_mem_limit_exceeded()); - SyncPoint::GetInstance()->ClearCallBack("ByteBuffer::allocate_with_tracker"); - SyncPoint::GetInstance()->DisableProcessing(); + SET_MEMORY_LIMIT_EXCEEDED({ + ctx->status = Status::OK(); + action.on_chunk_data(&request); + ASSERT_TRUE(ctx->status.is_mem_limit_exceeded()); + }); ctx->status = Status::OK(); action.on_chunk_data(&request); ASSERT_TRUE(ctx->status.ok()); evbuffer_add(evb, content.data(), content.size()); - SyncPoint::GetInstance()->EnableProcessing(); - SyncPoint::GetInstance()->SetCallBack("ByteBuffer::allocate_with_tracker", - [](void* arg) { *((Status*)arg) = Status::MemoryLimitExceeded("TestFail"); }); - ctx->buffer = ByteBufferPtr(new ByteBuffer(1)); - ctx->status = Status::OK(); - action.on_chunk_data(&request); - ASSERT_TRUE(ctx->status.is_mem_limit_exceeded()); - ctx->buffer = nullptr; - SyncPoint::GetInstance()->ClearCallBack("ByteBuffer::allocate_with_tracker"); - SyncPoint::GetInstance()->DisableProcessing(); + SET_MEMORY_LIMIT_EXCEEDED({ + ctx->buffer = ByteBufferPtr(new ByteBuffer(1)); + ctx->status = Status::OK(); + action.on_chunk_data(&request); + ASSERT_TRUE(ctx->status.is_mem_limit_exceeded()); + ctx->buffer = nullptr; + }); ctx->buffer = ByteBufferPtr(new ByteBuffer(1)); ctx->status = Status::OK(); action.on_chunk_data(&request); @@ -807,17 +841,14 @@ TEST_F(TransactionStreamLoadActionTest, huge_malloc) { evbuffer_add(evb, content.data(), content.size()); auto old_format = ctx->format; - SyncPoint::GetInstance()->EnableProcessing(); - SyncPoint::GetInstance()->SetCallBack("ByteBuffer::allocate_with_tracker", - [](void* arg) { *((Status*)arg) = Status::MemoryLimitExceeded("TestFail"); }); - ctx->format = TFileFormatType::FORMAT_JSON; - ctx->buffer = ByteBufferPtr(new ByteBuffer(1)); - ctx->status = Status::OK(); - action.on_chunk_data(&request); - ASSERT_TRUE(ctx->status.is_mem_limit_exceeded()); - ctx->buffer = nullptr; - SyncPoint::GetInstance()->ClearCallBack("ByteBuffer::allocate_with_tracker"); - SyncPoint::GetInstance()->DisableProcessing(); + SET_MEMORY_LIMIT_EXCEEDED({ + ctx->format = TFileFormatType::FORMAT_JSON; + ctx->buffer = ByteBufferPtr(new ByteBuffer(1)); + ctx->status = Status::OK(); + action.on_chunk_data(&request); + ASSERT_TRUE(ctx->status.is_mem_limit_exceeded()); + ctx->buffer = nullptr; + }); ctx->format = TFileFormatType::FORMAT_JSON; ctx->buffer = ByteBufferPtr(new ByteBuffer(1)); ctx->status = Status::OK(); @@ -825,11 +856,130 @@ TEST_F(TransactionStreamLoadActionTest, huge_malloc) { ASSERT_TRUE(ctx->status.ok()); ctx->buffer = nullptr; ctx->format = old_format; +} + +TEST_F(TransactionStreamLoadActionTest, free_handler_ctx) { + TransactionStreamLoadAction action(&_env); + auto ctx = new StreamLoadContext(&_env); + ctx->ref(); + ctx->db = "db"; + ctx->table = "tbl"; + ctx->label = "free_handler_ctx"; + ctx->body_sink = std::make_shared(); + bool remove_from_stream_context_mgr = false; + DeferOp defer([&]() { + if (remove_from_stream_context_mgr) { + _env.stream_context_mgr()->remove(ctx->label); + } + if (ctx->unref()) { + delete ctx; + } + }); + ASSERT_OK((_env.stream_context_mgr())->put(ctx->label, ctx)); + remove_from_stream_context_mgr = true; + ASSERT_TRUE(ctx->lock.try_lock()); + ctx->lock.unlock(); - if (ctx->unref()) { - delete ctx; + // normal request + { + k_response_str = ""; + HttpRequest request(_evhttp_req); + request.set_handler(&action); + std::string content = "abc"; + auto evb = evbuffer_new(); + evbuffer_add(evb, content.data(), content.size()); + DeferOp free_evb([&]() { evbuffer_free(evb); }); + struct evhttp_request ev_req { + .remote_host = nullptr, .input_buffer = evb + }; + request._ev_req = &ev_req; + request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo="); + request._headers.emplace(HttpHeaders::CONTENT_LENGTH, std::to_string(content.length())); + request._headers.emplace(HTTP_DB_KEY, ctx->db); + request._headers.emplace(HTTP_TABLE_KEY, ctx->table); + request._headers.emplace(HTTP_LABEL_KEY, ctx->label); + ASSERT_EQ(0, action.on_header(&request)); + StreamLoadContext* req_ctx = (StreamLoadContext*)request.handler_ctx(); + ASSERT_EQ(ctx, req_ctx); + ASSERT_EQ(3, ctx->num_refs()); + ASSERT_FALSE(ctx->lock.try_lock()); + ASSERT_TRUE(k_response_str.empty()); + action.on_chunk_data(&request); + ASSERT_EQ(3, ctx->num_refs()); + ASSERT_FALSE(ctx->lock.try_lock()); + action.handle(&request); + ASSERT_EQ(3, ctx->num_refs()); + ASSERT_FALSE(ctx->lock.try_lock()); + rapidjson::Document doc; + doc.Parse(k_response_str.c_str()); + ASSERT_STREQ("OK", doc["Status"].GetString()); + } + ASSERT_EQ(2, ctx->num_refs()); + ASSERT_TRUE(ctx->lock.try_lock()); + ctx->lock.unlock(); + + // on_header fail + { + k_response_str = ""; + HttpRequest request(_evhttp_req); + request.set_handler(&action); + std::string content = "abc"; + auto evb = evbuffer_new(); + evbuffer_add(evb, content.data(), content.size()); + DeferOp free_evb([&]() { evbuffer_free(evb); }); + struct evhttp_request ev_req { + .remote_host = nullptr, .input_buffer = evb + }; + request._ev_req = &ev_req; + request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo="); + request._headers.emplace(HttpHeaders::CONTENT_LENGTH, std::to_string(content.length())); + request._headers.emplace(HTTP_DB_KEY, ctx->db + "x"); + request._headers.emplace(HTTP_TABLE_KEY, ctx->table); + request._headers.emplace(HTTP_LABEL_KEY, ctx->label); + ASSERT_EQ(-1, action.on_header(&request)); + StreamLoadContext* req_ctx = (StreamLoadContext*)request.handler_ctx(); + ASSERT_EQ(nullptr, req_ctx); + ASSERT_EQ(2, ctx->num_refs()); + ASSERT_TRUE(ctx->lock.try_lock()); + ctx->lock.unlock(); + rapidjson::Document doc; + doc.Parse(k_response_str.c_str()); + ASSERT_STREQ("INVALID_ARGUMENT", doc["Status"].GetString()); + ASSERT_NE(nullptr, + std::strstr(doc["Message"].GetString(), "Request database dbx not equal transaction database db")); } - evbuffer_free(evb); + ASSERT_EQ(2, ctx->num_refs()); + ASSERT_TRUE(ctx->lock.try_lock()); + ctx->lock.unlock(); + + // skip on_chunk_data and handle + { + k_response_str = ""; + HttpRequest request(_evhttp_req); + request.set_handler(&action); + std::string content = "abc"; + auto evb = evbuffer_new(); + evbuffer_add(evb, content.data(), content.size()); + DeferOp free_evb([&]() { evbuffer_free(evb); }); + struct evhttp_request ev_req { + .remote_host = nullptr, .input_buffer = evb + }; + request._ev_req = &ev_req; + request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo="); + request._headers.emplace(HttpHeaders::CONTENT_LENGTH, std::to_string(content.length())); + request._headers.emplace(HTTP_DB_KEY, ctx->db); + request._headers.emplace(HTTP_TABLE_KEY, ctx->table); + request._headers.emplace(HTTP_LABEL_KEY, ctx->label); + ASSERT_EQ(0, action.on_header(&request)); + StreamLoadContext* req_ctx = (StreamLoadContext*)request.handler_ctx(); + ASSERT_EQ(ctx, req_ctx); + ASSERT_EQ(3, ctx->num_refs()); + ASSERT_FALSE(ctx->lock.try_lock()); + ASSERT_TRUE(k_response_str.empty()); + } + ASSERT_EQ(2, ctx->num_refs()); + ASSERT_TRUE(ctx->lock.try_lock()); + ctx->lock.unlock(); } } // namespace starrocks From 7384a10b8142ee16dc20beeb8ae7c4b9c10f72fc Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Thu, 5 Dec 2024 11:38:39 +0800 Subject: [PATCH 2/2] Fix comments Signed-off-by: PengFei Li --- be/src/http/action/transaction_stream_load.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/be/src/http/action/transaction_stream_load.cpp b/be/src/http/action/transaction_stream_load.cpp index 5db8bd6afa92f..71aab57e82a0c 100644 --- a/be/src/http/action/transaction_stream_load.cpp +++ b/be/src/http/action/transaction_stream_load.cpp @@ -149,7 +149,7 @@ void TransactionStreamLoadAction::handle(HttpRequest* req) { LOG(INFO) << "transaction streaming load request, handle: " << req->debug_string(); } - StreamLoadContext* ctx = (StreamLoadContext*)req->handler_ctx(); + StreamLoadContext* ctx = static_cast(req->handler_ctx()); if (ctx == nullptr) { return; } @@ -494,7 +494,7 @@ Status TransactionStreamLoadAction::_exec_plan_fragment(HttpRequest* http_req, S } void TransactionStreamLoadAction::on_chunk_data(HttpRequest* req) { - StreamLoadContext* ctx = (StreamLoadContext*)req->handler_ctx(); + StreamLoadContext* ctx = static_cast(req->handler_ctx()); if (ctx == nullptr) { return; } @@ -570,7 +570,7 @@ void TransactionStreamLoadAction::on_chunk_data(HttpRequest* req) { } void TransactionStreamLoadAction::free_handler_ctx(void* param) { - StreamLoadContext* ctx = (StreamLoadContext*)param; + StreamLoadContext* ctx = static_cast(param); if (ctx == nullptr) { return; }