Skip to content

Commit

Permalink
use watch channel for blocklist
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus committed Dec 10, 2024
1 parent 3b9d5a2 commit cef8715
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 17 deletions.
22 changes: 13 additions & 9 deletions src/network/indexer_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,13 @@ pub async fn process_info(
return (*indexer_id, Err(err));
}

let blocklist = state.indexer_blocklist.get(&*indexer.id);
let blocklist = state
.indexer_blocklist
.borrow()
.get(&*indexer.id)
.cloned()
.unwrap_or_default();

// Resolve the indexer's indexings information
let indexings = process_indexer_indexings(
state,
Expand Down Expand Up @@ -190,20 +196,18 @@ async fn process_indexer_indexings(
state: &InternalState,
url: &Url,
indexings: HashMap<DeploymentId, IndexingRawInfo>,
blocklist: Option<&HashSet<DeploymentId>>,
blocklist: HashSet<DeploymentId>,
) -> HashMap<DeploymentId, Result<ResolvedIndexingInfo, UnavailableReason>> {
let mut indexer_indexings: HashMap<DeploymentId, Result<IndexingInfo<(), ()>, _>> = indexings
.into_iter()
.map(|(id, info)| (id, Ok(info.into())))
.collect();

if let Some(blocklist) = blocklist {
for deployment in blocklist {
indexer_indexings.insert(
*deployment,
Err(UnavailableReason::Blocked("missing data".to_string())),
);
}
for deployment in blocklist {
indexer_indexings.insert(
deployment,
Err(UnavailableReason::Blocked("missing data".to_string())),
);
}

// ref: df8e647b-1e6e-422a-8846-dc9ee7e0dcc2
Expand Down
16 changes: 9 additions & 7 deletions src/network/poi_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ use std::{
use serde_with::serde_as;
use thegraph_core::{alloy::primitives::BlockNumber, DeploymentId, ProofOfIndexing};
use thegraph_graphql_http::http_client::ReqwestExt;
use tokio::time::Instant;
use tokio::{sync::watch, time::Instant};
use url::Url;

use super::GraphQlRequest;

pub struct PoiFilter {
http: reqwest::Client,
blocklist: HashMap<DeploymentId, Vec<(BlockNumber, ProofOfIndexing)>>,
blocklist: watch::Receiver<HashMap<DeploymentId, Vec<(BlockNumber, ProofOfIndexing)>>>,
cache: parking_lot::RwLock<HashMap<String, IndexerEntry>>,
}

Expand All @@ -25,7 +25,7 @@ struct IndexerEntry {
impl PoiFilter {
pub fn new(
http: reqwest::Client,
blocklist: HashMap<DeploymentId, Vec<(BlockNumber, ProofOfIndexing)>>,
blocklist: watch::Receiver<HashMap<DeploymentId, Vec<(BlockNumber, ProofOfIndexing)>>>,
) -> Self {
Self {
http,
Expand All @@ -39,8 +39,9 @@ impl PoiFilter {
status_url: &Url,
deployments: &[DeploymentId],
) -> HashSet<DeploymentId> {
let requests: Vec<(DeploymentId, BlockNumber)> = self
.blocklist
let blocklist = self.blocklist.borrow().clone();

let requests: Vec<(DeploymentId, BlockNumber)> = blocklist
.iter()
.filter(|(deployment, _)| deployments.contains(deployment))
.flat_map(|(deployment, entries)| {
Expand All @@ -51,7 +52,7 @@ impl PoiFilter {

deployments
.iter()
.filter(|deployment| match self.blocklist.get(deployment) {
.filter(|deployment| match blocklist.get(deployment) {
None => false,
Some(blocklist) => blocklist.iter().any(|(block, poi)| {
pois.get(&(**deployment, *block))
Expand Down Expand Up @@ -208,6 +209,7 @@ mod tests {
alloy::{hex, primitives::FixedBytes},
DeploymentId,
};
use tokio::sync::watch;
use url::Url;

use crate::init_logging;
Expand Down Expand Up @@ -255,7 +257,7 @@ mod tests {
});

let blocklist = HashMap::from([(deployment, vec![(0, bad_poi.into())])]);
let poi_filter = super::PoiFilter::new(reqwest::Client::new(), blocklist);
let poi_filter = super::PoiFilter::new(reqwest::Client::new(), watch::channel(blocklist).1);

let status_url = indexer_url.join("status").unwrap();
let assert_blocked = |blocked: Vec<DeploymentId>| async {
Expand Down
4 changes: 3 additions & 1 deletion src/network/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ pub fn spawn(
}
};
}
let (_, poi_blocklist) = watch::channel(poi_blocklist);
let (_, indexer_blocklist) = watch::channel(indexer_blocklist);

let internal_state = InternalState {
indexer_blocklist,
Expand All @@ -216,7 +218,7 @@ pub fn spawn(
}

pub struct InternalState {
pub indexer_blocklist: HashMap<Address, HashSet<DeploymentId>>,
pub indexer_blocklist: watch::Receiver<HashMap<Address, HashSet<DeploymentId>>>,
pub indexer_host_filter: HostFilter,
pub indexer_version_filter: VersionFilter,
pub indexer_poi_filer: PoiFilter,
Expand Down

0 comments on commit cef8715

Please sign in to comment.