diff --git a/graph-gateway/src/block_constraints.rs b/graph-gateway/src/block_constraints.rs index 6d443540..40b98617 100644 --- a/graph-gateway/src/block_constraints.rs +++ b/graph-gateway/src/block_constraints.rs @@ -1,16 +1,16 @@ use std::collections::{BTreeMap, BTreeSet}; use alloy_primitives::{BlockHash, BlockNumber}; +use cost_model::Context; use graphql::graphql_parser::query::{ Definition, Document, OperationDefinition, Selection, Text, Value, }; use graphql::{IntoStaticValue as _, QueryVariables, StaticValue}; +use indexer_selection::UnresolvedBlock; use itertools::Itertools as _; use serde_json::{self, json}; use toolshed::thegraph::BlockPointer; -use indexer_selection::{Context, UnresolvedBlock}; - #[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)] pub enum BlockConstraint { Unconstrained, @@ -29,7 +29,9 @@ impl BlockConstraint { } } -pub fn block_constraints<'c>(context: &'c Context<'c>) -> Option> { +pub fn block_constraints<'c>( + context: &'c Context<'c, String>, +) -> Option> { let mut constraints = BTreeSet::new(); let vars = &context.variables; // ba6c90f1-3baf-45be-ac1c-f60733404436 @@ -72,7 +74,7 @@ pub fn block_constraints<'c>(context: &'c Context<'c>) -> Option, + mut ctx: Context<'_, String>, resolved: &BTreeSet, latest: &BlockPointer, ) -> Option { diff --git a/graph-gateway/src/budgets.rs b/graph-gateway/src/budgets.rs index ac731346..da4c708f 100644 --- a/graph-gateway/src/budgets.rs +++ b/graph-gateway/src/budgets.rs @@ -1,85 +1,69 @@ -use std::collections::HashMap; +use std::time::Duration; -use eventuals::{Eventual, EventualWriter, Ptr}; -use indexer_selection::{ - decay::{Decay, FastDecayBuffer}, - impl_struct_decay, -}; -use prelude::{UDecimal18, USD}; -use tokio::{ - select, spawn, - sync::mpsc, - time::{interval, Duration, Instant}, -}; -use toolshed::thegraph::DeploymentId; +use eventuals::{Eventual, EventualWriter}; +use indexer_selection::decay::FastDecayBuffer; +use prelude::*; +use tokio::time::interval; +use tokio::{select, spawn, sync::mpsc}; use crate::metrics::METRICS; -/// This 10e-6 number comes some back-of-the-napkin calculations on what we expect is the minimum -/// fee an indexer should be paid per query, based on hosted service costs attributable to serving -/// queries in June 2023. Now we are using it as the maximum discount instead of the minimum budget. -const MAX_DISCOUNT_USD: f64 = 10e-6; - pub struct Budgeter { pub feedback: mpsc::UnboundedSender, - pub budgets: Eventual>>, - query_fees_target: USD, + absolute_budget_limit: USD, + budget_limit: Eventual, } pub struct Feedback { - pub deployment: DeploymentId, pub fees: USD, pub query_count: u64, } impl Budgeter { pub fn new(query_fees_target: USD) -> Self { - assert!(f64::from(query_fees_target.0) >= MAX_DISCOUNT_USD); let (feedback_tx, feedback_rx) = mpsc::unbounded_channel(); - let (budgets_tx, budgets_rx) = Eventual::new(); - Actor::create(feedback_rx, budgets_tx, query_fees_target); + let (budget_limit_tx, budget_limit_rx) = Eventual::new(); + Actor::create(feedback_rx, budget_limit_tx, query_fees_target); + let absolute_budget_limit = USD(query_fees_target.0 * UDecimal18::from(10)); Self { feedback: feedback_tx, - budgets: budgets_rx, - query_fees_target, + absolute_budget_limit, + budget_limit: budget_limit_rx, } } - pub fn budget(&self, deployment: &DeploymentId, query_count: u64) -> USD { - let budget = self - .budgets + pub fn budget(&self, query_count: u64, candidate_fees: &[USD]) -> USD { + let budget_limit = self + .budget_limit .value_immediate() - .and_then(|budgets| budgets.get(deployment).copied()) - .unwrap_or(self.query_fees_target); + .unwrap_or(self.absolute_budget_limit); + let max_fee = candidate_fees.iter().max().cloned().unwrap_or_default(); + let budget = max_fee.max(budget_limit).min(self.absolute_budget_limit); USD(budget.0 * UDecimal18::from(query_count as u128)) } } struct Actor { feedback: mpsc::UnboundedReceiver, - budgets: EventualWriter>>, - volume_estimators: HashMap, + budget_limit: EventualWriter, controller: Controller, } impl Actor { fn create( feedback: mpsc::UnboundedReceiver, - budgets: EventualWriter>>, + budget_limit: EventualWriter, query_fees_target: USD, ) { let mut actor = Actor { feedback, - budgets, - volume_estimators: HashMap::default(), + budget_limit, controller: Controller::new(query_fees_target), }; - let mut decay_timer = interval(Duration::from_secs(120)); let mut budget_timer = interval(Duration::from_secs(1)); spawn(async move { loop { select! { - _ = decay_timer.tick() => actor.decay(), Some(msg) = actor.feedback.recv() => actor.feedback(msg), _ = budget_timer.tick() => actor.revise_budget(), } @@ -87,59 +71,21 @@ impl Actor { }); } - fn decay(&mut self) { - let now = Instant::now(); - for estimator in self.volume_estimators.values_mut() { - estimator.decay(now); - } - } - fn feedback(&mut self, feedback: Feedback) { self.controller .add_queries(feedback.fees, feedback.query_count); - self.volume_estimators - .entry(feedback.deployment) - .or_insert_with(|| VolumeEstimator::new(Instant::now())) - .add_queries(feedback.query_count); } fn revise_budget(&mut self) { if self.controller.recent_query_count == 0 { return; } - let target = self.controller.target_query_fees; - let control_variable = self.controller.control_variable(); - tracing::debug!(budget_control_variable = ?control_variable); - let now = Instant::now(); - let budgets = self - .volume_estimators - .iter() - .map(|(deployment, volume_estimator)| { - let volume = volume_estimator.monthly_volume_estimate(now) as u64; - let mut budget = volume_discount(volume, target).0 * control_variable; - // limit budget to 100x target - budget = budget.min(target.0 * UDecimal18::from(100)); - (*deployment, USD(budget)) - }) - .collect(); - - self.budgets.write(Ptr::new(budgets)); + let budget_limit = USD(self.controller.control_variable()); + tracing::debug!(?budget_limit); + self.budget_limit.write(budget_limit); } } -fn volume_discount(monthly_volume: u64, target: USD) -> USD { - // Discount the budget, based on a generalized logistic function. We apply little to no discount - // between 0 and ~10e3 queries per month. And we limit the discount to 10E-6 USD. - // https://www.desmos.com/calculator/whtakt50sa - let b_max: f64 = target.0.into(); - let b_min = b_max - MAX_DISCOUNT_USD; - let m: f64 = 1e6; - let z: f64 = 0.45; - let v = monthly_volume as f64; - let budget = b_min + ((b_max - b_min) * m.powf(z)) / (v + m).powf(z); - USD(budget.try_into().unwrap_or_default()) -} - /// State for the control loop targeting `recent_query_fees`. struct Controller { target_query_fees: USD, @@ -182,150 +128,12 @@ impl Controller { } } -struct VolumeEstimator { - history: FastDecayBuffer, - last_time: Instant, -} - -#[derive(Default)] -struct QueryVolume { - time_elapsed: Duration, - num_queries: f64, -} - -impl_struct_decay!(QueryVolume { - time_elapsed, - num_queries -}); - -impl VolumeEstimator { - pub fn new(now: Instant) -> Self { - Self { - last_time: now, - history: FastDecayBuffer::new(), - } - } - - // This must be called on a regular interval. The unit tests are assuming - // 2 minutes. - pub fn decay(&mut self, now: Instant) { - let prev = self.last_time; - self.last_time = now; - self.history.current_mut().time_elapsed += now - prev; - self.history.decay(); - } - - pub fn add_queries(&mut self, count: u64) { - self.history.current_mut().num_queries += count as f64; - } - - pub fn monthly_volume_estimate(&self, now: Instant) -> f64 { - let mut elapsed_time = now - self.last_time; - let mut queries = 0.0; - for frame in self.history.frames() { - elapsed_time += frame.time_elapsed; - queries += frame.num_queries; - } - - // Scale to 30 days - let scale = 60.0 * 60.0 * 24.0 * 30.0; - let elapsed_time = elapsed_time.as_secs_f64(); - - (queries * scale) / elapsed_time - } -} - #[cfg(test)] mod tests { use indexer_selection::test_utils::assert_within; use super::*; - #[track_caller] - fn assert_approx(expected: f64, actual: f64, within: f64) { - assert!((actual - expected).abs() <= within); - } - - #[test] - fn stable_volume() { - let mut now = Instant::now(); - let mut estimate = VolumeEstimator::new(now); - - // Over a long period, do 2 queries per second and verify that the 30 day estimate is - // 5184000 across multiple delays - const COUNT: f64 = 2.0 * 21600.0 * 120.0; - for _ in 0..50 { - for _ in 0..120 { - now += Duration::from_secs(1); - estimate.add_queries(2); - // Very precise, correct within < 1 query. - assert_approx(estimate.monthly_volume_estimate(now), COUNT, 1.0); - } - estimate.decay(now); - } - } - - #[test] - fn sine_volume() { - let mut now = Instant::now(); - let mut estimate = VolumeEstimator::new(now); - - // Show that a stable oscillating traffic has low variance when looking at the estimate. - let mut elapsed = 0.0_f64; - for _ in 0..100 { - for _ in 0..1000 { - for _ in 0..120 { - now += Duration::from_secs(1); - elapsed += 1.0; - // sin is -1 .. 1, so the range here is 100.0 .. 200.0 - let queries = ((elapsed / 1000.0).sin() + 3.0) * 50.0; - estimate.add_queries(queries as u64); - } - estimate.decay(now); - } - let daily_estimate = estimate.monthly_volume_estimate(now) / 30.0; - // The center of the range is 12,960,000. - // The QPS oscillates at +- 33% - // But, the estimate is within 2% on each iteration, - // and is sometimes much closer. Of course, that means the - // total error is less than 2% as well. - assert_approx(12960000.0, daily_estimate, 250000.0); - } - } - - #[test] - fn volume_increase() { - let mut now = Instant::now(); - let mut estimate = VolumeEstimator::new(now); - - // Over a month, do 1 queries per minute. This is "testing" - for _ in 0..21600 { - now += Duration::from_secs(120); - estimate.add_queries(2); - estimate.decay(now); - } - // Now in "prod", do 20 queries per second. An increase of 1200x. - // 30 days, 24 hours per day, 30 2 minute intervals per hour. - let frames = 30_u64 * 24 * 30; - // 2400 queries in 2 minutes is 20 per second. - let per_frame = 2400_u64; - for _ in 0..frames { - for _ in 0..per_frame { - now += Duration::from_secs_f64(0.05); - estimate.add_queries(1); - } - estimate.decay(now); - } - - let queries = (frames * per_frame) as f64; - let estimation = estimate.monthly_volume_estimate(now); - - // Show that over 30 days this large increase of query volume was estimated more or less - // appropriately (within 3%). - assert!(estimation > queries); - assert!(estimation < (queries * 1.03)); - } - #[test] fn controller() { fn test_controller( diff --git a/graph-gateway/src/client_query.rs b/graph-gateway/src/client_query.rs index e8b951eb..e2615c9c 100644 --- a/graph-gateway/src/client_query.rs +++ b/graph-gateway/src/client_query.rs @@ -1,14 +1,10 @@ -use std::collections::HashSet; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::str::FromStr; +use std::sync::atomic::{self, AtomicUsize}; +use std::sync::Arc; use std::time::{Duration, Instant}; -use std::{ - collections::{BTreeMap, BTreeSet, HashMap}, - sync::{ - atomic::{self, AtomicUsize}, - Arc, - }, -}; +use alloy_primitives::U256; use alloy_sol_types::Eip712Domain; use anyhow::{anyhow, bail, Context as _}; use axum::extract::OriginalUri; @@ -20,10 +16,18 @@ use axum::{ http::{header, HeaderMap, Response, StatusCode}, RequestPartsExt, }; +use cost_model::{Context as AgoraContext, CostError, CostModel}; use eventuals::{Eventual, Ptr}; use futures::future::join_all; use graphql::graphql_parser::query::{OperationDefinition, SelectionSet}; +use indexer_selection::Candidate; +use indexer_selection::{ + actor::Update, BlockRequirements, IndexerError as IndexerSelectionError, + IndexerErrorObservation, Indexing, InputError, Selection, UnresolvedBlock, UtilityParameters, + SELECTION_LIMIT, +}; use lazy_static::lazy_static; +use prelude::{buffer_queue::QueueWriter, double_buffer::DoubleBufferReader, unix_timestamp, GRT}; use prelude::{UDecimal18, USD}; use prost::bytes::Buf; use rand::{rngs::SmallRng, SeedableRng as _}; @@ -36,13 +40,6 @@ use toolshed::url::Url; use tracing::Instrument; use uuid::Uuid; -use indexer_selection::{ - actor::Update, BlockRequirements, Context as AgoraContext, - IndexerError as IndexerSelectionError, IndexerErrorObservation, Indexing, InputError, - Selection, UnresolvedBlock, UtilityParameters, SELECTION_LIMIT, -}; -use prelude::{buffer_queue::QueueWriter, double_buffer::DoubleBufferReader, unix_timestamp, GRT}; - use crate::auth::{AuthHandler, AuthToken}; use crate::block_constraints::{block_constraints, make_query_deterministic, BlockConstraint}; use crate::budgets::{self, Budgeter}; @@ -395,7 +392,7 @@ async fn handle_client_query_inner( .value_immediate() .unwrap_or_default(); - let candidates: Vec = deployments + let candidates: BTreeSet = deployments .iter() .flat_map(move |deployment| { let id = deployment.id; @@ -412,8 +409,6 @@ async fn handle_client_query_inner( !blocklist.contains(indexing) }) }) - .collect::>() - .into_iter() .collect(); tracing::info!(candidates = candidates.len()); if candidates.is_empty() { @@ -425,15 +420,32 @@ async fn handle_client_query_inner( .as_ref() .map(ToString::to_string) .unwrap_or_default(); - let context = AgoraContext::new(&payload.query, &variables) + let mut context = AgoraContext::new(&payload.query, &variables) .map_err(|err| Error::InvalidQuery(anyhow!("{}", err)))?; - tracing::info!( target: reports::CLIENT_QUERY_TARGET, query = %payload.query, %variables, ); + let mut candidates: Vec = candidates + .into_iter() + .filter_map(|indexing| { + let cost_model = ctx + .indexing_statuses + .value_immediate() + .and_then(|m| m.get(&indexing).and_then(|s| s.cost_model.clone())); + let fee = match indexer_fee(&cost_model, &mut context) { + Ok(fee) => fee, + Err(cost_model_err) => { + tracing::warn!(%cost_model_err, ?indexing); + return None; + } + }; + Some(Candidate { indexing, fee }) + }) + .collect(); + let mut block_cache = ctx .block_caches .get(&subgraph_chain) @@ -472,33 +484,36 @@ async fn handle_client_query_inner( let user_settings = ctx.auth_handler.query_settings(&auth).await; + let grt_per_usd = ctx + .isa_state + .latest() + .network_params + .grt_per_usd + .ok_or_else(|| Error::Internal(anyhow!("Missing exchange rate")))?; let budget_query_count = count_top_level_selection_sets(&context) .map_err(Error::InvalidQuery)? .max(1) as u64; - // For budgeting purposes, pick the latest deployment. - let budget_deployment = deployments.last().unwrap().id; - let mut budget = ctx.budgeter.budget(&budget_deployment, budget_query_count); + let candidate_fees: Vec = candidates + .iter() + .map(|c| USD(c.fee.0 / grt_per_usd.0)) + .collect(); + let mut budget = ctx.budgeter.budget(budget_query_count, &candidate_fees); let ignore_budget_feedback = user_settings.budget.is_some(); if let Some(user_budget) = user_settings.budget { // Security: Consumers can and will set their budget to unreasonably high values. // This `.min` prevents the budget from being set far beyond what it would be // automatically. The reason this is important is because sometimes queries are // subsidized and we would be at-risk to allow arbitrarily high values. - budget = USD(user_budget.0.min(budget.0 * UDecimal18::from(10))); - // TOOD: budget = user_budget.max(budget * USD::try_from(0.1_f64).unwrap()); + let max_budget = USD(budget.0 * UDecimal18::from(10)); + budget = user_budget.min(max_budget); } - let grt_per_usd = ctx - .isa_state - .latest() - .network_params - .grt_per_usd - .ok_or_else(|| Error::Internal(anyhow!("Missing exchange rate")))?; let budget = GRT(budget.0 * grt_per_usd.0); tracing::info!( target: reports::CLIENT_QUERY_TARGET, query_count = budget_query_count, budget_grt = f64::from(budget.0) as f32, ); + candidates.retain(|c| c.fee <= budget); let mut utility_params = UtilityParameters { budget, @@ -531,34 +546,11 @@ async fn handle_client_query_inner( // 170cbcf3-db7f-404a-be13-2022d9142677 utility_params.latest_block = latest_block.number; - // Since we modify the context in-place, we need to reset the context to the state of - // the original client query. This to avoid the following scenario: - // 1. A client query has no block requirements set for some top-level operation - // 2. The first indexer is selected, with some indexing status at block number `n` - // 3. The query is made deterministic by setting the block requirement to the hash of - // block `n` - // 4. Some condition requires us to retry this query on another indexer with an indexing - // status at a block less than `n` - // 5. The same context is re-used, including the block requirement set to the hash of - // block `n` - // 6. The indexer is seen as being behind and is unnecessarily penalized - // - // TODO: Avoid the additional cloning of the entire AST here, especially in the case - // where retries are necessary. Only the top-level operation arguments need to be reset - // to the state of the client query. - let mut context = context.clone(); - let selection_timer = METRICS.indexer_selection_duration.start_timer(); let (selections, indexer_errors) = ctx .isa_state .latest() - .select_indexers( - &mut rng, - &candidates, - &utility_params, - &mut context, - SELECTION_LIMIT as u8, - ) + .select_indexers(&mut rng, &utility_params, &candidates) .map_err(|err| match err { InputError::MalformedQuery => Error::InvalidQuery(anyhow!("Failed to parse")), InputError::MissingNetworkParams => { @@ -649,6 +641,17 @@ async fn handle_client_query_inner( Err(_) if !last_retry => continue, Err(unresolved) => return Err(Error::BlockNotFound(unresolved)), }; + // The Agora context must be cloned to preserve the state of the original client query. + // This to avoid the following scenario: + // 1. A client query has no block requirements set for some top-level operation + // 2. The first indexer is selected, with some indexing status at block number `n` + // 3. The query is made deterministic by setting the block requirement to the hash of + // block `n` + // 4. Some condition requires us to retry this query on another indexer with an indexing + // status at a block less than `n` + // 5. The same context is re-used, including the block requirement set to the hash of + // block `n` + // 6. The indexer is seen as being behind and is unnecessarily penalized let deterministic_query = make_query_deterministic(context.clone(), &resolved_blocks, &latest_query_block) .ok_or_else(|| { @@ -690,7 +693,6 @@ async fn handle_client_query_inner( if !ignore_budget_feedback { let total_indexer_fees = USD(total_indexer_fees.0 / grt_per_usd.0); let _ = ctx.budgeter.feedback.send(budgets::Feedback { - deployment: budget_deployment, fees: total_indexer_fees, query_count: budget_query_count, }); @@ -888,7 +890,7 @@ async fn handle_indexer_query_inner( Ok(response.payload) } -fn count_top_level_selection_sets(ctx: &AgoraContext) -> anyhow::Result { +fn count_top_level_selection_sets(ctx: &AgoraContext) -> anyhow::Result { let selection_sets = ctx .operations .iter() @@ -902,6 +904,29 @@ fn count_top_level_selection_sets(ctx: &AgoraContext) -> anyhow::Result { Ok(selection_sets.into_iter().map(|set| set.items.len()).sum()) } +pub fn indexer_fee( + cost_model: &Option>, + context: &mut AgoraContext<'_, String>, +) -> Result { + match cost_model + .as_ref() + .map(|model| model.cost_with_context(context)) + { + None => Ok(GRT(UDecimal18::from(0))), + Some(Ok(fee)) => { + let fee = U256::try_from_be_slice(&fee.to_bytes_be()).unwrap_or(U256::MAX); + Ok(GRT(UDecimal18::from_raw_u256(fee))) + } + Some(Err(CostError::CostModelFail | CostError::QueryNotCosted)) => Err("QueryNotCosted"), + Some(Err( + CostError::QueryNotSupported + | CostError::QueryInvalid + | CostError::FailedToParseQuery + | CostError::FailedToParseVariables, + )) => Err("MalformedQuery"), + } +} + /// This adapter middleware extracts the authorization token from the `api_key` path parameter, /// and adds it to the request in the `Authorization` header. /// diff --git a/graph-gateway/src/indexing.rs b/graph-gateway/src/indexing.rs index 95d46e7a..55c39b12 100644 --- a/graph-gateway/src/indexing.rs +++ b/graph-gateway/src/indexing.rs @@ -1,8 +1,11 @@ use std::{collections::HashMap, net::IpAddr, sync::Arc}; use alloy_primitives::Address; +use cost_model::CostModel; use eventuals::{Eventual, EventualExt as _, EventualWriter, Ptr}; use futures::future::join_all; +use indexer_selection::Indexing; +use prelude::epoch_cache::EpochCache; use semver::Version; use serde::Deserialize; use tokio::sync::Mutex; @@ -10,10 +13,6 @@ use toolshed::thegraph::{BlockPointer, DeploymentId}; use toolshed::url::{url::Host, Url}; use trust_dns_resolver::TokioAsyncResolver as DNSResolver; -use indexer_selection::cost_model::CostModel; -use indexer_selection::Indexing; -use prelude::epoch_cache::EpochCache; - use crate::geoip::GeoIP; use crate::indexers_status::cost_models::{self, CostModelQuery, CostModelSourceResponse}; use crate::indexers_status::indexing_statuses::{self, IndexingStatusResponse}; diff --git a/graph-gateway/src/main.rs b/graph-gateway/src/main.rs index 344a0d05..e47ddf9f 100644 --- a/graph-gateway/src/main.rs +++ b/graph-gateway/src/main.rs @@ -476,7 +476,6 @@ async fn write_indexer_inputs( url: indexer.url.clone(), stake: indexer.staked_tokens, allocation: indexer.allocated_tokens, - cost_model: status.cost_model.clone(), block: Some(BlockStatus { reported_number: status.block.number, blocks_behind: latest_block.saturating_sub(status.block.number), diff --git a/prelude/src/lib.rs b/prelude/src/lib.rs index da2f8d65..13fed7f3 100644 --- a/prelude/src/lib.rs +++ b/prelude/src/lib.rs @@ -50,8 +50,8 @@ pub fn sip24_hash(value: &impl Hash) -> u64 { // between these types. /// GRT with 18 fractional digits -#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)] pub struct GRT(pub UDecimal18); /// USD with 18 fractional digits -#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)] pub struct USD(pub UDecimal18);