From 8e8765056daaa899f3f98eed522fee1054eea4ff Mon Sep 17 00:00:00 2001
From: longfangsong <longfangsong@icloud.com>
Date: Wed, 21 Aug 2024 11:50:55 +0200
Subject: [PATCH 1/2] Use prometheus_static_metric as global metric collector

---
 Cargo.lock                      | 13 +++++++++
 Cargo.toml                      |  1 +
 src/core/tuple.rs               |  5 ++--
 src/main.rs                     | 39 ++++++++-----------------
 src/metrics.rs                  |  7 +++++
 src/sql/expression/aggregate.rs |  2 +-
 src/sql/expression/function.rs  |  4 +--
 src/sql/expression/mod.rs       |  2 +-
 src/sql/planner/binder.rs       | 51 ++++++++++++---------------------
 src/sql/planner/scalar.rs       |  2 +-
 src/sql/runtime/builder.rs      |  5 ++--
 src/sql/runtime/executor.rs     | 38 +++++++-----------------
 src/sql/runtime/mod.rs          |  1 -
 src/sql/session/mod.rs          |  4 +--
 14 files changed, 72 insertions(+), 102 deletions(-)
 create mode 100644 src/metrics.rs

diff --git a/Cargo.lock b/Cargo.lock
index 30435be..2aa36a4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1280,6 +1280,18 @@ dependencies = [
  "thiserror",
 ]
 
+[[package]]
+name = "prometheus-static-metric"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f8f30cdb09c39930b8fa5e0f23cbb895ab3f766b187403a0ba0956fc1ef4f0e5"
+dependencies = [
+ "lazy_static",
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+]
+
 [[package]]
 name = "protobuf"
 version = "2.28.0"
@@ -1434,6 +1446,7 @@ dependencies = [
  "pgwire",
  "pretty_env_logger",
  "prometheus",
+ "prometheus-static-metric",
  "rumqttc",
  "serde",
  "serde_json",
diff --git a/Cargo.toml b/Cargo.toml
index a891267..260b5f0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,6 +22,7 @@ prometheus = "0.13"
 sysinfo = "0.26"
 toml = "0.5"
 once_cell = "1.8"
+prometheus-static-metric = "0.5.1"
 
 [dev-dependencies]
 sqllogictest = "0.13.0"
diff --git a/src/core/tuple.rs b/src/core/tuple.rs
index 4c49631..5d6825d 100644
--- a/src/core/tuple.rs
+++ b/src/core/tuple.rs
@@ -1,4 +1,3 @@
-use log::info;
 use std::{collections::HashMap, fmt::Display};
 
 use crate::sql::planner::binder::ProjItem;
@@ -18,8 +17,8 @@ impl Tuple {
     }
 
     pub fn new_default() -> Self {
-        let mut values = Vec::new();
-        let mut columns = Vec::new();
+        let values = Vec::new();
+        let columns = Vec::new();
         Tuple::new(columns, values)
     }
 
diff --git a/src/main.rs b/src/main.rs
index c5855e2..a02c44c 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -5,19 +5,15 @@ use std::{collections::HashMap, mem, sync::Arc, time::Duration};
 
 use axum::{
     extract::{self},
-    response::IntoResponse,
     routing::{delete, get, post},
     Json, Router,
 };
 use log::{info, LevelFilter};
-use prometheus::{Encoder, IntGauge, Registry, TextEncoder};
+use prometheus::{Encoder, Registry, TextEncoder};
 use rumqttc::QoS;
 use serde::{Deserialize, Serialize};
 use sysinfo::{CpuExt, System, SystemExt};
-use tokio::{
-    sync::{mpsc, Mutex},
-    time,
-};
+use tokio::{sync::Mutex, time};
 
 use catalog::Catalog;
 use core::Tuple;
@@ -31,6 +27,7 @@ mod catalog;
 mod config;
 mod connector;
 mod core;
+mod metrics;
 mod sql;
 mod storage;
 mod util;
@@ -42,14 +39,11 @@ struct AppState {
     views: HashMap<usize, View>,
     next_id: usize,
     dummy_subscribers: HashMap<usize, Vec<Tuple>>,
-    registry: Registry,
 }
 
 impl AppState {
-    pub fn new(registry: Registry) -> Self {
-        let mut app_state = Self::default();
-        app_state.registry = registry;
-        return app_state;
+    pub fn new() -> Self {
+        Self::default()
     }
 }
 
@@ -92,11 +86,10 @@ async fn execute_sql(
                 }
             }
         });
