diff --git a/packages/indexer-database/src/entities/Bundle.ts b/packages/indexer-database/src/entities/Bundle.ts index 9bb26337..0da591e0 100644 --- a/packages/indexer-database/src/entities/Bundle.ts +++ b/packages/indexer-database/src/entities/Bundle.ts @@ -13,6 +13,7 @@ import { RootBundleCanceled } from "./evm/RootBundleCanceled"; import { RootBundleExecuted } from "./evm/RootBundleExecuted"; import { RootBundleDisputed } from "./evm/RootBundleDisputed"; import { BundleBlockRange } from "./BundleBlockRange"; +import { BundleEvent } from "./BundleEvent"; export enum BundleStatus { Proposed = "Proposed", @@ -89,4 +90,10 @@ export class Bundle { nullable: false, }) ranges: BundleBlockRange[]; + + @Column({ default: false }) + eventsAssociated: boolean; + + @OneToMany(() => BundleEvent, (event) => event.bundle) + events: BundleEvent[]; } diff --git a/packages/indexer-database/src/entities/BundleEvent.ts b/packages/indexer-database/src/entities/BundleEvent.ts new file mode 100644 index 00000000..c9391434 --- /dev/null +++ b/packages/indexer-database/src/entities/BundleEvent.ts @@ -0,0 +1,38 @@ +import { + Column, + Entity, + ManyToOne, + PrimaryGeneratedColumn, + Unique, +} from "typeorm"; +import { Bundle } from "./Bundle"; + +export enum BundleEventType { + Deposit = "deposit", + ExpiredDeposit = "expiredDeposit", + Fill = "fill", + SlowFill = "slowFill", + UnexecutableSlowFill = "unexecutableSlowFill", +} + +@Entity() +@Unique("UK_bundleEvent_eventType_relayHash", ["type", "relayHash"]) +export class BundleEvent { + @PrimaryGeneratedColumn() + id: number; + + @ManyToOne(() => Bundle, (bundle) => bundle.events) + bundle: Bundle; + + @Column() + bundleId: number; + + @Column({ type: "enum", enum: BundleEventType }) + type: BundleEventType; + + @Column() + relayHash: string; + + @Column({ nullable: true }) + repaymentChainId: number; +} diff --git a/packages/indexer-database/src/entities/index.ts b/packages/indexer-database/src/entities/index.ts index fa11d482..58f7c660 100644 --- a/packages/indexer-database/src/entities/index.ts +++ b/packages/indexer-database/src/entities/index.ts @@ -15,6 +15,7 @@ export * from "./evm/TokensBridged"; // Others export * from "./Bundle"; +export * from "./BundleEvent"; export * from "./BundleBlockRange"; export * from "./RootBundleExecutedJoinTable"; export * from "./RelayHashInfo"; diff --git a/packages/indexer-database/src/main.ts b/packages/indexer-database/src/main.ts index cfeeb554..940185ca 100644 --- a/packages/indexer-database/src/main.ts +++ b/packages/indexer-database/src/main.ts @@ -39,6 +39,7 @@ export const createDataSource = (config: DatabaseConfig): DataSource => { // Bundle entities.Bundle, entities.BundleBlockRange, + entities.BundleEvent, entities.RootBundleExecutedJoinTable, // Others entities.RelayHashInfo, diff --git a/packages/indexer-database/src/migrations/1729644581993-BundleEvent.ts b/packages/indexer-database/src/migrations/1729644581993-BundleEvent.ts new file mode 100644 index 00000000..d8885540 --- /dev/null +++ b/packages/indexer-database/src/migrations/1729644581993-BundleEvent.ts @@ -0,0 +1,38 @@ +import { MigrationInterface, QueryRunner } from "typeorm"; + +export class BundleEvent1729644581993 implements MigrationInterface { + name = "BundleEvent1729644581993"; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `CREATE TYPE "public"."bundle_event_type_enum" AS ENUM('deposit', 'expiredDeposit', 'fill', 'slowFill', 'unexecutableSlowFill')`, + ); + await queryRunner.query( + `CREATE TABLE "bundle_event" ( + "id" SERIAL NOT NULL, + "bundleId" integer NOT NULL, + "type" "public"."bundle_event_type_enum" NOT NULL, + "relayHash" character varying NOT NULL, + "repaymentChainId" integer, + CONSTRAINT "UK_bundleEvent_eventType_relayHash" UNIQUE ("type", "relayHash"), + CONSTRAINT "PK_d633122fa4b52768e1b588bddee" PRIMARY KEY ("id"))`, + ); + await queryRunner.query( + `ALTER TABLE "bundle" ADD "eventsAssociated" boolean NOT NULL DEFAULT false`, + ); + await queryRunner.query( + `ALTER TABLE "bundle_event" ADD CONSTRAINT "FK_62dcd4f6f0d1713fab0c8542dba" FOREIGN KEY ("bundleId") REFERENCES "bundle"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`, + ); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `ALTER TABLE "bundle_event" DROP CONSTRAINT "FK_62dcd4f6f0d1713fab0c8542dba"`, + ); + await queryRunner.query( + `ALTER TABLE "bundle" DROP COLUMN "eventsAssociated"`, + ); + await queryRunner.query(`DROP TABLE "bundle_event"`); + await queryRunner.query(`DROP TYPE "public"."bundle_event_type_enum"`); + } +} diff --git a/packages/indexer/src/database/BundleRepository.ts b/packages/indexer/src/database/BundleRepository.ts index b621c42a..0c1e313d 100644 --- a/packages/indexer/src/database/BundleRepository.ts +++ b/packages/indexer/src/database/BundleRepository.ts @@ -1,4 +1,5 @@ import winston from "winston"; +import * as across from "@across-protocol/sdk"; import { DataSource, entities, utils } from "@repo/indexer-database"; export type BlockRangeInsertType = { @@ -32,6 +33,7 @@ export class BundleRepository extends utils.BaseRepository { postgres: DataSource, logger: winston.Logger, throwError?: boolean, + private chunkSize = 2000, ) { super(postgres, logger, throwError); } @@ -371,4 +373,182 @@ export class BundleRepository extends utils.BaseRepository { return (await executedUpdateQuery.execute())?.affected ?? 0; } + + /** + * Retrieves executed bundles that do not have events associated with them. + * The query can be filtered by the block number and a limit on the number of results returned. + * @param filters - Optional filters for the query. + * @param filters.fromBlock - If provided, retrieves bundles where the proposal's block number is greater than this value. + * @param limit - The maximum number of bundles to retrieve. + * @returns An array of bundles that match the given criteria. + */ + public async getExecutedBundlesWithoutEventsAssociated( + filters: { + fromBlock?: number; + }, + limit = 5, + ): Promise { + const bundleRepo = this.postgres.getRepository(entities.Bundle); + const query = bundleRepo + .createQueryBuilder("b") + .select(["b", "proposal", "ranges"]) + .leftJoinAndSelect("b.ranges", "ranges") + .leftJoinAndSelect("b.proposal", "proposal") + .where("b.status = :executed", { + executed: entities.BundleStatus.Executed, + }) + .andWhere("b.eventsAssociated = false"); + if (filters.fromBlock) { + query.andWhere("proposal.blockNumber > :fromBlock", { + fromBlock: filters.fromBlock, + }); + } + return query.orderBy("proposal.blockNumber", "DESC").take(limit).getMany(); + } + + /** + * Updates the `eventsAssociated` flag to `true` for a specific bundle. + * @param bundleId - The ID of the bundle to update. + * @returns A promise that resolves when the update is complete. + */ + public async updateBundleEventsAssociatedFlag(bundleId: number) { + const bundleRepo = this.postgres.getRepository(entities.Bundle); + const updatedBundle = await bundleRepo + .createQueryBuilder() + .update() + .set({ eventsAssociated: true }) + .where("id = :id", { id: bundleId }) + .execute(); + return updatedBundle.affected; + } + + /** + * Stores bundle events relating them to a given bundle. + * @param bundleData The reconstructed bundle data. + * @param bundleId ID of the bundle to associate these events with. + * @returns A promise that resolves when all the events have been inserted into the database. + */ + public async storeBundleEvents( + bundleData: across.interfaces.LoadDataReturnValue, + bundleId: number, + ) { + const eventsRepo = this.postgres.getRepository(entities.BundleEvent); + + // Store bundle deposits + const deposits = this.formatBundleEvents( + entities.BundleEventType.Deposit, + bundleData.bundleDepositsV3, + bundleId, + ); + const chunkedDeposits = across.utils.chunk(deposits, this.chunkSize); + await Promise.all( + chunkedDeposits.map((eventsChunk) => eventsRepo.insert(eventsChunk)), + ); + + // Store bundle refunded deposits + const expiredDeposits = this.formatBundleEvents( + entities.BundleEventType.ExpiredDeposit, + bundleData.expiredDepositsToRefundV3, + bundleId, + ); + const chunkedRefunds = across.utils.chunk(expiredDeposits, this.chunkSize); + await Promise.all( + chunkedRefunds.map((eventsChunk) => eventsRepo.insert(eventsChunk)), + ); + + // Store bundle slow fills + const slowFills = this.formatBundleEvents( + entities.BundleEventType.SlowFill, + bundleData.bundleSlowFillsV3, + bundleId, + ); + const chunkedSlowFills = across.utils.chunk(slowFills, this.chunkSize); + await Promise.all( + chunkedSlowFills.map((eventsChunk) => eventsRepo.insert(eventsChunk)), + ); + + // Store bundle unexecutable slow fills + const unexecutableSlowFills = this.formatBundleEvents( + entities.BundleEventType.UnexecutableSlowFill, + bundleData.unexecutableSlowFills, + bundleId, + ); + const chunkedUnexecutableSlowFills = across.utils.chunk( + unexecutableSlowFills, + this.chunkSize, + ); + await Promise.all( + chunkedUnexecutableSlowFills.map((eventsChunk) => + eventsRepo.insert(eventsChunk), + ), + ); + + // Store bundle fills + const fills = this.formatBundleFillEvents( + entities.BundleEventType.Fill, + bundleData.bundleFillsV3, + bundleId, + ); + const chunkedFills = across.utils.chunk(fills, this.chunkSize); + await Promise.all( + chunkedFills.map((eventsChunk) => eventsRepo.insert(eventsChunk)), + ); + + return { + deposits: deposits.length, + expiredDeposits: expiredDeposits.length, + slowFills: slowFills.length, + unexecutableSlowFills: unexecutableSlowFills.length, + fills: fills.length, + }; + } + + private formatBundleEvents( + eventsType: entities.BundleEventType, + bundleEvents: + | across.interfaces.BundleDepositsV3 + | across.interfaces.BundleSlowFills + | across.interfaces.BundleExcessSlowFills + | across.interfaces.ExpiredDepositsToRefundV3, + bundleId: number, + ): { + bundleId: number; + relayHash: string; + type: entities.BundleEventType; + }[] { + return Object.values(bundleEvents).flatMap((tokenEvents) => + Object.values(tokenEvents).flatMap((events) => + events.map((event) => { + return { + bundleId, + relayHash: across.utils.getRelayHashFromEvent(event), + type: eventsType, + }; + }), + ), + ); + } + + private formatBundleFillEvents( + eventsType: entities.BundleEventType.Fill, + bundleEvents: across.interfaces.BundleFillsV3, + bundleId: number, + ): { + bundleId: number; + relayHash: string; + type: entities.BundleEventType.Fill; + }[] { + return Object.entries(bundleEvents).flatMap(([chainId, tokenEvents]) => + Object.values(tokenEvents).flatMap((fillsData) => + fillsData.fills.map((event) => { + return { + bundleId, + relayHash: across.utils.getRelayHashFromEvent(event), + type: eventsType, + repaymentChainId: Number(chainId), + }; + }), + ), + ); + } } diff --git a/packages/indexer/src/main.ts b/packages/indexer/src/main.ts index 10fbcfc1..dc93b8aa 100644 --- a/packages/indexer/src/main.ts +++ b/packages/indexer/src/main.ts @@ -83,6 +83,8 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) { logger, redis, postgres, + hubPoolClientFactory, + spokePoolClientFactory, }); const spokePoolIndexers = spokePoolChainsEnabled.map((chainId) => { diff --git a/packages/indexer/src/services/BundleBuilderService.ts b/packages/indexer/src/services/BundleBuilderService.ts index c527e7d9..98ccd3a9 100644 --- a/packages/indexer/src/services/BundleBuilderService.ts +++ b/packages/indexer/src/services/BundleBuilderService.ts @@ -9,6 +9,7 @@ import { BundleLeavesCache } from "../redis/bundleLeavesCache"; import { HubPoolBalanceCache } from "../redis/hubBalancesCache"; import { BN_ZERO, + buildPoolRebalanceRoot, ConfigStoreClientFactory, convertProposalRangeResultToProposalRange, getBlockRangeBetweenBundles, @@ -398,30 +399,17 @@ export class BundleBuilderService extends BaseIndexer { chainsToBuildBundleFor, ); // Load the bundle data - const { - bundleDepositsV3, - expiredDepositsToRefundV3, - bundleFillsV3, - unexecutableSlowFills, - bundleSlowFillsV3, - } = await bundleDataClient.loadData( + const bundleData = await bundleDataClient.loadData( bundleRangeForBundleClient, spokeClients, false, ); // Build pool rebalance root and resolve the leaves - const { leaves } = clients.BundleDataClient._buildPoolRebalanceRoot( - bundleRangeForBundleClient[0]![1]!, // Mainnet is always the first chain. Second element is the end block - bundleRangeForBundleClient[0]![1]!, // Mainnet is always the first chain. Second element is the end block - bundleDepositsV3, - bundleFillsV3, - bundleSlowFillsV3, - unexecutableSlowFills, - expiredDepositsToRefundV3, - { - hubPoolClient, - configStoreClient, - }, + const { leaves } = buildPoolRebalanceRoot( + bundleRangeForBundleClient, + bundleData, + hubPoolClient, + configStoreClient, ); // Map the leaves to the desired format return leaves.map((leaf) => ({ diff --git a/packages/indexer/src/services/bundles.ts b/packages/indexer/src/services/bundles.ts index 90e23efa..7e76e306 100644 --- a/packages/indexer/src/services/bundles.ts +++ b/packages/indexer/src/services/bundles.ts @@ -1,11 +1,20 @@ -import { DataSource } from "@repo/indexer-database"; +import { CHAIN_IDs } from "@across-protocol/constants"; +import * as across from "@across-protocol/sdk"; import Redis from "ioredis"; import winston from "winston"; +import { DataSource, entities } from "@repo/indexer-database"; import { BaseIndexer } from "../generics"; import { BlockRangeInsertType, BundleRepository, } from "../database/BundleRepository"; +import * as utils from "../utils"; +import { getBlockTime } from "../web3/constants"; +import { + buildPoolRebalanceRoot, + getBlockRangeBetweenBundles, + getBundleBlockRanges, +} from "../utils/bundleBuilderUtils"; const BUNDLE_LIVENESS_SECONDS = 4 * 60 * 60; // 4 hour const AVERAGE_SECONDS_PER_BLOCK = 13; // 13 seconds per block on ETH @@ -16,7 +25,9 @@ const BLOCKS_PER_BUNDLE = Math.floor( export type BundleConfig = { logger: winston.Logger; redis: Redis | undefined; - postgres: DataSource | undefined; + postgres: DataSource; + hubPoolClientFactory: utils.HubPoolClientFactory; + spokePoolClientFactory: utils.SpokePoolClientFactory; }; /** @@ -36,7 +47,8 @@ export class Processor extends BaseIndexer { } protected async indexerLogic(): Promise { - const { logger } = this.config; + const { logger, hubPoolClientFactory, spokePoolClientFactory } = + this.config; const { bundleRepository } = this; await assignBundleToProposedEvent(bundleRepository, logger); await assignDisputeEventToBundle(bundleRepository, logger); @@ -44,6 +56,12 @@ export class Processor extends BaseIndexer { await assignBundleRangesToProposal(bundleRepository, logger); await assignExecutionsToBundle(bundleRepository, logger); await assignBundleExecutedStatus(bundleRepository, logger); + await assignSpokePoolEventsToExecutedBundles( + bundleRepository, + hubPoolClientFactory, + spokePoolClientFactory, + logger, + ); } protected async initialize(): Promise { @@ -347,3 +365,132 @@ async function assignBundleExecutedStatus( }); } } + +/** + * Assigns spoke pool events to executed bundles by reconstructing the bundle data using the BundleDataClient. + * @param bundleRepo Repository to interact with the Bundle entity. + * @param hubClientFactory Factory to get HubPool clients. + * @param spokeClientFactory Factory to get SpokePool clients. + * @param logger A logger instance. + * @returns A void promise when all executed bundles have been processed. + */ +async function assignSpokePoolEventsToExecutedBundles( + bundleRepo: BundleRepository, + hubClientFactory: utils.HubPoolClientFactory, + spokeClientFactory: utils.SpokePoolClientFactory, + logger: winston.Logger, +): Promise { + const executedBundles = + await bundleRepo.getExecutedBundlesWithoutEventsAssociated({ + fromBlock: utils.ACROSS_V3_MAINNET_DEPLOYMENT_BLOCK, + }); + if (executedBundles.length === 0) return; + + // Get and update HubPool and ConfigStore clients + const hubPoolClient = hubClientFactory.get(CHAIN_IDs.MAINNET); + const configStoreClient = hubPoolClient.configStoreClient; + await configStoreClient.update(); + await hubPoolClient.update(); + const clients = { + hubPoolClient, + configStoreClient, + arweaveClient: null as unknown as across.caching.ArweaveClient, // FIXME: This is a hack to avoid instantiating the Arweave client + }; + + for (const executedBundle of executedBundles) { + // Get bundle ranges as an array of [startBlock, endBlock] for each chain + const ranges = getBundleBlockRanges(executedBundle); + + // Grab historical ranges from the last 8 bundles + // FIXME: This is a hardcoded value, we should make this configurable + const historicalBundle = await bundleRepo.retrieveMostRecentBundle( + entities.BundleStatus.Executed, + undefined, + 8, + ); + // Check if we have enough historical data to build the bundle with + // an ample lookback range. Otherwise skip current bundle + if (!historicalBundle) { + logger.warn({ + at: "BundleProcessor#assignSpokePoolEventsToExecutedBundles", + message: `No historical bundle found. Skipping bundle reconstruction of bundle ${executedBundle.id}`, + }); + continue; + } + // Resolve lookback range for the spoke clients + const lookbackRange = getBlockRangeBetweenBundles( + historicalBundle.proposal, + executedBundle.proposal, + ); + + // Get spoke pool clients + const spokeClients = lookbackRange.reduce( + (acc, { chainId, startBlock, endBlock }) => { + // We need to instantiate spoke clients using a higher end block than + // the bundle range as deposits which fills are included in this bundle could + // have occured outside the bundle range of the origin chain + // NOTE: A buffer time of 15 minutes has been proven to work for older bundles + const blockTime = getBlockTime(chainId); + const endBlockTimeBuffer = 60 * 15; + const blockBuffer = Math.round(endBlockTimeBuffer / blockTime); + return { + ...acc, + [chainId]: spokeClientFactory.get( + chainId, + startBlock, + endBlock + blockBuffer, + { + hubPoolClient, + }, + ), + }; + }, + {} as Record, + ); + + // Update spoke clients + await Promise.all( + Object.values(spokeClients).map((client) => client.update()), + ); + + // Instantiate bundle data client and reconstruct bundle + const bundleDataClient = + new across.clients.BundleDataClient.BundleDataClient( + logger, + clients, + spokeClients, + executedBundle.proposal.chainIds, + ); + const bundleData = await bundleDataClient.loadData(ranges, spokeClients); + + // Build pool rebalance root and check it matches with the root of the stored bundle + const poolRebalanceRoot = buildPoolRebalanceRoot( + ranges, + bundleData, + hubPoolClient, + configStoreClient, + ); + if ( + executedBundle.poolRebalanceRoot === poolRebalanceRoot.tree.getHexRoot() + ) { + // Store bundle events + const storedEvents = await bundleRepo.storeBundleEvents( + bundleData, + executedBundle.id, + ); + // Set bundle 'eventsAssociated' flag to true + await bundleRepo.updateBundleEventsAssociatedFlag(executedBundle.id); + logger.info({ + at: "BundleProcessor#assignSpokePoolEventsToExecutedBundles", + message: "Events associated with bundle", + storedEvents, + }); + } else { + logger.warn({ + at: "BundleProcessor#assignSpokePoolEventsToExecutedBundles", + message: `Mismatching roots. Skipping bundle ${executedBundle.id}.`, + }); + continue; + } + } +} diff --git a/packages/indexer/src/services/spokePoolProcessor.ts b/packages/indexer/src/services/spokePoolProcessor.ts index da4dc0a1..cbeef592 100644 --- a/packages/indexer/src/services/spokePoolProcessor.ts +++ b/packages/indexer/src/services/spokePoolProcessor.ts @@ -40,8 +40,7 @@ export class SpokePoolProcessor { events.fills, ); await this.updateExpiredRelays(); - if (events.executedRefundRoots.length > 0) - await this.updateRefundedDepositsStatus(events.executedRefundRoots); + await this.updateRefundedDepositsStatus(); } /** @@ -130,60 +129,87 @@ export class SpokePoolProcessor { } /** - * Calls the database to find expired relays and looks for related refunds within - * the recently saved executedRelayerRefundRoot events. + * Calls the database to find expired relays and looks for related + * refunds in the bundle events table. * When a matching refund is found, updates the relay status to refunded - * @param executedRefundRoots An array of already stored executedRelayerRefundRoot events * @returns A void promise */ - private async updateRefundedDepositsStatus( - executedRefundRoots: entities.ExecutedRelayerRefundRoot[], - ): Promise { + private async updateRefundedDepositsStatus(): Promise { const relayHashInfoRepository = this.postgres.getRepository( entities.RelayHashInfo, ); - const expiredDeposits = await relayHashInfoRepository.find({ - where: { - status: entities.RelayStatus.Expired, - originChainId: this.chainId, - }, - relations: ["depositEvent"], - }); + const bundleEventsRepository = this.postgres.getRepository( + entities.BundleEvent, + ); + + const expiredDeposits = await relayHashInfoRepository + .createQueryBuilder("rhi") + .select("rhi.relayHash") + .where("rhi.status = :expired", { expired: entities.RelayStatus.Expired }) + .andWhere("rhi.originChainId = :chainId", { chainId: this.chainId }) + .limit(100) + .getMany(); + let updatedRows = 0; for (const expiredDeposit of expiredDeposits) { - const { - depositor, - inputAmount, - inputToken, - blockNumber: depositBlockNumber, - } = expiredDeposit.depositEvent; - // TODO: modify this once we are associating events with bundles. This is a temporary solution. - const matchingRefunds = executedRefundRoots.filter((refund) => { - return ( - refund.l2TokenAddress === inputToken && - refund.blockNumber > depositBlockNumber && - refund.refundAddresses.some( - (address, idx) => - address === depositor && - refund.refundAmounts[idx] === inputAmount, - ) - ); - }); - if (matchingRefunds.length > 1) { - this.logger.warn({ - at: "SpokePoolProcessor#updateRefundedDepositsStatus", - message: `Unable to set refund for deposit with id ${expiredDeposit.depositEventId}. Found ${matchingRefunds.length} matches.`, - }); - } else if (matchingRefunds[0]) { - await relayHashInfoRepository.update( - { id: expiredDeposit.id }, - { - depositRefundTxHash: matchingRefunds[0].transactionHash, - status: entities.RelayStatus.Refunded, - }, - ); - updatedRows += 1; - } + // Check if this deposited is associated with a bundle + const refundBundleEvent = await bundleEventsRepository + .createQueryBuilder("be") + .leftJoinAndSelect("bundle", "bundle", "be.bundleId = bundle.id") + .where("be.eventType = :expiredDeposit", { + expiredDeposit: entities.BundleEventType.ExpiredDeposit, + }) + .andWhere("be.relayHash = :expiredDepositRelayHash", { + expiredDepositRelayHash: expiredDeposit.relayHash, + }) + .getOne(); + if (!refundBundleEvent) continue; + + // Get the relayerRefundRoot that included this refund + const relayerRefundRoot = refundBundleEvent.bundle.relayerRefundRoot; + + // Look for a relayed root bundle event that matches the relayerRefundRoot + const relayRootBundleRepo = this.postgres.getRepository( + entities.RelayedRootBundle, + ); + const relayedRootBundleEvent = await relayRootBundleRepo + .createQueryBuilder("rrb") + .select("rrb.rootBundleId") + .where("rrb.relayerRefundRoot = :relayerRefundRoot", { + relayerRefundRoot, + }) + .andWhere("rrb.chainId = :chainId", { chainId: this.chainId }) + .getOne(); + if (!relayedRootBundleEvent) continue; + + // Look for the execution of the relayer refund root using the rootBundleId + const rootBundleId = relayedRootBundleEvent.rootBundleId; + const executedRelayerRefundRepo = this.postgres.getRepository( + entities.ExecutedRelayerRefundRoot, + ); + const executedRelayerRefundRootEvent = await executedRelayerRefundRepo + .createQueryBuilder("err") + .where("err.rootBundleId = :rootBundleId", { + rootBundleId, + }) + .andWhere("err.chainId = :chainId", { chainId: this.chainId }) + .getOne(); + if (!executedRelayerRefundRootEvent) continue; + + // If we found the execution of the relayer refund root, we can update the relay status + await relayHashInfoRepository + .createQueryBuilder() + .update() + .set({ + status: entities.RelayStatus.Refunded, + depositRefundTxHash: executedRelayerRefundRootEvent.transactionHash, + }) + .where("relayHash = :relayHash", { + relayHash: expiredDeposit.relayHash, + }) + .execute(); + + updatedRows += 1; } if (updatedRows > 0) { this.logger.info({ diff --git a/packages/indexer/src/utils/bundleBuilderUtils.ts b/packages/indexer/src/utils/bundleBuilderUtils.ts index dbc9e375..e575d6af 100644 --- a/packages/indexer/src/utils/bundleBuilderUtils.ts +++ b/packages/indexer/src/utils/bundleBuilderUtils.ts @@ -1,7 +1,7 @@ +import { utils, clients, interfaces } from "@across-protocol/sdk"; +import winston from "winston"; import { entities } from "@repo/indexer-database"; import { BundleRepository } from "../database/BundleRepository"; -import winston from "winston"; -import { utils } from "@across-protocol/sdk"; import { RetryProvidersFactory } from "../web3/RetryProvidersFactory"; export type ProposalRange = Pick< @@ -57,6 +57,27 @@ export async function resolveMostRecentProposedAndExecutedBundles( return { lastExecutedBundle, lastProposedBundle }; } +/** + * Given a bundle entity with its related ranges and proposals, + * format the bundle ranges to an array of [startBlock, endBlock] following + * the order of the proposal chain ids. + * @param bundle A bundle entity with its related proposal and ranges + * @returns An array of [startBlock, endBlock] for each chain on the proposal chain ids. + */ +export function getBundleBlockRanges(bundle: entities.Bundle) { + return bundle.proposal.chainIds.map((chainId) => { + const bundleRange = bundle.ranges.find( + (range) => range.chainId === chainId, + ); + if (!bundleRange) { + throw Error( + `Range for bundle ${bundle.id} and chainId ${chainId} not found`, + ); + } + return [bundleRange.startBlock, bundleRange.endBlock]; + }); +} + /** * Given the previous and current proposed bundles, returns the block ranges for each chain. * @param previous The previous proposed bundle range. @@ -79,7 +100,7 @@ export function getBlockRangeBetweenBundles( // the start block should never be greater than the end block. startBlock: Math.min( previous.bundleEvaluationBlockNumbers[idx] - ? previous.bundleEvaluationBlockNumbers[idx] + 1 + ? previous.bundleEvaluationBlockNumbers[idx]! + 1 : 0, // If this is a new chain, start from block 0 current.bundleEvaluationBlockNumbers[idx]!, ), @@ -131,3 +152,24 @@ export function convertProposalRangeResultToProposalRange( bundleEvaluationBlockNumbers: ranges.map((r) => r.endBlock), }; } + +export function buildPoolRebalanceRoot( + ranges: number[][], + bundleData: interfaces.LoadDataReturnValue, + hubPoolClient: clients.HubPoolClient, + configStoreClient: clients.AcrossConfigStoreClient, +) { + return clients.BundleDataClient._buildPoolRebalanceRoot( + ranges[0]![1]!, // Mainnet is always the first chain. Second element is the end block + ranges[0]![1]!, // Mainnet is always the first chain. Second element is the end block + bundleData.bundleDepositsV3, + bundleData.bundleFillsV3, + bundleData.bundleSlowFillsV3, + bundleData.unexecutableSlowFills, + bundleData.expiredDepositsToRefundV3, + { + hubPoolClient, + configStoreClient, + }, + ); +} diff --git a/packages/indexer/src/utils/contractUtils.ts b/packages/indexer/src/utils/contractUtils.ts index 45080445..47916666 100644 --- a/packages/indexer/src/utils/contractUtils.ts +++ b/packages/indexer/src/utils/contractUtils.ts @@ -10,6 +10,7 @@ import * as across from "@across-protocol/sdk"; import { providers, Contract } from "ethers"; export const CONFIG_STORE_VERSION = 4; +export const ACROSS_V3_MAINNET_DEPLOYMENT_BLOCK = 19277710; export type GetSpokeClientParams = { provider: providers.Provider; diff --git a/packages/indexer/src/web3/constants.ts b/packages/indexer/src/web3/constants.ts index 2071fac3..e386a994 100644 --- a/packages/indexer/src/web3/constants.ts +++ b/packages/indexer/src/web3/constants.ts @@ -95,3 +95,37 @@ export function getMaxBlockLookBack(chainId: number) { return maxBlockLookBack; } + +// Average block time in seconds by chain +export const BLOCK_TIME_SECONDS: { [chainId: number]: number } = { + [CHAIN_IDs.ARBITRUM]: 0.25, + [CHAIN_IDs.BASE]: 2, + [CHAIN_IDs.BLAST]: 2, + [CHAIN_IDs.BOBA]: 2, + [CHAIN_IDs.LINEA]: 3, + [CHAIN_IDs.LISK]: 2, + [CHAIN_IDs.MAINNET]: 12, + [CHAIN_IDs.MODE]: 2, + [CHAIN_IDs.OPTIMISM]: 2, + [CHAIN_IDs.POLYGON]: 2, + [CHAIN_IDs.REDSTONE]: 2, + [CHAIN_IDs.SCROLL]: 3, + [CHAIN_IDs.WORLD_CHAIN]: 2, + [CHAIN_IDs.ZK_SYNC]: 1, + [CHAIN_IDs.ZORA]: 2, +}; + +/** + * Resolves the block time in seconds for a given chain id + * @param chainId Chain id to resolve block time for + * @returns The average block time in seconds from {@link BLOCK_TIME_SECONDS} or a default of 10 + */ +export function getBlockTime(chainId: number) { + const blockTime = BLOCK_TIME_SECONDS[chainId]; + + if (!blockTime) { + return 10; + } + + return blockTime; +}