From 5469532cdaf87791b3af5d7e2918e92725f8dfc5 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Tue, 18 Jun 2024 17:27:20 +0800 Subject: [PATCH] feat: SQLNESS SLEEP (#67) ## Rationale Some test would require a certain amount of time to output final results(i.e. continous aggregate with some amount of delay) So it make sense for sqlness to have a `sleep` function, i.e. mysql&postgre sql both have sleep function, but it could be useful to have a sleep function in test client side anyway. ## Detailed Changes add a `-- SQLNESS SLEEP ` to sleep for given time in milliseconds before executing query, which internally just spawn a new thread for sleeping when sleep is needed, this is for simplicity sake, and also to allow cross-runtime `async` sleep. The overhead of spawn a thread is deemed low since the query is sleeping anyway, the only reason why we can't just blocking sleep directly is because it's `async` up there so blocking a `async` task make a lot of trouble to any `async` runtime. ## Test Plan a simple test added for waiting given time --- sqlness/Cargo.toml | 1 + sqlness/src/case.rs | 14 ++-- sqlness/src/interceptor.rs | 22 +++++- sqlness/src/interceptor/sleep.rs | 101 ++++++++++++++++++++++++++++ sqlness/src/interceptor/template.rs | 2 - 5 files changed, 131 insertions(+), 9 deletions(-) create mode 100644 sqlness/src/interceptor/sleep.rs diff --git a/sqlness/Cargo.toml b/sqlness/Cargo.toml index 5314b3c..764fff4 100644 --- a/sqlness/Cargo.toml +++ b/sqlness/Cargo.toml @@ -12,6 +12,7 @@ readme = { workspace = true } [dependencies] async-trait = "0.1" derive_builder = "0.11" +duration-str = "0.11.2" minijinja = "1" mysql = { version = "23.0.1", optional = true } postgres = { version = "0.19.7", optional = true } diff --git a/sqlness/src/case.rs b/sqlness/src/case.rs index a890f4e..4eba35e 100644 --- a/sqlness/src/case.rs +++ b/sqlness/src/case.rs @@ -142,7 +142,7 @@ impl Query { where W: Write, { - let context = self.before_execute_intercept(); + let context = self.before_execute_intercept().await; for comment in &self.comment_lines { writer.write_all(comment.as_bytes())?; writer.write_all("\n".as_bytes())?; @@ -160,7 +160,7 @@ impl Query { .query(context.clone(), format!("{sql};")) .await .to_string(); - self.after_execute_intercept(&mut result); + self.after_execute_intercept(&mut result).await; self.write_result(writer, result)?; } } @@ -172,19 +172,21 @@ impl Query { /// /// Interceptors may change either the query to be displayed or the query to be executed, /// so we need to return the query to caller. - fn before_execute_intercept(&mut self) -> QueryContext { + async fn before_execute_intercept(&mut self) -> QueryContext { let mut context = QueryContext::default(); for interceptor in &self.interceptors { - interceptor.before_execute(&mut self.execute_query, &mut context); + interceptor + .before_execute_async(&mut self.execute_query, &mut context) + .await; } context } - fn after_execute_intercept(&mut self, result: &mut String) { + async fn after_execute_intercept(&mut self, result: &mut String) { for interceptor in &self.interceptors { - interceptor.after_execute(result); + interceptor.after_execute_async(result).await; } } diff --git a/sqlness/src/interceptor.rs b/sqlness/src/interceptor.rs index b3e8a14..c312746 100644 --- a/sqlness/src/interceptor.rs +++ b/sqlness/src/interceptor.rs @@ -17,17 +17,33 @@ use crate::{ pub mod arg; pub mod env; pub mod replace; +pub mod sleep; pub mod sort_result; pub mod template; -pub type InterceptorRef = Box; +pub type InterceptorRef = Box; +#[async_trait::async_trait] pub trait Interceptor { #[allow(unused_variables)] fn before_execute(&self, execute_query: &mut Vec, context: &mut QueryContext) {} + #[allow(unused_variables)] + async fn before_execute_async( + &self, + execute_query: &mut Vec, + context: &mut QueryContext, + ) { + self.before_execute(execute_query, context) + } + #[allow(unused_variables)] fn after_execute(&self, result: &mut String) {} + + #[allow(unused_variables)] + async fn after_execute_async(&self, result: &mut String) { + self.after_execute(result) + } } pub type InterceptorFactoryRef = Arc; @@ -93,6 +109,10 @@ fn builtin_interceptors() -> HashMap { template::PREFIX.to_string(), Arc::new(TemplateInterceptorFactory {}) as _, ), + ( + sleep::PREFIX.to_string(), + Arc::new(sleep::SleepInterceptorFactory {}) as _, + ), ] .into_iter() .map(|(prefix, factory)| (prefix.to_string(), factory)) diff --git a/sqlness/src/interceptor/sleep.rs b/sqlness/src/interceptor/sleep.rs new file mode 100644 index 00000000..49b216a --- /dev/null +++ b/sqlness/src/interceptor/sleep.rs @@ -0,0 +1,101 @@ +// Copyright 2024 CeresDB Project Authors. Licensed under Apache-2.0. + +use std::pin::Pin; +use std::task::Context; +use std::time::{Duration, Instant}; + +use crate::error::Result; +use crate::interceptor::{Interceptor, InterceptorFactory, InterceptorRef}; +use crate::SqlnessError; + +pub const PREFIX: &str = "SLEEP"; + +/// Sleep for given duration before executing the query. +/// +/// # Example +/// ``` sql +/// -- SQLNESS SLEEP +/// SELECT 1; +/// ``` +/// +/// valid duration format: +/// - `1s` for 1 second +/// - `1ms` for 1 millisecond +/// - `1s500ms` for 1.5 seconds +/// etc. See detailed format in [duration_str](https://docs.rs/duration-str/0.11.2/duration_str/) crate +/// +/// Note that this implementation is not accurate and may be affected by the system load. +/// It is guaranteed that the sleep time is at least the given milliseconds, but the lag may be +/// longer. +#[derive(Debug)] +pub struct SleepInterceptor { + duration: Duration, +} + +struct Sleep { + now: Instant, + duration: Duration, +} +impl core::future::Future for Sleep { + type Output = (); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll { + let elapsed = self.now.elapsed(); + if elapsed < self.duration { + let waker = cx.waker().clone(); + // detach the thread and let it wake the waker later + let remaining = self.duration.saturating_sub(elapsed); + std::thread::spawn(move || { + std::thread::sleep(remaining); + waker.wake(); + }); + std::task::Poll::Pending + } else { + std::task::Poll::Ready(()) + } + } +} + +#[async_trait::async_trait] +impl Interceptor for SleepInterceptor { + async fn before_execute_async( + &self, + _execute_query: &mut Vec, + _context: &mut crate::case::QueryContext, + ) { + // impl a cross-runtime sleep + Sleep { + now: Instant::now(), + duration: self.duration, + } + .await; + } +} + +pub struct SleepInterceptorFactory; + +impl InterceptorFactory for SleepInterceptorFactory { + fn try_new(&self, ctx: &str) -> Result { + let duration = duration_str::parse(ctx).map_err(|e| SqlnessError::InvalidContext { + prefix: PREFIX.to_string(), + msg: format!("Failed to parse duration: {}", e), + })?; + Ok(Box::new(SleepInterceptor { duration })) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[tokio::test] + async fn wait_1500ms() { + let input = "1s500ms"; + let interceptor = SleepInterceptorFactory {}.try_new(input).unwrap(); + let now = Instant::now(); + interceptor + .before_execute_async(&mut vec![], &mut crate::QueryContext::default()) + .await; + let elasped = now.elapsed().as_millis() as u64; + assert!(elasped >= 1500); + } +} diff --git a/sqlness/src/interceptor/template.rs b/sqlness/src/interceptor/template.rs index 07903b6..d4a0f9b 100644 --- a/sqlness/src/interceptor/template.rs +++ b/sqlness/src/interceptor/template.rs @@ -59,8 +59,6 @@ impl Interceptor for TemplateInterceptor { .map(|v| v.to_string()) .collect::>(); } - - fn after_execute(&self, _result: &mut String) {} } impl InterceptorFactory for TemplateInterceptorFactory {