Skip to content

Commit

Permalink
feat(graph-gateway): estimate block rate automatically (#429)
Browse files Browse the repository at this point in the history
Configuring a fixed block rate is an unnecessary, and makes little sense
for chains with inconsistent block rates.
  • Loading branch information
Theodus authored Nov 27, 2023
1 parent 02c6c78 commit d620594
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 87 deletions.
62 changes: 62 additions & 0 deletions graph-gateway/src/chains/block_rate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use std::time::Duration;

use alloy_primitives::BlockNumber;
use itertools::Itertools;

/// Estimates blocks per minute, based on the past minute of chain head block heights.
pub struct Estimator {
capacity: usize,
block_heights: Vec<BlockNumber>,
}

impl Estimator {
pub fn new(poll_interval: Duration) -> Self {
let updates_per_minute = 60 / poll_interval.as_secs();
let capacity = updates_per_minute as usize + 1;
let block_heights = Vec::with_capacity(capacity);
Self {
capacity,
block_heights,
}
}

/// Expected to be called approximately once per `poll_interval`. Returns the updated block rate
/// in blocks per minute.
pub fn update(&mut self, block_height: BlockNumber) -> u64 {
if self.block_heights.len() < self.capacity {
self.block_heights.push(block_height);
} else {
self.block_heights.rotate_left(1);
*self.block_heights.last_mut().unwrap() = block_height;
}
// Sum the last minute of block height deltas.
self.block_heights
.iter()
.tuple_windows()
.map(|(a, b)| b.saturating_sub(*a))
.sum()
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn estimator() {
let mut estimator = Estimator::new(Duration::from_secs(20));
assert_eq!(estimator.update(1), 0);
assert_eq!(estimator.update(2), 1);
assert_eq!(estimator.update(3), 2);
assert_eq!(estimator.update(4), 3);
assert_eq!(estimator.update(5), 3);
assert_eq!(estimator.update(6), 3);
assert_eq!(estimator.update(8), 4);
assert_eq!(estimator.update(10), 5);
assert_eq!(estimator.update(12), 6);
assert_eq!(estimator.update(12), 4);
assert_eq!(estimator.update(12), 2);
assert_eq!(estimator.update(12), 0);
assert_eq!(estimator.update(13), 1);
}
}
49 changes: 20 additions & 29 deletions graph-gateway/src/chains/ethereum.rs
Original file line number Diff line number Diff line change
@@ -1,57 +1,48 @@
use std::time::Duration;

use alloy_primitives::BlockHash;
use reqwest;
use indexer_selection::UnresolvedBlock;
use serde::{de::Error, Deserialize, Deserializer};
use serde_json::{json, Value as JSON};
use tokio::sync::mpsc;
use tokio::time::interval;
use toolshed::{thegraph::BlockPointer, url::Url};
use tracing::Instrument;

use indexer_selection::UnresolvedBlock;

use crate::metrics::METRICS;

use super::{BlockHead, ClientMsg};

#[derive(Debug)]
pub struct Provider {
pub network: String,
pub block_time: Duration,
pub rpc: Url,
}

impl super::Provider for Provider {
fn network(&self) -> &str {
&self.network
}
}
use crate::{config, metrics::METRICS};

pub struct Client {
provider: Provider,
chain: config::Chain,
http_client: reqwest::Client,
notify: mpsc::UnboundedSender<ClientMsg>,
}

impl super::Client for Client {
type Provider = Provider;
type Config = config::Chain;

fn chain_name(config: &Self::Config) -> &str {
&config.name
}

fn poll_interval() -> Duration {
Duration::from_secs(2)
}

fn create(
provider: Provider,
chain: config::Chain,
notify: mpsc::UnboundedSender<ClientMsg>,
) -> mpsc::UnboundedSender<UnresolvedBlock> {
let _trace =
tracing::info_span!("Ethereum Client Actor", network = %provider.network).entered();
let _trace = tracing::info_span!("Ethereum Client Actor", chain = %chain.name).entered();
let (unresolved_tx, mut unresolved_rx) = mpsc::unbounded_channel();
let mut client = Self {
provider,
chain,
http_client: reqwest::Client::new(),
notify,
};
tokio::spawn(
async move {
let mut poll_timer = interval(client.provider.block_time);
let mut poll_timer = interval(Self::poll_interval());
loop {
tokio::select! {
_ = poll_timer.tick() => {
Expand All @@ -74,14 +65,14 @@ impl super::Client for Client {
impl Client {
async fn spawn_block_fetch(&mut self, unresolved: Option<UnresolvedBlock>) {
let client = self.http_client.clone();
let network = self.provider.network.clone();
let rpc = self.provider.rpc.clone();
let chain = self.chain.name.clone();
let rpc = self.chain.rpc.clone();
let notify = self.notify.clone();
tokio::spawn(async move {
let timer = METRICS.block_resolution.start_timer(&[&network]);
let timer = METRICS.block_resolution.start_timer(&[&chain]);
let result = Self::fetch_block(client, rpc, unresolved.clone()).await;
drop(timer);
METRICS.block_resolution.check(&[&network], &result);
METRICS.block_resolution.check(&[&chain], &result);
let response = match result {
Ok(head) => match &unresolved {
Some(_) => ClientMsg::Block(head.block),
Expand Down
57 changes: 33 additions & 24 deletions graph-gateway/src/chains/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ use std::time::Duration;

use alloy_primitives::{BlockHash, BlockNumber};
use eventuals::{Eventual, EventualWriter};
use indexer_selection::UnresolvedBlock;
use prelude::epoch_cache::EpochCache;
use tokio::sync::{mpsc, oneshot};
use tokio::time::interval;
use toolshed::thegraph::BlockPointer;
use tracing::Instrument;

use indexer_selection::UnresolvedBlock;
use prelude::epoch_cache::EpochCache;

use crate::{block_constraints::*, metrics::*};

pub mod block_rate;
pub mod ethereum;
pub mod test;

Expand All @@ -22,14 +22,15 @@ pub struct BlockHead {
pub uncles: Vec<BlockHash>,
}

pub trait Provider {
fn network(&self) -> &str;
}

pub trait Client {
type Provider: Provider;
type Config;

fn chain_name(config: &Self::Config) -> &str;

fn poll_interval() -> Duration;

fn create(
provider: Self::Provider,
config: Self::Config,
notify: mpsc::UnboundedSender<ClientMsg>,
) -> mpsc::UnboundedSender<UnresolvedBlock>;
}
Expand All @@ -50,7 +51,7 @@ pub struct BlockRequirements {
#[derive(Clone)]
pub struct BlockCache {
pub chain_head: Eventual<BlockPointer>,
pub block_rate_hz: f64,
pub blocks_per_minute: Eventual<u64>,
request_tx: mpsc::UnboundedSender<Request>,
}

Expand All @@ -60,24 +61,27 @@ struct Request {
}

impl BlockCache {
pub fn new<C: Client>(block_rate_hz: f64, provider: C::Provider) -> Self {
let (chain_head_broadcast_tx, chain_head_broadcast_rx) = Eventual::new();
pub fn new<C: Client>(config: C::Config) -> Self {
let (chain_head_tx, chain_head_rx) = Eventual::new();
let (notify_tx, notify_rx) = mpsc::unbounded_channel();
let (request_tx, request_rx) = mpsc::unbounded_channel();
let (blocks_per_minute_tx, blocks_per_minute_rx) = Eventual::new();
let actor = Actor {
network: provider.network().to_string(),
client_tx: C::create(provider, notify_tx),
chain: C::chain_name(&config).to_string(),
client_tx: C::create(config, notify_tx),
notify_rx,
request_rx,
chain_head_broadcast_tx,
chain_head_tx,
blocks_per_minute_tx,
block_rate_estimator: block_rate::Estimator::new(C::poll_interval()),
number_to_hash: EpochCache::new(),
hash_to_number: EpochCache::new(),
pending: HashMap::new(),
};
actor.spawn();
Self {
chain_head: chain_head_broadcast_rx,
block_rate_hz,
chain_head: chain_head_rx,
blocks_per_minute: blocks_per_minute_rx,
request_tx,
}
}
Expand Down Expand Up @@ -106,19 +110,21 @@ impl BlockCache {
}

struct Actor {
network: String,
chain: String,
request_rx: mpsc::UnboundedReceiver<Request>,
client_tx: mpsc::UnboundedSender<UnresolvedBlock>,
notify_rx: mpsc::UnboundedReceiver<ClientMsg>,
chain_head_broadcast_tx: EventualWriter<BlockPointer>,
chain_head_tx: EventualWriter<BlockPointer>,
blocks_per_minute_tx: EventualWriter<u64>,
block_rate_estimator: block_rate::Estimator,
number_to_hash: EpochCache<BlockNumber, BlockHash, 2>,
hash_to_number: EpochCache<BlockHash, BlockNumber, 2>,
pending: HashMap<UnresolvedBlock, Vec<oneshot::Sender<Result<BlockPointer, UnresolvedBlock>>>>,
}

impl Actor {
fn spawn(mut self) {
let _trace = tracing::info_span!("Block Cache Actor", network = %self.network).entered();
let _trace = tracing::info_span!("Block Cache Actor", chain = %self.chain).entered();
tokio::spawn(
async move {
let mut cache_timer = interval(Duration::from_secs(32));
Expand Down Expand Up @@ -160,11 +166,11 @@ impl Actor {
};
match cached {
Ok(block) => {
with_metric(&METRICS.block_cache_hit, &[&self.network], |c| c.inc());
with_metric(&METRICS.block_cache_hit, &[&self.chain], |c| c.inc());
let _ = request.tx.send(Ok(block));
}
Err(unresolved) => {
with_metric(&METRICS.block_cache_miss, &[&self.network], |c| c.inc());
with_metric(&METRICS.block_cache_miss, &[&self.chain], |c| c.inc());
let _ = self.client_tx.send(unresolved.clone());
self.pending.entry(unresolved).or_default().push(request.tx);
}
Expand All @@ -183,10 +189,13 @@ impl Actor {

self.handle_block(head.block.clone()).await;

with_metric(&METRICS.chain_head, &[&self.network], |g| {
let blocks_per_minute = self.block_rate_estimator.update(head.block.number);
self.blocks_per_minute_tx.write(blocks_per_minute);

with_metric(&METRICS.chain_head, &[&self.chain], |g| {
g.set(head.block.number as i64)
});
self.chain_head_broadcast_tx.write(head.block);
self.chain_head_tx.write(head.block);
}

async fn handle_block(&mut self, block: BlockPointer) {
Expand Down
29 changes: 16 additions & 13 deletions graph-gateway/src/chains/test.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,38 @@
use tokio::sync::mpsc;
use std::time::Duration;

use indexer_selection::UnresolvedBlock;
use tokio::sync::mpsc;
use toolshed::thegraph::BlockPointer;

use crate::chains::BlockHead;

use super::ClientMsg;

pub struct Provider {
pub network: String,
pub struct Config {
pub chain: String,
pub blocks: Vec<BlockPointer>,
}

impl super::Provider for Provider {
fn network(&self) -> &str {
&self.network
}
}

pub struct Client;

impl super::Client for Client {
type Provider = Provider;
type Config = Config;

fn chain_name(config: &Self::Config) -> &str {
&config.chain
}

fn poll_interval() -> std::time::Duration {
Duration::from_secs(1)
}

fn create(
provider: Provider,
config: Config,
notify: mpsc::UnboundedSender<ClientMsg>,
) -> mpsc::UnboundedSender<UnresolvedBlock> {
let (tx, mut rx) = mpsc::unbounded_channel::<UnresolvedBlock>();
tokio::spawn(async move {
if let Some(head) = provider.blocks.last() {
if let Some(head) = config.blocks.last() {
notify
.send(ClientMsg::Head(BlockHead {
block: head.clone(),
Expand All @@ -40,7 +43,7 @@ impl super::Client for Client {
loop {
tokio::select! {
Some(unresolved) = rx.recv() => {
match provider.blocks.iter().find(|b| unresolved.matches(b)) {
match config.blocks.iter().find(|b| unresolved.matches(b)) {
Some(block) => notify.send(ClientMsg::Block(block.clone())).unwrap(),
None => notify.send(ClientMsg::Err(unresolved)).unwrap(),
};
Expand Down
3 changes: 2 additions & 1 deletion graph-gateway/src/client_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,12 +515,13 @@ async fn handle_client_query_inner(
);
candidates.retain(|c| c.fee <= budget);

let block_rate_hz = block_cache.blocks_per_minute.value_immediate().unwrap_or(0) as f64 / 60.0;
let mut utility_params = UtilityParameters {
budget,
requirements: block_requirements,
// 170cbcf3-db7f-404a-be13-2022d9142677
latest_block: 0,
block_rate_hz: block_cache.block_rate_hz,
block_rate_hz,
};

let mut rng = SmallRng::from_entropy();
Expand Down
Loading

0 comments on commit d620594

Please sign in to comment.