From 95dac68d4f121d9d81edca161ac7be29925d24f1 Mon Sep 17 00:00:00 2001
From: Song Gao
Date: Fri, 2 Aug 2024 10:49:31 +0800
Subject: [PATCH 1/2] fix alias

Signed-off-by: Song Gao
---
 src/connector/mod.rs        |  2 +-
 src/core/datum.rs           |  8 ++--
 src/core/tuple.rs           | 16 +++++---
 src/main.rs                 | 14 ++++---
 src/sql/planner/binder.rs   | 73 ++++++++++++++++++++++++++++---------
 src/sql/planner/mod.rs      | 11 +++---
 src/sql/planner/scope.rs    | 31 ++++++----------
 src/sql/runtime/builder.rs  | 24 ++++++------
 src/sql/runtime/executor.rs | 20 ++++++----
 9 files changed, 123 insertions(+), 76 deletions(-)

diff --git a/src/connector/mod.rs b/src/connector/mod.rs
index 9525f81..1019b1b 100644
--- a/src/connector/mod.rs
+++ b/src/connector/mod.rs
@@ -11,4 +11,4 @@ impl MqttClient {
         let (client, event_loop) = AsyncClient::new(mqtt_options, 10);
         MqttClient { client, event_loop }
     }
-}
\ No newline at end of file
+}
diff --git a/src/core/datum.rs b/src/core/datum.rs
index 7b87da3..bbac9ad 100644
--- a/src/core/datum.rs
+++ b/src/core/datum.rs
@@ -19,8 +19,8 @@ pub enum Datum {
 
 impl Serialize for Datum {
     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
-        where
-            S: Serializer,
+    where
+        S: Serializer,
     {
         match self {
             Datum::Int(i) => serializer.serialize_i64(*i),
@@ -34,8 +34,8 @@
 
 impl<'de> Deserialize<'de> for Datum {
     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
-        where
-            D: Deserializer<'de>,
+    where
+        D: Deserializer<'de>,
     {
         let value = serde_json::Value::deserialize(deserializer)?;
         match value {
diff --git a/src/core/tuple.rs b/src/core/tuple.rs
index ee5720e..7480ca9 100644
--- a/src/core/tuple.rs
+++ b/src/core/tuple.rs
@@ -1,6 +1,6 @@
-use std::collections::HashMap;
-use std::fmt::Display;
+use std::{collections::HashMap, fmt::Display};
 
+use crate::sql::planner::binder::ProjItem;
 use serde_json::Value;
 
 use super::Datum;
@@ -35,13 +35,17 @@ impl Tuple {
         self.values.get(index).cloned()
     }
 
-    pub fn project(&self, indices: &[(usize, String)]) -> Tuple {
+    pub fn project(&self, indices: &[ProjItem]) -> Tuple {
         let mut new_keys = Vec::new();
         let mut new_values = Vec::new();
 
-        for (_, key) in indices {
-            if let Some(index) = self.keys.iter().position(|k| k == key) {
-                new_keys.push(key.clone());
+        for item in indices {
+            if let Some(index) = self.keys.iter().position(|k| *k == item.name) {
+                if !item.alias_name.is_empty() {
+                    new_keys.push(item.alias_name.clone())
+                } else {
+                    new_keys.push(item.name.clone())
+                }
                 new_values.push(self.values[index].clone());
             }
         }
diff --git a/src/main.rs b/src/main.rs
index b5fceb5..b09b58f 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -5,8 +5,8 @@ use std::{collections::HashMap, mem, sync::Arc};
 
 use axum::{
     extract::{self},
-    Json,
-    Router, routing::{delete, get, post},
+    routing::{delete, get, post},
+    Json, Router,
 };
 use log::{info, LevelFilter};
 use rumqttc::QoS;
@@ -15,18 +15,18 @@
 use tokio::sync::{mpsc, Mutex};
 
 use catalog::Catalog;
 use core::Tuple;
-use sql::{runtime::executor::View, Session, session::context::QueryContext};
+use sql::{runtime::executor::View, session::context::QueryContext, Session};
 use storage::StorageManager;
 use util::SimpleLogger;
 
 use crate::connector::MqttClient;
 
 mod catalog;
+mod connector;
 mod core;
 mod sql;
 mod storage;
 mod util;
-mod connector;
 
 static LOGGER: SimpleLogger = SimpleLogger;
@@ -80,7 +80,11 @@ async fn execute_sql(
     tokio::spawn(async move {
         while let Ok(Ok(Some(v))) = receiver.recv().await {
             let data = v.parse_into_json().unwrap();
-            sender.client.publish("/yisa/data2", QoS::AtLeastOnce, false, data).await.unwrap();
+            sender
+                .client
+                .publish("/yisa/data2", QoS::AtLeastOnce, false, data)
+                .await
+                .unwrap();
         }
         info!("Subscribers of /yisa/data2 is closed");
     });
diff --git a/src/sql/planner/binder.rs b/src/sql/planner/binder.rs
index 67a67f2..236b103 100644
--- a/src/sql/planner/binder.rs
+++ b/src/sql/planner/binder.rs
@@ -8,7 +8,7 @@ use crate::{
     core::{ErrorKind, SQLError, Tuple, Type},
     sql::{
         planner::{scalar::bind_scalar, scope::Scope},
-        runtime::{DDLJob},
+        runtime::DDLJob,
         session::context::QueryContext,
     },
 };
@@ -16,9 +16,9 @@ use crate::{
 use super::{
     aggregate::AggregateFunctionVisitor,
     bind_context::BindContext,
-    Column,
-    Plan,
-    scalar::bind_aggregate_function, ScalarExpr, scope::{QualifiedNamePrefix, Variable},
+    scalar::bind_aggregate_function,
+    scope::{QualifiedNamePrefix, Variable},
+    Column, Plan, ScalarExpr,
 };
 
 struct FlattenedSelectItem {
@@ -30,6 +30,25 @@ pub struct Binder<'a> {
     ctx: &'a mut QueryContext,
 }
 
+#[derive(Clone, Debug)]
+pub struct ProjItem {
+    pub index: usize,
+    pub name: String,
+    pub alias_name: String,
+    pub is_column: bool,
+}
+
+impl ProjItem {
+    pub fn new(index: usize, name: String, alias_name: String, is_column: bool) -> Self {
+        ProjItem {
+            index,
+            name,
+            alias_name,
+            is_column,
+        }
+    }
+}
+
 impl<'a> Binder<'a> {
     pub fn new(ctx: &'a mut QueryContext) -> Self {
         Self { ctx }
@@ -265,7 +284,12 @@
 
         for expr in &select_stmt.group_by {
             let scalar = bind_scalar(ctx, &from_scope, expr)?;
-            if let ScalarExpr::Column(Column { index, name }) = &scalar {
+            if let ScalarExpr::Column(Column {
+                column_name,
+                table_name,
+                index,
+            }) = &scalar
+            {
                 // If the group key is a column, we don't need to evaluate it
                 group_scope
                     .variables
@@ -336,12 +360,27 @@
         let mut scalar_maps = vec![];
         for select_item in flattened_select_list.iter() {
             let scalar = bind_scalar(ctx, &group_scope, &select_item.expr)?;
-            if let ScalarExpr::Column(Column { index, name }) = scalar {
+            if let ScalarExpr::Column(Column {
+                column_name,
+                table_name,
+                index,
+            }) = scalar
+            {
                 // If the select item is a column, we don't need to evaluate it
-                output_projections.push((index, name.clone()));
+                output_projections.push(ProjItem::new(
+                    index,
+                    column_name.clone(),
+                    select_item.alias.clone(),
+                    true,
+                ));
             } else {
                 scalar_maps.push(scalar);
-                output_projections.push((group_scope.variables.len(), select_item.alias.clone()));
+                output_projections.push(ProjItem::new(
+                    group_scope.variables.len(),
+                    select_item.alias.clone(),
+                    select_item.alias.clone(),
+                    false,
+                ));
             }
         }
         if !scalar_maps.is_empty() {
@@ -354,14 +393,14 @@
         // Project the result
         let plan = Plan::Project {
             input: Box::new(plan),
-            projections: output_projections.iter().map(|(index, name)| (*index, name.clone())).collect(),
+            projections: output_projections.iter().map(|item| item.clone()).collect(),
         };
 
         let output_scope = Scope {
             variables: output_projections
                 .iter()
-                .map(|(_, name)| Variable {
-                    name: name.clone(),
+                .map(|item| Variable {
+                    name: item.name.clone(),
                     prefix: None,
                     expr: None,
                 })
@@ -402,9 +441,9 @@
                         Some(Ident::new(prefix.table_name.clone())),
                         Some(Ident::new(v.name.clone())),
                     ]
-                        .into_iter()
-                        .flatten()
-                        .collect(),
+                    .into_iter()
+                    .flatten()
+                    .collect(),
                 ),
                 alias: v.name.clone(),
             }
@@ -586,9 +625,9 @@
         for variable in scope.variables.iter_mut() {
             match &mut variable.prefix {
                 Some(QualifiedNamePrefix {
-                         schema_name,
-                         table_name,
-                     }) => {
+                    schema_name,
+                    table_name,
+                }) => {
                     *schema_name = None;
                     *table_name = alias.name.to_string();
                 }
diff --git a/src/sql/planner/mod.rs b/src/sql/planner/mod.rs
index dee038a..4a7c83e 100644
--- a/src/sql/planner/mod.rs
+++ b/src/sql/planner/mod.rs
@@ -1,8 +1,8 @@
 use std::fmt::Display;
 
-use crate::core::Datum;
+use crate::{core::Datum, sql::planner::binder::ProjItem};
 
-use super::runtime::{DDLJob};
+use super::runtime::DDLJob;
 
 pub mod aggregate;
 pub mod bind_context;
@@ -27,7 +27,7 @@ pub enum Plan {
         input: Box<Plan>,
     },
     Project {
-        projections: Vec<(usize, String)>,
+        projections: Vec<ProjItem>,
         input: Box<Plan>,
     },
     Filter {
@@ -54,7 +54,8 @@
 
 #[derive(Debug, Clone, PartialEq)]
 pub struct Column {
-    pub name: String,
+    pub column_name: String,
+    pub table_name: String,
     pub index: usize,
 }
 
@@ -122,7 +123,7 @@ fn indent_format_plan(f: &mut std::fmt::Formatter, plan: &Plan, indent: usize)
             indent_str,
             projections
                 .iter()
-                .map(|v| format!("#{}", v.1.clone()))
+                .map(|v| format!("#{}", v.name.clone()))
                 .collect::<Vec<_>>()
                 .join(", ")
         )?;
diff --git a/src/sql/planner/scope.rs b/src/sql/planner/scope.rs
index c38d59e..67f4a3b 100644
--- a/src/sql/planner/scope.rs
+++ b/src/sql/planner/scope.rs
@@ -35,24 +35,10 @@ impl Scope {
 
                     variable.1.name == column_name.to_string()
                         && variable
-                        .1
-                        .prefix
-                        .as_ref()
-                        .map_or(false, |prefix| prefix.table_name == table_name.to_string())
-                }
-                _ if ident.len() == 3 => {
-                    let schema_name = &ident[0];
-                    let table_name = &ident[1];
-                    let column_name = &ident[2];
-
-                    variable.1.name == column_name.to_string()
-                        && variable.1.prefix.as_ref().map_or(false, |prefix| {
-                            prefix.table_name == table_name.to_string()
-                                && prefix
-                                    .schema_name
+                            .1
+                            .prefix
                             .as_ref()
-                                    .map_or(false, |schema| schema == &schema_name.to_string())
-                        })
+                            .map_or(false, |prefix| prefix.table_name == table_name.to_string())
                 }
                 _ => false,
             })
@@ -63,7 +49,8 @@
             Ok(None)
         } else if candidates.len() == 1 {
             Ok(Some(Column {
-                name: candidates[0].1.name.clone(),
+                column_name: candidates[0].1.name.clone(),
+                table_name: candidates[0].1.prefix.clone().unwrap().table_name,
                 index: candidates[0].0,
             }))
         } else {
@@ -87,7 +74,13 @@
                 }
                 false
             })
-            .map(|(index, v)| ScalarExpr::Column(Column { name: v.name.clone(), index }))
+            .map(|(index, v)| {
+                ScalarExpr::Column(Column {
+                    column_name: v.name.clone(),
+                    table_name: v.prefix.clone().unwrap().table_name,
+                    index,
+                })
+            })
     }
 }
 
diff --git a/src/sql/runtime/builder.rs b/src/sql/runtime/builder.rs
index 6e36870..de73a48 100644
--- a/src/sql/runtime/builder.rs
+++ b/src/sql/runtime/builder.rs
@@ -4,17 +4,16 @@ use crate::{
     sql::{
         expression::{
             aggregate::AggregateFunctionRegistry,
-            type_check::{ColumnTypeResolver, type_check, type_check_aggregate_function},
+            type_check::{type_check, type_check_aggregate_function, ColumnTypeResolver},
         },
-        planner::{Column, Plan},
+        planner::{binder::ProjItem, Column, Plan},
         session::context::QueryContext,
     },
 };
 
 use super::executor::{
-    DDLExecutor, ExecuteTreeNode, Executor, FilterExecutor, HashAggregateExecutor,
-    MapExecutor, NestedLoopJoinExecutor, ProjectExecutor, ScanExecutor, StateModifier,
-    ValuesExecutor,
+    DDLExecutor, ExecuteTreeNode, Executor, FilterExecutor, HashAggregateExecutor, MapExecutor,
+    NestedLoopJoinExecutor, ProjectExecutor, ScanExecutor, StateModifier, ValuesExecutor,
 };
 
 /// Schema of the tuple in current context
@@ -24,11 +23,11 @@ pub struct Schema {
 }
 
 impl Schema {
-    pub fn project(&self, projections: &[(usize, String)]) -> Self {
+    pub fn project(&self, projections: &[ProjItem]) -> Self {
         Self {
             column_types: projections
                 .iter()
-                .map(|index| self.column_types[index.0].clone())
+                .map(|item| self.column_types[item.index].clone())
                 .collect(),
         }
     }
@@ -86,7 +85,7 @@
                         Box::new(input_executor.unwrap_execute_tree_node()),
                         projections.clone(),
                     ))
-                        .into(),
+                    .into(),
                     schema.project(projections),
                 ))
             }
@@ -137,7 +136,7 @@
                     Box::new(input_executor),
                     predicate_fn,
                 ))
-                    .into(),
+                .into(),
                 schema,
             ))
         }
@@ -183,7 +182,7 @@
                     Box::new(right_executor),
                     Box::new(left_executor),
                 ))
-                    .into(),
+                .into(),
                 schema,
             ))
         }
@@ -237,13 +236,14 @@
                     group_by,
                     aggregates,
                 ))
-                    .into(),
+                .into(),
                 schema,
             ))
         }
 
         Plan::Explain(display_str) => {
-            let values_exec = ExecuteTreeNode::from(ValuesExecutor::new(vec![Tuple::new_default()])).into();
+            let values_exec =
+                ExecuteTreeNode::from(ValuesExecutor::new(vec![Tuple::new_default()])).into();
             Ok((values_exec, Schema::default()))
         }
         Plan::Use(schema_name) => Ok((
diff --git a/src/sql/runtime/executor.rs b/src/sql/runtime/executor.rs
index 2800f65..0311326 100644
--- a/src/sql/runtime/executor.rs
+++ b/src/sql/runtime/executor.rs
@@ -10,19 +10,20 @@ use serde_json::Value;
 use tokio::sync::broadcast;
 
 use crate::{
-    core::{Datum, ErrorKind, SQLError, tuple::Tuple},
+    connector::MqttClient,
+    core::{tuple::Tuple, Datum, ErrorKind, SQLError},
     sql::{
         expression::{
             aggregate::{AggregateFunction, AggregateState},
             Expression,
         },
+        planner::binder::ProjItem,
         session::context::QueryContext,
     },
     storage::relation::ScanState,
 };
-use crate::connector::MqttClient;
 
-use super::{DDLJob};
+use super::DDLJob;
 
 /// These commands are used to build up an execting tree which executes on a stream
 /// They will be construct the tree and execting forever until we close them.
@@ -145,11 +146,11 @@ impl ExecuteTreeNode {
 #[derive(Debug)]
 pub struct ProjectExecutor {
     pub child: Box<ExecuteTreeNode>,
-    pub projections: Vec<(usize, String)>,
+    pub projections: Vec<ProjItem>,
 }
 
 impl ProjectExecutor {
-    pub fn new(child: Box<ExecuteTreeNode>, projections: Vec<(usize, String)>) -> Self {
+    pub fn new(child: Box<ExecuteTreeNode>, projections: Vec<ProjItem>) -> Self {
         Self { child, projections }
     }
 
@@ -207,7 +208,11 @@ impl ScanExecutor {
         let topic = String::from("/yisa/data");
         tokio::spawn(async move {
             info!("ScanExecutor listening");
-            mqtt_client.client.subscribe(topic, QoS::AtLeastOnce).await.unwrap();
+            mqtt_client
+                .client
+                .subscribe(topic, QoS::AtLeastOnce)
+                .await
+                .unwrap();
             loop {
                 let event = mqtt_client.event_loop.poll().await.unwrap();
                 while let Ok(notification) = mqtt_client.event_loop.poll().await {
@@ -215,7 +220,8 @@
                     Event::Incoming(Packet::Publish(publish)) => {
                         // let topic = publish.topic.clone();
                         let message = String::from_utf8_lossy(&publish.payload);
-                        let parsed: HashMap<String, Value> = serde_json::from_str(message.as_ref()).unwrap();
+                        let parsed: HashMap<String, Value> =
+                            serde_json::from_str(message.as_ref()).unwrap();
                         let tuple = Tuple::from_hashmap(parsed);
                         // println!("recv {tuple}");
                         result_tx.send(Ok(Some(tuple))).unwrap();

From ca2d1733e25a87054e38a1a5df6de9427925eefd Mon Sep 17 00:00:00 2001
From: Song Gao
Date: Fri, 2 Aug 2024 10:50:16 +0800
Subject: [PATCH 2/2] revise doc

Signed-off-by: Song Gao
---
 README.md | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index cb6fd62..bf70427 100644
--- a/README.md
+++ b/README.md
@@ -16,12 +16,14 @@ curl --request POST \
 
 ```
 curl --request POST \
   --url http://127.0.0.1:3000/view \
   --data '{
-    "sql": "select a from t;"
+    "sql": "select a as e from t;"
 }'
 ```
 
 send/recv data by mqtt broker 127.0.0.1:1883
+send data topic: /yisa/data
+recv data topic: /yisa/data2
 
 ## Delete the view
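
As a quick illustration of what patch 1 changes (the sketch below is not part of the patches): `Tuple::project` now takes `ProjItem`s instead of `(usize, String)` pairs and prefers `alias_name` over the source column name when one is set, which is what makes `select a as e from t` label its output key `e`. The simplified, self-contained `Tuple` here uses plain `i64` values in place of the real `Datum` type; only `ProjItem` and the alias logic in `project` mirror the diff.

```rust
// Standalone sketch (not part of the patch): mirrors how the new
// `Tuple::project` applies `ProjItem::alias_name`, so that
// `SELECT a AS e FROM t` emits the key "e" instead of "a".
#[derive(Clone, Debug)]
pub struct ProjItem {
    pub index: usize,
    pub name: String,       // source column name resolved by the binder
    pub alias_name: String, // empty when the SELECT item has no alias
    pub is_column: bool,
}

struct Tuple {
    keys: Vec<String>,
    values: Vec<i64>, // stands in for the real `Datum` values
}

impl Tuple {
    fn project(&self, indices: &[ProjItem]) -> Tuple {
        let mut new_keys = Vec::new();
        let mut new_values = Vec::new();
        for item in indices {
            // Like the patch, match by source name rather than by `item.index`.
            if let Some(index) = self.keys.iter().position(|k| *k == item.name) {
                // Prefer the alias when one was given; fall back to the name.
                if !item.alias_name.is_empty() {
                    new_keys.push(item.alias_name.clone());
                } else {
                    new_keys.push(item.name.clone());
                }
                new_values.push(self.values[index]);
            }
        }
        Tuple { keys: new_keys, values: new_values }
    }
}

fn main() {
    let tuple = Tuple { keys: vec!["a".into(), "b".into()], values: vec![1, 2] };
    // `select a as e from t` binds to a ProjItem with alias_name = "e".
    let proj = ProjItem { index: 0, name: "a".into(), alias_name: "e".into(), is_column: true };
    let out = tuple.project(&[proj]);
    assert_eq!(out.keys, vec!["e".to_string()]); // key renamed by the alias
    assert_eq!(out.values, vec![1]);
}
```

Before this change, plain column references were pushed as `(index, name)` with the original column name, so the binder silently dropped the alias for anything that was not a computed expression; carrying both `name` and `alias_name` in `ProjItem` is what fixes that.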
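
The revised README summarizes the data path: create the view over HTTP, feed JSON tuples to `/yisa/data`, and read results from `/yisa/data2` via the broker at 127.0.0.1:1883. Below is a hypothetical end-to-end smoke test using the same `rumqttc` API the server itself uses; the client id, payload shape, and keep-alive value are assumptions, and it expects the broker, the server, and a created view to already be running.

```rust
// Hypothetical smoke test (assumes broker at 127.0.0.1:1883 and a running
// server with a view such as `select a as e from t` already created).
use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let mut options = MqttOptions::new("yisa-smoke-test", "127.0.0.1", 1883);
    options.set_keep_alive(Duration::from_secs(5));
    let (client, mut event_loop) = AsyncClient::new(options, 10);

    // Listen on the output topic before producing any input.
    client.subscribe("/yisa/data2", QoS::AtLeastOnce).await.unwrap();

    // Feed one tuple into the input topic; with the alias fix, the view
    // should publish it back with the key renamed from "a" to "e".
    client
        .publish("/yisa/data", QoS::AtLeastOnce, false, r#"{"a": 1, "b": 2}"#)
        .await
        .unwrap();

    // Drive the connection and print the first message the view emits.
    while let Ok(event) = event_loop.poll().await {
        if let Event::Incoming(Packet::Publish(publish)) = event {
            println!("recv: {}", String::from_utf8_lossy(&publish.payload));
            break;
        }
    }
}
```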