diff --git a/docker-compose.yml b/docker-compose.yml index d0423655..c3819360 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,9 @@ version: "3.7" +networks: + test-network: + external: true + services: backend: container_name: scraper-api-backend @@ -18,6 +22,8 @@ services: depends_on: - postgres - redis + networks: + - test-network postgres: container_name: scraper-api-postgres image: postgres:12.2 @@ -27,21 +33,26 @@ services: POSTGRES_PASSWORD: ${DB_PASSWORD} POSTGRES_DB: ${DB_DATABASE_NAME} PG_DATA: /var/lib/postgresql/data + command: ["postgres", "-c", "log_statement=all"] ports: - - 5432:5432 + - 5433:5432 volumes: - scraper-api-pgdata:/var/lib/postgresql/data + networks: + - test-network redis: container_name: scraper-api-redis image: redis:6.2-alpine restart: always ports: - - 6379:6379 + - 6378:6379 command: ["redis-server", "--appendonly", "yes"] environment: REDIS_PASSWORD: ${REDIS_PASSWORD} volumes: - scraper-api-redis-volume:/data + networks: + - test-network volumes: backend-node-modules: scraper-api-pgdata: diff --git a/migrations/1735003840563-rewardedDeposit.ts b/migrations/1735003840563-rewardedDeposit.ts new file mode 100644 index 00000000..1c253097 --- /dev/null +++ b/migrations/1735003840563-rewardedDeposit.ts @@ -0,0 +1,35 @@ +import { MigrationInterface, QueryRunner } from "typeorm"; + +export class RewardedDeposit1735003840563 implements MigrationInterface { + name = 'RewardedDeposit1735003840563'; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(` + CREATE TABLE "rewarded_deposit" ( + "id" SERIAL NOT NULL, + "relayHash" character varying NOT NULL, + "depositTxHash" character varying NOT NULL, + "depositId" integer NOT NULL, + "originChainId" integer NOT NULL, + "destinationChainId" integer NOT NULL, + "depositor" character varying NOT NULL, + "recipient" character varying NOT NULL, + "inputToken" character varying NOT NULL, + "inputAmount" character varying NOT NULL, + "outputToken" character varying NOT NULL, + "outputAmount" character varying NOT NULL, + "exclusiveRelayer" character varying NOT NULL, + "depositDate" TIMESTAMP NOT NULL, + "fillTxHash" character varying NOT NULL, + "relayer" character varying NOT NULL, + "totalBridgeFeeUsd" character varying NOT NULL, + "createdAt" TIMESTAMP NOT NULL DEFAULT now(), + CONSTRAINT "UK_rewardedDeposit_depositId_originChainId" UNIQUE ("depositId", "originChainId"), + CONSTRAINT "PK_891e96cd023b61f620566c0898a" PRIMARY KEY ("id"))`); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP TABLE "rewarded_deposit"`); + } + +} diff --git a/migrations/1735659642491-OpRewardV2.ts b/migrations/1735659642491-OpRewardV2.ts new file mode 100644 index 00000000..6ecdf4a9 --- /dev/null +++ b/migrations/1735659642491-OpRewardV2.ts @@ -0,0 +1,38 @@ +import { MigrationInterface, QueryRunner } from "typeorm"; + +export class OpRewardV21735659642491 implements MigrationInterface { + name = 'OpRewardV21735659642491'; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`CREATE TABLE "op_reward_v2" ( + "id" SERIAL NOT NULL, + "depositId" integer NOT NULL, + "originChainId" integer NOT NULL, + "depositDate" TIMESTAMP NOT NULL, + "recipient" character varying NOT NULL, + "rate" numeric NOT NULL, + "amount" character varying NOT NULL, + "amountUsd" character varying NOT NULL, + "rewardTokenId" integer NOT NULL, + "windowIndex" integer, + "isClaimed" boolean NOT NULL DEFAULT false, + "createdAt" TIMESTAMP NOT NULL DEFAULT now(), + "updatedAt" TIMESTAMP NOT NULL DEFAULT now(), + CONSTRAINT "UK_opRewardV2_recipient_depId_chainId" UNIQUE ("recipient", "depositId", "originChainId"), + CONSTRAINT "REL_f057b9c78adeb22f4f4fb69ea8" UNIQUE ("depositId", "originChainId"), + CONSTRAINT "PK_fc4c022f3de3d4170ae8200c76b" PRIMARY KEY ("id")) + `, + ); + await queryRunner.query(`CREATE INDEX "IX_op_reward_v2_recipient" ON "op_reward_v2" ("recipient") `); + await queryRunner.query(`ALTER TABLE "op_reward_v2" ADD CONSTRAINT "FK_op_reward_deposit" FOREIGN KEY ("depositId", "originChainId") REFERENCES "rewarded_deposit"("depositId","originChainId") ON DELETE NO ACTION ON UPDATE NO ACTION`); + await queryRunner.query(`ALTER TABLE "op_reward_v2" ADD CONSTRAINT "FK_op_reward_token" FOREIGN KEY ("rewardTokenId") REFERENCES "token"("id") ON DELETE NO ACTION ON UPDATE NO ACTION`); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE "op_reward_v2" DROP CONSTRAINT "FK_op_reward_token"`); + await queryRunner.query(`ALTER TABLE "op_reward_v2" DROP CONSTRAINT "FK_op_reward_deposit"`); + await queryRunner.query(`DROP INDEX "public"."IX_op_reward_v2_recipient"`); + await queryRunner.query(`DROP TABLE "op_reward_v2"`); + } + +} diff --git a/src/modules/database/database.providers.ts b/src/modules/database/database.providers.ts index 66c3e320..efd7c098 100644 --- a/src/modules/database/database.providers.ts +++ b/src/modules/database/database.providers.ts @@ -32,6 +32,8 @@ import { ArbReward } from "../rewards/model/arb-reward.entity"; import { FindMissedFillEventJob } from "../scraper/model/FindMissedFillEventJob.entity"; import { HubPoolProcessedBlock } from "../scraper/model/HubPoolProcessedBlock.entity"; import { SetPoolRebalanceRouteEvent } from "../web3/model/SetPoolRebalanceRouteEvent.entity"; +import { RewardedDeposit } from "../rewards/model/RewardedDeposit.entity"; +import { OpRewardV2 } from "../rewards/model/OpRewardV2.entity"; // TODO: Add db entities here const entities = [ @@ -66,6 +68,8 @@ const entities = [ FindMissedFillEventJob, HubPoolProcessedBlock, SetPoolRebalanceRouteEvent, + RewardedDeposit, + OpRewardV2, ]; @Injectable() diff --git a/src/modules/rewards/model/OpRewardV2.entity.ts b/src/modules/rewards/model/OpRewardV2.entity.ts new file mode 100644 index 00000000..92cbd59d --- /dev/null +++ b/src/modules/rewards/model/OpRewardV2.entity.ts @@ -0,0 +1,70 @@ +import { Token } from "../../web3/model/token.entity"; +import { + Column, + CreateDateColumn, + Entity, + Index, + JoinColumn, + ManyToOne, + OneToOne, + PrimaryGeneratedColumn, + Unique, + UpdateDateColumn, +} from "typeorm"; +import { RewardedDeposit } from "./RewardedDeposit.entity"; + +@Entity() +@Unique("UK_opRewardV2_recipient_depId_chainId", ["recipient", "depositId", "originChainId"]) +@Index("IX_op_reward_v2_recipient", ["recipient"]) +export class OpRewardV2 { + @PrimaryGeneratedColumn() + id: number; + + @Column() + depositId: number; + + @Column() + originChainId: number; + + @OneToOne(() => RewardedDeposit) + @JoinColumn([ + { name: "depositId", referencedColumnName: "depositId", foreignKeyConstraintName: "FK_op_reward_deposit" }, + { name: "originChainId", referencedColumnName: "originChainId", foreignKeyConstraintName: "FK_op_reward_deposit" }, + ]) + deposit: RewardedDeposit; + + @Column() + depositDate: Date; + + @Column() + recipient: string; + + @Column({ type: "decimal" }) + rate: string; + + @Column() + amount: string; + + @Column() + amountUsd: string; + + @Column() + rewardTokenId: number; + + @ManyToOne(() => Token) + @JoinColumn([{ name: "rewardTokenId", referencedColumnName: "id", foreignKeyConstraintName: "FK_op_reward_token" }]) + rewardToken: Token; + + // The window index set in the MerkleDistributor contract + @Column({ nullable: true }) + windowIndex: number; + + @Column({ default: false }) + isClaimed: boolean; + + @CreateDateColumn() + createdAt: Date; + + @UpdateDateColumn() + updatedAt: Date; +} diff --git a/src/modules/rewards/model/RewardedDeposit.entity.ts b/src/modules/rewards/model/RewardedDeposit.entity.ts new file mode 100644 index 00000000..805faa24 --- /dev/null +++ b/src/modules/rewards/model/RewardedDeposit.entity.ts @@ -0,0 +1,68 @@ +import { + Column, + CreateDateColumn, + Entity, + PrimaryGeneratedColumn, + Unique, +} from "typeorm"; + +@Entity() +@Unique("UK_rewardedDeposit_depositId_originChainId", [ + "depositId", + "originChainId", +]) +export class RewardedDeposit { + @PrimaryGeneratedColumn() + id: number; + + @Column() + relayHash: string; + + @Column() + depositTxHash: string; + + @Column() + depositId: number; + + @Column() + originChainId: number; + + @Column() + destinationChainId: number; + + @Column() + depositor: string; + + @Column() + recipient: string; + + @Column() + inputToken: string; + + @Column() + inputAmount: string; + + @Column() + outputToken: string; + + @Column() + outputAmount: string; + + @Column() + exclusiveRelayer: string; + + @Column() + depositDate: Date; + + @Column() + fillTxHash: string; + + @Column() + relayer: string; + + @Column() + totalBridgeFeeUsd: string; + + @CreateDateColumn() + createdAt: Date; +} diff --git a/src/modules/rewards/module.ts b/src/modules/rewards/module.ts index 62c2fbbd..f35be1eb 100644 --- a/src/modules/rewards/module.ts +++ b/src/modules/rewards/module.ts @@ -22,6 +22,9 @@ import { RewardsWindowJobFixture } from "./adapter/db/rewards-window-job-fixture import { ArbRebateService } from "./services/arb-rebate-service"; import { ArbReward } from "./model/arb-reward.entity"; import { ArbRewardFixture } from "./adapter/db/arb-reward-fixture"; +import { RewardedDeposit } from "./model/RewardedDeposit.entity"; +import { OpRewardV2 } from "./model/OpRewardV2.entity"; +import { OpRebateServiceV2 } from "./services/opRebateServiceV2"; @Module({}) export class RewardModule { @@ -29,12 +32,16 @@ export class RewardModule { const module: DynamicModule = { module: RewardModule, controllers: [RewardController], - providers: [RewardService, OpRebateService, ReferralService, ReferralRewardsService, ArbRebateService], + providers: [ + ArbRebateService, OpRebateService, OpRebateServiceV2, ReferralService, ReferralRewardsService, RewardService, + ], imports: [ TypeOrmModule.forFeature([ Deposit, OpReward, + OpRewardV2, DepositsMv, + RewardedDeposit, RewardsWindowJob, ReferralRewardsWindowJobResult, ArbReward, @@ -44,7 +51,7 @@ export class RewardModule { ReferralModule.forRoot(moduleOptions), MarketPriceModule.forRoot(), ], - exports: [RewardService, OpRebateService, ReferralService, ArbRebateService], + exports: [ArbRebateService, OpRebateService, OpRebateServiceV2, ReferralService, RewardService], }; if (moduleOptions.runModes.includes(RunMode.Test)) { diff --git a/src/modules/rewards/services/op-rebate-service.ts b/src/modules/rewards/services/op-rebate-service.ts index 5d0ed935..cff67e6e 100644 --- a/src/modules/rewards/services/op-rebate-service.ts +++ b/src/modules/rewards/services/op-rebate-service.ts @@ -19,7 +19,7 @@ import { ReferralRewardsWindowJobResult } from "../model/RewardsWindowJobResult. const OP_REBATE_RATE = 0.95; const REWARDS_PERCENTAGE_LIMIT = 0.0025; // 25 bps -const ELIGIBLE_OP_REWARDS_CHAIN_IDS = [ +export const ELIGIBLE_OP_REWARDS_CHAIN_IDS = [ ChainIds.base, ChainIds.lisk, ChainIds.mode, diff --git a/src/modules/rewards/services/opRebateServiceV2.ts b/src/modules/rewards/services/opRebateServiceV2.ts new file mode 100644 index 00000000..f5585680 --- /dev/null +++ b/src/modules/rewards/services/opRebateServiceV2.ts @@ -0,0 +1,347 @@ +import { Injectable, Logger } from "@nestjs/common"; +import { InjectRepository } from "@nestjs/typeorm"; +import { DataSource, Repository } from "typeorm"; +import BigNumber from "bignumber.js"; +import { ethers } from "ethers"; +import { DateTime } from "luxon"; + +import { RewardedDeposit } from "../model/RewardedDeposit.entity"; +import { AppConfig } from "../../configuration/configuration.service"; +import { EthProvidersService } from "../../web3/services/EthProvidersService"; +import { ChainIds } from "../../web3/model/ChainId"; +import { MarketPriceService } from "../../market-price/services/service"; +import { assertValidAddress, splitArrayInChunks } from "../../../utils"; + +import { OpRewardV2 } from "../model/OpRewardV2.entity"; +// import { GetRewardsQuery } from "../entrypoints/http/dto"; +import { WindowAlreadySetException } from "./exceptions"; +import { ReferralRewardsWindowJobResult } from "../model/RewardsWindowJobResult.entity"; +import { Token } from "src/modules/web3/model/token.entity"; + +const OP_REBATE_RATE = 0.95; +const REWARDS_PERCENTAGE_LIMIT = 0.0025; // 25 bps +export const ELIGIBLE_OP_REWARDS_CHAIN_IDS = [ + ChainIds.base, + ChainIds.ink, + ChainIds.lisk, + ChainIds.mode, + ChainIds.optimism, + ChainIds.redstone, + ChainIds.worldChain, + ChainIds.zora, +]; + +@Injectable() +export class OpRebateServiceV2 { + private logger = new Logger(OpRebateServiceV2.name); + + constructor( + @InjectRepository(RewardedDeposit) readonly depositRepository: Repository, + @InjectRepository(OpRewardV2) readonly rewardRepository: Repository, + private marketPriceService: MarketPriceService, + private ethProvidersService: EthProvidersService, + private appConfig: AppConfig, + private dataSource: DataSource, + ) {} + + public async getEarnedRewards(userAddress: string) { + userAddress = assertValidAddress(userAddress); + + const baseQuery = this.buildBaseQuery(this.rewardRepository.createQueryBuilder("r"), userAddress); + const { opRewards } = await baseQuery + .select("SUM(CAST(r.amount as DECIMAL))", "opRewards") + .where("r.isClaimed = :isClaimed", { isClaimed: true }) + .getRawOne<{ opRewards: string }>(); + + return opRewards; + } + + public async getOpRebatesSummary(userAddress: string) { + userAddress = assertValidAddress(userAddress); + + const baseQuery = this.buildBaseQuery(this.rewardRepository.createQueryBuilder("r"), userAddress); + baseQuery.andWhere("r.isClaimed = :isClaimed", { isClaimed: false }); + const [{ depositsCount }, { unclaimedRewards }, { volumeUsd }] = await Promise.all([ + baseQuery.select("COUNT(DISTINCT r.depositPrimaryKey)", "depositsCount").getRawOne<{ + depositsCount: string; + }>(), + baseQuery.select("SUM(CAST(r.amount as DECIMAL))", "unclaimedRewards").getRawOne<{ + unclaimedRewards: number; + }>(), + baseQuery + .leftJoinAndSelect("r.deposit", "d") + .leftJoinAndSelect("d.token", "t") + .leftJoinAndSelect("d.price", "p") + .select(`COALESCE(SUM(d.amount / power(10, t.decimals) * p.usd), 0)`, "volumeUsd") + .getRawOne<{ + volumeUsd: number; + }>(), + // TODO: add claimable rewards + ]); + + return { + depositsCount: parseInt(depositsCount), + unclaimedRewards, + volumeUsd, + claimableRewards: "0", + }; + } + + // public async getOpRebateRewards(query: GetRewardsQuery) { + // const limit = parseInt(query.limit ?? "10"); + // const offset = parseInt(query.offset ?? "0"); + // const userAddress = assertValidAddress(query.userAddress); + + // const baseQuery = this.buildBaseQuery(this.rewardRepository.createQueryBuilder("r"), userAddress); + + // const rewardsQuery = baseQuery.orderBy("r.depositDate", "DESC").limit(limit).offset(offset); + // const [rewards, total] = await rewardsQuery.getManyAndCount(); + + // // JOIN instead of query deposits separately + // const depositPrimaryKeys = rewards.map((reward) => reward.depositPrimaryKey); + // const deposits = await this.depositRepository.find({ + // where: { id: In(depositPrimaryKeys) }, + // }); + + // return { + // rewards: rewards.map((reward) => ({ + // ...reward, + // deposit: deposits.find((deposit) => deposit.id === reward.depositPrimaryKey), + // })), + // pagination: { + // limit, + // offset, + // total, + // }, + // }; + // } + + public async getOpRebateRewardsForDepositPrimaryKeys(depositPrimaryKeys: number[]) { + if (depositPrimaryKeys.length === 0) { + return []; + } + const rewardsQuery = this.rewardRepository + .createQueryBuilder("r") + .where("r.depositPrimaryKey IN (:...depositPrimaryKeys)", { depositPrimaryKeys }) + .leftJoinAndSelect("r.rewardToken", "rewardToken"); + const rewards = await rewardsQuery.getMany(); + return rewards; + } + + public async createOpRebatesForDeposit(depositId: number, originChainId: number) { + if (!this.appConfig.values.rewardPrograms["op-rebates"].enabled) { + this.logger.verbose(`OP rebate rewards are disabled. Skipping...`); + return; + } + + const deposit: RewardedDeposit = await this.depositRepository.findOne({ + where: { depositId, originChainId }, + }); + + if (!deposit || !deposit.fillTxHash) return; + if (!this.isDepositEligibleForOpRewards(deposit)) return; + if (!deposit.totalBridgeFeeUsd) { + throw new Error(`Missing total bridge fee in USD (depositId: ${depositId}, originChainId: ${originChainId}).`); + } + if (!this.isDepositTimeAfterStart(deposit)) return; + if (!this.isDepositTimeBeforeEnd(deposit)) return; + + // We use the `from` address of the deposit transaction as the reward receiver + // to also take into accounts deposits routed through the SpokePoolVerifier contract. + const provider = this.ethProvidersService.getProvider(deposit.originChainId); + const depositTransaction = await provider.getTransaction(deposit.depositTxHash); // This could also be part of the indexer data + const rewardReceiver = depositTransaction.from; + + const { rewardToken, rewardsUsd, rewardsAmount } = await this.calcOpRebateRewards(deposit); + + let reward = await this.rewardRepository.findOne({ + where: { + depositId, originChainId, + recipient: rewardReceiver, + }, + }); + + if (!reward) { + reward = this.rewardRepository.create({ + depositId, originChainId, + recipient: rewardReceiver, + depositDate: deposit.depositDate, + }); + } + + await this.rewardRepository.save({ + ...reward, + rate: OP_REBATE_RATE.toString(), + amount: rewardsAmount.toString(), + amountUsd: rewardsUsd, + rewardTokenId: rewardToken.id, + }); + } + + public setWindowForOpRewards(jobId: number, windowIndex: number, maxDepositDate: Date) { + return this.dataSource.transaction(async (entityManager) => { + const opRewardsWithSameWindowIndex = await entityManager + .createQueryBuilder() + .select("r") + .from(OpRewardV2, "r") + .where("r.windowIndex = :windowIndex", { windowIndex }) + .getOne(); + + if (opRewardsWithSameWindowIndex) { + throw new WindowAlreadySetException(); + } + + await entityManager + .createQueryBuilder() + .update(OpRewardV2) + .set({ windowIndex }) + .where("windowIndex IS NULL") + .andWhere("depositDate <= :maxDepositDate", { maxDepositDate }) + .execute(); + const rewards = await entityManager + .createQueryBuilder() + .select("r") + .from(OpRewardV2, "r") + .where("r.windowIndex = :windowIndex", { windowIndex }) + .getMany(); + const { recipients, totalRewardsAmount } = this.aggregateOpRewards(rewards); + + for (const recipientsChunk of splitArrayInChunks(recipients, 100)) { + await entityManager + .createQueryBuilder() + .insert() + .into(ReferralRewardsWindowJobResult) + .values( + recipientsChunk.map((recipient) => ({ + jobId, + windowIndex, + totalRewardsAmount, + address: recipient.account, + amount: recipient.amount, + })), + ) + .execute(); + } + }); + } + + public aggregateOpRewards(rewards: OpRewardV2[]) { + // Map an address to considered deposits for referral rewards + const recipientOpRewardsMap = rewards.reduce((acc, r) => { + acc[r.recipient] = [...(acc[r.recipient] || []), r]; + return acc; + }, {} as Record); + + let totalRewardsAmount: BigNumber = new BigNumber(0); + const recipients: { + account: string; + amount: string; + }[] = []; + + for (const [address, opRewards] of Object.entries(recipientOpRewardsMap)) { + const rewardsAmount = opRewards.reduce((sum, opReward) => { + return sum.plus(opReward.amount); + }, new BigNumber(0)); + + totalRewardsAmount = totalRewardsAmount.plus(rewardsAmount); + recipients.push({ + account: address, + amount: rewardsAmount.toFixed(), + }); + } + + return { totalRewardsAmount: totalRewardsAmount.toFixed(), recipients }; + } + + public isDepositEligibleForOpRewards(deposit: Pick) { + return ELIGIBLE_OP_REWARDS_CHAIN_IDS.includes(deposit.destinationChainId); + } + + /** + * PRIVATE METHODS + */ + + private async calcOpRebateRewards(deposit: RewardedDeposit) { + // lp fee + relayer capital fee + relayer destination gas fee + const bridgeFeeUsd = new BigNumber(deposit.totalBridgeFeeUsd); + const rewardToken = await this.ethProvidersService.getCachedToken( + this.appConfig.values.rewardPrograms["op-rebates"].rewardToken.chainId, + this.appConfig.values.rewardPrograms["op-rebates"].rewardToken.address, + ); + + if (bridgeFeeUsd.lte(0)) { + return { + rewardToken, + rewardsUsd: "0", + rewardsAmount: new BigNumber(0), + }; + } + + const inputToken = await this.ethProvidersService.getCachedToken(deposit.originChainId, deposit.inputToken); + const outputToken = await this.ethProvidersService.getCachedToken(deposit.originChainId, deposit.outputToken); + + const inputTokenPrice = await this.getInputTokenPrice(deposit, inputToken, outputToken); // This could also be part of the indexer data + const historicRewardTokenPrice = await this.getTokenPrice(deposit.depositDate, rewardToken); + + const inputAmountUsd = new BigNumber(deposit.inputAmount) + .multipliedBy(inputTokenPrice) + .dividedBy(new BigNumber(10).pow(inputToken.decimals)); + const cappedFeeForRewardsUsd = inputAmountUsd.multipliedBy(REWARDS_PERCENTAGE_LIMIT); + const rewardsUsd = BigNumber.min(bridgeFeeUsd, cappedFeeForRewardsUsd).multipliedBy(OP_REBATE_RATE).toFixed(); + const rewardsAmount = ethers.utils.parseEther( + new BigNumber(rewardsUsd).dividedBy(historicRewardTokenPrice.usd).toFixed(18), + ); + + return { + rewardToken, + rewardsUsd, + rewardsAmount, + historicRewardTokenPrice, + }; + } + + private isDepositTimeAfterStart(deposit: RewardedDeposit) { + const start = this.appConfig.values.rewardPrograms["op-rebates"].startDate; + + // If no start time is set, then the program is active for any deposit + if (!start) { + return true; + } + + return deposit.depositDate >= start; + } + + private isDepositTimeBeforeEnd(deposit: RewardedDeposit) { + const end = this.appConfig.values.rewardPrograms["op-rebates"].endDate; + + // If no end time is set, then the program is active for any deposit + if (!end) { + return true; + } + + return deposit.depositDate < end; + } + + private async getInputTokenPrice(deposit: RewardedDeposit, inputToken: Token, outputToken: Token): Promise { + if ( + deposit.originChainId === ChainIds.blast && + inputToken.symbol === "USDB" && + outputToken.symbol === "DAI" + ) { + const outputTokenPrice = await this.getTokenPrice(deposit.depositDate, outputToken); + return outputTokenPrice.usd; + } + const inputTokenPrice = await this.getTokenPrice(deposit.depositDate, inputToken); + return inputTokenPrice.usd; + } + + private async getTokenPrice(depositDate: Date, token: Token){ + return await this.marketPriceService.getCachedHistoricMarketPrice( + DateTime.fromJSDate(depositDate).minus({ days: 1 }).toJSDate(), + token.symbol.toLowerCase()); + } + + private buildBaseQuery(qb: ReturnType, recipientAddress: string) { + return qb.where("r.recipient = :recipient", { recipient: recipientAddress }); + } +} diff --git a/src/modules/scraper/adapter/messaging/OpRebateRewardConsumerV2.ts b/src/modules/scraper/adapter/messaging/OpRebateRewardConsumerV2.ts new file mode 100644 index 00000000..8b1a88c4 --- /dev/null +++ b/src/modules/scraper/adapter/messaging/OpRebateRewardConsumerV2.ts @@ -0,0 +1,27 @@ +import { OnQueueFailed, Process, Processor } from "@nestjs/bull"; +import { Logger } from "@nestjs/common"; +import { Job } from "bull"; + +import { OpRebateRewardV2Message, ScraperQueue } from "."; +import { OpRebateServiceV2 } from "../../../rewards/services/opRebateServiceV2"; + +@Processor(ScraperQueue.OpRebateRewardV2) +export class OpRebateRewardConsumerV2 { + private logger = new Logger(OpRebateRewardConsumerV2.name); + + constructor( + private opRebateService: OpRebateServiceV2, + ) {} + + @Process() + private async process(job: Job) { + console.log("Processing msg"); + const { depositId, originChainId } = job.data; + await this.opRebateService.createOpRebatesForDeposit(depositId, originChainId); + } + + @OnQueueFailed() + private onQueueFailed(job: Job, error: Error) { + this.logger.error(`${JSON.stringify(job.data)} failed: ${error}`); + } +} diff --git a/src/modules/scraper/adapter/messaging/index.ts b/src/modules/scraper/adapter/messaging/index.ts index 474d5bd3..e4cc1a1b 100644 --- a/src/modules/scraper/adapter/messaging/index.ts +++ b/src/modules/scraper/adapter/messaging/index.ts @@ -20,8 +20,14 @@ export enum ScraperQueue { MerkleDistributorClaim = "MerkleDistributorClaim", FindMissedFillEvent = "FindMissedFillEvent", HubPoolBlocksEvents = "HubPoolBlocksEvents", + OpRebateRewardV2 = "OpRebateRewardV2", } +export type OpRebateRewardV2Message = { + depositId: number; + originChainId: number; +}; + export type BlocksEventsQueueMessage = { chainId: number; from: number; diff --git a/src/modules/scraper/module.ts b/src/modules/scraper/module.ts index 1c0c36ea..03c9b2bb 100644 --- a/src/modules/scraper/module.ts +++ b/src/modules/scraper/module.ts @@ -62,11 +62,15 @@ import { Block } from "../web3/model/block.entity"; import { ArbReward } from "../rewards/model/arb-reward.entity"; import { HubPoolBlocksEventsConsumer } from "./adapter/messaging/HubPoolBlocksEventsConsumer"; import { HubPoolProcessedBlock } from "./model/HubPoolProcessedBlock.entity"; +import { CopyIndexerDepositsCron } from "./service/CopyIndexerDepositsCron"; +import { OpRewardV2 } from "../rewards/model/OpRewardV2.entity"; +import { OpRebateRewardConsumerV2 } from "./adapter/messaging/OpRebateRewardConsumerV2"; @Module({}) export class ScraperModule { static forRoot(moduleOptions: ModuleOptions): DynamicModule { const crons = [ + CopyIndexerDepositsCron, QueuesMonitoringCron, DepositsGapDetectionCron, CheckMissedFillEventsCron, @@ -101,6 +105,7 @@ export class ScraperModule { TrackFillEventConsumer, FeeBreakdownConsumer, OpRebateRewardConsumer, + OpRebateRewardConsumerV2, ArbRebateRewardConsumer, MerkleDistributorClaimConsumer, FindMissedFillEventConsumer, @@ -122,6 +127,7 @@ export class ScraperModule { FilledRelayEv, RequestedSpeedUpDepositEv, OpReward, + OpRewardV2, ArbReward, QueueJobCount, MerkleDistributorClaim, @@ -232,6 +238,9 @@ export class ScraperModule { BullModule.registerQueue({ name: ScraperQueue.HubPoolBlocksEvents, }), + BullModule.registerQueue({ + name: ScraperQueue.OpRebateRewardV2, + }), ], exports: [ScraperQueuesService], controllers: [ScraperController], diff --git a/src/modules/scraper/service/CopyIndexerDepositsCron.ts b/src/modules/scraper/service/CopyIndexerDepositsCron.ts new file mode 100644 index 00000000..c88fef0f --- /dev/null +++ b/src/modules/scraper/service/CopyIndexerDepositsCron.ts @@ -0,0 +1,132 @@ +import { Injectable, Logger } from "@nestjs/common"; +import { DataSource } from "typeorm"; +import { CronExpression } from "@nestjs/schedule"; +import {performance, PerformanceObserver} from "perf_hooks"; + +import { AppConfig } from "../../configuration/configuration.service"; +import { EnhancedCron, splitArrayInChunks } from "../../../utils"; + +import { ScraperQueuesService } from "./ScraperQueuesService"; +import {ELIGIBLE_OP_REWARDS_CHAIN_IDS} from "../../rewards/services/op-rebate-service"; +import { RewardedDeposit } from "../../rewards/model/RewardedDeposit.entity"; +import { OpRebateRewardV2Message, ScraperQueue } from "../adapter/messaging"; + +@Injectable() +export class CopyIndexerDepositsCron { + private logger = new Logger(CopyIndexerDepositsCron.name); + private lock = false; + + constructor( + private appConfig: AppConfig, + private dataSource: DataSource, + private scraperQueuesService: ScraperQueuesService, + ) {} + + @EnhancedCron(CronExpression.EVERY_30_SECONDS) + async run() { + try { + const perfObserver = new PerformanceObserver((items) => { + items.getEntries().forEach((entry) => { + console.log(entry); + }); + }); + perfObserver.observe({ entryTypes: ["measure"], buffered: true }); + if (this.lock) { + this.logger.warn("CopyIndexerDepositsCron is locked"); + return; + } + this.lock = true; + + await this.copyIndexerDeposits(); + + this.lock = false; + performance.clearMarks(); + } catch (error) { + console.log(error); + this.logger.error(error); + this.lock = false; + } + } + + private async copyIndexerDeposits() { + let eligibleDestinationChains = []; + if (this.appConfig.values.rewardPrograms["op-rebates"].enabled){ + eligibleDestinationChains = ELIGIBLE_OP_REWARDS_CHAIN_IDS; + } + + if (eligibleDestinationChains.length > 0){ + const newFilledDepositsQuery = ` + WITH indexer_deposits AS ( + SELECT + indexer_rhi."relayHash" as "relayHash", + indexer_rhi."depositTxHash" as "depositTxHash", + indexer_rhi."depositId" as "depositId", + indexer_rhi."originChainId" as "originChainId", + indexer_rhi."destinationChainId" as "destinationChainId", + indexer_deposit_events.depositor as depositor, + indexer_deposit_events.recipient as recipient, + indexer_deposit_events."inputToken" as "inputToken", + indexer_deposit_events."inputAmount" as "inputAmount", + indexer_deposit_events."outputToken" as "outputToken", + indexer_deposit_events."outputAmount" as "outputAmount", + indexer_deposit_events."exclusiveRelayer" as "exclusiveRelayer", + indexer_deposit_events."blockTimestamp" as "blockTimestamp", + indexer_rhi."fillTxHash" as "fillTxHash", + indexer_fills.relayer as relayer + FROM + relay_hash_info indexer_rhi + JOIN evm.v3_funds_deposited indexer_deposit_events ON + indexer_rhi."depositEventId" = indexer_deposit_events.id + JOIN evm.filled_v3_relay indexer_fills ON + indexer_rhi."fillEventId" = indexer_fills.id + WHERE + indexer_rhi."status" = 'filled' + AND indexer_deposit_events.finalised is true + AND indexer_fills.finalised is true + AND indexer_rhi."destinationChainId" in (${eligibleDestinationChains}) + LIMIT 5000 + ) + SELECT + indexer_deposits.* + FROM indexer_deposits + LEFT JOIN rewarded_deposit ON indexer_deposits."relayHash" = rewarded_deposit."relayHash" + WHERE rewarded_deposit."relayHash" IS NULL; + `; + + performance.mark("query-start"); + const newFilledDeposits = await this.dataSource.query(newFilledDepositsQuery) as RewardedDeposit[]; + performance.mark("query-end"); + performance.measure("query", "query-start", "query-end"); + + performance.mark("insert-start"); + const insertResults = await Promise.all(splitArrayInChunks(newFilledDeposits, 2).map(deps=> this.dataSource + .getRepository(RewardedDeposit) + .createQueryBuilder() + .insert() + .values(deps.map(dep => {return {...dep, depositDate: new Date(), totalBridgeFeeUsd: "3"};})) + .returning("*") + .execute())); + performance.mark("insert-end"); + performance.measure("insert", "insert-start", "insert-end"); + + const savedDeposits = insertResults.map(result=>result.generatedMaps); + + // For each inserted deposit, send messages to new op worker + for (const deposit of savedDeposits){ + const queueMsg = {depositId: deposit["depositId"], originChainId: deposit["originChainId"]}; + console.log(deposit); + console.log(queueMsg); + console.log("publishing message"); + try{ + await this.scraperQueuesService.publishMessage( + ScraperQueue.OpRebateRewardV2, + queueMsg, + ); + } catch (err){ + console.log(err); + } + } + } + + } +} \ No newline at end of file diff --git a/src/modules/scraper/service/ScraperQueuesService.ts b/src/modules/scraper/service/ScraperQueuesService.ts index 473f5a47..649ff3bb 100644 --- a/src/modules/scraper/service/ScraperQueuesService.ts +++ b/src/modules/scraper/service/ScraperQueuesService.ts @@ -31,6 +31,7 @@ export class ScraperQueuesService { @InjectQueue(ScraperQueue.MerkleDistributorClaim) private merkleDistributorClaimQueue: Queue, @InjectQueue(ScraperQueue.FindMissedFillEvent) private findMissedFillEventQueue: Queue, @InjectQueue(ScraperQueue.HubPoolBlocksEvents) private hubPoolBlocksEventsQueue: Queue, + @InjectQueue(ScraperQueue.OpRebateRewardV2) private opRebateRewardsV2Queue: Queue, ) { this.queuesMap = { [ScraperQueue.BlocksEvents]: this.blocksEventsQueue, @@ -54,6 +55,7 @@ export class ScraperQueuesService { [ScraperQueue.MerkleDistributorClaim]: this.merkleDistributorClaimQueue, [ScraperQueue.FindMissedFillEvent]: this.findMissedFillEventQueue, [ScraperQueue.HubPoolBlocksEvents]: this.hubPoolBlocksEventsQueue, + [ScraperQueue.OpRebateRewardV2]: this.opRebateRewardsV2Queue, }; }