From d02043ccd1ee5c45ca7355083e2d746a1a5b1354 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 29 Dec 2023 10:55:41 -0700 Subject: [PATCH] add compression type to csv serde --- datafusion/proto/proto/datafusion.proto | 16 +++++++++------- datafusion/proto/src/generated/pbjson.rs | 19 +++++++++++++++++++ datafusion/proto/src/generated/prost.rs | 17 ++++++++++------- datafusion/proto/src/logical_plan/mod.rs | 9 +++++++-- .../proto/src/physical_plan/to_proto.rs | 9 +++------ 5 files changed, 48 insertions(+), 22 deletions(-) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index e2a47330ff1c..d5f8397aa30c 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1222,20 +1222,22 @@ message ParquetWriterOptions { } message CsvWriterOptions { + // Compression type + CompressionTypeVariant compression = 1; // Optional column delimiter. Defaults to `b','` - string delimiter = 1; + string delimiter = 2; // Whether to write column names as file headers. 
Defaults to `true` - bool has_header = 2; + bool has_header = 3; // Optional date format for date arrays - string date_format = 3; + string date_format = 4; // Optional datetime format for datetime arrays - string datetime_format = 4; + string datetime_format = 5; // Optional timestamp format for timestamp arrays - string timestamp_format = 5; + string timestamp_format = 6; // Optional time format for time arrays - string time_format = 6; + string time_format = 7; // Optional value to represent null - string null_value = 7; + string null_value = 8; } message WriterProperties { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index cfba929ebfe2..12e834d75adf 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -5394,6 +5394,9 @@ impl serde::Serialize for CsvWriterOptions { { use serde::ser::SerializeStruct; let mut len = 0; + if self.compression != 0 { + len += 1; + } if !self.delimiter.is_empty() { len += 1; } @@ -5416,6 +5419,11 @@ impl serde::Serialize for CsvWriterOptions { len += 1; } let mut struct_ser = serializer.serialize_struct("datafusion.CsvWriterOptions", len)?; + if self.compression != 0 { + let v = CompressionTypeVariant::try_from(self.compression) + .map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.compression)))?; + struct_ser.serialize_field("compression", &v)?; + } if !self.delimiter.is_empty() { struct_ser.serialize_field("delimiter", &self.delimiter)?; } @@ -5447,6 +5455,7 @@ impl<'de> serde::Deserialize<'de> for CsvWriterOptions { D: serde::Deserializer<'de>, { const FIELDS: &[&str] = &[ + "compression", "delimiter", "has_header", "hasHeader", @@ -5464,6 +5473,7 @@ impl<'de> serde::Deserialize<'de> for CsvWriterOptions { #[allow(clippy::enum_variant_names)] enum GeneratedField { + Compression, Delimiter, HasHeader, DateFormat, @@ -5492,6 +5502,7 @@ impl<'de> serde::Deserialize<'de> for CsvWriterOptions { E: 
serde::de::Error, { match value { + "compression" => Ok(GeneratedField::Compression), "delimiter" => Ok(GeneratedField::Delimiter), "hasHeader" | "has_header" => Ok(GeneratedField::HasHeader), "dateFormat" | "date_format" => Ok(GeneratedField::DateFormat), @@ -5518,6 +5529,7 @@ impl<'de> serde::Deserialize<'de> for CsvWriterOptions { where V: serde::de::MapAccess<'de>, { + let mut compression__ = None; let mut delimiter__ = None; let mut has_header__ = None; let mut date_format__ = None; @@ -5527,6 +5539,12 @@ impl<'de> serde::Deserialize<'de> for CsvWriterOptions { let mut null_value__ = None; while let Some(k) = map_.next_key()? { match k { + GeneratedField::Compression => { + if compression__.is_some() { + return Err(serde::de::Error::duplicate_field("compression")); + } + compression__ = Some(map_.next_value::<CompressionTypeVariant>()? as i32); + } GeneratedField::Delimiter => { if delimiter__.is_some() { return Err(serde::de::Error::duplicate_field("delimiter")); @@ -5572,6 +5590,7 @@ impl<'de> serde::Deserialize<'de> for CsvWriterOptions { } } Ok(CsvWriterOptions { + compression: compression__.unwrap_or_default(), delimiter: delimiter__.unwrap_or_default(), has_header: has_header__.unwrap_or_default(), date_format: date_format__.unwrap_or_default(), diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 819e2e2e73f6..4ee0b70325ca 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1677,26 +1677,29 @@ pub struct ParquetWriterOptions { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct CsvWriterOptions { + /// Compression type + #[prost(enumeration = "CompressionTypeVariant", tag = "1")] + pub compression: i32, /// Optional column delimiter. Defaults to `b','` - #[prost(string, tag = "1")] + #[prost(string, tag = "2")] pub delimiter: ::prost::alloc::string::String, /// Whether to write column names as file headers.
Defaults to `true` - #[prost(bool, tag = "2")] + #[prost(bool, tag = "3")] pub has_header: bool, /// Optional date format for date arrays - #[prost(string, tag = "3")] + #[prost(string, tag = "4")] pub date_format: ::prost::alloc::string::String, /// Optional datetime format for datetime arrays - #[prost(string, tag = "4")] + #[prost(string, tag = "5")] pub datetime_format: ::prost::alloc::string::String, /// Optional timestamp format for timestamp arrays - #[prost(string, tag = "5")] + #[prost(string, tag = "6")] pub timestamp_format: ::prost::alloc::string::String, /// Optional time format for time arrays - #[prost(string, tag = "6")] + #[prost(string, tag = "7")] pub time_format: ::prost::alloc::string::String, /// Optional value to represent null - #[prost(string, tag = "7")] + #[prost(string, tag = "8")] pub null_value: ::prost::alloc::string::String, } #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 966733f4b392..b4662910f7bd 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -1648,8 +1648,10 @@ impl AsLogicalPlan for LogicalPlanNode { match opt.as_ref() { FileTypeWriterOptions::CSV(csv_opts) => { let csv_options = &csv_opts.writer_options; - let csv_writer_options = - csv_writer_options_to_proto(csv_options); + let csv_writer_options = csv_writer_options_to_proto( + csv_options, + &CompressionTypeVariant::UNCOMPRESSED, + ); let csv_options = file_type_writer_options::FileType::CsvOptions( csv_writer_options, @@ -1706,8 +1708,11 @@ impl AsLogicalPlan for LogicalPlanNode { pub(crate) fn csv_writer_options_to_proto( csv_options: &WriterBuilder, + compression: &CompressionTypeVariant, ) -> protobuf::CsvWriterOptions { + let compression: protobuf::CompressionTypeVariant = compression.into(); protobuf::CsvWriterOptions { + compression: compression.into(), delimiter: (csv_options.delimiter() as char).to_string(), 
has_header: csv_options.header(), date_format: csv_options.date_format().unwrap_or("").to_owned(), diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 9239b2fe2348..f4e3f9e4dca7 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -908,12 +908,9 @@ impl TryFrom<&FileTypeWriterOptions> for protobuf::FileTypeWriterOptions { FileTypeWriterOptions::CSV(CsvWriterOptions { writer_options, compression, - }) => { - let _compression: protobuf::CompressionTypeVariant = compression.into(); - protobuf::file_type_writer_options::FileType::CsvOptions( - csv_writer_options_to_proto(writer_options), - ) - } + }) => protobuf::file_type_writer_options::FileType::CsvOptions( + csv_writer_options_to_proto(writer_options, compression), + ), FileTypeWriterOptions::JSON(JsonWriterOptions { compression }) => { let compression: protobuf::CompressionTypeVariant = compression.into(); protobuf::file_type_writer_options::FileType::JsonOptions(