From c481e53b163bcafb5bf39364a0e1c9026fccf684 Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Sun, 8 Dec 2024 14:28:12 -0500 Subject: [PATCH] add offline analytics queries --- src/analytics/enrich_account_funding.rs | 14 +--- src/analytics/mod.rs | 1 + src/analytics/offline_matching.rs | 101 ++++++++++++++++++++++++ src/date_util.rs | 8 ++ src/lib.rs | 1 + tests/test_analytics.rs | 26 ++++-- 6 files changed, 136 insertions(+), 15 deletions(-) create mode 100644 src/analytics/offline_matching.rs create mode 100644 src/date_util.rs diff --git a/src/analytics/enrich_account_funding.rs b/src/analytics/enrich_account_funding.rs index 526d2d0..7a9d095 100644 --- a/src/analytics/enrich_account_funding.rs +++ b/src/analytics/enrich_account_funding.rs @@ -2,8 +2,6 @@ use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; use log::{error, trace}; use neo4rs::{Graph, Query}; -// use log::trace; -// use neo4rs::{Graph, Query}; use serde::{Deserialize, Serialize}; use std::{ collections::BTreeMap, @@ -13,6 +11,9 @@ use std::{ use crate::schema_exchange_orders::ExchangeOrder; +#[cfg(test)] +use crate::date_util::parse_date; + #[derive(Default, Debug, Clone, Deserialize, Serialize)] pub struct AccountDataAlt { pub current_balance: f64, @@ -248,14 +249,6 @@ pub fn generate_cypher_query(map: String) -> String { ) } -/// Helper function to parse "YYYY-MM-DD" into `DateTime` -pub fn parse_date(date_str: &str) -> DateTime { - let datetime_str = format!("{date_str}T00:00:00Z"); // Append time and UTC offset - DateTime::parse_from_rfc3339(&datetime_str) - .expect("Invalid date format; expected YYYY-MM-DD") - .with_timezone(&Utc) -} - #[test] fn test_replay_transactions() { let mut orders = vec![ @@ -364,6 +357,7 @@ fn test_replay_transactions() { #[test] fn test_example_user() -> Result<()> { use crate::extract_exchange_orders; + use std::path::PathBuf; let path = env!("CARGO_MANIFEST_DIR"); let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json"); diff --git a/src/analytics/mod.rs b/src/analytics/mod.rs index 4c9441a..f4ec982 100644 --- a/src/analytics/mod.rs +++ b/src/analytics/mod.rs @@ -1,3 +1,4 @@ pub mod enrich_account_funding; pub mod enrich_rms; pub mod exchange_stats; +pub mod offline_matching; diff --git a/src/analytics/offline_matching.rs b/src/analytics/offline_matching.rs new file mode 100644 index 0000000..0f912ff --- /dev/null +++ b/src/analytics/offline_matching.rs @@ -0,0 +1,101 @@ +use anyhow::Result; +use chrono::{DateTime, Utc}; +use diem_types::account_address::AccountAddress; +use neo4rs::Graph; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct Deposit { + account: AccountAddress, + deposited: f64, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct MinFunding { + user_id: u32, + funded: f64, +} + +pub async fn get_date_range_deposits( + pool: &Graph, + top_n: u64, + start: DateTime, + end: DateTime, +) -> Result> { + let mut top_deposits = vec![]; + + let q = format!( + r#" + WITH "0xf57d3968d0bfd5b3120fda88f34310c70bd72033f77422f4407fbbef7c24557a" as exchange_deposit + MATCH + (u:Account)-[tx:Tx]->(onboard:Account {{address: exchange_deposit}}) + WHERE + tx.`block_datetime` > datetime("{}") + AND tx.`block_datetime` < datetime("{}") + WITH + u, + SUM(tx.V7_OlAccountTransfer_amount) AS totalTxAmount + ORDER BY totalTxAmount DESCENDING + RETURN u.address AS account, toFloat(totalTxAmount) / 1000000 AS deposited + LIMIT {} + "#, + start.to_rfc3339(), + end.to_rfc3339(), + top_n, + ); + let cypher_query = neo4rs::query(&q); + + // Execute the query + let mut result = pool.execute(cypher_query).await?; + + // Fetch the first row only + while let Some(r) = result.next().await? { + let account_str = r.get::("account").unwrap_or("unknown".to_string()); + let deposited = r.get::("deposited").unwrap_or(0.0); + let d = Deposit { + account: account_str.parse().unwrap_or(AccountAddress::ZERO), + deposited, + }; + top_deposits.push(d); + // dbg!(&d); + } + Ok(top_deposits) +} + +pub async fn get_min_funding( + pool: &Graph, + top_n: u64, + start: DateTime, + end: DateTime, +) -> Result> { + let mut min_funding = vec![]; + + let q = format!( + r#" + MATCH p=(e:SwapAccount)-[d:DailyLedger]-(ul:UserLedger) + WHERE d.date > datetime("{}") + AND d.date < datetime("{}") + WITH e.swap_id AS user_id, toFloat(max(ul.`total_funded`)) as funded + RETURN user_id, funded + ORDER BY funded DESC + LIMIT {} + "#, + start.to_rfc3339(), + end.to_rfc3339(), + top_n, + ); + let cypher_query = neo4rs::query(&q); + + // Execute the query + let mut result = pool.execute(cypher_query).await?; + + // Fetch the first row only + while let Some(r) = result.next().await? { + let user_id = r.get::("user_id").unwrap_or(0); + let funded = r.get::("funded").unwrap_or(0.0); + let d = MinFunding { user_id, funded }; + min_funding.push(d); + // dbg!(&d); + } + Ok(min_funding) +} diff --git a/src/date_util.rs b/src/date_util.rs new file mode 100644 index 0000000..7ff7333 --- /dev/null +++ b/src/date_util.rs @@ -0,0 +1,8 @@ +use chrono::{DateTime, Utc}; +/// Helper function to parse "YYYY-MM-DD" into `DateTime` +pub fn parse_date(date_str: &str) -> DateTime { + let datetime_str = format!("{date_str}T00:00:00Z"); // Append time and UTC offset + DateTime::parse_from_rfc3339(&datetime_str) + .expect("Invalid date format; expected YYYY-MM-DD") + .with_timezone(&Utc) +} diff --git a/src/lib.rs b/src/lib.rs index cc5b904..d6d0fdf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ pub mod analytics; pub mod batch_tx_type; pub mod cypher_templates; +pub mod date_util; pub mod decode_entry_function; pub mod enrich_exchange_onboarding; pub mod enrich_whitepages; diff --git a/tests/test_analytics.rs b/tests/test_analytics.rs index 48db7e8..bf6b6ce 100644 --- a/tests/test_analytics.rs +++ b/tests/test_analytics.rs @@ -3,12 +3,10 @@ use anyhow::Result; use std::path::PathBuf; use libra_forensic_db::{ - analytics::{ - self, - enrich_account_funding::{parse_date, BalanceTracker}, - }, + analytics::{self, enrich_account_funding::BalanceTracker, offline_matching}, + date_util::parse_date, extract_exchange_orders, load_exchange_orders, - neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes}, + neo4j_init::{self, get_neo4j_localhost_pool, maybe_create_indexes}, }; use support::neo4j_testcontainer::start_neo4j_container; @@ -230,5 +228,23 @@ async fn test_submit_exchange_ledger_all() -> Result<()> { assert!(i == user.0.len()); + Ok(()) +} + +#[tokio::test] +async fn test_offline_analytics() -> Result<()> { + libra_forensic_db::log_setup(); + let (uri, user, pass) = neo4j_init::get_credentials_from_env()?; + let pool = neo4j_init::get_neo4j_remote_pool(&uri, &user, &pass).await?; + + let start_time = parse_date("2024-01-01"); + let end_time = parse_date("2024-01-10"); + + let r = offline_matching::get_date_range_deposits(&pool, 20, start_time, end_time).await?; + // dbg!(&r); + + let r = offline_matching::get_min_funding(&pool, 20, start_time, end_time).await?; + + Ok(()) }