diff --git a/src/analytics/offline_matching.rs b/src/analytics/offline_matching.rs index 0518cab..e5d927a 100644 --- a/src/analytics/offline_matching.rs +++ b/src/analytics/offline_matching.rs @@ -1,6 +1,6 @@ -use std::collections::BTreeMap; +use std::{collections::BTreeMap, fs::File, io::Write, path::{Path, PathBuf}}; -use anyhow::Result; +use anyhow::{bail, Result}; use chrono::{DateTime, Duration, Utc}; use diem_types::account_address::AccountAddress; use neo4rs::Graph; @@ -14,8 +14,8 @@ pub struct Deposit { #[derive(Debug, Clone, Deserialize, Serialize)] pub struct MinFunding { - user_id: u32, - funded: f64, + pub user_id: u32, + pub funded: f64, } pub async fn get_date_range_deposits( @@ -64,7 +64,7 @@ pub async fn get_date_range_deposits( Ok(top_deposits) } -pub async fn get_min_funding( +pub async fn get_exchange_users( pool: &Graph, top_n: u64, start: DateTime, @@ -102,50 +102,251 @@ pub async fn get_min_funding( Ok(min_funding) } +pub async fn get_one_exchange_user( + pool: &Graph, + id: u32, + start: DateTime, + end: DateTime, +) -> Result> { + let mut min_funding = vec![]; + + let q = format!( + r#" + MATCH p=(e:SwapAccount)-[d:DailyLedger]-(ul:UserLedger) + WHERE d.date > datetime("{}") + AND d.date < datetime("{}") + AND e.swap_id = {} + WITH DISTINCT(e.swap_id) AS user_id, toFloat(max(ul.`total_funded`)) as funded + RETURN user_id, funded + "#, + start.to_rfc3339(), + end.to_rfc3339(), + id, + ); + let cypher_query = neo4rs::query(&q); + + // Execute the query + let mut result = pool.execute(cypher_query).await?; + + // Fetch the first row only + while let Some(r) = result.next().await? { + let user_id = r.get::("user_id").unwrap_or(0); + let funded = r.get::("funded").unwrap_or(0.0); + let d = MinFunding { user_id, funded }; + min_funding.push(d); + // dbg!(&d); + } + Ok(min_funding) +} + +pub struct Matching { + pub definite: BTreeMap, + pub pending: BTreeMap, +} + #[derive(Clone, Default, Debug)] pub struct Candidates { - maybe: Vec, - impossible: Vec, + pub maybe: Vec, + pub impossible: Vec, } #[derive(Clone, Default, Debug)] -pub struct Matching(pub BTreeMap); +pub struct Possible { + pub user: Vec, + pub address: Vec, +} + +impl Default for Matching { + fn default() -> Self { + Self::new() + } +} impl Matching { pub fn new() -> Self { - Matching(BTreeMap::new()) + Self { + definite: BTreeMap::new(), + pending: BTreeMap::new(), + } } - pub fn match_deposit_to_funded(&mut self, deposits: Vec, funded: Vec) { - for f in funded.iter() { - deposits.iter().for_each(|d| { - let candidates = self.0.entry(f.user_id).or_default(); - // only addresses with minimum funded could be a Maybe - if d.deposited >= f.funded { - candidates.maybe.push(d.account); - } else { - candidates.impossible.push(d.account); + pub fn get_next_search_ids(&self, funded: &[MinFunding]) -> Result<(u32, u32)> { + // assumes this is sorted by date + + // find the next two which are not identified, to disambiguate. + let ids: Vec = funded + .iter() + .filter(|el| !self.definite.contains_key(&el.user_id)) + .take(2) + .map(|el| el.user_id) + .collect(); + + dbg!(&ids); + // let user_ledger = funded.iter().find(|el| { + // // check if we have already identified it + // self.definite.0.get(el.user_id).none() + // }); + Ok((*ids.first().unwrap(), *ids.get(1).unwrap())) + } + + pub async fn wide_search( + &mut self, + pool: &Graph, + top_n: u64, + start: DateTime, + end: DateTime, + save_file: Option, + ) -> Result<()> { + // expand the search + // increase the search of top users by funding by expanding the window + // this may retry a number of users, but with more users discovered + // the search space gets smaller + for d in days_in_range(start, end) { + let next_list = get_exchange_users(&pool, top_n, start, d).await?; + + for u in next_list { + let _r = self.search(&pool, u.user_id, start, end).await; + + // after each loop update the file + if let Some(p) = &save_file { + self.write_definite_to_file(&p)?; } - }); + + } } + + Ok(()) } -} + pub async fn search( + &mut self, + pool: &Graph, + user_a: u32, + start: DateTime, + end: DateTime, + ) -> Result { + // exit early + if let Some(a) = self.definite.get(&user_a) { + return Ok(*a); + } + + // loop each day, comparing deposits made to that point + // and funding required for user accounts only to that date + for d in days_in_range(start, end) { + let deposits = get_date_range_deposits(pool, 100, start, d).await?; + + if let Some(funded_a) = get_one_exchange_user(pool, user_a, start, d).await?.first() { + self.eliminate_candidates(funded_a, &deposits); + + if let Some(a) = self.definite.get(&user_a) { + return Ok(*a); + } + } + } + if let Some(a) = self.definite.get(&user_a) { + return Ok(*a); + } + + bail!("could not find a candidate") + } + + pub fn eliminate_candidates(&mut self, user: &MinFunding, deposits: &[Deposit]) { + // let mut filtered_depositors = deposits.clone(); + let pending = self + .pending + .entry(user.user_id) + .or_insert(Candidates::default()); + + let mut eval: Vec = vec![]; + deposits.iter().for_each(|el| { + if el.deposited >= user.funded && + // must not already have been tagged impossible + !pending.impossible.contains(&el.account) && + // is also not already discovered + !self.definite.values().any(|found| found == &el.account) + { + eval.push(el.account) + } else { + pending.impossible.push(el.account) + } + }); + + // only increment the first time. + if pending.maybe.is_empty() { + pending.maybe.append(&mut eval); + } else { + // we only keep addresses we see repeatedly (inner join) + eval.retain(|x| pending.maybe.contains(x)); + if eval.len() > 0 { + pending.maybe = eval; + } + } + + println!("user: {}, maybe: {}", &user.user_id, &pending.maybe.len()); + + if pending.maybe.len() == 1 { + // we found a definite match, update it so the next loop doesn't include it + self.definite + .insert(user.user_id, *pending.maybe.first().unwrap()); + } -pub async fn rip_range(pool: &Graph, start: DateTime, end: DateTime) -> Result { - let mut matches = Matching::new(); - // loop each day. - for d in days_in_range(start, end) { - let deposits = get_date_range_deposits(pool, 100, start, d).await?; - let funded = get_min_funding(pool, 20, start, d).await?; + // candidates + } + + pub fn write_definite_to_file(&self, path: &Path) -> Result<()> { + // Serialize the BTreeMap to a JSON string + let json_string = serde_json::to_string_pretty(&self.definite).expect("Failed to serialize"); + + // Save the JSON string to a file + let mut file = File::create(path)?; + file.write_all(json_string.as_bytes())?; - matches.match_deposit_to_funded(deposits, funded); + println!("Data saved to path: {}", path.display()); + Ok(()) } +} - Ok(matches) +pub fn sort_funded(funded: &mut [MinFunding]) { + // sort descending + funded.sort_by(|a, b| b.funded.partial_cmp(&a.funded).unwrap()); } -fn days_in_range(start: DateTime, end: DateTime) -> Vec> { +// pub fn maybe_match_deposit_to_funded( +// deposits: Vec, +// funded: Vec, +// ) -> Option<(u32, AccountAddress)> { +// // // sort descending +// // funded.sort_by(|a, b| b.funded.partial_cmp(&a.funded).unwrap()); + +// // // find the next two which are not identified, to disambiguate. + +// for f in funded { +// // dbg!(&f); +// let mut candidate_depositors = deposits.clone(); +// candidate_depositors.retain(|el| el.deposited >= f.funded); +// // dbg!(&candidate_depositors); + +// if candidate_depositors.len() == 1 { +// return Some((f.user_id, candidate_depositors.pop().unwrap().account)); +// } +// // deposits.iter().for_each(|d| { +// // // let mut candidates = self.pending.0.entry(f.user_id).or_default(); + +// // // only addresses with minimum funded could be a Maybe +// // if d.deposited >= f.funded { +// // // if we haven't previously marked this as impossible, add it as a maybe +// // if !candidates.impossible.contains(&d.account) { +// // candidates.maybe.push(d.account); +// // } +// // } else { +// // candidates.impossible.push(d.account); +// // } +// // }); +// } +// None +// } + +pub fn days_in_range(start: DateTime, end: DateTime) -> Vec> { let mut days = Vec::new(); let mut current = start; diff --git a/tests/test_analytics.rs b/tests/test_analytics.rs index 4b6e2d5..ee97974 100644 --- a/tests/test_analytics.rs +++ b/tests/test_analytics.rs @@ -243,7 +243,7 @@ async fn test_offline_analytics() -> Result<()> { let _r = offline_matching::get_date_range_deposits(&pool, 20, start_time, end_time).await?; // dbg!(&r); - let _r = offline_matching::get_min_funding(&pool, 20, start_time, end_time).await?; + let _r = offline_matching::get_exchange_users(&pool, 20, start_time, end_time).await?; Ok(()) } @@ -251,15 +251,44 @@ async fn test_offline_analytics() -> Result<()> { #[tokio::test] async fn test_offline_analytics_matching() -> Result<()> { libra_forensic_db::log_setup(); + let (uri, user, pass) = neo4j_init::get_credentials_from_env()?; let pool = neo4j_init::get_neo4j_remote_pool(&uri, &user, &pass).await?; - let start_time = parse_date("2024-01-01"); - let end_time = parse_date("2024-01-10"); - let r = offline_matching::rip_range(&pool, start_time, end_time).await?; + let mut m = offline_matching::Matching::new(); + + let p = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + let p = p.join("definite.json"); + + let _ = m.wide_search(&pool, 10, parse_date("2024-01-07"), parse_date("2024-03-13"), Some(p)).await; + // let initial_list = + // offline_matching::get_exchange_users(&pool, 10, start_time, parse_date("2024-01-09")) + // .await?; + + // for u in initial_list { + // let r = m + // .search(&pool, u.user_id, start_time, parse_date("2024-03-13")) + // .await; + // // dbg!(&r); + // } + + dbg!(&m.definite); + + // // expand the search + // for d in offline_matching::days_in_range(start_time, parse_date("2024-03-13")) { + // let next_list = offline_matching::get_exchange_users(&pool, 10, start_time, d) + // .await?; + + // for u in next_list { + // let r = m + // .search(&pool, u.user_id, start_time, parse_date("2024-03-13")) + // .await; + // dbg!(&r); + // } - dbg!(&r.0.len()); + // dbg!(&m.definite); + // } Ok(()) }