Skip to content

Commit

Permalink
Fix stage key extraction (#2472)
Browse files Browse the repository at this point in the history
  • Loading branch information
thinkharderdev authored May 6, 2022
1 parent 7c98a43 commit 940fbf0
Showing 1 changed file with 40 additions and 6 deletions.
46 changes: 40 additions & 6 deletions ballista/rust/scheduler/src/state/persistent_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,16 +318,21 @@ fn extract_job_id_from_job_key(job_key: &str) -> Result<&str> {

fn extract_stage_id_from_stage_key(stage_key: &str) -> Result<StageKey> {
let splits: Vec<&str> = stage_key.split('/').collect();
if splits.len() < 4 {
if splits.len() > 4 {
Ok((
splits[splits.len() - 2].to_string(),
splits[splits.len() - 1].parse::<u32>().map_err(|e| {
BallistaError::Internal(format!(
"Invalid stage ID in stage key: {}, {:?}",
stage_key, e
))
})?,
))
} else {
Err(BallistaError::Internal(format!(
"Unexpected stage key: {}",
stage_key
)))
} else {
Ok((
splits.get(2).unwrap().to_string(),
splits.get(3).unwrap().parse::<u32>().unwrap(),
))
}
}

Expand All @@ -352,3 +357,32 @@ fn encode_protobuf<T: Message + Default>(msg: &T) -> Result<Vec<u8>> {
})?;
Ok(value)
}

#[cfg(test)]
mod test {
use super::extract_stage_id_from_stage_key;

#[test]
fn test_extract_stage_id_from_stage_key() {
let (job_id, stage_id) =
extract_stage_id_from_stage_key("/ballista/default/stages/2Yoyba8/1")
.expect("extracting stage key");

assert_eq!(job_id.as_str(), "2Yoyba8");
assert_eq!(stage_id, 1);

let (job_id, stage_id) =
extract_stage_id_from_stage_key("ballista/default/stages/2Yoyba8/1")
.expect("extracting stage key");

assert_eq!(job_id.as_str(), "2Yoyba8");
assert_eq!(stage_id, 1);

let (job_id, stage_id) =
extract_stage_id_from_stage_key("ballista//stages/2Yoyba8/1")
.expect("extracting stage key");

assert_eq!(job_id.as_str(), "2Yoyba8");
assert_eq!(stage_id, 1);
}
}

0 comments on commit 940fbf0

Please sign in to comment.