From 8e69aef9734cb1e69f0bdd25aade64b0e85096ad Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 2 Jun 2023 16:14:05 +0800 Subject: [PATCH] feat: serialize/deserialize support for PromQL plans (#1684) * implement serializer Signed-off-by: Ruihang Xia * fix clippy and CR comments Signed-off-by: Ruihang Xia * fix compile error Signed-off-by: Ruihang Xia * register registry Signed-off-by: Ruihang Xia * enable promql plan for dist planner Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- Cargo.lock | 5 +- Cargo.toml | 1 + src/api/Cargo.toml | 2 +- src/meta-srv/src/service/router.rs | 2 +- src/promql/Cargo.toml | 2 + src/promql/src/error.rs | 9 +- src/promql/src/extension_plan/empty_metric.rs | 6 +- .../src/extension_plan/instant_manipulate.rs | 49 ++++++++- src/promql/src/extension_plan/normalize.rs | 37 ++++++- .../src/extension_plan/range_manipulate.rs | 42 ++++++- .../src/extension_plan/series_divide.rs | 34 +++++- src/query/Cargo.toml | 1 + src/query/src/dist_plan/commutativity.rs | 17 ++- src/query/src/extension_serializer.rs | 103 ++++++++++++++++++ src/query/src/lib.rs | 1 + src/query/src/query_engine/state.rs | 2 + .../standalone/{ => common}/tql/basic.result | 0 .../standalone/{ => common}/tql/basic.sql | 0 .../{ => common}/tql/literal_only.result | 0 .../{ => common}/tql/literal_only.sql | 0 20 files changed, 295 insertions(+), 18 deletions(-) create mode 100644 src/query/src/extension_serializer.rs rename tests/cases/standalone/{ => common}/tql/basic.result (100%) rename tests/cases/standalone/{ => common}/tql/basic.sql (100%) rename tests/cases/standalone/{ => common}/tql/literal_only.result (100%) rename tests/cases/standalone/{ => common}/tql/literal_only.sql (100%) diff --git a/Cargo.lock b/Cargo.lock index eb97b9395285..4fa808559b4d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4054,7 +4054,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=44c5adf34938d0650c18a14db2a374bdee471ae7#44c5adf34938d0650c18a14db2a374bdee471ae7" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ff0a47b6462bf196cbcd01b589c5dddfa6bfbc45#ff0a47b6462bf196cbcd01b589c5dddfa6bfbc45" dependencies = [ "prost", "serde", @@ -6686,7 +6686,9 @@ dependencies = [ "datafusion", "datatypes", "futures", + "greptime-proto", "promql-parser", + "prost", "query", "session", "snafu", @@ -6954,6 +6956,7 @@ dependencies = [ "format_num", "futures", "futures-util", + "greptime-proto", "humantime", "metrics", "num", diff --git a/Cargo.toml b/Cargo.toml index ac1654d77fc6..d4f669b3cae1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,6 +71,7 @@ datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "63e52dde9e44cac4b1f6c6e6b6bf6368ba3bd323" } futures = "0.3" futures-util = "0.3" +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ff0a47b6462bf196cbcd01b589c5dddfa6bfbc45" } parquet = "40.0" paste = "1.0" prost = "0.11" diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index fb9f7a08aba5..28780b2cddf0 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -10,7 +10,7 @@ common-base = { path = "../common/base" } common-error = { path = "../common/error" } common-time = { path = "../common/time" } datatypes = { path = "../datatypes" } -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "44c5adf34938d0650c18a14db2a374bdee471ae7" } +greptime-proto.workspace = true prost.workspace = true snafu = { version = "0.7", features = ["backtraces"] } tonic.workspace = true diff --git a/src/meta-srv/src/service/router.rs b/src/meta-srv/src/service/router.rs index d6f4bb2f63e4..4dfa22714eb0 100644 --- a/src/meta-srv/src/service/router.rs +++ b/src/meta-srv/src/service/router.rs @@ -138,7 +138,7 @@ async fn handle_create( return Ok(RouteResponse { header: Some(ResponseHeader::failed( cluster_id, - Error::not_enough_available_datanodes(partitions.len(), peers.len()), + Error::not_enough_active_datanodes(peers.len() as _), )), ..Default::default() }); diff --git a/src/promql/Cargo.toml b/src/promql/Cargo.toml index 5c2047dba826..c9df5fb37665 100644 --- a/src/promql/Cargo.toml +++ b/src/promql/Cargo.toml @@ -15,7 +15,9 @@ common-function-macro = { path = "../common/function-macro" } datafusion.workspace = true datatypes = { path = "../datatypes" } futures = "0.3" +greptime-proto.workspace = true promql-parser = "0.1.1" +prost.workspace = true session = { path = "../session" } snafu = { version = "0.7", features = ["backtraces"] } table = { path = "../table" } diff --git a/src/promql/src/error.rs b/src/promql/src/error.rs index b2d56048614b..360ab227cfd6 100644 --- a/src/promql/src/error.rs +++ b/src/promql/src/error.rs @@ -84,6 +84,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to deserialize: {}", source))] + Deserialize { + source: prost::DecodeError, + location: Location, + }, + #[snafu(display("Empty range is not expected, location: {}", location))] EmptyRange { location: Location }, @@ -120,7 +126,8 @@ impl ErrorExt for Error { | ExpectExpr { .. } | ExpectRangeSelector { .. } | ZeroRangeSelector { .. } - | ColumnNotFound { .. } => StatusCode::InvalidArguments, + | ColumnNotFound { .. } + | Deserialize { .. } => StatusCode::InvalidArguments, UnknownTable { .. } | DataFusionPlanning { .. } diff --git a/src/promql/src/extension_plan/empty_metric.rs b/src/promql/src/extension_plan/empty_metric.rs index 27e3d34063cb..dff1b4485261 100644 --- a/src/promql/src/extension_plan/empty_metric.rs +++ b/src/promql/src/extension_plan/empty_metric.rs @@ -84,6 +84,10 @@ impl EmptyMetric { }) } + pub const fn name() -> &'static str { + "EmptyMetric" + } + pub fn to_execution_plan( &self, session_state: &SessionState, @@ -110,7 +114,7 @@ impl EmptyMetric { impl UserDefinedLogicalNodeCore for EmptyMetric { fn name(&self) -> &str { - "EmptyMetric" + Self::name() } fn inputs(&self) -> Vec<&LogicalPlan> { diff --git a/src/promql/src/extension_plan/instant_manipulate.rs b/src/promql/src/extension_plan/instant_manipulate.rs index 991b3832508f..9abec5001f1c 100644 --- a/src/promql/src/extension_plan/instant_manipulate.rs +++ b/src/promql/src/extension_plan/instant_manipulate.rs @@ -21,10 +21,10 @@ use std::task::{Context, Poll}; use datafusion::arrow::array::{Array, Float64Array, TimestampMillisecondArray, UInt64Array}; use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::record_batch::RecordBatch; -use datafusion::common::DFSchemaRef; +use datafusion::common::{DFSchema, DFSchemaRef}; use datafusion::error::{DataFusionError, Result as DataFusionResult}; use datafusion::execution::context::TaskContext; -use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore}; +use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore}; use datafusion::physical_expr::PhysicalSortExpr; use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use datafusion::physical_plan::{ @@ -34,7 +34,11 @@ use datafusion::physical_plan::{ use datatypes::arrow::compute; use datatypes::arrow::error::Result as ArrowResult; use futures::{Stream, StreamExt}; +use greptime_proto::substrait_extension as pb; +use prost::Message; +use snafu::ResultExt; +use crate::error::{DeserializeSnafu, Result}; use crate::extension_plan::Millisecond; /// Manipulate the input record batch to make it suitable for Instant Operator. @@ -56,7 +60,7 @@ pub struct InstantManipulate { impl UserDefinedLogicalNodeCore for InstantManipulate { fn name(&self) -> &str { - "InstantManipulate" + Self::name() } fn inputs(&self) -> Vec<&LogicalPlan> { @@ -115,6 +119,10 @@ impl InstantManipulate { } } + pub const fn name() -> &'static str { + "InstantManipulate" + } + pub fn to_execution_plan(&self, exec_input: Arc) -> Arc { Arc::new(InstantManipulateExec { start: self.start, @@ -127,6 +135,41 @@ impl InstantManipulate { metric: ExecutionPlanMetricsSet::new(), }) } + + pub fn serialize(&self) -> Vec { + pb::InstantManipulate { + start: self.start, + end: self.end, + interval: self.interval, + lookback_delta: self.lookback_delta, + time_index: self.time_index_column.clone(), + field_index: self.field_column.clone().unwrap_or_default(), + } + .encode_to_vec() + } + + pub fn deserialize(bytes: &[u8]) -> Result { + let pb_instant_manipulate = + pb::InstantManipulate::decode(bytes).context(DeserializeSnafu)?; + let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: Arc::new(DFSchema::empty()), + }); + let field_column = if pb_instant_manipulate.field_index.is_empty() { + None + } else { + Some(pb_instant_manipulate.field_index) + }; + Ok(Self { + start: pb_instant_manipulate.start, + end: pb_instant_manipulate.end, + lookback_delta: pb_instant_manipulate.lookback_delta, + interval: pb_instant_manipulate.interval, + time_index_column: pb_instant_manipulate.time_index, + field_column, + input: placeholder_plan, + }) + } } #[derive(Debug)] diff --git a/src/promql/src/extension_plan/normalize.rs b/src/promql/src/extension_plan/normalize.rs index 636afdfd6d4a..de575a7ce25f 100644 --- a/src/promql/src/extension_plan/normalize.rs +++ b/src/promql/src/extension_plan/normalize.rs @@ -19,10 +19,10 @@ use std::task::{Context, Poll}; use datafusion::arrow::array::{BooleanArray, Float64Array}; use datafusion::arrow::compute; -use datafusion::common::{DFSchemaRef, Result as DataFusionResult, Statistics}; +use datafusion::common::{DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics}; use datafusion::error::DataFusionError; use datafusion::execution::context::TaskContext; -use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore}; +use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore}; use datafusion::physical_expr::PhysicalSortExpr; use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use datafusion::physical_plan::{ @@ -33,7 +33,11 @@ use datatypes::arrow::datatypes::SchemaRef; use datatypes::arrow::error::Result as ArrowResult; use datatypes::arrow::record_batch::RecordBatch; use futures::{Stream, StreamExt}; +use greptime_proto::substrait_extension as pb; +use prost::Message; +use snafu::ResultExt; +use crate::error::{DeserializeSnafu, Result}; use crate::extension_plan::Millisecond; /// Normalize the input record batch. Notice that for simplicity, this method assumes @@ -54,7 +58,7 @@ pub struct SeriesNormalize { impl UserDefinedLogicalNodeCore for SeriesNormalize { fn name(&self) -> &str { - "SeriesNormalize" + Self::name() } fn inputs(&self) -> Vec<&LogicalPlan> { @@ -104,6 +108,10 @@ impl SeriesNormalize { } } + pub const fn name() -> &'static str { + "SeriesNormalize" + } + pub fn to_execution_plan(&self, exec_input: Arc) -> Arc { Arc::new(SeriesNormalizeExec { offset: self.offset, @@ -113,6 +121,29 @@ impl SeriesNormalize { metric: ExecutionPlanMetricsSet::new(), }) } + + pub fn serialize(&self) -> Vec { + pb::SeriesNormalize { + offset: self.offset, + time_index: self.time_index_column_name.clone(), + filter_nan: self.need_filter_out_nan, + } + .encode_to_vec() + } + + pub fn deserialize(bytes: &[u8]) -> Result { + let pb_normalize = pb::SeriesNormalize::decode(bytes).context(DeserializeSnafu)?; + let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: Arc::new(DFSchema::empty()), + }); + Ok(Self::new( + pb_normalize.offset, + pb_normalize.time_index, + pb_normalize.filter_nan, + placeholder_plan, + )) + } } #[derive(Debug)] diff --git a/src/promql/src/extension_plan/range_manipulate.rs b/src/promql/src/extension_plan/range_manipulate.rs index abf55a309ade..de56269e5e17 100644 --- a/src/promql/src/extension_plan/range_manipulate.rs +++ b/src/promql/src/extension_plan/range_manipulate.rs @@ -26,7 +26,7 @@ use datafusion::arrow::record_batch::RecordBatch; use datafusion::common::{DFField, DFSchema, DFSchemaRef}; use datafusion::error::{DataFusionError, Result as DataFusionResult}; use datafusion::execution::context::TaskContext; -use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore}; +use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore}; use datafusion::physical_expr::PhysicalSortExpr; use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use datafusion::physical_plan::{ @@ -35,7 +35,11 @@ use datafusion::physical_plan::{ }; use datafusion::sql::TableReference; use futures::{Stream, StreamExt}; +use greptime_proto::substrait_extension as pb; +use prost::Message; +use snafu::ResultExt; +use crate::error::{DataFusionPlanningSnafu, DeserializeSnafu, Result}; use crate::extension_plan::Millisecond; use crate::range_array::RangeArray; @@ -85,6 +89,10 @@ impl RangeManipulate { }) } + pub const fn name() -> &'static str { + "RangeManipulate" + } + pub fn build_timestamp_range_name(time_index: &str) -> String { format!("{time_index}_range") } @@ -145,11 +153,41 @@ impl RangeManipulate { metric: ExecutionPlanMetricsSet::new(), }) } + + pub fn serialize(&self) -> Vec { + pb::RangeManipulate { + start: self.start, + end: self.end, + interval: self.interval, + range: self.range, + time_index: self.time_index.clone(), + tag_columns: self.field_columns.clone(), + } + .encode_to_vec() + } + + pub fn deserialize(bytes: &[u8]) -> Result { + let pb_range_manipulate = pb::RangeManipulate::decode(bytes).context(DeserializeSnafu)?; + let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: Arc::new(DFSchema::empty()), + }); + Self::new( + pb_range_manipulate.start, + pb_range_manipulate.end, + pb_range_manipulate.interval, + pb_range_manipulate.range, + pb_range_manipulate.time_index, + pb_range_manipulate.tag_columns, + placeholder_plan, + ) + .context(DataFusionPlanningSnafu) + } } impl UserDefinedLogicalNodeCore for RangeManipulate { fn name(&self) -> &str { - "RangeManipulate" + Self::name() } fn inputs(&self) -> Vec<&LogicalPlan> { diff --git a/src/promql/src/extension_plan/series_divide.rs b/src/promql/src/extension_plan/series_divide.rs index 55c2916b15e6..da5d3cd4ddc2 100644 --- a/src/promql/src/extension_plan/series_divide.rs +++ b/src/promql/src/extension_plan/series_divide.rs @@ -20,10 +20,10 @@ use std::task::{Context, Poll}; use datafusion::arrow::array::{Array, StringArray}; use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::record_batch::RecordBatch; -use datafusion::common::DFSchemaRef; +use datafusion::common::{DFSchema, DFSchemaRef}; use datafusion::error::Result as DataFusionResult; use datafusion::execution::context::TaskContext; -use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore}; +use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore}; use datafusion::physical_expr::PhysicalSortExpr; use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use datafusion::physical_plan::{ @@ -32,6 +32,11 @@ use datafusion::physical_plan::{ }; use datatypes::arrow::compute; use futures::{ready, Stream, StreamExt}; +use greptime_proto::substrait_extension as pb; +use prost::Message; +use snafu::ResultExt; + +use crate::error::{DeserializeSnafu, Result}; #[derive(Debug, PartialEq, Eq, Hash)] pub struct SeriesDivide { @@ -41,7 +46,7 @@ pub struct SeriesDivide { impl UserDefinedLogicalNodeCore for SeriesDivide { fn name(&self) -> &str { - "SeriesDivide" + Self::name() } fn inputs(&self) -> Vec<&LogicalPlan> { @@ -75,6 +80,10 @@ impl SeriesDivide { Self { tag_columns, input } } + pub const fn name() -> &'static str { + "SeriesDivide" + } + pub fn to_execution_plan(&self, exec_input: Arc) -> Arc { Arc::new(SeriesDivideExec { tag_columns: self.tag_columns.clone(), @@ -82,6 +91,25 @@ impl SeriesDivide { metric: ExecutionPlanMetricsSet::new(), }) } + + pub fn serialize(&self) -> Vec { + pb::SeriesDivide { + tag_columns: self.tag_columns.clone(), + } + .encode_to_vec() + } + + pub fn deserialize(bytes: &[u8]) -> Result { + let pb_series_divide = pb::SeriesDivide::decode(bytes).context(DeserializeSnafu)?; + let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: Arc::new(DFSchema::empty()), + }); + Ok(Self { + tag_columns: pb_series_divide.tag_columns, + input: placeholder_plan, + }) + } } #[derive(Debug)] diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index e953fbe7fbcd..400784201f92 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -32,6 +32,7 @@ datafusion-sql.workspace = true datatypes = { path = "../datatypes" } futures = "0.3" futures-util.workspace = true +greptime-proto.workspace = true humantime = "2.1" metrics.workspace = true object-store = { path = "../object-store" } diff --git a/src/query/src/dist_plan/commutativity.rs b/src/query/src/dist_plan/commutativity.rs index 51b4742ae599..82344a856c44 100644 --- a/src/query/src/dist_plan/commutativity.rs +++ b/src/query/src/dist_plan/commutativity.rs @@ -15,6 +15,9 @@ use std::sync::Arc; use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode}; +use promql::extension_plan::{ + EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize, +}; #[allow(dead_code)] pub enum Commutativity { @@ -69,8 +72,18 @@ impl Categorizer { } } - pub fn check_extension_plan(_plan: &dyn UserDefinedLogicalNode) -> Commutativity { - todo!("enumerate all the extension plans here") + pub fn check_extension_plan(plan: &dyn UserDefinedLogicalNode) -> Commutativity { + match plan.name() { + name if name == EmptyMetric::name() + || name == InstantManipulate::name() + || name == SeriesNormalize::name() + || name == RangeManipulate::name() + || name == SeriesDivide::name() => + { + Commutativity::Commutative + } + _ => Commutativity::Unsupported, + } } } diff --git a/src/query/src/extension_serializer.rs b/src/query/src/extension_serializer.rs new file mode 100644 index 000000000000..c94668a6c7d0 --- /dev/null +++ b/src/query/src/extension_serializer.rs @@ -0,0 +1,103 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use datafusion::error::Result; +use datafusion::execution::registry::SerializerRegistry; +use datafusion_common::DataFusionError; +use datafusion_expr::UserDefinedLogicalNode; +use promql::extension_plan::{ + EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize, +}; + +pub struct ExtensionSerializer; + +impl SerializerRegistry for ExtensionSerializer { + /// Serialize this node to a byte array. This serialization should not include + /// input plans. + fn serialize_logical_plan(&self, node: &dyn UserDefinedLogicalNode) -> Result> { + match node.name() { + name if name == InstantManipulate::name() => { + let instant_manipulate = node + .as_any() + .downcast_ref::() + .expect("Failed to downcast to InstantManipulate"); + Ok(instant_manipulate.serialize()) + } + name if name == SeriesNormalize::name() => { + let series_normalize = node + .as_any() + .downcast_ref::() + .expect("Failed to downcast to SeriesNormalize"); + Ok(series_normalize.serialize()) + } + name if name == RangeManipulate::name() => { + let range_manipulate = node + .as_any() + .downcast_ref::() + .expect("Failed to downcast to RangeManipulate"); + Ok(range_manipulate.serialize()) + } + name if name == SeriesDivide::name() => { + let series_divide = node + .as_any() + .downcast_ref::() + .expect("Failed to downcast to SeriesDivide"); + Ok(series_divide.serialize()) + } + name if name == EmptyMetric::name() => Err(DataFusionError::Substrait( + "EmptyMetric should not be serialized".to_string(), + )), + other => Err(DataFusionError::NotImplemented(format!( + "Serizlize logical plan for {}", + other + ))), + } + } + + /// Deserialize user defined logical plan node ([UserDefinedLogicalNode]) from + /// bytes. + fn deserialize_logical_plan( + &self, + name: &str, + bytes: &[u8], + ) -> Result> { + match name { + name if name == InstantManipulate::name() => { + let instant_manipulate = InstantManipulate::deserialize(bytes)?; + Ok(Arc::new(instant_manipulate)) + } + name if name == SeriesNormalize::name() => { + let series_normalize = SeriesNormalize::deserialize(bytes)?; + Ok(Arc::new(series_normalize)) + } + name if name == RangeManipulate::name() => { + let range_manipulate = RangeManipulate::deserialize(bytes)?; + Ok(Arc::new(range_manipulate)) + } + name if name == SeriesDivide::name() => { + let series_divide = SeriesDivide::deserialize(bytes)?; + Ok(Arc::new(series_divide)) + } + name if name == EmptyMetric::name() => Err(DataFusionError::Substrait( + "EmptyMetric should not be deserialized".to_string(), + )), + other => Err(DataFusionError::NotImplemented(format!( + "Deserialize logical plan for {}", + other + ))), + } + } +} diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index 7689a297261c..8405c00fe634 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -18,6 +18,7 @@ pub mod datafusion; pub mod dist_plan; pub mod error; pub mod executor; +pub mod extension_serializer; pub mod logical_optimizer; mod metrics; mod optimizer; diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index 051f40c014de..edf9ed1008cd 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -36,6 +36,7 @@ use partition::manager::PartitionRuleManager; use promql::extension_plan::PromExtensionPlanner; use crate::dist_plan::{DistExtensionPlanner, DistPlannerAnalyzer}; +use crate::extension_serializer::ExtensionSerializer; use crate::optimizer::order_hint::OrderHintRule; use crate::optimizer::type_conversion::TypeConversionRule; use crate::query_engine::options::QueryOptions; @@ -83,6 +84,7 @@ impl QueryEngineState { runtime_env, Arc::new(MemoryCatalogList::default()), // pass a dummy catalog list ) + .with_serializer_registry(Arc::new(ExtensionSerializer)) .with_analyzer_rules(analyzer.rules) .with_query_planner(Arc::new(DfQueryPlanner::new( partition_manager, diff --git a/tests/cases/standalone/tql/basic.result b/tests/cases/standalone/common/tql/basic.result similarity index 100% rename from tests/cases/standalone/tql/basic.result rename to tests/cases/standalone/common/tql/basic.result diff --git a/tests/cases/standalone/tql/basic.sql b/tests/cases/standalone/common/tql/basic.sql similarity index 100% rename from tests/cases/standalone/tql/basic.sql rename to tests/cases/standalone/common/tql/basic.sql diff --git a/tests/cases/standalone/tql/literal_only.result b/tests/cases/standalone/common/tql/literal_only.result similarity index 100% rename from tests/cases/standalone/tql/literal_only.result rename to tests/cases/standalone/common/tql/literal_only.result diff --git a/tests/cases/standalone/tql/literal_only.sql b/tests/cases/standalone/common/tql/literal_only.sql similarity index 100% rename from tests/cases/standalone/tql/literal_only.sql rename to tests/cases/standalone/common/tql/literal_only.sql