Skip to content

Commit

Permalink
refactor: flatten network::internal module
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus committed Dec 6, 2024
1 parent 50c0d2d commit 98ff5eb
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 126 deletions.
9 changes: 6 additions & 3 deletions src/network.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
pub use errors::{DeploymentError, SubgraphError};
pub use internal::{Indexing, IndexingId};
pub use service::{NetworkService, ResolvedSubgraphInfo};
pub use snapshot::{Indexing, IndexingId};
use thegraph_graphql_http::graphql::{IntoDocument as _, IntoDocumentWithVariables};

pub mod cost_model;
mod errors;
pub mod host_filter;
pub mod indexer_indexing_poi_blocklist;
pub mod indexer_indexing_poi_resolver;
pub mod indexer_indexing_progress_resolver;
pub mod internal;
mod indexer_processing;
pub mod indexing_progress;
mod pre_processing;
pub mod service;
mod snapshot;
pub mod subgraph_client;
mod subgraph_processing;
pub mod version_filter;

pub struct GraphQlRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,22 @@ use thegraph_core::{alloy::primitives::BlockNumber, AllocationId, DeploymentId,
use tracing::Instrument;
use url::Url;

use super::InternalState;
use crate::{
config::BlockedIndexer,
errors::UnavailableReason,
network::{
indexer_indexing_poi_blocklist::PoiBlocklist, indexer_indexing_poi_resolver::PoiResolver,
indexer_indexing_progress_resolver::IndexingProgressResolver,
indexing_progress::IndexingProgressResolver,
},
};

use super::service::InternalState;

/// Internal representation of the indexer pre-processed information.
///
/// This is not the final representation of the indexer.
#[derive(CustomDebug)]
pub(super) struct IndexerRawInfo {
pub struct IndexerRawInfo {
/// The indexer's ID.
pub id: IndexerId,
/// The indexer's URL.
Expand All @@ -37,7 +38,7 @@ pub(super) struct IndexerRawInfo {
///
/// This is not the final representation of the indexer.
#[derive(CustomDebug)]
pub(super) struct IndexerInfo<I> {
pub struct IndexerInfo<I> {
/// The indexer's ID.
pub id: IndexerId,
/// The indexer's URL.
Expand All @@ -56,7 +57,7 @@ pub(super) struct IndexerInfo<I> {
///
/// This is not the final representation of the indexer's indexing information.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(super) struct IndexingRawInfo {
pub struct IndexingRawInfo {
/// The largest allocation.
pub largest_allocation: AllocationId,
/// The total amount of tokens allocated.
Expand All @@ -68,7 +69,7 @@ pub(super) struct IndexingRawInfo {
/// This type uses the "type-state" pattern to represent the different states of the indexing
/// information: unresolved, partially resolved (with indexing progress) and completely resolved.
#[derive(Debug)]
pub(super) struct IndexingInfo<P, C> {
pub struct IndexingInfo<P, C> {
/// The largest allocation.
pub largest_allocation: AllocationId,

Expand Down Expand Up @@ -124,21 +125,21 @@ impl IndexingInfo<IndexingProgress, ()> {
}
}

pub(super) type ResolvedIndexingInfo = IndexingInfo<IndexingProgress, u128>;
pub type ResolvedIndexingInfo = IndexingInfo<IndexingProgress, u128>;

pub(super) type ResolvedIndexerInfo = IndexerInfo<ResolvedIndexingInfo>;
pub type ResolvedIndexerInfo = IndexerInfo<ResolvedIndexingInfo>;

/// Internal representation of the indexing's progress information.
#[derive(Clone, Debug)]
pub(super) struct IndexingProgress {
pub struct IndexingProgress {
/// The latest indexed block.
pub latest_block: BlockNumber,
/// The minimum indexed block.
pub min_block: Option<BlockNumber>,
}

/// Process the fetched network topology information.
pub(super) async fn process_info(
pub async fn process_info(
state: &InternalState,
indexers: &HashMap<IndexerId, IndexerRawInfo>,
) -> HashMap<IndexerId, Result<ResolvedIndexerInfo, UnavailableReason>> {
Expand Down
File renamed without changes.
69 changes: 0 additions & 69 deletions src/network/internal.rs

This file was deleted.

23 changes: 0 additions & 23 deletions src/network/internal/state.rs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ use thegraph_core::{AllocationId, DeploymentId, IndexerId, SubgraphId};
use url::Url;

use crate::network::{
internal::{
indexer_processing::{IndexerRawInfo, IndexingRawInfo},
subgraph_processing::{DeploymentRawInfo, SubgraphRawInfo, SubgraphVersionRawInfo},
AllocationInfo,
},
indexer_processing::{IndexerRawInfo, IndexingRawInfo},
subgraph_client,
subgraph_client::types::SubgraphVersion,
subgraph_processing::{
AllocationInfo, DeploymentRawInfo, SubgraphRawInfo, SubgraphVersionRawInfo,
},
};

pub fn into_internal_indexers_raw_info<'a>(
Expand Down
74 changes: 68 additions & 6 deletions src/network/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use ipnetwork::IpNetwork;
use semver::Version;
use thegraph_core::{
alloy::primitives::{Address, BlockNumber},
DeploymentId, SubgraphId,
DeploymentId, IndexerId, SubgraphId,
};
use tokio::{sync::watch, time::MissedTickBehavior};

Expand All @@ -21,17 +21,17 @@ use super::{
host_filter::HostFilter,
indexer_indexing_poi_blocklist::PoiBlocklist,
indexer_indexing_poi_resolver::PoiResolver,
indexer_indexing_progress_resolver::IndexingProgressResolver,
internal::{
fetch_and_preprocess_subgraph_info, fetch_update, Indexing, IndexingId, InternalState,
NetworkTopologySnapshot, PreprocessedNetworkInfo,
},
indexer_processing::{self, IndexerRawInfo},
indexing_progress::IndexingProgressResolver,
snapshot::{self, Indexing, IndexingId, NetworkTopologySnapshot},
subgraph_client::Client as SubgraphClient,
subgraph_processing::{DeploymentInfo, SubgraphInfo},
version_filter::{MinimumVersionRequirements, VersionFilter},
};
use crate::{
config::{BlockedIndexer, BlockedPoi},
errors::UnavailableReason,
network::{pre_processing, subgraph_processing},
};

/// Subgraph resolution information returned by the [`NetworkService`].
Expand Down Expand Up @@ -197,6 +197,16 @@ pub fn spawn(
NetworkService { network }
}

pub struct InternalState {
pub indexer_blocklist: BTreeMap<Address, BlockedIndexer>,
pub indexer_host_filter: HostFilter,
pub indexer_version_filter: VersionFilter,
pub poi_blocklist: PoiBlocklist,
pub poi_resolver: PoiResolver,
pub indexing_progress_resolver: IndexingProgressResolver,
pub cost_model_resolver: CostModelResolver,
}

/// Spawn a background task to fetch the network topology information from the graph network
/// subgraph at regular intervals
fn spawn_updater_task(
Expand Down Expand Up @@ -240,3 +250,55 @@ fn spawn_updater_task(

rx
}

/// Fetch the subgraphs information from the graph network subgraph and performs pre-processing
/// steps, i.e., validation and conversion into the internal representation.
///
/// 1. Fetch the subgraphs information from the graph network subgraph.
/// 2. Validate and convert the subgraphs fetched info into the internal representation.
///
/// If the fetch fails or the response is empty, an error is returned.
///
/// Invalid info is filtered out before converting into the internal representation.
pub async fn fetch_and_preprocess_subgraph_info(
client: &mut SubgraphClient,
timeout: Duration,
) -> anyhow::Result<PreprocessedNetworkInfo> {
// Fetch the subgraphs information from the graph network subgraph
let data = tokio::time::timeout(timeout, client.fetch()).await??;
anyhow::ensure!(!data.is_empty(), "empty subgraph response");

// Pre-process (validate and convert) the fetched subgraphs information
let indexers = pre_processing::into_internal_indexers_raw_info(data.iter());
let subgraphs = pre_processing::into_internal_subgraphs_raw_info(data.into_iter());
let deployments = pre_processing::into_internal_deployments_raw_info(subgraphs.values());

let subgraphs = subgraph_processing::process_subgraph_info(subgraphs);
let deployments = subgraph_processing::process_deployments_info(deployments);

Ok(PreprocessedNetworkInfo {
subgraphs,
deployments,
indexers,
})
}

/// Fetch the network topology information from the graph network subgraph.
pub async fn fetch_update(
network: &PreprocessedNetworkInfo,
state: &InternalState,
) -> NetworkTopologySnapshot {
// Process network topology information
let indexers_info = indexer_processing::process_info(state, &network.indexers).await;
snapshot::new_from(
indexers_info,
network.subgraphs.clone(),
network.deployments.clone(),
)
}

pub struct PreprocessedNetworkInfo {
subgraphs: HashMap<SubgraphId, Result<SubgraphInfo, SubgraphError>>,
deployments: HashMap<DeploymentId, Result<DeploymentInfo, DeploymentError>>,
indexers: HashMap<IndexerId, IndexerRawInfo>,
}
11 changes: 6 additions & 5 deletions src/network/internal/snapshot.rs → src/network/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ use thegraph_core::{
};
use url::Url;

use super::{DeploymentInfo, SubgraphInfo};
use crate::{
errors::UnavailableReason,
network::{
errors::{DeploymentError, SubgraphError},
internal::indexer_processing::ResolvedIndexerInfo,
},
network::errors::{DeploymentError, SubgraphError},
};

use super::{
indexer_processing::ResolvedIndexerInfo,
subgraph_processing::{DeploymentInfo, SubgraphInfo},
};

/// The [`IndexingId`] struct represents the unique identifier of an indexing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::network::errors::{DeploymentError, SubgraphError};
///
/// This is not the final representation of the subgraph.
#[derive(Debug, Clone)]
pub(super) struct SubgraphRawInfo {
pub struct SubgraphRawInfo {
pub id: SubgraphId,
pub versions: Vec<SubgraphVersionRawInfo>,
}
Expand All @@ -17,15 +17,15 @@ pub(super) struct SubgraphRawInfo {
///
/// This is not the final representation of the subgraph version.
#[derive(Debug, Clone)]
pub(super) struct SubgraphVersionRawInfo {
pub struct SubgraphVersionRawInfo {
pub deployment: DeploymentRawInfo,
}

/// Internal representation of the fetched deployment information.
///
/// This is not the final representation of the deployment.
#[derive(Debug, Clone)]
pub(super) struct DeploymentRawInfo {
pub struct DeploymentRawInfo {
pub id: DeploymentId,
pub manifest_network: String,
pub manifest_start_block: BlockNumber,
Expand Down Expand Up @@ -75,7 +75,7 @@ pub struct AllocationInfo {
/// Process the fetched subgraphs' information.
///
/// - If the subgraph has no allocations, [`SubgraphError::NoAllocations`] is returned.
pub(super) fn process_subgraph_info(
pub fn process_subgraph_info(
subgraphs: HashMap<SubgraphId, SubgraphRawInfo>,
) -> HashMap<SubgraphId, Result<SubgraphInfo, SubgraphError>> {
subgraphs
Expand Down Expand Up @@ -144,7 +144,7 @@ fn check_subgraph_has_allocations(subgraph: &SubgraphRawInfo) -> Result<(), Subg
/// Process the fetched deployments' information.
///
/// - If the deployment has no allocations, [`DeploymentError::NoAllocations`] is returned.
pub(super) fn process_deployments_info(
pub fn process_deployments_info(
deployments: HashMap<DeploymentId, DeploymentRawInfo>,
) -> HashMap<DeploymentId, Result<DeploymentInfo, DeploymentError>> {
deployments
Expand Down

0 comments on commit 98ff5eb

Please sign in to comment.