diff --git a/apps/mvx-event-processor/src/cross-chain-transaction-processor/cross-chain-transaction.processor.module.ts b/apps/mvx-event-processor/src/cross-chain-transaction-processor/cross-chain-transaction.processor.module.ts new file mode 100644 index 0000000..1546989 --- /dev/null +++ b/apps/mvx-event-processor/src/cross-chain-transaction-processor/cross-chain-transaction.processor.module.ts @@ -0,0 +1,11 @@ +import { Module } from '@nestjs/common'; +import { ScheduleModule } from '@nestjs/schedule'; +import { ApiConfigModule, ContractsModule, DatabaseModule, GrpcModule } from '@mvx-monorepo/common'; +import { CrossChainTransactionProcessorService } from './cross-chain-transaction.processor.service'; +import { HelpersModule } from '@mvx-monorepo/common/helpers/helpers.module'; + +@Module({ + imports: [ScheduleModule.forRoot(), DatabaseModule, GrpcModule, HelpersModule, ContractsModule, ApiConfigModule], + providers: [CrossChainTransactionProcessorService], +}) +export class CrossChainTransactionProcessorModule {} diff --git a/apps/mvx-event-processor/src/cross-chain-transaction-processor/cross-chain-transaction.processor.service.ts b/apps/mvx-event-processor/src/cross-chain-transaction-processor/cross-chain-transaction.processor.service.ts new file mode 100644 index 0000000..d529478 --- /dev/null +++ b/apps/mvx-event-processor/src/cross-chain-transaction-processor/cross-chain-transaction.processor.service.ts @@ -0,0 +1,139 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Cron } from '@nestjs/schedule'; +import { Locker } from '@multiversx/sdk-nestjs-common'; +import { ApiConfigService, CacheInfo, GatewayContract, GrpcService } from '@mvx-monorepo/common'; +import { ContractCallEventRepository } from '@mvx-monorepo/common/database/repository/contract-call-event.repository'; +import { RedisHelper } from '@mvx-monorepo/common/helpers/redis.helper'; +import { ITransactionEvent, ITransactionOnNetwork } from '@multiversx/sdk-core/out'; +import { ProxyNetworkProvider } from '@multiversx/sdk-network-providers/out'; +import { EventIdentifiers, Events } from '@mvx-monorepo/common/utils/event.enum'; +import { ContractCallEventStatus } from '@prisma/client'; +import { CONSTANTS } from '@mvx-monorepo/common/utils/constants.enum'; + +@Injectable() +export class CrossChainTransactionProcessorService { + private readonly logger: Logger; + private readonly contractGateway: string; + + constructor( + private readonly contractCallEventRepository: ContractCallEventRepository, + private readonly grpcService: GrpcService, + private readonly redisHelper: RedisHelper, + private readonly proxy: ProxyNetworkProvider, + private readonly gatewayContract: GatewayContract, + apiConfigService: ApiConfigService, + ) { + this.contractGateway = apiConfigService.getContractGateway(); + this.logger = new Logger(CrossChainTransactionProcessorService.name); + } + + // Runs every 15 seconds + @Cron('*/15 * * * * *') + async processCrossChainTransactions() { + await Locker.lock('processCrossChainTransactions', this.processCrossChainTransactionsRaw.bind(this)); + } + + async processCrossChainTransactionsRaw() { + this.logger.debug('Running processCrossChainTransactions cron'); + + const txHashes = await this.redisHelper.smembers(CacheInfo.CrossChainTransactions().key); + + for (const txHash of txHashes) { + try { + const transaction = await this.proxy.getTransaction(txHash); + + // Wait for transaction to be finished + if (transaction.status.isPending()) { + continue; + } + + // Only handle events if successful + if (transaction.status.isSuccessful()) { + await this.handleEvents(transaction); + } + + await this.redisHelper.srem(CacheInfo.CrossChainTransactions().key, txHash); + } catch (e) { + this.logger.warn(`An error occurred while processing cross chain transaction ${txHash}. Will be retried`, e); + } + } + } + + private async handleEvents(transaction: ITransactionOnNetwork) { + for (const [index, rawEvent] of transaction.logs.events.entries()) { + if (rawEvent.address.bech32() !== this.contractGateway) { + continue; + } + + const eventName = rawEvent.topics?.[0]?.toString(); + + if (rawEvent.identifier === EventIdentifiers.CALL_CONTRACT && eventName === Events.CONTRACT_CALL_EVENT) { + await this.handleContractCallEvent(rawEvent, transaction.hash, index); + + continue; + } + + if (rawEvent.identifier === EventIdentifiers.ROTATE_SIGNERS && eventName === Events.SIGNERS_ROTATED_EVENT) { + await this.handleSignersRotatedEvent(rawEvent, transaction.hash, index); + } + } + } + + private async handleContractCallEvent(rawEvent: ITransactionEvent, txHash: string, index: number) { + const event = this.gatewayContract.decodeContractCallEvent(rawEvent); + + const contractCallEvent = await this.contractCallEventRepository.create({ + txHash: txHash, + eventIndex: index, + status: ContractCallEventStatus.PENDING, + sourceAddress: event.sender.bech32(), + sourceChain: CONSTANTS.SOURCE_CHAIN_NAME, + destinationAddress: event.destinationAddress, + destinationChain: event.destinationChain, + payloadHash: event.payloadHash, + payload: event.payload, + retry: 0, + }); + + // A duplicate might exist in the database, so we can skip creation in this case + if (!contractCallEvent) { + return; + } + + this.grpcService.verify(contractCallEvent); + } + + private async handleSignersRotatedEvent(rawEvent: ITransactionEvent, txHash: string, index: number) { + const weightedSigners = this.gatewayContract.decodeSignersRotatedEvent(rawEvent); + + // The id needs to have `0x` in front of the txHash (hex string) + const id = `0x${txHash}-${index}`; + + // TODO: Test that this works correctly + const response = await this.grpcService.verifyVerifierSet( + id, + weightedSigners.signers, + weightedSigners.threshold, + weightedSigners.nonce, + ); + + if (response.published) { + return; + } + + this.logger.warn(`Couldn't dispatch verifyWorkerSet ${id} to Amplifier API. Retrying...`); + + setTimeout(async () => { + const response = await this.grpcService.verifyVerifierSet( + id, + weightedSigners.signers, + weightedSigners.threshold, + weightedSigners.nonce, + ); + + if (!response.published) { + this.logger.error(`Couldn't dispatch verifyWorkerSet ${id} to Amplifier API.`); + } + }, 60_000); + } +} diff --git a/apps/mvx-event-processor/src/cross-chain-transaction-processor/cross-chain-transaction.processor.spec.ts b/apps/mvx-event-processor/src/cross-chain-transaction-processor/cross-chain-transaction.processor.spec.ts new file mode 100644 index 0000000..1e819ef --- /dev/null +++ b/apps/mvx-event-processor/src/cross-chain-transaction-processor/cross-chain-transaction.processor.spec.ts @@ -0,0 +1,285 @@ +import { ApiConfigService } from '@mvx-monorepo/common'; +import { createMock, DeepMocked } from '@golevelup/ts-jest'; +import { Test } from '@nestjs/testing'; +import { ContractCallEventRepository } from '@mvx-monorepo/common/database/repository/contract-call-event.repository'; +import { Address } from '@multiversx/sdk-core/out'; +import { GrpcService } from '@mvx-monorepo/common/grpc/grpc.service'; +import { GatewayContract } from '@mvx-monorepo/common/contracts/gateway.contract'; +import { ContractCallEvent } from '@mvx-monorepo/common/contracts/entities/gateway-events'; +import { + ProxyNetworkProvider, + TransactionEvent, + TransactionOnNetwork, + TransactionStatus, +} from '@multiversx/sdk-network-providers/out'; +import { RedisHelper } from '@mvx-monorepo/common/helpers/redis.helper'; +import { CrossChainTransactionProcessorService } from './cross-chain-transaction.processor.service'; +import { ContractCallEventStatus } from '@prisma/client'; +import { NotifierEvent } from '../event-processor/types'; +import { EventIdentifiers, Events } from '@mvx-monorepo/common/utils/event.enum'; +import { BinaryUtils } from '@multiversx/sdk-nestjs-common'; + +describe('CrossChainTransactionProcessor', () => { + let contractCallEventRepository: DeepMocked; + let grpcService: DeepMocked; + let redisHelper: DeepMocked; + let proxy: DeepMocked; + let gatewayContract: DeepMocked; + let apiConfigService: DeepMocked; + + let service: CrossChainTransactionProcessorService; + + const contractCallEvent: ContractCallEvent = { + sender: Address.fromBech32('erd1qqqqqqqqqqqqqpgqzqvm5ywqqf524efwrhr039tjs29w0qltkklsa05pk7'), + destinationChain: 'ethereum', + destinationAddress: 'destinationAddress', + payloadHash: 'ebc84cbd75ba5516bf45e7024a9e12bc3c5c880f73e3a5beca7ebba52b2867a7', + payload: Buffer.from('payload'), + }; + + beforeEach(async () => { + contractCallEventRepository = createMock(); + grpcService = createMock(); + redisHelper = createMock(); + proxy = createMock(); + gatewayContract = createMock(); + apiConfigService = createMock(); + + apiConfigService.getContractGateway.mockReturnValue('mockGatewayAddress'); + + const moduleRef = await Test.createTestingModule({ + providers: [CrossChainTransactionProcessorService], + }) + .useMocker((token) => { + if (token === ContractCallEventRepository) { + return contractCallEventRepository; + } + + if (token === GrpcService) { + return grpcService; + } + + if (token === RedisHelper) { + return redisHelper; + } + + if (token === ProxyNetworkProvider) { + return proxy; + } + + if (token === GatewayContract) { + return gatewayContract; + } + + if (token === ApiConfigService) { + return apiConfigService; + } + + return null; + }) + .compile(); + + gatewayContract.decodeContractCallEvent.mockReturnValue(contractCallEvent); + + service = moduleRef.get(CrossChainTransactionProcessorService); + }); + + it('Should not process pending or failed transaction', async () => { + redisHelper.smembers.mockReturnValueOnce(Promise.resolve(['txHashNone', 'txHashPending', 'txHashFailed'])); + + proxy.getTransaction.mockImplementation((txHash) => { + if (txHash === 'txHashNone') { + throw new Error('not found'); + } + + const transaction = createMock(); + transaction.hash = txHash; + + if (txHash === 'txHashPending') { + transaction.status = new TransactionStatus('pending'); + } else if (txHash === 'txHashFailed') { + transaction.status = new TransactionStatus('failed'); + } + + return Promise.resolve(transaction); + }); + + await service.processCrossChainTransactionsRaw(); + + expect(redisHelper.srem).toHaveBeenCalledTimes(1); + expect(redisHelper.srem).toHaveBeenCalledWith('crossChainTransactions', 'txHashFailed'); + }); + + describe('handleContractCallEvent', () => { + const data = contractCallEvent.payload; + + const rawEvent: NotifierEvent = { + txHash: 'txHash', + address: 'mockGatewayAddress', + identifier: EventIdentifiers.CALL_CONTRACT, + data: data.toString('base64'), + topics: [ + BinaryUtils.base64Encode(Events.CONTRACT_CALL_EVENT), + Buffer.from((contractCallEvent.sender as Address).hex(), 'hex').toString('base64'), + BinaryUtils.base64Encode(contractCallEvent.destinationChain), + BinaryUtils.base64Encode(contractCallEvent.destinationAddress), + Buffer.from(contractCallEvent.payloadHash, 'hex').toString('base64'), + ], + }; + + const transaction = createMock(); + transaction.hash = 'txHash'; + transaction.status = new TransactionStatus('success'); + + it('Should handle multiple events', async () => { + transaction.logs.events = [ + TransactionEvent.fromHttpResponse(rawEvent), + TransactionEvent.fromHttpResponse(rawEvent), + ]; + + redisHelper.smembers.mockReturnValueOnce(Promise.resolve(['txHash'])); + proxy.getTransaction.mockReturnValueOnce(Promise.resolve(transaction)); + + await service.processCrossChainTransactionsRaw(); + + expect(gatewayContract.decodeContractCallEvent).toHaveBeenCalledTimes(2); + expect(gatewayContract.decodeContractCallEvent).toHaveBeenCalledWith(transaction.logs.events[0]); + expect(gatewayContract.decodeContractCallEvent).toHaveBeenCalledWith(transaction.logs.events[1]); + expect(contractCallEventRepository.create).toHaveBeenCalledTimes(2); + expect(contractCallEventRepository.create).toHaveBeenCalledWith({ + txHash: 'txHash', + eventIndex: 0, + status: ContractCallEventStatus.PENDING, + sourceAddress: 'erd1qqqqqqqqqqqqqpgqzqvm5ywqqf524efwrhr039tjs29w0qltkklsa05pk7', + sourceChain: 'multiversx', + destinationAddress: 'destinationAddress', + destinationChain: 'ethereum', + payloadHash: 'ebc84cbd75ba5516bf45e7024a9e12bc3c5c880f73e3a5beca7ebba52b2867a7', + payload: Buffer.from('payload'), + retry: 0, + }); + expect(contractCallEventRepository.create).toHaveBeenCalledWith({ + txHash: 'txHash', + eventIndex: 1, + status: ContractCallEventStatus.PENDING, + sourceAddress: 'erd1qqqqqqqqqqqqqpgqzqvm5ywqqf524efwrhr039tjs29w0qltkklsa05pk7', + sourceChain: 'multiversx', + destinationAddress: 'destinationAddress', + destinationChain: 'ethereum', + payloadHash: 'ebc84cbd75ba5516bf45e7024a9e12bc3c5c880f73e3a5beca7ebba52b2867a7', + payload: Buffer.from('payload'), + retry: 0, + }); + expect(grpcService.verify).toHaveBeenCalledTimes(2); + + expect(redisHelper.srem).toHaveBeenCalledTimes(1); + expect(redisHelper.srem).toHaveBeenCalledWith('crossChainTransactions', 'txHash'); + }); + + it('Should not handle duplicate in database', async () => { + transaction.logs.events = [TransactionEvent.fromHttpResponse(rawEvent)]; + + contractCallEventRepository.create.mockReturnValueOnce(Promise.resolve(null)); + + redisHelper.smembers.mockReturnValueOnce(Promise.resolve(['txHash'])); + proxy.getTransaction.mockReturnValueOnce(Promise.resolve(transaction)); + + await service.processCrossChainTransactionsRaw(); + + expect(gatewayContract.decodeContractCallEvent).toHaveBeenCalledTimes(1); + expect(gatewayContract.decodeContractCallEvent).toHaveBeenCalledWith(TransactionEvent.fromHttpResponse(rawEvent)); + expect(contractCallEventRepository.create).toHaveBeenCalledTimes(1); + expect(grpcService.verify).not.toHaveBeenCalled(); + }); + + it('Should handle error can not save in database', async () => { + transaction.logs.events = [TransactionEvent.fromHttpResponse(rawEvent)]; + + contractCallEventRepository.create.mockRejectedValue(new Error('Can not save in database')); + + redisHelper.smembers.mockReturnValueOnce(Promise.resolve(['txHash'])); + proxy.getTransaction.mockReturnValueOnce(Promise.resolve(transaction)); + + await service.processCrossChainTransactionsRaw(); + + expect(gatewayContract.decodeContractCallEvent).toHaveBeenCalledTimes(1); + expect(gatewayContract.decodeContractCallEvent).toHaveBeenCalledWith(TransactionEvent.fromHttpResponse(rawEvent)); + expect(contractCallEventRepository.create).toHaveBeenCalledTimes(1); + expect(grpcService.verify).not.toHaveBeenCalled(); + }); + }); + + describe('handleSignersRotatedEvent', () => { + const rawEvent: NotifierEvent = { + txHash: 'txHash', + address: 'mockGatewayAddress', + identifier: EventIdentifiers.ROTATE_SIGNERS, + data: Buffer.from( + '000000030139472eff6886771a982f3083da5d421f24c29181e63888228dc81ca60d69e100000001018049d639e5a6980d1cd2392abcce41029cda74a1563523a202f09641cc2618f80000000101b2a11555ce521e4944e09ab17549d85b487dcd26c84b5017a39e31a3670889ba00000001010000000103290decd9548b62a8d60345a988386fc84ba6bc95484008f6362f93160ef3e563', + 'hex', + ).toString('base64'), + topics: [ + BinaryUtils.base64Encode(Events.SIGNERS_ROTATED_EVENT), + BinaryUtils.hexToBase64('01'), + Buffer.from('0c38359b7a35c755573659d797afec315bb0e51374a056745abd9764715a15da', 'hex').toString('base64'), + ], + }; + + const transaction = createMock(); + transaction.hash = 'txHash'; + transaction.status = new TransactionStatus('success'); + + it('Should handle event', async () => { + transaction.logs.events = [ + TransactionEvent.fromHttpResponse({ + address: 'other', + identifier: 'other', + topics: [], + data: '', + }), + TransactionEvent.fromHttpResponse({ + address: 'mockGatewayAddress', + identifier: 'other', + topics: [], + data: '', + }), + TransactionEvent.fromHttpResponse(rawEvent), + ]; + + redisHelper.smembers.mockReturnValueOnce(Promise.resolve(['txHash'])); + proxy.getTransaction.mockReturnValueOnce(Promise.resolve(transaction)); + + await service.processCrossChainTransactionsRaw(); + + expect(gatewayContract.decodeSignersRotatedEvent).toHaveBeenCalledTimes(1); + expect(gatewayContract.decodeSignersRotatedEvent).toHaveBeenCalledWith(transaction.logs.events[2]); + expect(grpcService.verifyVerifierSet).toHaveBeenCalledTimes(1); + expect(grpcService.verifyVerifierSet).toHaveBeenCalledWith( + '0xtxHash-2', + expect.anything(), + expect.anything(), + expect.anything(), + ); + }); + + it('Should handle event error', async () => { + transaction.logs.events = [TransactionEvent.fromHttpResponse(rawEvent)]; + + grpcService.verifyVerifierSet.mockReturnValueOnce( + Promise.resolve({ + published: false, + receiptId: '', + }), + ); + + redisHelper.smembers.mockReturnValueOnce(Promise.resolve(['txHash'])); + proxy.getTransaction.mockReturnValueOnce(Promise.resolve(transaction)); + + await service.processCrossChainTransactionsRaw(); + + expect(gatewayContract.decodeSignersRotatedEvent).toHaveBeenCalledTimes(1); + expect(gatewayContract.decodeSignersRotatedEvent).toHaveBeenCalledWith(transaction.logs.events[0]); + expect(grpcService.verifyVerifierSet).toHaveBeenCalledTimes(1); + }); + }); +}); diff --git a/apps/mvx-event-processor/src/cross-chain-transaction-processor/index.ts b/apps/mvx-event-processor/src/cross-chain-transaction-processor/index.ts new file mode 100644 index 0000000..4c49e35 --- /dev/null +++ b/apps/mvx-event-processor/src/cross-chain-transaction-processor/index.ts @@ -0,0 +1,2 @@ +export * from './cross-chain-transaction.processor.module'; +export * from './cross-chain-transaction.processor.service'; diff --git a/apps/mvx-event-processor/src/event-processor/event.processor.module.ts b/apps/mvx-event-processor/src/event-processor/event.processor.module.ts index 4bab530..9d0c917 100644 --- a/apps/mvx-event-processor/src/event-processor/event.processor.module.ts +++ b/apps/mvx-event-processor/src/event-processor/event.processor.module.ts @@ -3,6 +3,7 @@ import { Module } from '@nestjs/common'; import { EventProcessorService } from './event.processor.service'; import { ApiConfigModule, ApiConfigService } from '@mvx-monorepo/common'; import { ProcessorsModule } from '../processors'; +import { HelpersModule } from '@mvx-monorepo/common/helpers/helpers.module'; @Module({ imports: [ @@ -15,6 +16,7 @@ import { ProcessorsModule } from '../processors'; inject: [ApiConfigService], }), ProcessorsModule, + HelpersModule, ], providers: [EventProcessorService], }) diff --git a/apps/mvx-event-processor/src/event-processor/event.processor.service.spec.ts b/apps/mvx-event-processor/src/event-processor/event.processor.service.spec.ts index 1c4bd14..1bfd6c2 100644 --- a/apps/mvx-event-processor/src/event-processor/event.processor.service.spec.ts +++ b/apps/mvx-event-processor/src/event-processor/event.processor.service.spec.ts @@ -3,11 +3,13 @@ import { ApiConfigService } from '@mvx-monorepo/common'; import { createMock, DeepMocked } from '@golevelup/ts-jest'; import { Test } from '@nestjs/testing'; import { NotifierBlockEvent } from './types'; -import { GatewayProcessor, GasServiceProcessor } from '../processors'; +import { GasServiceProcessor, GatewayProcessor } from '../processors'; +import { RedisHelper } from '@mvx-monorepo/common/helpers/redis.helper'; describe('EventProcessorService', () => { let gatewayProcessor: DeepMocked; let gasServiceProcessor: DeepMocked; + let redisHelper: DeepMocked; let apiConfigService: DeepMocked; let service: EventProcessorService; @@ -15,6 +17,7 @@ describe('EventProcessorService', () => { beforeEach(async () => { gatewayProcessor = createMock(); gasServiceProcessor = createMock(); + redisHelper = createMock(); apiConfigService = createMock(); apiConfigService.getContractGateway.mockReturnValue('mockGatewayAddress'); @@ -32,6 +35,10 @@ describe('EventProcessorService', () => { return gasServiceProcessor; } + if (token === RedisHelper) { + return redisHelper; + } + if (token === ApiConfigService) { return apiConfigService; } @@ -72,16 +79,26 @@ describe('EventProcessorService', () => { expect(apiConfigService.getContractGateway).toHaveBeenCalledTimes(1); expect(gatewayProcessor.handleEvent).not.toHaveBeenCalled(); expect(gasServiceProcessor.handleEvent).not.toHaveBeenCalled(); + expect(redisHelper.sadd).not.toHaveBeenCalled(); }); it('Should consume gateway event', async () => { + gatewayProcessor.handleEvent.mockReturnValue(Promise.resolve('txHash')); + const blockEvent: NotifierBlockEvent = { hash: 'test', shardId: 1, timestamp: 123456, events: [ { - txHash: 'test', + txHash: 'txHash', + address: 'mockGatewayAddress', + identifier: 'any', + data: '', + topics: [], + }, + { + txHash: 'txHash', address: 'mockGatewayAddress', identifier: 'any', data: '', @@ -93,8 +110,10 @@ describe('EventProcessorService', () => { await service.consumeEvents(blockEvent); expect(apiConfigService.getContractGateway).toHaveBeenCalledTimes(1); - expect(gatewayProcessor.handleEvent).toHaveBeenCalledTimes(1); + expect(gatewayProcessor.handleEvent).toHaveBeenCalledTimes(2); expect(gasServiceProcessor.handleEvent).not.toHaveBeenCalled(); + expect(redisHelper.sadd).toHaveBeenCalledTimes(1); + expect(redisHelper.sadd).toHaveBeenCalledWith('crossChainTransactions', 'txHash'); }); it('Should consume gas contract event', async () => { @@ -118,6 +137,7 @@ describe('EventProcessorService', () => { expect(apiConfigService.getContractGateway).toHaveBeenCalledTimes(1); expect(gasServiceProcessor.handleEvent).toHaveBeenCalledTimes(1); expect(gatewayProcessor.handleEvent).not.toHaveBeenCalled(); + expect(redisHelper.sadd).not.toHaveBeenCalled(); }); }); }); diff --git a/apps/mvx-event-processor/src/event-processor/event.processor.service.ts b/apps/mvx-event-processor/src/event-processor/event.processor.service.ts index 0b15b22..eb56f47 100644 --- a/apps/mvx-event-processor/src/event-processor/event.processor.service.ts +++ b/apps/mvx-event-processor/src/event-processor/event.processor.service.ts @@ -1,9 +1,10 @@ import { Injectable, Logger } from '@nestjs/common'; -import { ApiConfigService } from '@mvx-monorepo/common'; +import { ApiConfigService, CacheInfo } from '@mvx-monorepo/common'; import { NotifierBlockEvent, NotifierEvent } from './types'; import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq'; import { EVENTS_NOTIFIER_QUEUE } from '../../../../config/configuration'; import { GatewayProcessor, GasServiceProcessor } from '../processors'; +import { RedisHelper } from '@mvx-monorepo/common/helpers/redis.helper'; @Injectable() export class EventProcessorService { @@ -14,6 +15,7 @@ export class EventProcessorService { constructor( private readonly gatewayProcessor: GatewayProcessor, private readonly gasServiceProcessor: GasServiceProcessor, + private readonly redisHelper: RedisHelper, apiConfigService: ApiConfigService, ) { this.contractGateway = apiConfigService.getContractGateway(); @@ -27,8 +29,18 @@ export class EventProcessorService { }) async consumeEvents(blockEvent: NotifierBlockEvent) { try { + const crossChainTransactions = new Set(); + for (const event of blockEvent.events) { - await this.handleEvent(event); + const txHash = await this.handleEvent(event); + + if (txHash) { + crossChainTransactions.add(txHash); + } + } + + if (crossChainTransactions.size > 0) { + await this.redisHelper.sadd(CacheInfo.CrossChainTransactions().key, ...crossChainTransactions); } } catch (error) { this.logger.error( @@ -56,9 +68,9 @@ export class EventProcessorService { this.logger.debug('Received Gateway event from MultiversX:'); this.logger.debug(JSON.stringify(event)); - await this.gatewayProcessor.handleEvent(event); - - return; + return await this.gatewayProcessor.handleEvent(event); } + + return undefined; } } diff --git a/apps/mvx-event-processor/src/mvx-event-processor.module.ts b/apps/mvx-event-processor/src/mvx-event-processor.module.ts index c125064..9fde826 100644 --- a/apps/mvx-event-processor/src/mvx-event-processor.module.ts +++ b/apps/mvx-event-processor/src/mvx-event-processor.module.ts @@ -3,6 +3,7 @@ import { EventProcessorModule } from './event-processor'; import { MessageApprovedProcessorModule } from './message-approved-processor'; import { GasCheckerModule } from './gas-checker/gas-checker.module'; import { ContractCallEventProcessorModule } from './contract-call-event-processor'; +import { CrossChainTransactionProcessorModule } from './cross-chain-transaction-processor'; @Module({ imports: [ @@ -10,6 +11,7 @@ import { ContractCallEventProcessorModule } from './contract-call-event-processo MessageApprovedProcessorModule, GasCheckerModule, ContractCallEventProcessorModule, + CrossChainTransactionProcessorModule, ], }) export class MvxEventProcessorModule {} diff --git a/apps/mvx-event-processor/src/processors/entities/processor.interface.ts b/apps/mvx-event-processor/src/processors/entities/processor.interface.ts deleted file mode 100644 index 79fc973..0000000 --- a/apps/mvx-event-processor/src/processors/entities/processor.interface.ts +++ /dev/null @@ -1,5 +0,0 @@ -import { NotifierEvent } from '../../event-processor/types'; - -export interface ProcessorInterface { - handleEvent(rawEvent: NotifierEvent): Promise; -} diff --git a/apps/mvx-event-processor/src/processors/gas-service.processor.ts b/apps/mvx-event-processor/src/processors/gas-service.processor.ts index efa32d2..3d1db29 100644 --- a/apps/mvx-event-processor/src/processors/gas-service.processor.ts +++ b/apps/mvx-event-processor/src/processors/gas-service.processor.ts @@ -1,5 +1,4 @@ import { Injectable, Logger } from '@nestjs/common'; -import { ProcessorInterface } from './entities/processor.interface'; import { NotifierEvent } from '../event-processor/types'; import { BinaryUtils } from '@multiversx/sdk-nestjs-common'; import { Events } from '@mvx-monorepo/common/utils/event.enum'; @@ -12,7 +11,7 @@ import { GasAddedEvent, GasPaidForContractCallEvent } from '@mvx-monorepo/common import BigNumber from 'bignumber.js'; @Injectable() -export class GasServiceProcessor implements ProcessorInterface { +export class GasServiceProcessor { private logger: Logger; constructor( diff --git a/apps/mvx-event-processor/src/processors/gateway.processor.spec.ts b/apps/mvx-event-processor/src/processors/gateway.processor.spec.ts index 03f2b7d..23817f1 100644 --- a/apps/mvx-event-processor/src/processors/gateway.processor.spec.ts +++ b/apps/mvx-event-processor/src/processors/gateway.processor.spec.ts @@ -1,25 +1,21 @@ -import { ApiConfigService } from '@mvx-monorepo/common'; import { createMock, DeepMocked } from '@golevelup/ts-jest'; import { Test } from '@nestjs/testing'; import { BinaryUtils } from '@multiversx/sdk-nestjs-common'; import { EventIdentifiers, Events } from '@mvx-monorepo/common/utils/event.enum'; -import { ContractCallEventRepository } from '@mvx-monorepo/common/database/repository/contract-call-event.repository'; import { GatewayProcessor } from './gateway.processor'; import { NotifierEvent } from '../event-processor/types'; import { Address } from '@multiversx/sdk-core/out'; -import { ContractCallEventStatus, MessageApproved, MessageApprovedStatus } from '@prisma/client'; +import { MessageApproved, MessageApprovedStatus } from '@prisma/client'; import { GrpcService } from '@mvx-monorepo/common/grpc/grpc.service'; import { GatewayContract } from '@mvx-monorepo/common/contracts/gateway.contract'; import { ContractCallEvent, MessageApprovedEvent } from '@mvx-monorepo/common/contracts/entities/gateway-events'; import { TransactionEvent } from '@multiversx/sdk-network-providers/out'; import { MessageApprovedRepository } from '@mvx-monorepo/common/database/repository/message-approved.repository'; -describe('ContractCallProcessor', () => { +describe('GatewayProcessor', () => { let gatewayContract: DeepMocked; - let contractCallEventRepository: DeepMocked; let messageApprovedRepository: DeepMocked; let grpcService: DeepMocked; - let apiConfigService: DeepMocked; let service: GatewayProcessor; @@ -41,10 +37,8 @@ describe('ContractCallProcessor', () => { beforeEach(async () => { gatewayContract = createMock(); - contractCallEventRepository = createMock(); messageApprovedRepository = createMock(); grpcService = createMock(); - apiConfigService = createMock(); const moduleRef = await Test.createTestingModule({ providers: [GatewayProcessor], @@ -54,10 +48,6 @@ describe('ContractCallProcessor', () => { return gatewayContract; } - if (token === ContractCallEventRepository) { - return contractCallEventRepository; - } - if (token === MessageApprovedRepository) { return messageApprovedRepository; } @@ -66,15 +56,10 @@ describe('ContractCallProcessor', () => { return grpcService; } - if (token === ApiConfigService) { - return apiConfigService; - } - return null; }) .compile(); - gatewayContract.decodeContractCallEvent.mockReturnValue(contractCallEvent); gatewayContract.decodeMessageApprovedEvent.mockReturnValue(messageApprovedEvent); gatewayContract.decodeMessageExecutedEvent.mockReturnValue(messageApprovedEvent.commandId); @@ -94,7 +79,6 @@ describe('ContractCallProcessor', () => { expect(gatewayContract.decodeContractCallEvent).not.toHaveBeenCalled(); expect(gatewayContract.decodeMessageApprovedEvent).not.toHaveBeenCalled(); - expect(contractCallEventRepository.create).not.toHaveBeenCalled(); expect(messageApprovedRepository.create).not.toHaveBeenCalled(); expect(grpcService.verify).not.toHaveBeenCalled(); expect(grpcService.getPayload).not.toHaveBeenCalled(); @@ -118,46 +102,9 @@ describe('ContractCallProcessor', () => { }; it('Should handle event', async () => { - await service.handleEvent(rawEvent); - - expect(gatewayContract.decodeContractCallEvent).toHaveBeenCalledTimes(1); - expect(gatewayContract.decodeContractCallEvent).toHaveBeenCalledWith(TransactionEvent.fromHttpResponse(rawEvent)); - expect(contractCallEventRepository.create).toHaveBeenCalledTimes(1); - expect(contractCallEventRepository.create).toHaveBeenCalledWith({ - txHash: 'txHash', - eventIndex: 0, - status: ContractCallEventStatus.PENDING, - sourceAddress: 'erd1qqqqqqqqqqqqqpgqzqvm5ywqqf524efwrhr039tjs29w0qltkklsa05pk7', - sourceChain: 'multiversx', - destinationAddress: 'destinationAddress', - destinationChain: 'ethereum', - payloadHash: 'ebc84cbd75ba5516bf45e7024a9e12bc3c5c880f73e3a5beca7ebba52b2867a7', - payload: Buffer.from('payload'), - retry: 0, - }); - expect(grpcService.verify).toHaveBeenCalledTimes(1); - }); - - it('Should not handle duplicate', async () => { - contractCallEventRepository.create.mockReturnValueOnce(Promise.resolve(null)); - - await service.handleEvent(rawEvent); - - expect(gatewayContract.decodeContractCallEvent).toHaveBeenCalledTimes(1); - expect(gatewayContract.decodeContractCallEvent).toHaveBeenCalledWith(TransactionEvent.fromHttpResponse(rawEvent)); - expect(contractCallEventRepository.create).toHaveBeenCalledTimes(1); - expect(grpcService.verify).not.toHaveBeenCalled(); - }); - - it('Should throw error can not save in database', async () => { - contractCallEventRepository.create.mockRejectedValue(new Error('Can not save in database')); - - await expect(service.handleEvent(rawEvent)).rejects.toThrow(); + const txHash = await service.handleEvent(rawEvent); - expect(gatewayContract.decodeContractCallEvent).toHaveBeenCalledTimes(1); - expect(gatewayContract.decodeContractCallEvent).toHaveBeenCalledWith(TransactionEvent.fromHttpResponse(rawEvent)); - expect(contractCallEventRepository.create).toHaveBeenCalledTimes(1); - expect(grpcService.verify).not.toHaveBeenCalled(); + expect(txHash).toEqual('txHash'); }); }); @@ -241,30 +188,9 @@ describe('ContractCallProcessor', () => { }; it('Should handle event', async () => { - await service.handleEvent(rawEvent); - - expect(gatewayContract.decodeSignersRotatedEvent).toHaveBeenCalledTimes(1); - expect(gatewayContract.decodeSignersRotatedEvent).toHaveBeenCalledWith( - TransactionEvent.fromHttpResponse(rawEvent), - ); - expect(grpcService.verifyVerifierSet).toHaveBeenCalledTimes(1); - }); - - it('Should handle event error', async () => { - grpcService.verifyVerifierSet.mockReturnValueOnce( - Promise.resolve({ - published: false, - receiptId: '', - }), - ); + const txHash = await service.handleEvent(rawEvent); - await service.handleEvent(rawEvent); - - expect(gatewayContract.decodeSignersRotatedEvent).toHaveBeenCalledTimes(1); - expect(gatewayContract.decodeSignersRotatedEvent).toHaveBeenCalledWith( - TransactionEvent.fromHttpResponse(rawEvent), - ); - expect(grpcService.verifyVerifierSet).toHaveBeenCalledTimes(1); + expect(txHash).toEqual('txHash'); }); }); diff --git a/apps/mvx-event-processor/src/processors/gateway.processor.ts b/apps/mvx-event-processor/src/processors/gateway.processor.ts index 6bf86fe..ce0e6ec 100644 --- a/apps/mvx-event-processor/src/processors/gateway.processor.ts +++ b/apps/mvx-event-processor/src/processors/gateway.processor.ts @@ -2,26 +2,18 @@ import { Injectable, Logger } from '@nestjs/common'; import { NotifierEvent } from '../event-processor/types'; import { GatewayContract } from '@mvx-monorepo/common/contracts/gateway.contract'; import { TransactionEvent } from '@multiversx/sdk-network-providers/out'; -import { ContractCallEventRepository } from '@mvx-monorepo/common/database/repository/contract-call-event.repository'; -import { ContractCallEventStatus, MessageApprovedStatus } from '@prisma/client'; +import { MessageApprovedStatus } from '@prisma/client'; import { GrpcService } from '@mvx-monorepo/common/grpc/grpc.service'; -import { ProcessorInterface } from './entities/processor.interface'; import { EventIdentifiers, Events } from '@mvx-monorepo/common/utils/event.enum'; import { BinaryUtils } from '@multiversx/sdk-nestjs-common'; import { MessageApprovedRepository } from '@mvx-monorepo/common/database/repository/message-approved.repository'; -import { CONSTANTS } from '@mvx-monorepo/common/utils/constants.enum'; - -// order/logIndex is unsupported since we can't easily get it in the relayer, so we use 0 by default -// this means that only one cross chain call is supported for now (the first appropriate call found in transaction logs) -const UNSUPPORTED_LOG_INDEX: number = 0; @Injectable() -export class GatewayProcessor implements ProcessorInterface { +export class GatewayProcessor { private readonly logger: Logger; constructor( private readonly gatewayContract: GatewayContract, - private readonly contractCallEventRepository: ContractCallEventRepository, private readonly messageApprovedRepository: MessageApprovedRepository, private readonly grpcService: GrpcService, ) { @@ -31,10 +23,11 @@ export class GatewayProcessor implements ProcessorInterface { async handleEvent(rawEvent: NotifierEvent) { const eventName = BinaryUtils.base64Decode(rawEvent.topics[0]); - if (rawEvent.identifier === EventIdentifiers.CALL_CONTRACT && eventName === Events.CONTRACT_CALL_EVENT) { - await this.handleContractCallEvent(rawEvent); - - return; + if ( + (rawEvent.identifier === EventIdentifiers.CALL_CONTRACT && eventName === Events.CONTRACT_CALL_EVENT) || + (rawEvent.identifier === EventIdentifiers.ROTATE_SIGNERS && eventName === Events.SIGNERS_ROTATED_EVENT) + ) { + return rawEvent.txHash; } if (rawEvent.identifier === EventIdentifiers.APPROVE_MESSAGES && eventName === Events.MESSAGE_APPROVED_EVENT) { @@ -43,45 +36,13 @@ export class GatewayProcessor implements ProcessorInterface { return; } - if ( - rawEvent.identifier === EventIdentifiers.ROTATE_SIGNERS && - eventName === Events.SIGNERS_ROTATED_EVENT - ) { - await this.handleSignersRotatedEvent(rawEvent); - } - - if ( - rawEvent.identifier === EventIdentifiers.VALIDATE_MESSAGE && - eventName === Events.MESSAGE_EXECUTED_EVENT - ) { + if (rawEvent.identifier === EventIdentifiers.VALIDATE_MESSAGE && eventName === Events.MESSAGE_EXECUTED_EVENT) { await this.handleMessageExecutedEvent(rawEvent); return; } - } - private async handleContractCallEvent(rawEvent: NotifierEvent) { - const event = this.gatewayContract.decodeContractCallEvent(TransactionEvent.fromHttpResponse(rawEvent)); - - const contractCallEvent = await this.contractCallEventRepository.create({ - txHash: rawEvent.txHash, - eventIndex: UNSUPPORTED_LOG_INDEX, - status: ContractCallEventStatus.PENDING, - sourceAddress: event.sender.bech32(), - sourceChain: CONSTANTS.SOURCE_CHAIN_NAME, - destinationAddress: event.destinationAddress, - destinationChain: event.destinationChain, - payloadHash: event.payloadHash, - payload: event.payload, - retry: 0, - }); - - // A duplicate might exist in the database, so we can skip creation in this case - if (!contractCallEvent) { - return; - } - - this.grpcService.verify(contractCallEvent); + return undefined; } private async handleMessageApprovedEvent(rawEvent: NotifierEvent) { @@ -107,41 +68,6 @@ export class GatewayProcessor implements ProcessorInterface { } } - private async handleSignersRotatedEvent(rawEvent: NotifierEvent) { - const weightedSigners = this.gatewayContract.decodeSignersRotatedEvent( - TransactionEvent.fromHttpResponse(rawEvent), - ); - - const id = `${rawEvent.txHash}-${UNSUPPORTED_LOG_INDEX}`; - - // TODO: Test that this works correctly - const response = await this.grpcService.verifyVerifierSet( - id, - weightedSigners.signers, - weightedSigners.threshold, - weightedSigners.nonce, - ); - - if (response.published) { - return; - } - - this.logger.warn(`Couldn't dispatch verifyWorkerSet ${id} to Amplifier API. Retrying...`); - - setTimeout(async () => { - const response = await this.grpcService.verifyVerifierSet( - id, - weightedSigners.signers, - weightedSigners.threshold, - weightedSigners.nonce, - ); - - if (!response.published) { - this.logger.error(`Couldn't dispatch verifyWorkerSet ${id} to Amplifier API.`); - } - }, 60_000); - } - private async handleMessageExecutedEvent(rawEvent: NotifierEvent) { const commandId = this.gatewayContract.decodeMessageExecutedEvent(TransactionEvent.fromHttpResponse(rawEvent)); diff --git a/libs/common/src/contracts/gateway.contract.ts b/libs/common/src/contracts/gateway.contract.ts index ae497eb..378e0cd 100644 --- a/libs/common/src/contracts/gateway.contract.ts +++ b/libs/common/src/contracts/gateway.contract.ts @@ -1,6 +1,7 @@ import { AbiRegistry, IAddress, + ITransactionEvent, ResultsParser, SmartContract, Transaction, @@ -36,9 +37,16 @@ export class GatewayContract { }); } - decodeContractCallEvent(event: TransactionEvent): ContractCallEvent { + decodeContractCallEvent(event: ITransactionEvent): ContractCallEvent { const eventDefinition = this.abi.getEvent(Events.CONTRACT_CALL_EVENT); - const outcome = this.resultsParser.parseEvent(event, eventDefinition); + const outcome = this.resultsParser.parseEvent( + { + topics: event.topics.map((topic) => Buffer.from(topic.hex(), 'hex')), + dataPayload: event.dataPayload, + additionalData: event.additionalData, + }, + eventDefinition, + ); return { sender: outcome.sender, @@ -63,9 +71,16 @@ export class GatewayContract { }; } - decodeSignersRotatedEvent(event: TransactionEvent): WeightedSigners { + decodeSignersRotatedEvent(event: ITransactionEvent): WeightedSigners { const eventDefinition = this.abi.getEvent(Events.SIGNERS_ROTATED_EVENT); - const outcome = this.resultsParser.parseEvent(event, eventDefinition); + const outcome = this.resultsParser.parseEvent( + { + topics: event.topics.map((topic) => Buffer.from(topic.hex(), 'hex')), + dataPayload: event.dataPayload, + additionalData: event.additionalData, + }, + eventDefinition, + ); const signers = outcome.signers; diff --git a/libs/common/src/grpc/grpc.service.ts b/libs/common/src/grpc/grpc.service.ts index 42c6868..6aec3d5 100644 --- a/libs/common/src/grpc/grpc.service.ts +++ b/libs/common/src/grpc/grpc.service.ts @@ -126,7 +126,7 @@ export class GrpcService implements OnModuleInit { const payload = Buffer.from( JSON.stringify({ verify_verifier_set: { - message_id: '0x' + messageId, // TODO: Check that this format is correct for the messageId + message_id: messageId, new_verifier_set: { signers, threshold: threshold.toString(), diff --git a/libs/common/src/helpers/helpers.module.ts b/libs/common/src/helpers/helpers.module.ts new file mode 100644 index 0000000..c98dff8 --- /dev/null +++ b/libs/common/src/helpers/helpers.module.ts @@ -0,0 +1,11 @@ +import { Module } from '@nestjs/common'; +import { DynamicModuleUtils } from '@mvx-monorepo/common'; +import { RedisHelper } from '@mvx-monorepo/common/helpers/redis.helper'; +import { MetricsModule } from '@multiversx/sdk-nestjs-monitoring'; + +@Module({ + imports: [DynamicModuleUtils.getRedisCacheModule(), DynamicModuleUtils.getRedisModule(), MetricsModule], + providers: [RedisHelper], + exports: [RedisHelper], +}) +export class HelpersModule {} diff --git a/libs/common/src/helpers/redis.helper.ts b/libs/common/src/helpers/redis.helper.ts new file mode 100644 index 0000000..da7e3ed --- /dev/null +++ b/libs/common/src/helpers/redis.helper.ts @@ -0,0 +1,49 @@ +import { Inject, Injectable, Logger } from '@nestjs/common'; +import Redis from 'ioredis'; +import { REDIS_CLIENT_TOKEN } from '@multiversx/sdk-nestjs-redis/lib/entities/common.constants'; +import { RedisCacheService } from '@multiversx/sdk-nestjs-cache'; +import { MetricsService, PerformanceProfiler } from '@multiversx/sdk-nestjs-monitoring'; + +@Injectable() +export class RedisHelper { + private readonly logger: Logger; + + constructor( + @Inject(REDIS_CLIENT_TOKEN) private readonly redis: Redis, + private readonly redisCache: RedisCacheService, + private readonly metricsService: MetricsService, + ) { + this.logger = new Logger(RedisHelper.name); + } + + sadd(key: string, ...values: string[]) { + return this.redisCache.sadd(key, ...values); + } + + smembers(key: string) { + return this.redisCache.smembers(key); + } + + async srem(key: string, ...values: string[]) { + const performanceProfiler = new PerformanceProfiler(); + try { + return await this.redis.srem(key, ...values); + } catch (error) { + if (error instanceof Error) { + this.logger.error( + 'An error occurred while trying to srem redis.', + Object.assign( + { + exception: error === null || error === void 0 ? void 0 : error.toString(), + key, + }, + ), + ); + } + throw error; + } finally { + performanceProfiler.stop(); + this.metricsService.setRedisDuration('SREM', performanceProfiler.duration); + } + } +} diff --git a/libs/common/src/utils/cache.info.ts b/libs/common/src/utils/cache.info.ts index 9959230..63aed29 100644 --- a/libs/common/src/utils/cache.info.ts +++ b/libs/common/src/utils/cache.info.ts @@ -24,4 +24,11 @@ export class CacheInfo { ttl: Constants.oneWeek(), }; } + + static CrossChainTransactions(): CacheInfo { + return { + key: `crossChainTransactions`, + ttl: Constants.oneWeek(), + }; + } } diff --git a/libs/common/src/utils/dynamic.module.utils.ts b/libs/common/src/utils/dynamic.module.utils.ts index 0ddeabf..1c2c51e 100644 --- a/libs/common/src/utils/dynamic.module.utils.ts +++ b/libs/common/src/utils/dynamic.module.utils.ts @@ -1,6 +1,7 @@ import { CacheModule, RedisCacheModule, RedisCacheModuleOptions } from '@multiversx/sdk-nestjs-cache'; import { DynamicModule } from '@nestjs/common'; import { ApiConfigModule, ApiConfigService } from '../config'; +import { RedisModule } from '@multiversx/sdk-nestjs-redis'; export class DynamicModuleUtils { static getCacheModule(): DynamicModule { @@ -38,4 +39,19 @@ export class DynamicModuleUtils { inject: [ApiConfigService], }); } + + static getRedisModule(): DynamicModule { + return RedisModule.forRootAsync({ + imports: [ApiConfigModule], + useFactory: (apiConfigService: ApiConfigService) => { + return { + config: { + host: apiConfigService.getRedisUrl(), + port: apiConfigService.getRedisPort(), + }, + }; + }, + inject: [ApiConfigService], + }); + } }