From b559be8f4eabd3e2eaf6491837accf5e5d17f6c1 Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Wed, 11 Dec 2024 17:53:23 -0500 Subject: [PATCH] wip --- src/analytics/offline_matching.rs | 225 ++++++++++++++++++++++++++---- tests/test_analytics.rs | 60 +++++++- 2 files changed, 258 insertions(+), 27 deletions(-) diff --git a/src/analytics/offline_matching.rs b/src/analytics/offline_matching.rs index df4c85c..99c2cb6 100644 --- a/src/analytics/offline_matching.rs +++ b/src/analytics/offline_matching.rs @@ -14,8 +14,8 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Deserialize, Serialize)] pub struct Deposit { - account: AccountAddress, - deposited: f64, + pub account: AccountAddress, + pub deposited: f64, } #[derive(Debug, Clone, Deserialize, Serialize)] @@ -24,9 +24,9 @@ pub struct MinFunding { pub funded: f64, } -pub async fn get_date_range_deposits( +pub async fn get_date_range_deposits_alt( pool: &Graph, - top_n: u64, + _top_n: u64, start: DateTime, end: DateTime, ) -> Result> { @@ -34,22 +34,46 @@ pub async fn get_date_range_deposits( let q = format!( r#" - WITH "0xf57d3968d0bfd5b3120fda88f34310c70bd72033f77422f4407fbbef7c24557a" as exchange_deposit - MATCH - (u:Account)-[tx:Tx]->(onboard:Account {{address: exchange_deposit}}) - WHERE - tx.`block_datetime` > datetime("{}") - AND tx.`block_datetime` < datetime("{}") - WITH - u, - SUM(tx.V7_OlAccountTransfer_amount) AS totalTxAmount - ORDER BY totalTxAmount DESCENDING - RETURN u.address AS account, toFloat(totalTxAmount) / 1000000 AS deposited - LIMIT {} - "#, + WITH "0xf57d3968d0bfd5b3120fda88f34310c70bd72033f77422f4407fbbef7c24557a" AS olswap_deposit + + // Step 1: Get the list of all depositors + MATCH (acc:Account)-[tx:Tx]->(onboard:Account {{address: olswap_deposit}}) + WITH DISTINCT(acc) AS all, olswap_deposit + + // Step 2: Match depositors and amounts within the date range + + + + MATCH (all)-[tx2:Tx]->(onboard:Account {{address: olswap_deposit}}) + WHERE + tx2.block_datetime > datetime("{}") + AND tx2.block_datetime < datetime("{}") + + + WITH + DISTINCT (all.address) AS account, + COALESCE(SUM(tx2.V7_OlAccountTransfer_amount), 0)/1000000 AS deposit_amount + RETURN account, toFloat(deposit_amount) as deposited + ORDER BY deposit_amount DESC + + "#, + // r#" + // WITH "0xf57d3968d0bfd5b3120fda88f34310c70bd72033f77422f4407fbbef7c24557a" as exchange_deposit + // MATCH + // (u:Account)-[tx:Tx]->(onboard:Account {{address: exchange_deposit}}) + // WHERE + // tx.`block_datetime` > datetime("{}") + // AND tx.`block_datetime` < datetime("{}") + // WITH + // u, + // SUM(tx.V7_OlAccountTransfer_amount) AS totalTxAmount + // ORDER BY totalTxAmount DESCENDING + // RETURN u.address AS account, toFloat(totalTxAmount) / 1000000 AS deposited + + // "#, start.to_rfc3339(), end.to_rfc3339(), - top_n, + // top_n, ); let cypher_query = neo4rs::query(&q); @@ -70,6 +94,73 @@ pub async fn get_date_range_deposits( Ok(top_deposits) } +// pub async fn get_date_range_deposits( +// pool: &Graph, +// top_n: u64, +// start: DateTime, +// end: DateTime, +// ) -> Result> { +// let mut top_deposits = vec![]; + +// let q = format!( +// // r#" +// // WITH "0xf57d3968d0bfd5b3120fda88f34310c70bd72033f77422f4407fbbef7c24557a" AS olswap_deposit + +// // // Step 1: Get the list of all depositors +// // MATCH (depositor:Account)-[tx:Tx]->(onboard:Account {{address: olswap_deposit}}) +// // WITH COLLECT(DISTINCT depositor) AS all_depositors, olswap_deposit, tx + +// // // Step 2: Match depositors and amounts within the date range + +// // UNWIND all_depositors AS depositor + +// // OPTIONAL MATCH (depositor)-[tx2:Tx]->(onboard:Account {{address: olswap_deposit}}) +// // WHERE tx2.block_datetime >= datetime('{}') AND tx2.block_datetime <= datetime('{}') + +// // WITH +// // depositor.address AS account, +// // COALESCE(SUM(tx2.V7_OlAccountTransfer_amount), 0)/1000000 AS deposit_amount +// // RETURN account, toFloat(deposit_amount) as deposited +// // ORDER BY deposited DESC + +// // "#, +// r#" +// WITH "0xf57d3968d0bfd5b3120fda88f34310c70bd72033f77422f4407fbbef7c24557a" as exchange_deposit +// MATCH +// (u:Account)-[tx:Tx]->(onboard:Account {{address: exchange_deposit}}) +// WHERE +// tx.`block_datetime` > datetime("{}") +// AND tx.`block_datetime` < datetime("{}") +// WITH +// DISTINCT(u), +// SUM(tx.V7_OlAccountTransfer_amount) AS totalTxAmount +// ORDER BY totalTxAmount DESCENDING +// RETURN u.address AS account, toFloat(totalTxAmount) / 1000000 AS deposited + +// "#, +// start.to_rfc3339(), +// end.to_rfc3339(), +// // top_n, +// ); +// let cypher_query = neo4rs::query(&q); + +// // Execute the query +// let mut result = pool.execute(cypher_query).await?; + +// // Fetch the first row only +// while let Some(r) = result.next().await? { +// let account_str = r.get::("account").unwrap_or("unknown".to_string()); +// let deposited = r.get::("deposited").unwrap_or(0.0); +// let d = Deposit { +// account: account_str.parse().unwrap_or(AccountAddress::ZERO), +// deposited, +// }; +// top_deposits.push(d); +// // dbg!(&d); +// } +// Ok(top_deposits) +// } + pub async fn get_exchange_users( pool: &Graph, top_n: u64, @@ -108,6 +199,40 @@ pub async fn get_exchange_users( Ok(min_funding) } +pub async fn get_exchange_users_only_outflows(pool: &Graph) -> Result> { + let mut min_funding = vec![]; + + let q = format!( + r#" + MATCH (e:SwapAccount)-[]-(u:UserLedger) + WHERE u.`total_inflows` = 0 + WITH distinct(e.swap_id) AS user_id, max(u.`total_funded`) AS funded + RETURN user_id, funded + ORDER BY funded DESC + "#, + // start.to_rfc3339(), + // end.to_rfc3339(), + // top_n, + ); + let cypher_query = neo4rs::query(&q); + + // Execute the query + let mut result = pool.execute(cypher_query).await?; + + // Fetch the first row only + while let Some(r) = result.next().await? { + let user_id = r.get::("user_id").unwrap_or(0); + let funded = r.get::("funded").unwrap_or(0); + let d = MinFunding { + user_id, + funded: funded as f64, + }; + min_funding.push(d); + // dbg!(&d); + } + Ok(min_funding) +} + pub async fn get_one_exchange_user( pool: &Graph, id: u32, @@ -188,7 +313,7 @@ impl Matching { .map(|el| el.user_id) .collect(); - dbg!(&ids); + // dbg!(&ids); // let user_ledger = funded.iter().find(|el| { // // check if we have already identified it // self.definite.0.get(el.user_id).none() @@ -207,12 +332,12 @@ impl Matching { end: DateTime, save_dir: Option, ) -> Result<()> { - let mut top_n = 3; - while top_n < 20 { + let mut top_n = 5; + while top_n < 25 { let _ = self .breadth_search_by_dates(pool, top_n, start, end, &save_dir) .await; // don't error - top_n += 3; + top_n += 5; } Ok(()) } @@ -236,8 +361,23 @@ impl Matching { println!("day: {}", d); let next_list = get_exchange_users(pool, top_n, start, d).await?; - // TODO: pick top of deposits - let deposits = get_date_range_deposits(pool, 100, start, d).await?; + // // TODO: pick top of deposits + // let deposits = get_date_range_deposits(pool, 1000, start, d).await.unwrap_or_default(); + + // if next_list.len() > 5 { + // // dbg!("next_list"); + // dbg!(&next_list[..5]); + // } + // dbg!(&next_list); + + let deposits = get_date_range_deposits_alt(pool, 1000, start, d) + .await + .unwrap_or_default(); + + // if deposits.len() > 5 { + // dbg!("alt"); + // dbg!(&deposits[..5]); + // } for u in next_list { let _r = self.search(&u, &deposits).await; @@ -252,6 +392,7 @@ impl Matching { Ok(()) } + pub async fn search( &mut self, user: &MinFunding, @@ -271,12 +412,48 @@ impl Matching { bail!("could not find a candidate") } + pub fn match_exact_sellers( + &mut self, + user_list: &[MinFunding], + deposits: &[Deposit], + tolerance: f64, + ) { + user_list.iter().for_each(|user| { + let pending = self.pending.entry(user.user_id).or_default(); + + let candidates: Vec = deposits + .iter() + .filter_map(|el| { + if el.deposited > user.funded && // must always be slightly more + el.deposited < user.funded * tolerance && + !pending.impossible.contains(&el.account) && + // is also not already discovered + !self.definite.values().any(|found| found == &el.account) + { + Some(el.account) + } else { + None + } + }) + .collect(); + + pending.maybe = candidates; + + if pending.maybe.len() == 1 { + // we found a definite match, update it so the next loop doesn't include it + self.definite + .insert(user.user_id, *pending.maybe.first().unwrap()); + } + }) + } + pub fn eliminate_candidates(&mut self, user: &MinFunding, deposits: &[Deposit]) { // let mut filtered_depositors = deposits.clone(); let pending = self.pending.entry(user.user_id).or_default(); let mut eval: Vec = vec![]; deposits.iter().for_each(|el| { + dbg!(&el); if el.deposited >= user.funded && // must not already have been tagged impossible !pending.impossible.contains(&el.account) && diff --git a/tests/test_analytics.rs b/tests/test_analytics.rs index 67b6fb4..6717a1a 100644 --- a/tests/test_analytics.rs +++ b/tests/test_analytics.rs @@ -242,7 +242,7 @@ async fn test_offline_analytics() -> Result<()> { let pool = neo4j_init::get_neo4j_remote_pool(&uri, &user, &pass).await?; let start_time = parse_date("2024-01-01"); - let end_time = parse_date("2024-01-10"); + let end_time = parse_date("2024-07-10"); let _r = offline_matching::get_date_range_deposits(&pool, 20, start_time, end_time).await?; // dbg!(&r); @@ -267,13 +267,67 @@ async fn test_offline_analytics_matching() -> Result<()> { .depth_search_by_top_n_accounts( &pool, parse_date("2024-01-07"), - parse_date("2024-03-13"), + parse_date("2024-07-22"), Some(dir), ) .await; - dbg!(&m.definite); Ok(()) } + +#[tokio::test] +async fn test_easy_sellers() -> Result<()> { + libra_forensic_db::log_setup(); + + let (uri, user, pass) = neo4j_init::get_credentials_from_env()?; + let pool = neo4j_init::get_neo4j_remote_pool(&uri, &user, &pass).await?; + + let mut user_list = offline_matching::get_exchange_users_only_outflows(&pool).await?; + user_list + .sort_by(|a, b: &offline_matching::MinFunding| b.funded.partial_cmp(&a.funded).unwrap()); + // dbg!(&r[..10]); + + let deposits = offline_matching::get_date_range_deposits_alt( + &pool, + 1000, + parse_date("2024-01-07"), + parse_date("2024-07-22"), + ) + .await + .unwrap_or_default(); + + // dbg!(&deposits[..10]); + + let dir: PathBuf = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + + let mut m = Matching::read_cache_from_file(&dir).unwrap_or_default(); + + m.match_exact_sellers(&user_list, &deposits, 1.01); + + let _ = m + .depth_search_by_top_n_accounts( + &pool, + parse_date("2024-01-07"), + parse_date("2024-07-22"), + Some(dir), + ) + .await; + dbg!(&m.definite.len()); + + // let _ = m + // .depth_search_by_top_n_accounts( + // &pool, + // parse_date("2024-01-07"), + // parse_date("2024-07-22"), + // Some(dir), + // ) + // .await; + + // dbg!(&m.definite); + + Ok(()) +} + +//