Propagate table constraints through physical plans to optimize sort operations #14111

Merged

Changes from all commits (43 commits):
d8333be  Add projection to `Constraints` (gokselk, Jan 3, 2025)
30e5177  Add constraints support to `EquivalenceProperties` (gokselk, Jan 3, 2025)
8f67f8f  Pass constraints to physical plan (gokselk, Jan 3, 2025)
2fbe8c5  Add slt test for primary key sort optimization (gokselk, Jan 3, 2025)
82361a7  Pass constraints to MemoryExec (gokselk, Jan 3, 2025)
b9717c7  Update properties.rs (berkaysynnada, Jan 6, 2025)
3fc03e7  Simplify MemoryExec instantiation (gokselk, Jan 7, 2025)
94da4a5  Rename EquivalenceProperties method name for clarity (gokselk, Jan 7, 2025)
46b860f  Refactor projection handling in FileScanConfig (gokselk, Jan 7, 2025)
05384e1  Bug fix (gokselk, Jan 7, 2025)
7ecbfac  Display constraints on data sources (gokselk, Jan 7, 2025)
0258a99  Bug fix and test improvements (gokselk, Jan 9, 2025)
acfb572  Use different schemas for tests (gokselk, Jan 9, 2025)
4fbaaa3  Lint and visibility fix (gokselk, Jan 9, 2025)
b18b013  Merge branch 'apache_main' into feature/physical-planner-functional-d… (gokselk, Jan 10, 2025)
966fd76  Fixes after merge (gokselk, Jan 10, 2025)
97e4796  Review part 1 (berkaysynnada, Jan 10, 2025)
8e4d029  Merge branch 'apache_main' into feature/physical-planner-functional-d… (berkaysynnada, Jan 10, 2025)
ddd4d14  Update memory.rs (berkaysynnada, Jan 10, 2025)
6297ec1  Merge branch 'apache_main' into feature/physical-planner-functional-d… (berkaysynnada, Jan 10, 2025)
bc72553  update dep (berkaysynnada, Jan 10, 2025)
b5d47c8  update proto (berkaysynnada, Jan 10, 2025)
397297c  add aggregate distinct (berkaysynnada, Jan 13, 2025)
56c053a  minor (berkaysynnada, Jan 13, 2025)
95ba34f  Update order.slt (berkaysynnada, Jan 13, 2025)
f5c15ae  undo proto (berkaysynnada, Jan 13, 2025)
b2fdcc7  Update properties.rs (berkaysynnada, Jan 13, 2025)
417370e  Merge remote-tracking branch 'upstream/main' into feature/physical-pl… (gokselk, Jan 13, 2025)
b0877a2  Move reserved entry (gokselk, Jan 13, 2025)
bbe35d4  Update `FileScanConfig` to return a single projected configuration ob… (gokselk, Jan 14, 2025)
cc3ca91  Improve constraint based ordering satisfaction logic (gokselk, Jan 15, 2025)
8b0f340  Update datafusion/physical-plan/src/aggregates/mod.rs (gokselk, Jan 15, 2025)
a6c13de  Revert "Update `FileScanConfig` to return a single projected configur… (gokselk, Jan 15, 2025)
cda812f  Refactor MemoryExec constraints display (gokselk, Jan 15, 2025)
ab93279  Avoid unnecessary clone (gokselk, Jan 15, 2025)
726737b  Refactor constraint based ordering satisfaction logic (gokselk, Jan 15, 2025)
56e8054  Cargo fmt (gokselk, Jan 15, 2025)
e19c6a0  Revert "Avoid unnecessary clone" (gokselk, Jan 15, 2025)
0a7a974  Avoid unnecessary clone (gokselk, Jan 15, 2025)
8a2f372  Update properties.rs (berkaysynnada, Jan 16, 2025)
5f9980a  Bug fix (gokselk, Jan 16, 2025)
d136860  Make `update_elements_with_matching_indices` take iterators for proj_… (gokselk, Jan 16, 2025)
682e8a0  Revert "Make `update_elements_with_matching_indices` take iterators f… (gokselk, Jan 16, 2025)
65 changes: 59 additions & 6 deletions datafusion/common/src/functional_dependencies.rs
@@ -60,6 +60,41 @@ impl Constraints {
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}

/// Projects constraints using the given projection indices.
/// Individual constraints are dropped when any of their columns is absent from
/// the projection; returns `None` when no constraint survives.
pub fn project(&self, proj_indices: &[usize]) -> Option<Self> {
let projected = self
.inner
.iter()
.filter_map(|constraint| {
match constraint {
Constraint::PrimaryKey(indices) => {
let new_indices =
update_elements_with_matching_indices(indices, proj_indices);
[Review thread]

Contributor: If you refactor the `update_elements_with_matching_indices` function to take two `impl Iterator`s (you probably need to swap the looping order to do that), this function can also accept `proj_indices` as an `impl Iterator`.

Contributor (author): The `update_elements_with_matching_indices` function uses `.position()` on `proj_indices`, which makes it necessary to clone it if we take it as an `impl Iterator`. I think this defeats the whole purpose of this refactoring.

Contributor: What I had in mind was to swap the loop order (iterate on `proj_indices` in the outer loop). That may enable us to use an `impl Iterator` for `proj_indices`. We will probably need to keep the type of `entries` as a slice because it does not have an ordering (though we can enforce that in a future PR). Had `entries` been ordered, I think we could have also taken it in as an `impl Iterator`, but let's leave the latter for a future PR.

Contributor: We looked into this with @berkaysynnada and it seems to have some intricacies. Let's leave this to another PR. (A hedged sketch of this helper and the proposed refactor appears after this function.)

// Only keep constraint if all columns are preserved
(new_indices.len() == indices.len())
.then_some(Constraint::PrimaryKey(new_indices))
}
Constraint::Unique(indices) => {
let new_indices =
update_elements_with_matching_indices(indices, proj_indices);
// Only keep constraint if all columns are preserved
(new_indices.len() == indices.len())
.then_some(Constraint::Unique(new_indices))
}
}
})
.collect::<Vec<_>>();

(!projected.is_empty()).then_some(Constraints::new_unverified(projected))
}
}
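
For context on the review thread above, `update_elements_with_matching_indices` itself is not shown in this diff. Below is a minimal sketch of the position-based remapping it performs, plus the reviewers' proposed loop-order swap; both signatures and bodies are inferred from the discussion, not copied from the DataFusion source.

```rust
/// Sketch of the helper as described in the thread: for each entry, find its
/// position in `proj_indices`. Entries absent from the projection are silently
/// dropped, which is why `project` compares lengths to detect lost columns.
fn update_elements_with_matching_indices(
    entries: &[usize],
    proj_indices: &[usize],
) -> Vec<usize> {
    entries
        .iter()
        .filter_map(|e| proj_indices.iter().position(|p| p == e))
        .collect()
}

/// The proposed refactor: iterate `proj_indices` in the outer loop so it can
/// be consumed once as an `impl Iterator`. Note the output now follows
/// projection order rather than `entries` order, one of the "intricacies"
/// that deferred this change to a follow-up PR.
fn update_elements_outer_loop(
    entries: &[usize],
    proj_indices: impl Iterator<Item = usize>,
) -> Vec<usize> {
    proj_indices
        .enumerate()
        .filter(|(_, p)| entries.contains(p))
        .map(|(new_idx, _)| new_idx)
        .collect()
}
```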

impl Default for Constraints {
fn default() -> Self {
Constraints::empty()
}
}

impl IntoIterator for Constraints {
@@ -73,13 +108,13 @@ impl IntoIterator for Constraints {

impl Display for Constraints {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- let pk: Vec<String> = self.inner.iter().map(|c| format!("{:?}", c)).collect();
+ let pk = self
+ .inner
+ .iter()
+ .map(|c| format!("{:?}", c))
+ .collect::<Vec<_>>();
let pk = pk.join(", ");
- if !pk.is_empty() {
- write!(f, " constraints=[{pk}]")
- } else {
- write!(f, "")
- }
+ write!(f, "constraints=[{pk}]")
}
}
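
One behavioral note on this change: the old impl printed nothing for an empty constraint set and prefixed a leading space otherwise; the new impl always prints `constraints=[...]`, so call sites now own the surrounding spacing. A quick illustration, assuming `Constraint` uses the ordinary derived `Debug` formatting:

```rust
#[test]
fn display_smoke() {
    let c = Constraints::new_unverified(vec![
        Constraint::PrimaryKey(vec![0]),
        Constraint::Unique(vec![1, 2]),
    ]);
    // Assumed derived-Debug output for the two variants.
    assert_eq!(c.to_string(), "constraints=[PrimaryKey([0]), Unique([1, 2])]");
}
```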

@@ -599,6 +634,24 @@ mod tests {
assert_eq!(iter.next(), None);
}

#[test]
fn test_project_constraints() {
let constraints = Constraints::new_unverified(vec![
Constraint::PrimaryKey(vec![1, 2]),
Constraint::Unique(vec![0, 3]),
]);

// Project keeping columns 1,2,3
let projected = constraints.project(&[1, 2, 3]).unwrap();
assert_eq!(
projected,
Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0, 1])])
);

// Project keeping only column 0 - should return None as no constraints are preserved
assert!(constraints.project(&[0]).is_none());
}

#[test]
fn test_get_updated_id_keys() {
let fund_dependencies =
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/table.rs
@@ -935,6 +935,7 @@ impl TableProvider for ListingTable {
session_state,
FileScanConfig::new(object_store_url, Arc::clone(&self.file_schema))
.with_file_groups(partitioned_file_lists)
.with_constraints(self.constraints.clone())
.with_statistics(statistics)
.with_projection(projection.cloned())
.with_limit(limit)
6 changes: 5 additions & 1 deletion datafusion/core/src/datasource/memory.rs
@@ -132,6 +132,7 @@ impl MemTable {
state: &SessionState,
) -> Result<Self> {
let schema = t.schema();
let constraints = t.constraints();
let exec = t.scan(state, None, &[], None).await?;
let partition_count = exec.output_partitioning().partition_count();

@@ -162,7 +163,10 @@ impl MemTable {
}
}

- let exec = MemoryExec::try_new(&data, Arc::clone(&schema), None)?;
+ let mut exec = MemoryExec::try_new(&data, Arc::clone(&schema), None)?;
if let Some(cons) = constraints {
exec = exec.with_constraints(cons.clone());
}

if let Some(num_partitions) = output_partitions {
let exec = RepartitionExec::try_new(
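
As a usage sketch of the new `MemoryExec::with_constraints` builder seen above (import paths and the schema/batch setup here are assumptions for illustration):

```rust
use std::sync::Arc;

use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use datafusion_common::{Constraint, Constraints, Result};
use datafusion_physical_plan::memory::MemoryExec;

/// Hypothetical helper: a single-partition MemoryExec whose primary key
/// (column 0) is attached via the new builder method.
fn pk_memory_exec() -> Result<MemoryExec> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;
    Ok(MemoryExec::try_new(&[vec![batch]], schema, None)?
        .with_constraints(Constraints::new_unverified(vec![Constraint::PrimaryKey(
            vec![0],
        )])))
}
```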
17 changes: 12 additions & 5 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -35,7 +35,7 @@ use arrow::buffer::Buffer;
use arrow_ipc::reader::FileDecoder;
use arrow_schema::SchemaRef;
use datafusion_common::config::ConfigOptions;
- use datafusion_common::Statistics;
+ use datafusion_common::{Constraints, Statistics};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
@@ -60,11 +60,16 @@ pub struct ArrowExec {
impl ArrowExec {
/// Create a new Arrow reader execution plan provided base configurations
pub fn new(base_config: FileScanConfig) -> Self {
- let (projected_schema, projected_statistics, projected_output_ordering) =
- base_config.project();
+ let (
+ projected_schema,
+ projected_constraints,
+ projected_statistics,
+ projected_output_ordering,
+ ) = base_config.project();
let cache = Self::compute_properties(
Arc::clone(&projected_schema),
&projected_output_ordering,
projected_constraints,
&base_config,
);
Self {
@@ -88,12 +93,14 @@ impl ArrowExec {
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(
schema: SchemaRef,
- projected_output_ordering: &[LexOrdering],
+ output_ordering: &[LexOrdering],
+ constraints: Constraints,
file_scan_config: &FileScanConfig,
) -> PlanProperties {
// Equivalence Properties
let eq_properties =
- EquivalenceProperties::new_with_orderings(schema, projected_output_ordering);
+ EquivalenceProperties::new_with_orderings(schema, output_ordering)
+ .with_constraints(constraints);

PlanProperties::new(
eq_properties,
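
The pattern above, `new_with_orderings(..).with_constraints(..)`, is the core API this PR threads through every scan operator. Isolated as a sketch (the schema and the empty orderings slice are placeholder assumptions):

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion_common::{Constraint, Constraints};
use datafusion_physical_expr::EquivalenceProperties;

fn properties_with_pk() -> EquivalenceProperties {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("v", DataType::Int32, true),
    ]));
    // No orderings in this toy example; the real call sites pass the
    // projected output orderings computed from the file scan config.
    EquivalenceProperties::new_with_orderings(schema, &[])
        .with_constraints(Constraints::new_unverified(vec![Constraint::PrimaryKey(
            vec![0],
        )]))
}
```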
14 changes: 11 additions & 3 deletions datafusion/core/src/datasource/physical_plan/avro.rs
@@ -29,6 +29,7 @@ use crate::physical_plan::{
};

use arrow::datatypes::SchemaRef;
use datafusion_common::Constraints;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
@@ -48,11 +49,16 @@ pub struct AvroExec {
impl AvroExec {
/// Create a new Avro reader execution plan provided base configurations
pub fn new(base_config: FileScanConfig) -> Self {
- let (projected_schema, projected_statistics, projected_output_ordering) =
- base_config.project();
+ let (
+ projected_schema,
+ projected_constraints,
+ projected_statistics,
+ projected_output_ordering,
+ ) = base_config.project();
let cache = Self::compute_properties(
Arc::clone(&projected_schema),
&projected_output_ordering,
projected_constraints,
&base_config,
);
Self {
@@ -73,10 +79,12 @@
fn compute_properties(
schema: SchemaRef,
orderings: &[LexOrdering],
constraints: Constraints,
file_scan_config: &FileScanConfig,
) -> PlanProperties {
// Equivalence Properties
- let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);
+ let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings)
+ .with_constraints(constraints);
let n_partitions = file_scan_config.file_groups.len();

PlanProperties::new(
14 changes: 11 additions & 3 deletions datafusion/core/src/datasource/physical_plan/csv.rs
@@ -40,6 +40,7 @@ use crate::physical_plan::{
use arrow::csv;
use arrow::datatypes::SchemaRef;
use datafusion_common::config::ConfigOptions;
use datafusion_common::Constraints;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};

@@ -209,11 +210,16 @@ impl CsvExecBuilder {
newlines_in_values,
} = self;

- let (projected_schema, projected_statistics, projected_output_ordering) =
- base_config.project();
+ let (
+ projected_schema,
+ projected_constraints,
+ projected_statistics,
+ projected_output_ordering,
+ ) = base_config.project();
let cache = CsvExec::compute_properties(
projected_schema,
&projected_output_ordering,
projected_constraints,
&base_config,
);

@@ -320,10 +326,12 @@ impl CsvExec {
fn compute_properties(
schema: SchemaRef,
orderings: &[LexOrdering],
constraints: Constraints,
file_scan_config: &FileScanConfig,
) -> PlanProperties {
// Equivalence Properties
- let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);
+ let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings)
+ .with_constraints(constraints);

PlanProperties::new(
eq_properties,
54 changes: 38 additions & 16 deletions datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -33,7 +33,9 @@ use arrow::datatypes::{ArrowNativeType, UInt16Type};
use arrow_array::{ArrayRef, DictionaryArray, RecordBatch, RecordBatchOptions};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion_common::stats::Precision;
- use datafusion_common::{exec_err, ColumnStatistics, DataFusionError, Statistics};
+ use datafusion_common::{
+ exec_err, ColumnStatistics, Constraints, DataFusionError, Statistics,
+ };
use datafusion_physical_expr::LexOrdering;

use log::warn;
@@ -114,6 +116,8 @@ pub struct FileScanConfig {
/// concurrently, however files *within* a partition will be read
/// sequentially, one after the next.
pub file_groups: Vec<Vec<PartitionedFile>>,
/// Table constraints
pub constraints: Constraints,
/// Estimated overall statistics of the files, taking `filters` into account.
/// Defaults to [`Statistics::new_unknown`].
pub statistics: Statistics,
@@ -146,6 +150,7 @@ impl FileScanConfig {
object_store_url,
file_schema,
file_groups: vec![],
constraints: Constraints::empty(),
statistics,
projection: None,
limit: None,
Expand All @@ -154,6 +159,12 @@ impl FileScanConfig {
}
}

/// Set the table constraints of the files
pub fn with_constraints(mut self, constraints: Constraints) -> Self {
self.constraints = constraints;
self
}

/// Set the statistics of the files
pub fn with_statistics(mut self, statistics: Statistics) -> Self {
self.statistics = statistics;
@@ -210,30 +221,31 @@ impl FileScanConfig {
self
}

- /// Project the schema and the statistics on the given column indices
- pub fn project(&self) -> (SchemaRef, Statistics, Vec<LexOrdering>) {
+ /// Project the schema, constraints, and the statistics on the given column indices
+ pub fn project(&self) -> (SchemaRef, Constraints, Statistics, Vec<LexOrdering>) {
if self.projection.is_none() && self.table_partition_cols.is_empty() {
return (
Arc::clone(&self.file_schema),
self.constraints.clone(),
self.statistics.clone(),
self.output_ordering.clone(),
);
}

- let proj_iter: Box<dyn Iterator<Item = usize>> = match &self.projection {
- Some(proj) => Box::new(proj.iter().copied()),
- None => Box::new(
- 0..(self.file_schema.fields().len() + self.table_partition_cols.len()),
- ),
+ let proj_indices = if let Some(proj) = &self.projection {
+ proj
+ } else {
+ let len = self.file_schema.fields().len() + self.table_partition_cols.len();
+ &(0..len).collect::<Vec<_>>()
};

let mut table_fields = vec![];
let mut table_cols_stats = vec![];
- for idx in proj_iter {
- if idx < self.file_schema.fields().len() {
- let field = self.file_schema.field(idx);
+ for idx in proj_indices {
+ if *idx < self.file_schema.fields().len() {
+ let field = self.file_schema.field(*idx);
table_fields.push(field.clone());
- table_cols_stats.push(self.statistics.column_statistics[idx].clone())
+ table_cols_stats.push(self.statistics.column_statistics[*idx].clone())
} else {
let partition_idx = idx - self.file_schema.fields().len();
table_fields.push(self.table_partition_cols[partition_idx].to_owned());
@@ -254,10 +266,20 @@
self.file_schema.metadata().clone(),
));

let projected_constraints = self
.constraints
.project(proj_indices)
.unwrap_or_else(Constraints::empty);

let projected_output_ordering =
get_projected_output_ordering(self, &projected_schema);

- (projected_schema, table_stats, projected_output_ordering)
+ (
+ projected_schema,
+ projected_constraints,
+ table_stats,
+ projected_output_ordering,
+ )
}
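
Putting the pieces together, a hedged end-to-end sketch: build a `FileScanConfig` with constraints, then consume the widened four-tuple from `project()`. The object-store URL, schema, and unknown statistics are placeholder assumptions:

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion_common::{Constraint, Constraints, Statistics};
use datafusion_execution::object_store::ObjectStoreUrl;

fn projected_scan() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("v", DataType::Utf8, true),
    ]));
    let config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), Arc::clone(&schema))
        .with_constraints(Constraints::new_unverified(vec![Constraint::PrimaryKey(
            vec![0],
        )]))
        .with_statistics(Statistics::new_unknown(&schema))
        .with_projection(Some(vec![0]));

    // project() now returns constraints alongside schema, statistics, and
    // orderings; the primary key survives because column 0 is kept.
    let (proj_schema, proj_constraints, _stats, _orderings) = config.project();
    assert_eq!(proj_schema.fields().len(), 1);
    assert!(!proj_constraints.is_empty());
}
```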

#[cfg_attr(not(feature = "avro"), allow(unused))] // Only used by avro
Expand Down Expand Up @@ -635,7 +657,7 @@ mod tests {
)]),
);

- let (proj_schema, proj_statistics, _) = conf.project();
+ let (proj_schema, _, proj_statistics, _) = conf.project();
assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
assert_eq!(
proj_schema.field(file_schema.fields().len()).name(),
@@ -675,7 +697,7 @@
);

// verify that proj_schema includes the last column and that its field exactly matches the definition
- let (proj_schema, _proj_statistics, _) = conf.project();
+ let (proj_schema, _, _, _) = conf.project();
assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
assert_eq!(
*proj_schema.field(file_schema.fields().len()),
@@ -708,7 +730,7 @@
)]),
);

- let (proj_schema, proj_statistics, _) = conf.project();
+ let (proj_schema, _, proj_statistics, _) = conf.project();
assert_eq!(
columns(&proj_schema),
vec!["date".to_owned(), "c1".to_owned()]