Move CBOs and Statistics to physical plan #965

Merged 25 commits on Sep 13, 2021
Changes from 21 commits
Commits
e8c8a7f
moved statistics method from logical to exec plan
rdettai Sep 2, 2021
6dbb778
[feat] make statistics async
rdettai Sep 3, 2021
2e3353e
[feat] fix tests with partial implem of AggregateStatistics optimizer…
rdettai Sep 3, 2021
305f9ca
[lint] cargo fmt all
rdettai Sep 3, 2021
3bd0498
[fix] better structure for optimizer implem
rdettai Sep 6, 2021
de3d90a
[test] add tests for aggregate_statistics optim
rdettai Sep 6, 2021
184b35f
[feat] add back min max stat optim
rdettai Sep 7, 2021
9131404
[feat] add back hash_build_probe_order optim
rdettai Sep 7, 2021
c84afbc
[fix] align on new compound name rule
rdettai Sep 7, 2021
d7845fa
[test] unit hash_build_probe_order optim
rdettai Sep 8, 2021
e2c16c6
[test] union statistics compute
rdettai Sep 8, 2021
3571997
[test] stats for record batch helper
rdettai Sep 8, 2021
f909245
[test] statistics column projection
rdettai Sep 8, 2021
83d9a19
[test] stat computing for various plans
rdettai Sep 8, 2021
5175ca8
[test] adding some integ tests for stats
rdettai Sep 9, 2021
bc9e02b
[test] sanity check for window expr
rdettai Sep 9, 2021
6847aef
[test] window stat check len
rdettai Sep 9, 2021
5b0c445
[fix] suggestion about bool arithmetic
rdettai Sep 9, 2021
b95f5b6
[fix] should never commit from github
rdettai Sep 9, 2021
0ca5824
[review] for loop to iterator
rdettai Sep 10, 2021
8854a63
[test] unit test for join optim
rdettai Sep 10, 2021
8d884fa
[fix] updated comments according to PR review
rdettai Sep 12, 2021
61f9f63
[fix] doc comments according to review hints
rdettai Sep 12, 2021
9d4ebd5
[test] adding back previously asserted cases
rdettai Sep 12, 2021
b37111e
[fix] running aggregate and join CBOs first
rdettai Sep 12, 2021
1 change: 1 addition & 0 deletions ballista/rust/core/proto/ballista.proto
@@ -275,6 +275,7 @@ message Statistics {
int64 num_rows = 1;
int64 total_byte_size = 2;
repeated ColumnStats column_stats = 3;
bool is_exact = 4;
}

message PartitionedFile {
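
The new is_exact flag on the protobuf message mirrors the field this PR adds to DataFusion's physical-plan Statistics struct. For orientation, here is a rough sketch of that struct as implied by the fields read and written throughout this diff; the authoritative definition lives in datafusion::physical_plan, and the ColumnStatistics stand-in below is purely illustrative:

// Approximate shape of the physical-plan statistics type, inferred from this PR.
// Every count is optional because a plan node may not know it; is_exact marks
// whether the known values are exact rather than estimates.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Statistics {
    pub num_rows: Option<usize>,
    pub total_byte_size: Option<usize>,
    pub column_statistics: Option<Vec<ColumnStatistics>>,
    pub is_exact: bool,
}

// Stand-in for the real per-column statistics type (null counts, min/max, ...).
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ColumnStatistics;
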
10 changes: 1 addition & 9 deletions ballista/rust/core/src/datasource.rs
@@ -20,7 +20,7 @@ use std::{any::Any, sync::Arc};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::error::Result as DFResult;
use datafusion::{
datasource::{datasource::Statistics, TableProvider},
datasource::TableProvider,
logical_plan::{Expr, LogicalPlan},
physical_plan::ExecutionPlan,
};
@@ -61,12 +61,4 @@ impl TableProvider for DfTableAdapter {
) -> DFResult<Arc<dyn ExecutionPlan>> {
Ok(self.plan.clone())
}

fn statistics(&self) -> Statistics {
Statistics {
num_rows: None,
total_byte_size: None,
column_statistics: None,
}
}
}
8 changes: 7 additions & 1 deletion ballista/rust/core/src/execution_plans/distributed_query.rs
@@ -35,7 +35,7 @@ use datafusion::error::{DataFusionError, Result};
use datafusion::logical_plan::LogicalPlan;
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream,
SendableRecordBatchStream, Statistics,
};

use async_trait::async_trait;
@@ -203,6 +203,12 @@ impl ExecutionPlan for DistributedQueryExec {
}
}
}

fn statistics(&self) -> Statistics {
// We cannot infer the statistics until the logical plan
// is converted to a physical plan.
Statistics::default()
}
}

async fn fetch_partition(
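
Both DistributedQueryExec here and UnresolvedShuffleExec further down fall back to Statistics::default(), which encodes "nothing known yet". Assuming the struct derives Default (see the illustrative sketch above), a downstream optimizer sees all fields as unknown:

// A default Statistics reports every quantity as unknown and is not marked
// exact, so cost-based rules simply skip over these placeholder nodes.
let unknown = Statistics::default();
assert_eq!(unknown.num_rows, None);
assert_eq!(unknown.total_byte_size, None);
assert_eq!(unknown.column_statistics, None);
assert!(!unknown.is_exact);
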
111 changes: 109 additions & 2 deletions ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -21,7 +21,7 @@ use std::{any::Any, pin::Pin};

use crate::client::BallistaClient;
use crate::memory_stream::MemoryStream;
use crate::serde::scheduler::PartitionLocation;
use crate::serde::scheduler::{PartitionLocation, PartitionStats};

use crate::utils::WrappedStream;
use async_trait::async_trait;
@@ -31,7 +31,9 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::metrics::{
ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
};
use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Metric, Partitioning};
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Metric, Partitioning, Statistics,
};
use datafusion::{
error::{DataFusionError, Result},
physical_plan::RecordBatchStream,
@@ -156,6 +158,38 @@ impl ExecutionPlan for ShuffleReaderExec {
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}

fn statistics(&self) -> Statistics {
stats_for_partitions(
self.partition
.iter()
.flatten()
.map(|loc| loc.partition_stats),
)
}
}

fn stats_for_partitions(
partition_stats: impl Iterator<Item = PartitionStats>,
) -> Statistics {
// TODO stats: add column statistics to PartitionStats
partition_stats.fold(
Statistics {
is_exact: true,
num_rows: Some(0),
total_byte_size: Some(0),
column_statistics: None,
},
|mut acc, part| {
// if any statistic is unknown it makes the entire statistic unknown
acc.num_rows = acc.num_rows.zip(part.num_rows).map(|(a, b)| a + b as usize);
acc.total_byte_size = acc
.total_byte_size
.zip(part.num_bytes)
.map(|(a, b)| a + b as usize);
acc
},
)
}

async fn fetch_partition(
@@ -177,3 +211,76 @@ async fn fetch_partition(
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?)
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_stats_for_partitions_empty() {
let result = stats_for_partitions(std::iter::empty());

let expected = Statistics {
is_exact: true,
num_rows: Some(0),
total_byte_size: Some(0),
column_statistics: None,
};

assert_eq!(result, expected);
}

#[tokio::test]
async fn test_stats_for_partitions_full() {
let part_stats = vec![
PartitionStats {
num_rows: Some(10),
num_bytes: Some(84),
num_batches: Some(1),
},
PartitionStats {
num_rows: Some(4),
num_bytes: Some(65),
num_batches: None,
},
];

let result = stats_for_partitions(part_stats.into_iter());

let expected = Statistics {
is_exact: true,
num_rows: Some(14),
total_byte_size: Some(149),
column_statistics: None,
};

assert_eq!(result, expected);
}

#[tokio::test]
async fn test_stats_for_partitions_missing() {
let part_stats = vec![
PartitionStats {
num_rows: Some(10),
num_bytes: Some(84),
num_batches: Some(1),
},
PartitionStats {
num_rows: None,
num_bytes: None,
num_batches: None,
},
];

let result = stats_for_partitions(part_stats.into_iter());

let expected = Statistics {
is_exact: true,
num_rows: None,
total_byte_size: None,
column_statistics: None,
};

assert_eq!(result, expected);
}
}
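
The fold in stats_for_partitions relies on Option::zip to implement the comment above: the running sum stays Some only while both the accumulator and the incoming partition report a value, so a single partition with unknown statistics turns the total into None, while zero partitions yield an exact zero. A standalone sketch of that behavior (sum_known is a made-up name for illustration):

// Summing optional per-partition counts the same way the fold above does:
// Option::zip yields Some only when both sides are Some.
fn sum_known(counts: &[Option<u64>]) -> Option<usize> {
    counts.iter().fold(Some(0usize), |acc, c| {
        acc.zip(*c).map(|(total, n)| total + n as usize)
    })
}

fn main() {
    assert_eq!(sum_known(&[Some(10), Some(4)]), Some(14)); // all known
    assert_eq!(sum_known(&[Some(10), None]), None);        // one unknown poisons the sum
    assert_eq!(sum_known(&[]), Some(0));                    // no partitions at all is exactly zero
}
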
6 changes: 5 additions & 1 deletion ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -51,7 +51,7 @@ use datafusion::physical_plan::metrics::{
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::Partitioning::RoundRobinBatch;
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Metric, Partitioning, RecordBatchStream,
DisplayFormatType, ExecutionPlan, Metric, Partitioning, RecordBatchStream, Statistics,
};
use futures::StreamExt;
use hashbrown::HashMap;
@@ -417,6 +417,10 @@ impl ExecutionPlan for ShuffleWriterExec {
}
}
}

fn statistics(&self) -> Statistics {
self.plan.statistics()
}
}

fn result_schema() -> SchemaRef {
12 changes: 11 additions & 1 deletion ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
@@ -23,7 +23,9 @@ use crate::serde::scheduler::PartitionLocation;

use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, Statistics,
};
use datafusion::{
error::{DataFusionError, Result},
physical_plan::RecordBatchStream,
@@ -117,4 +119,12 @@ impl ExecutionPlan for UnresolvedShuffleExec {
}
}
}

fn statistics(&self) -> Statistics {
// We could try to fetch the statistics here from the shuffle writer,
// but it is much more valuable to optimize the plan once this
// node has been replaced by the actual ShuffleReaderExec, which will
// have more accurate statistics from its input partitions.
Statistics::default()
}
}
4 changes: 2 additions & 2 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -359,6 +359,7 @@ impl TryInto<Statistics> for &protobuf::Statistics {
num_rows: Some(self.num_rows as usize),
total_byte_size: Some(self.total_byte_size as usize),
column_statistics: Some(column_statistics),
is_exact: self.is_exact,
})
}
}
@@ -1177,8 +1178,7 @@ impl TryInto<Field> for &protobuf::Field {
}

use crate::serde::protobuf::ColumnStats;
use datafusion::datasource::datasource::{ColumnStatistics, Statistics};
use datafusion::physical_plan::{aggregates, windows};
use datafusion::physical_plan::{aggregates, windows, ColumnStatistics, Statistics};
use datafusion::prelude::{
array, date_part, date_trunc, length, lower, ltrim, md5, rtrim, sha224, sha256,
sha384, sha512, trim, upper,
3 changes: 2 additions & 1 deletion ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -25,7 +25,6 @@ use crate::serde::{protobuf, BallistaError};
use datafusion::arrow::datatypes::{
DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit,
};
use datafusion::datasource::datasource::{ColumnStatistics, Statistics};
use datafusion::datasource::{CsvFile, PartitionedFile, TableDescriptor};
use datafusion::logical_plan::{
window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
@@ -36,6 +35,7 @@ use datafusion::physical_plan::functions::BuiltinScalarFunction;
use datafusion::physical_plan::window_functions::{
BuiltInWindowFunction, WindowFunction,
};
use datafusion::physical_plan::{ColumnStatistics, Statistics};
use datafusion::{datasource::parquet::ParquetTable, logical_plan::exprlist_to_fields};
use protobuf::{
arrow_type, logical_expr_node::ExprType, scalar_type, DateUnit, PrimitiveScalarType,
@@ -278,6 +278,7 @@ impl From<&Statistics> for protobuf::Statistics {
num_rows: s.num_rows.map(|n| n as i64).unwrap_or(none_value),
total_byte_size: s.total_byte_size.map(|n| n as i64).unwrap_or(none_value),
column_stats,
is_exact: s.is_exact,
}
}
}
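
On the wire the optional counts become plain int64 fields: a known value is cast and an unknown one is replaced by the none_value sentinel defined just above this hunk (not shown in the excerpt), while is_exact is carried through unchanged. The constant below is only an assumed stand-in for that sentinel:

// Illustrative encoding of an optional count into the protobuf int64 field,
// assuming the elided `none_value` constant is a negative sentinel.
const NONE_VALUE: i64 = -1;

fn encode_count(count: Option<usize>) -> i64 {
    count.map(|n| n as i64).unwrap_or(NONE_VALUE)
}

fn main() {
    assert_eq!(encode_count(Some(42)), 42);
    assert_eq!(encode_count(None), NONE_VALUE);
}
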
5 changes: 3 additions & 2 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -34,7 +34,6 @@ use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion::catalog::catalog::{
CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
};
use datafusion::datasource::datasource::Statistics;
use datafusion::datasource::object_store::ObjectStoreRegistry;
use datafusion::datasource::FilePartition;
use datafusion::execution::context::{
@@ -74,7 +73,9 @@ use datafusion::physical_plan::{
sort::{SortExec, SortOptions},
Partitioning,
};
use datafusion::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, WindowExpr};
use datafusion::physical_plan::{
AggregateExpr, ExecutionPlan, PhysicalExpr, Statistics, WindowExpr,
};
use datafusion::prelude::CsvReadOptions;
use log::debug;
use protobuf::physical_expr_node::ExprType;
6 changes: 5 additions & 1 deletion ballista/rust/executor/src/collect.rs
@@ -28,7 +28,7 @@ use datafusion::arrow::{
};
use datafusion::error::DataFusionError;
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
use datafusion::{error::Result, physical_plan::RecordBatchStream};
use futures::stream::SelectAll;
@@ -116,6 +116,10 @@ impl ExecutionPlan for CollectExec {
}
}
}

fn statistics(&self) -> Statistics {
self.plan.statistics()
}
}

struct MergedRecordBatchStream {
9 changes: 0 additions & 9 deletions datafusion/src/datasource/csv.rs
@@ -39,7 +39,6 @@ use std::io::{Read, Seek};
use std::string::String;
use std::sync::{Arc, Mutex};

use crate::datasource::datasource::Statistics;
use crate::datasource::{Source, TableProvider};
use crate::error::{DataFusionError, Result};
use crate::logical_plan::Expr;
@@ -54,7 +53,6 @@ pub struct CsvFile {
has_header: bool,
delimiter: u8,
file_extension: String,
statistics: Statistics,
}

impl CsvFile {
@@ -82,7 +80,6 @@ impl CsvFile {
has_header: options.has_header,
delimiter: options.delimiter,
file_extension: String::from(options.file_extension),
statistics: Statistics::default(),
})
}

@@ -105,7 +102,6 @@ impl CsvFile {
schema,
has_header: options.has_header,
delimiter: options.delimiter,
statistics: Statistics::default(),
file_extension: String::new(),
})
}
@@ -133,7 +129,6 @@ impl CsvFile {
schema,
has_header: options.has_header,
delimiter: options.delimiter,
statistics: Statistics::default(),
file_extension: String::new(),
})
}
@@ -210,10 +205,6 @@ impl TableProvider for CsvFile {
};
Ok(Arc::new(exec))
}

fn statistics(&self) -> Statistics {
self.statistics.clone()
}
}

#[cfg(test)]