diff --git a/src/analytics/offline_matching.rs b/src/analytics/offline_matching.rs index bbeaf09..b600140 100644 --- a/src/analytics/offline_matching.rs +++ b/src/analytics/offline_matching.rs @@ -8,6 +8,7 @@ use std::{ use anyhow::{bail, Result}; use chrono::{DateTime, Duration, Utc}; use diem_types::account_address::AccountAddress; +use log::{info, trace}; use neo4rs::Graph; use serde::{Deserialize, Serialize}; @@ -195,29 +196,56 @@ impl Matching { Ok((*ids.first().unwrap(), *ids.get(1).unwrap())) } - pub async fn wide_search( + /// progressively scan for top_n funded exchange accounts + /// e.g. start with 5, and each time increase by 1, until reaching 50 for. + /// at each level deep, a breadth search is started + /// Thus every day in timeseries will do a shallow match of the top 5 accounts, and eliminate candidates. Deeper searches benefit from the information from the previous depth searches (exclude impossible matches) + pub async fn depth_search_by_top_n_accounts( &mut self, pool: &Graph, - top_n: u64, start: DateTime, end: DateTime, save_dir: Option, + ) -> Result<()> { + let mut top_n = 10; + while top_n < 50 { + let _ = self + .breadth_search_by_dates(pool, top_n, start, end, &save_dir) + .await; // don't error + top_n += 5; + } + Ok(()) + } + + /// breadth search, for every day in timeseries, check all the top funded + /// accounts against the actual deposited on-chain + /// don't peek into the future, only replay information at each day + pub async fn breadth_search_by_dates( + &mut self, + pool: &Graph, + top_n: u64, + start: DateTime, + end: DateTime, + save_dir: &Option, ) -> Result<()> { // expand the search // increase the search of top users by funding by expanding the window // this may retry a number of users, but with more users discovered // the search space gets smaller for d in days_in_range(start, end) { - let next_list = get_exchange_users(&pool, top_n, start, d).await?; - dbg!(&next_list.len()); + info!("day: {}", d); + let next_list = get_exchange_users(pool, top_n, start, d).await?; + + // TODO: pick top of deposits + let deposits = get_date_range_deposits(pool, 100, start, d).await?; for u in next_list { - let _r = self.search(&pool, u.user_id, start, end).await; + let _r = self.search(&u, &deposits).await; // after each loop update the file if let Some(p) = &save_dir { let _ = self.write_definite_to_file(&p.join("definite.json")); - let _ = self.write_cache_to_file(&p); + let _ = self.write_cache_to_file(p); } } } @@ -226,30 +254,17 @@ impl Matching { } pub async fn search( &mut self, - pool: &Graph, - user_a: u32, - start: DateTime, - end: DateTime, + user: &MinFunding, + deposits: &[Deposit], ) -> Result { // exit early - if let Some(a) = self.definite.get(&user_a) { + if let Some(a) = self.definite.get(&user.user_id) { return Ok(*a); } - // loop each day, comparing deposits made to that point - // and funding required for user accounts only to that date - for d in days_in_range(start, end) { - let deposits = get_date_range_deposits(pool, 100, start, d).await?; - - if let Some(funded_a) = get_one_exchange_user(pool, user_a, start, d).await?.first() { - self.eliminate_candidates(funded_a, &deposits); + self.eliminate_candidates(user, deposits); - if let Some(a) = self.definite.get(&user_a) { - return Ok(*a); - } - } - } - if let Some(a) = self.definite.get(&user_a) { + if let Some(a) = self.definite.get(&user.user_id) { return Ok(*a); } @@ -258,10 +273,7 @@ impl Matching { pub fn eliminate_candidates(&mut self, user: &MinFunding, deposits: &[Deposit]) { // let mut filtered_depositors = deposits.clone(); - let pending = self - .pending - .entry(user.user_id) - .or_insert(Candidates::default()); + let pending = self.pending.entry(user.user_id).or_default(); let mut eval: Vec = vec![]; deposits.iter().for_each(|el| { @@ -271,13 +283,11 @@ impl Matching { // is also not already discovered !self.definite.values().any(|found| found == &el.account) { - if !eval.contains(&el.account) { - eval.push(el.account) - } - } else { - if !pending.impossible.contains(&value) { - pending.impossible.push(el.account) + if !eval.contains(&el.account) { + eval.push(el.account) } + } else if !pending.impossible.contains(&el.account) { + pending.impossible.push(el.account) } }); @@ -287,12 +297,12 @@ impl Matching { } else { // we only keep addresses we see repeatedly (inner join) eval.retain(|x| pending.maybe.contains(x)); - if eval.len() > 0 { + if !eval.is_empty() { pending.maybe = eval; } } - println!("user: {}, maybe: {}", &user.user_id, &pending.maybe.len()); + info!("user: {}, maybe: {}", &user.user_id, &pending.maybe.len()); if pending.maybe.len() == 1 { // we found a definite match, update it so the next loop doesn't include it @@ -311,7 +321,7 @@ impl Matching { let mut file = File::create(&path)?; file.write_all(json_string.as_bytes())?; - println!("Cache saved: {}", path.display()); + trace!("Cache saved: {}", path.display()); Ok(()) } pub fn read_cache_from_file(dir: &Path) -> Result { @@ -332,7 +342,7 @@ impl Matching { let mut file = File::create(path)?; file.write_all(json_string.as_bytes())?; - println!("Data saved to path: {}", path.display()); + trace!("Data saved to path: {}", path.display()); Ok(()) } } diff --git a/tests/test_analytics.rs b/tests/test_analytics.rs index e1485b9..9243882 100644 --- a/tests/test_analytics.rs +++ b/tests/test_analytics.rs @@ -261,44 +261,19 @@ async fn test_offline_analytics_matching() -> Result<()> { let dir: PathBuf = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - let mut m = Matching::read_cache_from_file(&dir).unwrap_or(Matching::new()); + let mut m = Matching::read_cache_from_file(&dir).unwrap_or_default(); let _ = m - .wide_search( + .depth_search_by_top_n_accounts( &pool, - 25, parse_date("2024-01-07"), parse_date("2024-07-22"), Some(dir), ) .await; - // let initial_list = - // offline_matching::get_exchange_users(&pool, 10, start_time, parse_date("2024-01-09")) - // .await?; - // for u in initial_list { - // let r = m - // .search(&pool, u.user_id, start_time, parse_date("2024-03-13")) - // .await; - // // dbg!(&r); - // } dbg!(&m.definite); - // // expand the search - // for d in offline_matching::days_in_range(start_time, parse_date("2024-03-13")) { - // let next_list = offline_matching::get_exchange_users(&pool, 10, start_time, d) - // .await?; - - // for u in next_list { - // let r = m - // .search(&pool, u.user_id, start_time, parse_date("2024-03-13")) - // .await; - // dbg!(&r); - // } - - // dbg!(&m.definite); - // } - Ok(()) }