From 048b8b6227d6120ae4d3be0a93e74881553f1b5b Mon Sep 17 00:00:00 2001 From: amateima <89395931+amateima@users.noreply.github.com> Date: Tue, 8 Oct 2024 15:50:43 +0300 Subject: [PATCH] feat: migrate SpokePoolIndexer to new Indexer (#64) Co-authored-by: Alexandru Matei --- docker-compose.yml | 2 + .../entities/evm/ExecutedRelayerRefundRoot.ts | 3 + .../src/entities/evm/FilledV3Relay.ts | 3 + .../src/entities/evm/RelayedRootBundle.ts | 3 + .../entities/evm/RequestedSpeedUpV3Deposit.ts | 3 + .../src/entities/evm/RequestedV3SlowFill.ts | 3 + .../src/entities/evm/TokensBridged.ts | 3 + .../src/entities/evm/V3FundsDeposited.ts | 3 + .../1728296909794-SpokePoolFinalised.ts | 53 ++++ .../src/utils/BaseRepository.ts | 26 ++ .../src/data-indexing/service/Indexer.ts | 55 ++-- .../src/database/SpokePoolRepository.ts | 115 +++++-- packages/indexer/src/main.ts | 62 ++-- .../src/services/HubPoolIndexerDataHandler.ts | 1 + .../services/SpokePoolIndexerDataHandler.ts | 233 ++++++++++++++ packages/indexer/src/services/index.ts | 1 - .../indexer/src/services/spokePoolIndexer.ts | 296 ------------------ .../src/services/spokePoolProcessor.ts | 2 +- 18 files changed, 486 insertions(+), 381 deletions(-) create mode 100644 packages/indexer-database/src/migrations/1728296909794-SpokePoolFinalised.ts create mode 100644 packages/indexer/src/services/SpokePoolIndexerDataHandler.ts delete mode 100644 packages/indexer/src/services/spokePoolIndexer.ts diff --git a/docker-compose.yml b/docker-compose.yml index 608857c6..9c40c91b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -20,6 +20,8 @@ services: container_name: redis_cache volumes: - indexer-redis-volume:/data + ports: + - 6379:6379 # Indexer Service indexer-scraper: diff --git a/packages/indexer-database/src/entities/evm/ExecutedRelayerRefundRoot.ts b/packages/indexer-database/src/entities/evm/ExecutedRelayerRefundRoot.ts index acb0cefd..0e853760 100644 --- a/packages/indexer-database/src/entities/evm/ExecutedRelayerRefundRoot.ts +++ b/packages/indexer-database/src/entities/evm/ExecutedRelayerRefundRoot.ts @@ -52,6 +52,9 @@ export class ExecutedRelayerRefundRoot { @Column() blockNumber: number; + @Column() + finalised: boolean; + @CreateDateColumn() createdAt: Date; } diff --git a/packages/indexer-database/src/entities/evm/FilledV3Relay.ts b/packages/indexer-database/src/entities/evm/FilledV3Relay.ts index e01d182e..bb00e0c2 100644 --- a/packages/indexer-database/src/entities/evm/FilledV3Relay.ts +++ b/packages/indexer-database/src/entities/evm/FilledV3Relay.ts @@ -90,6 +90,9 @@ export class FilledV3Relay { @Column() blockNumber: number; + @Column() + finalised: boolean; + @CreateDateColumn() createdAt: Date; } diff --git a/packages/indexer-database/src/entities/evm/RelayedRootBundle.ts b/packages/indexer-database/src/entities/evm/RelayedRootBundle.ts index 0e474ef3..d5f87c95 100644 --- a/packages/indexer-database/src/entities/evm/RelayedRootBundle.ts +++ b/packages/indexer-database/src/entities/evm/RelayedRootBundle.ts @@ -39,6 +39,9 @@ export class RelayedRootBundle { @Column() blockNumber: number; + @Column() + finalised: boolean; + @CreateDateColumn() createdAt: Date; } diff --git a/packages/indexer-database/src/entities/evm/RequestedSpeedUpV3Deposit.ts b/packages/indexer-database/src/entities/evm/RequestedSpeedUpV3Deposit.ts index 75905d1f..bec0743f 100644 --- a/packages/indexer-database/src/entities/evm/RequestedSpeedUpV3Deposit.ts +++ b/packages/indexer-database/src/entities/evm/RequestedSpeedUpV3Deposit.ts @@ -46,6 +46,9 @@ export class RequestedSpeedUpV3Deposit { @Column() logIndex: number; + @Column() + finalised: boolean; + @Column() blockNumber: number; diff --git a/packages/indexer-database/src/entities/evm/RequestedV3SlowFill.ts b/packages/indexer-database/src/entities/evm/RequestedV3SlowFill.ts index 64d27090..5a82bc57 100644 --- a/packages/indexer-database/src/entities/evm/RequestedV3SlowFill.ts +++ b/packages/indexer-database/src/entities/evm/RequestedV3SlowFill.ts @@ -72,6 +72,9 @@ export class RequestedV3SlowFill { @Column() blockNumber: number; + @Column() + finalised: boolean; + @CreateDateColumn() createdAt: Date; } diff --git a/packages/indexer-database/src/entities/evm/TokensBridged.ts b/packages/indexer-database/src/entities/evm/TokensBridged.ts index 01fc1e3a..6fcd7f4e 100644 --- a/packages/indexer-database/src/entities/evm/TokensBridged.ts +++ b/packages/indexer-database/src/entities/evm/TokensBridged.ts @@ -44,6 +44,9 @@ export class TokensBridged { @Column() blockNumber: number; + @Column() + finalised: boolean; + @CreateDateColumn() createdAt: Date; } diff --git a/packages/indexer-database/src/entities/evm/V3FundsDeposited.ts b/packages/indexer-database/src/entities/evm/V3FundsDeposited.ts index 013d6a31..8d1214ee 100644 --- a/packages/indexer-database/src/entities/evm/V3FundsDeposited.ts +++ b/packages/indexer-database/src/entities/evm/V3FundsDeposited.ts @@ -84,6 +84,9 @@ export class V3FundsDeposited { @Column() blockNumber: number; + @Column() + finalised: boolean; + @CreateDateColumn() createdAt: Date; } diff --git a/packages/indexer-database/src/migrations/1728296909794-SpokePoolFinalised.ts b/packages/indexer-database/src/migrations/1728296909794-SpokePoolFinalised.ts new file mode 100644 index 00000000..40e1b2a3 --- /dev/null +++ b/packages/indexer-database/src/migrations/1728296909794-SpokePoolFinalised.ts @@ -0,0 +1,53 @@ +import { MigrationInterface, QueryRunner } from "typeorm"; + +export class SpokePoolFinalised1728296909794 implements MigrationInterface { + name = "SpokePoolFinalised1728296909794"; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `ALTER TABLE "evm"."v3_funds_deposited" ADD "finalised" boolean NOT NULL`, + ); + await queryRunner.query( + `ALTER TABLE "evm"."filled_v3_relay" ADD "finalised" boolean NOT NULL`, + ); + await queryRunner.query( + `ALTER TABLE "evm"."requested_v3_slow_fill" ADD "finalised" boolean NOT NULL`, + ); + await queryRunner.query( + `ALTER TABLE "evm"."requested_speed_up_v3_deposit" ADD "finalised" boolean NOT NULL`, + ); + await queryRunner.query( + `ALTER TABLE "evm"."relayed_root_bundle" ADD "finalised" boolean NOT NULL`, + ); + await queryRunner.query( + `ALTER TABLE "evm"."executed_relayer_refund_root" ADD "finalised" boolean NOT NULL`, + ); + await queryRunner.query( + `ALTER TABLE "evm"."tokens_bridged" ADD "finalised" boolean NOT NULL`, + ); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `ALTER TABLE "evm"."tokens_bridged" DROP COLUMN "finalised"`, + ); + await queryRunner.query( + `ALTER TABLE "evm"."executed_relayer_refund_root" DROP COLUMN "finalised"`, + ); + await queryRunner.query( + `ALTER TABLE "evm"."relayed_root_bundle" DROP COLUMN "finalised"`, + ); + await queryRunner.query( + `ALTER TABLE "evm"."requested_speed_up_v3_deposit" DROP COLUMN "finalised"`, + ); + await queryRunner.query( + `ALTER TABLE "evm"."requested_v3_slow_fill" DROP COLUMN "finalised"`, + ); + await queryRunner.query( + `ALTER TABLE "evm"."filled_v3_relay" DROP COLUMN "finalised"`, + ); + await queryRunner.query( + `ALTER TABLE "evm"."v3_funds_deposited" DROP COLUMN "finalised"`, + ); + } +} diff --git a/packages/indexer-database/src/utils/BaseRepository.ts b/packages/indexer-database/src/utils/BaseRepository.ts index 4ee04e09..33221868 100644 --- a/packages/indexer-database/src/utils/BaseRepository.ts +++ b/packages/indexer-database/src/utils/BaseRepository.ts @@ -37,4 +37,30 @@ export class BaseRepository { } } } + + protected async insertWithFinalisationCheck( + entity: EntityTarget, + data: Partial[], + uniqueKeys: (keyof Entity)[], + lastFinalisedBlock: number, + ) { + const repository = this.postgres.getRepository(entity); + const uniqueKeysAsStrings = uniqueKeys.map((key) => key.toString()); + + const savedData = await repository + .createQueryBuilder() + .insert() + .values(data) + .orUpdate(Object.keys(data[0] as any), uniqueKeysAsStrings) + .returning("*") + .execute(); + await repository + .createQueryBuilder() + .delete() + .where("finalised = false") + .andWhere("blockNumber <= :lastFinalisedBlock", { lastFinalisedBlock }) + .execute(); + + return savedData.generatedMaps as Entity[]; + } } diff --git a/packages/indexer/src/data-indexing/service/Indexer.ts b/packages/indexer/src/data-indexing/service/Indexer.ts index a9e1b05a..d456795e 100644 --- a/packages/indexer/src/data-indexing/service/Indexer.ts +++ b/packages/indexer/src/data-indexing/service/Indexer.ts @@ -34,7 +34,8 @@ export class Indexer { public async start() { while (!this.stopRequested) { try { - const { latestBlockNumber, blockRange } = await this.getBlockRange(); + const { latestBlockNumber, blockRange, lastFinalisedBlock } = + await this.getBlockRange(); if (!blockRange) { this.logger.info({ @@ -44,18 +45,13 @@ export class Indexer { dataIdentifier: this.dataHandler.getDataIdentifier(), }); } else { - const lastFinalisedBlockInBlockRange = - this.getLastFinalisedBlockInBlockRange( - latestBlockNumber, - blockRange, - ); await this.dataHandler.processBlockRange( blockRange, - lastFinalisedBlockInBlockRange, + lastFinalisedBlock, ); await this.redisCache.set( this.getLastFinalisedBlockCacheKey(), - lastFinalisedBlockInBlockRange, + lastFinalisedBlock, ); } } catch (error) { @@ -83,36 +79,37 @@ export class Indexer { this.stopRequested = true; } - private getLastFinalisedBlockInBlockRange( - latestBlockNumber: number, - blockRange: BlockRange, - ) { - const lastOnChainFinalisedBlock = - latestBlockNumber - this.config.finalisedBlockBufferDistance + 1; - const lastFinalisedBlockInBlockRange = Math.min( - blockRange.to, - lastOnChainFinalisedBlock, - ); - - return lastFinalisedBlockInBlockRange; - } - private async getBlockRange() { - const lastBlockFinalised = await this.redisCache.get( + const lastBlockFinalisedStored = await this.redisCache.get( this.getLastFinalisedBlockCacheKey(), ); const latestBlockNumber = await this.rpcProvider.getBlockNumber(); - // If the last block finalised is the same as the latest block, no new blocks to process - if (latestBlockNumber === lastBlockFinalised) { - return { latestBlockNumber, blockRange: undefined }; + const lastFinalisedBlockOnChain = + latestBlockNumber - this.config.finalisedBlockBufferDistance; + + if (lastBlockFinalisedStored === lastFinalisedBlockOnChain) { + return { + latestBlockNumber, + blockRange: undefined, + lastFinalisedBlock: lastFinalisedBlockOnChain, + }; } - const fromBlock = lastBlockFinalised - ? lastBlockFinalised + 1 + const fromBlock = lastBlockFinalisedStored + ? lastBlockFinalisedStored + 1 : this.dataHandler.getStartIndexingBlockNumber(); // TODO: hardcoded 200_000, should be a config or removed const toBlock = Math.min(fromBlock + 200_000, latestBlockNumber); const blockRange: BlockRange = { from: fromBlock, to: toBlock }; - return { latestBlockNumber, blockRange }; + const lastFinalisedBlockInBlockRange = Math.min( + blockRange.to, + lastFinalisedBlockOnChain, + ); + + return { + latestBlockNumber, + blockRange, + lastFinalisedBlock: lastFinalisedBlockInBlockRange, + }; } private getLastFinalisedBlockCacheKey() { diff --git a/packages/indexer/src/database/SpokePoolRepository.ts b/packages/indexer/src/database/SpokePoolRepository.ts index 3419a807..7ccb4bc7 100644 --- a/packages/indexer/src/database/SpokePoolRepository.ts +++ b/packages/indexer/src/database/SpokePoolRepository.ts @@ -7,10 +7,9 @@ export class SpokePoolRepository extends utils.BaseRepository { constructor( postgres: DataSource, logger: winston.Logger, - throwError: boolean, private chunkSize = 2000, ) { - super(postgres, logger, throwError); + super(postgres, logger, true); } private formatRelayData( @@ -34,7 +33,7 @@ export class SpokePoolRepository extends utils.BaseRepository { v3FundsDepositedEvents: (across.interfaces.DepositWithBlock & { integratorId: string | undefined; })[], - throwError?: boolean, + lastFinalisedBlock: number, ) { const formattedEvents = v3FundsDepositedEvents.map((event) => { return { @@ -42,12 +41,18 @@ export class SpokePoolRepository extends utils.BaseRepository { relayHash: getRelayHashFromEvent(event), ...this.formatRelayData(event), quoteTimestamp: new Date(event.quoteTimestamp * 1000), + finalised: event.blockNumber <= lastFinalisedBlock, }; }); const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); const savedEvents = await Promise.all( chunkedEvents.map((eventsChunk) => - this.insert(entities.V3FundsDeposited, eventsChunk, throwError), + this.insertWithFinalisationCheck( + entities.V3FundsDeposited, + eventsChunk, + ["depositId", "originChainId"], + lastFinalisedBlock, + ), ), ); return savedEvents.flat(); @@ -55,7 +60,7 @@ export class SpokePoolRepository extends utils.BaseRepository { public async formatAndSaveFilledV3RelayEvents( filledV3RelayEvents: across.interfaces.FillWithBlock[], - throwError?: boolean, + lastFinalisedBlock: number, ) { const formattedEvents = filledV3RelayEvents.map((event) => { return { @@ -67,12 +72,18 @@ export class SpokePoolRepository extends utils.BaseRepository { updatedOutputAmount: event.relayExecutionInfo.updatedOutputAmount.toString(), }, + finalised: event.blockNumber <= lastFinalisedBlock, }; }); const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); const savedEvents = await Promise.all( chunkedEvents.map((eventsChunk) => - this.insert(entities.FilledV3Relay, eventsChunk, throwError), + this.insertWithFinalisationCheck( + entities.FilledV3Relay, + eventsChunk, + ["relayHash"], + lastFinalisedBlock, + ), ), ); return savedEvents.flat(); @@ -80,20 +91,28 @@ export class SpokePoolRepository extends utils.BaseRepository { public async formatAndSaveRequestedV3SlowFillEvents( requestedV3SlowFillEvents: across.interfaces.SlowFillRequestWithBlock[], - throwError?: boolean, + lastFinalisedBlock: number, ) { const formattedEvents = requestedV3SlowFillEvents.map((event) => { return { ...event, relayHash: getRelayHashFromEvent(event), ...this.formatRelayData(event), + finalised: event.blockNumber <= lastFinalisedBlock, }; }); - return this.insert( - entities.RequestedV3SlowFill, - formattedEvents, - throwError, + const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); + const savedEvents = await Promise.all( + chunkedEvents.map((eventsChunk) => + this.insertWithFinalisationCheck( + entities.RequestedV3SlowFill, + eventsChunk, + ["depositId", "originChainId"], + lastFinalisedBlock, + ), + ), ); + return savedEvents.flat(); } public async formatAndSaveRequestedSpeedUpV3Events( @@ -102,7 +121,7 @@ export class SpokePoolRepository extends utils.BaseRepository { [depositId: number]: across.interfaces.SpeedUpWithBlock[]; }; }, - throwError?: boolean, + lastFinalisedBlock: number, ) { const formattedEvents = Object.values(requestedSpeedUpV3Events).flatMap( (eventsByDepositId) => @@ -111,56 +130,100 @@ export class SpokePoolRepository extends utils.BaseRepository { return { ...event, updatedOutputAmount: event.updatedOutputAmount.toString(), + finalised: event.blockNumber <= lastFinalisedBlock, }; }), ), ); - await this.insert( - entities.RequestedSpeedUpV3Deposit, - formattedEvents, - throwError, + const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); + const savedEvents = await Promise.all( + chunkedEvents.map((eventsChunk) => + this.insertWithFinalisationCheck( + entities.RequestedSpeedUpV3Deposit, + eventsChunk, + ["depositId", "originChainId", "transactionHash"], + lastFinalisedBlock, + ), + ), ); + return savedEvents.flat(); } public async formatAndSaveRelayedRootBundleEvents( relayedRootBundleEvents: across.interfaces.RootBundleRelayWithBlock[], chainId: number, - throwError?: boolean, + lastFinalisedBlock: number, ) { const formattedEvents = relayedRootBundleEvents.map((event) => { - return { ...event, chainId }; + return { + ...event, + chainId, + finalised: event.blockNumber <= lastFinalisedBlock, + }; }); - await this.insert(entities.RelayedRootBundle, formattedEvents, throwError); + + const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); + const savedEvents = await Promise.all( + chunkedEvents.map((eventsChunk) => + this.insertWithFinalisationCheck( + entities.RelayedRootBundle, + eventsChunk, + ["chainId", "rootBundleId"], + lastFinalisedBlock, + ), + ), + ); + return savedEvents.flat(); } public async formatAndSaveExecutedRelayerRefundRootEvents( executedRelayerRefundRootEvents: across.interfaces.RelayerRefundExecutionWithBlock[], - throwError?: boolean, + lastFinalisedBlock: number, ) { const formattedEvents = executedRelayerRefundRootEvents.map((event) => { return { ...event, amountToReturn: event.amountToReturn.toString(), refundAmounts: event.refundAmounts.map((amount) => amount.toString()), + finalised: event.blockNumber <= lastFinalisedBlock, }; }); - return this.insert( - entities.ExecutedRelayerRefundRoot, - formattedEvents, - throwError, + const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); + const savedEvents = await Promise.all( + chunkedEvents.map((eventsChunk) => + this.insertWithFinalisationCheck( + entities.ExecutedRelayerRefundRoot, + eventsChunk, + ["chainId", "rootBundleId", "leafId"], + lastFinalisedBlock, + ), + ), ); + return savedEvents.flat(); } public async formatAndSaveTokensBridgedEvents( tokensBridgedEvents: across.interfaces.TokensBridged[], - throwError?: boolean, + lastFinalisedBlock: number, ) { const formattedEvents = tokensBridgedEvents.map((event) => { return { ...event, amountToReturn: event.amountToReturn.toString(), + finalised: event.blockNumber <= lastFinalisedBlock, }; }); - await this.insert(entities.TokensBridged, formattedEvents, throwError); + const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); + const savedEvents = await Promise.all( + chunkedEvents.map((eventsChunk) => + this.insertWithFinalisationCheck( + entities.TokensBridged, + eventsChunk, + ["chainId", "leafId", "l2TokenAddress", "transactionHash"], + lastFinalisedBlock, + ), + ), + ); + return savedEvents.flat(); } } diff --git a/packages/indexer/src/main.ts b/packages/indexer/src/main.ts index 0be12848..50be2865 100644 --- a/packages/indexer/src/main.ts +++ b/packages/indexer/src/main.ts @@ -2,7 +2,6 @@ import * as services from "./services"; import winston from "winston"; import Redis from "ioredis"; import * as across from "@across-protocol/sdk"; -import * as acrossConstants from "@across-protocol/constants"; import { connectToDatabase } from "./database/database.provider"; import * as parseEnv from "./parseEnv"; @@ -15,11 +14,14 @@ import { Indexer, } from "./data-indexing/service"; import { HubPoolRepository } from "./database/HubPoolRepository"; +import { SpokePoolRepository } from "./database/SpokePoolRepository"; import { ConfigStoreClientFactory, HubPoolClientFactory, SpokePoolClientFactory, -} from "./utils"; +} from "./utils/contractFactoryUtils"; +import { SpokePoolIndexerDataHandler } from "./services/SpokePoolIndexerDataHandler"; +import { SpokePoolProcessor } from "./services/spokePoolProcessor"; async function initializeRedis( config: parseEnv.RedisConfig, @@ -52,7 +54,6 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) { redisCache, logger, ).initializeProviders(); - const configStoreClientFactory = new ConfigStoreClientFactory( retryProvidersFactory, logger, @@ -74,20 +75,31 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) { redis, postgres, }); - const spokePoolIndexers = spokePoolChainsEnabled.map( - (spokePoolChainId) => - new services.spokePoolIndexer.Indexer({ - logger, - redis, - postgres, - spokePoolChainId, - configStoreFactory: configStoreClientFactory, - hubPoolFactory: hubPoolClientFactory, - spokePoolClientFactory, - hubChainId, - retryProviderFactory: retryProvidersFactory, - }), - ); + + const spokePoolIndexers = spokePoolChainsEnabled.map((chainId) => { + const spokePoolIndexerDataHandler = new SpokePoolIndexerDataHandler( + logger, + chainId, + hubChainId, + retryProvidersFactory.getProviderForChainId(chainId), + configStoreClientFactory, + hubPoolClientFactory, + spokePoolClientFactory, + new SpokePoolRepository(postgres, logger), + new SpokePoolProcessor(postgres, logger, chainId), + ); + const spokePoolIndexer = new Indexer( + { + loopWaitTimeSeconds: getLoopWaitTimeSeconds(chainId), + finalisedBlockBufferDistance: getFinalisedBlockBufferDistance(chainId), + }, + spokePoolIndexerDataHandler, + retryProvidersFactory.getProviderForChainId(chainId), + redisCache, + logger, + ); + return spokePoolIndexer; + }); const hubPoolIndexerDataHandler = new HubPoolIndexerDataHandler( logger, @@ -98,17 +110,11 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) { ); const hubPoolIndexer = new Indexer( { - loopWaitTimeSeconds: getLoopWaitTimeSeconds( - acrossConstants.CHAIN_IDs.MAINNET, - ), - finalisedBlockBufferDistance: getFinalisedBlockBufferDistance( - acrossConstants.CHAIN_IDs.MAINNET, - ), + loopWaitTimeSeconds: getLoopWaitTimeSeconds(hubChainId), + finalisedBlockBufferDistance: getFinalisedBlockBufferDistance(hubChainId), }, hubPoolIndexerDataHandler, - retryProvidersFactory.getProviderForChainId( - acrossConstants.CHAIN_IDs.MAINNET, - ), + retryProvidersFactory.getProviderForChainId(hubChainId), new RedisCache(redis), logger, ); @@ -119,7 +125,7 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) { logger.info( "\nWait for shutdown, or press Ctrl+C again to forcefully exit.", ); - spokePoolIndexers.map((s) => s.stop()); + spokePoolIndexers.map((s) => s.stopGracefully()); hubPoolIndexer.stopGracefully(); } else { logger.info("\nForcing exit..."); @@ -139,7 +145,7 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) { await Promise.allSettled([ bundleProcessor.start(10), hubPoolIndexer.start(), - ...spokePoolIndexers.map((s) => s.start(10)), + ...spokePoolIndexers.map((s) => s.start()), ]); logger.info({ diff --git a/packages/indexer/src/services/HubPoolIndexerDataHandler.ts b/packages/indexer/src/services/HubPoolIndexerDataHandler.ts index 3a67a920..deb641b1 100644 --- a/packages/indexer/src/services/HubPoolIndexerDataHandler.ts +++ b/packages/indexer/src/services/HubPoolIndexerDataHandler.ts @@ -55,6 +55,7 @@ export class HubPoolIndexerDataHandler implements IndexerDataHandler { message: "HubPoolIndexerDataHandler::Processing block range", blockRange, lastFinalisedBlock, + identifier: this.getDataIdentifier(), }); if (!this.isInitialized) { await this.initialize(); diff --git a/packages/indexer/src/services/SpokePoolIndexerDataHandler.ts b/packages/indexer/src/services/SpokePoolIndexerDataHandler.ts new file mode 100644 index 00000000..cda148fb --- /dev/null +++ b/packages/indexer/src/services/SpokePoolIndexerDataHandler.ts @@ -0,0 +1,233 @@ +import { Logger } from "winston"; +import * as across from "@across-protocol/sdk"; +import { + getDeployedAddress, + getDeployedBlockNumber, +} from "@across-protocol/contracts"; + +import { BlockRange } from "../data-indexing/model"; +import { IndexerDataHandler } from "../data-indexing/service/IndexerDataHandler"; + +import * as utils from "../utils"; +import { providers } from "@across-protocol/sdk"; +import { SpokePoolRepository } from "../database/SpokePoolRepository"; +import { SpokePoolProcessor } from "./spokePoolProcessor"; + +type FetchEventsResult = { + v3FundsDepositedEvents: (across.interfaces.DepositWithBlock & { + integratorId: string | undefined; + })[]; + filledV3RelayEvents: across.interfaces.FillWithBlock[]; + requestedV3SlowFillEvents: across.interfaces.SlowFillRequestWithBlock[]; + requestedSpeedUpV3Events: { + [depositorAddress: string]: { + [depositId: number]: across.interfaces.SpeedUpWithBlock[]; + }; + }; + relayedRootBundleEvents: across.interfaces.RootBundleRelayWithBlock[]; + executedRelayerRefundRootEvents: across.interfaces.RelayerRefundExecutionWithBlock[]; + tokensBridgedEvents: across.interfaces.TokensBridged[]; +}; + +export class SpokePoolIndexerDataHandler implements IndexerDataHandler { + private isInitialized: boolean; + private configStoreClient: across.clients.AcrossConfigStoreClient; + private hubPoolClient: across.clients.HubPoolClient; + private spokePoolClient: across.clients.SpokePoolClient; + + constructor( + private logger: Logger, + private chainId: number, + private hubPoolChainId: number, + private provider: providers.RetryProvider, + private configStoreFactory: utils.ConfigStoreClientFactory, + private hubPoolFactory: utils.HubPoolClientFactory, + private spokePoolFactory: utils.SpokePoolClientFactory, + private spokePoolClientRepository: SpokePoolRepository, + private spokePoolProcessor: SpokePoolProcessor, + ) { + this.isInitialized = false; + } + + public getDataIdentifier() { + return `${getDeployedAddress("SpokePool", this.chainId)}:${this.chainId}`; + } + public getStartIndexingBlockNumber() { + return getDeployedBlockNumber("SpokePool", this.chainId); + } + + public async processBlockRange( + blockRange: BlockRange, + lastFinalisedBlock: number, + ) { + this.logger.info({ + message: "SpokePoolIndexerDataHandler::Processing block range", + blockRange, + lastFinalisedBlock, + identifier: this.getDataIdentifier(), + }); + + if (!this.isInitialized) { + this.initialize(); + this.isInitialized = true; + } + + const events = await this.fetchEventsByRange(blockRange); + const requestedSpeedUpV3EventsCount = Object.values( + events.requestedSpeedUpV3Events, + ).reduce((acc, speedUps) => { + return acc + Object.values(speedUps).length; + }, 0); + this.logger.info({ + message: "SpokePoolIndexerDataHandler::Found events", + events: { + v3FundsDepositedEvents: events.v3FundsDepositedEvents.length, + filledV3RelayEvents: events.filledV3RelayEvents.length, + requestedV3SlowFillEvents: events.requestedV3SlowFillEvents.length, + requestedSpeedUpV3Events: requestedSpeedUpV3EventsCount, + relayedRootBundleEvents: events.relayedRootBundleEvents.length, + executedRelayerRefundRootEvents: + events.executedRelayerRefundRootEvents.length, + tokensBridgedEvents: events.tokensBridgedEvents.length, + }, + }); + const storedEvents = await this.storeEvents(events, lastFinalisedBlock); + await this.spokePoolProcessor.process(storedEvents); + } + + private async fetchEventsByRange( + blockRange: BlockRange, + ): Promise { + const { configStoreClient, hubPoolClient, spokePoolClient } = this; + + spokePoolClient.eventSearchConfig.toBlock = blockRange.to; + + await configStoreClient.update(); + await hubPoolClient.update([ + "SetPoolRebalanceRoute", + "CrossChainContractsSet", + ]); + await spokePoolClient.update(); + + const v3FundsDepositedEvents = spokePoolClient.getDeposits({ + fromBlock: blockRange.from, + toBlock: blockRange.to, + }); + const v3FundsDepositedWithIntegradorId = await Promise.all( + v3FundsDepositedEvents.map(async (deposit) => { + return { + ...deposit, + integratorId: await this.getIntegratorId(deposit), + }; + }), + ); + const filledV3RelayEvents = spokePoolClient.getFills(); + const requestedV3SlowFillEvents = + spokePoolClient.getSlowFillRequestsForOriginChain(this.chainId); + const requestedSpeedUpV3Events = spokePoolClient.getSpeedUps(); + const relayedRootBundleEvents = spokePoolClient.getRootBundleRelays(); + const executedRelayerRefundRootEvents = + spokePoolClient.getRelayerRefundExecutions(); + const tokensBridgedEvents = spokePoolClient.getTokensBridged(); + + return { + v3FundsDepositedEvents: v3FundsDepositedWithIntegradorId, + filledV3RelayEvents, + requestedV3SlowFillEvents, + requestedSpeedUpV3Events, + relayedRootBundleEvents, + executedRelayerRefundRootEvents, + tokensBridgedEvents, + }; + } + + private async storeEvents( + params: FetchEventsResult, + lastFinalisedBlock: number, + ) { + const { spokePoolClientRepository } = this; + const { + v3FundsDepositedEvents, + filledV3RelayEvents, + requestedV3SlowFillEvents, + requestedSpeedUpV3Events, + relayedRootBundleEvents, + executedRelayerRefundRootEvents, + tokensBridgedEvents, + } = params; + const savedV3FundsDepositedEvents = + await spokePoolClientRepository.formatAndSaveV3FundsDepositedEvents( + v3FundsDepositedEvents, + lastFinalisedBlock, + ); + const savedV3RequestedSlowFills = + await spokePoolClientRepository.formatAndSaveRequestedV3SlowFillEvents( + requestedV3SlowFillEvents, + lastFinalisedBlock, + ); + const savedFilledV3RelayEvents = + await spokePoolClientRepository.formatAndSaveFilledV3RelayEvents( + filledV3RelayEvents, + lastFinalisedBlock, + ); + const savedExecutedRelayerRefundRootEvents = + await spokePoolClientRepository.formatAndSaveExecutedRelayerRefundRootEvents( + executedRelayerRefundRootEvents, + lastFinalisedBlock, + ); + await spokePoolClientRepository.formatAndSaveRequestedSpeedUpV3Events( + requestedSpeedUpV3Events, + lastFinalisedBlock, + ); + await spokePoolClientRepository.formatAndSaveRelayedRootBundleEvents( + relayedRootBundleEvents, + this.chainId, + lastFinalisedBlock, + ); + await spokePoolClientRepository.formatAndSaveTokensBridgedEvents( + tokensBridgedEvents, + lastFinalisedBlock, + ); + return { + deposits: savedV3FundsDepositedEvents, + fills: savedFilledV3RelayEvents, + slowFillRequests: savedV3RequestedSlowFills, + executedRefundRoots: savedExecutedRelayerRefundRootEvents, + }; + } + + private initialize() { + this.configStoreClient = this.configStoreFactory.get(this.hubPoolChainId); + this.hubPoolClient = this.hubPoolFactory.get( + this.hubPoolChainId, + undefined, + undefined, + { + configStoreClient: this.configStoreClient, + }, + ); + this.spokePoolClient = this.spokePoolFactory.get( + this.chainId, + undefined, + undefined, + { + hubPoolClient: this.hubPoolClient, + }, + ); + } + + private async getIntegratorId(deposit: across.interfaces.DepositWithBlock) { + const INTEGRATOR_DELIMITER = "1dc0de"; + const INTEGRATOR_ID_LENGTH = 4; // Integrator ids are 4 characters long + let integratorId = undefined; + const txn = await this.provider.getTransaction(deposit.transactionHash); + const txnData = txn.data; + if (txnData.includes(INTEGRATOR_DELIMITER)) { + integratorId = txnData + .split(INTEGRATOR_DELIMITER) + .pop() + ?.substring(0, INTEGRATOR_ID_LENGTH); + } + return integratorId; + } +} diff --git a/packages/indexer/src/services/index.ts b/packages/indexer/src/services/index.ts index 1963143e..98f38373 100644 --- a/packages/indexer/src/services/index.ts +++ b/packages/indexer/src/services/index.ts @@ -1,3 +1,2 @@ export * as bundles from "./bundles"; -export * as spokePoolIndexer from "./spokePoolIndexer"; export * as spokeProcessor from "./spokePoolProcessor"; diff --git a/packages/indexer/src/services/spokePoolIndexer.ts b/packages/indexer/src/services/spokePoolIndexer.ts deleted file mode 100644 index 13a7b5f5..00000000 --- a/packages/indexer/src/services/spokePoolIndexer.ts +++ /dev/null @@ -1,296 +0,0 @@ -import { getDeployedBlockNumber } from "@across-protocol/contracts"; -import winston from "winston"; -import * as across from "@across-protocol/sdk"; -import Redis from "ioredis"; -import { DataSource, entities } from "@repo/indexer-database"; -import { differenceWith, isEqual } from "lodash"; - -import { RedisCache } from "../redis/redisCache"; -import { SpokePoolRepository } from "../database/SpokePoolRepository"; -import { RangeQueryStore, Ranges } from "../redis/rangeQueryStore"; -import * as utils from "../utils"; -import { BaseIndexer } from "../generics"; -import { providers } from "ethers"; -import { Processor } from "./spokePoolProcessor"; -import { RetryProvidersFactory } from "../web3/RetryProvidersFactory"; -import { getMaxBlockLookBack } from "../web3/constants"; - -export type Config = { - logger: winston.Logger; - redis: Redis; - postgres: DataSource; - hubChainId: number; - spokePoolChainId: number; - retryProviderFactory: RetryProvidersFactory; - configStoreFactory: utils.ConfigStoreClientFactory; - hubPoolFactory: utils.HubPoolClientFactory; - spokePoolClientFactory: utils.SpokePoolClientFactory; -}; - -export class Indexer extends BaseIndexer { - private resolvedRangeStore: RangeQueryStore; - private hubPoolClient: across.clients.HubPoolClient; - private spokePoolClientRepository: SpokePoolRepository; - private spokePoolProcessor: Processor; - private spokePoolProvider: providers.Provider; - private configStoreClient: across.clients.AcrossConfigStoreClient; - - constructor(private readonly config: Config) { - super(config.logger, "spokePool"); - } - - protected async initialize(): Promise { - const { - logger, - redis, - postgres, - retryProviderFactory, - hubChainId, - spokePoolChainId, - hubPoolFactory, - configStoreFactory, - } = this.config; - // Instantiate redis store - this.resolvedRangeStore = new RangeQueryStore({ - redis, - prefix: `spokePoolIndexer:${spokePoolChainId}:rangeQuery:resolved`, - }); - // Instantiate providers - this.spokePoolProvider = - retryProviderFactory.getProviderForChainId(spokePoolChainId); - // Instantiate respositories - this.spokePoolClientRepository = new SpokePoolRepository( - postgres, - logger, - true, - ); - // Instantiate processors - this.spokePoolProcessor = new Processor(postgres, logger, spokePoolChainId); - // Instantiate clients - this.configStoreClient = configStoreFactory.get(hubChainId); - this.hubPoolClient = hubPoolFactory.get(hubChainId, undefined, undefined, { - configStoreClient: this.configStoreClient, - }); - } - - protected async indexerLogic(): Promise { - const allPendingQueries = await this.getUnprocessedRanges(); - this.logger.info({ - message: `Running indexer on ${allPendingQueries.length} block range requests`, - }); - for (const query of allPendingQueries) { - if (this.stopRequested) break; - const [fromBlock, toBlock] = query; - try { - this.logger.info({ - message: `Starting update for block range ${fromBlock} to ${toBlock}`, - query, - }); - const events = await this.fetchEventsByRange(fromBlock, toBlock); - // TODO: may need to catch error to see if there is some data that exists in db already or change storage to overwrite any existing values - const storedEvents = await this.storeEvents(events); - await this.spokePoolProcessor.process(storedEvents); - await this.resolvedRangeStore.setByRange(fromBlock, toBlock); - this.logger.info({ - message: `Completed update for block range ${fromBlock} to ${toBlock}`, - query, - }); - } catch (error) { - if (error instanceof Error) { - this.logger.error({ - message: `Error updating for block range ${fromBlock} to ${toBlock}`, - query, - errorMessage: error.message, - }); - } else { - // not an error type, throw it and crash app likely - throw error; - } - } - } - } - - private async storeEvents(params: { - v3FundsDepositedEvents: (across.interfaces.DepositWithBlock & { - integratorId: string | undefined; - })[]; - filledV3RelayEvents: across.interfaces.FillWithBlock[]; - requestedV3SlowFillEvents: across.interfaces.SlowFillRequestWithBlock[]; - requestedSpeedUpV3Events: { - [depositorAddress: string]: { - [depositId: number]: across.interfaces.SpeedUpWithBlock[]; - }; - }; - relayedRootBundleEvents: across.interfaces.RootBundleRelayWithBlock[]; - executedRelayerRefundRootEvents: across.interfaces.RelayerRefundExecutionWithBlock[]; - tokensBridgedEvents: across.interfaces.TokensBridged[]; - }) { - const { spokePoolClientRepository } = this; - const { - v3FundsDepositedEvents, - filledV3RelayEvents, - requestedV3SlowFillEvents, - requestedSpeedUpV3Events, - relayedRootBundleEvents, - executedRelayerRefundRootEvents, - tokensBridgedEvents, - } = params; - const savedV3FundsDepositedEvents = - await spokePoolClientRepository.formatAndSaveV3FundsDepositedEvents( - v3FundsDepositedEvents, - ); - const savedV3RequestedSlowFills = - await spokePoolClientRepository.formatAndSaveRequestedV3SlowFillEvents( - requestedV3SlowFillEvents, - ); - const savedFilledV3RelayEvents = - await spokePoolClientRepository.formatAndSaveFilledV3RelayEvents( - filledV3RelayEvents, - ); - const savedExecutedRelayerRefundRootEvents = - await spokePoolClientRepository.formatAndSaveExecutedRelayerRefundRootEvents( - executedRelayerRefundRootEvents, - ); - await spokePoolClientRepository.formatAndSaveRequestedSpeedUpV3Events( - requestedSpeedUpV3Events, - ); - await spokePoolClientRepository.formatAndSaveRelayedRootBundleEvents( - relayedRootBundleEvents, - this.config.spokePoolChainId, - ); - await spokePoolClientRepository.formatAndSaveTokensBridgedEvents( - tokensBridgedEvents, - ); - return { - deposits: savedV3FundsDepositedEvents, - fills: savedFilledV3RelayEvents, - slowFillRequests: savedV3RequestedSlowFills, - executedRefundRoots: savedExecutedRelayerRefundRootEvents, - }; - } - - private async getUnprocessedRanges(toBlock?: number): Promise { - const deployedBlockNumber = getDeployedBlockNumber( - "SpokePool", - this.config.spokePoolChainId, - ); - const spokeLatestBlockNumber = - toBlock ?? (await this.spokePoolProvider.getBlockNumber()); - - const allPaginatedBlockRanges = across.utils.getPaginatedBlockRanges({ - fromBlock: deployedBlockNumber, - toBlock: spokeLatestBlockNumber, - maxBlockLookBack: getMaxBlockLookBack(this.config.spokePoolChainId), - }); - - const allQueries = await this.resolvedRangeStore.entries(); - const resolvedRanges = allQueries.map(([, x]) => [x.fromBlock, x.toBlock]); - const needsProcessing = differenceWith( - allPaginatedBlockRanges, - resolvedRanges, - isEqual, - ); - - this.logger.info({ - message: `${needsProcessing.length} block ranges need processing`, - deployedBlockNumber, - spokeLatestBlockNumber, - }); - - return needsProcessing; - } - - private async fetchEventsByRange(fromBlock: number, toBlock: number) { - this.logger.info({ - message: "updating config store client", - fromBlock, - toBlock, - }); - await this.configStoreClient.update(); - this.logger.info({ - message: "updated config store client", - fromBlock, - toBlock, - }); - - this.logger.info({ - message: "updating hubpool client", - fromBlock, - toBlock, - }); - await this.hubPoolClient.update(); - this.logger.info({ - message: "updated hubpool client", - fromBlock, - toBlock, - }); - - const spokeClient = this.config.spokePoolClientFactory.get( - this.config.spokePoolChainId, - fromBlock, - toBlock, - { - hubPoolClient: this.hubPoolClient, - }, - ); - - this.logger.info({ - message: "updating spokepool client", - fromBlock, - toBlock, - }); - await spokeClient.update(); - this.logger.info({ - message: "updated spokepool client", - fromBlock, - toBlock, - }); - - const v3FundsDepositedEvents = spokeClient.getDeposits(); - const v3FundsDepositedWithIntegradorId = await Promise.all( - v3FundsDepositedEvents.map(async (deposit) => { - return { - ...deposit, - integratorId: await this.getIntegratorId(deposit), - }; - }), - ); - const filledV3RelayEvents = spokeClient.getFills(); - const requestedV3SlowFillEvents = - spokeClient.getSlowFillRequestsForOriginChain( - this.config.spokePoolChainId, - ); - const requestedSpeedUpV3Events = spokeClient.getSpeedUps(); - const relayedRootBundleEvents = spokeClient.getRootBundleRelays(); - const executedRelayerRefundRootEvents = - spokeClient.getRelayerRefundExecutions(); - const tokensBridgedEvents = spokeClient.getTokensBridged(); - - return { - v3FundsDepositedEvents: v3FundsDepositedWithIntegradorId, - filledV3RelayEvents, - requestedV3SlowFillEvents, - requestedSpeedUpV3Events, - relayedRootBundleEvents, - executedRelayerRefundRootEvents, - tokensBridgedEvents, - }; - } - - private async getIntegratorId(deposit: across.interfaces.DepositWithBlock) { - const INTEGRATOR_DELIMITER = "1dc0de"; - const INTEGRATOR_ID_LENGTH = 4; // Integrator ids are 4 characters long - let integratorId = undefined; - const txn = await this.spokePoolProvider.getTransaction( - deposit.transactionHash, - ); - const txnData = txn.data; - if (txnData.includes(INTEGRATOR_DELIMITER)) { - integratorId = txnData - .split(INTEGRATOR_DELIMITER) - .pop() - ?.substring(0, INTEGRATOR_ID_LENGTH); - } - return integratorId; - } -} diff --git a/packages/indexer/src/services/spokePoolProcessor.ts b/packages/indexer/src/services/spokePoolProcessor.ts index 5967190a..eff93e05 100644 --- a/packages/indexer/src/services/spokePoolProcessor.ts +++ b/packages/indexer/src/services/spokePoolProcessor.ts @@ -8,7 +8,7 @@ enum SpokePoolEvents { RequestedV3SlowFill = "RequestedV3SlowFill", } -export class Processor { +export class SpokePoolProcessor { constructor( private readonly postgres: DataSource, private readonly logger: winston.Logger,