Skip to content

Commit

Permalink
patch shill trade logic
Browse files Browse the repository at this point in the history
  • Loading branch information
0o-de-lally committed Dec 2, 2024
1 parent 7426b3d commit 012db7a
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 60 deletions.
118 changes: 60 additions & 58 deletions src/analytics/enrich_rms.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,7 @@
use chrono::{DateTime, Duration, Utc};
use chrono::Duration;
use std::collections::VecDeque;

#[derive(Debug, Clone)]
pub struct Swap {
pub from_user: String,
pub to_accepter: String,
pub filled_at: DateTime<Utc>,
pub amount: f64,
pub created_at: DateTime<Utc>,
pub price: f64,
pub created: bool,
pub order_type: String,
pub rms_hour: f64,
pub rms_24hour: f64,
pub price_vs_rms_hour: f64,
pub price_vs_rms_24hour: f64,
pub took_best_price: Option<bool>, // New field to indicate if it took the best price
}
use crate::schema_exchange_orders::ExchangeOrder;

fn calculate_rms(data: &[f64]) -> f64 {
let (sum, count) = data
Expand All @@ -30,11 +15,11 @@ fn calculate_rms(data: &[f64]) -> f64 {
}

/// enrich swap struct with RMS data
pub fn process_swaps(swaps: &mut [Swap]) {
pub fn process_swaps(swaps: &mut [ExchangeOrder]) {
swaps.sort_by_key(|swap| swap.filled_at);

let mut window_1hour: VecDeque<Swap> = VecDeque::new();
let mut window_24hour: VecDeque<Swap> = VecDeque::new();
let mut window_1hour: VecDeque<ExchangeOrder> = VecDeque::new();
let mut window_24hour: VecDeque<ExchangeOrder> = VecDeque::new();

let one_hour = Duration::hours(1);
let twenty_four_hours = Duration::hours(24);
Expand Down Expand Up @@ -66,13 +51,13 @@ pub fn process_swaps(swaps: &mut [Swap]) {
// Collect filtered amounts before borrowing swap mutably
let filtered_1hour: Vec<f64> = window_1hour
.iter()
.filter(|s| s.from_user != swap.from_user && s.to_accepter != swap.to_accepter)
.filter(|s| s.user != swap.user && s.accepter != swap.accepter)
.map(|s| s.price)
.collect();

let filtered_24hour: Vec<f64> = window_24hour
.iter()
.filter(|s| s.from_user != swap.from_user && s.to_accepter != swap.to_accepter)
.filter(|s| s.user != swap.user && s.accepter != swap.accepter)
.map(|s| s.price)
.collect();

Expand All @@ -95,14 +80,14 @@ pub fn process_swaps(swaps: &mut [Swap]) {
}
}

pub fn process_swaps_with_best_price(swaps: &mut [Swap]) {
pub fn process_swaps_with_best_price(swaps: &mut [ExchangeOrder]) {
swaps.sort_by_key(|swap| swap.filled_at); // Sort by filled_at

for i in 0..swaps.len() {
let current_swap = &swaps[i];

// Filter for open trades
let open_trades = swaps
let open_orders = swaps
.iter()
.filter(|&other_swap| {
other_swap.filled_at > current_swap.filled_at
Expand All @@ -111,28 +96,54 @@ pub fn process_swaps_with_best_price(swaps: &mut [Swap]) {
.collect::<Vec<_>>();

// Determine if the current swap took the best price
let best_price_condition = match current_swap.order_type.as_str() {
"Buy" => open_trades
.iter()
.all(|other_swap| other_swap.price >= current_swap.price),
"Sell" => open_trades
.iter()
.all(|other_swap| other_swap.price <= current_swap.price),
_ => true, // Default to true for unknown order types
let is_shill_bid = match current_swap.order_type.as_str() {
// Signs of shill trades.
// For those offering to BUY coins, as the tx.user (offerer)
// An honest and rational actor would not create a buy order
// higher than other SELL offers which have not been filled.
// The shill bidder who is colluding will create a BUY order at a higher price than other SELL orders which currently exist.
// For the accepter:
// I would only fill a lower price if the amount I have to sell is insufficient for higher prices.'
"Buy" => open_orders.iter().any(|other_swap| {
if other_swap.order_type == *"Sell" {
// this is not a rational trade if there are
// SELL offers of the same amount (or smaller)
// at a price equal or lower.
return other_swap.price <= current_swap.price
&& other_swap.amount <= current_swap.amount;
}
false
}),
// For those offering to SELL coins, as the tx.user (offerer)
// I should offer to sell near the current clearing price.
// If I'm making shill bids, I'm creating trades above the current clearing price. An honest actor wouldn't expect those to get filled immediately.
// If an accepter is buying coins at a higher price than other orders which could be filled, then they are likely colluding to increase the price.
"Sell" => open_orders.iter().any(|other_swap|
// if there are cheaper SELL offers,
// for smaller sizes, then the rational honest actor
// will pick one of those.
// So we find the list of open orders which would be
// better than the one taken how.
// if there are ANY available, then this SELL order was
// filled dishonestly.
other_swap.price <= current_swap.price &&
other_swap.amount <= current_swap.amount),
_ => false, // Default to true for unknown order types
};

// Update the swap with the best price flag
swaps[i].took_best_price = Some(best_price_condition);
swaps[i].shill_bid = Some(is_shill_bid);
}
}

#[test]
fn test_rms_pipeline() {
use chrono::{DateTime, Utc};
let mut swaps = vec![
// first trade 5/5/2024 8pm
Swap {
from_user: "Alice".into(),
to_accepter: "Bob".into(),
ExchangeOrder {
user: 1, // alice
accepter: 2, // bob
filled_at: DateTime::parse_from_rfc3339("2024-05-05T20:02:00Z")
.unwrap()
.with_timezone(&Utc),
Expand All @@ -141,18 +152,17 @@ fn test_rms_pipeline() {
.unwrap()
.with_timezone(&Utc),
price: 100.0,
created: true,
order_type: "Buy".into(),
rms_hour: 0.0,
rms_24hour: 0.0,
price_vs_rms_hour: 0.0,
price_vs_rms_24hour: 0.0,
took_best_price: None,
shill_bid: None,
},
// less than 12 hours later next trade 5/6/2024 8AM
Swap {
from_user: "Alice".into(),
to_accepter: "Bob".into(),
ExchangeOrder {
user: 1,
accepter: 2,
filled_at: DateTime::parse_from_rfc3339("2024-05-06T08:01:00Z")
.unwrap()
.with_timezone(&Utc),
Expand All @@ -161,18 +171,17 @@ fn test_rms_pipeline() {
.unwrap()
.with_timezone(&Utc),
price: 4.0,
created: true,
order_type: "Buy".into(),
rms_hour: 0.0,
rms_24hour: 0.0,
price_vs_rms_hour: 0.0,
price_vs_rms_24hour: 0.0,
took_best_price: None,
shill_bid: None,
},
// less than one hour later
Swap {
from_user: "Alice".into(),
to_accepter: "Bob".into(),
ExchangeOrder {
user: 1,
accepter: 2,
filled_at: DateTime::parse_from_rfc3339("2024-05-06T09:00:00Z")
.unwrap()
.with_timezone(&Utc),
Expand All @@ -181,18 +190,17 @@ fn test_rms_pipeline() {
.unwrap()
.with_timezone(&Utc),
price: 4.0,
created: true,
order_type: "Buy".into(),
rms_hour: 0.0,
rms_24hour: 0.0,
price_vs_rms_hour: 0.0,
price_vs_rms_24hour: 0.0,
took_best_price: None,
shill_bid: None,
},
// same time as previous but different traders
Swap {
from_user: "Carol".into(),
to_accepter: "Dave".into(),
ExchangeOrder {
user: 300, // carol
accepter: 400, // dave
filled_at: DateTime::parse_from_rfc3339("2024-05-06T09:00:00Z")
.unwrap()
.with_timezone(&Utc),
Expand All @@ -201,22 +209,17 @@ fn test_rms_pipeline() {
.unwrap()
.with_timezone(&Utc),
price: 32.0,
created: true,
order_type: "Sell".into(),
rms_hour: 0.0,
rms_24hour: 0.0,
price_vs_rms_hour: 0.0,
price_vs_rms_24hour: 0.0,
took_best_price: None,
shill_bid: None,
},
];

process_swaps(&mut swaps);

// for swap in swaps.iter() {
// println!("{:?}", swap);
// }

let s0 = swaps.first().unwrap();
assert!(s0.rms_hour == 0.0);
assert!(s0.rms_24hour == 0.0);
Expand All @@ -230,7 +233,6 @@ fn test_rms_pipeline() {
assert!(s3.rms_hour == 4.0);
assert!((s3.rms_24hour > 57.0) && (s3.rms_24hour < 58.0));


process_swaps_with_best_price(&mut swaps);
dbg!(&swaps);
}
11 changes: 9 additions & 2 deletions src/load_exchange_orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use anyhow::{Context, Result};
use log::{error, info, warn};
use neo4rs::{query, Graph};

use crate::{extract_exchange_orders, queue, schema_exchange_orders::ExchangeOrder};
use crate::{
analytics::enrich_rms, extract_exchange_orders, queue, schema_exchange_orders::ExchangeOrder,
};

pub async fn swap_batch(
txs: &[ExchangeOrder],
Expand Down Expand Up @@ -81,6 +83,11 @@ pub async fn impl_batch_tx_insert(pool: &Graph, batch_txs: &[ExchangeOrder]) ->
}

pub async fn load_from_json(path: &Path, pool: &Graph, batch_size: usize) -> Result<(u64, u64)> {
let orders = extract_exchange_orders::read_orders_from_file(path)?;
let mut orders = extract_exchange_orders::read_orders_from_file(path)?;
// add RMS stats to each order
enrich_rms::process_swaps(&mut orders);
// find likely shill bids
enrich_rms::process_swaps_with_best_price(&mut orders);

swap_batch(&orders, pool, batch_size).await
}
15 changes: 15 additions & 0 deletions src/schema_exchange_orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@ pub struct ExchangeOrder {
pub created_at: DateTime<Utc>,
pub filled_at: DateTime<Utc>,
pub accepter: u32,
#[serde(skip_deserializing)]
pub rms_hour: f64,
#[serde(skip_deserializing)]
pub rms_24hour: f64,
#[serde(skip_deserializing)]
pub price_vs_rms_hour: f64,
#[serde(skip_deserializing)]
pub price_vs_rms_24hour: f64,
#[serde(skip_deserializing)]
pub shill_bid: Option<bool>, // New field to indicate if it took the best price
}

impl Default for ExchangeOrder {
Expand All @@ -28,6 +38,11 @@ impl Default for ExchangeOrder {
created_at: DateTime::<Utc>::from_timestamp_nanos(0),
filled_at: DateTime::<Utc>::from_timestamp_nanos(0),
accepter: 1,
rms_hour: 0.0,
rms_24hour: 0.0,
price_vs_rms_hour: 0.0,
price_vs_rms_24hour: 0.0,
shill_bid: None,
}
}
}
Expand Down
46 changes: 46 additions & 0 deletions tests/test_enrich_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::path::PathBuf;

use anyhow::Result;
use libra_forensic_db::{
analytics::enrich_rms,
extract_exchange_orders, load_exchange_orders,
neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes},
schema_exchange_orders::ExchangeOrder,
Expand All @@ -20,6 +21,51 @@ fn open_parse_file() {
assert!(orders.len() == 25450);
}

#[test]
fn test_enrich_rms() {
let path = env!("CARGO_MANIFEST_DIR");
let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json");
let mut orders = extract_exchange_orders::read_orders_from_file(buf).unwrap();
assert!(orders.len() == 25450);

enrich_rms::process_swaps(&mut orders);

let count_above_100_pct = orders.iter().fold(0, |mut acc, el| {
if el.price_vs_rms_24hour > 2.0 {
acc += 1;
}
acc
});

assert!(count_above_100_pct == 96);

assert!(orders.len() == 25450);
}

#[test]
fn test_enrich_best_trade() {
let path = env!("CARGO_MANIFEST_DIR");
let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json");
let mut orders = extract_exchange_orders::read_orders_from_file(buf).unwrap();
assert!(orders.len() == 25450);

enrich_rms::process_swaps_with_best_price(&mut orders);

let count_shill = orders.iter().fold(0, |mut acc, el| {
if let Some(is_shill) = el.shill_bid {
if is_shill {
acc += 1
}
}
acc
});

dbg!(&count_shill);

assert!(count_shill == 13723);
assert!(orders.len() == 25450);
}

#[tokio::test]
async fn test_swap_batch_cypher() -> Result<()> {
let c = start_neo4j_container();
Expand Down

0 comments on commit 012db7a

Please sign in to comment.