From 725afd027645b108ae0b9d6dbac3074ca69ed5af Mon Sep 17 00:00:00 2001 From: kaka11chen Date: Wed, 11 Sep 2024 01:20:47 +0800 Subject: [PATCH] [Opt](scanner-scheduler) Opt scanner scheduler starvation issue. --- be/src/common/config.cpp | 2 ++ be/src/common/config.h | 2 ++ be/src/vec/exec/format/parquet/vparquet_group_reader.cpp | 5 +++++ be/src/vec/exec/scan/scanner_scheduler.cpp | 6 ++++++ be/src/vec/exec/scan/vfile_scanner.cpp | 2 +- 5 files changed, 16 insertions(+), 1 deletion(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 58679fbe9b4245..a3de82b53e0b69 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -284,6 +284,8 @@ DEFINE_mInt32(doris_scan_range_max_mb, "1024"); DEFINE_mInt32(doris_scanner_row_num, "16384"); // single read execute fragment row bytes DEFINE_mInt32(doris_scanner_row_bytes, "10485760"); +// single read execute fragment max run time millseconds +DEFINE_mInt32(doris_scanner_max_run_time_ms, "1000"); // (Advanced) Maximum size of per-query receive-side buffer DEFINE_mInt32(exchg_node_buffer_size_bytes, "20485760"); DEFINE_mInt32(exchg_buffer_queue_capacity_factor, "64"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 56a9357e72e798..1ecc08c7d07c1c 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -332,6 +332,8 @@ DECLARE_mInt32(doris_scan_range_max_mb); DECLARE_mInt32(doris_scanner_row_num); // single read execute fragment row bytes DECLARE_mInt32(doris_scanner_row_bytes); +// single read execute fragment max run time millseconds +DECLARE_mInt32(doris_scanner_max_run_time_ms); // (Advanced) Maximum size of per-query receive-side buffer DECLARE_mInt32(exchg_node_buffer_size_bytes); DECLARE_mInt32(exchg_buffer_queue_capacity_factor); diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index b993f4cd31e313..08ecb601f39941 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -455,6 +455,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re columns_to_filter[i] = i; } IColumn::Filter result_filter; + size_t pre_raw_read_rows = 0; while (!_state->is_cancelled()) { // read predicate columns pre_read_rows = 0; @@ -466,6 +467,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re DCHECK_EQ(pre_eof, true); break; } + pre_raw_read_rows += pre_read_rows; RETURN_IF_ERROR(_fill_partition_columns(block, pre_read_rows, _lazy_read_ctx.predicate_partition_columns)); RETURN_IF_ERROR(_fill_missing_columns(block, pre_read_rows, @@ -518,6 +520,9 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re Block::erase_useless_column(block, origin_column_num); if (!pre_eof) { + if (pre_raw_read_rows >= config::doris_scanner_row_num) { + break; + } // If continuous batches are skipped, we can cache them to skip a whole page _cached_filtered_rows += pre_read_rows; } else { // pre_eof diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index fdd677f0687d30..4a5ab9e6e52b37 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -236,6 +236,8 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, Thread::set_thread_nice_value(); } #endif + MonotonicStopWatch max_run_time_watch; + max_run_time_watch.start(); scanner->update_wait_worker_timer(); scanner->start_scan_cpu_timer(); Status status = Status::OK(); @@ -270,6 +272,10 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, eos = true; break; } + if (max_run_time_watch.elapsed_time() > + config::doris_scanner_max_run_time_ms * 1e6) { + break; + } BlockUPtr free_block = ctx->get_free_block(first_read); if (free_block == nullptr) { break; diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 52aa752935e88d..ffc88d07cab2f0 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -333,8 +333,8 @@ Status VFileScanner::_get_block_wrapped(RuntimeState* state, Block* block, bool* // or not found in the file column schema. RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block)); } - break; } + break; } while (true); // Update filtered rows and unselected rows for load, reset counter.