Skip to content

Commit

Permalink
[Opt](scanner-scheduler) Opt scanner scheduler starvation issue.
Browse files Browse the repository at this point in the history
  • Loading branch information
kaka11chen committed Sep 23, 2024
1 parent 8c34bfe commit a1b1e78
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 1 deletion.
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ DEFINE_mInt32(doris_scan_range_max_mb, "1024");
DEFINE_mInt32(doris_scanner_row_num, "16384");
// single read execute fragment row bytes
DEFINE_mInt32(doris_scanner_row_bytes, "10485760");
// single read execute fragment max run time in milliseconds
DEFINE_mInt32(doris_scanner_max_run_time_ms, "1000");
// (Advanced) Maximum size of per-query receive-side buffer
DEFINE_mInt32(exchg_node_buffer_size_bytes, "20485760");
DEFINE_mInt32(exchg_buffer_queue_capacity_factor, "64");
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ DECLARE_mInt32(doris_scan_range_max_mb);
DECLARE_mInt32(doris_scanner_row_num);
// single read execute fragment row bytes
DECLARE_mInt32(doris_scanner_row_bytes);
// single read execute fragment max run time in milliseconds
DECLARE_mInt32(doris_scanner_max_run_time_ms);
// (Advanced) Maximum size of per-query receive-side buffer
DECLARE_mInt32(exchg_node_buffer_size_bytes);
DECLARE_mInt32(exchg_buffer_queue_capacity_factor);
Expand Down
5 changes: 5 additions & 0 deletions be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
columns_to_filter[i] = i;
}
IColumn::Filter result_filter;
size_t pre_raw_read_rows = 0;
while (!_state->is_cancelled()) {
// read predicate columns
pre_read_rows = 0;
Expand All @@ -466,6 +467,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
DCHECK_EQ(pre_eof, true);
break;
}
pre_raw_read_rows += pre_read_rows;
RETURN_IF_ERROR(_fill_partition_columns(block, pre_read_rows,
_lazy_read_ctx.predicate_partition_columns));
RETURN_IF_ERROR(_fill_missing_columns(block, pre_read_rows,
Expand Down Expand Up @@ -518,6 +520,9 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
Block::erase_useless_column(block, origin_column_num);

if (!pre_eof) {
if (pre_raw_read_rows >= config::doris_scanner_row_num) {
break;
}
// If continuous batches are skipped, we can cache them to skip a whole page
_cached_filtered_rows += pre_read_rows;
} else { // pre_eof
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
Thread::set_thread_nice_value();
}
#endif
MonotonicStopWatch max_run_time_watch;
max_run_time_watch.start();
scanner->update_wait_worker_timer();
scanner->start_scan_cpu_timer();
Status status = Status::OK();
Expand Down Expand Up @@ -270,6 +272,10 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
eos = true;
break;
}
if (max_run_time_watch.elapsed_time() >
config::doris_scanner_max_run_time_ms * 1e6) {
break;
}
BlockUPtr free_block = ctx->get_free_block(first_read);
if (free_block == nullptr) {
break;
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,8 @@ Status VFileScanner::_get_block_wrapped(RuntimeState* state, Block* block, bool*
// or not found in the file column schema.
RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block));
}
break;
}
break;
} while (true);

// Update filtered rows and unselected rows for load, reset counter.
Expand Down

0 comments on commit a1b1e78

Please sign in to comment.