From d44c7f2ac31100c3d66685336813e195edc4357f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=9E=97=E4=BC=9F?= Date: Tue, 25 Jun 2024 13:59:50 +0800 Subject: [PATCH] Add composed extension codec example (#11095) * Add composed extension codec example * asf header * Update datafusion-examples/README.md Co-authored-by: Andrew Lamb * Update datafusion-examples/examples/composed_extension_codec.rs Co-authored-by: Andrew Lamb * Update datafusion-examples/examples/composed_extension_codec.rs Co-authored-by: Andrew Lamb * Update datafusion-examples/examples/composed_extension_codec.rs Co-authored-by: Andrew Lamb * Move main on the top --------- Co-authored-by: Andrew Lamb --- datafusion-examples/Cargo.toml | 1 + datafusion-examples/README.md | 1 + .../examples/composed_extension_codec.rs | 291 ++++++++++++++++++ 3 files changed, 293 insertions(+) create mode 100644 datafusion-examples/examples/composed_extension_codec.rs diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index c96aa7ae3951..52e3a5525717 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -64,6 +64,7 @@ datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } datafusion-optimizer = { workspace = true, default-features = true } datafusion-physical-expr = { workspace = true, default-features = true } +datafusion-proto = { workspace = true } datafusion-sql = { workspace = true } env_logger = { workspace = true } futures = { workspace = true } diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index f7f98c1d2a99..f78be6a398b5 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -48,6 +48,7 @@ cargo run --example csv_sql - [`advanced_parquet_index.rs`](examples/advanced_parquet_index.rs): Creates a detailed secondary index that covers the contents of several parquet files - [`avro_sql.rs`](examples/avro_sql.rs): Build and run a query plan from a SQL statement against a local AVRO file - [`catalog.rs`](examples/catalog.rs): Register the table into a custom catalog +- [`composed_extension_codec`](examples/composed_extension_codec.rs): Example of using multiple extension codecs for serialization / deserialization - [`csv_sql.rs`](examples/csv_sql.rs): Build and run a query plan from a SQL statement against a local CSV file - [`csv_sql_streaming.rs`](examples/csv_sql_streaming.rs): Build and run a streaming query plan from a SQL statement against a local CSV file - [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider) diff --git a/datafusion-examples/examples/composed_extension_codec.rs b/datafusion-examples/examples/composed_extension_codec.rs new file mode 100644 index 000000000000..43c6daba211a --- /dev/null +++ b/datafusion-examples/examples/composed_extension_codec.rs @@ -0,0 +1,291 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This example demonstrates how to compose multiple PhysicalExtensionCodecs +//! +//! This can be helpful when an Execution plan tree has different nodes from different crates +//! that need to be serialized. +//! +//! For example if your plan has `ShuffleWriterExec` from `datafusion-ballista` and `DeltaScan` from `deltalake` +//! both crates both provide PhysicalExtensionCodec and this example shows how to combine them together +//! +//! ```text +//! ShuffleWriterExec +//! ProjectionExec +//! ... +//! DeltaScan +//! ``` + +use datafusion::common::Result; +use datafusion::physical_plan::{DisplayAs, ExecutionPlan}; +use datafusion::prelude::SessionContext; +use datafusion_common::internal_err; +use datafusion_expr::registry::FunctionRegistry; +use datafusion_expr::ScalarUDF; +use datafusion_proto::physical_plan::{AsExecutionPlan, PhysicalExtensionCodec}; +use datafusion_proto::protobuf; +use std::any::Any; +use std::fmt::Debug; +use std::ops::Deref; +use std::sync::Arc; + +#[tokio::main] +async fn main() { + // build execution plan that has both types of nodes + // + // Note each node requires a different `PhysicalExtensionCodec` to decode + let exec_plan = Arc::new(ParentExec { + input: Arc::new(ChildExec {}), + }); + let ctx = SessionContext::new(); + + let composed_codec = ComposedPhysicalExtensionCodec { + codecs: vec![ + Arc::new(ParentPhysicalExtensionCodec {}), + Arc::new(ChildPhysicalExtensionCodec {}), + ], + }; + + // serialize execution plan to proto + let proto: protobuf::PhysicalPlanNode = + protobuf::PhysicalPlanNode::try_from_physical_plan( + exec_plan.clone(), + &composed_codec, + ) + .expect("to proto"); + + // deserialize proto back to execution plan + let runtime = ctx.runtime_env(); + let result_exec_plan: Arc = proto + .try_into_physical_plan(&ctx, runtime.deref(), &composed_codec) + .expect("from proto"); + + // assert that the original and deserialized execution plans are equal + assert_eq!(format!("{exec_plan:?}"), format!("{result_exec_plan:?}")); +} + +/// This example has two types of nodes: `ParentExec` and `ChildExec` which can only +/// be serialized with different `PhysicalExtensionCodec`s +#[derive(Debug)] +struct ParentExec { + input: Arc, +} + +impl DisplayAs for ParentExec { + fn fmt_as( + &self, + _t: datafusion::physical_plan::DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "ParentExec") + } +} + +impl ExecutionPlan for ParentExec { + fn name(&self) -> &str { + "ParentExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &datafusion::physical_plan::PlanProperties { + unreachable!() + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.input] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> Result> { + unreachable!() + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + unreachable!() + } +} + +/// A PhysicalExtensionCodec that can serialize and deserialize ParentExec +#[derive(Debug)] +struct ParentPhysicalExtensionCodec; + +impl PhysicalExtensionCodec for ParentPhysicalExtensionCodec { + fn try_decode( + &self, + buf: &[u8], + inputs: &[Arc], + _registry: &dyn FunctionRegistry, + ) -> Result> { + if buf == "ParentExec".as_bytes() { + Ok(Arc::new(ParentExec { + input: inputs[0].clone(), + })) + } else { + internal_err!("Not supported") + } + } + + fn try_encode(&self, node: Arc, buf: &mut Vec) -> Result<()> { + if node.as_any().downcast_ref::().is_some() { + buf.extend_from_slice("ParentExec".as_bytes()); + Ok(()) + } else { + internal_err!("Not supported") + } + } +} + +#[derive(Debug)] +struct ChildExec {} + +impl DisplayAs for ChildExec { + fn fmt_as( + &self, + _t: datafusion::physical_plan::DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "ChildExec") + } +} + +impl ExecutionPlan for ChildExec { + fn name(&self) -> &str { + "ChildExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &datafusion::physical_plan::PlanProperties { + unreachable!() + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> Result> { + unreachable!() + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + unreachable!() + } +} + +/// A PhysicalExtensionCodec that can serialize and deserialize ChildExec +#[derive(Debug)] +struct ChildPhysicalExtensionCodec; + +impl PhysicalExtensionCodec for ChildPhysicalExtensionCodec { + fn try_decode( + &self, + buf: &[u8], + _inputs: &[Arc], + _registry: &dyn FunctionRegistry, + ) -> Result> { + if buf == "ChildExec".as_bytes() { + Ok(Arc::new(ChildExec {})) + } else { + internal_err!("Not supported") + } + } + + fn try_encode(&self, node: Arc, buf: &mut Vec) -> Result<()> { + if node.as_any().downcast_ref::().is_some() { + buf.extend_from_slice("ChildExec".as_bytes()); + Ok(()) + } else { + internal_err!("Not supported") + } + } +} + +/// A PhysicalExtensionCodec that tries one of multiple inner codecs +/// until one works +#[derive(Debug)] +struct ComposedPhysicalExtensionCodec { + codecs: Vec>, +} + +impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec { + fn try_decode( + &self, + buf: &[u8], + inputs: &[Arc], + registry: &dyn FunctionRegistry, + ) -> Result> { + let mut last_err = None; + for codec in &self.codecs { + match codec.try_decode(buf, inputs, registry) { + Ok(plan) => return Ok(plan), + Err(e) => last_err = Some(e), + } + } + Err(last_err.unwrap()) + } + + fn try_encode(&self, node: Arc, buf: &mut Vec) -> Result<()> { + let mut last_err = None; + for codec in &self.codecs { + match codec.try_encode(node.clone(), buf) { + Ok(_) => return Ok(()), + Err(e) => last_err = Some(e), + } + } + Err(last_err.unwrap()) + } + + fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result> { + let mut last_err = None; + for codec in &self.codecs { + match codec.try_decode_udf(name, _buf) { + Ok(plan) => return Ok(plan), + Err(e) => last_err = Some(e), + } + } + Err(last_err.unwrap()) + } + + fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec) -> Result<()> { + let mut last_err = None; + for codec in &self.codecs { + match codec.try_encode_udf(_node, _buf) { + Ok(_) => return Ok(()), + Err(e) => last_err = Some(e), + } + } + Err(last_err.unwrap()) + } +}