Skip to content

Commit

Permalink
feat(torture): unify exec commit and commit crash
Browse files Browse the repository at this point in the history
The unification of the two has the side effect of handling
ENOSPC errors also for commits that are expected to crash.
  • Loading branch information
gabriele-0201 committed Feb 14, 2025
1 parent e36bfc5 commit 5ac99ba
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 109 deletions.
14 changes: 7 additions & 7 deletions torture/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use tracing::trace;

use crate::message::{
self, CommitOutcome, CommitPayload, Envelope, InitOutcome, KeyValueChange, OpenOutcome,
OpenPayload, RollbackPayload, ToAgent, ToSupervisor, MAX_ENVELOPE_SIZE,
self, CommitPayload, Envelope, InitOutcome, KeyValueChange, OpenOutcome, OpenPayload, Outcome,
RollbackPayload, ToAgent, ToSupervisor, MAX_ENVELOPE_SIZE,
};

/// The entrypoint for the agent.
Expand Down Expand Up @@ -277,7 +277,7 @@ impl Agent {
OpenOutcome::Success
}

async fn commit(&mut self, changeset: Vec<KeyValueChange>) -> CommitOutcome {
async fn commit(&mut self, changeset: Vec<KeyValueChange>) -> Outcome {
// UNWRAP: `nomt` is always `Some` except recreation.
let nomt = self.nomt.as_ref().unwrap();
let session = nomt.begin_session(SessionParams::default());
Expand All @@ -298,13 +298,13 @@ impl Agent {

// Classify the result into one of the outcome bins.
let outcome = match commit_result {
Ok(()) => CommitOutcome::Success,
Err(ref err) if is_enospc(err) => CommitOutcome::StorageFull,
Err(_) => CommitOutcome::UnknownFailure,
Ok(()) => Outcome::Success,
Err(ref err) if is_enospc(err) => Outcome::StorageFull,
Err(err) => Outcome::UnknownFailure(err.to_string()),
};

// Log the outcome if it was not successful.
if !matches!(outcome, CommitOutcome::Success) {
if !matches!(outcome, Outcome::Success) {
trace!("unsuccessful commit: {:?}", outcome);
}

Expand Down
14 changes: 7 additions & 7 deletions torture/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,15 @@ pub enum ToAgent {
GracefulShutdown,
}

/// Different outcomes of a commit operation.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum CommitOutcome {
/// The commit was successful.
/// Different outcomes of an operation.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub enum Outcome {
/// The operation was successful.
Success,
/// The commit failed because the storage is full, dubbed ENOSPC.
/// The operation failed because the storage is full, dubbed ENOSPC.
StorageFull,
/// Some other failure occurred.
UnknownFailure,
UnknownFailure(String),
}

/// Elaboration on the agent initialization result inside of [`ToSupervisor::InitResponse`].
Expand Down Expand Up @@ -167,7 +167,7 @@ pub enum ToSupervisor {
/// The time it took for the operation to complete.
elapsed: Duration,
/// The outcome of the operation.
outcome: CommitOutcome,
outcome: Outcome,
},
/// The response to a query for a key-value pair.
QueryValue(Option<Value>),
Expand Down
6 changes: 1 addition & 5 deletions torture/src/supervisor/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,7 @@ impl SpawnedAgentController {
}
}

pub async fn open(
&mut self,
bitbox_seed: [u8; 16],
rollback: Option<u32>,
) -> Result<OpenOutcome> {
pub async fn open(&self, bitbox_seed: [u8; 16], rollback: Option<u32>) -> Result<OpenOutcome> {
let response = self
.rr
.send_request(crate::message::ToAgent::Open(crate::message::OpenPayload {
Expand Down
196 changes: 106 additions & 90 deletions torture/src/supervisor/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,43 +414,124 @@ impl Workload {
}
}

if self.state.rng.gen_bool(self.state.biases.commit_crash) {
self.exercise_commit_crashing(&rr).await?;
} else {
self.exercise_commit(&rr).await?;
}
let should_crash = self.state.rng.gen_bool(self.state.biases.commit_crash);
self.exercise_commit(&rr, should_crash).await?;

Ok(())
}

/// Commit a changeset.
async fn exercise_commit(&mut self, rr: &comms::RequestResponse) -> anyhow::Result<()> {
trace!("exercising commit");
async fn exercise_commit(
&mut self,
rr: &comms::RequestResponse,
should_crash: bool,
) -> anyhow::Result<()> {
let should_crash = if should_crash {
trace!("exercising commit crash");
Some(self.get_crash_delay())
} else {
trace!("exercising commit");
None
};

// Generate a changeset and the associated snapshot
let (snapshot, changeset) = self.state.gen_commit();
let ToSupervisor::CommitResponse { elapsed, outcome } = rr
let commit_response = rr
.send_request(crate::message::ToAgent::Commit(
crate::message::CommitPayload {
changeset: changeset.clone(),
should_crash: None,
should_crash,
},
))
.await?
else {
return Err(anyhow::anyhow!("Commit did not execute successfully"));
.await?;

let is_applied = if should_crash.is_some() {
let ToSupervisor::Ack = commit_response else {
return Err(anyhow::anyhow!("Commit crash did not execute successfully"));
};

self.wait_for_crash().await?;

// During a commit crash, every type of error could happen.
// However the agent will be respawned, so it will just
// make sure the changeset was correctly applied or reverted.
self.spawn_new_agent().await?;
let agent = self.agent.as_ref().unwrap();
let rr = agent.rr().clone();

// Sample the agent to make sure the changeset was correctly applied or reverted.
let agent_sync_seqn = rr.send_query_sync_seqn().await?;
if snapshot.sync_seqn == agent_sync_seqn {
true
} else if self.state.committed.sync_seqn == agent_sync_seqn {
false
} else {
return Err(anyhow::anyhow!("Unexpected sync_seqn after commit crash",));
}
} else {
let ToSupervisor::CommitResponse { elapsed, outcome } = commit_response else {
return Err(anyhow::anyhow!("Commit did not execute successfully"));
};

// Keep track of ENOSPC because the flag could be erased during the agent's respawn
let was_enospc_enabled = self.enabled_enospc;
self.ensure_outcome_validity(rr, &outcome).await?;

if matches!(outcome, crate::message::Outcome::Success) {
self.n_successfull_commit += 1;
self.tot_commit_time += elapsed;
}

// Sample the agent to make sure the changeset was correctly applied or reverted.
let agent_sync_seqn = rr.send_query_sync_seqn().await?;
if was_enospc_enabled {
false
} else if snapshot.sync_seqn == agent_sync_seqn {
true
} else {
return Err(anyhow::anyhow!("Unexpected sync_seqn after commit"));
}
};

if is_applied {
self.ensure_changeset_applied(&rr, &changeset).await?;
self.state.commit(snapshot);
} else {
self.ensure_changeset_reverted(&rr, &changeset).await?;
}

Ok(())
}

fn get_crash_delay(&self) -> Duration {
// The agent should crash after `crash_delay`ns.
// If no data avaible crash after 300ms.
let mut crash_delay_millis = self
.tot_commit_time
.as_millis()
.checked_div(self.n_successfull_commit as u128)
.unwrap_or(300) as u64;
// Crash a little bit earlier than the average commit time to increase the
// possibilities of crashing during sync.
crash_delay_millis = (crash_delay_millis as f64 * 0.98) as u64;
Duration::from_millis(crash_delay_millis)
}

async fn ensure_outcome_validity(
&mut self,
rr: &comms::RequestResponse,
outcome: &crate::message::Outcome,
) -> Result<()> {
match outcome {
crate::message::CommitOutcome::Success => {
// The commit was successful.
crate::message::Outcome::Success => {
// The operation was successful.
if self.enabled_enospc {
return Err(anyhow::anyhow!("Commit should have failed with ENOSPC"));
return Err(anyhow::anyhow!("Operation should have failed with ENOSPC"));
}
}
crate::message::CommitOutcome::StorageFull => {
// The commit failed due to `ENOSPC`.
crate::message::Outcome::StorageFull => {
if !self.enabled_enospc {
return Err(anyhow::anyhow!("Commit should have succeeded"));
return Err(anyhow::anyhow!("Operation should have succeeded"));
}

// At this point, we expect the agent will have its NOMT instance poisoned.
Expand All @@ -459,10 +540,9 @@ impl Workload {
let agent_sync_seqn = rr.send_query_sync_seqn().await?;
if self.state.committed.sync_seqn != agent_sync_seqn {
return Err(anyhow::anyhow!(
"Unexpected sync_seqn after failed commit with ENOSPC"
"Unexpected sync_seqn after failed operation with ENOSPC"
));
}
self.ensure_changeset_reverted(rr, &changeset).await?;

// Now we instruct the agent to re-open NOMT and check if the sync_seqn is still the same.
// We will reuse the same process.
Expand All @@ -472,81 +552,17 @@ impl Workload {
let agent_sync_seqn = rr.send_query_sync_seqn().await?;
if self.state.committed.sync_seqn != agent_sync_seqn {
return Err(anyhow::anyhow!(
"Unexpected sync_seqn after failed commit with ENOSPC"
"Unexpected sync_seqn after failed operation with ENOSPC"
));
}

// Return early to not account for the commit in the statistics.
return Ok(());
}
crate::message::CommitOutcome::UnknownFailure => {
return Err(anyhow::anyhow!("Commit failed due to unknown reasons"));
crate::message::Outcome::UnknownFailure(err) => {
return Err(anyhow::anyhow!(
"Operation failed due to unknown reasons: {}",
err
));
}
}

self.n_successfull_commit += 1;
self.tot_commit_time += elapsed;

// Sample the agent to make sure the changeset was correctly applied.
let agent_sync_seqn = rr.send_query_sync_seqn().await?;
if snapshot.sync_seqn != agent_sync_seqn {
return Err(anyhow::anyhow!("Unexpected sync_seqn after commit"));
}
self.ensure_changeset_applied(rr, &changeset).await?;

self.state.commit(snapshot);

Ok(())
}

/// Commit a changeset and induce a crash.
async fn exercise_commit_crashing(
&mut self,
rr: &comms::RequestResponse,
) -> anyhow::Result<()> {
trace!("exercising commit crash");

// Generate a changeset and the associated snapshot. Ask the agent to commit the changeset.
let (snapshot, changeset) = self.state.gen_commit();

// The agent should crash after `crash_delay`ns.
// If no data avaible crash after 300ms.
let mut crash_delay_millis = self
.tot_commit_time
.as_millis()
.checked_div(self.n_successfull_commit as u128)
.unwrap_or(300) as u64;
// Crash a little bit earlier than the average commit time to increase the
// possibilities of crashing during sync.
crash_delay_millis = (crash_delay_millis as f64 * 0.98) as u64;

rr.send_request(crate::message::ToAgent::Commit(
crate::message::CommitPayload {
changeset: changeset.clone(),
should_crash: Some(Duration::from_millis(crash_delay_millis)),
},
))
.await?;

self.wait_for_crash().await?;

// Spawns a new agent and checks whether the commit was applied to the database and if so
// we commit the snapshot to the state.
self.spawn_new_agent().await?;
let rr = self.agent.as_ref().unwrap().rr().clone();
let seqno = rr.send_query_sync_seqn().await?;
if seqno == snapshot.sync_seqn {
self.ensure_changeset_applied(&rr, &changeset).await?;
self.state.commit(snapshot);
} else {
info!(
"commit. seqno ours: {}, theirs: {}",
snapshot.sync_seqn, seqno
);
self.ensure_changeset_reverted(&rr, &changeset).await?;
}

self.ensure_snapshot_validity(&rr).await?;
Ok(())
}

Expand Down

0 comments on commit 5ac99ba

Please sign in to comment.