Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release 15.1.0 #438

Merged
merged 5 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,4 @@ jobs:
run: cargo test --lib

- name: Integration tests
# the cargo clean is to avoid running out of disk space, hopefully unnecessary in the future
run: cargo clean && cargo test --test '*'
run: cargo test --test '*'
46 changes: 20 additions & 26 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 3 additions & 5 deletions graph-gateway/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
edition = "2021"
name = "graph-gateway"
version = "15.0.0"
version = "15.1.0"

[dependencies]
alloy-primitives.workspace = true
Expand All @@ -16,7 +16,7 @@ chrono = { version = "0.4", default-features = false, features = ["clock"] }
cost-model = { git = "https://github.com/graphprotocol/agora", rev = "3367d76" }
ethers = { version = "2.0.10", default-features = false, features = ["abigen"] }
eventuals = "0.6.7"
faster-hex = "0.8.0"
faster-hex = "0.9.0"
futures = "0.3"
graph-subscriptions = { git = "https://github.com/edgeandnode/subscription-payments", rev = "334d18b" }
graphql = { git = "https://github.com/edgeandnode/toolshed", tag = "graphql-v0.3.0", default-features = false }
Expand All @@ -26,7 +26,7 @@ graphql-http = { git = "https://github.com/edgeandnode/toolshed.git", tag = "gra
hex = "0.4"
indexer-selection = { path = "../indexer-selection" }
indoc = "2.0.3"
itertools = "0.11"
itertools = "0.12.0"
lazy_static = "1.4"
maxminddb = "0.23"
prelude = { path = "../prelude" }
Expand Down Expand Up @@ -63,5 +63,3 @@ uuid = { version = "1.0", default-features = false, features = ["v4"] }
[dev-dependencies]
assert_matches = "1.5.0"
hyper = "0.14.24"
regex = "1.5"
test-with = { version = "0.12.0", default-features = false }
80 changes: 19 additions & 61 deletions graph-gateway/src/budgets.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::time::Duration;

use eventuals::{Eventual, EventualWriter};
use indexer_selection::decay::FastDecayBuffer;
use indexer_selection::decay::DecayBuffer;
use prelude::*;
use tokio::time::interval;
use tokio::{select, spawn, sync::mpsc};
Expand All @@ -10,7 +10,7 @@ use crate::metrics::METRICS;

pub struct Budgeter {
pub feedback: mpsc::UnboundedSender<Feedback>,
absolute_budget_limit: USD,
query_fees_target: USD,
budget_limit: Eventual<USD>,
}

Expand All @@ -25,21 +25,23 @@ impl Budgeter {
let (mut budget_limit_tx, budget_limit_rx) = Eventual::new();
budget_limit_tx.write(query_fees_target);
Actor::create(feedback_rx, budget_limit_tx, query_fees_target);
let absolute_budget_limit = USD(query_fees_target.0 * UDecimal18::from(10));
Self {
feedback: feedback_tx,
absolute_budget_limit,
query_fees_target,
budget_limit: budget_limit_rx,
}
}

pub fn budget(&self, query_count: u64, candidate_fees: &[USD]) -> USD {
let budget_min = USD(UDecimal18::try_from(10e-6).unwrap());
let budget_max = USD(self.query_fees_target.0 * UDecimal18::from(2));

let budget_limit = self
.budget_limit
.value_immediate()
.unwrap_or(self.absolute_budget_limit);
.unwrap_or(self.query_fees_target);
let max_fee = candidate_fees.iter().max().cloned().unwrap_or_default();
let budget = max_fee.max(budget_limit).min(self.absolute_budget_limit);
let budget = max_fee.max(budget_limit).clamp(budget_min, budget_max);
USD(budget.0 * UDecimal18::from(query_count as u128))
}
}
Expand Down Expand Up @@ -81,7 +83,7 @@ impl Actor {
if self.controller.recent_query_count == 0 {
return;
}
let budget_limit = USD(self.controller.control_variable());
let budget_limit = self.controller.control_variable();
tracing::debug!(?budget_limit);
self.budget_limit.write(budget_limit);
}
Expand All @@ -92,18 +94,16 @@ struct Controller {
target_query_fees: USD,
recent_fees: USD,
recent_query_count: u64,
error_history: FastDecayBuffer<f64>,
error_history: DecayBuffer<f64, 6, 4>,
}

