diff --git a/src/config.rs b/src/config.rs index c2ad61e1..f967eb89 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,5 +1,3 @@ -//! The Graph Gateway configuration. - use std::{ collections::{BTreeMap, HashSet}, path::{Path, PathBuf}, @@ -26,9 +24,9 @@ pub struct Config { #[serde(default)] pub api_keys: Option, pub attestations: AttestationConfig, - /// List of indexer addresses to block. This should only be used temprorarily. + /// Blocklist applying to indexers. #[serde(default)] - pub blocked_indexers: BTreeMap, + pub blocklist: Vec, /// Chain aliases #[serde(default)] pub chain_aliases: BTreeMap, @@ -53,9 +51,6 @@ pub struct Config { pub trusted_indexers: Vec, /// Check payment state of client (disable for testnets) pub payment_required: bool, - /// POI blocklist - #[serde(default)] - pub poi_blocklist: Vec, /// public API port pub port_api: u16, /// private metrics port @@ -96,11 +91,28 @@ pub enum ApiKeys { Fixed(Vec), } -#[derive(Deserialize)] -pub struct BlockedIndexer { - /// empty array blocks on all deployments - pub deployments: Vec, - pub reason: String, +#[derive(Clone, Deserialize, Serialize)] +#[serde(untagged)] +pub enum BlocklistEntry { + Poi { + deployment: DeploymentId, + info: BlocklistInfo, + public_poi: B256, + block: BlockNumber, + }, + Other { + deployment: DeploymentId, + info: BlocklistInfo, + indexer: Address, + }, +} + +#[derive(Clone, Deserialize, Serialize)] +pub struct BlocklistInfo { + /// Example query (should be minimal to reproduce bad response) + query: Option, + /// Bad query response, from the above query executed on indexers with this blocked PoI + bad_query_response: Option, } /// Attestation configuration. @@ -171,17 +183,6 @@ pub struct Receipts { pub verifier: Address, } -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct BlockedPoi { - pub public_poi: B256, - pub deployment: DeploymentId, - pub block_number: BlockNumber, - /// Example query (should be minimal to reproduce bad response) - pub query: Option, - /// Bad query response, from the above query executed on indexers with this blocked PoI - pub bad_query_response: Option, -} - /// Load the configuration from a JSON file. pub fn load_from_file(path: &Path) -> anyhow::Result { let config_content = std::fs::read_to_string(path)?; diff --git a/src/main.rs b/src/main.rs index 93c185de..9c7ac855 100644 --- a/src/main.rs +++ b/src/main.rs @@ -40,6 +40,7 @@ use budgets::{Budgeter, USD}; use chains::Chains; use client_query::context::Context; use config::{ApiKeys, ExchangeRateProvider}; +use headers::ContentType; use indexer_client::IndexerClient; use indexing_performance::IndexingPerformance; use middleware::{ @@ -52,6 +53,7 @@ use thegraph_core::{ alloy::{dyn_abi::Eip712Domain, primitives::ChainId, signers::local::PrivateKeySigner}, attestation, }; +use thegraph_headers::HttpBuilderExt; use tokio::{net::TcpListener, signal::unix::SignalKind, sync::watch}; use tower_http::cors::{self, CorsLayer}; use tracing_subscriber::{prelude::*, EnvFilter}; @@ -112,11 +114,10 @@ async fn main() { let mut network = network::service::spawn( http_client.clone(), network_subgraph_client, + conf.blocklist.clone(), conf.min_indexer_version, conf.min_graph_node_version, - conf.blocked_indexers, indexer_host_blocklist, - conf.poi_blocklist.clone(), ); let indexing_perf = IndexingPerformance::new(network.clone()); network.wait_until_ready().await; @@ -157,7 +158,7 @@ async fn main() { reporter, }; - let poi_blocklist: &'static str = serde_json::to_string(&conf.poi_blocklist).unwrap().leak(); + let blocklist: &'static str = serde_json::to_string(&conf.blocklist).unwrap().leak(); // Host metrics on a separate server with a port that isn't open to public requests. tokio::spawn(async move { @@ -226,8 +227,11 @@ async fn main() { .route( "/blocklist", routing::get(move || async move { - let headers = [(reqwest::header::CONTENT_TYPE, "application/json")]; - (headers, poi_blocklist) + axum::http::Response::builder() + .status(StatusCode::OK) + .header_typed(ContentType::json()) + .body(blocklist.to_string()) + .unwrap() }), ) .nest("/api", api); diff --git a/src/network/indexer_processing.rs b/src/network/indexer_processing.rs index 6c710001..ed01b25c 100644 --- a/src/network/indexer_processing.rs +++ b/src/network/indexer_processing.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use custom_debug::CustomDebug; use thegraph_core::{alloy::primitives::BlockNumber, AllocationId, DeploymentId, IndexerId}; @@ -6,7 +6,6 @@ use tracing::Instrument; use url::Url; use crate::{ - config::BlockedIndexer, errors::UnavailableReason, network::{indexing_progress::IndexingProgressResolver, service::InternalState}, }; @@ -191,29 +190,21 @@ async fn process_indexer_indexings( state: &InternalState, url: &Url, indexings: HashMap, - blocklist: Option<&BlockedIndexer>, + blocklist: Option<&HashSet>, ) -> HashMap> { let mut indexer_indexings: HashMap, _>> = indexings .into_iter() .map(|(id, info)| (id, Ok(info.into()))) .collect(); - match blocklist { - None => (), - Some(blocklist) if blocklist.deployments.is_empty() => { - for entry in indexer_indexings.values_mut() { - *entry = Err(UnavailableReason::Blocked(blocklist.reason.clone())); - } - } - Some(blocklist) => { - for deployment in &blocklist.deployments { - indexer_indexings.insert( - *deployment, - Err(UnavailableReason::Blocked(blocklist.reason.clone())), - ); - } + if let Some(blocklist) = blocklist { + for deployment in blocklist { + indexer_indexings.insert( + *deployment, + Err(UnavailableReason::Blocked("missing data".to_string())), + ); } - }; + } // ref: df8e647b-1e6e-422a-8846-dc9ee7e0dcc2 let status_url = url.join("status").unwrap(); diff --git a/src/network/service.rs b/src/network/service.rs index 626e4803..835516d9 100644 --- a/src/network/service.rs +++ b/src/network/service.rs @@ -3,7 +3,7 @@ //! query processing pipeline use std::{ - collections::{BTreeMap, HashMap, HashSet}, + collections::{HashMap, HashSet}, time::Duration, }; @@ -11,7 +11,7 @@ use ipnetwork::IpNetwork; use semver::Version; use thegraph_core::{ alloy::primitives::{Address, BlockNumber}, - DeploymentId, IndexerId, SubgraphId, + DeploymentId, IndexerId, ProofOfIndexing, SubgraphId, }; use tokio::{sync::watch, time::MissedTickBehavior}; @@ -28,10 +28,7 @@ use super::{ version_filter::{MinimumVersionRequirements, VersionFilter}, DeploymentError, SubgraphError, }; -use crate::{ - config::{BlockedIndexer, BlockedPoi}, - errors::UnavailableReason, -}; +use crate::{config::BlocklistEntry, errors::UnavailableReason}; /// Subgraph resolution information returned by the [`NetworkService`]. pub struct ResolvedSubgraphInfo { @@ -164,28 +161,40 @@ impl NetworkService { pub fn spawn( http: reqwest::Client, subgraph_client: SubgraphClient, + blocklist: Vec, min_indexer_service_version: Version, min_graph_node_version: Version, - indexer_blocklist: BTreeMap, indexer_host_blocklist: HashSet, - poi_blocklist: Vec, ) -> NetworkService { - let poi_blocklist = poi_blocklist - .iter() - .map(|entry| &entry.deployment) - .collect::>() - .into_iter() - .map(|deployment| { - ( - *deployment, + let mut poi_blocklist: HashMap> = + Default::default(); + let mut indexer_blocklist: HashMap> = Default::default(); + for entry in blocklist { + match entry { + BlocklistEntry::Poi { + deployment, + block, + public_poi, + .. + } => { poi_blocklist - .iter() - .filter(|entry| &entry.deployment == deployment) - .map(|entry| (entry.block_number, entry.public_poi.into())) - .collect::>(), - ) - }) - .collect(); + .entry(deployment) + .or_default() + .push((block, public_poi.into())); + } + BlocklistEntry::Other { + deployment, + indexer, + .. + } => { + indexer_blocklist + .entry(indexer) + .or_default() + .insert(deployment); + } + }; + } + let internal_state = InternalState { indexer_blocklist, indexer_host_filter: HostFilter::new(indexer_host_blocklist) @@ -207,7 +216,7 @@ pub fn spawn( } pub struct InternalState { - pub indexer_blocklist: BTreeMap, + pub indexer_blocklist: HashMap>, pub indexer_host_filter: HostFilter, pub indexer_version_filter: VersionFilter, pub indexer_poi_filer: PoiFilter,