From 619ab73d80d851b316e8d76db51d6fe7e098c48b Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Mon, 5 Feb 2024 13:58:35 -0500 Subject: [PATCH] refactor: improve topology access to indexing --- graph-gateway/src/client_query.rs | 4 ++-- graph-gateway/src/indexers/indexing.rs | 2 +- graph-gateway/src/indexings_blocklist.rs | 8 +------- graph-gateway/src/main.rs | 4 ++-- graph-gateway/src/topology.rs | 18 ++++++++++++++---- 5 files changed, 20 insertions(+), 16 deletions(-) diff --git a/graph-gateway/src/client_query.rs b/graph-gateway/src/client_query.rs index 56b9712e..43973945 100644 --- a/graph-gateway/src/client_query.rs +++ b/graph-gateway/src/client_query.rs @@ -232,8 +232,8 @@ async fn handle_client_query_inner( .iter() .flat_map(move |deployment| { let id = deployment.id; - deployment.indexers.iter().map(move |indexer| Indexing { - indexer: indexer.id, + deployment.indexers.keys().map(move |indexer| Indexing { + indexer: *indexer, deployment: id, }) }) diff --git a/graph-gateway/src/indexers/indexing.rs b/graph-gateway/src/indexers/indexing.rs index a58a9063..1528eecd 100644 --- a/graph-gateway/src/indexers/indexing.rs +++ b/graph-gateway/src/indexers/indexing.rs @@ -85,7 +85,7 @@ async fn update_statuses( // There can only be one URL per indexer entity in the network subgraph let mut indexers: HashMap)> = Default::default(); for deployment in deployments.values() { - for indexer in &deployment.indexers { + for indexer in deployment.indexers.values() { let (_, deployments) = indexers .entry(indexer.id) .or_insert_with(|| (indexer.url.clone(), vec![])); diff --git a/graph-gateway/src/indexings_blocklist.rs b/graph-gateway/src/indexings_blocklist.rs index 08ffe257..8a78361e 100644 --- a/graph-gateway/src/indexings_blocklist.rs +++ b/graph-gateway/src/indexings_blocklist.rs @@ -120,13 +120,7 @@ pub fn deployment_indexer_addresses( ) -> Vec
{ deployments .get(deployment_id) - .map(|deployment| { - deployment - .indexers - .iter() - .map(|indexer| indexer.id) - .collect::>() - }) + .map(|deployment| deployment.indexers.keys().cloned().collect()) .unwrap_or_default() } diff --git a/graph-gateway/src/main.rs b/graph-gateway/src/main.rs index 207da3bf..b1511cd0 100644 --- a/graph-gateway/src/main.rs +++ b/graph-gateway/src/main.rs @@ -485,8 +485,8 @@ async fn write_indexer_inputs( for (deployment, indexer) in deployments.values().flat_map(|deployment| { deployment .indexers - .iter() - .map(move |indexer| (deployment.as_ref(), indexer.as_ref())) + .values() + .map(|indexer| (deployment.as_ref(), indexer.as_ref())) }) { let indexing = Indexing { indexer: indexer.id, diff --git a/graph-gateway/src/topology.rs b/graph-gateway/src/topology.rs index 33cc85d4..bc18cce8 100644 --- a/graph-gateway/src/topology.rs +++ b/graph-gateway/src/topology.rs @@ -11,7 +11,7 @@ use thegraph::types::{DeploymentId, SubgraphId}; use tokio::sync::RwLock; use toolshed::url::Url; -use gateway_common::types::GRT; +use gateway_common::types::{Indexing, GRT}; use gateway_framework::{ipfs, network::network_subgraph}; /// Representation of the graph network being used to serve queries @@ -40,7 +40,7 @@ pub struct Deployment { pub manifest: Arc, /// An indexer may have multiple active allocations on a deployment. We collapse them into a /// single logical allocation using the largest allocation ID and sum of the allocated tokens. - pub indexers: Vec>, + pub indexers: HashMap>, /// A deployment may be associated with multiple subgraphs. pub subgraphs: BTreeSet, /// Indicates that the deployment should not be served directly by this gateway. This will @@ -113,7 +113,7 @@ impl GraphNetwork { .values() .flat_map(|subgraph| &subgraph.deployments) .flat_map(|deployment| &deployment.indexers) - .map(|indexer| (indexer.id, indexer.clone())) + .map(|(id, indexer)| (*id, indexer.clone())) .collect::>>() .into() }); @@ -207,7 +207,7 @@ impl GraphNetwork { indexer.allocated_tokens = total_allocation; Some(indexer) }) - .map(Arc::new) + .map(|indexer| (indexer.id, indexer.into())) .collect(); // abf62a6d-c071-4507-b528-ddc8e250127a @@ -232,6 +232,16 @@ impl GraphNetwork { pub fn deployment_by_id(&self, id: &DeploymentId) -> Option> { self.deployments.value_immediate()?.get(id).cloned() } + + // Get then indexer data for some deployment. + pub fn indexing(&self, indexing: &Indexing) -> Option> { + self.deployments + .value_immediate()? + .get(&indexing.deployment)? + .indexers + .get(&indexing.indexer) + .cloned() + } } struct IpfsCache {