From 77ce540ebaff68bab4c94104ae92508a74ebe426 Mon Sep 17 00:00:00 2001 From: Rares <6453351+raress96@users.noreply.github.com> Date: Fri, 28 Jun 2024 13:18:17 +0300 Subject: [PATCH] Refactoring and fix tests for new grpc verify. --- .../contract-call-event.processor.service.ts | 6 +- .../src/processors/gateway.processor.spec.ts | 55 +-------- .../src/processors/gateway.processor.ts | 4 - .../contract-call-event.processor.e2e-spec.ts | 67 ++--------- .../contract-call-event.repository.ts | 10 +- libs/common/src/grpc/grpc.service.spec.ts | 106 ++++++++++++++++++ libs/common/src/grpc/grpc.service.ts | 60 +++++----- 7 files changed, 160 insertions(+), 148 deletions(-) create mode 100644 libs/common/src/grpc/grpc.service.spec.ts diff --git a/apps/mvx-event-processor/src/contract-call-event-processor/contract-call-event.processor.service.ts b/apps/mvx-event-processor/src/contract-call-event-processor/contract-call-event.processor.service.ts index cf2b6ee..e22db81 100644 --- a/apps/mvx-event-processor/src/contract-call-event-processor/contract-call-event.processor.service.ts +++ b/apps/mvx-event-processor/src/contract-call-event-processor/contract-call-event.processor.service.ts @@ -19,7 +19,7 @@ export class ContractCallEventProcessorService { } // Offset at second 15 to not run at the same time as processPendingMessageApproved - @Cron('*/15 * * * * *') + @Cron('15 */2 * * * *') async processPendingContractCallEvent() { await Locker.lock('processPendingContractCallEvent', async () => { this.logger.debug('Running processPendingContractCallEvent cron'); @@ -40,9 +40,11 @@ export class ContractCallEventProcessorService { continue; } + contractCallEvent.retry += 1; + this.logger.debug(`Trying to verify ContractCallEvent with id ${contractCallEvent.id}, retry ${contractCallEvent.retry}}`); - await this.contractCallEventRepository.updateRetry(contractCallEvent.id, contractCallEvent.retry + 1); + await this.contractCallEventRepository.updateRetry(contractCallEvent.id, contractCallEvent.retry); this.grpcService.verify(contractCallEvent); } diff --git a/apps/mvx-event-processor/src/processors/gateway.processor.spec.ts b/apps/mvx-event-processor/src/processors/gateway.processor.spec.ts index 07eb37e..03f2b7d 100644 --- a/apps/mvx-event-processor/src/processors/gateway.processor.spec.ts +++ b/apps/mvx-event-processor/src/processors/gateway.processor.spec.ts @@ -7,14 +7,12 @@ import { ContractCallEventRepository } from '@mvx-monorepo/common/database/repos import { GatewayProcessor } from './gateway.processor'; import { NotifierEvent } from '../event-processor/types'; import { Address } from '@multiversx/sdk-core/out'; -import { MessageApproved, MessageApprovedStatus, ContractCallEventStatus } from '@prisma/client'; +import { ContractCallEventStatus, MessageApproved, MessageApprovedStatus } from '@prisma/client'; import { GrpcService } from '@mvx-monorepo/common/grpc/grpc.service'; import { GatewayContract } from '@mvx-monorepo/common/contracts/gateway.contract'; -import { MessageApprovedEvent, ContractCallEvent } from '@mvx-monorepo/common/contracts/entities/gateway-events'; +import { ContractCallEvent, MessageApprovedEvent } from '@mvx-monorepo/common/contracts/entities/gateway-events'; import { TransactionEvent } from '@multiversx/sdk-network-providers/out'; import { MessageApprovedRepository } from '@mvx-monorepo/common/database/repository/message-approved.repository'; -import { Subject } from 'rxjs'; -import { ErrorCode, VerifyResponse } from '@mvx-monorepo/common/grpc/entities/amplifier'; describe('ContractCallProcessor', () => { let gatewayContract: DeepMocked; @@ -119,17 +117,13 @@ describe('ContractCallProcessor', () => { ], }; - it('Should handle event success', async () => { - const observable = new Subject(); - grpcService.verify.mockReturnValueOnce(observable); - + it('Should handle event', async () => { await service.handleEvent(rawEvent); expect(gatewayContract.decodeContractCallEvent).toHaveBeenCalledTimes(1); expect(gatewayContract.decodeContractCallEvent).toHaveBeenCalledWith(TransactionEvent.fromHttpResponse(rawEvent)); expect(contractCallEventRepository.create).toHaveBeenCalledTimes(1); expect(contractCallEventRepository.create).toHaveBeenCalledWith({ - id: '0xtxHash-0', txHash: 'txHash', eventIndex: 0, status: ContractCallEventStatus.PENDING, @@ -139,50 +133,9 @@ describe('ContractCallProcessor', () => { destinationChain: 'ethereum', payloadHash: 'ebc84cbd75ba5516bf45e7024a9e12bc3c5c880f73e3a5beca7ebba52b2867a7', payload: Buffer.from('payload'), + retry: 0, }); expect(grpcService.verify).toHaveBeenCalledTimes(1); - - observable.next({ - message: undefined, - error: undefined, - }); - observable.complete(); - - expect(contractCallEventRepository.updateStatus).toHaveBeenCalledTimes(1); - expect(contractCallEventRepository.updateStatus).toHaveBeenCalledWith({ - id: 'multiversx_txHash-0', - txHash: 'txHash', - eventIndex: 0, - status: ContractCallEventStatus.APPROVED, - sourceAddress: 'erd1qqqqqqqqqqqqqpgqzqvm5ywqqf524efwrhr039tjs29w0qltkklsa05pk7', - sourceChain: 'multiversx', - destinationAddress: 'destinationAddress', - destinationChain: 'ethereum', - payloadHash: 'ebc84cbd75ba5516bf45e7024a9e12bc3c5c880f73e3a5beca7ebba52b2867a7', - payload: Buffer.from('payload'), - }); - }); - - it('Should handle error success', async () => { - const observable = new Subject(); - grpcService.verify.mockReturnValueOnce(observable); - - await service.handleEvent(rawEvent); - - expect(gatewayContract.decodeContractCallEvent).toHaveBeenCalledTimes(1); - expect(contractCallEventRepository.create).toHaveBeenCalledTimes(1); - expect(grpcService.verify).toHaveBeenCalledTimes(1); - - observable.next({ - message: undefined, - error: { - error: 'error', - errorCode: ErrorCode.VERIFICATION_FAILED, - }, - }); - observable.complete(); - - expect(contractCallEventRepository.updateStatus).not.toHaveBeenCalled(); }); it('Should not handle duplicate', async () => { diff --git a/apps/mvx-event-processor/src/processors/gateway.processor.ts b/apps/mvx-event-processor/src/processors/gateway.processor.ts index 49a06fc..84ef3ad 100644 --- a/apps/mvx-event-processor/src/processors/gateway.processor.ts +++ b/apps/mvx-event-processor/src/processors/gateway.processor.ts @@ -63,10 +63,7 @@ export class GatewayProcessor implements ProcessorInterface { private async handleContractCallEvent(rawEvent: NotifierEvent) { const event = this.gatewayContract.decodeContractCallEvent(TransactionEvent.fromHttpResponse(rawEvent)); - // The id needs to have `0x` in front of the txHash (hex string) - const id = `0x${rawEvent.txHash}-${UNSUPPORTED_LOG_INDEX}`; const contractCallEvent = await this.contractCallEventRepository.create({ - id, txHash: rawEvent.txHash, eventIndex: UNSUPPORTED_LOG_INDEX, status: ContractCallEventStatus.PENDING, @@ -84,7 +81,6 @@ export class GatewayProcessor implements ProcessorInterface { return; } - // TODO: Test if this works correctly this.grpcService.verify(contractCallEvent); } diff --git a/apps/mvx-event-processor/test/contract-call-event.processor.e2e-spec.ts b/apps/mvx-event-processor/test/contract-call-event.processor.e2e-spec.ts index 3370085..ecb10e5 100644 --- a/apps/mvx-event-processor/test/contract-call-event.processor.e2e-spec.ts +++ b/apps/mvx-event-processor/test/contract-call-event.processor.e2e-spec.ts @@ -9,8 +9,6 @@ import { import { ContractCallEvent, ContractCallEventStatus } from '@prisma/client'; import { createMock, DeepMocked } from '@golevelup/ts-jest'; import { GrpcModule, GrpcService } from '@mvx-monorepo/common'; -import { Subject } from 'rxjs'; -import { ErrorCode, VerifyResponse } from '@mvx-monorepo/common/grpc/entities/amplifier'; import { TestGrpcModule } from './testGrpc.module'; describe('ContractCallEventProcessorService', () => { @@ -77,21 +75,9 @@ describe('ContractCallEventProcessorService', () => { return result; }; - it('Should process pending contract call event success', async () => { + it('Should process pending contract call event', async () => { const originalEntry = await createContractCallEvent(); - const observable = new Subject(); - grpcService.verify.mockReturnValueOnce(observable); - - // Publish to observable with a delay - setTimeout(() => { - observable.next({ - message: undefined, - error: undefined, - }); - observable.complete(); - }, 500); - try { await service.processPendingContractCallEvent(); } catch (e) { @@ -99,6 +85,7 @@ describe('ContractCallEventProcessorService', () => { } expect(await contractCallEventRepository.findPending()).toEqual([]); + expect(grpcService.verify).toHaveBeenCalledTimes(1); const firstEntry = await prisma.contractCallEvent.findUnique({ where: { @@ -107,55 +94,16 @@ describe('ContractCallEventProcessorService', () => { }); expect(firstEntry).toEqual({ ...originalEntry, - status: ContractCallEventStatus.APPROVED, + status: ContractCallEventStatus.PENDING, + retry: 1, updatedAt: expect.any(Date), }); }); - it('Should process pending contract call event not success', async () => { - const originalEntry = await createContractCallEvent(); - - const observable = new Subject(); - grpcService.verify.mockReturnValueOnce(observable); - - // Publish to observable with a delay - setTimeout(() => { - observable.next({ - message: undefined, - error: { - error: 'error', - errorCode: ErrorCode.VERIFICATION_FAILED, - }, - }); - observable.complete(); - }, 500); - - try { - await service.processPendingContractCallEvent(); - } catch (e) { - // Locker.lock throws error for some reason, ignore - } - - expect(await contractCallEventRepository.findPending()).toEqual([]); - - const firstEntry = await prisma.contractCallEvent.findUnique({ - where: { - id: originalEntry.id, - }, + it('Should process pending contract call event retry', async () => { + const originalEntry = await createContractCallEvent({ + retry: 3, }); - expect(firstEntry).toEqual({ - ...originalEntry, - status: ContractCallEventStatus.FAILED, - updatedAt: expect.any(Date), - }); - }); - - it('Should process pending contract call event failed', async () => { - const originalEntry = await createContractCallEvent(); - - const observable = new Subject(); - grpcService.verify.mockReturnValueOnce(observable); - observable.complete(); try { await service.processPendingContractCallEvent(); @@ -164,6 +112,7 @@ describe('ContractCallEventProcessorService', () => { } expect(await contractCallEventRepository.findPending()).toEqual([]); + expect(grpcService.verify).not.toHaveBeenCalled(); const firstEntry = await prisma.contractCallEvent.findUnique({ where: { diff --git a/libs/common/src/database/repository/contract-call-event.repository.ts b/libs/common/src/database/repository/contract-call-event.repository.ts index bfd2fcc..bcc1d0b 100644 --- a/libs/common/src/database/repository/contract-call-event.repository.ts +++ b/libs/common/src/database/repository/contract-call-event.repository.ts @@ -7,10 +7,16 @@ import { ContractCallEventWithGasPaid } from '@mvx-monorepo/common/database/enti export class ContractCallEventRepository { constructor(private readonly prisma: PrismaService) {} - async create(data: Prisma.ContractCallEventCreateInput): Promise { + async create(data: Omit): Promise { + // The id needs to have `0x` in front of the txHash (hex string) + const id = `0x${data.txHash}-${data.eventIndex}`; + try { return await this.prisma.contractCallEvent.create({ - data, + data: { + id, + ...data, + }, }); } catch (e) { if (e instanceof Prisma.PrismaClientKnownRequestError) { diff --git a/libs/common/src/grpc/grpc.service.spec.ts b/libs/common/src/grpc/grpc.service.spec.ts new file mode 100644 index 0000000..02a02d2 --- /dev/null +++ b/libs/common/src/grpc/grpc.service.spec.ts @@ -0,0 +1,106 @@ +import { ApiConfigService } from '@mvx-monorepo/common'; +import { createMock, DeepMocked } from '@golevelup/ts-jest'; +import { Test } from '@nestjs/testing'; +import { ContractCallEventRepository } from '@mvx-monorepo/common/database/repository/contract-call-event.repository'; +import { GrpcService } from '@mvx-monorepo/common/grpc/grpc.service'; +import { Observable } from 'rxjs'; +import { + Amplifier, + Error, + ErrorCode, + VerifyRequest, + VerifyResponse, +} from '@mvx-monorepo/common/grpc/entities/amplifier'; +import { ClientGrpc } from '@nestjs/microservices'; +import { ProviderKeys } from '@mvx-monorepo/common/utils/provider.enum'; +import { ContractCallEvent, ContractCallEventStatus } from '@prisma/client'; + +describe('ContractCallProcessor', () => { + let amplifierService: Amplifier; + + let client: DeepMocked; + let contractCallEventRepository: DeepMocked; + let apiConfigService: DeepMocked; + + let service: GrpcService; + + const errorQueue: Error[] = []; + + beforeEach(async () => { + // @ts-ignore + amplifierService = { + verify(requestStream: Observable): Observable { + return new Observable((observer) => { + requestStream.subscribe({ + next: (request) => { + const item = errorQueue.shift(); + + // Simulate receiving a response for each request + observer.next({ + message: request.message, + error: item, + }); + }, + error: (err) => observer.error(err), + complete: () => observer.complete(), + }); + }); + }, + }; + + client = createMock(); + contractCallEventRepository = createMock(); + apiConfigService = createMock(); + + client.getService.mockReturnValue(amplifierService); + + const moduleRef = await Test.createTestingModule({ + providers: [GrpcService], + }) + .useMocker((token) => { + if (token === ProviderKeys.AXELAR_GRPC_CLIENT) { + return client; + } + + if (token === ContractCallEventRepository) { + return contractCallEventRepository; + } + + if (token === ApiConfigService) { + return apiConfigService; + } + + return null; + }) + .compile(); + await moduleRef.init(); + + service = moduleRef.get(GrpcService); + }); + + describe('verify', () => { + it('Should handle event success', () => { + const contractCallEvent: DeepMocked = createMock(); + contractCallEvent.id = 'id'; + + service.verify(contractCallEvent); + + expect(contractCallEventRepository.updateStatus).toHaveBeenCalledTimes(1); + expect(contractCallEventRepository.updateStatus).toHaveBeenCalledWith('id', ContractCallEventStatus.APPROVED); + }); + + it('Should handle event error', () => { + const contractCallEvent: DeepMocked = createMock(); + contractCallEvent.id = 'id'; + + errorQueue.push({ + error: 'some error', + errorCode: ErrorCode.INTERNAL_ERROR, + }); + + service.verify(contractCallEvent); + + expect(contractCallEventRepository.updateStatus).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/libs/common/src/grpc/grpc.service.ts b/libs/common/src/grpc/grpc.service.ts index eafe1f2..a4d6ba4 100644 --- a/libs/common/src/grpc/grpc.service.ts +++ b/libs/common/src/grpc/grpc.service.ts @@ -3,13 +3,15 @@ import { ProviderKeys } from '@mvx-monorepo/common/utils/provider.enum'; import { ClientGrpc } from '@nestjs/microservices'; import { ContractCallEvent, ContractCallEventStatus } from '@prisma/client'; import { Amplifier, SubscribeToApprovalsResponse, VerifyRequest } from '@mvx-monorepo/common/grpc/entities/amplifier'; -import { Observable, Subject, Subscription } from 'rxjs'; +import { Observable, retry, Subject, Subscription } from 'rxjs'; import BigNumber from 'bignumber.js'; import { ApiConfigService } from '@mvx-monorepo/common/config'; import { ContractCallEventRepository } from '@mvx-monorepo/common/database/repository/contract-call-event.repository'; const AMPLIFIER_SERVICE = 'Amplifier'; +const RETRY_DELAY = 5000; // 5 seconds + @Injectable() export class GrpcService implements OnModuleInit { // @ts-ignore @@ -18,7 +20,7 @@ export class GrpcService implements OnModuleInit { private readonly logger: Logger; private verifySubscription: Subscription | null = null; - private requestVerifySubject: Subject | null = null; + private requestVerifySubject: Subject; constructor( @Inject(ProviderKeys.AXELAR_GRPC_CLIENT) private readonly client: ClientGrpc, @@ -27,6 +29,7 @@ export class GrpcService implements OnModuleInit { ) { this.axelarContractVotingVerifier = apiConfigService.getAxelarContractVotingVerifier(); this.logger = new Logger(GrpcService.name); + this.requestVerifySubject = new Subject(); } onModuleInit() { @@ -34,37 +37,34 @@ export class GrpcService implements OnModuleInit { } verify(contractCallEvent: ContractCallEvent) { - if ( - !this.verifySubscription || - this.verifySubscription.closed || - !this.requestVerifySubject || - this.requestVerifySubject.closed - ) { - if (this.verifySubscription && !this.verifySubscription.closed) { - this.verifySubscription.unsubscribe(); - } - + if (!this.verifySubscription || this.verifySubscription.closed) { this.requestVerifySubject = new Subject(); - this.verifySubscription = this.amplifierService.verify(this.requestVerifySubject.asObservable()).subscribe({ - next: async (response) => { - if (!response.error && response.message) { - this.logger.debug(`Succesfully verified contract call event ${response.message.id}!`); + this.verifySubscription = this.amplifierService + .verify(this.requestVerifySubject.asObservable()) + .pipe( + retry({ + delay: RETRY_DELAY, + }), + ) + .subscribe({ + next: async (response) => { + if (response.error || !response.message) { + this.logger.warn( + `Verify contract call event ${response.message?.id} was not successful. Will be retried.`, + response, + ); + + return; + } + + this.logger.debug(`Successfully verified contract call event ${response.message.id}!`); await this.contractCallEventRepository.updateStatus(response.message.id, ContractCallEventStatus.APPROVED); - - return; - } - - // TODO: In case of some errors, should we just mark the message directly as failed? - - this.logger.warn(`Verify contract call event ${response.message?.id} was not successful. Will be retried.`, response); - }, - error: (err) => { - this.logger.error(`Verify stream ended with error...`, err); - - this.verifySubscription?.unsubscribe(); - }, - }); + }, + error: (err) => { + this.logger.error(`Verify stream ended with error... Will restart`, err); + }, + }); } const request = {