From 012db7a46820c583cf901ca23082f9fbab5590bd Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Sun, 1 Dec 2024 20:34:59 -0500 Subject: [PATCH] patch shill trade logic --- src/analytics/enrich_rms.rs | 118 +++++++++++++++++----------------- src/load_exchange_orders.rs | 11 +++- src/schema_exchange_orders.rs | 15 +++++ tests/test_enrich_exchange.rs | 46 +++++++++++++ 4 files changed, 130 insertions(+), 60 deletions(-) diff --git a/src/analytics/enrich_rms.rs b/src/analytics/enrich_rms.rs index b8d3e0d..a1b7e06 100644 --- a/src/analytics/enrich_rms.rs +++ b/src/analytics/enrich_rms.rs @@ -1,22 +1,7 @@ -use chrono::{DateTime, Duration, Utc}; +use chrono::Duration; use std::collections::VecDeque; -#[derive(Debug, Clone)] -pub struct Swap { - pub from_user: String, - pub to_accepter: String, - pub filled_at: DateTime<Utc>, - pub amount: f64, - pub created_at: DateTime<Utc>, - pub price: f64, - pub created: bool, - pub order_type: String, - pub rms_hour: f64, - pub rms_24hour: f64, - pub price_vs_rms_hour: f64, - pub price_vs_rms_24hour: f64, - pub took_best_price: Option<bool>, // New field to indicate if it took the best price -} +use crate::schema_exchange_orders::ExchangeOrder; fn calculate_rms(data: &[f64]) -> f64 { let (sum, count) = data @@ -30,11 +15,11 @@ fn calculate_rms(data: &[f64]) -> f64 { } /// enrich swap struct with RMS data -pub fn process_swaps(swaps: &mut [Swap]) { +pub fn process_swaps(swaps: &mut [ExchangeOrder]) { swaps.sort_by_key(|swap| swap.filled_at); - let mut window_1hour: VecDeque<Swap> = VecDeque::new(); - let mut window_24hour: VecDeque<Swap> = VecDeque::new(); + let mut window_1hour: VecDeque<ExchangeOrder> = VecDeque::new(); + let mut window_24hour: VecDeque<ExchangeOrder> = VecDeque::new(); let one_hour = Duration::hours(1); let twenty_four_hours = Duration::hours(24); @@ -66,13 +51,13 @@ pub fn process_swaps(swaps: &mut [Swap]) { // Collect filtered amounts before borrowing swap mutably let filtered_1hour: Vec<f64> = window_1hour .iter() - .filter(|s| 
s.from_user != swap.from_user && s.to_accepter != swap.to_accepter) + .filter(|s| s.user != swap.user && s.accepter != swap.accepter) .map(|s| s.price) .collect(); let filtered_24hour: Vec<f64> = window_24hour .iter() - .filter(|s| s.from_user != swap.from_user && s.to_accepter != swap.to_accepter) + .filter(|s| s.user != swap.user && s.accepter != swap.accepter) .map(|s| s.price) .collect(); @@ -95,14 +80,14 @@ pub fn process_swaps(swaps: &mut [Swap]) { } } -pub fn process_swaps_with_best_price(swaps: &mut [Swap]) { +pub fn process_swaps_with_best_price(swaps: &mut [ExchangeOrder]) { swaps.sort_by_key(|swap| swap.filled_at); // Sort by filled_at for i in 0..swaps.len() { let current_swap = &swaps[i]; // Filter for open trades - let open_trades = swaps + let open_orders = swaps .iter() .filter(|&other_swap| { other_swap.filled_at > current_swap.filled_at @@ -111,28 +96,54 @@ pub fn process_swaps_with_best_price(swaps: &mut [Swap]) { .collect::<Vec<_>>(); // Determine if the current swap took the best price - let best_price_condition = match current_swap.order_type.as_str() { - "Buy" => open_trades - .iter() - .all(|other_swap| other_swap.price >= current_swap.price), - "Sell" => open_trades - .iter() - .all(|other_swap| other_swap.price <= current_swap.price), - _ => true, // Default to true for unknown order types + let is_shill_bid = match current_swap.order_type.as_str() { + // Signs of shill trades. + // For those offering to BUY coins, as the tx.user (offerer) + // An honest and rational actor would not create a buy order + // higher than other SELL offers which have not been filled. + // The shill bidder who is colluding will create a BUY order at a higher price than other SELL orders which currently exist. + // For the accepter: + // I would only fill a lower price if the amount I have to sell is insufficient for higher prices. 
+ "Buy" => open_orders.iter().any(|other_swap| { + if other_swap.order_type == *"Sell" { + // this is not a rational trade if there are + // SELL offers of the same amount (or smaller) + // at a price equal or lower. + return other_swap.price <= current_swap.price + && other_swap.amount <= current_swap.amount; + } + false + }), + // For those offering to SELL coins, as the tx.user (offerer) + // I should offer to sell near the current clearing price. + // If I'm making shill bids, I'm creating trades above the current clearing price. An honest actor wouldn't expect those to get filled immediately. + // If an accepter is buying coins at a higher price than other orders which could be filled, then they are likely colluding to increase the price. + "Sell" => open_orders.iter().any(|other_swap| + // if there are cheaper SELL offers, + // for smaller sizes, then the rational honest actor + // will pick one of those. + // So we find the list of open orders which would be + // better than the one taken now. + // if there are ANY available, then this SELL order was + // filled dishonestly. 
+ other_swap.price <= current_swap.price && + other_swap.amount <= current_swap.amount), + _ => false, // Default to false (not flagged as a shill bid) for unknown order types }; // Update the swap with the best price flag - swaps[i].took_best_price = Some(best_price_condition); + swaps[i].shill_bid = Some(is_shill_bid); } } #[test] fn test_rms_pipeline() { + use chrono::{DateTime, Utc}; let mut swaps = vec![ // first trade 5/5/2024 8pm - Swap { - from_user: "Alice".into(), - to_accepter: "Bob".into(), + ExchangeOrder { + user: 1, // alice + accepter: 2, // bob filled_at: DateTime::parse_from_rfc3339("2024-05-05T20:02:00Z") .unwrap() .with_timezone(&Utc), @@ -141,18 +152,17 @@ fn test_rms_pipeline() { .unwrap() .with_timezone(&Utc), price: 100.0, - created: true, order_type: "Buy".into(), rms_hour: 0.0, rms_24hour: 0.0, price_vs_rms_hour: 0.0, price_vs_rms_24hour: 0.0, - took_best_price: None, + shill_bid: None, }, // less than 12 hours later next trade 5/6/2024 8AM - Swap { - from_user: "Alice".into(), - to_accepter: "Bob".into(), + ExchangeOrder { + user: 1, + accepter: 2, filled_at: DateTime::parse_from_rfc3339("2024-05-06T08:01:00Z") .unwrap() .with_timezone(&Utc), @@ -161,18 +171,17 @@ fn test_rms_pipeline() { .unwrap() .with_timezone(&Utc), price: 4.0, - created: true, order_type: "Buy".into(), rms_hour: 0.0, rms_24hour: 0.0, price_vs_rms_hour: 0.0, price_vs_rms_24hour: 0.0, - took_best_price: None, + shill_bid: None, }, // less than one hour later - Swap { - from_user: "Alice".into(), - to_accepter: "Bob".into(), + ExchangeOrder { + user: 1, + accepter: 2, filled_at: DateTime::parse_from_rfc3339("2024-05-06T09:00:00Z") .unwrap() .with_timezone(&Utc), @@ -181,18 +190,17 @@ fn test_rms_pipeline() { .unwrap() .with_timezone(&Utc), price: 4.0, - created: true, order_type: "Buy".into(), rms_hour: 0.0, rms_24hour: 0.0, price_vs_rms_hour: 0.0, price_vs_rms_24hour: 0.0, - took_best_price: None, + shill_bid: None, }, // same time as previous but different traders - Swap { - from_user: 
"Carol".into(), - to_accepter: "Dave".into(), + ExchangeOrder { + user: 300, // carol + accepter: 400, // dave filled_at: DateTime::parse_from_rfc3339("2024-05-06T09:00:00Z") .unwrap() .with_timezone(&Utc), @@ -201,22 +209,17 @@ fn test_rms_pipeline() { .unwrap() .with_timezone(&Utc), price: 32.0, - created: true, order_type: "Sell".into(), rms_hour: 0.0, rms_24hour: 0.0, price_vs_rms_hour: 0.0, price_vs_rms_24hour: 0.0, - took_best_price: None, + shill_bid: None, }, ]; process_swaps(&mut swaps); - // for swap in swaps.iter() { - // println!("{:?}", swap); - // } - let s0 = swaps.first().unwrap(); assert!(s0.rms_hour == 0.0); assert!(s0.rms_24hour == 0.0); @@ -230,7 +233,6 @@ fn test_rms_pipeline() { assert!(s3.rms_hour == 4.0); assert!((s3.rms_24hour > 57.0) && (s3.rms_24hour < 58.0)); - process_swaps_with_best_price(&mut swaps); dbg!(&swaps); } diff --git a/src/load_exchange_orders.rs b/src/load_exchange_orders.rs index 9ebe9d7..616b154 100644 --- a/src/load_exchange_orders.rs +++ b/src/load_exchange_orders.rs @@ -4,7 +4,9 @@ use anyhow::{Context, Result}; use log::{error, info, warn}; use neo4rs::{query, Graph}; -use crate::{extract_exchange_orders, queue, schema_exchange_orders::ExchangeOrder}; +use crate::{ + analytics::enrich_rms, extract_exchange_orders, queue, schema_exchange_orders::ExchangeOrder, +}; pub async fn swap_batch( txs: &[ExchangeOrder], @@ -81,6 +83,11 @@ pub async fn impl_batch_tx_insert(pool: &Graph, batch_txs: &[ExchangeOrder]) -> } pub async fn load_from_json(path: &Path, pool: &Graph, batch_size: usize) -> Result<(u64, u64)> { - let orders = extract_exchange_orders::read_orders_from_file(path)?; + let mut orders = extract_exchange_orders::read_orders_from_file(path)?; + // add RMS stats to each order + enrich_rms::process_swaps(&mut orders); + // find likely shill bids + enrich_rms::process_swaps_with_best_price(&mut orders); + swap_batch(&orders, pool, batch_size).await } diff --git a/src/schema_exchange_orders.rs 
b/src/schema_exchange_orders.rs index 7539b8e..b8f230b 100644 --- a/src/schema_exchange_orders.rs +++ b/src/schema_exchange_orders.rs @@ -16,6 +16,16 @@ pub struct ExchangeOrder { pub created_at: DateTime<Utc>, pub filled_at: DateTime<Utc>, pub accepter: u32, + #[serde(skip_deserializing)] + pub rms_hour: f64, + #[serde(skip_deserializing)] + pub rms_24hour: f64, + #[serde(skip_deserializing)] + pub price_vs_rms_hour: f64, + #[serde(skip_deserializing)] + pub price_vs_rms_24hour: f64, + #[serde(skip_deserializing)] + pub shill_bid: Option<bool>, // Some(true) when the order is flagged as a likely shill bid; None until analysed } impl Default for ExchangeOrder { @@ -28,6 +38,11 @@ impl Default for ExchangeOrder { created_at: DateTime::<Utc>::from_timestamp_nanos(0), filled_at: DateTime::<Utc>::from_timestamp_nanos(0), accepter: 1, + rms_hour: 0.0, + rms_24hour: 0.0, + price_vs_rms_hour: 0.0, + price_vs_rms_24hour: 0.0, + shill_bid: None, } } } diff --git a/tests/test_enrich_exchange.rs b/tests/test_enrich_exchange.rs index 3aeef1c..2ef8bf2 100644 --- a/tests/test_enrich_exchange.rs +++ b/tests/test_enrich_exchange.rs @@ -6,6 +6,7 @@ use std::path::PathBuf; use anyhow::Result; use libra_forensic_db::{ + analytics::enrich_rms, extract_exchange_orders, load_exchange_orders, neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes}, schema_exchange_orders::ExchangeOrder, @@ -20,6 +21,51 @@ fn open_parse_file() { assert!(orders.len() == 25450); } +#[test] +fn test_enrich_rms() { + let path = env!("CARGO_MANIFEST_DIR"); + let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json"); + let mut orders = extract_exchange_orders::read_orders_from_file(buf).unwrap(); + assert!(orders.len() == 25450); + + enrich_rms::process_swaps(&mut orders); + + let count_above_100_pct = orders.iter().fold(0, |mut acc, el| { + if el.price_vs_rms_24hour > 2.0 { + acc += 1; + } + acc + }); + + assert!(count_above_100_pct == 96); + + assert!(orders.len() == 25450); +} + +#[test] +fn test_enrich_best_trade() { + let path = 
env!("CARGO_MANIFEST_DIR"); + let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json"); + let mut orders = extract_exchange_orders::read_orders_from_file(buf).unwrap(); + assert!(orders.len() == 25450); + + enrich_rms::process_swaps_with_best_price(&mut orders); + + let count_shill = orders.iter().fold(0, |mut acc, el| { + if let Some(is_shill) = el.shill_bid { + if is_shill { + acc += 1 + } + } + acc + }); + + dbg!(&count_shill); + + assert!(count_shill == 13723); + assert!(orders.len() == 25450); +} + #[tokio::test] async fn test_swap_batch_cypher() -> Result<()> { let c = start_neo4j_container();