From 88cf89fd8fc9ae95691695bbfae5d2f30c4550ed Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Sun, 8 Dec 2024 14:09:59 -0500 Subject: [PATCH] [cypher] load state snapshot (#5) * add community wallet flag to snapshot state * add asserts * patch load single archive * rename cli subcommand * wip * refactor ledger replay * submit to db * add entrypoint test for exchange * add index * ignore expensive test * patch * patch index * handle submit errors * revert * better error handling * patch total funding * patch replay * use btree instead of hash, add fixtures and test * patch cypher query * docs * complete tests --- docs/local_testing.md | 3 + src/analytics/enrich_account_funding.rs | 666 +++++++++++++----------- src/extract_snapshot.rs | 10 +- src/load.rs | 9 +- src/load_exchange_orders.rs | 12 +- src/neo4j_init.rs | 10 + src/scan.rs | 12 +- src/schema_account_state.rs | 8 +- src/warehouse_cli.rs | 38 +- tests/test_analytics.rs | 138 ++++- tests/test_enrich_exchange.rs | 23 +- tests/test_load_state.rs | 43 +- tests/test_neo4j_meta.rs | 2 +- 13 files changed, 633 insertions(+), 341 deletions(-) diff --git a/docs/local_testing.md b/docs/local_testing.md index 79585a9..11cbed5 100644 --- a/docs/local_testing.md +++ b/docs/local_testing.md @@ -8,6 +8,9 @@ Start a Neo4j instance. Choose a password ``. Allow it to create the d export LIBRA_GRAPH_DB_URI='neo4j://localhost' export LIBRA_GRAPH_DB_USER='neo4j' export LIBRA_GRAPH_DB_PASS= + +# optionally export trace logging +export RUST_LOG=trace ``` Import the sample exchange orders diff --git a/src/analytics/enrich_account_funding.rs b/src/analytics/enrich_account_funding.rs index fbbf4c6..526d2d0 100644 --- a/src/analytics/enrich_account_funding.rs +++ b/src/analytics/enrich_account_funding.rs @@ -1,108 +1,150 @@ -use anyhow::Result; +use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; -use log::trace; +use log::{error, trace}; use neo4rs::{Graph, Query}; +// use log::trace; +// use neo4rs::{Graph, Query}; use serde::{Deserialize, Serialize}; use std::{ - collections::HashMap, + collections::BTreeMap, fs::{self, File}, io::Read, }; use crate::schema_exchange_orders::ExchangeOrder; -#[derive(Default, Debug, Deserialize, Serialize)] -pub struct AccountData { - pub daily_balances: HashMap, f64>, // Map of daily balances - pub daily_funding: HashMap, f64>, // Map of daily funding amounts - pub daily_inflows: HashMap, f64>, // Map of daily inflow amounts - pub daily_outflows: HashMap, f64>, // Map of daily outflow amounts - pub daily_user_flows: HashMap, f64>, // Amount when the account was a `user` - pub daily_accepter_flows: HashMap, f64>, // Amount when the account was an `accepter` +#[derive(Default, Debug, Clone, Deserialize, Serialize)] +pub struct AccountDataAlt { + pub current_balance: f64, + pub total_funded: f64, + pub total_outflows: f64, + pub total_inflows: f64, + pub daily_funding: f64, + pub daily_inflows: f64, + pub daily_outflows: f64, } -impl AccountData { - pub fn to_cypher_map(&self, id: u32) -> String { - let mut list_literal: String = "".to_owned(); - self.daily_balances.iter().for_each(|(date, _) | { - let obj = format!( - r#"{{ swap_id: {}, date: "{}", balance: {}, funding: {}, inflows: {}, outflows: {}, user_flows: {}, accepter_flows: {} }}"#, - id, - date.to_rfc3339(), - self.daily_balances.get(date).unwrap_or(&0.0), - self.daily_funding.get(date).unwrap_or(&0.0), - self.daily_inflows.get(date).unwrap_or(&0.0), - self.daily_outflows.get(date).unwrap_or(&0.0), - self.daily_user_flows.get(date).unwrap_or(&0.0), - self.daily_accepter_flows.get(date).unwrap_or(&0.0) - ); - - list_literal.push_str(&obj); - list_literal.push(','); - - }); - - list_literal.pop(); // need to drop last comma "," - format!("[{}]", list_literal) - } -} +#[derive(Default, Debug, Deserialize, Serialize)] +pub struct UserLedger(pub BTreeMap, AccountDataAlt>); #[derive(Default, Debug, Deserialize, Serialize)] -pub struct BalanceTracker { - pub accounts: HashMap, // Tracks data for each user -} +pub struct BalanceTracker(pub BTreeMap); // Tracks data for each user impl BalanceTracker { pub fn new() -> Self { - Self { - accounts: HashMap::new(), + BalanceTracker(BTreeMap::new()) + } + /// Replay all transactions sequentially and return a balance tracker + pub fn replay_transactions(&mut self, orders: &mut [ExchangeOrder]) -> Result<()> { + orders.sort_by_key(|order| order.filled_at); + for o in orders { + self.process_transaction_alt(o); } + Ok(()) } - pub fn process_transaction(&mut self, order: &ExchangeOrder) { - let date = order.created_at; - let (buyer_id, seller_id, amount) = match order.order_type.as_str() { - "Buy" => (order.user, order.accepter, order.amount * order.price), - "Sell" => (order.accepter, order.user, order.amount * order.price), + pub fn process_transaction_alt(&mut self, order: &ExchangeOrder) { + let date = order.filled_at; + match order.order_type.as_str() { + "Buy" => { + // user offered to buy coins (Buyer) + // he sends USD + // accepter sends coins. (Seller) + + self.update_balance_and_flows_alt(order.user, date, order.amount, true); + self.update_balance_and_flows_alt(order.accepter, date, order.amount, false); + } + "Sell" => { + // user offered to sell coins (Seller) + // he sends Coins + // accepter sends USD. (Buyer) + self.update_balance_and_flows_alt(order.accepter, date, order.amount, true); + self.update_balance_and_flows_alt(order.user, date, order.amount, false); + } _ => { println!("ERROR: not a valid Buy/Sell order, {:?}", &order); - return; } - }; - - self.update_balance_and_flows(seller_id, date, -amount, false); - self.update_balance_and_flows(buyer_id, date, amount, true); + } } - - fn update_balance_and_flows( + fn update_balance_and_flows_alt( &mut self, user_id: u32, date: DateTime, amount: f64, - is_user: bool, + credit: bool, ) { - let account = self.accounts.entry(user_id).or_default(); - let daily_balance = account.daily_balances.entry(date).or_insert(0.0); + let ul = self.0.entry(user_id).or_default(); - if amount > 0.0 { - *account.daily_inflows.entry(date).or_insert(0.0) += amount; - } else { - *account.daily_outflows.entry(date).or_insert(0.0) += -amount; + let has_history = !ul.0.is_empty(); + + let most_recent_date = *ul.0.keys().max_by(|x, y| x.cmp(y)).unwrap_or(&date); + + // NOTE the previous record may be today's record from a previous transaction. Need to take care in the aggregation below + + // // TODO: gross, this shouldn't clone + // let previous = if let Some(d) = most_recent_date { + // ul.0.entry(*).or_default().to_owned() + // } else { + // AccountDataAlt::default() + // }; + + if most_recent_date > date { + // don't know what to here + error!("most recent ledger date is higher than current day"); + return; + }; + + let previous = + ul.0.get(&most_recent_date) + .unwrap_or(&AccountDataAlt::default()) + .clone(); + + let today = ul.0.entry(date).or_default(); + + // roll over from previous + if has_history { + today.current_balance = previous.current_balance; + today.total_funded = previous.total_funded; + today.total_inflows = previous.total_inflows; + today.total_outflows = previous.total_outflows; } - if is_user { - *account.daily_user_flows.entry(date).or_insert(0.0) += amount; + if credit { + today.current_balance += amount; + today.total_inflows += amount; + // there are records from today + if most_recent_date == date { + today.daily_inflows = previous.daily_inflows + amount; + } else { + // today's first record + today.daily_inflows = amount; + } } else { - *account.daily_accepter_flows.entry(date).or_insert(0.0) += amount; + // debit + today.current_balance += -amount; + today.total_outflows += amount; + + if most_recent_date == date { + today.daily_outflows = previous.daily_outflows + amount; + } else { + today.daily_outflows = amount; + } } - let new_balance = *daily_balance + amount; - if new_balance < 0.0 { - let funding_needed = -new_balance; - *account.daily_funding.entry(date).or_insert(0.0) += funding_needed; - *daily_balance = 0.0; - } else { - *daily_balance = new_balance; + // find out if the outflows created a funding requirement on the account + if today.current_balance < 0.0 { + let negative_balance = today.current_balance.abs(); + // funding was needed + today.total_funded += negative_balance; + + // if the previous record is from today + if most_recent_date == date { + today.daily_funding = previous.daily_funding + negative_balance; + } else { + today.daily_funding = negative_balance; + } + // reset to zero + today.current_balance = 0.0; } } @@ -125,165 +167,103 @@ impl BalanceTracker { } None } - /// Generate a Cypher query string to insert data into Neo4j - pub fn generate_cypher_query(&self, map: String) -> String { - // r#"{{ swap_id: {}, date: "{}", balance: {}, funding: {}, inflows: {}, outflows: {}, user_flows: {}, accepter_flows: {} }}"#, - format!( - r#" - UNWIND {map} AS account - MERGE (sa:SwapAccount {{swap_id: account.swap_id}}) - MERGE (ul:UserLedger {{date: datetime(account.date)}}) - SET ul.balance = account.balance, - ul.funding = account.funding, - ul.inflows = account.inflows, - ul.outflows = account.outflows, - ul.user_flows = account.user_flows, - ul.accepter_flows = account.accepter_flows - MERGE (sa)-[r:Daily]->(ul) - SET r.date = datetime(account.date) - RETURN COUNT(r) as merged_relations - "#, - ) - } -} -/// Manages cache logic and invokes replay_transactions only if necessary -pub fn get_or_recalculate_balances( - orders: &mut [ExchangeOrder], - cache_file: Option, - force_recalculate: bool, -) -> BalanceTracker { - if !force_recalculate && cache_file.is_some() { - if let Some(cached_tracker) = BalanceTracker::load_from_cache(cache_file.as_ref().unwrap()) - { - return cached_tracker; - } - } + pub fn to_cypher_map(&self, id: u32) -> Result { + let ul = self.0.get(&id).context("no user")?; + let mut list_literal: String = "".to_owned(); - let tracker = replay_transactions(orders); - if let Some(p) = cache_file { - tracker.save_to_cache(&p); - } - tracker -} + for date in ul.0.keys() { + if let Some(acc) = ul.0.get(date) { + let obj = format!( + r#"{{ swap_id: {}, date: "{}", current_balance: {}, total_funded: {}, total_inflows: {}, total_outflows: {}, daily_funding: {}, daily_inflows: {}, daily_outflows: {} }}"#, + id, + date.to_rfc3339(), + acc.current_balance, + acc.total_funded, + acc.total_inflows, + acc.total_outflows, + acc.daily_funding, + acc.daily_inflows, + acc.daily_outflows, + ); + + list_literal.push_str(&obj); + list_literal.push(','); + } else { + continue; + } + } -/// Replay all transactions sequentially and return a balance tracker -pub fn replay_transactions(orders: &mut [ExchangeOrder]) -> BalanceTracker { - let mut tracker = BalanceTracker::new(); - let sorted_orders = orders; - sorted_orders.sort_by_key(|order| order.created_at); - for order in sorted_orders { - tracker.process_transaction(order); + list_literal.pop(); // need to drop last comma "," + Ok(format!("[{}]", list_literal)) } - tracker -} -/// submit to db -pub async fn submit_ledger(balances: &BalanceTracker, pool: &Graph) -> Result { - let mut merged_relations = 0u64; - for (id, acc) in balances.accounts.iter() { - let data = acc.to_cypher_map(*id); - let query_literal = balances.generate_cypher_query(data); + pub async fn submit_one_id(&self, id: u32, pool: &Graph) -> Result { + let data = self.to_cypher_map(id)?; + let query_literal = generate_cypher_query(data); let query = Query::new(query_literal); let mut result = pool.execute(query).await?; - while let Some(r) = result.next().await? { - if let Ok(i) = r.get::("merged_relations") { - trace!("merged ledger in tx: {i}"); - merged_relations += i; - }; + let row = result.next().await?.context("no row returned")?; + + let merged: u64 = row + .get("merged_relations") + .context("no unique_accounts field")?; + + trace!("merged ledger in tx: {merged}"); + Ok(merged) + } + /// submit to db + pub async fn submit_ledger(&self, pool: &Graph) -> Result { + let mut merged_relations = 0u64; + for id in self.0.keys() { + match self.submit_one_id(*id, pool).await { + Ok(m) => merged_relations += m, + Err(e) => error!("could not submit user ledger: {}", e), + } } + Ok(merged_relations) } - Ok(merged_relations) +} + +/// Generate a Cypher query string to insert data into Neo4j +pub fn generate_cypher_query(map: String) -> String { + // r#"{{ swap_id: {}, date: "{}", current_balance: {}, total_funded: {}, total_inflows: {}, total_outflows: {}, daily_funding: {}, daily_inflows: {}, daily_outflows: {} }}"#, + format!( + r#" + UNWIND {map} AS account + MERGE (sa:SwapAccount {{swap_id: account.swap_id}}) + MERGE (ul:UserLedger {{swap_id: account.swap_id, date: datetime(account.date)}}) + SET ul.current_balance = account.current_balance, + ul.total_funded = account.total_funded, + ul.total_inflows = account.total_inflows, + ul.total_outflows = account.total_outflows, + ul.daily_funding = account.daily_funding, + ul.daily_inflows = account.daily_inflows, + ul.daily_outflows = account.daily_outflows + MERGE (sa)-[r:DailyLedger]->(ul) + SET r.date = datetime(account.date) + RETURN COUNT(r) as merged_relations + "#, + ) } /// Helper function to parse "YYYY-MM-DD" into `DateTime` -fn parse_date(date_str: &str) -> DateTime { +pub fn parse_date(date_str: &str) -> DateTime { let datetime_str = format!("{date_str}T00:00:00Z"); // Append time and UTC offset DateTime::parse_from_rfc3339(&datetime_str) .expect("Invalid date format; expected YYYY-MM-DD") .with_timezone(&Utc) } -/// Reusable function to print account data -pub fn print_account_data(user_id: u32, data: &AccountData) { - println!("User: {}", user_id); - for (date, balance) in &data.daily_balances { - println!(" Date: {}, Balance: {}", date, balance); - } - for (date, funding) in &data.daily_funding { - println!(" Date: {}, Funding: {}", date, funding); - } - for (date, inflow) in &data.daily_inflows { - println!(" Date: {}, Inflows: {}", date, inflow); - } - for (date, outflow) in &data.daily_outflows { - println!(" Date: {}, Outflows: {}", date, outflow); - } - for (date, user_flow) in &data.daily_user_flows { - println!(" Date: {}, User Flow: {}", date, user_flow); - } - for (date, accepter_flow) in &data.daily_accepter_flows { - println!(" Date: {}, Accepter Flow: {}", date, accepter_flow); - } -} - -/// Display statistics for a specific account within a date range -pub fn display_account_statistics( - user_id: u32, - data: &AccountData, - start_date: &str, - end_date: &str, -) { - let start = parse_date(start_date); - let end = parse_date(end_date); - - println!( - "Statistics for User {} from {} to {}", - user_id, start_date, end_date - ); - - let mut total_balance = 0.0; - let mut total_funding = 0.0; - let mut total_inflows = 0.0; - let mut total_outflows = 0.0; - - for (date, balance) in &data.daily_balances { - if *date >= start && *date <= end { - total_balance += balance; - } - } - - for (date, funding) in &data.daily_funding { - if *date >= start && *date <= end { - total_funding += funding; - } - } - - for (date, inflow) in &data.daily_inflows { - if *date >= start && *date <= end { - total_inflows += inflow; - } - } - - for (date, outflow) in &data.daily_outflows { - if *date >= start && *date <= end { - total_outflows += outflow; - } - } - - println!(" Total Balance: {:.2}", total_balance); - println!(" Total Funding: {:.2}", total_funding); - println!(" Total Inflows: {:.2}", total_inflows); - println!(" Total Outflows: {:.2}", total_outflows); -} #[test] fn test_replay_transactions() { - // Create orders with meaningful data and specific dates let mut orders = vec![ + // user_1 creates an offer to BUY, user_2 accepts. + // user_1 sends USD, user_2 moves 10 coins. ExchangeOrder { user: 1, - order_type: "BUY".to_string(), + order_type: "Buy".to_string(), amount: 10.0, price: 2.0, created_at: parse_date("2024-03-01"), @@ -296,8 +276,10 @@ fn test_replay_transactions() { shill_bid: None, }, ExchangeOrder { + // user 2 creates an offer to SELL, user 3 accepts. + // user 3 sends USD user 2 moves amount of coins. user: 2, - order_type: "SELL".to_string(), + order_type: "Sell".to_string(), amount: 5.0, price: 3.0, created_at: parse_date("2024-03-05"), @@ -309,9 +291,11 @@ fn test_replay_transactions() { price_vs_rms_24hour: 0.0, shill_bid: None, }, + // user 3 creates an offer to BUY, user 1 accepts. + // user 3 sends USD user 1 moves amount of coins. ExchangeOrder { user: 3, - order_type: "BUY".to_string(), + order_type: "Buy".to_string(), amount: 15.0, price: 1.5, created_at: parse_date("2024-03-10"), @@ -325,107 +309,185 @@ fn test_replay_transactions() { }, ]; - let tracker = replay_transactions(&mut orders); + let mut tracker = BalanceTracker::new(); + tracker.replay_transactions(&mut orders).unwrap(); + + let accs = tracker.0; + + let user_1 = accs.get(&1).unwrap(); + let (_, acc) = user_1.0.get_key_value(&parse_date("2024-03-02")).unwrap(); + + assert!(acc.current_balance == 10.0); + assert!(acc.total_funded == 0.0); + assert!(acc.total_outflows == 0.0); + assert!(acc.total_inflows == 10.0); + assert!(acc.daily_funding == 0.0); + assert!(acc.daily_inflows == 10.0); + assert!(acc.daily_outflows == 0.0); + + let (_, acc) = user_1.0.get_key_value(&parse_date("2024-03-11")).unwrap(); + + // balance got drawn to negative on sale of 15 coin + assert!(acc.current_balance == 0.0); + // implied he had to fund with at least 5 coins + assert!(acc.total_funded == 5.0); + assert!(acc.total_outflows == 15.0); + // the all-time inflows should not have changed from the previous period + assert!(acc.total_inflows == 10.0); + assert!(acc.daily_funding == 5.0); + assert!(acc.daily_inflows == 0.0); + assert!(acc.daily_outflows == 15.0); + + let user_1 = accs.get(&3).unwrap(); + let (_, acc) = user_1.0.get_key_value(&parse_date("2024-03-06")).unwrap(); + + assert!(acc.current_balance == 5.0); + assert!(acc.total_funded == 0.0); + assert!(acc.total_outflows == 0.0); + assert!(acc.total_inflows == 5.0); + assert!(acc.daily_funding == 0.0); + assert!(acc.daily_inflows == 5.0); + assert!(acc.daily_outflows == 0.0); + + let (_, acc) = user_1.0.get_key_value(&parse_date("2024-03-11")).unwrap(); + + // balance should increase again + assert!(acc.current_balance == 20.0); + assert!(acc.total_funded == 0.0); + assert!(acc.total_outflows == 0.0); + assert!(acc.total_inflows == 20.0); + assert!(acc.daily_funding == 0.0); + assert!(acc.daily_inflows == 15.0); + assert!(acc.daily_outflows == 0.0); +} - // Analyze results for March 2024 - for (user_id, data) in &tracker.accounts { - print_account_data(*user_id, data); - display_account_statistics(*user_id, data, "2024-03-01", "2024-03-31"); +#[test] +fn test_example_user() -> Result<()> { + use crate::extract_exchange_orders; + use std::path::PathBuf; + let path = env!("CARGO_MANIFEST_DIR"); + let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json"); + let mut orders = extract_exchange_orders::read_orders_from_file(buf).unwrap(); + assert!(orders.len() == 25450); + + orders.retain(|el| { + if el.filled_at < parse_date("2024-01-16") { + if el.user == 123 { + return true; + }; + if el.accepter == 123 { + return true; + }; + } + false + }); + + assert!(orders.len() == 68); + + let mut tracker = BalanceTracker::new(); + tracker.replay_transactions(&mut orders)?; + + // check that running totals e.g. total_funded are always monotonically increasing. + + // Dump case + // This user only had outflows of coins, and thus an increasing funding requirement. + let user = tracker.0.get(&123).unwrap(); + assert!(user.0.len() == 68); + + // btree is already sorted + let mut prev_funding = 0.0; + let mut prev_inflows = 0.0; + let mut prev_outflows = 0.0; + for (_d, acc) in user.0.iter() { + assert!( + acc.total_funded >= prev_funding, + "total_funded is not monotonically increasing" + ); + assert!( + acc.total_inflows >= prev_inflows, + "total_inflows is not monotonically increasing" + ); + assert!( + acc.total_outflows >= prev_outflows, + "total_outflows is not monotonically increasing" + ); + prev_funding = acc.total_funded; + prev_inflows = acc.total_inflows; + prev_outflows = acc.total_outflows; } + + Ok(()) } -#[ignore] -// TODO: check paths #[test] -fn test_cache_mechanism() { - let cache_file = "balance_tracker_cache.json".to_string(); - let mut orders = vec![ - ExchangeOrder { - user: 1, - order_type: "BUY".to_string(), - amount: 10.0, - price: 2.0, - created_at: parse_date("2024-03-01"), - filled_at: parse_date("2024-03-02"), - accepter: 2, - rms_hour: 0.0, - rms_24hour: 0.0, - price_vs_rms_hour: 0.0, - price_vs_rms_24hour: 0.0, - shill_bid: None, - }, - ExchangeOrder { - user: 2, - order_type: "SELL".to_string(), - amount: 5.0, - price: 3.0, - created_at: parse_date("2024-03-05"), - filled_at: parse_date("2024-03-06"), - accepter: 3, - rms_hour: 0.0, - rms_24hour: 0.0, - price_vs_rms_hour: 0.0, - price_vs_rms_24hour: 0.0, - shill_bid: None, - }, - ]; +fn test_example_week() -> Result<()> { + // history for two users 123, and 336 + use crate::extract_exchange_orders; + use std::path::PathBuf; + let path = env!("CARGO_MANIFEST_DIR"); + let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json"); + let mut orders = extract_exchange_orders::read_orders_from_file(buf).unwrap(); + assert!(orders.len() == 25450); + + orders.retain(|el| el.filled_at < parse_date("2024-01-16")); + assert!(orders.len() == 956); - let tracker = get_or_recalculate_balances(&mut orders, Some(cache_file.clone()), true); - assert!(tracker.accounts.contains_key(&1)); - assert!(tracker.accounts.contains_key(&2)); + let mut tracker = BalanceTracker::new(); + tracker.replay_transactions(&mut orders)?; + + // // check that running totals e.g. total_funded are always monotonically increasing. + + // Dump case + // This user only had outflows of coins, and thus an increasing funding requirement. + let user = tracker.0.get(&123).unwrap(); + + // btree is already sorted + let mut prev_funding = 0.0; + let mut prev_inflows = 0.0; + let mut prev_outflows = 0.0; + for (_d, acc) in user.0.iter() { + assert!( + acc.total_funded >= prev_funding, + "total_funded is not monotonically increasing" + ); + assert!( + acc.total_inflows >= prev_inflows, + "total_inflows is not monotonically increasing" + ); + assert!( + acc.total_outflows >= prev_outflows, + "total_outflows is not monotonically increasing" + ); + prev_funding = acc.total_funded; + prev_inflows = acc.total_inflows; + prev_outflows = acc.total_outflows; + } - // Test loading from cache - let cached_tracker = get_or_recalculate_balances(&mut orders, Some(cache_file.clone()), false); - assert!(cached_tracker.accounts.contains_key(&1)); - assert!(cached_tracker.accounts.contains_key(&2)); + // Active Trading case, 336 + // This user only had outflows of coins, and thus an increasing funding requirement. + let user = tracker.0.get(&336).unwrap(); + + // btree is already sorted + let mut prev_funding = 0.0; + let mut prev_inflows = 0.0; + let mut prev_outflows = 0.0; + for (_d, acc) in user.0.iter() { + assert!( + acc.total_funded >= prev_funding, + "total_funded is not monotonically increasing" + ); + assert!( + acc.total_inflows >= prev_inflows, + "total_inflows is not monotonically increasing" + ); + assert!( + acc.total_outflows >= prev_outflows, + "total_outflows is not monotonically increasing" + ); + prev_funding = acc.total_funded; + prev_inflows = acc.total_inflows; + prev_outflows = acc.total_outflows; + } - // Cleanup - let _ = fs::remove_file(cache_file); + Ok(()) } - -// #[test] - -// fn test_cypher_query() { -// let tracker = BalanceTracker::new(); // Assume tracker is populated -// // let params = tracker.generate_cypher_params(); -// let query = tracker.generate_cypher_query(); -// // dbg!(¶ms); -// dbg!(&query); -// } - -// I'm coding some data analysis in rust. - -// I have a vector structs that looks like this: - -// pub struct ExchangeOrder { -// pub user: u32, -// #[serde(rename = "orderType")] -// pub order_type: String, -// #[serde(deserialize_with = "deserialize_amount")] -// pub amount: f64, -// #[serde(deserialize_with = "deserialize_amount")] -// pub price: f64, -// pub created_at: DateTime, -// pub filled_at: DateTime, -// pub accepter: u32, -// #[serde(skip_deserializing)] -// pub rms_hour: f64, -// #[serde(skip_deserializing)] -// pub rms_24hour: f64, -// #[serde(skip_deserializing)] -// pub price_vs_rms_hour: f64, -// #[serde(skip_deserializing)] -// pub price_vs_rms_24hour: f64, -// #[serde(skip_deserializing)] -// pub shill_bid: Option, // New field to indicate if it took the best price -// } - -// My goal is to determine the amount of funding ('amount') that each account required at a given time. We will need to replay all the transaction history sequentially. - -// We need a new data structure to track account balances. Accepting a BUY transaction by another User, would decrease the total balance of the accepter, and increase of the User. Accepting a SELL transaction, would increase the balance of the accepter, and decrease that of the User. - -// We also need a data structure to save when there were funding events to the account. We can assume all accounts start at 0 total_balance. This means that we need to also track a funded_event_amount, for whenever the account would have a negative balance. - -// As for granularity of time we should just track daily balances, and daily funding. - -// How would I do this in Rust? diff --git a/src/extract_snapshot.rs b/src/extract_snapshot.rs index 4f7a812..e492c24 100644 --- a/src/extract_snapshot.rs +++ b/src/extract_snapshot.rs @@ -10,7 +10,10 @@ use libra_backwards_compatibility::version_five::{ use libra_storage::read_snapshot::{accounts_from_snapshot_backup, load_snapshot_manifest}; use libra_types::{ exports::AccountAddress, - move_resource::{libra_coin::LibraCoinStoreResource, wallet::SlowWalletResource}, + move_resource::{ + cumulative_deposits::CumulativeDepositResource, libra_coin::LibraCoinStoreResource, + wallet::SlowWalletResource, + }, }; use log::{error, info, warn}; @@ -112,6 +115,11 @@ pub async fn extract_current_snapshot(archive_path: &Path) -> Result()? { + s.donor_voice_acc = true; + } + warehouse_state.push(s); } } diff --git a/src/load.rs b/src/load.rs index c50dff8..c40af79 100644 --- a/src/load.rs +++ b/src/load.rs @@ -8,8 +8,8 @@ use crate::{ scan::{ArchiveMap, ManifestInfo}, }; -use anyhow::{Context, Result}; -use log::{info, warn}; +use anyhow::{bail, Context, Result}; +use log::{error, info, warn}; use neo4rs::Graph; /// takes all the archives from a map, and tries to load them sequentially @@ -74,7 +74,10 @@ pub async fn try_load_one_archive( crate::scan::BundleContent::Unknown => todo!(), crate::scan::BundleContent::StateSnapshot => { let snaps = match man.version { - crate::scan::FrameworkVersion::Unknown => todo!(), + crate::scan::FrameworkVersion::Unknown => { + error!("no framework version detected"); + bail!("could not load archive from manifest"); + } crate::scan::FrameworkVersion::V5 => extract_v5_snapshot(&man.archive_dir).await?, crate::scan::FrameworkVersion::V6 => { extract_current_snapshot(&man.archive_dir).await? diff --git a/src/load_exchange_orders.rs b/src/load_exchange_orders.rs index ef1a384..963a4c4 100644 --- a/src/load_exchange_orders.rs +++ b/src/load_exchange_orders.rs @@ -5,7 +5,7 @@ use log::{error, info, warn}; use neo4rs::{query, Graph}; use crate::{ - analytics::{enrich_account_funding, enrich_rms}, + analytics::{enrich_account_funding::BalanceTracker, enrich_rms}, extract_exchange_orders, queue, schema_exchange_orders::ExchangeOrder, }; @@ -86,14 +86,20 @@ pub async fn impl_batch_tx_insert(pool: &Graph, batch_txs: &[ExchangeOrder]) -> pub async fn load_from_json(path: &Path, pool: &Graph, batch_size: usize) -> Result<(u64, u64)> { let mut orders = extract_exchange_orders::read_orders_from_file(path)?; + info!("completed parsing orders"); + // add RMS stats to each order enrich_rms::include_rms_stats(&mut orders); + info!("completed rms statistics"); + // find likely shill bids enrich_rms::process_sell_order_shill(&mut orders); enrich_rms::process_buy_order_shill(&mut orders); + info!("completed shill bid calculation"); - let balances = enrich_account_funding::replay_transactions(&mut orders); - let ledger_inserts = enrich_account_funding::submit_ledger(&balances, pool).await?; + let mut balances = BalanceTracker::new(); + balances.replay_transactions(&mut orders)?; + let ledger_inserts = balances.submit_ledger(pool).await?; info!("exchange ledger relations inserted: {}", ledger_inserts); swap_batch(&orders, pool, batch_size).await diff --git a/src/neo4j_init.rs b/src/neo4j_init.rs index 6d19eb8..f2eb51d 100644 --- a/src/neo4j_init.rs +++ b/src/neo4j_init.rs @@ -35,6 +35,14 @@ pub static INDEX_SWAP_ID: &str = pub static INDEX_SWAP_TIME: &str = "CREATE INDEX swap_time IF NOT EXISTS FOR ()-[r:Swap]-() ON (r.filled_at)"; +pub static INDEX_EXCHANGE_LEDGER: &str = " + CREATE INDEX user_ledger IF NOT EXISTS FOR (ul:UserLedger) ON (ul.date) + "; + +pub static INDEX_EXCHANGE_LINK_LEDGER: &str = " + CREATE INDEX link_ledger IF NOT EXISTS FOR ()-[r:DailyLedger]->() ON (r.date) + "; + /// get the testing neo4j connection pub async fn get_neo4j_localhost_pool(port: u16) -> Result { let uri = format!("127.0.0.1:{port}"); @@ -67,6 +75,8 @@ pub async fn maybe_create_indexes(graph: &Graph) -> Result<()> { INDEX_TX_HASH, INDEX_TX_FUNCTION, INDEX_SWAP_ID, + INDEX_EXCHANGE_LEDGER, + INDEX_EXCHANGE_LINK_LEDGER, ]) .await?; txn.commit().await?; diff --git a/src/scan.rs b/src/scan.rs index c9cb407..bd1844d 100644 --- a/src/scan.rs +++ b/src/scan.rs @@ -36,10 +36,13 @@ impl ManifestInfo { match self.contents { BundleContent::Unknown => return FrameworkVersion::Unknown, BundleContent::StateSnapshot => { + let man_path = self.archive_dir.join(self.contents.filename()); + dbg!(&man_path); + // first check if the v7 manifest will parse - if load_snapshot_manifest(&self.archive_dir).is_ok() { + if let Ok(_bak) = load_snapshot_manifest(&man_path) { self.version = FrameworkVersion::V7; - } + }; if v5_read_from_snapshot_manifest(&self.archive_dir).is_ok() { self.version = FrameworkVersion::V5; @@ -98,15 +101,19 @@ pub fn scan_dir_archive( ) -> Result { let path = parent_dir.canonicalize()?; let filename = content_opt.unwrap_or(BundleContent::Unknown).filename(); + dbg!(&filename); let pattern = format!( "{}/**/{}", path.to_str().context("cannot parse starting dir")?, filename, ); + dbg!(&pattern); + let mut archive = BTreeMap::new(); for entry in glob(&pattern)? { + dbg!(&entry); match entry { Ok(manifest_path) => { let dir = manifest_path @@ -114,6 +121,7 @@ pub fn scan_dir_archive( .context("no parent dir found")? .to_owned(); let contents = test_content(&manifest_path); + dbg!(&contents); let archive_id = dir.file_name().unwrap().to_str().unwrap().to_owned(); let mut m = ManifestInfo { archive_dir: dir.clone(), diff --git a/src/schema_account_state.rs b/src/schema_account_state.rs index ad9f858..a44fc3d 100644 --- a/src/schema_account_state.rs +++ b/src/schema_account_state.rs @@ -19,6 +19,7 @@ pub struct WarehouseAccState { pub balance: u64, pub slow_wallet_locked: u64, pub slow_wallet_transferred: u64, + pub donor_voice_acc: bool, } impl Default for WarehouseAccState { @@ -30,6 +31,7 @@ impl Default for WarehouseAccState { balance: Default::default(), slow_wallet_locked: Default::default(), slow_wallet_transferred: Default::default(), + donor_voice_acc: false, } } } @@ -43,6 +45,7 @@ impl WarehouseAccState { balance: 0, slow_wallet_locked: 0, slow_wallet_transferred: 0, + donor_voice_acc: false, } } pub fn set_time(&mut self, timestamp: u64, version: u64, epoch: u64) { @@ -57,7 +60,7 @@ impl WarehouseAccState { /// Note original data was in an RFC rfc3339 with Z for UTC, Cypher seems to prefer with offsets +00000 pub fn to_cypher_object_template(&self) -> String { format!( - r#"{{address: "{}", balance: {}, version: {}, sequence_num: {}, slow_locked: {}, slow_transfer: {}, framework_version: "{}" }}"#, + r#"{{address: "{}", balance: {}, version: {}, sequence_num: {}, slow_locked: {}, slow_transfer: {}, framework_version: "{}", donor_voice: {} }}"#, self.address.to_hex_literal(), self.balance, self.time.version, @@ -65,6 +68,7 @@ impl WarehouseAccState { self.slow_wallet_locked, self.slow_wallet_transferred, self.time.framework_version, + self.donor_voice_acc, ) } @@ -87,7 +91,7 @@ impl WarehouseAccState { UNWIND tx_data AS tx MERGE (addr:Account {{address: tx.address}}) - MERGE (snap:Snapshot {{address: tx.address, balance: tx.balance, framework_version: tx.framework_version, version: tx.version, sequence_num: tx.sequence_num, slow_locked: tx.slow_locked, slow_transfer: tx.slow_transfer }}) + MERGE (snap:Snapshot {{address: tx.address, balance: tx.balance, framework_version: tx.framework_version, version: tx.version, sequence_num: tx.sequence_num, slow_locked: tx.slow_locked, slow_transfer: tx.slow_transfer, donor_voice: tx.donor_voice }}) MERGE (addr)-[rel:State {{version: tx.version}} ]->(snap) RETURN diff --git a/src/warehouse_cli.rs b/src/warehouse_cli.rs index fe1e220..584e151 100644 --- a/src/warehouse_cli.rs +++ b/src/warehouse_cli.rs @@ -1,6 +1,6 @@ use anyhow::{bail, Result}; use clap::{Parser, Subcommand}; -use log::{info, warn}; +use log::{error, info, warn}; use neo4rs::Graph; use serde_json::json; use std::path::PathBuf; @@ -59,7 +59,7 @@ pub enum Sub { batch_size: Option, }, /// process and load a single archive - LoadOne { + IngestOne { #[clap(long, short('d'))] /// location of archive archive_dir: PathBuf, @@ -126,32 +126,28 @@ impl WarehouseCli { neo4j_init::maybe_create_indexes(&pool).await?; ingest_all(&map, &pool, self.clear_queue, batch_size.unwrap_or(250)).await?; } - Sub::LoadOne { + Sub::IngestOne { archive_dir, batch_size, - } => match scan_dir_archive(archive_dir, None)?.0.get(archive_dir) { - Some(man) => { + } => { + let am = scan_dir_archive(archive_dir, None)?; + if am.0.is_empty() { + error!("cannot find .manifest file under {}", archive_dir.display()); + } + for (_p, man) in am.0 { let pool = try_db_connection_pool(self).await?; neo4j_init::maybe_create_indexes(&pool).await?; - try_load_one_archive(man, &pool, batch_size.unwrap_or(250)).await?; - } - None => { - bail!(format!( - "ERROR: cannot find .manifest file under {}", - archive_dir.display() - )); + try_load_one_archive(&man, &pool, batch_size.unwrap_or(250)).await?; } - }, + } Sub::Check { archive_dir } => { - match scan_dir_archive(archive_dir, None)?.0.get(archive_dir) { - Some(_) => todo!(), - None => { - bail!(format!( - "ERROR: cannot find .manifest file under {}", - archive_dir.display() - )); - } + let am = scan_dir_archive(archive_dir, None)?; + if am.0.is_empty() { + error!("cannot find .manifest file under {}", archive_dir.display()); + } + for (p, man) in am.0 { + info!("manifest found at {} \n {:?}", p.display(), man); } } Sub::EnrichExchange { diff --git a/tests/test_analytics.rs b/tests/test_analytics.rs index e8ecd51..48db7e8 100644 --- a/tests/test_analytics.rs +++ b/tests/test_analytics.rs @@ -3,7 +3,11 @@ use anyhow::Result; use std::path::PathBuf; use libra_forensic_db::{ - analytics, extract_exchange_orders, load_exchange_orders, + analytics::{ + self, + enrich_account_funding::{parse_date, BalanceTracker}, + }, + extract_exchange_orders, load_exchange_orders, neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes}, }; use support::neo4j_testcontainer::start_neo4j_container; @@ -96,3 +100,135 @@ async fn test_rms_batch() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_submit_exchange_ledger() -> Result<()> { + libra_forensic_db::log_setup(); + + let c = start_neo4j_container(); + let port = c.get_host_port_ipv4(7687); + let graph = get_neo4j_localhost_pool(port).await?; + maybe_create_indexes(&graph).await?; + + let path = env!("CARGO_MANIFEST_DIR"); + let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json"); + let mut orders = extract_exchange_orders::read_orders_from_file(buf).unwrap(); + assert!(orders.len() == 25450); + + orders.retain(|el| { + if el.filled_at < parse_date("2024-01-16") { + if el.user == 123 { + return true; + }; + if el.accepter == 123 { + return true; + }; + } + false + }); + + assert!(orders.len() == 68); + + let mut tracker = BalanceTracker::new(); + tracker.replay_transactions(&mut orders)?; + dbg!(&tracker.0.len()); + let days_records = tracker.0.len(); + assert!(days_records == 47); + + let user = tracker.0.get(&123).unwrap(); + assert!(user.0.len() == 68); + + let res = tracker.submit_one_id(123, &graph).await?; + + // the number of transactions merged should equal the number of orders + assert!(res == orders.len() as u64); + + // check there are transaction records with function args. + let cypher_query = neo4rs::query( + "MATCH (s:SwapAccount)-[r:DailyLedger]->(ul:UserLedger) + WHERE s.swap_id = 123 + ORDER BY ul.date + RETURN s.swap_id AS id, ul.date AS date, ul.total_funded AS funded + ", + ); + + // Execute the query + let mut result = graph.execute(cypher_query).await?; + + let mut prev_funding = 0; + let mut i = 0; + + // Fetch the first row only + while let Some(r) = result.next().await? { + if let Ok(s) = r.get::("funded") { + i += 1; + assert!(s >= prev_funding, "funded totals should always increase"); + prev_funding = s; + } + } + + assert!(i == orders.len()); + + Ok(()) +} + +#[tokio::test] +async fn test_submit_exchange_ledger_all() -> Result<()> { + libra_forensic_db::log_setup(); + + let c = start_neo4j_container(); + let port = c.get_host_port_ipv4(7687); + let graph = get_neo4j_localhost_pool(port).await?; + maybe_create_indexes(&graph).await?; + + let path = env!("CARGO_MANIFEST_DIR"); + let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json"); + let mut orders = extract_exchange_orders::read_orders_from_file(buf).unwrap(); + assert!(orders.len() == 25450); + + orders.retain(|el| el.filled_at < parse_date("2024-01-16")); + + assert!(orders.len() == 956); + + let mut tracker = BalanceTracker::new(); + tracker.replay_transactions(&mut orders)?; + let days_records = tracker.0.len(); + assert!(days_records == 367); // each users * dates with txs + + let user = tracker.0.get(&123).unwrap(); + assert!(user.0.len() == 68); + + let res = tracker.submit_ledger(&graph).await?; + + // there should be double len of ledgers, since user and accepter will have a ledger + assert!(res == (orders.len() * 2) as u64); + + // check there are transaction records with function args. + let cypher_query = neo4rs::query( + "MATCH (s:SwapAccount)-[r:DailyLedger]->(ul:UserLedger) + WHERE s.swap_id = 123 + ORDER BY ul.date + RETURN s.swap_id AS id, ul.date AS date, ul.total_funded AS funded + ", + ); + + // Execute the query + let mut result = graph.execute(cypher_query).await?; + + let mut prev_funding = 0; + let mut i = 0; + + // Fetch the first row only + while let Some(r) = result.next().await? { + if let Ok(s) = r.get::("funded") { + i += 1; + + assert!(s >= prev_funding, "funded totals should always increase"); + prev_funding = s; + } + } + + assert!(i == user.0.len()); + + Ok(()) +} diff --git a/tests/test_enrich_exchange.rs b/tests/test_enrich_exchange.rs index 392fdb2..a6ca3d9 100644 --- a/tests/test_enrich_exchange.rs +++ b/tests/test_enrich_exchange.rs @@ -6,7 +6,7 @@ use std::path::PathBuf; use anyhow::Result; use libra_forensic_db::{ - analytics::{enrich_account_funding, enrich_rms}, + analytics::{enrich_account_funding::BalanceTracker, enrich_rms}, extract_exchange_orders, load_exchange_orders, neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes}, schema_exchange_orders::ExchangeOrder, @@ -72,9 +72,10 @@ fn test_enrich_account_funding() { let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json"); let mut orders = extract_exchange_orders::read_orders_from_file(buf).unwrap(); - let balance = enrich_account_funding::replay_transactions(&mut orders); + let mut balance = BalanceTracker::new(); + balance.replay_transactions(&mut orders).unwrap(); - dbg!(balance.accounts.len()); + assert!(balance.0.len() == 3957); } #[test] @@ -173,3 +174,19 @@ async fn e2e_swap_data() -> Result<()> { Ok(()) } + +#[ignore] +#[tokio::test] +async fn test_entry_point_exchange_load() -> Result<()> { + libra_forensic_db::log_setup(); + + let c = start_neo4j_container(); + let port = c.get_host_port_ipv4(7687); + let graph = get_neo4j_localhost_pool(port).await?; + maybe_create_indexes(&graph).await?; + + let path = env!("CARGO_MANIFEST_DIR"); + let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json"); + load_exchange_orders::load_from_json(&buf, &graph, 10).await?; + Ok(()) +} diff --git a/tests/test_load_state.rs b/tests/test_load_state.rs index cb93665..2d32694 100644 --- a/tests/test_load_state.rs +++ b/tests/test_load_state.rs @@ -1,13 +1,14 @@ mod support; use libra_forensic_db::{ - extract_snapshot::extract_v5_snapshot, + extract_snapshot::{extract_current_snapshot, extract_v5_snapshot}, load_account_state::{impl_batch_snapshot_insert, snapshot_batch}, neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes}, schema_account_state::WarehouseAccState, }; use support::{ - fixtures::v5_state_manifest_fixtures_path, neo4j_testcontainer::start_neo4j_container, + fixtures::{v5_state_manifest_fixtures_path, v7_state_manifest_fixtures_path}, + neo4j_testcontainer::start_neo4j_container, }; #[tokio::test] @@ -36,6 +37,44 @@ async fn test_snapshot_unit() -> anyhow::Result<()> { #[tokio::test] async fn test_snapshot_batch() -> anyhow::Result<()> { + libra_forensic_db::log_setup(); + let archive_path = v7_state_manifest_fixtures_path(); + assert!(archive_path.exists()); + let vec_snap = extract_current_snapshot(&archive_path).await?; + + let c = start_neo4j_container(); + let port = c.get_host_port_ipv4(7687); + let graph = get_neo4j_localhost_pool(port) + .await + .expect("could not get neo4j connection pool"); + maybe_create_indexes(&graph) + .await + .expect("could start index"); + + let merged_snapshots = impl_batch_snapshot_insert(&graph, &vec_snap[..100]).await?; + + assert!(merged_snapshots.created_tx == 100); + + // check DB to see what is persisted + let cypher_query = neo4rs::query( + "MATCH ()-[r:State]->() + RETURN count(r) AS count_state_edges", + ); + + // Execute the query + let mut result = graph.execute(cypher_query).await?; + + // Fetch the first row only + let row = result.next().await?.unwrap(); + let count: i64 = row.get("count_state_edges").unwrap(); + + assert!(count == 100i64); + + Ok(()) +} + +#[tokio::test] +async fn test_v5_snapshot_batch() -> anyhow::Result<()> { libra_forensic_db::log_setup(); let manifest_file = v5_state_manifest_fixtures_path().join("state.manifest"); assert!(manifest_file.exists()); diff --git a/tests/test_neo4j_meta.rs b/tests/test_neo4j_meta.rs index a9c4fb1..c214e20 100644 --- a/tests/test_neo4j_meta.rs +++ b/tests/test_neo4j_meta.rs @@ -76,7 +76,7 @@ async fn test_tx_insert() -> Result<()> { } #[tokio::test] -async fn test_init_indices() { +async fn test_init_indexes() { let c = start_neo4j_container(); let port = c.get_host_port_ipv4(7687); let graph = get_neo4j_localhost_pool(port)