Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Solver participation guard #3257

Open
wants to merge 36 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
5fe0dd6
Solver participation validator
squadgazzz Jan 29, 2025
5319945
Test
squadgazzz Jan 29, 2025
e65c328
Avoid rpc calls every time
squadgazzz Jan 29, 2025
fc3321b
Typo
squadgazzz Jan 29, 2025
0fbd61c
Docs
squadgazzz Jan 29, 2025
b1abfa0
Metrics
squadgazzz Jan 29, 2025
292dcff
Configurable validators
squadgazzz Jan 29, 2025
fe9ef5b
Fixed clap config
squadgazzz Jan 29, 2025
c5e3502
Refactoring
squadgazzz Jan 30, 2025
a9e6a3f
Config per solver
squadgazzz Jan 30, 2025
9a55fe2
Start using the new config
squadgazzz Jan 30, 2025
f9bdafd
Simplify to hashset
squadgazzz Jan 30, 2025
5fc831e
Nit
squadgazzz Jan 30, 2025
3154cd0
Use driver's name in metrics
squadgazzz Jan 31, 2025
47007c1
Nit
squadgazzz Jan 31, 2025
bb9059e
Send metrics about each found solver
squadgazzz Jan 31, 2025
6787d34
Cache only accepted solvers
squadgazzz Jan 31, 2025
a2710c6
Refactoring
squadgazzz Jan 31, 2025
1f43009
Fix the tests
squadgazzz Feb 3, 2025
0d50991
Merge branch 'main' into blacklist-failing-solvers
squadgazzz Feb 3, 2025
366611d
Nits
squadgazzz Feb 7, 2025
e9a70f5
Trigger updates on the proposed_solution table insert
squadgazzz Feb 11, 2025
e220eaf
Nit
squadgazzz Feb 11, 2025
51832d4
Formatting
squadgazzz Feb 11, 2025
17ee52c
infra::Persistence
squadgazzz Feb 12, 2025
cba693a
Naming
squadgazzz Feb 12, 2025
c3c9433
Comment
squadgazzz Feb 14, 2025
58d7de1
Merge branch 'main' into blacklist-failing-solvers
squadgazzz Feb 14, 2025
fdc3afe
Merge branch 'main' into blacklist-failing-solvers
squadgazzz Feb 14, 2025
4f6cd1d
Comments
squadgazzz Feb 17, 2025
bdd33d0
Simplify the code
squadgazzz Feb 17, 2025
fd0fc27
Nits
squadgazzz Feb 17, 2025
e5250a5
Solver names in the log
squadgazzz Feb 17, 2025
051bd50
Naming
squadgazzz Feb 17, 2025
4bb8640
Fixes unit tests
squadgazzz Feb 17, 2025
de31f1e
Nit
squadgazzz Feb 18, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/autopilot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ chrono = { workspace = true }
clap = { workspace = true }
contracts = { path = "../contracts" }
cow-amm = { path = "../cow-amm" }
dashmap = { workspace = true }
database = { path = "../database" }
derive_more = { workspace = true }
ethcontract = { workspace = true }
Expand Down
90 changes: 86 additions & 4 deletions crates/autopilot/src/arguments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,32 @@ pub struct Arguments {
/// Archive node URL used to index CoW AMM
#[clap(long, env)]
pub archive_node_url: Option<Url>,

/// Configuration for the solver participation guard.
#[clap(flatten)]
pub db_based_solver_participation_guard: DbBasedSolverParticipationGuardConfig,
}

#[derive(Debug, clap::Parser)]
pub struct DbBasedSolverParticipationGuardConfig {
/// Enables or disables the solver participation guard
#[clap(
id = "db_enabled",
long = "db-based-solver-participation-guard-enabled",
env = "DB_BASED_SOLVER_PARTICIPATION_GUARD_ENABLED",
default_value = "true"
)]
pub enabled: bool,

/// Sets the duration for which the solver remains blacklisted.
/// Technically, the time-to-live for the solver participation blacklist
/// cache.
#[clap(long, env, default_value = "5m", value_parser = humantime::parse_duration)]
pub solver_blacklist_cache_ttl: Duration,

/// The number of last auctions to check solver participation eligibility.
#[clap(long, env, default_value = "3")]
pub solver_last_auctions_participation_count: u32,
}

