diff --git a/src/network/indexer_processing.rs b/src/network/indexer_processing.rs index ed01b25c..1b1f5e74 100644 --- a/src/network/indexer_processing.rs +++ b/src/network/indexer_processing.rs @@ -155,7 +155,13 @@ pub async fn process_info( return (*indexer_id, Err(err)); } - let blocklist = state.indexer_blocklist.get(&*indexer.id); + let blocklist = state + .indexer_blocklist + .borrow() + .get(&*indexer.id) + .cloned() + .unwrap_or_default(); + // Resolve the indexer's indexings information let indexings = process_indexer_indexings( state, @@ -190,20 +196,18 @@ async fn process_indexer_indexings( state: &InternalState, url: &Url, indexings: HashMap, - blocklist: Option<&HashSet>, + blocklist: HashSet, ) -> HashMap> { let mut indexer_indexings: HashMap, _>> = indexings .into_iter() .map(|(id, info)| (id, Ok(info.into()))) .collect(); - if let Some(blocklist) = blocklist { - for deployment in blocklist { - indexer_indexings.insert( - *deployment, - Err(UnavailableReason::Blocked("missing data".to_string())), - ); - } + for deployment in blocklist { + indexer_indexings.insert( + deployment, + Err(UnavailableReason::Blocked("missing data".to_string())), + ); } // ref: df8e647b-1e6e-422a-8846-dc9ee7e0dcc2 diff --git a/src/network/poi_filter.rs b/src/network/poi_filter.rs index c1e17f23..cfadd9fd 100644 --- a/src/network/poi_filter.rs +++ b/src/network/poi_filter.rs @@ -6,14 +6,14 @@ use std::{ use serde_with::serde_as; use thegraph_core::{alloy::primitives::BlockNumber, DeploymentId, ProofOfIndexing}; use thegraph_graphql_http::http_client::ReqwestExt; -use tokio::time::Instant; +use tokio::{sync::watch, time::Instant}; use url::Url; use super::GraphQlRequest; pub struct PoiFilter { http: reqwest::Client, - blocklist: HashMap>, + blocklist: watch::Receiver>>, cache: parking_lot::RwLock>, } @@ -25,7 +25,7 @@ struct IndexerEntry { impl PoiFilter { pub fn new( http: reqwest::Client, - blocklist: HashMap>, + blocklist: watch::Receiver>>, ) -> Self { Self { http, @@ -39,8 +39,9 @@ impl PoiFilter { status_url: &Url, deployments: &[DeploymentId], ) -> HashSet { - let requests: Vec<(DeploymentId, BlockNumber)> = self - .blocklist + let blocklist = self.blocklist.borrow().clone(); + + let requests: Vec<(DeploymentId, BlockNumber)> = blocklist .iter() .filter(|(deployment, _)| deployments.contains(deployment)) .flat_map(|(deployment, entries)| { @@ -51,7 +52,7 @@ impl PoiFilter { deployments .iter() - .filter(|deployment| match self.blocklist.get(deployment) { + .filter(|deployment| match blocklist.get(deployment) { None => false, Some(blocklist) => blocklist.iter().any(|(block, poi)| { pois.get(&(**deployment, *block)) @@ -208,6 +209,7 @@ mod tests { alloy::{hex, primitives::FixedBytes}, DeploymentId, }; + use tokio::sync::watch; use url::Url; use crate::init_logging; @@ -255,7 +257,7 @@ mod tests { }); let blocklist = HashMap::from([(deployment, vec![(0, bad_poi.into())])]); - let poi_filter = super::PoiFilter::new(reqwest::Client::new(), blocklist); + let poi_filter = super::PoiFilter::new(reqwest::Client::new(), watch::channel(blocklist).1); let status_url = indexer_url.join("status").unwrap(); let assert_blocked = |blocked: Vec| async { diff --git a/src/network/service.rs b/src/network/service.rs index 835516d9..6140af37 100644 --- a/src/network/service.rs +++ b/src/network/service.rs @@ -194,6 +194,8 @@ pub fn spawn( } }; } + let (_, poi_blocklist) = watch::channel(poi_blocklist); + let (_, indexer_blocklist) = watch::channel(indexer_blocklist); let internal_state = InternalState { indexer_blocklist, @@ -216,7 +218,7 @@ pub fn spawn( } pub struct InternalState { - pub indexer_blocklist: HashMap>, + pub indexer_blocklist: watch::Receiver>>, pub indexer_host_filter: HostFilter, pub indexer_version_filter: VersionFilter, pub indexer_poi_filer: PoiFilter,