Skip to content

Commit

Permalink
feat(torture): unify rollback and rollback crash
Browse files Browse the repository at this point in the history
The unification of the two has the side effect of handling
ENOSPC errors also for rollbacks that are expected to crash.
  • Loading branch information
gabriele-0201 authored and pepyakin committed Feb 15, 2025
1 parent fe2cbce commit 2d0e980
Showing 1 changed file with 69 additions and 74 deletions.
143 changes: 69 additions & 74 deletions torture/src/supervisor/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ impl Workload {
}) {
// UNWRAP: scheduled_rollback has just be checked to be `Some`
let (scheduled_rollback, should_crash) = self.scheduled_rollback.take().unwrap();
self.perform_scheduled_rollback(&rr, scheduled_rollback, should_crash)
self.exercise_rollback(&rr, scheduled_rollback, should_crash)
.await?;
return Ok(());
}
Expand Down Expand Up @@ -407,11 +407,8 @@ impl Workload {
// Do not schedule new rollbacks if they are already scheduled.
let is_rollback_scheduled = self.scheduled_rollback.is_some();
if !is_rollback_scheduled && self.state.rng.gen_bool(self.state.biases.rollback) {
if self.state.rng.gen_bool(self.state.biases.rollback_crash) {
self.schedule_rollback(true /*should_crash*/).await?
} else {
self.schedule_rollback(false /*should_crash*/).await?
}
let should_crash = self.state.rng.gen_bool(self.state.biases.rollback_crash);
self.schedule_rollback(should_crash).await?
}

let should_crash = self.state.rng.gen_bool(self.state.biases.commit_crash);
Expand Down Expand Up @@ -596,95 +593,93 @@ impl Workload {
Ok(())
}

async fn perform_scheduled_rollback(
async fn exercise_rollback(
&mut self,
rr: &comms::RequestResponse,
scheduled_rollback: ScheduledRollback,
should_crash: Option<Duration>,
) -> anyhow::Result<()> {
let ScheduledRollback {
n_commits,
n_commits: n_commits_to_rollback,
snapshot,
..
} = scheduled_rollback;

match should_crash {
None => self.exercise_rollback(&rr, n_commits, snapshot).await,
Some(crash_delay) => {
self.exercise_rollback_crashing(&rr, n_commits, snapshot, crash_delay)
.await
}
}
}
let maybe_crash_text = if should_crash.is_some() { " crash" } else { "" };
trace!(
"exercising rollback{} of {} commits",
maybe_crash_text,
n_commits_to_rollback
);

async fn exercise_rollback(
&mut self,
rr: &comms::RequestResponse,
n_commits_to_rollback: usize,
snapshot: Snapshot,
) -> anyhow::Result<()> {
trace!("exercising rollback of {} commits", n_commits_to_rollback);
let ToSupervisor::RollbackResponse { outcome } = rr
let rollback_outcome = rr
.send_request(crate::message::ToAgent::Rollback(
crate::message::RollbackPayload {
n_commits: n_commits_to_rollback,
should_crash: None,
should_crash: should_crash.clone(),
},
))
.await?
else {
return Err(anyhow::anyhow!(
"RollbackCommit did not execute successfully"
));
};
.await?;

self.ensure_outcome_validity(rr, &outcome).await?;
if should_crash.is_some() {
let ToSupervisor::Ack = rollback_outcome else {
return Err(anyhow::anyhow!(
"RollbackCommit crash did not execute successfully"
));
};

let agent_sync_seqn = rr.send_query_sync_seqn().await?;
if agent_sync_seqn != self.state.committed.sync_seqn + 1 {
return Err(anyhow::anyhow!("Unexpected sync_seqn after rollback"));
}
self.wait_for_crash().await?;

self.state.rollback(snapshot);
self.ensure_snapshot_validity(rr).await?;
Ok(())
}
// During a rollback crash, every type of error could happen.
// However the agent will be respawned, so it will just
// make sure the rollback was correctly applied or not.
self.spawn_new_agent().await?;
let agent = self.agent.as_ref().unwrap();
let rr = agent.rr().clone();

async fn exercise_rollback_crashing(
&mut self,
rr: &comms::RequestResponse,
n_commits_to_rollback: usize,
snapshot: Snapshot,
crash_delay: Duration,
) -> anyhow::Result<()> {
trace!(
"exercising rollback crash of {} commits",
n_commits_to_rollback
);
rr.send_request(crate::message::ToAgent::Rollback(
crate::message::RollbackPayload {
n_commits: n_commits_to_rollback,
should_crash: Some(crash_delay),
},
))
.await?;

self.wait_for_crash().await?;

// Spawns a new agent and checks whether the rollback was applied to the database and if so
// we rollback to the correct snapshot in the state.
self.spawn_new_agent().await?;
let rr = self.agent.as_ref().unwrap().rr().clone();
let sync_seqn = rr.send_query_sync_seqn().await?;
let last_sync_seqn = self.state.committed.sync_seqn;
if sync_seqn == last_sync_seqn + 1 {
// sync_seqn has increased, so the rollback is expected to be applied correctly
self.state.rollback(snapshot);
} else if sync_seqn == last_sync_seqn {
// The rollback successfully crashed.
info!("rollback crashed, seqno: {}", last_sync_seqn);
let agent_sync_seqn = rr.send_query_sync_seqn().await?;
let last_sync_seqn = self.state.committed.sync_seqn;
if agent_sync_seqn == last_sync_seqn + 1 {
// sync_seqn has increased, so the rollback is expected to be applied correctly
self.state.rollback(snapshot);
} else if agent_sync_seqn == last_sync_seqn {
// The rollback successfully crashed.
info!("rollback crashed, seqno: {}", last_sync_seqn);
} else {
return Err(anyhow::anyhow!(
"Unexpected sync_seqn after rollback{}",
maybe_crash_text
));
}
} else {
return Err(anyhow::anyhow!("Unexpected sync_seqn after rollback crash"));
let ToSupervisor::RollbackResponse { outcome } = rollback_outcome else {
return Err(anyhow::anyhow!(
"RollbackCommit did not execute successfully"
));
};

let was_enospc_enabled = self.enabled_enospc;
self.ensure_outcome_validity(rr, &outcome).await?;

let agent_sync_seqn = rr.send_query_sync_seqn().await?;
let last_sync_seqn = self.state.committed.sync_seqn;
if agent_sync_seqn == last_sync_seqn + 1 {
// sync_seqn has increased, so the rollback is expected to be applied correctly
self.state.rollback(snapshot);
} else if agent_sync_seqn == last_sync_seqn {
if !was_enospc_enabled {
return Err(anyhow::anyhow!(
"Rollback was expexted to succeed. Unexpected sync_seqn",
));
}
// The rollback successfully crashed.
info!("rollback crashed, seqno: {}", last_sync_seqn);
} else {
return Err(anyhow::anyhow!(
"Unexpected sync_seqn after rollback{}",
maybe_crash_text
));
}
}

self.ensure_snapshot_validity(&rr).await?;
Expand Down

0 comments on commit 2d0e980

Please sign in to comment.