Skip to content

Commit

Permalink
enable check interval for engine
Browse files Browse the repository at this point in the history
Signed-off-by: remzi <[email protected]>
  • Loading branch information
HaoYang670 committed Jan 10, 2025
1 parent 2ae2bfd commit e036d5b
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 81 deletions.
20 changes: 11 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,15 +216,17 @@ bash build rucat_state_monitor.sh

## TODO

1. Handle timeout for `Trigger*` states.
2. catch the spark driver log before deleting?
3. implement rucat-client (based on spark-connect-rs)
4. mock resource client. <https://github.com/asomers/mockall>
5. rucat server HA
6. multi rucat state monitors
7. More resource clients: Yarn, Spark standalone, Spark local, rust shuttle etc.
8. expose spark rpc port and web ui port
9. Remove all unreachable code by using stronger type.
1. catch the spark driver log before deleting?
2. implement rucat-client (based on spark-connect-rs)
3. mock resource client. <https://github.com/asomers/mockall>
4. rucat server HA
5. multi rucat state monitors
6. More resource clients: Yarn, Spark standalone, Spark local, rust shuttle etc.
7. expose spark rpc port and web ui port
8. Remove all unreachable code by using stronger type.
9. Define state monitor as a type.
10. rename WaitToStart to WaitToCreate
11. don't need to use millis for check interval, seconds is enough.

## Debug

Expand Down
22 changes: 18 additions & 4 deletions rucat_common/src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
pub mod surrealdb_client;
use ::core::future::Future;
use ::std::time::SystemTime;

use crate::engine::{CreateEngineRequest, EngineId};
use crate::engine::{EngineInfo, EngineState};
Expand Down Expand Up @@ -29,24 +30,38 @@ pub struct EngineIdAndInfo {
pub trait Database: Sized + Send + Sync + 'static {
/// Add the metadata of a new engine in the database,
/// generate an id for the engine and return it.
/// # Parameters
/// - `engine`: create engine request
/// - `next_update_time`: The time when the engine should be updated by the state monitor.
/// `None` means the engine does not need to be updated anymore.
/// # Return
/// - `Ok(EngineId)` if the engine is successfully added.
/// - `Err(_)` if any error occurs in the database.
fn add_engine(
&self,
engine: CreateEngineRequest,
next_update_time: Option<SystemTime>,
) -> impl Future<Output = Result<EngineId>> + Send;

/// Remove Engine.
/// # Return
/// - `Ok(None)` if the engine does not exist.
/// - `Ok(Some(UpdateEngineStateResponse))` if the engine exists.
/// - `Err(_)` if any error occurs in the database.
fn delete_engine(
fn remove_engine(
&self,
id: &EngineId,
current_state: &EngineState,
) -> impl Future<Output = Result<Option<UpdateEngineStateResponse>>> + Send;

/// Update the engine state to `after` only when
/// the engine exists and the current state is `before`.
/// # Parameters
/// - `id`: The id of the engine.
/// - `before`: The expected state of the engine before the update.
/// - `after`: The state that engine is wanted to be updated to.
/// - `next_update_time_millis`: The time when the engine should be updated by the state monitor.
/// `None` means the engine does not need to be updated anymore.
/// # Return
/// - `Ok(None)` if the engine does not exist.
/// - `Ok(Some(UpdateEngineStateResponse))` if the engine exists.
Expand All @@ -56,6 +71,7 @@ pub trait Database: Sized + Send + Sync + 'static {
id: &EngineId,
before: &EngineState,
after: &EngineState,
next_update_time_millis: Option<SystemTime>,
) -> impl Future<Output = Result<Option<UpdateEngineStateResponse>>> + Send;

/// Return `Ok(None)` if the engine does not exist
Expand All @@ -64,9 +80,7 @@ pub trait Database: Sized + Send + Sync + 'static {
/// Return a sorted list of all engine ids
fn list_engines(&self) -> impl Future<Output = Result<Vec<EngineId>>> + Send;

/// Return all engines that need to be updated.
/// This includes engines in state `WaitTo*`,
/// or those in `Running` and `*InProgress`, and the engine info has been outdated.
/// Return all out-of-date engines that need to be updated.
fn list_engines_need_update(&self)
-> impl Future<Output = Result<Vec<EngineIdAndInfo>>> + Send;
}
38 changes: 29 additions & 9 deletions rucat_common/src/database/surrealdb_client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Client of SurrealDB
use ::std::time::{SystemTime, UNIX_EPOCH};

use ::serde::Deserialize;

use crate::engine::{CreateEngineRequest, EngineId};
Expand Down Expand Up @@ -54,10 +56,18 @@ impl SurrealDBClient {
.map_err(RucatError::fail_to_connect_database)?;
Ok(Self { client })
}

fn convert_system_time_to_millis(time: SystemTime) -> u128 {
time.duration_since(UNIX_EPOCH).unwrap().as_millis()
}
}

