From fae1504182f52a21cf1fca7651f33b62d949a841 Mon Sep 17 00:00:00 2001 From: Vincent Chau <99756290+vincentwschau@users.noreply.github.com> Date: Tue, 7 May 2024 17:09:45 -0400 Subject: [PATCH] All tests in one file. --- ....test.ts => message-forwarder-accounts.ts} | 0 .../__tests__/lib/message-forwarder.test.ts | 218 +++++++++++++++++- indexer/services/socks/package.json | 2 +- 3 files changed, 217 insertions(+), 3 deletions(-) rename indexer/services/socks/__tests__/lib/{message-forwarder-accounts.test.ts => message-forwarder-accounts.ts} (100%) diff --git a/indexer/services/socks/__tests__/lib/message-forwarder-accounts.test.ts b/indexer/services/socks/__tests__/lib/message-forwarder-accounts.ts similarity index 100% rename from indexer/services/socks/__tests__/lib/message-forwarder-accounts.test.ts rename to indexer/services/socks/__tests__/lib/message-forwarder-accounts.ts diff --git a/indexer/services/socks/__tests__/lib/message-forwarder.test.ts b/indexer/services/socks/__tests__/lib/message-forwarder.test.ts index 7f10d2d711..dfc888f064 100644 --- a/indexer/services/socks/__tests__/lib/message-forwarder.test.ts +++ b/indexer/services/socks/__tests__/lib/message-forwarder.test.ts @@ -12,6 +12,7 @@ import { kafka, startConsumer, TRADES_WEBSOCKET_MESSAGE_VERSION, + SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION, } from '@dydxprotocol-indexer/kafka'; import { MessageForwarder } from '../../src/lib/message-forwarder'; import WebSocket from 'ws'; @@ -26,11 +27,14 @@ import { WebsocketEvents, } from '../../src/types'; import { Admin } from 'kafkajs'; -import { TradeMessage } from '@dydxprotocol-indexer/v4-protos'; +import { SubaccountMessage, TradeMessage } from '@dydxprotocol-indexer/v4-protos'; import { dbHelpers, testMocks, perpetualMarketRefresher } from '@dydxprotocol-indexer/postgres'; import { btcClobPairId, btcTicker, + defaultChildAccNumber, + defaultChildSubaccountId, + defaultSubaccountId, ethClobPairId, ethTicker, } from '../constants'; @@ -53,6 +57,24 @@ describe('message-forwarder', () => { version: TRADES_WEBSOCKET_MESSAGE_VERSION, }; + const baseSubaccountMessage: SubaccountMessage = { + blockHeight: '2', + transactionIndex: 2, + eventIndex: 2, + contents: '{}', + subaccountId: defaultSubaccountId, + version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION, + }; + + const childSubaccountMessage: SubaccountMessage = { + blockHeight: '2', + transactionIndex: 2, + eventIndex: 2, + contents: '{}', + subaccountId: defaultChildSubaccountId, + version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION, + }; + const btcTradesMessages: TradeMessage[] = [ { ...baseTradeMessage, @@ -101,7 +123,33 @@ describe('message-forwarder', () => { }, ]; + const subaccountMessages: SubaccountMessage[] = [ + { + ...baseSubaccountMessage, + contents: JSON.stringify({ val: '1' }), + }, + { + ...baseSubaccountMessage, + contents: JSON.stringify({ val: '2' }), + }, + ]; + + const childSubaccountMessages: SubaccountMessage[] = [ + { + ...childSubaccountMessage, + contents: JSON.stringify({ val: '1' }), + }, + { + ...childSubaccountMessage, + contents: JSON.stringify({ val: '2' }), + }, + ]; + const mockAxiosResponse: Object = { a: 'b' }; + const subaccountInitialMessage: Object = { + ...mockAxiosResponse, + orders: mockAxiosResponse, + }; beforeAll(async () => { await dbHelpers.migrate(); @@ -233,6 +281,149 @@ describe('message-forwarder', () => { }); }); + it('Batch sends subaccount messages', (done: jest.DoneCallback) => { + const channel: Channel = Channel.V4_ACCOUNTS; + const id: string = `${defaultSubaccountId.owner}/${defaultSubaccountId.number}`; + + const messageForwarder: MessageForwarder = new MessageForwarder(subscriptions, index); + subscriptions.start(messageForwarder.forwardToClient); + messageForwarder.start(); + + const ws = new WebSocket(WS_HOST); + let connectionId: string; + + ws.on(WebsocketEvents.MESSAGE, async (message) => { + const msg: OutgoingMessage = JSON.parse(message.toString()) as OutgoingMessage; + if (msg.message_id === 0) { + connectionId = msg.connection_id; + } + + if (msg.message_id === 1) { + // Check that the initial message is correct. + checkInitialMessage( + msg as SubscribedMessage, + connectionId, + channel, + id, + subaccountInitialMessage, + ); + + // await each message to ensure they are sent in order + for (const subaccountMessage of subaccountMessages) { + await producer.send({ + topic: WebsocketTopics.TO_WEBSOCKETS_SUBACCOUNTS, + messages: [{ + value: Buffer.from( + Uint8Array.from( + SubaccountMessage.encode(subaccountMessage).finish(), + ), + ), + partition: 0, + timestamp: `${Date.now()}`, + }], + }); + } + } + + if (msg.message_id >= 2) { + const batchMsg: ChannelBatchDataMessage = JSON.parse( + message.toString(), + ) as ChannelBatchDataMessage; + + checkBatchMessage( + batchMsg, + connectionId, + channel, + id, + SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION, + subaccountMessages, + ); + done(); + } + }); + + ws.on('open', () => { + ws.send(JSON.stringify({ + type: IncomingMessageType.SUBSCRIBE, + channel, + id, + batched: true, + })); + }); + }); + + it('Batch sends subaccount messages to parent subaccount channel', (done: jest.DoneCallback) => { + const channel: Channel = Channel.V4_PARENT_ACCOUNTS; + const id: string = `${defaultSubaccountId.owner}/${defaultSubaccountId.number}`; + + const messageForwarder: MessageForwarder = new MessageForwarder(subscriptions, index); + subscriptions.start(messageForwarder.forwardToClient); + messageForwarder.start(); + + const ws = new WebSocket(WS_HOST); + let connectionId: string; + + ws.on(WebsocketEvents.MESSAGE, async (message) => { + const msg: OutgoingMessage = JSON.parse(message.toString()) as OutgoingMessage; + if (msg.message_id === 0) { + connectionId = msg.connection_id; + } + + if (msg.message_id === 1) { + // Check that the initial message is correct. + checkInitialMessage( + msg as SubscribedMessage, + connectionId, + channel, + id, + subaccountInitialMessage, + ); + + // await each message to ensure they are sent in order + for (const subaccountMessage of childSubaccountMessages) { + await producer.send({ + topic: WebsocketTopics.TO_WEBSOCKETS_SUBACCOUNTS, + messages: [{ + value: Buffer.from( + Uint8Array.from( + SubaccountMessage.encode(subaccountMessage).finish(), + ), + ), + partition: 0, + timestamp: `${Date.now()}`, + }], + }); + } + } + + if (msg.message_id >= 2) { + const batchMsg: ChannelBatchDataMessage = JSON.parse( + message.toString(), + ) as ChannelBatchDataMessage; + + checkBatchMessage( + batchMsg, + connectionId, + channel, + id, + SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION, + subaccountMessages, + defaultChildAccNumber, + ); + done(); + } + }); + + ws.on('open', () => { + ws.send(JSON.stringify({ + type: IncomingMessageType.SUBSCRIBE, + channel, + id, + batched: true, + })); + }); + }); + it('forwards messages', (done: jest.DoneCallback) => { const channel: Channel = Channel.V4_TRADES; const id: string = ethTicker; @@ -322,6 +513,29 @@ function checkInitialMessage( expect(subscribedMessage.contents).toEqual(initialMessage); } +function checkBatchMessage( + batchMsg: ChannelBatchDataMessage, + connectionId: string, + channel: string, + id: string, + version: string, + expectedMessages: {contents: string}[], + subaccountNumber?: number, +): void { + expect(batchMsg.connection_id).toBe(connectionId); + expect(batchMsg.type).toBe(OutgoingMessageType.CHANNEL_BATCH_DATA); + expect(batchMsg.channel).toBe(channel); + expect(batchMsg.id).toBe(id); + expect(batchMsg.contents.length).toBe(expectedMessages.length); + expect(batchMsg.version).toBe(version); + expect(batchMsg.subaccountNumber).toBe(subaccountNumber); + batchMsg.contents.forEach( + (individualMessage: Object, idx: number) => { + expect(individualMessage).toEqual(JSON.parse(expectedMessages[idx].contents)); + }, + ); +} + function checkVersionedBatchMessage( batchMsg: ChannelBatchDataMessage, connectionId: string, @@ -343,4 +557,4 @@ function checkVersionedBatchMessage( ); } }); -} +} \ No newline at end of file diff --git a/indexer/services/socks/package.json b/indexer/services/socks/package.json index 1e86abea75..9b068a0742 100644 --- a/indexer/services/socks/package.json +++ b/indexer/services/socks/package.json @@ -11,7 +11,7 @@ "coverage": "pnpm test -- --coverage", "lint": "eslint --ext .ts,.js .", "lint:fix": "eslint --ext .ts,.js . --fix", - "test": "NODE_ENV=test jest --runInBand --forceExit" + "test": "NODE_ENV=test jest --maxWorkers 1 --forceExit" }, "author": "", "license": "AGPL-3.0",