-        let view_manager = state.clone();
         tokio::spawn(async move {
             while let Ok(Ok(result)) = receiver.recv().await {
-                if !result.is_none() {
-                    let data = result.unwrap().parse_into_json().unwrap();
+                if let Some(result) = result {
+                    let data = result.parse_into_json().unwrap();
                     sender
                         .client
                         .publish("/yisa/data2", QoS::AtLeastOnce, false, data)
@@ -145,11 +138,10 @@ async fn ping() -> &'static str {
     "pong"
 }
 
-async fn metrics_handler(extract::State(state): extract::State<Arc<Mutex<AppState>>>) -> String {
-    let mut state = state.lock().await;
+async fn metrics_handler() -> String {
     let mut buffer = Vec::new();
     let encoder = TextEncoder::new();
-    let metric_families = state.registry.gather();
+    let metric_families = prometheus::gather();
     encoder.encode(&metric_families, &mut buffer).unwrap();
     String::from_utf8(buffer).unwrap()
 }
@@ -164,13 +156,6 @@ pub async fn main() {
     // initialize Prometheus registry
     let registry = Registry::new();
 
-    // cpu and memory gauge
-    let cpu_gauge = IntGauge::new("cpu_usage", "CPU usage in percentage").unwrap();
-    let memory_gauge = IntGauge::new("memory_usage", "Memory usage in bytes").unwrap();
-
-    // register gauge
-    registry.register(Box::new(cpu_gauge.clone())).unwrap();
-    registry.register(Box::new(memory_gauge.clone())).unwrap();
     let mut sys = System::new();
     tokio::spawn(async move {
         let mut interval = time::interval(Duration::from_secs(1));
@@ -179,12 +164,12 @@ pub async fn main() {
             sys.refresh_all();
             let cpu_usage = sys.global_cpu_info().cpu_usage() as i64;
             let memory_usage = sys.used_memory() as i64;
-            cpu_gauge.set(cpu_usage);
-            memory_gauge.set(memory_usage);
+            metrics::CPU.set(cpu_usage);
+            metrics::MEMORY.set(memory_usage);
         }
     });
 
-    let app_state = AppState::new(registry);
+    let app_state = AppState::new();
 
     // Initialize database
     let catalog = Catalog::new();
diff --git a/src/metrics.rs b/src/metrics.rs
new file mode 100644
index 0000000..da574ce
--- /dev/null
+++ b/src/metrics.rs
@@ -0,0 +1,7 @@
+use prometheus::IntGauge;
+
+lazy_static! {
+    pub static ref CPU: IntGauge = IntGauge::new("cpu_usage", "CPU usage in percentage").unwrap();
+    pub static ref MEMORY: IntGauge =
+        IntGauge::new("memory_usage", "Memory usage in bytes").unwrap();
+}
diff --git a/src/sql/expression/aggregate.rs b/src/sql/expression/aggregate.rs
index 0218f62..b7b3f66 100644
--- a/src/sql/expression/aggregate.rs
+++ b/src/sql/expression/aggregate.rs
@@ -90,7 +90,7 @@ impl AggregateFunctionRegistry {
         });
         self.functions
             .entry(name.to_string())
-            .or_insert_with(Vec::new)
+            .or_default()
             .push(func);
     }
 
diff --git a/src/sql/expression/function.rs b/src/sql/expression/function.rs
index 32bb3e4..b895eaa 100644
--- a/src/sql/expression/function.rs
+++ b/src/sql/expression/function.rs
@@ -52,7 +52,7 @@ impl ScalarFunctionRegistry {
 
         self.functions
             .entry(name.to_string())
-            .or_insert_with(Vec::new)
+            .or_default()
             .push(Arc::new(scalar_func));
     }
 
@@ -82,7 +82,7 @@ impl ScalarFunctionRegistry {
 
         self.functions
             .entry(name.to_string())
-            .or_insert_with(Vec::new)
+            .or_default()
             .push(Arc::new(scalar_func));
     }
 
diff --git a/src/sql/expression/mod.rs b/src/sql/expression/mod.rs
index 01d101f..ecec920 100644
--- a/src/sql/expression/mod.rs
+++ b/src/sql/expression/mod.rs
@@ -27,7 +27,7 @@ impl Expression {
     pub fn eval(&self, tuple: &Tuple) -> Result<Datum, SQLError> {
         match self {
             Expression::Column(column_name, _) => {
-                Ok(tuple.get_by_name(&column_name).ok_or_else(|| {
+                Ok(tuple.get_by_name(column_name).ok_or_else(|| {
                     SQLError::new(
                         ErrorKind::RuntimeError,
                         format!("cannot find column at name: {column_name}"),
diff --git a/src/sql/planner/binder.rs b/src/sql/planner/binder.rs
index 09e9d51..2e20232 100644
--- a/src/sql/planner/binder.rs
+++ b/src/sql/planner/binder.rs
@@ -2,11 +2,10 @@ use sqlparser::ast::{
     Expr, Ident, JoinConstraint, JoinOperator, ObjectName, Query, Select, SelectItem, SetExpr,
     Statement, TableAlias, TableFactor, TableWithJoins, Visit,
 };
-use std::any::Any;
 
 use crate::{
     catalog::defs::{ColumnDefinition, TableDefinition},
-    core::{ErrorKind, SQLError, Tuple, Type},
+    core::{ErrorKind, SQLError, Type},
     sql::{
         planner::{scalar::bind_scalar, scope::Scope},
         runtime::DDLJob,
@@ -23,7 +22,6 @@ use super::{
 };
 
 use crate::core::Datum;
-use log::info;
 
 struct FlattenedSelectItem {
     expr: Expr,
@@ -208,7 +206,7 @@ impl<'a> Binder<'a> {
         if alias_name.eq(&String::from("?column?")) {
             return column_name;
         }
-        return alias_name;
+        alias_name
     }
 
     pub fn bind_select_statement(
@@ -250,40 +248,27 @@ impl<'a> Binder<'a> {
         let mut group_by = select_stmt.group_by.clone();
 
         if group_by.len() == 1 {
-            let group_by_expr = group_by.get(0).unwrap();
+            let group_by_expr = group_by.first().unwrap();
             let se = bind_scalar(ctx, &from_scope, group_by_expr)?;
-            match se {
-                ScalarExpr::FunctionCall(name, args) => {
-                    if name.eq(&String::from("tumblingWindow")) {
-                        if args.len() == 2 {
-                            match args[0].clone() {
-                                ScalarExpr::Literal(datum) => match datum {
-                                    Datum::String(s) => {
-                                        if s.eq(&String::from("ss")) {
-                                            match args[1].clone() {
-                                                ScalarExpr::Literal(datum) => match datum {
-                                                    Datum::Int(v) => {
-                                                        plan = Plan::Window {
-                                                            window_type: WindowType::TumblingWindow,
-                                                            length: v,
-                                                            input: Box::new(plan),
-                                                        };
-                                                        group_by.remove(0);
-                                                    }
-                                                    _ => {}
-                                                },
-                                                _ => {}
-                                            }
-                                        }
+            if let ScalarExpr::FunctionCall(name, args) = se {
+                if name.eq(&String::from("tumblingWindow")) && args.len() == 2 {
+                    if let ScalarExpr::Literal(datum) = args[0].clone() {
+                        if let Datum::String(s) = datum {
+                            if s.eq(&String::from("ss")) {
+                                if let ScalarExpr::Literal(datum) = args[1].clone() {
+                                    if let Datum::Int(v) = datum {
+                                        plan = Plan::Window {
+                                            window_type: WindowType::TumblingWindow,
+                                            length: v,
+                                            input: Box::new(plan),
+                                        };
+                                        group_by.remove(0);
                                     }
-                                    _ => {}
-                                },
-                                _ => {}
+                                }
                             }
                         }
                     }
                 }
-                _ => {}
             }
         }
 
@@ -444,7 +429,7 @@ impl<'a> Binder<'a> {
         // Project the result
         let plan = Plan::Project {
             input: Box::new(plan),
-            projections: output_projections.iter().map(|item| item.clone()).collect(),
+            projections: output_projections.to_vec(),
         };
 
         let output_scope = Scope {
diff --git a/src/sql/planner/scalar.rs b/src/sql/planner/scalar.rs
index d34ffa0..8427a5b 100644
--- a/src/sql/planner/scalar.rs
+++ b/src/sql/planner/scalar.rs
@@ -90,7 +90,7 @@ pub fn bind_aggregate_function(
             ));
         }
 
-        if let Some(arg) = func.args.get(0) {
+        if let Some(arg) = func.args.first() {
             match arg {
                 ast::FunctionArg::Unnamed(arg) => match arg {
                     // Rewrite count(*) to count()
diff --git a/src/sql/runtime/builder.rs b/src/sql/runtime/builder.rs
index 4373dec..544280d 100644
--- a/src/sql/runtime/builder.rs
+++ b/src/sql/runtime/builder.rs
@@ -122,7 +122,7 @@ impl<'a> ExecutorBuilder<'a> {
                     ExecuteTreeNode::from(WindowExecutor::new(
                         Box::new(input_executor.unwrap_execute_tree_node()),
                         (*window_type).clone(),
-                        (*length).clone(),
+                        *length,
                     ))
                     .into(),
                     schema,
@@ -242,8 +242,7 @@ impl<'a> ExecutorBuilder<'a> {
                     schema,
                 ))
             }
-
-            Plan::Explain(display_str) => {
+            Plan::Explain(_) => {
                 let values_exec =
                     ExecuteTreeNode::from(ValuesExecutor::new(vec![Tuple::new_default()])).into();
                 Ok((values_exec, Schema::default()))
diff --git a/src/sql/runtime/executor.rs b/src/sql/runtime/executor.rs
index b4fc0b7..8d462f9 100644
--- a/src/sql/runtime/executor.rs
+++ b/src/sql/runtime/executor.rs
@@ -4,7 +4,6 @@ use std::{
 };
 
 use derive_more::From;
-use futures::io::Window;
 use log::info;
 use rumqttc::{Event, Packet, QoS};
 use serde_json::Value;
@@ -15,12 +14,9 @@ use tokio::{
 
 use crate::{
     connector::MqttClient,
-    core::{tuple::Tuple, Datum, ErrorKind, SQLError, Type},
+    core::{tuple::Tuple, Datum, SQLError},
     sql::{
-        expression::{
-            aggregate::{AggregateFunction, AggregateState},
-            Expression,
-        },
+        expression::{aggregate::AggregateFunction, Expression},
         planner::{binder::ProjItem, WindowType},
         session::context::QueryContext,
     },
@@ -64,7 +60,7 @@ impl Executor {
         if let Executor::BuildExecuteTree(t) = self {
             t
         } else {
-            panic!("!!!")
+            panic!("Is not an `ExecuteTreeNode`")
         }
     }
 }
@@ -112,7 +108,7 @@ impl DDLExecutor {
                     ctx.storage_mgr.drop_relation(schema_name, table_name);
                 }
             }
-            DDLJob::ShowTables(schema_name) => {
+            DDLJob::ShowTables(_) => {
                 // I refuse to implement this as an DDL
             }
         }
@@ -320,16 +316,11 @@ impl ScanExecutor {
         }
     }
     pub fn start(&self, ctx: &mut QueryContext) -> Result<View, SQLError> {
-        let (stop_tx, mut stop_rx) = broadcast::channel(1);
+        let (stop_tx, _) = broadcast::channel(1);
         let (result_tx, result_rx) = broadcast::channel(512);
         let id = String::from("source");
         let mut mqtt_client = MqttClient::new(&id);
         let topic = String::from("/yisa/data");
-        let def = ctx
-            .catalog
-            .find_table_by_name(&*self.schema_name, &*self.table_name)
-            .unwrap()
-            .unwrap();
         tokio::spawn(async move {
             info!("ScanExecutor listening");
             mqtt_client
@@ -338,23 +329,16 @@ impl ScanExecutor {
                 .await
                 .unwrap();
             loop {
-                // let event = mqtt_client.event_loop.poll().await.unwrap();
                 while let Ok(notification) = mqtt_client.event_loop.poll().await {
-                    match notification {
-                        Event::Incoming(Packet::Publish(publish)) => {
-                            // let topic = publish.topic.clone();
-                            let message = String::from_utf8_lossy(&publish.payload);
-                            let parsed: HashMap<String, Value> =
-                                serde_json::from_str(message.as_ref()).unwrap();
-                            let tuple = Tuple::from_hashmap(parsed);
-                            // println!("scan recv {tuple}");
-                            result_tx.send(Ok(Some(tuple))).unwrap();
-                        }
-                        _ => {}
+                    if let Event::Incoming(Packet::Publish(publish)) = notification {
+                        let message = String::from_utf8_lossy(&publish.payload);
+                        let parsed: HashMap<String, Value> =
+                            serde_json::from_str(message.as_ref()).unwrap();
+                        let tuple = Tuple::from_hashmap(parsed);
+                        result_tx.send(Ok(Some(tuple))).unwrap();
                     }
                 }
             }
-            info!("ScanExecutor no longer listening");
         });
         Ok(View {
             result_receiver: result_rx,
diff --git a/src/sql/runtime/mod.rs b/src/sql/runtime/mod.rs
index cf65a0e..d6568aa 100644
--- a/src/sql/runtime/mod.rs
+++ b/src/sql/runtime/mod.rs
@@ -4,7 +4,6 @@ pub mod executor;
 
 pub use ddl::*;
 use executor::View;
-use log::info;
 
 use self::builder::ExecutorBuilder;
 use super::{planner::Plan, session::context::QueryContext};
diff --git a/src/sql/session/mod.rs b/src/sql/session/mod.rs
index 23ffdd2..9df47dd 100644
--- a/src/sql/session/mod.rs
+++ b/src/sql/session/mod.rs
@@ -1,8 +1,6 @@
 pub mod context;
 
 use log::info;
-use pgwire::api::results::FieldInfo;
-use sqlparser::ast::Statement;
 
 use self::context::QueryContext;
 use super::{
@@ -10,7 +8,7 @@ use super::{
     planner::binder::Binder,
     runtime::{execute_plan, executor::View},
 };
-use crate::core::{SQLError, Tuple};
+use crate::core::SQLError;
 
 pub struct Session {
     ctx: QueryContext,

From 216d82ced1c82beb3dca795eeb6b9741595f7123 Mon Sep 17 00:00:00 2001
From: longfangsong <longfangsong@icloud.com>
Date: Wed, 21 Aug 2024 12:01:26 +0200
Subject: [PATCH 2/2] fix the ridiculously deep nested if-let chain in
 binder.rs

---
 src/main.rs               |  2 ++
 src/sql/planner/binder.rs | 29 +++++++++++++----------------
 2 files changed, 15 insertions(+), 16 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index a02c44c..92fefb4 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,3 +1,5 @@
+#![feature(let_chains)]
+
 #[macro_use]
 extern crate lazy_static;
 
diff --git a/src/sql/planner/binder.rs b/src/sql/planner/binder.rs
index 2e20232..3777214 100644
--- a/src/sql/planner/binder.rs
+++ b/src/sql/planner/binder.rs
@@ -251,22 +251,19 @@ impl<'a> Binder<'a> {
             let group_by_expr = group_by.first().unwrap();
             let se = bind_scalar(ctx, &from_scope, group_by_expr)?;
             if let ScalarExpr::FunctionCall(name, args) = se {
-                if name.eq(&String::from("tumblingWindow")) && args.len() == 2 {
-                    if let ScalarExpr::Literal(datum) = args[0].clone() {
-                        if let Datum::String(s) = datum {
-                            if s.eq(&String::from("ss")) {
-                                if let ScalarExpr::Literal(datum) = args[1].clone() {
-                                    if let Datum::Int(v) = datum {
-                                        plan = Plan::Window {
-                                            window_type: WindowType::TumblingWindow,
-                                            length: v,
-                                            input: Box::new(plan),
-                                        };
-                                        group_by.remove(0);
-                                    }
-                                }
-                            }
-                        }
+                if name == "tumblingWindow" && args.len() == 2 {
+                    if let (
+                        ScalarExpr::Literal(Datum::String(s)),
+                        ScalarExpr::Literal(Datum::Int(v)),
+                    ) = (&args[0], &args[1])
+                        && s == "ss"
+                    {
+                        plan = Plan::Window {
+                            window_type: WindowType::TumblingWindow,
+                            length: *v,
+                            input: Box::new(plan),
+                        };
+                        group_by.remove(0);
                     }
                 }
             }