From 9ee8f9bb150da716ace661bfc5c74ee5ef1f7713 Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Fri, 27 Dec 2024 12:03:25 -0800 Subject: [PATCH] common: limit graph-node status query to actions' deploymentIDs --- packages/indexer-common/src/graph-node.ts | 113 +++++++++++++++++- .../src/indexer-management/allocations.ts | 8 +- 2 files changed, 114 insertions(+), 7 deletions(-) diff --git a/packages/indexer-common/src/graph-node.ts b/packages/indexer-common/src/graph-node.ts index 9e47d6616..7f2c57971 100644 --- a/packages/indexer-common/src/graph-node.ts +++ b/packages/indexer-common/src/graph-node.ts @@ -12,6 +12,7 @@ import { BlockPointer, ChainIndexingStatus, IndexingStatus } from './types' import pRetry, { Options } from 'p-retry' import axios, { AxiosInstance } from 'axios' import fetch from 'isomorphic-fetch' +import { Action } from './indexer-management' interface indexNode { id: string @@ -103,9 +104,15 @@ export class GraphNode { await pRetry( async () => { const deployments = await this.subgraphDeployments() - this.logger.info(`Successfully connected to indexing status API`, { - currentDeployments: deployments.map((deployment) => deployment.display), - }) + if (deployments.length < 100) { + this.logger.info(`Successfully connected to indexing status API`, { + currentDeployments: deployments.map((deployment) => deployment.display), + }) + } else { + this.logger.info(`Successfully connected to indexing status API`, { + currentDeploymentCount: deployments.length, + }) + } }, { retries: 10, @@ -148,10 +155,98 @@ export class GraphNode { ) } + public async subgraphDeploymentAssignmentsForAllocateActions( + subgraphStatus: SubgraphStatus, + actions: Action[], + ): Promise { + const deploymentIDs = actions.map((action) => action.deploymentID) + + const nodeOnlyResult = await this.status + .query( + gql` + query indexingStatuses($subgraphs: [String!]!) { + indexingStatuses(subgraphs: $subgraphs) { + subgraphDeployment: subgraph + node + } + } + `, + { subgraphs: deploymentIDs }, + ) + .toPromise() + + if (nodeOnlyResult.error) { + throw nodeOnlyResult.error + } + + const withAssignments: string[] = nodeOnlyResult.data.indexingStatuses + .filter( + (result: { node: string | null }) => + result.node !== null && result.node !== undefined, + ) + .map((result: { subgraphDeployment: string }) => result.subgraphDeployment) + + const result = await this.status + .query( + gql` + query indexingStatuses($subgraphs: [String!]!) { + indexingStatuses(subgraphs: $subgraphs) { + subgraphDeployment: subgraph + node + paused + } + } + `, + { subgraphs: withAssignments }, + ) + .toPromise() + + if (result.error) { + throw result.error + } + + if (!result.data.indexingStatuses || result.data.length === 0) { + this.logger.warn(`No 'indexingStatuses' data returned from index nodes`, { + data: result.data, + }) + return [] + } + + type QueryResult = { + subgraphDeployment: string + node: string | undefined + paused: boolean | undefined + } + + const results = result.data.indexingStatuses + .filter((status: QueryResult) => { + if (subgraphStatus === SubgraphStatus.ACTIVE) { + return ( + status.paused === false || + (status.paused === undefined && status.node !== 'removed') + ) + } else if (subgraphStatus === SubgraphStatus.PAUSED) { + return status.node === 'removed' || status.paused === true + } else if (subgraphStatus === SubgraphStatus.ALL) { + return true + } + }) + .map((status: QueryResult) => { + return { + id: new SubgraphDeploymentID(status.subgraphDeployment), + node: status.node, + paused: status.paused ?? status.node === 'removed', + } + }) + + return results + } + public async subgraphDeploymentsAssignments( subgraphStatus: SubgraphStatus, ): Promise { try { + const startTimeMs = Date.now() this.logger.debug('Fetch subgraph deployment assignments') // FIXME: remove this initial check for just node when graph-node releases @@ -170,6 +265,10 @@ export class GraphNode { ) .toPromise() + this.logger.debug( + `Fetch subgraph deployment assignments took ${Date.now() - startTimeMs}ms`, + ) + if (nodeOnlyResult.error) { throw nodeOnlyResult.error } @@ -214,7 +313,7 @@ export class GraphNode { paused: boolean | undefined } - return result.data.indexingStatuses + const results = result.data.indexingStatuses .filter((status: QueryResult) => { if (subgraphStatus === SubgraphStatus.ACTIVE) { return ( @@ -234,6 +333,12 @@ export class GraphNode { paused: status.paused ?? status.node === 'removed', } }) + this.logger.debug( + `Fetching mapped subgraph deployment ${results.length} assignments took ${ + Date.now() - startTimeMs + }ms`, + ) + return results } catch (error) { const err = indexerError(IndexerErrorCode.IE018, error) this.logger.error(`Failed to query indexing status API`, { err }) diff --git a/packages/indexer-common/src/indexer-management/allocations.ts b/packages/indexer-common/src/indexer-management/allocations.ts index f556224b3..c30192152 100644 --- a/packages/indexer-common/src/indexer-management/allocations.ts +++ b/packages/indexer-common/src/indexer-management/allocations.ts @@ -322,9 +322,11 @@ export class AllocationManager { logger.info('Ensure subgraph deployments are deployed before we allocate to them', { allocateActions, }) - const currentAssignments = await this.graphNode.subgraphDeploymentsAssignments( - SubgraphStatus.ALL, - ) + const currentAssignments = + await this.graphNode.subgraphDeploymentAssignmentsForAllocateActions( + SubgraphStatus.ALL, + actions, + ) await pMap( allocateActions, async (action: Action) =>