cypher query to submit exchange account ledger
0o-de-lally committed Dec 4, 2024
1 parent 7c2345e commit c9c131b
Showing 2 changed files with 103 additions and 15 deletions.
111 changes: 97 additions & 14 deletions src/analytics/enrich_account_funding.rs
@@ -1,4 +1,6 @@
use anyhow::Result;
use chrono::{DateTime, Utc};
use neo4rs::{Graph, Query};
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
@@ -17,6 +19,33 @@ pub struct AccountData {
pub daily_user_flows: HashMap<DateTime<Utc>, f64>, // Amount when the account was a `user`
pub daily_accepter_flows: HashMap<DateTime<Utc>, f64>, // Amount when the account was an `accepter`
}

impl AccountData {
/// Render this account's daily series as a single Cypher list-of-maps literal.
pub fn to_cypher_map(&self, id: u32) -> String {
let mut list_literal: String = "".to_owned();
self.daily_balances.iter().for_each(|(date, _)| {
let obj = format!(
r#"{{ swap_id: {}, date: "{}", balance: {}, funding: {}, inflows: {}, outflows: {}, user_flows: {}, accepter_flows: {} }}"#,
id,
date.to_rfc3339(),
self.daily_balances.get(date).unwrap_or(&0.0),
self.daily_funding.get(date).unwrap_or(&0.0),
self.daily_inflows.get(date).unwrap_or(&0.0),
self.daily_outflows.get(date).unwrap_or(&0.0),
self.daily_user_flows.get(date).unwrap_or(&0.0),
self.daily_accepter_flows.get(date).unwrap_or(&0.0)
);

list_literal.push_str(&obj);
list_literal.push(',');

});

list_literal.pop(); // drop the trailing comma
format!("[{}]", list_literal)
}
}
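
For one account with a single recorded day, the literal `to_cypher_map` builds would look roughly like this (the values here are illustrative, not taken from the commit):

[{ swap_id: 1, date: "2024-03-01T00:00:00+00:00", balance: 100, funding: 100, inflows: 100, outflows: 0, user_flows: 100, accepter_flows: 0 }]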

#[derive(Default, Debug, Deserialize, Serialize)]
pub struct BalanceTracker {
pub accounts: HashMap<u32, AccountData>, // Tracks data for each user
@@ -92,36 +121,80 @@ impl BalanceTracker {
}
None
}
/// Generate a Cypher query string to insert data into Neo4j
pub fn generate_cypher_query(&self) -> String {
let mut query = String::new();

query.push_str(
r#"
UNWIND $accounts AS account
MERGE (sa:SwapAccount {swap_id: account.swap_id})
MERGE (ul:UserLedger {date: account.date})
SET ul.balance = account.balance,
ul.funding = account.funding,
ul.inflows = account.inflows,
ul.outflows = account.outflows,
ul.user_flows = account.user_flows,
ul.accepter_flows = account.accepter_flows
MERGE (sa)-[:Daily {date: account.date}]->(ul)
"#,
);

query
}
}
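
Read together with the map literal above, each element unwound from `$accounts` upserts one day of one account's ledger. Expanded by hand for a single hypothetical row, the generated query behaves roughly like:

MERGE (sa:SwapAccount {swap_id: 1})
MERGE (ul:UserLedger {date: "2024-03-01T00:00:00+00:00"})
SET ul.balance = 100, ul.funding = 100, ul.inflows = 100, ul.outflows = 0,
    ul.user_flows = 100, ul.accepter_flows = 0
MERGE (sa)-[:Daily {date: "2024-03-01T00:00:00+00:00"}]->(ul)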

/// Manages cache logic and invokes replay_transactions only if necessary
pub fn get_or_recalculate_balances(
orders: Vec<ExchangeOrder>,
cache_file: &str,
orders: &mut [ExchangeOrder],
cache_file: Option<String>,
force_recalculate: bool,
) -> BalanceTracker {
if !force_recalculate {
if let Some(cached_tracker) = BalanceTracker::load_from_cache(cache_file) {
if !force_recalculate && cache_file.is_some() {
if let Some(cached_tracker) = BalanceTracker::load_from_cache(cache_file.as_ref().unwrap())
{
return cached_tracker;
}
}

let tracker = replay_transactions(orders);
tracker.save_to_cache(cache_file);
if let Some(p) = cache_file {
tracker.save_to_cache(&p);
}
tracker
}

/// Replay all transactions sequentially and return a balance tracker
pub fn replay_transactions(orders: Vec<ExchangeOrder>) -> BalanceTracker {
pub fn replay_transactions(orders: &mut [ExchangeOrder]) -> BalanceTracker {
let mut tracker = BalanceTracker::new();
let mut sorted_orders = orders;
let sorted_orders = orders;
sorted_orders.sort_by_key(|order| order.created_at);
for order in sorted_orders {
tracker.process_transaction(&order);
tracker.process_transaction(order);
}
tracker
}

/// Submit the replayed account ledger to Neo4j
pub async fn submit_ledger(balances: &BalanceTracker, pool: &Graph) -> Result<()> {
let query_literal = balances.generate_cypher_query();
println!("Cypher Query:\n{}", &query_literal);

for (id, acc) in balances.accounts.iter() {
let data = acc.to_cypher_map(*id);
dbg!("Cypher Parameters:\n{:?}", &data);

let query = Query::new(query_literal.clone()).param("accounts", data);
let mut result = pool.execute(query).await?;

while let Some(r) = result.next().await? {
dbg!(&r);
}
}
Ok(())
}
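
A minimal sketch of driving `submit_ledger` end to end, assuming the `neo4rs` `Graph::new(uri, user, password)` constructor used elsewhere in this crate; the helper name, URI, and credentials below are placeholders, not part of this commit:

use crate::analytics::enrich_account_funding::{replay_transactions, submit_ledger};
use crate::schema_exchange_orders::ExchangeOrder;
use neo4rs::Graph;

/// Sketch: replay already-parsed orders and push the resulting ledger to Neo4j.
pub async fn upload_ledger(orders: &mut [ExchangeOrder]) -> anyhow::Result<()> {
    // Placeholder URI and credentials; point these at the target instance.
    let pool = Graph::new("127.0.0.1:7687", "neo4j", "neo4j").await?;
    let balances = replay_transactions(orders);
    submit_ledger(&balances, &pool).await?;
    Ok(())
}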

/// Helper function to parse "YYYY-MM-DD" into `DateTime<Utc>`
fn parse_date(date_str: &str) -> DateTime<Utc> {
let datetime_str = format!("{date_str}T00:00:00Z"); // Append time and UTC offset
@@ -205,7 +278,7 @@ pub fn display_account_statistics(
#[test]
fn test_replay_transactions() {
// Create orders with meaningful data and specific dates
let orders = vec![
let mut orders = vec![
ExchangeOrder {
user: 1,
order_type: "BUY".to_string(),
@@ -250,7 +323,7 @@ fn test_replay_transactions() {
},
];

let tracker = replay_transactions(orders);
let tracker = replay_transactions(&mut orders);

// Analyze results for March 2024
for (user_id, data) in &tracker.accounts {
@@ -261,8 +334,8 @@

#[test]
fn test_cache_mechanism() {
let cache_file = "balance_tracker_cache.json";
let orders = vec![
let cache_file = "balance_tracker_cache.json".to_string();
let mut orders = vec![
ExchangeOrder {
user: 1,
order_type: "BUY".to_string(),
@@ -293,19 +366,29 @@ fn test_cache_mechanism() {
},
];

let tracker = get_or_recalculate_balances(orders.clone(), cache_file, true);
let tracker = get_or_recalculate_balances(&mut orders, Some(cache_file.clone()), true);
assert!(tracker.accounts.contains_key(&1));
assert!(tracker.accounts.contains_key(&2));

// Test loading from cache
let cached_tracker = get_or_recalculate_balances(orders, cache_file, false);
let cached_tracker = get_or_recalculate_balances(&mut orders, Some(cache_file.clone()), false);
assert!(cached_tracker.accounts.contains_key(&1));
assert!(cached_tracker.accounts.contains_key(&2));

// Cleanup
let _ = fs::remove_file(cache_file);
}

#[test]
fn test_cypher_query() {
let tracker = BalanceTracker::new(); // query text does not depend on tracker contents
// let params = tracker.generate_cypher_params();
let query = tracker.generate_cypher_query();
// dbg!(&params);
dbg!(&query);
}

// I'm coding some data analysis in rust.

// I have a vector of structs that looks like this:
7 changes: 6 additions & 1 deletion src/load_exchange_orders.rs
@@ -5,7 +5,9 @@ use log::{error, info, warn};
use neo4rs::{query, Graph};

use crate::{
analytics::enrich_rms, extract_exchange_orders, queue, schema_exchange_orders::ExchangeOrder,
analytics::{enrich_account_funding, enrich_rms},
extract_exchange_orders, queue,
schema_exchange_orders::ExchangeOrder,
};

pub async fn swap_batch(
@@ -90,5 +92,8 @@ pub async fn load_from_json(path: &Path, pool: &Graph, batch_size: usize) -> Res
enrich_rms::process_sell_order_shill(&mut orders);
enrich_rms::process_buy_order_shill(&mut orders);

let balances = enrich_account_funding::replay_transactions(&mut orders);
enrich_account_funding::submit_ledger(&balances, pool).await?;

swap_batch(&orders, pool, batch_size).await
}
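
The updated entry point now replays orders and submits the ledger before writing the order batches. A hedged sketch of calling it, assuming it returns an `anyhow::Result` like the rest of the crate; the path, batch size, and wrapper function are placeholder assumptions:

use std::path::Path;
use neo4rs::Graph;

/// Sketch: ingest a JSON export of exchange orders into Neo4j.
pub async fn ingest_orders(pool: &Graph) -> anyhow::Result<()> {
    let _ = load_from_json(Path::new("exchange_orders.json"), pool, 1000).await?;
    Ok(())
}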
