From 7cfed3c60f42cc9c255fb4d44d40e6c086db888c Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Tue, 28 Nov 2023 17:08:19 +0100 Subject: [PATCH] chore(deps): use thegraph dependency types and subgraph client --- Cargo.lock | 32 ++- Cargo.toml | 3 +- graph-gateway/Cargo.toml | 1 + graph-gateway/src/auth.rs | 2 +- graph-gateway/src/block_constraints.rs | 2 +- graph-gateway/src/chains/ethereum.rs | 3 +- graph-gateway/src/chains/mod.rs | 2 +- graph-gateway/src/chains/test.rs | 2 +- graph-gateway/src/client_query.rs | 2 +- graph-gateway/src/indexer_client.rs | 2 +- .../src/indexers_status/cost_models/query.rs | 2 +- .../indexing_statuses/query.rs | 2 +- .../src/indexers_status/public_poi/client.rs | 2 +- .../src/indexers_status/public_poi/query.rs | 2 +- graph-gateway/src/indexing.rs | 2 +- graph-gateway/src/indexings_blocklist.rs | 2 +- graph-gateway/src/lib.rs | 1 - graph-gateway/src/main.rs | 9 +- graph-gateway/src/network_subgraph.rs | 5 +- graph-gateway/src/poi.rs | 2 +- graph-gateway/src/reports.rs | 3 +- graph-gateway/src/subgraph_client.rs | 4 - graph-gateway/src/subgraph_client/client.rs | 254 ------------------ graph-gateway/src/subgraph_client/queries.rs | 205 -------------- graph-gateway/src/subgraph_studio.rs | 2 +- graph-gateway/src/subscriptions_subgraph.rs | 5 +- graph-gateway/src/topology.rs | 2 +- .../it_indexers_status_indexing_statuses.rs | 2 +- .../tests/it_indexers_status_public_pois.rs | 2 +- graph-gateway/tests/it_indexings_blocklist.rs | 2 +- graph-gateway/tests/it_subgraph_client.rs | 181 ------------- indexer-selection/Cargo.toml | 1 + indexer-selection/src/lib.rs | 2 +- indexer-selection/src/simulation.rs | 2 +- indexer-selection/src/test.rs | 2 +- indexer-selection/src/test_utils.rs | 2 +- 36 files changed, 68 insertions(+), 683 deletions(-) delete mode 100644 graph-gateway/src/subgraph_client.rs delete mode 100644 graph-gateway/src/subgraph_client/client.rs delete mode 100644 graph-gateway/src/subgraph_client/queries.rs delete mode 100644 graph-gateway/tests/it_subgraph_client.rs diff --git a/Cargo.lock b/Cargo.lock index cd19bae8..1f4d8985 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1719,6 +1719,7 @@ dependencies = [ "simple-rate-limiter", "tap_core", "test-with", + "thegraph", "thiserror", "tokio", "toolshed", @@ -2090,6 +2091,7 @@ dependencies = [ "rand", "rand_distr", "siphasher", + "thegraph", "tokio", "toolshed", "tracing", @@ -4124,6 +4126,28 @@ dependencies = [ "syn 2.0.39", ] +[[package]] +name = "thegraph" +version = "0.1.1" +source = "git+https://github.com/edgeandnode/toolshed?tag=thegraph-v0.1.1#5694de09fad473489012866c3fbca85a5eae098b" +dependencies = [ + "alloy-primitives", + "alloy-sol-types", + "async-graphql", + "bs58", + "ethers-core", + "graphql-http", + "indoc", + "reqwest", + "serde", + "serde_json", + "serde_with 3.4.0", + "sha3", + "thiserror", + "toolshed", + "tracing", +] + [[package]] name = "thiserror" version = "1.0.50" @@ -4318,8 +4342,8 @@ dependencies = [ [[package]] name = "toolshed" -version = "0.3.0" -source = "git+https://github.com/edgeandnode/toolshed?tag=v0.3.0#af7b1c8be57a488122f015eb95f2051bb704cdff" +version = "0.4.0" +source = "git+https://github.com/edgeandnode/toolshed?tag=v0.4.0#aa885d38818101d4c30d4869ab060f8c2c29f13b" dependencies = [ "alloy-primitives", "alloy-sol-types", @@ -4751,9 +4775,9 @@ checksum = "7ab9b36309365056cd639da3134bf87fa8f3d86008abf99e612384a6eecd459f" [[package]] name = "web-sys" -version = "0.3.65" +version = "0.3.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5db499c5f66323272151db0e666cd34f78617522fb0c1604d31a27c50c206a85" +checksum = "50c24a44ec86bb68fbecd1b3efed7e85ea5621b39b35ef2766b66cd984f8010f" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/Cargo.toml b/Cargo.toml index d667baf2..db56bc89 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,5 +28,6 @@ tokio = { version = "1.24", features = [ "sync", "time", ] } -toolshed = { git = "https://github.com/edgeandnode/toolshed", tag = "v0.3.0" } +toolshed = { git = "https://github.com/edgeandnode/toolshed", tag = "v0.4.0" } +thegraph = { git = "https://github.com/edgeandnode/toolshed", tag = "thegraph-v0.1.1" } tracing = { version = "0.1", default-features = false } diff --git a/graph-gateway/Cargo.toml b/graph-gateway/Cargo.toml index 22c65650..974ddf7c 100644 --- a/graph-gateway/Cargo.toml +++ b/graph-gateway/Cargo.toml @@ -45,6 +45,7 @@ serde_with = "3.1" serde_yaml = "0.9" simple-rate-limiter = "1.0" tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "e55ea03" } +thegraph = { workspace = true, features = ["subgraph-client"] } thiserror = "1.0.40" tokio.workspace = true toolshed.workspace = true diff --git a/graph-gateway/src/auth.rs b/graph-gateway/src/auth.rs index 4d01bec1..dd2ad7ca 100644 --- a/graph-gateway/src/auth.rs +++ b/graph-gateway/src/auth.rs @@ -8,8 +8,8 @@ use anyhow::{anyhow, bail, ensure, Result}; use eventuals::{Eventual, EventualExt, Ptr}; use graph_subscriptions::TicketPayload; use prelude::USD; +use thegraph::types::{DeploymentId, SubgraphId}; use tokio::sync::RwLock; -use toolshed::thegraph::{DeploymentId, SubgraphId}; use crate::subgraph_studio::{APIKey, QueryStatus}; use crate::subscriptions::Subscription; diff --git a/graph-gateway/src/block_constraints.rs b/graph-gateway/src/block_constraints.rs index 8855b166..513126c5 100644 --- a/graph-gateway/src/block_constraints.rs +++ b/graph-gateway/src/block_constraints.rs @@ -9,7 +9,7 @@ use graphql::{IntoStaticValue as _, StaticValue}; use indexer_selection::UnresolvedBlock; use itertools::Itertools as _; use serde_json::{self, json}; -use toolshed::thegraph::BlockPointer; +use thegraph::types::BlockPointer; #[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)] pub enum BlockConstraint { diff --git a/graph-gateway/src/chains/ethereum.rs b/graph-gateway/src/chains/ethereum.rs index 5dbb886c..5ff16ed1 100644 --- a/graph-gateway/src/chains/ethereum.rs +++ b/graph-gateway/src/chains/ethereum.rs @@ -4,9 +4,10 @@ use alloy_primitives::BlockHash; use indexer_selection::UnresolvedBlock; use serde::{de::Error, Deserialize, Deserializer}; use serde_json::{json, Value as JSON}; +use thegraph::types::BlockPointer; use tokio::sync::mpsc; use tokio::time::interval; -use toolshed::{thegraph::BlockPointer, url::Url}; +use toolshed::url::Url; use tracing::Instrument; use super::{BlockHead, ClientMsg}; diff --git a/graph-gateway/src/chains/mod.rs b/graph-gateway/src/chains/mod.rs index 97592bb8..50e70b73 100644 --- a/graph-gateway/src/chains/mod.rs +++ b/graph-gateway/src/chains/mod.rs @@ -5,9 +5,9 @@ use alloy_primitives::{BlockHash, BlockNumber}; use eventuals::{Eventual, EventualWriter}; use indexer_selection::UnresolvedBlock; use prelude::epoch_cache::EpochCache; +use thegraph::types::BlockPointer; use tokio::sync::{mpsc, oneshot}; use tokio::time::interval; -use toolshed::thegraph::BlockPointer; use tracing::Instrument; use crate::{block_constraints::*, metrics::*}; diff --git a/graph-gateway/src/chains/test.rs b/graph-gateway/src/chains/test.rs index c7ee6e61..ff6aed85 100644 --- a/graph-gateway/src/chains/test.rs +++ b/graph-gateway/src/chains/test.rs @@ -1,8 +1,8 @@ use std::time::Duration; use indexer_selection::UnresolvedBlock; +use thegraph::types::BlockPointer; use tokio::sync::mpsc; -use toolshed::thegraph::BlockPointer; use crate::chains::BlockHead; diff --git a/graph-gateway/src/client_query.rs b/graph-gateway/src/client_query.rs index e64f5858..f29500a2 100644 --- a/graph-gateway/src/client_query.rs +++ b/graph-gateway/src/client_query.rs @@ -34,8 +34,8 @@ use rand::{rngs::SmallRng, SeedableRng as _}; use serde::Deserialize; use serde_json::json; use serde_json::value::RawValue; +use thegraph::types::{attestation, BlockPointer, DeploymentId, SubgraphId}; use tokio::sync::mpsc; -use toolshed::thegraph::{attestation, BlockPointer, DeploymentId, SubgraphId}; use toolshed::url::Url; use tracing::Instrument; use uuid::Uuid; diff --git a/graph-gateway/src/indexer_client.rs b/graph-gateway/src/indexer_client.rs index aee1474a..bb844cdc 100644 --- a/graph-gateway/src/indexer_client.rs +++ b/graph-gateway/src/indexer_client.rs @@ -4,7 +4,7 @@ use alloy_primitives::BlockNumber; use axum::http::StatusCode; use indexer_selection::Selection; use serde::Deserialize; -use toolshed::thegraph::attestation::Attestation; +use thegraph::types::attestation::Attestation; use crate::receipts::{ReceiptSigner, ReceiptStatus, ScalarReceipt}; diff --git a/graph-gateway/src/indexers_status/cost_models/query.rs b/graph-gateway/src/indexers_status/cost_models/query.rs index 2b91b9cc..062eddb5 100644 --- a/graph-gateway/src/indexers_status/cost_models/query.rs +++ b/graph-gateway/src/indexers_status/cost_models/query.rs @@ -2,7 +2,7 @@ use graphql_http::graphql::{Document, IntoDocument, IntoDocumentWithVariables}; use indoc::indoc; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; -use toolshed::thegraph::DeploymentId; +use thegraph::types::DeploymentId; pub(super) const COST_MODEL_QUERY_DOCUMENT: &str = indoc! { r#"query ($deployments: [String!]!) { diff --git a/graph-gateway/src/indexers_status/indexing_statuses/query.rs b/graph-gateway/src/indexers_status/indexing_statuses/query.rs index 39998c9d..f2f9f1b6 100644 --- a/graph-gateway/src/indexers_status/indexing_statuses/query.rs +++ b/graph-gateway/src/indexers_status/indexing_statuses/query.rs @@ -3,7 +3,7 @@ use std::borrow::Cow; use alloy_primitives::BlockHash; use indoc::indoc; use serde::{Deserialize, Deserializer}; -use toolshed::thegraph::DeploymentId; +use thegraph::types::DeploymentId; pub(super) const INDEXING_STATUSES_QUERY_DOCUMENT: &str = indoc! { r#"{ diff --git a/graph-gateway/src/indexers_status/public_poi/client.rs b/graph-gateway/src/indexers_status/public_poi/client.rs index ce83805e..d017564b 100644 --- a/graph-gateway/src/indexers_status/public_poi/client.rs +++ b/graph-gateway/src/indexers_status/public_poi/client.rs @@ -3,7 +3,7 @@ use std::collections::HashMap; use alloy_primitives::BlockNumber; use graphql_http::http_client::ReqwestExt; use itertools::Itertools; -use toolshed::thegraph::DeploymentId; +use thegraph::types::DeploymentId; use toolshed::url::Url; use crate::indexers_status::public_poi::query; diff --git a/graph-gateway/src/indexers_status/public_poi/query.rs b/graph-gateway/src/indexers_status/public_poi/query.rs index d3578bfe..347b2219 100644 --- a/graph-gateway/src/indexers_status/public_poi/query.rs +++ b/graph-gateway/src/indexers_status/public_poi/query.rs @@ -3,7 +3,7 @@ use graphql_http::graphql::{Document, IntoDocument, IntoDocumentWithVariables}; use indoc::indoc; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; -use toolshed::thegraph::DeploymentId; +use thegraph::types::DeploymentId; pub const MAX_REQUESTS_PER_QUERY: usize = 10; diff --git a/graph-gateway/src/indexing.rs b/graph-gateway/src/indexing.rs index 65cadb17..2cefc3c6 100644 --- a/graph-gateway/src/indexing.rs +++ b/graph-gateway/src/indexing.rs @@ -7,8 +7,8 @@ use futures::future::join_all; use indexer_selection::Indexing; use prelude::epoch_cache::EpochCache; use semver::Version; +use thegraph::types::{BlockPointer, DeploymentId}; use tokio::sync::Mutex; -use toolshed::thegraph::{BlockPointer, DeploymentId}; use toolshed::url::{url::Host, Url}; use trust_dns_resolver::TokioAsyncResolver as DNSResolver; diff --git a/graph-gateway/src/indexings_blocklist.rs b/graph-gateway/src/indexings_blocklist.rs index 316a28fd..dc964bba 100644 --- a/graph-gateway/src/indexings_blocklist.rs +++ b/graph-gateway/src/indexings_blocklist.rs @@ -5,8 +5,8 @@ use std::time::Duration; use alloy_primitives::Address; use itertools::Itertools; +use thegraph::types::DeploymentId; use tokio::sync::Mutex; -use toolshed::thegraph::DeploymentId; use toolshed::url::Url; use indexer_selection::Indexing; diff --git a/graph-gateway/src/lib.rs b/graph-gateway/src/lib.rs index b77deefa..5a7162f8 100644 --- a/graph-gateway/src/lib.rs +++ b/graph-gateway/src/lib.rs @@ -20,7 +20,6 @@ pub mod network_subgraph; pub mod poi; pub mod receipts; pub mod reports; -pub mod subgraph_client; pub mod subgraph_studio; pub mod subscriptions; pub mod subscriptions_subgraph; diff --git a/graph-gateway/src/main.rs b/graph-gateway/src/main.rs index a4bb1b1b..ebd0aa15 100644 --- a/graph-gateway/src/main.rs +++ b/graph-gateway/src/main.rs @@ -18,18 +18,20 @@ use axum::{ routing, Router, Server, }; use eventuals::{Eventual, EventualExt as _, Ptr}; -use graph_gateway::budgets::Budgeter; use graph_subscriptions::subscription_tier::SubscriptionTiers; use prometheus::{self, Encoder as _}; +use secp256k1::SecretKey; use serde_json::json; use simple_rate_limiter::RateLimiter; +use thegraph::client as subgraph_client; +use thegraph::types::{attestation, DeploymentId}; use tokio::spawn; -use toolshed::thegraph::{attestation, DeploymentId}; use tower_http::cors::{self, CorsLayer}; use graph_gateway::indexings_blocklist::indexings_blocklist; use graph_gateway::{ auth::AuthHandler, + budgets::Budgeter, chains::{ethereum, BlockCache}, client_query, config::{Config, ExchangeRateProvider}, @@ -40,13 +42,12 @@ use graph_gateway::{ receipts::ReceiptSigner, reports, reports::KafkaClient, - subgraph_client, subgraph_studio, subscriptions_subgraph, + subgraph_studio, subscriptions_subgraph, topology::{Deployment, GraphNetwork}, vouchers, JsonResponse, }; use indexer_selection::{actor::Update, BlockStatus, Indexing}; use prelude::{buffer_queue::QueueWriter, *}; -use secp256k1::SecretKey; // Moving the `exchange_rate` module to `lib.rs` makes the doctests to fail during the compilation // step. This module is only used here, so let's keep it here for now. diff --git a/graph-gateway/src/network_subgraph.rs b/graph-gateway/src/network_subgraph.rs index 14e52d28..90a93b20 100644 --- a/graph-gateway/src/network_subgraph.rs +++ b/graph-gateway/src/network_subgraph.rs @@ -6,10 +6,9 @@ use anyhow::anyhow; use eventuals::{self, Eventual, EventualExt as _, EventualWriter, Ptr}; use prelude::*; use serde::Deserialize; +use thegraph::client as subgraph_client; +use thegraph::types::{DeploymentId, SubgraphId}; use tokio::sync::Mutex; -use toolshed::thegraph::{DeploymentId, SubgraphId}; - -use crate::subgraph_client; pub struct Data { pub network_params: NetworkParams, diff --git a/graph-gateway/src/poi.rs b/graph-gateway/src/poi.rs index e2a8d224..5ea57c86 100644 --- a/graph-gateway/src/poi.rs +++ b/graph-gateway/src/poi.rs @@ -1,5 +1,5 @@ use alloy_primitives::{BlockNumber, B256}; -use toolshed::thegraph::DeploymentId; +use thegraph::types::DeploymentId; pub type ProofOfIndexing = B256; diff --git a/graph-gateway/src/reports.rs b/graph-gateway/src/reports.rs index dd8fc3f0..a6080e13 100644 --- a/graph-gateway/src/reports.rs +++ b/graph-gateway/src/reports.rs @@ -6,7 +6,8 @@ use prost::Message as _; use rdkafka::error::KafkaResult; use serde::Deserialize; use serde_json::{json, Map}; -use toolshed::{concat_bytes, thegraph::attestation::Attestation}; +use thegraph::types::attestation::Attestation; +use toolshed::concat_bytes; use tracing::span; use tracing_subscriber::{filter::FilterFn, layer, prelude::*, registry, EnvFilter, Layer}; diff --git a/graph-gateway/src/subgraph_client.rs b/graph-gateway/src/subgraph_client.rs deleted file mode 100644 index 7f7c9a3c..00000000 --- a/graph-gateway/src/subgraph_client.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod client; -pub mod queries; - -pub use client::*; diff --git a/graph-gateway/src/subgraph_client/client.rs b/graph-gateway/src/subgraph_client/client.rs deleted file mode 100644 index 695467f1..00000000 --- a/graph-gateway/src/subgraph_client/client.rs +++ /dev/null @@ -1,254 +0,0 @@ -use alloy_primitives::aliases::BlockNumber; -use graphql_http::graphql::IntoDocument; -use graphql_http::http::request::IntoRequestParameters; -use graphql_http::http_client::ResponseError; -use serde::de::Deserialize; -use toolshed::thegraph::block_pointer::BlockPointer; -use toolshed::url::Url; - -use super::queries::{ - meta::send_subgraph_meta_query, - page::{send_subgraph_page_query, BlockHeight, SubgraphPageQueryResponseOpaqueEntry}, - send_subgraph_query, -}; - -const DEFAULT_SUBGRAPH_PAGE_QUERY_SIZE: usize = 200; - -async fn send_paginated_query Deserialize<'de>>( - client: &reqwest::Client, - subgraph_url: Url, - query: impl IntoDocument + Clone, - ticket: Option<&str>, - batch_size: usize, - latest_block: BlockNumber, -) -> Result<(Vec, BlockNumber), String> { - // The latest block number that the subgraph has progressed to. - let mut latest_block = latest_block; - // The last id of the previous batch. - let mut last_id: Option = None; - // The block at which the query should be executed. - let mut query_block: Option = None; - - // Vector to store the results of the paginated query. - let mut results = Vec::new(); - - loop { - let block_height = match query_block.as_ref() { - Some(block) => BlockHeight::new_with_block_hash(block.hash), - None => BlockHeight::new_with_block_number_gte(latest_block), - }; - - let response = send_subgraph_page_query( - client, - subgraph_url.clone(), - ticket, - query.clone(), - block_height, - batch_size, - last_id, - ) - .await?; - - let data = match response { - Ok(data) if !data.results.is_empty() => data, - Ok(_) => break, - Err(err) => match err { - ResponseError::Empty => break, - ResponseError::Failure { errors } => { - let errors = errors - .into_iter() - .map(|err| err.message) - .collect::>(); - - if errors - .iter() - .any(|err| err.contains("no block with that hash found")) - { - tracing::info!("Reorg detected. Restarting query to try a new block."); - - last_id = None; - query_block = None; - continue; - } - - return Err(errors.join(", ")); - } - }, - }; - - last_id = Some( - serde_json::from_str::( - data.results.last().unwrap().get(), - ) - .map_err(|_| "failed to extract id for last entry".to_string())? - .id, - ); - query_block = Some(data.meta.block); - - for entry in data.results { - results.push(serde_json::from_str::(entry.get()).map_err(|err| err.to_string())?); - } - } - - if let Some(block) = query_block { - latest_block = block.number; - } - - Ok((results, latest_block)) -} - -/// A client for interacting with a subgraph. -pub struct Client { - http_client: reqwest::Client, - subgraph_url: Url, - - /// The request authentication bearer token. - /// - /// This is token is inserted in the `Authentication` header. - auth_token: Option, - - /// The latest block number that the subgraph has progressed to. - /// This is set to 0 initially and updated after each paginated query. - latest_block: BlockNumber, - - /// The number of entities to fetch per paginated query. - page_size: usize, -} - -impl Client { - /// Create a new client with default settings. - /// - /// The default settings are: - /// - No authentication token - /// - Page size of 200 entities per query - /// - Latest block number of 0 - pub fn new(http_client: reqwest::Client, subgraph_url: Url) -> Self { - Self { - http_client, - subgraph_url, - auth_token: None, - latest_block: 0, - page_size: DEFAULT_SUBGRAPH_PAGE_QUERY_SIZE, - } - } - - /// Create a new client builder. - /// - /// The builder allows for configuring the client before building it. - /// - /// Example: - /// ```text - /// let client = SubgraphClient::builder(http_client, subgraph_url) - /// .with_auth_token(Some(ticket)) - /// .with_page_size(100) - /// .with_subgraph_latest_block(18627000) - /// .build(); - /// ``` - pub fn builder(http_client: reqwest::Client, subgraph_url: Url) -> ClientBuilder { - ClientBuilder::new(http_client, subgraph_url) - } - - pub async fn query Deserialize<'de>>( - &self, - query: impl IntoRequestParameters + Send, - ) -> Result { - send_subgraph_query::( - &self.http_client, - self.subgraph_url.clone(), - self.auth_token.as_deref(), - query, - ) - .await - } - - pub async fn paginated_query Deserialize<'de>>( - &mut self, - query: impl IntoDocument + Clone, - ) -> Result, String> { - // Graph-node is rejecting values of `number_gte:0` on subgraphs with a larger `startBlock`. - // This forces us to request the latest block number from the subgraph before sending the - // paginated query. - // TODO: delete when resolved - if self.latest_block == 0 { - let init = send_subgraph_meta_query( - &self.http_client, - self.subgraph_url.clone(), - self.auth_token.as_deref(), - ) - .await?; - - self.latest_block = init.meta.block.number; - } - - // Send the paginated query. - let (results, latest_block) = send_paginated_query( - &self.http_client, - self.subgraph_url.clone(), - query, - self.auth_token.as_deref(), - self.page_size, - self.latest_block, - ) - .await?; - - self.latest_block = latest_block; - - Ok(results) - } -} - -/// A builder for constructing a subgraph client. -pub struct ClientBuilder { - http_client: reqwest::Client, - subgraph_url: Url, - auth_token: Option, - latest_block: BlockNumber, - page_size: usize, -} - -impl ClientBuilder { - fn new(http_client: reqwest::Client, subgraph_url: Url) -> Self { - Self { - http_client, - subgraph_url, - auth_token: None, - latest_block: 0, - page_size: DEFAULT_SUBGRAPH_PAGE_QUERY_SIZE, - } - } - - /// Set request authentication token. - /// - /// By default all requests are issued non-authenticated. - pub fn with_auth_token(mut self, token: Option) -> Self { - self.auth_token = token; - self - } - - /// Set the number of entities to fetch per page query. - /// - /// The default value is 200 entities per query. - pub fn with_page_size(mut self, size: usize) -> Self { - debug_assert_ne!(size, 0, "page size must be greater than 0"); - self.page_size = size; - self - } - - /// Set the latest block number that the subgraph has progressed to. - /// - /// The default value is 0. - pub fn with_subgraph_latest_block(mut self, latest_block: BlockNumber) -> Self { - self.latest_block = latest_block; - self - } - - pub fn build(self) -> Client { - Client { - http_client: self.http_client, - subgraph_url: self.subgraph_url, - auth_token: self.auth_token, - latest_block: self.latest_block, - page_size: self.page_size, - } - } -} diff --git a/graph-gateway/src/subgraph_client/queries.rs b/graph-gateway/src/subgraph_client/queries.rs deleted file mode 100644 index 32db4a3b..00000000 --- a/graph-gateway/src/subgraph_client/queries.rs +++ /dev/null @@ -1,205 +0,0 @@ -use graphql_http::http::request::IntoRequestParameters; -use graphql_http::http_client::{ReqwestExt, ResponseResult}; -use serde::de::DeserializeOwned; -use toolshed::url::Url; - -/// Send an authenticated GraphQL query to a subgraph. -pub async fn send_query( - client: &reqwest::Client, - url: Url, - ticket: Option<&str>, - query: impl IntoRequestParameters + Send, -) -> Result, String> -where - T: DeserializeOwned, -{ - let mut builder = client.post(url.0); - - if let Some(ticket) = ticket { - builder = builder.bearer_auth(ticket) - } - - // TODO: delete before merge - builder = builder.header("Origin", "https://thegraph.com"); - - let res = builder - .send_graphql(query) - .await - .map_err(|err| err.to_string())?; - - Ok(res) -} - -pub async fn send_subgraph_query( - client: &reqwest::Client, - subgraph_url: Url, - ticket: Option<&str>, - query: impl IntoRequestParameters + Send, -) -> Result -where - T: DeserializeOwned, -{ - send_query(client, subgraph_url, ticket, query) - .await - .map_err(|err| format!("Error sending subgraph graphql query: {}", err))? - .map_err(|err| err.to_string()) -} - -/// Subgraphs sometimes fall behind, be it due to failing or the Graph Node may be having issues. The -/// `_meta` field can now be added to any query so that it is possible to determine against which block -/// the query was effectively executed. -pub mod meta { - use serde::Deserialize; - use toolshed::thegraph::BlockPointer; - use toolshed::url::Url; - - use super::send_query; - - const SUBGRAPH_META_QUERY_DOCUMENT: &str = r#"{ meta: _meta { block { number hash } } }"#; - - #[derive(Debug, Deserialize)] - pub struct SubgraphMetaQueryResponse { - pub meta: Meta, - } - - #[derive(Debug, Deserialize)] - pub struct Meta { - pub block: BlockPointer, - } - - pub async fn send_subgraph_meta_query( - client: &reqwest::Client, - subgraph_url: Url, - ticket: Option<&str>, - ) -> Result { - send_query(client, subgraph_url, ticket, SUBGRAPH_META_QUERY_DOCUMENT) - .await - .map_err(|err| format!("Error sending subgraph meta query: {}", err))? - .map_err(|err| err.to_string()) - } -} - -pub mod page { - use alloy_primitives::{BlockHash, BlockNumber}; - use graphql_http::graphql::{Document, IntoDocument, IntoDocumentWithVariables}; - use graphql_http::http_client::ResponseResult; - use indoc::indoc; - use serde::{Deserialize, Serialize}; - use serde_json::value::RawValue; - use toolshed::url::Url; - - use super::{meta::Meta, send_query}; - - /// The block at which the query should be executed. - /// - /// This is part of the input arguments of the [`SubgraphPageQuery`]. - #[derive(Clone, Debug, Default, Serialize)] - pub struct BlockHeight { - /// Value containing a block hash - #[serde(skip_serializing_if = "Option::is_none")] - hash: Option, - - /// Value containing a block number - #[serde(skip_serializing_if = "Option::is_none")] - number: Option, - - /// Value containing the minimum block number. - /// - /// In the case of `number_gte`, the query will be executed on the latest block only if - /// the subgraph has progressed to or past the minimum block number. - /// Defaults to the latest block when omitted. - #[serde(skip_serializing_if = "Option::is_none")] - number_gte: Option, - } - - impl BlockHeight { - pub fn new_with_block_number_gte(number_gte: BlockNumber) -> Self { - Self { - number_gte: Some(number_gte), - ..Default::default() - } - } - - pub fn new_with_block_hash(hash: BlockHash) -> Self { - Self { - hash: Some(hash), - ..Default::default() - } - } - } - - #[derive(Clone, Debug, Serialize)] - pub struct SubgraphPageQueryVars { - /// The block at which the query should be executed. - block: BlockHeight, - first: usize, - last: String, - } - - pub struct SubgraphPageQuery { - query: Document, - vars: SubgraphPageQueryVars, - } - - impl SubgraphPageQuery { - pub fn new( - query: impl IntoDocument, - block: BlockHeight, - first: usize, - last: String, - ) -> Self { - Self { - query: query.into_document(), - vars: SubgraphPageQueryVars { block, first, last }, - } - } - } - - impl IntoDocumentWithVariables for SubgraphPageQuery { - type Variables = SubgraphPageQueryVars; - - fn into_document_with_variables(self) -> (Document, Self::Variables) { - let query = format!( - indoc! { - r#"query ($block: Block_height!, $first: Int!, $last: String!) {{ - meta: _meta(block: $block) {{ block {{ number hash }} }} - results: {query} - }}"# - }, - query = self.query - ); - - (query.into_document(), self.vars) - } - } - - #[derive(Debug, Deserialize)] - pub struct SubgraphPageQueryResponse { - pub meta: Meta, - pub results: Vec>, - } - - #[derive(Debug, Deserialize)] - pub struct SubgraphPageQueryResponseOpaqueEntry { - pub id: String, - } - - pub async fn send_subgraph_page_query( - client: &reqwest::Client, - subgraph_url: Url, - ticket: Option<&str>, - query: impl IntoDocument, - block_height: BlockHeight, - batch_size: usize, - last: Option, - ) -> Result, String> { - send_query( - client, - subgraph_url, - ticket, - SubgraphPageQuery::new(query, block_height, batch_size, last.unwrap_or_default()), - ) - .await - .map_err(|err| format!("Error sending subgraph graphql query: {}", err)) - } -} diff --git a/graph-gateway/src/subgraph_studio.rs b/graph-gateway/src/subgraph_studio.rs index 1bcdaf85..85b8d939 100644 --- a/graph-gateway/src/subgraph_studio.rs +++ b/graph-gateway/src/subgraph_studio.rs @@ -4,8 +4,8 @@ use alloy_primitives::Address; use eventuals::{self, Eventual, EventualExt as _, EventualWriter, Ptr}; use prelude::{UDecimal18, USD}; use serde::Deserialize; +use thegraph::types::{DeploymentId, SubgraphId}; use tokio::{sync::Mutex, time::Duration}; -use toolshed::thegraph::{DeploymentId, SubgraphId}; use toolshed::url::Url; #[derive(Clone, Debug, Default)] diff --git a/graph-gateway/src/subscriptions_subgraph.rs b/graph-gateway/src/subscriptions_subgraph.rs index b7ce8655..8806b3de 100644 --- a/graph-gateway/src/subscriptions_subgraph.rs +++ b/graph-gateway/src/subscriptions_subgraph.rs @@ -4,10 +4,11 @@ use std::{collections::HashMap, sync::Arc}; use alloy_primitives::Address; use eventuals::{self, Eventual, EventualExt as _, EventualWriter, Ptr}; use graph_subscriptions::subscription_tier::SubscriptionTiers; -use prelude::unix_timestamp; +use thegraph::client as subgraph_client; use tokio::sync::Mutex; -use crate::subgraph_client; +use prelude::unix_timestamp; + use crate::subscriptions::{ActiveSubscription, Subscription}; pub struct Client { diff --git a/graph-gateway/src/topology.rs b/graph-gateway/src/topology.rs index 5ea15064..d484a528 100644 --- a/graph-gateway/src/topology.rs +++ b/graph-gateway/src/topology.rs @@ -8,8 +8,8 @@ use futures::future::join_all; use itertools::Itertools; use prelude::GRT; use serde::Deserialize; +use thegraph::types::{DeploymentId, SubgraphId}; use tokio::sync::RwLock; -use toolshed::thegraph::{DeploymentId, SubgraphId}; use toolshed::url::Url; use crate::{ipfs, network_subgraph}; diff --git a/graph-gateway/tests/it_indexers_status_indexing_statuses.rs b/graph-gateway/tests/it_indexers_status_indexing_statuses.rs index d83642f8..2ed979bb 100644 --- a/graph-gateway/tests/it_indexers_status_indexing_statuses.rs +++ b/graph-gateway/tests/it_indexers_status_indexing_statuses.rs @@ -1,8 +1,8 @@ use std::time::Duration; use assert_matches::assert_matches; +use thegraph::types::DeploymentId; use tokio::time::timeout; -use toolshed::thegraph::DeploymentId; use graph_gateway::indexers_status::indexing_statuses::client; diff --git a/graph-gateway/tests/it_indexers_status_public_pois.rs b/graph-gateway/tests/it_indexers_status_public_pois.rs index bfe2b08f..33b9875d 100644 --- a/graph-gateway/tests/it_indexers_status_public_pois.rs +++ b/graph-gateway/tests/it_indexers_status_public_pois.rs @@ -2,8 +2,8 @@ use std::time::Duration; use alloy_primitives::BlockNumber; use assert_matches::assert_matches; +use thegraph::types::DeploymentId; use tokio::time::timeout; -use toolshed::thegraph::DeploymentId; use graph_gateway::indexers_status::public_poi::client; use graph_gateway::indexers_status::public_poi::{ diff --git a/graph-gateway/tests/it_indexings_blocklist.rs b/graph-gateway/tests/it_indexings_blocklist.rs index 5b777443..8a1f3782 100644 --- a/graph-gateway/tests/it_indexings_blocklist.rs +++ b/graph-gateway/tests/it_indexings_blocklist.rs @@ -1,8 +1,8 @@ use std::time::Duration; use alloy_primitives::Address; +use thegraph::types::DeploymentId; use tokio::time::timeout; -use toolshed::thegraph::DeploymentId; use graph_gateway::indexings_blocklist::check_indexer_pois; use graph_gateway::poi::{ProofOfIndexing, ProofOfIndexingInfo}; diff --git a/graph-gateway/tests/it_subgraph_client.rs b/graph-gateway/tests/it_subgraph_client.rs deleted file mode 100644 index 4e086d03..00000000 --- a/graph-gateway/tests/it_subgraph_client.rs +++ /dev/null @@ -1,181 +0,0 @@ -use assert_matches::assert_matches; -use serde::Deserialize; -use toolshed::thegraph::{BlockPointer, SubgraphId}; -use toolshed::url::Url; - -use graph_gateway::subgraph_client::queries::meta::{ - send_subgraph_meta_query, SubgraphMetaQueryResponse, -}; -use graph_gateway::subgraph_client::queries::page::{send_subgraph_page_query, BlockHeight}; -use graph_gateway::subgraph_client::Client as SubgraphClient; - -/// Test helper to parse a URL. -fn test_url(url: &str) -> Url { - url.parse().expect("Invalid URL") -} - -/// Test helper to get the test query key from the environment. -fn test_query_key() -> String { - std::env::var("THEGRAPH_TEST_QUERY_KEY").expect("Missing THEGRAPH_TEST_QUERY_KEY") -} - -#[test_with::env(THEGRAPH_TEST_QUERY_KEY)] -#[tokio::test] -async fn send_subgraph_meta_query_request() { - //// Given - let ticket = test_query_key(); - - let http_client = reqwest::Client::new(); - let subgraph_url = test_url("https://gateway.thegraph.com/api/deployments/id/QmRbgjyzEgfxGbodu6itfkXCQ5KA9oGxKscrcQ9QuF88oT"); - - //// When - let req_fut = send_subgraph_meta_query(&http_client, subgraph_url, Some(&ticket)); - let res = tokio::time::timeout(std::time::Duration::from_secs(10), req_fut) - .await - .expect("Timeout on subgraph meta query"); - - //// Then - // Assert the query succeeded and we get a non-empty block number and hash. - assert_matches!(res, Ok(SubgraphMetaQueryResponse { meta }) => { - assert!(meta.block.number > 0); - assert!(!meta.block.hash.is_empty()); - }); -} - -#[test_with::env(THEGRAPH_TEST_QUERY_KEY)] -#[tokio::test] -async fn send_subgraph_page_query_request() { - //// Given - const PAGE_REQUEST_BATCH_SIZE: usize = 6; - - let ticket = test_query_key(); - - let http_client = reqwest::Client::new(); - let subgraph_url = test_url("https://gateway.thegraph.com/api/deployments/id/QmRbgjyzEgfxGbodu6itfkXCQ5KA9oGxKscrcQ9QuF88oT"); - - // Query all subgraph ids. - const SUBGRAPHS_QUERY_DOCUMENT: &str = r#" - subgraphs( - block: $block - orderBy: id, orderDirection: asc - first: $first - where: { - id_gt: $last - entityVersion: 2 - } - ) { - id - } - "#; - - //// When - let req_fut = send_subgraph_page_query( - &http_client, - subgraph_url, - Some(&ticket), - SUBGRAPHS_QUERY_DOCUMENT, - BlockHeight::new_with_block_number_gte(18627000), - PAGE_REQUEST_BATCH_SIZE, - None, - ); - let res = tokio::time::timeout(std::time::Duration::from_secs(10), req_fut) - .await - .expect("Timeout on subgraph meta query"); - - //// Then - assert_matches!(res, Ok(Ok(resp)) => { - // Assert meta data is present and valid. - assert!(resp.meta.block.number > 0); - assert!(!resp.meta.block.hash.is_empty()); - - // Assert the results are present and the correct size. - assert_eq!(resp.results.len(), PAGE_REQUEST_BATCH_SIZE); - }); -} - -#[test_with::env(THEGRAPH_TEST_QUERY_KEY)] -#[tokio::test] -async fn client_send_query() { - //// Given - let ticket = test_query_key(); - - let http_client = reqwest::Client::new(); - let subgraph_url = test_url("https://gateway.thegraph.com/api/deployments/id/QmRbgjyzEgfxGbodu6itfkXCQ5KA9oGxKscrcQ9QuF88oT"); - - let client = SubgraphClient::builder(http_client, subgraph_url) - .with_auth_token(Some(ticket)) - .build(); - - // Subgraph meta query - const SUBGRAPH_META_QUERY_DOCUMENT: &str = r#"{ meta: _meta { block { number hash } } }"#; - - #[derive(Debug, Deserialize)] - struct Meta { - block: BlockPointer, - } - - #[derive(Debug, Deserialize)] - struct SubgraphMetaQueryResponse { - meta: Meta, - } - - //// When - let req_fut = client.query::(SUBGRAPH_META_QUERY_DOCUMENT); - let res = tokio::time::timeout(std::time::Duration::from_secs(10), req_fut) - .await - .expect("Timeout on subgraph meta query"); - - //// Then - // Assert the query succeeded and we get a non-empty block number and hash. - assert_matches!(res, Ok(SubgraphMetaQueryResponse { meta }) => { - assert!(meta.block.number > 0); - assert!(!meta.block.hash.is_empty()); - }); -} - -#[test_with::env(THEGRAPH_TEST_QUERY_KEY)] -#[tokio::test] -async fn send_subgraph_paginated() { - //// Given - let ticket = test_query_key(); - let subgraph_url = test_url("https://gateway.thegraph.com/api/deployments/id/QmRbgjyzEgfxGbodu6itfkXCQ5KA9oGxKscrcQ9QuF88oT"); - - let http_client = reqwest::Client::new(); - - let mut client = SubgraphClient::builder(http_client, subgraph_url) - .with_auth_token(Some(ticket)) - .build(); - - // Query all subgraph ids. - const SUBGRAPHS_QUERY_DOCUMENT: &str = r#" - subgraphs( - block: $block - orderBy: id, orderDirection: asc - first: $first - where: { - id_gt: $last - entityVersion: 2 - } - ) { - id - } - "#; - - #[derive(Debug, Deserialize)] - #[serde(rename_all = "camelCase")] - pub struct Subgraph { - pub id: SubgraphId, - } - - //// When - let req_fut = client.paginated_query::(SUBGRAPHS_QUERY_DOCUMENT); - let res = tokio::time::timeout(std::time::Duration::from_secs(10), req_fut) - .await - .expect("Timeout on subgraph paginated query"); - - //// Then - // Assert the query succeeded and we got a non-empty list of active subscriptions. - assert_matches!(res, Ok(vec) => { - assert!(!vec.is_empty()); - }); -} diff --git a/indexer-selection/Cargo.toml b/indexer-selection/Cargo.toml index 98d86a3d..0fffabd5 100644 --- a/indexer-selection/Cargo.toml +++ b/indexer-selection/Cargo.toml @@ -18,6 +18,7 @@ prelude = { path = "../prelude" } rand.workspace = true rand_distr = "0.4" siphasher.workspace = true +thegraph.workspace = true tokio.workspace = true toolshed.workspace = true tracing.workspace = true diff --git a/indexer-selection/src/lib.rs b/indexer-selection/src/lib.rs index e3105c13..beb5c64d 100644 --- a/indexer-selection/src/lib.rs +++ b/indexer-selection/src/lib.rs @@ -12,7 +12,7 @@ pub use ordered_float::NotNan; use prelude::*; use rand::{prelude::SmallRng, Rng as _}; use score::{expected_individual_score, ExpectedValue}; -use toolshed::thegraph::{BlockPointer, DeploymentId}; +use thegraph::types::{BlockPointer, DeploymentId}; use toolshed::url::Url; use crate::score::{select_indexers, SelectionFactors}; diff --git a/indexer-selection/src/simulation.rs b/indexer-selection/src/simulation.rs index 52b536a8..c16158d0 100644 --- a/indexer-selection/src/simulation.rs +++ b/indexer-selection/src/simulation.rs @@ -7,7 +7,7 @@ use prelude::test_utils::{bytes_from_id, init_test_tracing}; use prelude::{UDecimal18, GRT}; use rand::{prelude::SmallRng, Rng as _, SeedableRng as _}; use rand_distr::Normal; -use toolshed::thegraph::DeploymentId; +use thegraph::types::DeploymentId; use crate::{ BlockStatus, Candidate, IndexerErrorObservation, Indexing, IndexingStatus, Selection, State, diff --git a/indexer-selection/src/test.rs b/indexer-selection/src/test.rs index ff389fd7..9ae75813 100644 --- a/indexer-selection/src/test.rs +++ b/indexer-selection/src/test.rs @@ -10,8 +10,8 @@ use prelude::{buffer_queue, double_buffer, UDecimal18, GRT}; use rand::rngs::SmallRng; use rand::seq::{IteratorRandom, SliceRandom}; use rand::{thread_rng, Rng, RngCore as _, SeedableRng as _}; +use thegraph::types::{BlockPointer, DeploymentId}; use tokio::spawn; -use toolshed::thegraph::{BlockPointer, DeploymentId}; use crate::actor::{process_updates, Update}; use crate::{ diff --git a/indexer-selection/src/test_utils.rs b/indexer-selection/src/test_utils.rs index 801b76c0..443beb3b 100644 --- a/indexer-selection/src/test_utils.rs +++ b/indexer-selection/src/test_utils.rs @@ -3,7 +3,7 @@ use std::hash::{Hash as _, Hasher as _}; use alloy_primitives::Address; use prelude::test_utils::bytes_from_id; use siphasher::sip::SipHasher24; -use toolshed::thegraph::DeploymentId; +use thegraph::types::DeploymentId; use crate::BlockPointer;