Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adam/ct 948 add functionality to auxo to create new kafka topics #1756

Closed
Changes from 1 commit
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
b9f6cca
Add block height message to BlockProposer
adamfraser Jun 13, 2024
f7481b2
Add ability for socks to forward block height messages
adamfraser Jun 17, 2024
fd27c82
Add block-height topic to docker config
adamfraser Jun 18, 2024
ce5dfcf
Allow subscribe and unsubscribe to block height without id
adamfraser Jun 18, 2024
f6c19e3
Fix incorrect block height url
adamfraser Jun 18, 2024
62acfd9
Use BlockHeightMessage type
adamfraser Jun 18, 2024
a7b2538
Correctly format block height message in kafka helper
adamfraser Jun 19, 2024
2251142
Remove unused import
adamfraser Jun 19, 2024
914c51d
Modify workflow to push to dev4 from this branch
adamfraser Jun 19, 2024
4fe5a8d
Add kafka publisher tests
adamfraser Jun 19, 2024
a3a840d
Handle block height id is same manner as markets
adamfraser Jun 19, 2024
eb64d70
Revert "Modify workflow to push to dev4 from this branch"
adamfraser Jun 19, 2024
1bcc69d
Rename V4_BLOCK_HEIGHT constant for consistency
adamfraser Jun 20, 2024
f224ef1
Add example to documentation
adamfraser Jun 20, 2024
ec3c1c0
Add block height websocket topic to docker compose local
adamfraser Jun 20, 2024
23a5e41
Rename & add some more tags to stats (#1715)
dydxwill Jun 18, 2024
c1d4f76
chore: `x/subaccounts` move helper functions to to `lib` package (#1711)
BrendanChou Jun 18, 2024
7f9dec0
[OTE-419] Add GB_GEO to compliance reasons (#1697)
Christopher-Li Jun 18, 2024
1e2a44a
[OTE-379] Move Indexer SendOnchainData from Endblocker to Precommitte…
teddyding Jun 18, 2024
34f6c3d
Optimize GetSettlementPpmWithPerpetual, Remove GetSettlementPpm (#1722)
BrendanChou Jun 18, 2024
41d32e3
chore: remove `BigInt0()` & `BigFloat0()` (#1724)
BrendanChou Jun 18, 2024
7eb0f44
[OTE-438] Allow users to read data if they are in FIRST_STRIKE_CLOSE_…
Christopher-Li Jun 18, 2024
473e615
[TRA-354] Add a hard cap to the number of markets listed for PML (#1644)
shrenujb Jun 18, 2024
707cd90
[TRA-414] Add x/revshare module skeleton (#1719)
shrenujb Jun 18, 2024
edc419c
optimize perpInfos (#1723)
BrendanChou Jun 19, 2024
1aaef3a
[OTE-420]: Deprecate ONBOARD from Indexer (#1728)
Christopher-Li Jun 19, 2024
dfbc8a8
[CT-856] Add order replacement to fix vault causing orderbook flicker…
chenyaoy Jun 19, 2024
def4635
Send order remove subaccount message on order replace (#1735)
chenyaoy Jun 20, 2024
8d78baa
chore: add `margin.Risk` type, make margining functions stateless (#1…
BrendanChou Jun 20, 2024
a4cf38d
[TRA-415] implement market mapper rev share gov msg (#1733)
shrenujb Jun 20, 2024
8b6c3d8
[CT-946] only send snapshots to uninitialized streams (#1738)
jayy04 Jun 20, 2024
a2777f3
Optimize `ContainsDuplicates` (#1637)
BrendanChou Jun 21, 2024
a3dea6f
Cherry-pick "Handle invalid price level updates gracefully. (#1747)" …
vincentwschau Jun 21, 2024
59cb777
[TRA-433] Add MarketMapperRevShareDetails state and associated functi…
shrenujb Jun 21, 2024
b49bf48
[TRA-442] Add query for market mapper revenue share params (#1746)
shrenujb Jun 21, 2024
d357ac7
[CT-941] introduce state methods for managing the safety heap (#1731)
jayy04 Jun 22, 2024
b97b568
add MapToSortedSlice function (#1753)
BrendanChou Jun 22, 2024
0228ca8
Perf: add an software override (1.5sec) for `TimeoutPropose` (#1751)
teddyding Jun 24, 2024
a59b27e
Fix logger using incorrect function
adamfraser Jun 24, 2024
c100f2d
Add functionality to create new kafka topics
adamfraser Jun 24, 2024
09a18d4
Deploy to dev5
adamfraser Jun 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Send order remove subaccount message on order replace (#1735)
chenyaoy authored and adamfraser committed Jun 24, 2024
commit def4635f628534dcedc7c3123c7f5770a028af08
Original file line number Diff line number Diff line change
@@ -1188,7 +1188,7 @@ function expectWebsocketMessagesSent(

expectWebsocketSubaccountMessage(
producerSendSpy.mock.calls[callIndex][0],
subaccountMessage,
[subaccountMessage],
defaultKafkaHeaders,
);
callIndex += 1;
Original file line number Diff line number Diff line change
@@ -2221,7 +2221,7 @@ describe('OrderRemoveHandler', () => {
const subaccountProducerRecord: ProducerRecord = producerSendSpy.mock.calls[0][0];
expectWebsocketSubaccountMessage(
subaccountProducerRecord,
expectedSubaccountMessage,
[expectedSubaccountMessage],
defaultKafkaHeaders,
);
}
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ import {
dbHelpers,
OrderbookMessageContents,
OrderFromDatabase,
OrderStatus,
OrderTable,
PerpetualMarketFromDatabase,
perpetualMarketRefresher,
@@ -52,12 +53,13 @@ import {
RedisOrder,
SubaccountId,
SubaccountMessage,
OrderRemovalReason,
} from '@dydxprotocol-indexer/v4-protos';
import { KafkaMessage } from 'kafkajs';
import { redisClient, redisClient as client } from '../../src/helpers/redis/redis-controller';
import { onMessage } from '../../src/lib/on-message';
import { expectCanceledOrderStatus, handleInitialOrderPlace } from '../helpers/helpers';
import { expectOffchainUpdateMessage, expectWebsocketOrderbookMessage, expectWebsocketSubaccountMessage } from '../helpers/websocket-helpers';
import { expectWebsocketOrderbookMessage, expectWebsocketSubaccountMessage } from '../helpers/websocket-helpers';
import { isStatefulOrder } from '@dydxprotocol-indexer/v4-proto-parser';
import { defaultKafkaHeaders } from '../helpers/constants';
import config from '../../src/config';
@@ -68,11 +70,6 @@ jest.mock('@dydxprotocol-indexer/base', () => ({
wrapBackgroundTask: jest.fn(),
}));

interface OffchainUpdateRecord {
key: Buffer,
offchainUpdate: OffChainUpdateV1
}

describe('order-replace-handler', () => {
beforeAll(async () => {
await BlockTable.create(testConstants.defaultBlock);
@@ -96,16 +93,15 @@ describe('order-replace-handler', () => {
...replacementOrderGoodTilBlockTime,
subticks: replacementOrderGoodTilBlockTime.subticks.mul(2),
};

const replacedOrder: RedisOrder = redisPackage.convertToRedisOrder(
const replacementRedisOrder: RedisOrder = redisPackage.convertToRedisOrder(
replacementOrder,
testConstants.defaultPerpetualMarket,
);
const replacedOrderGoodTilBlockTime: RedisOrder = redisPackage.convertToRedisOrder(
const replacementRedisOrderGoodTilBlockTime: RedisOrder = redisPackage.convertToRedisOrder(
replacementOrderGoodTilBlockTime,
testConstants.defaultPerpetualMarket,
);
const replacedOrderDifferentPrice: RedisOrder = redisPackage.convertToRedisOrder(
const replacementRedisOrderDifferentPrice: RedisOrder = redisPackage.convertToRedisOrder(
replacementOrderDifferentPrice,
testConstants.defaultPerpetualMarket,
);
@@ -185,6 +181,7 @@ describe('order-replace-handler', () => {
jest.spyOn(CanceledOrdersCache, 'removeOrderFromCaches');
jest.spyOn(stats, 'increment');
jest.spyOn(redisPackage, 'placeOrder');
jest.spyOn(redisPackage, 'removeOrder');
jest.spyOn(logger, 'error');
jest.spyOn(logger, 'info');
});
@@ -207,7 +204,7 @@ describe('order-replace-handler', () => {
dbDefaultOrder,
redisTestConstants.defaultOrderUuid,
redisTestConstants.defaultReplacementOrderUuid,
replacedOrder,
replacementRedisOrder,
true,
false,
],
@@ -219,7 +216,7 @@ describe('order-replace-handler', () => {
dbOrderGoodTilBlockTime,
redisTestConstants.defaultOrderUuidGoodTilBlockTime,
redisTestConstants.defaultReplacementOrderUuidGTBT,
replacedOrderGoodTilBlockTime,
replacementRedisOrderGoodTilBlockTime,
false,
false,
],
@@ -231,7 +228,7 @@ describe('order-replace-handler', () => {
dbDefaultOrder,
redisTestConstants.defaultOrderUuid,
redisTestConstants.defaultReplacementOrderUuid,
replacedOrder,
replacementRedisOrder,
true,
true,
],
@@ -243,7 +240,7 @@ describe('order-replace-handler', () => {
dbOrderGoodTilBlockTime,
redisTestConstants.defaultOrderUuidGoodTilBlockTime,
redisTestConstants.defaultReplacementOrderUuidGTBT,
replacedOrderGoodTilBlockTime,
replacementRedisOrderGoodTilBlockTime,
false,
true,
],
@@ -255,7 +252,7 @@ describe('order-replace-handler', () => {
dbOrder: OrderFromDatabase,
expectedOldOrderUuid: string,
expectedNewOrderUuid: string,
expectedReplacedOrder: RedisOrder,
expectedReplacementOrder: RedisOrder,
expectSubaccountMessage: boolean,
hasCanceledOrderId: boolean,
) => {
@@ -286,7 +283,6 @@ describe('order-replace-handler', () => {
expectSubaccountMessage,
);
expectStats();
// clear mocks
jest.clearAllMocks();

// Handle the order replacement off-chain update with the replacement order
@@ -296,22 +292,29 @@ describe('order-replace-handler', () => {
expectedOldOrderUuid,
expectedNewOrderUuid,
redisTestConstants.defaultSubaccountUuid,
expectedReplacedOrder,
expectedReplacementOrder,
);
expect(OrderbookLevelsCache.updatePriceLevel).not.toHaveBeenCalled();
if (hasCanceledOrderId) {
expect(CanceledOrdersCache.removeOrderFromCaches).toHaveBeenCalled();
}
await expectCanceledOrderStatus(expectedOldOrderUuid, CanceledOrderStatus.NOT_CANCELED);
await expectCanceledOrderStatus(expectedOldOrderUuid, CanceledOrderStatus.CANCELED);
await expectCanceledOrderStatus(expectedNewOrderUuid, CanceledOrderStatus.NOT_CANCELED);

expect(logger.error).not.toHaveBeenCalled();
const initialRedisOrder = redisPackage.convertToRedisOrder(
initialOrderToPlace,
testConstants.defaultPerpetualMarket,
);
expectWebsocketMessagesSent(
producerSendSpy,
expectedReplacedOrder,
expectedReplacementOrder,
dbOrder,
testConstants.defaultPerpetualMarket,
APIOrderStatusEnum.BEST_EFFORT_OPENED,
expectSubaccountMessage,
initialRedisOrder,
0,
);
expectStats(true);
});
@@ -325,7 +328,7 @@ describe('order-replace-handler', () => {
dbDefaultOrder,
redisTestConstants.defaultOrderUuid,
redisTestConstants.defaultReplacementOrderUuid,
replacedOrder,
replacementRedisOrder,
true,
true,
false,
@@ -338,7 +341,7 @@ describe('order-replace-handler', () => {
dbOrderGoodTilBlockTime,
redisTestConstants.defaultOrderUuidGoodTilBlockTime,
redisTestConstants.defaultReplacementOrderUuidGTBT,
replacedOrderGoodTilBlockTime,
replacementRedisOrderGoodTilBlockTime,
false,
true,
false,
@@ -351,7 +354,7 @@ describe('order-replace-handler', () => {
dbOrderGoodTilBlockTime,
redisTestConstants.defaultOrderUuidGoodTilBlockTime,
redisTestConstants.defaultReplacementOrderUuidGTBT,
replacedOrderDifferentPrice,
replacementRedisOrderDifferentPrice,
false,
true,
true,
@@ -364,7 +367,7 @@ describe('order-replace-handler', () => {
dbOrder: OrderFromDatabase,
expectedOldOrderUuid: string,
expectedNewOrderUuid: string,
expectedReplacedOrder: RedisOrder,
expectedReplacementOrder: RedisOrder,
expectSubaccountMessage: boolean,
expectOrderBookUpdate: boolean,
expectOrderBookMessage: boolean,
@@ -405,7 +408,6 @@ describe('order-replace-handler', () => {
APIOrderStatusEnum.BEST_EFFORT_OPENED,
expectSubaccountMessage,
);
// clear mocks
jest.clearAllMocks();

// Update the order to set it to be resting on the book
@@ -433,8 +435,9 @@ describe('order-replace-handler', () => {
expectedOldOrderUuid,
expectedNewOrderUuid,
redisTestConstants.defaultSubaccountUuid,
expectedReplacedOrder,
expectedReplacementOrder,
);
expect(OrderbookLevelsCache.updatePriceLevel).toHaveBeenCalled();
const orderbook: OrderbookLevels = await OrderbookLevelsCache.getOrderBookLevels(
testConstants.defaultPerpetualMarket.ticker,
client,
@@ -448,6 +451,10 @@ describe('order-replace-handler', () => {
}

expect(logger.error).not.toHaveBeenCalled();
const initialRedisOrder = redisPackage.convertToRedisOrder(
initialOrderToPlace,
testConstants.defaultPerpetualMarket,
);
const orderbookContents: OrderbookMessageContents = {
[OrderbookSide.BIDS]: [[
redisTestConstants.defaultPrice,
@@ -456,11 +463,13 @@ describe('order-replace-handler', () => {
};
expectWebsocketMessagesSent(
producerSendSpy,
expectedReplacedOrder,
expectedReplacementOrder,
dbOrder,
testConstants.defaultPerpetualMarket,
APIOrderStatusEnum.BEST_EFFORT_OPENED,
expectSubaccountMessage,
initialRedisOrder,
oldOrderTotalFilled,
expectOrderBookMessage
? OrderbookMessage.fromPartial({
contents: JSON.stringify(orderbookContents),
@@ -480,7 +489,7 @@ describe('order-replace-handler', () => {
dbDefaultOrder,
redisTestConstants.defaultOrderUuid,
redisTestConstants.defaultReplacementOrderUuid,
replacedOrder,
replacementRedisOrder,
true,
],
[
@@ -491,7 +500,7 @@ describe('order-replace-handler', () => {
dbOrderGoodTilBlockTime,
redisTestConstants.defaultOrderUuidGoodTilBlockTime,
redisTestConstants.defaultReplacementOrderUuidGTBT,
replacedOrderGoodTilBlockTime,
replacementRedisOrderGoodTilBlockTime,
false,
],
])('handles order replacement (with %s), resting on book, 0 remaining quantums',
@@ -503,7 +512,7 @@ describe('order-replace-handler', () => {
dbOrder: OrderFromDatabase,
expectedOldOrderUuid: string,
expectedNewOrderUuid: string,
expectedReplacedOrder: RedisOrder,
expectedReplacementOrder: RedisOrder,
expectSubaccountMessage: boolean,
) => {
synchronizeWrapBackgroundTask(wrapBackgroundTask);
@@ -526,7 +535,6 @@ describe('order-replace-handler', () => {
expectSubaccountMessage,
);
expectStats();
// clear mocks
jest.clearAllMocks();

// Update the order to set it to be resting on the book
@@ -542,23 +550,45 @@ describe('order-replace-handler', () => {
expectedOldOrderUuid,
expectedNewOrderUuid,
redisTestConstants.defaultSubaccountUuid,
expectedReplacedOrder,
expectedReplacementOrder,
);
// expect(OrderbookLevelsCache.updatePriceLevel).not.toHaveBeenCalled();
expect(OrderbookLevelsCache.updatePriceLevel).toHaveBeenCalled();

expect(logger.error).not.toHaveBeenCalled();
const initialRedisOrder = redisPackage.convertToRedisOrder(
initialOrderToPlace,
testConstants.defaultPerpetualMarket,
);
expectWebsocketMessagesSent(
producerSendSpy,
expectedReplacedOrder,
expectedReplacementOrder,
dbOrder,
testConstants.defaultPerpetualMarket,
APIOrderStatusEnum.BEST_EFFORT_OPENED,
expectSubaccountMessage,
initialRedisOrder,
Number(initialOrderToPlace.quantums),
);
expectStats(true);
},
);

it('replaces order successfully where old order does not exist', async () => {
synchronizeWrapBackgroundTask(wrapBackgroundTask);
const producerSendSpy: jest.SpyInstance = jest.spyOn(producer, 'send').mockReturnThis();
await onMessage(replacementMessage);

expect(redisPackage.removeOrder).toHaveBeenCalled();
expect(redisPackage.placeOrder).toHaveBeenCalled();
expect(logger.info).toHaveBeenCalledWith(expect.objectContaining({
at: 'OrderReplaceHandler#handle',
message: 'Old order not found in cache',
oldOrderId: redisTestConstants.defaultOrderId,
}));
expect(OrderbookLevelsCache.updatePriceLevel).not.toHaveBeenCalled();
expectWebsocketMessagesNotSent(producerSendSpy);
});

it.each([
[
'missing order',
@@ -675,6 +705,7 @@ async function checkOrderReplace(
placedSubaccountId: string,
expectedOrder: RedisOrder,
): Promise<void> {
expect(redisPackage.removeOrder).toHaveBeenCalled();
const oldRedisOrder: RedisOrder | null = await OrdersCache.getOrder(oldOrderId, client);
expect(oldRedisOrder).toBeNull();

@@ -684,6 +715,7 @@ async function checkOrderReplace(
client,
);

expect(redisPackage.placeOrder).toHaveBeenCalled();
expect(newRedisOrder).toEqual(expectedOrder);
expect(orderIdsForSubaccount).toEqual([placedOrderId]);
}
@@ -708,88 +740,137 @@ function expectStats(orderWasReplaced: boolean = false): void {

function expectWebsocketMessagesSent(
producerSendSpy: jest.SpyInstance,
redisOrder: RedisOrder,
replacementRedisOrder: RedisOrder,
dbOrder: OrderFromDatabase,
perpetualMarket: PerpetualMarketFromDatabase,
placementStatus: APIOrderStatus,
expectSubaccountMessage: boolean,
expectPlaceSubaccountMessage: boolean,
oldRedisOrder?: RedisOrder,
oldOrderTotalFilled?: number,
expectedOrderbookMessage?: OrderbookMessage,
expectedOffchainUpdate?: OffchainUpdateRecord,
): void {
jest.runOnlyPendingTimers();
// expect one subaccount update message being sent
// expect subaccount message for removing order to be sent
let numMessages: number = 0;
if (expectSubaccountMessage) {
if (oldRedisOrder !== undefined || expectPlaceSubaccountMessage) {
numMessages += 1;
}
if (expectedOrderbookMessage !== undefined) {
numMessages += 1;
}
if (expectedOffchainUpdate !== undefined) {
numMessages += 1;
}

expect(producerSendSpy).toHaveBeenCalledTimes(numMessages);

const expectedSubaccountMessages: SubaccountMessage[] = [];
let callIndex: number = 0;

if (expectedOffchainUpdate) {
expectOffchainUpdateMessage(
producerSendSpy.mock.calls[callIndex][0],
expectedOffchainUpdate.key,
expectedOffchainUpdate.offchainUpdate,
if (oldRedisOrder !== undefined) {
const initialOrderTIF: TimeInForce = protocolTranslations.protocolOrderTIFToTIF(
oldRedisOrder.order!.timeInForce,
);
callIndex += 1;
const isStateful: boolean = isStatefulOrder(oldRedisOrder.order!.orderId!.orderFlags);

const subaccountRemoveOrderContents: SubaccountMessageContents = {
orders: [{
id: OrderTable.orderIdToUuid(oldRedisOrder.order!.orderId!),
subaccountId: SubaccountTable.subaccountIdToUuid(
oldRedisOrder.order!.orderId!.subaccountId!,
),
clientId: oldRedisOrder.order!.orderId!.clientId.toString(),
clobPairId: testConstants.defaultOrderGoodTilBlockTime.clobPairId,
side: protocolTranslations.protocolOrderSideToOrderSide(oldRedisOrder.order!.side),
size: oldRedisOrder.size,
totalOptimisticFilled: protocolTranslations.quantumsToHumanFixedString(
oldOrderTotalFilled!.toString(),
perpetualMarket.atomicResolution,
),
price: oldRedisOrder.price,
type: protocolTranslations.protocolConditionTypeToOrderType(
oldRedisOrder.order!.conditionType,
),
status: OrderStatus.CANCELED,
timeInForce: apiTranslations.orderTIFToAPITIF(initialOrderTIF),
postOnly: apiTranslations.isOrderTIFPostOnly(initialOrderTIF),
reduceOnly: oldRedisOrder.order!.reduceOnly,
orderFlags: oldRedisOrder.order!.orderId!.orderFlags.toString(),
...(isStateful && {
goodTilBlockTime: protocolTranslations.getGoodTilBlockTime(oldRedisOrder.order!),
}),
...(!isStateful && {
goodTilBlock: protocolTranslations.getGoodTilBlock(oldRedisOrder.order!)!.toString(),
}),
ticker: oldRedisOrder.ticker,
removalReason: OrderRemovalReason[OrderRemovalReason.ORDER_REMOVAL_REASON_USER_CANCELED],
createdAtHeight: dbOrder.createdAtHeight,
updatedAt: dbOrder.updatedAt,
updatedAtHeight: dbOrder.updatedAtHeight,
clientMetadata: oldRedisOrder!.order!.clientMetadata.toString(),
triggerPrice: getTriggerPrice(oldRedisOrder.order!, perpetualMarket),
}],
blockHeight: blockHeightRefresher.getLatestBlockHeight(),
};

const orderRemoveSubaccountMessage: SubaccountMessage = SubaccountMessage.fromPartial({
contents: JSON.stringify(subaccountRemoveOrderContents),
subaccountId: redisTestConstants.defaultSubaccountId,
version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION,
});
expectedSubaccountMessages.push(orderRemoveSubaccountMessage);
}

if (expectSubaccountMessage) {
if (expectPlaceSubaccountMessage) {
const orderTIF: TimeInForce = protocolTranslations.protocolOrderTIFToTIF(
redisOrder.order!.timeInForce,
replacementRedisOrder.order!.timeInForce,
);
const isStateful: boolean = isStatefulOrder(redisOrder.order!.orderId!.orderFlags);
const isStateful: boolean = isStatefulOrder(replacementRedisOrder.order!.orderId!.orderFlags);
const contents: SubaccountMessageContents = {
orders: [
{
id: OrderTable.orderIdToUuid(
redisOrder.order!.orderId!,
replacementRedisOrder.order!.orderId!,
),
subaccountId: SubaccountTable.subaccountIdToUuid(
redisOrder.order!.orderId!.subaccountId!,
replacementRedisOrder.order!.orderId!.subaccountId!,
),
clientId: redisOrder.order!.orderId!.clientId.toString(),
clientId: replacementRedisOrder.order!.orderId!.clientId.toString(),
clobPairId: perpetualMarket.clobPairId,
side: protocolTranslations.protocolOrderSideToOrderSide(redisOrder.order!.side),
size: redisOrder.size,
price: redisOrder.price,
side: protocolTranslations.protocolOrderSideToOrderSide(
replacementRedisOrder.order!.side,
),
size: replacementRedisOrder.size,
price: replacementRedisOrder.price,
status: placementStatus,
type: protocolTranslations.protocolConditionTypeToOrderType(
redisOrder.order!.conditionType,
replacementRedisOrder.order!.conditionType,
),
timeInForce: apiTranslations.orderTIFToAPITIF(orderTIF),
postOnly: apiTranslations.isOrderTIFPostOnly(orderTIF),
reduceOnly: redisOrder.order!.reduceOnly,
orderFlags: redisOrder.order!.orderId!.orderFlags.toString(),
goodTilBlock: protocolTranslations.getGoodTilBlock(redisOrder.order!)
reduceOnly: replacementRedisOrder.order!.reduceOnly,
orderFlags: replacementRedisOrder.order!.orderId!.orderFlags.toString(),
goodTilBlock: protocolTranslations.getGoodTilBlock(replacementRedisOrder.order!)
?.toString(),
goodTilBlockTime: protocolTranslations.getGoodTilBlockTime(redisOrder.order!),
ticker: redisOrder.ticker,
goodTilBlockTime: protocolTranslations.getGoodTilBlockTime(replacementRedisOrder.order!),
ticker: replacementRedisOrder.ticker,
...(isStateful && { createdAtHeight: dbOrder.createdAtHeight }),
...(isStateful && { updatedAt: dbOrder.updatedAt }),
...(isStateful && { updatedAtHeight: dbOrder.updatedAtHeight }),
clientMetadata: redisOrder.order!.clientMetadata.toString(),
triggerPrice: getTriggerPrice(redisOrder.order!, perpetualMarket),
clientMetadata: replacementRedisOrder.order!.clientMetadata.toString(),
triggerPrice: getTriggerPrice(replacementRedisOrder.order!, perpetualMarket),
},
],
blockHeight: blockHeightRefresher.getLatestBlockHeight(),
};
const subaccountMessage: SubaccountMessage = SubaccountMessage.fromPartial({
const orderPlaceSubaccountMessage: SubaccountMessage = SubaccountMessage.fromPartial({
contents: JSON.stringify(contents),
subaccountId: SubaccountId.fromPartial(redisOrder.order!.orderId!.subaccountId!),
subaccountId: SubaccountId.fromPartial(replacementRedisOrder.order!.orderId!.subaccountId!),
version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION,
});
expectedSubaccountMessages.push(orderPlaceSubaccountMessage);
}

if (expectedSubaccountMessages.length > 0) {
expectWebsocketSubaccountMessage(
producerSendSpy.mock.calls[callIndex][0],
subaccountMessage,
expectedSubaccountMessages,
defaultKafkaHeaders,
);
callIndex += 1;
23 changes: 13 additions & 10 deletions indexer/services/vulcan/__tests__/helpers/websocket-helpers.ts
Original file line number Diff line number Diff line change
@@ -4,19 +4,22 @@ import { IHeaders, ProducerRecord } from 'kafkajs';

export function expectWebsocketSubaccountMessage(
subaccountProducerRecord: ProducerRecord,
expectedSubaccountMessage: SubaccountMessage,
expectedSubaccountMessages: Array<SubaccountMessage>,
expectedHeaders: IHeaders,
): void {
expect(subaccountProducerRecord.topic).toEqual(KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS);
const subaccountMessageValueBinary: Uint8Array = new Uint8Array(
subaccountProducerRecord.messages[0].value as Buffer,
);
const headers: IHeaders | undefined = subaccountProducerRecord.messages[0].headers;
const subaccountMessage: SubaccountMessage = SubaccountMessage.decode(
subaccountMessageValueBinary,
);
expect(headers).toEqual(expectedHeaders);
expect(subaccountMessage).toEqual(expectedSubaccountMessage);
for (let i = 0; i < subaccountProducerRecord.messages.length; i++) {
const subaccountProducerMessage = subaccountProducerRecord.messages[i];
const subaccountMessageValueBinary: Uint8Array = new Uint8Array(
subaccountProducerMessage.value as Buffer,
);
const headers: IHeaders | undefined = subaccountProducerMessage.headers;
const subaccountMessage: SubaccountMessage = SubaccountMessage.decode(
subaccountMessageValueBinary,
);
expect(headers).toEqual(expectedHeaders);
expect(subaccountMessage).toEqual(expectedSubaccountMessages[i]);
}
}

export function expectWebsocketOrderbookMessage(
205 changes: 201 additions & 4 deletions indexer/services/vulcan/src/handlers/order-replace-handler.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,22 @@
import { logger, runFuncWithTimingStat, stats } from '@dydxprotocol-indexer/base';
import { createSubaccountWebsocketMessage, KafkaTopics } from '@dydxprotocol-indexer/kafka';
import {
ParseMessageError, logger, runFuncWithTimingStat, stats,
} from '@dydxprotocol-indexer/base';
import {
createSubaccountWebsocketMessage,
getTriggerPrice,
KafkaTopics,
SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION,
} from '@dydxprotocol-indexer/kafka';
import {
IsoString,
OrderFromDatabase,
OrderStatus,
OrderTable,
PerpetualMarketFromDatabase,
SubaccountMessageContents,
SubaccountTable,
TimeInForce,
apiTranslations,
blockHeightRefresher,
perpetualMarketRefresher,
protocolTranslations,
@@ -30,9 +43,11 @@ import {
IndexerOrderId,
OffChainUpdateV1,
OrderPlaceV1_OrderPlacementStatus,
OrderRemovalReason,
OrderReplaceV1,
OrderUpdateV1,
RedisOrder,
SubaccountMessage,
} from '@dydxprotocol-indexer/v4-protos';
import Big from 'big.js';
import { IHeaders, Message } from 'kafkajs';
@@ -41,6 +56,7 @@ import config from '../config';
import { redisClient } from '../helpers/redis/redis-controller';
import { sendMessageWrapper } from '../lib/send-message-helper';
import { Handler } from './handler';
import { getStateRemainingQuantums } from './helpers';

/**
* Handler for OrderReplace messages. This is currently only expected for stateful vault orders.
@@ -69,7 +85,25 @@ export class OrderReplaceHandler extends Handler {
const oldOrderId: IndexerOrderId = orderReplace.oldOrderId!;

/* Remove old order */
const removeOrderResult: RemoveOrderResult = await this.removeOldOrder(oldOrderId);
let removeOrderResult: RemoveOrderResult = { removed: false };
try {
removeOrderResult = await this.removeOldOrder(oldOrderId, headers);
} catch (error) {
if (error instanceof ParseMessageError) {
return;
}
}

/* We don't want to fail if old order is not found (new order should still be placed),
so log and track metric */
if (!removeOrderResult.removed) {
logger.info({
at: 'OrderReplaceHandler#handle',
message: 'Old order not found in cache',
oldOrderId,
});
stats.increment(`${config.SERVICE_NAME}.replace_order_handler.old_order_not_found_in_cache`, 1);
}

/* Place new order */
const order: IndexerOrder = orderReplace.order!;
@@ -88,6 +122,10 @@ export class OrderReplaceHandler extends Handler {
}
const redisOrder: RedisOrder = convertToRedisOrder(order, perpetualMarket);
const placeOrderResult: PlaceOrderResult = await this.placeNewOrder(redisOrder);
await this.removeOrderFromCanceledOrdersCache(
OrderTable.orderIdToUuid(redisOrder.order?.orderId!),
);

if (placeOrderResult.replaced) {
// This is not expected because the replaced orders either have different order IDs or
// should have been removed before being placed again
@@ -161,6 +199,11 @@ export class OrderReplaceHandler extends Handler {
};
sendMessageWrapper(subaccountMessage, KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS);
}

await this.addOrderToCanceledOrdersCache(
oldOrderId,
Date.now(),
);
}

protected validateOrderReplace(orderReplace: OrderReplaceV1): void {
@@ -186,7 +229,38 @@ export class OrderReplaceHandler extends Handler {
}
}

protected async removeOldOrder(oldOrderId: IndexerOrderId): Promise<RemoveOrderResult> {
protected async removeOldOrder(
oldOrderId: IndexerOrderId,
headers: IHeaders,
): Promise<RemoveOrderResult> {
const order: OrderFromDatabase | undefined = await runFuncWithTimingStat(
OrderTable.findById(
OrderTable.orderIdToUuid(oldOrderId),
),
this.generateTimingStatsOptions('find_order_for_stateful_cancelation'),
);
if (order === undefined) {
logger.error({
at: 'OrderReplaceHandler#removeOldOrder',
message: 'Could not find old order ID to remove in DB',
orderId: oldOrderId,
});
throw new ParseMessageError(`Could not find old order ID to remove in DB: ${oldOrderId}`);
}

const perpetualMarket: PerpetualMarketFromDatabase | undefined = perpetualMarketRefresher
.getPerpetualMarketFromClobPairId(
order.clobPairId,
);
if (perpetualMarket === undefined) {
logger.error({
at: 'OrderReplaceHandler#removeOldOrder',
message: `Unable to find the perpetual market with clobPairId: ${order.clobPairId}`,
order,
});
throw new ParseMessageError(`Unable to find the perpetual market with clobPairId: ${order.clobPairId}`);
}

const removeOrderResult: RemoveOrderResult = await runFuncWithTimingStat(
removeOrder({
removedOrderId: oldOrderId,
@@ -200,9 +274,114 @@ export class OrderReplaceHandler extends Handler {
oldOrderId,
removeOrderResult,
});

const stateRemainingQuantums: Big = await getStateRemainingQuantums(
removeOrderResult.removedOrder!,
);

// If the remaining amount of the order in state is <= 0, the order is filled and
// does not need to have it's status updated
let canceledOrder: OrderFromDatabase | undefined;
if (stateRemainingQuantums.gt(0)) {
canceledOrder = await runFuncWithTimingStat(
this.cancelOrderInPostgres(oldOrderId),
this.generateTimingStatsOptions('cancel_order_in_postgres'),
);
} else {
canceledOrder = await runFuncWithTimingStat(
OrderTable.findById(OrderTable.orderIdToUuid(oldOrderId)),
this.generateTimingStatsOptions('find_order'),
);
}

const subaccountMessage: Message = {
value: this.createSubaccountWebsocketMessageFromRemoveOrderResult(
removeOrderResult,
canceledOrder,
oldOrderId,
perpetualMarket,
blockHeightRefresher.getLatestBlockHeight(),
),
headers,
};

sendMessageWrapper(subaccountMessage, KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS);

return removeOrderResult;
}

// eslint-disable-next-line @typescript-eslint/require-await
protected async cancelOrderInPostgres(
oldOrderId: IndexerOrderId,
): Promise<OrderFromDatabase | undefined> {
return OrderTable.update({
id: OrderTable.orderIdToUuid(oldOrderId),
status: OrderStatus.CANCELED,
});
}

protected createSubaccountWebsocketMessageFromRemoveOrderResult(
removeOrderResult: RemoveOrderResult,
canceledOrder: OrderFromDatabase | undefined,
oldOrderId: IndexerOrderId,
perpetualMarket: PerpetualMarketFromDatabase,
blockHeight: string | undefined,
): Buffer {
const redisOrder: RedisOrder = removeOrderResult.removedOrder!;
const orderTIF: TimeInForce = protocolTranslations.protocolOrderTIFToTIF(
redisOrder.order!.timeInForce,
);
const createdAtHeight: string | undefined = canceledOrder?.createdAtHeight;
const updatedAt: IsoString | undefined = canceledOrder?.updatedAt;
const updatedAtHeight: string | undefined = canceledOrder?.updatedAtHeight;
const contents: SubaccountMessageContents = {
orders: [
{
id: OrderTable.orderIdToUuid(redisOrder.order!.orderId!),
subaccountId: SubaccountTable.subaccountIdToUuid(
oldOrderId.subaccountId!,
),
clientId: oldOrderId.clientId.toString(),
clobPairId: perpetualMarket.clobPairId,
side: protocolTranslations.protocolOrderSideToOrderSide(redisOrder.order!.side),
size: redisOrder.size,
totalOptimisticFilled: protocolTranslations.quantumsToHumanFixedString(
removeOrderResult.totalFilledQuantums!.toString(),
perpetualMarket.atomicResolution,
),
price: redisOrder.price,
type: protocolTranslations.protocolConditionTypeToOrderType(
redisOrder.order!.conditionType,
),
status: OrderStatus.CANCELED,
timeInForce: apiTranslations.orderTIFToAPITIF(orderTIF),
postOnly: apiTranslations.isOrderTIFPostOnly(orderTIF),
reduceOnly: redisOrder.order!.reduceOnly,
orderFlags: redisOrder.order!.orderId!.orderFlags.toString(),
goodTilBlock: protocolTranslations.getGoodTilBlock(redisOrder.order!)
?.toString(),
goodTilBlockTime: protocolTranslations.getGoodTilBlockTime(redisOrder.order!),
ticker: redisOrder.ticker,
removalReason: OrderRemovalReason[OrderRemovalReason.ORDER_REMOVAL_REASON_USER_CANCELED],
...(createdAtHeight && { createdAtHeight }),
...(updatedAt && { updatedAt }),
...(updatedAtHeight && { updatedAtHeight }),
clientMetadata: redisOrder.order!.clientMetadata.toString(),
triggerPrice: getTriggerPrice(redisOrder.order!, perpetualMarket),
},
],
...(blockHeight && { blockHeight }),
};

const subaccountMessage: SubaccountMessage = SubaccountMessage.fromPartial({
contents: JSON.stringify(contents),
subaccountId: oldOrderId.subaccountId!,
version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION,
});

return Buffer.from(Uint8Array.from(SubaccountMessage.encode(subaccountMessage).finish()));
}

protected async placeNewOrder(redisOrder: RedisOrder): Promise<PlaceOrderResult> {
const placeOrderResult: PlaceOrderResult = await runFuncWithTimingStat(
placeOrder({
@@ -390,4 +569,22 @@ export class OrderReplaceHandler extends Handler {
return sizeDelta.toFixed();
}

/**
* Adds the removed order to the cancelled orders cache in Redis.
*
* @param orderId
* @param timestampMs
* @protected
*/
protected async addOrderToCanceledOrdersCache(
oldOrderId: IndexerOrderId,
timestampMs: number,
): Promise<void> {
const orderId: string = OrderTable.orderIdToUuid(oldOrderId);

await runFuncWithTimingStat(
CanceledOrdersCache.addCanceledOrderId(orderId, timestampMs, redisClient),
this.generateTimingStatsOptions('add_order_to_canceled_order_cache'),
);
}
}