Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Driver fetches order's app-data #3242

Merged
merged 28 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 215 additions & 31 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions crates/driver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ hyper = { workspace = true }
indexmap = { workspace = true, features = ["serde"] }
itertools = { workspace = true }
mimalloc = { workspace = true }
moka = { version = "0.12.10", features = ["future"] }
num = { workspace = true }
number = { path = "../number" }
prometheus = { workspace = true }
Expand Down
100 changes: 91 additions & 9 deletions crates/driver/src/domain/competition/auction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@ use {
liquidity,
time,
},
infra::{self, blockchain, config::file::OrderPriorityStrategy, observe, Ethereum},
infra::{
self,
blockchain,
config::file::OrderPriorityStrategy,
observe::{self, metrics},
Ethereum,
},
util::{self, Bytes},
},
chrono::{Duration, Utc},
Expand All @@ -19,6 +25,7 @@ use {
collections::{HashMap, HashSet},
sync::{Arc, Mutex},
},
tap::TapFallible,
thiserror::Error,
};

Expand Down Expand Up @@ -137,15 +144,22 @@ struct Inner {
/// `order_priority_strategies` from the driver's config.
order_sorting_strategies: Vec<Arc<dyn sorting::SortingStrategy>>,
signature_validator: Arc<dyn SignatureValidating>,
app_data_retriever: Option<order::app_data::AppDataRetriever>,
}

type BalanceGroup = (order::Trader, eth::TokenAddress, order::SellTokenBalance);
type Balances = HashMap<BalanceGroup, order::SellAmount>;

