diff --git a/graph-gateway/src/chains/ethereum.rs b/graph-gateway/src/chains/ethereum.rs index 5ff16ed1..f237d7d8 100644 --- a/graph-gateway/src/chains/ethereum.rs +++ b/graph-gateway/src/chains/ethereum.rs @@ -23,7 +23,7 @@ impl super::Client for Client { type Config = config::Chain; fn chain_name(config: &Self::Config) -> &str { - &config.name + &config.names[0] } fn poll_interval() -> Duration { @@ -34,7 +34,7 @@ impl super::Client for Client { chain: config::Chain, notify: mpsc::UnboundedSender, ) -> mpsc::UnboundedSender { - let _trace = tracing::info_span!("Ethereum Client Actor", chain = %chain.name).entered(); + let _trace = tracing::info_span!("ethereum_client", chain = %chain.names[0]).entered(); let (unresolved_tx, mut unresolved_rx) = mpsc::unbounded_channel(); let mut client = Self { chain, @@ -66,7 +66,7 @@ impl super::Client for Client { impl Client { async fn spawn_block_fetch(&mut self, unresolved: Option) { let client = self.http_client.clone(); - let chain = self.chain.name.clone(); + let chain = self.chain.names[0].clone(); let rpc = self.chain.rpc.clone(); let notify = self.notify.clone(); tokio::spawn(async move { diff --git a/graph-gateway/src/client_query.rs b/graph-gateway/src/client_query.rs index 4a37e908..8320c438 100644 --- a/graph-gateway/src/client_query.rs +++ b/graph-gateway/src/client_query.rs @@ -97,7 +97,7 @@ pub struct Context { pub budgeter: &'static Budgeter, pub indexer_selection_retry_limit: usize, pub l2_gateway: Option, - pub block_caches: &'static HashMap, + pub block_caches: &'static HashMap, pub network: GraphNetwork, pub indexing_statuses: Eventual>>, pub attestation_domain: &'static Eip712Domain, @@ -444,10 +444,9 @@ async fn handle_client_query_inner( }) .collect(); - let block_cache = ctx + let block_cache = *ctx .block_caches .get(&subgraph_chain) - .cloned() .ok_or_else(|| Error::SubgraphChainNotSupported(subgraph_chain))?; let block_constraints = block_constraints(&context) @@ -612,7 +611,7 @@ async fn handle_client_query_inner( }; let latest_query_block = pick_latest_query_block( - &block_cache, + block_cache, latest_block.number.saturating_sub(selection.blocks_behind), blocks_per_minute, ) @@ -642,7 +641,7 @@ async fn handle_client_query_inner( &latest_block, &latest_query_block, &utility_params.requirements, - &block_cache, + block_cache, &selection, ) .await; @@ -819,8 +818,7 @@ async fn handle_indexer_query_inner( } } - // Return early if we aren't expecting an attestation. - if !ctx.deployment.manifest.features.is_empty() { + if !ctx.deployment.expect_attestation { return Ok(response.payload); } diff --git a/graph-gateway/src/config.rs b/graph-gateway/src/config.rs index 2b54767f..73cfb87e 100644 --- a/graph-gateway/src/config.rs +++ b/graph-gateway/src/config.rs @@ -19,6 +19,9 @@ pub struct Config { /// Respect the payment state of API keys (disable for testnets) pub api_key_payment_required: bool, pub attestations: AttestationConfig, + /// Block oracle subgraph URL + #[serde_as(as = "DisplayFromStr")] + pub block_oracle_subgraph: Url, pub chains: Vec, /// Ethereum RPC provider, or fixed exchange rate for testing pub exchange_rate_provider: ExchangeRateProvider, @@ -86,7 +89,10 @@ pub struct AttestationConfig { #[serde_as] #[derive(Clone, Debug, Deserialize)] pub struct Chain { - pub name: String, + /// CAIP-2 ID used to identify chains in the block oracle. + pub caip2_id: String, + /// The first name is used in logs, the others are aliases also supported in subgraph manifests. + pub names: Vec, #[serde_as(as = "DisplayFromStr")] pub rpc: Url, } diff --git a/graph-gateway/src/lib.rs b/graph-gateway/src/lib.rs index 5a7162f8..06982a52 100644 --- a/graph-gateway/src/lib.rs +++ b/graph-gateway/src/lib.rs @@ -1,7 +1,11 @@ -use std::iter; +use std::{iter, time::Duration}; use axum::Json; +use eventuals::{Eventual, EventualExt as _, Ptr}; use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue}; +use serde::Deserialize; +use thegraph::client as subgraph_client; +use tokio::sync::Mutex; pub mod auth; pub mod block_constraints; @@ -42,3 +46,32 @@ where ); (headers, Json(payload)) } + +pub fn spawn_poller( + client: subgraph_client::Client, + query: String, + label: &'static str, + interval: Duration, +) -> Eventual>> +where + T: for<'de> Deserialize<'de> + Send + 'static, + Ptr>: Send, +{ + let (writer, reader) = Eventual::new(); + let state: &'static Mutex<_> = Box::leak(Box::new(Mutex::new((writer, client)))); + eventuals::timer(interval) + .pipe_async(move |_| { + let query = query.clone(); + async move { + let mut guard = state.lock().await; + match guard.1.paginated_query::(query).await { + Ok(response) => guard.0.write(Ptr::new(response)), + Err(subgraph_poll_err) => { + tracing::error!(label, %subgraph_poll_err); + } + }; + } + }) + .forever(); + reader +} diff --git a/graph-gateway/src/main.rs b/graph-gateway/src/main.rs index ebd0aa15..5f652162 100644 --- a/graph-gateway/src/main.rs +++ b/graph-gateway/src/main.rs @@ -18,6 +18,7 @@ use axum::{ routing, Router, Server, }; use eventuals::{Eventual, EventualExt as _, Ptr}; +use graph_gateway::topology::block_oracle_chains; use graph_subscriptions::subscription_tier::SubscriptionTiers; use prometheus::{self, Encoder as _}; use secp256k1::SecretKey; @@ -101,22 +102,29 @@ async fn main() { .filter(|_| !config.geoip_blocked_countries.is_empty()) .map(|db| GeoIP::new(db, config.geoip_blocked_countries).unwrap()); - let block_caches = config - .chains - .into_iter() - .map(|chain| { - let network = chain.name.clone(); - let cache = BlockCache::new::(chain); - (network, cache) - }) - .collect::>(); - let block_caches: &'static HashMap = Box::leak(Box::new(block_caches)); - let http_client = reqwest::Client::builder() .timeout(Duration::from_secs(20)) .build() .unwrap(); + let block_oracle_chains = block_oracle_chains( + subgraph_client::Client::new(http_client.clone(), config.block_oracle_subgraph), + &config.chains, + ) + .await; + + let block_caches: HashMap = config + .chains + .into_iter() + .flat_map(|chain| { + let cache: &'static BlockCache = + Box::leak(Box::new(BlockCache::new::(chain.clone()))); + chain.names.into_iter().map(move |alias| (alias, cache)) + }) + .collect(); + let block_caches: &'static HashMap = + Box::leak(Box::new(block_caches)); + let grt_per_usd: Eventual = match config.exchange_rate_provider { ExchangeRateProvider::Fixed(grt_per_usd) => Eventual::from_value(GRT(grt_per_usd)), ExchangeRateProvider::Rpc(url) => exchange_rate::grt_per_usd(url).await.unwrap(), @@ -148,7 +156,8 @@ async fn main() { ))); let ipfs = ipfs::Client::new(http_client.clone(), config.ipfs, 50); - let network = GraphNetwork::new(network_subgraph_data.subgraphs, ipfs).await; + let network = + GraphNetwork::new(network_subgraph_data.subgraphs, block_oracle_chains, ipfs).await; // Indexer blocklist // Periodically check the defective POIs list against the network indexers and update the diff --git a/graph-gateway/src/topology.rs b/graph-gateway/src/topology.rs index d484a528..5c683011 100644 --- a/graph-gateway/src/topology.rs +++ b/graph-gateway/src/topology.rs @@ -1,18 +1,21 @@ -use std::collections::{BTreeSet, HashMap}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::sync::Arc; +use std::time::Duration; use alloy_primitives::Address; use anyhow::anyhow; use eventuals::{Eventual, EventualExt, Ptr}; use futures::future::join_all; +use indoc::indoc; use itertools::Itertools; use prelude::GRT; use serde::Deserialize; +use thegraph::client as subgraph_client; use thegraph::types::{DeploymentId, SubgraphId}; use tokio::sync::RwLock; use toolshed::url::Url; -use crate::{ipfs, network_subgraph}; +use crate::{config, ipfs, network_subgraph, spawn_poller}; /// Representation of the graph network being used to serve queries #[derive(Clone)] @@ -38,6 +41,7 @@ pub struct Subgraph { pub struct Deployment { pub id: DeploymentId, pub manifest: Arc, + pub expect_attestation: bool, /// An indexer may have multiple active allocations on a deployment. We collapse them into a single logical /// allocation using the largest allocation ID and sum of the allocated tokens. pub indexers: Vec>, @@ -85,6 +89,7 @@ pub struct Manifest { impl GraphNetwork { pub async fn new( subgraphs: Eventual>>, + block_oracle_chains: Eventual>>, ipfs: Arc, ) -> Self { let cache: &'static RwLock = Box::leak(Box::new(RwLock::new(IpfsCache { @@ -94,8 +99,9 @@ impl GraphNetwork { // Create a lookup table for subgraphs, keyed by their ID. // Invalid URL indexers are filtered out. See: 7f2f89aa-24c9-460b-ab1e-fc94697c4f4 - let subgraphs = subgraphs.map(move |subgraphs| async move { - Ptr::new(Self::subgraphs(&subgraphs, cache).await) + let subgraphs = subgraphs.map(move |subgraphs| { + let block_oracle_chains = block_oracle_chains.clone(); + async move { Ptr::new(Self::subgraphs(&subgraphs, cache, block_oracle_chains).await) } }); // Create a lookup table for deployments, keyed by their ID (which is also their IPFS hash). @@ -134,19 +140,20 @@ impl GraphNetwork { async fn subgraphs( subgraphs: &[network_subgraph::Subgraph], cache: &'static RwLock, + block_oracle_chains: Eventual>>, ) -> HashMap { + let block_oracle_chains: &HashSet = + &block_oracle_chains.value_immediate().unwrap_or_default(); join_all(subgraphs.iter().map(|subgraph| async move { let id = subgraph.id; - let deployments = join_all( - subgraph - .versions - .iter() - .map(|version| Self::deployment(subgraphs, version, cache)), - ) - .await - .into_iter() - .flatten() - .collect(); + let deployments = + join_all(subgraph.versions.iter().map(|version| { + Self::deployment(subgraphs, version, cache, block_oracle_chains) + })) + .await + .into_iter() + .flatten() + .collect(); let subgraph = Subgraph { deployments, id, @@ -163,6 +170,7 @@ impl GraphNetwork { subgraphs: &[network_subgraph::Subgraph], version: &network_subgraph::SubgraphVersion, cache: &'static RwLock, + block_oracle_chains: &HashSet, ) -> Option> { let id = version.subgraph_deployment.id; let manifest = IpfsCache::manifest(cache, &version.subgraph_deployment.id).await?; @@ -215,9 +223,13 @@ impl GraphNetwork { let transferred_to_l2 = version.subgraph_deployment.transferred_to_l2 && version.subgraph_deployment.allocations.is_empty(); + let expect_attestation = + manifest.features.is_empty() && block_oracle_chains.contains(&manifest.network); + Some(Arc::new(Deployment { id, manifest, + expect_attestation, subgraphs, indexers, transferred_to_l2, @@ -299,3 +311,48 @@ impl IpfsCache { }) } } + +/// Returns the set of CAIP-2 IDs supported by the Block Oracle. +pub async fn block_oracle_chains( + subgraph: subgraph_client::Client, + chains: &[config::Chain], +) -> Eventual>> { + let query = indoc! {" + networks( + block: $block + orderBy: id + orderDirection: asc + first: $first + where: { + id_gt: $last + } + ) { + id + } + "}; + #[derive(Clone, Deserialize)] + struct Network { + id: String, + } + let chains: &'static HashMap> = Box::leak(Box::new( + chains + .iter() + .map(|c| (c.caip2_id.to_string(), HashSet::from_iter(c.names.clone()))) + .collect::>(), + )); + let reader = spawn_poller::( + subgraph, + query.into(), + "block_oracle", + Duration::from_secs(120), + ) + .map(move |networks| async move { + networks + .iter() + .flat_map(|n| chains.get(&n.id).cloned().unwrap_or_default()) + .collect::>() + .into() + }); + reader.value().await.unwrap(); + reader +}