diff --git a/torture/src/agent.rs b/torture/src/agent.rs index 6da0babb..2126fad0 100644 --- a/torture/src/agent.rs +++ b/torture/src/agent.rs @@ -110,12 +110,12 @@ pub async fn run(input: UnixStream) -> Result<()> { should_crash: None, }) => { let start = std::time::Instant::now(); - agent.rollback(n_commits).await?; + let outcome = agent.rollback(n_commits).await; tracing::info!("rollback took {}ms", start.elapsed().as_millis()); stream .send(Envelope { reqno, - message: ToSupervisor::Ack, + message: ToSupervisor::RollbackResponse { outcome }, }) .await?; } @@ -295,27 +295,30 @@ impl Agent { // Perform the commit. let commit_result = tokio::task::block_in_place(|| session.finish(actuals)?.commit(&nomt)); - - // Classify the result into one of the outcome bins. - let outcome = match commit_result { - Ok(()) => Outcome::Success, - Err(ref err) if is_enospc(err) => Outcome::StorageFull, - Err(err) => Outcome::UnknownFailure(err.to_string()), - }; + let commit_outcome = classify_result(commit_result); // Log the outcome if it was not successful. - if !matches!(outcome, Outcome::Success) { - trace!("unsuccessful commit: {:?}", outcome); + if !matches!(commit_outcome, Outcome::Success) { + trace!("unsuccessful commit: {:?}", commit_outcome); } - outcome + commit_outcome } - async fn rollback(&mut self, n_commits: usize) -> Result<()> { + async fn rollback(&mut self, n_commits: usize) -> Outcome { // UNWRAP: `nomt` is always `Some` except recreation. let nomt = self.nomt.as_ref().unwrap(); - tokio::task::block_in_place(|| nomt.rollback(n_commits))?; - Ok(()) + + // Perform the rollback. + let rollback_result = tokio::task::block_in_place(|| nomt.rollback(n_commits)); + let rollback_outcome = classify_result(rollback_result); + + // Log the outcome if it was not successful. + if !matches!(rollback_outcome, Outcome::Success) { + trace!("unsuccessful rollback: {:?}", rollback_outcome); + } + + rollback_outcome } fn query(&mut self, key: message::Key) -> Result> { @@ -332,6 +335,15 @@ impl Agent { } } +/// Classify an operation result into one of the outcome. +fn classify_result(operation_result: anyhow::Result<()>) -> Outcome { + match operation_result { + Ok(()) => Outcome::Success, + Err(ref err) if is_enospc(err) => Outcome::StorageFull, + Err(err) => Outcome::UnknownFailure(err.to_string()), + } +} + /// Examines the given error to determine if it is an `ENOSPC` IO error. fn is_enospc(err: &anyhow::Error) -> bool { let Some(io_err) = err.downcast_ref::() else { diff --git a/torture/src/message.rs b/torture/src/message.rs index eb8ef7cd..22cef753 100644 --- a/torture/src/message.rs +++ b/torture/src/message.rs @@ -166,7 +166,12 @@ pub enum ToSupervisor { CommitResponse { /// The time it took for the operation to complete. elapsed: Duration, - /// The outcome of the operation. + /// The outcome of the commit. + outcome: Outcome, + }, + /// The response to a completed rollback request. + RollbackResponse { + /// The outcome of the rollback. outcome: Outcome, }, /// The response to a query for a key-value pair. diff --git a/torture/src/supervisor/workload.rs b/torture/src/supervisor/workload.rs index 39e49c72..e815a903 100644 --- a/torture/src/supervisor/workload.rs +++ b/torture/src/supervisor/workload.rs @@ -623,13 +623,21 @@ impl Workload { snapshot: Snapshot, ) -> anyhow::Result<()> { trace!("exercising rollback of {} commits", n_commits_to_rollback); - rr.send_request(crate::message::ToAgent::Rollback( - crate::message::RollbackPayload { - n_commits: n_commits_to_rollback, - should_crash: None, - }, - )) - .await?; + let ToSupervisor::RollbackResponse { outcome } = rr + .send_request(crate::message::ToAgent::Rollback( + crate::message::RollbackPayload { + n_commits: n_commits_to_rollback, + should_crash: None, + }, + )) + .await? + else { + return Err(anyhow::anyhow!( + "RollbackCommit did not execute successfully" + )); + }; + + self.ensure_outcome_validity(rr, &outcome).await?; let agent_sync_seqn = rr.send_query_sync_seqn().await?; if agent_sync_seqn != self.state.committed.sync_seqn + 1 {