diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 0c00bd1a38f0da0..f27c3b34f331af2 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -281,6 +281,8 @@ DEFINE_mInt32(doris_scan_range_max_mb, "1024"); DEFINE_mInt32(doris_scanner_row_num, "16384"); // single read execute fragment row bytes DEFINE_mInt32(doris_scanner_row_bytes, "10485760"); +// single read execute fragment max run time millseconds +DEFINE_mInt32(doris_scanner_max_run_time_ms, "1000"); // (Advanced) Maximum size of per-query receive-side buffer DEFINE_mInt32(exchg_node_buffer_size_bytes, "20485760"); DEFINE_mInt32(exchg_buffer_queue_capacity_factor, "64"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 720f4f72cb4bf7a..25bf8f52cf5b51f 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -328,6 +328,8 @@ DECLARE_mInt32(doris_scan_range_max_mb); DECLARE_mInt32(doris_scanner_row_num); // single read execute fragment row bytes DECLARE_mInt32(doris_scanner_row_bytes); +// single read execute fragment max run time millseconds +DECLARE_mInt32(doris_scanner_max_run_time_ms); // (Advanced) Maximum size of per-query receive-side buffer DECLARE_mInt32(exchg_node_buffer_size_bytes); DECLARE_mInt32(exchg_buffer_queue_capacity_factor); diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index b993f4cd31e3133..08ecb601f39941c 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -455,6 +455,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re columns_to_filter[i] = i; } IColumn::Filter result_filter; + size_t pre_raw_read_rows = 0; while (!_state->is_cancelled()) { // read predicate columns pre_read_rows = 0; @@ -466,6 +467,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re DCHECK_EQ(pre_eof, true); break; } + pre_raw_read_rows += pre_read_rows; RETURN_IF_ERROR(_fill_partition_columns(block, pre_read_rows, _lazy_read_ctx.predicate_partition_columns)); RETURN_IF_ERROR(_fill_missing_columns(block, pre_read_rows, @@ -518,6 +520,9 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re Block::erase_useless_column(block, origin_column_num); if (!pre_eof) { + if (pre_raw_read_rows >= config::doris_scanner_row_num) { + break; + } // If continuous batches are skipped, we can cache them to skip a whole page _cached_filtered_rows += pre_read_rows; } else { // pre_eof diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 444ff4dbb0cd9f5..985092fa4054b12 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -236,6 +236,8 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, Thread::set_thread_nice_value(); } #endif + MonotonicStopWatch max_run_time_watch; + max_run_time_watch.start(); scanner->update_wait_worker_timer(); scanner->start_scan_cpu_timer(); Status status = Status::OK(); @@ -270,6 +272,10 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, eos = true; break; } + if (max_run_time_watch.elapsed_time() / 1e6 < + config::doris_scanner_max_run_time_ms) { + break; + } BlockUPtr free_block = ctx->get_free_block(first_read); if (free_block == nullptr) { break; @@ -286,6 +292,9 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, LOG(WARNING) << "Scan thread read VScanner failed: " << status.to_string(); break; } + if (free_block->empty()) { + break; + } raw_bytes_read += free_block_bytes; if (!scan_task->cached_blocks.empty() && scan_task->cached_blocks.back().first->rows() + free_block->rows() <= diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 75ecc744ac18fb9..02336fec32898c3 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -333,8 +333,8 @@ Status VFileScanner::_get_block_wrapped(RuntimeState* state, Block* block, bool* // or not found in the file column schema. RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block)); } - break; } + break; } while (true); // Update filtered rows and unselected rows for load, reset counter. diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index a78f8956025cb99..612d825afe71c3c 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -127,9 +127,11 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { // 2. Filter the output block finally. { - auto* timer = _local_state->_filter_timer; - SCOPED_TIMER(timer); - RETURN_IF_ERROR(_filter_output_block(block)); + if (!block->empty()) { + auto* timer = _local_state->_filter_timer; + SCOPED_TIMER(timer); + RETURN_IF_ERROR(_filter_output_block(block)); + } } // record rows return (after filter) for _limit check _num_rows_return += block->rows();