Skip to content

Commit

Permalink
Refactoring and fix tests for new grpc verify.
Browse files Browse the repository at this point in the history
  • Loading branch information
raress96 committed Jun 28, 2024
1 parent 67c7399 commit 77ce540
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export class ContractCallEventProcessorService {
}

// Offset at second 15 to not run at the same time as processPendingMessageApproved
@Cron('*/15 * * * * *')
@Cron('15 */2 * * * *')
async processPendingContractCallEvent() {
await Locker.lock('processPendingContractCallEvent', async () => {
this.logger.debug('Running processPendingContractCallEvent cron');
Expand All @@ -40,9 +40,11 @@ export class ContractCallEventProcessorService {
continue;
}

contractCallEvent.retry += 1;

this.logger.debug(`Trying to verify ContractCallEvent with id ${contractCallEvent.id}, retry ${contractCallEvent.retry}}`);

await this.contractCallEventRepository.updateRetry(contractCallEvent.id, contractCallEvent.retry + 1);
await this.contractCallEventRepository.updateRetry(contractCallEvent.id, contractCallEvent.retry);

this.grpcService.verify(contractCallEvent);
}
Expand Down
55 changes: 4 additions & 51 deletions apps/mvx-event-processor/src/processors/gateway.processor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@ import { ContractCallEventRepository } from '@mvx-monorepo/common/database/repos
import { GatewayProcessor } from './gateway.processor';
import { NotifierEvent } from '../event-processor/types';
import { Address } from '@multiversx/sdk-core/out';
import { MessageApproved, MessageApprovedStatus, ContractCallEventStatus } from '@prisma/client';
import { ContractCallEventStatus, MessageApproved, MessageApprovedStatus } from '@prisma/client';
import { GrpcService } from '@mvx-monorepo/common/grpc/grpc.service';
import { GatewayContract } from '@mvx-monorepo/common/contracts/gateway.contract';
import { MessageApprovedEvent, ContractCallEvent } from '@mvx-monorepo/common/contracts/entities/gateway-events';
import { ContractCallEvent, MessageApprovedEvent } from '@mvx-monorepo/common/contracts/entities/gateway-events';
import { TransactionEvent } from '@multiversx/sdk-network-providers/out';
import { MessageApprovedRepository } from '@mvx-monorepo/common/database/repository/message-approved.repository';
import { Subject } from 'rxjs';
import { ErrorCode, VerifyResponse } from '@mvx-monorepo/common/grpc/entities/amplifier';

describe('ContractCallProcessor', () => {
let gatewayContract: DeepMocked<GatewayContract>;
Expand Down Expand Up @@ -119,17 +117,13 @@ describe('ContractCallProcessor', () => {
],
};

it('Should handle event success', async () => {
const observable = new Subject<VerifyResponse>();
grpcService.verify.mockReturnValueOnce(observable);

it('Should handle event', async () => {
await service.handleEvent(rawEvent);

expect(gatewayContract.decodeContractCallEvent).toHaveBeenCalledTimes(1);
expect(gatewayContract.decodeContractCallEvent).toHaveBeenCalledWith(TransactionEvent.fromHttpResponse(rawEvent));
expect(contractCallEventRepository.create).toHaveBeenCalledTimes(1);
expect(contractCallEventRepository.create).toHaveBeenCalledWith({
id: '0xtxHash-0',
txHash: 'txHash',
eventIndex: 0,
status: ContractCallEventStatus.PENDING,
Expand All @@ -139,50 +133,9 @@ describe('ContractCallProcessor', () => {
destinationChain: 'ethereum',
payloadHash: 'ebc84cbd75ba5516bf45e7024a9e12bc3c5c880f73e3a5beca7ebba52b2867a7',
payload: Buffer.from('payload'),
retry: 0,
});
expect(grpcService.verify).toHaveBeenCalledTimes(1);

observable.next({
message: undefined,
error: undefined,
});
observable.complete();

expect(contractCallEventRepository.updateStatus).toHaveBeenCalledTimes(1);
expect(contractCallEventRepository.updateStatus).toHaveBeenCalledWith({
id: 'multiversx_txHash-0',
txHash: 'txHash',
eventIndex: 0,
status: ContractCallEventStatus.APPROVED,
sourceAddress: 'erd1qqqqqqqqqqqqqpgqzqvm5ywqqf524efwrhr039tjs29w0qltkklsa05pk7',
sourceChain: 'multiversx',
destinationAddress: 'destinationAddress',
destinationChain: 'ethereum',
payloadHash: 'ebc84cbd75ba5516bf45e7024a9e12bc3c5c880f73e3a5beca7ebba52b2867a7',
payload: Buffer.from('payload'),
});
});

it('Should handle error success', async () => {
const observable = new Subject<VerifyResponse>();
grpcService.verify.mockReturnValueOnce(observable);

await service.handleEvent(rawEvent);

expect(gatewayContract.decodeContractCallEvent).toHaveBeenCalledTimes(1);
expect(contractCallEventRepository.create).toHaveBeenCalledTimes(1);
expect(grpcService.verify).toHaveBeenCalledTimes(1);

observable.next({
message: undefined,
error: {
error: 'error',
errorCode: ErrorCode.VERIFICATION_FAILED,
},
});
observable.complete();

expect(contractCallEventRepository.updateStatus).not.toHaveBeenCalled();
});

it('Should not handle duplicate', async () => {
Expand Down
4 changes: 0 additions & 4 deletions apps/mvx-event-processor/src/processors/gateway.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,7 @@ export class GatewayProcessor implements ProcessorInterface {
private async handleContractCallEvent(rawEvent: NotifierEvent) {
const event = this.gatewayContract.decodeContractCallEvent(TransactionEvent.fromHttpResponse(rawEvent));

// The id needs to have `0x` in front of the txHash (hex string)
const id = `0x${rawEvent.txHash}-${UNSUPPORTED_LOG_INDEX}`;
const contractCallEvent = await this.contractCallEventRepository.create({
id,
txHash: rawEvent.txHash,
eventIndex: UNSUPPORTED_LOG_INDEX,
status: ContractCallEventStatus.PENDING,
Expand All @@ -84,7 +81,6 @@ export class GatewayProcessor implements ProcessorInterface {
return;
}

// TODO: Test if this works correctly
this.grpcService.verify(contractCallEvent);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import {
import { ContractCallEvent, ContractCallEventStatus } from '@prisma/client';
import { createMock, DeepMocked } from '@golevelup/ts-jest';
import { GrpcModule, GrpcService } from '@mvx-monorepo/common';
import { Subject } from 'rxjs';
import { ErrorCode, VerifyResponse } from '@mvx-monorepo/common/grpc/entities/amplifier';
import { TestGrpcModule } from './testGrpc.module';

describe('ContractCallEventProcessorService', () => {
Expand Down Expand Up @@ -77,28 +75,17 @@ describe('ContractCallEventProcessorService', () => {
return result;
};

it('Should process pending contract call event success', async () => {
it('Should process pending contract call event', async () => {
const originalEntry = await createContractCallEvent();

const observable = new Subject<VerifyResponse>();
grpcService.verify.mockReturnValueOnce(observable);

// Publish to observable with a delay
setTimeout(() => {
observable.next({
message: undefined,
error: undefined,
});
observable.complete();
}, 500);

try {
await service.processPendingContractCallEvent();
} catch (e) {
// Locker.lock throws error for some reason, ignore
}

expect(await contractCallEventRepository.findPending()).toEqual([]);
expect(grpcService.verify).toHaveBeenCalledTimes(1);

const firstEntry = await prisma.contractCallEvent.findUnique({
where: {
Expand All @@ -107,55 +94,16 @@ describe('ContractCallEventProcessorService', () => {
});
expect(firstEntry).toEqual({
...originalEntry,
status: ContractCallEventStatus.APPROVED,
status: ContractCallEventStatus.PENDING,
retry: 1,
updatedAt: expect.any(Date),
});
});

it('Should process pending contract call event not success', async () => {
const originalEntry = await createContractCallEvent();

const observable = new Subject<VerifyResponse>();
grpcService.verify.mockReturnValueOnce(observable);

// Publish to observable with a delay
setTimeout(() => {
observable.next({
message: undefined,
error: {
error: 'error',
errorCode: ErrorCode.VERIFICATION_FAILED,
},
});
observable.complete();
}, 500);

try {
await service.processPendingContractCallEvent();
} catch (e) {
// Locker.lock throws error for some reason, ignore
}

expect(await contractCallEventRepository.findPending()).toEqual([]);

const firstEntry = await prisma.contractCallEvent.findUnique({
where: {
id: originalEntry.id,
},
it('Should process pending contract call event retry', async () => {
const originalEntry = await createContractCallEvent({
retry: 3,
});
expect(firstEntry).toEqual({
...originalEntry,
status: ContractCallEventStatus.FAILED,
updatedAt: expect.any(Date),
});
});

it('Should process pending contract call event failed', async () => {
const originalEntry = await createContractCallEvent();

const observable = new Subject<VerifyResponse>();
grpcService.verify.mockReturnValueOnce(observable);
observable.complete();

try {
await service.processPendingContractCallEvent();
Expand All @@ -164,6 +112,7 @@ describe('ContractCallEventProcessorService', () => {
}

expect(await contractCallEventRepository.findPending()).toEqual([]);
expect(grpcService.verify).not.toHaveBeenCalled();

const firstEntry = await prisma.contractCallEvent.findUnique({
where: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,16 @@ import { ContractCallEventWithGasPaid } from '@mvx-monorepo/common/database/enti
export class ContractCallEventRepository {
constructor(private readonly prisma: PrismaService) {}

async create(data: Prisma.ContractCallEventCreateInput): Promise<ContractCallEvent | null> {
async create(data: Omit<Prisma.ContractCallEventCreateInput, 'id'>): Promise<ContractCallEvent | null> {
// The id needs to have `0x` in front of the txHash (hex string)
const id = `0x${data.txHash}-${data.eventIndex}`;

try {
return await this.prisma.contractCallEvent.create({
data,
data: {
id,
...data,
},
});
} catch (e) {
if (e instanceof Prisma.PrismaClientKnownRequestError) {
Expand Down
106 changes: 106 additions & 0 deletions libs/common/src/grpc/grpc.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import { ApiConfigService } from '@mvx-monorepo/common';
import { createMock, DeepMocked } from '@golevelup/ts-jest';
import { Test } from '@nestjs/testing';
import { ContractCallEventRepository } from '@mvx-monorepo/common/database/repository/contract-call-event.repository';
import { GrpcService } from '@mvx-monorepo/common/grpc/grpc.service';
import { Observable } from 'rxjs';
import {
Amplifier,
Error,
ErrorCode,
VerifyRequest,
VerifyResponse,
} from '@mvx-monorepo/common/grpc/entities/amplifier';
import { ClientGrpc } from '@nestjs/microservices';
import { ProviderKeys } from '@mvx-monorepo/common/utils/provider.enum';
import { ContractCallEvent, ContractCallEventStatus } from '@prisma/client';

describe('ContractCallProcessor', () => {
let amplifierService: Amplifier;

let client: DeepMocked<ClientGrpc>;
let contractCallEventRepository: DeepMocked<ContractCallEventRepository>;
let apiConfigService: DeepMocked<ApiConfigService>;

let service: GrpcService;

const errorQueue: Error[] = [];

beforeEach(async () => {
// @ts-ignore
amplifierService = {
verify(requestStream: Observable<VerifyRequest>): Observable<VerifyResponse> {
return new Observable<VerifyResponse>((observer) => {
requestStream.subscribe({
next: (request) => {
const item = errorQueue.shift();

// Simulate receiving a response for each request
observer.next({
message: request.message,
error: item,
});
},
error: (err) => observer.error(err),
complete: () => observer.complete(),
});
});
},
};

client = createMock();
contractCallEventRepository = createMock();
apiConfigService = createMock();

client.getService.mockReturnValue(amplifierService);

const moduleRef = await Test.createTestingModule({
providers: [GrpcService],
})
.useMocker((token) => {
if (token === ProviderKeys.AXELAR_GRPC_CLIENT) {
return client;
}

if (token === ContractCallEventRepository) {
return contractCallEventRepository;
}

if (token === ApiConfigService) {
return apiConfigService;
}

return null;
})
.compile();
await moduleRef.init();

service = moduleRef.get(GrpcService);
});

describe('verify', () => {
it('Should handle event success', () => {
const contractCallEvent: DeepMocked<ContractCallEvent> = createMock();
contractCallEvent.id = 'id';

service.verify(contractCallEvent);

expect(contractCallEventRepository.updateStatus).toHaveBeenCalledTimes(1);
expect(contractCallEventRepository.updateStatus).toHaveBeenCalledWith('id', ContractCallEventStatus.APPROVED);
});

it('Should handle event error', () => {
const contractCallEvent: DeepMocked<ContractCallEvent> = createMock();
contractCallEvent.id = 'id';

errorQueue.push({
error: 'some error',
errorCode: ErrorCode.INTERNAL_ERROR,
});

service.verify(contractCallEvent);

expect(contractCallEventRepository.updateStatus).not.toHaveBeenCalled();
});
});
});
Loading

0 comments on commit 77ce540

Please sign in to comment.