Skip to content

Commit

Permalink
fix: fix issue 10326, do ambiguous_distinct_check in select
Browse files Browse the repository at this point in the history
  • Loading branch information
lichuang committed Jan 23, 2025
1 parent 3a8c04c commit 11acffe
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 17 deletions.
40 changes: 26 additions & 14 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,21 +577,33 @@ impl LogicalPlanBuilder {
) -> Result<LogicalPlan> {
let inputs = curr_plan.inputs();
let mut exprs = curr_plan.expressions();
let is_distinct = is_distinct || matches!(curr_plan, LogicalPlan::Distinct(_));
for input in inputs {
if missing_cols.iter().all(|c| input.schema().has_column(c)) {
let mut missing_exprs = missing_cols
.iter()
.map(|c| normalize_col(Expr::Column(c.clone()), input))
.collect::<Result<Vec<_>>>()?;

// Do not let duplicate columns to be added, some of the
// missing_cols may be already present but without the new
// projected alias.
missing_exprs.retain(|e| !exprs.contains(e));
if is_distinct {
Self::ambiguous_distinct_check(&missing_exprs, missing_cols, &exprs)?;
if let LogicalPlan::Projection(Projection {
input,
expr: _,
schema: _,
}) = input
{
if missing_cols.iter().all(|c| input.schema().has_column(c)) {
let mut missing_exprs = missing_cols
.iter()
.map(|c| normalize_col(Expr::Column(c.clone()), input))
.collect::<Result<Vec<_>>>()?;

// Do not let duplicate columns to be added, some of the
// missing_cols may be already present but without the new
// projected alias.
missing_exprs.retain(|e| !exprs.contains(e));
if is_distinct {
Self::ambiguous_distinct_check(
&missing_exprs,
missing_cols,
&exprs,
)?;
}
exprs.extend(missing_exprs);
}
exprs.extend(missing_exprs);
}
}
let inputs = curr_plan
Expand All @@ -602,7 +614,7 @@ impl LogicalPlanBuilder {
curr_plan.with_new_exprs(exprs, inputs)
}

fn ambiguous_distinct_check(
pub fn ambiguous_distinct_check(
missing_exprs: &[Expr],
missing_cols: &IndexSet<Column>,
projection_exprs: &[Expr],
Expand Down
94 changes: 91 additions & 3 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::utils::{
};

use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{not_impl_err, plan_err, Result};
use datafusion_common::{not_impl_err, plan_err, Column, Result};
use datafusion_common::{RecursionUnnestOption, UnnestOptions};
use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions};
use datafusion_expr::expr_rewriter::{
Expand All @@ -36,10 +36,10 @@ use datafusion_expr::utils::{
};
use datafusion_expr::{
qualified_wildcard_with_options, wildcard_with_options, Aggregate, Expr, Filter,
GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning,
GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning, Projection,
};

use indexmap::IndexMap;
use indexmap::{IndexMap, IndexSet};
use sqlparser::ast::{
Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, OrderByExpr,
WildcardAdditionalOptions, WindowType,
Expand Down Expand Up @@ -264,6 +264,10 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}
}?;

if let LogicalPlan::Distinct(_) = &plan {
Self::ambiguous_distinct_project_check(&plan, &order_by_rex)?;
}

// DISTRIBUTE BY
let plan = if !select.distribute_by.is_empty() {
let x = select
Expand All @@ -287,6 +291,90 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
self.order_by(plan, order_by_rex)
}

/// Checks that the `ORDER BY` expressions applied on top of a `DISTINCT`
/// plan do not depend on columns that `plan` does not output.
///
/// Sorting by such a column would require projecting it underneath the
/// `Distinct` node, and that changes which rows count as duplicates.
/// For example, given the input
///
/// ```text
/// a | b | c
/// --+---+---
/// 1 | 2 | 3
/// 1 | 2 | 4
/// ```
///
/// `Distinct(a, b)` produces `(1, 2)`, but `Distinct(a, b, c)` produces
/// `(1, 2, 3), (1, 2, 4)`, which shows up as `(1, 2), (1, 2)` once only
/// `a` and `b` are projected.
///
/// Returns `Ok(())` immediately when every column referenced by
/// `order_by` is present in the schema of `plan`; otherwise the set of
/// missing columns is handed to `do_ambiguous_distinct_project_check`
/// and its result is returned.
///
/// See <https://github.com/apache/datafusion/issues/5065> for more details
fn ambiguous_distinct_project_check(
    plan: &LogicalPlan,
    order_by: &[datafusion_expr::expr::Sort],
) -> Result<()> {
    let schema = plan.schema();

    // Columns referenced by the sort expressions that the plan's own
    // output schema cannot resolve. An `IndexSet` de-duplicates columns
    // that are referenced by more than one sort expression.
    let missing_cols: IndexSet<Column> = order_by
        .iter()
        .flat_map(|sort| sort.expr.column_refs())
        .filter(|col| !schema.has_column(col))
        .cloned()
        .collect();

    if missing_cols.is_empty() {
        return Ok(());
    }
    Self::do_ambiguous_distinct_project_check(plan, &missing_cols)
}

/// Walks the inputs of `plan` looking for `Projection` nodes and runs
/// `LogicalPlanBuilder::ambiguous_distinct_check` against each one whose
/// own input can resolve every column in `missing_cols`.
///
/// Inputs that are not a `Projection` are searched recursively. A
/// `Projection` whose input cannot resolve all of `missing_cols` is
/// left alone and is not descended into.
///
/// Returns the first error produced by column normalization, by the
/// ambiguity check, or by a recursive call; `Ok(())` otherwise.
fn do_ambiguous_distinct_project_check(
    plan: &LogicalPlan,
    missing_cols: &IndexSet<Column>,
) -> Result<()> {
    for child in plan.inputs() {
        match child {
            LogicalPlan::Projection(Projection {
                input: proj_input,
                expr: proj_exprs,
                ..
            }) => {
                let resolvable = missing_cols
                    .iter()
                    .all(|col| proj_input.schema().has_column(col));
                if resolvable {
                    let mut candidates = Vec::with_capacity(missing_cols.len());
                    for col in missing_cols {
                        let normalized =
                            normalize_col(Expr::Column(col.clone()), proj_input)?;
                        candidates.push(normalized);
                    }

                    // Some of the missing columns may already be part of
                    // the projection, just without the new projected
                    // alias; those must not be counted a second time.
                    candidates.retain(|candidate| !proj_exprs.contains(candidate));

                    LogicalPlanBuilder::ambiguous_distinct_check(
                        &candidates,
                        missing_cols,
                        proj_exprs,
                    )?;
                }
            }
            other => Self::do_ambiguous_distinct_project_check(other, missing_cols)?,
        }
    }

    Ok(())
}

/// Try converting Expr(Unnest(Expr)) to Projection/Unnest/Projection
pub(super) fn try_process_unnest(
&self,
Expand Down

0 comments on commit 11acffe

Please sign in to comment.