Skip to content

Commit

Permalink
[BugFix] abort compaction task properly during shutdown
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Xiaohua Cai <[email protected]>
  • Loading branch information
kevincai committed Feb 7, 2025
1 parent 8537f8c commit ff35b32
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 6 deletions.
76 changes: 71 additions & 5 deletions be/src/storage/lake/compaction_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@

namespace starrocks::lake {

namespace {
static void reject_request(::google::protobuf::RpcController* controller, const CompactRequest* request,
CompactResponse* response) {
auto st = Status::Aborted("Compaction request rejected due to BE/CN shutdown in progress!");
LOG(WARNING) << "Fail to compact num_of_tablets= " << request->tablet_ids().size()
<< ". version=" << request->version() << " txn_id=" << request->txn_id() << " : " << st;
st.to_protobuf(response->mutable_status());
}
} // namespace

CompactionTaskCallback::~CompactionTaskCallback() = default;

CompactionTaskCallback::CompactionTaskCallback(CompactionScheduler* scheduler, const CompactRequest* request,
Expand Down Expand Up @@ -214,13 +224,26 @@ CompactionScheduler::~CompactionScheduler() {

void CompactionScheduler::stop() {
bool expected = false;
if (_stopped.compare_exchange_strong(expected, true)) {
auto changed = false;
{
// hold the lock to exclude new tasks entering the task queue in compact() interface
std::unique_lock lock(_mutex);
changed = _stopped.compare_exchange_strong(expected, true);
}
if (changed) {
_threads->shutdown();
abort_all();
}
}

void CompactionScheduler::compact(::google::protobuf::RpcController* controller, const CompactRequest* request,
CompactResponse* response, ::google::protobuf::Closure* done) {
brpc::ClosureGuard guard(done);
if (_stopped) {
// take a chance to check the _stopped flag without lock
reject_request(controller, request, response);
return;
}
// when FE request a compaction, CN may not have any key cached yet, so pass an encryption_meta to refresh cache
if (!request->encryption_meta().empty()) {
Status st = KeyCache::instance().refresh_keys(request->encryption_meta());
Expand All @@ -237,17 +260,30 @@ void CompactionScheduler::compact(::google::protobuf::RpcController* controller,
for (auto tablet_id : request->tablet_ids()) {
auto context = std::make_unique<CompactionTaskContext>(request->txn_id(), tablet_id, request->version(),
request->force_base_compaction(), cb);
{
std::lock_guard l(_contexts_lock);
_contexts.Append(context.get());
}
contexts_vec.push_back(std::move(context));
// DO NOT touch `context` from here!
}
// initialize last check time, compact request is received right after FE sends it, so consider it valid now
cb->set_last_check_time(time(nullptr));

std::unique_lock lock(_mutex);
// make changes under lock
// perform the check again under lock, so the _stopped and _task_queues operation is atomic
if (_stopped) {
reject_request(controller, request, response);
return;
}
{
std::lock_guard l(_contexts_lock);
for (auto& ctx : contexts_vec) {
_contexts.Append(ctx.get());
}
}
_task_queues.put_by_txn_id(request->txn_id(), contexts_vec);
// DO NOT touch `contexts_vec` from here!
// release the done guard, let CompactionTaskCallback take charge.
guard.release();

TEST_SYNC_POINT("CompactionScheduler::compact:return");
}

Expand Down Expand Up @@ -417,6 +453,21 @@ Status CompactionScheduler::do_compaction(std::unique_ptr<CompactionTaskContext>
return status;
}

void CompactionScheduler::abort_compaction(std::unique_ptr<CompactionTaskContext> context) {
const auto start_time = ::time(nullptr);
const auto tablet_id = context->tablet_id;
const auto txn_id = context->txn_id;
const auto version = context->version;

int64_t in_queue_time_sec = start_time > context->enqueue_time_sec ? (start_time - context->enqueue_time_sec) : 0;
context->stats->in_queue_time_sec += in_queue_time_sec;
context->status = Status::Aborted("Compaction task aborted due to BE/CN shutdown!");
LOG(WARNING) << "Fail to compact tablet " << tablet_id << ". version=" << version << " txn_id=" << txn_id << " : "
<< context->status;
// make sure every task can be finished no matter it is succeeded or failed.
context->callback->finish_task(std::move(context));
}

Status CompactionScheduler::abort(int64_t txn_id) {
std::unique_lock l(_contexts_lock);
for (butil::LinkNode<CompactionTaskContext>* node = _contexts.head(); node != _contexts.end();
Expand All @@ -435,6 +486,21 @@ Status CompactionScheduler::abort(int64_t txn_id) {
return Status::NotFound(fmt::format("no compaction task with txn id {}", txn_id));
}

void CompactionScheduler::abort_all() {
for (int i = 0; i < _task_queues.task_queue_size(); ++i) {
// drain _task_queues, ensure every tasks in queue are properly aborted
bool done = false;
while (!done) {
CompactionContextPtr context;
if (_task_queues.try_get(i, &context)) {
abort_compaction(std::move(context));
} else {
done = true;
}
}
}
}

// If `lake_compaction_max_concurrency` is reduced during runtime, `id` may exceed it.
// Reschedule all the tasks in _task_queues where idx ranges from [new_val, old_val-1].
// return true means current thread id is beyond target size, current thread shoud exist.
Expand Down
6 changes: 6 additions & 0 deletions be/src/storage/lake/compaction_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,17 @@ class CompactionScheduler {
private:
friend class CompactionTaskCallback;

// abort all the compaction tasks in the task queue. Only expected to be invoked during stop()
void abort_all();

void remove_states(const std::vector<std::unique_ptr<CompactionTaskContext>>& contexes);

void thread_task(int id);

Status do_compaction(std::unique_ptr<CompactionTaskContext> context);

void abort_compaction(std::unique_ptr<CompactionTaskContext> context);

bool reschedule_task_if_needed(int id);

TabletManager* _tablet_mgr;
Expand All @@ -246,6 +251,7 @@ class CompactionScheduler {
butil::LinkedList<CompactionTaskContext> _contexts;
std::unique_ptr<ThreadPool> _threads;
std::atomic<bool> _stopped{false};
std::mutex _mutex;
WrapTaskQueues _task_queues;
};

Expand Down
103 changes: 102 additions & 1 deletion be/test/storage/lake/compaction_scheduler_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,107 @@ TEST_F(LakeCompactionSchedulerTest, test_list_tasks) {
bthread_join(tid, nullptr);
}

TEST_F(LakeCompactionSchedulerTest, test_abort_all) {
// set to single thread mode, so all the tasks will be in the same thread
_compaction_scheduler.update_compact_threads(1);
std::vector<CompactionTaskInfo> tasks;
_compaction_scheduler.list_tasks(&tasks);
EXPECT_EQ(0, tasks.size());

int num_tasks = 16;
auto l0 = std::make_shared<CountDownLatch>(1);
auto l1 = std::make_shared<CountDownLatch>(num_tasks);
auto l2 = std::make_shared<CountDownLatch>(1);
auto l3 = std::make_shared<CountDownLatch>(num_tasks);
EXPECT_EQ(num_tasks, l1->count());

std::vector<bthread_t> tids;
// preserve requests and responses life time
std::vector<std::shared_ptr<CompactRequest>> requests;
std::vector<std::shared_ptr<CompactResponse>> responses;
{ // task 0: block the execution done until l2.count_down()
auto txn_id = next_id();
auto request = std::make_shared<CompactRequest>();
requests.push_back(request);
auto response = std::make_shared<CompactResponse>();
responses.push_back(response);
auto meta = generate_simple_tablet_metadata(DUP_KEYS);
CHECK_OK(_tablet_mgr->put_tablet_metadata(meta));
request->add_tablet_ids(meta->id());
request->set_timeout_ms(60 * 1000); // 60 seconds
request->set_txn_id(txn_id);
request->set_version(1);
// wait l2, count down l0
ASSIGN_OR_ABORT(auto tid, bthreads::start_bthread([&, l1, l2, request, response]() {
auto cb = ::google::protobuf::NewCallback(notify_and_wait_latch, l0, l2);
_compaction_scheduler.compact(nullptr, request.get(), response.get(), cb);
}));
tids.push_back(tid);
}
// Wait for task0 complete
l0->wait();
// repeatedly submit num_tasks into the queue, make the thread busy before stop() invoked.
for (int i = 0; i < num_tasks; ++i) {
auto txn_id = next_id();
auto request = std::make_shared<CompactRequest>();
requests.push_back(request);
auto response = std::make_shared<CompactResponse>();
responses.push_back(response);
auto meta = generate_simple_tablet_metadata(DUP_KEYS);
CHECK_OK(_tablet_mgr->put_tablet_metadata(meta));
request->add_tablet_ids(meta->id());
request->set_timeout_ms(60 * 1000); // 60 seconds
request->set_txn_id(txn_id);
request->set_version(1);
// wait l2, count down l1
ASSIGN_OR_ABORT(auto tid, bthreads::start_bthread([&, l1, l2, l3, request, response]() {
auto cb = ::google::protobuf::NewCallback(notify_and_wait_latch, l1, l2);
l3->count_down();
_compaction_scheduler.compact(nullptr, request.get(), response.get(), cb);
}));
tids.push_back(tid);
}
// wait until all bthreads run
l3->wait();
// Allow all tasks to be executed
// because the first task is blocked by the l2 countdown, rest are all in task queue.
l2->count_down();
// expect remain tasks in task queue aborted during stop
_compaction_scheduler.stop();
// l1 should be properly count down by all the tasks
l1->wait();

for (auto tid : tids) {
bthread_join(tid, nullptr);
}

int aborted = 0;
for (auto response : responses) {
if (response->status().status_code() == TStatusCode::ABORTED) {
++aborted;
}
}
// total num_tasks + 1 compact tasks submitted.
// expect the first one success, and then the remain `num_tasks` aborted between [1, num_tasks]
EXPECT_GE(aborted, 1);
EXPECT_LE(aborted, num_tasks);
}

TEST_F(LakeCompactionSchedulerTest, test_submit_compact_after_stop) {
_compaction_scheduler.stop();
auto l1 = std::make_shared<CountDownLatch>(1);
CompactRequest request;
CompactResponse response;
request.add_tablet_ids(_tablet_metadata->id());
request.set_timeout_ms(60 * 1000);
request.set_txn_id(next_id());
request.set_version(1);
auto cb = ::google::protobuf::NewCallback(notify, l1);
_compaction_scheduler.compact(nullptr, &request, &response, cb);
l1->wait();
EXPECT_EQ(response.status().status_code(), TStatusCode::ABORTED);
}

TEST_F(LakeCompactionSchedulerTest, test_compaction_cancel) {
CompactRequest request;
CompactResponse response;
Expand Down Expand Up @@ -174,4 +275,4 @@ TEST_F(LakeCompactionSchedulerTest, test_issue44136) {
latch->wait();
}

} // namespace starrocks::lake
} // namespace starrocks::lake

0 comments on commit ff35b32

Please sign in to comment.