From c481e53b163bcafb5bf39364a0e1c9026fccf684 Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Sun, 8 Dec 2024 14:28:12 -0500 Subject: [PATCH 01/23] add offline analytics queries --- src/analytics/enrich_account_funding.rs | 14 +--- src/analytics/mod.rs | 1 + src/analytics/offline_matching.rs | 101 ++++++++++++++++++++++++ src/date_util.rs | 8 ++ src/lib.rs | 1 + tests/test_analytics.rs | 26 ++++-- 6 files changed, 136 insertions(+), 15 deletions(-) create mode 100644 src/analytics/offline_matching.rs create mode 100644 src/date_util.rs diff --git a/src/analytics/enrich_account_funding.rs b/src/analytics/enrich_account_funding.rs index 526d2d0..7a9d095 100644 --- a/src/analytics/enrich_account_funding.rs +++ b/src/analytics/enrich_account_funding.rs @@ -2,8 +2,6 @@ use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; use log::{error, trace}; use neo4rs::{Graph, Query}; -// use log::trace; -// use neo4rs::{Graph, Query}; use serde::{Deserialize, Serialize}; use std::{ collections::BTreeMap, @@ -13,6 +11,9 @@ use std::{ use crate::schema_exchange_orders::ExchangeOrder; +#[cfg(test)] +use crate::date_util::parse_date; + #[derive(Default, Debug, Clone, Deserialize, Serialize)] pub struct AccountDataAlt { pub current_balance: f64, @@ -248,14 +249,6 @@ pub fn generate_cypher_query(map: String) -> String { ) } -/// Helper function to parse "YYYY-MM-DD" into `DateTime` -pub fn parse_date(date_str: &str) -> DateTime { - let datetime_str = format!("{date_str}T00:00:00Z"); // Append time and UTC offset - DateTime::parse_from_rfc3339(&datetime_str) - .expect("Invalid date format; expected YYYY-MM-DD") - .with_timezone(&Utc) -} - #[test] fn test_replay_transactions() { let mut orders = vec![ @@ -364,6 +357,7 @@ fn test_replay_transactions() { #[test] fn test_example_user() -> Result<()> { use crate::extract_exchange_orders; + use std::path::PathBuf; let path = env!("CARGO_MANIFEST_DIR"); let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json"); diff --git a/src/analytics/mod.rs b/src/analytics/mod.rs index 4c9441a..f4ec982 100644 --- a/src/analytics/mod.rs +++ b/src/analytics/mod.rs @@ -1,3 +1,4 @@ pub mod enrich_account_funding; pub mod enrich_rms; pub mod exchange_stats; +pub mod offline_matching; diff --git a/src/analytics/offline_matching.rs b/src/analytics/offline_matching.rs new file mode 100644 index 0000000..0f912ff --- /dev/null +++ b/src/analytics/offline_matching.rs @@ -0,0 +1,101 @@ +use anyhow::Result; +use chrono::{DateTime, Utc}; +use diem_types::account_address::AccountAddress; +use neo4rs::Graph; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct Deposit { + account: AccountAddress, + deposited: f64, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct MinFunding { + user_id: u32, + funded: f64, +} + +pub async fn get_date_range_deposits( + pool: &Graph, + top_n: u64, + start: DateTime, + end: DateTime, +) -> Result> { + let mut top_deposits = vec![]; + + let q = format!( + r#" + WITH "0xf57d3968d0bfd5b3120fda88f34310c70bd72033f77422f4407fbbef7c24557a" as exchange_deposit + MATCH + (u:Account)-[tx:Tx]->(onboard:Account {{address: exchange_deposit}}) + WHERE + tx.`block_datetime` > datetime("{}") + AND tx.`block_datetime` < datetime("{}") + WITH + u, + SUM(tx.V7_OlAccountTransfer_amount) AS totalTxAmount + ORDER BY totalTxAmount DESCENDING + RETURN u.address AS account, toFloat(totalTxAmount) / 1000000 AS deposited + LIMIT {} + "#, + start.to_rfc3339(), + end.to_rfc3339(), + top_n, + ); + let cypher_query = neo4rs::query(&q); + + // Execute the query + let mut result = pool.execute(cypher_query).await?; + + // Fetch the first row only + while let Some(r) = result.next().await? { + let account_str = r.get::("account").unwrap_or("unknown".to_string()); + let deposited = r.get::("deposited").unwrap_or(0.0); + let d = Deposit { + account: account_str.parse().unwrap_or(AccountAddress::ZERO), + deposited, + }; + top_deposits.push(d); + // dbg!(&d); + } + Ok(top_deposits) +} + +pub async fn get_min_funding( + pool: &Graph, + top_n: u64, + start: DateTime, + end: DateTime, +) -> Result> { + let mut min_funding = vec![]; + + let q = format!( + r#" + MATCH p=(e:SwapAccount)-[d:DailyLedger]-(ul:UserLedger) + WHERE d.date > datetime("{}") + AND d.date < datetime("{}") + WITH e.swap_id AS user_id, toFloat(max(ul.`total_funded`)) as funded + RETURN user_id, funded + ORDER BY funded DESC + LIMIT {} + "#, + start.to_rfc3339(), + end.to_rfc3339(), + top_n, + ); + let cypher_query = neo4rs::query(&q); + + // Execute the query + let mut result = pool.execute(cypher_query).await?; + + // Fetch the first row only + while let Some(r) = result.next().await? { + let user_id = r.get::("user_id").unwrap_or(0); + let funded = r.get::("funded").unwrap_or(0.0); + let d = MinFunding { user_id, funded }; + min_funding.push(d); + // dbg!(&d); + } + Ok(min_funding) +} diff --git a/src/date_util.rs b/src/date_util.rs new file mode 100644 index 0000000..7ff7333 --- /dev/null +++ b/src/date_util.rs @@ -0,0 +1,8 @@ +use chrono::{DateTime, Utc}; +/// Helper function to parse "YYYY-MM-DD" into `DateTime` +pub fn parse_date(date_str: &str) -> DateTime { + let datetime_str = format!("{date_str}T00:00:00Z"); // Append time and UTC offset + DateTime::parse_from_rfc3339(&datetime_str) + .expect("Invalid date format; expected YYYY-MM-DD") + .with_timezone(&Utc) +} diff --git a/src/lib.rs b/src/lib.rs index cc5b904..d6d0fdf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ pub mod analytics; pub mod batch_tx_type; pub mod cypher_templates; +pub mod date_util; pub mod decode_entry_function; pub mod enrich_exchange_onboarding; pub mod enrich_whitepages; diff --git a/tests/test_analytics.rs b/tests/test_analytics.rs index 48db7e8..bf6b6ce 100644 --- a/tests/test_analytics.rs +++ b/tests/test_analytics.rs @@ -3,12 +3,10 @@ use anyhow::Result; use std::path::PathBuf; use libra_forensic_db::{ - analytics::{ - self, - enrich_account_funding::{parse_date, BalanceTracker}, - }, + analytics::{self, enrich_account_funding::BalanceTracker, offline_matching}, + date_util::parse_date, extract_exchange_orders, load_exchange_orders, - neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes}, + neo4j_init::{self, get_neo4j_localhost_pool, maybe_create_indexes}, }; use support::neo4j_testcontainer::start_neo4j_container; @@ -230,5 +228,23 @@ async fn test_submit_exchange_ledger_all() -> Result<()> { assert!(i == user.0.len()); + Ok(()) +} + +#[tokio::test] +async fn test_offline_analytics() -> Result<()> { + libra_forensic_db::log_setup(); + let (uri, user, pass) = neo4j_init::get_credentials_from_env()?; + let pool = neo4j_init::get_neo4j_remote_pool(&uri, &user, &pass).await?; + + let start_time = parse_date("2024-01-01"); + let end_time = parse_date("2024-01-10"); + + let r = offline_matching::get_date_range_deposits(&pool, 20, start_time, end_time).await?; + // dbg!(&r); + + let r = offline_matching::get_min_funding(&pool, 20, start_time, end_time).await?; + + Ok(()) } From f8be7e1ff686da17a0e40d7918d2e18546b9e8ec Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Sun, 8 Dec 2024 15:17:07 -0500 Subject: [PATCH 02/23] scaffold matching --- src/analytics/offline_matching.rs | 59 ++++++++++++++++++++++++++++++- tests/test_analytics.rs | 19 ++++++++-- 2 files changed, 75 insertions(+), 3 deletions(-) diff --git a/src/analytics/offline_matching.rs b/src/analytics/offline_matching.rs index 0f912ff..0518cab 100644 --- a/src/analytics/offline_matching.rs +++ b/src/analytics/offline_matching.rs @@ -1,5 +1,7 @@ +use std::collections::BTreeMap; + use anyhow::Result; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, Duration, Utc}; use diem_types::account_address::AccountAddress; use neo4rs::Graph; use serde::{Deserialize, Serialize}; @@ -99,3 +101,58 @@ pub async fn get_min_funding( } Ok(min_funding) } + +#[derive(Clone, Default, Debug)] +pub struct Candidates { + maybe: Vec, + impossible: Vec, +} + +#[derive(Clone, Default, Debug)] +pub struct Matching(pub BTreeMap); + +impl Matching { + pub fn new() -> Self { + Matching(BTreeMap::new()) + } + + pub fn match_deposit_to_funded(&mut self, deposits: Vec, funded: Vec) { + for f in funded.iter() { + deposits.iter().for_each(|d| { + let candidates = self.0.entry(f.user_id).or_default(); + // only addresses with minimum funded could be a Maybe + if d.deposited >= f.funded { + candidates.maybe.push(d.account); + } else { + candidates.impossible.push(d.account); + } + }); + } + } +} + +pub async fn rip_range(pool: &Graph, start: DateTime, end: DateTime) -> Result { + let mut matches = Matching::new(); + + // loop each day. + for d in days_in_range(start, end) { + let deposits = get_date_range_deposits(pool, 100, start, d).await?; + let funded = get_min_funding(pool, 20, start, d).await?; + + matches.match_deposit_to_funded(deposits, funded); + } + + Ok(matches) +} + +fn days_in_range(start: DateTime, end: DateTime) -> Vec> { + let mut days = Vec::new(); + let mut current = start; + + while current <= end { + days.push(current); + current += Duration::days(1); // Increment by one day + } + + days +} diff --git a/tests/test_analytics.rs b/tests/test_analytics.rs index bf6b6ce..4b6e2d5 100644 --- a/tests/test_analytics.rs +++ b/tests/test_analytics.rs @@ -240,11 +240,26 @@ async fn test_offline_analytics() -> Result<()> { let start_time = parse_date("2024-01-01"); let end_time = parse_date("2024-01-10"); - let r = offline_matching::get_date_range_deposits(&pool, 20, start_time, end_time).await?; + let _r = offline_matching::get_date_range_deposits(&pool, 20, start_time, end_time).await?; // dbg!(&r); - let r = offline_matching::get_min_funding(&pool, 20, start_time, end_time).await?; + let _r = offline_matching::get_min_funding(&pool, 20, start_time, end_time).await?; + Ok(()) +} + +#[tokio::test] +async fn test_offline_analytics_matching() -> Result<()> { + libra_forensic_db::log_setup(); + let (uri, user, pass) = neo4j_init::get_credentials_from_env()?; + let pool = neo4j_init::get_neo4j_remote_pool(&uri, &user, &pass).await?; + + let start_time = parse_date("2024-01-01"); + let end_time = parse_date("2024-01-10"); + + let r = offline_matching::rip_range(&pool, start_time, end_time).await?; + + dbg!(&r.0.len()); Ok(()) } From ca9bf67c7b74549abd8ccbf832809378c858740e Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Mon, 9 Dec 2024 17:11:45 -0500 Subject: [PATCH 03/23] try broad search expanding the ledger funding window --- src/analytics/offline_matching.rs | 259 ++++++++++++++++++++++++++---- tests/test_analytics.rs | 39 ++++- 2 files changed, 264 insertions(+), 34 deletions(-) diff --git a/src/analytics/offline_matching.rs b/src/analytics/offline_matching.rs index 0518cab..e5d927a 100644 --- a/src/analytics/offline_matching.rs +++ b/src/analytics/offline_matching.rs @@ -1,6 +1,6 @@ -use std::collections::BTreeMap; +use std::{collections::BTreeMap, fs::File, io::Write, path::{Path, PathBuf}}; -use anyhow::Result; +use anyhow::{bail, Result}; use chrono::{DateTime, Duration, Utc}; use diem_types::account_address::AccountAddress; use neo4rs::Graph; @@ -14,8 +14,8 @@ pub struct Deposit { #[derive(Debug, Clone, Deserialize, Serialize)] pub struct MinFunding { - user_id: u32, - funded: f64, + pub user_id: u32, + pub funded: f64, } pub async fn get_date_range_deposits( @@ -64,7 +64,7 @@ pub async fn get_date_range_deposits( Ok(top_deposits) } -pub async fn get_min_funding( +pub async fn get_exchange_users( pool: &Graph, top_n: u64, start: DateTime, @@ -102,50 +102,251 @@ pub async fn get_min_funding( Ok(min_funding) } +pub async fn get_one_exchange_user( + pool: &Graph, + id: u32, + start: DateTime, + end: DateTime, +) -> Result> { + let mut min_funding = vec![]; + + let q = format!( + r#" + MATCH p=(e:SwapAccount)-[d:DailyLedger]-(ul:UserLedger) + WHERE d.date > datetime("{}") + AND d.date < datetime("{}") + AND e.swap_id = {} + WITH DISTINCT(e.swap_id) AS user_id, toFloat(max(ul.`total_funded`)) as funded + RETURN user_id, funded + "#, + start.to_rfc3339(), + end.to_rfc3339(), + id, + ); + let cypher_query = neo4rs::query(&q); + + // Execute the query + let mut result = pool.execute(cypher_query).await?; + + // Fetch the first row only + while let Some(r) = result.next().await? { + let user_id = r.get::("user_id").unwrap_or(0); + let funded = r.get::("funded").unwrap_or(0.0); + let d = MinFunding { user_id, funded }; + min_funding.push(d); + // dbg!(&d); + } + Ok(min_funding) +} + +pub struct Matching { + pub definite: BTreeMap, + pub pending: BTreeMap, +} + #[derive(Clone, Default, Debug)] pub struct Candidates { - maybe: Vec, - impossible: Vec, + pub maybe: Vec, + pub impossible: Vec, } #[derive(Clone, Default, Debug)] -pub struct Matching(pub BTreeMap); +pub struct Possible { + pub user: Vec, + pub address: Vec, +} + +impl Default for Matching { + fn default() -> Self { + Self::new() + } +} impl Matching { pub fn new() -> Self { - Matching(BTreeMap::new()) + Self { + definite: BTreeMap::new(), + pending: BTreeMap::new(), + } } - pub fn match_deposit_to_funded(&mut self, deposits: Vec, funded: Vec) { - for f in funded.iter() { - deposits.iter().for_each(|d| { - let candidates = self.0.entry(f.user_id).or_default(); - // only addresses with minimum funded could be a Maybe - if d.deposited >= f.funded { - candidates.maybe.push(d.account); - } else { - candidates.impossible.push(d.account); + pub fn get_next_search_ids(&self, funded: &[MinFunding]) -> Result<(u32, u32)> { + // assumes this is sorted by date + + // find the next two which are not identified, to disambiguate. + let ids: Vec = funded + .iter() + .filter(|el| !self.definite.contains_key(&el.user_id)) + .take(2) + .map(|el| el.user_id) + .collect(); + + dbg!(&ids); + // let user_ledger = funded.iter().find(|el| { + // // check if we have already identified it + // self.definite.0.get(el.user_id).none() + // }); + Ok((*ids.first().unwrap(), *ids.get(1).unwrap())) + } + + pub async fn wide_search( + &mut self, + pool: &Graph, + top_n: u64, + start: DateTime, + end: DateTime, + save_file: Option, + ) -> Result<()> { + // expand the search + // increase the search of top users by funding by expanding the window + // this may retry a number of users, but with more users discovered + // the search space gets smaller + for d in days_in_range(start, end) { + let next_list = get_exchange_users(&pool, top_n, start, d).await?; + + for u in next_list { + let _r = self.search(&pool, u.user_id, start, end).await; + + // after each loop update the file + if let Some(p) = &save_file { + self.write_definite_to_file(&p)?; } - }); + + } } + + Ok(()) } -} + pub async fn search( + &mut self, + pool: &Graph, + user_a: u32, + start: DateTime, + end: DateTime, + ) -> Result { + // exit early + if let Some(a) = self.definite.get(&user_a) { + return Ok(*a); + } + + // loop each day, comparing deposits made to that point + // and funding required for user accounts only to that date + for d in days_in_range(start, end) { + let deposits = get_date_range_deposits(pool, 100, start, d).await?; + + if let Some(funded_a) = get_one_exchange_user(pool, user_a, start, d).await?.first() { + self.eliminate_candidates(funded_a, &deposits); + + if let Some(a) = self.definite.get(&user_a) { + return Ok(*a); + } + } + } + if let Some(a) = self.definite.get(&user_a) { + return Ok(*a); + } + + bail!("could not find a candidate") + } + + pub fn eliminate_candidates(&mut self, user: &MinFunding, deposits: &[Deposit]) { + // let mut filtered_depositors = deposits.clone(); + let pending = self + .pending + .entry(user.user_id) + .or_insert(Candidates::default()); + + let mut eval: Vec = vec![]; + deposits.iter().for_each(|el| { + if el.deposited >= user.funded && + // must not already have been tagged impossible + !pending.impossible.contains(&el.account) && + // is also not already discovered + !self.definite.values().any(|found| found == &el.account) + { + eval.push(el.account) + } else { + pending.impossible.push(el.account) + } + }); + + // only increment the first time. + if pending.maybe.is_empty() { + pending.maybe.append(&mut eval); + } else { + // we only keep addresses we see repeatedly (inner join) + eval.retain(|x| pending.maybe.contains(x)); + if eval.len() > 0 { + pending.maybe = eval; + } + } + + println!("user: {}, maybe: {}", &user.user_id, &pending.maybe.len()); + + if pending.maybe.len() == 1 { + // we found a definite match, update it so the next loop doesn't include it + self.definite + .insert(user.user_id, *pending.maybe.first().unwrap()); + } -pub async fn rip_range(pool: &Graph, start: DateTime, end: DateTime) -> Result { - let mut matches = Matching::new(); - // loop each day. - for d in days_in_range(start, end) { - let deposits = get_date_range_deposits(pool, 100, start, d).await?; - let funded = get_min_funding(pool, 20, start, d).await?; + // candidates + } + + pub fn write_definite_to_file(&self, path: &Path) -> Result<()> { + // Serialize the BTreeMap to a JSON string + let json_string = serde_json::to_string_pretty(&self.definite).expect("Failed to serialize"); + + // Save the JSON string to a file + let mut file = File::create(path)?; + file.write_all(json_string.as_bytes())?; - matches.match_deposit_to_funded(deposits, funded); + println!("Data saved to path: {}", path.display()); + Ok(()) } +} - Ok(matches) +pub fn sort_funded(funded: &mut [MinFunding]) { + // sort descending + funded.sort_by(|a, b| b.funded.partial_cmp(&a.funded).unwrap()); } -fn days_in_range(start: DateTime, end: DateTime) -> Vec> { +// pub fn maybe_match_deposit_to_funded( +// deposits: Vec, +// funded: Vec, +// ) -> Option<(u32, AccountAddress)> { +// // // sort descending +// // funded.sort_by(|a, b| b.funded.partial_cmp(&a.funded).unwrap()); + +// // // find the next two which are not identified, to disambiguate. + +// for f in funded { +// // dbg!(&f); +// let mut candidate_depositors = deposits.clone(); +// candidate_depositors.retain(|el| el.deposited >= f.funded); +// // dbg!(&candidate_depositors); + +// if candidate_depositors.len() == 1 { +// return Some((f.user_id, candidate_depositors.pop().unwrap().account)); +// } +// // deposits.iter().for_each(|d| { +// // // let mut candidates = self.pending.0.entry(f.user_id).or_default(); + +// // // only addresses with minimum funded could be a Maybe +// // if d.deposited >= f.funded { +// // // if we haven't previously marked this as impossible, add it as a maybe +// // if !candidates.impossible.contains(&d.account) { +// // candidates.maybe.push(d.account); +// // } +// // } else { +// // candidates.impossible.push(d.account); +// // } +// // }); +// } +// None +// } + +pub fn days_in_range(start: DateTime, end: DateTime) -> Vec> { let mut days = Vec::new(); let mut current = start; diff --git a/tests/test_analytics.rs b/tests/test_analytics.rs index 4b6e2d5..ee97974 100644 --- a/tests/test_analytics.rs +++ b/tests/test_analytics.rs @@ -243,7 +243,7 @@ async fn test_offline_analytics() -> Result<()> { let _r = offline_matching::get_date_range_deposits(&pool, 20, start_time, end_time).await?; // dbg!(&r); - let _r = offline_matching::get_min_funding(&pool, 20, start_time, end_time).await?; + let _r = offline_matching::get_exchange_users(&pool, 20, start_time, end_time).await?; Ok(()) } @@ -251,15 +251,44 @@ async fn test_offline_analytics() -> Result<()> { #[tokio::test] async fn test_offline_analytics_matching() -> Result<()> { libra_forensic_db::log_setup(); + let (uri, user, pass) = neo4j_init::get_credentials_from_env()?; let pool = neo4j_init::get_neo4j_remote_pool(&uri, &user, &pass).await?; - let start_time = parse_date("2024-01-01"); - let end_time = parse_date("2024-01-10"); - let r = offline_matching::rip_range(&pool, start_time, end_time).await?; + let mut m = offline_matching::Matching::new(); + + let p = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + let p = p.join("definite.json"); + + let _ = m.wide_search(&pool, 10, parse_date("2024-01-07"), parse_date("2024-03-13"), Some(p)).await; + // let initial_list = + // offline_matching::get_exchange_users(&pool, 10, start_time, parse_date("2024-01-09")) + // .await?; + + // for u in initial_list { + // let r = m + // .search(&pool, u.user_id, start_time, parse_date("2024-03-13")) + // .await; + // // dbg!(&r); + // } + + dbg!(&m.definite); + + // // expand the search + // for d in offline_matching::days_in_range(start_time, parse_date("2024-03-13")) { + // let next_list = offline_matching::get_exchange_users(&pool, 10, start_time, d) + // .await?; + + // for u in next_list { + // let r = m + // .search(&pool, u.user_id, start_time, parse_date("2024-03-13")) + // .await; + // dbg!(&r); + // } - dbg!(&r.0.len()); + // dbg!(&m.definite); + // } Ok(()) } From ceb4bf602c14b5b60a9034da6905d4a079d9e596 Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Mon, 9 Dec 2024 17:42:53 -0500 Subject: [PATCH 04/23] add caching --- src/analytics/offline_matching.rs | 46 +++++++++++++++++++++++++------ tests/test_analytics.rs | 24 +++++++++++----- 2 files changed, 54 insertions(+), 16 deletions(-) diff --git a/src/analytics/offline_matching.rs b/src/analytics/offline_matching.rs index e5d927a..11047ce 100644 --- a/src/analytics/offline_matching.rs +++ b/src/analytics/offline_matching.rs @@ -1,4 +1,9 @@ -use std::{collections::BTreeMap, fs::File, io::Write, path::{Path, PathBuf}}; +use std::{ + collections::BTreeMap, + fs::{self, File}, + io::Write, + path::{Path, PathBuf}, +}; use anyhow::{bail, Result}; use chrono::{DateTime, Duration, Utc}; @@ -139,12 +144,13 @@ pub async fn get_one_exchange_user( Ok(min_funding) } +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct Matching { pub definite: BTreeMap, pub pending: BTreeMap, } -#[derive(Clone, Default, Debug)] +#[derive(Clone, Default, Debug, Serialize, Deserialize)] pub struct Candidates { pub maybe: Vec, pub impossible: Vec, @@ -195,7 +201,7 @@ impl Matching { top_n: u64, start: DateTime, end: DateTime, - save_file: Option, + save_dir: Option, ) -> Result<()> { // expand the search // increase the search of top users by funding by expanding the window @@ -203,15 +209,16 @@ impl Matching { // the search space gets smaller for d in days_in_range(start, end) { let next_list = get_exchange_users(&pool, top_n, start, d).await?; + dbg!(&next_list.len()); for u in next_list { let _r = self.search(&pool, u.user_id, start, end).await; // after each loop update the file - if let Some(p) = &save_file { - self.write_definite_to_file(&p)?; + if let Some(p) = &save_dir { + let _ = self.write_definite_to_file(&p.join("definite.json")); + let _ = self.write_cache_to_file(&p); } - } } @@ -281,7 +288,7 @@ impl Matching { } } - println!("user: {}, maybe: {}", &user.user_id, &pending.maybe.len()); + println!("user: {}, maybe: {}", &user.user_id, &pending.maybe.len()); if pending.maybe.len() == 1 { // we found a definite match, update it so the next loop doesn't include it @@ -289,13 +296,34 @@ impl Matching { .insert(user.user_id, *pending.maybe.first().unwrap()); } - // candidates } + pub fn write_cache_to_file(&self, dir: &Path) -> Result<()> { + let json_string = + serde_json::to_string(&self).expect("Failed to serialize"); + + // Save the JSON string to a file + let path = dir.join("cache.json"); + let mut file = File::create(&path)?; + file.write_all(json_string.as_bytes())?; + + println!("Cache saved: {}", path.display()); + Ok(()) + } + pub fn read_cache_from_file(dir: &Path) -> Result { + // Read the file content into a string + let file_path = dir.join("cache.json"); + let json_string = fs::read_to_string(file_path)?; + + // Deserialize the JSON string into a BTreeMap + Ok(serde_json::from_str(&json_string)?) + } + pub fn write_definite_to_file(&self, path: &Path) -> Result<()> { // Serialize the BTreeMap to a JSON string - let json_string = serde_json::to_string_pretty(&self.definite).expect("Failed to serialize"); + let json_string = + serde_json::to_string_pretty(&self.definite).expect("Failed to serialize"); // Save the JSON string to a file let mut file = File::create(path)?; diff --git a/tests/test_analytics.rs b/tests/test_analytics.rs index ee97974..e1485b9 100644 --- a/tests/test_analytics.rs +++ b/tests/test_analytics.rs @@ -3,7 +3,11 @@ use anyhow::Result; use std::path::PathBuf; use libra_forensic_db::{ - analytics::{self, enrich_account_funding::BalanceTracker, offline_matching}, + analytics::{ + self, + enrich_account_funding::BalanceTracker, + offline_matching::{self, Matching}, + }, date_util::parse_date, extract_exchange_orders, load_exchange_orders, neo4j_init::{self, get_neo4j_localhost_pool, maybe_create_indexes}, @@ -255,13 +259,19 @@ async fn test_offline_analytics_matching() -> Result<()> { let (uri, user, pass) = neo4j_init::get_credentials_from_env()?; let pool = neo4j_init::get_neo4j_remote_pool(&uri, &user, &pass).await?; + let dir: PathBuf = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - let mut m = offline_matching::Matching::new(); + let mut m = Matching::read_cache_from_file(&dir).unwrap_or(Matching::new()); - let p = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - let p = p.join("definite.json"); - - let _ = m.wide_search(&pool, 10, parse_date("2024-01-07"), parse_date("2024-03-13"), Some(p)).await; + let _ = m + .wide_search( + &pool, + 25, + parse_date("2024-01-07"), + parse_date("2024-07-22"), + Some(dir), + ) + .await; // let initial_list = // offline_matching::get_exchange_users(&pool, 10, start_time, parse_date("2024-01-09")) // .await?; @@ -288,7 +298,7 @@ async fn test_offline_analytics_matching() -> Result<()> { // } // dbg!(&m.definite); - // } + // } Ok(()) } From b974a588594d98b8745319450a823af4ab863c6b Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Mon, 9 Dec 2024 18:14:25 -0500 Subject: [PATCH 05/23] prevent duplicates in impossible --- src/analytics/offline_matching.rs | 44 +++++-------------------------- 1 file changed, 6 insertions(+), 38 deletions(-) diff --git a/src/analytics/offline_matching.rs b/src/analytics/offline_matching.rs index 11047ce..bbeaf09 100644 --- a/src/analytics/offline_matching.rs +++ b/src/analytics/offline_matching.rs @@ -271,9 +271,13 @@ impl Matching { // is also not already discovered !self.definite.values().any(|found| found == &el.account) { + if !eval.contains(&el.account) { eval.push(el.account) + } } else { - pending.impossible.push(el.account) + if !pending.impossible.contains(&value) { + pending.impossible.push(el.account) + } } }); @@ -300,8 +304,7 @@ impl Matching { } pub fn write_cache_to_file(&self, dir: &Path) -> Result<()> { - let json_string = - serde_json::to_string(&self).expect("Failed to serialize"); + let json_string = serde_json::to_string(&self).expect("Failed to serialize"); // Save the JSON string to a file let path = dir.join("cache.json"); @@ -339,41 +342,6 @@ pub fn sort_funded(funded: &mut [MinFunding]) { funded.sort_by(|a, b| b.funded.partial_cmp(&a.funded).unwrap()); } -// pub fn maybe_match_deposit_to_funded( -// deposits: Vec, -// funded: Vec, -// ) -> Option<(u32, AccountAddress)> { -// // // sort descending -// // funded.sort_by(|a, b| b.funded.partial_cmp(&a.funded).unwrap()); - -// // // find the next two which are not identified, to disambiguate. - -// for f in funded { -// // dbg!(&f); -// let mut candidate_depositors = deposits.clone(); -// candidate_depositors.retain(|el| el.deposited >= f.funded); -// // dbg!(&candidate_depositors); - -// if candidate_depositors.len() == 1 { -// return Some((f.user_id, candidate_depositors.pop().unwrap().account)); -// } -// // deposits.iter().for_each(|d| { -// // // let mut candidates = self.pending.0.entry(f.user_id).or_default(); - -// // // only addresses with minimum funded could be a Maybe -// // if d.deposited >= f.funded { -// // // if we haven't previously marked this as impossible, add it as a maybe -// // if !candidates.impossible.contains(&d.account) { -// // candidates.maybe.push(d.account); -// // } -// // } else { -// // candidates.impossible.push(d.account); -// // } -// // }); -// } -// None -// } - pub fn days_in_range(start: DateTime, end: DateTime) -> Vec> { let mut days = Vec::new(); let mut current = start; From 9d1717a15c6781d2d1165dbdf9051353b322a9af Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Mon, 9 Dec 2024 20:32:57 -0500 Subject: [PATCH 06/23] try alternate calc --- src/analytics/offline_matching.rs | 86 +++++++++++++++++-------------- tests/test_analytics.rs | 29 +---------- 2 files changed, 50 insertions(+), 65 deletions(-) diff --git a/src/analytics/offline_matching.rs b/src/analytics/offline_matching.rs index bbeaf09..b600140 100644 --- a/src/analytics/offline_matching.rs +++ b/src/analytics/offline_matching.rs @@ -8,6 +8,7 @@ use std::{ use anyhow::{bail, Result}; use chrono::{DateTime, Duration, Utc}; use diem_types::account_address::AccountAddress; +use log::{info, trace}; use neo4rs::Graph; use serde::{Deserialize, Serialize}; @@ -195,29 +196,56 @@ impl Matching { Ok((*ids.first().unwrap(), *ids.get(1).unwrap())) } - pub async fn wide_search( + /// progressively scan for top_n funded exchange accounts + /// e.g. start with 5, and each time increase by 1, until reaching 50 for. + /// at each level deep, a breadth search is started + /// Thus every day in timeseries will do a shallow match of the top 5 accounts, and eliminate candidates. Deeper searches benefit from the information from the previous depth searches (exclude impossible matches) + pub async fn depth_search_by_top_n_accounts( &mut self, pool: &Graph, - top_n: u64, start: DateTime, end: DateTime, save_dir: Option, + ) -> Result<()> { + let mut top_n = 10; + while top_n < 50 { + let _ = self + .breadth_search_by_dates(pool, top_n, start, end, &save_dir) + .await; // don't error + top_n += 5; + } + Ok(()) + } + + /// breadth search, for every day in timeseries, check all the top funded + /// accounts against the actual deposited on-chain + /// don't peek into the future, only replay information at each day + pub async fn breadth_search_by_dates( + &mut self, + pool: &Graph, + top_n: u64, + start: DateTime, + end: DateTime, + save_dir: &Option, ) -> Result<()> { // expand the search // increase the search of top users by funding by expanding the window // this may retry a number of users, but with more users discovered // the search space gets smaller for d in days_in_range(start, end) { - let next_list = get_exchange_users(&pool, top_n, start, d).await?; - dbg!(&next_list.len()); + info!("day: {}", d); + let next_list = get_exchange_users(pool, top_n, start, d).await?; + + // TODO: pick top of deposits + let deposits = get_date_range_deposits(pool, 100, start, d).await?; for u in next_list { - let _r = self.search(&pool, u.user_id, start, end).await; + let _r = self.search(&u, &deposits).await; // after each loop update the file if let Some(p) = &save_dir { let _ = self.write_definite_to_file(&p.join("definite.json")); - let _ = self.write_cache_to_file(&p); + let _ = self.write_cache_to_file(p); } } } @@ -226,30 +254,17 @@ impl Matching { } pub async fn search( &mut self, - pool: &Graph, - user_a: u32, - start: DateTime, - end: DateTime, + user: &MinFunding, + deposits: &[Deposit], ) -> Result { // exit early - if let Some(a) = self.definite.get(&user_a) { + if let Some(a) = self.definite.get(&user.user_id) { return Ok(*a); } - // loop each day, comparing deposits made to that point - // and funding required for user accounts only to that date - for d in days_in_range(start, end) { - let deposits = get_date_range_deposits(pool, 100, start, d).await?; - - if let Some(funded_a) = get_one_exchange_user(pool, user_a, start, d).await?.first() { - self.eliminate_candidates(funded_a, &deposits); + self.eliminate_candidates(user, deposits); - if let Some(a) = self.definite.get(&user_a) { - return Ok(*a); - } - } - } - if let Some(a) = self.definite.get(&user_a) { + if let Some(a) = self.definite.get(&user.user_id) { return Ok(*a); } @@ -258,10 +273,7 @@ impl Matching { pub fn eliminate_candidates(&mut self, user: &MinFunding, deposits: &[Deposit]) { // let mut filtered_depositors = deposits.clone(); - let pending = self - .pending - .entry(user.user_id) - .or_insert(Candidates::default()); + let pending = self.pending.entry(user.user_id).or_default(); let mut eval: Vec = vec![]; deposits.iter().for_each(|el| { @@ -271,13 +283,11 @@ impl Matching { // is also not already discovered !self.definite.values().any(|found| found == &el.account) { - if !eval.contains(&el.account) { - eval.push(el.account) - } - } else { - if !pending.impossible.contains(&value) { - pending.impossible.push(el.account) + if !eval.contains(&el.account) { + eval.push(el.account) } + } else if !pending.impossible.contains(&el.account) { + pending.impossible.push(el.account) } }); @@ -287,12 +297,12 @@ impl Matching { } else { // we only keep addresses we see repeatedly (inner join) eval.retain(|x| pending.maybe.contains(x)); - if eval.len() > 0 { + if !eval.is_empty() { pending.maybe = eval; } } - println!("user: {}, maybe: {}", &user.user_id, &pending.maybe.len()); + info!("user: {}, maybe: {}", &user.user_id, &pending.maybe.len()); if pending.maybe.len() == 1 { // we found a definite match, update it so the next loop doesn't include it @@ -311,7 +321,7 @@ impl Matching { let mut file = File::create(&path)?; file.write_all(json_string.as_bytes())?; - println!("Cache saved: {}", path.display()); + trace!("Cache saved: {}", path.display()); Ok(()) } pub fn read_cache_from_file(dir: &Path) -> Result { @@ -332,7 +342,7 @@ impl Matching { let mut file = File::create(path)?; file.write_all(json_string.as_bytes())?; - println!("Data saved to path: {}", path.display()); + trace!("Data saved to path: {}", path.display()); Ok(()) } } diff --git a/tests/test_analytics.rs b/tests/test_analytics.rs index e1485b9..9243882 100644 --- a/tests/test_analytics.rs +++ b/tests/test_analytics.rs @@ -261,44 +261,19 @@ async fn test_offline_analytics_matching() -> Result<()> { let dir: PathBuf = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - let mut m = Matching::read_cache_from_file(&dir).unwrap_or(Matching::new()); + let mut m = Matching::read_cache_from_file(&dir).unwrap_or_default(); let _ = m - .wide_search( + .depth_search_by_top_n_accounts( &pool, - 25, parse_date("2024-01-07"), parse_date("2024-07-22"), Some(dir), ) .await; - // let initial_list = - // offline_matching::get_exchange_users(&pool, 10, start_time, parse_date("2024-01-09")) - // .await?; - // for u in initial_list { - // let r = m - // .search(&pool, u.user_id, start_time, parse_date("2024-03-13")) - // .await; - // // dbg!(&r); - // } dbg!(&m.definite); - // // expand the search - // for d in offline_matching::days_in_range(start_time, parse_date("2024-03-13")) { - // let next_list = offline_matching::get_exchange_users(&pool, 10, start_time, d) - // .await?; - - // for u in next_list { - // let r = m - // .search(&pool, u.user_id, start_time, parse_date("2024-03-13")) - // .await; - // dbg!(&r); - // } - - // dbg!(&m.definite); - // } - Ok(()) } From 8574120e006168a62eaa542c5af71a5e896f0a1f Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Tue, 10 Dec 2024 17:39:34 -0500 Subject: [PATCH 07/23] patches --- src/analytics/offline_matching.rs | 8 ++++---- tests/test_analytics.rs | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/analytics/offline_matching.rs b/src/analytics/offline_matching.rs index b600140..df4c85c 100644 --- a/src/analytics/offline_matching.rs +++ b/src/analytics/offline_matching.rs @@ -207,12 +207,12 @@ impl Matching { end: DateTime, save_dir: Option, ) -> Result<()> { - let mut top_n = 10; - while top_n < 50 { + let mut top_n = 3; + while top_n < 20 { let _ = self .breadth_search_by_dates(pool, top_n, start, end, &save_dir) .await; // don't error - top_n += 5; + top_n += 3; } Ok(()) } @@ -233,7 +233,7 @@ impl Matching { // this may retry a number of users, but with more users discovered // the search space gets smaller for d in days_in_range(start, end) { - info!("day: {}", d); + println!("day: {}", d); let next_list = get_exchange_users(pool, top_n, start, d).await?; // TODO: pick top of deposits diff --git a/tests/test_analytics.rs b/tests/test_analytics.rs index 9243882..67b6fb4 100644 --- a/tests/test_analytics.rs +++ b/tests/test_analytics.rs @@ -267,7 +267,7 @@ async fn test_offline_analytics_matching() -> Result<()> { .depth_search_by_top_n_accounts( &pool, parse_date("2024-01-07"), - parse_date("2024-07-22"), + parse_date("2024-03-13"), Some(dir), ) .await; From b559be8f4eabd3e2eaf6491837accf5e5d17f6c1 Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Wed, 11 Dec 2024 17:53:23 -0500 Subject: [PATCH 08/23] wip --- src/analytics/offline_matching.rs | 225 ++++++++++++++++++++++++++---- tests/test_analytics.rs | 60 +++++++- 2 files changed, 258 insertions(+), 27 deletions(-) diff --git a/src/analytics/offline_matching.rs b/src/analytics/offline_matching.rs index df4c85c..99c2cb6 100644 --- a/src/analytics/offline_matching.rs +++ b/src/analytics/offline_matching.rs @@ -14,8 +14,8 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Deserialize, Serialize)] pub struct Deposit { - account: AccountAddress, - deposited: f64, + pub account: AccountAddress, + pub deposited: f64, } #[derive(Debug, Clone, Deserialize, Serialize)] @@ -24,9 +24,9 @@ pub struct MinFunding { pub funded: f64, } -pub async fn get_date_range_deposits( +pub async fn get_date_range_deposits_alt( pool: &Graph, - top_n: u64, + _top_n: u64, start: DateTime, end: DateTime, ) -> Result> { @@ -34,22 +34,46 @@ pub async fn get_date_range_deposits( let q = format!( r#" - WITH "0xf57d3968d0bfd5b3120fda88f34310c70bd72033f77422f4407fbbef7c24557a" as exchange_deposit - MATCH - (u:Account)-[tx:Tx]->(onboard:Account {{address: exchange_deposit}}) - WHERE - tx.`block_datetime` > datetime("{}") - AND tx.`block_datetime` < datetime("{}") - WITH - u, - SUM(tx.V7_OlAccountTransfer_amount) AS totalTxAmount - ORDER BY totalTxAmount DESCENDING - RETURN u.address AS account, toFloat(totalTxAmount) / 1000000 AS deposited - LIMIT {} - "#, + WITH "0xf57d3968d0bfd5b3120fda88f34310c70bd72033f77422f4407fbbef7c24557a" AS olswap_deposit + + // Step 1: Get the list of all depositors + MATCH (acc:Account)-[tx:Tx]->(onboard:Account {{address: olswap_deposit}}) + WITH DISTINCT(acc) AS all, olswap_deposit + + // Step 2: Match depositors and amounts within the date range + + + + MATCH (all)-[tx2:Tx]->(onboard:Account {{address: olswap_deposit}}) + WHERE + tx2.block_datetime > datetime("{}") + AND tx2.block_datetime < datetime("{}") + + + WITH + DISTINCT (all.address) AS account, + COALESCE(SUM(tx2.V7_OlAccountTransfer_amount), 0)/1000000 AS deposit_amount + RETURN account, toFloat(deposit_amount) as deposited + ORDER BY deposit_amount DESC + + "#, + // r#" + // WITH "0xf57d3968d0bfd5b3120fda88f34310c70bd72033f77422f4407fbbef7c24557a" as exchange_deposit + // MATCH + // (u:Account)-[tx:Tx]->(onboard:Account {{address: exchange_deposit}}) + // WHERE + // tx.`block_datetime` > datetime("{}") + // AND tx.`block_datetime` < datetime("{}") + // WITH + // u, + // SUM(tx.V7_OlAccountTransfer_amount) AS totalTxAmount + // ORDER BY totalTxAmount DESCENDING + // RETURN u.address AS account, toFloat(totalTxAmount) / 1000000 AS deposited + + // "#, start.to_rfc3339(), end.to_rfc3339(), - top_n, + // top_n, ); let cypher_query = neo4rs::query(&q); @@ -70,6 +94,73 @@ pub async fn get_date_range_deposits( Ok(top_deposits) } +// pub async fn get_date_range_deposits( +// pool: &Graph, +// top_n: u64, +// start: DateTime, +// end: DateTime, +// ) -> Result> { +// let mut top_deposits = vec![]; + +// let q = format!( +// // r#" +// // WITH "0xf57d3968d0bfd5b3120fda88f34310c70bd72033f77422f4407fbbef7c24557a" AS olswap_deposit + +// // // Step 1: Get the list of all depositors +// // MATCH (depositor:Account)-[tx:Tx]->(onboard:Account {{address: olswap_deposit}}) +// // WITH COLLECT(DISTINCT depositor) AS all_depositors, olswap_deposit, tx + +// // // Step 2: Match depositors and amounts within the date range + +// // UNWIND all_depositors AS depositor + +// // OPTIONAL MATCH (depositor)-[tx2:Tx]->(onboard:Account {{address: olswap_deposit}}) +// // WHERE tx2.block_datetime >= datetime('{}') AND tx2.block_datetime <= datetime('{}') + +// // WITH +// // depositor.address AS account, +// // COALESCE(SUM(tx2.V7_OlAccountTransfer_amount), 0)/1000000 AS deposit_amount +// // RETURN account, toFloat(deposit_amount) as deposited +// // ORDER BY deposited DESC + +// // "#, +// r#" +// WITH "0xf57d3968d0bfd5b3120fda88f34310c70bd72033f77422f4407fbbef7c24557a" as exchange_deposit +// MATCH +// (u:Account)-[tx:Tx]->(onboard:Account {{address: exchange_deposit}}) +// WHERE +// tx.`block_datetime` > datetime("{}") +// AND tx.`block_datetime` < datetime("{}") +// WITH +// DISTINCT(u), +// SUM(tx.V7_OlAccountTransfer_amount) AS totalTxAmount +// ORDER BY totalTxAmount DESCENDING +// RETURN u.address AS account, toFloat(totalTxAmount) / 1000000 AS deposited + +// "#, +// start.to_rfc3339(), +// end.to_rfc3339(), +// // top_n, +// ); +// let cypher_query = neo4rs::query(&q); + +// // Execute the query +// let mut result = pool.execute(cypher_query).await?; + +// // Fetch the first row only +// while let Some(r) = result.next().await? { +// let account_str = r.get::("account").unwrap_or("unknown".to_string()); +// let deposited = r.get::("deposited").unwrap_or(0.0); +// let d = Deposit { +// account: account_str.parse().unwrap_or(AccountAddress::ZERO), +// deposited, +// }; +// top_deposits.push(d); +// // dbg!(&d); +// } +// Ok(top_deposits) +// } + pub async fn get_exchange_users( pool: &Graph, top_n: u64, @@ -108,6 +199,40 @@ pub async fn get_exchange_users( Ok(min_funding) } +pub async fn get_exchange_users_only_outflows(pool: &Graph) -> Result> { + let mut min_funding = vec![]; + + let q = format!( + r#" + MATCH (e:SwapAccount)-[]-(u:UserLedger) + WHERE u.`total_inflows` = 0 + WITH distinct(e.swap_id) AS user_id, max(u.`total_funded`) AS funded + RETURN user_id, funded + ORDER BY funded DESC + "#, + // start.to_rfc3339(), + // end.to_rfc3339(), + // top_n, + ); + let cypher_query = neo4rs::query(&q); + + // Execute the query + let mut result = pool.execute(cypher_query).await?; + + // Fetch the first row only + while let Some(r) = result.next().await? { + let user_id = r.get::("user_id").unwrap_or(0); + let funded = r.get::("funded").unwrap_or(0); + let d = MinFunding { + user_id, + funded: funded as f64, + }; + min_funding.push(d); + // dbg!(&d); + } + Ok(min_funding) +} + pub async fn get_one_exchange_user( pool: &Graph, id: u32, @@ -188,7 +313,7 @@ impl Matching { .map(|el| el.user_id) .collect(); - dbg!(&ids); + // dbg!(&ids); // let user_ledger = funded.iter().find(|el| { // // check if we have already identified it // self.definite.0.get(el.user_id).none() @@ -207,12 +332,12 @@ impl Matching { end: DateTime, save_dir: Option, ) -> Result<()> { - let mut top_n = 3; - while top_n < 20 { + let mut top_n = 5; + while top_n < 25 { let _ = self .breadth_search_by_dates(pool, top_n, start, end, &save_dir) .await; // don't error - top_n += 3; + top_n += 5; } Ok(()) } @@ -236,8 +361,23 @@ impl Matching { println!("day: {}", d); let next_list = get_exchange_users(pool, top_n, start, d).await?; - // TODO: pick top of deposits - let deposits = get_date_range_deposits(pool, 100, start, d).await?; + // // TODO: pick top of deposits + // let deposits = get_date_range_deposits(pool, 1000, start, d).await.unwrap_or_default(); + + // if next_list.len() > 5 { + // // dbg!("next_list"); + // dbg!(&next_list[..5]); + // } + // dbg!(&next_list); + + let deposits = get_date_range_deposits_alt(pool, 1000, start, d) + .await + .unwrap_or_default(); + + // if deposits.len() > 5 { + // dbg!("alt"); + // dbg!(&deposits[..5]); + // } for u in next_list { let _r = self.search(&u, &deposits).await; @@ -252,6 +392,7 @@ impl Matching { Ok(()) } + pub async fn search( &mut self, user: &MinFunding, @@ -271,12 +412,48 @@ impl Matching { bail!("could not find a candidate") } + pub fn match_exact_sellers( + &mut self, + user_list: &[MinFunding], + deposits: &[Deposit], + tolerance: f64, + ) { + user_list.iter().for_each(|user| { + let pending = self.pending.entry(user.user_id).or_default(); + + let candidates: Vec = deposits + .iter() + .filter_map(|el| { + if el.deposited > user.funded && // must always be slightly more + el.deposited < user.funded * tolerance && + !pending.impossible.contains(&el.account) && + // is also not already discovered + !self.definite.values().any(|found| found == &el.account) + { + Some(el.account) + } else { + None + } + }) + .collect(); + + pending.maybe = candidates; + + if pending.maybe.len() == 1 { + // we found a definite match, update it so the next loop doesn't include it + self.definite + .insert(user.user_id, *pending.maybe.first().unwrap()); + } + }) + } + pub fn eliminate_candidates(&mut self, user: &MinFunding, deposits: &[Deposit]) { // let mut filtered_depositors = deposits.clone(); let pending = self.pending.entry(user.user_id).or_default(); let mut eval: Vec = vec![]; deposits.iter().for_each(|el| { + dbg!(&el); if el.deposited >= user.funded && // must not already have been tagged impossible !pending.impossible.contains(&el.account) && diff --git a/tests/test_analytics.rs b/tests/test_analytics.rs index 67b6fb4..6717a1a 100644 --- a/tests/test_analytics.rs +++ b/tests/test_analytics.rs @@ -242,7 +242,7 @@ async fn test_offline_analytics() -> Result<()> { let pool = neo4j_init::get_neo4j_remote_pool(&uri, &user, &pass).await?; let start_time = parse_date("2024-01-01"); - let end_time = parse_date("2024-01-10"); + let end_time = parse_date("2024-07-10"); let _r = offline_matching::get_date_range_deposits(&pool, 20, start_time, end_time).await?; // dbg!(&r); @@ -267,13 +267,67 @@ async fn test_offline_analytics_matching() -> Result<()> { .depth_search_by_top_n_accounts( &pool, parse_date("2024-01-07"), - parse_date("2024-03-13"), + parse_date("2024-07-22"), Some(dir), ) .await; - dbg!(&m.definite); Ok(()) } + +#[tokio::test] +async fn test_easy_sellers() -> Result<()> { + libra_forensic_db::log_setup(); + + let (uri, user, pass) = neo4j_init::get_credentials_from_env()?; + let pool = neo4j_init::get_neo4j_remote_pool(&uri, &user, &pass).await?; + + let mut user_list = offline_matching::get_exchange_users_only_outflows(&pool).await?; + user_list + .sort_by(|a, b: &offline_matching::MinFunding| b.funded.partial_cmp(&a.funded).unwrap()); + // dbg!(&r[..10]); + + let deposits = offline_matching::get_date_range_deposits_alt( + &pool, + 1000, + parse_date("2024-01-07"), + parse_date("2024-07-22"), + ) + .await + .unwrap_or_default(); + + // dbg!(&deposits[..10]); + + let dir: PathBuf = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + + let mut m = Matching::read_cache_from_file(&dir).unwrap_or_default(); + + m.match_exact_sellers(&user_list, &deposits, 1.01); + + let _ = m + .depth_search_by_top_n_accounts( + &pool, + parse_date("2024-01-07"), + parse_date("2024-07-22"), + Some(dir), + ) + .await; + dbg!(&m.definite.len()); + + // let _ = m + // .depth_search_by_top_n_accounts( + // &pool, + // parse_date("2024-01-07"), + // parse_date("2024-07-22"), + // Some(dir), + // ) + // .await; + + // dbg!(&m.definite); + + Ok(()) +} + +// From 90502b3e88894ce2b6b363cdc3ee199dd42eff9a Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Thu, 12 Dec 2024 13:02:27 -0500 Subject: [PATCH 09/23] update tests --- src/analytics/offline_matching.rs | 32 +++++++--------- tests/test_analytics.rs | 64 +++++++++++++++++++++++-------- 2 files changed, 61 insertions(+), 35 deletions(-) diff --git a/src/analytics/offline_matching.rs b/src/analytics/offline_matching.rs index 99c2cb6..6148029 100644 --- a/src/analytics/offline_matching.rs +++ b/src/analytics/offline_matching.rs @@ -206,6 +206,8 @@ pub async fn get_exchange_users_only_outflows(pool: &Graph) -> Result, end: DateTime, + mut top_n: u64, save_dir: Option, ) -> Result<()> { - let mut top_n = 5; - while top_n < 25 { + let top_n_limit = 101; + while top_n < top_n_limit { let _ = self .breadth_search_by_dates(pool, top_n, start, end, &save_dir) .await; // don't error @@ -358,27 +361,13 @@ impl Matching { // this may retry a number of users, but with more users discovered // the search space gets smaller for d in days_in_range(start, end) { - println!("day: {}", d); + info!("day: {}", d); let next_list = get_exchange_users(pool, top_n, start, d).await?; - // // TODO: pick top of deposits - // let deposits = get_date_range_deposits(pool, 1000, start, d).await.unwrap_or_default(); - - // if next_list.len() > 5 { - // // dbg!("next_list"); - // dbg!(&next_list[..5]); - // } - // dbg!(&next_list); - let deposits = get_date_range_deposits_alt(pool, 1000, start, d) .await .unwrap_or_default(); - // if deposits.len() > 5 { - // dbg!("alt"); - // dbg!(&deposits[..5]); - // } - for u in next_list { let _r = self.search(&u, &deposits).await; @@ -438,13 +427,18 @@ impl Matching { .collect(); pending.maybe = candidates; + }); + + // after all users processed, try to find matches + user_list.iter().for_each(|user| { + let pending = self.pending.entry(user.user_id).or_default(); if pending.maybe.len() == 1 { // we found a definite match, update it so the next loop doesn't include it self.definite .insert(user.user_id, *pending.maybe.first().unwrap()); } - }) + }); } pub fn eliminate_candidates(&mut self, user: &MinFunding, deposits: &[Deposit]) { @@ -453,7 +447,7 @@ impl Matching { let mut eval: Vec = vec![]; deposits.iter().for_each(|el| { - dbg!(&el); + // dbg!(&el); if el.deposited >= user.funded && // must not already have been tagged impossible !pending.impossible.contains(&el.account) && diff --git a/tests/test_analytics.rs b/tests/test_analytics.rs index 6717a1a..837fb4f 100644 --- a/tests/test_analytics.rs +++ b/tests/test_analytics.rs @@ -244,9 +244,6 @@ async fn test_offline_analytics() -> Result<()> { let start_time = parse_date("2024-01-01"); let end_time = parse_date("2024-07-10"); - let _r = offline_matching::get_date_range_deposits(&pool, 20, start_time, end_time).await?; - // dbg!(&r); - let _r = offline_matching::get_exchange_users(&pool, 20, start_time, end_time).await?; Ok(()) @@ -268,6 +265,7 @@ async fn test_offline_analytics_matching() -> Result<()> { &pool, parse_date("2024-01-07"), parse_date("2024-07-22"), + 75, Some(dir), ) .await; @@ -284,6 +282,52 @@ async fn test_easy_sellers() -> Result<()> { let (uri, user, pass) = neo4j_init::get_credentials_from_env()?; let pool = neo4j_init::get_neo4j_remote_pool(&uri, &user, &pass).await?; + let mut user_list = offline_matching::get_exchange_users_only_outflows(&pool).await?; + user_list + .sort_by(|a, b: &offline_matching::MinFunding| b.funded.partial_cmp(&a.funded).unwrap()); + dbg!(&user_list.len()); + + let deposits = offline_matching::get_date_range_deposits_alt( + &pool, + 1000, + parse_date("2024-01-07"), + parse_date("2024-07-22"), + ) + .await + .unwrap_or_default(); + + let dir: PathBuf = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + + let mut m = Matching::read_cache_from_file(&dir).unwrap_or_default(); + + m.match_exact_sellers(&user_list, &deposits, 1.05); + + dbg!(&m.definite.len()); + + dbg!(&m.definite); + m.write_cache_to_file(&dir)?; + + // let _ = m + // .depth_search_by_top_n_accounts( + // &pool, + // parse_date("2024-01-07"), + // parse_date("2024-03-15"), + // 101, + // Some(dir), + // ) + // .await; + // dbg!(&m.definite.len()); + + Ok(()) +} + +#[tokio::test] +async fn test_easy_sellers_combined() -> Result<()> { + libra_forensic_db::log_setup(); + + let (uri, user, pass) = neo4j_init::get_credentials_from_env()?; + let pool = neo4j_init::get_neo4j_remote_pool(&uri, &user, &pass).await?; + let mut user_list = offline_matching::get_exchange_users_only_outflows(&pool).await?; user_list .sort_by(|a, b: &offline_matching::MinFunding| b.funded.partial_cmp(&a.funded).unwrap()); @@ -311,23 +355,11 @@ async fn test_easy_sellers() -> Result<()> { &pool, parse_date("2024-01-07"), parse_date("2024-07-22"), + 10, Some(dir), ) .await; dbg!(&m.definite.len()); - // let _ = m - // .depth_search_by_top_n_accounts( - // &pool, - // parse_date("2024-01-07"), - // parse_date("2024-07-22"), - // Some(dir), - // ) - // .await; - - // dbg!(&m.definite); - Ok(()) } - -// From 82dee72a2a4c1f197ff9f6da8423835a9a5ea177 Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Thu, 12 Dec 2024 14:18:08 -0500 Subject: [PATCH 10/23] sample cql files --- docs/cql/account_exclusive_trading.cql | 13 +++++++++++++ docs/cql/cycling_coins_between_pairs.cql | 2 ++ docs/cql/find_pump_stats.cql | 14 ++++++++++++++ docs/cql/find_shill_in_range.cql | 4 ++++ docs/cql/find_top_exchange_depositors.cql | 19 +++++++++++++++++++ docs/cql/frequent_exchange_traders.cql | 6 ++++++ docs/cql/shill_trader_pairs.cql | 15 +++++++++++++++ docs/cql/top_exchange_funding_required.cql | 6 ++++++ docs/cql/trace_owner.cql | 19 +++++++++++++++++++ 9 files changed, 98 insertions(+) create mode 100644 docs/cql/account_exclusive_trading.cql create mode 100644 docs/cql/cycling_coins_between_pairs.cql create mode 100644 docs/cql/find_pump_stats.cql create mode 100644 docs/cql/find_shill_in_range.cql create mode 100644 docs/cql/find_top_exchange_depositors.cql create mode 100644 docs/cql/frequent_exchange_traders.cql create mode 100644 docs/cql/shill_trader_pairs.cql create mode 100644 docs/cql/top_exchange_funding_required.cql create mode 100644 docs/cql/trace_owner.cql diff --git a/docs/cql/account_exclusive_trading.cql b/docs/cql/account_exclusive_trading.cql new file mode 100644 index 0000000..fe5b386 --- /dev/null +++ b/docs/cql/account_exclusive_trading.cql @@ -0,0 +1,13 @@ +MATCH (a)-[r:Swap]-(b) +WITH a, b, count(r) AS ab_count +WHERE ab_count >= 5 +MATCH (a)-[r_all:Swap]-() +WITH a, b, ab_count, count(r_all) AS total_a_count +RETURN + a, + b, + ab_count, + total_a_count, + (toFloat(ab_count) / total_a_count) * 100 AS exclusivity_percentage +ORDER BY ab_count DESC, exclusivity_percentage DESC +LIMIT 100 diff --git a/docs/cql/cycling_coins_between_pairs.cql b/docs/cql/cycling_coins_between_pairs.cql new file mode 100644 index 0000000..4d44d5a --- /dev/null +++ b/docs/cql/cycling_coins_between_pairs.cql @@ -0,0 +1,2 @@ +MATCH p=SHORTEST 1 (a:SwapAccount)-[r:Swap]->(b:SwapAccount)-[r2:Swap]->(a:SwapAccount) +RETURN p diff --git a/docs/cql/find_pump_stats.cql b/docs/cql/find_pump_stats.cql new file mode 100644 index 0000000..d1938e2 --- /dev/null +++ b/docs/cql/find_pump_stats.cql @@ -0,0 +1,14 @@ +MATCH (a)-[r:Swap]-() + +WITH a, + count(r) AS total_trades, + sum(CASE WHEN r.shill_bid = true THEN 1 ELSE 0 END) AS shill_bid_count, + sum(CASE WHEN r.price_vs_rms_hour > 1.0 THEN 1 ELSE 0 END) AS pump_count +WHERE total_trades > 100 +RETURN + a, + total_trades, + shill_bid_count, + (toFloat(shill_bid_count) / total_trades) AS shill_bid_percentage, + (toFloat(pump_count) / total_trades) AS pump_percentage +ORDER BY shill_bid_percentage DESC diff --git a/docs/cql/find_shill_in_range.cql b/docs/cql/find_shill_in_range.cql new file mode 100644 index 0000000..67e3a2b --- /dev/null +++ b/docs/cql/find_shill_in_range.cql @@ -0,0 +1,4 @@ +MATCH p=()-[r:Swap {`shill_bid`: TRUE }]->() +WHERE date(r.filled_at) > date("2024-02-10") +AND date(r.filled_at) < date("2024-03-02") +RETURN p diff --git a/docs/cql/find_top_exchange_depositors.cql b/docs/cql/find_top_exchange_depositors.cql new file mode 100644 index 0000000..2eb9654 --- /dev/null +++ b/docs/cql/find_top_exchange_depositors.cql @@ -0,0 +1,19 @@ +WITH "0xf57d3968d0bfd5b3120fda88f34310c70bd72033f77422f4407fbbef7c24557a" AS olswap_deposit + +// Step 1: Get the list of all depositors +MATCH (depositor:Account)-[tx:Tx]->(onboard:Account {address: olswap_deposit}) +WITH COLLECT(DISTINCT depositor) AS all_depositors, olswap_deposit, tx + +// Step 2: Match depositors and amounts within the date range + +UNWIND all_depositors AS depositor + +OPTIONAL MATCH (depositor)-[tx2:Tx]->(onboard:Account {address: olswap_deposit}) +WHERE tx2.block_datetime >= datetime('2024-01-07') AND tx2.block_datetime <= datetime('2024-01-09') + + +RETURN + depositor.address AS depositor_address, + COALESCE(SUM(tx2.V7_OlAccountTransfer_amount), 0) AS deposit_amount, + count(tx2) +ORDER BY deposit_amount DESC diff --git a/docs/cql/frequent_exchange_traders.cql b/docs/cql/frequent_exchange_traders.cql new file mode 100644 index 0000000..778cd2f --- /dev/null +++ b/docs/cql/frequent_exchange_traders.cql @@ -0,0 +1,6 @@ +MATCH (from:SwapAccount)-[r:Swap]-(to:SwapAccount) +WITH from, to, COUNT(r) AS transaction_count +ORDER BY transaction_count DESC +LIMIT 500 +MATCH p=(from)-[r:Swap]-(to) +RETURN p, transaction_count diff --git a/docs/cql/shill_trader_pairs.cql b/docs/cql/shill_trader_pairs.cql new file mode 100644 index 0000000..d1fe5fe --- /dev/null +++ b/docs/cql/shill_trader_pairs.cql @@ -0,0 +1,15 @@ +MATCH (a)-[r:Swap]-(b) +WITH a, b, + count(r) AS total_count, + sum(CASE WHEN r.shill_bid = true THEN 1 ELSE 0 END) AS shill_bid_count, + sum(CASE WHEN r.price_vs_rms_hour > 1 THEN 1 ELSE 0 END) AS price_vs_rms24h_count +WHERE total_count >= 5 +ORDER BY total_count DESC +RETURN + a, + b, + total_count, + shill_bid_count, + (toFloat(shill_bid_count) / total_count) * 100 AS shill_bid_percentage, + price_vs_rms24h_count, + (toFloat(price_vs_rms24h_count) / total_count) * 100 AS price_vs_rms24h_percentage diff --git a/docs/cql/top_exchange_funding_required.cql b/docs/cql/top_exchange_funding_required.cql new file mode 100644 index 0000000..e115b46 --- /dev/null +++ b/docs/cql/top_exchange_funding_required.cql @@ -0,0 +1,6 @@ +MATCH p=(e:SwapAccount)-[d:DailyLedger]-(ul:UserLedger) +WHERE d.date < datetime("2024-01-16") +WITH e.swap_id AS id, max(ul.`total_funded`) as funded + +RETURN id, funded +ORDER BY funded DESCENDING diff --git a/docs/cql/trace_owner.cql b/docs/cql/trace_owner.cql new file mode 100644 index 0000000..bf43486 --- /dev/null +++ b/docs/cql/trace_owner.cql @@ -0,0 +1,19 @@ +WITH [ +// olswap onboarding +'0xf57d3968d0bfd5b3120fda88f34310c70bd72033f77422f4407fbbef7c24557a', +// ignore superspreader +'0x85b68bdeb3bd8ca47f1cf90dfb332404290afda582c586cb645b3b045b54825b' +] AS exclude + +MATCH p = SHORTEST 1 (o:Owner {alias: 'name'})-[r *..3]->(:SwapAccount) +WHERE NONE( + r IN relationships(p) + WHERE r.relation IS NOT NULL + AND NOT r.relation IN ["Vouch"] + ) + AND NONE( + n IN nodes(p) + WHERE n.address IS NOT NULL + AND n.address IN exclude + ) +RETURN p From 6b331013ce99b6a983fc03cf60b5ebdd08c0c38e Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Thu, 12 Dec 2024 14:46:59 -0500 Subject: [PATCH 11/23] cli cleanup --- src/analytics/offline_matching.rs | 41 ++++++++++++++++---- src/warehouse_cli.rs | 62 ++++++++++++++++++++++++++++++- tests/test_analytics.rs | 59 +---------------------------- 3 files changed, 96 insertions(+), 66 deletions(-) diff --git a/src/analytics/offline_matching.rs b/src/analytics/offline_matching.rs index 6148029..36c80d8 100644 --- a/src/analytics/offline_matching.rs +++ b/src/analytics/offline_matching.rs @@ -8,7 +8,7 @@ use std::{ use anyhow::{bail, Result}; use chrono::{DateTime, Duration, Utc}; use diem_types::account_address::AccountAddress; -use log::{info, trace}; +use log::{info, trace, warn}; use neo4rs::Graph; use serde::{Deserialize, Serialize}; @@ -202,8 +202,7 @@ pub async fn get_exchange_users( pub async fn get_exchange_users_only_outflows(pool: &Graph) -> Result> { let mut min_funding = vec![]; - let q = format!( - r#" + let q = r#" MATCH (e:SwapAccount)-[]-(u:UserLedger) WHERE u.`total_inflows` = 0 AND u.total_outflows = u.total_funded // total outflows are only what was funded @@ -211,11 +210,8 @@ pub async fn get_exchange_users_only_outflows(pool: &Graph) -> Result, + end: DateTime, + tolerance: f64, + ) -> Result<()> { + let mut user_list = get_exchange_users_only_outflows(pool).await?; + user_list.sort_by(|a, b| b.funded.partial_cmp(&a.funded).unwrap()); + dbg!(&user_list.len()); + + let deposits = get_date_range_deposits_alt(pool, 1000, start, end) + .await + .unwrap_or_default(); + + self.match_exact_sellers(&user_list, &deposits, tolerance); + Ok(()) + } pub fn match_exact_sellers( &mut self, user_list: &[MinFunding], @@ -489,12 +503,23 @@ impl Matching { // Save the JSON string to a file let path = dir.join("cache.json"); + let mut file = File::create(&path)?; + file.write_all(json_string.as_bytes())?; trace!("Cache saved: {}", path.display()); Ok(()) } + pub fn clear_cache(dir: &Path) -> Result<()> { + warn!("clearing local cache"); + // Save the JSON string to a file + let path = dir.join("cache.json"); + fs::remove_file(&path)?; + + info!("Cache cleared: {}", path.display()); + Ok(()) + } pub fn read_cache_from_file(dir: &Path) -> Result { // Read the file content into a string let file_path = dir.join("cache.json"); diff --git a/src/warehouse_cli.rs b/src/warehouse_cli.rs index 584e151..ea6765d 100644 --- a/src/warehouse_cli.rs +++ b/src/warehouse_cli.rs @@ -6,7 +6,8 @@ use serde_json::json; use std::path::PathBuf; use crate::{ - analytics, + analytics::{self, offline_matching::Matching}, + date_util, enrich_exchange_onboarding::{self, ExchangeOnRamp}, enrich_whitepages::{self, Whitepages}, json_rescue_v5_load, @@ -111,6 +112,25 @@ pub enum AnalyticsSub { /// commits the analytics to the db persist: bool, }, + + TradesMatching { + #[clap(long)] + start_day: String, + #[clap(long)] + end_day: String, + + #[clap(long)] + /// slow search producing likely candidates at each day + /// requires top n # for length of initial list to scan + replay_balances: Option, + #[clap(long)] + + /// get perfect deposit matches on dump cases, requires tolerance value of 1.0 or more + match_simple_dumps: Option, + #[clap(long)] + /// clear cache for local matches + clear_cache: bool, + }, } impl WarehouseCli { @@ -207,6 +227,46 @@ impl WarehouseCli { .await?; println!("{:#}", json!(&results).to_string()); } + AnalyticsSub::TradesMatching { + replay_balances, + match_simple_dumps, + clear_cache, + start_day, + end_day, + } => { + let pool = try_db_connection_pool(self).await?; + + let dir: PathBuf = PathBuf::from("."); + + if *clear_cache { + Matching::clear_cache(&dir)?; + } + + let mut m = Matching::read_cache_from_file(&dir).unwrap_or_default(); + if let Some(top_n) = replay_balances { + let _ = m + .depth_search_by_top_n_accounts( + &pool, + date_util::parse_date(start_day), + date_util::parse_date(end_day), + *top_n, + Some(dir), + ) + .await; + } + + if let Some(tolerance) = match_simple_dumps { + m.search_dumps( + &pool, + date_util::parse_date(start_day), + date_util::parse_date(end_day), + *tolerance, + ) + .await?; + } + + println!("{:?}", &m.definite); + } }, }; Ok(()) diff --git a/tests/test_analytics.rs b/tests/test_analytics.rs index 837fb4f..1875bd6 100644 --- a/tests/test_analytics.rs +++ b/tests/test_analytics.rs @@ -235,20 +235,7 @@ async fn test_submit_exchange_ledger_all() -> Result<()> { Ok(()) } -#[tokio::test] -async fn test_offline_analytics() -> Result<()> { - libra_forensic_db::log_setup(); - let (uri, user, pass) = neo4j_init::get_credentials_from_env()?; - let pool = neo4j_init::get_neo4j_remote_pool(&uri, &user, &pass).await?; - - let start_time = parse_date("2024-01-01"); - let end_time = parse_date("2024-07-10"); - - let _r = offline_matching::get_exchange_users(&pool, 20, start_time, end_time).await?; - - Ok(()) -} - +#[ignore] // requires a fully loaded forensic instance db #[tokio::test] async fn test_offline_analytics_matching() -> Result<()> { libra_forensic_db::log_setup(); @@ -275,6 +262,7 @@ async fn test_offline_analytics_matching() -> Result<()> { Ok(()) } +#[ignore] // requires a fully loaded forensic instance db #[tokio::test] async fn test_easy_sellers() -> Result<()> { libra_forensic_db::log_setup(); @@ -320,46 +308,3 @@ async fn test_easy_sellers() -> Result<()> { Ok(()) } - -#[tokio::test] -async fn test_easy_sellers_combined() -> Result<()> { - libra_forensic_db::log_setup(); - - let (uri, user, pass) = neo4j_init::get_credentials_from_env()?; - let pool = neo4j_init::get_neo4j_remote_pool(&uri, &user, &pass).await?; - - let mut user_list = offline_matching::get_exchange_users_only_outflows(&pool).await?; - user_list - .sort_by(|a, b: &offline_matching::MinFunding| b.funded.partial_cmp(&a.funded).unwrap()); - // dbg!(&r[..10]); - - let deposits = offline_matching::get_date_range_deposits_alt( - &pool, - 1000, - parse_date("2024-01-07"), - parse_date("2024-07-22"), - ) - .await - .unwrap_or_default(); - - // dbg!(&deposits[..10]); - - let dir: PathBuf = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - - let mut m = Matching::read_cache_from_file(&dir).unwrap_or_default(); - - m.match_exact_sellers(&user_list, &deposits, 1.01); - - let _ = m - .depth_search_by_top_n_accounts( - &pool, - parse_date("2024-01-07"), - parse_date("2024-07-22"), - 10, - Some(dir), - ) - .await; - dbg!(&m.definite.len()); - - Ok(()) -} From 628e44af520c3dbd37177db97df5f74e02bd6441 Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Thu, 12 Dec 2024 15:01:08 -0500 Subject: [PATCH 12/23] save cache --- src/warehouse_cli.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/warehouse_cli.rs b/src/warehouse_cli.rs index ea6765d..14cce96 100644 --- a/src/warehouse_cli.rs +++ b/src/warehouse_cli.rs @@ -115,8 +115,10 @@ pub enum AnalyticsSub { TradesMatching { #[clap(long)] + /// start day (exclusive) of trades YYYY-MM-DD start_day: String, #[clap(long)] + /// end day (exclusive) of trades YYYY-MM-DD end_day: String, #[clap(long)] @@ -234,14 +236,17 @@ impl WarehouseCli { start_day, end_day, } => { - let pool = try_db_connection_pool(self).await?; - let dir: PathBuf = PathBuf::from("."); if *clear_cache { Matching::clear_cache(&dir)?; } + if replay_balances.is_none() && match_simple_dumps.is_none() { + bail!("nothing to do. Must enter --replay-balance or --match-simple-dumps") + } + let pool = try_db_connection_pool(self).await?; + let mut m = Matching::read_cache_from_file(&dir).unwrap_or_default(); if let Some(top_n) = replay_balances { let _ = m @@ -250,7 +255,7 @@ impl WarehouseCli { date_util::parse_date(start_day), date_util::parse_date(end_day), *top_n, - Some(dir), + Some(dir.clone()), ) .await; } @@ -265,7 +270,10 @@ impl WarehouseCli { .await?; } - println!("{:?}", &m.definite); + m.write_cache_to_file(&dir)?; + m.write_definite_to_file(&dir)?; + + println!("{:#}", json!(&m.definite)); } }, }; From 3b3819319453cf5abd5d96d2b024c7d1ae6e1af6 Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Thu, 12 Dec 2024 15:13:23 -0500 Subject: [PATCH 13/23] docs --- docs/local_testing.md | 14 ++++++++++++++ src/analytics/offline_matching.rs | 6 +++--- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/docs/local_testing.md b/docs/local_testing.md index 11cbed5..503a721 100644 --- a/docs/local_testing.md +++ b/docs/local_testing.md @@ -28,3 +28,17 @@ RETURN COUNT(DISTINCT(r)) ``` Should return `25450` + +# Testing offline analytics +NOTE: you must have a fully populated DB to run these queries + +Replay the funding requirement on an exchange and match to deposits. This is slow. +``` +cargo r analytics trades-matching --start-day 2024-01-07 --end-day 2024-01-15 --replay-balances 10 + +``` + +Match simple dumps +``` +cargo r analytics trades-matching --start-day 2024-01-07 --end-day 2024-01-15 --match-simple-dumps 1.01 +``` diff --git a/src/analytics/offline_matching.rs b/src/analytics/offline_matching.rs index 36c80d8..0ee44f7 100644 --- a/src/analytics/offline_matching.rs +++ b/src/analytics/offline_matching.rs @@ -369,7 +369,7 @@ impl Matching { // after each loop update the file if let Some(p) = &save_dir { - let _ = self.write_definite_to_file(&p.join("definite.json")); + let _ = self.write_definite_to_file(p); let _ = self.write_cache_to_file(p); } } @@ -406,7 +406,6 @@ impl Matching { ) -> Result<()> { let mut user_list = get_exchange_users_only_outflows(pool).await?; user_list.sort_by(|a, b| b.funded.partial_cmp(&a.funded).unwrap()); - dbg!(&user_list.len()); let deposits = get_date_range_deposits_alt(pool, 1000, start, end) .await @@ -529,8 +528,9 @@ impl Matching { Ok(serde_json::from_str(&json_string)?) } - pub fn write_definite_to_file(&self, path: &Path) -> Result<()> { + pub fn write_definite_to_file(&self, dir: &Path) -> Result<()> { // Serialize the BTreeMap to a JSON string + let path = &dir.join("definite.json"); let json_string = serde_json::to_string_pretty(&self.definite).expect("Failed to serialize"); From 5418bfdfec570a4cbbebbd36888998303501576a Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Thu, 12 Dec 2024 20:10:57 -0500 Subject: [PATCH 14/23] index --- src/neo4j_init.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/neo4j_init.rs b/src/neo4j_init.rs index f2eb51d..31620bb 100644 --- a/src/neo4j_init.rs +++ b/src/neo4j_init.rs @@ -29,6 +29,9 @@ pub static INDEX_TX_HASH: &str = pub static INDEX_TX_FUNCTION: &str = "CREATE INDEX tx_function IF NOT EXISTS FOR ()-[r:Tx]-() ON (r.function)"; +pub static INDEX_TX_RELATION: &str = + "CREATE INDEX tx_relation IF NOT EXISTS FOR ()-[r:Tx]-() ON (r.relation)"; + pub static INDEX_SWAP_ID: &str = "CREATE INDEX swap_account_id IF NOT EXISTS FOR (n:SwapAccount) ON (n.swap_id)"; @@ -74,6 +77,7 @@ pub async fn maybe_create_indexes(graph: &Graph) -> Result<()> { INDEX_TX_TIMESTAMP, INDEX_TX_HASH, INDEX_TX_FUNCTION, + INDEX_TX_RELATION, INDEX_SWAP_ID, INDEX_EXCHANGE_LEDGER, INDEX_EXCHANGE_LINK_LEDGER, From 7f97305a5487cb7f45bc57b580775cb442c3cf31 Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Fri, 13 Dec 2024 16:37:17 -0500 Subject: [PATCH 15/23] refactor ordertyp --- src/analytics/enrich_account_funding.rs | 17 +- src/analytics/enrich_rms.rs | 228 ++++++++++++++++-------- src/schema_exchange_orders.rs | 41 ++++- 3 files changed, 198 insertions(+), 88 deletions(-) diff --git a/src/analytics/enrich_account_funding.rs b/src/analytics/enrich_account_funding.rs index 7a9d095..154fa38 100644 --- a/src/analytics/enrich_account_funding.rs +++ b/src/analytics/enrich_account_funding.rs @@ -9,7 +9,7 @@ use std::{ io::Read, }; -use crate::schema_exchange_orders::ExchangeOrder; +use crate::schema_exchange_orders::{ExchangeOrder, OrderType}; #[cfg(test)] use crate::date_util::parse_date; @@ -46,8 +46,8 @@ impl BalanceTracker { pub fn process_transaction_alt(&mut self, order: &ExchangeOrder) { let date = order.filled_at; - match order.order_type.as_str() { - "Buy" => { + match order.order_type { + OrderType::Buy => { // user offered to buy coins (Buyer) // he sends USD // accepter sends coins. (Seller) @@ -55,7 +55,7 @@ impl BalanceTracker { self.update_balance_and_flows_alt(order.user, date, order.amount, true); self.update_balance_and_flows_alt(order.accepter, date, order.amount, false); } - "Sell" => { + OrderType::Sell => { // user offered to sell coins (Seller) // he sends Coins // accepter sends USD. (Buyer) @@ -256,7 +256,7 @@ fn test_replay_transactions() { // user_1 sends USD, user_2 moves 10 coins. ExchangeOrder { user: 1, - order_type: "Buy".to_string(), + order_type: OrderType::Buy, amount: 10.0, price: 2.0, created_at: parse_date("2024-03-01"), @@ -267,12 +267,13 @@ fn test_replay_transactions() { price_vs_rms_hour: 0.0, price_vs_rms_24hour: 0.0, shill_bid: None, + ..Default::default() }, ExchangeOrder { // user 2 creates an offer to SELL, user 3 accepts. // user 3 sends USD user 2 moves amount of coins. user: 2, - order_type: "Sell".to_string(), + order_type: OrderType::Sell, amount: 5.0, price: 3.0, created_at: parse_date("2024-03-05"), @@ -283,12 +284,13 @@ fn test_replay_transactions() { price_vs_rms_hour: 0.0, price_vs_rms_24hour: 0.0, shill_bid: None, + ..Default::default() }, // user 3 creates an offer to BUY, user 1 accepts. // user 3 sends USD user 1 moves amount of coins. ExchangeOrder { user: 3, - order_type: "Buy".to_string(), + order_type: OrderType::Buy, amount: 15.0, price: 1.5, created_at: parse_date("2024-03-10"), @@ -299,6 +301,7 @@ fn test_replay_transactions() { price_vs_rms_hour: 0.0, price_vs_rms_24hour: 0.0, shill_bid: None, + ..Default::default() }, ]; diff --git a/src/analytics/enrich_rms.rs b/src/analytics/enrich_rms.rs index f24b9b6..99d71f1 100644 --- a/src/analytics/enrich_rms.rs +++ b/src/analytics/enrich_rms.rs @@ -1,7 +1,7 @@ use chrono::Duration; use std::collections::VecDeque; -use crate::schema_exchange_orders::ExchangeOrder; +use crate::schema_exchange_orders::{CompetingOffers, ExchangeOrder}; fn calculate_rms(data: &[f64]) -> f64 { let (sum, count) = data @@ -80,48 +80,126 @@ pub fn include_rms_stats(swaps: &mut [ExchangeOrder]) { } } +fn get_competing_offers( + current_order: &ExchangeOrder, + all_offers: &[ExchangeOrder], +) -> CompetingOffers { + // for o in all_offers { + // // is the offer + // if o.filled_at > current_order.filled_at && + // o.created_at <= current_order.filled_at { + // competing.total_offers += 1; + + // if o.amount <= current_order.amount { + // competing. + // } + + // o.price <= current_swap.price + + // // && + // // o.amount <= current_swap.amount), + + // } + // } + // // Filter for open trades + // let open_orders = swaps + // .iter_mut() + // .filter(|&other_swap| { + // other_swap.filled_at > current_swap.filled_at + // && other_swap.created_at <= current_swap.filled_at + // }) + // .collect::>(); + + // competing.total_open_offers = open_orders.len() as u64; + // competing.total_buy_offers = open_orders.iter().fold(0, |acc, el| { + // if &el.order_type == "Buy" { + // acc + 1 + // } else { + // acc + // } + // }); + // competing.total_sell_offers = open_orders.iter().fold(0, |acc, el| { + // if &el.order_type == "Sell" { + // acc + 1 + // } else { + // acc + // } + // }); + CompetingOffers::default() +} +pub fn process_shill_alt(swaps: &mut [ExchangeOrder]) { + swaps.sort_by_key(|swap| swap.filled_at); // Sort by filled_at + + // for current_swap in swaps.iter_mut() { + // // competing.total_sell_offers = open_orders.iter().fold(0, |acc, el| { &el.order_type == "Sell"}); + + // // Determine if the current swap took the best price + // let is_shill_bid = match current_swap.order_type { + // // Signs of shill trades. + // // For those offering to SELL coins, as the tx.user (offerer) + // // I should offer to sell near the current clearing price. + // // If I'm making shill bids, I'm creating trades above the current clearing price. An honest actor wouldn't expect those to get filled immediately. + // // If an accepter is buying coins at a higher price than other orders which could be filled, then they are likely colluding to increase the price. + // OrderType::Sell => open_orders.iter().any(|other_swap| + // // if there are cheaper SELL offers, + // // for smaller sizes, then the rational honest actor + // // will pick one of those. + // // So we find the list of open orders which would be + // // better than the one taken how. + // // if there are ANY available, then this SELL order was + // // filled dishonestly. + // other_swap.price <= current_swap.price && + // other_swap.amount <= current_swap.amount), + // _ => false, + // }; + + // // Update the swap with the best price flag + // current_swap.shill_bid = Some(is_shill_bid); + // } +} + pub fn process_sell_order_shill(swaps: &mut [ExchangeOrder]) { swaps.sort_by_key(|swap| swap.filled_at); // Sort by filled_at - for i in 0..swaps.len() { - let current_swap = &swaps[i]; - // TODO: move this to a filter on the enclosing scope - if current_swap.shill_bid.is_some() { - continue; - }; + // for i in 0..swaps.len() { + // let current_swap = &swaps[i]; + // // TODO: move this to a filter on the enclosing scope + // if current_swap.shill_bid.is_some() { + // continue; + // }; - // Filter for open trades - let open_orders = swaps - .iter() - .filter(|&other_swap| { - other_swap.filled_at > current_swap.filled_at - && other_swap.created_at <= current_swap.filled_at - }) - .collect::>(); - - // Determine if the current swap took the best price - let is_shill_bid = match current_swap.order_type.as_str() { - // Signs of shill trades. - // For those offering to SELL coins, as the tx.user (offerer) - // I should offer to sell near the current clearing price. - // If I'm making shill bids, I'm creating trades above the current clearing price. An honest actor wouldn't expect those to get filled immediately. - // If an accepter is buying coins at a higher price than other orders which could be filled, then they are likely colluding to increase the price. - "Sell" => open_orders.iter().any(|other_swap| - // if there are cheaper SELL offers, - // for smaller sizes, then the rational honest actor - // will pick one of those. - // So we find the list of open orders which would be - // better than the one taken how. - // if there are ANY available, then this SELL order was - // filled dishonestly. - other_swap.price <= current_swap.price && - other_swap.amount <= current_swap.amount), - _ => false, - }; + // // Filter for open trades + // let open_orders = swaps + // .iter() + // .filter(|&other_swap| { + // other_swap.filled_at > current_swap.filled_at + // && other_swap.created_at <= current_swap.filled_at + // }) + // .collect::>(); - // Update the swap with the best price flag - swaps[i].shill_bid = Some(is_shill_bid); - } + // // Determine if the current swap took the best price + // let is_shill_bid = match current_swap.order_type.as_str() { + // // Signs of shill trades. + // // For those offering to SELL coins, as the tx.user (offerer) + // // I should offer to sell near the current clearing price. + // // If I'm making shill bids, I'm creating trades above the current clearing price. An honest actor wouldn't expect those to get filled immediately. + // // If an accepter is buying coins at a higher price than other orders which could be filled, then they are likely colluding to increase the price. + // "Sell" => open_orders.iter().any(|other_swap| + // // if there are cheaper SELL offers, + // // for smaller sizes, then the rational honest actor + // // will pick one of those. + // // So we find the list of open orders which would be + // // better than the one taken how. + // // if there are ANY available, then this SELL order was + // // filled dishonestly. + // other_swap.price <= current_swap.price && + // other_swap.amount <= current_swap.amount), + // _ => false, + // }; + + // // Update the swap with the best price flag + // swaps[i].shill_bid = Some(is_shill_bid); + // } } pub fn process_buy_order_shill(swaps: &mut [ExchangeOrder]) { @@ -136,37 +214,37 @@ pub fn process_buy_order_shill(swaps: &mut [ExchangeOrder]) { continue; }; - // Filter for open trades - let open_orders = swaps - .iter() - .filter(|&other_swap| { - other_swap.filled_at > current_swap.created_at - && other_swap.created_at <= current_swap.created_at - }) - .collect::>(); - - // Determine if the current swap took the best price - let is_shill_bid = match current_swap.order_type.as_str() { - // Signs of shill trades. - // For those offering to BUY coins, as the tx.user (offerer) - // An honest and rational actor would not create a buy order - // higher than other SELL offers which have not been filled. - // The shill bidder who is colluding will create a BUY order at a higher price than other SELL orders which currently exist. - "Buy" => open_orders.iter().any(|other_swap| { - if other_swap.order_type == *"Sell" { - // this is not a rational trade if there are - // SELL offers of the same amount (or smaller) - // at a price equal or lower. - return other_swap.price <= current_swap.price - && other_swap.amount <= current_swap.amount; - } - false - }), - _ => false, - }; + // // Filter for open trades + // let open_orders = swaps + // .iter() + // .filter(|&other_swap| { + // other_swap.filled_at > current_swap.created_at + // && other_swap.created_at <= current_swap.created_at + // }) + // .collect::>(); + + // // Determine if the current swap took the best price + // let is_shill_bid = match current_swap.order_type.as_str() { + // // Signs of shill trades. + // // For those offering to BUY coins, as the tx.user (offerer) + // // An honest and rational actor would not create a buy order + // // higher than other SELL offers which have not been filled. + // // The shill bidder who is colluding will create a BUY order at a higher price than other SELL orders which currently exist. + // "Buy" => open_orders.iter().any(|other_swap| { + // if other_swap.order_type == *"Sell" { + // // this is not a rational trade if there are + // // SELL offers of the same amount (or smaller) + // // at a price equal or lower. + // return other_swap.price <= current_swap.price + // && other_swap.amount <= current_swap.amount; + // } + // false + // }), + // _ => false, + // }; // Update the swap with the best price flag - swaps[i].shill_bid = Some(is_shill_bid); + swaps[i].shill_bid = Some(false); } } @@ -186,12 +264,13 @@ fn test_rms_pipeline() { .unwrap() .with_timezone(&Utc), price: 100.0, - order_type: "Buy".into(), + order_type: OrderType::Buy, rms_hour: 0.0, rms_24hour: 0.0, price_vs_rms_hour: 0.0, price_vs_rms_24hour: 0.0, shill_bid: None, + ..Default::default() }, // less than 12 hours later next trade 5/6/2024 8AM ExchangeOrder { @@ -205,12 +284,13 @@ fn test_rms_pipeline() { .unwrap() .with_timezone(&Utc), price: 4.0, - order_type: "Buy".into(), + order_type: OrderType::Buy, rms_hour: 0.0, rms_24hour: 0.0, price_vs_rms_hour: 0.0, price_vs_rms_24hour: 0.0, shill_bid: None, + ..Default::default() }, // less than one hour later ExchangeOrder { @@ -224,12 +304,13 @@ fn test_rms_pipeline() { .unwrap() .with_timezone(&Utc), price: 4.0, - order_type: "Buy".into(), + order_type: OrderType::Buy, rms_hour: 0.0, rms_24hour: 0.0, price_vs_rms_hour: 0.0, price_vs_rms_24hour: 0.0, shill_bid: None, + ..Default::default() }, // same time as previous but different traders ExchangeOrder { @@ -243,12 +324,7 @@ fn test_rms_pipeline() { .unwrap() .with_timezone(&Utc), price: 32.0, - order_type: "Sell".into(), - rms_hour: 0.0, - rms_24hour: 0.0, - price_vs_rms_hour: 0.0, - price_vs_rms_24hour: 0.0, - shill_bid: None, + ..Default::default() }, ]; diff --git a/src/schema_exchange_orders.rs b/src/schema_exchange_orders.rs index 47868dc..79650cd 100644 --- a/src/schema_exchange_orders.rs +++ b/src/schema_exchange_orders.rs @@ -1,14 +1,33 @@ +use std::fmt; + use anyhow::Result; use chrono::{DateTime, Utc}; -use serde::{Deserialize, Deserializer}; +use serde::{Deserialize, Deserializer, Serialize}; + +#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)] +pub enum OrderType { + Buy, + #[default] + Sell, +} + +impl fmt::Display for OrderType { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match &self { + OrderType::Buy => write!(f, "Buy"), + OrderType::Sell => write!(f, "Sell"), + } + } +} -#[derive(Clone, Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] #[allow(dead_code)] + pub struct ExchangeOrder { pub user: u32, #[serde(rename = "orderType")] - pub order_type: String, + pub order_type: OrderType, #[serde(deserialize_with = "deserialize_amount")] pub amount: f64, #[serde(deserialize_with = "deserialize_amount")] @@ -26,13 +45,24 @@ pub struct ExchangeOrder { pub price_vs_rms_24hour: f64, #[serde(skip_deserializing)] pub shill_bid: Option, // New field to indicate if it took the best price + #[serde(skip_deserializing)] + pub competing_offers: Option, // New field to indicate if it took the best price +} +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +pub struct CompetingOffers { + pub offer_type: OrderType, + pub total_open_offers: u64, + pub offers_same_type: u64, + pub offers_within_amount: u64, + pub offers_within_amount_lower_price: u64, + pub offers_within_amount_higher_price: u64, } impl Default for ExchangeOrder { fn default() -> Self { Self { user: 0, - order_type: "Sell".to_string(), + order_type: OrderType::Sell, amount: 1.0, price: 1.0, created_at: DateTime::::from_timestamp_nanos(0), @@ -43,6 +73,7 @@ impl Default for ExchangeOrder { price_vs_rms_hour: 0.0, price_vs_rms_24hour: 0.0, shill_bid: None, + competing_offers: None, } } } @@ -147,7 +178,7 @@ fn test_deserialize_orders() { // Check that the result matches the expected values assert_eq!(orders.len(), 4); assert_eq!(orders[0].user, 1); - assert_eq!(orders[0].order_type, "Sell"); + assert_eq!(orders[0].order_type, OrderType::Sell); assert_eq!(orders[0].amount, 40000.000); assert_eq!(orders[0].accepter, 3768); } From c6bfccf47916eb4d6919656a20fecdbb21598689 Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Fri, 13 Dec 2024 17:15:51 -0500 Subject: [PATCH 16/23] shill refactor --- src/analytics/enrich_account_funding.rs | 6 - src/analytics/enrich_rms.rs | 315 ++++++++++++------------ src/load_exchange_orders.rs | 3 +- src/schema_exchange_orders.rs | 20 +- tests/test_enrich_exchange.rs | 16 +- 5 files changed, 170 insertions(+), 190 deletions(-) diff --git a/src/analytics/enrich_account_funding.rs b/src/analytics/enrich_account_funding.rs index 154fa38..5c00324 100644 --- a/src/analytics/enrich_account_funding.rs +++ b/src/analytics/enrich_account_funding.rs @@ -62,9 +62,6 @@ impl BalanceTracker { self.update_balance_and_flows_alt(order.accepter, date, order.amount, true); self.update_balance_and_flows_alt(order.user, date, order.amount, false); } - _ => { - println!("ERROR: not a valid Buy/Sell order, {:?}", &order); - } } } fn update_balance_and_flows_alt( @@ -266,7 +263,6 @@ fn test_replay_transactions() { rms_24hour: 0.0, price_vs_rms_hour: 0.0, price_vs_rms_24hour: 0.0, - shill_bid: None, ..Default::default() }, ExchangeOrder { @@ -283,7 +279,6 @@ fn test_replay_transactions() { rms_24hour: 0.0, price_vs_rms_hour: 0.0, price_vs_rms_24hour: 0.0, - shill_bid: None, ..Default::default() }, // user 3 creates an offer to BUY, user 1 accepts. @@ -300,7 +295,6 @@ fn test_replay_transactions() { rms_24hour: 0.0, price_vs_rms_hour: 0.0, price_vs_rms_24hour: 0.0, - shill_bid: None, ..Default::default() }, ]; diff --git a/src/analytics/enrich_rms.rs b/src/analytics/enrich_rms.rs index 99d71f1..86b4b8e 100644 --- a/src/analytics/enrich_rms.rs +++ b/src/analytics/enrich_rms.rs @@ -1,7 +1,7 @@ use chrono::Duration; use std::collections::VecDeque; -use crate::schema_exchange_orders::{CompetingOffers, ExchangeOrder}; +use crate::schema_exchange_orders::{CompetingOffers, ExchangeOrder, OrderType}; fn calculate_rms(data: &[f64]) -> f64 { let (sum, count) = data @@ -84,170 +84,162 @@ fn get_competing_offers( current_order: &ExchangeOrder, all_offers: &[ExchangeOrder], ) -> CompetingOffers { - // for o in all_offers { - // // is the offer - // if o.filled_at > current_order.filled_at && - // o.created_at <= current_order.filled_at { - // competing.total_offers += 1; - - // if o.amount <= current_order.amount { - // competing. - // } - - // o.price <= current_swap.price - - // // && - // // o.amount <= current_swap.amount), - - // } - // } - // // Filter for open trades - // let open_orders = swaps - // .iter_mut() - // .filter(|&other_swap| { - // other_swap.filled_at > current_swap.filled_at - // && other_swap.created_at <= current_swap.filled_at - // }) - // .collect::>(); - - // competing.total_open_offers = open_orders.len() as u64; - // competing.total_buy_offers = open_orders.iter().fold(0, |acc, el| { - // if &el.order_type == "Buy" { - // acc + 1 - // } else { - // acc - // } - // }); - // competing.total_sell_offers = open_orders.iter().fold(0, |acc, el| { - // if &el.order_type == "Sell" { - // acc + 1 - // } else { - // acc - // } - // }); - CompetingOffers::default() -} -pub fn process_shill_alt(swaps: &mut [ExchangeOrder]) { - swaps.sort_by_key(|swap| swap.filled_at); // Sort by filled_at - - // for current_swap in swaps.iter_mut() { - // // competing.total_sell_offers = open_orders.iter().fold(0, |acc, el| { &el.order_type == "Sell"}); - - // // Determine if the current swap took the best price - // let is_shill_bid = match current_swap.order_type { - // // Signs of shill trades. - // // For those offering to SELL coins, as the tx.user (offerer) - // // I should offer to sell near the current clearing price. - // // If I'm making shill bids, I'm creating trades above the current clearing price. An honest actor wouldn't expect those to get filled immediately. - // // If an accepter is buying coins at a higher price than other orders which could be filled, then they are likely colluding to increase the price. - // OrderType::Sell => open_orders.iter().any(|other_swap| - // // if there are cheaper SELL offers, - // // for smaller sizes, then the rational honest actor - // // will pick one of those. - // // So we find the list of open orders which would be - // // better than the one taken how. - // // if there are ANY available, then this SELL order was - // // filled dishonestly. - // other_swap.price <= current_swap.price && - // other_swap.amount <= current_swap.amount), - // _ => false, - // }; - - // // Update the swap with the best price flag - // current_swap.shill_bid = Some(is_shill_bid); - // } -} - -pub fn process_sell_order_shill(swaps: &mut [ExchangeOrder]) { - swaps.sort_by_key(|swap| swap.filled_at); // Sort by filled_at - - // for i in 0..swaps.len() { - // let current_swap = &swaps[i]; - // // TODO: move this to a filter on the enclosing scope - // if current_swap.shill_bid.is_some() { - // continue; - // }; - - // // Filter for open trades - // let open_orders = swaps - // .iter() - // .filter(|&other_swap| { - // other_swap.filled_at > current_swap.filled_at - // && other_swap.created_at <= current_swap.filled_at - // }) - // .collect::>(); - - // // Determine if the current swap took the best price - // let is_shill_bid = match current_swap.order_type.as_str() { - // // Signs of shill trades. - // // For those offering to SELL coins, as the tx.user (offerer) - // // I should offer to sell near the current clearing price. - // // If I'm making shill bids, I'm creating trades above the current clearing price. An honest actor wouldn't expect those to get filled immediately. - // // If an accepter is buying coins at a higher price than other orders which could be filled, then they are likely colluding to increase the price. - // "Sell" => open_orders.iter().any(|other_swap| - // // if there are cheaper SELL offers, - // // for smaller sizes, then the rational honest actor - // // will pick one of those. - // // So we find the list of open orders which would be - // // better than the one taken how. - // // if there are ANY available, then this SELL order was - // // filled dishonestly. - // other_swap.price <= current_swap.price && - // other_swap.amount <= current_swap.amount), - // _ => false, - // }; - - // // Update the swap with the best price flag - // swaps[i].shill_bid = Some(is_shill_bid); - // } -} - -pub fn process_buy_order_shill(swaps: &mut [ExchangeOrder]) { - // NEED to sort by created_at to identify shill created BUY orders - swaps.sort_by_key(|swap| swap.created_at); + let mut competition = CompetingOffers { + offer_type: current_order.order_type.clone(), + ..Default::default() + }; - for i in 0..swaps.len() { - let current_swap = &swaps[i]; - - // TODO: move this to a filter on the enclosing scope - if current_swap.shill_bid.is_some() { + for o in all_offers { + if competition.offer_type != o.order_type { continue; - }; + } + + // is the offer + if o.filled_at > current_order.filled_at && o.created_at <= current_order.filled_at { + competition.open_same_type += 1; + if o.amount <= current_order.amount { + competition.within_amount += 1; - // // Filter for open trades - // let open_orders = swaps - // .iter() - // .filter(|&other_swap| { - // other_swap.filled_at > current_swap.created_at - // && other_swap.created_at <= current_swap.created_at - // }) - // .collect::>(); - - // // Determine if the current swap took the best price - // let is_shill_bid = match current_swap.order_type.as_str() { - // // Signs of shill trades. - // // For those offering to BUY coins, as the tx.user (offerer) - // // An honest and rational actor would not create a buy order - // // higher than other SELL offers which have not been filled. - // // The shill bidder who is colluding will create a BUY order at a higher price than other SELL orders which currently exist. - // "Buy" => open_orders.iter().any(|other_swap| { - // if other_swap.order_type == *"Sell" { - // // this is not a rational trade if there are - // // SELL offers of the same amount (or smaller) - // // at a price equal or lower. - // return other_swap.price <= current_swap.price - // && other_swap.amount <= current_swap.amount; - // } - // false - // }), - // _ => false, - // }; - - // Update the swap with the best price flag - swaps[i].shill_bid = Some(false); + if o.price <= current_order.price { + competition.within_amount_lower_price += 1; + } + } + } + } + competition +} +pub fn process_shill(all_transactions: &mut [ExchangeOrder]) { + all_transactions.sort_by_key(|el| el.filled_at); // Sort by filled_at + + for i in 0..all_transactions.len() { + // TODO: gross, don't enumerate, borrow checker you won the battle + let mut current_order = all_transactions[i].clone(); + let comp = get_competing_offers(¤t_order, all_transactions); + + // We can only evaluate if an "accepter" is engaged in shill behavior. + // the "offerer" may create unreasonable offers, but the shill trade requires someone accepting. + + match current_order.order_type { + // An accepter may be looking to dispose of coins. + // They must fill someone else's "BUY" offer. + + // Rationally would want to dispose at the highest price possible. + // so if we find that there were more HIGHER offers to buy which this accepter did not take, we must wonder why they are taking a lower price voluntarily. + // it would indicate they are shilling_down + OrderType::Buy => { + if let Some(higher_priced_orders) = comp + .within_amount + .checked_sub(comp.within_amount_lower_price) + { + if higher_priced_orders > 0 { + current_order.accepter_shill_down = true + } + } + // Similarly an accepter may be looking to accumulate coins. + // They rationally will do so at the lowest price available + // We want to check if they are ignoring lower priced offers + // of the same or lower amount. + // If so it means they are pushing the price up. + } + OrderType::Sell => { + if comp.within_amount_lower_price > 0 { + current_order.accepter_shill_up = true + } + } + } } } +// pub fn process_sell_order_shill(swaps: &mut [ExchangeOrder]) { +// swaps.sort_by_key(|swap| swap.filled_at); // Sort by filled_at + +// // for i in 0..swaps.len() { +// // let current_swap = &swaps[i]; +// // // TODO: move this to a filter on the enclosing scope +// // if current_swap.shill_bid.is_some() { +// // continue; +// // }; + +// // // Filter for open trades +// // let open_orders = swaps +// // .iter() +// // .filter(|&other_swap| { +// // other_swap.filled_at > current_swap.filled_at +// // && other_swap.created_at <= current_swap.filled_at +// // }) +// // .collect::>(); + +// // // Determine if the current swap took the best price +// // let is_shill_bid = match current_swap.order_type.as_str() { +// // // Signs of shill trades. +// // // For those offering to SELL coins, as the tx.user (offerer) +// // // I should offer to sell near the current clearing price. +// // // If I'm making shill bids, I'm creating trades above the current clearing price. An honest actor wouldn't expect those to get filled immediately. +// // // If an accepter is buying coins at a higher price than other orders which could be filled, then they are likely colluding to increase the price. +// // "Sell" => open_orders.iter().any(|other_swap| +// // // if there are cheaper SELL offers, +// // // for smaller sizes, then the rational honest actor +// // // will pick one of those. +// // // So we find the list of open orders which would be +// // // better than the one taken how. +// // // if there are ANY available, then this SELL order was +// // // filled dishonestly. +// // other_swap.price <= current_swap.price && +// // other_swap.amount <= current_swap.amount), +// // _ => false, +// // }; + +// // // Update the swap with the best price flag +// // swaps[i].shill_bid = Some(is_shill_bid); +// // } +// } + +// pub fn process_buy_order_shill(swaps: &mut [ExchangeOrder]) { +// // NEED to sort by created_at to identify shill created BUY orders +// swaps.sort_by_key(|swap| swap.created_at); + +// for i in 0..swaps.len() { +// let current_swap = &swaps[i]; + +// // TODO: move this to a filter on the enclosing scope +// if current_swap.shill_bid.is_some() { +// continue; +// }; + +// // // Filter for open trades +// // let open_orders = swaps +// // .iter() +// // .filter(|&other_swap| { +// // other_swap.filled_at > current_swap.created_at +// // && other_swap.created_at <= current_swap.created_at +// // }) +// // .collect::>(); + +// // // Determine if the current swap took the best price +// // let is_shill_bid = match current_swap.order_type.as_str() { +// // // Signs of shill trades. +// // // For those offering to BUY coins, as the tx.user (offerer) +// // // An honest and rational actor would not create a buy order +// // // higher than other SELL offers which have not been filled. +// // // The shill bidder who is colluding will create a BUY order at a higher price than other SELL orders which currently exist. +// // "Buy" => open_orders.iter().any(|other_swap| { +// // if other_swap.order_type == *"Sell" { +// // // this is not a rational trade if there are +// // // SELL offers of the same amount (or smaller) +// // // at a price equal or lower. +// // return other_swap.price <= current_swap.price +// // && other_swap.amount <= current_swap.amount; +// // } +// // false +// // }), +// // _ => false, +// // }; + +// // Update the swap with the best price flag +// swaps[i].shill_bid = Some(false); +// } +// } + #[test] fn test_rms_pipeline() { use chrono::{DateTime, Utc}; @@ -269,7 +261,6 @@ fn test_rms_pipeline() { rms_24hour: 0.0, price_vs_rms_hour: 0.0, price_vs_rms_24hour: 0.0, - shill_bid: None, ..Default::default() }, // less than 12 hours later next trade 5/6/2024 8AM @@ -289,7 +280,6 @@ fn test_rms_pipeline() { rms_24hour: 0.0, price_vs_rms_hour: 0.0, price_vs_rms_24hour: 0.0, - shill_bid: None, ..Default::default() }, // less than one hour later @@ -309,7 +299,6 @@ fn test_rms_pipeline() { rms_24hour: 0.0, price_vs_rms_hour: 0.0, price_vs_rms_24hour: 0.0, - shill_bid: None, ..Default::default() }, // same time as previous but different traders @@ -343,6 +332,6 @@ fn test_rms_pipeline() { assert!(s3.rms_hour == 4.0); assert!((s3.rms_24hour > 57.0) && (s3.rms_24hour < 58.0)); - process_sell_order_shill(&mut swaps); + process_shill(&mut swaps); dbg!(&swaps); } diff --git a/src/load_exchange_orders.rs b/src/load_exchange_orders.rs index 963a4c4..6bc4109 100644 --- a/src/load_exchange_orders.rs +++ b/src/load_exchange_orders.rs @@ -93,8 +93,7 @@ pub async fn load_from_json(path: &Path, pool: &Graph, batch_size: usize) -> Res info!("completed rms statistics"); // find likely shill bids - enrich_rms::process_sell_order_shill(&mut orders); - enrich_rms::process_buy_order_shill(&mut orders); + enrich_rms::process_shill(&mut orders); info!("completed shill bid calculation"); let mut balances = BalanceTracker::new(); diff --git a/src/schema_exchange_orders.rs b/src/schema_exchange_orders.rs index 79650cd..ea76579 100644 --- a/src/schema_exchange_orders.rs +++ b/src/schema_exchange_orders.rs @@ -44,18 +44,18 @@ pub struct ExchangeOrder { #[serde(skip_deserializing)] pub price_vs_rms_24hour: f64, #[serde(skip_deserializing)] - pub shill_bid: Option, // New field to indicate if it took the best price + pub accepter_shill_down: bool, // an accepter pushing price down + #[serde(skip_deserializing)] + pub accepter_shill_up: bool, // an accepter pushing price up #[serde(skip_deserializing)] pub competing_offers: Option, // New field to indicate if it took the best price } #[derive(Clone, Debug, Default, Deserialize, Serialize)] pub struct CompetingOffers { pub offer_type: OrderType, - pub total_open_offers: u64, - pub offers_same_type: u64, - pub offers_within_amount: u64, - pub offers_within_amount_lower_price: u64, - pub offers_within_amount_higher_price: u64, + pub open_same_type: u64, + pub within_amount: u64, + pub within_amount_lower_price: u64, } impl Default for ExchangeOrder { @@ -72,7 +72,8 @@ impl Default for ExchangeOrder { rms_24hour: 0.0, price_vs_rms_hour: 0.0, price_vs_rms_24hour: 0.0, - shill_bid: None, + accepter_shill_down: false, + accepter_shill_up: false, competing_offers: None, } } @@ -83,7 +84,7 @@ impl ExchangeOrder { /// Note original data was in an RFC rfc3339 with Z for UTC, Cypher seems to prefer with offsets +00000 pub fn to_cypher_object_template(&self) -> String { format!( - r#"{{user: {}, accepter: {}, order_type: "{}", amount: {}, price:{}, created_at: datetime("{}"), created_at_ts: {}, filled_at: datetime("{}"), filled_at_ts: {}, shill_bid: {}, rms_hour: {}, rms_24hour: {}, price_vs_rms_hour: {}, price_vs_rms_24hour: {} }}"#, + r#"{{user: {}, accepter: {}, order_type: "{}", amount: {}, price:{}, created_at: datetime("{}"), created_at_ts: {}, filled_at: datetime("{}"), filled_at_ts: {}, accepter_shill_up: {},accepter_shill_down: {}, rms_hour: {}, rms_24hour: {}, price_vs_rms_hour: {}, price_vs_rms_24hour: {} }}"#, self.user, self.accepter, self.order_type, @@ -93,7 +94,8 @@ impl ExchangeOrder { self.created_at.timestamp_micros(), self.filled_at.to_rfc3339(), self.filled_at.timestamp_micros(), - self.shill_bid.unwrap_or(false), + self.accepter_shill_down, + self.accepter_shill_up, self.rms_hour, self.rms_24hour, self.price_vs_rms_hour, diff --git a/tests/test_enrich_exchange.rs b/tests/test_enrich_exchange.rs index a6ca3d9..c3df73d 100644 --- a/tests/test_enrich_exchange.rs +++ b/tests/test_enrich_exchange.rs @@ -49,13 +49,11 @@ fn test_sell_order_shill() { let mut orders = extract_exchange_orders::read_orders_from_file(buf).unwrap(); assert!(orders.len() == 25450); - enrich_rms::process_sell_order_shill(&mut orders); + enrich_rms::process_shill(&mut orders); let count_shill = orders.iter().fold(0, |mut acc, el| { - if let Some(is_shill) = el.shill_bid { - if is_shill { - acc += 1 - } + if el.accepter_shill_up { + acc += 1 } acc }); @@ -85,13 +83,11 @@ fn test_enrich_buy_shill() { let mut orders = extract_exchange_orders::read_orders_from_file(buf).unwrap(); assert!(orders.len() == 25450); - enrich_rms::process_buy_order_shill(&mut orders); + enrich_rms::process_shill(&mut orders); let count_shill = orders.iter().fold(0, |mut acc, el| { - if let Some(is_shill) = el.shill_bid { - if is_shill { - acc += 1 - } + if el.accepter_shill_down { + acc += 1 } acc }); From 94f300c52b24d339b565acd060a349fd26dbe3f0 Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Fri, 13 Dec 2024 17:32:46 -0500 Subject: [PATCH 17/23] clippy --- src/analytics/enrich_rms.rs | 16 ++++++++++------ tests/test_enrich_exchange.rs | 2 +- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/analytics/enrich_rms.rs b/src/analytics/enrich_rms.rs index 86b4b8e..a65135a 100644 --- a/src/analytics/enrich_rms.rs +++ b/src/analytics/enrich_rms.rs @@ -89,23 +89,26 @@ fn get_competing_offers( ..Default::default() }; - for o in all_offers { - if competition.offer_type != o.order_type { + for other in all_offers { + if competition.offer_type != other.order_type { continue; } - // is the offer - if o.filled_at > current_order.filled_at && o.created_at <= current_order.filled_at { + // is the other offer created in the past, and still not filled + if other.created_at < current_order.filled_at && other.filled_at > current_order.filled_at { competition.open_same_type += 1; - if o.amount <= current_order.amount { + if other.amount <= current_order.amount { competition.within_amount += 1; - if o.price <= current_order.price { + if other.price <= current_order.price { competition.within_amount_lower_price += 1; } } } } + if competition.offer_type == OrderType::Sell && competition.within_amount_lower_price > 0 { + dbg!(&competition); + } competition } pub fn process_shill(all_transactions: &mut [ExchangeOrder]) { @@ -115,6 +118,7 @@ pub fn process_shill(all_transactions: &mut [ExchangeOrder]) { // TODO: gross, don't enumerate, borrow checker you won the battle let mut current_order = all_transactions[i].clone(); let comp = get_competing_offers(¤t_order, all_transactions); + // dbg!(&comp); // We can only evaluate if an "accepter" is engaged in shill behavior. // the "offerer" may create unreasonable offers, but the shill trade requires someone accepting. diff --git a/tests/test_enrich_exchange.rs b/tests/test_enrich_exchange.rs index c3df73d..78de0d2 100644 --- a/tests/test_enrich_exchange.rs +++ b/tests/test_enrich_exchange.rs @@ -49,7 +49,7 @@ fn test_sell_order_shill() { let mut orders = extract_exchange_orders::read_orders_from_file(buf).unwrap(); assert!(orders.len() == 25450); - enrich_rms::process_shill(&mut orders); + enrich_rms::process_shill(&mut orders[..100]); let count_shill = orders.iter().fold(0, |mut acc, el| { if el.accepter_shill_up { From 387b892f826e444304d2516f02789dab050b9a15 Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Fri, 13 Dec 2024 18:08:12 -0500 Subject: [PATCH 18/23] patch persist shill --- src/analytics/enrich_rms.rs | 107 +++------------------------------- src/schema_exchange_orders.rs | 1 - tests/test_enrich_exchange.rs | 11 +--- 3 files changed, 11 insertions(+), 108 deletions(-) diff --git a/src/analytics/enrich_rms.rs b/src/analytics/enrich_rms.rs index a65135a..0fc93d6 100644 --- a/src/analytics/enrich_rms.rs +++ b/src/analytics/enrich_rms.rs @@ -106,24 +106,22 @@ fn get_competing_offers( } } } - if competition.offer_type == OrderType::Sell && competition.within_amount_lower_price > 0 { - dbg!(&competition); - } + competition } pub fn process_shill(all_transactions: &mut [ExchangeOrder]) { all_transactions.sort_by_key(|el| el.filled_at); // Sort by filled_at - for i in 0..all_transactions.len() { - // TODO: gross, don't enumerate, borrow checker you won the battle - let mut current_order = all_transactions[i].clone(); - let comp = get_competing_offers(¤t_order, all_transactions); - // dbg!(&comp); + // TODO: gross, see what you make me do, borrow checker. + let temp_tx = all_transactions.to_vec(); + + for current_order in all_transactions.iter_mut() { + let comp = get_competing_offers(current_order, &temp_tx); // We can only evaluate if an "accepter" is engaged in shill behavior. // the "offerer" may create unreasonable offers, but the shill trade requires someone accepting. - match current_order.order_type { + match comp.offer_type { // An accepter may be looking to dispose of coins. // They must fill someone else's "BUY" offer. @@ -147,6 +145,7 @@ pub fn process_shill(all_transactions: &mut [ExchangeOrder]) { } OrderType::Sell => { if comp.within_amount_lower_price > 0 { + dbg!(&comp); current_order.accepter_shill_up = true } } @@ -154,96 +153,6 @@ pub fn process_shill(all_transactions: &mut [ExchangeOrder]) { } } -// pub fn process_sell_order_shill(swaps: &mut [ExchangeOrder]) { -// swaps.sort_by_key(|swap| swap.filled_at); // Sort by filled_at - -// // for i in 0..swaps.len() { -// // let current_swap = &swaps[i]; -// // // TODO: move this to a filter on the enclosing scope -// // if current_swap.shill_bid.is_some() { -// // continue; -// // }; - -// // // Filter for open trades -// // let open_orders = swaps -// // .iter() -// // .filter(|&other_swap| { -// // other_swap.filled_at > current_swap.filled_at -// // && other_swap.created_at <= current_swap.filled_at -// // }) -// // .collect::>(); - -// // // Determine if the current swap took the best price -// // let is_shill_bid = match current_swap.order_type.as_str() { -// // // Signs of shill trades. -// // // For those offering to SELL coins, as the tx.user (offerer) -// // // I should offer to sell near the current clearing price. -// // // If I'm making shill bids, I'm creating trades above the current clearing price. An honest actor wouldn't expect those to get filled immediately. -// // // If an accepter is buying coins at a higher price than other orders which could be filled, then they are likely colluding to increase the price. -// // "Sell" => open_orders.iter().any(|other_swap| -// // // if there are cheaper SELL offers, -// // // for smaller sizes, then the rational honest actor -// // // will pick one of those. -// // // So we find the list of open orders which would be -// // // better than the one taken how. -// // // if there are ANY available, then this SELL order was -// // // filled dishonestly. -// // other_swap.price <= current_swap.price && -// // other_swap.amount <= current_swap.amount), -// // _ => false, -// // }; - -// // // Update the swap with the best price flag -// // swaps[i].shill_bid = Some(is_shill_bid); -// // } -// } - -// pub fn process_buy_order_shill(swaps: &mut [ExchangeOrder]) { -// // NEED to sort by created_at to identify shill created BUY orders -// swaps.sort_by_key(|swap| swap.created_at); - -// for i in 0..swaps.len() { -// let current_swap = &swaps[i]; - -// // TODO: move this to a filter on the enclosing scope -// if current_swap.shill_bid.is_some() { -// continue; -// }; - -// // // Filter for open trades -// // let open_orders = swaps -// // .iter() -// // .filter(|&other_swap| { -// // other_swap.filled_at > current_swap.created_at -// // && other_swap.created_at <= current_swap.created_at -// // }) -// // .collect::>(); - -// // // Determine if the current swap took the best price -// // let is_shill_bid = match current_swap.order_type.as_str() { -// // // Signs of shill trades. -// // // For those offering to BUY coins, as the tx.user (offerer) -// // // An honest and rational actor would not create a buy order -// // // higher than other SELL offers which have not been filled. -// // // The shill bidder who is colluding will create a BUY order at a higher price than other SELL orders which currently exist. -// // "Buy" => open_orders.iter().any(|other_swap| { -// // if other_swap.order_type == *"Sell" { -// // // this is not a rational trade if there are -// // // SELL offers of the same amount (or smaller) -// // // at a price equal or lower. -// // return other_swap.price <= current_swap.price -// // && other_swap.amount <= current_swap.amount; -// // } -// // false -// // }), -// // _ => false, -// // }; - -// // Update the swap with the best price flag -// swaps[i].shill_bid = Some(false); -// } -// } - #[test] fn test_rms_pipeline() { use chrono::{DateTime, Utc}; diff --git a/src/schema_exchange_orders.rs b/src/schema_exchange_orders.rs index ea76579..20f29cf 100644 --- a/src/schema_exchange_orders.rs +++ b/src/schema_exchange_orders.rs @@ -23,7 +23,6 @@ impl fmt::Display for OrderType { #[derive(Clone, Debug, Deserialize, Serialize)] #[allow(dead_code)] - pub struct ExchangeOrder { pub user: u32, #[serde(rename = "orderType")] diff --git a/tests/test_enrich_exchange.rs b/tests/test_enrich_exchange.rs index 78de0d2..767903a 100644 --- a/tests/test_enrich_exchange.rs +++ b/tests/test_enrich_exchange.rs @@ -49,16 +49,11 @@ fn test_sell_order_shill() { let mut orders = extract_exchange_orders::read_orders_from_file(buf).unwrap(); assert!(orders.len() == 25450); - enrich_rms::process_shill(&mut orders[..100]); + enrich_rms::process_shill(&mut orders); - let count_shill = orders.iter().fold(0, |mut acc, el| { - if el.accepter_shill_up { - acc += 1 - } - acc - }); + let count_shill: Vec<_> = orders.iter().filter(|el| el.accepter_shill_up).collect(); - dbg!(&count_shill); + dbg!(&count_shill.len()); // assert!(count_shill == 13723); assert!(orders.len() == 25450); From 3e9ef4ac80b2969c957672cfa4c63e2d44bf72eb Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Fri, 13 Dec 2024 18:13:18 -0500 Subject: [PATCH 19/23] patch tests --- src/analytics/enrich_rms.rs | 1 - tests/test_enrich_exchange.rs | 19 +++++-------------- 2 files changed, 5 insertions(+), 15 deletions(-) diff --git a/src/analytics/enrich_rms.rs b/src/analytics/enrich_rms.rs index 0fc93d6..2f245e8 100644 --- a/src/analytics/enrich_rms.rs +++ b/src/analytics/enrich_rms.rs @@ -145,7 +145,6 @@ pub fn process_shill(all_transactions: &mut [ExchangeOrder]) { } OrderType::Sell => { if comp.within_amount_lower_price > 0 { - dbg!(&comp); current_order.accepter_shill_up = true } } diff --git a/tests/test_enrich_exchange.rs b/tests/test_enrich_exchange.rs index 767903a..a87b94a 100644 --- a/tests/test_enrich_exchange.rs +++ b/tests/test_enrich_exchange.rs @@ -43,7 +43,7 @@ fn test_enrich_rms() { } #[test] -fn test_sell_order_shill() { +fn test_sell_shill_up() { let path = env!("CARGO_MANIFEST_DIR"); let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json"); let mut orders = extract_exchange_orders::read_orders_from_file(buf).unwrap(); @@ -53,9 +53,7 @@ fn test_sell_order_shill() { let count_shill: Vec<_> = orders.iter().filter(|el| el.accepter_shill_up).collect(); - dbg!(&count_shill.len()); - - // assert!(count_shill == 13723); + assert!(count_shill.len() == 6039); assert!(orders.len() == 25450); } @@ -72,7 +70,7 @@ fn test_enrich_account_funding() { } #[test] -fn test_enrich_buy_shill() { +fn test_enrich_shill_down() { let path = env!("CARGO_MANIFEST_DIR"); let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json"); let mut orders = extract_exchange_orders::read_orders_from_file(buf).unwrap(); @@ -80,16 +78,9 @@ fn test_enrich_buy_shill() { enrich_rms::process_shill(&mut orders); - let count_shill = orders.iter().fold(0, |mut acc, el| { - if el.accepter_shill_down { - acc += 1 - } - acc - }); - - dbg!(&count_shill); + let count_shill_down: Vec<_> = orders.iter().filter(|el| el.accepter_shill_down).collect(); - // assert!(count_shill == 13723); + assert!(count_shill_down.len() == 2319); assert!(orders.len() == 25450); } From 00df73776ed9484dc5bb505e8701936a1b4e3e43 Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Fri, 13 Dec 2024 18:17:06 -0500 Subject: [PATCH 20/23] patch batch tests --- src/schema_exchange_orders.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/schema_exchange_orders.rs b/src/schema_exchange_orders.rs index 20f29cf..a90eda6 100644 --- a/src/schema_exchange_orders.rs +++ b/src/schema_exchange_orders.rs @@ -129,7 +129,8 @@ impl ExchangeOrder { created_at_ts: tx.created_at_ts, filled_at: tx.filled_at, filled_at_ts: tx.filled_at_ts, - shill_bid: tx.shill_bid, + accepter_shill_up: tx.accepter_shill_up, + accepter_shill_down: tx.accepter_shill_down, rms_hour: tx.rms_hour, rms_24hour: tx.rms_24hour, price_vs_rms_hour: tx.price_vs_rms_hour, From 03df6b8528ba7b1852e79aebb88c77907cf69afa Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Fri, 13 Dec 2024 19:48:47 -0500 Subject: [PATCH 21/23] names --- src/load_exchange_orders.rs | 6 +++--- tests/test_analytics.rs | 6 +++--- tests/test_enrich_exchange.rs | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/load_exchange_orders.rs b/src/load_exchange_orders.rs index 6bc4109..c0d0452 100644 --- a/src/load_exchange_orders.rs +++ b/src/load_exchange_orders.rs @@ -10,7 +10,7 @@ use crate::{ schema_exchange_orders::ExchangeOrder, }; -pub async fn swap_batch( +pub async fn exchange_txs_batch( txs: &[ExchangeOrder], pool: &Graph, batch_size: usize, @@ -99,7 +99,7 @@ pub async fn load_from_json(path: &Path, pool: &Graph, batch_size: usize) -> Res let mut balances = BalanceTracker::new(); balances.replay_transactions(&mut orders)?; let ledger_inserts = balances.submit_ledger(pool).await?; - info!("exchange ledger relations inserted: {}", ledger_inserts); + info!("exchange UserLedger relations inserted: {}", ledger_inserts); - swap_batch(&orders, pool, batch_size).await + exchange_txs_batch(&orders, pool, batch_size).await } diff --git a/tests/test_analytics.rs b/tests/test_analytics.rs index 1875bd6..8076e37 100644 --- a/tests/test_analytics.rs +++ b/tests/test_analytics.rs @@ -30,7 +30,7 @@ async fn test_rms_single() -> Result<()> { assert!(orders.len() == 25450); // load 1000 orders - load_exchange_orders::swap_batch(&orders[..1000], &graph, 1000).await?; + load_exchange_orders::exchange_txs_batch(&orders[..1000], &graph, 1000).await?; // get just one analytics result, never more than one (but can be empty) let list = analytics::exchange_stats::query_rms_analytics_chunk(&graph, 900, 1, false).await?; @@ -60,7 +60,7 @@ async fn test_rms_single_persist() -> Result<()> { assert!(orders.len() == 25450); // load 1000 orders - load_exchange_orders::swap_batch(&orders[..1000], &graph, 1000).await?; + load_exchange_orders::exchange_txs_batch(&orders[..1000], &graph, 1000).await?; // get just one analytics result, never more than one (but can be empty) let list = analytics::exchange_stats::query_rms_analytics_chunk(&graph, 900, 1, true).await?; @@ -90,7 +90,7 @@ async fn test_rms_batch() -> Result<()> { assert!(orders.len() == 25450); // load 1000 orders - load_exchange_orders::swap_batch(&orders[..1000], &graph, 1000).await?; + load_exchange_orders::exchange_txs_batch(&orders[..1000], &graph, 1000).await?; let list = analytics::exchange_stats::query_rms_analytics_concurrent(&graph, None, None, false) .await?; diff --git a/tests/test_enrich_exchange.rs b/tests/test_enrich_exchange.rs index a87b94a..1ba5397 100644 --- a/tests/test_enrich_exchange.rs +++ b/tests/test_enrich_exchange.rs @@ -141,7 +141,7 @@ async fn e2e_swap_data() -> Result<()> { assert!(orders.len() == 25450); // load 1000 orders - load_exchange_orders::swap_batch(&orders[..1000], &graph, 1000).await?; + load_exchange_orders::exchange_txs_batch(&orders[..1000], &graph, 1000).await?; // now check data was loaded let mut result = graph From 25c12261c0564cd7f6ed62ff7480096808c396e4 Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Fri, 13 Dec 2024 19:50:52 -0500 Subject: [PATCH 22/23] info prints --- src/load_exchange_orders.rs | 2 +- src/warehouse_cli.rs | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/load_exchange_orders.rs b/src/load_exchange_orders.rs index c0d0452..e17045d 100644 --- a/src/load_exchange_orders.rs +++ b/src/load_exchange_orders.rs @@ -99,7 +99,7 @@ pub async fn load_from_json(path: &Path, pool: &Graph, batch_size: usize) -> Res let mut balances = BalanceTracker::new(); balances.replay_transactions(&mut orders)?; let ledger_inserts = balances.submit_ledger(pool).await?; - info!("exchange UserLedger relations inserted: {}", ledger_inserts); + info!("exchange UserLedger state inserted: {}", ledger_inserts); exchange_txs_batch(&orders, pool, batch_size).await } diff --git a/src/warehouse_cli.rs b/src/warehouse_cli.rs index 14cce96..0d75e7f 100644 --- a/src/warehouse_cli.rs +++ b/src/warehouse_cli.rs @@ -179,12 +179,16 @@ impl WarehouseCli { let pool = try_db_connection_pool(self).await?; neo4j_init::maybe_create_indexes(&pool).await?; - load_exchange_orders::load_from_json( + let (merged, ignored) = load_exchange_orders::load_from_json( swap_record_json, &pool, batch_size.unwrap_or(250), ) .await?; + info!( + "SUCCESS: exchange transactions merged: {}, ignored: {}", + merged, ignored + ); } Sub::EnrichExchangeOnramp { onboarding_json } => { info!("exchange onramp"); From 8d8e8ccda565419ec02ac5f22a97b3a5797105ba Mon Sep 17 00:00:00 2001 From: Montague McMarten Date: Fri, 13 Dec 2024 21:28:34 -0500 Subject: [PATCH 23/23] patch cypher query --- src/schema_exchange_orders.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/schema_exchange_orders.rs b/src/schema_exchange_orders.rs index a90eda6..8a98feb 100644 --- a/src/schema_exchange_orders.rs +++ b/src/schema_exchange_orders.rs @@ -83,7 +83,7 @@ impl ExchangeOrder { /// Note original data was in an RFC rfc3339 with Z for UTC, Cypher seems to prefer with offsets +00000 pub fn to_cypher_object_template(&self) -> String { format!( - r#"{{user: {}, accepter: {}, order_type: "{}", amount: {}, price:{}, created_at: datetime("{}"), created_at_ts: {}, filled_at: datetime("{}"), filled_at_ts: {}, accepter_shill_up: {},accepter_shill_down: {}, rms_hour: {}, rms_24hour: {}, price_vs_rms_hour: {}, price_vs_rms_24hour: {} }}"#, + r#"{{user: {}, accepter: {}, order_type: "{}", amount: {}, price:{}, created_at: datetime("{}"), created_at_ts: {}, filled_at: datetime("{}"), filled_at_ts: {}, accepter_shill_down: {}, accepter_shill_up: {}, rms_hour: {}, rms_24hour: {}, price_vs_rms_hour: {}, price_vs_rms_24hour: {} }}"#, self.user, self.accepter, self.order_type,