From 148b4d22f1d6de2bc6269ab96ba2e48e4735a9f9 Mon Sep 17 00:00:00 2001 From: Hoang Pham Date: Sun, 25 Feb 2024 20:36:56 +0700 Subject: [PATCH] Support CopyTo::partition_by in datafusion proto (#9306) * add support for CopyTo::partition_by in proto Signed-off-by: Hoang Pham * simplify partition_by logic Signed-off-by: Hoang Pham --------- Signed-off-by: Hoang Pham --- datafusion/expr/src/logical_plan/dml.rs | 2 +- datafusion/proto/proto/datafusion.proto | 1 + datafusion/proto/src/generated/pbjson.rs | 18 ++++++++++++++++++ datafusion/proto/src/generated/prost.rs | 2 ++ datafusion/proto/src/logical_plan/mod.rs | 6 ++++-- .../tests/cases/roundtrip_logical_plan.rs | 11 +++++++---- 6 files changed, 33 insertions(+), 7 deletions(-) diff --git a/datafusion/expr/src/logical_plan/dml.rs b/datafusion/expr/src/logical_plan/dml.rs index a55781eda643..7f04bd8973d6 100644 --- a/datafusion/expr/src/logical_plan/dml.rs +++ b/datafusion/expr/src/logical_plan/dml.rs @@ -36,7 +36,7 @@ pub struct CopyTo { pub output_url: String, /// The file format to output (explicitly defined or inferred from file extension) pub file_format: FileType, - /// Detmines which, if any, columns should be used for hive-style partitioned writes + /// Determines which, if any, columns should be used for hive-style partitioned writes pub partition_by: Vec, /// Arbitrary options as tuples pub copy_options: CopyOptions, diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index e779e29cb8da..7673ce86ae1d 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -327,6 +327,7 @@ message CopyToNode { FileTypeWriterOptions writer_options = 5; } string file_type = 6; + repeated string partition_by = 7; } message SQLOptions { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index f5f15aa3e428..65483f9ac467 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -3795,6 +3795,9 @@ impl serde::Serialize for CopyToNode { if !self.file_type.is_empty() { len += 1; } + if !self.partition_by.is_empty() { + len += 1; + } if self.copy_options.is_some() { len += 1; } @@ -3808,6 +3811,9 @@ impl serde::Serialize for CopyToNode { if !self.file_type.is_empty() { struct_ser.serialize_field("fileType", &self.file_type)?; } + if !self.partition_by.is_empty() { + struct_ser.serialize_field("partitionBy", &self.partition_by)?; + } if let Some(v) = self.copy_options.as_ref() { match v { copy_to_node::CopyOptions::SqlOptions(v) => { @@ -3833,6 +3839,8 @@ impl<'de> serde::Deserialize<'de> for CopyToNode { "outputUrl", "file_type", "fileType", + "partition_by", + "partitionBy", "sql_options", "sqlOptions", "writer_options", @@ -3844,6 +3852,7 @@ impl<'de> serde::Deserialize<'de> for CopyToNode { Input, OutputUrl, FileType, + PartitionBy, SqlOptions, WriterOptions, } @@ -3870,6 +3879,7 @@ impl<'de> serde::Deserialize<'de> for CopyToNode { "input" => Ok(GeneratedField::Input), "outputUrl" | "output_url" => Ok(GeneratedField::OutputUrl), "fileType" | "file_type" => Ok(GeneratedField::FileType), + "partitionBy" | "partition_by" => Ok(GeneratedField::PartitionBy), "sqlOptions" | "sql_options" => Ok(GeneratedField::SqlOptions), "writerOptions" | "writer_options" => Ok(GeneratedField::WriterOptions), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), @@ -3894,6 +3904,7 @@ impl<'de> serde::Deserialize<'de> for CopyToNode { let mut input__ = None; let mut output_url__ = None; let mut file_type__ = None; + let mut partition_by__ = None; let mut copy_options__ = None; while let Some(k) = map_.next_key()? { match k { @@ -3915,6 +3926,12 @@ impl<'de> serde::Deserialize<'de> for CopyToNode { } file_type__ = Some(map_.next_value()?); } + GeneratedField::PartitionBy => { + if partition_by__.is_some() { + return Err(serde::de::Error::duplicate_field("partitionBy")); + } + partition_by__ = Some(map_.next_value()?); + } GeneratedField::SqlOptions => { if copy_options__.is_some() { return Err(serde::de::Error::duplicate_field("sqlOptions")); @@ -3935,6 +3952,7 @@ impl<'de> serde::Deserialize<'de> for CopyToNode { input: input__, output_url: output_url__.unwrap_or_default(), file_type: file_type__.unwrap_or_default(), + partition_by: partition_by__.unwrap_or_default(), copy_options: copy_options__, }) } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 69d035239cb8..a567269e3356 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -511,6 +511,8 @@ pub struct CopyToNode { pub output_url: ::prost::alloc::string::String, #[prost(string, tag = "6")] pub file_type: ::prost::alloc::string::String, + #[prost(string, repeated, tag = "7")] + pub partition_by: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, #[prost(oneof = "copy_to_node::CopyOptions", tags = "4, 5")] pub copy_options: ::core::option::Option, } diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index aaaf165e3276..f107af757a71 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -913,12 +913,13 @@ impl AsLogicalPlan for LogicalPlanNode { } None => return Err(proto_error("CopyTo missing CopyOptions")), }; + Ok(datafusion_expr::LogicalPlan::Copy( datafusion_expr::dml::CopyTo { input: Arc::new(input), output_url: copy.output_url.clone(), file_format: FileType::from_str(©.file_type)?, - partition_by: vec![], + partition_by: copy.partition_by.clone(), copy_options, }, )) @@ -1642,7 +1643,7 @@ impl AsLogicalPlan for LogicalPlanNode { output_url, file_format, copy_options, - partition_by: _, + partition_by, }) => { let input = protobuf::LogicalPlanNode::try_from_logical_plan( input, @@ -1726,6 +1727,7 @@ impl AsLogicalPlan for LogicalPlanNode { output_url: output_url.to_string(), file_type: file_format.to_string(), copy_options: copy_options_proto, + partition_by: partition_by.clone(), }, ))), }) diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 6ca757908159..e3bd2cb1dc47 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -324,7 +324,7 @@ async fn roundtrip_logical_plan_copy_to_sql_options() -> Result<()> { input: Arc::new(input), output_url: "test.csv".to_string(), file_format: FileType::CSV, - partition_by: vec![], + partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()], copy_options: CopyOptions::SQLOptions(StatementOptions::from(&options)), }); @@ -355,7 +355,7 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> { input: Arc::new(input), output_url: "test.parquet".to_string(), file_format: FileType::PARQUET, - partition_by: vec![], + partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()], copy_options: CopyOptions::WriterOptions(Box::new( FileTypeWriterOptions::Parquet(ParquetWriterOptions::new(writer_properties)), )), @@ -369,6 +369,7 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> { LogicalPlan::Copy(copy_to) => { assert_eq!("test.parquet", copy_to.output_url); assert_eq!(FileType::PARQUET, copy_to.file_format); + assert_eq!(vec!["a", "b", "c"], copy_to.partition_by); match ©_to.copy_options { CopyOptions::WriterOptions(y) => match y.as_ref() { FileTypeWriterOptions::Parquet(p) => { @@ -404,7 +405,7 @@ async fn roundtrip_logical_plan_copy_to_arrow() -> Result<()> { input: Arc::new(input), output_url: "test.arrow".to_string(), file_format: FileType::ARROW, - partition_by: vec![], + partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()], copy_options: CopyOptions::WriterOptions(Box::new(FileTypeWriterOptions::Arrow( ArrowWriterOptions::new(), ))), @@ -418,6 +419,7 @@ async fn roundtrip_logical_plan_copy_to_arrow() -> Result<()> { LogicalPlan::Copy(copy_to) => { assert_eq!("test.arrow", copy_to.output_url); assert_eq!(FileType::ARROW, copy_to.file_format); + assert_eq!(vec!["a", "b", "c"], copy_to.partition_by); match ©_to.copy_options { CopyOptions::WriterOptions(y) => match y.as_ref() { FileTypeWriterOptions::Arrow(_) => {} @@ -450,7 +452,7 @@ async fn roundtrip_logical_plan_copy_to_csv() -> Result<()> { input: Arc::new(input), output_url: "test.csv".to_string(), file_format: FileType::CSV, - partition_by: vec![], + partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()], copy_options: CopyOptions::WriterOptions(Box::new(FileTypeWriterOptions::CSV( CsvWriterOptions::new( writer_properties, @@ -467,6 +469,7 @@ async fn roundtrip_logical_plan_copy_to_csv() -> Result<()> { LogicalPlan::Copy(copy_to) => { assert_eq!("test.csv", copy_to.output_url); assert_eq!(FileType::CSV, copy_to.file_format); + assert_eq!(vec!["a", "b", "c"], copy_to.partition_by); match ©_to.copy_options { CopyOptions::WriterOptions(y) => match y.as_ref() { FileTypeWriterOptions::CSV(p) => {