From 98ff5ebada09f7b539f3889e11ea30b1512ddd97 Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Fri, 6 Dec 2024 11:34:26 -0500 Subject: [PATCH] refactor: flatten network::internal module --- src/network.rs | 9 ++- .../{internal => }/indexer_processing.rs | 21 +++--- ...gress_resolver.rs => indexing_progress.rs} | 0 src/network/internal.rs | 69 ----------------- src/network/internal/state.rs | 23 ------ src/network/{internal => }/pre_processing.rs | 9 +-- src/network/service.rs | 74 +++++++++++++++++-- src/network/{internal => }/snapshot.rs | 11 +-- .../{internal => }/subgraph_processing.rs | 10 +-- 9 files changed, 100 insertions(+), 126 deletions(-) rename src/network/{internal => }/indexer_processing.rs (96%) rename src/network/{indexer_indexing_progress_resolver.rs => indexing_progress.rs} (100%) delete mode 100644 src/network/internal.rs delete mode 100644 src/network/internal/state.rs rename src/network/{internal => }/pre_processing.rs (97%) rename src/network/{internal => }/snapshot.rs (98%) rename src/network/{internal => }/subgraph_processing.rs (96%) diff --git a/src/network.rs b/src/network.rs index 7f26425a..f4d54b71 100644 --- a/src/network.rs +++ b/src/network.rs @@ -1,6 +1,6 @@ pub use errors::{DeploymentError, SubgraphError}; -pub use internal::{Indexing, IndexingId}; pub use service::{NetworkService, ResolvedSubgraphInfo}; +pub use snapshot::{Indexing, IndexingId}; use thegraph_graphql_http::graphql::{IntoDocument as _, IntoDocumentWithVariables}; pub mod cost_model; @@ -8,10 +8,13 @@ mod errors; pub mod host_filter; pub mod indexer_indexing_poi_blocklist; pub mod indexer_indexing_poi_resolver; -pub mod indexer_indexing_progress_resolver; -pub mod internal; +mod indexer_processing; +pub mod indexing_progress; +mod pre_processing; pub mod service; +mod snapshot; pub mod subgraph_client; +mod subgraph_processing; pub mod version_filter; pub struct GraphQlRequest { diff --git a/src/network/internal/indexer_processing.rs b/src/network/indexer_processing.rs similarity index 96% rename from src/network/internal/indexer_processing.rs rename to src/network/indexer_processing.rs index 9a2038e6..b8a13ad8 100644 --- a/src/network/internal/indexer_processing.rs +++ b/src/network/indexer_processing.rs @@ -5,21 +5,22 @@ use thegraph_core::{alloy::primitives::BlockNumber, AllocationId, DeploymentId, use tracing::Instrument; use url::Url; -use super::InternalState; use crate::{ config::BlockedIndexer, errors::UnavailableReason, network::{ indexer_indexing_poi_blocklist::PoiBlocklist, indexer_indexing_poi_resolver::PoiResolver, - indexer_indexing_progress_resolver::IndexingProgressResolver, + indexing_progress::IndexingProgressResolver, }, }; +use super::service::InternalState; + /// Internal representation of the indexer pre-processed information. /// /// This is not the final representation of the indexer. #[derive(CustomDebug)] -pub(super) struct IndexerRawInfo { +pub struct IndexerRawInfo { /// The indexer's ID. pub id: IndexerId, /// The indexer's URL. @@ -37,7 +38,7 @@ pub(super) struct IndexerRawInfo { /// /// This is not the final representation of the indexer. #[derive(CustomDebug)] -pub(super) struct IndexerInfo { +pub struct IndexerInfo { /// The indexer's ID. pub id: IndexerId, /// The indexer's URL. @@ -56,7 +57,7 @@ pub(super) struct IndexerInfo { /// /// This is not the final representation of the indexer's indexing information. #[derive(Clone, Debug, Eq, PartialEq)] -pub(super) struct IndexingRawInfo { +pub struct IndexingRawInfo { /// The largest allocation. pub largest_allocation: AllocationId, /// The total amount of tokens allocated. @@ -68,7 +69,7 @@ pub(super) struct IndexingRawInfo { /// This type uses the "type-state" pattern to represent the different states of the indexing /// information: unresolved, partially resolved (with indexing progress) and completely resolved. #[derive(Debug)] -pub(super) struct IndexingInfo { +pub struct IndexingInfo { /// The largest allocation. pub largest_allocation: AllocationId, @@ -124,13 +125,13 @@ impl IndexingInfo { } } -pub(super) type ResolvedIndexingInfo = IndexingInfo; +pub type ResolvedIndexingInfo = IndexingInfo; -pub(super) type ResolvedIndexerInfo = IndexerInfo; +pub type ResolvedIndexerInfo = IndexerInfo; /// Internal representation of the indexing's progress information. #[derive(Clone, Debug)] -pub(super) struct IndexingProgress { +pub struct IndexingProgress { /// The latest indexed block. pub latest_block: BlockNumber, /// The minimum indexed block. @@ -138,7 +139,7 @@ pub(super) struct IndexingProgress { } /// Process the fetched network topology information. -pub(super) async fn process_info( +pub async fn process_info( state: &InternalState, indexers: &HashMap, ) -> HashMap> { diff --git a/src/network/indexer_indexing_progress_resolver.rs b/src/network/indexing_progress.rs similarity index 100% rename from src/network/indexer_indexing_progress_resolver.rs rename to src/network/indexing_progress.rs diff --git a/src/network/internal.rs b/src/network/internal.rs deleted file mode 100644 index 4f6e9905..00000000 --- a/src/network/internal.rs +++ /dev/null @@ -1,69 +0,0 @@ -use std::{collections::HashMap, time::Duration}; - -use thegraph_core::{DeploymentId, IndexerId, SubgraphId}; - -use self::indexer_processing::IndexerRawInfo; -pub use self::{ - snapshot::{Indexing, IndexingId, NetworkTopologySnapshot}, - state::InternalState, - subgraph_processing::{AllocationInfo, DeploymentInfo, SubgraphInfo}, -}; -use super::{subgraph_client::Client as SubgraphClient, DeploymentError, SubgraphError}; - -mod indexer_processing; -mod pre_processing; -mod snapshot; -mod state; -mod subgraph_processing; - -/// Fetch the network topology information from the graph network subgraph. -pub async fn fetch_update( - network: &PreprocessedNetworkInfo, - state: &InternalState, -) -> NetworkTopologySnapshot { - // Process network topology information - let indexers_info = indexer_processing::process_info(state, &network.indexers).await; - snapshot::new_from( - indexers_info, - network.subgraphs.clone(), - network.deployments.clone(), - ) -} - -pub struct PreprocessedNetworkInfo { - subgraphs: HashMap>, - deployments: HashMap>, - indexers: HashMap, -} - -/// Fetch the subgraphs information from the graph network subgraph and performs pre-processing -/// steps, i.e., validation and conversion into the internal representation. -/// -/// 1. Fetch the subgraphs information from the graph network subgraph. -/// 2. Validate and convert the subgraphs fetched info into the internal representation. -/// -/// If the fetch fails or the response is empty, an error is returned. -/// -/// Invalid info is filtered out before converting into the internal representation. -pub async fn fetch_and_preprocess_subgraph_info( - client: &mut SubgraphClient, - timeout: Duration, -) -> anyhow::Result { - // Fetch the subgraphs information from the graph network subgraph - let data = tokio::time::timeout(timeout, client.fetch()).await??; - anyhow::ensure!(!data.is_empty(), "empty subgraph response"); - - // Pre-process (validate and convert) the fetched subgraphs information - let indexers = pre_processing::into_internal_indexers_raw_info(data.iter()); - let subgraphs = pre_processing::into_internal_subgraphs_raw_info(data.into_iter()); - let deployments = pre_processing::into_internal_deployments_raw_info(subgraphs.values()); - - let subgraphs = subgraph_processing::process_subgraph_info(subgraphs); - let deployments = subgraph_processing::process_deployments_info(deployments); - - Ok(PreprocessedNetworkInfo { - subgraphs, - deployments, - indexers, - }) -} diff --git a/src/network/internal/state.rs b/src/network/internal/state.rs deleted file mode 100644 index 9867f0c9..00000000 --- a/src/network/internal/state.rs +++ /dev/null @@ -1,23 +0,0 @@ -use std::collections::BTreeMap; - -use thegraph_core::alloy::primitives::Address; - -use crate::{ - config::BlockedIndexer, - network::{ - cost_model::CostModelResolver, host_filter::HostFilter, - indexer_indexing_poi_blocklist::PoiBlocklist, indexer_indexing_poi_resolver::PoiResolver, - indexer_indexing_progress_resolver::IndexingProgressResolver, - version_filter::VersionFilter, - }, -}; - -pub struct InternalState { - pub indexer_blocklist: BTreeMap, - pub indexer_host_filter: HostFilter, - pub indexer_version_filter: VersionFilter, - pub poi_blocklist: PoiBlocklist, - pub poi_resolver: PoiResolver, - pub indexing_progress_resolver: IndexingProgressResolver, - pub cost_model_resolver: CostModelResolver, -} diff --git a/src/network/internal/pre_processing.rs b/src/network/pre_processing.rs similarity index 97% rename from src/network/internal/pre_processing.rs rename to src/network/pre_processing.rs index f13e4435..9bbb2545 100644 --- a/src/network/internal/pre_processing.rs +++ b/src/network/pre_processing.rs @@ -5,13 +5,12 @@ use thegraph_core::{AllocationId, DeploymentId, IndexerId, SubgraphId}; use url::Url; use crate::network::{ - internal::{ - indexer_processing::{IndexerRawInfo, IndexingRawInfo}, - subgraph_processing::{DeploymentRawInfo, SubgraphRawInfo, SubgraphVersionRawInfo}, - AllocationInfo, - }, + indexer_processing::{IndexerRawInfo, IndexingRawInfo}, subgraph_client, subgraph_client::types::SubgraphVersion, + subgraph_processing::{ + AllocationInfo, DeploymentRawInfo, SubgraphRawInfo, SubgraphVersionRawInfo, + }, }; pub fn into_internal_indexers_raw_info<'a>( diff --git a/src/network/service.rs b/src/network/service.rs index 9911226e..7c72236a 100644 --- a/src/network/service.rs +++ b/src/network/service.rs @@ -11,7 +11,7 @@ use ipnetwork::IpNetwork; use semver::Version; use thegraph_core::{ alloy::primitives::{Address, BlockNumber}, - DeploymentId, SubgraphId, + DeploymentId, IndexerId, SubgraphId, }; use tokio::{sync::watch, time::MissedTickBehavior}; @@ -21,17 +21,17 @@ use super::{ host_filter::HostFilter, indexer_indexing_poi_blocklist::PoiBlocklist, indexer_indexing_poi_resolver::PoiResolver, - indexer_indexing_progress_resolver::IndexingProgressResolver, - internal::{ - fetch_and_preprocess_subgraph_info, fetch_update, Indexing, IndexingId, InternalState, - NetworkTopologySnapshot, PreprocessedNetworkInfo, - }, + indexer_processing::{self, IndexerRawInfo}, + indexing_progress::IndexingProgressResolver, + snapshot::{self, Indexing, IndexingId, NetworkTopologySnapshot}, subgraph_client::Client as SubgraphClient, + subgraph_processing::{DeploymentInfo, SubgraphInfo}, version_filter::{MinimumVersionRequirements, VersionFilter}, }; use crate::{ config::{BlockedIndexer, BlockedPoi}, errors::UnavailableReason, + network::{pre_processing, subgraph_processing}, }; /// Subgraph resolution information returned by the [`NetworkService`]. @@ -197,6 +197,16 @@ pub fn spawn( NetworkService { network } } +pub struct InternalState { + pub indexer_blocklist: BTreeMap, + pub indexer_host_filter: HostFilter, + pub indexer_version_filter: VersionFilter, + pub poi_blocklist: PoiBlocklist, + pub poi_resolver: PoiResolver, + pub indexing_progress_resolver: IndexingProgressResolver, + pub cost_model_resolver: CostModelResolver, +} + /// Spawn a background task to fetch the network topology information from the graph network /// subgraph at regular intervals fn spawn_updater_task( @@ -240,3 +250,55 @@ fn spawn_updater_task( rx } + +/// Fetch the subgraphs information from the graph network subgraph and performs pre-processing +/// steps, i.e., validation and conversion into the internal representation. +/// +/// 1. Fetch the subgraphs information from the graph network subgraph. +/// 2. Validate and convert the subgraphs fetched info into the internal representation. +/// +/// If the fetch fails or the response is empty, an error is returned. +/// +/// Invalid info is filtered out before converting into the internal representation. +pub async fn fetch_and_preprocess_subgraph_info( + client: &mut SubgraphClient, + timeout: Duration, +) -> anyhow::Result { + // Fetch the subgraphs information from the graph network subgraph + let data = tokio::time::timeout(timeout, client.fetch()).await??; + anyhow::ensure!(!data.is_empty(), "empty subgraph response"); + + // Pre-process (validate and convert) the fetched subgraphs information + let indexers = pre_processing::into_internal_indexers_raw_info(data.iter()); + let subgraphs = pre_processing::into_internal_subgraphs_raw_info(data.into_iter()); + let deployments = pre_processing::into_internal_deployments_raw_info(subgraphs.values()); + + let subgraphs = subgraph_processing::process_subgraph_info(subgraphs); + let deployments = subgraph_processing::process_deployments_info(deployments); + + Ok(PreprocessedNetworkInfo { + subgraphs, + deployments, + indexers, + }) +} + +/// Fetch the network topology information from the graph network subgraph. +pub async fn fetch_update( + network: &PreprocessedNetworkInfo, + state: &InternalState, +) -> NetworkTopologySnapshot { + // Process network topology information + let indexers_info = indexer_processing::process_info(state, &network.indexers).await; + snapshot::new_from( + indexers_info, + network.subgraphs.clone(), + network.deployments.clone(), + ) +} + +pub struct PreprocessedNetworkInfo { + subgraphs: HashMap>, + deployments: HashMap>, + indexers: HashMap, +} diff --git a/src/network/internal/snapshot.rs b/src/network/snapshot.rs similarity index 98% rename from src/network/internal/snapshot.rs rename to src/network/snapshot.rs index 36e217f3..91e32dc1 100644 --- a/src/network/internal/snapshot.rs +++ b/src/network/snapshot.rs @@ -12,13 +12,14 @@ use thegraph_core::{ }; use url::Url; -use super::{DeploymentInfo, SubgraphInfo}; use crate::{ errors::UnavailableReason, - network::{ - errors::{DeploymentError, SubgraphError}, - internal::indexer_processing::ResolvedIndexerInfo, - }, + network::errors::{DeploymentError, SubgraphError}, +}; + +use super::{ + indexer_processing::ResolvedIndexerInfo, + subgraph_processing::{DeploymentInfo, SubgraphInfo}, }; /// The [`IndexingId`] struct represents the unique identifier of an indexing. diff --git a/src/network/internal/subgraph_processing.rs b/src/network/subgraph_processing.rs similarity index 96% rename from src/network/internal/subgraph_processing.rs rename to src/network/subgraph_processing.rs index 456cd537..16a821b0 100644 --- a/src/network/internal/subgraph_processing.rs +++ b/src/network/subgraph_processing.rs @@ -8,7 +8,7 @@ use crate::network::errors::{DeploymentError, SubgraphError}; /// /// This is not the final representation of the subgraph. #[derive(Debug, Clone)] -pub(super) struct SubgraphRawInfo { +pub struct SubgraphRawInfo { pub id: SubgraphId, pub versions: Vec, } @@ -17,7 +17,7 @@ pub(super) struct SubgraphRawInfo { /// /// This is not the final representation of the subgraph version. #[derive(Debug, Clone)] -pub(super) struct SubgraphVersionRawInfo { +pub struct SubgraphVersionRawInfo { pub deployment: DeploymentRawInfo, } @@ -25,7 +25,7 @@ pub(super) struct SubgraphVersionRawInfo { /// /// This is not the final representation of the deployment. #[derive(Debug, Clone)] -pub(super) struct DeploymentRawInfo { +pub struct DeploymentRawInfo { pub id: DeploymentId, pub manifest_network: String, pub manifest_start_block: BlockNumber, @@ -75,7 +75,7 @@ pub struct AllocationInfo { /// Process the fetched subgraphs' information. /// /// - If the subgraph has no allocations, [`SubgraphError::NoAllocations`] is returned. -pub(super) fn process_subgraph_info( +pub fn process_subgraph_info( subgraphs: HashMap, ) -> HashMap> { subgraphs @@ -144,7 +144,7 @@ fn check_subgraph_has_allocations(subgraph: &SubgraphRawInfo) -> Result<(), Subg /// Process the fetched deployments' information. /// /// - If the deployment has no allocations, [`DeploymentError::NoAllocations`] is returned. -pub(super) fn process_deployments_info( +pub fn process_deployments_info( deployments: HashMap, ) -> HashMap> { deployments