Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(graph-gateway): estimate block rate automatically #429

Merged
merged 1 commit into from
Nov 27, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions graph-gateway/src/chains/block_rate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use std::time::Duration;

use alloy_primitives::BlockNumber;
use itertools::Itertools;

/// Estimates blocks per minute, based on the past minute of chain head block heights.
pub struct Estimator {
capacity: usize,
block_heights: Vec<BlockNumber>,
}

impl Estimator {
pub fn new(poll_interval: Duration) -> Self {
let updates_per_minute = 60 / poll_interval.as_secs();
let capacity = updates_per_minute as usize + 1;
let block_heights = Vec::with_capacity(capacity);
Self {
capacity,
block_heights,
}
}

/// Expected to be called approximately once per `poll_interval`. Returns the updated block rate
/// in blocks per minute.
pub fn update(&mut self, block_height: BlockNumber) -> u64 {
if self.block_heights.len() < self.capacity {
self.block_heights.push(block_height);
} else {
self.block_heights.rotate_left(1);
*self.block_heights.last_mut().unwrap() = block_height;
}
// Sum the last minute of block height deltas.
self.block_heights
.iter()
.tuple_windows()
.map(|(a, b)| b.saturating_sub(*a))
.sum()
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn estimator() {
let mut estimator = Estimator::new(Duration::from_secs(20));
assert_eq!(estimator.update(1), 0);
assert_eq!(estimator.update(2), 1);
assert_eq!(estimator.update(3), 2);
assert_eq!(estimator.update(4), 3);
assert_eq!(estimator.update(5), 3);
assert_eq!(estimator.update(6), 3);
assert_eq!(estimator.update(8), 4);
assert_eq!(estimator.update(10), 5);
assert_eq!(estimator.update(12), 6);
assert_eq!(estimator.update(12), 4);
assert_eq!(estimator.update(12), 2);
assert_eq!(estimator.update(12), 0);
assert_eq!(estimator.update(13), 1);
}
}
49 changes: 20 additions & 29 deletions graph-gateway/src/chains/ethereum.rs
Original file line number Diff line number Diff line change
@@ -1,57 +1,48 @@
use std::time::Duration;

use alloy_primitives::BlockHash;
use reqwest;
use indexer_selection::UnresolvedBlock;
use serde::{de::Error, Deserialize, Deserializer};
use serde_json::{json, Value as JSON};
use tokio::sync::mpsc;
use tokio::time::interval;
use toolshed::{thegraph::BlockPointer, url::Url};
use tracing::Instrument;

use indexer_selection::UnresolvedBlock;

use crate::metrics::METRICS;

use super::{BlockHead, ClientMsg};

#[derive(Debug)]
pub struct Provider {
pub network: String,
pub block_time: Duration,
pub rpc: Url,
}

impl super::Provider for Provider {
fn network(&self) -> &str {
&self.network
}
}
use crate::{config, metrics::METRICS};

pub struct Client {
provider: Provider,
chain: config::Chain,
http_client: reqwest::Client,
notify: mpsc::UnboundedSender<ClientMsg>,
}

impl super::Client for Client {
type Provider = Provider;
type Config = config::Chain;

fn chain_name(config: &Self::Config) -> &str {
&config.name
}

fn poll_interval() -> Duration {
Duration::from_secs(2)
}

fn create(
provider: Provider,
chain: config::Chain,
notify: mpsc::UnboundedSender<ClientMsg>,
) -> mpsc::UnboundedSender<UnresolvedBlock> {
let _trace =
tracing::info_span!("Ethereum Client Actor", network = %provider.network).entered();
let _trace = tracing::info_span!("Ethereum Client Actor", chain = %chain.name).entered();
let (unresolved_tx, mut unresolved_rx) = mpsc::unbounded_channel();
let mut client = Self {
provider,
chain,
http_client: reqwest::Client::new(),
notify,
};
tokio::spawn(
async move {
let mut poll_timer = interval(client.provider.block_time);
let mut poll_timer = interval(Self::poll_interval());
loop {
tokio::select! {
_ = poll_timer.tick() => {
@@ -74,14 +65,14 @@ impl super::Client for Client {
impl Client {
async fn spawn_block_fetch(&mut self, unresolved: Option<UnresolvedBlock>) {
let client = self.http_client.clone();
let network = self.provider.network.clone();
let rpc = self.provider.rpc.clone();
let chain = self.chain.name.clone();
let rpc = self.chain.rpc.clone();
let notify = self.notify.clone();
tokio::spawn(async move {
let timer = METRICS.block_resolution.start_timer(&[&network]);
let timer = METRICS.block_resolution.start_timer(&[&chain]);
let result = Self::fetch_block(client, rpc, unresolved.clone()).await;
drop(timer);
METRICS.block_resolution.check(&[&network], &result);
METRICS.block_resolution.check(&[&chain], &result);
let response = match result {
Ok(head) => match &unresolved {
Some(_) => ClientMsg::Block(head.block),
57 changes: 33 additions & 24 deletions graph-gateway/src/chains/mod.rs
Original file line number Diff line number Diff line change
@@ -3,16 +3,16 @@ use std::time::Duration;

use alloy_primitives::{BlockHash, BlockNumber};
use eventuals::{Eventual, EventualWriter};
use indexer_selection::UnresolvedBlock;
use prelude::epoch_cache::EpochCache;
use tokio::sync::{mpsc, oneshot};
use tokio::time::interval;
use toolshed::thegraph::BlockPointer;
use tracing::Instrument;

use indexer_selection::UnresolvedBlock;
use prelude::epoch_cache::EpochCache;

use crate::{block_constraints::*, metrics::*};

pub mod block_rate;
pub mod ethereum;
pub mod test;

@@ -22,14 +22,15 @@ pub struct BlockHead {
pub uncles: Vec<BlockHash>,
}

pub trait Provider {
fn network(&self) -> &str;
}

pub trait Client {
type Provider: Provider;
type Config;

fn chain_name(config: &Self::Config) -> &str;

fn poll_interval() -> Duration;

fn create(
provider: Self::Provider,
config: Self::Config,
notify: mpsc::UnboundedSender<ClientMsg>,
) -> mpsc::UnboundedSender<UnresolvedBlock>;
}
@@ -50,7 +51,7 @@ pub struct BlockRequirements {
#[derive(Clone)]
pub struct BlockCache {
pub chain_head: Eventual<BlockPointer>,
pub block_rate_hz: f64,
pub blocks_per_minute: Eventual<u64>,
request_tx: mpsc::UnboundedSender<Request>,
}

@@ -60,24 +61,27 @@ struct Request {
}

impl BlockCache {
pub fn new<C: Client>(block_rate_hz: f64, provider: C::Provider) -> Self {
let (chain_head_broadcast_tx, chain_head_broadcast_rx) = Eventual::new();
pub fn new<C: Client>(config: C::Config) -> Self {
let (chain_head_tx, chain_head_rx) = Eventual::new();
let (notify_tx, notify_rx) = mpsc::unbounded_channel();
let (request_tx, request_rx) = mpsc::unbounded_channel();
let (blocks_per_minute_tx, blocks_per_minute_rx) = Eventual::new();
let actor = Actor {
network: provider.network().to_string(),
client_tx: C::create(provider, notify_tx),
chain: C::chain_name(&config).to_string(),
client_tx: C::create(config, notify_tx),
notify_rx,
request_rx,
chain_head_broadcast_tx,
chain_head_tx,
blocks_per_minute_tx,
block_rate_estimator: block_rate::Estimator::new(C::poll_interval()),
number_to_hash: EpochCache::new(),
hash_to_number: EpochCache::new(),
pending: HashMap::new(),
};
actor.spawn();
Self {
chain_head: chain_head_broadcast_rx,
block_rate_hz,
chain_head: chain_head_rx,
blocks_per_minute: blocks_per_minute_rx,
request_tx,
}
}
@@ -106,19 +110,21 @@ impl BlockCache {
}

struct Actor {
network: String,
chain: String,
request_rx: mpsc::UnboundedReceiver<Request>,
client_tx: mpsc::UnboundedSender<UnresolvedBlock>,
notify_rx: mpsc::UnboundedReceiver<ClientMsg>,
chain_head_broadcast_tx: EventualWriter<BlockPointer>,
chain_head_tx: EventualWriter<BlockPointer>,
blocks_per_minute_tx: EventualWriter<u64>,
block_rate_estimator: block_rate::Estimator,
number_to_hash: EpochCache<BlockNumber, BlockHash, 2>,
hash_to_number: EpochCache<BlockHash, BlockNumber, 2>,
pending: HashMap<UnresolvedBlock, Vec<oneshot::Sender<Result<BlockPointer, UnresolvedBlock>>>>,
}

impl Actor {
fn spawn(mut self) {
let _trace = tracing::info_span!("Block Cache Actor", network = %self.network).entered();
let _trace = tracing::info_span!("Block Cache Actor", chain = %self.chain).entered();
tokio::spawn(
async move {
let mut cache_timer = interval(Duration::from_secs(32));
@@ -160,11 +166,11 @@ impl Actor {
};
match cached {
Ok(block) => {
with_metric(&METRICS.block_cache_hit, &[&self.network], |c| c.inc());
with_metric(&METRICS.block_cache_hit, &[&self.chain], |c| c.inc());
let _ = request.tx.send(Ok(block));
}
Err(unresolved) => {
with_metric(&METRICS.block_cache_miss, &[&self.network], |c| c.inc());
with_metric(&METRICS.block_cache_miss, &[&self.chain], |c| c.inc());
let _ = self.client_tx.send(unresolved.clone());
self.pending.entry(unresolved).or_default().push(request.tx);
}
@@ -183,10 +189,13 @@ impl Actor {

self.handle_block(head.block.clone()).await;

with_metric(&METRICS.chain_head, &[&self.network], |g| {
let blocks_per_minute = self.block_rate_estimator.update(head.block.number);
self.blocks_per_minute_tx.write(blocks_per_minute);

with_metric(&METRICS.chain_head, &[&self.chain], |g| {
g.set(head.block.number as i64)
});
self.chain_head_broadcast_tx.write(head.block);
self.chain_head_tx.write(head.block);
}

async fn handle_block(&mut self, block: BlockPointer) {
29 changes: 16 additions & 13 deletions graph-gateway/src/chains/test.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,38 @@
use tokio::sync::mpsc;
use std::time::Duration;

use indexer_selection::UnresolvedBlock;
use tokio::sync::mpsc;
use toolshed::thegraph::BlockPointer;

use crate::chains::BlockHead;

use super::ClientMsg;

pub struct Provider {
pub network: String,
pub struct Config {
pub chain: String,
pub blocks: Vec<BlockPointer>,
}

impl super::Provider for Provider {
fn network(&self) -> &str {
&self.network
}
}

pub struct Client;

impl super::Client for Client {
type Provider = Provider;
type Config = Config;

fn chain_name(config: &Self::Config) -> &str {
&config.chain
}

fn poll_interval() -> std::time::Duration {
Duration::from_secs(1)
}

fn create(
provider: Provider,
config: Config,
notify: mpsc::UnboundedSender<ClientMsg>,
) -> mpsc::UnboundedSender<UnresolvedBlock> {
let (tx, mut rx) = mpsc::unbounded_channel::<UnresolvedBlock>();
tokio::spawn(async move {
if let Some(head) = provider.blocks.last() {
if let Some(head) = config.blocks.last() {
notify
.send(ClientMsg::Head(BlockHead {
block: head.clone(),
@@ -40,7 +43,7 @@ impl super::Client for Client {
loop {
tokio::select! {
Some(unresolved) = rx.recv() => {
match provider.blocks.iter().find(|b| unresolved.matches(b)) {
match config.blocks.iter().find(|b| unresolved.matches(b)) {
Some(block) => notify.send(ClientMsg::Block(block.clone())).unwrap(),
None => notify.send(ClientMsg::Err(unresolved)).unwrap(),
};
3 changes: 2 additions & 1 deletion graph-gateway/src/client_query.rs
Original file line number Diff line number Diff line change
@@ -515,12 +515,13 @@ async fn handle_client_query_inner(
);
candidates.retain(|c| c.fee <= budget);

let block_rate_hz = block_cache.blocks_per_minute.value_immediate().unwrap_or(0) as f64 / 60.0;
let mut utility_params = UtilityParameters {
budget,
requirements: block_requirements,
// 170cbcf3-db7f-404a-be13-2022d9142677
latest_block: 0,
block_rate_hz: block_cache.block_rate_hz,
block_rate_hz,
};

let mut rng = SmallRng::from_entropy();
16 changes: 1 addition & 15 deletions graph-gateway/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::ops::Deref;
use std::str::FromStr;
use std::time::Duration;
use std::{collections::BTreeMap, fmt, path::PathBuf};

use alloy_primitives::{Address, B256, U256};
@@ -12,7 +11,6 @@ use serde::Deserialize;
use serde_with::{serde_as, DeserializeAs, DisplayFromStr, FromInto};
use toolshed::url::Url;

use crate::chains::ethereum;
use crate::poi::ProofOfIndexingInfo;

#[serde_as]
@@ -86,23 +84,11 @@ pub struct AttestationConfig {
}

#[serde_as]
#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct Chain {
pub name: String,
#[serde_as(as = "DisplayFromStr")]
pub rpc: Url,
pub poll_hz: u16,
pub block_rate_hz: f64,
}

impl From<Chain> for ethereum::Provider {
fn from(chain: Chain) -> Self {
Self {
network: chain.name,
rpc: chain.rpc,
block_time: Duration::from_secs(chain.poll_hz as u64),
}
}
}

#[serde_as]
2 changes: 1 addition & 1 deletion graph-gateway/src/main.rs
Original file line number Diff line number Diff line change
@@ -106,7 +106,7 @@ async fn main() {
.into_iter()
.map(|chain| {
let network = chain.name.clone();
let cache = BlockCache::new::<ethereum::Client>(chain.block_rate_hz, chain.into());
let cache = BlockCache::new::<ethereum::Client>(chain);
(network, cache)
})
.collect::<HashMap<String, BlockCache>>();
8 changes: 4 additions & 4 deletions graph-gateway/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -51,24 +51,24 @@ impl Metrics {
block_resolution: ResponseMetricVecs::new(
"gw_block_resolution",
"block requests",
&["network"],
&["chain"],
),
block_cache_hit: register_int_counter_vec!(
"gw_block_cache_hit",
"block cache hit count",
&["network"]
&["chain"]
)
.unwrap(),
block_cache_miss: register_int_counter_vec!(
"gw_block_cache_miss",
"block cache miss count",
&["network"]
&["chain"]
)
.unwrap(),
chain_head: register_int_gauge_vec!(
"gw_chain_head",
"chain head block number",
&["network"]
&["chain"]
)
.unwrap(),
indexer_selection_duration: register_histogram!(