Skip to content

Commit

Permalink
move indexer blocklist to actor
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus committed Dec 12, 2024
1 parent cef8715 commit 7543ecf
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 49 deletions.
19 changes: 6 additions & 13 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,19 @@ use axum::{
use budgets::{Budgeter, USD};
use chains::Chains;
use client_query::context::Context;
use config::{ApiKeys, ExchangeRateProvider};
use headers::ContentType;
use config::{ApiKeys, BlocklistEntry, ExchangeRateProvider};
use indexer_client::IndexerClient;
use indexing_performance::IndexingPerformance;
use middleware::{
legacy_auth_adapter, RequestTracingLayer, RequireAuthorizationLayer, SetRequestIdLayer,
};
use network::subgraph_client::Client as SubgraphClient;
use network::{indexer_blocklist, subgraph_client::Client as SubgraphClient};
use prometheus::{self, Encoder as _};
use receipts::ReceiptSigner;
use thegraph_core::{
alloy::{dyn_abi::Eip712Domain, primitives::ChainId, signers::local::PrivateKeySigner},
attestation,
};
use thegraph_headers::HttpBuilderExt;
use tokio::{net::TcpListener, signal::unix::SignalKind, sync::watch};
use tower_http::cors::{self, CorsLayer};
use tracing_subscriber::{prelude::*, EnvFilter};
Expand Down Expand Up @@ -111,10 +109,11 @@ async fn main() {
}
None => Default::default(),
};
let indexer_blocklist = indexer_blocklist::Blocklist::spawn(conf.blocklist);
let mut network = network::service::spawn(
http_client.clone(),
network_subgraph_client,
conf.blocklist.clone(),
indexer_blocklist.clone(),
conf.min_indexer_version,
conf.min_graph_node_version,
indexer_host_blocklist,
Expand Down Expand Up @@ -158,7 +157,7 @@ async fn main() {
reporter,
};

let blocklist: &'static str = serde_json::to_string(&conf.blocklist).unwrap().leak();
let blocklist: watch::Receiver<Vec<BlocklistEntry>> = indexer_blocklist.blocklist;

// Host metrics on a separate server with a port that isn't open to public requests.
tokio::spawn(async move {
Expand Down Expand Up @@ -226,13 +225,7 @@ async fn main() {
.route("/ready", routing::get(|| async { "Ready" }))
.route(
"/blocklist",
routing::get(move || async move {
axum::http::Response::builder()
.status(StatusCode::OK)
.header_typed(ContentType::json())
.body(blocklist.to_string())
.unwrap()
}),
routing::get(move || async move { axum::Json(blocklist.borrow().clone()) }),
)
.nest("/api", api);

Expand Down
1 change: 1 addition & 0 deletions src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use thegraph_graphql_http::graphql::{IntoDocument as _, IntoDocumentWithVariable

pub mod cost_model;
pub mod host_filter;
pub mod indexer_blocklist;
mod indexer_processing;
pub mod indexing_progress;
pub mod poi_filter;
Expand Down
78 changes: 78 additions & 0 deletions src/network/indexer_blocklist.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use std::collections::{HashMap, HashSet};

use thegraph_core::{alloy::primitives::Address, DeploymentId, ProofOfIndexing};
use tokio::sync::watch;

use crate::config::BlocklistEntry;

#[derive(Clone)]
pub struct Blocklist {
pub blocklist: watch::Receiver<Vec<BlocklistEntry>>,
pub poi: watch::Receiver<HashMap<DeploymentId, Vec<(u64, ProofOfIndexing)>>>,
pub indexer: watch::Receiver<HashMap<Address, HashSet<DeploymentId>>>,
}

impl Blocklist {
pub fn spawn(init: Vec<BlocklistEntry>) -> Self {
let (blocklist_tx, blocklist_rx) = watch::channel(Default::default());
let (poi_tx, poi_rx) = watch::channel(Default::default());
let (indexer_tx, indexer_rx) = watch::channel(Default::default());
let mut actor = Actor {
blocklist: blocklist_tx,
poi: poi_tx,
indexer: indexer_tx,
};
for entry in init {
actor.add_entry(entry);
}
tokio::spawn(async move {
actor.run().await;
});
Self {
blocklist: blocklist_rx,
poi: poi_rx,
indexer: indexer_rx,
}
}
}

struct Actor {
pub blocklist: watch::Sender<Vec<BlocklistEntry>>,
pub poi: watch::Sender<HashMap<DeploymentId, Vec<(u64, ProofOfIndexing)>>>,
pub indexer: watch::Sender<HashMap<Address, HashSet<DeploymentId>>>,
}

impl Actor {
async fn run(&mut self) {
todo!();
}

fn add_entry(&mut self, entry: BlocklistEntry) {
match entry {
BlocklistEntry::Poi {
deployment,
block,
public_poi,
..
} => {
self.poi.send_modify(move |blocklist| {
blocklist
.entry(deployment)
.or_default()
.push((block, public_poi.into()));
});
}
BlocklistEntry::Other {
deployment,
indexer,
..
} => {
self.indexer.send_modify(move |blocklist| {
blocklist.entry(indexer).or_default().insert(deployment);
});
}
};
self.blocklist
.send_modify(move |blocklist| blocklist.push(entry));
}
}
42 changes: 6 additions & 36 deletions src/network/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ use ipnetwork::IpNetwork;
use semver::Version;
use thegraph_core::{
alloy::primitives::{Address, BlockNumber},
DeploymentId, IndexerId, ProofOfIndexing, SubgraphId,
DeploymentId, IndexerId, SubgraphId,
};
use tokio::{sync::watch, time::MissedTickBehavior};

use super::{
cost_model::CostModelResolver,
host_filter::HostFilter,
indexer_blocklist,
indexer_processing::{self, IndexerRawInfo},
indexing_progress::IndexingProgressResolver,
poi_filter::PoiFilter,
Expand All @@ -28,7 +29,7 @@ use super::{
version_filter::{MinimumVersionRequirements, VersionFilter},
DeploymentError, SubgraphError,
};
use crate::{config::BlocklistEntry, errors::UnavailableReason};
use crate::errors::UnavailableReason;

/// Subgraph resolution information returned by the [`NetworkService`].
pub struct ResolvedSubgraphInfo {
Expand Down Expand Up @@ -161,44 +162,13 @@ impl NetworkService {
pub fn spawn(
http: reqwest::Client,
subgraph_client: SubgraphClient,
blocklist: Vec<BlocklistEntry>,
indexer_blocklist: indexer_blocklist::Blocklist,
min_indexer_service_version: Version,
min_graph_node_version: Version,
indexer_host_blocklist: HashSet<IpNetwork>,
) -> NetworkService {
let mut poi_blocklist: HashMap<DeploymentId, Vec<(BlockNumber, ProofOfIndexing)>> =
Default::default();
let mut indexer_blocklist: HashMap<Address, HashSet<DeploymentId>> = Default::default();
for entry in blocklist {
match entry {
BlocklistEntry::Poi {
deployment,
block,
public_poi,
..
} => {
poi_blocklist
.entry(deployment)
.or_default()
.push((block, public_poi.into()));
}
BlocklistEntry::Other {
deployment,
indexer,
..
} => {
indexer_blocklist
.entry(indexer)
.or_default()
.insert(deployment);
}
};
}
let (_, poi_blocklist) = watch::channel(poi_blocklist);
let (_, indexer_blocklist) = watch::channel(indexer_blocklist);

let internal_state = InternalState {
indexer_blocklist,
indexer_blocklist: indexer_blocklist.indexer,
indexer_host_filter: HostFilter::new(indexer_host_blocklist)
.expect("failed to create host resolver"),
indexer_version_filter: VersionFilter::new(
Expand All @@ -208,7 +178,7 @@ pub fn spawn(
graph_node: min_graph_node_version,
},
),
indexer_poi_filer: PoiFilter::new(http.clone(), poi_blocklist),
indexer_poi_filer: PoiFilter::new(http.clone(), indexer_blocklist.poi),
indexing_progress_resolver: IndexingProgressResolver::new(http.clone()),
cost_model_resolver: CostModelResolver::new(http.clone()),
};
Expand Down

0 comments on commit 7543ecf

Please sign in to comment.