Skip to content

Commit

Permalink
merge fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
mertak-synnada committed Jan 23, 2025
1 parent aeb1954 commit f42c245
Show file tree
Hide file tree
Showing 11 changed files with 409 additions and 462 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ use std::sync::Arc;
use crate::physical_optimizer::parquet_exec;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
- use datafusion::datasource::data_source::FileSourceConfig;
- use datafusion::datasource::listing::PartitionedFile;
- use datafusion::datasource::physical_plan::{FileScanConfig, ParquetConfig};
use datafusion::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate;
use datafusion::physical_optimizer::test_utils::trim_plan_display;
use datafusion_common::config::ConfigOptions;
Expand All @@ -43,7 +40,6 @@ use datafusion_physical_plan::aggregates::{
};
use datafusion_physical_plan::displayable;
use datafusion_physical_plan::repartition::RepartitionExec;
- use datafusion_physical_plan::source::DataSourceExec;
use datafusion_physical_plan::ExecutionPlan;

/// Runs the CombinePartialFinalAggregate optimizer and asserts the plan against the expected
Expand Down
409 changes: 192 additions & 217 deletions datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Large diffs are not rendered by default.

234 changes: 105 additions & 129 deletions datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_optimizer::test_utils::{
assert_plan_matches_expected, build_group_by, mock_data, schema,
};
- use datafusion_physical_plan::memory::MemorySourceConfig;
- use datafusion_physical_plan::source::DataSourceExec;
use datafusion_physical_plan::{
aggregates::{AggregateExec, AggregateMode},
collect,
Expand Down Expand Up @@ -260,7 +258,7 @@ fn test_has_order_by() -> Result<()> {
let expected = [
"LocalLimitExec: fetch=10",
"AggregateExec: mode=Single, gby=[a@0 as a], aggr=[], ordering_mode=Sorted",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]",
+ "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
];
let plan: Arc<dyn ExecutionPlan> = Arc::new(limit_exec);
assert_plan_matches_expected(&plan, &expected)?;
Expand Down
16 changes: 9 additions & 7 deletions datafusion/core/tests/physical_optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,31 @@ mod sanity_checker;
use std::sync::Arc;

use arrow_schema::SchemaRef;
+ use datafusion::datasource::data_source::FileSourceConfig;
use datafusion::datasource::listing::PartitionedFile;
- use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
+ use datafusion::datasource::physical_plan::{FileScanConfig, ParquetConfig};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_optimizer::test_utils::schema;
+ use datafusion_physical_plan::source::DataSourceExec;

/// Create a non sorted parquet exec
- pub fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
- ParquetExec::builder(
+ pub fn parquet_exec(schema: &SchemaRef) -> Arc<DataSourceExec> {
+ FileSourceConfig::new_exec(
FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema.clone())
.with_file(PartitionedFile::new("x".to_string(), 100)),
+ Arc::new(ParquetConfig::default()),
)
- .build_arc()
}

/// Create a single parquet file that is sorted
pub(crate) fn parquet_exec_with_sort(
output_ordering: Vec<LexOrdering>,
- ) -> Arc<ParquetExec> {
- ParquetExec::builder(
+ ) -> Arc<DataSourceExec> {
+ FileSourceConfig::new_exec(
FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema())
.with_file(PartitionedFile::new("x".to_string(), 100))
.with_output_ordering(output_ordering),
+ Arc::new(ParquetConfig::default()),
)
- .build_arc()
}

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions datafusion/physical-optimizer/src/aggregate_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ mod tests {
use datafusion_physical_expr::expressions::{self, cast};
use datafusion_physical_plan::aggregates::AggregateMode;

- /// Mock data using a MemoryExec which has an exact count statistic
- fn mock_data() -> Result<Arc<MemoryExec>> {
+ /// Mock data using a MemorySourceConfig which has an exact count statistic
+ fn mock_data() -> Result<Arc<DataSourceExec>> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Expand Down
9 changes: 3 additions & 6 deletions datafusion/physical-optimizer/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use datafusion_physical_plan::memory::MemorySourceConfig;
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+ use datafusion_physical_plan::source::DataSourceExec;
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
use datafusion_physical_plan::tree_node::PlanContext;
use datafusion_physical_plan::union::UnionExec;
Expand Down Expand Up @@ -441,7 +442,7 @@ pub fn stream_exec_ordered_with_projection(
)
}

- pub fn mock_data() -> Result<Arc<MemoryExec>> {
+ pub fn mock_data() -> Result<Arc<DataSourceExec>> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Expand Down Expand Up @@ -469,11 +470,7 @@ pub fn mock_data() -> Result<Arc<MemoryExec>> {
],
)?;

- Ok(Arc::new(MemoryExec::try_new(
- &[vec![batch]],
- Arc::clone(&schema),
- None,
- )?))
+ MemorySourceConfig::try_new_exec(&[vec![batch]], Arc::clone(&schema), None)
}

pub fn build_group_by(input_schema: &SchemaRef, columns: Vec<String>) -> PhysicalGroupBy {
Expand Down
Loading

0 comments on commit f42c245

Please sign in to comment.