From 72a72c78bfab10381d9fcb134c37016b5dffd9b7 Mon Sep 17 00:00:00 2001
From: Theo Butler
Date: Fri, 6 Dec 2024 14:02:36 -0500
Subject: [PATCH] refactor: merge PoI resolver and blocklist

---
 src/indexers.rs                               |   1 -
 src/indexers/public_poi.rs                    | 188 ---------
 src/main.rs                                   |   3 -
 src/network.rs                                |   7 +-
 src/network/indexer_indexing_poi_blocklist.rs |  83 ----
 src/network/indexer_indexing_poi_resolver.rs  | 207 ---------
 src/network/indexer_processing.rs             |  42 +-
 src/network/poi_filter.rs                     | 253 +++++++++++
 src/network/service.rs                        |  29 +-
 src/ttl_hash_map.rs                           | 396 ------------------
 10 files changed, 281 insertions(+), 928 deletions(-)
 delete mode 100644 src/indexers.rs
 delete mode 100644 src/indexers/public_poi.rs
 delete mode 100644 src/network/indexer_indexing_poi_blocklist.rs
 delete mode 100644 src/network/indexer_indexing_poi_resolver.rs
 create mode 100644 src/network/poi_filter.rs
 delete mode 100644 src/ttl_hash_map.rs

diff --git a/src/indexers.rs b/src/indexers.rs
deleted file mode 100644
index d91765715..000000000
--- a/src/indexers.rs
+++ /dev/null
@@ -1 +0,0 @@
-pub mod public_poi;
diff --git a/src/indexers/public_poi.rs b/src/indexers/public_poi.rs
deleted file mode 100644
index 4e1b654a0..000000000
--- a/src/indexers/public_poi.rs
+++ /dev/null
@@ -1,188 +0,0 @@
-use serde::{Deserialize, Serialize};
-use serde_with::{serde_as, DisplayFromStr};
-use thegraph_core::{alloy::primitives::BlockNumber, DeploymentId, ProofOfIndexing};
-use thegraph_graphql_http::{
-    graphql::{Document, IntoDocument, IntoDocumentWithVariables},
-    http_client::{RequestError, ReqwestExt, ResponseError},
-};
-use url::Url;
-
-const PUBLIC_PROOF_OF_INDEXING_QUERY_DOCUMENT: &str = r#"
-    query publicPois($requests: [PublicProofOfIndexingRequest!]!) {
-        publicProofsOfIndexing(requests: $requests) {
-            deployment
-            proofOfIndexing
-            block { number }
-        }
-    }"#;
-
-/// Errors that can occur while fetching the indexer's public POIs.
-#[derive(Clone, Debug, thiserror::Error)]
-pub enum Error {
-    /// The request failed.
-    #[error("request error: {0}")]
-    Request(String),
-
-    /// Invalid response.
-    ///
-    /// The response could not be deserialized or is missing required fields.
-    #[error("invalid response: {0}")]
-    InvalidResponse(String),
-
-    /// The response did not contain any public POIs.
-    #[error("empty response")]
-    EmptyResponse,
-}
-
-#[derive(Clone, Debug, Serialize)]
-#[serde(rename_all = "camelCase")]
-struct PublicProofOfIndexingRequest {
-    deployment: DeploymentId,
-    block_number: BlockNumber,
-}
-
-impl From<(DeploymentId, BlockNumber)> for PublicProofOfIndexingRequest {
-    fn from((deployment, block_number): (DeploymentId, BlockNumber)) -> Self {
-        Self {
-            deployment,
-            block_number,
-        }
-    }
-}
-
-/// Send a request to the indexer to get the Public POIs of the given deployment-block number pairs.
-pub async fn send_request(
-    client: &reqwest::Client,
-    status_url: Url,
-    pois: impl IntoIterator<Item = &(DeploymentId, BlockNumber)>,
-) -> Result<Vec<PublicProofOfIndexingResult>, Error> {
-    let resp = client
-        .post(status_url)
-        .send_graphql::<Response>(Request::new(pois))
-        .await
-        .map_err(|err| match err {
-            RequestError::RequestSerializationError(..) => {
-                unreachable!("request serialization should not fail")
-            }
-            RequestError::RequestSendError(..) | RequestError::ResponseRecvError(..) => {
-                Error::Request(err.to_string())
-            }
-            RequestError::ResponseDeserializationError { .. } => {
-                Error::InvalidResponse(err.to_string())
-            }
-        })?
-        .map_err(|err| match err {
-            ResponseError::Failure { .. } => Error::Request(err.to_string()),
-            ResponseError::Empty => Error::EmptyResponse,
-        })?;
-
-    if resp.public_proofs_of_indexing.is_empty() {
-        return Err(Error::EmptyResponse);
-    }
-
-    Ok(resp.public_proofs_of_indexing)
-}
-
-#[derive(Clone, Debug)]
-pub struct Request {
-    document: Document,
-    var_requests: Vec<PublicProofOfIndexingRequest>,
-}
-
-impl Request {
-    pub fn new<'a>(requests: impl IntoIterator<Item = &'a (DeploymentId, BlockNumber)>) -> Self {
-        Self {
-            document: PUBLIC_PROOF_OF_INDEXING_QUERY_DOCUMENT.into_document(),
-            var_requests: requests.into_iter().copied().map(Into::into).collect(),
-        }
-    }
-}
-
-impl IntoDocumentWithVariables for Request {
-    type Variables = serde_json::Value;
-
-    fn into_document_with_variables(self) -> (Document, Self::Variables) {
-        (
-            self.document,
-            serde_json::json!({ "requests": self.var_requests }),
-        )
-    }
-}
-
-#[derive(Debug, Deserialize)]
-#[serde(rename_all = "camelCase")]
-struct Response {
-    public_proofs_of_indexing: Vec<PublicProofOfIndexingResult>,
-}
-
-#[derive(Debug, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct PublicProofOfIndexingResult {
-    pub deployment: DeploymentId,
-    pub block: PartialBlockPtr,
-    pub proof_of_indexing: Option<ProofOfIndexing>,
-}
-
-#[serde_as]
-#[derive(Debug, Deserialize)]
-pub struct PartialBlockPtr {
-    #[serde_as(as = "DisplayFromStr")]
-    pub number: BlockNumber,
-}
-
-#[cfg(test)]
-mod tests {
-    use thegraph_core::{deployment_id, proof_of_indexing as poi};
-
-    use super::Response;
-
-    #[test]
-    fn deserialize_public_pois_response() {
-        //* Given
-        let response = r#"{
-            "publicProofsOfIndexing": [
-                {
-                    "deployment": "QmeYTH2fK2wv96XvnCGH2eyKFE8kmRfo53zYVy5dKysZtH",
-                    "proofOfIndexing": "0xba8a057796a81e013789789996551bb5b2920fb9947334db956992f7098bd287",
-                    "block": {
-                        "number": "123"
-                    }
-                },
-                {
-                    "deployment": "QmawxQJ5U1JvgosoFVDyAwutLWxrckqVmBTQxaMaKoj3Lw",
-                    "block": {
-                        "number": "456"
-                    }
-                }
-            ]
-        }"#;
-
-        //* When
-        let response = serde_json::from_str::<Response>(response);
-
-        //* Then
-        let response = response.expect("deserialization failed");
-
-        assert_eq!(response.public_proofs_of_indexing.len(), 2);
-        assert_eq!(
-            response.public_proofs_of_indexing[0].deployment,
-            deployment_id!("QmeYTH2fK2wv96XvnCGH2eyKFE8kmRfo53zYVy5dKysZtH")
-        );
-        assert_eq!(
-            response.public_proofs_of_indexing[0].proof_of_indexing,
-            Some(poi!(
-                "ba8a057796a81e013789789996551bb5b2920fb9947334db956992f7098bd287"
-            ))
-        );
-        assert_eq!(response.public_proofs_of_indexing[0].block.number, 123);
-        assert_eq!(
-            response.public_proofs_of_indexing[1].deployment,
-            deployment_id!("QmawxQJ5U1JvgosoFVDyAwutLWxrckqVmBTQxaMaKoj3Lw")
-        );
-        assert_eq!(
-            response.public_proofs_of_indexing[1].proof_of_indexing,
-            None
-        );
-        assert_eq!(response.public_proofs_of_indexing[1].block.number, 456);
-    }
-}
diff --git a/src/main.rs b/src/main.rs
index 1a0764e98..6d6675b22 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -12,7 +12,6 @@ mod exchange_rate;
 mod graphql;
 mod http_ext;
 mod indexer_client;
-mod indexers;
 mod indexing_performance;
 mod metrics;
 mod middleware;
@@ -21,8 +20,6 @@ mod receipts;
 mod reports;
 mod subgraph_studio;
 mod time;
-#[allow(dead_code)]
-mod ttl_hash_map;
 mod unattestable_errors;
 
 use std::{
diff --git a/src/network.rs b/src/network.rs
index f4d54b712..547886869 100644
--- a/src/network.rs
+++ b/src/network.rs
@@ -6,10 +6,9 @@ use thegraph_graphql_http::graphql::{IntoDocument as _, IntoDocumentWithVariable
 pub mod cost_model;
 mod errors;
 pub mod host_filter;
-pub mod indexer_indexing_poi_blocklist;
-pub mod indexer_indexing_poi_resolver;
 mod indexer_processing;
 pub mod indexing_progress;
+pub mod poi_filter;
 mod pre_processing;
 pub mod service;
 mod snapshot;
@@ -18,8 +17,8 @@ mod subgraph_processing;
 pub mod version_filter;
 
 pub struct GraphQlRequest {
-    document: String,
-    variables: serde_json::Value,
+    pub document: String,
+    pub variables: serde_json::Value,
 }
 impl IntoDocumentWithVariables for GraphQlRequest {
     type Variables = serde_json::Value;
diff --git a/src/network/indexer_indexing_poi_blocklist.rs b/src/network/indexer_indexing_poi_blocklist.rs
deleted file mode 100644
index 1543f872f..000000000
--- a/src/network/indexer_indexing_poi_blocklist.rs
+++ /dev/null
@@ -1,83 +0,0 @@
-//! This module contains the [`PoiBlocklist`] struct, which is used to block indexers based
-//! on their Proof of Indexing (POI) information.
-//!
-//! Given a list of blocked POIs, the blocklist checks if an indexer reports any of them as public
-//! POIs. If a match is found, the indexer is blocked for the associated deployment ID.
-//!
-//! The blocklist caches the blocklist state for each indexer, so that subsequent checks against the
-//! same indexer are fast. The cached entries are considered expired after a given TTL.
-
-use std::collections::{HashMap, HashSet};
-
-use thegraph_core::{alloy::primitives::BlockNumber, DeploymentId, ProofOfIndexing};
-
-use crate::config::BlockedPoi;
-
-/// A blocklist based on the Proof of Indexing (POI) of indexers.
-#[derive(Default)]
-pub struct PoiBlocklist {
-    blocklist: HashMap<DeploymentId, Vec<(BlockNumber, ProofOfIndexing)>>,
-}
-
-impl PoiBlocklist {
-    pub fn new(conf: Vec<BlockedPoi>) -> Self {
-        let mut blocklist: HashMap<DeploymentId, Vec<(BlockNumber, ProofOfIndexing)>> =
-            Default::default();
-        for info in conf.into_iter() {
-            blocklist
-                .entry(info.deployment)
-                .or_default()
-                .push((info.block_number, info.public_poi.into()));
-        }
-        Self { blocklist }
-    }
-
-    pub fn is_empty(&self) -> bool {
-        self.blocklist.is_empty()
-    }
-
-    /// Get the list of affected POI metadata.
-    ///
-    /// If none of the deployments are affected, an empty list is returned. This allows us to avoid
-    /// querying the indexer for POIs when none of its deployments are affected.
-    pub fn affected_pois_metadata<'a>(
-        &self,
-        deployments: impl IntoIterator<Item = &'a DeploymentId>,
-    ) -> Vec<(DeploymentId, BlockNumber)> {
-        deployments
-            .into_iter()
-            .flat_map(|deployment| {
-                self.blocklist.get(deployment).into_iter().flat_map(|pois| {
-                    pois.iter()
-                        .map(|(block_number, _)| (*deployment, *block_number))
-                })
-            })
-            .collect()
-    }
-
-    /// Return deployments with blocked POIs.
-    pub fn check(
-        &self,
-        pois: HashMap<(DeploymentId, BlockNumber), ProofOfIndexing>,
-    ) -> HashSet<DeploymentId> {
-        pois.iter()
-            .filter(|((deployment_id, block_number), poi)| {
-                self.check_poi(*deployment_id, *block_number, **poi)
-            })
-            .map(|((deployment_id, _), _)| *deployment_id)
-            .collect()
-    }
-
-    /// Check if the POI is in the blocklist.
-    fn check_poi(
-        &self,
-        deployment: DeploymentId,
-        block_number: BlockNumber,
-        poi: ProofOfIndexing,
-    ) -> bool {
-        match self.blocklist.get(&deployment) {
-            None => false,
-            Some(blocked_pois) => blocked_pois.contains(&(block_number, poi)),
-        }
-    }
-}
diff --git a/src/network/indexer_indexing_poi_resolver.rs b/src/network/indexer_indexing_poi_resolver.rs
deleted file mode 100644
index 8fe29db44..000000000
--- a/src/network/indexer_indexing_poi_resolver.rs
+++ /dev/null
@@ -1,207 +0,0 @@
-//! A resolver for the Proof of Indexing (POI) of indexers.
-//!
-//! The resolver fetches the public POIs of indexers based on the given POI metadata. It caches the
-//! results of these requests to avoid making the same request multiple times.
-//!
-//! The cache has a TTL of 20 minutes. Entries are considered expired after this time, causing
-//! the resolver to fetch the public POIs of the indexer again.
-
-use std::{collections::HashMap, time::Duration};
-
-use parking_lot::RwLock;
-use thegraph_core::{alloy::primitives::BlockNumber, DeploymentId, ProofOfIndexing};
-use url::Url;
-
-use crate::{
-    indexers, indexers::public_poi::Error as PublicPoiFetchError, ttl_hash_map::TtlHashMap,
-};
-
-/// The number of Public POI queries in a single request.
-const POIS_PER_REQUEST_BATCH_SIZE: usize = 10;
-
-/// Error that can occur during POI resolution.
-#[derive(Clone, Debug, thiserror::Error)]
-pub enum ResolutionError {
-    /// An error occurred while fetching the Public POIs of the indexer.
-    ///
-    /// This includes network errors, timeouts, and deserialization errors.
-    #[error("fetch error: {0}")]
-    FetchError(#[from] PublicPoiFetchError),
-
-    /// Resolution timed out.
-    #[error("timeout")]
-    Timeout,
-}
-
-/// A resolver for the Proof of Indexing (POI) of indexers. Results are cached for some TTL to avoid
-/// making the same request multiple times.
-#[allow(clippy::type_complexity)]
-pub struct PoiResolver {
-    client: reqwest::Client,
-    cache: RwLock<TtlHashMap<(String, (DeploymentId, BlockNumber)), ProofOfIndexing>>,
-    timeout: Duration,
-}
-
-impl PoiResolver {
-    /// Create a new [`PoiResolver`] with the given timeout and cache TTL.
-    pub fn new(client: reqwest::Client, timeout: Duration, cache_ttl: Duration) -> Self {
-        Self {
-            client,
-            timeout,
-            cache: RwLock::new(TtlHashMap::with_ttl(cache_ttl)),
-        }
-    }
-
-    /// Fetch the public POIs of the indexer based on the given POI metadata.
-    async fn fetch_indexer_public_pois(
-        &self,
-        url: &Url,
-        pois: &[(DeploymentId, BlockNumber)],
-    ) -> HashMap<(DeploymentId, BlockNumber), Result<ProofOfIndexing, ResolutionError>> {
-        let status_url = url.join("status").unwrap();
-        let res = tokio::time::timeout(
-            self.timeout,
-            send_requests(&self.client, &status_url, pois, POIS_PER_REQUEST_BATCH_SIZE),
-        )
-        .await;
-
-        match res {
-            Ok(res) => res
-                .into_iter()
-                .map(|(meta, result)| (meta, result.map_err(Into::into)))
-                .collect(),
-            // If the request timed out, return a timeout error for all deployment-block number pairs
-            Err(_) => pois
-                .iter()
-                .map(|meta| (*meta, Err(ResolutionError::Timeout)))
-                .collect(),
-        }
-    }
-
-    /// Gets the cached Public POI information for the given deployment-block number pairs.
-    ///
-    /// This method locks the cache in read mode and returns the cached information.
-    fn get_from_cache<'a>(
-        &self,
-        url: &str,
-        keys: impl IntoIterator<Item = &'a (DeploymentId, BlockNumber)>,
-    ) -> HashMap<(DeploymentId, BlockNumber), ProofOfIndexing> {
-        let cache_read = self.cache.read();
-        let mut result = HashMap::new();
-
-        for key in keys {
-            match cache_read.get(&(url.to_owned(), *key)) {
-                Some(value) => {
-                    result.insert(*key, *value);
-                }
-                None => continue,
-            }
-        }
-
-        result
-    }
-
-    /// Updates the cache with the given Public POI information.
-    ///
-    /// This method locks the cache in write mode and updates the cache with the given Public POI
-    /// information.
-    fn update_cache<'a>(
-        &self,
-        url: &str,
-        data: impl IntoIterator<Item = (&'a (DeploymentId, BlockNumber), &'a ProofOfIndexing)>,
-    ) {
-        let mut cache_write = self.cache.write();
-        for (key, value) in data {
-            cache_write.insert((url.to_owned(), *key), *value);
-        }
-    }
-
-    /// Resolve the public POIs of the indexer based on the given POI metadata.
-    ///
-    /// If the public POIs of the indexer are already in the cache, the resolver returns them.
-    pub async fn resolve(
-        &self,
-        url: &Url,
-        poi_requests: &[(DeploymentId, BlockNumber)],
-    ) -> HashMap<(DeploymentId, BlockNumber), ProofOfIndexing> {
-        let url_string = url.to_string();
-
-        let mut results = self.get_from_cache(&url_string, poi_requests);
-        let missing_requests: Vec<(DeploymentId, BlockNumber)> = poi_requests
-            .iter()
-            .filter(|r| !results.contains_key(r))
-            .cloned()
-            .collect();
-        if missing_requests.is_empty() {
-            return results;
-        }
-
-        let fetched: HashMap<(DeploymentId, BlockNumber), ProofOfIndexing> = self
-            .fetch_indexer_public_pois(url, &missing_requests)
-            .await
-            .into_iter()
-            .filter_map(|(key, result)| match result {
-                Ok(poi) => Some((key, poi)),
-                Err(poi_fetch_err) => {
-                    tracing::warn!(%poi_fetch_err, ?key);
-                    None
-                }
-            })
-            .collect();
-        self.update_cache(&url_string, &fetched);
-
-        results.extend(fetched);
-        results
-    }
-}
-
-/// Send requests to the indexer to get the Public POIs of the given deployment-block number pairs.
-///
-/// Given a list of deployment-block number pairs, the function sends requests to the indexer to get
-/// the Public POIs of the indexers. The function batches the queries into groups of `batch_size`
-/// and sends them in a single request. All requests are sent concurrently to the indexer. The
-/// function returns a map of deployment-block number pairs to the Public POIs of the indexers, or
-/// an error if the request failed.
-async fn send_requests(
-    client: &reqwest::Client,
-    status_url: &Url,
-    poi_requests: &[(DeploymentId, BlockNumber)],
-    batch_size: usize,
-) -> HashMap<(DeploymentId, BlockNumber), Result<ProofOfIndexing, PublicPoiFetchError>> {
-    // Batch the POI queries into groups of `batch_size`
-    let request_batches = poi_requests.chunks(batch_size);
-
-    // Create a request for each batch
-    let requests = request_batches.map(|batch| {
-        let status_url = status_url.clone();
-        async move {
-            let response = indexers::public_poi::send_request(client, status_url, batch).await;
-            let result = match response {
-                Err(err) => {
-                    // If the request failed, mark all deployment-block number pairs in the batch as
-                    // failed.
-                    return batch
-                        .iter()
-                        .map(|meta| (*meta, Err(err.clone())))
-                        .collect::<HashMap<_, _>>();
-                }
-                Ok(res) => res,
-            };
-
-            // Construct a map of deployment IDs to responses
-            result
-                .into_iter()
-                .filter_map(|res| {
-                    Some((
-                        (res.deployment, res.block.number),
-                        Ok(res.proof_of_indexing?),
-                    ))
-                })
-                .collect::<HashMap<_, _>>()
-        }
-    });
-
-    // Send all requests concurrently
-    let responses = futures::future::join_all(requests).await;
-
-    // Merge the responses into a single map
-    responses.into_iter().flatten().collect()
-}
diff --git a/src/network/indexer_processing.rs b/src/network/indexer_processing.rs
index 4071d4b56..c06286368 100644
--- a/src/network/indexer_processing.rs
+++ b/src/network/indexer_processing.rs
@@ -1,4 +1,4 @@
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
 
 use custom_debug::CustomDebug;
 use thegraph_core::{alloy::primitives::BlockNumber, AllocationId, DeploymentId, IndexerId};
@@ -8,10 +8,7 @@ use url::Url;
 
 use crate::{
     config::BlockedIndexer,
     errors::UnavailableReason,
-    network::{
-        indexer_indexing_poi_blocklist::PoiBlocklist, indexer_indexing_poi_resolver::PoiResolver,
-        indexing_progress::IndexingProgressResolver, service::InternalState,
-    },
+    network::{indexing_progress::IndexingProgressResolver, service::InternalState},
 };
 
 /// Internal representation of the indexer pre-processed information.
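This refactor also carries a small behavioral change worth noting before the hunks below: the old path (removed further down) blocked a deployment only on a positive PoI match, so a public PoI that could not be fetched left the deployment allowed, while the new `PoiFilter::blocked_deployments` fails closed and also blocks when the PoI is missing. A condensed restatement of the two decision rules, with `u64` standing in for the real `BlockNumber` and `ProofOfIndexing` types:

```rust
// Condensed decision rules, for comparison only. `fetched` maps a block
// number to the public PoI the indexer reported for it, if any.

// Old rule (PoiBlocklist::check): block only on a positive PoI match.
fn blocked_old(blocked: &[(u64, u64)], fetched: impl Fn(u64) -> Option<u64>) -> bool {
    blocked
        .iter()
        .any(|(block, poi)| fetched(*block) == Some(*poi))
}

// New rule (PoiFilter::blocked_deployments): block on a match, and fail
// closed when the PoI could not be fetched.
fn blocked_new(blocked: &[(u64, u64)], fetched: impl Fn(u64) -> Option<u64>) -> bool {
    blocked.iter().any(|(block, poi)| match fetched(*block) {
        Some(reported) => reported == *poi,
        None => true,
    })
}
```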
@@ -223,13 +220,10 @@ async fn process_indexer_indexings(
     let mut healthy_indexer_indexings = indexer_indexings.keys().copied().collect::<Vec<_>>();
 
     // Check if the indexer's indexings should be blocked by POI
-    let blocked_indexings_by_poi = resolve_and_check_indexer_indexings_blocked_by_poi(
-        &state.poi_blocklist,
-        &state.poi_resolver,
-        url,
-        &healthy_indexer_indexings,
-    )
-    .await;
+    let blocked_indexings_by_poi = state
+        .indexer_poi_filter
+        .blocked_deployments(url, &healthy_indexer_indexings)
+        .await;
 
     // Remove the blocked indexings from the healthy indexers list
     healthy_indexer_indexings.retain(|id| !blocked_indexings_by_poi.contains(id));
@@ -307,30 +301,6 @@ async fn process_indexer_indexings(
     .collect()
 }
 
-/// Resolve and check if any of the indexer's indexings should be blocked by POI.
-async fn resolve_and_check_indexer_indexings_blocked_by_poi(
-    blocklist: &PoiBlocklist,
-    resolver: &PoiResolver,
-    url: &Url,
-    indexings: &[DeploymentId],
-) -> HashSet<DeploymentId> {
-    if blocklist.is_empty() {
-        return Default::default();
-    }
-
-    // Get the list of affected POIs to resolve for the indexer's deployments
-    // If none of the deployments are affected, the indexer must be ALLOWED
-    let indexer_affected_pois = blocklist.affected_pois_metadata(indexings);
-    if indexer_affected_pois.is_empty() {
-        return Default::default();
-    }
-
-    // Resolve the indexer public POIs for the affected deployments
-    let poi_result = resolver.resolve(url, &indexer_affected_pois).await;
-
-    blocklist.check(poi_result)
-}
-
 /// Resolve the indexer's progress information.
 async fn resolve_indexer_progress(
     resolver: &IndexingProgressResolver,
diff --git a/src/network/poi_filter.rs b/src/network/poi_filter.rs
new file mode 100644
index 000000000..57470a728
--- /dev/null
+++ b/src/network/poi_filter.rs
@@ -0,0 +1,253 @@
+use std::{
+    collections::{HashMap, HashSet},
+    time::Duration,
+};
+
+use serde_with::serde_as;
+use thegraph_core::{alloy::primitives::BlockNumber, DeploymentId, ProofOfIndexing};
+use thegraph_graphql_http::http_client::ReqwestExt;
+use tokio::time::Instant;
+use url::Url;
+
+use super::GraphQlRequest;
+
+pub struct PoiFilter {
+    http: reqwest::Client,
+    blocklist: HashMap<DeploymentId, Vec<(BlockNumber, ProofOfIndexing)>>,
+    cache: parking_lot::RwLock<HashMap<String, IndexerEntry>>,
+}
+
+struct IndexerEntry {
+    pois: HashMap<(DeploymentId, BlockNumber), ProofOfIndexing>,
+    last_refresh: Instant,
+}
+
+impl PoiFilter {
+    pub fn new(
+        http: reqwest::Client,
+        blocklist: HashMap<DeploymentId, Vec<(BlockNumber, ProofOfIndexing)>>,
+    ) -> Self {
+        Self {
+            http,
+            blocklist,
+            cache: Default::default(),
+        }
+    }
+
+    pub async fn blocked_deployments(
+        &self,
+        url: &Url,
+        deployments: &[DeploymentId],
+    ) -> HashSet<DeploymentId> {
+        let requests: Vec<(DeploymentId, BlockNumber)> = self
+            .blocklist
+            .iter()
+            .filter(|(deployment, _)| deployments.contains(deployment))
+            .flat_map(|(deployment, entries)| {
+                entries.iter().map(|(block, _)| (*deployment, *block))
+            })
+            .collect();
+        let pois = self.resolve(url, requests).await;
+
+        deployments
+            .iter()
+            .filter(|deployment| match self.blocklist.get(deployment) {
+                None => false,
+                Some(blocklist) => blocklist.iter().any(|(block, poi)| {
+                    pois.get(&(**deployment, *block))
+                        .map(|poi_| poi == poi_)
+                        .unwrap_or(true)
+                }),
+            })
+            .cloned()
+            .collect()
+    }
+
+    /// Fetch public PoIs. Results are cached indefinitely and refreshed every 20 minutes.
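+    ///
+    /// Cache entries are keyed by indexer URL. A call first serves whatever it can from the
+    /// cache; if the indexer's entry is older than 20 minutes every requested key is
+    /// re-fetched, otherwise only the keys missing from the cache are fetched.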
+    async fn resolve(
+        &self,
+        url: &Url,
+        requests: Vec<(DeploymentId, BlockNumber)>,
+    ) -> HashMap<(DeploymentId, BlockNumber), ProofOfIndexing> {
+        let url_string = url.to_string();
+
+        let mut results: HashMap<(DeploymentId, BlockNumber), ProofOfIndexing> =
+            Default::default();
+        let mut refresh = false;
+        {
+            let cache = self.cache.read();
+            if let Some(indexer) = cache.get(&url_string.to_string()) {
+                refresh = indexer.last_refresh.elapsed() > Duration::from_secs(20 * 60);
+                for key in &requests {
+                    if let Some(poi) = indexer.pois.get(key) {
+                        results.insert(*key, *poi);
+                    }
+                }
+            }
+        }
+
+        let updates: Vec<(DeploymentId, u64)> = if refresh {
+            requests.clone()
+        } else {
+            requests
+                .iter()
+                .filter(|r| !results.contains_key(r))
+                .cloned()
+                .collect()
+        };
+        if updates.is_empty() {
+            return results;
+        }
+        let fetched: HashMap<(DeploymentId, BlockNumber), ProofOfIndexing> =
+            send_requests(&self.http, url, updates).await;
+        {
+            let now = Instant::now();
+            let mut cache = self.cache.write();
+            let indexer = cache.entry(url_string).or_insert_with(|| IndexerEntry {
+                pois: Default::default(),
+                last_refresh: now,
+            });
+            indexer.last_refresh = now;
+            for (key, value) in &fetched {
+                indexer.pois.insert(*key, *value);
+            }
+        }
+        results.extend(fetched);
+
+        results
+    }
+}
+
+async fn send_requests(
+    http: &reqwest::Client,
+    status_url: &Url,
+    requests: Vec<(DeploymentId, BlockNumber)>,
+) -> HashMap<(DeploymentId, BlockNumber), ProofOfIndexing> {
+    let request_batches = requests.chunks(10);
+    let requests = request_batches.map(|batch| {
+        let status_url = status_url.clone();
+        async move {
+            let result = send_request(http, status_url.clone(), batch).await;
+            match result {
+                Ok(response) => response,
+                Err(poi_fetch_err) => {
+                    tracing::warn!(%status_url, %poi_fetch_err);
+                    Default::default()
+                }
+            }
+        }
+    });
+
+    let responses = futures::future::join_all(requests).await;
+    responses
+        .into_iter()
+        .flatten()
+        .filter_map(|response| {
+            Some((
+                (response.deployment, response.block.number),
+                response.proof_of_indexing?,
+            ))
+        })
+        .collect()
+}
+
+async fn send_request(
+    http: &reqwest::Client,
+    status_url: Url,
+    pois: &[(DeploymentId, BlockNumber)],
+) -> anyhow::Result<Vec<PublicProofOfIndexingResponse>> {
+    let query = r#"
+        query publicPois($requests: [PublicProofOfIndexingRequest!]!) {
+            publicProofsOfIndexing(requests: $requests) {
+                deployment
+                proofOfIndexing
+                block { number }
+            }
+        }"#;
+    let response = http
+        .post(status_url)
+        .send_graphql::<Response>(GraphQlRequest {
+            document: query.to_string(),
+            variables: serde_json::json!({ "requests": pois.iter().map(|(deployment, block)| serde_json::json!({
+                "deployment": deployment,
+                "blockNumber": block,
+            })).collect::<Vec<_>>() }),
+        })
+        .await??;
+    Ok(response.public_proofs_of_indexing)
+}
+
+#[derive(Debug, serde::Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct Response {
+    public_proofs_of_indexing: Vec<PublicProofOfIndexingResponse>,
+}
+#[derive(Debug, serde::Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct PublicProofOfIndexingResponse {
+    pub deployment: DeploymentId,
+    pub block: PartialBlockPtr,
+    pub proof_of_indexing: Option<ProofOfIndexing>,
+}
+#[serde_as]
+#[derive(Debug, serde::Deserialize)]
+pub struct PartialBlockPtr {
+    #[serde_as(as = "serde_with::DisplayFromStr")]
+    pub number: BlockNumber,
+}
+
+#[cfg(test)]
+mod tests {
+    use thegraph_core::{deployment_id, proof_of_indexing as poi};
+
+    use super::Response;
+
+    #[test]
+    fn deserialize_public_pois_response() {
+        //* Given
+        let response = r#"{
+            "publicProofsOfIndexing": [
+                {
+                    "deployment": "QmeYTH2fK2wv96XvnCGH2eyKFE8kmRfo53zYVy5dKysZtH",
+                    "proofOfIndexing": "0xba8a057796a81e013789789996551bb5b2920fb9947334db956992f7098bd287",
+                    "block": {
+                        "number": "123"
+                    }
+                },
+                {
+                    "deployment": "QmawxQJ5U1JvgosoFVDyAwutLWxrckqVmBTQxaMaKoj3Lw",
+                    "block": {
+                        "number": "456"
+                    }
+                }
+            ]
+        }"#;
+
+        //* When
+        let response = serde_json::from_str::<Response>(response);
+
+        //* Then
+        let response = response.expect("deserialization failed");
+
+        assert_eq!(response.public_proofs_of_indexing.len(), 2);
+        assert_eq!(
+            response.public_proofs_of_indexing[0].deployment,
+            deployment_id!("QmeYTH2fK2wv96XvnCGH2eyKFE8kmRfo53zYVy5dKysZtH")
+        );
+        assert_eq!(
+            response.public_proofs_of_indexing[0].proof_of_indexing,
+            Some(poi!(
+                "ba8a057796a81e013789789996551bb5b2920fb9947334db956992f7098bd287"
+            ))
+        );
+        assert_eq!(response.public_proofs_of_indexing[0].block.number, 123);
+        assert_eq!(
+            response.public_proofs_of_indexing[1].deployment,
+            deployment_id!("QmawxQJ5U1JvgosoFVDyAwutLWxrckqVmBTQxaMaKoj3Lw")
+        );
+        assert_eq!(
+            response.public_proofs_of_indexing[1].proof_of_indexing,
+            None
+        );
+        assert_eq!(response.public_proofs_of_indexing[1].block.number, 456);
+    }
+}
diff --git a/src/network/service.rs b/src/network/service.rs
index 7c72236a9..11222006c 100644
--- a/src/network/service.rs
+++ b/src/network/service.rs
@@ -19,10 +19,9 @@ use super::{
     cost_model::CostModelResolver,
     errors::{DeploymentError, SubgraphError},
     host_filter::HostFilter,
-    indexer_indexing_poi_blocklist::PoiBlocklist,
-    indexer_indexing_poi_resolver::PoiResolver,
     indexer_processing::{self, IndexerRawInfo},
     indexing_progress::IndexingProgressResolver,
+    poi_filter::PoiFilter,
     snapshot::{self, Indexing, IndexingId, NetworkTopologySnapshot},
     subgraph_client::Client as SubgraphClient,
     subgraph_processing::{DeploymentInfo, SubgraphInfo},
@@ -171,6 +170,22 @@ pub fn spawn(
     indexer_host_blocklist: HashSet<IpNetwork>,
     poi_blocklist: Vec<BlockedPoi>,
 ) -> NetworkService {
+    let poi_blocklist = poi_blocklist
+        .iter()
+        .map(|entry| &entry.deployment)
+        .collect::<HashSet<_>>()
+        .into_iter()
+        .map(|deployment| {
+            (
+                *deployment,
+                poi_blocklist
+                    .iter()
+                    .filter(|entry| &entry.deployment == deployment)
+                    .map(|entry| (entry.block_number, entry.public_poi.into()))
+                    .collect::<Vec<_>>(),
+            )
+        })
+        .collect();
     let internal_state = InternalState {
         indexer_blocklist,
         indexer_host_filter: HostFilter::new(indexer_host_blocklist)
@@ -182,12 +197,7 @@ pub fn spawn(
                 graph_node: min_graph_node_version,
             },
         ),
-        poi_blocklist: PoiBlocklist::new(poi_blocklist),
-        poi_resolver: PoiResolver::new(
-            http.clone(),
-            Duration::from_secs(5),
-            Duration::from_secs(20 * 60),
-        ),
+        indexer_poi_filter: PoiFilter::new(http.clone(), poi_blocklist),
         indexing_progress_resolver: IndexingProgressResolver::new(http.clone()),
         cost_model_resolver: CostModelResolver::new(http.clone()),
     };
@@ -201,8 +211,7 @@ pub struct InternalState {
     pub indexer_blocklist: BTreeMap<Address, BlockedIndexer>,
     pub indexer_host_filter: HostFilter,
     pub indexer_version_filter: VersionFilter,
-    pub poi_blocklist: PoiBlocklist,
-    pub poi_resolver: PoiResolver,
+    pub indexer_poi_filter: PoiFilter,
     pub indexing_progress_resolver: IndexingProgressResolver,
     pub cost_model_resolver: CostModelResolver,
 }
diff --git a/src/ttl_hash_map.rs b/src/ttl_hash_map.rs
deleted file mode 100644
index 01269015d..000000000
--- a/src/ttl_hash_map.rs
+++ /dev/null
@@ -1,396 +0,0 @@
-//! A hashmap with entries that expire after a given TTL.
-//!
-//! The hashmap's expired entries are not automatically removed. You must call
-//! [`cleanup`](TtlHashMap::cleanup) to remove the expired entries and release the unused memory.
-//!
-use std::{
-    collections::HashMap,
-    time::{Duration, Instant},
-};
-
-/// The default TTL for entries is [`Duration::MAX`].
-pub const DEFAULT_TTL: Duration = Duration::MAX;
-
-/// A hashmap with entries that expire after a given TTL.
-#[derive(Clone)]
-pub struct TtlHashMap<K, V> {
-    ttl: Duration,
-    inner: HashMap<K, (Instant, V)>,
-}
-
-impl<K, V> Default for TtlHashMap<K, V> {
-    fn default() -> Self {
-        Self {
-            ttl: DEFAULT_TTL,
-            inner: Default::default(),
-        }
-    }
-}
-
-impl<K, V> TtlHashMap<K, V> {
-    /// Create a new hashmap with the default TTL (see [`DEFAULT_TTL`]).
-    pub fn new() -> Self {
-        Default::default()
-    }
-
-    /// Create a new hashmap with the given TTL.
-    pub fn with_ttl(ttl: Duration) -> Self {
-        Self {
-            ttl,
-            inner: Default::default(),
-        }
-    }
-
-    /// Create a new hashmap with the given TTL and capacity.
-    pub fn with_ttl_and_capacity(ttl: Duration, capacity: usize) -> Self {
-        Self {
-            ttl,
-            inner: HashMap::with_capacity(capacity),
-        }
-    }
-}
-
-impl<K, V> TtlHashMap<K, V>
-where
-    K: Eq + std::hash::Hash,
-{
-    /// Insert a key-value pair into the hashmap.
-    ///
-    /// If the key already exists, the value is updated and the old value is returned.
-    /// Otherwise, `None` is returned.
-    pub fn insert(&mut self, key: K, value: V) -> Option<V> {
-        let now = Instant::now();
-        self.inner
-            .insert(key, (now, value))
-            .and_then(|(timestamp, value)| {
-                if timestamp.elapsed() < self.ttl {
-                    Some(value)
-                } else {
-                    None
-                }
-            })
-    }
-
-    /// Get the value associated with the key.
-    ///
-    /// If the key is found and the entry has not expired, the value is returned. Otherwise,
-    /// `None` is returned.
-    #[must_use]
-    pub fn get(&self, key: &K) -> Option<&V> {
-        self.inner.get(key).and_then(|(timestamp, value)| {
-            if timestamp.elapsed() < self.ttl {
-                Some(value)
-            } else {
-                None
-            }
-        })
-    }
-
-    /// Remove the key and its associated value from the hashmap.
-    ///
-    /// If the key is found and the entry has not expired, the value is returned. Otherwise,
-    /// `None` is returned.
-    pub fn remove(&mut self, key: &K) -> Option<V> {
-        self.inner.remove(key).and_then(|(timestamp, value)| {
-            if timestamp.elapsed() < self.ttl {
-                Some(value)
-            } else {
-                None
-            }
-        })
-    }
-
-    /// Returns the number of elements in the hashmap.
-    ///
-    /// This is the number of non-expired entries.
-    #[must_use]
-    pub fn len(&self) -> usize {
-        self.inner
-            .iter()
-            .filter(|(_, (timestamp, _))| timestamp.elapsed() < self.ttl)
-            .count()
-    }
-
-    /// Returns whether the hashmap is empty.
-    ///
-    /// This is true if the hashmap is actually empty, or all its entries are expired.
-    #[must_use]
-    pub fn is_empty(&self) -> bool {
-        self.len() == 0
-    }
-
-    /// Returns the number of elements in the hashmap, including the expired ones.
-    ///
-    /// This is the number of all entries, expired and non-expired.
-    #[must_use]
-    pub fn len_all(&self) -> usize {
-        self.inner.len()
-    }
-
-    /// Returns the current capacity of the hashmap.
-    #[must_use]
-    pub fn capacity(&self) -> usize {
-        self.inner.capacity()
-    }
-
-    /// Shrinks the capacity of the hashmap as much as possible.
-    pub fn shrink_to_fit(&mut self) {
-        self.inner.shrink_to_fit();
-    }
-
-    /// Clear the hashmap, removing all entries.
-    pub fn clear(&mut self) {
-        self.inner.clear();
-    }
-
-    /// Cleanup the hashmap, removing all expired entries.
-    ///
-    /// After removing all expired entries, the inner hashmap is shrunk to fit the new capacity,
-    /// releasing the unused memory.
- pub fn cleanup(&mut self) { - // Remove all expired entries - self.inner - .retain(|_, (timestamp, _)| timestamp.elapsed() < self.ttl); - - // Shrink the inner hashmap to fit the new size - self.shrink_to_fit(); - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn insert_and_get_an_item() { - //* Given - let mut ttl_hash_map = TtlHashMap::new(); - - let key = "item"; - let value = 1337; - - //* When - ttl_hash_map.insert(key, value); - - //* Then - assert_eq!(ttl_hash_map.get(&key), Some(&value)); - assert_eq!(ttl_hash_map.len(), 1); - } - - #[test] - fn get_none_if_no_item_is_present() { - //* Given - let ttl_hash_map = TtlHashMap::<&str, ()>::new(); - - let key = "item"; - - //* When - let value = ttl_hash_map.get(&key); - - //* Then - assert_eq!(value, None); - } - - #[test] - fn get_none_if_the_item_is_expired() { - //* Given - let mut ttl_hash_map = TtlHashMap::with_ttl(Duration::from_millis(5)); - - let key = "item"; - let value = 1337; - - // Pre-populate the map - ttl_hash_map.insert(key, value); - - //* When - // Wait for the TTL to expire - std::thread::sleep(Duration::from_millis(10)); - - //* Then - assert_eq!(ttl_hash_map.get(&key), None); - assert_eq!(ttl_hash_map.len(), 0); - } - - #[test] - fn report_the_correct_length() { - //* Given - let mut ttl_hash_map = TtlHashMap::with_ttl(Duration::from_millis(5)); - - let key_1 = "expired_item_1"; - let value_1 = 1337; - - let key_2 = "expired_item_2"; - let value_2 = 42; - - let key_3 = "non_expired_item"; - let value_3 = 69; - - // Pre-populate the map - ttl_hash_map.insert(key_1, value_1); - ttl_hash_map.insert(key_2, value_2); - - //* When - // Wait for the TTL to expire - std::thread::sleep(Duration::from_millis(10)); - - // Insert a new item with different key - ttl_hash_map.insert(key_3, value_3); - - //* Then - // One non-expired item and two expired items - assert_eq!(ttl_hash_map.len(), 1); - assert_eq!(ttl_hash_map.len_all(), 3); - assert!(!ttl_hash_map.is_empty()); - } - - #[test] - fn insert_an_item_and_return_the_old_value_if_not_expired() { - //* Given - let mut ttl_hash_map = TtlHashMap::new(); - - let key = "item"; - let old_value = 1337; - let new_value = 42; - - // Pre-populate the map - ttl_hash_map.insert(key, old_value); - - //* When - let returned_old_value = ttl_hash_map.insert(key, new_value); - - //* Then - assert_eq!(returned_old_value, Some(old_value)); - assert_eq!(ttl_hash_map.get(&key), Some(&new_value)); - assert_eq!(ttl_hash_map.len(), 1); - assert_eq!(ttl_hash_map.len_all(), 1); - assert!(!ttl_hash_map.is_empty()); - } - - #[test] - fn insert_an_item_and_return_none_if_the_old_value_is_expired() { - //* Given - let mut ttl_hash_map = TtlHashMap::with_ttl(Duration::from_millis(5)); - - let key = "item"; - let old_value = 1337; - let new_value = 42; - - // Pre-populate the map - ttl_hash_map.insert(key, old_value); - - //* When - // Wait for the TTL to expire - std::thread::sleep(Duration::from_millis(10)); - - let returned_old_value = ttl_hash_map.insert(key, new_value); - - //* Then - assert_eq!(returned_old_value, None); - assert_eq!(ttl_hash_map.get(&key), Some(&new_value)); - assert_eq!(ttl_hash_map.len(), 1); - assert_eq!(ttl_hash_map.len_all(), 1); - assert!(!ttl_hash_map.is_empty()); - } - - #[test] - fn remove_an_item_and_return_it_if_not_expired() { - //* Given - let mut ttl_hash_map = TtlHashMap::new(); - - let key = "item"; - let value = 1337; - - // Pre-populate the map - ttl_hash_map.insert(key, value); - - //* When - let removed_value = ttl_hash_map.remove(&key); - - 
//* Then - assert_eq!(removed_value, Some(value)); - assert_eq!(ttl_hash_map.get(&key), None); - assert_eq!(ttl_hash_map.len(), 0); - assert_eq!(ttl_hash_map.len_all(), 0); - assert!(ttl_hash_map.is_empty()); - } - - #[test] - fn remove_an_item_and_return_none_if_expired() { - //* Given - let mut ttl_hash_map = TtlHashMap::with_ttl(Duration::from_millis(5)); - - let key = "item"; - let value = 1337; - - // Pre-populate the map - ttl_hash_map.insert(key, value); - - //* When - // Wait for the TTL to expire - std::thread::sleep(Duration::from_millis(10)); - - let removed_value = ttl_hash_map.remove(&key); - - //* Then - assert_eq!(removed_value, None); - assert_eq!(ttl_hash_map.get(&key), None); - assert_eq!(ttl_hash_map.len(), 0); - assert_eq!(ttl_hash_map.len_all(), 0); - assert!(ttl_hash_map.is_empty()); - } - - #[test] - fn clear_the_hashmap() { - //* Given - let mut ttl_hash_map = TtlHashMap::with_ttl_and_capacity(Duration::from_millis(5), 5); - - let key_1 = "item_1"; - let value_1 = 1337; - - let key_2 = "item_2"; - let value_2 = 42; - - // Pre-populate the map - ttl_hash_map.insert(key_1, value_1); - ttl_hash_map.insert(key_2, value_2); - - //* When - ttl_hash_map.clear(); - - //* Then - assert_eq!(ttl_hash_map.len(), 0); - assert_eq!(ttl_hash_map.len_all(), 0); - assert!(ttl_hash_map.is_empty()); - } - - #[test] - fn cleanup_the_hashmap_and_shrink_to_fit() { - //* Given - let mut ttl_hash_map = TtlHashMap::with_ttl_and_capacity(Duration::from_millis(5), 5); - - let key = "non_expired_item"; - let value = 69; - - // Pre-populate the map with 100 items - for i in 0..100 { - ttl_hash_map.insert(format!("expired_item_{i}"), i); - } - - //* When - // Wait for the TTL to expire - std::thread::sleep(Duration::from_millis(10)); - - // Insert a new item with different key - ttl_hash_map.insert(key.to_string(), value); - - // Remove expired entries and shrink the hashmap to fit the new size - ttl_hash_map.cleanup(); - - //* Then - assert_eq!(ttl_hash_map.len(), 1); - assert_eq!(ttl_hash_map.len_all(), 1); - assert!(ttl_hash_map.capacity() < 100); - assert!(!ttl_hash_map.is_empty()); - } -}
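Taken together, `PoiFilter` replaces the resolver, blocklist, and TTL map deleted above. A minimal usage sketch, assuming the crate context of this patch (the deployment, block, PoI, and status URL values are illustrative placeholders, and the grouping of config entries mirrors what `spawn` in src/network/service.rs does):

```rust
use std::collections::HashMap;

use thegraph_core::{alloy::primitives::BlockNumber, DeploymentId, ProofOfIndexing};
use url::Url;

use crate::network::poi_filter::PoiFilter;

async fn check_indexer(
    deployment: DeploymentId,
    block: BlockNumber,
    bad_poi: ProofOfIndexing,
    status_url: Url, // the indexer's status endpoint (placeholder value)
) {
    // Group the blocked (block, poi) pairs by deployment, as `spawn` does
    // with the `BlockedPoi` config entries.
    let mut blocklist: HashMap<DeploymentId, Vec<(BlockNumber, ProofOfIndexing)>> =
        HashMap::new();
    blocklist.entry(deployment).or_default().push((block, bad_poi));

    let filter = PoiFilter::new(reqwest::Client::new(), blocklist);

    // Fetches (and caches) the indexer's public PoIs for the blocklisted
    // (deployment, block) pairs, then returns the deployments whose reported
    // PoI matches a blocked PoI or could not be fetched at all.
    let blocked = filter.blocked_deployments(&status_url, &[deployment]).await;
    if blocked.contains(&deployment) {
        // Treat the deployment as unhealthy for this indexer.
    }
}
```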