Skip to content

Commit

Permalink
rewrite data store using surrealql
Browse files Browse the repository at this point in the history
Signed-off-by: remzi <[email protected]>
  • Loading branch information
HaoYang670 committed Jun 2, 2024
1 parent 2d1d5f8 commit 2b75ea8
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 73 deletions.
95 changes: 49 additions & 46 deletions rucat_server/src/engine_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use axum::{
use rucat_common::error::{Result, RucatError};
use serde::{Deserialize, Serialize};

use crate::state::{data_store::DataStore, AppState};
use crate::state::AppState;
use EngineState::*;

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -48,13 +48,6 @@ pub(crate) struct EngineInfo {
state: EngineState,
}

impl EngineInfo {
pub(crate) fn update_state(mut self, state: EngineState) -> Self {
self.state = state;
self
}
}

impl From<CreateEngineRequest> for EngineInfo {
fn from(value: CreateEngineRequest) -> Self {
EngineInfo {
Expand All @@ -65,15 +58,15 @@ impl From<CreateEngineRequest> for EngineInfo {
}
}

pub(crate) type EngineId = String;

#[derive(Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub(crate) struct CreateEngineRequest {
name: String,
engine_type: EngineType,
}

pub(crate) type EngineId = String;

/// create an engine with the given configuration
async fn create_engine(
State(state): State<AppState<'_>>,
Expand All @@ -92,40 +85,57 @@ async fn delete_engine(Path(id): Path<EngineId>, State(state): State<AppState<'_
}

/// Stop an engine to release resources. But engine info is still kept in the data store.
/// TODO: make the state checking and updating atomic
async fn stop_engine(Path(id): Path<EngineId>, State(state): State<AppState<'_>>) -> Result<()> {
let data_store = state.get_data_store();

let engine = get_engine_helper(data_store, &id).await?;

match engine.state {
Pending | Running => {
data_store
.update_engine(&id, engine.update_state(Stopped))
.await
}
Stopped => RucatError::NotAllowedError(format!("Engine {} is already stopped", &id)).into(),
}
state
.get_data_store()
.update_engine_state(&id, [Pending, Running], Stopped)
.await?
.map_or_else(
|| {
Err(RucatError::NotFoundError(format!(
"Engine {} not found",
id
)))
},
|response| {
if response.update_success() {
Ok(())
} else {
Err(RucatError::NotAllowedError(format!(
"Engine {} is in {} state, cannot be stopped",
id,
response.get_before_state()
)))
}
},
)
}

/// Restart a stopped engine with the same configuration.
/// TODO: make the state checking and updating atomic
async fn restart_engine(Path(id): Path<EngineId>, State(state): State<AppState<'_>>) -> Result<()> {
let data_store = state.get_data_store();
let engine = get_engine_helper(data_store, &id).await?;

match engine.state {
Stopped => {
data_store
.update_engine(&id, engine.update_state(Pending))
.await
}
other => RucatError::NotAllowedError(format!(
"Engine {} is in {} state, cannot be restart",
&id, other
))
.into(),
}
state
.get_data_store()
.update_engine_state(&id, [Stopped], Pending)
.await?
.map_or_else(
|| {
Err(RucatError::NotFoundError(format!(
"Engine {} not found",
id
)))
},
|response| {
if response.update_success() {
Ok(())
} else {
Err(RucatError::NotAllowedError(format!(
"Engine {} is in {} state, cannot be restarted",
id,
response.get_before_state()
)))
}
},
)
}

async fn get_engine(
Expand Down Expand Up @@ -159,10 +169,3 @@ pub(crate) fn get_engine_router() -> Router<AppState<'static>> {
fn engine_not_found(id: &EngineId) -> RucatError {
RucatError::NotFoundError(format!("Engine {} not found", id))
}

async fn get_engine_helper(data_store: &DataStore<'_>, id: &EngineId) -> Result<EngineInfo> {
data_store
.get_engine(id)
.await?
.ok_or_else(|| engine_not_found(id))
}
122 changes: 98 additions & 24 deletions rucat_server/src/state/data_store.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
//! Datastore to record engines' infomation
use crate::engine_router::{EngineId, EngineInfo};
use crate::engine_router::{EngineId, EngineInfo, EngineState};
use rucat_common::error::{Result, RucatError};
use serde::Deserialize;
use surrealdb::{engine::local::Db, sql::Thing, Surreal};
use surrealdb::{engine::local::Db, Surreal};

type SurrealDBURI<'a> = &'a str;

/// Id of the [Engine] in [DataStore]
#[derive(Debug, Deserialize)]
struct Record {
id: Thing,
/// Response of updating engine state
/// The response contains the engine state before the update
/// and whether the update is successful.
#[derive(Deserialize)]
pub(crate) struct UpdateEngineStateResponse {
before: EngineState,
success: bool,
}

impl From<Record> for EngineId {
fn from(record: Record) -> Self {
record.id.id.to_string()
impl UpdateEngineStateResponse {
pub(crate) fn update_success(&self) -> bool {
self.success
}

pub(crate) fn get_before_state(&self) -> &EngineState {
&self.before
}
}

Expand Down Expand Up @@ -49,32 +56,87 @@ impl<'a> DataStore<'a> {
pub(crate) async fn add_engine(&self, engine: EngineInfo) -> Result<EngineId> {
match self {
Self::Embedded { store } => {
// TODO: return an Option, not a Vec
let record: Vec<Record> = store.create(Self::TABLE).content(engine).await?;
record.first().map_or_else(
|| Err(RucatError::DataStoreError("Add engine fails".to_owned())),
|rd| Ok(rd.id.id.to_string()),
)
let sql = r#"
CREATE ONLY type::table($table)
SET info = $engine
RETURN VALUE meta::id(id);
"#;

let record: Option<EngineId> = store
.query(sql)
.bind(("table", Self::TABLE))
.bind(("engine", engine))
.await?
.take(0)?;
record.ok_or_else(|| RucatError::DataStoreError("Add engine fails".to_owned()))
}
Self::Remote { .. } => todo!(),
}
}

pub(crate) async fn delete_engine(&self, id: &EngineId) -> Result<Option<EngineInfo>> {
match self {
Self::Embedded { store } => Ok(store.delete((Self::TABLE, id)).await?),
Self::Embedded { store } => {
let sql = r#"
SELECT VALUE info from
(DELETE ONLY type::thing($tb, $id) RETURN BEFORE);
"#;
let result: Option<EngineInfo> = store
.query(sql)
.bind(("tb", Self::TABLE))
.bind(("id", id))
.await?
.take(0)?;
Ok(result)
}
Self::Remote { .. } => {
todo!()
}
}
}

/// Update the engine with the given info
pub(crate) async fn update_engine(&self, id: &EngineId, engine: EngineInfo) -> Result<()> {
/// Update the engine state to **after** only when
/// the engine exists and the current state is the same as the **before**.
/// Return the engine state before the update.
/// Return None if the engine does not exist.
/// Throws an error if the engine state is not in the expected state.
pub(crate) async fn update_engine_state<const N: usize>(
&self,
id: &EngineId,
before: [EngineState; N],
after: EngineState,
) -> Result<Option<UpdateEngineStateResponse>> {
match self {
Self::Embedded { store } => {
let _: Option<Record> = store.update((Self::TABLE, id)).content(engine).await?;
Ok(())
// The query returns None if the engine does not exist
// Throws an error if the engine state is not in the expected state
// Otherwise, update the engine state and returns the engine state before update
let sql = r#"
let $record_id = type::thing($tb, $id); // 0th return value
BEGIN TRANSACTION;
LET $current_state = (SELECT VALUE info.state from only $record_id); // 1st return value
IF $current_state IS NONE {
RETURN NONE; // 2nd return value
} ELSE IF $current_state IN $before {
UPDATE ONLY $record_id SET info.state = $after;
RETURN {before: $current_state, success: true}; // 2nd return value
} ELSE {
RETURN {before: $current_state, success: false}; // 2nd return value
};
COMMIT TRANSACTION;
"#;

let before_state: Option<UpdateEngineStateResponse> = store
.query(sql)
.bind(("tb", Self::TABLE))
.bind(("id", id))
// convert to vec because array cannot be serialized
.bind(("before", before.to_vec()))
.bind(("after", after))
.await?
.take(2)?; // The 3rd statement is the if-else which is what we want

Ok(before_state)
}
Self::Remote { .. } => {
todo!()
Expand All @@ -86,8 +148,17 @@ impl<'a> DataStore<'a> {
pub(crate) async fn get_engine(&self, id: &EngineId) -> Result<Option<EngineInfo>> {
match self {
Self::Embedded { store } => {
// have to do this redundant format to pass the type checker
Ok(store.select((Self::TABLE, id)).await?)
let sql = r#"
SELECT VALUE info
FROM ONLY type::thing($tb, $id);
"#;
let info: Option<EngineInfo> = store
.query(sql)
.bind(("tb", Self::TABLE))
.bind(("id", id))
.await?
.take(0)?;
Ok(info)
}
Self::Remote { .. } => {
todo!()
Expand All @@ -99,9 +170,12 @@ impl<'a> DataStore<'a> {
pub(crate) async fn list_engines(&self) -> Result<Vec<EngineId>> {
match self {
DataStore::Embedded { store } => {
let records: Vec<Record> = store.select(Self::TABLE).await?;
let mut ids: Vec<EngineId> = records.into_iter().map(Record::into).collect();
let sql = r#"
SELECT VALUE meta::id(id) FROM type::table($tb);
"#;

let mut ids: Vec<EngineId> =
store.query(sql).bind(("tb", Self::TABLE)).await?.take(0)?;
ids.sort();
Ok(ids)
}
Expand Down
11 changes: 8 additions & 3 deletions rucat_server/tests/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ async fn delete_engine() -> Result<()> {

let response = server.delete(&format!("/engine/{}", engine_id)).await;
response.assert_status_ok();

Ok(())
}

Expand Down Expand Up @@ -172,7 +171,7 @@ async fn stop_engine_twice() -> Result<()> {
let response = server.post(&format!("/engine/{}/stop", engine_id)).await;
response.assert_status_forbidden();
response.assert_text(format!(
"Not allowed error: Engine {} is already stopped",
"Not allowed error: Engine {} is in Stopped state, cannot be stopped",
engine_id
));

Expand Down Expand Up @@ -210,13 +209,19 @@ async fn cannot_restart_pending_engine() -> Result<()> {
let response = server.post(&format!("/engine/{}/restart", engine_id)).await;
response.assert_status_forbidden();
response.assert_text(format!(
"Not allowed error: Engine {} is in Pending state, cannot be restart",
"Not allowed error: Engine {} is in Pending state, cannot be restarted",
engine_id
));

Ok(())
}

#[tokio::test]
#[should_panic(expected = "not yet implemented")]
async fn cannot_restart_running_engine() {
todo!("not yet implemented")
}

#[tokio::test]
async fn list_engines_empty() -> Result<()> {
let server = get_test_server().await?;
Expand Down

0 comments on commit 2b75ea8

Please sign in to comment.