From c8a7d5ff2e0a4b012ce4e25f1993d761962c9646 Mon Sep 17 00:00:00 2001 From: Vincent Chau <99756290+vincentwschau@users.noreply.github.com> Date: Tue, 7 May 2024 01:44:02 -0400 Subject: [PATCH] Add back parent subaccount test. --- .../__tests__/lib/message-forwarder.test.ts | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/indexer/services/socks/__tests__/lib/message-forwarder.test.ts b/indexer/services/socks/__tests__/lib/message-forwarder.test.ts index 89b24c91d7..7094bfcd14 100644 --- a/indexer/services/socks/__tests__/lib/message-forwarder.test.ts +++ b/indexer/services/socks/__tests__/lib/message-forwarder.test.ts @@ -330,6 +330,77 @@ describe('message-forwarder', () => { }); }); + it('Batch sends subaccount messages to parent subaccount channel', (done: jest.DoneCallback) => { + const channel: Channel = Channel.V4_PARENT_ACCOUNTS; + const id: string = `${defaultSubaccountId.owner}/${defaultSubaccountId.number}`; + + const messageForwarder: MessageForwarder = new MessageForwarder(subscriptions, index); + subscriptions.start(messageForwarder.forwardToClient); + messageForwarder.start(); + + const ws = new WebSocket(WS_HOST); + let connectionId: string; + + ws.on(WebsocketEvents.MESSAGE, async (message) => { + const msg: OutgoingMessage = JSON.parse(message.toString()) as OutgoingMessage; + if (msg.message_id === 0) { + connectionId = msg.connection_id; + } + + if (msg.message_id === 1) { + // Check that the initial message is correct. + checkInitialMessage( + msg as SubscribedMessage, + connectionId, + channel, + id, + subaccountInitialMessage, + ); + + // await each message to ensure they are sent in order + for (const subaccountMessage of subaccountMessages) { + await producer.send({ + topic: WebsocketTopics.TO_WEBSOCKETS_SUBACCOUNTS, + messages: [{ + value: Buffer.from( + Uint8Array.from( + SubaccountMessage.encode(subaccountMessage).finish(), + ), + ), + partition: 0, + timestamp: `${Date.now()}`, + }], + }); + } + } + + if (msg.message_id >= 2) { + const batchMsg: ChannelBatchDataMessage = JSON.parse( + message.toString(), + ) as ChannelBatchDataMessage; + + checkBatchMessage( + batchMsg, + connectionId, + channel, + id, + SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION, + subaccountMessages, + ); + done(); + } + }); + + ws.on('open', () => { + ws.send(JSON.stringify({ + type: IncomingMessageType.SUBSCRIBE, + channel, + id, + batched: true, + })); + }); + }); + it('forwards messages', (done: jest.DoneCallback) => { const channel: Channel = Channel.V4_TRADES; const id: string = ethTicker;