diff --git a/src/analytics/enrich_account_funding.rs b/src/analytics/enrich_account_funding.rs index 12d27cf..3da6e04 100644 --- a/src/analytics/enrich_account_funding.rs +++ b/src/analytics/enrich_account_funding.rs @@ -1,7 +1,8 @@ use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; -use log::trace; -use neo4rs::{Graph, Query}; +use log::error; +// use log::trace; +// use neo4rs::{Graph, Query}; use serde::{Deserialize, Serialize}; use std::{ collections::HashMap, @@ -11,7 +12,7 @@ use std::{ use crate::schema_exchange_orders::ExchangeOrder; -#[derive(Default, Debug, Deserialize, Serialize)] +#[derive(Default, Debug, Clone, Deserialize, Serialize)] pub struct AccountDataAlt { pub current_balance: f64, pub total_funded: f64, @@ -25,7 +26,19 @@ pub struct AccountDataAlt { #[derive(Default, Debug, Deserialize, Serialize)] pub struct UserLedger(HashMap, AccountDataAlt>); -impl UserLedger { +#[derive(Default, Debug, Deserialize, Serialize)] +pub struct BalanceTracker(HashMap); // Tracks data for each user + +impl BalanceTracker { + /// Replay all transactions sequentially and return a balance tracker + pub fn replay_transactions(&mut self, orders: &mut [ExchangeOrder]) -> Result<()> { + orders.sort_by_key(|order| order.filled_at); + for o in orders { + self.process_transaction_alt(o); + } + Ok(()) + } + pub fn process_transaction_alt(&mut self, order: &ExchangeOrder) { let date = order.created_at; match order.order_type.as_str() { @@ -56,26 +69,62 @@ impl UserLedger { amount: f64, credit: bool, ) { - let account = self.0.entry(&user_id).or_default(); - // let daily_balance = account.current_balance.entry(date).or_insert(0.0); - // let day = account.day.entry(date).or_default(); + let ul = self.0.entry(user_id).or_default(); + + let most_recent_date = *ul.0.keys().max_by(|x, y| x.cmp(y)).unwrap_or(&date); + + // NOTE the previous record may be today's record from a previous transaction. Need to take care in the aggregation below + + // // TODO: gross, this shouldn't clone + // let previous = if let Some(d) = most_recent_date { + // ul.0.entry(*).or_default().to_owned() + // } else { + // AccountDataAlt::default() + // }; + + if most_recent_date > date { + // don't know what to here + error!("most recent ledger date is higher than current day"); + return; + }; + + let previous = ul.0.get(&most_recent_date).unwrap().clone(); + + let today = ul.0.entry(date).or_default(); + if credit { - account.current_balance += amount; - account.total_inflows += amount; - account.daily_inflows += amount; + today.current_balance = previous.current_balance + amount; + today.total_inflows = previous.total_inflows + amount; + if most_recent_date == date { + today.daily_inflows = previous.daily_inflows + amount; + } else { + today.daily_inflows = amount; + } // *daily_balance += amount } else { // debit - account.current_balance += -amount; - account.total_outflows += -amount; - account.daily_outflows += -amount; + today.current_balance = previous.current_balance - amount; + today.total_outflows = previous.total_outflows + amount; + + if most_recent_date == date { + today.daily_outflows = previous.daily_outflows + amount; + } else { + today.daily_outflows = amount; + } } - if account.current_balance < 0.0 { - let negative_balance = account.current_balance; + // find out if the outflows created a funding requirement on the account + if today.current_balance < 0.0 { + let negative_balance = today.current_balance.abs(); // funding was needed - account.total_funded += negative_balance; - account.daily_funding += negative_balance; + today.total_funded = previous.total_funded + negative_balance; + if most_recent_date == date { + today.daily_funding += negative_balance; + } else { + today.daily_funding = negative_balance; + } + // reset to zero + today.current_balance = 0.0; } } @@ -98,41 +147,24 @@ impl UserLedger { } None } -} - -#[derive(Default, Debug, Deserialize, Serialize)] -pub struct BalanceTracker { - pub accounts: HashMap, // Tracks data for each user -} - -impl BalanceTracker { - /// Replay all transactions sequentially and return a balance tracker - pub fn replay_transactions(orders: &mut [ExchangeOrder]) -> Self { - let mut tracker: HashMap = HashMap::new(); - orders.sort_by_key(|order| order.filled_at); - for order in sorted_orders { - tracker.process_transaction_alt(order); - } - tracker - } pub fn to_cypher_map(&self, id: u32) -> Result { - let ul = self.accounts.get(&id).context("no user")?; + let ul = self.0.get(&id).context("no user")?; let mut list_literal: String = "".to_owned(); - for (date, _) in &ul.0 { + for date in ul.0.keys() { if let Some(acc) = ul.0.get(date) { let obj = format!( - r#"{{ swap_id: {}, date: "{}", current_balance: {}, total_funded: {}, total_inflows: {}, total_outflows: {}, daily_funding: {}, daily_inflows: {}, daily_outflows: {} }}"#, - id, - date.to_rfc3339(), - acc.current_balance, - acc.total_funded - acc.total_inflows, - acc.total_outflows, - acc.daily_funding, - acc.daily_inflows, - acc.daily_outflows, + r#"{{ swap_id: {}, date: "{}", current_balance: {}, total_funded: {}, total_inflows: {}, total_outflows: {}, daily_funding: {}, daily_inflows: {}, daily_outflows: {} }}"#, + id, + date.to_rfc3339(), + acc.current_balance, + acc.total_funded, + acc.total_inflows, + acc.total_outflows, + acc.daily_funding, + acc.daily_inflows, + acc.daily_outflows, ); list_literal.push_str(&obj); @@ -168,132 +200,14 @@ pub fn generate_cypher_query(map: String) -> String { "#, ) } -// pub fn new() -> Self { -// Self { -// accounts: HashMap::new(), -// } -// } - -// pub fn process_transaction(&mut self, order: &ExchangeOrder) { -// let date = order.created_at; -// let (buyer_id, seller_id, amount) = match order.order_type.as_str() { -// "Buy" => (order.user, order.accepter, order.amount * order.price), -// "Sell" => (order.accepter, order.user, order.amount * order.price), -// _ => { -// println!("ERROR: not a valid Buy/Sell order, {:?}", &order); -// return; -// } -// }; - -// self.update_balance_and_flows(seller_id, date, -amount, false); -// self.update_balance_and_flows(buyer_id, date, amount, true); -// } -// fn update_balance_and_flows( -// &mut self, -// user_id: u32, -// date: DateTime, -// amount: f64, -// credit: bool, -// ) { -// let account = self.accounts.entry(user_id).or_default(); -// let daily_balance = account.daily_balances.entry(date).or_insert(0.0); - -// if credit { -// *account.daily_inflows.entry(date).or_insert(0.0) += amount; -// *daily_balance += amount -// } else { -// // debit -// *account.daily_outflows.entry(date).or_insert(0.0) += -amount; - -// *daily_balance += -amount -// } - -// // if credit { -// // *account.daily_user_flows.entry(date).or_insert(0.0) += amount; -// // } else { -// // *account.daily_accepter_flows.entry(date).or_insert(0.0) += amount; -// // } - -// if *daily_balance < 0.0 { -// // funding was needed -// *account.daily_funding.entry(date).or_insert(0.0) += *daily_balance; -// // reset -// *daily_balance = 0.0; -// } -// } - -// /// Save the balance tracker to a JSON file -// pub fn save_to_cache(&self, file_path: &str) { -// if let Ok(json) = serde_json::to_string(self) { -// let _ = fs::write(file_path, json); -// } -// } - -// /// Load the balance tracker from a JSON file -// pub fn load_from_cache(file_path: &str) -> Option { -// if let Ok(mut file) = File::open(file_path) { -// let mut contents = String::new(); -// if file.read_to_string(&mut contents).is_ok() { -// if let Ok(tracker) = serde_json::from_str(&contents) { -// return Some(tracker); -// } -// } -// } -// None -// } -// /// Generate a Cypher query string to insert data into Neo4j -// pub fn generate_cypher_query(&self, map: String) -> String { -// // r#"{{ swap_id: {}, date: "{}", balance: {}, funding: {}, inflows: {}, outflows: {}, user_flows: {}, accepter_flows: {} }}"#, -// format!( -// r#" -// UNWIND {map} AS account -// MERGE (sa:SwapAccount {{swap_id: account.swap_id}}) -// MERGE (ul:UserLedger {{date: datetime(account.date)}}) -// SET ul.balance = account.balance, -// ul.funding = account.funding, -// ul.inflows = account.inflows, -// ul.outflows = account.outflows, -// ul.user_flows = account.user_flows, -// ul.accepter_flows = account.accepter_flows -// MERGE (sa)-[r:Daily]->(ul) -// SET r.date = datetime(account.date) -// RETURN COUNT(r) as merged_relations -// "#, -// ) -// } -// } - -// /// Manages cache logic and invokes replay_transactions only if necessary -// pub fn get_or_recalculate_balances( -// orders: &mut [ExchangeOrder], -// cache_file: Option, -// force_recalculate: bool, -// ) -> BalanceTracker { -// if !force_recalculate && cache_file.is_some() { -// if let Some(cached_tracker) = BalanceTracker::load_from_cache(cache_file.as_ref().unwrap()) -// { -// return cached_tracker; -// } -// } - -// let tracker = replay_transactions(orders); -// if let Some(p) = cache_file { -// tracker.save_to_cache(&p); -// } -// tracker -// } - -// /// Replay all transactions sequentially and return a balance tracker -// pub fn replay_transactions(orders: &mut [ExchangeOrder]) -> BalanceTracker { -// let mut tracker = BalanceTracker::new(); -// let sorted_orders = orders; -// sorted_orders.sort_by_key(|order| order.filled_at); -// for order in sorted_orders { -// tracker.process_transaction(order); -// } -// tracker -// } +/// Helper function to parse "YYYY-MM-DD" into `DateTime` +fn parse_date(date_str: &str) -> DateTime { + let datetime_str = format!("{date_str}T00:00:00Z"); // Append time and UTC offset + DateTime::parse_from_rfc3339(&datetime_str) + .expect("Invalid date format; expected YYYY-MM-DD") + .with_timezone(&Utc) +} // /// submit to db // pub async fn submit_ledger(balances: &BalanceTracker, pool: &Graph) -> Result { @@ -314,13 +228,6 @@ pub fn generate_cypher_query(map: String) -> String { // Ok(merged_relations) // } -/// Helper function to parse "YYYY-MM-DD" into `DateTime` -fn parse_date(date_str: &str) -> DateTime { - let datetime_str = format!("{date_str}T00:00:00Z"); // Append time and UTC offset - DateTime::parse_from_rfc3339(&datetime_str) - .expect("Invalid date format; expected YYYY-MM-DD") - .with_timezone(&Utc) -} // /// Reusable function to print account data // pub fn print_account_data(user_id: u32, data: &AccountData) { // println!("User: {}", user_id); @@ -394,67 +301,67 @@ fn parse_date(date_str: &str) -> DateTime { // println!(" Total Outflows: {:.2}", total_outflows); // } -#[test] -fn test_replay_transactions() { - let mut orders = vec![ - // user 1 creates an offer to BUY, user 2 accepts. - // user 1 sends USD user 2 move amount of coins. - ExchangeOrder { - user: 1, - order_type: "BUY".to_string(), - amount: 10.0, - price: 2.0, - created_at: parse_date("2024-03-01"), - filled_at: parse_date("2024-03-02"), - accepter: 2, - rms_hour: 0.0, - rms_24hour: 0.0, - price_vs_rms_hour: 0.0, - price_vs_rms_24hour: 0.0, - shill_bid: None, - }, - ExchangeOrder { - // user 2 creates an offer to SELL, user 3 accepts. - // user 3 sends USD user 2 moves amount of coins. - user: 2, - order_type: "SELL".to_string(), - amount: 5.0, - price: 3.0, - created_at: parse_date("2024-03-05"), - filled_at: parse_date("2024-03-06"), - accepter: 3, - rms_hour: 0.0, - rms_24hour: 0.0, - price_vs_rms_hour: 0.0, - price_vs_rms_24hour: 0.0, - shill_bid: None, - }, - // user 3 creates an offer to BUY, user 1 accepts. - // user 3 sends USD user 1 moves amount of coins. - ExchangeOrder { - user: 3, - order_type: "BUY".to_string(), - amount: 15.0, - price: 1.5, - created_at: parse_date("2024-03-10"), - filled_at: parse_date("2024-03-11"), - accepter: 1, - rms_hour: 0.0, - rms_24hour: 0.0, - price_vs_rms_hour: 0.0, - price_vs_rms_24hour: 0.0, - shill_bid: None, - }, - ]; - - let tracker = BalanceTracker::replay_transactions(&mut orders); - - // // Analyze results for March 2024 - // for (user_id, data) in &tracker.accounts { - // print_account_data(*user_id, data); - // display_account_statistics(*user_id, data, "2024-03-01", "2024-03-31"); - // } -} +// #[test] +// fn test_replay_transactions() { +// let mut orders = vec![ +// // user 1 creates an offer to BUY, user 2 accepts. +// // user 1 sends USD user 2 move amount of coins. +// ExchangeOrder { +// user: 1, +// order_type: "BUY".to_string(), +// amount: 10.0, +// price: 2.0, +// created_at: parse_date("2024-03-01"), +// filled_at: parse_date("2024-03-02"), +// accepter: 2, +// rms_hour: 0.0, +// rms_24hour: 0.0, +// price_vs_rms_hour: 0.0, +// price_vs_rms_24hour: 0.0, +// shill_bid: None, +// }, +// ExchangeOrder { +// // user 2 creates an offer to SELL, user 3 accepts. +// // user 3 sends USD user 2 moves amount of coins. +// user: 2, +// order_type: "SELL".to_string(), +// amount: 5.0, +// price: 3.0, +// created_at: parse_date("2024-03-05"), +// filled_at: parse_date("2024-03-06"), +// accepter: 3, +// rms_hour: 0.0, +// rms_24hour: 0.0, +// price_vs_rms_hour: 0.0, +// price_vs_rms_24hour: 0.0, +// shill_bid: None, +// }, +// // user 3 creates an offer to BUY, user 1 accepts. +// // user 3 sends USD user 1 moves amount of coins. +// ExchangeOrder { +// user: 3, +// order_type: "BUY".to_string(), +// amount: 15.0, +// price: 1.5, +// created_at: parse_date("2024-03-10"), +// filled_at: parse_date("2024-03-11"), +// accepter: 1, +// rms_hour: 0.0, +// rms_24hour: 0.0, +// price_vs_rms_hour: 0.0, +// price_vs_rms_24hour: 0.0, +// shill_bid: None, +// }, +// ]; + +// let tracker = replay_transactions(&mut orders); + +// // // Analyze results for March 2024 +// // for (user_id, data) in &tracker.accounts { +// // print_account_data(*user_id, data); +// // display_account_statistics(*user_id, data, "2024-03-01", "2024-03-31"); +// // } +// } // #[ignore] // // TODO: check paths diff --git a/src/load_exchange_orders.rs b/src/load_exchange_orders.rs index cff12c9..ce0b242 100644 --- a/src/load_exchange_orders.rs +++ b/src/load_exchange_orders.rs @@ -5,9 +5,7 @@ use log::{error, info, warn}; use neo4rs::{query, Graph}; use crate::{ - analytics::{enrich_account_funding, enrich_rms}, - extract_exchange_orders, queue, - schema_exchange_orders::ExchangeOrder, + analytics::enrich_rms, extract_exchange_orders, queue, schema_exchange_orders::ExchangeOrder, }; pub async fn swap_batch( @@ -84,17 +82,17 @@ pub async fn impl_batch_tx_insert(pool: &Graph, batch_txs: &[ExchangeOrder]) -> Ok((merged as u64, ignored as u64)) } -// pub async fn load_from_json(path: &Path, pool: &Graph, batch_size: usize) -> Result<(u64, u64)> { -// let mut orders = extract_exchange_orders::read_orders_from_file(path)?; -// // add RMS stats to each order -// enrich_rms::include_rms_stats(&mut orders); -// // find likely shill bids -// enrich_rms::process_sell_order_shill(&mut orders); -// enrich_rms::process_buy_order_shill(&mut orders); +pub async fn load_from_json(path: &Path, pool: &Graph, batch_size: usize) -> Result<(u64, u64)> { + let mut orders = extract_exchange_orders::read_orders_from_file(path)?; + // add RMS stats to each order + enrich_rms::include_rms_stats(&mut orders); + // find likely shill bids + enrich_rms::process_sell_order_shill(&mut orders); + enrich_rms::process_buy_order_shill(&mut orders); -// let balances = enrich_account_funding::replay_transactions(&mut orders); -// let ledger_inserts = enrich_account_funding::submit_ledger(&balances, pool).await?; -// info!("exchange ledger relations inserted: {}", ledger_inserts); + // let balances = enrich_account_funding::replay_transactions(&mut orders); + // let ledger_inserts = enrich_account_funding::submit_ledger(&balances, pool).await?; + // info!("exchange ledger relations inserted: {}", ledger_inserts); -// swap_batch(&orders, pool, batch_size).await -// } + swap_batch(&orders, pool, batch_size).await +} diff --git a/tests/test_enrich_exchange.rs b/tests/test_enrich_exchange.rs index 392fdb2..acb8f45 100644 --- a/tests/test_enrich_exchange.rs +++ b/tests/test_enrich_exchange.rs @@ -6,7 +6,7 @@ use std::path::PathBuf; use anyhow::Result; use libra_forensic_db::{ - analytics::{enrich_account_funding, enrich_rms}, + analytics::enrich_rms, extract_exchange_orders, load_exchange_orders, neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes}, schema_exchange_orders::ExchangeOrder, @@ -70,11 +70,11 @@ fn test_sell_order_shill() { fn test_enrich_account_funding() { let path = env!("CARGO_MANIFEST_DIR"); let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json"); - let mut orders = extract_exchange_orders::read_orders_from_file(buf).unwrap(); + let orders = extract_exchange_orders::read_orders_from_file(buf).unwrap(); - let balance = enrich_account_funding::replay_transactions(&mut orders); + // let balance = enrich_account_funding::replay_transactions(&mut orders); - dbg!(balance.accounts.len()); + // dbg!(balance.accounts.len()); } #[test]