From b2a772c6f1584d95c68e00c1bcf75416ba1f0a0d Mon Sep 17 00:00:00 2001 From: Will Liu Date: Mon, 20 May 2024 13:22:43 -0400 Subject: [PATCH] gate with flag --- indexer/packages/kafka/src/constants.ts | 2 +- .../comlink/public/websocket-documentation.md | 24 ++++++------ .../handlers/order-remove-handler.test.ts | 22 +++++++++++ indexer/services/vulcan/src/config.ts | 3 ++ .../src/handlers/order-remove-handler.ts | 38 ++++++++++--------- 5 files changed, 58 insertions(+), 31 deletions(-) diff --git a/indexer/packages/kafka/src/constants.ts b/indexer/packages/kafka/src/constants.ts index 9f28e5d29e..befccddddd 100644 --- a/indexer/packages/kafka/src/constants.ts +++ b/indexer/packages/kafka/src/constants.ts @@ -1,7 +1,7 @@ export const TO_ENDER_TOPIC: string = 'to-ender'; export const ORDERBOOKS_WEBSOCKET_MESSAGE_VERSION: string = '1.0.0'; -export const SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION: string = '2.4.0'; +export const SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION: string = '2.4.1'; export const TRADES_WEBSOCKET_MESSAGE_VERSION: string = '2.1.0'; export const MARKETS_WEBSOCKET_MESSAGE_VERSION: string = '1.0.0'; export const CANDLES_WEBSOCKET_MESSAGE_VERSION: string = '1.0.0'; diff --git a/indexer/services/comlink/public/websocket-documentation.md b/indexer/services/comlink/public/websocket-documentation.md index 172259c09f..5bd3031d70 100644 --- a/indexer/services/comlink/public/websocket-documentation.md +++ b/indexer/services/comlink/public/websocket-documentation.md @@ -162,27 +162,27 @@ export interface OrderSubaccountMessageContents { id: string; subaccountId: string; clientId: string; - clobPairId: string; - side: OrderSide; - size: string; - ticker: string, - price: string; - type: OrderType; - timeInForce: APITimeInForce; - postOnly: boolean; - reduceOnly: boolean; + clobPairId?: string; + side?: OrderSide; + size?: string; + ticker?: string, + price?: string; + type?: OrderType; + timeInForce?: APITimeInForce; + postOnly?: boolean; + reduceOnly?: boolean; status: APIOrderStatus; orderFlags: string; totalFilled?: string; totalOptimisticFilled?: string; goodTilBlock?: string; goodTilBlockTime?: string; - removalReason?: string; - createdAtHeight?: string; - clientMetadata: string; triggerPrice?: string; updatedAt?: IsoString; updatedAtHeight?: string; + removalReason?: string; + createdAtHeight?: string; + clientMetadata?: string; } export enum OrderSide { diff --git a/indexer/services/vulcan/__tests__/handlers/order-remove-handler.test.ts b/indexer/services/vulcan/__tests__/handlers/order-remove-handler.test.ts index dcd60987c5..759f5fe91f 100644 --- a/indexer/services/vulcan/__tests__/handlers/order-remove-handler.test.ts +++ b/indexer/services/vulcan/__tests__/handlers/order-remove-handler.test.ts @@ -74,6 +74,7 @@ import { import { expectWebsocketOrderbookMessage, expectWebsocketSubaccountMessage } from '../helpers/websocket-helpers'; import { ORDER_FLAG_LONG_TERM } from '@dydxprotocol-indexer/v4-proto-parser'; import Long from 'long'; +import config from '../../src/config'; jest.mock('@dydxprotocol-indexer/base', () => ({ ...jest.requireActual('@dydxprotocol-indexer/base'), @@ -99,6 +100,7 @@ describe('OrderRemoveHandler', () => { await dbHelpers.clearData(); await redis.deleteAllAsync(redisClient); jest.resetAllMocks(); + config.SEND_SUBACCOUNT_WEBSOCKET_MESSAGE_FOR_CANCELS_MISSING_ORDERS = false; }); afterAll(async () => { @@ -202,7 +204,26 @@ describe('OrderRemoveHandler', () => { }); describe('Order Remove Message - not a Stateful Cancelation', () => { + it('successfully returns early if unable to find order in redis', async () => { + const offChainUpdate: OffChainUpdateV1 = orderRemoveToOffChainUpdate(defaultOrderRemove); + + const orderRemoveHandler: OrderRemoveHandler = new OrderRemoveHandler(); + await orderRemoveHandler.handleUpdate( + offChainUpdate, + defaultKafkaHeaders, + ); + + expect(logger.info).toHaveBeenCalledWith(expect.objectContaining({ + at: 'orderRemoveHandler#handleOrderRemoval', + message: 'Unable to find order', + orderId: defaultOrderRemove.removedOrderId, + })); + expect(logger.error).not.toHaveBeenCalled(); + expectTimingStats(); + }); + it('successfully sends subaccount websocket message and returns if unable to find order in redis', async () => { + config.SEND_SUBACCOUNT_WEBSOCKET_MESSAGE_FOR_CANCELS_MISSING_ORDERS = true; const offChainUpdate: OffChainUpdateV1 = orderRemoveToOffChainUpdate(defaultOrderRemove); const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); @@ -248,6 +269,7 @@ describe('OrderRemoveHandler', () => { it('successfully sends subaccount websocket message with db order fields if unable to find order in redis', async () => { + config.SEND_SUBACCOUNT_WEBSOCKET_MESSAGE_FOR_CANCELS_MISSING_ORDERS = true; await OrderTable.create(testConstants.defaultOrder); const offChainUpdate: OffChainUpdateV1 = orderRemoveToOffChainUpdate(defaultOrderRemove); const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis(); diff --git a/indexer/services/vulcan/src/config.ts b/indexer/services/vulcan/src/config.ts index e71f830f76..6c89ab4877 100644 --- a/indexer/services/vulcan/src/config.ts +++ b/indexer/services/vulcan/src/config.ts @@ -37,6 +37,9 @@ export const configSchema = { SEND_SUBACCOUNT_WEBSOCKET_MESSAGE_FOR_STATEFUL_ORDERS: parseBoolean({ default: true, }), + SEND_SUBACCOUNT_WEBSOCKET_MESSAGE_FOR_CANCELS_MISSING_ORDERS: parseBoolean({ + default: false, + }), }; export default parseSchema(configSchema); diff --git a/indexer/services/vulcan/src/handlers/order-remove-handler.ts b/indexer/services/vulcan/src/handlers/order-remove-handler.ts index 945e4df6b7..2e1ebb5de1 100644 --- a/indexer/services/vulcan/src/handlers/order-remove-handler.ts +++ b/indexer/services/vulcan/src/handlers/order-remove-handler.ts @@ -284,24 +284,26 @@ export class OrderRemoveHandler extends Handler { orderId: orderRemove.removedOrderId, orderRemove, }); - const canceledOrder: OrderFromDatabase | undefined = await runFuncWithTimingStat( - OrderTable.findById(OrderTable.orderIdToUuid(orderRemove.removedOrderId!)), - this.generateTimingStatsOptions('find_order'), - ); - const subaccountMessage: Message = { - value: this.createSubaccountWebsocketMessageFromOrderRemoveMessage( - canceledOrder, - orderRemove, - perpetualMarket.ticker, - ), - headers, - }; - const reason: OrderRemovalReason = orderRemove.reason; - if (!( - reason === OrderRemovalReason.ORDER_REMOVAL_REASON_INDEXER_EXPIRED || - reason === OrderRemovalReason.ORDER_REMOVAL_REASON_FULLY_FILLED - )) { - sendMessageWrapper(subaccountMessage, KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS); + if (config.SEND_SUBACCOUNT_WEBSOCKET_MESSAGE_FOR_CANCELS_MISSING_ORDERS) { + const canceledOrder: OrderFromDatabase | undefined = await runFuncWithTimingStat( + OrderTable.findById(OrderTable.orderIdToUuid(orderRemove.removedOrderId!)), + this.generateTimingStatsOptions('find_order'), + ); + const subaccountMessage: Message = { + value: this.createSubaccountWebsocketMessageFromOrderRemoveMessage( + canceledOrder, + orderRemove, + perpetualMarket.ticker, + ), + headers, + }; + const reason: OrderRemovalReason = orderRemove.reason; + if (!( + reason === OrderRemovalReason.ORDER_REMOVAL_REASON_INDEXER_EXPIRED || + reason === OrderRemovalReason.ORDER_REMOVAL_REASON_FULLY_FILLED + )) { + sendMessageWrapper(subaccountMessage, KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS); + } } return; }