diff --git a/packages/indexer-database/src/entities/evm/V3FundsDeposited.ts b/packages/indexer-database/src/entities/evm/V3FundsDeposited.ts index 8d1214ee..b7f90f8e 100644 --- a/packages/indexer-database/src/entities/evm/V3FundsDeposited.ts +++ b/packages/indexer-database/src/entities/evm/V3FundsDeposited.ts @@ -70,7 +70,7 @@ export class V3FundsDeposited { quoteBlockNumber: number; @Column({ nullable: true }) - integratorId: string; + integratorId?: string; @Column() transactionHash: string; diff --git a/packages/indexer-database/src/index.ts b/packages/indexer-database/src/index.ts index 570fecc3..f4791a07 100644 --- a/packages/indexer-database/src/index.ts +++ b/packages/indexer-database/src/index.ts @@ -1,3 +1,4 @@ export * from "./main"; export * as entities from "./entities"; export * as utils from "./utils"; +export * from "./model"; diff --git a/packages/indexer-database/src/main.ts b/packages/indexer-database/src/main.ts index 3d3e3429..7ba56df8 100644 --- a/packages/indexer-database/src/main.ts +++ b/packages/indexer-database/src/main.ts @@ -1,15 +1,9 @@ import "reflect-metadata"; import { DataSource, LessThan, Not, In } from "typeorm"; import * as entities from "./entities"; +import { DatabaseConfig } from "./model"; export { DataSource, LessThan, Not, In }; -export type DatabaseConfig = { - host: string; - port: string; - user: string; - password: string; - dbName: string; -}; export const createDataSource = (config: DatabaseConfig): DataSource => { return new DataSource({ diff --git a/packages/indexer-database/src/model/index.ts b/packages/indexer-database/src/model/index.ts new file mode 100644 index 00000000..d3671917 --- /dev/null +++ b/packages/indexer-database/src/model/index.ts @@ -0,0 +1,28 @@ +export type DatabaseConfig = { + host: string; + port: string; + user: string; + password: string; + dbName: string; +}; + +/** + * Enum to represent the result type of a query. + * - If the entity is identical to the one in the database, return `Nothing`. + * - If the unique keys are not present, return Inserted. + * - If the finalised field was the only one that changed, return `Finalised`. + * - If any of the entity fields were changed, return Updated. + * - If both the finalised field and other fields were changed, return UpdatedAndFinalised. + */ +export enum SaveQueryResultType { + Nothing = "nothing", + Inserted = "inserted", + Finalised = "finalised", + Updated = "updated", + UpdatedAndFinalised = "updatedAndFinalised", +} + +export type SaveQueryResult = { + data: T | undefined; + result: SaveQueryResultType; +}; diff --git a/packages/indexer-database/src/utils/BlockchainEventRepository.ts b/packages/indexer-database/src/utils/BlockchainEventRepository.ts new file mode 100644 index 00000000..b295f3b7 --- /dev/null +++ b/packages/indexer-database/src/utils/BlockchainEventRepository.ts @@ -0,0 +1,114 @@ +import { DataSource, EntityTarget, ObjectLiteral } from "typeorm"; +import winston from "winston"; + +import { SaveQueryResultType, SaveQueryResult } from "../model"; + +export function filterSaveQueryResults( + results: SaveQueryResult[], + type: SaveQueryResultType, +) { + return results + .filter((result) => result.result === type) + .map((result) => result.data) + .filter((data) => data !== undefined); +} + +export class BlockchainEventRepository { + constructor( + protected postgres: DataSource, + protected logger: winston.Logger, + ) {} + + /** + * Saves the entities to the database. + * @param entity - The entity to save. + * @param data - The data to save. + * @param uniqueKeys + * The unique keys to check for. It is recommended these keys to be indexed columns, so that the query is faster. + * @param comparisonKeys - The keys to compare for changes. + */ + protected async saveAndHandleFinalisationBatch( + entity: EntityTarget, + data: Partial[], + uniqueKeys: (keyof Entity)[], + comparisonKeys: (keyof Entity)[], + ): Promise[]> { + return Promise.all( + data.map((dataItem) => + this.saveAndHandleFinalisation( + entity, + dataItem, + uniqueKeys, + comparisonKeys, + ), + ), + ); + } + + /** + * Saves the entity to the database. + * @param entity - The entity to save. + * @param data - The data to save. + * @param uniqueKeys + * The unique keys to check for. It is recommended these keys to be indexed columns, so that the query is faster. + * @param comparisonKeys - The keys to compare for changes. + */ + protected async saveAndHandleFinalisation( + entity: EntityTarget, + data: Partial, + uniqueKeys: (keyof Entity)[], + comparisonKeys: (keyof Entity)[], + ): Promise> { + const where = uniqueKeys.reduce( + (acc, key) => { + acc[key] = data[key]; + return acc; + }, + {} as Record, + ); + const dbEntity = await this.postgres + .getRepository(entity) + .findOne({ where }); + const repository = this.postgres.getRepository(entity); + + if (!dbEntity) { + await repository.insert(data); + return { + data: (await repository.findOne({ where })) as Entity, + result: SaveQueryResultType.Inserted, + }; + } + + // Check if any of the values of the comparison keys have changed + const isChanged = comparisonKeys.some((key) => data[key] !== dbEntity[key]); + // Check if the data moved in finalised state + const isFinalisedChanged = data.finalised && !dbEntity.finalised; + + if (isChanged) { + await repository.update(where, data); + if (isFinalisedChanged) { + return { + data: (await repository.findOne({ where })) as Entity, + result: SaveQueryResultType.UpdatedAndFinalised, + }; + } + return { + data: (await repository.findOne({ where })) as Entity, + result: SaveQueryResultType.Updated, + }; + } + + if (isFinalisedChanged) { + await repository.update(where, data); + return { + data: (await repository.findOne({ where })) as Entity, + result: SaveQueryResultType.Finalised, + }; + } + + return { + data: undefined, + result: SaveQueryResultType.Nothing, + }; + } +} diff --git a/packages/indexer-database/src/utils/index.ts b/packages/indexer-database/src/utils/index.ts index 07fd7baf..468fc8c9 100644 --- a/packages/indexer-database/src/utils/index.ts +++ b/packages/indexer-database/src/utils/index.ts @@ -1 +1,2 @@ export * from "./BaseRepository"; +export * from "./BlockchainEventRepository"; diff --git a/packages/indexer/src/data-indexing/service/AcrossIndexerManager.ts b/packages/indexer/src/data-indexing/service/AcrossIndexerManager.ts index d1a00087..4a8c740a 100644 --- a/packages/indexer/src/data-indexing/service/AcrossIndexerManager.ts +++ b/packages/indexer/src/data-indexing/service/AcrossIndexerManager.ts @@ -110,12 +110,12 @@ export class AcrossIndexerManager { return spokePoolIndexer; }, ); + this.spokePoolIndexers = spokePoolIndexers; - if (spokePoolIndexers.length === 0) { + if (this.spokePoolIndexers.length === 0) { this.logger.warn("No spoke pool indexers to start"); return; } - this.spokePoolIndexers = spokePoolIndexers; return Promise.all( this.spokePoolIndexers.map((indexer) => indexer.start()), ); diff --git a/packages/indexer/src/data-indexing/service/SpokePoolIndexerDataHandler.ts b/packages/indexer/src/data-indexing/service/SpokePoolIndexerDataHandler.ts index e17cd59d..d1399e23 100644 --- a/packages/indexer/src/data-indexing/service/SpokePoolIndexerDataHandler.ts +++ b/packages/indexer/src/data-indexing/service/SpokePoolIndexerDataHandler.ts @@ -4,18 +4,24 @@ import { getDeployedAddress, getDeployedBlockNumber, } from "@across-protocol/contracts"; -import { entities } from "@repo/indexer-database"; +import { + entities, + utils as indexerDatabaseUtils, + SaveQueryResult, +} from "@repo/indexer-database"; +import { SaveQueryResultType } from "@repo/indexer-database"; import { BlockRange } from "../model"; import { IndexerDataHandler } from "./IndexerDataHandler"; import * as utils from "../../utils"; +import { getIntegratorId } from "../../utils/spokePoolUtils"; import { SpokePoolRepository } from "../../database/SpokePoolRepository"; import { SpokePoolProcessor } from "../../services/spokePoolProcessor"; import { IndexerQueues, IndexerQueuesService } from "../../messaging/service"; import { IntegratorIdMessage } from "../../messaging/IntegratorIdWorker"; -type FetchEventsResult = { +export type FetchEventsResult = { v3FundsDepositedEvents: utils.V3FundsDepositedWithIntegradorId[]; filledV3RelayEvents: across.interfaces.FillWithBlock[]; requestedV3SlowFillEvents: across.interfaces.SlowFillRequestWithBlock[]; @@ -29,6 +35,13 @@ type FetchEventsResult = { tokensBridgedEvents: across.interfaces.TokensBridged[]; }; +export type StoreEventsResult = { + deposits: SaveQueryResult[]; + fills: SaveQueryResult[]; + slowFillRequests: SaveQueryResult[]; + executedRefundRoots: SaveQueryResult[]; +}; + export class SpokePoolIndexerDataHandler implements IndexerDataHandler { private isInitialized: boolean; private configStoreClient: across.clients.AcrossConfigStoreClient; @@ -95,20 +108,12 @@ export class SpokePoolIndexerDataHandler implements IndexerDataHandler { blockRange, identifier: this.getDataIdentifier(), }); - - // Fetch integratorId synchronously when there are fewer than 1K deposit events - // For larger sets, use the IntegratorId queue for asynchronous processing - const fetchIntegratorIdSync = events.v3FundsDepositedEvents.length < 1000; - if (fetchIntegratorIdSync) { - this.appendIntegratorIdToDeposits(events.v3FundsDepositedEvents); - } - const storedEvents = await this.storeEvents(events, lastFinalisedBlock); - - if (!fetchIntegratorIdSync) { - await this.publishIntegratorIdMessages(storedEvents.deposits); - } - + const newInsertedDeposits = indexerDatabaseUtils.filterSaveQueryResults( + storedEvents.deposits, + SaveQueryResultType.Inserted, + ); + await this.updateNewDepositsWithIntegratorId(newInsertedDeposits); await this.spokePoolProcessor.process(storedEvents); } @@ -159,7 +164,7 @@ export class SpokePoolIndexerDataHandler implements IndexerDataHandler { private async storeEvents( params: FetchEventsResult, lastFinalisedBlock: number, - ) { + ): Promise { const { spokePoolClientRepository } = this; const { v3FundsDepositedEvents, @@ -223,20 +228,22 @@ export class SpokePoolIndexerDataHandler implements IndexerDataHandler { ); } - private async appendIntegratorIdToDeposits( - deposits: utils.V3FundsDepositedWithIntegradorId[], + private async updateNewDepositsWithIntegratorId( + deposits: entities.V3FundsDeposited[], ) { - await across.utils.forEachAsync( - deposits, - async (deposit, index, deposits) => { - const integratorId = await utils.getIntegratorId( - this.provider, - new Date(deposit.quoteTimestamp * 1000), - deposit.transactionHash, + await across.utils.forEachAsync(deposits, async (deposit) => { + const integratorId = await getIntegratorId( + this.provider, + deposit.quoteTimestamp, + deposit.transactionHash, + ); + if (integratorId) { + await this.spokePoolClientRepository.updateDepositEventWithIntegratorId( + deposit.id, + integratorId, ); - deposits[index] = { ...deposit, integratorId }; - }, - ); + } + }); } private async publishIntegratorIdMessages( diff --git a/packages/indexer/src/database/SpokePoolRepository.ts b/packages/indexer/src/database/SpokePoolRepository.ts index 6ae5e84f..cd733f9c 100644 --- a/packages/indexer/src/database/SpokePoolRepository.ts +++ b/packages/indexer/src/database/SpokePoolRepository.ts @@ -3,13 +3,19 @@ import * as across from "@across-protocol/sdk"; import { DataSource, entities, utils as dbUtils } from "@repo/indexer-database"; import * as utils from "../utils"; -export class SpokePoolRepository extends dbUtils.BaseRepository { +export class SpokePoolRepository extends dbUtils.BlockchainEventRepository { constructor( postgres: DataSource, logger: winston.Logger, - private chunkSize = 2000, + private chunkSize = 100, ) { - super(postgres, logger, true); + super(postgres, logger); + } + + public updateDepositEventWithIntegratorId(id: number, integratorId: string) { + return this.postgres + .getRepository(entities.V3FundsDeposited) + .update({ id }, { integratorId }); } private formatRelayData( @@ -45,11 +51,11 @@ export class SpokePoolRepository extends dbUtils.BaseRepository { const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); const savedEvents = await Promise.all( chunkedEvents.map((eventsChunk) => - this.insertWithFinalisationCheck( + this.saveAndHandleFinalisationBatch( entities.V3FundsDeposited, eventsChunk, ["depositId", "originChainId"], - lastFinalisedBlock, + ["relayHash", "transactionHash"], ), ), ); @@ -84,11 +90,11 @@ export class SpokePoolRepository extends dbUtils.BaseRepository { const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); const savedEvents = await Promise.all( chunkedEvents.map((eventsChunk) => - this.insertWithFinalisationCheck( + this.saveAndHandleFinalisationBatch( entities.FilledV3Relay, eventsChunk, ["relayHash"], - lastFinalisedBlock, + ["transactionHash"], ), ), ); @@ -110,11 +116,11 @@ export class SpokePoolRepository extends dbUtils.BaseRepository { const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); const savedEvents = await Promise.all( chunkedEvents.map((eventsChunk) => - this.insertWithFinalisationCheck( + this.saveAndHandleFinalisationBatch( entities.RequestedV3SlowFill, eventsChunk, - ["relayHash"], - lastFinalisedBlock, + ["depositId", "originChainId"], + ["relayHash", "transactionHash"], ), ), ); @@ -144,11 +150,11 @@ export class SpokePoolRepository extends dbUtils.BaseRepository { const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); const savedEvents = await Promise.all( chunkedEvents.map((eventsChunk) => - this.insertWithFinalisationCheck( + this.saveAndHandleFinalisationBatch( entities.RequestedSpeedUpV3Deposit, eventsChunk, ["depositId", "originChainId", "transactionHash", "logIndex"], - lastFinalisedBlock, + ["transactionHash"], ), ), ); @@ -171,11 +177,11 @@ export class SpokePoolRepository extends dbUtils.BaseRepository { const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); const savedEvents = await Promise.all( chunkedEvents.map((eventsChunk) => - this.insertWithFinalisationCheck( + this.saveAndHandleFinalisationBatch( entities.RelayedRootBundle, eventsChunk, ["chainId", "rootBundleId"], - lastFinalisedBlock, + ["transactionHash"], ), ), ); @@ -197,11 +203,11 @@ export class SpokePoolRepository extends dbUtils.BaseRepository { const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); const savedEvents = await Promise.all( chunkedEvents.map((eventsChunk) => - this.insertWithFinalisationCheck( + this.saveAndHandleFinalisationBatch( entities.ExecutedRelayerRefundRoot, eventsChunk, ["chainId", "rootBundleId", "leafId"], - lastFinalisedBlock, + ["transactionHash"], ), ), ); @@ -222,11 +228,11 @@ export class SpokePoolRepository extends dbUtils.BaseRepository { const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); const savedEvents = await Promise.all( chunkedEvents.map((eventsChunk) => - this.insertWithFinalisationCheck( + this.saveAndHandleFinalisationBatch( entities.TokensBridged, eventsChunk, ["chainId", "leafId", "l2TokenAddress", "transactionHash"], - lastFinalisedBlock, + ["transactionHash"], ), ), ); diff --git a/packages/indexer/src/services/spokePoolProcessor.ts b/packages/indexer/src/services/spokePoolProcessor.ts index cc2f0c21..c70fb8ca 100644 --- a/packages/indexer/src/services/spokePoolProcessor.ts +++ b/packages/indexer/src/services/spokePoolProcessor.ts @@ -1,7 +1,13 @@ import { utils } from "@across-protocol/sdk"; -import { DataSource, entities } from "@repo/indexer-database"; +import { + DataSource, + entities, + utils as dbUtils, + SaveQueryResultType, +} from "@repo/indexer-database"; import winston from "winston"; import { RelayStatus } from "../../../indexer-database/dist/src/entities"; +import { StoreEventsResult } from "../data-indexing/service/SpokePoolIndexerDataHandler"; enum SpokePoolEvents { V3FundsDeposited = "V3FundsDeposited", @@ -18,27 +24,52 @@ export class SpokePoolProcessor { private readonly chainId: number, ) {} - public async process(events: { - deposits: entities.V3FundsDeposited[]; - fills: entities.FilledV3Relay[]; - slowFillRequests: entities.RequestedV3SlowFill[]; - executedRefundRoots: entities.ExecutedRelayerRefundRoot[]; - }) { - if (events.deposits.length > 0) - await this.assignSpokeEventsToRelayHashInfo( - SpokePoolEvents.V3FundsDeposited, - events.deposits, - ); - if (events.slowFillRequests.length > 0) - await this.assignSpokeEventsToRelayHashInfo( - SpokePoolEvents.RequestedV3SlowFill, - events.slowFillRequests, - ); - if (events.fills.length > 0) - await this.assignSpokeEventsToRelayHashInfo( - SpokePoolEvents.FilledV3Relay, - events.fills, - ); + public async process(events: StoreEventsResult) { + const newDeposits = dbUtils.filterSaveQueryResults( + events.deposits, + SaveQueryResultType.Inserted, + ); + const updatedDeposits = dbUtils.filterSaveQueryResults( + events.deposits, + SaveQueryResultType.Updated, + ); + await this.assignSpokeEventsToRelayHashInfo( + SpokePoolEvents.V3FundsDeposited, + [...newDeposits, ...updatedDeposits], + ); + // TODO: for new deposits, notify status change to unfilled + // here... + + const newSlowFillRequests = dbUtils.filterSaveQueryResults( + events.slowFillRequests, + SaveQueryResultType.Inserted, + ); + const updatedSlowFillRequests = dbUtils.filterSaveQueryResults( + events.slowFillRequests, + SaveQueryResultType.Updated, + ); + await this.assignSpokeEventsToRelayHashInfo( + SpokePoolEvents.RequestedV3SlowFill, + [...newSlowFillRequests, ...updatedSlowFillRequests], + ); + // TODO: for new slow fill requests, notify status change to slow fill requested + // here... + + const newFills = dbUtils.filterSaveQueryResults( + events.fills, + SaveQueryResultType.Inserted, + ); + const updatedFills = dbUtils.filterSaveQueryResults( + events.fills, + SaveQueryResultType.Updated, + ); + await this.assignSpokeEventsToRelayHashInfo(SpokePoolEvents.FilledV3Relay, [ + ...newFills, + ...updatedFills, + ]); + // TODO: for new fills, notify status change to filled + // here... + await this.updateExpiredRelays(); await this.updateRefundedDepositsStatus(); } @@ -108,6 +139,10 @@ export class SpokePoolProcessor { const relayHashInfoRepository = this.postgres.getRepository( entities.RelayHashInfo, ); + this.logger.info({ + at: "SpokePoolProcessor#updateExpiredRelays", + message: `Updating status for expired relays`, + }); const expiredDeposits = await relayHashInfoRepository .createQueryBuilder() .update() @@ -123,8 +158,7 @@ export class SpokePoolProcessor { .execute(); this.logger.info({ at: "SpokePoolProcessor#updateExpiredRelays", - message: `Updated status for expired relays`, - updatedRelayHashInfoRows: expiredDeposits.generatedMaps.length, + message: `Updated status for ${expiredDeposits.generatedMaps.length} expired relays`, }); } @@ -135,6 +169,10 @@ export class SpokePoolProcessor { * @returns A void promise */ private async updateRefundedDepositsStatus(): Promise { + this.logger.info({ + at: "SpokePoolProcessor#updateRefundedDepositsStatus", + message: `Updating status for refunded deposits`, + }); const relayHashInfoRepository = this.postgres.getRepository( entities.RelayHashInfo, );