try broad search expanding the ledger funding window
0o-de-lally committed Dec 9, 2024
1 parent f8be7e1 commit ca9bf67
Showing 2 changed files with 264 additions and 34 deletions.
259 changes: 230 additions & 29 deletions src/analytics/offline_matching.rs
@@ -1,6 +1,6 @@
-use std::collections::BTreeMap;
+use std::{collections::BTreeMap, fs::File, io::Write, path::{Path, PathBuf}};
 
-use anyhow::Result;
+use anyhow::{bail, Result};
 use chrono::{DateTime, Duration, Utc};
 use diem_types::account_address::AccountAddress;
 use neo4rs::Graph;
@@ -14,8 +14,8 @@ pub struct Deposit {
 
 #[derive(Debug, Clone, Deserialize, Serialize)]
 pub struct MinFunding {
-    user_id: u32,
-    funded: f64,
+    pub user_id: u32,
+    pub funded: f64,
 }
 
 pub async fn get_date_range_deposits(
@@ -64,7 +64,7 @@ pub async fn get_date_range_deposits(
     Ok(top_deposits)
 }
 
-pub async fn get_min_funding(
+pub async fn get_exchange_users(
     pool: &Graph,
     top_n: u64,
     start: DateTime<Utc>,
@@ -102,50 +102,251 @@
     Ok(min_funding)
 }
 
+pub async fn get_one_exchange_user(
+    pool: &Graph,
+    id: u32,
+    start: DateTime<Utc>,
+    end: DateTime<Utc>,
+) -> Result<Vec<MinFunding>> {
+    let mut min_funding = vec![];
+
+    let q = format!(
+        r#"
+        MATCH p=(e:SwapAccount)-[d:DailyLedger]-(ul:UserLedger)
+        WHERE d.date > datetime("{}")
+        AND d.date < datetime("{}")
+        AND e.swap_id = {}
+        WITH DISTINCT(e.swap_id) AS user_id, toFloat(max(ul.`total_funded`)) AS funded
+        RETURN user_id, funded
+        "#,
+        start.to_rfc3339(),
+        end.to_rfc3339(),
+        id,
+    );
+    let cypher_query = neo4rs::query(&q);
+
+    // Execute the query
+    let mut result = pool.execute(cypher_query).await?;
+
+    // Collect each returned row into a MinFunding record
+    while let Some(r) = result.next().await? {
+        let user_id = r.get::<u32>("user_id").unwrap_or(0);
+        let funded = r.get::<f64>("funded").unwrap_or(0.0);
+        let d = MinFunding { user_id, funded };
+        min_funding.push(d);
+        // dbg!(&d);
+    }
+    Ok(min_funding)
+}
+
+pub struct Matching {
+    pub definite: BTreeMap<u32, AccountAddress>,
+    pub pending: BTreeMap<u32, Candidates>,
+}
 
 #[derive(Clone, Default, Debug)]
 pub struct Candidates {
-    maybe: Vec<AccountAddress>,
-    impossible: Vec<AccountAddress>,
+    pub maybe: Vec<AccountAddress>,
+    pub impossible: Vec<AccountAddress>,
 }
 
 #[derive(Clone, Default, Debug)]
-pub struct Matching(pub BTreeMap<u32, Candidates>);
+pub struct Possible {
+    pub user: Vec<u32>,
+    pub address: Vec<AccountAddress>,
+}
 
+impl Default for Matching {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 impl Matching {
     pub fn new() -> Self {
-        Matching(BTreeMap::new())
+        Self {
+            definite: BTreeMap::new(),
+            pending: BTreeMap::new(),
+        }
     }
 
-    pub fn match_deposit_to_funded(&mut self, deposits: Vec<Deposit>, funded: Vec<MinFunding>) {
-        for f in funded.iter() {
-            deposits.iter().for_each(|d| {
-                let candidates = self.0.entry(f.user_id).or_default();
-                // only addresses with minimum funded could be a Maybe
-                if d.deposited >= f.funded {
-                    candidates.maybe.push(d.account);
-                } else {
-                    candidates.impossible.push(d.account);
-                }
-            });
-        }
-    }
+    pub fn get_next_search_ids(&self, funded: &[MinFunding]) -> Result<(u32, u32)> {
+        // assumes this is sorted by date
+
+        // find the next two which are not identified, to disambiguate.
+        let ids: Vec<u32> = funded
+            .iter()
+            .filter(|el| !self.definite.contains_key(&el.user_id))
+            .take(2)
+            .map(|el| el.user_id)
+            .collect();
+
+        dbg!(&ids);
+        Ok((*ids.first().unwrap(), *ids.get(1).unwrap()))
+    }
+
+    pub async fn wide_search(
+        &mut self,
+        pool: &Graph,
+        top_n: u64,
+        start: DateTime<Utc>,
+        end: DateTime<Utc>,
+        save_file: Option<PathBuf>,
+    ) -> Result<()> {
+        // expand the search: increase the search of top users by funding
+        // by expanding the window. this may retry a number of users, but
+        // with more users discovered the search space gets smaller
+        for d in days_in_range(start, end) {
+            let next_list = get_exchange_users(&pool, top_n, start, d).await?;
+
+            for u in next_list {
+                let _r = self.search(&pool, u.user_id, start, end).await;
+
+                // after each loop update the file
+                if let Some(p) = &save_file {
+                    self.write_definite_to_file(&p)?;
+                }
+            }
+        }
+
+        Ok(())
+    }
+    pub async fn search(
+        &mut self,
+        pool: &Graph,
+        user_a: u32,
+        start: DateTime<Utc>,
+        end: DateTime<Utc>,
+    ) -> Result<AccountAddress> {
+        // exit early if the user is already matched
+        if let Some(a) = self.definite.get(&user_a) {
+            return Ok(*a);
+        }
+
+        // loop each day, comparing deposits made to that point
+        // and funding required for user accounts only to that date
+        for d in days_in_range(start, end) {
+            let deposits = get_date_range_deposits(pool, 100, start, d).await?;
+
+            if let Some(funded_a) = get_one_exchange_user(pool, user_a, start, d).await?.first() {
+                self.eliminate_candidates(funded_a, &deposits);
+
+                if let Some(a) = self.definite.get(&user_a) {
+                    return Ok(*a);
+                }
+            }
+        }
+        if let Some(a) = self.definite.get(&user_a) {
+            return Ok(*a);
+        }
+
+        bail!("could not find a candidate")
+    }
+
+    pub fn eliminate_candidates(&mut self, user: &MinFunding, deposits: &[Deposit]) {
+        let pending = self
+            .pending
+            .entry(user.user_id)
+            .or_insert(Candidates::default());
+
+        let mut eval: Vec<AccountAddress> = vec![];
+        deposits.iter().for_each(|el| {
+            if el.deposited >= user.funded &&
+                // must not already have been tagged impossible
+                !pending.impossible.contains(&el.account) &&
+                // is also not already discovered
+                !self.definite.values().any(|found| found == &el.account)
+            {
+                eval.push(el.account)
+            } else {
+                pending.impossible.push(el.account)
+            }
+        });
+
+        // only seed the maybe list the first time.
+        if pending.maybe.is_empty() {
+            pending.maybe.append(&mut eval);
+        } else {
+            // we only keep addresses we see repeatedly (inner join)
+            eval.retain(|x| pending.maybe.contains(x));
+            if !eval.is_empty() {
+                pending.maybe = eval;
+            }
+        }
+
+        println!("user: {}, maybe: {}", &user.user_id, &pending.maybe.len());
+
+        if pending.maybe.len() == 1 {
+            // we found a definite match; update it so the next loop doesn't include it
+            self.definite
+                .insert(user.user_id, *pending.maybe.first().unwrap());
+        }
+    }
-pub async fn rip_range(pool: &Graph, start: DateTime<Utc>, end: DateTime<Utc>) -> Result<Matching> {
-    let mut matches = Matching::new();
-
-    // loop each day.
-    for d in days_in_range(start, end) {
-        let deposits = get_date_range_deposits(pool, 100, start, d).await?;
-        let funded = get_min_funding(pool, 20, start, d).await?;
-        // candidates
-
-        matches.match_deposit_to_funded(deposits, funded);
-    }
-
-    Ok(matches)
-}
+    pub fn write_definite_to_file(&self, path: &Path) -> Result<()> {
+        // Serialize the BTreeMap to a JSON string
+        let json_string = serde_json::to_string_pretty(&self.definite).expect("Failed to serialize");
+
+        // Save the JSON string to a file
+        let mut file = File::create(path)?;
+        file.write_all(json_string.as_bytes())?;
+
+        println!("Data saved to path: {}", path.display());
+        Ok(())
+    }
+}
+
+pub fn sort_funded(funded: &mut [MinFunding]) {
+    // sort descending
+    funded.sort_by(|a, b| b.funded.partial_cmp(&a.funded).unwrap());
+}
-fn days_in_range(start: DateTime<Utc>, end: DateTime<Utc>) -> Vec<DateTime<Utc>> {
+// pub fn maybe_match_deposit_to_funded(
+//     deposits: Vec<Deposit>,
+//     funded: Vec<MinFunding>,
+// ) -> Option<(u32, AccountAddress)> {
+//     for f in funded {
+//         let mut candidate_depositors = deposits.clone();
+//         candidate_depositors.retain(|el| el.deposited >= f.funded);
+
+//         if candidate_depositors.len() == 1 {
+//             return Some((f.user_id, candidate_depositors.pop().unwrap().account));
+//         }
+//     }
+//     None
+// }
+
+pub fn days_in_range(start: DateTime<Utc>, end: DateTime<Utc>) -> Vec<DateTime<Utc>> {
     let mut days = Vec::new();
     let mut current = start;
 
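The diff view truncates the rest of days_in_range here. A plausible completion, given the visible signature and the chrono::Duration import (a hedged sketch, not necessarily the commit's exact body):

use chrono::{DateTime, Duration, Utc};

// Sketch only: assumes the helper walks forward one calendar day at a time,
// inclusive of both endpoints.
pub fn days_in_range(start: DateTime<Utc>, end: DateTime<Utc>) -> Vec<DateTime<Utc>> {
    let mut days = Vec::new();
    let mut current = start;
    while current <= end {
        days.push(current);
        current = current + Duration::days(1); // step to the next day
    }
    days
}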
39 changes: 34 additions & 5 deletions tests/test_analytics.rs
@@ -243,23 +243,52 @@ async fn test_offline_analytics() -> Result<()> {
     let _r = offline_matching::get_date_range_deposits(&pool, 20, start_time, end_time).await?;
     // dbg!(&r);
 
-    let _r = offline_matching::get_min_funding(&pool, 20, start_time, end_time).await?;
+    let _r = offline_matching::get_exchange_users(&pool, 20, start_time, end_time).await?;
 
     Ok(())
 }

 #[tokio::test]
 async fn test_offline_analytics_matching() -> Result<()> {
     libra_forensic_db::log_setup();
 
     let (uri, user, pass) = neo4j_init::get_credentials_from_env()?;
     let pool = neo4j_init::get_neo4j_remote_pool(&uri, &user, &pass).await?;
 
     let start_time = parse_date("2024-01-01");
     let end_time = parse_date("2024-01-10");
 
-    let r = offline_matching::rip_range(&pool, start_time, end_time).await?;
+    let mut m = offline_matching::Matching::new();
+
+    let p = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
+    let p = p.join("definite.json");
+
+    let _ = m
+        .wide_search(
+            &pool,
+            10,
+            parse_date("2024-01-07"),
+            parse_date("2024-03-13"),
+            Some(p),
+        )
+        .await;
+    // let initial_list =
+    //     offline_matching::get_exchange_users(&pool, 10, start_time, parse_date("2024-01-09"))
+    //         .await?;
+
+    // for u in initial_list {
+    //     let r = m
+    //         .search(&pool, u.user_id, start_time, parse_date("2024-03-13"))
+    //         .await;
+    //     // dbg!(&r);
+    // }
+
+    dbg!(&m.definite);
 
-    dbg!(&r.0.len());
+    // // expand the search
+    // for d in offline_matching::days_in_range(start_time, parse_date("2024-03-13")) {
+    //     let next_list = offline_matching::get_exchange_users(&pool, 10, start_time, d)
+    //         .await?;
+
+    //     for u in next_list {
+    //         let r = m
+    //             .search(&pool, u.user_id, start_time, parse_date("2024-03-13"))
+    //             .await;
+    //         // dbg!(&r);
+    //     }
+    //     // dbg!(&m.definite);
+    // }
 
     Ok(())
 }
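The matching heuristic this test exercises can also be checked without a Neo4j instance. Below is a self-contained sketch of the eliminate_candidates narrowing logic from the commit, with String standing in for AccountAddress and made-up deposit figures (the function name, accounts, and amounts are illustrative, not from the commit):

use std::collections::BTreeMap;

#[derive(Clone, Debug)]
struct Deposit {
    account: String,
    deposited: f64,
}

#[derive(Clone, Debug)]
struct MinFunding {
    user_id: u32,
    funded: f64,
}

#[derive(Clone, Default, Debug)]
struct Candidates {
    maybe: Vec<String>,
    impossible: Vec<String>,
}

// Mirrors Matching::eliminate_candidates above, minus the Neo4j plumbing.
fn eliminate(
    pending: &mut BTreeMap<u32, Candidates>,
    definite: &mut BTreeMap<u32, String>,
    user: &MinFunding,
    deposits: &[Deposit],
) {
    let c = pending.entry(user.user_id).or_default();
    let mut eval: Vec<String> = vec![];
    for d in deposits {
        // only depositors whose cumulative deposits cover the user's
        // minimum funding can be a match
        if d.deposited >= user.funded
            && !c.impossible.contains(&d.account)
            && !definite.values().any(|a| a == &d.account)
        {
            eval.push(d.account.clone());
        } else {
            c.impossible.push(d.account.clone());
        }
    }
    if c.maybe.is_empty() {
        c.maybe = eval; // first pass seeds the candidate set
    } else {
        // later passes intersect: keep only addresses seen every time
        eval.retain(|x| c.maybe.contains(x));
        if !eval.is_empty() {
            c.maybe = eval;
        }
    }
    if c.maybe.len() == 1 {
        // a single surviving candidate is a definite match
        definite.insert(user.user_id, c.maybe[0].clone());
    }
}

fn main() {
    let mut pending = BTreeMap::new();
    let mut definite = BTreeMap::new();

    // day 1: two depositors clear user 7's funding bar
    let user = MinFunding { user_id: 7, funded: 50.0 };
    let day1 = [
        Deposit { account: "0xA".into(), deposited: 60.0 },
        Deposit { account: "0xB".into(), deposited: 55.0 },
    ];
    eliminate(&mut pending, &mut definite, &user, &day1);

    // day 2: the funding requirement grew; only 0xA still clears it
    let user = MinFunding { user_id: 7, funded: 58.0 };
    let day2 = [Deposit { account: "0xA".into(), deposited: 60.0 }];
    eliminate(&mut pending, &mut definite, &user, &day2);

    assert_eq!(definite.get(&7).map(String::as_str), Some("0xA"));
    println!("matched: {:?}", definite);
}

Run over successive days with a growing funding requirement, the intersection of surviving depositors shrinks to a single address, which is exactly the condition the commit uses to promote a pending user into definite.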
