diff --git a/indexer/packages/kafka/package.json b/indexer/packages/kafka/package.json index 284f08173a..29bfa2b433 100644 --- a/indexer/packages/kafka/package.json +++ b/indexer/packages/kafka/package.json @@ -32,6 +32,8 @@ "homepage": "https://github.com/dydxprotocol/indexer#readme", "dependencies": { "@dydxprotocol-indexer/base": "workspace:^0.0.1", + "@dydxprotocol-indexer/postgres": "workspace:^0.0.1", + "@dydxprotocol-indexer/v4-protos": "workspace:^0.0.1", "dotenv-flow": "^3.2.0", "kafkajs": "^2.1.0", "lodash": "^4.17.21", diff --git a/indexer/packages/kafka/src/index.ts b/indexer/packages/kafka/src/index.ts index 739b3ef582..642bd73c0d 100644 --- a/indexer/packages/kafka/src/index.ts +++ b/indexer/packages/kafka/src/index.ts @@ -5,6 +5,7 @@ export * from './kafka'; export * from './constants'; export * from './types'; export * from './batch-kafka-producer'; +export * from './websocket-helper'; export { kafkaConfigSchema } from './config'; export * from '../__tests__/helpers/kafka'; diff --git a/indexer/packages/kafka/src/websocket-helper.ts b/indexer/packages/kafka/src/websocket-helper.ts new file mode 100644 index 0000000000..cdd296e1c1 --- /dev/null +++ b/indexer/packages/kafka/src/websocket-helper.ts @@ -0,0 +1,101 @@ +import { + APIOrderStatus, + APIOrderStatusEnum, + apiTranslations, + IsoString, + OrderFromDatabase, + OrderTable, + PerpetualMarketFromDatabase, + protocolTranslations, + SubaccountMessageContents, + SubaccountTable, + TimeInForce, +} from '@dydxprotocol-indexer/postgres'; +import { + IndexerOrder, + IndexerOrder_ConditionType, + OrderPlaceV1_OrderPlacementStatus, + RedisOrder, + SubaccountMessage, +} from '@dydxprotocol-indexer/v4-protos'; + +import { SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION } from './constants'; + +/** + * Gets the trigger price for an order, returns undefined if the order has an unspecified condition + * type + * @param order + * @param perpetualMarket + * @returns + */ +export function getTriggerPrice( + order: IndexerOrder, + perpetualMarket: PerpetualMarketFromDatabase, +): string | undefined { + if (order.conditionType !== IndexerOrder_ConditionType.CONDITION_TYPE_UNSPECIFIED) { + return protocolTranslations.subticksToPrice( + order.conditionalOrderTriggerSubticks.toString(), + perpetualMarket, + ); + } + return undefined; +} + +export function createSubaccountWebsocketMessage( + redisOrder: RedisOrder, + order: OrderFromDatabase | undefined, + perpetualMarket: PerpetualMarketFromDatabase, + placementStatus: OrderPlaceV1_OrderPlacementStatus, +): Buffer { + const orderTIF: TimeInForce = protocolTranslations.protocolOrderTIFToTIF( + redisOrder.order!.timeInForce, + ); + const status: APIOrderStatus = ( + placementStatus === OrderPlaceV1_OrderPlacementStatus.ORDER_PLACEMENT_STATUS_OPENED + ? APIOrderStatusEnum.OPEN + : APIOrderStatusEnum.BEST_EFFORT_OPENED + ); + const createdAtHeight: string | undefined = order?.createdAtHeight; + const updatedAt: IsoString | undefined = order?.updatedAt; + const updatedAtHeight: string | undefined = order?.updatedAtHeight; + const contents: SubaccountMessageContents = { + orders: [ + { + id: OrderTable.orderIdToUuid(redisOrder.order!.orderId!), + subaccountId: SubaccountTable.subaccountIdToUuid( + redisOrder.order!.orderId!.subaccountId!, + ), + clientId: redisOrder.order!.orderId!.clientId.toString(), + clobPairId: perpetualMarket.clobPairId, + side: protocolTranslations.protocolOrderSideToOrderSide(redisOrder.order!.side), + size: redisOrder.size, + price: redisOrder.price, + status, + type: protocolTranslations.protocolConditionTypeToOrderType( + redisOrder.order!.conditionType, + ), + timeInForce: apiTranslations.orderTIFToAPITIF(orderTIF), + postOnly: apiTranslations.isOrderTIFPostOnly(orderTIF), + reduceOnly: redisOrder.order!.reduceOnly, + orderFlags: redisOrder.order!.orderId!.orderFlags.toString(), + goodTilBlock: protocolTranslations.getGoodTilBlock(redisOrder.order!) + ?.toString(), + goodTilBlockTime: protocolTranslations.getGoodTilBlockTime(redisOrder.order!), + ticker: redisOrder.ticker, + ...(createdAtHeight && { createdAtHeight }), + ...(updatedAt && { updatedAt }), + ...(updatedAtHeight && { updatedAtHeight }), + clientMetadata: redisOrder.order!.clientMetadata.toString(), + triggerPrice: getTriggerPrice(redisOrder.order!, perpetualMarket), + }, + ], + }; + + const subaccountMessage: SubaccountMessage = SubaccountMessage.fromPartial({ + contents: JSON.stringify(contents), + subaccountId: redisOrder.order!.orderId!.subaccountId!, + version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION, + }); + + return Buffer.from(Uint8Array.from(SubaccountMessage.encode(subaccountMessage).finish())); +} diff --git a/indexer/pnpm-lock.yaml b/indexer/pnpm-lock.yaml index 83b3739a35..43589f5ac1 100644 --- a/indexer/pnpm-lock.yaml +++ b/indexer/pnpm-lock.yaml @@ -121,6 +121,8 @@ importers: specifiers: '@dydxprotocol-indexer/base': workspace:^0.0.1 '@dydxprotocol-indexer/dev': workspace:^0.0.1 + '@dydxprotocol-indexer/postgres': workspace:^0.0.1 + '@dydxprotocol-indexer/v4-protos': workspace:^0.0.1 '@types/jest': ^28.1.4 '@types/lodash': ^4.14.182 '@types/node': ^18.0.3 @@ -133,6 +135,8 @@ importers: uuid: ^8.3.2 dependencies: '@dydxprotocol-indexer/base': link:../base + '@dydxprotocol-indexer/postgres': link:../postgres + '@dydxprotocol-indexer/v4-protos': link:../v4-protos dotenv-flow: 3.2.0 kafkajs: 2.1.0 lodash: 4.17.21 diff --git a/indexer/services/vulcan/__tests__/handlers/order-place-handler.test.ts b/indexer/services/vulcan/__tests__/handlers/order-place-handler.test.ts index f2406f1861..ed52f67c22 100644 --- a/indexer/services/vulcan/__tests__/handlers/order-place-handler.test.ts +++ b/indexer/services/vulcan/__tests__/handlers/order-place-handler.test.ts @@ -10,6 +10,7 @@ import { ORDERBOOKS_WEBSOCKET_MESSAGE_VERSION, producer, SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION, + getTriggerPrice, } from '@dydxprotocol-indexer/kafka'; import { APIOrderStatus, @@ -55,7 +56,7 @@ import { } from '@dydxprotocol-indexer/v4-protos'; import { KafkaMessage } from 'kafkajs'; import Long from 'long'; -import { convertToRedisOrder, getTriggerPrice } from '../../src/handlers/helpers'; +import { convertToRedisOrder } from '../../src/handlers/helpers'; import { redisClient, redisClient as client } from '../../src/helpers/redis/redis-controller'; import { onMessage } from '../../src/lib/on-message'; import { expectCanceledOrderStatus, expectOpenOrderIds, handleInitialOrderPlace } from '../helpers/helpers'; diff --git a/indexer/services/vulcan/src/handlers/helpers.ts b/indexer/services/vulcan/src/handlers/helpers.ts index 156b79119a..ba49631b94 100644 --- a/indexer/services/vulcan/src/handlers/helpers.ts +++ b/indexer/services/vulcan/src/handlers/helpers.ts @@ -1,16 +1,7 @@ -import { - OrderTable, - PerpetualMarketFromDatabase, - protocolTranslations, -} from '@dydxprotocol-indexer/postgres'; -import { subticksToPrice } from '@dydxprotocol-indexer/postgres/build/src/lib/protocol-translations'; +import { OrderTable, PerpetualMarketFromDatabase, protocolTranslations } from '@dydxprotocol-indexer/postgres'; import { StateFilledQuantumsCache } from '@dydxprotocol-indexer/redis'; import { - IndexerOrder, - IndexerOrder_ConditionType, - IndexerOrder_Side, - RedisOrder, - RedisOrder_TickerType, + IndexerOrder, IndexerOrder_Side, RedisOrder, RedisOrder_TickerType, } from '@dydxprotocol-indexer/v4-protos'; import Big from 'big.js'; @@ -43,26 +34,6 @@ export function convertToRedisOrder( }; } -/** - * Gets the trigger price for an order, returns undefined if the order has an unspecified condition - * type - * @param order - * @param perpetualMarket - * @returns - */ -export function getTriggerPrice( - order: IndexerOrder, - perpetualMarket: PerpetualMarketFromDatabase, -): string | undefined { - if (order.conditionType !== IndexerOrder_ConditionType.CONDITION_TYPE_UNSPECIFIED) { - return subticksToPrice( - order.conditionalOrderTriggerSubticks.toString(), - perpetualMarket, - ); - } - return undefined; -} - export function orderSideToOrderbookSide( orderSide: IndexerOrder_Side, ): OrderbookSide { diff --git a/indexer/services/vulcan/src/handlers/order-place-handler.ts b/indexer/services/vulcan/src/handlers/order-place-handler.ts index 43f0c0fe9c..b5fd605b96 100644 --- a/indexer/services/vulcan/src/handlers/order-place-handler.ts +++ b/indexer/services/vulcan/src/handlers/order-place-handler.ts @@ -1,42 +1,33 @@ +import { logger, runFuncWithTimingStat, stats } from '@dydxprotocol-indexer/base'; +import { createSubaccountWebsocketMessage, KafkaTopics } from '@dydxprotocol-indexer/kafka'; import { - logger, - runFuncWithTimingStat, - stats, -} from '@dydxprotocol-indexer/base'; -import { KafkaTopics, SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION } from '@dydxprotocol-indexer/kafka'; -import { - APIOrderStatus, - APIOrderStatusEnum, + OrderFromDatabase, OrderTable, PerpetualMarketFromDatabase, - SubaccountMessageContents, - SubaccountTable, - TimeInForce, - apiTranslations, perpetualMarketRefresher, protocolTranslations, - OrderFromDatabase, - IsoString, } from '@dydxprotocol-indexer/postgres'; import { + CanceledOrdersCache, OpenOrdersCache, OrderbookLevelsCache, - PlaceOrderResult, placeOrder, - CanceledOrdersCache, + PlaceOrderResult, StatefulOrderUpdatesCache, } from '@dydxprotocol-indexer/redis'; import { - ORDER_FLAG_SHORT_TERM, getOrderIdHash, isStatefulOrder, requiresImmediateExecution, + getOrderIdHash, + isStatefulOrder, + ORDER_FLAG_SHORT_TERM, + requiresImmediateExecution, } from '@dydxprotocol-indexer/v4-proto-parser'; import { - OffChainUpdateV1, IndexerOrder, + OffChainUpdateV1, OrderPlaceV1, OrderPlaceV1_OrderPlacementStatus, OrderUpdateV1, RedisOrder, - SubaccountMessage, } from '@dydxprotocol-indexer/v4-protos'; import Big from 'big.js'; import { Message } from 'kafkajs'; @@ -45,7 +36,7 @@ import config from '../config'; import { redisClient } from '../helpers/redis/redis-controller'; import { sendMessageWrapper } from '../lib/send-message-helper'; import { Handler } from './handler'; -import { convertToRedisOrder, getTriggerPrice } from './helpers'; +import { convertToRedisOrder } from './helpers'; /** * Handler for OrderPlace messages. @@ -148,7 +139,7 @@ export class OrderPlaceHandler extends Handler { await this.sendCachedOrderUpdate(orderUuid); } const subaccountMessage: Message = { - value: this.createSubaccountWebsocketMessage( + value: createSubaccountWebsocketMessage( redisOrder, dbOrder, perpetualMarket, @@ -272,65 +263,6 @@ export class OrderPlaceHandler extends Handler { } } - protected createSubaccountWebsocketMessage( - redisOrder: RedisOrder, - order: OrderFromDatabase | undefined, - perpetualMarket: PerpetualMarketFromDatabase, - placementStatus: OrderPlaceV1_OrderPlacementStatus, - ): Buffer { - const orderTIF: TimeInForce = protocolTranslations.protocolOrderTIFToTIF( - redisOrder.order!.timeInForce, - ); - const status: APIOrderStatus = ( - placementStatus === OrderPlaceV1_OrderPlacementStatus.ORDER_PLACEMENT_STATUS_OPENED - ? APIOrderStatusEnum.OPEN - : APIOrderStatusEnum.BEST_EFFORT_OPENED - ); - const createdAtHeight: string | undefined = order?.createdAtHeight; - const updatedAt: IsoString | undefined = order?.updatedAt; - const updatedAtHeight: string | undefined = order?.updatedAtHeight; - const contents: SubaccountMessageContents = { - orders: [ - { - id: OrderTable.orderIdToUuid(redisOrder.order!.orderId!), - subaccountId: SubaccountTable.subaccountIdToUuid( - redisOrder.order!.orderId!.subaccountId!, - ), - clientId: redisOrder.order!.orderId!.clientId.toString(), - clobPairId: perpetualMarket.clobPairId, - side: protocolTranslations.protocolOrderSideToOrderSide(redisOrder.order!.side), - size: redisOrder.size, - price: redisOrder.price, - status, - type: protocolTranslations.protocolConditionTypeToOrderType( - redisOrder.order!.conditionType, - ), - timeInForce: apiTranslations.orderTIFToAPITIF(orderTIF), - postOnly: apiTranslations.isOrderTIFPostOnly(orderTIF), - reduceOnly: redisOrder.order!.reduceOnly, - orderFlags: redisOrder.order!.orderId!.orderFlags.toString(), - goodTilBlock: protocolTranslations.getGoodTilBlock(redisOrder.order!) - ?.toString(), - goodTilBlockTime: protocolTranslations.getGoodTilBlockTime(redisOrder.order!), - ticker: redisOrder.ticker, - ...(createdAtHeight && { createdAtHeight }), - ...(updatedAt && { updatedAt }), - ...(updatedAtHeight && { updatedAtHeight }), - clientMetadata: redisOrder.order!.clientMetadata.toString(), - triggerPrice: getTriggerPrice(redisOrder.order!, perpetualMarket), - }, - ], - }; - - const subaccountMessage: SubaccountMessage = SubaccountMessage.fromPartial({ - contents: JSON.stringify(contents), - subaccountId: redisOrder.order!.orderId!.subaccountId!, - version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION, - }); - - return Buffer.from(Uint8Array.from(SubaccountMessage.encode(subaccountMessage).finish())); - } - /** * Determine whether to send a subaccount websocket message given the order place. * @param orderPlace @@ -358,7 +290,7 @@ export class OrderPlaceHandler extends Handler { if (placeOrderResult.placed === false && placeOrderResult.replaced === false && placementStatus === - OrderPlaceV1_OrderPlacementStatus.ORDER_PLACEMENT_STATUS_BEST_EFFORT_OPENED) { + OrderPlaceV1_OrderPlacementStatus.ORDER_PLACEMENT_STATUS_BEST_EFFORT_OPENED) { return false; } return true; diff --git a/indexer/services/vulcan/src/handlers/order-remove-handler.ts b/indexer/services/vulcan/src/handlers/order-remove-handler.ts index bd33024193..428c572780 100644 --- a/indexer/services/vulcan/src/handlers/order-remove-handler.ts +++ b/indexer/services/vulcan/src/handlers/order-remove-handler.ts @@ -1,5 +1,5 @@ import { logger, runFuncWithTimingStat, stats } from '@dydxprotocol-indexer/base'; -import { KafkaTopics, SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION } from '@dydxprotocol-indexer/kafka'; +import { KafkaTopics, SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION, getTriggerPrice } from '@dydxprotocol-indexer/kafka'; import { BlockTable, BlockFromDatabase, @@ -44,7 +44,7 @@ import config from '../config'; import { redisClient } from '../helpers/redis/redis-controller'; import { sendMessageWrapper } from '../lib/send-message-helper'; import { Handler } from './handler'; -import { getStateRemainingQuantums, getTriggerPrice } from './helpers'; +import { getStateRemainingQuantums } from './helpers'; /** * Handler for OrderRemove messages.