diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 2eb04ab6cbab..5c14b198176e 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -22,6 +22,8 @@ use datafusion::arrow::array::ArrayRef;
 use datafusion::arrow::compute::kernels::sort::SortOptions;
 use datafusion::arrow::datatypes::{DataType, Field, Fields, IntervalUnit, Schema};
 use datafusion::datasource::file_format::json::JsonSink;
+//use datafusion::datasource::file_format::csv::CsvSink;
+use datafusion::datasource::file_format::parquet::ParquetSink;
 use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::datasource::physical_plan::{
@@ -31,6 +33,7 @@ use datafusion::execution::context::ExecutionProps;
 use datafusion::logical_expr::{
     create_udf, BuiltinScalarFunction, JoinType, Operator, Volatility,
 };
+use datafusion::parquet::file::properties::WriterProperties;
 use datafusion::physical_expr::window::SlidingAggregateWindowExpr;
 use datafusion::physical_expr::{PhysicalSortRequirement, ScalarFunctionExpr};
 use datafusion::physical_plan::aggregates::{
@@ -63,6 +66,7 @@ use datafusion::physical_plan::{
 use datafusion::prelude::SessionContext;
 use datafusion::scalar::ScalarValue;
 use datafusion_common::file_options::json_writer::JsonWriterOptions;
+use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
 use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::stats::Precision;
 use datafusion_common::{FileTypeWriterOptions, Result};
@@ -755,6 +759,78 @@ fn roundtrip_json_sink() -> Result<()> {
     )))
 }
 
+// #[test]
+// fn roundtrip_csv_sink() -> Result<()> {
+//     let field_a = Field::new("plan_type", DataType::Utf8, false);
+//     let field_b = Field::new("plan", DataType::Utf8, false);
+//     let schema = Arc::new(Schema::new(vec![field_a, field_b]));
+//     let input = Arc::new(PlaceholderRowExec::new(schema.clone()));
+//
+//     let file_sink_config = FileSinkConfig {
+//         object_store_url: ObjectStoreUrl::local_filesystem(),
+//         file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
+//         table_paths: vec![ListingTableUrl::parse("file:///")?],
+//         output_schema: schema.clone(),
+//         table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
+//         single_file_output: true,
+//         overwrite: true,
+//         file_type_writer_options: FileTypeWriterOptions::CSV(CsvWriterOptions::new(
+//             CompressionTypeVariant::UNCOMPRESSED,
+//         )),
+//     };
+//     let data_sink = Arc::new(CsvSink::new(file_sink_config));
+//     let sort_order = vec![PhysicalSortRequirement::new(
+//         Arc::new(Column::new("plan_type", 0)),
+//         Some(SortOptions {
+//             descending: true,
+//             nulls_first: false,
+//         }),
+//     )];
+//
+//     roundtrip_test(Arc::new(FileSinkExec::new(
+//         input,
+//         data_sink,
+//         schema.clone(),
+//         Some(sort_order),
+//     )))
+// }
+
+#[test]
+fn roundtrip_parquet_sink() -> Result<()> {
+    let field_a = Field::new("plan_type", DataType::Utf8, false);
+    let field_b = Field::new("plan", DataType::Utf8, false);
+    let schema = Arc::new(Schema::new(vec![field_a, field_b]));
+    let input = Arc::new(PlaceholderRowExec::new(schema.clone()));
+
+    let file_sink_config = FileSinkConfig {
+        object_store_url: ObjectStoreUrl::local_filesystem(),
+        file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
+        table_paths: vec![ListingTableUrl::parse("file:///")?],
+        output_schema: schema.clone(),
+        table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
+        single_file_output: true,
+        overwrite: true,
+        file_type_writer_options: FileTypeWriterOptions::Parquet(
+            ParquetWriterOptions::new(WriterProperties::default()),
+        ),
+    };
+    let data_sink = Arc::new(ParquetSink::new(file_sink_config));
+    let sort_order = vec![PhysicalSortRequirement::new(
+        Arc::new(Column::new("plan_type", 0)),
+        Some(SortOptions {
+            descending: true,
+            nulls_first: false,
+        }),
+    )];
+
+    roundtrip_test(Arc::new(FileSinkExec::new(
+        input,
+        data_sink,
+        schema.clone(),
+        Some(sort_order),
+    )))
+}
+
 #[test]
 fn roundtrip_sym_hash_join() -> Result<()> {
     let field_a = Field::new("col", DataType::Int64, false);
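
Both sink tests above feed into the suite's roundtrip_test helper, which is defined earlier in this file and falls outside the hunk. Below is a minimal sketch of what such a round-trip check involves, assuming the datafusion_proto::bytes convenience API (physical_plan_to_bytes / physical_plan_from_bytes) rather than whatever codec path the real helper takes; the name roundtrip_test_sketch and the display-based comparison are illustrative assumptions, not the file's actual implementation.

use std::sync::Arc;

use datafusion::physical_plan::{displayable, ExecutionPlan};
use datafusion::prelude::SessionContext;
use datafusion_common::Result;
use datafusion_proto::bytes::{physical_plan_from_bytes, physical_plan_to_bytes};

// Sketch only: the suite's real helper may encode through
// protobuf::PhysicalPlanNode and an extension codec directly.
fn roundtrip_test_sketch(plan: Arc<dyn ExecutionPlan>) -> Result<()> {
    let ctx = SessionContext::new();
    // Serialize the plan, including the sink's FileSinkConfig and
    // its writer options, into protobuf bytes.
    let bytes = physical_plan_to_bytes(plan.clone())?;
    // Deserialize the bytes back into an ExecutionPlan.
    let decoded = physical_plan_from_bytes(&bytes, &ctx)?;
    // ExecutionPlan has no structural equality, so compare the rendered
    // plan trees; any field dropped by serde shows up as a mismatch here.
    let expected = displayable(plan.as_ref()).indent(true).to_string();
    let actual = displayable(decoded.as_ref()).indent(true).to_string();
    assert_eq!(expected, actual);
    Ok(())
}

For the new roundtrip_parquet_sink test, a faithful round trip means the WriterProperties wrapped in ParquetWriterOptions, along with the sink's FileSinkConfig and the optional sort order on FileSinkExec, must all survive serialization, which is exactly what the patch exercises.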