Skip to content

Commit

Permalink
Share liquidity fetch requests (#2065)
Browse files Browse the repository at this point in the history
# Description
Because all the traits and APIs around liquidity fetching are very weird
it's pretty hard to keep track of which liquidity pools are not worth
fetching.

# Changes
1. This PR introduces a request sharing logic in the `RecentBlockCache`.
With that it shouldn't matter how many solvers have to fetch liquidity
in parallel as they should all share the underlying RPC requests.
2. Additionally I modified the logic that keeps track of recently used
keys to only contain ids of pools we were able to fetch. This is
important because those keys are used to update the cache in a
background task and if we have too many useless keys in there we'd waste
a lot of work during those background updates.
3. Moved `RequestSharing` garbage collection to background task instead
of at every insert. This is mostly relevant because we now use this
component for liquidity fetching which issues **MANY** requests. This
improvement is actually pretty huge.

Those changes (together with the previous 2 liquidity changes) brought
down the liquidity fetching time from ~8s to 0.7-1.5s which should be
fast enough for the gnosis solvers.

Fixes: #2041
  • Loading branch information
MartinquaXD authored Nov 27, 2023
1 parent fa8796c commit d583ec3
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 113 deletions.
2 changes: 1 addition & 1 deletion crates/driver/src/boundary/liquidity/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const BLOCK_POLL_INTERVAL: Duration = Duration::from_secs(1);
fn cache_config() -> CacheConfig {
CacheConfig {
number_of_blocks_to_cache: NonZeroU64::new(10).unwrap(),
number_of_entries_to_auto_update: NonZeroUsize::new(200).unwrap(),
number_of_entries_to_auto_update: NonZeroUsize::new(1000).unwrap(),
maximum_recent_block_age: 4,
max_retries: 5,
delay_between_retries: Duration::from_secs(1),
Expand Down
7 changes: 1 addition & 6 deletions crates/driver/src/boundary/liquidity/swapr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,7 @@ pub async fn collector(
pool_code: config.pool_code,
missing_pool_cache_time: config.missing_pool_cache_time,
},
|web3, pair_provider| {
SwaprPoolReader(DefaultPoolReader {
web3,
pair_provider,
})
},
|web3, pair_provider| SwaprPoolReader(DefaultPoolReader::new(web3, pair_provider)),
)
.await
}
8 changes: 1 addition & 7 deletions crates/driver/src/boundary/liquidity/uniswap/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,7 @@ pub async fn collector(
blocks: &CurrentBlockStream,
config: &infra::liquidity::config::UniswapV2,
) -> Result<Box<dyn LiquidityCollecting>> {
collector_with_reader(eth, blocks, config, |web3, pair_provider| {
DefaultPoolReader {
web3,
pair_provider,
}
})
.await
collector_with_reader(eth, blocks, config, DefaultPoolReader::new).await
}

pub(in crate::boundary::liquidity) async fn collector_with_reader<R, F>(
Expand Down
24 changes: 15 additions & 9 deletions crates/driver/src/domain/competition/auction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ impl AuctionProcessor {

/// Fetches the tradable balance for every order owner.
async fn fetch_balances(ethereum: &infra::Ethereum, orders: &[order::Order]) -> Balances {
let mut tokens: HashMap<_, _> = Default::default();
// Collect trader/token/source/interaction tuples for fetching available
// balances. Note that we are pessimistic here, if a trader is selling
// the same token with the same source in two different orders using a
Expand All @@ -297,6 +298,7 @@ impl AuctionProcessor {
.map(|((trader, token, source), mut orders)| {
let first = orders.next().expect("group contains at least 1 order");
let mut others = orders;
tokens.entry(token).or_insert_with(|| ethereum.erc20(token));
if others.all(|order| order.pre_interactions == first.pre_interactions) {
(trader, token, source, &first.pre_interactions[..])
} else {
Expand All @@ -308,15 +310,19 @@ impl AuctionProcessor {
join_all(
traders
.into_iter()
.map(|(trader, token, source, interactions)| async move {
let balance = ethereum
.erc20(token)
.tradable_balance(trader.into(), source, interactions)
.await;
(
(trader, token, source),
balance.map(order::SellAmount::from).ok(),
)
.map(|(trader, token, source, interactions)| {
let token_contract = tokens.get(&token);
let token_contract = token_contract.expect("all tokens were created earlier");
let fetch_balance =
token_contract.tradable_balance(trader.into(), source, interactions);

async move {
let balance = fetch_balance.await;
(
(trader, token, source),
balance.map(order::SellAmount::from).ok(),
)
}
}),
)
.await
Expand Down
Loading

0 comments on commit d583ec3

Please sign in to comment.