From 9c12919786be0cfce5c4817101a378669ba002e2 Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Sun, 9 Feb 2025 05:55:53 -0500 Subject: [PATCH] Update proto to support to/from json with an extension codec (#14561) #14560 --- datafusion/proto/src/bytes/mod.rs | 35 ++++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/datafusion/proto/src/bytes/mod.rs b/datafusion/proto/src/bytes/mod.rs index 12ddb4cb2e32..da01d89c0c3d 100644 --- a/datafusion/proto/src/bytes/mod.rs +++ b/datafusion/proto/src/bytes/mod.rs @@ -199,11 +199,7 @@ pub fn logical_plan_to_bytes(plan: &LogicalPlan) -> Result { #[cfg(feature = "json")] pub fn logical_plan_to_json(plan: &LogicalPlan) -> Result { let extension_codec = DefaultLogicalExtensionCodec {}; - let protobuf = - protobuf::LogicalPlanNode::try_from_logical_plan(plan, &extension_codec) - .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?; - serde_json::to_string(&protobuf) - .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}")) + logical_plan_to_json_with_extension_codec(plan, &extension_codec) } /// Serialize a LogicalPlan as bytes, using the provided extension codec @@ -220,13 +216,24 @@ pub fn logical_plan_to_bytes_with_extension_codec( Ok(buffer.into()) } +/// Serialize a LogicalPlan as JSON using the provided extension codec +#[cfg(feature = "json")] +pub fn logical_plan_to_json_with_extension_codec( + plan: &LogicalPlan, + extension_codec: &dyn LogicalExtensionCodec, +) -> Result { + let protobuf = + protobuf::LogicalPlanNode::try_from_logical_plan(plan, extension_codec) + .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?; + serde_json::to_string(&protobuf) + .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}")) +} + /// Deserialize a LogicalPlan from JSON #[cfg(feature = "json")] pub fn logical_plan_from_json(json: &str, ctx: &SessionContext) -> Result { - let back: protobuf::LogicalPlanNode = serde_json::from_str(json) - .map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?; let extension_codec = DefaultLogicalExtensionCodec {}; - back.try_into_logical_plan(ctx, &extension_codec) + logical_plan_from_json_with_extension_codec(json, ctx, &extension_codec) } /// Deserialize a LogicalPlan from bytes @@ -249,6 +256,18 @@ pub fn logical_plan_from_bytes_with_extension_codec( protobuf.try_into_logical_plan(ctx, extension_codec) } +/// Deserialize a LogicalPlan from JSON +#[cfg(feature = "json")] +pub fn logical_plan_from_json_with_extension_codec( + json: &str, + ctx: &SessionContext, + extension_codec: &dyn LogicalExtensionCodec, +) -> Result { + let back: protobuf::LogicalPlanNode = serde_json::from_str(json) + .map_err(|e| plan_datafusion_err!("Error deserializing plan: {e}"))?; + back.try_into_logical_plan(ctx, extension_codec) +} + /// Serialize a PhysicalPlan as bytes pub fn physical_plan_to_bytes(plan: Arc) -> Result { let extension_codec = DefaultPhysicalExtensionCodec {};