Skip to content

Commit

Permalink
Add serde for Csv and Parquet sink
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Dec 24, 2023
1 parent bf43bb2 commit 5a48f78
Show file tree
Hide file tree
Showing 8 changed files with 689 additions and 5 deletions.
11 changes: 9 additions & 2 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ impl BatchSerializer for CsvSerializer {
}

/// Implements [`DataSink`] for writing to a CSV file.
struct CsvSink {
pub struct CsvSink {
/// Config options for writing data
config: FileSinkConfig,
}
Expand All @@ -474,9 +474,16 @@ impl DisplayAs for CsvSink {
}

impl CsvSink {
fn new(config: FileSinkConfig) -> Self {
/// Create from config.
pub fn new(config: FileSinkConfig) -> Self {
Self { config }
}

/// Retrieve the inner [`FileSinkConfig`].
pub fn config(&self) -> &FileSinkConfig {
&self.config
}

async fn multipartput_all(
&self,
data: SendableRecordBatchStream,
Expand Down
9 changes: 7 additions & 2 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ async fn fetch_statistics(
}

/// Implements [`DataSink`] for writing to a parquet file.
struct ParquetSink {
pub struct ParquetSink {
/// Config options for writing data
config: FileSinkConfig,
}
Expand All @@ -635,10 +635,15 @@ impl DisplayAs for ParquetSink {
}

impl ParquetSink {
fn new(config: FileSinkConfig) -> Self {
/// Create from config.
pub fn new(config: FileSinkConfig) -> Self {
Self { config }
}

/// Retrieve the inner [`FileSinkConfig`].
pub fn config(&self) -> &FileSinkConfig {
&self.config
}
/// Converts table schema to writer schema, which may differ in the case
/// of hive style partitioning where some columns are removed from the
/// underlying files.
Expand Down
24 changes: 24 additions & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1187,6 +1187,8 @@ message PhysicalPlanNode {
SymmetricHashJoinExecNode symmetric_hash_join = 25;
InterleaveExecNode interleave = 26;
PlaceholderRowExecNode placeholder_row = 27;
CsvSinkExecNode csv_sink = 28;
ParquetSinkExecNode parquet_sink = 29;
}
}

Expand Down Expand Up @@ -1252,6 +1254,28 @@ message JsonSinkExecNode {
PhysicalSortExprNodeCollection sort_order = 4;
}

message CsvSink {
FileSinkConfig config = 1;
}

message CsvSinkExecNode {
PhysicalPlanNode input = 1;
CsvSink sink = 2;
Schema sink_schema = 3;
PhysicalSortExprNodeCollection sort_order = 4;
}

message ParquetSink {
FileSinkConfig config = 1;
}

message ParquetSinkExecNode {
PhysicalPlanNode input = 1;
ParquetSink sink = 2;
Schema sink_schema = 3;
PhysicalSortExprNodeCollection sort_order = 4;
}

message PhysicalExtensionNode {
bytes node = 1;
repeated PhysicalPlanNode inputs = 2;
Expand Down
Loading

0 comments on commit 5a48f78

Please sign in to comment.