Skip to content

Commit

Permalink
Adding Constant Check for FilterExec (apache#9649)
Browse files Browse the repository at this point in the history
* fix bugs in adding extra SortExec

* adding tests

* optimize code

* Update datafusion/physical-plan/src/filter.rs

Co-authored-by: Mustafa Akur <[email protected]>

* optimize code

* optimize code

* optimize code

* optimize code

* fix clippy

---------

Co-authored-by: Mustafa Akur <[email protected]>
  • Loading branch information
Lordworms and mustafasrepo authored Mar 18, 2024
1 parent 269563a commit eb13f59
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 1 deletion.
27 changes: 26 additions & 1 deletion datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,27 @@ impl FilterExec {
})
}

fn extend_constants(
input: &Arc<dyn ExecutionPlan>,
predicate: &Arc<dyn PhysicalExpr>,
) -> Vec<Arc<dyn PhysicalExpr>> {
let mut res_constants = Vec::new();
let input_eqs = input.equivalence_properties();

let conjunctions = split_conjunction(predicate);
for conjunction in conjunctions {
if let Some(binary) = conjunction.as_any().downcast_ref::<BinaryExpr>() {
if binary.op() == &Operator::Eq {
if input_eqs.is_expr_constant(binary.left()) {
res_constants.push(binary.right().clone())
} else if input_eqs.is_expr_constant(binary.right()) {
res_constants.push(binary.left().clone())
}
}
}
}
res_constants
}
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(
input: &Arc<dyn ExecutionPlan>,
Expand All @@ -181,8 +202,12 @@ impl FilterExec {
.into_iter()
.filter(|column| stats.column_statistics[column.index()].is_singleton())
.map(|column| Arc::new(column) as _);
// this is for statistics
eq_properties = eq_properties.add_constants(constants);

// this is for logical constant (for example: a = '1', then a could be marked as a constant)
// to do: how to deal with multiple situation to represent = (for example c1 between 0 and 0)
eq_properties =
eq_properties.add_constants(Self::extend_constants(input, predicate));
Ok(PlanProperties::new(
eq_properties,
input.output_partitioning().clone(), // Output Partitioning
Expand Down
61 changes: 61 additions & 0 deletions datafusion/sqllogictest/test_files/filter_without_sort_exec.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# prepare table
statement ok
CREATE UNBOUNDED EXTERNAL TABLE data (
"date" VARCHAR,
"ticker" VARCHAR,
"time" VARCHAR,
) STORED AS CSV
WITH ORDER ("date", "ticker", "time")
LOCATION './a.parquet';


# query
query TT
explain SELECT * FROM data
WHERE ticker = 'A'
ORDER BY "date", "time";
----
logical_plan
Sort: data.date ASC NULLS LAST, data.time ASC NULLS LAST
--Filter: data.ticker = Utf8("A")
----TableScan: data projection=[date, ticker, time]
physical_plan
SortPreservingMergeExec: [date@0 ASC NULLS LAST,time@2 ASC NULLS LAST]
--CoalesceBatchesExec: target_batch_size=8192
----FilterExec: ticker@1 = A
------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------StreamingTableExec: partition_sizes=1, projection=[date, ticker, time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1 ASC NULLS LAST, time@2 ASC NULLS LAST]

# query
query TT
explain SELECT * FROM data
WHERE date = 'A'
ORDER BY "ticker", "time";
----
logical_plan
Sort: data.ticker ASC NULLS LAST, data.time ASC NULLS LAST
--Filter: data.date = Utf8("A")
----TableScan: data projection=[date, ticker, time]
physical_plan
SortPreservingMergeExec: [ticker@1 ASC NULLS LAST,time@2 ASC NULLS LAST]
--CoalesceBatchesExec: target_batch_size=8192
----FilterExec: date@0 = A
------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------StreamingTableExec: partition_sizes=1, projection=[date, ticker, time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1 ASC NULLS LAST, time@2 ASC NULLS LAST]

0 comments on commit eb13f59

Please sign in to comment.