Skip to content

Commit

Permalink
all tests passing in postgres
Browse files Browse the repository at this point in the history
  • Loading branch information
0o-de-lally committed Oct 29, 2024
1 parent e5eb10c commit 2361f6e
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 87 deletions.
2 changes: 1 addition & 1 deletion warehouse/sql/migrations/1_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ CREATE TABLE
CREATE TABLE balance (
account_address VARCHAR(66) REFERENCES users(account_address) ON DELETE CASCADE,
balance BIGINT NOT NULL,
chain_timestamp TIMESTAMP NOT NULL,
chain_timestamp BIGINT NOT NULL,
db_version BIGINT NOT NULL,
epoch_number BIGINT NOT NULL
);
18 changes: 7 additions & 11 deletions warehouse/src/load_account.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
use crate::table_structs::{WarehouseAccount, WarehouseRecord};
use anyhow::Result;
use sqlx::{postgres::PgQueryResult, sqlite::SqliteQueryResult, PgPool, Postgres, QueryBuilder, Sqlite, SqlitePool};
use sqlx::{postgres::PgQueryResult, PgPool, Postgres, QueryBuilder};

pub async fn load_account_state(pool: &PgPool, accounts: &[WarehouseRecord]) -> Result<u64> {
let mut rows = 0;
// insert missing accounts
for ws in accounts.iter() {
let res = insert_one_account(pool, &ws.account).await?;
rows = res.rows_affected();
rows += res.rows_affected();
}

// increment the balance changes
Ok(rows)
}

