Skip to content

Commit

Permalink
Support CopyTo::partition_by in datafusion proto (apache#9306)
Browse files: browse the repository at this point in the history
* add support for CopyTo::partition_by in proto

Signed-off-by: Hoang Pham <[email protected]>

* simplify partition_by logic

Signed-off-by: Hoang Pham <[email protected]>

---------

Signed-off-by: Hoang Pham <[email protected]>
  • Loading branch information
PhVHoang authored Feb 25, 2024
1 parent ff36f6d commit 148b4d2
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 7 deletions.
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct CopyTo {
pub output_url: String,
/// The file format to output (explicitly defined or inferred from file extension)
pub file_format: FileType,
/// Detmines which, if any, columns should be used for hive-style partitioned writes
/// Determines which, if any, columns should be used for hive-style partitioned writes
pub partition_by: Vec<String>,
/// Arbitrary options as tuples
pub copy_options: CopyOptions,
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ message CopyToNode {
FileTypeWriterOptions writer_options = 5;
}
string file_type = 6;
repeated string partition_by = 7;
}

message SQLOptions {
Expand Down
18 changes: 18 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions datafusion/proto/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -913,12 +913,13 @@ impl AsLogicalPlan for LogicalPlanNode {
}
None => return Err(proto_error("CopyTo missing CopyOptions")),
};

Ok(datafusion_expr::LogicalPlan::Copy(
datafusion_expr::dml::CopyTo {
input: Arc::new(input),
output_url: copy.output_url.clone(),
file_format: FileType::from_str(&copy.file_type)?,
partition_by: vec![],
partition_by: copy.partition_by.clone(),
copy_options,
},
))
Expand Down Expand Up @@ -1642,7 +1643,7 @@ impl AsLogicalPlan for LogicalPlanNode {
output_url,
file_format,
copy_options,
partition_by: _,
partition_by,
}) => {
let input = protobuf::LogicalPlanNode::try_from_logical_plan(
input,
Expand Down Expand Up @@ -1726,6 +1727,7 @@ impl AsLogicalPlan for LogicalPlanNode {
output_url: output_url.to_string(),
file_type: file_format.to_string(),
copy_options: copy_options_proto,
partition_by: partition_by.clone(),
},
))),
})
Expand Down
11 changes: 7 additions & 4 deletions datafusion/proto/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ async fn roundtrip_logical_plan_copy_to_sql_options() -> Result<()> {
input: Arc::new(input),
output_url: "test.csv".to_string(),
file_format: FileType::CSV,
partition_by: vec![],
partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
copy_options: CopyOptions::SQLOptions(StatementOptions::from(&options)),
});

Expand Down Expand Up @@ -355,7 +355,7 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> {
input: Arc::new(input),
output_url: "test.parquet".to_string(),
file_format: FileType::PARQUET,
partition_by: vec![],
partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
copy_options: CopyOptions::WriterOptions(Box::new(
FileTypeWriterOptions::Parquet(ParquetWriterOptions::new(writer_properties)),
)),
Expand All @@ -369,6 +369,7 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> {
LogicalPlan::Copy(copy_to) => {
assert_eq!("test.parquet", copy_to.output_url);
assert_eq!(FileType::PARQUET, copy_to.file_format);
assert_eq!(vec!["a", "b", "c"], copy_to.partition_by);
match &copy_to.copy_options {
CopyOptions::WriterOptions(y) => match y.as_ref() {
FileTypeWriterOptions::Parquet(p) => {
Expand Down Expand Up @@ -404,7 +405,7 @@ async fn roundtrip_logical_plan_copy_to_arrow() -> Result<()> {
input: Arc::new(input),
output_url: "test.arrow".to_string(),
file_format: FileType::ARROW,
partition_by: vec![],
partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
copy_options: CopyOptions::WriterOptions(Box::new(FileTypeWriterOptions::Arrow(
ArrowWriterOptions::new(),
))),
Expand All @@ -418,6 +419,7 @@ async fn roundtrip_logical_plan_copy_to_arrow() -> Result<()> {
LogicalPlan::Copy(copy_to) => {
assert_eq!("test.arrow", copy_to.output_url);
assert_eq!(FileType::ARROW, copy_to.file_format);
assert_eq!(vec!["a", "b", "c"], copy_to.partition_by);
match &copy_to.copy_options {
CopyOptions::WriterOptions(y) => match y.as_ref() {
FileTypeWriterOptions::Arrow(_) => {}
Expand Down Expand Up @@ -450,7 +452,7 @@ async fn roundtrip_logical_plan_copy_to_csv() -> Result<()> {
input: Arc::new(input),
output_url: "test.csv".to_string(),
file_format: FileType::CSV,
partition_by: vec![],
partition_by: vec!["a".to_string(), "b".to_string(), "c".to_string()],
copy_options: CopyOptions::WriterOptions(Box::new(FileTypeWriterOptions::CSV(
CsvWriterOptions::new(
writer_properties,
Expand All @@ -467,6 +469,7 @@ async fn roundtrip_logical_plan_copy_to_csv() -> Result<()> {
LogicalPlan::Copy(copy_to) => {
assert_eq!("test.csv", copy_to.output_url);
assert_eq!(FileType::CSV, copy_to.file_format);
assert_eq!(vec!["a", "b", "c"], copy_to.partition_by);
match &copy_to.copy_options {
CopyOptions::WriterOptions(y) => match y.as_ref() {
FileTypeWriterOptions::CSV(p) => {
Expand Down

0 comments on commit 148b4d2

Please sign in to comment.