Skip to content

Commit

Permalink
Merge branch 'dex-solver-rate-limit' into colocation_rc
Browse files Browse the repository at this point in the history
  • Loading branch information
fleupold committed Nov 27, 2023
2 parents ccd425d + 33d1a1e commit 492b1b9
Show file tree
Hide file tree
Showing 14 changed files with 309 additions and 41 deletions.
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

84 changes: 82 additions & 2 deletions crates/shared/src/rate_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ impl RateLimiter {
}
}

#[derive(Error, Debug, Clone, Default)]
#[derive(Error, Debug, Clone, Default, PartialEq)]
pub enum RateLimiterError {
#[default]
#[error("rate limited")]
Expand Down Expand Up @@ -221,6 +221,30 @@ impl RateLimiter {

Ok(result)
}

pub async fn execute_with_back_off<T>(
&self,
task: impl Future<Output = T>,
requires_back_off: impl Fn(&T) -> bool,
) -> Result<T, RateLimiterError> {
if let Some(back_off_duration) = self.get_back_off_duration_if_limited() {
tokio::time::sleep(back_off_duration).await;
}

self.execute(task, requires_back_off).await
}

fn get_back_off_duration_if_limited(&self) -> Option<Duration> {
let strategy = self.strategy.lock().unwrap();
let now = Instant::now();

if strategy.drop_requests_until > now {
let back_off_duration = strategy.drop_requests_until - now;
Some(back_off_duration)
} else {
None
}
}
}

