diff --git a/Cargo.lock b/Cargo.lock index 8469f5696a85..64e03cd10401 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -93,7 +93,7 @@ name = "api" version = "0.1.0" dependencies = [ "datatypes", - "prost", + "prost 0.11.0", "snafu", "tonic", "tonic-build", @@ -1040,8 +1040,8 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e57ff02e8ad8e06ab9731d5dc72dc23bef9200778eae1a89d555d8c42e5d4a86" dependencies = [ - "prost", - "prost-types", + "prost 0.11.0", + "prost-types 0.11.1", "tonic", "tracing-core", ] @@ -1058,7 +1058,7 @@ dependencies = [ "futures", "hdrhistogram", "humantime", - "prost-types", + "prost-types 0.11.1", "serde", "serde_json", "thread_local", @@ -1628,7 +1628,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1259da3b15ec7e54bd7203adb2c4335adb9ca1d47b56220d650e52c247e824a" dependencies = [ "http", - "prost", + "prost 0.11.0", "tokio", "tokio-stream", "tonic", @@ -2504,10 +2504,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "logical-plans" -version = "0.1.0" - [[package]] name = "lru" version = "0.7.8" @@ -3610,6 +3606,16 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "444879275cb4fd84958b1a1d5420d15e6fcf7c235fe47f053c9c2a80aceb6001" +dependencies = [ + "bytes", + "prost-derive 0.9.0", +] + [[package]] name = "prost" version = "0.11.0" @@ -3617,7 +3623,27 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "399c3c31cdec40583bb68f0b18403400d01ec4289c383aa047560439952c4dd7" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.11.0", +] + +[[package]] +name = "prost-build" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5" +dependencies = [ + "bytes", + "heck 0.3.3", + "itertools", + "lazy_static", + "log", + "multimap", + "petgraph", + "prost 0.9.0", + "prost-types 0.9.0", + "regex", + "tempfile", + "which", ] [[package]] @@ -3633,13 +3659,26 @@ dependencies = [ "log", "multimap", "petgraph", - "prost", - "prost-types", + "prost 0.11.0", + "prost-types 0.11.1", "regex", "tempfile", "which", ] +[[package]] +name = "prost-derive" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9cc1a3263e07e0bf68e96268f37665207b49560d98739662cdfaae215c720fe" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "prost-derive" version = "0.11.0" @@ -3653,6 +3692,16 @@ dependencies = [ "syn", ] +[[package]] +name = "prost-types" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "534b7a0e836e3c482d2693070f982e39e7611da9695d4d1f5a4b186b51faef0a" +dependencies = [ + "bytes", + "prost 0.9.0", +] + [[package]] name = "prost-types" version = "0.11.1" @@ -3660,7 +3709,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dfaa718ad76a44b3415e6c4d53b17c8f99160dcb3a99b10470fce8ad43f6e3e" dependencies = [ "bytes", - "prost", + "prost 0.11.0", ] [[package]] @@ -4844,7 +4893,7 @@ dependencies = [ "object-store", "paste", "planus", - "prost", + "prost 0.11.0", "rand 0.8.5", "regex", "serde", @@ -4986,6 +5035,35 @@ dependencies = [ "winapi", ] +[[package]] +name = "substrait" +version = "0.1.0" +dependencies = [ + "bytes", + "catalog", + "common-error", + "datafusion", + "datatypes", + "futures", + "prost 0.9.0", + "snafu", + "substrait 0.2.0", + "table", + "tokio", +] + +[[package]] +name = "substrait" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46079e9004f5e069eae2976d4e23ea29c4e215b1096d3d53b76b19879f346100" +dependencies = [ + "glob", + "prost 0.9.0", + "prost-build 0.9.0", + "prost-types 0.9.0", +] + [[package]] name = "subtle" version = "2.4.1" @@ -5412,8 +5490,8 @@ dependencies = [ "hyper-timeout", "percent-encoding", "pin-project", - "prost", - "prost-derive", + "prost 0.11.0", + "prost-derive 0.11.0", "tokio", "tokio-stream", "tokio-util", @@ -5432,7 +5510,7 @@ checksum = "2fbcd2800e34e743b9ae795867d5f77b535d3a3be69fd731e39145719752df8c" dependencies = [ "prettyplease", "proc-macro2", - "prost-build", + "prost-build 0.11.1", "quote", "syn", ] diff --git a/Cargo.toml b/Cargo.toml index 8b4fbec25fd6..379350be29a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,13 +12,13 @@ members = [ "src/common/query", "src/common/recordbatch", "src/common/runtime", + "src/common/substrait", "src/common/telemetry", "src/common/time", "src/datanode", "src/datatypes", "src/frontend", "src/log-store", - "src/logical-plans", "src/meta-client", "src/meta-srv", "src/object-store", diff --git a/src/common/grpc/src/physical/plan.rs b/src/common/grpc/src/physical/plan.rs index dd12183cd30d..6914c833dc0a 100644 --- a/src/common/grpc/src/physical/plan.rs +++ b/src/common/grpc/src/physical/plan.rs @@ -184,7 +184,7 @@ impl ExecutionPlan for MockExecution { _runtime: Arc, ) -> datafusion::error::Result { let id_array = Arc::new(PrimitiveArray::from_slice([1u32, 2, 3, 4, 5])); - let name_array = Arc::new(Utf8Array::::from_slice([ + let name_array = Arc::new(Utf8Array::::from_slice([ "zhangsan", "lisi", "wangwu", "Tony", "Mike", ])); let age_array = Arc::new(PrimitiveArray::from_slice([25u32, 28, 27, 35, 25])); diff --git a/src/common/substrait/Cargo.toml b/src/common/substrait/Cargo.toml new file mode 100644 index 000000000000..960a21e5415d --- /dev/null +++ b/src/common/substrait/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "substrait" +version = "0.1.0" +edition = "2021" + +[dependencies] +bytes = "1.1" +catalog = { path = "../../catalog" } +common-error = { path = "../error" } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [ + "simd", +] } +futures = "0.3" +prost = "0.9" +snafu = { version = "0.7", features = ["backtraces"] } +table = { path = "../../table" } + +[dependencies.substrait_proto] +package = "substrait" +version = "0.2" + +[dev-dependencies] +datatypes = { path = "../../datatypes" } +table = { path = "../../table" } +tokio = { version = "1.0", features = ["full"] } diff --git a/src/common/substrait/src/df_logical.rs b/src/common/substrait/src/df_logical.rs new file mode 100644 index 000000000000..d9881603d444 --- /dev/null +++ b/src/common/substrait/src/df_logical.rs @@ -0,0 +1,360 @@ +use std::sync::Arc; + +use bytes::{Buf, Bytes, BytesMut}; +use catalog::CatalogManagerRef; +use common_error::prelude::BoxedError; +use datafusion::datasource::TableProvider; +use datafusion::logical_plan::{LogicalPlan, TableScan, ToDFSchema}; +use prost::Message; +use snafu::ensure; +use snafu::{OptionExt, ResultExt}; +use substrait_proto::protobuf::plan_rel::RelType as PlanRelType; +use substrait_proto::protobuf::read_rel::{NamedTable, ReadType}; +use substrait_proto::protobuf::rel::RelType; +use substrait_proto::protobuf::PlanRel; +use substrait_proto::protobuf::ReadRel; +use substrait_proto::protobuf::Rel; +use table::table::adapter::DfTableProviderAdapter; + +use crate::error::Error; +use crate::error::{ + DFInternalSnafu, DecodeRelSnafu, EmptyPlanSnafu, EncodeRelSnafu, InternalSnafu, + InvalidParametersSnafu, MissingFieldSnafu, TableNotFoundSnafu, UnknownPlanSnafu, + UnsupportedExprSnafu, UnsupportedPlanSnafu, +}; +use crate::SubstraitPlan; + +pub struct DFLogicalSubstraitConvertor { + catalog_manager: CatalogManagerRef, +} + +impl SubstraitPlan for DFLogicalSubstraitConvertor { + type Error = Error; + + type Plan = LogicalPlan; + + fn decode(&self, message: B) -> Result { + let plan_rel = PlanRel::decode(message).context(DecodeRelSnafu)?; + let rel = match plan_rel.rel_type.context(EmptyPlanSnafu)? { + PlanRelType::Rel(rel) => rel, + PlanRelType::Root(_) => UnsupportedPlanSnafu { + name: "Root Relation", + } + .fail()?, + }; + self.convert_rel(rel) + } + + fn encode(&self, plan: Self::Plan) -> Result { + let rel = self.convert_plan(plan)?; + + let mut buf = BytesMut::new(); + rel.encode(&mut buf).context(EncodeRelSnafu)?; + + Ok(buf.freeze()) + } +} + +impl DFLogicalSubstraitConvertor { + pub fn new(catalog_manager: CatalogManagerRef) -> Self { + Self { catalog_manager } + } +} + +impl DFLogicalSubstraitConvertor { + pub fn convert_rel(&self, rel: Rel) -> Result { + let rel_type = rel.rel_type.context(EmptyPlanSnafu)?; + let logical_plan = match rel_type { + RelType::Read(read_rel) => self.convert_read_rel(read_rel), + RelType::Filter(_filter_rel) => UnsupportedPlanSnafu { + name: "Filter Relation", + } + .fail()?, + RelType::Fetch(_fetch_rel) => UnsupportedPlanSnafu { + name: "Fetch Relation", + } + .fail()?, + RelType::Aggregate(_aggr_rel) => UnsupportedPlanSnafu { + name: "Fetch Relation", + } + .fail()?, + RelType::Sort(_sort_rel) => UnsupportedPlanSnafu { + name: "Sort Relation", + } + .fail()?, + RelType::Join(_join_rel) => UnsupportedPlanSnafu { + name: "Join Relation", + } + .fail()?, + RelType::Project(_project_rel) => UnsupportedPlanSnafu { + name: "Project Relation", + } + .fail()?, + RelType::Set(_set_rel) => UnsupportedPlanSnafu { + name: "Set Relation", + } + .fail()?, + RelType::ExtensionSingle(_ext_single_rel) => UnsupportedPlanSnafu { + name: "Extension Single Relation", + } + .fail()?, + RelType::ExtensionMulti(_ext_multi_rel) => UnsupportedPlanSnafu { + name: "Extension Multi Relation", + } + .fail()?, + RelType::ExtensionLeaf(_ext_leaf_rel) => UnsupportedPlanSnafu { + name: "Extension Leaf Relation", + } + .fail()?, + RelType::Cross(_cross_rel) => UnsupportedPlanSnafu { + name: "Cross Relation", + } + .fail()?, + }?; + + Ok(logical_plan) + } + + fn convert_read_rel(&self, read_rel: Box) -> Result { + // Extract the catalog, schema and table name from NamedTable. Assume the first three are those names. + + let read_type = read_rel.read_type.context(MissingFieldSnafu { + field: "read_type", + plan: "Read", + })?; + let (table_name, schema_name, catalog_name) = match read_type { + ReadType::NamedTable(mut named_table) => { + ensure!( + named_table.names.len() == 3, + InvalidParametersSnafu { + reason: + "NamedTable should contains three names for catalog, schema and table", + } + ); + ( + named_table.names.pop().unwrap(), + named_table.names.pop().unwrap(), + named_table.names.pop().unwrap(), + ) + } + ReadType::VirtualTable(_) | ReadType::LocalFiles(_) | ReadType::ExtensionTable(_) => { + UnsupportedExprSnafu { + name: "Non-NamedTable Read", + } + .fail()? + } + }; + + // Get table handle from catalog manager + let table_ref = self + .catalog_manager + .table(Some(&catalog_name), Some(&schema_name), &table_name) + .map_err(BoxedError::new) + .context(InternalSnafu)? + .context(TableNotFoundSnafu { + name: format!("{}.{}.{}", catalog_name, schema_name, table_name), + })?; + let adapter = Arc::new(DfTableProviderAdapter::new(table_ref)); + // Get schema direct from the table. + // TODO(ruihang): Maybe need to verify the schema with the one in Substrait? + let schema = adapter + .schema() + .to_dfschema_ref() + .context(DFInternalSnafu)?; + + // TODO(ruihang): Support projection, filters and limit + Ok(LogicalPlan::TableScan(TableScan { + table_name, + source: adapter, + projection: None, + projected_schema: schema, + filters: vec![], + limit: None, + })) + } +} + +impl DFLogicalSubstraitConvertor { + pub fn convert_plan(&self, plan: LogicalPlan) -> Result { + match plan { + LogicalPlan::Projection(_) => UnsupportedPlanSnafu { + name: "DataFusion Logical Projection", + } + .fail()?, + LogicalPlan::Filter(_) => UnsupportedPlanSnafu { + name: "DataFusion Logical Projection", + } + .fail()?, + LogicalPlan::Window(_) => UnsupportedPlanSnafu { + name: "DataFusion Logical Projection", + } + .fail()?, + LogicalPlan::Aggregate(_) => UnsupportedPlanSnafu { + name: "DataFusion Logical Projection", + } + .fail()?, + LogicalPlan::Sort(_) => UnsupportedPlanSnafu { + name: "DataFusion Logical Projection", + } + .fail()?, + LogicalPlan::Join(_) => UnsupportedPlanSnafu { + name: "DataFusion Logical Projection", + } + .fail()?, + LogicalPlan::CrossJoin(_) => UnsupportedPlanSnafu { + name: "DataFusion Logical Projection", + } + .fail()?, + LogicalPlan::Repartition(_) => UnsupportedPlanSnafu { + name: "DataFusion Logical Projection", + } + .fail()?, + LogicalPlan::Union(_) => UnsupportedPlanSnafu { + name: "DataFusion Logical Projection", + } + .fail()?, + LogicalPlan::TableScan(table_scan) => { + let read_rel = self.convert_table_scan_plan(table_scan)?; + Ok(Rel { + rel_type: Some(RelType::Read(Box::new(read_rel))), + }) + } + LogicalPlan::EmptyRelation(_) => UnsupportedPlanSnafu { + name: "DataFusion Logical Projection", + } + .fail()?, + LogicalPlan::Limit(_) => UnsupportedPlanSnafu { + name: "DataFusion Logical Projection", + } + .fail()?, + LogicalPlan::CreateExternalTable(_) + | LogicalPlan::CreateMemoryTable(_) + | LogicalPlan::DropTable(_) + | LogicalPlan::Values(_) + | LogicalPlan::Explain(_) + | LogicalPlan::Analyze(_) + | LogicalPlan::Extension(_) => InvalidParametersSnafu { + reason: format!( + "Trying to convert DDL/DML plan to substrait proto, plan: {:?}", + plan + ), + } + .fail()?, + } + } + + pub fn convert_table_scan_plan(&self, table_scan: TableScan) -> Result { + let provider = table_scan + .source + .as_any() + .downcast_ref::() + .context(UnknownPlanSnafu)?; + let table_info = provider.table().table_info(); + + let catalog_name = table_info.catalog_name.clone(); + let schema_name = table_info.schema_name.clone(); + let table_name = table_info.name.clone(); + + let named_table = NamedTable { + names: vec![catalog_name, schema_name, table_name], + advanced_extension: None, + }; + let read_type = ReadType::NamedTable(named_table); + + let read_rel = ReadRel { + common: None, + base_schema: None, + filter: None, + projection: None, + advanced_extension: None, + read_type: Some(read_type), + }; + + Ok(read_rel) + } +} + +#[cfg(test)] +mod test { + use catalog::{ + memory::{MemoryCatalogProvider, MemorySchemaProvider}, + CatalogList, CatalogProvider, LocalCatalogManager, RegisterTableRequest, + DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, + }; + use datatypes::schema::Schema; + use table::{requests::CreateTableRequest, test_util::EmptyTable, test_util::MockTableEngine}; + + use super::*; + + const DEFAULT_TABLE_NAME: &str = "SubstraitTable"; + + async fn build_mock_catalog_manager() -> CatalogManagerRef { + let mock_table_engine = Arc::new(MockTableEngine::new()); + let catalog_manager = Arc::new( + LocalCatalogManager::try_new(mock_table_engine) + .await + .unwrap(), + ); + let schema_provider = Arc::new(MemorySchemaProvider::new()); + let catalog_provider = Arc::new(MemoryCatalogProvider::new()); + catalog_provider.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider); + catalog_manager.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider); + + catalog_manager.init().await.unwrap(); + catalog_manager + } + + fn build_create_table_request(table_name: N) -> CreateTableRequest { + CreateTableRequest { + id: 1, + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: table_name.to_string(), + desc: None, + schema: Arc::new(Schema::new(vec![])), + primary_key_indices: vec![], + create_if_not_exists: true, + table_options: Default::default(), + } + } + + async fn logical_plan_round_trip(plan: LogicalPlan, catalog: CatalogManagerRef) { + let convertor = DFLogicalSubstraitConvertor::new(catalog); + + let rel = convertor.convert_plan(plan.clone()).unwrap(); + let tripped_plan = convertor.convert_rel(rel).unwrap(); + + assert_eq!(format!("{:?}", plan), format!("{:?}", tripped_plan)); + } + + #[tokio::test] + async fn test_bare_table_scan() { + let catalog_manager = build_mock_catalog_manager().await; + let table_ref = Arc::new(EmptyTable::new(build_create_table_request( + DEFAULT_TABLE_NAME, + ))); + catalog_manager + .register_table(RegisterTableRequest { + catalog: Some(DEFAULT_CATALOG_NAME.to_string()), + schema: Some(DEFAULT_SCHEMA_NAME.to_string()), + table_name: DEFAULT_TABLE_NAME.to_string(), + table_id: 1, + table: table_ref.clone(), + }) + .await + .unwrap(); + let adapter = Arc::new(DfTableProviderAdapter::new(table_ref)); + let schema = adapter.schema().to_dfschema_ref().unwrap(); + + let table_scan_plan = LogicalPlan::TableScan(TableScan { + table_name: DEFAULT_TABLE_NAME.to_string(), + source: adapter, + projection: None, + projected_schema: schema, + filters: vec![], + limit: None, + }); + + logical_plan_round_trip(table_scan_plan, catalog_manager).await; + } +} diff --git a/src/common/substrait/src/error.rs b/src/common/substrait/src/error.rs new file mode 100644 index 000000000000..6cebc1902032 --- /dev/null +++ b/src/common/substrait/src/error.rs @@ -0,0 +1,92 @@ +use std::any::Any; + +use common_error::prelude::{BoxedError, ErrorExt, StatusCode}; +use datafusion::error::DataFusionError; +use prost::{DecodeError, EncodeError}; +use snafu::{Backtrace, ErrorCompat, Snafu}; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Unsupported physical expr: {}", name))] + UnsupportedPlan { name: String, backtrace: Backtrace }, + + #[snafu(display("Unsupported physical plan: {}", name))] + UnsupportedExpr { name: String, backtrace: Backtrace }, + + #[snafu(display("Failed to decode substrait relation, source: {}", source))] + DecodeRel { + source: DecodeError, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to encode substrait relation, source: {}", source))] + EncodeRel { + source: EncodeError, + backtrace: Backtrace, + }, + + #[snafu(display("Input plan is empty"))] + EmptyPlan { backtrace: Backtrace }, + + #[snafu(display("Input expression is empty"))] + EmptyExpr { backtrace: Backtrace }, + + #[snafu(display("Missing required field in protobuf, field: {}, plan: {}", field, plan))] + MissingField { + field: String, + plan: String, + backtrace: Backtrace, + }, + + #[snafu(display("Invalid parameters: {}", reason))] + InvalidParameters { + reason: String, + backtrace: Backtrace, + }, + + #[snafu(display("Internal error from DataFusion: {}", source))] + DFInternal { + source: DataFusionError, + backtrace: Backtrace, + }, + + #[snafu(display("Internal error: {}", source))] + Internal { + #[snafu(backtrace)] + source: BoxedError, + }, + + #[snafu(display("Table quering not found: {}", name))] + TableNotFound { name: String, backtrace: Backtrace }, + + #[snafu(display("Cannot convert plan doesn't belong to GrepTimeDB"))] + UnknownPlan { backtrace: Backtrace }, +} + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + match self { + Error::UnsupportedPlan { .. } | Error::UnsupportedExpr { .. } => { + StatusCode::Unsupported + } + Error::UnknownPlan { .. } + | Error::EncodeRel { .. } + | Error::DecodeRel { .. } + | Error::EmptyPlan { .. } + | Error::EmptyExpr { .. } + | Error::MissingField { .. } + | Error::InvalidParameters { .. } + | Error::TableNotFound { .. } => StatusCode::InvalidArguments, + Error::DFInternal { .. } | Error::Internal { .. } => StatusCode::Internal, + } + } + + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/common/substrait/src/lib.rs b/src/common/substrait/src/lib.rs new file mode 100644 index 000000000000..d5576daaa94a --- /dev/null +++ b/src/common/substrait/src/lib.rs @@ -0,0 +1,16 @@ +mod df_logical; +mod error; + +use bytes::{Buf, Bytes}; + +pub use crate::df_logical::DFLogicalSubstraitConvertor; + +pub trait SubstraitPlan { + type Error: std::error::Error; + + type Plan; + + fn decode(&self, message: B) -> Result; + + fn encode(&self, plan: Self::Plan) -> Result; +} diff --git a/src/logical-plans/Cargo.toml b/src/logical-plans/Cargo.toml deleted file mode 100644 index 9f48d4ac882d..000000000000 --- a/src/logical-plans/Cargo.toml +++ /dev/null @@ -1,7 +0,0 @@ -[package] -name = "logical-plans" -version = "0.1.0" -edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] diff --git a/src/logical-plans/src/lib.rs b/src/logical-plans/src/lib.rs deleted file mode 100644 index 8b137891791f..000000000000 --- a/src/logical-plans/src/lib.rs +++ /dev/null @@ -1 +0,0 @@ -