Skip to content

Commit

Permalink
make state monitor test friendly
Browse files Browse the repository at this point in the history
Signed-off-by: remzi <[email protected]>
  • Loading branch information
HaoYang670 committed Jan 15, 2025
1 parent 689d58b commit d38b151
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 29 deletions.
6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,8 @@ cargo test
4. rucat server HA
5. multi rucat state monitors
6. More resource clients: Yarn, Spark standalone, Spark local, rust shuttle etc.
7. expose spark rpc port and web ui port
8. Remove all unreachable code by using stronger type.
9. Add test for state monitor.
10. Define engine states into 3 subtypes: InProgress, WaitTo and Stable. Customized the serde for the engine state.
7. Add test for state monitor.
8. rucat session for RPC connect. (Protocol design)

## Debug

Expand Down
118 changes: 93 additions & 25 deletions rucat_state_monitor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,12 @@ where
&id,
&old_state,
&new_state,
self.get_next_update_time(&new_state),
get_next_update_time(
&new_state,
SystemTime::now(),
self.check_interval,
self.trigger_state_timeout,
),
)
.await;
}
Expand All @@ -165,20 +170,23 @@ where
id: &EngineId,
err_msg: Option<Cow<'static, str>>,
) {
let now = SystemTime::now();
let next_update_time = Some(now + self.check_interval);
let (new_state, next_update_time) = match (current_state, err_msg) {
(TriggerStart, None) => (StartInProgress, next_update_time),
(TriggerStart, Some(s)) => (ErrorClean(s), None),
(TriggerTermination, None) => (TerminateInProgress, next_update_time),
(TriggerTermination, Some(s)) => (ErrorWaitToClean(s), Some(now)),
(ErrorTriggerClean(s), None) => (ErrorCleanInProgress(s.clone()), next_update_time),
(ErrorTriggerClean(s1), Some(s2)) => (
ErrorWaitToClean(Cow::Owned(format!("{}\n\n{}", s1, s2))),
Some(now),
),
let new_state = match (current_state, err_msg) {
(TriggerStart, None) => StartInProgress,
(TriggerStart, Some(s)) => ErrorClean(s),
(TriggerTermination, None) => TerminateInProgress,
(TriggerTermination, Some(s)) => ErrorWaitToClean(s),
(ErrorTriggerClean(s), None) => ErrorCleanInProgress(s.clone()),
(ErrorTriggerClean(s1), Some(s2)) => {
ErrorWaitToClean(Cow::Owned(format!("{}\n\n{}", s1, s2)))
}
_ => unreachable!("Should not release engine in state {:?}", current_state),
};
let next_update_time = get_next_update_time(
&new_state,
SystemTime::now(),
self.check_interval,
self.trigger_state_timeout,
);
let response = self
.db_client
.update_engine_state(id, current_state, &new_state, next_update_time)
Expand Down Expand Up @@ -221,13 +229,18 @@ where
/// # Return
/// Whether the engine is acquired successfully.
async fn acquire_engine(&self, id: &EngineId, current_state: &EngineState) -> bool {
    // Acquiring maps each "wait" state to its matching "trigger" state;
    // any other state reaching this point is a caller bug.
    let new_state = match current_state {
        WaitToStart => TriggerStart,
        WaitToTerminate => TriggerTermination,
        ErrorWaitToClean(s) => ErrorTriggerClean(s.clone()),
        _ => unreachable!("Should not acquire engine in state {:?}", current_state),
    };
    // The schedule is derived from the new state in exactly one place.
    // A previous hand-computed `next_update_time` binding was never read
    // (it was shadowed here) and has been dropped.
    let next_update_time = get_next_update_time(
        &new_state,
        SystemTime::now(),
        self.check_interval,
        self.trigger_state_timeout,
    );
    self.inspect_engine_state_updating(id, current_state, &new_state, next_update_time)
        .await
}
Expand Down Expand Up @@ -276,18 +289,73 @@ where
}
}
}
}

fn get_next_update_time(&self, state: &EngineState) -> Option<SystemTime> {
let now = SystemTime::now();
match state {
WaitToStart | WaitToTerminate | ErrorWaitToClean(_) => Some(now),
TriggerStart | TriggerTermination | ErrorTriggerClean(_) => {
Some(now + self.trigger_state_timeout)
}
StartInProgress | Running | TerminateInProgress | ErrorCleanInProgress(_) => {
Some(now + self.check_interval)
}
Terminated | ErrorClean(_) => None,
/// Compute when an engine in the given state is next due for an update.
///
/// # Parameters
/// - `state`: The engine state to schedule for.
/// - `now`: The reference time the schedule is computed from.
/// - `check_interval`: Delay applied to in-progress and running states.
/// - `trigger_state_timeout`: Delay applied to trigger states.
///
/// # Return
/// `None` for `Terminated` and `ErrorClean(_)`: no further update is scheduled.
/// `Some(time)` otherwise, where `time` is `now` plus the delay of the state's group
/// (no delay at all for the wait states).
fn get_next_update_time(
    state: &EngineState,
    now: SystemTime,
    check_interval: Duration,
    trigger_state_timeout: Duration,
) -> Option<SystemTime> {
    // Pick the delay for the state's group; terminal states bail out early.
    let delay = match state {
        Terminated | ErrorClean(_) => return None,
        WaitToStart | WaitToTerminate | ErrorWaitToClean(_) => Duration::ZERO,
        TriggerStart | TriggerTermination | ErrorTriggerClean(_) => trigger_state_timeout,
        StartInProgress | Running | TerminateInProgress | ErrorCleanInProgress(_) => {
            check_interval
        }
    };
    Some(now + delay)
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Checks every engine state against the update time expected for its group:
    /// wait states are due at the reference time, trigger states after the trigger
    /// timeout, in-progress/running states after the check interval, and the
    /// terminal states are never scheduled.
    #[test]
    fn test_get_next_update_time() {
        let base = SystemTime::UNIX_EPOCH;
        let check_interval = Duration::from_secs(3);
        let trigger_state_timeout = Duration::from_secs(5);

        let immediately = Some(base);
        let after_trigger_timeout = Some(base + trigger_state_timeout);
        let after_check_interval = Some(base + check_interval);

        // One row per engine state: (state, expected next update time).
        let cases = [
            (WaitToStart, immediately),
            (WaitToTerminate, immediately),
            (ErrorWaitToClean(Cow::Borrowed("error")), immediately),
            (TriggerStart, after_trigger_timeout),
            (TriggerTermination, after_trigger_timeout),
            (
                ErrorTriggerClean(Cow::Borrowed("error")),
                after_trigger_timeout,
            ),
            (StartInProgress, after_check_interval),
            (Running, after_check_interval),
            (TerminateInProgress, after_check_interval),
            (
                ErrorCleanInProgress(Cow::Borrowed("error")),
                after_check_interval,
            ),
            (Terminated, None),
            (ErrorClean(Cow::Borrowed("error")), None),
        ];

        for (state, expected) in &cases {
            assert_eq!(
                get_next_update_time(state, base, check_interval, trigger_state_timeout),
                *expected,
                "unexpected next update time for state {:?}",
                state
            );
        }
    }
}

0 comments on commit d38b151

Please sign in to comment.