Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
0o-de-lally committed Dec 11, 2024
1 parent 8574120 commit b559be8
Show file tree
Hide file tree
Showing 2 changed files with 258 additions and 27 deletions.
225 changes: 201 additions & 24 deletions src/analytics/offline_matching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Deposit {
account: AccountAddress,
deposited: f64,
pub account: AccountAddress,
pub deposited: f64,
}

#[derive(Debug, Clone, Deserialize, Serialize)]
Expand All @@ -24,32 +24,56 @@ pub struct MinFunding {
pub funded: f64,
}

pub async fn get_date_range_deposits(
pub async fn get_date_range_deposits_alt(
pool: &Graph,
top_n: u64,
_top_n: u64,
start: DateTime<Utc>,
end: DateTime<Utc>,
) -> Result<Vec<Deposit>> {
let mut top_deposits = vec![];

let q = format!(
r#"
WITH "0xf57d3968d0bfd5b3120fda88f34310c70bd72033f77422f4407fbbef7c24557a" as exchange_deposit
MATCH
(u:Account)-[tx:Tx]->(onboard:Account {{address: exchange_deposit}})
WHERE
tx.`block_datetime` > datetime("{}")
AND tx.`block_datetime` < datetime("{}")
WITH
u,
SUM(tx.V7_OlAccountTransfer_amount) AS totalTxAmount
ORDER BY totalTxAmount DESCENDING
RETURN u.address AS account, toFloat(totalTxAmount) / 1000000 AS deposited
LIMIT {}
"#,
WITH "0xf57d3968d0bfd5b3120fda88f34310c70bd72033f77422f4407fbbef7c24557a" AS olswap_deposit
// Step 1: Get the list of all depositors
MATCH (acc:Account)-[tx:Tx]->(onboard:Account {{address: olswap_deposit}})
WITH DISTINCT(acc) AS all, olswap_deposit
// Step 2: Match depositors and amounts within the date range
MATCH (all)-[tx2:Tx]->(onboard:Account {{address: olswap_deposit}})
WHERE
tx2.block_datetime > datetime("{}")
AND tx2.block_datetime < datetime("{}")
WITH
DISTINCT (all.address) AS account,
COALESCE(SUM(tx2.V7_OlAccountTransfer_amount), 0)/1000000 AS deposit_amount
RETURN account, toFloat(deposit_amount) as deposited
ORDER BY deposit_amount DESC
"#,
// r#"
// WITH "0xf57d3968d0bfd5b3120fda88f34310c70bd72033f77422f4407fbbef7c24557a" as exchange_deposit
// MATCH
// (u:Account)-[tx:Tx]->(onboard:Account {{address: exchange_deposit}})
// WHERE
// tx.`block_datetime` > datetime("{}")
// AND tx.`block_datetime` < datetime("{}")
// WITH
// u,
// SUM(tx.V7_OlAccountTransfer_amount) AS totalTxAmount
// ORDER BY totalTxAmount DESCENDING
// RETURN u.address AS account, toFloat(totalTxAmount) / 1000000 AS deposited

// "#,
start.to_rfc3339(),
end.to_rfc3339(),
top_n,
// top_n,
);
let cypher_query = neo4rs::query(&q);

Expand All @@ -70,6 +94,73 @@ pub async fn get_date_range_deposits(
Ok(top_deposits)
}

// pub async fn get_date_range_deposits(
// pool: &Graph,
// top_n: u64,
// start: DateTime<Utc>,
// end: DateTime<Utc>,
// ) -> Result<Vec<Deposit>> {
// let mut top_deposits = vec![];

// let q = format!(
// // r#"
// // WITH "0xf57d3968d0bfd5b3120fda88f34310c70bd72033f77422f4407fbbef7c24557a" AS olswap_deposit

// // // Step 1: Get the list of all depositors
// // MATCH (depositor:Account)-[tx:Tx]->(onboard:Account {{address: olswap_deposit}})
// // WITH COLLECT(DISTINCT depositor) AS all_depositors, olswap_deposit, tx

// // // Step 2: Match depositors and amounts within the date range

// // UNWIND all_depositors AS depositor

// // OPTIONAL MATCH (depositor)-[tx2:Tx]->(onboard:Account {{address: olswap_deposit}})
// // WHERE tx2.block_datetime >= datetime('{}') AND tx2.block_datetime <= datetime('{}')

// // WITH
// // depositor.address AS account,
// // COALESCE(SUM(tx2.V7_OlAccountTransfer_amount), 0)/1000000 AS deposit_amount
// // RETURN account, toFloat(deposit_amount) as deposited
// // ORDER BY deposited DESC

// // "#,
// r#"
// WITH "0xf57d3968d0bfd5b3120fda88f34310c70bd72033f77422f4407fbbef7c24557a" as exchange_deposit
// MATCH
// (u:Account)-[tx:Tx]->(onboard:Account {{address: exchange_deposit}})
// WHERE
// tx.`block_datetime` > datetime("{}")
// AND tx.`block_datetime` < datetime("{}")
// WITH
// DISTINCT(u),
// SUM(tx.V7_OlAccountTransfer_amount) AS totalTxAmount
// ORDER BY totalTxAmount DESCENDING
// RETURN u.address AS account, toFloat(totalTxAmount) / 1000000 AS deposited

// "#,
// start.to_rfc3339(),
// end.to_rfc3339(),
// // top_n,
// );
// let cypher_query = neo4rs::query(&q);

// // Execute the query
// let mut result = pool.execute(cypher_query).await?;

// // Fetch the first row only
// while let Some(r) = result.next().await? {
// let account_str = r.get::<String>("account").unwrap_or("unknown".to_string());
// let deposited = r.get::<f64>("deposited").unwrap_or(0.0);
// let d = Deposit {
// account: account_str.parse().unwrap_or(AccountAddress::ZERO),
// deposited,
// };
// top_deposits.push(d);
// // dbg!(&d);
// }
// Ok(top_deposits)
// }

pub async fn get_exchange_users(
pool: &Graph,
top_n: u64,
Expand Down Expand Up @@ -108,6 +199,40 @@ pub async fn get_exchange_users(
Ok(min_funding)
}

pub async fn get_exchange_users_only_outflows(pool: &Graph) -> Result<Vec<MinFunding>> {
let mut min_funding = vec![];

let q = format!(

Check failure on line 205 in src/analytics/offline_matching.rs

View workflow job for this annotation

GitHub Actions / clippy

useless use of `format!`
r#"
MATCH (e:SwapAccount)-[]-(u:UserLedger)
WHERE u.`total_inflows` = 0
WITH distinct(e.swap_id) AS user_id, max(u.`total_funded`) AS funded
RETURN user_id, funded
ORDER BY funded DESC
"#,
// start.to_rfc3339(),
// end.to_rfc3339(),
// top_n,
);
let cypher_query = neo4rs::query(&q);

// Execute the query
let mut result = pool.execute(cypher_query).await?;

// Fetch the first row only
while let Some(r) = result.next().await? {
let user_id = r.get::<u32>("user_id").unwrap_or(0);
let funded = r.get::<u64>("funded").unwrap_or(0);
let d = MinFunding {
user_id,
funded: funded as f64,
};
min_funding.push(d);
// dbg!(&d);
}
Ok(min_funding)
}

pub async fn get_one_exchange_user(
pool: &Graph,
id: u32,
Expand Down Expand Up @@ -188,7 +313,7 @@ impl Matching {
.map(|el| el.user_id)
.collect();

dbg!(&ids);
// dbg!(&ids);
// let user_ledger = funded.iter().find(|el| {
// // check if we have already identified it
// self.definite.0.get(el.user_id).none()
Expand All @@ -207,12 +332,12 @@ impl Matching {
end: DateTime<Utc>,
save_dir: Option<PathBuf>,
) -> Result<()> {
let mut top_n = 3;
while top_n < 20 {
let mut top_n = 5;
while top_n < 25 {
let _ = self
.breadth_search_by_dates(pool, top_n, start, end, &save_dir)
.await; // don't error
top_n += 3;
top_n += 5;
}
Ok(())
}
Expand All @@ -236,8 +361,23 @@ impl Matching {
println!("day: {}", d);
let next_list = get_exchange_users(pool, top_n, start, d).await?;

// TODO: pick top of deposits
let deposits = get_date_range_deposits(pool, 100, start, d).await?;
// // TODO: pick top of deposits
// let deposits = get_date_range_deposits(pool, 1000, start, d).await.unwrap_or_default();

// if next_list.len() > 5 {
// // dbg!("next_list");
// dbg!(&next_list[..5]);
// }
// dbg!(&next_list);

let deposits = get_date_range_deposits_alt(pool, 1000, start, d)
.await
.unwrap_or_default();

// if deposits.len() > 5 {
// dbg!("alt");
// dbg!(&deposits[..5]);
// }

for u in next_list {
let _r = self.search(&u, &deposits).await;
Expand All @@ -252,6 +392,7 @@ impl Matching {

Ok(())
}

pub async fn search(
&mut self,
user: &MinFunding,
Expand All @@ -271,12 +412,48 @@ impl Matching {
bail!("could not find a candidate")
}

pub fn match_exact_sellers(
&mut self,
user_list: &[MinFunding],
deposits: &[Deposit],
tolerance: f64,
) {
user_list.iter().for_each(|user| {
let pending = self.pending.entry(user.user_id).or_default();

let candidates: Vec<AccountAddress> = deposits
.iter()
.filter_map(|el| {
if el.deposited > user.funded && // must always be slightly more
el.deposited < user.funded * tolerance &&
!pending.impossible.contains(&el.account) &&
// is also not already discovered
!self.definite.values().any(|found| found == &el.account)
{
Some(el.account)
} else {
None
}
})
.collect();

pending.maybe = candidates;

if pending.maybe.len() == 1 {
// we found a definite match, update it so the next loop doesn't include it
self.definite
.insert(user.user_id, *pending.maybe.first().unwrap());
}
})
}

pub fn eliminate_candidates(&mut self, user: &MinFunding, deposits: &[Deposit]) {
// let mut filtered_depositors = deposits.clone();
let pending = self.pending.entry(user.user_id).or_default();

let mut eval: Vec<AccountAddress> = vec![];
deposits.iter().for_each(|el| {
dbg!(&el);
if el.deposited >= user.funded &&
// must not already have been tagged impossible
!pending.impossible.contains(&el.account) &&
Expand Down
60 changes: 57 additions & 3 deletions tests/test_analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ async fn test_offline_analytics() -> Result<()> {
let pool = neo4j_init::get_neo4j_remote_pool(&uri, &user, &pass).await?;

let start_time = parse_date("2024-01-01");
let end_time = parse_date("2024-01-10");
let end_time = parse_date("2024-07-10");

let _r = offline_matching::get_date_range_deposits(&pool, 20, start_time, end_time).await?;
// dbg!(&r);
Expand All @@ -267,13 +267,67 @@ async fn test_offline_analytics_matching() -> Result<()> {
.depth_search_by_top_n_accounts(
&pool,
parse_date("2024-01-07"),
parse_date("2024-03-13"),
parse_date("2024-07-22"),
Some(dir),
)
.await;


dbg!(&m.definite);

Ok(())
}

#[tokio::test]
async fn test_easy_sellers() -> Result<()> {
libra_forensic_db::log_setup();

let (uri, user, pass) = neo4j_init::get_credentials_from_env()?;
let pool = neo4j_init::get_neo4j_remote_pool(&uri, &user, &pass).await?;

let mut user_list = offline_matching::get_exchange_users_only_outflows(&pool).await?;
user_list
.sort_by(|a, b: &offline_matching::MinFunding| b.funded.partial_cmp(&a.funded).unwrap());
// dbg!(&r[..10]);

let deposits = offline_matching::get_date_range_deposits_alt(
&pool,
1000,
parse_date("2024-01-07"),
parse_date("2024-07-22"),
)
.await
.unwrap_or_default();

// dbg!(&deposits[..10]);

let dir: PathBuf = PathBuf::from(env!("CARGO_MANIFEST_DIR"));

let mut m = Matching::read_cache_from_file(&dir).unwrap_or_default();

m.match_exact_sellers(&user_list, &deposits, 1.01);

let _ = m
.depth_search_by_top_n_accounts(
&pool,
parse_date("2024-01-07"),
parse_date("2024-07-22"),
Some(dir),
)
.await;
dbg!(&m.definite.len());

// let _ = m
// .depth_search_by_top_n_accounts(
// &pool,
// parse_date("2024-01-07"),
// parse_date("2024-07-22"),
// Some(dir),
// )
// .await;

// dbg!(&m.definite);

Ok(())
}

//

0 comments on commit b559be8

Please sign in to comment.