Skip to content

Commit

Permalink
refactor helper methods into kafka package for re-use in Ender (#1217)
Browse files Browse the repository at this point in the history
  • Loading branch information
dydxwill authored Mar 21, 2024
1 parent 33038ff commit 0072986
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 115 deletions.
2 changes: 2 additions & 0 deletions indexer/packages/kafka/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
"homepage": "https://github.com/dydxprotocol/indexer#readme",
"dependencies": {
"@dydxprotocol-indexer/base": "workspace:^0.0.1",
"@dydxprotocol-indexer/postgres": "workspace:^0.0.1",
"@dydxprotocol-indexer/v4-protos": "workspace:^0.0.1",
"dotenv-flow": "^3.2.0",
"kafkajs": "^2.1.0",
"lodash": "^4.17.21",
Expand Down
1 change: 1 addition & 0 deletions indexer/packages/kafka/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export * from './kafka';
export * from './constants';
export * from './types';
export * from './batch-kafka-producer';
export * from './websocket-helper';
export { kafkaConfigSchema } from './config';

export * from '../__tests__/helpers/kafka';
101 changes: 101 additions & 0 deletions indexer/packages/kafka/src/websocket-helper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import {
APIOrderStatus,
APIOrderStatusEnum,
apiTranslations,
IsoString,
OrderFromDatabase,
OrderTable,
PerpetualMarketFromDatabase,
protocolTranslations,
SubaccountMessageContents,
SubaccountTable,
TimeInForce,
} from '@dydxprotocol-indexer/postgres';
import {
IndexerOrder,
IndexerOrder_ConditionType,
OrderPlaceV1_OrderPlacementStatus,
RedisOrder,
SubaccountMessage,
} from '@dydxprotocol-indexer/v4-protos';

import { SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION } from './constants';

/**
* Gets the trigger price for an order, returns undefined if the order has an unspecified condition
* type
* @param order
* @param perpetualMarket
* @returns
*/
export function getTriggerPrice(
order: IndexerOrder,
perpetualMarket: PerpetualMarketFromDatabase,
): string | undefined {
if (order.conditionType !== IndexerOrder_ConditionType.CONDITION_TYPE_UNSPECIFIED) {
return protocolTranslations.subticksToPrice(
order.conditionalOrderTriggerSubticks.toString(),
perpetualMarket,
);
}
return undefined;
}

export function createSubaccountWebsocketMessage(
redisOrder: RedisOrder,
order: OrderFromDatabase | undefined,
perpetualMarket: PerpetualMarketFromDatabase,
placementStatus: OrderPlaceV1_OrderPlacementStatus,
): Buffer {
const orderTIF: TimeInForce = protocolTranslations.protocolOrderTIFToTIF(
redisOrder.order!.timeInForce,
);
const status: APIOrderStatus = (
placementStatus === OrderPlaceV1_OrderPlacementStatus.ORDER_PLACEMENT_STATUS_OPENED
? APIOrderStatusEnum.OPEN
: APIOrderStatusEnum.BEST_EFFORT_OPENED
);
const createdAtHeight: string | undefined = order?.createdAtHeight;
const updatedAt: IsoString | undefined = order?.updatedAt;
const updatedAtHeight: string | undefined = order?.updatedAtHeight;
const contents: SubaccountMessageContents = {
orders: [
{
id: OrderTable.orderIdToUuid(redisOrder.order!.orderId!),
subaccountId: SubaccountTable.subaccountIdToUuid(
redisOrder.order!.orderId!.subaccountId!,
),
clientId: redisOrder.order!.orderId!.clientId.toString(),
clobPairId: perpetualMarket.clobPairId,
side: protocolTranslations.protocolOrderSideToOrderSide(redisOrder.order!.side),
size: redisOrder.size,
price: redisOrder.price,
status,
type: protocolTranslations.protocolConditionTypeToOrderType(
redisOrder.order!.conditionType,
),
timeInForce: apiTranslations.orderTIFToAPITIF(orderTIF),
postOnly: apiTranslations.isOrderTIFPostOnly(orderTIF),
reduceOnly: redisOrder.order!.reduceOnly,
orderFlags: redisOrder.order!.orderId!.orderFlags.toString(),
goodTilBlock: protocolTranslations.getGoodTilBlock(redisOrder.order!)
?.toString(),
goodTilBlockTime: protocolTranslations.getGoodTilBlockTime(redisOrder.order!),
ticker: redisOrder.ticker,
...(createdAtHeight && { createdAtHeight }),
...(updatedAt && { updatedAt }),
...(updatedAtHeight && { updatedAtHeight }),
clientMetadata: redisOrder.order!.clientMetadata.toString(),
triggerPrice: getTriggerPrice(redisOrder.order!, perpetualMarket),
},
],
};

const subaccountMessage: SubaccountMessage = SubaccountMessage.fromPartial({
contents: JSON.stringify(contents),
subaccountId: redisOrder.order!.orderId!.subaccountId!,
version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION,
});

return Buffer.from(Uint8Array.from(SubaccountMessage.encode(subaccountMessage).finish()));
}
4 changes: 4 additions & 0 deletions indexer/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
ORDERBOOKS_WEBSOCKET_MESSAGE_VERSION,
producer,
SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION,
getTriggerPrice,
} from '@dydxprotocol-indexer/kafka';
import {
APIOrderStatus,
Expand Down Expand Up @@ -55,7 +56,7 @@ import {
} from '@dydxprotocol-indexer/v4-protos';
import { KafkaMessage } from 'kafkajs';
import Long from 'long';
import { convertToRedisOrder, getTriggerPrice } from '../../src/handlers/helpers';
import { convertToRedisOrder } from '../../src/handlers/helpers';
import { redisClient, redisClient as client } from '../../src/helpers/redis/redis-controller';
import { onMessage } from '../../src/lib/on-message';
import { expectCanceledOrderStatus, expectOpenOrderIds, handleInitialOrderPlace } from '../helpers/helpers';
Expand Down
33 changes: 2 additions & 31 deletions indexer/services/vulcan/src/handlers/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,7 @@
import {
OrderTable,
PerpetualMarketFromDatabase,
protocolTranslations,
} from '@dydxprotocol-indexer/postgres';
import { subticksToPrice } from '@dydxprotocol-indexer/postgres/build/src/lib/protocol-translations';
import { OrderTable, PerpetualMarketFromDatabase, protocolTranslations } from '@dydxprotocol-indexer/postgres';
import { StateFilledQuantumsCache } from '@dydxprotocol-indexer/redis';
import {
IndexerOrder,
IndexerOrder_ConditionType,
IndexerOrder_Side,
RedisOrder,
RedisOrder_TickerType,
IndexerOrder, IndexerOrder_Side, RedisOrder, RedisOrder_TickerType,
} from '@dydxprotocol-indexer/v4-protos';
import Big from 'big.js';

Expand Down Expand Up @@ -43,26 +34,6 @@ export function convertToRedisOrder(
};
}

/**
* Gets the trigger price for an order, returns undefined if the order has an unspecified condition
* type
* @param order
* @param perpetualMarket
* @returns
*/
export function getTriggerPrice(
order: IndexerOrder,
perpetualMarket: PerpetualMarketFromDatabase,
): string | undefined {
if (order.conditionType !== IndexerOrder_ConditionType.CONDITION_TYPE_UNSPECIFIED) {
return subticksToPrice(
order.conditionalOrderTriggerSubticks.toString(),
perpetualMarket,
);
}
return undefined;
}

export function orderSideToOrderbookSide(
orderSide: IndexerOrder_Side,
): OrderbookSide {
Expand Down
94 changes: 13 additions & 81 deletions indexer/services/vulcan/src/handlers/order-place-handler.ts
Original file line number Diff line number Diff line change
@@ -1,42 +1,33 @@
import { logger, runFuncWithTimingStat, stats } from '@dydxprotocol-indexer/base';
import { createSubaccountWebsocketMessage, KafkaTopics } from '@dydxprotocol-indexer/kafka';
import {
logger,
runFuncWithTimingStat,
stats,
} from '@dydxprotocol-indexer/base';
import { KafkaTopics, SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION } from '@dydxprotocol-indexer/kafka';
import {
APIOrderStatus,
APIOrderStatusEnum,
OrderFromDatabase,
OrderTable,
PerpetualMarketFromDatabase,
SubaccountMessageContents,
SubaccountTable,
TimeInForce,
apiTranslations,
perpetualMarketRefresher,
protocolTranslations,
OrderFromDatabase,
IsoString,
} from '@dydxprotocol-indexer/postgres';
import {
CanceledOrdersCache,
OpenOrdersCache,
OrderbookLevelsCache,
PlaceOrderResult,
placeOrder,
CanceledOrdersCache,
PlaceOrderResult,
StatefulOrderUpdatesCache,
} from '@dydxprotocol-indexer/redis';
import {
ORDER_FLAG_SHORT_TERM, getOrderIdHash, isStatefulOrder, requiresImmediateExecution,
getOrderIdHash,
isStatefulOrder,
ORDER_FLAG_SHORT_TERM,
requiresImmediateExecution,
} from '@dydxprotocol-indexer/v4-proto-parser';
import {
OffChainUpdateV1,
IndexerOrder,
OffChainUpdateV1,
OrderPlaceV1,
OrderPlaceV1_OrderPlacementStatus,
OrderUpdateV1,
RedisOrder,
SubaccountMessage,
} from '@dydxprotocol-indexer/v4-protos';
import Big from 'big.js';
import { Message } from 'kafkajs';
Expand All @@ -45,7 +36,7 @@ import config from '../config';
import { redisClient } from '../helpers/redis/redis-controller';
import { sendMessageWrapper } from '../lib/send-message-helper';
import { Handler } from './handler';
import { convertToRedisOrder, getTriggerPrice } from './helpers';
import { convertToRedisOrder } from './helpers';

/**
* Handler for OrderPlace messages.
Expand Down Expand Up @@ -148,7 +139,7 @@ export class OrderPlaceHandler extends Handler {
await this.sendCachedOrderUpdate(orderUuid);
}
const subaccountMessage: Message = {
value: this.createSubaccountWebsocketMessage(
value: createSubaccountWebsocketMessage(
redisOrder,
dbOrder,
perpetualMarket,
Expand Down Expand Up @@ -272,65 +263,6 @@ export class OrderPlaceHandler extends Handler {
}
}

protected createSubaccountWebsocketMessage(
redisOrder: RedisOrder,
order: OrderFromDatabase | undefined,
perpetualMarket: PerpetualMarketFromDatabase,
placementStatus: OrderPlaceV1_OrderPlacementStatus,
): Buffer {
const orderTIF: TimeInForce = protocolTranslations.protocolOrderTIFToTIF(
redisOrder.order!.timeInForce,
);
const status: APIOrderStatus = (
placementStatus === OrderPlaceV1_OrderPlacementStatus.ORDER_PLACEMENT_STATUS_OPENED
? APIOrderStatusEnum.OPEN
: APIOrderStatusEnum.BEST_EFFORT_OPENED
);
const createdAtHeight: string | undefined = order?.createdAtHeight;
const updatedAt: IsoString | undefined = order?.updatedAt;
const updatedAtHeight: string | undefined = order?.updatedAtHeight;
const contents: SubaccountMessageContents = {
orders: [
{
id: OrderTable.orderIdToUuid(redisOrder.order!.orderId!),
subaccountId: SubaccountTable.subaccountIdToUuid(
redisOrder.order!.orderId!.subaccountId!,
),
clientId: redisOrder.order!.orderId!.clientId.toString(),
clobPairId: perpetualMarket.clobPairId,
side: protocolTranslations.protocolOrderSideToOrderSide(redisOrder.order!.side),
size: redisOrder.size,
price: redisOrder.price,
status,
type: protocolTranslations.protocolConditionTypeToOrderType(
redisOrder.order!.conditionType,
),
timeInForce: apiTranslations.orderTIFToAPITIF(orderTIF),
postOnly: apiTranslations.isOrderTIFPostOnly(orderTIF),
reduceOnly: redisOrder.order!.reduceOnly,
orderFlags: redisOrder.order!.orderId!.orderFlags.toString(),
goodTilBlock: protocolTranslations.getGoodTilBlock(redisOrder.order!)
?.toString(),
goodTilBlockTime: protocolTranslations.getGoodTilBlockTime(redisOrder.order!),
ticker: redisOrder.ticker,
...(createdAtHeight && { createdAtHeight }),
...(updatedAt && { updatedAt }),
...(updatedAtHeight && { updatedAtHeight }),
clientMetadata: redisOrder.order!.clientMetadata.toString(),
triggerPrice: getTriggerPrice(redisOrder.order!, perpetualMarket),
},
],
};

const subaccountMessage: SubaccountMessage = SubaccountMessage.fromPartial({
contents: JSON.stringify(contents),
subaccountId: redisOrder.order!.orderId!.subaccountId!,
version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION,
});

return Buffer.from(Uint8Array.from(SubaccountMessage.encode(subaccountMessage).finish()));
}

/**
* Determine whether to send a subaccount websocket message given the order place.
* @param orderPlace
Expand Down Expand Up @@ -358,7 +290,7 @@ export class OrderPlaceHandler extends Handler {
if (placeOrderResult.placed === false &&
placeOrderResult.replaced === false &&
placementStatus ===
OrderPlaceV1_OrderPlacementStatus.ORDER_PLACEMENT_STATUS_BEST_EFFORT_OPENED) {
OrderPlaceV1_OrderPlacementStatus.ORDER_PLACEMENT_STATUS_BEST_EFFORT_OPENED) {
return false;
}
return true;
Expand Down
4 changes: 2 additions & 2 deletions indexer/services/vulcan/src/handlers/order-remove-handler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { logger, runFuncWithTimingStat, stats } from '@dydxprotocol-indexer/base';
import { KafkaTopics, SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION } from '@dydxprotocol-indexer/kafka';
import { KafkaTopics, SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION, getTriggerPrice } from '@dydxprotocol-indexer/kafka';
import {
BlockTable,
BlockFromDatabase,
Expand Down Expand Up @@ -44,7 +44,7 @@ import config from '../config';
import { redisClient } from '../helpers/redis/redis-controller';
import { sendMessageWrapper } from '../lib/send-message-helper';
import { Handler } from './handler';
import { getStateRemainingQuantums, getTriggerPrice } from './helpers';
import { getStateRemainingQuantums } from './helpers';

/**
* Handler for OrderRemove messages.
Expand Down

0 comments on commit 0072986

Please sign in to comment.