From 143739ea1bca4ae15f9ba1f54fbe5bdfed4fba33 Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Tue, 5 Dec 2023 22:02:37 -0500 Subject: [PATCH] feat: block oracle subgraph --- graph-gateway/src/client_query.rs | 3 +- graph-gateway/src/config.rs | 5 ++ graph-gateway/src/lib.rs | 35 ++++++++++++- graph-gateway/src/main.rs | 20 +++++--- graph-gateway/src/topology.rs | 85 ++++++++++++++++++++++++++----- 5 files changed, 125 insertions(+), 23 deletions(-) diff --git a/graph-gateway/src/client_query.rs b/graph-gateway/src/client_query.rs index 36fa12f4..8320c438 100644 --- a/graph-gateway/src/client_query.rs +++ b/graph-gateway/src/client_query.rs @@ -818,8 +818,7 @@ async fn handle_indexer_query_inner( } } - // Return early if we aren't expecting an attestation. - if !ctx.deployment.manifest.features.is_empty() { + if !ctx.deployment.expect_attestation { return Ok(response.payload); } diff --git a/graph-gateway/src/config.rs b/graph-gateway/src/config.rs index 7dadb332..73cfb87e 100644 --- a/graph-gateway/src/config.rs +++ b/graph-gateway/src/config.rs @@ -19,6 +19,9 @@ pub struct Config { /// Respect the payment state of API keys (disable for testnets) pub api_key_payment_required: bool, pub attestations: AttestationConfig, + /// Block oracle subgraph URL + #[serde_as(as = "DisplayFromStr")] + pub block_oracle_subgraph: Url, pub chains: Vec, /// Ethereum RPC provider, or fixed exchange rate for testing pub exchange_rate_provider: ExchangeRateProvider, @@ -86,6 +89,8 @@ pub struct AttestationConfig { #[serde_as] #[derive(Clone, Debug, Deserialize)] pub struct Chain { + /// CAIP-2 ID used to identify chains in the block oracle. + pub caip2_id: String, /// The first name is used in logs, the others are aliases also supported in subgraph manifests. pub names: Vec, #[serde_as(as = "DisplayFromStr")] diff --git a/graph-gateway/src/lib.rs b/graph-gateway/src/lib.rs index 5a7162f8..06982a52 100644 --- a/graph-gateway/src/lib.rs +++ b/graph-gateway/src/lib.rs @@ -1,7 +1,11 @@ -use std::iter; +use std::{iter, time::Duration}; use axum::Json; +use eventuals::{Eventual, EventualExt as _, Ptr}; use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue}; +use serde::Deserialize; +use thegraph::client as subgraph_client; +use tokio::sync::Mutex; pub mod auth; pub mod block_constraints; @@ -42,3 +46,32 @@ where ); (headers, Json(payload)) } + +pub fn spawn_poller( + client: subgraph_client::Client, + query: String, + label: &'static str, + interval: Duration, +) -> Eventual>> +where + T: for<'de> Deserialize<'de> + Send + 'static, + Ptr>: Send, +{ + let (writer, reader) = Eventual::new(); + let state: &'static Mutex<_> = Box::leak(Box::new(Mutex::new((writer, client)))); + eventuals::timer(interval) + .pipe_async(move |_| { + let query = query.clone(); + async move { + let mut guard = state.lock().await; + match guard.1.paginated_query::(query).await { + Ok(response) => guard.0.write(Ptr::new(response)), + Err(subgraph_poll_err) => { + tracing::error!(label, %subgraph_poll_err); + } + }; + } + }) + .forever(); + reader +} diff --git a/graph-gateway/src/main.rs b/graph-gateway/src/main.rs index 1ae3dbee..5f652162 100644 --- a/graph-gateway/src/main.rs +++ b/graph-gateway/src/main.rs @@ -18,6 +18,7 @@ use axum::{ routing, Router, Server, }; use eventuals::{Eventual, EventualExt as _, Ptr}; +use graph_gateway::topology::block_oracle_chains; use graph_subscriptions::subscription_tier::SubscriptionTiers; use prometheus::{self, Encoder as _}; use secp256k1::SecretKey; @@ -101,6 +102,17 @@ async fn main() { .filter(|_| !config.geoip_blocked_countries.is_empty()) .map(|db| GeoIP::new(db, config.geoip_blocked_countries).unwrap()); + let http_client = reqwest::Client::builder() + .timeout(Duration::from_secs(20)) + .build() + .unwrap(); + + let block_oracle_chains = block_oracle_chains( + subgraph_client::Client::new(http_client.clone(), config.block_oracle_subgraph), + &config.chains, + ) + .await; + let block_caches: HashMap = config .chains .into_iter() @@ -113,11 +125,6 @@ async fn main() { let block_caches: &'static HashMap = Box::leak(Box::new(block_caches)); - let http_client = reqwest::Client::builder() - .timeout(Duration::from_secs(20)) - .build() - .unwrap(); - let grt_per_usd: Eventual = match config.exchange_rate_provider { ExchangeRateProvider::Fixed(grt_per_usd) => Eventual::from_value(GRT(grt_per_usd)), ExchangeRateProvider::Rpc(url) => exchange_rate::grt_per_usd(url).await.unwrap(), @@ -149,7 +156,8 @@ async fn main() { ))); let ipfs = ipfs::Client::new(http_client.clone(), config.ipfs, 50); - let network = GraphNetwork::new(network_subgraph_data.subgraphs, ipfs).await; + let network = + GraphNetwork::new(network_subgraph_data.subgraphs, block_oracle_chains, ipfs).await; // Indexer blocklist // Periodically check the defective POIs list against the network indexers and update the diff --git a/graph-gateway/src/topology.rs b/graph-gateway/src/topology.rs index d484a528..5c683011 100644 --- a/graph-gateway/src/topology.rs +++ b/graph-gateway/src/topology.rs @@ -1,18 +1,21 @@ -use std::collections::{BTreeSet, HashMap}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::sync::Arc; +use std::time::Duration; use alloy_primitives::Address; use anyhow::anyhow; use eventuals::{Eventual, EventualExt, Ptr}; use futures::future::join_all; +use indoc::indoc; use itertools::Itertools; use prelude::GRT; use serde::Deserialize; +use thegraph::client as subgraph_client; use thegraph::types::{DeploymentId, SubgraphId}; use tokio::sync::RwLock; use toolshed::url::Url; -use crate::{ipfs, network_subgraph}; +use crate::{config, ipfs, network_subgraph, spawn_poller}; /// Representation of the graph network being used to serve queries #[derive(Clone)] @@ -38,6 +41,7 @@ pub struct Subgraph { pub struct Deployment { pub id: DeploymentId, pub manifest: Arc, + pub expect_attestation: bool, /// An indexer may have multiple active allocations on a deployment. We collapse them into a single logical /// allocation using the largest allocation ID and sum of the allocated tokens. pub indexers: Vec>, @@ -85,6 +89,7 @@ pub struct Manifest { impl GraphNetwork { pub async fn new( subgraphs: Eventual>>, + block_oracle_chains: Eventual>>, ipfs: Arc, ) -> Self { let cache: &'static RwLock = Box::leak(Box::new(RwLock::new(IpfsCache { @@ -94,8 +99,9 @@ impl GraphNetwork { // Create a lookup table for subgraphs, keyed by their ID. // Invalid URL indexers are filtered out. See: 7f2f89aa-24c9-460b-ab1e-fc94697c4f4 - let subgraphs = subgraphs.map(move |subgraphs| async move { - Ptr::new(Self::subgraphs(&subgraphs, cache).await) + let subgraphs = subgraphs.map(move |subgraphs| { + let block_oracle_chains = block_oracle_chains.clone(); + async move { Ptr::new(Self::subgraphs(&subgraphs, cache, block_oracle_chains).await) } }); // Create a lookup table for deployments, keyed by their ID (which is also their IPFS hash). @@ -134,19 +140,20 @@ impl GraphNetwork { async fn subgraphs( subgraphs: &[network_subgraph::Subgraph], cache: &'static RwLock, + block_oracle_chains: Eventual>>, ) -> HashMap { + let block_oracle_chains: &HashSet = + &block_oracle_chains.value_immediate().unwrap_or_default(); join_all(subgraphs.iter().map(|subgraph| async move { let id = subgraph.id; - let deployments = join_all( - subgraph - .versions - .iter() - .map(|version| Self::deployment(subgraphs, version, cache)), - ) - .await - .into_iter() - .flatten() - .collect(); + let deployments = + join_all(subgraph.versions.iter().map(|version| { + Self::deployment(subgraphs, version, cache, block_oracle_chains) + })) + .await + .into_iter() + .flatten() + .collect(); let subgraph = Subgraph { deployments, id, @@ -163,6 +170,7 @@ impl GraphNetwork { subgraphs: &[network_subgraph::Subgraph], version: &network_subgraph::SubgraphVersion, cache: &'static RwLock, + block_oracle_chains: &HashSet, ) -> Option> { let id = version.subgraph_deployment.id; let manifest = IpfsCache::manifest(cache, &version.subgraph_deployment.id).await?; @@ -215,9 +223,13 @@ impl GraphNetwork { let transferred_to_l2 = version.subgraph_deployment.transferred_to_l2 && version.subgraph_deployment.allocations.is_empty(); + let expect_attestation = + manifest.features.is_empty() && block_oracle_chains.contains(&manifest.network); + Some(Arc::new(Deployment { id, manifest, + expect_attestation, subgraphs, indexers, transferred_to_l2, @@ -299,3 +311,48 @@ impl IpfsCache { }) } } + +/// Returns the set of CAIP-2 IDs supported by the Block Oracle. +pub async fn block_oracle_chains( + subgraph: subgraph_client::Client, + chains: &[config::Chain], +) -> Eventual>> { + let query = indoc! {" + networks( + block: $block + orderBy: id + orderDirection: asc + first: $first + where: { + id_gt: $last + } + ) { + id + } + "}; + #[derive(Clone, Deserialize)] + struct Network { + id: String, + } + let chains: &'static HashMap> = Box::leak(Box::new( + chains + .iter() + .map(|c| (c.caip2_id.to_string(), HashSet::from_iter(c.names.clone()))) + .collect::>(), + )); + let reader = spawn_poller::( + subgraph, + query.into(), + "block_oracle", + Duration::from_secs(120), + ) + .map(move |networks| async move { + networks + .iter() + .flat_map(|n| chains.get(&n.id).cloned().unwrap_or_default()) + .collect::>() + .into() + }); + reader.value().await.unwrap(); + reader +}