Skip to content

Commit

Permalink
feat: SQLNESS SLEEP <DURATION_STRING> (#67)
Browse files Browse the repository at this point in the history
## Rationale

Some test would require a certain amount of time to output final
results(i.e. continous aggregate with some amount of delay)
So it make sense for sqlness to have a `sleep` function, i.e.
mysql&postgre sql both have sleep function, but it could be useful to
have a sleep function in test client side anyway.

## Detailed Changes

add a `-- SQLNESS SLEEP <Milliseconds>` to sleep for given time in
milliseconds before executing query,
which internally just spawn a new thread for sleeping when sleep is
needed, this is for simplicity sake, and also to allow cross-runtime
`async` sleep. The overhead of spawn a thread is deemed low since the
query is sleeping anyway, the only reason why we can't just blocking
sleep directly is because it's `async` up there so blocking a `async`
task make a lot of trouble to any `async` runtime.

## Test Plan

a simple test added for waiting given time
  • Loading branch information
discord9 authored Jun 18, 2024
1 parent e4c602e commit 5469532
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 9 deletions.
1 change: 1 addition & 0 deletions sqlness/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ readme = { workspace = true }
[dependencies]
async-trait = "0.1"
derive_builder = "0.11"
duration-str = "0.11.2"
minijinja = "1"
mysql = { version = "23.0.1", optional = true }
postgres = { version = "0.19.7", optional = true }
Expand Down
14 changes: 8 additions & 6 deletions sqlness/src/case.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl Query {
where
W: Write,
{
let context = self.before_execute_intercept();
let context = self.before_execute_intercept().await;
for comment in &self.comment_lines {
writer.write_all(comment.as_bytes())?;
writer.write_all("\n".as_bytes())?;
Expand All @@ -160,7 +160,7 @@ impl Query {
.query(context.clone(), format!("{sql};"))
.await
.to_string();
self.after_execute_intercept(&mut result);
self.after_execute_intercept(&mut result).await;
self.write_result(writer, result)?;
}
}
Expand All @@ -172,19 +172,21 @@ impl Query {
///
/// Interceptors may change either the query to be displayed or the query to be executed,
/// so we need to return the query to caller.
fn before_execute_intercept(&mut self) -> QueryContext {
async fn before_execute_intercept(&mut self) -> QueryContext {
let mut context = QueryContext::default();

for interceptor in &self.interceptors {
interceptor.before_execute(&mut self.execute_query, &mut context);
interceptor
.before_execute_async(&mut self.execute_query, &mut context)
.await;
}

context
}

fn after_execute_intercept(&mut self, result: &mut String) {
async fn after_execute_intercept(&mut self, result: &mut String) {
for interceptor in &self.interceptors {
interceptor.after_execute(result);
interceptor.after_execute_async(result).await;
}
}

Expand Down
22 changes: 21 additions & 1 deletion sqlness/src/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,33 @@ use crate::{
pub mod arg;
pub mod env;
pub mod replace;
pub mod sleep;
pub mod sort_result;
pub mod template;

pub type InterceptorRef = Box<dyn Interceptor>;
pub type InterceptorRef = Box<dyn Interceptor + Send + Sync>;

#[async_trait::async_trait]
pub trait Interceptor {
#[allow(unused_variables)]
fn before_execute(&self, execute_query: &mut Vec<String>, context: &mut QueryContext) {}

#[allow(unused_variables)]
async fn before_execute_async(
&self,
execute_query: &mut Vec<String>,
context: &mut QueryContext,
) {
self.before_execute(execute_query, context)
}

#[allow(unused_variables)]
fn after_execute(&self, result: &mut String) {}

#[allow(unused_variables)]
async fn after_execute_async(&self, result: &mut String) {
self.after_execute(result)
}
}

pub type InterceptorFactoryRef = Arc<dyn InterceptorFactory>;
Expand Down Expand Up @@ -93,6 +109,10 @@ fn builtin_interceptors() -> HashMap<String, InterceptorFactoryRef> {
template::PREFIX.to_string(),
Arc::new(TemplateInterceptorFactory {}) as _,
),
(
sleep::PREFIX.to_string(),
Arc::new(sleep::SleepInterceptorFactory {}) as _,
),
]
.into_iter()
.map(|(prefix, factory)| (prefix.to_string(), factory))
Expand Down
101 changes: 101 additions & 0 deletions sqlness/src/interceptor/sleep.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2024 CeresDB Project Authors. Licensed under Apache-2.0.

use std::pin::Pin;
use std::task::Context;
use std::time::{Duration, Instant};

use crate::error::Result;
use crate::interceptor::{Interceptor, InterceptorFactory, InterceptorRef};
use crate::SqlnessError;

pub const PREFIX: &str = "SLEEP";

/// Sleep for given duration before executing the query.
///
/// # Example
/// ``` sql
/// -- SQLNESS SLEEP <Duration>
/// SELECT 1;
/// ```
///
/// valid duration format:
/// - `1s` for 1 second
/// - `1ms` for 1 millisecond
/// - `1s500ms` for 1.5 seconds
/// etc. See detailed format in [duration_str](https://docs.rs/duration-str/0.11.2/duration_str/) crate
///
/// Note that this implementation is not accurate and may be affected by the system load.
/// It is guaranteed that the sleep time is at least the given milliseconds, but the lag may be
/// longer.
#[derive(Debug)]
pub struct SleepInterceptor {
duration: Duration,
}

struct Sleep {
now: Instant,
duration: Duration,
}
impl core::future::Future for Sleep {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
let elapsed = self.now.elapsed();
if elapsed < self.duration {
let waker = cx.waker().clone();
// detach the thread and let it wake the waker later
let remaining = self.duration.saturating_sub(elapsed);
std::thread::spawn(move || {
std::thread::sleep(remaining);
waker.wake();
});
std::task::Poll::Pending
} else {
std::task::Poll::Ready(())
}
}
}

#[async_trait::async_trait]
impl Interceptor for SleepInterceptor {
async fn before_execute_async(
&self,
_execute_query: &mut Vec<String>,
_context: &mut crate::case::QueryContext,
) {
// impl a cross-runtime sleep
Sleep {
now: Instant::now(),
duration: self.duration,
}
.await;
}
}

pub struct SleepInterceptorFactory;

impl InterceptorFactory for SleepInterceptorFactory {
fn try_new(&self, ctx: &str) -> Result<InterceptorRef> {
let duration = duration_str::parse(ctx).map_err(|e| SqlnessError::InvalidContext {
prefix: PREFIX.to_string(),
msg: format!("Failed to parse duration: {}", e),
})?;
Ok(Box::new(SleepInterceptor { duration }))
}
}

#[cfg(test)]
mod test {
use super::*;

#[tokio::test]
async fn wait_1500ms() {
let input = "1s500ms";
let interceptor = SleepInterceptorFactory {}.try_new(input).unwrap();
let now = Instant::now();
interceptor
.before_execute_async(&mut vec![], &mut crate::QueryContext::default())
.await;
let elasped = now.elapsed().as_millis() as u64;
assert!(elasped >= 1500);
}
}
2 changes: 0 additions & 2 deletions sqlness/src/interceptor/template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ impl Interceptor for TemplateInterceptor {
.map(|v| v.to_string())
.collect::<Vec<_>>();
}

fn after_execute(&self, _result: &mut String) {}
}

impl InterceptorFactory for TemplateInterceptorFactory {
Expand Down

0 comments on commit 5469532

Please sign in to comment.