Skip to content

Commit

Permalink
feat: unify blocklist
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus committed Dec 9, 2024
1 parent a977e66 commit 3b9d5a2
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 70 deletions.
47 changes: 24 additions & 23 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
//! The Graph Gateway configuration.
use std::{
collections::{BTreeMap, HashSet},
path::{Path, PathBuf},
Expand All @@ -26,9 +24,9 @@ pub struct Config {
#[serde(default)]
pub api_keys: Option<ApiKeys>,
pub attestations: AttestationConfig,
/// List of indexer addresses to block. This should only be used temprorarily.
/// Blocklist applying to indexers.
#[serde(default)]
pub blocked_indexers: BTreeMap<Address, BlockedIndexer>,
pub blocklist: Vec<BlocklistEntry>,
/// Chain aliases
#[serde(default)]
pub chain_aliases: BTreeMap<String, String>,
Expand All @@ -53,9 +51,6 @@ pub struct Config {
pub trusted_indexers: Vec<TrustedIndexer>,
/// Check payment state of client (disable for testnets)
pub payment_required: bool,
/// POI blocklist
#[serde(default)]
pub poi_blocklist: Vec<BlockedPoi>,
/// public API port
pub port_api: u16,
/// private metrics port
Expand Down Expand Up @@ -96,11 +91,28 @@ pub enum ApiKeys {
Fixed(Vec<ApiKey>),
}

#[derive(Deserialize)]
pub struct BlockedIndexer {
/// empty array blocks on all deployments
pub deployments: Vec<DeploymentId>,
pub reason: String,
#[derive(Clone, Deserialize, Serialize)]
#[serde(untagged)]
pub enum BlocklistEntry {
Poi {
deployment: DeploymentId,
info: BlocklistInfo,
public_poi: B256,
block: BlockNumber,
},
Other {
deployment: DeploymentId,
info: BlocklistInfo,
indexer: Address,
},
}

#[derive(Clone, Deserialize, Serialize)]
pub struct BlocklistInfo {
/// Example query (should be minimal to reproduce bad response)
query: Option<String>,
/// Bad query response, from the above query executed on indexers with this blocked PoI
bad_query_response: Option<String>,
}

/// Attestation configuration.
Expand Down Expand Up @@ -171,17 +183,6 @@ pub struct Receipts {
pub verifier: Address,
}

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct BlockedPoi {
pub public_poi: B256,
pub deployment: DeploymentId,
pub block_number: BlockNumber,
/// Example query (should be minimal to reproduce bad response)
pub query: Option<String>,
/// Bad query response, from the above query executed on indexers with this blocked PoI
pub bad_query_response: Option<String>,
}

/// Load the configuration from a JSON file.
pub fn load_from_file(path: &Path) -> anyhow::Result<Config> {
let config_content = std::fs::read_to_string(path)?;
Expand Down
14 changes: 9 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use budgets::{Budgeter, USD};
use chains::Chains;
use client_query::context::Context;
use config::{ApiKeys, ExchangeRateProvider};
use headers::ContentType;
use indexer_client::IndexerClient;
use indexing_performance::IndexingPerformance;
use middleware::{
Expand All @@ -52,6 +53,7 @@ use thegraph_core::{
alloy::{dyn_abi::Eip712Domain, primitives::ChainId, signers::local::PrivateKeySigner},
attestation,
};
use thegraph_headers::HttpBuilderExt;
use tokio::{net::TcpListener, signal::unix::SignalKind, sync::watch};
use tower_http::cors::{self, CorsLayer};
use tracing_subscriber::{prelude::*, EnvFilter};
Expand Down Expand Up @@ -112,11 +114,10 @@ async fn main() {
let mut network = network::service::spawn(
http_client.clone(),
network_subgraph_client,
conf.blocklist.clone(),
conf.min_indexer_version,
conf.min_graph_node_version,
conf.blocked_indexers,
indexer_host_blocklist,
conf.poi_blocklist.clone(),
);
let indexing_perf = IndexingPerformance::new(network.clone());
network.wait_until_ready().await;
Expand Down Expand Up @@ -157,7 +158,7 @@ async fn main() {
reporter,
};

let poi_blocklist: &'static str = serde_json::to_string(&conf.poi_blocklist).unwrap().leak();
let blocklist: &'static str = serde_json::to_string(&conf.blocklist).unwrap().leak();

// Host metrics on a separate server with a port that isn't open to public requests.
tokio::spawn(async move {
Expand Down Expand Up @@ -226,8 +227,11 @@ async fn main() {
.route(
"/blocklist",
routing::get(move || async move {
let headers = [(reqwest::header::CONTENT_TYPE, "application/json")];
(headers, poi_blocklist)
axum::http::Response::builder()
.status(StatusCode::OK)
.header_typed(ContentType::json())
.body(blocklist.to_string())
.unwrap()
}),
)
.nest("/api", api);
Expand Down
27 changes: 9 additions & 18 deletions src/network/indexer_processing.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};

use custom_debug::CustomDebug;
use thegraph_core::{alloy::primitives::BlockNumber, AllocationId, DeploymentId, IndexerId};
use tracing::Instrument;
use url::Url;

use crate::{
config::BlockedIndexer,
errors::UnavailableReason,
network::{indexing_progress::IndexingProgressResolver, service::InternalState},
};
Expand Down Expand Up @@ -191,29 +190,21 @@ async fn process_indexer_indexings(
state: &InternalState,
url: &Url,
indexings: HashMap<DeploymentId, IndexingRawInfo>,
blocklist: Option<&BlockedIndexer>,
blocklist: Option<&HashSet<DeploymentId>>,
) -> HashMap<DeploymentId, Result<ResolvedIndexingInfo, UnavailableReason>> {
let mut indexer_indexings: HashMap<DeploymentId, Result<IndexingInfo<(), ()>, _>> = indexings
.into_iter()
.map(|(id, info)| (id, Ok(info.into())))
.collect();

match blocklist {
None => (),
Some(blocklist) if blocklist.deployments.is_empty() => {
for entry in indexer_indexings.values_mut() {
*entry = Err(UnavailableReason::Blocked(blocklist.reason.clone()));
}
}
Some(blocklist) => {
for deployment in &blocklist.deployments {
indexer_indexings.insert(
*deployment,
Err(UnavailableReason::Blocked(blocklist.reason.clone())),
);
}
if let Some(blocklist) = blocklist {
for deployment in blocklist {
indexer_indexings.insert(
*deployment,
Err(UnavailableReason::Blocked("missing data".to_string())),
);
}
};
}

// ref: df8e647b-1e6e-422a-8846-dc9ee7e0dcc2
let status_url = url.join("status").unwrap();
Expand Down
57 changes: 33 additions & 24 deletions src/network/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
//! query processing pipeline
use std::{
collections::{BTreeMap, HashMap, HashSet},
collections::{HashMap, HashSet},
time::Duration,
};

use ipnetwork::IpNetwork;
use semver::Version;
use thegraph_core::{
alloy::primitives::{Address, BlockNumber},
DeploymentId, IndexerId, SubgraphId,
DeploymentId, IndexerId, ProofOfIndexing, SubgraphId,
};
use tokio::{sync::watch, time::MissedTickBehavior};

Expand All @@ -28,10 +28,7 @@ use super::{
version_filter::{MinimumVersionRequirements, VersionFilter},
DeploymentError, SubgraphError,
};
use crate::{
config::{BlockedIndexer, BlockedPoi},
errors::UnavailableReason,
};
use crate::{config::BlocklistEntry, errors::UnavailableReason};

/// Subgraph resolution information returned by the [`NetworkService`].
pub struct ResolvedSubgraphInfo {
Expand Down Expand Up @@ -164,28 +161,40 @@ impl NetworkService {
pub fn spawn(
http: reqwest::Client,
subgraph_client: SubgraphClient,
blocklist: Vec<BlocklistEntry>,
min_indexer_service_version: Version,
min_graph_node_version: Version,
indexer_blocklist: BTreeMap<Address, BlockedIndexer>,
indexer_host_blocklist: HashSet<IpNetwork>,
poi_blocklist: Vec<BlockedPoi>,
) -> NetworkService {
let poi_blocklist = poi_blocklist
.iter()
.map(|entry| &entry.deployment)
.collect::<HashSet<_>>()
.into_iter()
.map(|deployment| {
(
*deployment,
let mut poi_blocklist: HashMap<DeploymentId, Vec<(BlockNumber, ProofOfIndexing)>> =
Default::default();
let mut indexer_blocklist: HashMap<Address, HashSet<DeploymentId>> = Default::default();
for entry in blocklist {
match entry {
BlocklistEntry::Poi {
deployment,
block,
public_poi,
..
} => {
poi_blocklist
.iter()
.filter(|entry| &entry.deployment == deployment)
.map(|entry| (entry.block_number, entry.public_poi.into()))
.collect::<Vec<_>>(),
)
})
.collect();
.entry(deployment)
.or_default()
.push((block, public_poi.into()));
}
BlocklistEntry::Other {
deployment,
indexer,
..
} => {
indexer_blocklist
.entry(indexer)
.or_default()
.insert(deployment);
}
};
}

let internal_state = InternalState {
indexer_blocklist,
indexer_host_filter: HostFilter::new(indexer_host_blocklist)
Expand All @@ -207,7 +216,7 @@ pub fn spawn(
}

pub struct InternalState {
pub indexer_blocklist: BTreeMap<Address, BlockedIndexer>,
pub indexer_blocklist: HashMap<Address, HashSet<DeploymentId>>,
pub indexer_host_filter: HostFilter,
pub indexer_version_filter: VersionFilter,
pub indexer_poi_filer: PoiFilter,
Expand Down

0 comments on commit 3b9d5a2

Please sign in to comment.