From 3105fdf63c1da83ea424d58049a80ea12a3f4bc2 Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Mon, 20 Nov 2023 13:17:36 -0500 Subject: [PATCH] refactor: move fee selection out of indexer selection (#419) This removes Agora (and therefore GraphQL-specific) cost models from indexer-selection. The fee negotiation step now happens before indexer selections (and no longer requires a retry). For now, the interaction between budgeting & candidate fees is simple, but that's acceptable for now since we are paying indexers more than they are requesting. Volume discounting has been removed entirely. --- Cargo.lock | 3 +- Cargo.toml | 1 - graph-gateway/Cargo.toml | 7 +- graph-gateway/src/block_constraints.rs | 10 +- graph-gateway/src/budgets.rs | 242 +++---------------------- graph-gateway/src/client_query.rs | 139 ++++++++------ graph-gateway/src/indexing.rs | 7 +- graph-gateway/src/main.rs | 1 - indexer-selection/Cargo.toml | 5 - indexer-selection/src/fee.rs | 184 ------------------- indexer-selection/src/indexing.rs | 19 +- indexer-selection/src/lib.rs | 52 +++--- indexer-selection/src/score.rs | 31 +++- indexer-selection/src/simulation.rs | 27 ++- indexer-selection/src/test.rs | 108 +++++------ indexer-selection/src/test_utils.rs | 10 +- prelude/src/lib.rs | 4 +- 17 files changed, 235 insertions(+), 615 deletions(-) delete mode 100644 indexer-selection/src/fee.rs diff --git a/Cargo.lock b/Cargo.lock index 8f923e56..d4d5700c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1702,6 +1702,7 @@ dependencies = [ "assert_matches", "axum", "chrono", + "cost-model", "ethers", "eventuals", "faster-hex", @@ -2077,8 +2078,6 @@ dependencies = [ "alloy-primitives", "anyhow", "arrayvec 0.7.4", - "cost-model", - "eventuals", "num-traits", "ordered-float", "permutation", diff --git a/Cargo.toml b/Cargo.toml index a814a12d..931b3eaf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,6 @@ debug = true [workspace.dependencies] anyhow = "1.0" alloy-primitives = { version = "0.4.2", features = ["serde"] } -eventuals = "0.6.7" rand = { version = "0.8", features = ["small_rng"] } reqwest = { version = "0.11", default-features = false, features = [ "json", diff --git a/graph-gateway/Cargo.toml b/graph-gateway/Cargo.toml index 84f97b1f..b9fb8ac0 100644 --- a/graph-gateway/Cargo.toml +++ b/graph-gateway/Cargo.toml @@ -13,13 +13,16 @@ axum = { version = "0.6.15", default-features = false, features = [ "original-uri", ] } chrono = { version = "0.4", default-features = false, features = ["clock"] } +cost-model = { git = "https://github.com/graphprotocol/agora", rev = "9984f9e" } ethers = { version = "2.0.10", default-features = false, features = ["abigen"] } -eventuals.workspace = true +eventuals = "0.6.7" faster-hex = "0.8.0" futures = "0.3" graph-subscriptions = { git = "https://github.com/edgeandnode/subscription-payments", rev = "334d18b" } graphql = { git = "https://github.com/edgeandnode/toolshed", tag = "graphql-v0.1.0" } -graphql-http = { git = "https://github.com/edgeandnode/toolshed.git", tag = "graphql-http-v0.1.0", features = ["http-reqwest"] } +graphql-http = { git = "https://github.com/edgeandnode/toolshed.git", tag = "graphql-http-v0.1.0", features = [ + "http-reqwest", +] } hex = "0.4" indexer-selection = { path = "../indexer-selection" } indoc = "2.0.3" diff --git a/graph-gateway/src/block_constraints.rs b/graph-gateway/src/block_constraints.rs index 6d443540..40b98617 100644 --- a/graph-gateway/src/block_constraints.rs +++ b/graph-gateway/src/block_constraints.rs @@ -1,16 +1,16 @@ use std::collections::{BTreeMap, BTreeSet}; use alloy_primitives::{BlockHash, BlockNumber}; +use cost_model::Context; use graphql::graphql_parser::query::{ Definition, Document, OperationDefinition, Selection, Text, Value, }; use graphql::{IntoStaticValue as _, QueryVariables, StaticValue}; +use indexer_selection::UnresolvedBlock; use itertools::Itertools as _; use serde_json::{self, json}; use toolshed::thegraph::BlockPointer; -use indexer_selection::{Context, UnresolvedBlock}; - #[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)] pub enum BlockConstraint { Unconstrained, @@ -29,7 +29,9 @@ impl BlockConstraint { } } -pub fn block_constraints<'c>(context: &'c Context<'c>) -> Option> { +pub fn block_constraints<'c>( + context: &'c Context<'c, String>, +) -> Option> { let mut constraints = BTreeSet::new(); let vars = &context.variables; // ba6c90f1-3baf-45be-ac1c-f60733404436 @@ -72,7 +74,7 @@ pub fn block_constraints<'c>(context: &'c Context<'c>) -> Option, + mut ctx: Context<'_, String>, resolved: &BTreeSet, latest: &BlockPointer, ) -> Option { diff --git a/graph-gateway/src/budgets.rs b/graph-gateway/src/budgets.rs index ac731346..da4c708f 100644 --- a/graph-gateway/src/budgets.rs +++ b/graph-gateway/src/budgets.rs @@ -1,85 +1,69 @@ -use std::collections::HashMap; +use std::time::Duration; -use eventuals::{Eventual, EventualWriter, Ptr}; -use indexer_selection::{ - decay::{Decay, FastDecayBuffer}, - impl_struct_decay, -}; -use prelude::{UDecimal18, USD}; -use tokio::{ - select, spawn, - sync::mpsc, - time::{interval, Duration, Instant}, -}; -use toolshed::thegraph::DeploymentId; +use eventuals::{Eventual, EventualWriter}; +use indexer_selection::decay::FastDecayBuffer; +use prelude::*; +use tokio::time::interval; +use tokio::{select, spawn, sync::mpsc}; use crate::metrics::METRICS; -/// This 10e-6 number comes some back-of-the-napkin calculations on what we expect is the minimum -/// fee an indexer should be paid per query, based on hosted service costs attributable to serving -/// queries in June 2023. Now we are using it as the maximum discount instead of the minimum budget. -const MAX_DISCOUNT_USD: f64 = 10e-6; - pub struct Budgeter { pub feedback: mpsc::UnboundedSender, - pub budgets: Eventual>>, - query_fees_target: USD, + absolute_budget_limit: USD, + budget_limit: Eventual, } pub struct Feedback { - pub deployment: DeploymentId, pub fees: USD, pub query_count: u64, } impl Budgeter { pub fn new(query_fees_target: USD) -> Self { - assert!(f64::from(query_fees_target.0) >= MAX_DISCOUNT_USD); let (feedback_tx, feedback_rx) = mpsc::unbounded_channel(); - let (budgets_tx, budgets_rx) = Eventual::new(); - Actor::create(feedback_rx, budgets_tx, query_fees_target); + let (budget_limit_tx, budget_limit_rx) = Eventual::new(); + Actor::create(feedback_rx, budget_limit_tx, query_fees_target); + let absolute_budget_limit = USD(query_fees_target.0 * UDecimal18::from(10)); Self { feedback: feedback_tx, - budgets: budgets_rx, - query_fees_target, + absolute_budget_limit, + budget_limit: budget_limit_rx, } } - pub fn budget(&self, deployment: &DeploymentId, query_count: u64) -> USD { - let budget = self - .budgets + pub fn budget(&self, query_count: u64, candidate_fees: &[USD]) -> USD { + let budget_limit = self + .budget_limit .value_immediate() - .and_then(|budgets| budgets.get(deployment).copied()) - .unwrap_or(self.query_fees_target); + .unwrap_or(self.absolute_budget_limit); + let max_fee = candidate_fees.iter().max().cloned().unwrap_or_default(); + let budget = max_fee.max(budget_limit).min(self.absolute_budget_limit); USD(budget.0 * UDecimal18::from(query_count as u128)) } } struct Actor { feedback: mpsc::UnboundedReceiver, - budgets: EventualWriter>>, - volume_estimators: HashMap, + budget_limit: EventualWriter, controller: Controller, } impl Actor { fn create( feedback: mpsc::UnboundedReceiver, - budgets: EventualWriter>>, + budget_limit: EventualWriter, query_fees_target: USD, ) { let mut actor = Actor { feedback, - budgets, - volume_estimators: HashMap::default(), + budget_limit, controller: Controller::new(query_fees_target), }; - let mut decay_timer = interval(Duration::from_secs(120)); let mut budget_timer = interval(Duration::from_secs(1)); spawn(async move { loop { select! { - _ = decay_timer.tick() => actor.decay(), Some(msg) = actor.feedback.recv() => actor.feedback(msg), _ = budget_timer.tick() => actor.revise_budget(), } @@ -87,59 +71,21 @@ impl Actor { }); } - fn decay(&mut self) { - let now = Instant::now(); - for estimator in self.volume_estimators.values_mut() { - estimator.decay(now); - } - } - fn feedback(&mut self, feedback: Feedback) { self.controller .add_queries(feedback.fees, feedback.query_count); - self.volume_estimators - .entry(feedback.deployment) - .or_insert_with(|| VolumeEstimator::new(Instant::now())) - .add_queries(feedback.query_count); } fn revise_budget(&mut self) { if self.controller.recent_query_count == 0 { return; } - let target = self.controller.target_query_fees; - let control_variable = self.controller.control_variable(); - tracing::debug!(budget_control_variable = ?control_variable); - let now = Instant::now(); - let budgets = self - .volume_estimators - .iter() - .map(|(deployment, volume_estimator)| { - let volume = volume_estimator.monthly_volume_estimate(now) as u64; - let mut budget = volume_discount(volume, target).0 * control_variable; - // limit budget to 100x target - budget = budget.min(target.0 * UDecimal18::from(100)); - (*deployment, USD(budget)) - }) - .collect(); - - self.budgets.write(Ptr::new(budgets)); + let budget_limit = USD(self.controller.control_variable()); + tracing::debug!(?budget_limit); + self.budget_limit.write(budget_limit); } } -fn volume_discount(monthly_volume: u64, target: USD) -> USD { - // Discount the budget, based on a generalized logistic function. We apply little to no discount - // between 0 and ~10e3 queries per month. And we limit the discount to 10E-6 USD. - // https://www.desmos.com/calculator/whtakt50sa - let b_max: f64 = target.0.into(); - let b_min = b_max - MAX_DISCOUNT_USD; - let m: f64 = 1e6; - let z: f64 = 0.45; - let v = monthly_volume as f64; - let budget = b_min + ((b_max - b_min) * m.powf(z)) / (v + m).powf(z); - USD(budget.try_into().unwrap_or_default()) -} - /// State for the control loop targeting `recent_query_fees`. struct Controller { target_query_fees: USD, @@ -182,150 +128,12 @@ impl Controller { } } -struct VolumeEstimator { - history: FastDecayBuffer, - last_time: Instant, -} - -#[derive(Default)] -struct QueryVolume { - time_elapsed: Duration, - num_queries: f64, -} - -impl_struct_decay!(QueryVolume { - time_elapsed, - num_queries -}); - -impl VolumeEstimator { - pub fn new(now: Instant) -> Self { - Self { - last_time: now, - history: FastDecayBuffer::new(), - } - } - - // This must be called on a regular interval. The unit tests are assuming - // 2 minutes. - pub fn decay(&mut self, now: Instant) { - let prev = self.last_time; - self.last_time = now; - self.history.current_mut().time_elapsed += now - prev; - self.history.decay(); - } - - pub fn add_queries(&mut self, count: u64) { - self.history.current_mut().num_queries += count as f64; - } - - pub fn monthly_volume_estimate(&self, now: Instant) -> f64 { - let mut elapsed_time = now - self.last_time; - let mut queries = 0.0; - for frame in self.history.frames() { - elapsed_time += frame.time_elapsed; - queries += frame.num_queries; - } - - // Scale to 30 days - let scale = 60.0 * 60.0 * 24.0 * 30.0; - let elapsed_time = elapsed_time.as_secs_f64(); - - (queries * scale) / elapsed_time - } -} - #[cfg(test)] mod tests { use indexer_selection::test_utils::assert_within; use super::*; - #[track_caller] - fn assert_approx(expected: f64, actual: f64, within: f64) { - assert!((actual - expected).abs() <= within); - } - - #[test] - fn stable_volume() { - let mut now = Instant::now(); - let mut estimate = VolumeEstimator::new(now); - - // Over a long period, do 2 queries per second and verify that the 30 day estimate is - // 5184000 across multiple delays - const COUNT: f64 = 2.0 * 21600.0 * 120.0; - for _ in 0..50 { - for _ in 0..120 { - now += Duration::from_secs(1); - estimate.add_queries(2); - // Very precise, correct within < 1 query. - assert_approx(estimate.monthly_volume_estimate(now), COUNT, 1.0); - } - estimate.decay(now); - } - } - - #[test] - fn sine_volume() { - let mut now = Instant::now(); - let mut estimate = VolumeEstimator::new(now); - - // Show that a stable oscillating traffic has low variance when looking at the estimate. - let mut elapsed = 0.0_f64; - for _ in 0..100 { - for _ in 0..1000 { - for _ in 0..120 { - now += Duration::from_secs(1); - elapsed += 1.0; - // sin is -1 .. 1, so the range here is 100.0 .. 200.0 - let queries = ((elapsed / 1000.0).sin() + 3.0) * 50.0; - estimate.add_queries(queries as u64); - } - estimate.decay(now); - } - let daily_estimate = estimate.monthly_volume_estimate(now) / 30.0; - // The center of the range is 12,960,000. - // The QPS oscillates at +- 33% - // But, the estimate is within 2% on each iteration, - // and is sometimes much closer. Of course, that means the - // total error is less than 2% as well. - assert_approx(12960000.0, daily_estimate, 250000.0); - } - } - - #[test] - fn volume_increase() { - let mut now = Instant::now(); - let mut estimate = VolumeEstimator::new(now); - - // Over a month, do 1 queries per minute. This is "testing" - for _ in 0..21600 { - now += Duration::from_secs(120); - estimate.add_queries(2); - estimate.decay(now); - } - // Now in "prod", do 20 queries per second. An increase of 1200x. - // 30 days, 24 hours per day, 30 2 minute intervals per hour. - let frames = 30_u64 * 24 * 30; - // 2400 queries in 2 minutes is 20 per second. - let per_frame = 2400_u64; - for _ in 0..frames { - for _ in 0..per_frame { - now += Duration::from_secs_f64(0.05); - estimate.add_queries(1); - } - estimate.decay(now); - } - - let queries = (frames * per_frame) as f64; - let estimation = estimate.monthly_volume_estimate(now); - - // Show that over 30 days this large increase of query volume was estimated more or less - // appropriately (within 3%). - assert!(estimation > queries); - assert!(estimation < (queries * 1.03)); - } - #[test] fn controller() { fn test_controller( diff --git a/graph-gateway/src/client_query.rs b/graph-gateway/src/client_query.rs index e8b951eb..e2615c9c 100644 --- a/graph-gateway/src/client_query.rs +++ b/graph-gateway/src/client_query.rs @@ -1,14 +1,10 @@ -use std::collections::HashSet; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::str::FromStr; +use std::sync::atomic::{self, AtomicUsize}; +use std::sync::Arc; use std::time::{Duration, Instant}; -use std::{ - collections::{BTreeMap, BTreeSet, HashMap}, - sync::{ - atomic::{self, AtomicUsize}, - Arc, - }, -}; +use alloy_primitives::U256; use alloy_sol_types::Eip712Domain; use anyhow::{anyhow, bail, Context as _}; use axum::extract::OriginalUri; @@ -20,10 +16,18 @@ use axum::{ http::{header, HeaderMap, Response, StatusCode}, RequestPartsExt, }; +use cost_model::{Context as AgoraContext, CostError, CostModel}; use eventuals::{Eventual, Ptr}; use futures::future::join_all; use graphql::graphql_parser::query::{OperationDefinition, SelectionSet}; +use indexer_selection::Candidate; +use indexer_selection::{ + actor::Update, BlockRequirements, IndexerError as IndexerSelectionError, + IndexerErrorObservation, Indexing, InputError, Selection, UnresolvedBlock, UtilityParameters, + SELECTION_LIMIT, +}; use lazy_static::lazy_static; +use prelude::{buffer_queue::QueueWriter, double_buffer::DoubleBufferReader, unix_timestamp, GRT}; use prelude::{UDecimal18, USD}; use prost::bytes::Buf; use rand::{rngs::SmallRng, SeedableRng as _}; @@ -36,13 +40,6 @@ use toolshed::url::Url; use tracing::Instrument; use uuid::Uuid; -use indexer_selection::{ - actor::Update, BlockRequirements, Context as AgoraContext, - IndexerError as IndexerSelectionError, IndexerErrorObservation, Indexing, InputError, - Selection, UnresolvedBlock, UtilityParameters, SELECTION_LIMIT, -}; -use prelude::{buffer_queue::QueueWriter, double_buffer::DoubleBufferReader, unix_timestamp, GRT}; - use crate::auth::{AuthHandler, AuthToken}; use crate::block_constraints::{block_constraints, make_query_deterministic, BlockConstraint}; use crate::budgets::{self, Budgeter}; @@ -395,7 +392,7 @@ async fn handle_client_query_inner( .value_immediate() .unwrap_or_default(); - let candidates: Vec = deployments + let candidates: BTreeSet = deployments .iter() .flat_map(move |deployment| { let id = deployment.id; @@ -412,8 +409,6 @@ async fn handle_client_query_inner( !blocklist.contains(indexing) }) }) - .collect::>() - .into_iter() .collect(); tracing::info!(candidates = candidates.len()); if candidates.is_empty() { @@ -425,15 +420,32 @@ async fn handle_client_query_inner( .as_ref() .map(ToString::to_string) .unwrap_or_default(); - let context = AgoraContext::new(&payload.query, &variables) + let mut context = AgoraContext::new(&payload.query, &variables) .map_err(|err| Error::InvalidQuery(anyhow!("{}", err)))?; - tracing::info!( target: reports::CLIENT_QUERY_TARGET, query = %payload.query, %variables, ); + let mut candidates: Vec = candidates + .into_iter() + .filter_map(|indexing| { + let cost_model = ctx + .indexing_statuses + .value_immediate() + .and_then(|m| m.get(&indexing).and_then(|s| s.cost_model.clone())); + let fee = match indexer_fee(&cost_model, &mut context) { + Ok(fee) => fee, + Err(cost_model_err) => { + tracing::warn!(%cost_model_err, ?indexing); + return None; + } + }; + Some(Candidate { indexing, fee }) + }) + .collect(); + let mut block_cache = ctx .block_caches .get(&subgraph_chain) @@ -472,33 +484,36 @@ async fn handle_client_query_inner( let user_settings = ctx.auth_handler.query_settings(&auth).await; + let grt_per_usd = ctx + .isa_state + .latest() + .network_params + .grt_per_usd + .ok_or_else(|| Error::Internal(anyhow!("Missing exchange rate")))?; let budget_query_count = count_top_level_selection_sets(&context) .map_err(Error::InvalidQuery)? .max(1) as u64; - // For budgeting purposes, pick the latest deployment. - let budget_deployment = deployments.last().unwrap().id; - let mut budget = ctx.budgeter.budget(&budget_deployment, budget_query_count); + let candidate_fees: Vec = candidates + .iter() + .map(|c| USD(c.fee.0 / grt_per_usd.0)) + .collect(); + let mut budget = ctx.budgeter.budget(budget_query_count, &candidate_fees); let ignore_budget_feedback = user_settings.budget.is_some(); if let Some(user_budget) = user_settings.budget { // Security: Consumers can and will set their budget to unreasonably high values. // This `.min` prevents the budget from being set far beyond what it would be // automatically. The reason this is important is because sometimes queries are // subsidized and we would be at-risk to allow arbitrarily high values. - budget = USD(user_budget.0.min(budget.0 * UDecimal18::from(10))); - // TOOD: budget = user_budget.max(budget * USD::try_from(0.1_f64).unwrap()); + let max_budget = USD(budget.0 * UDecimal18::from(10)); + budget = user_budget.min(max_budget); } - let grt_per_usd = ctx - .isa_state - .latest() - .network_params - .grt_per_usd - .ok_or_else(|| Error::Internal(anyhow!("Missing exchange rate")))?; let budget = GRT(budget.0 * grt_per_usd.0); tracing::info!( target: reports::CLIENT_QUERY_TARGET, query_count = budget_query_count, budget_grt = f64::from(budget.0) as f32, ); + candidates.retain(|c| c.fee <= budget); let mut utility_params = UtilityParameters { budget, @@ -531,34 +546,11 @@ async fn handle_client_query_inner( // 170cbcf3-db7f-404a-be13-2022d9142677 utility_params.latest_block = latest_block.number; - // Since we modify the context in-place, we need to reset the context to the state of - // the original client query. This to avoid the following scenario: - // 1. A client query has no block requirements set for some top-level operation - // 2. The first indexer is selected, with some indexing status at block number `n` - // 3. The query is made deterministic by setting the block requirement to the hash of - // block `n` - // 4. Some condition requires us to retry this query on another indexer with an indexing - // status at a block less than `n` - // 5. The same context is re-used, including the block requirement set to the hash of - // block `n` - // 6. The indexer is seen as being behind and is unnecessarily penalized - // - // TODO: Avoid the additional cloning of the entire AST here, especially in the case - // where retries are necessary. Only the top-level operation arguments need to be reset - // to the state of the client query. - let mut context = context.clone(); - let selection_timer = METRICS.indexer_selection_duration.start_timer(); let (selections, indexer_errors) = ctx .isa_state .latest() - .select_indexers( - &mut rng, - &candidates, - &utility_params, - &mut context, - SELECTION_LIMIT as u8, - ) + .select_indexers(&mut rng, &utility_params, &candidates) .map_err(|err| match err { InputError::MalformedQuery => Error::InvalidQuery(anyhow!("Failed to parse")), InputError::MissingNetworkParams => { @@ -649,6 +641,17 @@ async fn handle_client_query_inner( Err(_) if !last_retry => continue, Err(unresolved) => return Err(Error::BlockNotFound(unresolved)), }; + // The Agora context must be cloned to preserve the state of the original client query. + // This to avoid the following scenario: + // 1. A client query has no block requirements set for some top-level operation + // 2. The first indexer is selected, with some indexing status at block number `n` + // 3. The query is made deterministic by setting the block requirement to the hash of + // block `n` + // 4. Some condition requires us to retry this query on another indexer with an indexing + // status at a block less than `n` + // 5. The same context is re-used, including the block requirement set to the hash of + // block `n` + // 6. The indexer is seen as being behind and is unnecessarily penalized let deterministic_query = make_query_deterministic(context.clone(), &resolved_blocks, &latest_query_block) .ok_or_else(|| { @@ -690,7 +693,6 @@ async fn handle_client_query_inner( if !ignore_budget_feedback { let total_indexer_fees = USD(total_indexer_fees.0 / grt_per_usd.0); let _ = ctx.budgeter.feedback.send(budgets::Feedback { - deployment: budget_deployment, fees: total_indexer_fees, query_count: budget_query_count, }); @@ -888,7 +890,7 @@ async fn handle_indexer_query_inner( Ok(response.payload) } -fn count_top_level_selection_sets(ctx: &AgoraContext) -> anyhow::Result { +fn count_top_level_selection_sets(ctx: &AgoraContext) -> anyhow::Result { let selection_sets = ctx .operations .iter() @@ -902,6 +904,29 @@ fn count_top_level_selection_sets(ctx: &AgoraContext) -> anyhow::Result { Ok(selection_sets.into_iter().map(|set| set.items.len()).sum()) } +pub fn indexer_fee( + cost_model: &Option>, + context: &mut AgoraContext<'_, String>, +) -> Result { + match cost_model + .as_ref() + .map(|model| model.cost_with_context(context)) + { + None => Ok(GRT(UDecimal18::from(0))), + Some(Ok(fee)) => { + let fee = U256::try_from_be_slice(&fee.to_bytes_be()).unwrap_or(U256::MAX); + Ok(GRT(UDecimal18::from_raw_u256(fee))) + } + Some(Err(CostError::CostModelFail | CostError::QueryNotCosted)) => Err("QueryNotCosted"), + Some(Err( + CostError::QueryNotSupported + | CostError::QueryInvalid + | CostError::FailedToParseQuery + | CostError::FailedToParseVariables, + )) => Err("MalformedQuery"), + } +} + /// This adapter middleware extracts the authorization token from the `api_key` path parameter, /// and adds it to the request in the `Authorization` header. /// diff --git a/graph-gateway/src/indexing.rs b/graph-gateway/src/indexing.rs index 95d46e7a..55c39b12 100644 --- a/graph-gateway/src/indexing.rs +++ b/graph-gateway/src/indexing.rs @@ -1,8 +1,11 @@ use std::{collections::HashMap, net::IpAddr, sync::Arc}; use alloy_primitives::Address; +use cost_model::CostModel; use eventuals::{Eventual, EventualExt as _, EventualWriter, Ptr}; use futures::future::join_all; +use indexer_selection::Indexing; +use prelude::epoch_cache::EpochCache; use semver::Version; use serde::Deserialize; use tokio::sync::Mutex; @@ -10,10 +13,6 @@ use toolshed::thegraph::{BlockPointer, DeploymentId}; use toolshed::url::{url::Host, Url}; use trust_dns_resolver::TokioAsyncResolver as DNSResolver; -use indexer_selection::cost_model::CostModel; -use indexer_selection::Indexing; -use prelude::epoch_cache::EpochCache; - use crate::geoip::GeoIP; use crate::indexers_status::cost_models::{self, CostModelQuery, CostModelSourceResponse}; use crate::indexers_status::indexing_statuses::{self, IndexingStatusResponse}; diff --git a/graph-gateway/src/main.rs b/graph-gateway/src/main.rs index 344a0d05..e47ddf9f 100644 --- a/graph-gateway/src/main.rs +++ b/graph-gateway/src/main.rs @@ -476,7 +476,6 @@ async fn write_indexer_inputs( url: indexer.url.clone(), stake: indexer.staked_tokens, allocation: indexer.allocated_tokens, - cost_model: status.cost_model.clone(), block: Some(BlockStatus { reported_number: status.block.number, blocks_behind: latest_block.saturating_sub(status.block.number), diff --git a/indexer-selection/Cargo.toml b/indexer-selection/Cargo.toml index 5141dc97..98d86a3d 100644 --- a/indexer-selection/Cargo.toml +++ b/indexer-selection/Cargo.toml @@ -11,8 +11,6 @@ path = "bin/sim.rs" alloy-primitives.workspace = true anyhow.workspace = true arrayvec = "0.7" -cost-model = { git = "https://github.com/graphprotocol/agora", rev = "9984f9e" } -eventuals.workspace = true num-traits = "0.2" ordered-float = "4.1" permutation = "0.4" @@ -23,6 +21,3 @@ siphasher.workspace = true tokio.workspace = true toolshed.workspace = true tracing.workspace = true - -[dev-dependencies] -eventuals = { workspace = true, features = ["trace"] } diff --git a/indexer-selection/src/fee.rs b/indexer-selection/src/fee.rs deleted file mode 100644 index cf771f5c..00000000 --- a/indexer-selection/src/fee.rs +++ /dev/null @@ -1,184 +0,0 @@ -use std::convert::TryFrom; - -use alloy_primitives::U256; -use cost_model::{CostError, CostModel}; -use eventuals::Ptr; - -use prelude::{UDecimal18, GRT}; - -use crate::{utility::UtilityFactor, Context, IndexerError, InputError, SelectionError}; - -// TODO: We need to rethink both of the following functions. -// - We have been treating fees as having relatively low importance to prioritize network health -// over cost to dApp developers. The utility curve incentivises indexers to charge fees above 50% -// of the query budget. With multi-selection, this prevents more than one indexer being selected, -// driving down QoS for the client query. -// - The simplifying assumption that queries are proportional to utility was never true. But it's -// especially untrue with multi-selection, since indexers with low fees & high utilities in a -// single factor may be selected far more often then they were previously. - -/// (5_f64.sqrt() - 1.0) / 2.0 -const S: f64 = 0.6180339887498949; - -// 7a3da342-c049-4ab0-8058-91880491b442 -const WEIGHT: f64 = 0.5; - -/// Constraints for the utility function `u(fee)`: -/// - u(0) = 1, u(budget + 1 GRTWei) = 0, and u is continuous within this input range. We want to -/// avoid a discontinuity at the budget where a 1 GRTWei difference in the fee suddenly sends -/// the utility to 0. -/// - monotonically decreasing with a range of [0,1] (the expected range of the utility combining -/// function) -/// -/// Another consideration is what shape we want the curve to be. At first, is seems a straight line -/// would make sense - this would express that the utility of money is linear (eg: 2x money is 2x -/// the utility) and we are measuring the amount saved. (eg: if you pay 25% of budget you would -/// retain 75% money and if you pay no fee you would retain 100% money and the value of the money -/// is linear. This works out. But.. there's another way to think about it. Gateways buy queries in -/// bulk. If we flip the question to "how many queries can I buy with a given amount of money" we -/// get a different answer - 1/x. Eg: We can buy 1 query at 100%, or 4 queries at 25%, so it is 4x -/// the utility to buy 4 queries. 1/x though doesn't intersect (0,1) and (1,0) though. So, we just -/// take 1/x and shift the curve down and to the left so that it intersects those points. This -/// answer is a compromise between linear and 1/x. -/// -/// One annoying thing about treating this as a separate utility, rather than have some notion of -/// being "willing to pay more for more service" is that intuitively it seems that fee being used -/// for service isn't "wasted". Eg: I should be willing to pay more for higher service, and it's the -/// difference between the expected fee and actual fee for the product that is savings or losses -/// and this doesn't take that into account at all. But my hunch is that the utility combining -/// function captures this and it's ok to have it be separate like this. -/// -/// I (Zac) did some math too see what the optimal fee (as a proportion of budget) should be for -/// an Indexer to maximize their own revenue given the current algorithm. -/// Toy is here: https://www.desmos.com/calculator/hsex58vkc0 -/// For the rendered curve - x is fee, y is revenue. The maxima is the point of interest, where -/// the given fee maximizes revenue. -/// - The top value n=... is the scale value above. -/// - The u(utility) equation is the utility function for this indexer -/// - The m(my) slider is the Indexer's own utility for all other utilities that are not fee -/// - The o(other) slider is the average utility of other Indexers in the pool (after accounting for -/// their fee) -/// - The c(count) slider is the number of other Indexers in the pool. -/// -/// This verifies some properties that we want to hold. -/// - An Indexer with better service (as compared to the total pool) is incentivized to charge a -/// higher fee than an Indexer with a lesser service in the same pool, and will increase overall -/// fees. -/// This can be verified by dragging m to the right and seeing the curve shift up and to the right. -/// - Even as the number of Indexers of a subgraph increases to 6, the optimal fee for an -/// average Indexer is 63% of the maximum budget (with higher optimal fees as the number -/// of Indexers decreases). This can be verified by setting o and m to equal values and -/// increasing c(count) to 5 (on second thought that's wrong... because m doesn't include fee - so -/// o would have to be lower than m - meaning the optimal fee is a bit higher). -/// - This ensures there is no race to the bottom. -/// - It seems that assuming mostly rational Indexers and a medium sized pool, -/// the Consumer may expect to pay ~55-75% of the maximum budget. - -// https://www.desmos.com/calculator/wnffyb9edh -// 7a3da342-c049-4ab0-8058-91880491b442 -pub fn fee_utility(fee: &GRT, budget: &GRT) -> UtilityFactor { - // Any fee over budget has zero utility. - if fee > budget { - return UtilityFactor::one(0.0); - } - let one_wei = UDecimal18::from_raw_u256(U256::from(1)); - let scaled_fee: f64 = (fee.0 / budget.0.saturating_add(one_wei)).into(); - println!("{} {}, {scaled_fee}", fee.0, budget.0); - let mut utility = (scaled_fee + S).recip() - S; - // Set minimum utility, since small negative utility can result from loss of precision when the - // fee approaches the budget. - utility = utility.max(1e-18); - UtilityFactor { - utility, - weight: WEIGHT, - } -} - -/// Indexers set their fees using cost models. However, indexers currently take a "lazy" approach to -/// setting cost models (e.g. `default => 0.0000001 (GRT)`). So we do some work for them by -/// calculating a lower bound for the revenue maximizing fee, while still giving them an opportunity -/// to benefit from optimizing their cost models. -/// -/// We assume queries are proportional to utility. This is wrong, but it would simplify our revenue -/// function to `fee * utility`. Solving for the correct revenue maximizing value is complex and -/// recursive (since the revenue maximizing fee depends on the utility of all other indexers which -/// itself depends on their revenue maximizing fee... ad infinitum). -fn min_optimal_fee(budget: &GRT) -> GRT { - let w = WEIGHT; - let mut min_rate = (4.0 * S.powi(2) * w + w.powi(2) - 2.0 * w + 1.0).sqrt(); - min_rate = (min_rate - 2.0 * S.powi(2) - w + 1.0) / (2.0 * S); - GRT(budget.0 * UDecimal18::try_from(min_rate).unwrap()) -} - -pub fn indexer_fee( - cost_model: &Option>, - context: &mut Context<'_>, - budget: &GRT, - max_indexers: u8, -) -> Result { - let mut fee = match cost_model - .as_ref() - .map(|model| model.cost_with_context(context)) - { - None => GRT(UDecimal18::from(0)), - Some(Ok(fee)) => { - let fee = U256::try_from_be_slice(&fee.to_bytes_be()).unwrap_or(U256::MAX); - GRT(UDecimal18::from_raw_u256(fee)) - } - Some(Err(CostError::CostModelFail | CostError::QueryNotCosted)) => { - return Err(IndexerError::QueryNotCosted.into()); - } - Some(Err( - CostError::QueryNotSupported - | CostError::QueryInvalid - | CostError::FailedToParseQuery - | CostError::FailedToParseVariables, - )) => { - return Err(InputError::MalformedQuery.into()); - } - }; - - // Any fee over budget is refused. - if &fee > budget { - return Err(IndexerError::FeeTooHigh.into()); - } - - let budget = GRT(budget.0 / UDecimal18::from(max_indexers as u128)); - let min_optimal_fee = min_optimal_fee(&budget); - // If their fee is less than the min optimal, lerp between them so that - // indexers are rewarded for being closer. - if fee < min_optimal_fee { - fee = GRT((min_optimal_fee.0 + fee.0) * UDecimal18::try_from(0.75).unwrap()); - } - - Ok(fee) -} - -#[cfg(test)] -mod test { - use prelude::test_utils::BASIC_QUERY; - - use crate::test_utils::{assert_within, default_cost_model}; - - use super::*; - - #[test] - fn test() { - let cost_model = Some(Ptr::new(default_cost_model(GRT(UDecimal18::try_from( - 0.01, - ) - .unwrap())))); - let mut context = Context::new(BASIC_QUERY, "").unwrap(); - // Expected values based on https://www.desmos.com/calculator/kxd4kpjxi5 - let tests = [(0.01, 0.0), (0.02, 0.27304), (0.1, 0.50615), (1.0, 0.55769)]; - for (budget, expected_utility) in tests { - let budget = GRT(UDecimal18::try_from(budget).unwrap()); - let fee = indexer_fee(&cost_model, &mut context, &budget, 1).unwrap(); - let utility = fee_utility(&fee, &budget); - println!("{fee:?} / {budget:?}, {expected_utility}, {utility:?}"); - let utility = utility.utility.powf(utility.weight); - assert!(fee.0 >= UDecimal18::try_from(0.01).unwrap()); - assert_within(utility, expected_utility, 0.0001); - } - } -} diff --git a/indexer-selection/src/indexing.rs b/indexer-selection/src/indexing.rs index 73e00c51..314c897a 100644 --- a/indexer-selection/src/indexing.rs +++ b/indexer-selection/src/indexing.rs @@ -1,13 +1,12 @@ use std::time::{Duration, Instant}; -use eventuals::Ptr; use prelude::GRT; use toolshed::url::Url; -use crate::{ - decay::ISADecayBuffer, fee::indexer_fee, performance::*, reliability::*, BlockRequirements, - Context, CostModel, IndexerErrorObservation, SelectionError, -}; +use crate::decay::ISADecayBuffer; +use crate::performance::*; +use crate::reliability::*; +use crate::{BlockRequirements, IndexerErrorObservation}; pub struct IndexingState { pub status: IndexingStatus, @@ -34,7 +33,6 @@ pub struct IndexingStatus { pub url: Url, pub stake: GRT, pub allocation: GRT, - pub cost_model: Option>, pub block: Option, pub versions_behind: u8, } @@ -164,13 +162,4 @@ impl IndexingState { perf_success.decay(); perf_failure.decay(); } - - pub fn fee( - &self, - context: &mut Context<'_>, - budget: &GRT, - max_indexers: u8, - ) -> Result { - indexer_fee(&self.status.cost_model, context, budget, max_indexers) - } } diff --git a/indexer-selection/src/lib.rs b/indexer-selection/src/lib.rs index b9821d6d..7d853655 100644 --- a/indexer-selection/src/lib.rs +++ b/indexer-selection/src/lib.rs @@ -7,7 +7,6 @@ use std::{ }; use alloy_primitives::{Address, BlockHash, BlockNumber}; -pub use cost_model::{self, CostModel}; use num_traits::Zero as _; pub use ordered_float::NotNan; use prelude::*; @@ -16,21 +15,17 @@ use score::{expected_individual_score, ExpectedValue}; use toolshed::thegraph::{BlockPointer, DeploymentId}; use toolshed::url::Url; +use crate::score::{select_indexers, SelectionFactors}; pub use crate::{ economic_security::NetworkParameters, indexing::{BlockStatus, IndexingState, IndexingStatus}, score::SELECTION_LIMIT, utility::ConcaveUtilityParameters, }; -use crate::{ - fee::indexer_fee, - score::{select_indexers, SelectionFactors}, -}; pub mod actor; pub mod decay; mod economic_security; -mod fee; mod indexing; mod performance; mod reliability; @@ -41,14 +36,16 @@ mod test; pub mod test_utils; mod utility; -// We have to use `String` instead of `&'c str` here because of compiler bug triggered when holding -// a context across an await. See https://github.com/rust-lang/rust/issues/71723 -pub type Context<'c> = cost_model::Context<'c, String>; - /// If an indexer's score is penalized such that it falls below this proportion of the max indexer /// score, then the indexer will be discarded from the set of indexers to select from. const MIN_SCORE_CUTOFF: f64 = 0.25; +#[derive(Clone, Debug)] +pub struct Candidate { + pub indexing: Indexing, + pub fee: GRT, +} + #[derive(Clone, Debug)] pub struct Selection { pub indexing: Indexing, @@ -210,18 +207,18 @@ impl State { pub fn select_indexers<'a>( &self, rng: &mut SmallRng, - candidates: &'a [Indexing], params: &UtilityParameters, - context: &mut Context<'_>, - selection_limit: u8, + candidates: &'a [Candidate], ) -> Result<(Vec, IndexerErrors<'a>), InputError> { let mut errors = IndexerErrors(BTreeMap::new()); let mut available = Vec::::new(); for candidate in candidates { - match self.selection_factors(candidate, params, context, selection_limit) { + match self.selection_factors(candidate, params) { Ok(factors) => available.push(factors), - Err(SelectionError::BadIndexer(err)) => errors.add(err, &candidate.indexer), + Err(SelectionError::BadIndexer(err)) => { + errors.add(err, &candidate.indexing.indexer) + } Err(SelectionError::BadInput(err)) => return Err(err), }; } @@ -254,21 +251,18 @@ impl State { tracing::debug!(score_cutoff = *score_cutoff); available.retain(|factors| factors.expected_score >= score_cutoff); - let mut selections = select_indexers(rng, params, &available, selection_limit); - selections.truncate(selection_limit as usize); + let selections = select_indexers(rng, params, &available); Ok((selections, errors)) } fn selection_factors( &self, - candidate: &Indexing, + candidate: &Candidate, params: &UtilityParameters, - context: &mut Context<'_>, - selection_limit: u8, ) -> Result { let state = self .indexings - .get(candidate) + .get(&candidate.indexing) .ok_or(IndexerError::NoStatus)?; let block_status = state.status.block.as_ref().ok_or(IndexerError::NoStatus)?; @@ -285,12 +279,9 @@ impl State { .slashable_usd(state.status.stake) .ok_or(InputError::MissingNetworkParams)?; - let fee = indexer_fee( - &state.status.cost_model, - context, - ¶ms.budget, - selection_limit, - )?; + if candidate.fee > params.budget { + return Err(IndexerError::FeeTooHigh.into()); + } let reliability = state.reliability.expected_value(); let perf_success = state.perf_success.expected_value(); @@ -305,12 +296,13 @@ impl State { block_status.blocks_behind, slashable_usd, zero_allocation, - &fee, + &candidate.fee, )) .unwrap_or(NotNan::zero()); + debug_assert!(*expected_score > 0.0); Ok(SelectionFactors { - indexing: *candidate, + indexing: candidate.indexing, url: state.status.url.clone(), versions_behind: state.status.versions_behind, reliability, @@ -319,7 +311,7 @@ impl State { blocks_behind: block_status.blocks_behind, slashable_usd, expected_score, - fee, + fee: candidate.fee, last_use: state.last_use, sybil: sybil(&state.status.allocation)?, }) diff --git a/indexer-selection/src/score.rs b/indexer-selection/src/score.rs index 0bfe566b..c22578fb 100644 --- a/indexer-selection/src/score.rs +++ b/indexer-selection/src/score.rs @@ -1,15 +1,15 @@ use std::time::{Duration, Instant}; +use alloy_primitives::U256; use arrayvec::ArrayVec; use ordered_float::NotNan; -use prelude::GRT; +use prelude::{UDecimal18, GRT}; use rand::{prelude::SliceRandom as _, Rng}; use toolshed::url::Url; +use crate::performance::performance_utility; +use crate::utility::{weighted_product_model, UtilityFactor}; use crate::{ - fee::fee_utility, - performance::performance_utility, - utility::{weighted_product_model, UtilityFactor}, BlockRequirements, ConcaveUtilityParameters, Indexing, Selection, UtilityParameters, MIN_SCORE_CUTOFF, }; @@ -45,7 +45,6 @@ pub fn select_indexers( rng: &mut R, params: &UtilityParameters, factors: &[SelectionFactors], - selection_limit: u8, ) -> Vec { if factors.is_empty() { return vec![]; @@ -56,7 +55,6 @@ pub fn select_indexers( // We must use a suitable indexer, if one exists. Indexers are filtered out when calculating // selection factors if they are over budget or don't meet freshness requirements. So they won't // pollute the set we're selecting from. - let selection_limit = SELECTION_LIMIT.min(selection_limit as usize); let sample_limit = factors.len().min(16); let mut selections: ArrayVec<&SelectionFactors, SELECTION_LIMIT> = ArrayVec::new(); @@ -72,7 +70,7 @@ pub fn select_indexers( let mut combined_score = selections[0].expected_score; // Sample some indexers and add them to the selected set if they increase the combined score. for _ in 0..sample_limit { - if selections.len() == selection_limit { + if selections.len() == SELECTION_LIMIT { break; } let candidate = factors.choose_weighted(rng, |f| *f.sybil).unwrap(); @@ -280,3 +278,22 @@ fn versions_behind_utility(versions_behind: u8) -> UtilityFactor { fn exploration_weight(t: Duration) -> f64 { 0.1_f64.powf(0.005 * t.as_secs_f64()) } + +/// Target an optimal fee of ~(1/3) of budget, since up to 3 indexers can be selected. +/// https://www.desmos.com/calculator/elzlqpb7tc +pub fn fee_utility(fee: &GRT, budget: &GRT) -> UtilityFactor { + // Any fee over budget has zero utility. + if fee > budget { + return UtilityFactor::one(0.0); + } + let one_wei = UDecimal18::from_raw_u256(U256::from(1)); + let scaled_fee = fee.0 / budget.0.saturating_add(one_wei); + // (5_f64.sqrt() - 1.0) / 2.0 + const S: f64 = 0.6180339887498949; + let mut utility = (f64::from(scaled_fee) + S).recip() - S; + // Set minimum utility, since small negative utility can result from loss of precision when the + // fee approaches the budget. + utility = utility.max(1e-18); + let weight: f64 = 1.4; + UtilityFactor { utility, weight } +} diff --git a/indexer-selection/src/simulation.rs b/indexer-selection/src/simulation.rs index e8704076..a9363772 100644 --- a/indexer-selection/src/simulation.rs +++ b/indexer-selection/src/simulation.rs @@ -3,17 +3,14 @@ use std::time::{Duration, Instant}; use alloy_primitives::Address; use anyhow::Result; -use eventuals::Ptr; -use rand::{prelude::SmallRng, Rng as _, SeedableRng as _}; -use rand_distr::Normal; - use prelude::test_utils::{bytes_from_id, init_test_tracing}; use prelude::{UDecimal18, GRT}; +use rand::{prelude::SmallRng, Rng as _, SeedableRng as _}; +use rand_distr::Normal; use toolshed::thegraph::DeploymentId; -use crate::test_utils::default_cost_model; use crate::{ - BlockStatus, Context, IndexerErrorObservation, Indexing, IndexingStatus, Selection, State, + BlockStatus, Candidate, IndexerErrorObservation, Indexing, IndexingStatus, Selection, State, UtilityParameters, }; @@ -67,7 +64,6 @@ pub async fn simulate( url: "http://localhost".parse().unwrap(), stake: characteristics.stake, allocation: characteristics.allocation, - cost_model: Some(Ptr::new(default_cost_model(characteristics.fee))), block: Some(BlockStatus { reported_number: params .latest_block @@ -81,11 +77,14 @@ pub async fn simulate( ); } - let candidates: Vec = characteristics + let candidates: Vec = characteristics .iter() - .map(|c| Indexing { - indexer: c.address, - deployment, + .map(|c| Candidate { + indexing: Indexing { + indexer: c.address, + deployment, + }, + fee: c.fee, }) .collect(); let characteristics: HashMap<&Address, &IndexerCharacteristics> = @@ -96,11 +95,9 @@ pub async fn simulate( isa.decay(); } - let mut context = Context::new("{ a }", "").unwrap(); let t0 = Instant::now(); - let (mut selections, _) = isa - .select_indexers(&mut rng, &candidates, params, &mut context, selection_limit) - .unwrap(); + let (mut selections, _) = isa.select_indexers(&mut rng, params, &candidates).unwrap(); + selections.truncate(selection_limit as usize); results.avg_selection_seconds += Instant::now().duration_since(t0).as_secs_f64(); selections diff --git a/indexer-selection/src/test.rs b/indexer-selection/src/test.rs index 325a8d60..1e60c160 100644 --- a/indexer-selection/src/test.rs +++ b/indexer-selection/src/test.rs @@ -4,26 +4,20 @@ use std::ops::RangeInclusive; use alloy_primitives::Address; use anyhow::{bail, ensure}; -use eventuals::Ptr; -use num_traits::ToPrimitive as _; -use rand::{ - rngs::SmallRng, - seq::{IteratorRandom, SliceRandom}, - thread_rng, Rng, RngCore as _, SeedableRng as _, -}; -use tokio::spawn; - use prelude::buffer_queue::QueueWriter; use prelude::test_utils::bytes_from_id; use prelude::{buffer_queue, double_buffer, UDecimal18, GRT}; +use rand::rngs::SmallRng; +use rand::seq::{IteratorRandom, SliceRandom}; +use rand::{thread_rng, Rng, RngCore as _, SeedableRng as _}; +use tokio::spawn; use toolshed::thegraph::{BlockPointer, DeploymentId}; use crate::actor::{process_updates, Update}; -use crate::test_utils::default_cost_model; use crate::{ - BlockRequirements, BlockStatus, Context, IndexerError, IndexerErrors, Indexing, IndexingState, - IndexingStatus, InputError, NetworkParameters, Selection, State, UtilityParameters, - SELECTION_LIMIT, + BlockRequirements, BlockStatus, Candidate, IndexerError, IndexerErrors, Indexing, + IndexingState, IndexingStatus, InputError, NetworkParameters, Selection, State, + UtilityParameters, }; #[derive(Clone)] @@ -41,6 +35,7 @@ struct Topology { blocks: Vec, deployments: HashSet, indexings: HashMap, + fees: HashMap, } #[derive(Debug)] @@ -48,8 +43,6 @@ struct Request { deployment: DeploymentId, indexers: Vec
, params: UtilityParameters, - query: String, - selection_limit: u8, } fn base_indexing_status() -> IndexingStatus { @@ -57,7 +50,6 @@ fn base_indexing_status() -> IndexingStatus { url: "http://localhost:8000".parse().unwrap(), stake: GRT(UDecimal18::from(1)), allocation: GRT(UDecimal18::from(1)), - cost_model: None, block: Some(BlockStatus { reported_number: 0, blocks_behind: 0, @@ -96,15 +88,21 @@ impl Topology { hash: bytes_from_id(id).into(), }) .collect::>(); - let indexings = (0..rng.gen_range(config.indexings.clone())) + let indexings: HashMap = (0..rng + .gen_range(config.indexings.clone())) .filter_map(|_| Self::gen_indexing(rng, config, &blocks, &deployments)) .collect(); + let fees: HashMap = indexings + .keys() + .map(|i| (*i, Self::gen_grt(rng, &[0.0, 0.1, 1.0, 2.0]))) + .collect(); let state = Self { grt_per_usd: GRT(UDecimal18::from(1)), slashing_percentage: UDecimal18::try_from(0.1).unwrap(), blocks, deployments, indexings, + fees, }; update_writer @@ -133,10 +131,6 @@ impl Topology { }; let status = IndexingStatus { stake: Self::gen_grt(rng, &[0.0, 50e3, 100e3, 150e3]), - cost_model: Some(Ptr::new(default_cost_model(Self::gen_grt( - rng, - &[0.0, 0.1, 1.0, 2.0], - )))), block: blocks.choose(rng).map(|b| BlockStatus { reported_number: b.number, blocks_behind: blocks.len() as u64 - 1 - b.number, @@ -174,8 +168,6 @@ impl Topology { }, self.blocks.last()?.number, ), - query: "{ entities { id } }".to_string(), - selection_limit: rng.gen_range(1..=SELECTION_LIMIT) as u8, }) } @@ -189,8 +181,6 @@ impl Topology { Err(_) => bail!("unexpected InputError"), }; - let mut context = Context::new(&request.query, "").unwrap(); - let fees = GRT(selections.iter().map(|s| s.fee.0).sum()); ensure!(fees <= request.params.budget); @@ -199,21 +189,14 @@ impl Topology { let mut expected_errors = IndexerErrors(BTreeMap::new()); for indexer in &request.indexers { - let status = self - .indexings - .get(&Indexing { - indexer: *indexer, - deployment: request.deployment, - }) - .unwrap(); + let indexing = Indexing { + indexer: *indexer, + deployment: request.deployment, + }; + let status = self.indexings.get(&indexing).unwrap(); let required_block = request.params.requirements.range.map(|(_, n)| n); - let fee = status - .cost_model - .as_ref() - .map(|c| c.cost_with_context(&mut context).unwrap().to_f64().unwrap()) - .unwrap_or(0.0) - / 1e18; - println!("indexer={}, fee={}", indexer, fee); + let fee = *self.fees.get(&indexing).unwrap(); + println!("indexer={}, fee={:?}", indexer, fee); let mut set_err = |err: IndexerError| { expected_errors.0.entry(err).or_default().insert(indexer); }; @@ -225,7 +208,7 @@ impl Topology { set_err(IndexerError::NoStatus); } else if status.stake == GRT(UDecimal18::from(0)) { set_err(IndexerError::NoStake); - } else if fee > request.params.budget.0.into() { + } else if fee > request.params.budget { set_err(IndexerError::FeeTooHigh); } } @@ -273,22 +256,23 @@ async fn fuzz() { None => continue, }; println!("{:#?}", request); - let mut context = Context::new(&request.query, "").unwrap(); - let candidates: Vec = request + let candidates: Vec = request .indexers .iter() - .map(|indexer| Indexing { - indexer: *indexer, - deployment: request.deployment, + .map(|indexer| { + let indexing = Indexing { + indexer: *indexer, + deployment: request.deployment, + }; + Candidate { + fee: *topology.fees.get(&indexing).unwrap(), + indexing, + } }) .collect(); - let result = isa_state.latest().select_indexers( - &mut rng, - &candidates, - &request.params, - &mut context, - request.selection_limit, - ); + let result = isa_state + .latest() + .select_indexers(&mut rng, &request.params, &candidates); if let Err(err) = topology.check(&request, &result) { println!("{}", err); println!("TEST_SEED={}", seed); @@ -302,10 +286,13 @@ async fn fuzz() { fn favor_higher_version() { let mut rng = SmallRng::from_entropy(); - let candidates: Vec = (0..2) - .map(|i| Indexing { - indexer: bytes_from_id(0).into(), - deployment: DeploymentId(bytes_from_id(i).into()), + let candidates: Vec = (0..2) + .map(|i| Candidate { + indexing: Indexing { + indexer: bytes_from_id(0).into(), + deployment: DeploymentId(bytes_from_id(i).into()), + }, + fee: GRT(UDecimal18::from(1)), }) .collect(); let mut versions_behind = [rng.gen_range(0..3), rng.gen_range(0..3)]; @@ -321,14 +308,14 @@ fn favor_higher_version() { indexings: HashMap::from_iter([]), }; state.indexings.insert( - candidates[0], + candidates[0].indexing, IndexingState::new(IndexingStatus { versions_behind: versions_behind[0], ..base_indexing_status() }), ); state.indexings.insert( - candidates[1], + candidates[1].indexing, IndexingState::new(IndexingStatus { versions_behind: versions_behind[1], ..base_indexing_status() @@ -343,8 +330,7 @@ fn favor_higher_version() { }, 1, ); - let mut context = Context::new("{ a { b } }", "").unwrap(); - let result = state.select_indexers(&mut rng, &candidates, ¶ms, &mut context, 1); + let result = state.select_indexers(&mut rng, ¶ms, &candidates); println!("{:#?}", candidates); println!("{:#?}", versions_behind); @@ -355,7 +341,7 @@ fn favor_higher_version() { .iter() .position(|v| v == max_version) .unwrap(); - let expected = candidates[index]; + let expected = candidates[index].indexing; let selection = result.unwrap().0[0].indexing; assert_eq!(selection, expected); diff --git a/indexer-selection/src/test_utils.rs b/indexer-selection/src/test_utils.rs index 14fcabc7..801b76c0 100644 --- a/indexer-selection/src/test_utils.rs +++ b/indexer-selection/src/test_utils.rs @@ -1,20 +1,14 @@ use std::hash::{Hash as _, Hasher as _}; use alloy_primitives::Address; -use siphasher::sip::SipHasher24; - use prelude::test_utils::bytes_from_id; -use prelude::GRT; +use siphasher::sip::SipHasher24; use toolshed::thegraph::DeploymentId; -use crate::{BlockPointer, CostModel}; +use crate::BlockPointer; pub const TEST_KEY: &str = "244226452948404D635166546A576E5A7234753778217A25432A462D4A614E64"; -pub fn default_cost_model(fee: GRT) -> CostModel { - CostModel::compile(format!("default => {};", fee.0), "").unwrap() -} - #[track_caller] pub fn assert_within(value: f64, expected: f64, tolerance: f64) { let diff = (value - expected).abs(); diff --git a/prelude/src/lib.rs b/prelude/src/lib.rs index da2f8d65..13fed7f3 100644 --- a/prelude/src/lib.rs +++ b/prelude/src/lib.rs @@ -50,8 +50,8 @@ pub fn sip24_hash(value: &impl Hash) -> u64 { // between these types. /// GRT with 18 fractional digits -#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)] pub struct GRT(pub UDecimal18); /// USD with 18 fractional digits -#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)] pub struct USD(pub UDecimal18);