/// Shared module with common back-off checks.
Expand All @@ -236,7 +260,7 @@ pub mod back_off {

#[cfg(test)]
mod tests {
use {super::*, futures::FutureExt, tokio::time::sleep};
use {super::*, futures::FutureExt, std::ops::Add, tokio::time::sleep};

#[test]
fn current_back_off_does_not_panic() {
Expand Down Expand Up @@ -317,4 +341,60 @@ mod tests {
rate_limiter.strategy().get_current_back_off()
);
}

#[tokio::test]
async fn test_execute_with_no_back_off() {
let timeout = Duration::from_secs(30);
let strategy = RateLimitingStrategy::try_new(1.0, timeout, timeout).unwrap();
let original_drop_until = strategy.drop_requests_until;
let rate_limiter = RateLimiter::from_strategy(strategy, "test_no_back_off".to_string());

let result = rate_limiter
.execute_with_back_off(async { 1 }, |_| false)
.await
.unwrap();

assert_eq!(result, 1);
{
let current_strategy = rate_limiter.strategy.lock().unwrap();
assert!(current_strategy.drop_requests_until < original_drop_until.add(timeout));
}

let result = rate_limiter.execute(async { 1 }, |_| false).await.unwrap();
assert_eq!(result, 1);
}

#[tokio::test]
async fn test_execute_with_back_off() {
let timeout = Duration::from_secs(3);
let strategy = RateLimitingStrategy::try_new(1.0, timeout, timeout).unwrap();
let original_drop_until = strategy.drop_requests_until;
let rate_limiter = RateLimiter::from_strategy(strategy, "test_back_off".to_string());

let result = rate_limiter
.execute_with_back_off(async { 1 }, |_| true)
.await
.unwrap();

assert_eq!(result, 1);
let drop_until = {
let current_strategy = rate_limiter.strategy.lock().unwrap();
let drop_until = current_strategy.drop_requests_until;
assert!(drop_until >= original_drop_until.add(timeout));
drop_until
};

let result = rate_limiter.execute(async { 1 }, |_| false).await;
assert_eq!(result, Err(RateLimiterError::RateLimited));
{
let current_strategy = rate_limiter.strategy.lock().unwrap();
assert_eq!(current_strategy.drop_requests_until, drop_until);
}

let result = rate_limiter
.execute_with_back_off(async { 1 }, |_| false)
.await
.unwrap();
assert_eq!(result, 1);
}
}
2 changes: 2 additions & 0 deletions crates/solvers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ ethereum-types = "0.14"
ethrpc = { path = "../ethrpc" }
futures = "0.3"
hex = "0.4"
humantime = "2.1.0"
humantime-serde = "1.1.1"
hyper = "0.14"
itertools = "0.11"
num = "0.4"
Expand Down
1 change: 1 addition & 0 deletions crates/solvers/src/boundary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ pub mod baseline;
pub mod legacy;
pub mod liquidity;
pub mod naive;
pub mod rate_limiter;

pub type Result<T> = anyhow::Result<T>;
58 changes: 58 additions & 0 deletions crates/solvers/src/boundary/rate_limiter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use {
anyhow::Result,
shared::rate_limiter::{
RateLimiter as SharedRateLimiter,
RateLimiterError as SharedRateLimiterError,
RateLimitingStrategy as SharedRateLimitingStrategy,
},
std::{future::Future, time::Duration},
thiserror::Error,
};

pub struct RateLimiter {
inner: SharedRateLimiter,
}

#[derive(Debug, Clone)]
pub struct RateLimitingStrategy {
inner: SharedRateLimitingStrategy,
}

impl RateLimitingStrategy {
pub fn try_new(
back_off_growth_factor: f64,
min_back_off: Duration,
max_back_off: Duration,
) -> Result<Self> {
SharedRateLimitingStrategy::try_new(back_off_growth_factor, min_back_off, max_back_off)
.map(|shared| Self { inner: shared })
}
}

#[derive(Error, Debug, Clone, Default, PartialEq)]
pub enum RateLimiterError {
#[default]
#[error("rate limited")]
RateLimited,
}

impl RateLimiter {
pub fn new(strategy: RateLimitingStrategy, name: String) -> Self {
Self {
inner: SharedRateLimiter::from_strategy(strategy.inner, name),
}
}

pub async fn execute_with_back_off<T>(
&self,
task: impl Future<Output = T>,
requires_back_off: impl Fn(&T) -> bool,
) -> Result<T, RateLimiterError> {
self.inner
.execute_with_back_off(task, requires_back_off)
.await
.map_err(|err| match err {
SharedRateLimiterError::RateLimited => RateLimiterError::RateLimited,
})
}
}
96 changes: 66 additions & 30 deletions crates/solvers/src/domain/solver/dex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,15 @@
use {
crate::{
domain,
domain::{auction, dex::slippage, order, solution, solver::dex::fills::Fills},
boundary::rate_limiter::{RateLimiter, RateLimiterError},
domain::{
self,
auction,
dex::{self, slippage},
order::{self, Order},
solution,
solver::dex::fills::Fills,
},
infra,
},
futures::{future, stream, FutureExt, StreamExt},
Expand Down Expand Up @@ -33,10 +40,17 @@ pub struct Dex {

/// Parameters used to calculate the revert risk of a solution.
risk: domain::Risk,

/// Handles 429 Too Many Requests error with a retry mechanism
rate_limiter: RateLimiter,
}

impl Dex {
pub fn new(dex: infra::dex::Dex, config: infra::config::dex::Config) -> Self {
pub fn new(
dex: infra::dex::Dex,
config: infra::config::dex::Config,
rate_limiter: RateLimiter,
) -> Self {
Self {
dex,
simulator: infra::dex::Simulator::new(
Expand All @@ -48,6 +62,7 @@ impl Dex {
concurrent_requests: config.concurrent_requests,
fills: Fills::new(config.smallest_partial_fill),
risk: config.risk,
rate_limiter,
}
}

Expand Down Expand Up @@ -86,40 +101,61 @@ impl Dex {
.filter_map(future::ready)
}

async fn try_solve(
&self,
order: &Order,
dex_order: &dex::Order,
tokens: &auction::Tokens,
gas_price: auction::GasPrice,
) -> Result<dex::Swap, infra::dex::Error> {
let slippage = self.slippage.relative(&dex_order.amount(), tokens);
self.dex
.swap(dex_order, &slippage, tokens, gas_price)
.await
.map_err(|err| {
match &err {
err @ infra::dex::Error::NotFound => {
if order.partially_fillable {
// Only adjust the amount to try next if we are sure the API worked
// correctly yet still wasn't able to provide a
// swap.
self.fills.reduce_next_try(order.uid);
} else {
tracing::debug!(?err, "skipping order");
}
}
err @ infra::dex::Error::OrderNotSupported => {
tracing::debug!(?err, "skipping order")
}
err @ infra::dex::Error::RateLimited => {
tracing::debug!(?err, "encountered rate limit")
}
infra::dex::Error::Other(err) => tracing::warn!(?err, "failed to get swap"),
}
err
})
}

async fn solve_order(
&self,
order: order::UserOrder<'_>,
tokens: &auction::Tokens,
gas_price: auction::GasPrice,
) -> Option<solution::Solution> {
let order = order.get();
let swap = {
let order = self.fills.dex_order(order, tokens)?;
let slippage = self.slippage.relative(&order.amount(), tokens);
self.dex.swap(&order, &slippage, tokens, gas_price).await
};

let swap = match swap {
Ok(swap) => swap,
Err(err @ infra::dex::Error::NotFound) => {
if order.partially_fillable {
// Only adjust the amount to try next if we are sure the API worked correctly
// yet still wasn't able to provide a swap.
self.fills.reduce_next_try(order.uid);
} else {
tracing::debug!(?err, "skipping order");
}
return None;
}
Err(err @ infra::dex::Error::OrderNotSupported) => {
tracing::debug!(?err, "skipping order");
return None;
}
Err(infra::dex::Error::Other(err)) => {
tracing::warn!(?err, "failed to get swap");
return None;
}
};
let dex_order = self.fills.dex_order(order, tokens)?;
let swap = self
.rate_limiter
.execute_with_back_off(
self.try_solve(order, &dex_order, tokens, gas_price),
|result| matches!(result, Err(infra::dex::Error::RateLimited)),
)
.await
.map_err(|err| match err {
RateLimiterError::RateLimited => infra::dex::Error::RateLimited,
})
.and_then(|result| result)
.ok()?;

let uid = order.uid;
let sell = tokens.reference_price(&order.sell.token);
Expand Down
Loading

0 comments on commit 492b1b9

Please sign in to comment.