Skip to content

Commit

Permalink
Use sqlx::any instead of postgres
Browse files Browse the repository at this point in the history
  • Loading branch information
fkettelhoit committed Jun 17, 2024
1 parent ce0cefa commit 1101aa8
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 12 deletions.
8 changes: 7 additions & 1 deletion examples/postgres-integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@ parlay = { path = "../../", version = "0.1.0" }
reqwest = { version = "0.11.23", features = ["json"] }
serde = "1.0.197"
serde_json = "1.0.115"
sqlx = { version = "0.7.4", features = ["runtime-tokio", "postgres"] }
sqlx = { version = "0.7.4", features = [
"runtime-tokio",
"any",
"postgres",
"mysql",
"sqlite",
] }
tokio = { version = "1.35.1", features = ["macros", "rt", "rt-multi-thread"] }
tower-http = { version = "0.5.1", features = ["fs", "trace"] }
tracing = "0.1.40"
Expand Down
25 changes: 14 additions & 11 deletions examples/postgres-integration/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ use parlay::{
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};
use serde_json::json;
use sqlx::{postgres::PgPoolOptions, Row};
use sqlx::{
any::{install_default_drivers, AnyQueryResult, AnyRow},
AnyPool, Pool, Row,
};
use std::{
borrow::BorrowMut, collections::HashMap, net::SocketAddr, path::PathBuf, process::exit,
result::Result, sync::Arc, time::Duration,
Expand Down Expand Up @@ -101,6 +104,7 @@ type MpcState = Arc<Mutex<MpcComms>>;
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt::init();
install_default_drivers();
let Cli {
port,
config,
Expand Down Expand Up @@ -234,21 +238,20 @@ async fn main() -> Result<(), Error> {
} = policy;
if let (Some(output_db), Some(output)) = (output_db, output) {
info!("Connecting to output db at {output_db}...");
let pool = PgPoolOptions::new()
.max_connections(5)
.connect(output_db)
.await?;
let pool: AnyPool = Pool::connect(output_db).await?;
if let Some(setup) = setup {
let rows_affected =
sqlx::query(setup).execute(&pool).await?.rows_affected();
let result: AnyQueryResult =
sqlx::query(setup).execute(&pool).await?;
let rows_affected = result.rows_affected();
debug!("{rows_affected} rows affected by '{setup}'");
}
for row in rows {
let mut query = sqlx::query(output);
for field in row {
query = query.bind(field);
}
let rows = query.execute(&pool).await?.rows_affected();
let result: AnyQueryResult = query.execute(&pool).await?;
let rows = result.rows_affected();
debug!("Inserted {rows} row(s)");
}
} else {
Expand Down Expand Up @@ -343,13 +346,13 @@ async fn execute_mpc(
} = policy;
let (prg, input) = if let Some(db) = db {
info!("Connecting to input db at {db}...");
let pool = PgPoolOptions::new().max_connections(5).connect(db).await?;
let rows = sqlx::query(input).fetch_all(&pool).await?;
let pool: AnyPool = Pool::connect(db).await?;
let rows: Vec<AnyRow> = sqlx::query(input).fetch_all(&pool).await?;
info!("'{input}' returned {} rows from {db}", rows.len());

let mut my_consts = HashMap::new();
for (k, c) in constants {
let row = sqlx::query(&c.query).fetch_one(&pool).await?;
let row: AnyRow = sqlx::query(&c.query).fetch_one(&pool).await?;
if row.len() != 1 {
bail!(
"Expected a single scalar value, but got {} from query '{}'",
Expand Down

0 comments on commit 1101aa8

Please sign in to comment.