impl Controller {
fn new(target_query_fees: USD) -> Self {
let mut error_history = FastDecayBuffer::default();
*error_history.current_mut() = target_query_fees.0.into();
Self {
target_query_fees,
recent_fees: USD(UDecimal18::from(0)),
recent_query_count: 0,
error_history,
error_history: DecayBuffer::default(),
}
}

Expand All @@ -112,66 +112,24 @@ impl Controller {
self.recent_query_count += query_count;
}

fn control_variable(&mut self) -> UDecimal18 {
fn control_variable(&mut self) -> USD {
// See the following link if you're unfamiliar with PID controllers:
// https://en.wikipedia.org/wiki/Proportional%E2%80%93integral%E2%80%93derivative_controller
let target = f64::from(self.target_query_fees.0);
let process_variable =
f64::from(self.recent_fees.0) / self.recent_query_count.max(1) as f64;
tracing::debug!(avg_query_fees = process_variable);
METRICS.avg_query_fees.set(process_variable);

self.recent_fees = USD(UDecimal18::from(0));
self.recent_query_count = 0;
self.error_history.decay();
let error = f64::from(self.target_query_fees.0) - process_variable;
*self.error_history.current_mut() = error;
let error = (target - process_variable) / target;
*self.error_history.current_mut() += error;

let i: f64 = self.error_history.frames().iter().sum();
let k_i = 1.2;
let correction = UDecimal18::try_from(i * k_i).unwrap_or_default();
self.target_query_fees.0 + correction
}
}

#[cfg(test)]
mod tests {
use indexer_selection::test_utils::assert_within;

use super::*;

#[test]
fn controller() {
fn test_controller(
controller: &mut Controller,
process_variable_multiplier: f64,
tolerance: f64,
) {
let setpoint: f64 = controller.target_query_fees.0.into();
let mut process_variable = 0.0;
for i in 0..20 {
let control_variable: f64 = controller.control_variable().into();
process_variable = control_variable * process_variable_multiplier;
println!(
"{i:02} SP={setpoint:.6}, PV={:.8}, CV={:.8}",
process_variable, control_variable,
);
controller.add_queries(USD(UDecimal18::try_from(process_variable).unwrap()), 1);
}
assert_within(process_variable, setpoint, tolerance);
}

for setpoint in [10e-6, 20e-6, 50e-6] {
let setpoint = USD(UDecimal18::try_from(setpoint).unwrap());
let mut controller = Controller::new(setpoint);
test_controller(&mut controller, 0.2, 1e-6);
let mut controller = Controller::new(setpoint);
test_controller(&mut controller, 0.6, 1e-6);
let mut controller = Controller::new(setpoint);
test_controller(&mut controller, 0.8, 1e-6);

let mut controller = Controller::new(setpoint);
test_controller(&mut controller, 0.2, 1e-6);
test_controller(&mut controller, 0.6, 1e-6);
test_controller(&mut controller, 0.7, 1e-6);
}
let k_i = 0.1;
let control_variable = (i * k_i) * target;
USD(UDecimal18::try_from(target + control_variable).unwrap_or_default())
}
}
12 changes: 8 additions & 4 deletions graph-gateway/src/chains/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ struct Actor {

impl Actor {
fn spawn(mut self) {
let _trace = tracing::info_span!("Block Cache Actor", chain = %self.chain).entered();
let _trace = tracing::info_span!("block_cache", chain = %self.chain).entered();
tokio::spawn(
async move {
let mut cache_timer = interval(Duration::from_secs(32));
Expand All @@ -133,7 +133,7 @@ impl Actor {
else => break,
};
}
tracing::error!("block cache exit");
tracing::error!("exit");
}
.in_current_span(),
);
Expand Down Expand Up @@ -166,8 +166,6 @@ impl Actor {
}

async fn handle_chain_head(&mut self, head: BlockHead) {
tracing::info!(chain_head = ?head);

for uncle in &head.uncles {
let removed = self.hash_to_number.remove(uncle);
if let Some(removed) = removed {
Expand All @@ -178,6 +176,8 @@ impl Actor {
let blocks_per_minute = self.block_rate_estimator.update(head.block.number);
self.blocks_per_minute_tx.write(blocks_per_minute);

tracing::info!(chain_head = ?head, blocks_per_minute);

// Remove prior blocks from the past minute. This avoids retaining uncled blocks that are
// frequently used.
let cutoff = head.block.number - blocks_per_minute;
Expand All @@ -189,6 +189,10 @@ impl Actor {
with_metric(&METRICS.chain_head, &[&self.chain], |g| {
g.set(head.block.number as i64)
});
with_metric(&METRICS.blocks_per_minute, &[&self.chain], |g| {
g.set(blocks_per_minute as i64)
});

self.chain_head_tx.write(head.block);
}

Expand Down
13 changes: 5 additions & 8 deletions graph-gateway/src/client_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,6 @@ async fn handle_client_query_inner(
.map(|c| USD(c.fee.0 / grt_per_usd.0))
.collect();
let mut budget = ctx.budgeter.budget(budget_query_count, &candidate_fees);
let ignore_budget_feedback = user_settings.budget.is_some();
if let Some(user_budget) = user_settings.budget {
// Security: Consumers can and will set their budget to unreasonably high values.
// This `.min` prevents the budget from being set far beyond what it would be
Expand Down Expand Up @@ -679,13 +678,11 @@ async fn handle_client_query_inner(
match outcome_rx.recv().await {
Some(Err(_)) | None => (),
Some(Ok(outcome)) => {
if !ignore_budget_feedback {
let total_indexer_fees = USD(total_indexer_fees.0 / grt_per_usd.0);
let _ = ctx.budgeter.feedback.send(budgets::Feedback {
fees: total_indexer_fees,
query_count: budget_query_count,
});
}
let total_indexer_fees = USD(total_indexer_fees.0 / grt_per_usd.0);
let _ = ctx.budgeter.feedback.send(budgets::Feedback {
fees: total_indexer_fees,
query_count: budget_query_count,
});

return Ok(outcome);
}
Expand Down
Loading
Loading