Skip to content

Commit

Permalink
Added timeout support for Postgres
Browse files Browse the repository at this point in the history
Support setting statement_timeout and transaction_timeout when
applying migrations to a Postgres database.
  • Loading branch information
MaxBondABE committed Dec 21, 2024
1 parent f3b2737 commit bff9ba6
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 8 deletions.
112 changes: 104 additions & 8 deletions src/backend/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ use self::delta::delta;

use super::{Backend, GeneratedMigration, SqiglState};

// Unofficial
pub const STATEMENT_TIMEOUT_ENVVAR: &str = "PGSTATEMENT_TIMEOUT";
pub const TRANSACTION_TIMEOUT_ENVVAR: &str = "PGTRANSACTION_TIMEOUT";

// https://www.postgresql.org/docs/current/libpq-envars.html
pub const HOSTNAME_ENVVAR: &str = "PGHOST";
pub const PORT_ENVVAR: &str = "PGPORT";
Expand Down Expand Up @@ -74,23 +78,63 @@ fn get_port_envvar() -> anyhow::Result<Option<NonZeroU16>> {
}
}

fn get_timeout_envvar(envvar: &str) -> anyhow::Result<Option<f32>> {
if let Some(digits) = get_envvar(envvar)? {
let value: f32 = match digits.parse() {
Ok(x) => x,
Err(e) => return Err(anyhow!("Could not parse {}: {}", envvar, e)),
};
if value >= 0. {
Ok(Some(value))
} else {
Err(anyhow!("Could not parse {}: Must be >= 0", envvar,))
}
} else {
Ok(None)
}
}

