diff --git a/src/main.rs b/src/main.rs index 9c7ac855..06eac060 100644 --- a/src/main.rs +++ b/src/main.rs @@ -39,21 +39,19 @@ use axum::{ use budgets::{Budgeter, USD}; use chains::Chains; use client_query::context::Context; -use config::{ApiKeys, ExchangeRateProvider}; -use headers::ContentType; +use config::{ApiKeys, BlocklistEntry, ExchangeRateProvider}; use indexer_client::IndexerClient; use indexing_performance::IndexingPerformance; use middleware::{ legacy_auth_adapter, RequestTracingLayer, RequireAuthorizationLayer, SetRequestIdLayer, }; -use network::subgraph_client::Client as SubgraphClient; +use network::{indexer_blocklist, subgraph_client::Client as SubgraphClient}; use prometheus::{self, Encoder as _}; use receipts::ReceiptSigner; use thegraph_core::{ alloy::{dyn_abi::Eip712Domain, primitives::ChainId, signers::local::PrivateKeySigner}, attestation, }; -use thegraph_headers::HttpBuilderExt; use tokio::{net::TcpListener, signal::unix::SignalKind, sync::watch}; use tower_http::cors::{self, CorsLayer}; use tracing_subscriber::{prelude::*, EnvFilter}; @@ -111,10 +109,11 @@ async fn main() { } None => Default::default(), }; + let indexer_blocklist = indexer_blocklist::Blocklist::spawn(conf.blocklist); let mut network = network::service::spawn( http_client.clone(), network_subgraph_client, - conf.blocklist.clone(), + indexer_blocklist.clone(), conf.min_indexer_version, conf.min_graph_node_version, indexer_host_blocklist, @@ -158,7 +157,7 @@ async fn main() { reporter, }; - let blocklist: &'static str = serde_json::to_string(&conf.blocklist).unwrap().leak(); + let blocklist: watch::Receiver> = indexer_blocklist.blocklist; // Host metrics on a separate server with a port that isn't open to public requests. tokio::spawn(async move { @@ -226,13 +225,7 @@ async fn main() { .route("/ready", routing::get(|| async { "Ready" })) .route( "/blocklist", - routing::get(move || async move { - axum::http::Response::builder() - .status(StatusCode::OK) - .header_typed(ContentType::json()) - .body(blocklist.to_string()) - .unwrap() - }), + routing::get(move || async move { axum::Json(blocklist.borrow().clone()) }), ) .nest("/api", api); diff --git a/src/network.rs b/src/network.rs index 03bb9051..ac9114eb 100644 --- a/src/network.rs +++ b/src/network.rs @@ -4,6 +4,7 @@ use thegraph_graphql_http::graphql::{IntoDocument as _, IntoDocumentWithVariable pub mod cost_model; pub mod host_filter; +pub mod indexer_blocklist; mod indexer_processing; pub mod indexing_progress; pub mod poi_filter; diff --git a/src/network/indexer_blocklist.rs b/src/network/indexer_blocklist.rs new file mode 100644 index 00000000..5f6dbdce --- /dev/null +++ b/src/network/indexer_blocklist.rs @@ -0,0 +1,78 @@ +use std::collections::{HashMap, HashSet}; + +use thegraph_core::{alloy::primitives::Address, DeploymentId, ProofOfIndexing}; +use tokio::sync::watch; + +use crate::config::BlocklistEntry; + +#[derive(Clone)] +pub struct Blocklist { + pub blocklist: watch::Receiver>, + pub poi: watch::Receiver>>, + pub indexer: watch::Receiver>>, +} + +impl Blocklist { + pub fn spawn(init: Vec) -> Self { + let (blocklist_tx, blocklist_rx) = watch::channel(Default::default()); + let (poi_tx, poi_rx) = watch::channel(Default::default()); + let (indexer_tx, indexer_rx) = watch::channel(Default::default()); + let mut actor = Actor { + blocklist: blocklist_tx, + poi: poi_tx, + indexer: indexer_tx, + }; + for entry in init { + actor.add_entry(entry); + } + tokio::spawn(async move { + actor.run().await; + }); + Self { + blocklist: blocklist_rx, + poi: poi_rx, + indexer: indexer_rx, + } + } +} + +struct Actor { + pub blocklist: watch::Sender>, + pub poi: watch::Sender>>, + pub indexer: watch::Sender>>, +} + +impl Actor { + async fn run(&mut self) { + todo!(); + } + + fn add_entry(&mut self, entry: BlocklistEntry) { + match entry { + BlocklistEntry::Poi { + deployment, + block, + public_poi, + .. + } => { + self.poi.send_modify(move |blocklist| { + blocklist + .entry(deployment) + .or_default() + .push((block, public_poi.into())); + }); + } + BlocklistEntry::Other { + deployment, + indexer, + .. + } => { + self.indexer.send_modify(move |blocklist| { + blocklist.entry(indexer).or_default().insert(deployment); + }); + } + }; + self.blocklist + .send_modify(move |blocklist| blocklist.push(entry)); + } +} diff --git a/src/network/service.rs b/src/network/service.rs index 6140af37..a337a0de 100644 --- a/src/network/service.rs +++ b/src/network/service.rs @@ -11,13 +11,14 @@ use ipnetwork::IpNetwork; use semver::Version; use thegraph_core::{ alloy::primitives::{Address, BlockNumber}, - DeploymentId, IndexerId, ProofOfIndexing, SubgraphId, + DeploymentId, IndexerId, SubgraphId, }; use tokio::{sync::watch, time::MissedTickBehavior}; use super::{ cost_model::CostModelResolver, host_filter::HostFilter, + indexer_blocklist, indexer_processing::{self, IndexerRawInfo}, indexing_progress::IndexingProgressResolver, poi_filter::PoiFilter, @@ -28,7 +29,7 @@ use super::{ version_filter::{MinimumVersionRequirements, VersionFilter}, DeploymentError, SubgraphError, }; -use crate::{config::BlocklistEntry, errors::UnavailableReason}; +use crate::errors::UnavailableReason; /// Subgraph resolution information returned by the [`NetworkService`]. pub struct ResolvedSubgraphInfo { @@ -161,44 +162,13 @@ impl NetworkService { pub fn spawn( http: reqwest::Client, subgraph_client: SubgraphClient, - blocklist: Vec, + indexer_blocklist: indexer_blocklist::Blocklist, min_indexer_service_version: Version, min_graph_node_version: Version, indexer_host_blocklist: HashSet, ) -> NetworkService { - let mut poi_blocklist: HashMap> = - Default::default(); - let mut indexer_blocklist: HashMap> = Default::default(); - for entry in blocklist { - match entry { - BlocklistEntry::Poi { - deployment, - block, - public_poi, - .. - } => { - poi_blocklist - .entry(deployment) - .or_default() - .push((block, public_poi.into())); - } - BlocklistEntry::Other { - deployment, - indexer, - .. - } => { - indexer_blocklist - .entry(indexer) - .or_default() - .insert(deployment); - } - }; - } - let (_, poi_blocklist) = watch::channel(poi_blocklist); - let (_, indexer_blocklist) = watch::channel(indexer_blocklist); - let internal_state = InternalState { - indexer_blocklist, + indexer_blocklist: indexer_blocklist.indexer, indexer_host_filter: HostFilter::new(indexer_host_blocklist) .expect("failed to create host resolver"), indexer_version_filter: VersionFilter::new( @@ -208,7 +178,7 @@ pub fn spawn( graph_node: min_graph_node_version, }, ), - indexer_poi_filer: PoiFilter::new(http.clone(), poi_blocklist), + indexer_poi_filer: PoiFilter::new(http.clone(), indexer_blocklist.poi), indexing_progress_resolver: IndexingProgressResolver::new(http.clone()), cost_model_resolver: CostModelResolver::new(http.clone()), };