Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
All tests in one file.
Browse files Browse the repository at this point in the history
vincentwschau committed May 7, 2024
1 parent 52f125e commit fae1504
Showing 3 changed files with 217 additions and 3 deletions.
218 changes: 216 additions & 2 deletions indexer/services/socks/__tests__/lib/message-forwarder.test.ts
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ import {
kafka,
startConsumer,
TRADES_WEBSOCKET_MESSAGE_VERSION,
SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION,
} from '@dydxprotocol-indexer/kafka';
import { MessageForwarder } from '../../src/lib/message-forwarder';
import WebSocket from 'ws';
@@ -26,11 +27,14 @@ import {
WebsocketEvents,
} from '../../src/types';
import { Admin } from 'kafkajs';
import { TradeMessage } from '@dydxprotocol-indexer/v4-protos';
import { SubaccountMessage, TradeMessage } from '@dydxprotocol-indexer/v4-protos';
import { dbHelpers, testMocks, perpetualMarketRefresher } from '@dydxprotocol-indexer/postgres';
import {
btcClobPairId,
btcTicker,
defaultChildAccNumber,
defaultChildSubaccountId,
defaultSubaccountId,
ethClobPairId,
ethTicker,
} from '../constants';
@@ -53,6 +57,24 @@ describe('message-forwarder', () => {
version: TRADES_WEBSOCKET_MESSAGE_VERSION,
};

const baseSubaccountMessage: SubaccountMessage = {
blockHeight: '2',
transactionIndex: 2,
eventIndex: 2,
contents: '{}',
subaccountId: defaultSubaccountId,
version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION,
};

const childSubaccountMessage: SubaccountMessage = {
blockHeight: '2',
transactionIndex: 2,
eventIndex: 2,
contents: '{}',
subaccountId: defaultChildSubaccountId,
version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION,
};

const btcTradesMessages: TradeMessage[] = [
{
...baseTradeMessage,
@@ -101,7 +123,33 @@ describe('message-forwarder', () => {
},
];

const subaccountMessages: SubaccountMessage[] = [
{
...baseSubaccountMessage,
contents: JSON.stringify({ val: '1' }),
},
{
...baseSubaccountMessage,
contents: JSON.stringify({ val: '2' }),
},
];

const childSubaccountMessages: SubaccountMessage[] = [
{
...childSubaccountMessage,
contents: JSON.stringify({ val: '1' }),
},
{
...childSubaccountMessage,
contents: JSON.stringify({ val: '2' }),
},
];

const mockAxiosResponse: Object = { a: 'b' };
const subaccountInitialMessage: Object = {
...mockAxiosResponse,
orders: mockAxiosResponse,
};

beforeAll(async () => {
await dbHelpers.migrate();
@@ -233,6 +281,149 @@ describe('message-forwarder', () => {
});
});

it('Batch sends subaccount messages', (done: jest.DoneCallback) => {
const channel: Channel = Channel.V4_ACCOUNTS;
const id: string = `${defaultSubaccountId.owner}/${defaultSubaccountId.number}`;

const messageForwarder: MessageForwarder = new MessageForwarder(subscriptions, index);
subscriptions.start(messageForwarder.forwardToClient);
messageForwarder.start();

const ws = new WebSocket(WS_HOST);
let connectionId: string;

ws.on(WebsocketEvents.MESSAGE, async (message) => {
const msg: OutgoingMessage = JSON.parse(message.toString()) as OutgoingMessage;
if (msg.message_id === 0) {
connectionId = msg.connection_id;
}

if (msg.message_id === 1) {
// Check that the initial message is correct.
checkInitialMessage(
msg as SubscribedMessage,
connectionId,
channel,
id,
subaccountInitialMessage,
);

// await each message to ensure they are sent in order
for (const subaccountMessage of subaccountMessages) {
await producer.send({
topic: WebsocketTopics.TO_WEBSOCKETS_SUBACCOUNTS,
messages: [{
value: Buffer.from(
Uint8Array.from(
SubaccountMessage.encode(subaccountMessage).finish(),
),
),
partition: 0,
timestamp: `${Date.now()}`,
}],
});
}
}

if (msg.message_id >= 2) {
const batchMsg: ChannelBatchDataMessage = JSON.parse(
message.toString(),
) as ChannelBatchDataMessage;

checkBatchMessage(
batchMsg,
connectionId,
channel,
id,
SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION,
subaccountMessages,
);
done();
}
});

ws.on('open', () => {
ws.send(JSON.stringify({
type: IncomingMessageType.SUBSCRIBE,
channel,
id,
batched: true,
}));
});
});

it('Batch sends subaccount messages to parent subaccount channel', (done: jest.DoneCallback) => {
const channel: Channel = Channel.V4_PARENT_ACCOUNTS;
const id: string = `${defaultSubaccountId.owner}/${defaultSubaccountId.number}`;

const messageForwarder: MessageForwarder = new MessageForwarder(subscriptions, index);
subscriptions.start(messageForwarder.forwardToClient);
messageForwarder.start();

const ws = new WebSocket(WS_HOST);
let connectionId: string;

ws.on(WebsocketEvents.MESSAGE, async (message) => {
const msg: OutgoingMessage = JSON.parse(message.toString()) as OutgoingMessage;
if (msg.message_id === 0) {
connectionId = msg.connection_id;
}

if (msg.message_id === 1) {
// Check that the initial message is correct.
checkInitialMessage(
msg as SubscribedMessage,
connectionId,
channel,
id,
subaccountInitialMessage,
);

// await each message to ensure they are sent in order
for (const subaccountMessage of childSubaccountMessages) {
await producer.send({
topic: WebsocketTopics.TO_WEBSOCKETS_SUBACCOUNTS,
messages: [{
value: Buffer.from(
Uint8Array.from(
SubaccountMessage.encode(subaccountMessage).finish(),
),
),
partition: 0,
timestamp: `${Date.now()}`,
}],
});
}
}

if (msg.message_id >= 2) {
const batchMsg: ChannelBatchDataMessage = JSON.parse(
message.toString(),
) as ChannelBatchDataMessage;

checkBatchMessage(
batchMsg,
connectionId,
channel,
id,
SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION,
subaccountMessages,
defaultChildAccNumber,
);
done();
}
});

ws.on('open', () => {
ws.send(JSON.stringify({
type: IncomingMessageType.SUBSCRIBE,
channel,
id,
batched: true,
}));
});
});

it('forwards messages', (done: jest.DoneCallback) => {
const channel: Channel = Channel.V4_TRADES;
const id: string = ethTicker;
@@ -322,6 +513,29 @@ function checkInitialMessage(
expect(subscribedMessage.contents).toEqual(initialMessage);
}

function checkBatchMessage(
batchMsg: ChannelBatchDataMessage,
connectionId: string,
channel: string,
id: string,
version: string,
expectedMessages: {contents: string}[],
subaccountNumber?: number,
): void {
expect(batchMsg.connection_id).toBe(connectionId);
expect(batchMsg.type).toBe(OutgoingMessageType.CHANNEL_BATCH_DATA);
expect(batchMsg.channel).toBe(channel);
expect(batchMsg.id).toBe(id);
expect(batchMsg.contents.length).toBe(expectedMessages.length);
expect(batchMsg.version).toBe(version);
expect(batchMsg.subaccountNumber).toBe(subaccountNumber);
batchMsg.contents.forEach(
(individualMessage: Object, idx: number) => {
expect(individualMessage).toEqual(JSON.parse(expectedMessages[idx].contents));
},
);
}

function checkVersionedBatchMessage(
batchMsg: ChannelBatchDataMessage,
connectionId: string,
@@ -343,4 +557,4 @@ function checkVersionedBatchMessage(
);
}
});
}
}
2 changes: 1 addition & 1 deletion indexer/services/socks/package.json
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@
"coverage": "pnpm test -- --coverage",
"lint": "eslint --ext .ts,.js .",
"lint:fix": "eslint --ext .ts,.js . --fix",
"test": "NODE_ENV=test jest --runInBand --forceExit"
"test": "NODE_ENV=test jest --maxWorkers 1 --forceExit"
},
"author": "",
"license": "AGPL-3.0",

0 comments on commit fae1504

Please sign in to comment.