Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Dec 24, 2023
1 parent 5a48f78 commit b18c1ad
Showing 1 changed file with 76 additions and 0 deletions.
76 changes: 76 additions & 0 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::compute::kernels::sort::SortOptions;
use datafusion::arrow::datatypes::{DataType, Field, Fields, IntervalUnit, Schema};
use datafusion::datasource::file_format::json::JsonSink;
//use datafusion::datasource::file_format::csv::CsvSink;
use datafusion::datasource::file_format::parquet::ParquetSink;
use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
Expand All @@ -31,6 +33,7 @@ use datafusion::execution::context::ExecutionProps;
use datafusion::logical_expr::{
create_udf, BuiltinScalarFunction, JoinType, Operator, Volatility,
};
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::physical_expr::window::SlidingAggregateWindowExpr;
use datafusion::physical_expr::{PhysicalSortRequirement, ScalarFunctionExpr};
use datafusion::physical_plan::aggregates::{
Expand Down Expand Up @@ -63,6 +66,7 @@ use datafusion::physical_plan::{
use datafusion::prelude::SessionContext;
use datafusion::scalar::ScalarValue;
use datafusion_common::file_options::json_writer::JsonWriterOptions;
use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::stats::Precision;
use datafusion_common::{FileTypeWriterOptions, Result};
Expand Down Expand Up @@ -755,6 +759,78 @@ fn roundtrip_json_sink() -> Result<()> {
)))
}

// #[test]
// fn roundtrip_csv_sink() -> Result<()> {
// let field_a = Field::new("plan_type", DataType::Utf8, false);
// let field_b = Field::new("plan", DataType::Utf8, false);
// let schema = Arc::new(Schema::new(vec![field_a, field_b]));
// let input = Arc::new(PlaceholderRowExec::new(schema.clone()));
//
// let file_sink_config = FileSinkConfig {
// object_store_url: ObjectStoreUrl::local_filesystem(),
// file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
// table_paths: vec![ListingTableUrl::parse("file:///")?],
// output_schema: schema.clone(),
// table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
// single_file_output: true,
// overwrite: true,
// file_type_writer_options: FileTypeWriterOptions::CSV(CsvWriterOptions::new(
// CompressionTypeVariant::UNCOMPRESSED,
// )),
// };
// let data_sink = Arc::new(CsvSink::new(file_sink_config));
// let sort_order = vec![PhysicalSortRequirement::new(
// Arc::new(Column::new("plan_type", 0)),
// Some(SortOptions {
// descending: true,
// nulls_first: false,
// }),
// )];
//
// roundtrip_test(Arc::new(FileSinkExec::new(
// input,
// data_sink,
// schema.clone(),
// Some(sort_order),
// )))
// }

#[test]
fn roundtrip_parquet_sink() -> Result<()> {
let field_a = Field::new("plan_type", DataType::Utf8, false);
let field_b = Field::new("plan", DataType::Utf8, false);
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
let input = Arc::new(PlaceholderRowExec::new(schema.clone()));

let file_sink_config = FileSinkConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
table_paths: vec![ListingTableUrl::parse("file:///")?],
output_schema: schema.clone(),
table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
single_file_output: true,
overwrite: true,
file_type_writer_options: FileTypeWriterOptions::Parquet(
ParquetWriterOptions::new(WriterProperties::default()),
),
};
let data_sink = Arc::new(ParquetSink::new(file_sink_config));
let sort_order = vec![PhysicalSortRequirement::new(
Arc::new(Column::new("plan_type", 0)),
Some(SortOptions {
descending: true,
nulls_first: false,
}),
)];

roundtrip_test(Arc::new(FileSinkExec::new(
input,
data_sink,
schema.clone(),
Some(sort_order),
)))
}

#[test]
fn roundtrip_sym_hash_join() -> Result<()> {
let field_a = Field::new("col", DataType::Int64, false);
Expand Down

0 comments on commit b18c1ad

Please sign in to comment.