Skip to content

Commit

Permalink
Merge pull request #14 from lutengda/cnosdb_datafusion_27_1
Browse files Browse the repository at this point in the history
add is_tag_scan for RewriteTagScan
  • Loading branch information
roseboy-liu authored Oct 10, 2024
2 parents 6434749 + 41dcf0a commit f6c50a7
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 13 deletions.
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/default_table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ impl TableSource for DefaultTableSource {
.supports_aggregate_pushdown(group_expr, aggr_expr)
}

fn push_down_projection(&self, proj: &[usize]) -> Option<Vec<usize>> {
self.table_provider.push_down_projection(proj)
fn push_down_projection(&self, proj: &[usize], is_tag_scan: bool) -> Option<Vec<usize>> {
self.table_provider.push_down_projection(proj, is_tag_scan)
}

fn table_type(&self) -> datafusion_expr::TableType {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pub trait TableProvider: Sync + Send {
}

/// The projection is pushed down to the data source
fn push_down_projection(&self, _proj: &[usize]) -> Option<Vec<usize>> {
fn push_down_projection(&self, _proj: &[usize], _is_tag_scan: bool) -> Option<Vec<usize>> {
None
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ pub trait TableSource: Sync + Send {
}

/// The projection is pushed down to the data source
fn push_down_projection(&self, _proj: &[usize]) -> Option<Vec<usize>> {
fn push_down_projection(&self, _proj: &[usize], _is_tag_scan: bool) -> Option<Vec<usize>> {
None
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ impl Optimizer {
Arc::new(SimplifyExpressions::new()),
Arc::new(UnwrapCastInComparison::new()),
Arc::new(CommonSubexprEliminate::new()),
Arc::new(PushDownProjection::new()),
Arc::new(PushDownProjection::new(false)),
Arc::new(EliminateProjection::new()),
// PushDownProjection can pushdown Projections through Limits, do PushDownLimit again.
Arc::new(PushDownLimit::new()),
Expand Down
19 changes: 11 additions & 8 deletions datafusion/optimizer/src/push_down_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ macro_rules! generate_plan {
/// Optimizer that removes unused projections and aggregations from plans
/// This reduces both scans and
#[derive(Default)]
pub struct PushDownProjection {}
pub struct PushDownProjection {
is_tag_scan: bool,
}

impl OptimizerRule for PushDownProjection {
fn try_optimize(
Expand Down Expand Up @@ -84,7 +86,7 @@ impl OptimizerRule for PushDownProjection {
LogicalPlan::TableScan(scan)
if (scan.projection.is_none() && scan.agg_with_grouping.is_none()) =>
{
return Ok(Some(push_down_scan(&HashSet::new(), scan, false)?));
return Ok(Some(push_down_scan(&HashSet::new(), scan, false, self.is_tag_scan)?));
}
_ => return Ok(None),
};
Expand Down Expand Up @@ -157,12 +159,12 @@ impl OptimizerRule for PushDownProjection {
if projection_is_empty {
used_columns
.insert(scan.projected_schema.fields()[0].qualified_column());
push_down_scan(&used_columns, scan, true)?
push_down_scan(&used_columns, scan, true, self.is_tag_scan)?
} else {
for expr in projection.expr.iter() {
expr_to_columns(expr, &mut used_columns)?;
}
let new_scan = push_down_scan(&used_columns, scan, true)?;
let new_scan = push_down_scan(&used_columns, scan, true, self.is_tag_scan)?;

plan.with_new_inputs(&[new_scan])?
}
Expand Down Expand Up @@ -389,8 +391,8 @@ impl OptimizerRule for PushDownProjection {

impl PushDownProjection {
#[allow(missing_docs)]
pub fn new() -> Self {
Self {}
pub fn new(is_tag_scan: bool) -> Self {
Self { is_tag_scan }
}
}

Expand Down Expand Up @@ -483,6 +485,7 @@ fn push_down_scan(
used_columns: &HashSet<Column>,
scan: &TableScan,
has_projection: bool,
is_tag_scan: bool,
) -> Result<LogicalPlan> {
// once we reach the table scan, we can use the accumulated set of column
// names to construct the set of column indexes in the scan
Expand Down Expand Up @@ -544,7 +547,7 @@ fn push_down_scan(

let projection = scan
.source
.push_down_projection(&projection)
.push_down_projection(&projection, is_tag_scan)
.unwrap_or(projection);

// create the projected schema
Expand Down Expand Up @@ -1133,7 +1136,7 @@ mod tests {

fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
let optimizer = Optimizer::with_rules(vec![
Arc::new(PushDownProjection::new()),
Arc::new(PushDownProjection::new(false)),
Arc::new(EliminateProjection::new()),
]);
let mut optimized_plan = optimizer
Expand Down

0 comments on commit f6c50a7

Please sign in to comment.