Skip to content

Commit

Permalink
agent: reorder startup to hold more components in migration context
Browse files Browse the repository at this point in the history
  • Loading branch information
tilacog committed Mar 10, 2023
1 parent 5544cf8 commit da62b33
Show file tree
Hide file tree
Showing 2 changed files with 203 additions and 78 deletions.
161 changes: 83 additions & 78 deletions packages/indexer-agent/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,86 @@ export default {
// inside the agent
registerIndexerErrorMetrics(metrics)

const indexingStatusResolver = new IndexingStatusResolver({
logger: logger,
statusEndpoint: argv.graphNodeStatusEndpoint,
})

const networkSubgraph = await NetworkSubgraph.create({
logger,
endpoint: argv.networkSubgraphEndpoint,
deployment: argv.networkSubgraphDeployment
? {
indexingStatusResolver: indexingStatusResolver,
deployment: new SubgraphDeploymentID(
argv.networkSubgraphDeployment,
),
graphNodeQueryEndpoint: argv.graphNodeQueryEndpoint,
}
: undefined,
})

const networkProvider = await Network.provider(
logger,
metrics,
argv.ethereum,
argv.ethereumPollingInterval,
)

const networkMeta = await networkProvider.getNetwork()

logger.info(`Connect to contracts`, {
network: networkMeta.name,
chainId: networkMeta.chainId,
providerNetworkChainID: networkProvider.network.chainId,
})

logger.info(`Connect wallet`, {
network: networkMeta.name,
chainId: networkMeta.chainId,
})
let wallet = Wallet.fromMnemonic(argv.mnemonic)
wallet = wallet.connect(networkProvider)
logger.info(`Connected wallet`)

let contracts = undefined
try {
contracts = await connectContracts(wallet, networkMeta.chainId)
} catch (err) {
logger.error(
`Failed to connect to contracts, please ensure you are using the intended Ethereum network`,
{
err,
},
)
process.exit(1)
}
logger.info(`Successfully connected to contracts`, {
curation: contracts.curation.address,
disputeManager: contracts.disputeManager.address,
epochManager: contracts.epochManager.address,
gns: contracts.gns.address,
rewardsManager: contracts.rewardsManager.address,
serviceRegistry: contracts.serviceRegistry.address,
staking: contracts.staking.address,
token: contracts.token.address,
})

const indexerAddress = toAddress(argv.indexerAddress)

const epochSubgraph = await EpochSubgraph.create(argv.epochSubgraphEndpoint)

const networkMonitor = new NetworkMonitor(
await resolveChainId(networkMeta.chainId),
contracts,
toAddress(indexerAddress),
logger.child({ component: 'NetworkMonitor' }),
indexingStatusResolver,
networkSubgraph,
networkProvider,
epochSubgraph,
)

logger.info('Connect to database', {
host: argv.postgresHost,
port: argv.postgresPort,
Expand Down Expand Up @@ -515,6 +595,9 @@ export default {
context: {
queryInterface: sequelize.getQueryInterface(),
logger,
indexingStatusResolver,
graphNodeAdminEndpoint: argv.graphNodeAdminEndpoint,
networkMonitor,
},
storage: new SequelizeStorage({ sequelize }),
logger: console,
Expand All @@ -538,73 +621,6 @@ export default {
await sequelize.sync()
logger.info(`Successfully synced database models`)

const networkProvider = await Network.provider(
logger,
metrics,
argv.ethereum,
argv.ethereumPollingInterval,
)
const networkMeta = await networkProvider.getNetwork()

logger.info(`Connect wallet`, {
network: networkMeta.name,
chainId: networkMeta.chainId,
})
let wallet = Wallet.fromMnemonic(argv.mnemonic)
wallet = wallet.connect(networkProvider)
logger.info(`Connected wallet`)

logger.info(`Connect to contracts`, {
network: networkMeta.name,
chainId: networkMeta.chainId,
providerNetworkChainID: networkProvider.network.chainId,
})
let contracts = undefined
try {
contracts = await connectContracts(wallet, networkMeta.chainId)
} catch (err) {
logger.error(
`Failed to connect to contracts, please ensure you are using the intended Ethereum network`,
{
err,
},
)
process.exit(1)
}
logger.info(`Successfully connected to contracts`, {
curation: contracts.curation.address,
disputeManager: contracts.disputeManager.address,
epochManager: contracts.epochManager.address,
gns: contracts.gns.address,
rewardsManager: contracts.rewardsManager.address,
serviceRegistry: contracts.serviceRegistry.address,
staking: contracts.staking.address,
token: contracts.token.address,
})

const indexerAddress = toAddress(argv.indexerAddress)

const indexingStatusResolver = new IndexingStatusResolver({
logger: logger,
statusEndpoint: argv.graphNodeStatusEndpoint,
})

const networkSubgraph = await NetworkSubgraph.create({
logger,
endpoint: argv.networkSubgraphEndpoint,
deployment: argv.networkSubgraphDeployment
? {
indexingStatusResolver: indexingStatusResolver,
deployment: new SubgraphDeploymentID(
argv.networkSubgraphDeployment,
),
graphNodeQueryEndpoint: argv.graphNodeQueryEndpoint,
}
: undefined,
})

const epochSubgraph = await EpochSubgraph.create(argv.epochSubgraphEndpoint)

logger.info('Connect to network')
const maxGasFee = argv.baseFeeGasMax || argv.gasPriceMax
const network = await Network.create(
Expand Down Expand Up @@ -644,17 +660,6 @@ export default {
})
await receiptCollector.queuePendingReceiptsFromDatabase()

const networkMonitor = new NetworkMonitor(
await resolveChainId(networkMeta.chainId),
contracts,
toAddress(indexerAddress),
logger.child({ component: 'NetworkMonitor' }),
indexingStatusResolver,
networkSubgraph,
networkProvider,
epochSubgraph,
)

logger.info('Launch indexer management API server')
const allocationManagementMode =
AllocationManagementMode[
Expand Down
120 changes: 120 additions & 0 deletions packages/indexer-agent/src/db/migrations/11-update-deployment-names.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import { Logger } from '@graphprotocol/common-ts'
import {
formatDeploymentName,
indexerError,
IndexerErrorCode,
IndexingStatusResolver,
NetworkMonitor,
SubgraphDeploymentAssignment,
} from '@graphprotocol/indexer-common'
import { Client } from 'jayson/promise'
import pMap from 'p-map'

interface MigrationContext {
logger: Logger
indexingStatusResolver: IndexingStatusResolver
graphNodeAdminEndpoint: string
networkMonitor: NetworkMonitor
}

interface Context {
context: MigrationContext
}

interface SubgraphRedeployment {
newName: string
ipfsHash: string
nodeId: string
}

export async function up({ context }: Context): Promise<void> {
const { logger, networkMonitor: networkMonitor } = context

const clientConstructor = context.graphNodeAdminEndpoint.startsWith('https')
? Client.https
: Client.http
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const rpc = clientConstructor(context.graphNodeAdminEndpoint as any)

// Fetch active deployments.
const subgraphDeploymentAssignments =
await context.indexingStatusResolver.subgraphDeploymentsAssignments()

// Maps assignments to redeployments
const mapper = async (assignment: SubgraphDeploymentAssignment) =>
await processAssignment(assignment, networkMonitor, logger)

// Produces the Redeployments.
const subgraphRedeployments: SubgraphRedeployment[] = (
await pMap(subgraphDeploymentAssignments, mapper)
).filter((item): item is SubgraphRedeployment => Boolean(item))

// Execute Redeployments over Graph-Node's RPC endpoint
await pMap(subgraphRedeployments, async subgraphRedeployment =>
redeploy(rpc, subgraphRedeployment, logger),
)
}

export async function down(): Promise<void> {
// Nothing to do here. The old subgraph names should still exist in Graph Node's database.
}

// Performs redeployment in Graph-Node
async function redeploy(
client: Client,
subgraphRedeployment: SubgraphRedeployment,
logger: Logger,
): Promise<void> {
logger = logger.child({
...subgraphRedeployment,
})
try {
logger.info(`Redeploying subgraph with adjusted name`)
logger.debug(`Sending subgraph_create request`)
const create_response = await client.request('subgraph_create', {
name: subgraphRedeployment.newName,
})
if (create_response.error) {
throw create_response.error
}
logger.debug(`Sending subgraph_deploy request`)
const deploy_response = await client.request('subgraph_deploy', {
name: subgraphRedeployment.newName,
ipfs_hash: subgraphRedeployment.ipfsHash,
node_id: subgraphRedeployment.nodeId,
})
if (deploy_response.error) {
throw deploy_response.error
}
logger.info(`Successfully redeployed subgraph with a fixed name`)
} catch (error) {
const err = indexerError(IndexerErrorCode.IE026, error)
logger.error(`Failed to redeploy subgraph with a fixed name`, { err })
throw err
}
}

// Tentatively converts a `SubgraphDeploymentAssignment` into a `SubgraphRedeployment`
async function processAssignment(
assignment: SubgraphDeploymentAssignment,
networkMonitor: NetworkMonitor,
logger: Logger,
): Promise<SubgraphRedeployment | undefined> {
logger.debug(
`Querying the Network Subgraph for more details on subgraph deployment ${assignment.id}`,
)
const deployment = await networkMonitor.subgraphDeployment(
assignment.id.ipfsHash,
)
if (!deployment) {
logger.info(
`Subgraph deployment ${assignment.id} was not found in Network Subgraph. Skipping its redeployment`,
)
return undefined
}
return {
newName: formatDeploymentName(deployment),
ipfsHash: assignment.id.ipfsHash,
nodeId: assignment.node,
}
}

0 comments on commit da62b33

Please sign in to comment.