From 26ecb09e2cf0fedeed3d01d650e10252f0505340 Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Tue, 28 Nov 2023 11:24:45 -0500 Subject: [PATCH 1/5] fix: block cache logs --- graph-gateway/src/chains/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/graph-gateway/src/chains/mod.rs b/graph-gateway/src/chains/mod.rs index 50e70b73..7b0b9ab8 100644 --- a/graph-gateway/src/chains/mod.rs +++ b/graph-gateway/src/chains/mod.rs @@ -112,7 +112,7 @@ struct Actor { impl Actor { fn spawn(mut self) { - let _trace = tracing::info_span!("Block Cache Actor", chain = %self.chain).entered(); + let _trace = tracing::info_span!("block_cache", chain = %self.chain).entered(); tokio::spawn( async move { let mut cache_timer = interval(Duration::from_secs(32)); @@ -133,7 +133,7 @@ impl Actor { else => break, }; } - tracing::error!("block cache exit"); + tracing::error!("exit"); } .in_current_span(), ); @@ -166,8 +166,6 @@ impl Actor { } async fn handle_chain_head(&mut self, head: BlockHead) { - tracing::info!(chain_head = ?head); - for uncle in &head.uncles { let removed = self.hash_to_number.remove(uncle); if let Some(removed) = removed { @@ -178,6 +176,8 @@ impl Actor { let blocks_per_minute = self.block_rate_estimator.update(head.block.number); self.blocks_per_minute_tx.write(blocks_per_minute); + tracing::info!(chain_head = ?head, blocks_per_minute); + // Remove prior blocks from the past minute. This avoids retaining uncled blocks that are // frequently used. let cutoff = head.block.number - blocks_per_minute; From bf9cb4f7584efca21dfad7ba251c2dff1fb48b06 Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Tue, 28 Nov 2023 12:22:41 -0500 Subject: [PATCH 2/5] feat: add metric for blocks per minute --- graph-gateway/src/chains/mod.rs | 4 ++++ graph-gateway/src/metrics.rs | 7 +++++++ 2 files changed, 11 insertions(+) diff --git a/graph-gateway/src/chains/mod.rs b/graph-gateway/src/chains/mod.rs index 7b0b9ab8..3c4dd042 100644 --- a/graph-gateway/src/chains/mod.rs +++ b/graph-gateway/src/chains/mod.rs @@ -189,6 +189,10 @@ impl Actor { with_metric(&METRICS.chain_head, &[&self.chain], |g| { g.set(head.block.number as i64) }); + with_metric(&METRICS.blocks_per_minute, &[&self.chain], |g| { + g.set(blocks_per_minute as i64) + }); + self.chain_head_tx.write(head.block); } diff --git a/graph-gateway/src/metrics.rs b/graph-gateway/src/metrics.rs index 514d4f84..e1966e24 100644 --- a/graph-gateway/src/metrics.rs +++ b/graph-gateway/src/metrics.rs @@ -21,6 +21,7 @@ pub struct Metrics { pub block_cache_hit: IntCounterVec, pub block_cache_miss: IntCounterVec, pub chain_head: IntGaugeVec, + pub blocks_per_minute: IntGaugeVec, pub indexer_selection_duration: Histogram, } @@ -71,6 +72,12 @@ impl Metrics { &["chain"] ) .unwrap(), + blocks_per_minute: register_int_gauge_vec!( + "gw_blocks_per_minute", + "chain blocks per minute", + &["chain"] + ) + .unwrap(), indexer_selection_duration: register_histogram!( "gw_indexer_selection_duration", "indexer selection duration" From 92d2272f1a818f4aa665da9e536dbdaaa2d81c66 Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Wed, 29 Nov 2023 09:21:09 -0500 Subject: [PATCH 3/5] fix(budget): reduce windup --- graph-gateway/src/budgets.rs | 80 ++++++++-------------------------- indexer-selection/src/decay.rs | 1 - 2 files changed, 19 insertions(+), 62 deletions(-) diff --git a/graph-gateway/src/budgets.rs b/graph-gateway/src/budgets.rs index e0a52a9b..4680602d 100644 --- a/graph-gateway/src/budgets.rs +++ b/graph-gateway/src/budgets.rs @@ -1,7 +1,7 @@ use std::time::Duration; use eventuals::{Eventual, EventualWriter}; -use indexer_selection::decay::FastDecayBuffer; +use indexer_selection::decay::DecayBuffer; use prelude::*; use tokio::time::interval; use tokio::{select, spawn, sync::mpsc}; @@ -10,7 +10,7 @@ use crate::metrics::METRICS; pub struct Budgeter { pub feedback: mpsc::UnboundedSender, - absolute_budget_limit: USD, + query_fees_target: USD, budget_limit: Eventual, } @@ -25,21 +25,23 @@ impl Budgeter { let (mut budget_limit_tx, budget_limit_rx) = Eventual::new(); budget_limit_tx.write(query_fees_target); Actor::create(feedback_rx, budget_limit_tx, query_fees_target); - let absolute_budget_limit = USD(query_fees_target.0 * UDecimal18::from(10)); Self { feedback: feedback_tx, - absolute_budget_limit, + query_fees_target, budget_limit: budget_limit_rx, } } pub fn budget(&self, query_count: u64, candidate_fees: &[USD]) -> USD { + let budget_min = USD(UDecimal18::try_from(10e-6).unwrap()); + let budget_max = USD(self.query_fees_target.0 * UDecimal18::from(2)); + let budget_limit = self .budget_limit .value_immediate() - .unwrap_or(self.absolute_budget_limit); + .unwrap_or(self.query_fees_target); let max_fee = candidate_fees.iter().max().cloned().unwrap_or_default(); - let budget = max_fee.max(budget_limit).min(self.absolute_budget_limit); + let budget = max_fee.max(budget_limit).clamp(budget_min, budget_max); USD(budget.0 * UDecimal18::from(query_count as u128)) } } @@ -81,7 +83,7 @@ impl Actor { if self.controller.recent_query_count == 0 { return; } - let budget_limit = USD(self.controller.control_variable()); + let budget_limit = self.controller.control_variable(); tracing::debug!(?budget_limit); self.budget_limit.write(budget_limit); } @@ -92,18 +94,16 @@ struct Controller { target_query_fees: USD, recent_fees: USD, recent_query_count: u64, - error_history: FastDecayBuffer, + error_history: DecayBuffer, } impl Controller { fn new(target_query_fees: USD) -> Self { - let mut error_history = FastDecayBuffer::default(); - *error_history.current_mut() = target_query_fees.0.into(); Self { target_query_fees, recent_fees: USD(UDecimal18::from(0)), recent_query_count: 0, - error_history, + error_history: DecayBuffer::default(), } } @@ -112,66 +112,24 @@ impl Controller { self.recent_query_count += query_count; } - fn control_variable(&mut self) -> UDecimal18 { + fn control_variable(&mut self) -> USD { // See the following link if you're unfamiliar with PID controllers: // https://en.wikipedia.org/wiki/Proportional%E2%80%93integral%E2%80%93derivative_controller + let target = f64::from(self.target_query_fees.0); let process_variable = f64::from(self.recent_fees.0) / self.recent_query_count.max(1) as f64; + tracing::debug!(avg_query_fees = process_variable); METRICS.avg_query_fees.set(process_variable); self.recent_fees = USD(UDecimal18::from(0)); self.recent_query_count = 0; self.error_history.decay(); - let error = f64::from(self.target_query_fees.0) - process_variable; - *self.error_history.current_mut() = error; + let error = (target - process_variable) / target; + *self.error_history.current_mut() += error; let i: f64 = self.error_history.frames().iter().sum(); - let k_i = 1.2; - let correction = UDecimal18::try_from(i * k_i).unwrap_or_default(); - self.target_query_fees.0 + correction - } -} - -#[cfg(test)] -mod tests { - use indexer_selection::test_utils::assert_within; - - use super::*; - - #[test] - fn controller() { - fn test_controller( - controller: &mut Controller, - process_variable_multiplier: f64, - tolerance: f64, - ) { - let setpoint: f64 = controller.target_query_fees.0.into(); - let mut process_variable = 0.0; - for i in 0..20 { - let control_variable: f64 = controller.control_variable().into(); - process_variable = control_variable * process_variable_multiplier; - println!( - "{i:02} SP={setpoint:.6}, PV={:.8}, CV={:.8}", - process_variable, control_variable, - ); - controller.add_queries(USD(UDecimal18::try_from(process_variable).unwrap()), 1); - } - assert_within(process_variable, setpoint, tolerance); - } - - for setpoint in [10e-6, 20e-6, 50e-6] { - let setpoint = USD(UDecimal18::try_from(setpoint).unwrap()); - let mut controller = Controller::new(setpoint); - test_controller(&mut controller, 0.2, 1e-6); - let mut controller = Controller::new(setpoint); - test_controller(&mut controller, 0.6, 1e-6); - let mut controller = Controller::new(setpoint); - test_controller(&mut controller, 0.8, 1e-6); - - let mut controller = Controller::new(setpoint); - test_controller(&mut controller, 0.2, 1e-6); - test_controller(&mut controller, 0.6, 1e-6); - test_controller(&mut controller, 0.7, 1e-6); - } + let k_i = 0.1; + let control_variable = (i * k_i) * target; + USD(UDecimal18::try_from(target + control_variable).unwrap_or_default()) } } diff --git a/indexer-selection/src/decay.rs b/indexer-selection/src/decay.rs index b4f73ae0..c4e8db2f 100644 --- a/indexer-selection/src/decay.rs +++ b/indexer-selection/src/decay.rs @@ -44,7 +44,6 @@ pub trait Decay { } pub type ISADecayBuffer = DecayBuffer; -pub type FastDecayBuffer = DecayBuffer; impl Default for DecayBuffer where From 96632ed093863290e072bddbc44665d002f6fe5c Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Thu, 30 Nov 2023 14:34:26 -0500 Subject: [PATCH 4/5] feat(budget): allow feedback from manual budgets --- graph-gateway/src/client_query.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/graph-gateway/src/client_query.rs b/graph-gateway/src/client_query.rs index f29500a2..deb8adc6 100644 --- a/graph-gateway/src/client_query.rs +++ b/graph-gateway/src/client_query.rs @@ -507,7 +507,6 @@ async fn handle_client_query_inner( .map(|c| USD(c.fee.0 / grt_per_usd.0)) .collect(); let mut budget = ctx.budgeter.budget(budget_query_count, &candidate_fees); - let ignore_budget_feedback = user_settings.budget.is_some(); if let Some(user_budget) = user_settings.budget { // Security: Consumers can and will set their budget to unreasonably high values. // This `.min` prevents the budget from being set far beyond what it would be @@ -679,13 +678,11 @@ async fn handle_client_query_inner( match outcome_rx.recv().await { Some(Err(_)) | None => (), Some(Ok(outcome)) => { - if !ignore_budget_feedback { - let total_indexer_fees = USD(total_indexer_fees.0 / grt_per_usd.0); - let _ = ctx.budgeter.feedback.send(budgets::Feedback { - fees: total_indexer_fees, - query_count: budget_query_count, - }); - } + let total_indexer_fees = USD(total_indexer_fees.0 / grt_per_usd.0); + let _ = ctx.budgeter.feedback.send(budgets::Feedback { + fees: total_indexer_fees, + query_count: budget_query_count, + }); return Ok(outcome); } From 61ebf6237013edb00bc3600d96d6c9e976ee1723 Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Tue, 28 Nov 2023 11:04:36 -0500 Subject: [PATCH 5/5] release 15.1.0 --- .github/workflows/ci.yml | 3 +-- Cargo.lock | 46 +++++++++++++++++----------------------- graph-gateway/Cargo.toml | 8 +++---- 3 files changed, 24 insertions(+), 33 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4656031d..e212ba76 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -51,5 +51,4 @@ jobs: run: cargo test --lib - name: Integration tests - # the cargo clean is to avoid running out of disk space, hopefully unnecessary in the future - run: cargo clean && cargo test --test '*' + run: cargo test --test '*' diff --git a/Cargo.lock b/Cargo.lock index 5846ea4a..8e4d4f59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1410,9 +1410,9 @@ dependencies = [ [[package]] name = "faster-hex" -version = "0.8.1" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "239f7bfb930f820ab16a9cd95afc26f88264cf6905c960b340a615384aa3338a" +checksum = "a2a2b11eda1d40935b26cf18f6833c526845ae8c41e58d09af6adeb6f0269183" dependencies = [ "serde", ] @@ -1678,7 +1678,7 @@ dependencies = [ [[package]] name = "graph-gateway" -version = "15.0.0" +version = "15.1.0" dependencies = [ "alloy-primitives", "alloy-sol-types", @@ -1698,7 +1698,7 @@ dependencies = [ "hyper", "indexer-selection", "indoc", - "itertools 0.11.0", + "itertools 0.12.0", "lazy_static", "maxminddb", "prelude", @@ -1708,7 +1708,6 @@ dependencies = [ "rand", "rdkafka", "receipts", - "regex", "reqwest", "secp256k1", "semver 1.0.20", @@ -1718,7 +1717,6 @@ dependencies = [ "serde_yaml", "simple-rate-limiter", "tap_core", - "test-with", "thegraph", "thiserror", "tokio", @@ -2188,6 +2186,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25db6b064527c5d482d0423354fcd07a89a2dfe07b67892e62411946db7f07b0" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.9" @@ -2700,9 +2707,9 @@ checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" [[package]] name = "parity-scale-codec" -version = "3.6.5" +version = "3.6.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0dec8a8073036902368c2cdc0387e85ff9a37054d7e7c98e592145e0c92cd4fb" +checksum = "881331e34fa842a2fb61cc2db9643a8fedc615e47cfcc52597d1af0db9a7e8fe" dependencies = [ "arrayvec 0.7.4", "bitvec", @@ -2714,11 +2721,11 @@ dependencies = [ [[package]] name = "parity-scale-codec-derive" -version = "3.6.5" +version = "3.6.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "312270ee71e1cd70289dacf597cab7b207aa107d2f28191c2ae45b2ece18a260" +checksum = "be30eaf4b0a9fba5336683b38de57bb86d179a35862ba6bfcf57625d006bde5b" dependencies = [ - "proc-macro-crate 1.3.1", + "proc-macro-crate 2.0.0", "proc-macro2", "quote", "syn 1.0.109", @@ -3930,9 +3937,9 @@ checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" [[package]] name = "spki" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d1e996ef02c474957d681f1b05213dfb0abab947b446a62d37770b23500184a" +checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d" dependencies = [ "base64ct", "der", @@ -4113,19 +4120,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "test-with" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfdbef2253c8b276a6877dd72b0dc03fc1ca4bfa2e344d884e908c6f3669f3ea" -dependencies = [ - "proc-macro-error", - "proc-macro2", - "quote", - "regex", - "syn 2.0.39", -] - [[package]] name = "thegraph" version = "0.1.1" diff --git a/graph-gateway/Cargo.toml b/graph-gateway/Cargo.toml index 76afa017..443c5ca0 100644 --- a/graph-gateway/Cargo.toml +++ b/graph-gateway/Cargo.toml @@ -1,7 +1,7 @@ [package] edition = "2021" name = "graph-gateway" -version = "15.0.0" +version = "15.1.0" [dependencies] alloy-primitives.workspace = true @@ -16,7 +16,7 @@ chrono = { version = "0.4", default-features = false, features = ["clock"] } cost-model = { git = "https://github.com/graphprotocol/agora", rev = "3367d76" } ethers = { version = "2.0.10", default-features = false, features = ["abigen"] } eventuals = "0.6.7" -faster-hex = "0.8.0" +faster-hex = "0.9.0" futures = "0.3" graph-subscriptions = { git = "https://github.com/edgeandnode/subscription-payments", rev = "334d18b" } graphql = { git = "https://github.com/edgeandnode/toolshed", tag = "graphql-v0.3.0", default-features = false } @@ -26,7 +26,7 @@ graphql-http = { git = "https://github.com/edgeandnode/toolshed.git", tag = "gra hex = "0.4" indexer-selection = { path = "../indexer-selection" } indoc = "2.0.3" -itertools = "0.11" +itertools = "0.12.0" lazy_static = "1.4" maxminddb = "0.23" prelude = { path = "../prelude" } @@ -63,5 +63,3 @@ uuid = { version = "1.0", default-features = false, features = ["v4"] } [dev-dependencies] assert_matches = "1.5.0" hyper = "0.14.24" -regex = "1.5" -test-with = { version = "0.12.0", default-features = false }