From 334e9111b5e714038e2ec9226822552f1aff070f Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Thu, 16 Nov 2023 11:42:03 +0100 Subject: [PATCH] chore(graph-gateway): use graphql-http crate client (#404) --- Cargo.lock | 30 +++-- graph-gateway/Cargo.toml | 1 + graph-gateway/src/indexers_status.rs | 2 +- .../src/indexers_status/cost_models.rs | 4 + .../src/indexers_status/cost_models/client.rs | 16 +++ .../src/indexers_status/cost_models/query.rs | 43 ++++++ graph-gateway/src/indexers_status/graphql.rs | 41 ------ .../indexing_statuses/client.rs | 14 +- .../indexing_statuses/query.rs | 37 +++--- .../src/indexers_status/public_poi/client.rs | 11 +- .../src/indexers_status/public_poi/query.rs | 122 +++++++++--------- graph-gateway/src/indexing.rs | 60 ++++----- 12 files changed, 207 insertions(+), 174 deletions(-) create mode 100644 graph-gateway/src/indexers_status/cost_models.rs create mode 100644 graph-gateway/src/indexers_status/cost_models/client.rs create mode 100644 graph-gateway/src/indexers_status/cost_models/query.rs delete mode 100644 graph-gateway/src/indexers_status/graphql.rs diff --git a/Cargo.lock b/Cargo.lock index e9588611..ae1d9c15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1708,6 +1708,7 @@ dependencies = [ "futures", "graph-subscriptions", "graphql", + "graphql-http", "hex", "hyper", "indexer-selection", @@ -1767,6 +1768,19 @@ dependencies = [ "serde", ] +[[package]] +name = "graphql-http" +version = "0.1.0" +source = "git+https://github.com/edgeandnode/toolshed.git?tag=graphql-http-v0.1.0#a20fa4a8d5aa640543a863fcd7844bf09076fb51" +dependencies = [ + "anyhow", + "async-trait", + "reqwest", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "graphql-parser" version = "0.4.0" @@ -1897,9 +1911,9 @@ dependencies = [ [[package]] name = "http" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f95b9abcae896730d42b78e09c155ed4ddf82c07b4de772c64aee5b2d8b7c150" +checksum = "8947b1a6fad4393052c7ba1f4cd97bed3e953a95c79c92ad9b051a04611d9fbb" dependencies = [ "bytes", "fnv", @@ -3398,9 +3412,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.21" +version = "0.38.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b426b0506e5d50a7d8dafcf2e81471400deb602392c7dd110815afb4eaf02a3" +checksum = "ffb93593068e9babdad10e4fce47dc9b3ac25315a72a59766ffd9e9a71996a04" dependencies = [ "bitflags 2.4.1", "errno", @@ -4386,9 +4400,9 @@ dependencies = [ [[package]] name = "tracing-log" -version = "0.1.4" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f751112709b4e791d8ce53e32c4ed2d353565a795ce84da2285393f41557bdf2" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" dependencies = [ "log", "once_cell", @@ -4407,9 +4421,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.17" +version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ "matchers", "nu-ansi-term", diff --git a/graph-gateway/Cargo.toml b/graph-gateway/Cargo.toml index 27c3321c..9275bdb7 100644 --- a/graph-gateway/Cargo.toml +++ b/graph-gateway/Cargo.toml @@ -19,6 +19,7 @@ faster-hex = "0.8.0" futures = "0.3" graph-subscriptions = { git = "https://github.com/edgeandnode/subscription-payments", rev = "334d18b" } graphql = { git = "https://github.com/edgeandnode/toolshed", tag = "graphql-v0.1.0" } +graphql-http = { git = "https://github.com/edgeandnode/toolshed.git", tag = "graphql-http-v0.1.0", features = ["http-reqwest"] } hex = "0.4" indexer-selection = { path = "../indexer-selection" } indoc = "2.0.3" diff --git a/graph-gateway/src/indexers_status.rs b/graph-gateway/src/indexers_status.rs index 7b73588a..eaaecbcf 100644 --- a/graph-gateway/src/indexers_status.rs +++ b/graph-gateway/src/indexers_status.rs @@ -1,3 +1,3 @@ -mod graphql; +pub mod cost_models; pub mod indexing_statuses; pub mod public_poi; diff --git a/graph-gateway/src/indexers_status/cost_models.rs b/graph-gateway/src/indexers_status/cost_models.rs new file mode 100644 index 00000000..4d1f8554 --- /dev/null +++ b/graph-gateway/src/indexers_status/cost_models.rs @@ -0,0 +1,4 @@ +pub use query::*; + +pub mod client; +mod query; diff --git a/graph-gateway/src/indexers_status/cost_models/client.rs b/graph-gateway/src/indexers_status/cost_models/client.rs new file mode 100644 index 00000000..843516fb --- /dev/null +++ b/graph-gateway/src/indexers_status/cost_models/client.rs @@ -0,0 +1,16 @@ +use graphql_http::http_client::ReqwestExt; +use toolshed::url::Url; + +use crate::indexers_status::cost_models::query; + +pub async fn send_cost_model_query( + client: reqwest::Client, + status_url: Url, + query: query::CostModelQuery, +) -> anyhow::Result { + let res = client.post(status_url.0).send_graphql(query).await; + match res { + Ok(res) => Ok(res?), + Err(e) => Err(anyhow::anyhow!("Error sending cost model query: {}", e)), + } +} diff --git a/graph-gateway/src/indexers_status/cost_models/query.rs b/graph-gateway/src/indexers_status/cost_models/query.rs new file mode 100644 index 00000000..2b91b9cc --- /dev/null +++ b/graph-gateway/src/indexers_status/cost_models/query.rs @@ -0,0 +1,43 @@ +use graphql_http::graphql::{Document, IntoDocument, IntoDocumentWithVariables}; +use indoc::indoc; +use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DisplayFromStr}; +use toolshed::thegraph::DeploymentId; + +pub(super) const COST_MODEL_QUERY_DOCUMENT: &str = indoc! { + r#"query ($deployments: [String!]!) { + costModels(deployments: $deployments) { + deployment + model + variables + } + }"# +}; + +#[serde_as] +#[derive(Clone, Debug, Serialize)] +pub struct CostModelQuery { + #[serde_as(as = "Vec")] + pub deployments: Vec, +} + +impl IntoDocumentWithVariables for CostModelQuery { + type Variables = Self; + + fn into_document_with_variables(self) -> (Document, Self::Variables) { + (COST_MODEL_QUERY_DOCUMENT.into_document(), self) + } +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CostModelResponse { + pub cost_models: Vec, +} + +#[derive(Deserialize)] +pub struct CostModelSourceResponse { + pub deployment: DeploymentId, + pub model: String, + pub variables: Option, +} diff --git a/graph-gateway/src/indexers_status/graphql.rs b/graph-gateway/src/indexers_status/graphql.rs deleted file mode 100644 index 90dcc375..00000000 --- a/graph-gateway/src/indexers_status/graphql.rs +++ /dev/null @@ -1,41 +0,0 @@ -use graphql::http::Response; -use serde::de::DeserializeOwned; -use serde_json::json; -use toolshed::url::Url; - -/// Trait for types that can be converted into a GraphQL query. -pub trait IntoGraphqlQuery { - fn to_query(&self) -> String; -} - -/// Send a GraphQL query to a given URL. -/// -/// This function is a wrapper around `reqwest::Client` that: -/// - Sets the `Content-Type` header to `application/json`. -/// - Sets the request body to the given query. -/// - Deserializes the response body into the given type. -// TODO: Improve error handling. Define custom error enum. -pub async fn send_graphql_query( - client: &reqwest::Client, - url: Url, - query: impl IntoGraphqlQuery, -) -> anyhow::Result -where - T: DeserializeOwned, -{ - let query = query.to_query(); - let body = &json!({ "query": query }); - - let response = client.post(url.0).json(body).send().await?; - - let status = response.status(); - let body = response - .json::>() - .await - .map_err(|err| anyhow::anyhow!("Response body deserialization failed: {}", err))?; - - // The GraphQL server returns a 400 if the query is invalid together with a JSON object - // containing the error message. - body.unpack() - .map_err(|err| anyhow::anyhow!("GraphQL query failed with status {}: {}", status, err)) -} diff --git a/graph-gateway/src/indexers_status/indexing_statuses/client.rs b/graph-gateway/src/indexers_status/indexing_statuses/client.rs index 5ec75e40..ea991ee9 100644 --- a/graph-gateway/src/indexers_status/indexing_statuses/client.rs +++ b/graph-gateway/src/indexers_status/indexing_statuses/client.rs @@ -1,11 +1,21 @@ +use graphql_http::http_client::ReqwestExt; use toolshed::url::Url; -use crate::indexers_status::graphql; use crate::indexers_status::indexing_statuses::query; pub async fn send_indexing_statuses_query( client: reqwest::Client, status_url: Url, ) -> anyhow::Result { - graphql::send_graphql_query(&client, status_url, query::IndexingStatusesQuery).await + let res = client + .post(status_url.0) + .send_graphql(query::INDEXING_STATUSES_QUERY_DOCUMENT) + .await; + match res { + Ok(res) => Ok(res?), + Err(e) => Err(anyhow::anyhow!( + "Error sending indexing statuses query: {}", + e + )), + } } diff --git a/graph-gateway/src/indexers_status/indexing_statuses/query.rs b/graph-gateway/src/indexers_status/indexing_statuses/query.rs index a8571867..39998c9d 100644 --- a/graph-gateway/src/indexers_status/indexing_statuses/query.rs +++ b/graph-gateway/src/indexers_status/indexing_statuses/query.rs @@ -5,27 +5,24 @@ use indoc::indoc; use serde::{Deserialize, Deserializer}; use toolshed::thegraph::DeploymentId; -use crate::indexers_status::graphql::IntoGraphqlQuery; - -#[derive(Clone, Debug)] -pub struct IndexingStatusesQuery; - -impl IntoGraphqlQuery for IndexingStatusesQuery { - fn to_query(&self) -> String { - String::from(indoc! { - r#"{ - indexingStatuses(subgraphs: []) { - subgraph - chains { - network - latestBlock { number hash } - earliestBlock { number hash } - } +pub(super) const INDEXING_STATUSES_QUERY_DOCUMENT: &str = indoc! { + r#"{ + indexingStatuses(subgraphs: []) { + subgraph + chains { + network + latestBlock { + number + hash } - }"# - }) - } -} + earliestBlock { + number + hash + } + } + } + }"# +}; #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] diff --git a/graph-gateway/src/indexers_status/public_poi/client.rs b/graph-gateway/src/indexers_status/public_poi/client.rs index 9ee69872..ce83805e 100644 --- a/graph-gateway/src/indexers_status/public_poi/client.rs +++ b/graph-gateway/src/indexers_status/public_poi/client.rs @@ -1,11 +1,11 @@ use std::collections::HashMap; use alloy_primitives::BlockNumber; +use graphql_http::http_client::ReqwestExt; use itertools::Itertools; use toolshed::thegraph::DeploymentId; use toolshed::url::Url; -use crate::indexers_status::graphql; use crate::indexers_status::public_poi::query; use crate::poi::ProofOfIndexing; @@ -14,7 +14,14 @@ pub async fn send_public_poi_query( status_url: Url, query: query::PublicProofOfIndexingQuery, ) -> anyhow::Result { - graphql::send_graphql_query(&client, status_url, query).await + let res = client.post(status_url.0).send_graphql(query).await; + match res { + Ok(res) => Ok(res?), + Err(e) => Err(anyhow::anyhow!( + "Error sending public proof of indexing query: {}", + e + )), + } } pub async fn send_public_poi_queries_and_merge_results( diff --git a/graph-gateway/src/indexers_status/public_poi/query.rs b/graph-gateway/src/indexers_status/public_poi/query.rs index ba32e7fa..d3578bfe 100644 --- a/graph-gateway/src/indexers_status/public_poi/query.rs +++ b/graph-gateway/src/indexers_status/public_poi/query.rs @@ -1,57 +1,48 @@ use alloy_primitives::{BlockNumber, B256}; +use graphql_http::graphql::{Document, IntoDocument, IntoDocumentWithVariables}; use indoc::indoc; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; use toolshed::thegraph::DeploymentId; -use crate::indexers_status::graphql::IntoGraphqlQuery; - pub const MAX_REQUESTS_PER_QUERY: usize = 10; pub type ProofOfIndexing = B256; -#[derive(Clone, Debug)] +pub(super) const PUBLIC_PROOF_OF_INDEXING_QUERY_DOCUMENT: &str = indoc! { + r#"query ($requests: [PublicProofOfIndexingRequest!]!) { + publicProofsOfIndexing(requests: $requests) { + deployment + proofOfIndexing + block { + number + } + } + }"# +}; + +#[derive(Clone, Debug, Serialize)] +#[serde(rename_all = "camelCase")] pub struct PublicProofOfIndexingQuery { pub requests: Vec, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize)] +#[serde(rename_all = "camelCase")] pub struct PublicProofOfIndexingRequest { pub deployment: DeploymentId, pub block_number: BlockNumber, } -impl PublicProofOfIndexingRequest { - fn to_query_params(&self) -> String { - format!( - r#"{{ deployment: "{}", blockNumber: "{}" }}"#, - self.deployment, self.block_number - ) - } -} +impl IntoDocumentWithVariables for PublicProofOfIndexingQuery { + type Variables = Self; -impl IntoGraphqlQuery for PublicProofOfIndexingQuery { - fn to_query(&self) -> String { + fn into_document_with_variables(self) -> (Document, Self::Variables) { debug_assert!(!self.requests.is_empty(), "Must have at least one request"); - let requests = self - .requests - .iter() - .map(|request| request.to_query_params()) - .collect::>() - .join(", "); - - format!( - indoc! { - r#"{{ - publicProofsOfIndexing(requests: [{}]) {{ - deployment - proofOfIndexing - block {{ number }} - }} - }}"# - }, - requests + ( + PUBLIC_PROOF_OF_INDEXING_QUERY_DOCUMENT.into_document(), + self, ) } } @@ -82,30 +73,13 @@ mod tests { use super::*; mod query { - use super::*; - - #[test] - fn public_proof_of_indexing_request_to_query_params_format() { - //// Given - let request = PublicProofOfIndexingRequest { - deployment: "QmawxQJ5U1JvgosoFVDyAwutLWxrckqVmBTQxaMaKoj3Lw" - .parse() - .expect("Failed to parse deployment ID"), - block_number: 123, - }; - - //// When - let query = request.to_query_params(); + use graphql_http::http::request::IntoRequestParameters; + use serde_json::json; - //// Then - assert_eq!( - query.as_str(), - "{ deployment: \"QmawxQJ5U1JvgosoFVDyAwutLWxrckqVmBTQxaMaKoj3Lw\", blockNumber: \"123\" }" - ); - } + use super::*; #[test] - fn create_status_public_pois_query() { + fn create_status_public_pois_request_params() { //// Given let query = PublicProofOfIndexingQuery { requests: vec![ @@ -125,18 +99,40 @@ mod tests { }; //// When - let query = query.to_query(); + let query = query.into_request_parameters(); //// Then + // Assert the query document. + assert_eq!( + query.query.as_str(), + PUBLIC_PROOF_OF_INDEXING_QUERY_DOCUMENT + ); + + // Assert the query variables. + assert!(matches!( + query.variables.get("requests"), + Some(serde_json::Value::Array(_)) + )); + + let var_requests = query + .variables + .get("requests") + .expect("Missing requests variables") + .as_array() + .expect("Invalid requests variables"); + assert_eq!( + var_requests[0], + json!({ + "deployment": "QmawxQJ5U1JvgosoFVDyAwutLWxrckqVmBTQxaMaKoj3Lw", + "blockNumber": 123 + }) + ); assert_eq!( - query.as_str(), - indoc! { r#"{ - publicProofsOfIndexing(requests: [{ deployment: "QmawxQJ5U1JvgosoFVDyAwutLWxrckqVmBTQxaMaKoj3Lw", blockNumber: "123" }, { deployment: "QmeYTH2fK2wv96XvnCGH2eyKFE8kmRfo53zYVy5dKysZtH", blockNumber: "456" }]) { - deployment - proofOfIndexing - block { number } - } - }"# } + var_requests[1], + json!({ + "deployment": "QmeYTH2fK2wv96XvnCGH2eyKFE8kmRfo53zYVy5dKysZtH", + "blockNumber": 456 + }) ); } } diff --git a/graph-gateway/src/indexing.rs b/graph-gateway/src/indexing.rs index e5c73cdf..95d46e7a 100644 --- a/graph-gateway/src/indexing.rs +++ b/graph-gateway/src/indexing.rs @@ -5,7 +5,6 @@ use eventuals::{Eventual, EventualExt as _, EventualWriter, Ptr}; use futures::future::join_all; use semver::Version; use serde::Deserialize; -use serde_json::json; use tokio::sync::Mutex; use toolshed::thegraph::{BlockPointer, DeploymentId}; use toolshed::url::{url::Host, Url}; @@ -16,8 +15,8 @@ use indexer_selection::Indexing; use prelude::epoch_cache::EpochCache; use crate::geoip::GeoIP; -use crate::indexers_status::indexing_statuses::{client, IndexingStatusResponse}; -use crate::subgraph_client::graphql_query; +use crate::indexers_status::cost_models::{self, CostModelQuery, CostModelSourceResponse}; +use crate::indexers_status::indexing_statuses::{self, IndexingStatusResponse}; use crate::topology::Deployment; pub struct IndexingStatus { @@ -183,12 +182,25 @@ async fn query_indexer_for_indexing_statuses( client: reqwest::Client, status_url: Url, ) -> Result, String> { - client::send_indexing_statuses_query(client, status_url) + indexing_statuses::client::send_indexing_statuses_query(client, status_url) .await .map_err(|err| err.to_string()) .map(|res| res.indexing_statuses) } +/// Convenience wrapper method around [`client::send_cost_model_query`] to map the result +/// types to the expected by [`query_status`] method. +async fn query_indexer_for_cost_models( + client: reqwest::Client, + cost_url: Url, + deployments: Vec, +) -> Result, String> { + cost_models::client::send_cost_model_query(client, cost_url, CostModelQuery { deployments }) + .await + .map_err(|err| err.to_string()) + .map(|res| res.cost_models) +} + async fn query_status( actor: &'static Mutex, client: &reqwest::Client, @@ -200,26 +212,13 @@ async fn query_status( let statuses = query_indexer_for_indexing_statuses(client.clone(), status_url.into()).await?; let cost_url = url.join("cost").map_err(|err| err.to_string())?; - let deployments = statuses - .iter() - .map(|stat| stat.subgraph.to_string()) - .collect::>(); - let cost_query = json!({ - "query": r#"query costModels($deployments: [String!]!) { - costModels(deployments: $deployments) { - deployment - model - variables - } - }"#, - "variables": { "deployments": deployments }, - }); - let cost_models = - graphql_query::(client, cost_url.into(), &cost_query, None) - .await - .and_then(graphql::http::Response::unpack) - .map(|cost_models| cost_models.cost_models) - .unwrap_or_default(); + let cost_models = query_indexer_for_cost_models( + client.clone(), + cost_url.into(), + statuses.iter().map(|stat| stat.subgraph).collect(), + ) + .await + .unwrap_or_default(); let mut actor = actor.lock().await; let mut cost_models = cost_models @@ -301,19 +300,6 @@ struct CostModelSource { variables: String, } -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct CostModelResponse { - cost_models: Vec, -} - -#[derive(Deserialize)] -struct CostModelSourceResponse { - deployment: DeploymentId, - model: String, - variables: Option, -} - #[derive(Deserialize)] struct IndexerVersion { version: String,