From 3246ed5cb21752ed60fea4f799e8954d3095421e Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Mon, 27 Nov 2023 15:18:40 -0500 Subject: [PATCH] fix: budget controller --- graph-gateway/src/budgets.rs | 20 +++++++++++--------- graph-gateway/src/client_query.rs | 20 ++++++++------------ 2 files changed, 19 insertions(+), 21 deletions(-) diff --git a/graph-gateway/src/budgets.rs b/graph-gateway/src/budgets.rs index da4c708f..e0a52a9b 100644 --- a/graph-gateway/src/budgets.rs +++ b/graph-gateway/src/budgets.rs @@ -22,7 +22,8 @@ pub struct Feedback { impl Budgeter { pub fn new(query_fees_target: USD) -> Self { let (feedback_tx, feedback_rx) = mpsc::unbounded_channel(); - let (budget_limit_tx, budget_limit_rx) = Eventual::new(); + let (mut budget_limit_tx, budget_limit_rx) = Eventual::new(); + budget_limit_tx.write(query_fees_target); Actor::create(feedback_rx, budget_limit_tx, query_fees_target); let absolute_budget_limit = USD(query_fees_target.0 * UDecimal18::from(10)); Self { @@ -96,11 +97,13 @@ struct Controller { impl Controller { fn new(target_query_fees: USD) -> Self { + let mut error_history = FastDecayBuffer::default(); + *error_history.current_mut() = target_query_fees.0.into(); Self { target_query_fees, recent_fees: USD(UDecimal18::from(0)), recent_query_count: 0, - error_history: FastDecayBuffer::default(), + error_history, } } @@ -123,8 +126,9 @@ impl Controller { *self.error_history.current_mut() = error; let i: f64 = self.error_history.frames().iter().sum(); - let k_i = 3e4; - UDecimal18::from(1) + UDecimal18::try_from(i * k_i).unwrap_or_default() + let k_i = 1.2; + let correction = UDecimal18::try_from(i * k_i).unwrap_or_default(); + self.target_query_fees.0 + correction } } @@ -143,11 +147,9 @@ mod tests { ) { let setpoint: f64 = controller.target_query_fees.0.into(); let mut process_variable = 0.0; - for i in 0..30 { + for i in 0..20 { let control_variable: f64 = controller.control_variable().into(); - process_variable = f64::from(controller.target_query_fees.0) - * process_variable_multiplier - * control_variable; + process_variable = control_variable * process_variable_multiplier; println!( "{i:02} SP={setpoint:.6}, PV={:.8}, CV={:.8}", process_variable, control_variable, @@ -157,7 +159,7 @@ mod tests { assert_within(process_variable, setpoint, tolerance); } - for setpoint in [20e-6, 40e-6] { + for setpoint in [10e-6, 20e-6, 50e-6] { let setpoint = USD(UDecimal18::try_from(setpoint).unwrap()); let mut controller = Controller::new(setpoint); test_controller(&mut controller, 0.2, 1e-6); diff --git a/graph-gateway/src/client_query.rs b/graph-gateway/src/client_query.rs index 41e7e56d..d0364aac 100644 --- a/graph-gateway/src/client_query.rs +++ b/graph-gateway/src/client_query.rs @@ -559,7 +559,7 @@ async fn handle_client_query_inner( utility_params.latest_block = latest_block.number; let selection_timer = METRICS.indexer_selection_duration.start_timer(); - let (selections, indexer_errors) = ctx + let (mut selections, indexer_errors) = ctx .isa_state .latest() .select_indexers(&mut rng, &utility_params, &candidates) @@ -583,23 +583,19 @@ async fn handle_client_query_inner( .filter(|(_, l)| *l > 0) .collect::>(); - // Double the budget & retry if there is any indexer requesting a higher fee. - if !last_retry && isa_errors.contains_key(&IndexerSelectionError::FeeTooHigh) { - utility_params.budget = GRT(utility_params.budget.0 * UDecimal18::from(2)); - tracing::info!( - target: reports::CLIENT_QUERY_TARGET, - budget_grt = f64::from(budget.0) as f32, - "increase_budget" - ); - continue; - } - return Err(Error::NoSuitableIndexer(anyhow!( "Indexer selection errors: {:?}", isa_errors ))); } + for selection in &mut selections { + // TODO: In a future where indexers are expected put more effort into setting cost + // models, we should pay selected indexers the maximum fee of the alternatives + // (where `fee <= budget`). + let min_fee = budget.0 * UDecimal18::try_from(0.75).unwrap(); + selection.fee = GRT(selection.fee.0.max(min_fee)); + } total_indexer_fees = GRT(total_indexer_fees.0 + selections.iter().map(|s| s.fee.0).sum()); tracing::info!( target: reports::CLIENT_QUERY_TARGET,