Commit 8e69aef

feat: serialize/deserialize support for PromQL plans (#1684)
* implement serializer

Signed-off-by: Ruihang Xia <[email protected]>

* fix clippy and CR comments

Signed-off-by: Ruihang Xia <[email protected]>

* fix compile error

Signed-off-by: Ruihang Xia <[email protected]>

* register registry

Signed-off-by: Ruihang Xia <[email protected]>

* enable promql plan for dist planner

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
waynexia authored Jun 2, 2023
1 parent 2615718 commit 8e69aef
Showing 20 changed files with 295 additions and 18 deletions.
5 changes: 4 additions & 1 deletion Cargo.lock

(generated file; diff not rendered)

1 change: 1 addition & 0 deletions Cargo.toml
@@ -71,6 +71,7 @@ datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "63e52dde9e44cac4b1f6c6e6b6bf6368ba3bd323" }
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ff0a47b6462bf196cbcd01b589c5dddfa6bfbc45" }
parquet = "40.0"
paste = "1.0"
prost = "0.11"
2 changes: 1 addition & 1 deletion src/api/Cargo.toml
@@ -10,7 +10,7 @@ common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-time = { path = "../common/time" }
datatypes = { path = "../datatypes" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "44c5adf34938d0650c18a14db2a374bdee471ae7" }
greptime-proto.workspace = true
prost.workspace = true
snafu = { version = "0.7", features = ["backtraces"] }
tonic.workspace = true
2 changes: 1 addition & 1 deletion src/meta-srv/src/service/router.rs
@@ -138,7 +138,7 @@ async fn handle_create(
return Ok(RouteResponse {
header: Some(ResponseHeader::failed(
cluster_id,
Error::not_enough_available_datanodes(partitions.len(), peers.len()),
Error::not_enough_active_datanodes(peers.len() as _),
)),
..Default::default()
});
2 changes: 2 additions & 0 deletions src/promql/Cargo.toml
@@ -15,7 +15,9 @@ common-function-macro = { path = "../common/function-macro" }
datafusion.workspace = true
datatypes = { path = "../datatypes" }
futures = "0.3"
greptime-proto.workspace = true
promql-parser = "0.1.1"
prost.workspace = true
session = { path = "../session" }
snafu = { version = "0.7", features = ["backtraces"] }
table = { path = "../table" }
9 changes: 8 additions & 1 deletion src/promql/src/error.rs
@@ -84,6 +84,12 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to deserialize: {}", source))]
Deserialize {
source: prost::DecodeError,
location: Location,
},

#[snafu(display("Empty range is not expected, location: {}", location))]
EmptyRange { location: Location },

@@ -120,7 +126,8 @@ impl ErrorExt for Error {
| ExpectExpr { .. }
| ExpectRangeSelector { .. }
| ZeroRangeSelector { .. }
| ColumnNotFound { .. } => StatusCode::InvalidArguments,
| ColumnNotFound { .. }
| Deserialize { .. } => StatusCode::InvalidArguments,

UnknownTable { .. }
| DataFusionPlanning { .. }
6 changes: 5 additions & 1 deletion src/promql/src/extension_plan/empty_metric.rs
@@ -84,6 +84,10 @@ impl EmptyMetric {
})
}

pub const fn name() -> &'static str {
"EmptyMetric"
}

pub fn to_execution_plan(
&self,
session_state: &SessionState,
@@ -110,7 +114,7 @@ impl EmptyMetric {

impl UserDefinedLogicalNodeCore for EmptyMetric {
fn name(&self) -> &str {
"EmptyMetric"
Self::name()
}

fn inputs(&self) -> Vec<&LogicalPlan> {
49 changes: 46 additions & 3 deletions src/promql/src/extension_plan/instant_manipulate.rs
@@ -21,10 +21,10 @@ use std::task::{Context, Poll};
use datafusion::arrow::array::{Array, Float64Array, TimestampMillisecondArray, UInt64Array};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::DFSchemaRef;
use datafusion::common::{DFSchema, DFSchemaRef};
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
@@ -34,7 +34,11 @@ use datafusion::physical_plan::{
use datatypes::arrow::compute;
use datatypes::arrow::error::Result as ArrowResult;
use futures::{Stream, StreamExt};
use greptime_proto::substrait_extension as pb;
use prost::Message;
use snafu::ResultExt;

use crate::error::{DeserializeSnafu, Result};
use crate::extension_plan::Millisecond;

/// Manipulate the input record batch to make it suitable for Instant Operator.
Expand All @@ -56,7 +60,7 @@ pub struct InstantManipulate {

impl UserDefinedLogicalNodeCore for InstantManipulate {
fn name(&self) -> &str {
"InstantManipulate"
Self::name()
}

fn inputs(&self) -> Vec<&LogicalPlan> {
@@ -115,6 +119,10 @@ impl InstantManipulate {
}
}

pub const fn name() -> &'static str {
"InstantManipulate"
}

pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
Arc::new(InstantManipulateExec {
start: self.start,
@@ -127,6 +135,41 @@ impl InstantManipulate {
metric: ExecutionPlanMetricsSet::new(),
})
}

pub fn serialize(&self) -> Vec<u8> {
pb::InstantManipulate {
start: self.start,
end: self.end,
interval: self.interval,
lookback_delta: self.lookback_delta,
time_index: self.time_index_column.clone(),
field_index: self.field_column.clone().unwrap_or_default(),
}
.encode_to_vec()
}

pub fn deserialize(bytes: &[u8]) -> Result<Self> {
let pb_instant_manipulate =
pb::InstantManipulate::decode(bytes).context(DeserializeSnafu)?;
let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::new(DFSchema::empty()),
});
let field_column = if pb_instant_manipulate.field_index.is_empty() {
None
} else {
Some(pb_instant_manipulate.field_index)
};
Ok(Self {
start: pb_instant_manipulate.start,
end: pb_instant_manipulate.end,
lookback_delta: pb_instant_manipulate.lookback_delta,
interval: pb_instant_manipulate.interval,
time_index_column: pb_instant_manipulate.time_index,
field_column,
input: placeholder_plan,
})
}
}

#[derive(Debug)]
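The new `serialize`/`deserialize` pair round-trips only the node's own parameters as a protobuf message; the child plan is intentionally dropped and replaced by an empty placeholder relation on decode, so the caller has to re-attach the real input before the node is usable. A minimal round-trip sketch, assuming it runs inside the `promql` crate (the helper below is illustrative, not part of this commit):

```rust
// Illustrative only: round-trip an InstantManipulate node through its new
// protobuf encoding. Assumes this lives inside the promql crate and that the
// node type is re-exported from crate::extension_plan.
use crate::error::Result;
use crate::extension_plan::InstantManipulate;

fn roundtrip_instant_manipulate(node: &InstantManipulate) -> Result<InstantManipulate> {
    // Encodes start/end/interval/lookback_delta and the column names only.
    let bytes = node.serialize();
    // The decoded node carries the same parameters, but its `input` is a
    // placeholder EmptyRelation; the real child plan must be re-attached
    // (e.g. when the distributed planner reassembles the plan tree).
    InstantManipulate::deserialize(&bytes)
}
```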
37 changes: 34 additions & 3 deletions src/promql/src/extension_plan/normalize.rs
@@ -19,10 +19,10 @@ use std::task::{Context, Poll};

use datafusion::arrow::array::{BooleanArray, Float64Array};
use datafusion::arrow::compute;
use datafusion::common::{DFSchemaRef, Result as DataFusionResult, Statistics};
use datafusion::common::{DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics};
use datafusion::error::DataFusionError;
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
@@ -33,7 +33,11 @@ use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::error::Result as ArrowResult;
use datatypes::arrow::record_batch::RecordBatch;
use futures::{Stream, StreamExt};
use greptime_proto::substrait_extension as pb;
use prost::Message;
use snafu::ResultExt;

use crate::error::{DeserializeSnafu, Result};
use crate::extension_plan::Millisecond;

/// Normalize the input record batch. Notice that for simplicity, this method assumes
@@ -54,7 +58,7 @@ pub struct SeriesNormalize {

impl UserDefinedLogicalNodeCore for SeriesNormalize {
fn name(&self) -> &str {
"SeriesNormalize"
Self::name()
}

fn inputs(&self) -> Vec<&LogicalPlan> {
@@ -104,6 +108,10 @@ impl SeriesNormalize {
}
}

pub const fn name() -> &'static str {
"SeriesNormalize"
}

pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
Arc::new(SeriesNormalizeExec {
offset: self.offset,
@@ -113,6 +121,29 @@ impl SeriesNormalize {
metric: ExecutionPlanMetricsSet::new(),
})
}

pub fn serialize(&self) -> Vec<u8> {
pb::SeriesNormalize {
offset: self.offset,
time_index: self.time_index_column_name.clone(),
filter_nan: self.need_filter_out_nan,
}
.encode_to_vec()
}

pub fn deserialize(bytes: &[u8]) -> Result<Self> {
let pb_normalize = pb::SeriesNormalize::decode(bytes).context(DeserializeSnafu)?;
let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::new(DFSchema::empty()),
});
Ok(Self::new(
pb_normalize.offset,
pb_normalize.time_index,
pb_normalize.filter_nan,
placeholder_plan,
))
}
}

#[derive(Debug)]
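Because the payload is a plain prost message from `greptime_proto::substrait_extension`, it can also be decoded and inspected without going through `SeriesNormalize::deserialize` at all. A small sketch (the `inspect_normalize` helper is illustrative, not part of this commit; it touches only the `offset`, `time_index`, and `filter_nan` fields visible in the diff above):

```rust
// Illustrative only: decode the bytes produced by SeriesNormalize::serialize
// directly with prost and print the encoded parameters.
use greptime_proto::substrait_extension as pb;
use prost::Message;

fn inspect_normalize(bytes: &[u8]) -> Result<(), prost::DecodeError> {
    let msg = pb::SeriesNormalize::decode(bytes)?;
    println!(
        "offset={} time_index={} filter_nan={}",
        msg.offset, msg.time_index, msg.filter_nan
    );
    Ok(())
}
```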
42 changes: 40 additions & 2 deletions src/promql/src/extension_plan/range_manipulate.rs
@@ -26,7 +26,7 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::{DFField, DFSchema, DFSchemaRef};
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
@@ -35,7 +35,11 @@ use datafusion::physical_plan::{
};
use datafusion::sql::TableReference;
use futures::{Stream, StreamExt};
use greptime_proto::substrait_extension as pb;
use prost::Message;
use snafu::ResultExt;

use crate::error::{DataFusionPlanningSnafu, DeserializeSnafu, Result};
use crate::extension_plan::Millisecond;
use crate::range_array::RangeArray;

@@ -85,6 +89,10 @@ impl RangeManipulate {
})
}

pub const fn name() -> &'static str {
"RangeManipulate"
}

pub fn build_timestamp_range_name(time_index: &str) -> String {
format!("{time_index}_range")
}
@@ -145,11 +153,41 @@ impl RangeManipulate {
metric: ExecutionPlanMetricsSet::new(),
})
}

pub fn serialize(&self) -> Vec<u8> {
pb::RangeManipulate {
start: self.start,
end: self.end,
interval: self.interval,
range: self.range,
time_index: self.time_index.clone(),
tag_columns: self.field_columns.clone(),
}
.encode_to_vec()
}

pub fn deserialize(bytes: &[u8]) -> Result<Self> {
let pb_range_manipulate = pb::RangeManipulate::decode(bytes).context(DeserializeSnafu)?;
let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::new(DFSchema::empty()),
});
Self::new(
pb_range_manipulate.start,
pb_range_manipulate.end,
pb_range_manipulate.interval,
pb_range_manipulate.range,
pb_range_manipulate.time_index,
pb_range_manipulate.tag_columns,
placeholder_plan,
)
.context(DataFusionPlanningSnafu)
}
}

impl UserDefinedLogicalNodeCore for RangeManipulate {
fn name(&self) -> &str {
"RangeManipulate"
Self::name()
}

fn inputs(&self) -> Vec<&LogicalPlan> {
(diffs for the remaining changed files are not rendered)
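Each node now exposes a `const fn name()` that the `UserDefinedLogicalNodeCore` impl and the serializer share, which is what the "register registry" step in the commit message builds on: the stable name identifies which deserializer to run when a distributed plan is decoded. The registration code itself is among the files not rendered above, so the dispatcher below is only a hypothetical sketch of how those names could key deserialization (the function and module paths are assumptions, not code from this commit):

```rust
// Hypothetical sketch: rebuild a PromQL extension node from its serialized
// bytes by dispatching on the stable node names. Assumes it runs inside the
// promql crate and that DataFusion's blanket impl turns these
// UserDefinedLogicalNodeCore types into UserDefinedLogicalNode trait objects.
use std::sync::Arc;

use datafusion::logical_expr::UserDefinedLogicalNode;

use crate::error::Result;
use crate::extension_plan::{InstantManipulate, RangeManipulate, SeriesNormalize};

fn decode_extension_node(
    name: &str,
    bytes: &[u8],
) -> Option<Result<Arc<dyn UserDefinedLogicalNode>>> {
    if name == InstantManipulate::name() {
        let node = InstantManipulate::deserialize(bytes);
        Some(node.map(|n| Arc::new(n) as Arc<dyn UserDefinedLogicalNode>))
    } else if name == SeriesNormalize::name() {
        let node = SeriesNormalize::deserialize(bytes);
        Some(node.map(|n| Arc::new(n) as Arc<dyn UserDefinedLogicalNode>))
    } else if name == RangeManipulate::name() {
        let node = RangeManipulate::deserialize(bytes);
        Some(node.map(|n| Arc::new(n) as Arc<dyn UserDefinedLogicalNode>))
    } else {
        // Unknown extension name: let the caller decide how to report it.
        None
    }
}
```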