pub struct PostgresBackend {
tls: MakeTlsConnector,
config: postgres::Config,
db: Client,

// Timeouts are in milliseconds, because a timeout w/o a unit is interpreted
// as milliseconds
// https://www.postgresql.org/docs/17/runtime-config-client.html#GUC-STATEMENT-TIMEOUT
stmt_timeout: Option<usize>,
tx_timeout: Option<usize>,
}
impl PostgresBackend {
pub fn new(config: postgres::Config) -> Result<Self, postgres::Error> {
pub fn new(
config: postgres::Config,
stmt_timeout: Option<usize>,
tx_timeout: Option<usize>,
) -> Result<Self, postgres::Error> {
let tls = MakeTlsConnector::new(TlsConnector::new().unwrap());
let db = config.clone().connect(tls.clone())?;
Ok(Self { db, config, tls })
Ok(Self {
db,
config,
tls,
stmt_timeout,
tx_timeout,
})
}
pub fn new_tls(
config: postgres::Config,
tls: MakeTlsConnector,
stmt_timeout: Option<usize>,
tx_timeout: Option<usize>,
) -> Result<Self, postgres::Error> {
let db = config.clone().connect(tls.clone())?;
Ok(Self { db, config, tls })
Ok(Self {
db,
config,
tls,
stmt_timeout,
tx_timeout,
})
}
pub fn local() -> Result<Self, postgres::Error> {
let tls = MakeTlsConnector::new(TlsConnector::new().unwrap());
Expand All @@ -101,7 +145,13 @@ impl PostgresBackend {
.host("localhost")
.dbname("sqigl");
let db = config.clone().connect(tls.clone())?;
Ok(Self { db, config, tls })
Ok(Self {
db,
config,
tls,
stmt_timeout: Default::default(),
tx_timeout: Default::default(),
})
}
pub fn get(params: &manifest::project::PostgresDatabase) -> anyhow::Result<Self> {
let tls = {
Expand Down Expand Up @@ -130,6 +180,16 @@ impl PostgresBackend {
let username = get_envvar(USERNAME_ENVVAR)?.or_else(|| params.username.clone());
let password = get_envvar(PASSWORD_ENVVAR)?;

// Timeouts are specified as f32 of seconds but stored as milliseconds because
// a timeout w/o a unit is interpreted as milliseconds
// https://www.postgresql.org/docs/17/runtime-config-client.html#GUC-STATEMENT-TIMEOUT
let stmt_timeout: Option<usize> = get_timeout_envvar(STATEMENT_TIMEOUT_ENVVAR)?
.or(params.statement_timeout)
.map(|t| (t * 1000.) as usize);
let tx_timeout: Option<usize> = get_timeout_envvar(TRANSACTION_TIMEOUT_ENVVAR)?
.or(params.transaction_timeout)
.map(|t| (t * 1000.) as usize);

if password.is_some() {
let Some(hostname) = hostname else {
return Err(anyhow!(
Expand Down Expand Up @@ -158,7 +218,7 @@ impl PostgresBackend {
.port(port.get())
.dbname(&database)
.user(&username);
return Ok(Self::new_tls(config, tls)?);
return Ok(Self::new_tls(config, tls, stmt_timeout, tx_timeout)?);
}

let pgpass = match PgPass::load() {
Expand All @@ -179,13 +239,49 @@ impl PostgresBackend {
username,
};
if let Some(creds) = pgpass.find(&query)? {
Ok(Self::new_tls(creds.into(), tls)?)
Ok(Self::new_tls(creds.into(), tls, stmt_timeout, tx_timeout)?)
} else {
Err(anyhow!(
"Could not connect to database: Credentials were not found in pgpass file."
))
}
}
/// Open transaction & sets statement and transaction timeouts.
fn open_transaction(&mut self) -> Result<postgres::Transaction, postgres::Error> {
let mut tx = self.db.transaction()?;

if let Some(timeout) = self.stmt_timeout {
debug!("Setting statement timeout to {:.2}s", timeout as f32 / 60.);
tx.execute(&format!("set local statement_timeout = {}", timeout), &[])?;
}
if let Some(timeout) = self.tx_timeout {
debug!(
"Setting transaction timeout to {:.2}s",
timeout as f32 / 60.
);
match tx.execute(&format!("set local transaction_timeout = {}", timeout), &[]) {
Ok(_) => (),
Err(e) => {
if tx
.query_one(include_str!("sql/supports_transaction_timeout.sql"), &[])
.map(|row| row.get::<_, bool>(0))
.unwrap_or(false)
{
// We ignore errors here, because this is a best-effort attempt to
// provide additional context. The first error is considered canonical.
error!(
"transaction_timeout was specified, but this database doesn't appear \
to support it. This parameter was added in Postgres 17."
)
};

return Err(e);
}
}
}

Ok(tx)
}
}
impl Backend for PostgresBackend {
type Error = postgres::Error;
Expand Down Expand Up @@ -276,7 +372,7 @@ impl Backend for PostgresBackend {
// - All statements are executed in a single transaction
// - The project version is compatible at the start of the transaction
debug!("Opening artifact transaction.");
let mut tx = self.db.transaction()?;
let mut tx = self.open_transaction()?; // Sets timeouts
tx.execute("select from sqigl_internal.state for update", &[])?;
let state = get_state(&mut tx)?;
if !artifact.compatible(&state.project_version) {
Expand Down Expand Up @@ -357,7 +453,7 @@ impl Backend for PostgresBackend {
}
}

let mut tx = self.db.transaction()?;
let mut tx = self.open_transaction()?; // Sets timeouts
let state = get_state(&mut tx)?;
if !artifact.compatible(&state.project_version) {
error!("Migration aborted: Incompatible");
Expand Down
3 changes: 3 additions & 0 deletions src/backend/postgres/sql/supports_transaction_timeout.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
select 'transaction_timeout' in (
select name from pg_settings
);
2 changes: 2 additions & 0 deletions src/manifest/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ pub struct PostgresDatabase {
pub username: Option<String>,
pub database: Option<String>,
pub certificate: Option<PathBuf>,
pub statement_timeout: Option<f32>,
pub transaction_timeout: Option<f32>
}

#[derive(Default, Clone, Debug, Serialize, Deserialize)]
Expand Down

0 comments on commit bff9ba6

Please sign in to comment.