impl std::fmt::Display for Arguments {
Expand Down Expand Up @@ -290,6 +316,7 @@ impl std::fmt::Display for Arguments {
max_winners_per_auction,
archive_node_url,
max_solutions_per_solver,
db_based_solver_participation_guard,
} = self;

write!(f, "{}", shared)?;
Expand Down Expand Up @@ -373,6 +400,11 @@ impl std::fmt::Display for Arguments {
"max_solutions_per_solver: {:?}",
max_solutions_per_solver
)?;
writeln!(
f,
"db_based_solver_participation_guard: {:?}",
db_based_solver_participation_guard
)?;
Ok(())
}
}
Expand All @@ -384,6 +416,7 @@ pub struct Solver {
pub url: Url,
pub submission_account: Account,
pub fairness_threshold: Option<U256>,
pub requested_timeout_on_problems: bool,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -432,18 +465,31 @@ impl FromStr for Solver {
Account::Address(H160::from_str(parts[2]).context("failed to parse submission")?)
};

let fairness_threshold = match parts.get(3) {
Some(value) => {
Some(U256::from_dec_str(value).context("failed to parse fairness threshold")?)
let mut fairness_threshold: Option<U256> = Default::default();
let mut requested_timeout_on_problems = false;

if let Some(value) = parts.get(3) {
match U256::from_dec_str(value) {
Ok(parsed_fairness_threshold) => {
fairness_threshold = Some(parsed_fairness_threshold);
}
Err(_) => {
requested_timeout_on_problems =
value.to_lowercase() == "requested_timeout_on_problems";
}
}
None => None,
};

if let Some(value) = parts.get(4) {
requested_timeout_on_problems = value.to_lowercase() == "requested_timeout_on_problems";
}

Ok(Self {
name: name.to_owned(),
url,
fairness_threshold,
submission_account,
requested_timeout_on_problems,
})
}
}
Expand Down Expand Up @@ -640,6 +686,7 @@ mod test {
name: "name1".into(),
url: Url::parse("http://localhost:8080").unwrap(),
fairness_threshold: None,
requested_timeout_on_problems: false,
submission_account: Account::Address(H160::from_slice(&hex!(
"C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
))),
Expand All @@ -655,6 +702,7 @@ mod test {
name: "name1".into(),
url: Url::parse("http://localhost:8080").unwrap(),
fairness_threshold: None,
requested_timeout_on_problems: false,
submission_account: Account::Kms(
Arn::from_str("arn:aws:kms:supersecretstuff").unwrap(),
),
Expand All @@ -673,6 +721,40 @@ mod test {
"C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
))),
fairness_threshold: Some(U256::exp10(18)),
requested_timeout_on_problems: false,
};
assert_eq!(driver, expected);
}

#[test]
fn parse_driver_with_accepts_unsettled_blocking_flag() {
let argument =
"name1|http://localhost:8080|0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2|requested_timeout_on_problems";
let driver = Solver::from_str(argument).unwrap();
let expected = Solver {
name: "name1".into(),
url: Url::parse("http://localhost:8080").unwrap(),
submission_account: Account::Address(H160::from_slice(&hex!(
"C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
))),
fairness_threshold: None,
requested_timeout_on_problems: true,
};
assert_eq!(driver, expected);
}

#[test]
fn parse_driver_with_threshold_and_accepts_unsettled_blocking_flag() {
let argument = "name1|http://localhost:8080|0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2|1000000000000000000|requested_timeout_on_problems";
let driver = Solver::from_str(argument).unwrap();
let expected = Solver {
name: "name1".into(),
url: Url::parse("http://localhost:8080").unwrap(),
submission_account: Account::Address(H160::from_slice(&hex!(
"C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
))),
fairness_threshold: Some(U256::exp10(18)),
requested_timeout_on_problems: true,
};
assert_eq!(driver, expected);
}
Expand Down
6 changes: 5 additions & 1 deletion crates/autopilot/src/domain/competition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ use {
};

mod participant;
mod participation_guard;

pub use participant::{Participant, Ranked, Unranked};
pub use {
participant::{Participant, Ranked, Unranked},
participation_guard::SolverParticipationGuard,
};

type SolutionId = u64;

Expand Down
111 changes: 111 additions & 0 deletions crates/autopilot/src/domain/competition/participation_guard/db.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
use {
crate::{
domain::{eth, Metrics},
infra,
},
ethrpc::block_stream::CurrentBlockWatcher,
std::{
collections::HashMap,
sync::Arc,
time::{Duration, Instant},
},
};

/// Checks the DB by searching for solvers that won N last consecutive auctions
/// but never settled any of them.
#[derive(Clone)]
pub(super) struct Validator(Arc<Inner>);

struct Inner {
persistence: infra::Persistence,
banned_solvers: dashmap::DashMap<eth::Address, Instant>,
ttl: Duration,
last_auctions_count: u32,
drivers_by_address: HashMap<eth::Address, Arc<infra::Driver>>,
}

