From 50c0d2db80c1ffe632b3bd3f9e41ef6ef2e07345 Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Fri, 6 Dec 2024 11:18:01 -0500 Subject: [PATCH] refactor: consolidate indexing status requests --- src/indexers.rs | 1 - src/indexers/indexing_progress.rs | 201 ------------- src/network.rs | 14 + src/network/cost_model.rs | 27 +- .../indexer_indexing_progress_resolver.rs | 269 ++++++++++-------- src/network/service.rs | 5 +- 6 files changed, 170 insertions(+), 347 deletions(-) delete mode 100644 src/indexers/indexing_progress.rs diff --git a/src/indexers.rs b/src/indexers.rs index 311e4523..d9176571 100644 --- a/src/indexers.rs +++ b/src/indexers.rs @@ -1,2 +1 @@ -pub mod indexing_progress; pub mod public_poi; diff --git a/src/indexers/indexing_progress.rs b/src/indexers/indexing_progress.rs deleted file mode 100644 index c676159b..00000000 --- a/src/indexers/indexing_progress.rs +++ /dev/null @@ -1,201 +0,0 @@ -use serde::Deserialize; -use serde_with::serde_as; -use thegraph_core::{alloy::primitives::BlockNumber, DeploymentId}; -use thegraph_graphql_http::{ - graphql::{Document, IntoDocument, IntoDocumentWithVariables}, - http_client::{RequestError, ReqwestExt as _, ResponseError}, -}; -use url::Url; - -const INDEXING_PROGRESS_QUERY_DOCUMENT: &str = r#" - query indexingProgress($deployments: [String!]!) { - indexingStatuses(subgraphs: $deployments) { - subgraph - chains { - network - latestBlock { number } - earliestBlock { number } - } - } - }"#; - -/// Errors that can occur while fetching the indexing progress. -#[derive(Clone, Debug, thiserror::Error)] -pub enum Error { - /// The request failed. - #[error("request error: {0}")] - Request(String), - - /// Invalid response. - /// - /// The response could not be deserialized or is missing required fields. - #[error("invalid response: {0}")] - InvalidResponse(String), - - /// The response did not contain any progress information. - #[error("empty response")] - EmptyResponse, -} - -/// Send a request to the indexer to get the indexing status of the given deployments. -pub async fn send_request( - client: &reqwest::Client, - status_url: Url, - deployments: impl IntoIterator, -) -> Result, Error> { - let resp = client - .post(status_url) - .send_graphql::(Request::new(deployments)) - .await - .map_err(|err| match err { - RequestError::RequestSerializationError(..) => { - unreachable!("request serialization should not fail") - } - RequestError::RequestSendError(..) | RequestError::ResponseRecvError(..) => { - Error::Request(err.to_string()) - } - RequestError::ResponseDeserializationError { .. } => { - Error::InvalidResponse(err.to_string()) - } - })? - .map_err(|err| match err { - ResponseError::Failure { .. } => Error::Request(err.to_string()), - ResponseError::Empty => Error::EmptyResponse, - })?; - - if resp.indexing_statuses.is_empty() { - return Err(Error::EmptyResponse); - } - - Ok(resp.indexing_statuses) -} - -/// The request type for the indexing progress query. -/// -/// This type is used to construct the GraphQL request document and variables for the indexing -/// progress query. -/// -/// See [`INDEXING_PROGRESS_QUERY_DOCUMENT`] for the query document. -#[derive(Debug, Clone)] -struct Request { - document: Document, - vars_deployments: Vec, -} - -impl Request { - /// Create a new indexing progress query request. - fn new<'a>(deployments: impl IntoIterator) -> Self { - let deployments = deployments - .into_iter() - .map(|item| item.to_string()) - .collect(); - Self { - document: INDEXING_PROGRESS_QUERY_DOCUMENT.into_document(), - vars_deployments: deployments, - } - } -} - -impl IntoDocumentWithVariables for Request { - type Variables = serde_json::Value; - - fn into_document_with_variables(self) -> (Document, Self::Variables) { - ( - self.document, - serde_json::json!({ "deployments": self.vars_deployments }), - ) - } -} - -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -struct Response { - indexing_statuses: Vec, -} - -#[derive(Debug, Deserialize)] -pub struct IndexingStatusResponse { - #[serde(rename = "subgraph")] - pub deployment_id: DeploymentId, - pub chains: Vec, -} - -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ChainStatus { - pub latest_block: Option, - pub earliest_block: Option, -} - -#[serde_as] -#[derive(Debug, Deserialize)] -pub struct BlockStatus { - #[serde_as(as = "serde_with::DisplayFromStr")] - pub number: BlockNumber, -} - -#[cfg(test)] -mod tests { - use super::Response; - - #[test] - fn deserialize_response() { - //* Given - let response = serde_json::json!({ - "indexingStatuses": [ - { - "subgraph": "QmZTy9EJHu8rfY9QbEk3z1epmmvh5XHhT2Wqhkfbyt8k9Z", - "chains": [ - { - "network": "rinkeby", - "latestBlock": { - "number": "10164818", - "hash": "0xaa94881130ba16c28cc90a5a880b117bdc90b6b11e9cde0c78804cdb93cc9e85" - }, - "earliestBlock": { - "number": "7559999", - "hash": "0x0" - } - } - ] - }, - { - "subgraph": "QmSLQfPFcz2pKRJZUH16Sk26EFpRgdxTYGnMiKvWgKRM2a", - "chains": [ - { - "network": "rinkeby" - } - ] - } - ] - }); - - //* When - let response = serde_json::from_value(response); - - //* Then - let response: Response = response.expect("deserialization failed"); - - assert_eq!(response.indexing_statuses.len(), 2); - let status1 = &response.indexing_statuses[0]; - let status2 = &response.indexing_statuses[1]; - - // Status 1 - assert_eq!( - status1.deployment_id.to_string(), - "QmZTy9EJHu8rfY9QbEk3z1epmmvh5XHhT2Wqhkfbyt8k9Z" - ); - assert_eq!(status1.chains.len(), 1); - assert!(status1.chains[0].latest_block.is_some()); - assert!(status1.chains[0].earliest_block.is_some()); - - // Status 2 - assert_eq!( - status2.deployment_id.to_string(), - "QmSLQfPFcz2pKRJZUH16Sk26EFpRgdxTYGnMiKvWgKRM2a" - ); - assert_eq!(status2.chains.len(), 1); - assert!(status2.chains[0].latest_block.is_none()); - assert!(status2.chains[0].earliest_block.is_none()); - } -} diff --git a/src/network.rs b/src/network.rs index d9b9f807..7f26425a 100644 --- a/src/network.rs +++ b/src/network.rs @@ -1,6 +1,7 @@ pub use errors::{DeploymentError, SubgraphError}; pub use internal::{Indexing, IndexingId}; pub use service::{NetworkService, ResolvedSubgraphInfo}; +use thegraph_graphql_http::graphql::{IntoDocument as _, IntoDocumentWithVariables}; pub mod cost_model; mod errors; @@ -12,3 +13,16 @@ pub mod internal; pub mod service; pub mod subgraph_client; pub mod version_filter; + +pub struct GraphQlRequest { + document: String, + variables: serde_json::Value, +} +impl IntoDocumentWithVariables for GraphQlRequest { + type Variables = serde_json::Value; + fn into_document_with_variables( + self, + ) -> (thegraph_graphql_http::graphql::Document, Self::Variables) { + (self.document.into_document(), self.variables) + } +} diff --git a/src/network/cost_model.rs b/src/network/cost_model.rs index 24296d37..2e29f4e2 100644 --- a/src/network/cost_model.rs +++ b/src/network/cost_model.rs @@ -2,12 +2,11 @@ use std::collections::HashMap; use anyhow::anyhow; use thegraph_core::DeploymentId; -use thegraph_graphql_http::{ - graphql::{Document, IntoDocument, IntoDocumentWithVariables}, - http_client::ReqwestExt, -}; +use thegraph_graphql_http::http_client::ReqwestExt; use url::Url; +use crate::network::GraphQlRequest; + pub struct CostModelResolver { http: reqwest::Client, cache: parking_lot::Mutex>, @@ -51,7 +50,7 @@ impl CostModelResolver { ) -> anyhow::Result> { let url = url.join("cost").map_err(|_| anyhow!("invalid URL"))?; - const QUERY: &str = r#" + let query = r#" query costModels($deployments: [String!]!) { costModels(deployments: $deployments) { deployment @@ -59,7 +58,6 @@ impl CostModelResolver { } } "#; - let deployments = deployments.iter().map(|item| item.to_string()).collect(); #[derive(serde::Deserialize)] #[serde(rename_all = "camelCase")] struct Response { @@ -70,22 +68,13 @@ impl CostModelResolver { pub deployment: DeploymentId, pub model: String, } - struct Request { - deployments: Vec, - } - impl IntoDocumentWithVariables for Request { - type Variables = serde_json::Value; - fn into_document_with_variables(self) -> (Document, Self::Variables) { - ( - QUERY.into_document(), - serde_json::json!({ "deployments": self.deployments }), - ) - } - } let resp = self .http .post(url) - .send_graphql::(Request { deployments }) + .send_graphql::(GraphQlRequest { + document: query.to_string(), + variables: serde_json::json!({ "deployments": deployments }), + }) .await??; Ok(resp .cost_models diff --git a/src/network/indexer_indexing_progress_resolver.rs b/src/network/indexer_indexing_progress_resolver.rs index f94b63f8..f75d700d 100644 --- a/src/network/indexer_indexing_progress_resolver.rs +++ b/src/network/indexer_indexing_progress_resolver.rs @@ -1,103 +1,41 @@ -//! A resolver that fetches the indexing progress of deployments from an indexer's status URL. - -use std::{collections::HashMap, time::Duration}; +use std::collections::HashMap; use parking_lot::{Mutex, RwLock}; +use serde_with::serde_as; use thegraph_core::{alloy::primitives::BlockNumber, DeploymentId}; +use thegraph_graphql_http::http_client::ReqwestExt; use url::Url; -use crate::{ - indexers, - indexers::indexing_progress::{ChainStatus, Error as IndexingProgressFetchError}, -}; - -/// The number of deployments indexing progress to query in a single request. -const INDEXINGS_PER_REQUEST_BATCH_SIZE: usize = 100; - -/// An error that occurred while resolving the indexer's progress. -#[derive(Clone, Debug, thiserror::Error)] -pub enum ResolutionError { - /// An error occurred while fetching the indexer progress. - /// - /// This includes network errors, timeouts, and deserialization errors. - #[error("fetch error: {0}")] - FetchError(#[from] IndexingProgressFetchError), - - /// The resolution timed out. - #[error("timeout")] - Timeout, -} +use super::GraphQlRequest; -/// The indexing progress information of a deployment on a chain. #[derive(Debug, Clone)] pub struct IndexingProgressInfo { - /// The latest block number indexed by the indexer. pub latest_block: BlockNumber, - /// The earliest block number indexed by the indexer. pub min_block: Option, } -/// A resolver that fetches the indexing progress of deployments from an indexer's status URL. pub struct IndexingProgressResolver { - client: reqwest::Client, - timeout: Duration, + http: reqwest::Client, cache: RwLock>>>, } impl IndexingProgressResolver { - pub fn new(client: reqwest::Client, timeout: Duration) -> Self { + pub fn new(http: reqwest::Client) -> Self { Self { - client, - timeout, + http, cache: Default::default(), } } - /// Fetches the indexing progress of the given deployments from the indexer's status URL. - async fn fetch_indexing_progress( - &self, - url: &Url, - indexings: &[DeploymentId], - ) -> HashMap, ResolutionError>> { - let status_url = url.join("status").unwrap(); - let res = tokio::time::timeout( - self.timeout, - send_requests( - &self.client, - &status_url, - indexings, - INDEXINGS_PER_REQUEST_BATCH_SIZE, - ), - ) - .await; - - match res { - Ok(res) => res - .into_iter() - .map(|(deployment_id, result)| (deployment_id, result.map_err(Into::into))) - .collect(), - // If the request timed out, return a timeout error for all deployments - Err(_) => indexings - .iter() - .map(|deployment_id| (*deployment_id, Err(ResolutionError::Timeout))) - .collect(), - } - } - - /// Resolves the indexing progress of the given deployments. - /// - /// The function fetches the indexing progress of the given deployments from the indexer's - /// status URL. If the fetch fails, the function returns the cached data for the failed - /// deployments. If the fetch succeeds, the function updates the cache with the fetched data. - /// - /// Returns a map of deployment IDs to their indexing progress information. pub async fn resolve( &self, url: &Url, - indexer_deployments: &[DeploymentId], + deployments: &[DeploymentId], ) -> HashMap { let url_string = url.to_string(); - let results = self.fetch_indexing_progress(url, indexer_deployments).await; + + let status_url = url.join("status").unwrap(); + let results = send_requests(&self.http, status_url, deployments).await; let mut outer_cache = self.cache.read(); if !outer_cache.contains_key(&url_string) { @@ -109,69 +47,156 @@ impl IndexingProgressResolver { } let mut cache = outer_cache.get(&url_string).unwrap().lock(); - for (deployment, result) in results { - let status = result.ok().and_then(|chains| { - let chain = chains.first()?; - Some(IndexingProgressInfo { - latest_block: chain.latest_block.as_ref().map(|block| block.number)?, - min_block: chain.earliest_block.as_ref().map(|block| block.number), - }) - }); - if let Some(status) = status { - cache.insert(deployment, status); - } + for (deployment, status) in results { + cache.insert(deployment, status); } cache.clone() } } -/// Sends requests to the indexer's status URL to fetch the indexing progress of deployments. -/// -/// Given a list of deployment IDs, the function groups them into batches of a given size and sends -/// all requests concurrently. If one request fails, the function marks all deployments in the batch -/// as failed. The function returns a map of deployment IDs to the indexing progress information. async fn send_requests( - client: &reqwest::Client, - status_url: &Url, + http: &reqwest::Client, + status_url: Url, indexings: &[DeploymentId], - batch_size: usize, -) -> HashMap, IndexingProgressFetchError>> { - // Group the deployments into batches of `batch_size` - let request_batches = indexings.chunks(batch_size); - - // Create a request for each batch +) -> HashMap { + let request_batches = indexings.chunks(100); let requests = request_batches.map(|batch| { let status_url = status_url.clone(); async move { - // Request the indexing progress - let response = - indexers::indexing_progress::send_request(client, status_url, batch).await; - - let result = match response { - Err(err) => { - // If the request failed, mark all deployment IDs in the batch as failed - return batch - .iter() - .map(|deployment_id| (*deployment_id, Err(err.clone()))) - .collect::>(); + let result = send_request(http, status_url.clone(), batch).await; + match result { + Ok(response) => response, + Err(indexing_progress_err) => { + tracing::warn!(%status_url, %indexing_progress_err); + Default::default() } - Ok(res) => res, - }; - - // Construct a map of deployment IDs to responses - result - .into_iter() - .filter(|response| { - batch.contains(&response.deployment_id) && !response.chains.is_empty() - }) - .map(|response| (response.deployment_id, Ok(response.chains))) - .collect::>() + } } }); - // Send all requests concurrently let responses = futures::future::join_all(requests).await; - - // Merge the responses into a single map responses.into_iter().flatten().collect() } + +async fn send_request( + client: &reqwest::Client, + status_url: Url, + deployments: &[DeploymentId], +) -> anyhow::Result> { + let query = r#" + query indexingProgress($deployments: [String!]!) { + indexingStatuses(subgraphs: $deployments) { + subgraph + chains { + network + latestBlock { number } + earliestBlock { number } + } + } + }"#; + let response = client + .post(status_url) + .send_graphql::(GraphQlRequest { + document: query.to_string(), + variables: serde_json::json!({ "deployments": deployments }), + }) + .await??; + let statuses = response + .indexing_statuses + .into_iter() + .filter(|info| info.chains.len() == 1) + .filter_map(|info| { + let chain = &info.chains[0]; + let status = IndexingProgressInfo { + latest_block: chain.latest_block.as_ref().map(|block| block.number)?, + min_block: chain.earliest_block.as_ref().map(|block| block.number), + }; + Some((info.deployment_id, status)) + }) + .collect(); + Ok(statuses) +} + +#[derive(Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +struct Response { + indexing_statuses: Vec, +} +#[derive(Debug, serde::Deserialize)] +pub struct IndexingStatusResponse { + #[serde(rename = "subgraph")] + pub deployment_id: DeploymentId, + pub chains: Vec, +} +#[derive(Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ChainStatus { + pub latest_block: Option, + pub earliest_block: Option, +} +#[serde_as] +#[derive(Debug, serde::Deserialize)] +pub struct BlockStatus { + #[serde_as(as = "serde_with::DisplayFromStr")] + pub number: BlockNumber, +} + +#[cfg(test)] +mod tests { + use super::Response; + + #[test] + fn deserialize_response() { + let response = serde_json::json!({ + "indexingStatuses": [ + { + "subgraph": "QmZTy9EJHu8rfY9QbEk3z1epmmvh5XHhT2Wqhkfbyt8k9Z", + "chains": [ + { + "network": "rinkeby", + "latestBlock": { + "number": "10164818", + "hash": "0xaa94881130ba16c28cc90a5a880b117bdc90b6b11e9cde0c78804cdb93cc9e85" + }, + "earliestBlock": { + "number": "7559999", + "hash": "0x0" + } + } + ] + }, + { + "subgraph": "QmSLQfPFcz2pKRJZUH16Sk26EFpRgdxTYGnMiKvWgKRM2a", + "chains": [ + { + "network": "rinkeby" + } + ] + } + ] + }); + + let response = serde_json::from_value(response); + let response: Response = response.expect("deserialization failed"); + + assert_eq!(response.indexing_statuses.len(), 2); + let status1 = &response.indexing_statuses[0]; + let status2 = &response.indexing_statuses[1]; + + assert_eq!( + status1.deployment_id.to_string(), + "QmZTy9EJHu8rfY9QbEk3z1epmmvh5XHhT2Wqhkfbyt8k9Z" + ); + assert_eq!(status1.chains.len(), 1); + assert!(status1.chains[0].latest_block.is_some()); + assert!(status1.chains[0].earliest_block.is_some()); + + assert_eq!( + status2.deployment_id.to_string(), + "QmSLQfPFcz2pKRJZUH16Sk26EFpRgdxTYGnMiKvWgKRM2a" + ); + assert_eq!(status2.chains.len(), 1); + assert!(status2.chains[0].latest_block.is_none()); + assert!(status2.chains[0].earliest_block.is_none()); + } +} diff --git a/src/network/service.rs b/src/network/service.rs index 41bb765c..9911226e 100644 --- a/src/network/service.rs +++ b/src/network/service.rs @@ -188,10 +188,7 @@ pub fn spawn( Duration::from_secs(5), Duration::from_secs(20 * 60), ), - indexing_progress_resolver: IndexingProgressResolver::new( - http.clone(), - Duration::from_secs(25), - ), + indexing_progress_resolver: IndexingProgressResolver::new(http.clone()), cost_model_resolver: CostModelResolver::new(http.clone()), }; let update_interval = Duration::from_secs(60);