Skip to content

Commit

Permalink
Address reviews
Browse files Browse the repository at this point in the history
  • Loading branch information
mustafasrepo committed Feb 27, 2024
1 parent ace9815 commit 07a438d
Show file tree
Hide file tree
Showing 44 changed files with 58 additions and 10 deletions.
1 change: 1 addition & 0 deletions datafusion-examples/examples/custom_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ impl CustomExec {
}
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
let eq_properties = EquivalenceProperties::new(schema);
PlanPropertiesCache::new(
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ impl ArrowExec {
Partitioning::UnknownPartitioning(file_scan_config.file_groups.len())
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(
schema: SchemaRef,
projected_output_ordering: &[LexOrdering],
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ impl AvroExec {
&self.base_config
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(
schema: SchemaRef,
orderings: &[LexOrdering],
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ impl CsvExec {
Partitioning::UnknownPartitioning(file_scan_config.file_groups.len())
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(
schema: SchemaRef,
orderings: &[LexOrdering],
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ impl NdJsonExec {
Partitioning::UnknownPartitioning(file_scan_config.file_groups.len())
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(
schema: SchemaRef,
orderings: &[LexOrdering],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ impl ParquetExec {
Partitioning::UnknownPartitioning(file_config.file_groups.len())
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(
schema: SchemaRef,
orderings: &[LexOrdering],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,7 @@ pub(crate) mod tests {
}
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(input: &Arc<dyn ExecutionPlan>) -> PlanPropertiesCache {
PlanPropertiesCache::new(
input.equivalence_properties().clone(), // Equivalence Properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ impl OutputRequirementExec {
self.input.clone()
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(input: &Arc<dyn ExecutionPlan>) -> PlanPropertiesCache {
PlanPropertiesCache::new(
input.equivalence_properties().clone(), // Equivalence Properties
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2584,6 +2584,7 @@ mod tests {
Self { cache }
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
let eq_properties = EquivalenceProperties::new(schema);
PlanPropertiesCache::new(
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ impl StatisticsExec {
}
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
let eq_properties = EquivalenceProperties::new(schema);
PlanPropertiesCache::new(
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::TableReference;
use datafusion_expr::{CreateExternalTable, Expr, TableType};
use datafusion_physical_expr::EquivalenceProperties;

use async_trait::async_trait;
use futures::Stream;
Expand All @@ -55,7 +56,6 @@ use tempfile::TempDir;
#[cfg(feature = "parquet")]
pub use datafusion_common::test_util::parquet_test_data;
pub use datafusion_common::test_util::{arrow_test_data, get_data_dir};
use datafusion_physical_expr::EquivalenceProperties;

/// Scan an empty data source, mainly used in tests
pub fn scan_empty(
Expand Down Expand Up @@ -246,6 +246,7 @@ impl UnboundedExec {
}
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(
schema: SchemaRef,
batch_produce: Option<usize>,
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/tests/custom_sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ impl CustomExecutionPlan {
Self { projection, cache }
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
let eq_properties = EquivalenceProperties::new(schema);
PlanPropertiesCache::new(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ impl CustomPlan {
Self { batches, cache }
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
let eq_properties = EquivalenceProperties::new(schema);
PlanPropertiesCache::new(
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/tests/custom_sources_cases/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ impl StatisticsValidation {
}
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
let eq_properties = EquivalenceProperties::new(schema);

Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/tests/user_defined/user_defined_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ use datafusion::{
UserDefinedLogicalNodeCore,
},
optimizer::{optimize_children, OptimizerConfig, OptimizerRule},
physical_expr::EquivalenceProperties,
physical_plan::{
DisplayAs, DisplayFormatType, Distribution, ExecutionMode, ExecutionPlan,
Partitioning, PlanPropertiesCache, RecordBatchStream, SendableRecordBatchStream,
Expand All @@ -91,7 +92,6 @@ use datafusion::{
};

use async_trait::async_trait;
use datafusion_physical_expr::EquivalenceProperties;
use futures::{Stream, StreamExt};

/// Execute the specified sql and return the resulting record batches
Expand Down Expand Up @@ -421,6 +421,7 @@ impl TopKExec {
Self { input, k, cache }
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
let eq_properties = EquivalenceProperties::new(schema);

Expand Down
2 changes: 2 additions & 0 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ impl AggregateExec {
true
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(
input: &Arc<dyn ExecutionPlan>,
schema: SchemaRef,
Expand Down Expand Up @@ -1629,6 +1630,7 @@ mod tests {
Self { yield_first, cache }
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
let eq_properties = EquivalenceProperties::new(schema);
PlanPropertiesCache::new(
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ impl AnalyzeExec {
&self.input
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(
input: &Arc<dyn ExecutionPlan>,
schema: SchemaRef,
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ impl CoalesceBatchesExec {
self.target_batch_size
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(input: &Arc<dyn ExecutionPlan>) -> PlanPropertiesCache {
// The coalesce batches operator does not make any changes to the
// partitioning of its input.
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/coalesce_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl CoalescePartitionsExec {
&self.input
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(input: &Arc<dyn ExecutionPlan>) -> PlanPropertiesCache {
// Coalescing partitions loses existing orderings:
let mut eq_properties = input.equivalence_properties().clone();
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ impl EmptyExec {
Partitioning::UnknownPartitioning(n_partitions)
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(schema: SchemaRef, n_partitions: usize) -> PlanPropertiesCache {
let eq_properties = EquivalenceProperties::new(schema);
let output_partitioning = Self::output_partitioning_helper(n_partitions);
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ impl ExplainExec {
self.verbose
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
let eq_properties = EquivalenceProperties::new(schema);
PlanPropertiesCache::new(
Expand Down
2 changes: 2 additions & 0 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ impl FilterExec {
self.default_selectivity
}

/// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics.
fn statistics_helper(
input: &Arc<dyn ExecutionPlan>,
predicate: &Arc<dyn PhysicalExpr>,
Expand Down Expand Up @@ -157,6 +158,7 @@ impl FilterExec {
})
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(
input: &Arc<dyn ExecutionPlan>,
predicate: &Arc<dyn PhysicalExpr>,
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ impl CrossJoinExec {
&self.right
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(
left: &Arc<dyn ExecutionPlan>,
right: &Arc<dyn ExecutionPlan>,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::{
check_join_is_valid, estimate_join_statistics, get_final_indices_from_bit_map,
need_produce_result_in_final, partitioned_join_output_partitioning,
BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMap, JoinHashMapOffset,
JoinHashMapType, JoinOn, StatefulStreamResult,
JoinHashMapType, JoinOn, JoinOnRef, StatefulStreamResult,
},
metrics::{ExecutionPlanMetricsSet, MetricsSet},
DisplayAs, DisplayFormatType, Distribution, ExecutionMode, ExecutionPlan,
Expand Down Expand Up @@ -65,7 +65,6 @@ use datafusion_execution::TaskContext;
use datafusion_physical_expr::equivalence::join_equivalence_properties;
use datafusion_physical_expr::PhysicalExprRef;

use crate::joins::utils::JoinOnRef;
use ahash::RandomState;
use futures::{ready, Stream, StreamExt, TryStreamExt};

Expand Down Expand Up @@ -406,6 +405,7 @@ impl HashJoinExec {
JoinSide::Right
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(
left: &Arc<dyn ExecutionPlan>,
right: &Arc<dyn ExecutionPlan>,
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ impl NestedLoopJoinExec {
&self.join_type
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(
left: &Arc<dyn ExecutionPlan>,
right: &Arc<dyn ExecutionPlan>,
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ impl SortMergeJoinExec {
self.left.as_ref()
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(
left: &Arc<dyn ExecutionPlan>,
right: &Arc<dyn ExecutionPlan>,
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/joins/symmetric_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ impl SymmetricHashJoinExec {
})
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(
left: &Arc<dyn ExecutionPlan>,
right: &Arc<dyn ExecutionPlan>,
Expand Down
2 changes: 2 additions & 0 deletions datafusion/physical-plan/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ impl GlobalLimitExec {
self.fetch
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(input: &Arc<dyn ExecutionPlan>) -> PlanPropertiesCache {
PlanPropertiesCache::new(
input.equivalence_properties().clone(), // Equivalence Properties
Expand Down Expand Up @@ -292,6 +293,7 @@ impl LocalLimitExec {
self.fetch
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(input: &Arc<dyn ExecutionPlan>) -> PlanPropertiesCache {
PlanPropertiesCache::new(
input.equivalence_properties().clone(), // Equivalence Properties
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ impl MemoryExec {
self.schema.clone()
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(
schema: SchemaRef,
orderings: &[LexOrdering],
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/placeholder_row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ impl PlaceholderRowExec {
Partitioning::UnknownPartitioning(n_partitions)
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(schema: SchemaRef, n_partitions: usize) -> PlanPropertiesCache {
let eq_properties = EquivalenceProperties::new(schema);
// Get output partitioning:
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ impl ProjectionExec {
&self.input
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(
input: &Arc<dyn ExecutionPlan>,
projection_mapping: &ProjectionMapping,
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/recursive_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ impl RecursiveQueryExec {
})
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
let eq_properties = EquivalenceProperties::new(schema);

Expand Down
9 changes: 3 additions & 6 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,12 +616,8 @@ impl RepartitionExec {
input: &Arc<dyn ExecutionPlan>,
preserve_order: bool,
) -> Vec<bool> {
if preserve_order {
vec![true]
} else {
// We preserve ordering when input partitioning is 1
vec![input.output_partitioning().partition_count() <= 1]
}
// Ordering is preserved when this is the order-preserving variant of repartition, or when the input has at most one partition
vec![preserve_order || input.output_partitioning().partition_count() <= 1]
}

fn eq_properties_helper(
Expand All @@ -637,6 +633,7 @@ impl RepartitionExec {
eq_properties
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(
input: &Arc<dyn ExecutionPlan>,
partitioning: Partitioning,
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/sorts/partial_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ impl PartialSortExec {
}
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(
input: &Arc<dyn ExecutionPlan>,
sort_exprs: LexOrdering,
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,7 @@ impl SortExec {
}
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(
input: &Arc<dyn ExecutionPlan>,
sort_exprs: LexOrdering,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ impl SortPreservingMergeExec {
self.fetch
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(input: &Arc<dyn ExecutionPlan>) -> PlanPropertiesCache {
PlanPropertiesCache::new(
input.equivalence_properties().clone(), // Equivalence Properties
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ impl StreamingTableExec {
self.infinite
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn create_cache(
schema: SchemaRef,
orderings: &[LexOrdering],
Expand Down
Loading

0 comments on commit 07a438d

Please sign in to comment.