impl Database for SurrealDBClient {
async fn add_engine(&self, engine: CreateEngineRequest) -> Result<EngineId> {
async fn add_engine(
&self,
engine: CreateEngineRequest,
next_update_time: Option<SystemTime>,
) -> Result<EngineId> {
let info: EngineInfo = engine.try_into()?;
// always set next_update_time to now when adding a new engine,
// so that the state monitor will update the engine info immediately
Expand All @@ -77,7 +87,7 @@ impl Database for SurrealDBClient {
{ ErrorClean: string };
CREATE ONLY type::table($table)
SET info = $info, next_update_time = time::now()
SET info = $info, next_update_time = $next_update_time
RETURN VALUE record::id(id);
"#;

Expand All @@ -86,6 +96,11 @@ impl Database for SurrealDBClient {
.query(sql)
.bind(("table", Self::TABLE))
.bind(("info", info))
// the next_update_time field is not set in surreal when it is None
.bind((
"next_update_time",
next_update_time.map(Self::convert_system_time_to_millis),
))
.await
.map_err(RucatError::fail_to_update_database)?
.take(1)
Expand All @@ -96,7 +111,7 @@ impl Database for SurrealDBClient {
})
}

async fn delete_engine(
async fn remove_engine(
&self,
id: &EngineId,
current_state: &EngineState,
Expand Down Expand Up @@ -136,6 +151,7 @@ impl Database for SurrealDBClient {
id: &EngineId,
before: &EngineState,
after: &EngineState,
next_update_time: Option<SystemTime>,
) -> Result<Option<UpdateEngineStateResponse>> {
let sql = r#"
let $record_id = type::thing($tb, $id); // 0th return value
Expand All @@ -146,23 +162,25 @@ impl Database for SurrealDBClient {
IF $current_state IS NONE {
RETURN NONE; // 1st return value
} ELSE IF $current_state == $before {
UPDATE ONLY $record_id SET info.state = $after;
UPDATE ONLY $record_id SET info.state = $after, next_update_time = $next_update_time;
RETURN {before_state: $current_state, update_success: true}; // 1st return value
} ELSE {
RETURN {before_state: $current_state, update_success: false}; // 1st return value
}
};
COMMIT TRANSACTION;
"#;

let before_state: Option<UpdateEngineStateResponse> = self
.client
.query(sql)
.bind(("tb", Self::TABLE))
.bind(("id", id.to_string()))
// convert to vec because array cannot be serialized
.bind(("before", before.clone()))
.bind(("after", after.clone()))
.bind((
"next_update_time",
next_update_time.map(Self::convert_system_time_to_millis),
))
.await
.map_err(RucatError::fail_to_update_database)?
.take(1)
Expand Down Expand Up @@ -213,9 +231,7 @@ impl Database for SurrealDBClient {
let sql = r#"
SELECT VALUE {id: record::id(id), info: info}
FROM type::table($tb)
WHERE (info.state IN ["WaitToStart", "WaitToTerminate", "WaitToDelete"] OR info.state.ErrorWaitToClean)
OR ((info.state IN ["Running", "StartInProgress", "TerminateInProgress", "DeleteInProgress"] OR info.state.ErrorCleanInProgress)
AND next_update_time < time::now());
WHERE next_update_time != None && next_update_time < $now;
"#;

#[derive(Deserialize)]
Expand All @@ -228,6 +244,10 @@ impl Database for SurrealDBClient {
.client
.query(sql)
.bind(("tb", Self::TABLE))
.bind((
"now",
Self::convert_system_time_to_millis(SystemTime::now()),
))
.await
.map_err(RucatError::fail_to_read_database)?
.take(0)
Expand Down
21 changes: 14 additions & 7 deletions rucat_server/src/engine/router.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Restful API for engine management.
use ::std::time::SystemTime;

use ::rucat_common::{
anyhow::anyhow,
database::Database,
Expand All @@ -23,7 +25,10 @@ async fn create_engine<DB: Database>(
State(state): State<AppState<DB>>,
Json(body): Json<CreateEngineRequest>,
) -> Result<Json<EngineId>> {
let id = state.get_db().add_engine(body).await?;
let id = state
.get_db()
.add_engine(body, Some(SystemTime::now()))
.await?;
info!("Creating engine {}, wait to start", id);
Ok(Json(id))
}
Expand All @@ -42,7 +47,7 @@ where
match current_state {
s @ (WaitToStart | Terminated | ErrorClean(_)) => {
let response = db_client
.delete_engine(&id, &s)
.remove_engine(&id, &s)
.await?
.ok_or_else(|| RucatError::engine_not_found(&id))?;
if response.update_success {
Expand Down Expand Up @@ -75,9 +80,9 @@ async fn stop_engine<DB: Database>(
let mut current_state = get_engine_state(&id, db_client).await?;

loop {
let new_state = match current_state {
WaitToStart => Terminated,
StartInProgress | Running => WaitToTerminate,
let (new_state, next_update_time) = match current_state {
WaitToStart => (Terminated, None),
StartInProgress | Running => (WaitToTerminate, Some(SystemTime::now())),
other => {
return Err(RucatError::not_allowed(anyhow!(
"Engine {} is in {:?} state, cannot be stopped",
Expand All @@ -87,7 +92,7 @@ async fn stop_engine<DB: Database>(
}
};
let response = db_client
.update_engine_state(&id, &current_state, &new_state)
.update_engine_state(&id, &current_state, &new_state, next_update_time)
.await?
.ok_or_else(|| RucatError::engine_not_found(&id))?;
if response.update_success {
Expand Down Expand Up @@ -123,7 +128,9 @@ async fn restart_engine<DB: Database>(
}
};
let response = db_client
.update_engine_state(&id, &current_state, &new_state)
// For Running state, we set next_update_time to current time to trigger the state monitor immediately because
// rucat server does not know the check interval of the state monitor.
.update_engine_state(&id, &current_state, &new_state, Some(SystemTime::now()))
.await?
.ok_or_else(|| RucatError::engine_not_found(&id))?;
if response.update_success {
Expand Down
7 changes: 5 additions & 2 deletions rucat_server/tests/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use ::std::time::SystemTime;

use ::mockall::mock;
use ::rucat_common::{
database::{Database, EngineIdAndInfo, UpdateEngineStateResponse},
Expand All @@ -10,13 +12,14 @@ use axum_test::TestServer;
mock! {
pub DB{}
impl Database for DB {
async fn add_engine(&self, engine: CreateEngineRequest) -> Result<EngineId>;
async fn delete_engine(&self, id: &EngineId, current_state: &EngineState) -> Result<Option<UpdateEngineStateResponse>>;
async fn add_engine(&self, engine: CreateEngineRequest, next_update_time: Option<SystemTime>) -> Result<EngineId>;
async fn remove_engine(&self, id: &EngineId, current_state: &EngineState) -> Result<Option<UpdateEngineStateResponse>>;
async fn update_engine_state(
&self,
id: &EngineId,
before: &EngineState,
after: &EngineState,
next_update_time: Option<SystemTime>,
) -> Result<Option<UpdateEngineStateResponse>>;
async fn get_engine(&self, id: &EngineId) -> Result<Option<EngineInfo>>;
async fn list_engines(&self) -> Result<Vec<EngineId>>;
Expand Down
31 changes: 18 additions & 13 deletions rucat_server/tests/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,20 @@ async fn create_engine_with_unsupported_engine_type() -> Result<()> {
async fn create_engine() -> Result<()> {
let mut db = MockDB::new();
db.expect_add_engine()
.with(predicate::eq(CreateEngineRequest {
name: "test".to_owned(),
engine_type: EngineType::Spark,
version: "3.5.3".to_owned(),
config: Some(BTreeMap::from([(
Cow::Borrowed("spark.executor.instances"),
Cow::Borrowed("1"),
)])),
}))
.with(
predicate::eq(CreateEngineRequest {
name: "test".to_owned(),
engine_type: EngineType::Spark,
version: "3.5.3".to_owned(),
config: Some(BTreeMap::from([(
Cow::Borrowed("spark.executor.instances"),
Cow::Borrowed("1"),
)])),
}),
predicate::always(),
)
.times(1)
.returning(|_| Ok(EngineId::new(Cow::Borrowed("123"))?));
.returning(|_, _| Ok(EngineId::new(Cow::Borrowed("123"))?));
let server = get_test_server(false, db).await?;

let response = server
Expand Down Expand Up @@ -200,7 +203,7 @@ async fn delete_engine() -> Result<()> {
EngineTime::now(),
)))
});
db.expect_delete_engine()
db.expect_remove_engine()
.with(
predicate::eq(EngineId::new(Cow::Borrowed("123"))?),
predicate::eq(&WaitToStart),
Expand Down Expand Up @@ -240,9 +243,10 @@ async fn stop_wait_to_start_engine() -> Result<()> {
predicate::eq(EngineId::new(Cow::Borrowed("123"))?),
predicate::eq(&WaitToStart),
predicate::eq(&Terminated),
predicate::eq(None),
)
.times(1)
.returning(|_, _, _| {
.returning(|_, _, _, _| {
Ok(Some(UpdateEngineStateResponse {
before_state: WaitToStart,
update_success: true,
Expand Down Expand Up @@ -290,9 +294,10 @@ async fn restart_terminated_engine() -> Result<()> {
predicate::eq(EngineId::new(Cow::Borrowed("123"))?),
predicate::eq(&Terminated),
predicate::eq(&WaitToStart),
predicate::always(),
)
.times(1)
.returning(|_, _, _| {
.returning(|_, _, _, _| {
Ok(Some(UpdateEngineStateResponse {
before_state: Terminated,
update_success: true,
Expand Down
Loading

0 comments on commit e036d5b

Please sign in to comment.