Skip to content

Commit

Permalink
fix: Fix Sqlite transactions
Browse files Browse the repository at this point in the history
After ignoring the database settings for a while the it came back to
bite us when implementing the new extension paradigm. Sqlite started
emitting database busy errors when a long running transaction was
interrupted by some other write.

This is because when you start a transaction in sqlite by default
it occurs as DEFFERED. This means that sqlite does not try to lock th
database until it comes across a write call. This means that some other
thread could possibly come in and make a write call before the current
transaction is finished. When this happens sqlite will see that the underlying
data has changed and return an error for the open transaction.

This is obviously bad because losing transactions is not a current situation
we recover gracefully from. Instead it would be better if sqlite simply
followed suite with other databases and just immediatley held a lock
when opening a transaction. Which is actually able to be enabled by
simply using the `BEGIN IMMEDIATE` statement when starting a transaction.

But nothing is ever that simple. The library that we're using sqlx
[does not yet support](launchbadge/sqlx#481)
the ability to call `BEGIN IMMEDIATE` on transactions. Thus we had to
come up with a hack.

The hack being that when opening a transaction we immediately call
a write to a dummy table as our first call. This should immediatley
cause sqlite to hold a lock and more or less mimic the behavior of
`BEGIN IMMEDIATE`.
  • Loading branch information
clintjedwards committed Oct 1, 2024
1 parent 9eee9f7 commit c1aed51
Show file tree
Hide file tree
Showing 39 changed files with 413 additions and 377 deletions.
4 changes: 2 additions & 2 deletions gofer/src/api/deployments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ pub async fn list_deployments(
)
.await?;

let mut conn = match api_state.storage.conn().await {
let mut conn = match api_state.storage.read_conn().await {
Ok(conn) => conn,
Err(e) => {
return Err(http_error!(
Expand Down Expand Up @@ -398,7 +398,7 @@ pub async fn get_deployment(
)
.await?;

let mut conn = match api_state.storage.conn().await {
let mut conn = match api_state.storage.read_conn().await {
Ok(conn) => conn,
Err(e) => {
return Err(http_error!(
Expand Down
6 changes: 3 additions & 3 deletions gofer/src/api/event_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ impl EventBus {
pub async fn try_publish(&self, kind: Kind) -> Result<Event> {
let new_event = Event::new(kind.clone());

let mut conn = self.storage.conn().await.with_context(|| {
let mut conn = self.storage.write_conn().await.with_context(|| {
format!(
"could not publish event for kind '{}'; Database error;",
new_event.kind,
Expand Down Expand Up @@ -320,7 +320,7 @@ impl EventBus {
tokio::spawn(async move {
let new_event = Event::new(kind.clone());

let mut conn = match self.storage.conn().await {
let mut conn = match self.storage.write_conn().await {
Ok(conn) => conn,
Err(err) => {
error!(error = %err, kind = %new_event.kind, "Could not publish event; Database error;");
Expand Down Expand Up @@ -362,7 +362,7 @@ async fn prune_events(storage: &storage::Db, retention: u64) -> Result<(), stora
let mut offset = 0;
let mut total_pruned = 0;

let mut conn = match storage.conn().await {
let mut conn = match storage.write_conn().await {
Ok(conn) => conn,
Err(e) => {
error!("could not prune events; connection error");
Expand Down
6 changes: 3 additions & 3 deletions gofer/src/api/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ pub async fn stream_events(
return Ok(());
}

let mut conn = match api_state.storage.conn().await {
let mut conn = match api_state.storage.read_conn().await {
Ok(conn) => conn,
Err(e) => {
return Err(websocket_error(
Expand Down Expand Up @@ -305,7 +305,7 @@ pub async fn get_event(
)
.await?;

let mut conn = match api_state.storage.conn().await {
let mut conn = match api_state.storage.read_conn().await {
Ok(conn) => conn,
Err(e) => {
return Err(http_error!(
Expand Down Expand Up @@ -373,7 +373,7 @@ pub async fn delete_event(
)
.await?;

let mut conn = match api_state.storage.conn().await {
let mut conn = match api_state.storage.write_conn().await {
Ok(conn) => conn,
Err(e) => {
return Err(http_error!(
Expand Down
23 changes: 7 additions & 16 deletions gofer/src/api/extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use futures::{SinkExt, StreamExt};
use reqwest::{header, Client};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use sqlx::Acquire;
use std::{collections::HashMap, str::FromStr, sync::Arc};
use strum::{Display, EnumString};
use tracing::{debug, error, info};
Expand Down Expand Up @@ -316,22 +315,14 @@ async fn start_extension(
token_roles,
);

let mut conn = match api_state.storage.conn().await {
let mut tx = match api_state.storage.open_tx().await {
Ok(conn) => conn,
Err(e) => {
error!(message = "Could not open connection to database", error = %e);
bail!("Could not open connection to database")
}
};

let mut tx = match conn.begin().await {
Ok(tx) => tx,
Err(e) => {
error!(message = "Could not open transaction to database", error = %e);
bail!("Could not open transaction to database")
}
};

// If there was a previous key then just delete it, if there wasn't we don't particularly care.
let _ = storage::tokens::delete(&mut tx, &registration.key_id).await;

Expand Down Expand Up @@ -589,7 +580,7 @@ async fn start_extension(
/// the initial extension information so it can check for connectivity and store the network location.
/// This information will eventually be used in other parts of the API to communicate with said extensions.
pub async fn start_extensions(api_state: Arc<ApiState>) -> Result<()> {
let mut conn = match api_state.storage.conn().await {
let mut conn = match api_state.storage.read_conn().await {
Ok(conn) => conn,
Err(e) => {
error!(message = "Could not open connection to database", error = %e);
Expand Down Expand Up @@ -651,7 +642,7 @@ pub async fn stop_extensions(api_state: Arc<ApiState>) {
/// This function doesn't start those extensions it just makes sure they are registered
/// so the more broad [`start_extensions`] function can start them.
pub async fn install_std_extensions(api_state: Arc<ApiState>) -> Result<()> {
let mut conn = match api_state.storage.conn().await {
let mut conn = match api_state.storage.write_conn().await {
Ok(conn) => conn,
Err(e) => {
error!(message = "Could not open connection to database", error = %e);
Expand Down Expand Up @@ -914,7 +905,7 @@ pub async fn install_extension(
)
})?;

let mut conn = match api_state.storage.conn().await {
let mut conn = match api_state.storage.write_conn().await {
Ok(conn) => conn,
Err(e) => {
return Err(http_error!(
Expand Down Expand Up @@ -1106,7 +1097,7 @@ pub async fn update_extension(
)
.await?;

let mut conn = match api_state.storage.conn().await {
let mut conn = match api_state.storage.write_conn().await {
Ok(conn) => conn,
Err(e) => {
return Err(http_error!(
Expand Down Expand Up @@ -1204,7 +1195,7 @@ pub async fn uninstall_extension(

let _ = api_state.extensions.remove(&path.extension_id);

let mut conn = match api_state.storage.conn().await {
let mut conn = match api_state.storage.write_conn().await {
Ok(conn) => conn,
Err(e) => {
return Err(http_error!(
Expand Down Expand Up @@ -1461,7 +1452,7 @@ pub async fn list_extension_subscriptions(
)
.await?;

let mut conn = match api_state.storage.conn().await {
let mut conn = match api_state.storage.read_conn().await {
Ok(conn) => conn,
Err(e) => {
return Err(http_error!(
Expand Down
4 changes: 2 additions & 2 deletions gofer/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ async fn init_api(conf: conf::api::ApiConfig) -> Result<Arc<ApiState>> {
);

// Load our current value for ignore_pipeline_run_events into memory.
let mut conn = match storage.conn().await {
let mut conn = match storage.read_conn().await {
Ok(conn) => conn,
Err(e) => {
bail!(
Expand Down Expand Up @@ -973,7 +973,7 @@ pub async fn interpolate_vars(
});
}
InterpolationKind::GlobalSecret => {
let mut conn = match api_state.storage.conn().await {
let mut conn = match api_state.storage.read_conn().await {
Ok(conn) => conn,
Err(e) => {
bail!("Could not establish a connection to the database during interpolation; {:#?}", e);
Expand Down
25 changes: 6 additions & 19 deletions gofer/src/api/namespaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use dropshot::{
use http::StatusCode;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use sqlx::Acquire;
use std::sync::Arc;
use tracing::error;

Expand Down Expand Up @@ -121,7 +120,7 @@ pub async fn list_namespaces(
)
.await?;

let mut conn = match api_state.storage.conn().await {
let mut conn = match api_state.storage.read_conn().await {
Ok(conn) => conn,
Err(e) => {
return Err(http_error!(
Expand Down Expand Up @@ -194,7 +193,7 @@ pub async fn get_namespace(
)
.await?;

let mut conn = match api_state.storage.conn().await {
let mut conn = match api_state.storage.read_conn().await {
Ok(conn) => conn,
Err(e) => {
return Err(http_error!(
Expand Down Expand Up @@ -291,7 +290,7 @@ pub async fn create_namespace(
));
};

let mut conn = match api_state.storage.conn().await {
let mut conn = match api_state.storage.write_conn().await {
Ok(conn) => conn,
Err(e) => {
return Err(http_error!(
Expand Down Expand Up @@ -394,7 +393,7 @@ pub async fn update_namespace(
)
.await?;

let mut conn = match api_state.storage.conn().await {
let mut tx = match api_state.storage.open_tx().await {
Ok(conn) => conn,
Err(e) => {
return Err(http_error!(
Expand All @@ -408,18 +407,6 @@ pub async fn update_namespace(

let updatable_fields = storage::namespaces::UpdatableFields::from(body.clone());

let mut tx = match conn.begin().await {
Ok(tx) => tx,
Err(e) => {
return Err(http_error!(
"Could not open transaction to database",
http::StatusCode::INTERNAL_SERVER_ERROR,
rqctx.request_id.clone(),
Some(e.into())
));
}
};

if let Err(e) = storage::namespaces::update(&mut tx, &path.namespace_id, updatable_fields).await
{
match e {
Expand Down Expand Up @@ -508,7 +495,7 @@ pub async fn delete_namespace(
)
.await?;

let mut conn = match api_state.storage.conn().await {
let mut conn = match api_state.storage.write_conn().await {
Ok(conn) => conn,
Err(e) => {
return Err(http_error!(
Expand Down Expand Up @@ -557,7 +544,7 @@ pub async fn create_default_namespace(api_state: Arc<ApiState>) -> Result<()> {
"The original namespace created automatically by the Gofer system.",
);

let mut conn = match api_state.storage.conn().await {
let mut conn = match api_state.storage.write_conn().await {
Ok(conn) => conn,
Err(e) => {
error!(message = "Could not open connection to database", error = %e);
Expand Down
18 changes: 9 additions & 9 deletions gofer/src/api/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ pub async fn list_run_objects(
)
.await?;

let mut conn = match api_state.storage.conn().await {
let mut conn = match api_state.storage.read_conn().await {
Ok(conn) => conn,
Err(e) => {
return Err(http_error!(
Expand Down Expand Up @@ -428,7 +428,7 @@ pub async fn put_run_object(
)
.await?;

let mut conn = match api_state.storage.conn().await {
let mut conn = match api_state.storage.write_conn().await {
Ok(conn) => conn,
Err(e) => {
return Err(http_error!(
Expand Down Expand Up @@ -534,7 +534,7 @@ pub async fn delete_run_object(
)
.await?;

let mut conn = match api_state.storage.conn().await {
let mut conn = match api_state.storage.write_conn().await {
Ok(conn) => conn,
Err(e) => {
return Err(http_error!(
Expand Down Expand Up @@ -638,7 +638,7 @@ pub async fn list_pipeline_objects(
)
.await?;

let mut conn = match api_state.storage.conn().await {
let mut conn = match api_state.storage.read_conn().await {
Ok(conn) => conn,
Err(e) => {
return Err(http_error!(
Expand Down Expand Up @@ -797,7 +797,7 @@ pub async fn put_pipeline_object(
)
.await?;

let mut conn = match api_state.storage.conn().await {
let mut conn = match api_state.storage.write_conn().await {
Ok(conn) => conn,
Err(e) => {
return Err(http_error!(
Expand Down Expand Up @@ -896,7 +896,7 @@ pub async fn delete_pipeline_object(
)
.await?;

let mut conn = match api_state.storage.conn().await {
let mut conn = match api_state.storage.write_conn().await {
Ok(conn) => conn,
Err(e) => {
return Err(http_error!(
Expand Down Expand Up @@ -987,7 +987,7 @@ pub async fn list_extension_objects(
)
.await?;

let mut conn = match api_state.storage.conn().await {
let mut conn = match api_state.storage.read_conn().await {
Ok(conn) => conn,
Err(e) => {
return Err(http_error!(
Expand Down Expand Up @@ -1135,7 +1135,7 @@ pub async fn put_extension_object(
)
.await?;

let mut conn = match api_state.storage.conn().await {
let mut conn = match api_state.storage.write_conn().await {
Ok(conn) => conn,
Err(e) => {
return Err(http_error!(
Expand Down Expand Up @@ -1232,7 +1232,7 @@ pub async fn delete_extension_object(
)
.await?;

let mut conn = match api_state.storage.conn().await {
let mut conn = match api_state.storage.write_conn().await {
Ok(conn) => conn,
Err(e) => {
return Err(http_error!(
Expand Down
Loading

0 comments on commit c1aed51

Please sign in to comment.