From 0eceaf9351aa67d88214db16b26eca3d0167d96f Mon Sep 17 00:00:00 2001 From: kakachen Date: Mon, 25 Nov 2024 15:24:03 +0800 Subject: [PATCH] Revert modified dictionary filter processing logic to optimize late materialization. --- .../format/parquet/vparquet_group_reader.cpp | 32 +++++++++++++++---- .../format/parquet/vparquet_group_reader.h | 1 + 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index c93b5c6fbad2e18..a9854b53f3beec7 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -109,6 +109,11 @@ Status RowGroupReader::init( _tuple_descriptor = tuple_descriptor; _row_descriptor = row_descriptor; _col_name_to_slot_id = colname_to_slot_id; + if (not_single_slot_filter_conjuncts != nullptr && !not_single_slot_filter_conjuncts->empty()) { + _not_single_slot_filter_conjuncts.insert(_not_single_slot_filter_conjuncts.end(), + not_single_slot_filter_conjuncts->begin(), + not_single_slot_filter_conjuncts->end()); + } _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts; _merge_read_ranges(row_ranges); if (_read_columns.empty()) { @@ -136,11 +141,6 @@ Status RowGroupReader::init( _column_readers[read_col] = std::move(reader); } // Check if single slot can be filtered by dict. - if (not_single_slot_filter_conjuncts != nullptr && !not_single_slot_filter_conjuncts->empty()) { - _filter_conjuncts.insert(_filter_conjuncts.end(), not_single_slot_filter_conjuncts->begin(), - not_single_slot_filter_conjuncts->end()); - return Status::OK(); - } if (!_slot_id_to_filter_conjuncts) { return Status::OK(); } @@ -363,8 +363,17 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_ RETURN_IF_CATCH_EXCEPTION( Block::filter_block_internal(block, columns_to_filter, result_filter)); - Block::erase_useless_column(block, column_to_keep); - _convert_dict_cols_to_string_cols(block); + if (!_not_single_slot_filter_conjuncts.empty()) { + _convert_dict_cols_to_string_cols(block); + SCOPED_RAW_TIMER(&_predicate_filter_time); + RETURN_IF_CATCH_EXCEPTION( + RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( + _not_single_slot_filter_conjuncts, block, columns_to_filter, + column_to_keep))); + } else { + Block::erase_useless_column(block, column_to_keep); + _convert_dict_cols_to_string_cols(block); + } } else { RETURN_IF_CATCH_EXCEPTION( RETURN_IF_ERROR(_filter_block(block, column_to_keep, columns_to_filter))); @@ -595,6 +604,15 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re *batch_eof = pre_eof; RETURN_IF_ERROR(_fill_partition_columns(block, column_size, _lazy_read_ctx.partition_columns)); RETURN_IF_ERROR(_fill_missing_columns(block, column_size, _lazy_read_ctx.missing_columns)); + if (!_not_single_slot_filter_conjuncts.empty()) { + { + SCOPED_RAW_TIMER(&_predicate_filter_time); + RETURN_IF_CATCH_EXCEPTION( + RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( + _not_single_slot_filter_conjuncts, block, columns_to_filter, + origin_column_num))); + } + } return Status::OK(); } diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index 8106241014ba633..f73e9ebe09eee67 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -220,6 +220,7 @@ class RowGroupReader : public ProfileCollector { const TupleDescriptor* _tuple_descriptor = nullptr; const RowDescriptor* _row_descriptor = nullptr; const std::unordered_map* _col_name_to_slot_id = nullptr; + VExprContextSPtrs _not_single_slot_filter_conjuncts; const std::unordered_map* _slot_id_to_filter_conjuncts = nullptr; VExprContextSPtrs _dict_filter_conjuncts; VExprContextSPtrs _filter_conjuncts;