Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: block oracle subgraph #449

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions graph-gateway/src/chains/ethereum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl super::Client for Client {
type Config = config::Chain;

fn chain_name(config: &Self::Config) -> &str {
&config.name
&config.names[0]
}

fn poll_interval() -> Duration {
Expand All @@ -34,7 +34,7 @@ impl super::Client for Client {
chain: config::Chain,
notify: mpsc::UnboundedSender<ClientMsg>,
) -> mpsc::UnboundedSender<UnresolvedBlock> {
let _trace = tracing::info_span!("Ethereum Client Actor", chain = %chain.name).entered();
let _trace = tracing::info_span!("ethereum_client", chain = %chain.names[0]).entered();
let (unresolved_tx, mut unresolved_rx) = mpsc::unbounded_channel();
let mut client = Self {
chain,
Expand Down Expand Up @@ -66,7 +66,7 @@ impl super::Client for Client {
impl Client {
async fn spawn_block_fetch(&mut self, unresolved: Option<UnresolvedBlock>) {
let client = self.http_client.clone();
let chain = self.chain.name.clone();
let chain = self.chain.names[0].clone();
let rpc = self.chain.rpc.clone();
let notify = self.notify.clone();
tokio::spawn(async move {
Expand Down
12 changes: 5 additions & 7 deletions graph-gateway/src/client_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub struct Context {
pub budgeter: &'static Budgeter,
pub indexer_selection_retry_limit: usize,
pub l2_gateway: Option<Url>,
pub block_caches: &'static HashMap<String, BlockCache>,
pub block_caches: &'static HashMap<String, &'static BlockCache>,
pub network: GraphNetwork,
pub indexing_statuses: Eventual<Ptr<HashMap<Indexing, IndexingStatus>>>,
pub attestation_domain: &'static Eip712Domain,
Expand Down Expand Up @@ -444,10 +444,9 @@ async fn handle_client_query_inner(
})
.collect();

let block_cache = ctx
let block_cache = *ctx
.block_caches
.get(&subgraph_chain)
.cloned()
.ok_or_else(|| Error::SubgraphChainNotSupported(subgraph_chain))?;

let block_constraints = block_constraints(&context)
Expand Down Expand Up @@ -612,7 +611,7 @@ async fn handle_client_query_inner(
};

let latest_query_block = pick_latest_query_block(
&block_cache,
block_cache,
latest_block.number.saturating_sub(selection.blocks_behind),
blocks_per_minute,
)
Expand Down Expand Up @@ -642,7 +641,7 @@ async fn handle_client_query_inner(
&latest_block,
&latest_query_block,
&utility_params.requirements,
&block_cache,
block_cache,
&selection,
)
.await;
Expand Down Expand Up @@ -819,8 +818,7 @@ async fn handle_indexer_query_inner(
}
}

// Return early if we aren't expecting an attestation.
if !ctx.deployment.manifest.features.is_empty() {
if !ctx.deployment.expect_attestation {
return Ok(response.payload);
}

Expand Down
8 changes: 7 additions & 1 deletion graph-gateway/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ pub struct Config {
/// Respect the payment state of API keys (disable for testnets)
pub api_key_payment_required: bool,
pub attestations: AttestationConfig,
/// Block oracle subgraph URL
#[serde_as(as = "DisplayFromStr")]
pub block_oracle_subgraph: Url,
pub chains: Vec<Chain>,
/// Ethereum RPC provider, or fixed exchange rate for testing
pub exchange_rate_provider: ExchangeRateProvider,
Expand Down Expand Up @@ -86,7 +89,10 @@ pub struct AttestationConfig {
#[serde_as]
#[derive(Clone, Debug, Deserialize)]
pub struct Chain {
pub name: String,
/// CAIP-2 ID used to identify chains in the block oracle.
pub caip2_id: String,
/// The first name is used in logs, the others are aliases also supported in subgraph manifests.
pub names: Vec<String>,
#[serde_as(as = "DisplayFromStr")]
pub rpc: Url,
}
Expand Down
35 changes: 34 additions & 1 deletion graph-gateway/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use std::iter;
use std::{iter, time::Duration};

use axum::Json;
use eventuals::{Eventual, EventualExt as _, Ptr};
use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue};
use serde::Deserialize;
use thegraph::client as subgraph_client;
use tokio::sync::Mutex;

pub mod auth;
pub mod block_constraints;
Expand Down Expand Up @@ -42,3 +46,32 @@ where
);
(headers, Json(payload))
}

pub fn spawn_poller<T>(
client: subgraph_client::Client,
query: String,
label: &'static str,
interval: Duration,
) -> Eventual<Ptr<Vec<T>>>
where
T: for<'de> Deserialize<'de> + Send + 'static,
Ptr<Vec<T>>: Send,
{
let (writer, reader) = Eventual::new();
let state: &'static Mutex<_> = Box::leak(Box::new(Mutex::new((writer, client))));
eventuals::timer(interval)
.pipe_async(move |_| {
let query = query.clone();
async move {
let mut guard = state.lock().await;
match guard.1.paginated_query::<T>(query).await {
Ok(response) => guard.0.write(Ptr::new(response)),
Err(subgraph_poll_err) => {
tracing::error!(label, %subgraph_poll_err);
}
};
}
})
.forever();
reader
}
33 changes: 21 additions & 12 deletions graph-gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use axum::{
routing, Router, Server,
};
use eventuals::{Eventual, EventualExt as _, Ptr};
use graph_gateway::topology::block_oracle_chains;
use graph_subscriptions::subscription_tier::SubscriptionTiers;
use prometheus::{self, Encoder as _};
use secp256k1::SecretKey;
Expand Down Expand Up @@ -101,22 +102,29 @@ async fn main() {
.filter(|_| !config.geoip_blocked_countries.is_empty())
.map(|db| GeoIP::new(db, config.geoip_blocked_countries).unwrap());

let block_caches = config
.chains
.into_iter()
.map(|chain| {
let network = chain.name.clone();
let cache = BlockCache::new::<ethereum::Client>(chain);
(network, cache)
})
.collect::<HashMap<String, BlockCache>>();
let block_caches: &'static HashMap<String, BlockCache> = Box::leak(Box::new(block_caches));

let http_client = reqwest::Client::builder()
.timeout(Duration::from_secs(20))
.build()
.unwrap();

let block_oracle_chains = block_oracle_chains(
subgraph_client::Client::new(http_client.clone(), config.block_oracle_subgraph),
&config.chains,
)
.await;

let block_caches: HashMap<String, &'static BlockCache> = config
.chains
.into_iter()
.flat_map(|chain| {
let cache: &'static BlockCache =
Box::leak(Box::new(BlockCache::new::<ethereum::Client>(chain.clone())));
chain.names.into_iter().map(move |alias| (alias, cache))
})
.collect();
let block_caches: &'static HashMap<String, &'static BlockCache> =
Box::leak(Box::new(block_caches));

let grt_per_usd: Eventual<GRT> = match config.exchange_rate_provider {
ExchangeRateProvider::Fixed(grt_per_usd) => Eventual::from_value(GRT(grt_per_usd)),
ExchangeRateProvider::Rpc(url) => exchange_rate::grt_per_usd(url).await.unwrap(),
Expand Down Expand Up @@ -148,7 +156,8 @@ async fn main() {
)));

let ipfs = ipfs::Client::new(http_client.clone(), config.ipfs, 50);
let network = GraphNetwork::new(network_subgraph_data.subgraphs, ipfs).await;
let network =
GraphNetwork::new(network_subgraph_data.subgraphs, block_oracle_chains, ipfs).await;

// Indexer blocklist
// Periodically check the defective POIs list against the network indexers and update the
Expand Down
85 changes: 71 additions & 14 deletions graph-gateway/src/topology.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
use std::collections::{BTreeSet, HashMap};
use std::collections::{BTreeSet, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use alloy_primitives::Address;
use anyhow::anyhow;
use eventuals::{Eventual, EventualExt, Ptr};
use futures::future::join_all;
use indoc::indoc;
use itertools::Itertools;
use prelude::GRT;
use serde::Deserialize;
use thegraph::client as subgraph_client;
use thegraph::types::{DeploymentId, SubgraphId};
use tokio::sync::RwLock;
use toolshed::url::Url;

use crate::{ipfs, network_subgraph};
use crate::{config, ipfs, network_subgraph, spawn_poller};

/// Representation of the graph network being used to serve queries
#[derive(Clone)]
Expand All @@ -38,6 +41,7 @@ pub struct Subgraph {
pub struct Deployment {
pub id: DeploymentId,
pub manifest: Arc<Manifest>,
pub expect_attestation: bool,
/// An indexer may have multiple active allocations on a deployment. We collapse them into a single logical
/// allocation using the largest allocation ID and sum of the allocated tokens.
pub indexers: Vec<Arc<Indexer>>,
Expand Down Expand Up @@ -85,6 +89,7 @@ pub struct Manifest {
impl GraphNetwork {
pub async fn new(
subgraphs: Eventual<Ptr<Vec<network_subgraph::Subgraph>>>,
block_oracle_chains: Eventual<Ptr<HashSet<String>>>,
ipfs: Arc<ipfs::Client>,
) -> Self {
let cache: &'static RwLock<IpfsCache> = Box::leak(Box::new(RwLock::new(IpfsCache {
Expand All @@ -94,8 +99,9 @@ impl GraphNetwork {

// Create a lookup table for subgraphs, keyed by their ID.
// Invalid URL indexers are filtered out. See: 7f2f89aa-24c9-460b-ab1e-fc94697c4f4
let subgraphs = subgraphs.map(move |subgraphs| async move {
Ptr::new(Self::subgraphs(&subgraphs, cache).await)
let subgraphs = subgraphs.map(move |subgraphs| {
let block_oracle_chains = block_oracle_chains.clone();
async move { Ptr::new(Self::subgraphs(&subgraphs, cache, block_oracle_chains).await) }
});

// Create a lookup table for deployments, keyed by their ID (which is also their IPFS hash).
Expand Down Expand Up @@ -134,19 +140,20 @@ impl GraphNetwork {
async fn subgraphs(
subgraphs: &[network_subgraph::Subgraph],
cache: &'static RwLock<IpfsCache>,
block_oracle_chains: Eventual<Ptr<HashSet<String>>>,
) -> HashMap<SubgraphId, Subgraph> {
let block_oracle_chains: &HashSet<String> =
&block_oracle_chains.value_immediate().unwrap_or_default();
join_all(subgraphs.iter().map(|subgraph| async move {
let id = subgraph.id;
let deployments = join_all(
subgraph
.versions
.iter()
.map(|version| Self::deployment(subgraphs, version, cache)),
)
.await
.into_iter()
.flatten()
.collect();
let deployments =
join_all(subgraph.versions.iter().map(|version| {
Self::deployment(subgraphs, version, cache, block_oracle_chains)
}))
.await
.into_iter()
.flatten()
.collect();
let subgraph = Subgraph {
deployments,
id,
Expand All @@ -163,6 +170,7 @@ impl GraphNetwork {
subgraphs: &[network_subgraph::Subgraph],
version: &network_subgraph::SubgraphVersion,
cache: &'static RwLock<IpfsCache>,
block_oracle_chains: &HashSet<String>,
) -> Option<Arc<Deployment>> {
let id = version.subgraph_deployment.id;
let manifest = IpfsCache::manifest(cache, &version.subgraph_deployment.id).await?;
Expand Down Expand Up @@ -215,9 +223,13 @@ impl GraphNetwork {
let transferred_to_l2 = version.subgraph_deployment.transferred_to_l2
&& version.subgraph_deployment.allocations.is_empty();

let expect_attestation =
manifest.features.is_empty() && block_oracle_chains.contains(&manifest.network);

Some(Arc::new(Deployment {
id,
manifest,
expect_attestation,
subgraphs,
indexers,
transferred_to_l2,
Expand Down Expand Up @@ -299,3 +311,48 @@ impl IpfsCache {
})
}
}

/// Returns the set of CAIP-2 IDs supported by the Block Oracle.
pub async fn block_oracle_chains(
subgraph: subgraph_client::Client,
chains: &[config::Chain],
) -> Eventual<Ptr<HashSet<String>>> {
let query = indoc! {"
networks(
block: $block
orderBy: id
orderDirection: asc
first: $first
where: {
id_gt: $last
}
) {
id
}
"};
#[derive(Clone, Deserialize)]
struct Network {
id: String,
}
let chains: &'static HashMap<String, HashSet<String>> = Box::leak(Box::new(
chains
.iter()
.map(|c| (c.caip2_id.to_string(), HashSet::from_iter(c.names.clone())))
.collect::<HashMap<_, _>>(),
));
let reader = spawn_poller::<Network>(
subgraph,
query.into(),
"block_oracle",
Duration::from_secs(120),
)
.map(move |networks| async move {
networks
.iter()
.flat_map(|n| chains.get(&n.id).cloned().unwrap_or_default())
.collect::<HashSet<String>>()
.into()
});
reader.value().await.unwrap();
reader
}
Loading