impl AuctionProcessor {
/// Prioritize well priced and filter out unfillable orders from the given
/// auction.
/// Process the auction by prioritizing the orders and filtering out
/// unfillable orders. Fetches full app data for each order and returns an
/// auction with updated orders.
pub async fn prioritize(&self, auction: Auction, solver: &eth::H160) -> Auction {
let _timer = metrics::get()
.auction_preprocessing
.with_label_values(&["total"])
.start_timer();

Auction {
orders: self.prioritize_orders(&auction, solver).await,
..auction
Expand Down Expand Up @@ -180,16 +194,24 @@ impl AuctionProcessor {
let mut orders = auction.orders.clone();
let solver = *solver;
let order_comparators = lock.order_sorting_strategies.clone();
let app_data_retriever = lock.app_data_retriever.clone();

// Use spawn_blocking() because a lot of CPU bound computations are happening
// and we don't want to block the runtime for too long.
let fut = tokio::task::spawn_blocking(move || {
let start = std::time::Instant::now();
orders.extend(rt.block_on(Self::cow_amm_orders(&eth, &tokens, &cow_amms, signature_validator.as_ref())));
sorting::sort_orders(&mut orders, &tokens, &solver, &order_comparators);
let mut balances =
rt.block_on(async { Self::fetch_balances(&eth, &orders).await });
Self::filter_orders(&mut balances, &mut orders);
let (mut balances, mut app_data_by_hash) =
rt.block_on(async {
tokio::join!(
Self::fetch_balances(&eth, &orders),
Self::collect_orders_app_data(app_data_retriever, &orders),
)
});

Self::update_orders(&mut balances, &mut app_data_by_hash, &mut orders);

tracing::debug!(auction_id = new_id.0, time =? start.elapsed(), "auction preprocessing done");
orders
})
Expand All @@ -209,8 +231,55 @@ impl AuctionProcessor {
fut
}

/// Removes orders that cannot be filled due to missing funds of the owner.
fn filter_orders(balances: &mut Balances, orders: &mut Vec<order::Order>) {
/// Fetches the app data for all orders in the auction.
/// Returns a map from app data hash to the fetched app data.
async fn collect_orders_app_data(
mstrug marked this conversation as resolved.
Show resolved Hide resolved
app_data_retriever: Option<order::app_data::AppDataRetriever>,
orders: &[order::Order],
) -> HashMap<order::app_data::AppDataHash, app_data::ValidatedAppData> {
let Some(app_data_retriever) = app_data_retriever else {
return Default::default();
};

let _timer = metrics::get()
.auction_preprocessing
.with_label_values(&["fetch_app_data"])
.start_timer();

join_all(
orders
.iter()
.map(|order| order.app_data.hash())
.unique()
.map(|app_data_hash| {
let app_data_retriever = app_data_retriever.clone();
async move {
let fetched_app_data = app_data_retriever
.get(&app_data_hash)
.await
.tap_err(|err| {
tracing::warn!(?app_data_hash, ?err, "failed to fetch app data");
})
.ok()
.flatten();

(app_data_hash, fetched_app_data)
}
}),
)
.await
.into_iter()
.filter_map(|(app_data_hash, app_data)| app_data.map(|app_data| (app_data_hash, app_data)))
.collect::<HashMap<_, _>>()
}

/// Removes orders that cannot be filled due to missing funds of the owner
/// and updates the fetched app data.
fn update_orders(
balances: &mut Balances,
app_data_by_hash: &mut HashMap<order::app_data::AppDataHash, app_data::ValidatedAppData>,
orders: &mut Vec<order::Order>,
) {
// The auction that we receive from the `autopilot` assumes that there
// is sufficient balance to completely cover all the orders. **This is
// not the case** (as the protocol should not chose which limit orders
Expand Down Expand Up @@ -275,6 +344,12 @@ impl AuctionProcessor {
}

remaining_balance.0 -= allocated_balance.0;

// Update order app data if it was fetched.
if let Some(fetched_app_data) = app_data_by_hash.remove(&order.app_data.hash()) {
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
order.app_data = fetched_app_data.into();
}

sunce86 marked this conversation as resolved.
Show resolved Hide resolved
true
});
}
Expand Down Expand Up @@ -306,6 +381,11 @@ impl AuctionProcessor {
})
.collect::<Vec<_>>();

let _timer = metrics::get()
.auction_preprocessing
.with_label_values(&["fetch_balances"])
.start_timer();

join_all(
traders
.into_iter()
Expand Down Expand Up @@ -392,7 +472,7 @@ impl AuctionProcessor {
},
kind: order::Kind::Limit,
side: template.order.kind.into(),
app_data: order::AppData(Bytes(template.order.app_data.0)),
app_data: order::app_data::AppDataHash(Bytes(template.order.app_data.0)).into(),
buy_token_balance: template.order.buy_token_balance.into(),
sell_token_balance: template.order.sell_token_balance.into(),
partial: match template.order.partially_fillable {
Expand Down Expand Up @@ -449,6 +529,7 @@ impl AuctionProcessor {
pub fn new(
eth: &infra::Ethereum,
order_priority_strategies: Vec<OrderPriorityStrategy>,
app_data_retriever: Option<order::app_data::AppDataRetriever>,
) -> Self {
let eth = eth.with_metric_label("auctionPreProcessing".into());
let mut order_sorting_strategies = vec![];
Expand Down Expand Up @@ -484,6 +565,7 @@ impl AuctionProcessor {
eth,
order_sorting_strategies,
signature_validator,
app_data_retriever,
})))
}
}
Expand Down
180 changes: 180 additions & 0 deletions crates/driver/src/domain/competition/order/app_data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
use {
crate::util::Bytes,
derive_more::From,
futures::FutureExt,
moka::future::Cache,
reqwest::StatusCode,
shared::request_sharing::BoxRequestSharing,
std::sync::Arc,
thiserror::Error,
url::Url,
};

/// A struct for retrieving order's full app-data by its hash from a remote
/// service, with support for caching and deduplicating concurrent requests.
///
/// Ensures efficient access to application data by:
/// - Caching results to avoid redundant network requests.
/// - Sharing ongoing requests to prevent duplicate fetches for the same
/// `app_data`.
/// - Validating fetched app data.
///
/// LRU cache is used since only ~2% of app-data is unique across all orders
/// meaning that the cache hit rate is expected to be high, so there is no need
/// for TTL cache.
#[derive(Clone)]
pub struct AppDataRetriever(Arc<Inner>);

struct Inner {
client: reqwest::Client,
base_url: Url,
request_sharing:
BoxRequestSharing<AppDataHash, Result<Option<app_data::ValidatedAppData>, FetchingError>>,
app_data_validator: app_data::Validator,
cache: Cache<AppDataHash, Option<app_data::ValidatedAppData>>,
}

impl AppDataRetriever {
pub fn new(orderbook_url: Url, cache_size: u64) -> Self {
Self(Arc::new(Inner {
client: reqwest::Client::new(),
base_url: orderbook_url,
request_sharing: BoxRequestSharing::labelled("app_data".to_string()),
app_data_validator: app_data::Validator::new(usize::MAX),
cache: Cache::new(cache_size),
}))
}

/// Retrieves the full app-data for the given `app_data` hash, if exists.
pub async fn get(
&self,
app_data: &AppDataHash,
) -> Result<Option<app_data::ValidatedAppData>, FetchingError> {
if let Some(app_data) = self.0.cache.get(app_data).await {
return Ok(app_data.clone());
}

let app_data_fut = move |app_data: &AppDataHash| {
let app_data = *app_data;
let self_ = self.clone();

async move {
let url = self_
.0
.base_url
.join(&format!("v1/app_data/{:?}", app_data.0))?;
mstrug marked this conversation as resolved.
Show resolved Hide resolved
let response = self_.0.client.get(url).send().await?;
let validated_app_data = match response.status() {
StatusCode::NOT_FOUND => None,
_ => {
let bytes = response.error_for_status()?.bytes().await?;
Some(self_.0.app_data_validator.validate(&bytes)?)
}
};

self_
.0
.cache
.insert(app_data, validated_app_data.clone())
.await;

Ok(validated_app_data)
}
.boxed()
};

self.0
.request_sharing
.shared_or_else(*app_data, app_data_fut)
.await
}
}

/// The app data associated with an order.
#[derive(Debug, Clone, From)]
pub enum AppData {
/// App data hash.
Hash(AppDataHash),
/// Validated full app data.
Full(Box<::app_data::ValidatedAppData>),
sunce86 marked this conversation as resolved.
Show resolved Hide resolved
}

impl Default for AppData {
fn default() -> Self {
Self::Hash(Default::default())
}
}

impl AppData {
pub fn hash(&self) -> AppDataHash {
match self {
Self::Hash(hash) => *hash,
Self::Full(data) => AppDataHash(data.hash.0.into()),
}
}
}

impl From<[u8; APP_DATA_LEN]> for AppData {
fn from(app_data_hash: [u8; APP_DATA_LEN]) -> Self {
Self::Hash(app_data_hash.into())
}
}

impl From<::app_data::ValidatedAppData> for AppData {
fn from(value: ::app_data::ValidatedAppData) -> Self {
Self::Full(Box::new(value))
}
}

/// The length of the app data hash in bytes.
pub const APP_DATA_LEN: usize = 32;

/// This is a hash allowing arbitrary user data to be associated with an order.
/// While this type holds the hash, the data itself is uploaded to IPFS. This
/// hash is signed along with the order.
#[derive(Debug, Default, Clone, Copy, Hash, PartialEq, Eq)]
pub struct AppDataHash(pub Bytes<[u8; APP_DATA_LEN]>);

impl From<[u8; APP_DATA_LEN]> for AppDataHash {
fn from(inner: [u8; APP_DATA_LEN]) -> Self {
Self(inner.into())
}
}

impl From<AppDataHash> for [u8; APP_DATA_LEN] {
fn from(app_data: AppDataHash) -> Self {
app_data.0.into()
}
}

#[derive(Error, Debug)]
pub enum FetchingError {
#[error("error while sending a request: {0}")]
Http(String),
#[error("received invalid app data: {0}")]
InvalidAppData(#[from] anyhow::Error),
#[error("internal error: {0}")]
Internal(String),
}

impl From<reqwest::Error> for FetchingError {
fn from(err: reqwest::Error) -> Self {
FetchingError::Http(err.to_string())
}
}

impl From<url::ParseError> for FetchingError {
fn from(err: url::ParseError) -> Self {
FetchingError::Internal(err.to_string())
}
}

impl Clone for FetchingError {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is required to satisfy BoxRequestSharing constraints.

fn clone(&self) -> Self {
match self {
Self::Http(message) => Self::Http(message.clone()),
Self::InvalidAppData(err) => Self::InvalidAppData(shared::clone_anyhow_error(err)),
Self::Internal(message) => Self::Internal(message.clone()),
}
}
}
Loading
Loading