Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Solver participation guard #3257

Open
wants to merge 24 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/autopilot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ chrono = { workspace = true }
clap = { workspace = true }
contracts = { path = "../contracts" }
cow-amm = { path = "../cow-amm" }
dashmap = { workspace = true }
database = { path = "../database" }
derive_more = { workspace = true }
ethcontract = { workspace = true }
Expand Down
91 changes: 87 additions & 4 deletions crates/autopilot/src/arguments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,30 @@ pub struct Arguments {
/// Archive node URL used to index CoW AMM
#[clap(long, env)]
pub archive_node_url: Option<Url>,

/// Configuration for the solver participation guard.
#[clap(flatten)]
pub db_based_solver_participation_guard: DbBasedSolverParticipationGuardConfig,
}

#[derive(Debug, clap::Parser)]
pub struct DbBasedSolverParticipationGuardConfig {
/// Enables or disables the solver participation guard
#[clap(
id = "db_enabled",
long = "db-based-solver-participation-guard-enabled",
env = "DB_BASED_SOLVER_PARTICIPATION_GUARD_ENABLED",
default_value = "true"
)]
pub enabled: bool,

/// The time-to-live for the solver participation blacklist cache.
#[clap(long, env, default_value = "5m", value_parser = humantime::parse_duration)]
pub solver_blacklist_cache_ttl: Duration,

/// The number of last auctions to check solver participation eligibility.
#[clap(long, env, default_value = "3")]
pub solver_last_auctions_participation_count: u32,
}
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved

impl std::fmt::Display for Arguments {
Expand Down Expand Up @@ -290,6 +314,7 @@ impl std::fmt::Display for Arguments {
max_winners_per_auction,
archive_node_url,
max_solutions_per_solver,
db_based_solver_participation_guard,
} = self;

write!(f, "{}", shared)?;
Expand Down Expand Up @@ -373,6 +398,11 @@ impl std::fmt::Display for Arguments {
"max_solutions_per_solver: {:?}",
max_solutions_per_solver
)?;
writeln!(
f,
"db_based_solver_participation_guard: {:?}",
db_based_solver_participation_guard
)?;
Ok(())
}
}
Expand All @@ -384,6 +414,7 @@ pub struct Solver {
pub url: Url,
pub submission_account: Account,
pub fairness_threshold: Option<U256>,
pub accepts_unsettled_blocking: bool,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -432,18 +463,34 @@ impl FromStr for Solver {
Account::Address(H160::from_str(parts[2]).context("failed to parse submission")?)
};

let fairness_threshold = match parts.get(3) {
Some(value) => {
Some(U256::from_dec_str(value).context("failed to parse fairness threshold")?)
let mut fairness_threshold: Option<U256> = Default::default();
let mut accepts_unsettled_blocking = false;

if let Some(value) = parts.get(3) {
match U256::from_dec_str(value) {
Ok(parsed_fairness_threshold) => {
fairness_threshold = Some(parsed_fairness_threshold);
}
Err(_) => {
accepts_unsettled_blocking = value
.parse()
.context("failed to parse solver's third arg param")?
}
}
None => None,
};

if let Some(value) = parts.get(4) {
accepts_unsettled_blocking = value
.parse()
.context("failed to parse `accepts_unsettled_blocking` flag")?;
}

Ok(Self {
name: name.to_owned(),
url,
fairness_threshold,
submission_account,
accepts_unsettled_blocking,
})
}
}
Expand Down Expand Up @@ -640,6 +687,7 @@ mod test {
name: "name1".into(),
url: Url::parse("http://localhost:8080").unwrap(),
fairness_threshold: None,
accepts_unsettled_blocking: false,
submission_account: Account::Address(H160::from_slice(&hex!(
"C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
))),
Expand All @@ -655,6 +703,7 @@ mod test {
name: "name1".into(),
url: Url::parse("http://localhost:8080").unwrap(),
fairness_threshold: None,
accepts_unsettled_blocking: false,
submission_account: Account::Kms(
Arn::from_str("arn:aws:kms:supersecretstuff").unwrap(),
),
Expand All @@ -673,6 +722,40 @@ mod test {
"C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
))),
fairness_threshold: Some(U256::exp10(18)),
accepts_unsettled_blocking: false,
};
assert_eq!(driver, expected);
}

#[test]
fn parse_driver_with_accepts_unsettled_blocking_flag() {
let argument =
"name1|http://localhost:8080|0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2|true";
let driver = Solver::from_str(argument).unwrap();
let expected = Solver {
name: "name1".into(),
url: Url::parse("http://localhost:8080").unwrap(),
submission_account: Account::Address(H160::from_slice(&hex!(
"C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
))),
fairness_threshold: None,
accepts_unsettled_blocking: true,
};
assert_eq!(driver, expected);
}

#[test]
fn parse_driver_with_threshold_and_accepts_unsettled_blocking_flag() {
let argument = "name1|http://localhost:8080|0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2|1000000000000000000|true";
let driver = Solver::from_str(argument).unwrap();
let expected = Solver {
name: "name1".into(),
url: Url::parse("http://localhost:8080").unwrap(),
submission_account: Account::Address(H160::from_slice(&hex!(
"C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
))),
fairness_threshold: Some(U256::exp10(18)),
accepts_unsettled_blocking: true,
};
assert_eq!(driver, expected);
}
Expand Down
24 changes: 24 additions & 0 deletions crates/autopilot/src/database/competition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,28 @@ impl super::Postgres {

Ok(())
}

