Skip to content

Commit

Permalink
Respect driver deadline in solution processing (#2059)
Browse files Browse the repository at this point in the history
# Description
Dependent on #2061

Implements deadline for settlement encoding and solution merging in
`driver` as explained here
#1498 (comment)

# Changes
<!-- List of detailed changes (how the change is accomplished) -->

- [ ] Solutions encoding now returns a stream that can be combined with
other post-processing steps before all settlements are encoded
- [ ] Merging happens as a `map` operation on the stream. As new encoded
solutions come in, we attempt to merge them into previously found
solutions. This changes the existing merge algorithm behavior, but
should be fine as it's an imperfect optimisation for Gnosis solvers
anyways.
- [ ] Adjusted deadline for re-simulation to use `driver` and not
`solver` timeout.

## How to test
CI

---------

Co-authored-by: Felix Leupold <[email protected]>
  • Loading branch information
sunce86 and fleupold authored Nov 22, 2023
1 parent 04feb02 commit f7b1e2e
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 67 deletions.
117 changes: 58 additions & 59 deletions crates/driver/src/domain/competition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ use {
},
util::Bytes,
},
futures::{future::join_all, StreamExt},
futures::{stream::FuturesUnordered, Stream, StreamExt},
itertools::Itertools,
rand::seq::SliceRandom,
std::{collections::HashSet, sync::Mutex},
tap::TapFallible,
};
Expand Down Expand Up @@ -76,6 +75,8 @@ impl Competition {
}
})?;

observe::postprocessing(&solutions, auction.deadline().driver().unwrap_or_default());

// Discard solutions that don't have unique ID.
let mut ids = HashSet::new();
let solutions = solutions.into_iter().filter(|solution| {
Expand All @@ -88,7 +89,7 @@ impl Competition {
}
});

