Skip to content

Commit

Permalink
close StreamMgr after FragmantMgr::close (#30368)
Browse files Browse the repository at this point in the history
Signed-off-by: trueeyu <[email protected]>
(cherry picked from commit 1bbd536)

# Conflicts:
#	be/src/runtime/exec_env.cpp
  • Loading branch information
trueeyu authored and mergify[bot] committed Sep 4, 2023
1 parent 5ebd962 commit adcfcb2
Showing 1 changed file with 19 additions and 0 deletions.
19 changes: 19 additions & 0 deletions be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ void ExecEnv::add_rf_event(const RfTracePoint& pt) {
_runtime_filter_cache->add_rf_event(pt.query_id, pt.filter_id, std::move(msg));
}

<<<<<<< HEAD
class SetMemTrackerForColumnPool {
public:
SetMemTrackerForColumnPool(std::shared_ptr<MemTracker> mem_tracker) : _mem_tracker(std::move(mem_tracker)) {}
Expand All @@ -389,6 +390,14 @@ Status ExecEnv::init_mem_tracker() {
if (bytes_limit <= 0) {
ss << "Failed to parse mem limit from '" + config::mem_limit + "'.";
return Status::InternalError(ss.str());
=======
void ExecEnv::stop() {
if (_load_channel_mgr) {
// Clear load channel should be executed before stopping the storage engine,
// otherwise some writing tasks will still be in the MemTableFlushThreadPool of the storage engine,
// so when the ThreadPool is destroyed, it will crash.
_load_channel_mgr->close();
>>>>>>> 1bbd536da5 (close StreamMgr after FragmantMgr::close (#30368))
}

if (bytes_limit > MemInfo::physical_mem()) {
Expand All @@ -403,11 +412,21 @@ Status ExecEnv::init_mem_tracker() {
return Status::InternalError(ss.str());
}

<<<<<<< HEAD
_process_mem_tracker = regist_tracker(MemTracker::PROCESS, bytes_limit, "process");
int64_t query_pool_mem_limit =
calc_max_query_memory(_process_mem_tracker->limit(), config::query_max_memory_limit_percent);
_query_pool_mem_tracker =
regist_tracker(MemTracker::QUERY_POOL, query_pool_mem_limit, "query_pool", this->process_mem_tracker());
=======
if (_stream_mgr != nullptr) {
_stream_mgr->close();
}

if (_pipeline_sink_io_pool) {
_pipeline_sink_io_pool->shutdown();
}
>>>>>>> 1bbd536da5 (close StreamMgr after FragmantMgr::close (#30368))

int64_t load_mem_limit = calc_max_load_memory(_process_mem_tracker->limit());
_load_mem_tracker = regist_tracker(MemTracker::LOAD, load_mem_limit, "load", process_mem_tracker());
Expand Down

0 comments on commit adcfcb2

Please sign in to comment.