Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
Signed-off-by: remzi <[email protected]>
  • Loading branch information
HaoYang670 committed Jan 13, 2025
1 parent 499621c commit cf7dff9
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 102 deletions.
23 changes: 16 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,6 @@ stateDiagram
ErrorClean --> [*]: DELETE
```

## How to test

```bash
cargo test
```

## Configurations

### rucat server configurations
Expand Down Expand Up @@ -214,6 +208,19 @@ bash build rucat_state_monitor.sh
- create an Apache Spark using the REST API
- connect to the spark connect server

## Test

### Unit test

```bash
cargo test
```

### Integration test (not automated)

1. Create a Spark engine and wait for it to be running. Get the engine info, stop the engine, restart the engine, then delete the engine.
2. Create a Spark engine with wrong configurations and wait for it to enter the error state. Delete the engine.

## TODO

1. catch the spark driver log before deleting?
Expand All @@ -224,7 +231,9 @@ bash build rucat_state_monitor.sh
6. More resource clients: Yarn, Spark standalone, Spark local, rust shuttle etc.
7. expose spark rpc port and web ui port
8. Remove all unreachable code by using stronger types.
9. Add test for state monitor.
10. Define engine states into 3 subtypes: InProgress, WaitTo and Stable. Customize the serde for the engine state.

## Debug

Dummy command that can make a pod running forever: `tail -f /dev/null`
Dummy command that can make a pod run forever: `tail -f /dev/null`
181 changes: 87 additions & 94 deletions rucat_state_monitor/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use ::core::time::Duration;
use ::std::{borrow::Cow, time::SystemTime};
use ::std::{
borrow::Cow,
time::{Instant, SystemTime},
};

use ::rucat_common::{
database::{Database, EngineIdAndInfo},
Expand Down Expand Up @@ -51,102 +54,13 @@ where
/// This function runs forever to monitor the state of engines.
pub async fn run_state_monitor(&self) -> ! {
loop {
let start_time = std::time::Instant::now();
let start_time = Instant::now();
match self.db_client.list_engines_need_update().await {
Ok(engines) => {
info!("Detect {} engines need to update", engines.len());
// TODO: make this execute in parallel
for EngineIdAndInfo { id, info } in engines {
match info.state {
WaitToStart => {
if self.acquire_engine(&id, &WaitToStart).await {
info!("Create engine {}", id);
// create engine resource
let err_msg = match self
.resource_manager
.create_resource(&id, &info)
.await
{
Ok(()) => {
info!("Create engine resource for {}", id);
None
}
Err(e) => {
error!(
"Failed to create engine resource for {}: {}",
id, e
);
Some(Cow::Owned(e.to_string()))
}
};
self.release_engine(&TriggerStart, &id, err_msg).await;
}
}
WaitToTerminate => {
if self.acquire_engine(&id, &WaitToTerminate).await {
info!("Terminate engine {}", id);
// clean engine resource
let err_msg =
match self.resource_manager.clean_resource(&id).await {
Ok(()) => {
info!("Clean engine resource for {}", id);
None
}
Err(e) => {
error!(
"Failed to clean engine resource for {}: {}",
id, e
);
Some(Cow::Owned(e.to_string()))
}
};
self.release_engine(&TriggerTermination, &id, err_msg).await;
}
}
ErrorWaitToClean(s) => {
if self.acquire_engine(&id, &ErrorWaitToClean(s.clone())).await {
info!("Clean resource for error state engine {}", id);
// clean engine resource
let err_msg =
match self.resource_manager.clean_resource(&id).await {
Ok(()) => {
info!("Clean engine resource for {}", id);
None
}
Err(e) => {
error!(
"Failed to clean engine resource for {}: {}",
id, e
);
Some(Cow::Owned(e.to_string()))
}
};
self.release_engine(&ErrorTriggerClean(s), &id, err_msg)
.await;
}
}

old_state @ (Running
| StartInProgress
| TerminateInProgress
| ErrorCleanInProgress(_)) => {
let resource_state =
self.resource_manager.get_resource_state(&id).await;
let new_state = resource_state
.get_new_engine_state(&old_state)
.unwrap_or(old_state.clone());
self.inspect_engine_state_updating(
&id,
&old_state,
&new_state,
self.get_next_update_time(&new_state),
)
.await;
}
other => {
error!("Should not monitor engine {} in state {:?}", id, other);
}
}
for e in engines {
self.sync_engine(e).await;
}
}
Err(e) => {
Expand All @@ -163,6 +77,86 @@ where
}
}

/// Sync a single engine with the resource manager and record the outcome in the database.
///
/// The action taken depends on `engine.info.state`:
/// - `WaitToStart`: if `acquire_engine` returns `true`, create the engine resource and
///   release the engine from `TriggerStart`.
/// - `WaitToTerminate`: if `acquire_engine` returns `true`, clean the engine resource and
///   release the engine from `TriggerTermination`.
/// - `ErrorWaitToClean(s)`: if `acquire_engine` returns `true`, clean the engine resource and
///   release the engine from `ErrorTriggerClean(s)`.
/// - `Running`, `StartInProgress`, `TerminateInProgress`, `ErrorCleanInProgress(_)`:
///   query the resource state, derive the new engine state from it (keeping the old state
///   when the resource state yields none) and pass both to `inspect_engine_state_updating`.
/// - any other state: log an error, because such engines should not be monitored.
///
/// Failures of the resource manager are never propagated to the caller; they are logged
/// and forwarded to `release_engine` as an error message.
async fn sync_engine(&self, engine: EngineIdAndInfo) {
    let EngineIdAndInfo { id, info } = engine;
    match info.state {
        WaitToStart => {
            // Skip the engine when it cannot be acquired.
            if self.acquire_engine(&id, &WaitToStart).await {
                info!("Create engine {}", id);
                // create engine resource
                let err_msg = match self.resource_manager.create_resource(&id, &info).await {
                    Ok(()) => {
                        info!("Create engine resource for {}", id);
                        None
                    }
                    Err(e) => {
                        error!("Failed to create engine resource for {}: {}", id, e);
                        Some(Cow::Owned(e.to_string()))
                    }
                };
                // `release_engine` picks the follow-up state based on whether `err_msg` is set.
                self.release_engine(&TriggerStart, &id, err_msg).await;
            }
        }
        WaitToTerminate => {
            if self.acquire_engine(&id, &WaitToTerminate).await {
                info!("Terminate engine {}", id);
                // clean engine resource
                let err_msg = match self.resource_manager.clean_resource(&id).await {
                    Ok(()) => {
                        info!("Clean engine resource for {}", id);
                        None
                    }
                    Err(e) => {
                        error!("Failed to clean engine resource for {}: {}", id, e);
                        Some(Cow::Owned(e.to_string()))
                    }
                };
                self.release_engine(&TriggerTermination, &id, err_msg).await;
            }
        }
        ErrorWaitToClean(s) => {
            // `s` is cloned for the acquire call because the original value is moved into
            // `ErrorTriggerClean` below.
            if self.acquire_engine(&id, &ErrorWaitToClean(s.clone())).await {
                info!("Clean resource for error state engine {}", id);
                // clean engine resource
                let err_msg = match self.resource_manager.clean_resource(&id).await {
                    Ok(()) => {
                        info!("Clean engine resource for {}", id);
                        None
                    }
                    Err(e) => {
                        error!("Failed to clean engine resource for {}: {}", id, e);
                        Some(Cow::Owned(e.to_string()))
                    }
                };
                self.release_engine(&ErrorTriggerClean(s), &id, err_msg)
                    .await;
            }
        }

        old_state @ (Running
        | StartInProgress
        | TerminateInProgress
        | ErrorCleanInProgress(_)) => {
            let resource_state = self.resource_manager.get_resource_state(&id).await;
            // Clone the old state lazily: only when the resource state yields no new state.
            let new_state = resource_state
                .get_new_engine_state(&old_state)
                .unwrap_or_else(|| old_state.clone());
            self.inspect_engine_state_updating(
                &id,
                &old_state,
                &new_state,
                self.get_next_update_time(&new_state),
            )
            .await;
        }
        other => {
            error!("Should not monitor engine {} in state {:?}", id, other);
        }
    }
}

/// For engine in state `Trigger*`, release it by updating its state to `*InProgress`,
/// or to Error states if error message is provided.
async fn release_engine(
Expand All @@ -173,7 +167,6 @@ where
) {
let now = SystemTime::now();
let next_update_time = Some(now + self.check_interval);
// TODO: wrap `Trigger*` states in a new type
let (new_state, next_update_time) = match (current_state, err_msg) {
(TriggerStart, None) => (StartInProgress, next_update_time),
(TriggerStart, Some(s)) => (ErrorClean(s), None),
Expand Down
1 change: 0 additions & 1 deletion rucat_state_monitor/src/resource_manager/k8s_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ impl K8sPodState {

impl ResourceState for K8sPodState {
fn get_new_engine_state(&self, old_state: &EngineState) -> Option<EngineState> {
// TODO: wrap `Running` and `*InProgress` states in a new type
match (old_state, self) {
(EngineState::StartInProgress, Self::Pending | Self::Unknown) => None,
(EngineState::StartInProgress, Self::Running) => Some(EngineState::Running),
Expand Down

0 comments on commit cf7dff9

Please sign in to comment.