// Empty solutions aren't useful, so discard them.
// Discard empty solutions.
let solutions = solutions.filter(|solution| {
if solution.is_empty() {
observe::empty_solution(self.solver.name(), solution.id());
Expand All @@ -99,67 +100,37 @@ impl Competition {
}
});

// Encode the solutions into settlements.
let settlements = join_all(solutions.map(|solution| async move {
observe::encoding(solution.id());
(
solution.id(),
solution.encode(auction, &self.eth, &self.simulator).await,
)
}))
.await;

// Filter out solutions that failed to encode.
let mut settlements = settlements
.into_iter()
.filter_map(|(id, result)| {
// Encode solutions into settlements (streamed).
let encoded = solutions
.map(|solution| async move {
let id = solution.id();
observe::encoding(id);
let settlement = solution.encode(auction, &self.eth, &self.simulator).await;
(id, settlement)
})
.collect::<FuturesUnordered<_>>()
.filter_map(|(id, result)| async move {
result
.tap_err(|err| {
observe::encoding_failed(self.solver.name(), id, err);
notify::encoding_failed(&self.solver, auction.id(), id, err);
})
.ok()
})
.collect_vec();

// TODO(#1483): parallelize this
// TODO(#1480): more optimal approach for settlement merging

// Merge the settlements in random order.
settlements.shuffle(&mut rand::thread_rng());

// The merging algorithm works as follows: the [`settlements`] vector keeps the
// "most merged" settlements until they can't be merged anymore, at
// which point they are moved into the [`results`] vector.

// The merged settlements in their final form.
let mut results = Vec::new();
while let Some(settlement) = settlements.pop() {
// Has [`settlement`] been merged into another settlement?
let mut merged = false;
// Try to merge [`settlement`] into some other settlement.
for other in settlements.iter_mut() {
match other.merge(&settlement, &self.eth, &self.simulator).await {
Ok(m) => {
*other = m;
merged = true;
observe::merged(&settlement, other);
break;
}
Err(err) => {
observe::not_merged(&settlement, other, err);
}
}
}
// If [`settlement`] can't be merged into any other settlement, this is its
// final, most optimal form. Push it to the results.
if !merged {
results.push(settlement);
}
});

// Merge settlements as they arrive until there are no more new settlements or
// timeout is reached.
let mut settlements = Vec::new();
if tokio::time::timeout(
auction.deadline().driver().unwrap_or_default(),
merge_settlements(&mut settlements, encoded, &self.eth, &self.simulator),
)
.await
.is_err()
{
observe::postprocessing_timed_out(&settlements)
}

let settlements = results;

// Score the settlements.
let scores = settlements
.into_iter()
Expand Down Expand Up @@ -221,7 +192,7 @@ impl Competition {
ethrpc::current_block::into_stream(self.eth.current_block().clone());
while let Some(block) = stream.next().await {
if let Err(err) = self.simulate_settlement(&settlement).await {
tracing::warn!(block = block.number, ?err, "solution reverts on new block");
observe::winner_voided(block, &err);
*score_ref = None;
*self.settlement.lock().unwrap() = None;
if let Some(id) = settlement.notify_id() {
Expand All @@ -231,8 +202,7 @@ impl Competition {
}
}
};
let timeout = remaining.to_std().unwrap_or_default();
let _ = tokio::time::timeout(timeout, simulate_on_new_blocks).await;
let _ = tokio::time::timeout(remaining, simulate_on_new_blocks).await;
}

Ok(score)
Expand Down Expand Up @@ -331,6 +301,35 @@ impl Competition {
}
}

/// Tries to merge the incoming stream of new settlements into existing ones.
/// Always adds the new settlement by itself.
async fn merge_settlements(
merged: &mut Vec<Settlement>,
new: impl Stream<Item = Settlement>,
eth: &Ethereum,
simulator: &Simulator,
) {
let mut new = std::pin::pin!(new);
while let Some(settlement) = new.next().await {
// Try to merge [`settlement`] into some settlements.
for other in merged.iter_mut() {
match other.merge(&settlement, eth, simulator).await {
Ok(m) => {
*other = m;
observe::merged(&settlement, other);
// could possibly break here if we want to avoid merging
// into multiple settlements
}
Err(err) => {
observe::not_merged(&settlement, other, err);
}
}
}
// add [`settlement`] by itself
merged.push(settlement);
}
}

/// Solution information sent to the protocol by the driver before the solution
/// ranking happens.
#[derive(Debug)]
Expand Down
6 changes: 3 additions & 3 deletions crates/driver/src/domain/competition/solution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,9 @@ impl SolverTimeout {
}
}

impl From<chrono::Duration> for SolverTimeout {
fn from(duration: chrono::Duration) -> Self {
Self(duration)
impl From<std::time::Duration> for SolverTimeout {
fn from(duration: std::time::Duration) -> Self {
Self(chrono::Duration::from_std(duration).unwrap_or(chrono::Duration::max_value()))
}
}

Expand Down
8 changes: 4 additions & 4 deletions crates/driver/src/domain/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,24 @@ impl Deadline {

/// Remaining time until the deadline for driver to return solution to
/// autopilot is reached.
pub fn driver(self) -> Result<chrono::Duration, DeadlineExceeded> {
pub fn driver(self) -> Result<std::time::Duration, DeadlineExceeded> {
Self::remaining(self.driver)
}

/// Remaining time until the deadline for solvers to return solution to
/// driver is reached.
pub fn solvers(self) -> Result<chrono::Duration, DeadlineExceeded> {
pub fn solvers(self) -> Result<std::time::Duration, DeadlineExceeded> {
Self::remaining(self.solvers)
}

fn remaining(
deadline: chrono::DateTime<chrono::Utc>,
) -> Result<chrono::Duration, DeadlineExceeded> {
) -> Result<std::time::Duration, DeadlineExceeded> {
let deadline = deadline - infra::time::now();
if deadline <= chrono::Duration::zero() {
Err(DeadlineExceeded)
} else {
Ok(deadline)
Ok(deadline.to_std().expect("not negative"))
}
}
}
Expand Down
25 changes: 25 additions & 0 deletions crates/driver/src/infra/observe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use {
infra::solver,
util::http,
},
ethrpc::current_block::BlockInfo,
std::collections::HashMap,
url::Url,
};
Expand Down Expand Up @@ -87,6 +88,24 @@ pub fn empty_solution(solver: &solver::Name, id: solution::Id) {
.inc();
}

// Observe that postprocessing (encoding & merging) of solutions is about to
// start.
pub fn postprocessing(solutions: &[Solution], deadline: std::time::Duration) {
tracing::debug!(
solutions = ?solutions.len(),
remaining = ?deadline,
"postprocessing solutions"
);
}

// Observe that postprocessing didn't complete before the timeout.
pub fn postprocessing_timed_out(completed: &[Settlement]) {
tracing::debug!(
completed = ?completed.len(),
"postprocessing solutions timed out"
);
}

/// Observe that a solution is about to be encoded into a settlement.
pub fn encoding(id: solution::Id) {
tracing::trace!(?id, "encoding settlement");
Expand Down Expand Up @@ -147,6 +166,12 @@ pub fn score(settlement: &Settlement, score: &competition::Score) {
);
}

// Observe that the winning settlement started failing upon arrival of a new
// block
pub fn winner_voided(block: BlockInfo, err: &simulator::Error) {
tracing::warn!(block = block.number, ?err, "solution reverts on new block");
}

pub fn revealing() {
tracing::trace!("revealing");
}
Expand Down
2 changes: 1 addition & 1 deletion crates/driver/src/tests/setup/solver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ impl Solver {
"orders": orders_json,
"liquidity": [],
"effectiveGasPrice": effective_gas_price,
"deadline": infra::time::now() + deadline.solvers().unwrap() - http_delay,
"deadline": infra::time::now() + chrono::Duration::from_std(deadline.solvers().unwrap()).unwrap() - http_delay,
});
assert_eq!(req, expected, "unexpected /solve request");
let mut state = state.0.lock().unwrap();
Expand Down

0 comments on commit f7b1e2e

Please sign in to comment.