/// Finds solvers that won `last_auctions_count` consecutive auctions but
/// never settled any of them. The current block is used to prevent
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
/// selecting auctions with deadline after the current block since they
/// still can be settled.
pub async fn find_non_settling_solvers(
&self,
last_auctions_count: u32,
current_block: u64,
) -> anyhow::Result<Vec<Address>> {
let mut ex = self.pool.acquire().await.context("acquire")?;
let _timer = super::Metrics::get()
.database_queries
.with_label_values(&["find_non_settling_solvers"])
.start_timer();

database::solver_competition::find_non_settling_solvers(
&mut ex,
last_auctions_count,
current_block,
)
.await
.context("solver_competition::find_non_settling_solvers")
}
}
6 changes: 5 additions & 1 deletion crates/autopilot/src/domain/competition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ use {
};

mod participant;
mod participation_guard;

pub use participant::{Participant, Ranked, Unranked};
pub use {
participant::{Participant, Ranked, Unranked},
participation_guard::SolverParticipationGuard,
};

type SolutionId = u64;

Expand Down
123 changes: 123 additions & 0 deletions crates/autopilot/src/domain/competition/participation_guard/db.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
use {
crate::{
database::Postgres,
domain::{eth, Metrics},
infra,
},
ethrpc::block_stream::CurrentBlockWatcher,
std::{
collections::HashMap,
sync::Arc,
time::{Duration, Instant},
},
};

/// Checks the DB by searching for solvers that won N last consecutive auctions
/// but never settled any of them.
#[derive(Clone)]
pub(super) struct Validator(Arc<Inner>);

struct Inner {
db: Postgres,
banned_solvers: dashmap::DashMap<eth::Address, Instant>,
ttl: Duration,
last_auctions_count: u32,
drivers_by_address: HashMap<eth::Address, Arc<infra::Driver>>,
}

impl Validator {
pub fn new(
db: Postgres,
current_block: CurrentBlockWatcher,
settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>,
ttl: Duration,
last_auctions_count: u32,
drivers_by_address: HashMap<eth::Address, Arc<infra::Driver>>,
) -> Self {
let self_ = Self(Arc::new(Inner {
db,
banned_solvers: Default::default(),
ttl,
last_auctions_count,
drivers_by_address,
}));

self_.start_maintenance(settlement_updates_receiver, current_block);

self_
}

/// Update the internal cache only once the settlement table is updated to
/// avoid redundant DB queries.
fn start_maintenance(
&self,
mut settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>,
current_block: CurrentBlockWatcher,
) {
let self_ = self.clone();
tokio::spawn(async move {
while settlement_updates_receiver.recv().await.is_some() {
let current_block = current_block.borrow().number;
match self_
.0
.db
.find_non_settling_solvers(self_.0.last_auctions_count, current_block)
.await
{
Ok(non_settling_solvers) => {
let non_settling_drivers = non_settling_solvers
.into_iter()
.filter_map(|solver| {
let address = eth::Address(solver.0.into());
self_.0.drivers_by_address.get(&address).map(|driver| {
Metrics::get()
.non_settling_solver
.with_label_values(&[&driver.name]);

driver.clone()
})
})
.collect::<Vec<_>>();

let non_settling_solver_names = non_settling_drivers
.iter()
.map(|driver| driver.name.clone())
.collect::<Vec<_>>();

tracing::debug!(solvers = ?non_settling_solver_names, "found non-settling solvers");

let now = Instant::now();
non_settling_drivers
.into_iter()
// Check if solver accepted this feature. This should be removed once a CIP is
// approved.
.filter_map(|driver| {
driver.accepts_unsettled_blocking.then_some(driver.submission_address)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this filtering can be avoided completely if you filter out drivers that disabled this feature right in the Validator constructor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The on-call week erased some contexts from my head. Initially, I filtered out drivers in the constructor, but then I had to implement this approach. Trying to remember what was the reason 🙈

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This log and the metric were the reason:

tracing::debug!(solvers = ?non_settling_solver_names, "found non-settling solvers");

Metrics::get()
.non_settling_solver
.with_label_values(&[&driver.name]);

Without the full drivers set, there will be just addresses. Since this feature is expected to be enabled selectively, not for all the solvers, using this metric and the log we can identify the problematic solvers that didn't accept the feature but still show low performance.

})
.for_each(|solver| {
self_.0.banned_solvers.insert(solver, now);
});
}
Err(err) => {
tracing::warn!(?err, "error while searching for non-settling solvers")
}
}
}
});
}
}

#[async_trait::async_trait]
impl super::Validator for Validator {
async fn is_allowed(&self, solver: &eth::Address) -> anyhow::Result<bool> {
if let Some(entry) = self.0.banned_solvers.get(solver) {
if Instant::now().duration_since(*entry.value()) < self.0.ttl {
return Ok(false);
} else {
self.0.banned_solvers.remove(solver);
}
}

Ok(true)
}
}
Loading
Loading