diff --git a/src/analytics/enrich_account_funding.rs b/src/analytics/enrich_account_funding.rs index e02bb8a..07e70a5 100644 --- a/src/analytics/enrich_account_funding.rs +++ b/src/analytics/enrich_account_funding.rs @@ -1,4 +1,6 @@ +use anyhow::Result; use chrono::{DateTime, Utc}; +use neo4rs::{Graph, Query}; use serde::{Deserialize, Serialize}; use std::{ collections::HashMap, @@ -17,6 +19,33 @@ pub struct AccountData { pub daily_user_flows: HashMap, f64>, // Amount when the account was a `user` pub daily_accepter_flows: HashMap, f64>, // Amount when the account was an `accepter` } + +impl AccountData { + pub fn to_cypher_map(&self, id: u32) -> String { + let mut list_literal: String = "".to_owned(); + self.daily_balances.iter().for_each(|(date, _) | { + let obj = format!( + r#"{{ swap_id: {}, date: "{}", balance: {}, funding: {}, inflows: {}, outflows: {}, user_flows: {}, accepter_flows: {} }}"#, + id, + date.to_rfc3339(), + self.daily_balances.get(date).unwrap_or(&0.0), + self.daily_funding.get(date).unwrap_or(&0.0), + self.daily_inflows.get(date).unwrap_or(&0.0), + self.daily_outflows.get(date).unwrap_or(&0.0), + self.daily_user_flows.get(date).unwrap_or(&0.0), + self.daily_accepter_flows.get(date).unwrap_or(&0.0) + ); + + list_literal.push_str(&obj); + list_literal.push(','); + + }); + + list_literal.pop(); // need to drop last comma "," + format!("[{}]", list_literal) + } +} + #[derive(Default, Debug, Deserialize, Serialize)] pub struct BalanceTracker { pub accounts: HashMap, // Tracks data for each user @@ -92,36 +121,80 @@ impl BalanceTracker { } None } + /// Generate a Cypher query string to insert data into Neo4j + pub fn generate_cypher_query(&self) -> String { + let mut query = String::new(); + + // r#"{{ swap_id: {}, date: "{}", balance: {}, funding: {}, inflows: {}, outflows: {}, user_flows: {}, accepter_flows: {} }}"#, + query.push_str( + r#" + UNWIND $accounts AS account + MERGE (sa:SwapAccount {swap_id: account.swap_id}) + MERGE (ul:UserLedger {date: account.date}) + SET ul.balance = account.balance, + ul.funding = account.funding, + ul.inflows = account.inflows, + ul.outflows = account.outflows, + ul.user_flows = account.user_flows, + ul.accepter_flows = account.accepter_flows + MERGE (sa)-[:Daily {date: account.date}]->(ul) + "#, + ); + + query + } } /// Manages cache logic and invokes replay_transactions only if necessary pub fn get_or_recalculate_balances( - orders: Vec, - cache_file: &str, + orders: &mut [ExchangeOrder], + cache_file: Option, force_recalculate: bool, ) -> BalanceTracker { - if !force_recalculate { - if let Some(cached_tracker) = BalanceTracker::load_from_cache(cache_file) { + if !force_recalculate && cache_file.is_some() { + if let Some(cached_tracker) = BalanceTracker::load_from_cache(cache_file.as_ref().unwrap()) + { return cached_tracker; } } let tracker = replay_transactions(orders); - tracker.save_to_cache(cache_file); + if let Some(p) = cache_file { + tracker.save_to_cache(&p); + } tracker } /// Replay all transactions sequentially and return a balance tracker -pub fn replay_transactions(orders: Vec) -> BalanceTracker { +pub fn replay_transactions(orders: &mut [ExchangeOrder]) -> BalanceTracker { let mut tracker = BalanceTracker::new(); - let mut sorted_orders = orders; + let sorted_orders = orders; sorted_orders.sort_by_key(|order| order.created_at); for order in sorted_orders { - tracker.process_transaction(&order); + tracker.process_transaction(order); } tracker } +/// submit to db +pub async fn submit_ledger(balances: &BalanceTracker, pool: &Graph) -> Result<()> { + let query_literal = balances.generate_cypher_query(); + println!("Cypher Query:\n{}", &query_literal); + + for (id, acc) in balances.accounts.iter() { + let data = acc.to_cypher_map(*id); + dbg!("Cypher Parameters:\n{:?}", &data); + + let query = Query::new(query_literal.clone()).param("accounts", data); + let mut result = pool.execute(query).await?; + + while let Some(r) = result.next().await? { + dbg!(&r); + } + } + Ok(()) +} + /// Helper function to parse "YYYY-MM-DD" into `DateTime` fn parse_date(date_str: &str) -> DateTime { let datetime_str = format!("{date_str}T00:00:00Z"); // Append time and UTC offset @@ -205,7 +278,7 @@ pub fn display_account_statistics( #[test] fn test_replay_transactions() { // Create orders with meaningful data and specific dates - let orders = vec![ + let mut orders = vec![ ExchangeOrder { user: 1, order_type: "BUY".to_string(), @@ -250,7 +323,7 @@ fn test_replay_transactions() { }, ]; - let tracker = replay_transactions(orders); + let tracker = replay_transactions(&mut orders); // Analyze results for March 2024 for (user_id, data) in &tracker.accounts { @@ -261,8 +334,8 @@ fn test_replay_transactions() { #[test] fn test_cache_mechanism() { - let cache_file = "balance_tracker_cache.json"; - let orders = vec![ + let cache_file = "balance_tracker_cache.json".to_string(); + let mut orders = vec![ ExchangeOrder { user: 1, order_type: "BUY".to_string(), @@ -293,12 +366,12 @@ fn test_cache_mechanism() { }, ]; - let tracker = get_or_recalculate_balances(orders.clone(), cache_file, true); + let tracker = get_or_recalculate_balances(&mut orders, Some(cache_file.clone()), true); assert!(tracker.accounts.contains_key(&1)); assert!(tracker.accounts.contains_key(&2)); // Test loading from cache - let cached_tracker = get_or_recalculate_balances(orders, cache_file, false); + let cached_tracker = get_or_recalculate_balances(&mut orders, Some(cache_file.clone()), false); assert!(cached_tracker.accounts.contains_key(&1)); assert!(cached_tracker.accounts.contains_key(&2)); @@ -306,6 +379,16 @@ fn test_cache_mechanism() { let _ = fs::remove_file(cache_file); } +#[test] + +fn test_cypher_query() { + let tracker = BalanceTracker::new(); // Assume tracker is populated + // let params = tracker.generate_cypher_params(); + let query = tracker.generate_cypher_query(); + // dbg!(¶ms); + dbg!(&query); +} + // I'm coding some data analysis in rust. // I have a vector structs that looks like this: diff --git a/src/load_exchange_orders.rs b/src/load_exchange_orders.rs index 7aedde2..b6eac38 100644 --- a/src/load_exchange_orders.rs +++ b/src/load_exchange_orders.rs @@ -5,7 +5,9 @@ use log::{error, info, warn}; use neo4rs::{query, Graph}; use crate::{ - analytics::enrich_rms, extract_exchange_orders, queue, schema_exchange_orders::ExchangeOrder, + analytics::{enrich_account_funding, enrich_rms}, + extract_exchange_orders, queue, + schema_exchange_orders::ExchangeOrder, }; pub async fn swap_batch( @@ -90,5 +92,8 @@ pub async fn load_from_json(path: &Path, pool: &Graph, batch_size: usize) -> Res enrich_rms::process_sell_order_shill(&mut orders); enrich_rms::process_buy_order_shill(&mut orders); + let balances = enrich_account_funding::replay_transactions(&mut orders); + enrich_account_funding::submit_ledger(&balances, pool).await?; + swap_batch(&orders, pool, batch_size).await }