impl Validator {
pub fn new(
persistence: infra::Persistence,
current_block: CurrentBlockWatcher,
competition_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>,
ttl: Duration,
last_auctions_count: u32,
drivers_by_address: HashMap<eth::Address, Arc<infra::Driver>>,
) -> Self {
let self_ = Self(Arc::new(Inner {
persistence,
banned_solvers: Default::default(),
ttl,
last_auctions_count,
drivers_by_address,
}));

self_.start_maintenance(competition_updates_receiver, current_block);

self_
}

/// Update the internal cache only once the competition auctions table is
/// updated to avoid redundant DB queries on each block or any other
/// timeout.
fn start_maintenance(
&self,
mut competition_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>,
current_block: CurrentBlockWatcher,
) {
let self_ = self.clone();
tokio::spawn(async move {
while competition_updates_receiver.recv().await.is_some() {
let current_block = current_block.borrow().number;
let non_settling_solvers = match self_
.0
.persistence
.find_non_settling_solvers(self_.0.last_auctions_count, current_block)
.await
{
Ok(non_settling_solvers) => non_settling_solvers,
Err(err) => {
tracing::warn!(?err, "error while searching for non-settling solvers");
continue;
}
};

let now = Instant::now();
let non_settling_solver_names: Vec<&str> = non_settling_solvers
.iter()
.filter_map(|solver| self_.0.drivers_by_address.get(solver))
.map(|driver| {
Metrics::get()
.non_settling_solver
.with_label_values(&[&driver.name]);
// Check if solver accepted this feature. This should be removed once the
// CIP making this mandatory has been approved.
if driver.requested_timeout_on_problems {
tracing::debug!(solver = ?driver.name, "disabling solver temporarily");
self_
.0
.banned_solvers
.insert(driver.submission_address, now);
}
driver.name.as_ref()
})
.collect();

tracing::debug!(solvers = ?non_settling_solver_names, "found non-settling solvers");
}
tracing::error!("stream of settlement updates terminated unexpectedly");
});
}
}

#[async_trait::async_trait]
impl super::Validator for Validator {
async fn is_allowed(&self, solver: &eth::Address) -> anyhow::Result<bool> {
if let Some(entry) = self.0.banned_solvers.get(solver) {
return Ok(entry.elapsed() >= self.0.ttl);
}

Ok(true)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
mod db;
mod onchain;

use {
crate::{
arguments::DbBasedSolverParticipationGuardConfig,
domain::eth,
infra::{self, Ethereum},
},
std::sync::Arc,
};

/// This struct checks whether a solver can participate in the competition by
/// using different validators.
#[derive(Clone)]
pub struct SolverParticipationGuard(Arc<Inner>);

struct Inner {
/// Stores the validators in order they will be called.
validators: Vec<Box<dyn Validator + Send + Sync>>,
}

impl SolverParticipationGuard {
pub fn new(
eth: Ethereum,
persistence: infra::Persistence,
competition_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>,
db_based_validator_config: DbBasedSolverParticipationGuardConfig,
drivers: impl IntoIterator<Item = Arc<infra::Driver>>,
) -> Self {
let mut validators: Vec<Box<dyn Validator + Send + Sync>> = Vec::new();

if db_based_validator_config.enabled {
let current_block = eth.current_block().clone();
let database_solver_participation_validator = db::Validator::new(
persistence,
current_block,
competition_updates_receiver,
db_based_validator_config.solver_blacklist_cache_ttl,
db_based_validator_config.solver_last_auctions_participation_count,
drivers
.into_iter()
.map(|driver| (driver.submission_address, driver.clone()))
.collect(),
);
validators.push(Box::new(database_solver_participation_validator));
}

let onchain_solver_participation_validator = onchain::Validator { eth };
validators.push(Box::new(onchain_solver_participation_validator));

Self(Arc::new(Inner { validators }))
}

/// Checks if a solver can participate in the competition.
/// Sequentially asks internal validators to avoid redundant RPC calls in
/// the following order:
/// 1. DB-based validator: operates fast since it uses in-memory cache.
/// 2. Onchain-based validator: only then calls the Authenticator contract.
pub async fn can_participate(&self, solver: &eth::Address) -> anyhow::Result<bool> {
for validator in &self.0.validators {
if !validator.is_allowed(solver).await? {
return Ok(false);
}
}

Ok(true)
}
}

#[async_trait::async_trait]
trait Validator: Send + Sync {
async fn is_allowed(&self, solver: &eth::Address) -> anyhow::Result<bool>;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use crate::{domain::eth, infra::Ethereum};

/// Calls Authenticator contract to check if a solver has a sufficient
/// permission.
pub(super) struct Validator {
pub eth: Ethereum,
}

#[async_trait::async_trait]
impl super::Validator for Validator {
async fn is_allowed(&self, solver: &eth::Address) -> anyhow::Result<bool> {
Ok(self
.eth
.contracts()
.authenticator()
.is_solver(solver.0)
.call()
.await?)
}
}
Loading
Loading