pub async fn insert_one_account(
pool: &PgPool,
acc: &WarehouseAccount,
) -> Result<PgQueryResult> {
pub async fn insert_one_account(pool: &PgPool, acc: &WarehouseAccount) -> Result<PgQueryResult> {
let res = sqlx::query(
r#"
INSERT INTO users (account_address, is_legacy)
Expand All @@ -33,7 +30,7 @@ pub async fn insert_one_account(
}

pub async fn batch_insert_account(
pool: &SqlitePool,
pool: &PgPool,
acc: &[WarehouseRecord],
batch_len: usize,
) -> Result<u64> {
Expand All @@ -49,10 +46,10 @@ pub async fn batch_insert_account(

// TODO: return specific commit errors for this batch
pub async fn impl_batch_insert(
pool: &SqlitePool,
pool: &PgPool,
batch_accounts: &[WarehouseRecord],
) -> Result<SqliteQueryResult> {
let mut query_builder: QueryBuilder<Sqlite> = QueryBuilder::new(
) -> Result<PgQueryResult> {
let mut query_builder: QueryBuilder<Postgres> = QueryBuilder::new(
// Note the trailing space; most calls to `QueryBuilder` don't automatically insert
"INSERT INTO users (account_address, is_legacy) ",
);
Expand All @@ -71,7 +68,6 @@ pub async fn impl_batch_insert(
Ok(res)
}


// TODO: return specific commit errors for this batch
pub async fn impl_batch_insert_pg(
pool: &PgPool,
Expand Down
26 changes: 12 additions & 14 deletions warehouse/src/load_coin.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::table_structs::WarehouseRecord;
use anyhow::Result;
use sqlx::{sqlite::SqliteQueryResult, QueryBuilder, Sqlite, SqlitePool};
use sqlx::{postgres::PgQueryResult, PgPool, Postgres, QueryBuilder};

pub async fn batch_insert_account(
pool: &SqlitePool,
pool: &PgPool,
acc: &[WarehouseRecord],
batch_len: usize,
) -> Result<()> {
Expand All @@ -18,20 +18,20 @@ pub async fn batch_insert_account(

// TODO: return specific commit errors for this batch
pub async fn impl_batch_coin_insert(
pool: &SqlitePool,
pool: &PgPool,
batch_accounts: &[WarehouseRecord],
) -> Result<SqliteQueryResult> {
) -> Result<PgQueryResult> {
let filtered = batch_accounts.iter().filter(|el| el.balance.is_some());

let mut query_builder: QueryBuilder<Sqlite> = QueryBuilder::new(
let mut query_builder: QueryBuilder<Postgres> = QueryBuilder::new(
r#"
INSERT INTO balance (account_address, balance, chain_timestamp, db_version, epoch_number)
"#,
);

query_builder.push_values(filtered, |mut b, acc| {
let this_account = acc.account.address.to_hex_literal();
let this_balance = acc.balance.as_ref().unwrap().legacy_balance.unwrap() as i64;
let this_balance = acc.balance.as_ref().unwrap().balance as i64;
let this_timestamp = acc.time.timestamp as i64;
b.push_bind(this_account)
.push_bind(this_balance)
Expand All @@ -47,12 +47,10 @@ pub async fn impl_batch_coin_insert(
}

pub async fn alt_increment_one_balance(
pool: &SqlitePool,
pool: &PgPool,
record: &WarehouseRecord,
) -> Result<SqliteQueryResult> {
// let filtered = batch_accounts.iter().filter(|el| el.balance.is_some());

let mut query_builder: QueryBuilder<Sqlite> =
) -> Result<PgQueryResult> {
let mut query_builder: QueryBuilder<Postgres> =
QueryBuilder::new(increment_balance_template(record));
let query = query_builder.build();
let res = query.execute(pool).await?;
Expand All @@ -62,7 +60,7 @@ pub async fn alt_increment_one_balance(

fn increment_balance_template(record: &WarehouseRecord) -> String {
let this_account = record.account.address.to_hex_literal();
let this_balance = record.balance.as_ref().unwrap().legacy_balance.unwrap() as i64;
let this_balance = record.balance.as_ref().unwrap().balance as i64;
let this_timestamp = record.time.timestamp as i64;
let this_version = record.time.version as i64;
let this_epoch = record.time.epoch as i64;
Expand Down Expand Up @@ -96,8 +94,8 @@ fn test_format() {
},
time: WarehouseTime::default(),
balance: Some(WarehouseBalance {
balance: 0,
legacy_balance: Some(10),
balance: 10,
// legacy_balance: Some(10),
}),
};
let s = increment_balance_template(&record);
Expand Down
8 changes: 3 additions & 5 deletions warehouse/src/query_balance.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use anyhow::Result;
use libra_types::exports::AccountAddress;
use sqlx::SqlitePool;
use sqlx::PgPool;

use crate::table_structs::WarehouseBalance;

// TODO: return specific commit errors for this batch
pub async fn query_last_balance(
pool: &SqlitePool,
pool: &PgPool,
account: AccountAddress,
) -> Result<WarehouseBalance> {
let account_address = account.to_hex_literal();
Expand All @@ -21,9 +21,7 @@ pub async fn query_last_balance(
"#
);

let row = sqlx::query_as::<_, WarehouseBalance>(&query_template)
.fetch_one(pool)
.await?;
let row: WarehouseBalance = sqlx::query_as(&query_template).fetch_one(pool).await?;

Ok(row)
}
17 changes: 13 additions & 4 deletions warehouse/src/table_structs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use libra_types::exports::AccountAddress;
// use serde::{Serialize, Deserialize};
use sqlx::prelude::FromRow;

#[derive(Debug, Clone)]
/// The basic information for an account
Expand Down Expand Up @@ -35,12 +37,19 @@ pub struct WarehouseAccount {
pub address: AccountAddress,
}

#[derive(Debug, Default, Clone, sqlx::FromRow)]
#[derive(Debug, Default, Clone, FromRow)]
pub struct WarehouseBalance {
// balances in v6+ terms
#[sqlx(try_from = "i64")]
pub balance: u64,
// the balance pre v6 recast
#[sqlx(default)]
pub legacy_balance: Option<u64>,
}

// #[derive(Debug, Default, Clone, FromRow)]
// pub struct WarehouseBalanceAlt {
// // balances in v6+ terms
// #[sqlx(try_from = "i64")]
// pub balance: u64,
// // the balance pre v6 recast
// #[sqlx(default, try_from = "i64")]
// pub legacy_balance: Option<u64>,
// }
1 change: 0 additions & 1 deletion warehouse/tests/sqlx_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ async fn test_migrate_from_file_pg() -> anyhow::Result<()> {
Ok(())
}


// NOTE: left for reference, this is the sqlx test framework runtime, which can setup sqlite dbs. Left here for reference
// #[sqlx::test]
// async fn sql_insert_test(pool: Pool<Sqlite>) -> anyhow::Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion warehouse/tests/support/pg_testcontainer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub fn start_container<'a>() -> Container<'a, GenericImage> {
.with_env_var("POSTGRES_PASSWORD".to_owned(), "postgres".to_owned())
.with_wait_for(WaitFor::message_on_stdout(
// "database system is ready to accept connections",
"PostgreSQL init process complete; ready for start up."
"PostgreSQL init process complete; ready for start up.",
));

let image = RunnableImage::from(container);
Expand Down
92 changes: 44 additions & 48 deletions warehouse/tests/test_load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ use libra_warehouse::table_structs::{
WarehouseAccount, WarehouseBalance, WarehouseRecord, WarehouseTime,
};

use sqlx::SqlitePool;

fn v5_state_manifest_fixtures_path() -> PathBuf {
let p = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
let project_root = p.parent().unwrap();
Expand Down Expand Up @@ -55,9 +53,10 @@ async fn batch_insert_account() -> anyhow::Result<()> {
Ok(())
}

#[sqlx::test]
async fn batch_duplicates_fail_gracefully(pool: SqlitePool) -> anyhow::Result<()> {
libra_warehouse::migrate::maybe_init(&pool).await?;
#[tokio::test]
async fn batch_duplicates_fail_gracefully() -> anyhow::Result<()> {
let (pool, _c) = get_test_pool().await?;
libra_warehouse::migrate::maybe_init_pg(&pool).await?;
let mut vec_acct: Vec<WarehouseRecord> = vec![];

// will create duplicates
Expand All @@ -78,10 +77,11 @@ async fn batch_duplicates_fail_gracefully(pool: SqlitePool) -> anyhow::Result<()
Ok(())
}

#[sqlx::test]
#[tokio::test]

async fn test_e2e_load_v5_snapshot(pool: SqlitePool) -> anyhow::Result<()> {
libra_warehouse::migrate::maybe_init(&pool).await?;
async fn test_e2e_load_v5_snapshot() -> anyhow::Result<()> {
let (pool, _c) = get_test_pool().await?;
libra_warehouse::migrate::maybe_init_pg(&pool).await?;

let manifest_file = v5_state_manifest_fixtures_path();
assert!(manifest_file.exists());
Expand All @@ -95,44 +95,43 @@ async fn test_e2e_load_v5_snapshot(pool: SqlitePool) -> anyhow::Result<()> {
Ok(())
}

// #[sqlx::test]
// async fn batch_insert_coin(pool: SqlitePool) -> anyhow::Result<()> {
// libra_warehouse::migrate::maybe_init(&pool).await?;
// let mut vec_state: Vec<WarehouseRecord> = vec![];

// for _i in 0..3 {
// let state = WarehouseRecord {
// account: WarehouseAccount {
// // uniques
// address: AccountAddress::random(),
// },
// time: WarehouseTime::default(),
// balance: Some(WarehouseBalance {
// balance: 0,
// legacy_balance: Some(10),
// }),
// };
#[tokio::test]
async fn batch_insert_coin() -> anyhow::Result<()> {
let (pool, _c) = get_test_pool().await?;
libra_warehouse::migrate::maybe_init_pg(&pool).await?;

// vec_state.push(state);
// }
let mut vec_state: Vec<WarehouseRecord> = vec![];

// // fist must load accounts
// let res = libra_warehouse::load_account::load_account_state(&pool, &vec_state).await?;
for _i in 0..3 {
let state = WarehouseRecord {
account: WarehouseAccount {
// uniques
address: AccountAddress::random(),
},
time: WarehouseTime::default(),
balance: Some(WarehouseBalance { balance: 10 }),
};

// assert!(res == 3);
vec_state.push(state);
}

// let res = libra_warehouse::load_coin::impl_batch_coin_insert(&pool, &vec_state).await?;
// fist must load accounts
let res = libra_warehouse::load_account::load_account_state(&pool, &vec_state).await?;
assert!(res == 3);

// assert!(res.rows_affected() == 3);
let res = libra_warehouse::load_coin::impl_batch_coin_insert(&pool, &vec_state).await?;
dbg!(&res);
assert!(res.rows_affected() == 3);

// Ok(())
// }
Ok(())
}

// The table should not update if the balance remains the same.
// new records are only inserted when the balance changes.
#[sqlx::test]
async fn increment_coin_noop(pool: SqlitePool) -> anyhow::Result<()> {
libra_warehouse::migrate::maybe_init(&pool).await?;
#[tokio::test]
async fn increment_coin_noop() -> anyhow::Result<()> {
let (pool, _c) = get_test_pool().await?;
libra_warehouse::migrate::maybe_init_pg(&pool).await?;
let mut vec_state: Vec<WarehouseRecord> = vec![];
let marlon = AccountAddress::random();
// same user, and same balance, but incremental timestamps
Expand All @@ -148,9 +147,9 @@ async fn increment_coin_noop(pool: SqlitePool) -> anyhow::Result<()> {
epoch: 3 * i,
},
balance: Some(WarehouseBalance {
balance: 0,
balance: 10,
// same balance
legacy_balance: Some(10),
// legacy_balance: Some(10),
}),
};

Expand All @@ -177,10 +176,11 @@ async fn increment_coin_noop(pool: SqlitePool) -> anyhow::Result<()> {
Ok(())
}

// Increment the balance table when there balance changes.
#[sqlx::test]
async fn increment_coin(pool: SqlitePool) -> anyhow::Result<()> {
libra_warehouse::migrate::maybe_init(&pool).await?;
// Only increment the balance table when their balance changes.
#[tokio::test]
async fn increment_coin() -> anyhow::Result<()> {
let (pool, _c) = get_test_pool().await?;
libra_warehouse::migrate::maybe_init_pg(&pool).await?;
let mut vec_state: Vec<WarehouseRecord> = vec![];
let marlon = AccountAddress::random();
// same user, and same balance, but incremental timestamps
Expand All @@ -195,11 +195,7 @@ async fn increment_coin(pool: SqlitePool) -> anyhow::Result<()> {
version: 2 * i,
epoch: 3 * i,
},
balance: Some(WarehouseBalance {
balance: 0,
// different balance each time
legacy_balance: Some(10 * i),
}),
balance: Some(WarehouseBalance { balance: 10 * i }),
};

vec_state.push(state);
Expand Down
4 changes: 2 additions & 2 deletions warehouse/tests/test_migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ async fn can_init_pg() -> anyhow::Result<()> {
let mut conn = pool.acquire().await?;

let id = sqlx::query(
r#"
r#"
INSERT INTO users (account_address, is_legacy)
VALUES ('00000000000000000000000000000000e8953084617dd5c6071cf2918215e183', TRUE)
"#,
Expand All @@ -21,7 +21,7 @@ async fn can_init_pg() -> anyhow::Result<()> {
let id = sqlx::query(
r#"
INSERT INTO balance (account_address, balance, chain_timestamp, db_version, epoch_number)
VALUES ('00000000000000000000000000000000e8953084617dd5c6071cf2918215e183', 11, '2024-10-28 12:34:56', 600, 1)
VALUES ('00000000000000000000000000000000e8953084617dd5c6071cf2918215e183', 11, 192837564738291845, 600, 1)
"#,
)
.execute(&mut *conn)
Expand Down

0 comments on commit 2361f6e

Please sign in to comment.