Skip to content

Commit

Permalink
feat: block oracle subgraph
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus committed Dec 7, 2023
1 parent 0115852 commit 143739e
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 23 deletions.
3 changes: 1 addition & 2 deletions graph-gateway/src/client_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -818,8 +818,7 @@ async fn handle_indexer_query_inner(
}
}

// Return early if we aren't expecting an attestation.
if !ctx.deployment.manifest.features.is_empty() {
if !ctx.deployment.expect_attestation {
return Ok(response.payload);
}

Expand Down
5 changes: 5 additions & 0 deletions graph-gateway/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ pub struct Config {
/// Respect the payment state of API keys (disable for testnets)
pub api_key_payment_required: bool,
pub attestations: AttestationConfig,
/// Block oracle subgraph URL
#[serde_as(as = "DisplayFromStr")]
pub block_oracle_subgraph: Url,
pub chains: Vec<Chain>,
/// Ethereum RPC provider, or fixed exchange rate for testing
pub exchange_rate_provider: ExchangeRateProvider,
Expand Down Expand Up @@ -86,6 +89,8 @@ pub struct AttestationConfig {
#[serde_as]
#[derive(Clone, Debug, Deserialize)]
pub struct Chain {
/// CAIP-2 ID used to identify chains in the block oracle.
pub caip2_id: String,
/// The first name is used in logs, the others are aliases also supported in subgraph manifests.
pub names: Vec<String>,
#[serde_as(as = "DisplayFromStr")]
Expand Down
35 changes: 34 additions & 1 deletion graph-gateway/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use std::iter;
use std::{iter, time::Duration};

use axum::Json;
use eventuals::{Eventual, EventualExt as _, Ptr};
use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue};
use serde::Deserialize;
use thegraph::client as subgraph_client;
use tokio::sync::Mutex;

pub mod auth;
pub mod block_constraints;
Expand Down Expand Up @@ -42,3 +46,32 @@ where
);
(headers, Json(payload))
}

pub fn spawn_poller<T>(
client: subgraph_client::Client,
query: String,
label: &'static str,
interval: Duration,
) -> Eventual<Ptr<Vec<T>>>
where
T: for<'de> Deserialize<'de> + Send + 'static,
Ptr<Vec<T>>: Send,
{
let (writer, reader) = Eventual::new();
let state: &'static Mutex<_> = Box::leak(Box::new(Mutex::new((writer, client))));
eventuals::timer(interval)
.pipe_async(move |_| {
let query = query.clone();
async move {
let mut guard = state.lock().await;
match guard.1.paginated_query::<T>(query).await {
Ok(response) => guard.0.write(Ptr::new(response)),
Err(subgraph_poll_err) => {
tracing::error!(label, %subgraph_poll_err);
}
};
}
})
.forever();
reader
}
20 changes: 14 additions & 6 deletions graph-gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use axum::{
routing, Router, Server,
};
use eventuals::{Eventual, EventualExt as _, Ptr};
use graph_gateway::topology::block_oracle_chains;
use graph_subscriptions::subscription_tier::SubscriptionTiers;
use prometheus::{self, Encoder as _};
use secp256k1::SecretKey;
Expand Down Expand Up @@ -101,6 +102,17 @@ async fn main() {
.filter(|_| !config.geoip_blocked_countries.is_empty())
.map(|db| GeoIP::new(db, config.geoip_blocked_countries).unwrap());

let http_client = reqwest::Client::builder()
.timeout(Duration::from_secs(20))
.build()
.unwrap();

let block_oracle_chains = block_oracle_chains(
subgraph_client::Client::new(http_client.clone(), config.block_oracle_subgraph),
&config.chains,
)
.await;

let block_caches: HashMap<String, &'static BlockCache> = config
.chains
.into_iter()
Expand All @@ -113,11 +125,6 @@ async fn main() {
let block_caches: &'static HashMap<String, &'static BlockCache> =
Box::leak(Box::new(block_caches));

let http_client = reqwest::Client::builder()
.timeout(Duration::from_secs(20))
.build()
.unwrap();

let grt_per_usd: Eventual<GRT> = match config.exchange_rate_provider {
ExchangeRateProvider::Fixed(grt_per_usd) => Eventual::from_value(GRT(grt_per_usd)),
ExchangeRateProvider::Rpc(url) => exchange_rate::grt_per_usd(url).await.unwrap(),
Expand Down Expand Up @@ -149,7 +156,8 @@ async fn main() {
)));

let ipfs = ipfs::Client::new(http_client.clone(), config.ipfs, 50);
let network = GraphNetwork::new(network_subgraph_data.subgraphs, ipfs).await;
let network =
GraphNetwork::new(network_subgraph_data.subgraphs, block_oracle_chains, ipfs).await;

// Indexer blocklist
// Periodically check the defective POIs list against the network indexers and update the
Expand Down
85 changes: 71 additions & 14 deletions graph-gateway/src/topology.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
use std::collections::{BTreeSet, HashMap};
use std::collections::{BTreeSet, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use alloy_primitives::Address;
use anyhow::anyhow;
use eventuals::{Eventual, EventualExt, Ptr};
use futures::future::join_all;
use indoc::indoc;
use itertools::Itertools;
use prelude::GRT;
use serde::Deserialize;
use thegraph::client as subgraph_client;
use thegraph::types::{DeploymentId, SubgraphId};
use tokio::sync::RwLock;
use toolshed::url::Url;

use crate::{ipfs, network_subgraph};
use crate::{config, ipfs, network_subgraph, spawn_poller};

/// Representation of the graph network being used to serve queries
#[derive(Clone)]
Expand All @@ -38,6 +41,7 @@ pub struct Subgraph {
pub struct Deployment {
pub id: DeploymentId,
pub manifest: Arc<Manifest>,
pub expect_attestation: bool,
/// An indexer may have multiple active allocations on a deployment. We collapse them into a single logical
/// allocation using the largest allocation ID and sum of the allocated tokens.
pub indexers: Vec<Arc<Indexer>>,
Expand Down Expand Up @@ -85,6 +89,7 @@ pub struct Manifest {
impl GraphNetwork {
pub async fn new(
subgraphs: Eventual<Ptr<Vec<network_subgraph::Subgraph>>>,
block_oracle_chains: Eventual<Ptr<HashSet<String>>>,
ipfs: Arc<ipfs::Client>,
) -> Self {
let cache: &'static RwLock<IpfsCache> = Box::leak(Box::new(RwLock::new(IpfsCache {
Expand All @@ -94,8 +99,9 @@ impl GraphNetwork {

// Create a lookup table for subgraphs, keyed by their ID.
// Invalid URL indexers are filtered out. See: 7f2f89aa-24c9-460b-ab1e-fc94697c4f4
let subgraphs = subgraphs.map(move |subgraphs| async move {
Ptr::new(Self::subgraphs(&subgraphs, cache).await)
let subgraphs = subgraphs.map(move |subgraphs| {
let block_oracle_chains = block_oracle_chains.clone();
async move { Ptr::new(Self::subgraphs(&subgraphs, cache, block_oracle_chains).await) }
});

// Create a lookup table for deployments, keyed by their ID (which is also their IPFS hash).
Expand Down Expand Up @@ -134,19 +140,20 @@ impl GraphNetwork {
async fn subgraphs(
subgraphs: &[network_subgraph::Subgraph],
cache: &'static RwLock<IpfsCache>,
block_oracle_chains: Eventual<Ptr<HashSet<String>>>,
) -> HashMap<SubgraphId, Subgraph> {
let block_oracle_chains: &HashSet<String> =
&block_oracle_chains.value_immediate().unwrap_or_default();
join_all(subgraphs.iter().map(|subgraph| async move {
let id = subgraph.id;
let deployments = join_all(
subgraph
.versions
.iter()
.map(|version| Self::deployment(subgraphs, version, cache)),
)
.await
.into_iter()
.flatten()
.collect();
let deployments =
join_all(subgraph.versions.iter().map(|version| {
Self::deployment(subgraphs, version, cache, block_oracle_chains)
}))
.await
.into_iter()
.flatten()
.collect();
let subgraph = Subgraph {
deployments,
id,
Expand All @@ -163,6 +170,7 @@ impl GraphNetwork {
subgraphs: &[network_subgraph::Subgraph],
version: &network_subgraph::SubgraphVersion,
cache: &'static RwLock<IpfsCache>,
block_oracle_chains: &HashSet<String>,
) -> Option<Arc<Deployment>> {
let id = version.subgraph_deployment.id;
let manifest = IpfsCache::manifest(cache, &version.subgraph_deployment.id).await?;
Expand Down Expand Up @@ -215,9 +223,13 @@ impl GraphNetwork {
let transferred_to_l2 = version.subgraph_deployment.transferred_to_l2
&& version.subgraph_deployment.allocations.is_empty();

let expect_attestation =
manifest.features.is_empty() && block_oracle_chains.contains(&manifest.network);

Some(Arc::new(Deployment {
id,
manifest,
expect_attestation,
subgraphs,
indexers,
transferred_to_l2,
Expand Down Expand Up @@ -299,3 +311,48 @@ impl IpfsCache {
})
}
}

/// Returns the set of CAIP-2 IDs supported by the Block Oracle.
pub async fn block_oracle_chains(
subgraph: subgraph_client::Client,
chains: &[config::Chain],
) -> Eventual<Ptr<HashSet<String>>> {
let query = indoc! {"
networks(
block: $block
orderBy: id
orderDirection: asc
first: $first
where: {
id_gt: $last
}
) {
id
}
"};
#[derive(Clone, Deserialize)]
struct Network {
id: String,
}
let chains: &'static HashMap<String, HashSet<String>> = Box::leak(Box::new(
chains
.iter()
.map(|c| (c.caip2_id.to_string(), HashSet::from_iter(c.names.clone())))
.collect::<HashMap<_, _>>(),
));
let reader = spawn_poller::<Network>(
subgraph,
query.into(),
"block_oracle",
Duration::from_secs(120),
)
.map(move |networks| async move {
networks
.iter()
.flat_map(|n| chains.get(&n.id).cloned().unwrap_or_default())
.collect::<HashSet<String>>()
.into()
});
reader.value().await.unwrap();
reader
}

0 comments on commit 143739e

Please sign in to comment.