diff --git a/datafusion/core/src/datasource/default_table_source.rs b/datafusion/core/src/datasource/default_table_source.rs
index 36645c0e1be5..4ca552f66288 100644
--- a/datafusion/core/src/datasource/default_table_source.rs
+++ b/datafusion/core/src/datasource/default_table_source.rs
@@ -76,8 +76,8 @@ impl TableSource for DefaultTableSource {
             .supports_aggregate_pushdown(group_expr, aggr_expr)
     }
 
-    fn push_down_projection(&self, proj: &[usize]) -> Option<Vec<usize>> {
-        self.table_provider.push_down_projection(proj)
+    fn push_down_projection(&self, proj: &[usize], is_tag_scan: bool) -> Option<Vec<usize>> {
+        self.table_provider.push_down_projection(proj, is_tag_scan)
     }
 
     fn table_type(&self) -> datafusion_expr::TableType {
diff --git a/datafusion/core/src/datasource/provider.rs b/datafusion/core/src/datasource/provider.rs
index 9efdb1d804a3..5a995935467e 100644
--- a/datafusion/core/src/datasource/provider.rs
+++ b/datafusion/core/src/datasource/provider.rs
@@ -107,7 +107,7 @@ pub trait TableProvider: Sync + Send {
     }
 
     /// The projection is pushed down to the data source
-    fn push_down_projection(&self, _proj: &[usize]) -> Option<Vec<usize>> {
+    fn push_down_projection(&self, _proj: &[usize], _is_tag_scan: bool) -> Option<Vec<usize>> {
         None
     }
 
diff --git a/datafusion/expr/src/table_source.rs b/datafusion/expr/src/table_source.rs
index 528c4e2d675f..ca621549b3bf 100644
--- a/datafusion/expr/src/table_source.rs
+++ b/datafusion/expr/src/table_source.rs
@@ -120,7 +120,7 @@ pub trait TableSource: Sync + Send {
     }
 
     /// The projection is pushed down to the data source
-    fn push_down_projection(&self, _proj: &[usize]) -> Option<Vec<usize>> {
+    fn push_down_projection(&self, _proj: &[usize], _is_tag_scan: bool) -> Option<Vec<usize>> {
         None
     }
 
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 2306593d424b..ce14ff641401 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -250,7 +250,7 @@ impl Optimizer {
             Arc::new(SimplifyExpressions::new()),
             Arc::new(UnwrapCastInComparison::new()),
             Arc::new(CommonSubexprEliminate::new()),
-            Arc::new(PushDownProjection::new()),
+            Arc::new(PushDownProjection::new(false)),
             Arc::new(EliminateProjection::new()),
             // PushDownProjection can pushdown Projections through Limits, do PushDownLimit again.
             Arc::new(PushDownLimit::new()),
diff --git a/datafusion/optimizer/src/push_down_projection.rs b/datafusion/optimizer/src/push_down_projection.rs
index 62c5f67122eb..4dc888545e22 100644
--- a/datafusion/optimizer/src/push_down_projection.rs
+++ b/datafusion/optimizer/src/push_down_projection.rs
@@ -56,7 +56,9 @@ macro_rules! generate_plan {
 /// Optimizer that removes unused projections and aggregations from plans
 /// This reduces both scans and
 #[derive(Default)]
-pub struct PushDownProjection {}
+pub struct PushDownProjection {
+    is_tag_scan: bool,
+}
 
 impl OptimizerRule for PushDownProjection {
     fn try_optimize(
@@ -84,7 +86,7 @@
             LogicalPlan::TableScan(scan)
                 if (scan.projection.is_none() && scan.agg_with_grouping.is_none()) =>
             {
-                return Ok(Some(push_down_scan(&HashSet::new(), scan, false)?));
+                return Ok(Some(push_down_scan(&HashSet::new(), scan, false, self.is_tag_scan)?));
             }
             _ => return Ok(None),
         };
@@ -157,12 +159,12 @@
                 if projection_is_empty {
                     used_columns
                         .insert(scan.projected_schema.fields()[0].qualified_column());
-                    push_down_scan(&used_columns, scan, true)?
+                    push_down_scan(&used_columns, scan, true, self.is_tag_scan)?
                 } else {
                     for expr in projection.expr.iter() {
                         expr_to_columns(expr, &mut used_columns)?;
                     }
-                    let new_scan = push_down_scan(&used_columns, scan, true)?;
+                    let new_scan = push_down_scan(&used_columns, scan, true, self.is_tag_scan)?;
                     plan.with_new_inputs(&[new_scan])?
                 }
@@ -389,8 +391,8 @@ impl OptimizerRule for PushDownProjection {
 
 impl PushDownProjection {
     #[allow(missing_docs)]
-    pub fn new() -> Self {
-        Self {}
+    pub fn new(is_tag_scan: bool) -> Self {
+        Self { is_tag_scan }
     }
 }
 
@@ -483,6 +485,7 @@ fn push_down_scan(
     used_columns: &HashSet<Column>,
     scan: &TableScan,
     has_projection: bool,
+    is_tag_scan: bool,
 ) -> Result<LogicalPlan> {
     // once we reach the table scan, we can use the accumulated set of column
     // names to construct the set of column indexes in the scan
@@ -544,7 +547,7 @@
 
     let projection = scan
         .source
-        .push_down_projection(&projection)
+        .push_down_projection(&projection, is_tag_scan)
         .unwrap_or(projection);
 
     // create the projected schema
@@ -1133,7 +1136,7 @@ mod tests {
 
     fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
         let optimizer = Optimizer::with_rules(vec![
-            Arc::new(PushDownProjection::new()),
+            Arc::new(PushDownProjection::new(false)),
             Arc::new(EliminateProjection::new()),
         ]);
         let mut optimized_plan = optimizer
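
For downstream implementors, the practical effect of this change is that the projection-pushdown hook now also receives whether the scan was planned as a tag-only scan. Below is a minimal standalone sketch (not part of the patch) of how an implementation might use the flag: `ProjectionPushDown`, `MockTagProvider`, and `tag_column_indexes` are hypothetical stand-ins that only mirror the two-argument signature introduced in this diff, not real DataFusion items. Since `push_down_scan` applies the result with `.unwrap_or(projection)`, returning `None` keeps the projection the optimizer computed.

// Sketch only: a fake provider that narrows the projection to tag columns
// when the optimizer marks the scan as a tag scan.
trait ProjectionPushDown {
    // Mirrors the changed trait method: `Some(proj)` overrides the projection
    // computed by the optimizer, `None` leaves it unchanged.
    fn push_down_projection(&self, proj: &[usize], is_tag_scan: bool) -> Option<Vec<usize>>;
}

struct MockTagProvider {
    // Column indexes that hold tag data (as opposed to fields/time).
    tag_column_indexes: Vec<usize>,
}

impl ProjectionPushDown for MockTagProvider {
    fn push_down_projection(&self, proj: &[usize], is_tag_scan: bool) -> Option<Vec<usize>> {
        if is_tag_scan {
            // Tag-only scan: keep just the tag columns from the requested projection.
            Some(
                proj.iter()
                    .copied()
                    .filter(|idx| self.tag_column_indexes.contains(idx))
                    .collect(),
            )
        } else {
            // Regular scan: no override, the optimizer's projection is used as-is.
            None
        }
    }
}

fn main() {
    let provider = MockTagProvider { tag_column_indexes: vec![0, 1] };
    // Columns 0..=3 requested; only 0 and 1 are tags.
    assert_eq!(provider.push_down_projection(&[0, 1, 2, 3], true), Some(vec![0, 1]));
    assert_eq!(provider.push_down_projection(&[0, 1, 2, 3], false), None);
}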