From 29495defc541693e7311e12015e75c87f278b96a Mon Sep 17 00:00:00 2001 From: longfangsong Date: Fri, 23 Aug 2024 08:26:08 +0200 Subject: [PATCH] fix metrics collecting --- .github/workflows/rust.yml | 13 ++--- Cargo.lock | 2 + Cargo.toml | 2 + src/main.rs | 117 ++++++++++++++++++++++--------------- src/metrics.rs | 8 +++ 5 files changed, 88 insertions(+), 54 deletions(-) create mode 100644 src/metrics.rs diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 8e2cfc1..5c57484 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -2,21 +2,18 @@ name: Rust on: push: - branches: [ "master" ] + branches: ["master"] pull_request: - branches: [ "master" ] + branches: ["master"] env: CARGO_TERM_COLOR: always jobs: build: - runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 - - name: Build - run: cargo build --verbose - # - name: Run tests - # run: cargo test --verbose + - uses: actions/checkout@v4 + - name: Test + run: cargo test diff --git a/Cargo.lock b/Cargo.lock index 30435be..cbfde18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1428,6 +1428,7 @@ dependencies = [ "derive_more", "enum-as-inner", "futures", + "hyper", "lazy_static", "log", "once_cell", @@ -1442,6 +1443,7 @@ dependencies = [ "sysinfo", "tokio", "toml", + "tower", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index a891267..13a9dd0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,4 +24,6 @@ toml = "0.5" once_cell = "1.8" [dev-dependencies] +hyper = "1.4.1" sqllogictest = "0.13.0" +tower = "0.4.13" diff --git a/src/main.rs b/src/main.rs index 9b65e54..a7d37d3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,23 +1,21 @@ +#![feature(let_chains)] + #[macro_use] extern crate lazy_static; -use std::{collections::HashMap, mem, process::id, sync::Arc, time::Duration}; +use std::{collections::HashMap, mem, sync::Arc, time::Duration}; use axum::{ extract::{self}, - response::IntoResponse, routing::{delete, get, post}, Json, Router, }; use log::{info, LevelFilter}; -use prometheus::{Encoder, IntGauge, Registry, TextEncoder}; +use prometheus::{Encoder, TextEncoder}; use rumqttc::QoS; use serde::{Deserialize, Serialize}; -use sysinfo::{CpuExt, Pid, ProcessExt, System, SystemExt}; -use tokio::{ - sync::{mpsc, Mutex}, - time, -}; +use sysinfo::{CpuExt, System, SystemExt}; +use tokio::{sync::Mutex, time}; use catalog::Catalog; use core::Tuple; @@ -31,6 +29,7 @@ mod catalog; mod config; mod connector; mod core; +mod metrics; mod sql; mod storage; mod util; @@ -42,14 +41,11 @@ struct AppState { views: HashMap, next_id: usize, dummy_subscribers: HashMap>, - registry: Registry, } impl AppState { - pub fn new(registry: Registry) -> Self { - let mut app_state = Self::default(); - app_state.registry = registry; - return app_state; + pub fn new() -> Self { + Self::default() } } @@ -92,11 +88,10 @@ async fn execute_sql( } } }); - let view_manager = state.clone(); tokio::spawn(async move { while let Ok(Ok(result)) = receiver.recv().await { - if !result.is_none() { - let data = result.unwrap().parse_into_json().unwrap(); + if let Some(result) = result { + let data = result.parse_into_json().unwrap(); sender .client .publish("/yisa/data2", QoS::AtLeastOnce, false, data) @@ -145,48 +140,30 @@ async fn ping() -> &'static str { "pong" } -async fn metrics_handler(extract::State(state): extract::State>>) -> String { - let mut state = state.lock().await; +async fn metrics_handler() -> String { let mut buffer = Vec::new(); let encoder = TextEncoder::new(); - let metric_families = state.registry.gather(); + let metric_families = prometheus::gather(); encoder.encode(&metric_families, &mut buffer).unwrap(); String::from_utf8(buffer).unwrap() } -#[tokio::main] -pub async fn main() { - config::initialize_config(); - log::set_logger(&LOGGER) - .map(|()| log::set_max_level(LevelFilter::Info)) - .unwrap(); - - // initialize Prometheus registry - let registry = Registry::new(); - - // cpu and memory gauge - let cpu_gauge = IntGauge::new("cpu_usage", "CPU usage in percentage").unwrap(); - let memory_gauge = IntGauge::new("memory_usage", "Memory usage in bytes").unwrap(); - - // register gauge - registry.register(Box::new(cpu_gauge.clone())).unwrap(); - registry.register(Box::new(memory_gauge.clone())).unwrap(); +fn start_collecting_metric() { let mut sys = System::new(); - let current_pid = id() as i32; tokio::spawn(async move { let mut interval = time::interval(Duration::from_secs(1)); loop { interval.tick().await; sys.refresh_all(); - if let Some(process) = sys.process(Pid::from(current_pid)) { - cpu_gauge.set(process.cpu_usage() as i64); - memory_gauge.set(process.memory() as i64); - } + let cpu_usage = sys.global_cpu_info().cpu_usage() as i64; + let memory_usage = sys.used_memory() as i64; + metrics::CPU.set(cpu_usage); + metrics::MEMORY.set(memory_usage); } }); +} - let app_state = AppState::new(registry); - +fn app() -> Router { // Initialize database let catalog = Catalog::new(); let storage_mgr = StorageManager::default(); @@ -196,8 +173,8 @@ pub async fn main() { storage_mgr, }; let session = Arc::new(Mutex::new(Session::new(query_ctx))); - let http_addr = "127.0.0.1:3030"; - let app = Router::new() + let app_state = AppState::new(); + Router::new() .route("/metrics", get(metrics_handler)) .route( "/view", @@ -206,8 +183,56 @@ pub async fn main() { .route("/view", get(poll_view)) .route("/view", delete(delete_view)) .route("/ping", get(ping)) - .with_state(Arc::new(Mutex::new(app_state))); + .with_state(Arc::new(Mutex::new(app_state))) +} + +#[tokio::main] +pub async fn main() { + config::initialize_config(); + log::set_logger(&LOGGER) + .map(|()| log::set_max_level(LevelFilter::Info)) + .unwrap(); + start_collecting_metric(); + let app = app(); + let http_addr = "127.0.0.1:3030"; let listener = tokio::net::TcpListener::bind(http_addr).await.unwrap(); info!("HTTP listening to {}", http_addr); axum::serve(listener, app).await.unwrap(); } + +#[cfg(test)] +mod tests { + use axum::{ + body::Body, + http::{Request, StatusCode}, + }; + use futures::StreamExt; + use tower::ServiceExt; + + use super::*; + #[tokio::test] + async fn metric_works() { + start_collecting_metric(); + let app = app(); + tokio::time::sleep(Duration::from_secs(1)).await; + let response = app + .oneshot( + Request::builder() + .uri("/metrics") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + let mut body = response.into_body().into_data_stream(); + let mut body_bytes = Vec::new(); + while let Some(Ok(bytes)) = body.next().await { + body_bytes.extend_from_slice(&bytes); + } + let body_string = String::from_utf8(body_bytes).unwrap(); + assert!(body_string.contains("cpu_usage")); + assert!(body_string.contains("memory_usage")); + } +} diff --git a/src/metrics.rs b/src/metrics.rs new file mode 100644 index 0000000..085ca30 --- /dev/null +++ b/src/metrics.rs @@ -0,0 +1,8 @@ +use prometheus::{register_int_gauge, IntGauge}; + +lazy_static! { + pub static ref CPU: IntGauge = + register_int_gauge!("cpu_usage", "CPU usage in percentage").unwrap(); + pub static ref MEMORY: IntGauge = + register_int_gauge!("memory_usage", "Memory usage in bytes").unwrap(); +}