Skip to content

Commit

Permalink
refactor: UDecimal18
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus committed Nov 17, 2023
1 parent 334e911 commit f08e2f6
Show file tree
Hide file tree
Showing 26 changed files with 318 additions and 553 deletions.
6 changes: 2 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ reqwest = { version = "0.11", default-features = false, features = [
"default-tls",
"gzip",
] }
secp256k1 = { version = "0.24", default-features = false }
serde = { version = "1.0", features = ["derive"] }
siphasher = "1.0.0"
tokio = { version = "1.24", features = [
Expand Down
3 changes: 2 additions & 1 deletion graph-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ prometheus = { version = "0.13", default-features = false }
prost = "0.12.1"
rand.workspace = true
rdkafka = { version = "0.36.0", features = ["gssapi", "tracing"] }
receipts = { git = "ssh://[email protected]/edgeandnode/receipts.git", rev = "89a821c" }
reqwest.workspace = true
secp256k1.workspace = true
secp256k1 = { version = "0.24", default-features = false }
semver = "1.0"
serde.workspace = true
serde_json = { version = "1.0", features = ["raw_value"] }
Expand Down
59 changes: 30 additions & 29 deletions graph-gateway/src/budgets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use indexer_selection::{
decay::{Decay, FastDecayBuffer},
impl_struct_decay,
};
use prelude::USD;
use prelude::{UDecimal18, USD};
use tokio::{
select, spawn,
sync::mpsc,
Expand Down Expand Up @@ -34,7 +34,7 @@ pub struct Feedback {

impl Budgeter {
pub fn new(query_fees_target: USD) -> Self {
assert!(query_fees_target.as_f64() >= MAX_DISCOUNT_USD);
assert!(f64::from(query_fees_target.0) >= MAX_DISCOUNT_USD);
let (feedback_tx, feedback_rx) = mpsc::unbounded_channel();
let (budgets_tx, budgets_rx) = Eventual::new();
Actor::create(feedback_rx, budgets_tx, query_fees_target);
Expand All @@ -51,7 +51,7 @@ impl Budgeter {
.value_immediate()
.and_then(|budgets| budgets.get(deployment).copied())
.unwrap_or(self.query_fees_target);
budget * USD::try_from(query_count).unwrap()
USD(budget.0 * UDecimal18::from(query_count as u128))
}
}

Expand Down Expand Up @@ -109,17 +109,17 @@ impl Actor {
}
let target = self.controller.target_query_fees;
let control_variable = self.controller.control_variable();
tracing::debug!(budget_control_variable = %control_variable);
tracing::debug!(budget_control_variable = ?control_variable);
let now = Instant::now();
let budgets = self
.volume_estimators
.iter()
.map(|(deployment, volume_estimator)| {
let volume = volume_estimator.monthly_volume_estimate(now) as u64;
let mut budget = volume_discount(volume, target) * control_variable;
let mut budget = volume_discount(volume, target).0 * control_variable;
// limit budget to 100x target
budget = budget.min(target * USD::try_from(100_u64).unwrap());
(*deployment, budget)
budget = budget.min(target.0 * UDecimal18::from(100));
(*deployment, USD(budget))
})
.collect();

Expand All @@ -131,13 +131,13 @@ fn volume_discount(monthly_volume: u64, target: USD) -> USD {
// Discount the budget, based on a generalized logistic function. We apply little to no discount
// between 0 and ~10e3 queries per month. And we limit the discount to 10E-6 USD.
// https://www.desmos.com/calculator/whtakt50sa
let b_max = target.as_f64();
let b_max: f64 = target.0.into();
let b_min = b_max - MAX_DISCOUNT_USD;
let m: f64 = 1e6;
let z: f64 = 0.45;
let v = monthly_volume as f64;
let budget = b_min + ((b_max - b_min) * m.powf(z)) / (v + m).powf(z);
budget.try_into().unwrap_or_default()
USD(budget.try_into().unwrap_or_default())
}

/// State for the control loop targeting `recent_query_fees`.
Expand All @@ -152,32 +152,33 @@ impl Controller {
fn new(target_query_fees: USD) -> Self {
Self {
target_query_fees,
recent_fees: USD::zero(),
recent_fees: USD(UDecimal18::from(0)),
recent_query_count: 0,
error_history: FastDecayBuffer::default(),
}
}

fn add_queries(&mut self, fees: USD, query_count: u64) {
self.recent_fees += fees;
self.recent_fees = USD(self.recent_fees.0 + fees.0);
self.recent_query_count += query_count;
}

fn control_variable(&mut self) -> USD {
fn control_variable(&mut self) -> UDecimal18 {
// See the following link if you're unfamiliar with PID controllers:
// https://en.wikipedia.org/wiki/Proportional%E2%80%93integral%E2%80%93derivative_controller
let process_variable = self.recent_fees.as_f64() / self.recent_query_count.max(1) as f64;
let process_variable =
f64::from(self.recent_fees.0) / self.recent_query_count.max(1) as f64;
METRICS.avg_query_fees.set(process_variable);

self.recent_fees = USD::zero();
self.recent_fees = USD(UDecimal18::from(0));
self.recent_query_count = 0;
self.error_history.decay();
let error = self.target_query_fees.as_f64() - process_variable;
let error = f64::from(self.target_query_fees.0) - process_variable;
*self.error_history.current_mut() = error;

let i: f64 = self.error_history.frames().iter().sum();
let k_i = 3e4;
USD::try_from(1.0).unwrap() + USD::try_from(i * k_i).unwrap_or(USD::zero())
UDecimal18::from(1) + UDecimal18::try_from(i * k_i).unwrap_or_default()
}
}

Expand Down Expand Up @@ -332,32 +333,32 @@ mod tests {
process_variable_multiplier: f64,
tolerance: f64,
) {
let setpoint = controller.target_query_fees.as_f64();
let mut process_variable = USD::zero();
let setpoint: f64 = controller.target_query_fees.0.into();
let mut process_variable = 0.0;
for i in 0..30 {
let control_variable = controller.control_variable();
process_variable = controller.target_query_fees
* USD::try_from(process_variable_multiplier).unwrap()
let control_variable: f64 = controller.control_variable().into();
process_variable = f64::from(controller.target_query_fees.0)
* process_variable_multiplier
* control_variable;
println!(
"{i:02} SP={setpoint:.6}, PV={:.8}, CV={:.8}",
process_variable.as_f64(),
control_variable.as_f64(),
process_variable, control_variable,
);
controller.add_queries(process_variable, 1);
controller.add_queries(USD(UDecimal18::try_from(process_variable).unwrap()), 1);
}
assert_within(process_variable.as_f64(), setpoint, tolerance);
assert_within(process_variable, setpoint, tolerance);
}

for setpoint in [20e-6, 40e-6] {
let mut controller = Controller::new(USD::try_from(setpoint).unwrap());
let setpoint = USD(UDecimal18::try_from(setpoint).unwrap());
let mut controller = Controller::new(setpoint);
test_controller(&mut controller, 0.2, 1e-6);
let mut controller = Controller::new(USD::try_from(setpoint).unwrap());
let mut controller = Controller::new(setpoint);
test_controller(&mut controller, 0.6, 1e-6);
let mut controller = Controller::new(USD::try_from(setpoint).unwrap());
let mut controller = Controller::new(setpoint);
test_controller(&mut controller, 0.8, 1e-6);

let mut controller = Controller::new(USD::try_from(setpoint).unwrap());
let mut controller = Controller::new(setpoint);
test_controller(&mut controller, 0.2, 1e-6);
test_controller(&mut controller, 0.6, 1e-6);
test_controller(&mut controller, 0.7, 1e-6);
Expand Down
32 changes: 14 additions & 18 deletions graph-gateway/src/client_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use eventuals::{Eventual, Ptr};
use futures::future::join_all;
use graphql::graphql_parser::query::{OperationDefinition, SelectionSet};
use lazy_static::lazy_static;
use prelude::USD;
use prelude::{UDecimal18, USD};
use prost::bytes::Buf;
use rand::{rngs::SmallRng, SeedableRng as _};
use serde::Deserialize;
Expand Down Expand Up @@ -484,19 +484,20 @@ async fn handle_client_query_inner(
// This `.min` prevents the budget from being set far beyond what it would be
// automatically. The reason this is important is because sometimes queries are
// subsidized and we would be at-risk to allow arbitrarily high values.
budget = user_budget.min(budget * USD::try_from(10_u64).unwrap());
budget = USD(user_budget.0.min(budget.0 * UDecimal18::from(10)));
// TOOD: budget = user_budget.max(budget * USD::try_from(0.1_f64).unwrap());
}
let budget: GRT = ctx
let grt_per_usd = ctx
.isa_state
.latest()
.network_params
.usd_to_grt(budget)
.grt_per_usd
.ok_or_else(|| Error::Internal(anyhow!("Missing exchange rate")))?;
let budget = GRT(budget.0 * grt_per_usd.0);
tracing::info!(
target: reports::CLIENT_QUERY_TARGET,
query_count = budget_query_count,
budget_grt = budget.as_f64() as f32,
budget_grt = f64::from(budget.0) as f32,
);

let mut utility_params = UtilityParameters {
Expand All @@ -510,7 +511,7 @@ async fn handle_client_query_inner(
let mut rng = SmallRng::from_entropy();

let mut total_indexer_queries = 0;
let mut total_indexer_fees = GRT::zero();
let mut total_indexer_fees = GRT(UDecimal18::from(0));
// Used to track how many times an indexer failed to resolve a block. This may indicate that
// our latest block has been uncled.
let mut latest_unresolved: u32 = 0;
Expand Down Expand Up @@ -580,10 +581,10 @@ async fn handle_client_query_inner(

// Double the budget & retry if there is any indexer requesting a higher fee.
if !last_retry && isa_errors.contains_key(&IndexerSelectionError::FeeTooHigh) {
utility_params.budget = utility_params.budget * GRT::try_from(2_u64).unwrap();
utility_params.budget = GRT(utility_params.budget.0 * UDecimal18::from(2));
tracing::info!(
target: reports::CLIENT_QUERY_TARGET,
budget_grt = budget.as_f64() as f32,
budget_grt = f64::from(budget.0) as f32,
"increase_budget"
);
continue;
Expand All @@ -595,10 +596,10 @@ async fn handle_client_query_inner(
)));
}

total_indexer_fees += selections.iter().map(|s| s.fee).sum();
total_indexer_fees = GRT(total_indexer_fees.0 + selections.iter().map(|s| s.fee.0).sum());
tracing::info!(
target: reports::CLIENT_QUERY_TARGET,
indexer_fees_grt = total_indexer_fees.as_f64() as f32,
indexer_fees_grt = f64::from(total_indexer_fees.0) as f32,
);

// The gateway's current strategy for predicting is optimized for keeping responses close to chain head. We've
Expand Down Expand Up @@ -687,12 +688,7 @@ async fn handle_client_query_inner(
Some(Err(_)) | None => (),
Some(Ok(outcome)) => {
if !ignore_budget_feedback {
let total_indexer_fees: USD = ctx
.isa_state
.latest()
.network_params
.grt_to_usd(total_indexer_fees)
.unwrap();
let total_indexer_fees: USD = USD(total_indexer_fees.0 / grt_per_usd.0);
let _ = ctx.budgeter.feedback.send(budgets::Feedback {
deployment: budget_deployment,
fees: total_indexer_fees,
Expand Down Expand Up @@ -738,13 +734,13 @@ async fn handle_indexer_query(
%deployment,
url = %selection.url,
blocks_behind = selection.blocks_behind,
fee_grt = selection.fee.as_f64() as f32,
fee_grt = f64::from(selection.fee.0) as f32,
subgraph_chain = %ctx.deployment.manifest.network,
);

let receipt = ctx
.receipt_signer
.create_receipt(&selection.indexing, selection.fee)
.create_receipt(&selection.indexing, &selection.fee)
.await
.ok_or(IndexerError::NoAllocation);

Expand Down
9 changes: 5 additions & 4 deletions graph-gateway/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ use std::{collections::BTreeMap, fmt, path::PathBuf};

use alloy_primitives::{Address, U256};
use graph_subscriptions::subscription_tier::{SubscriptionTier, SubscriptionTiers};
use prelude::UDecimal18;
use secp256k1::SecretKey;
use semver::Version;
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr, FromInto};
use toolshed::url::Url;

use indexer_selection::SecretKey;
use prelude::USD;

use crate::chains::ethereum;
use crate::poi::ProofOfIndexingInfo;

Expand Down Expand Up @@ -110,8 +109,10 @@ impl From<Chain> for ethereum::Provider {
#[derive(Debug, Deserialize)]
#[serde(untagged)]
pub enum ExchangeRateProvider {
/// Ethereum RPC provider
Rpc(#[serde_as(as = "DisplayFromStr")] Url),
Fixed(USD),
/// Fixed conversion rate of GRT/USD
Fixed(#[serde_as(as = "DisplayFromStr")] UDecimal18),
}

#[derive(Debug, Deserialize)]
Expand Down
Loading

0 comments on commit f08e2f6

Please sign in to comment.