From 31cc1af1fa38240c3a5b7b6e9e49b40a571cff48 Mon Sep 17 00:00:00 2001 From: Lukasz Cwik <126621805+lcwik@users.noreply.github.com> Date: Wed, 8 Nov 2023 11:30:40 -0800 Subject: [PATCH] [IND-477, IND-478, IND-479, IND-480] Update all stateful order handlers to use a SQL function to perform reads & updates. (#774) --- .../postgres/src/lib/order-translations.ts | 34 +++- ...onditional-order-placement-handler.test.ts | 27 ++- ...onditional-order-triggered-handler.test.ts | 27 ++- .../stateful-order-placement-handler.test.ts | 25 ++- .../stateful-order-removal-handler.test.ts | 25 ++- .../ender/__tests__/scripts/scripts.test.ts | 12 +- indexer/services/ender/src/config.ts | 3 + .../abstract-stateful-order-handler.ts | 55 +++++- .../conditional-order-placement-handler.ts | 29 +++- .../conditional-order-triggered-handler.ts | 26 ++- .../stateful-order-placement-handler.ts | 26 +++ .../stateful-order-removal-handler.ts | 19 +++ .../helpers/postgres/postgres-functions.ts | 2 + .../scripts/dydx_from_protocol_order_side.sql | 8 +- ..._protocol_condition_type_to_order_type.sql | 25 +++ .../scripts/dydx_stateful_order_handler.sql | 159 ++++++++++++++++++ 16 files changed, 469 insertions(+), 33 deletions(-) create mode 100644 indexer/services/ender/src/scripts/dydx_protocol_condition_type_to_order_type.sql create mode 100644 indexer/services/ender/src/scripts/dydx_stateful_order_handler.sql diff --git a/indexer/packages/postgres/src/lib/order-translations.ts b/indexer/packages/postgres/src/lib/order-translations.ts index d1bbe6f34e..dd8ae4677c 100644 --- a/indexer/packages/postgres/src/lib/order-translations.ts +++ b/indexer/packages/postgres/src/lib/order-translations.ts @@ -22,13 +22,11 @@ import { * * @param order */ -export async function convertToIndexerOrder( +export function convertToIndexerOrderWithSubaccount( order: OrderFromDatabase, perpetualMarket: PerpetualMarketFromDatabase, -): Promise { - const subaccount: SubaccountFromDatabase | undefined = await SubaccountTable.findById( - order.subaccountId, - ); + subaccount: SubaccountFromDatabase, +): IndexerOrder { if (!OrderTable.isLongTermOrConditionalOrder(order.orderFlags)) { logger.error({ at: 'protocol-translations#convertToIndexerOrder', @@ -77,3 +75,29 @@ export async function convertToIndexerOrder( return indexerOrder; } + +/** + * Converts an order from the database to an IndexerOrder proto. + * This is used to resend open stateful orders to Vulcan during Indexer fast sync + * to uncross the orderbook. + * + * @param order + */ +export async function convertToIndexerOrder( + order: OrderFromDatabase, + perpetualMarket: PerpetualMarketFromDatabase, +): Promise { + const subaccount: SubaccountFromDatabase | undefined = await SubaccountTable.findById( + order.subaccountId, + ); + + if (!subaccount === undefined) { + logger.error({ + at: 'protocol-translations#convertToIndexerOrder', + message: 'Subaccount for order not found', + order, + }); + throw new Error(`Subaccount for order not found: ${order.subaccountId}`); + } + return convertToIndexerOrderWithSubaccount(order, perpetualMarket, subaccount!); +} diff --git a/indexer/services/ender/__tests__/handlers/stateful-order/conditional-order-placement-handler.test.ts b/indexer/services/ender/__tests__/handlers/stateful-order/conditional-order-placement-handler.test.ts index 3c74bafe73..c995bee8a0 100644 --- a/indexer/services/ender/__tests__/handlers/stateful-order/conditional-order-placement-handler.test.ts +++ b/indexer/services/ender/__tests__/handlers/stateful-order/conditional-order-placement-handler.test.ts @@ -45,6 +45,7 @@ import Long from 'long'; import { producer } from '@dydxprotocol-indexer/kafka'; import { ConditionalOrderPlacementHandler } from '../../../src/handlers/stateful-order/conditional-order-placement-handler'; import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions'; +import config from '../../../src/config'; describe('conditionalOrderPlacementHandler', () => { beforeAll(async () => { @@ -125,7 +126,14 @@ describe('conditionalOrderPlacementHandler', () => { }); }); - it('successfully places order', async () => { + it.each([ + ['via knex', false], + ['via SQL function', true], + ])('successfully places order (%s)', async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION = useSqlFunction; const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent( defaultStatefulOrderEvent, ); @@ -154,7 +162,9 @@ describe('conditionalOrderPlacementHandler', () => { updatedAt: defaultDateTime.toISO(), updatedAtHeight: defaultHeight.toString(), }); - expectTimingStats(); + if (!useSqlFunction) { + expectTimingStats(); + } expectOrderSubaccountKafkaMessage( producerSendMock, defaultOrder.orderId!.subaccountId!, @@ -162,7 +172,14 @@ describe('conditionalOrderPlacementHandler', () => { ); }); - it('successfully upserts order', async () => { + it.each([ + ['via knex', false], + ['via SQL function', true], + ])('successfully upserts order (%s)', async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION = useSqlFunction; const subaccountId: string = SubaccountTable.subaccountIdToUuid( defaultOrder.orderId!.subaccountId!, ); @@ -215,7 +232,9 @@ describe('conditionalOrderPlacementHandler', () => { updatedAt: defaultDateTime.toISO(), updatedAtHeight: defaultHeight.toString(), }); - expectTimingStats(); + if (!useSqlFunction) { + expectTimingStats(); + } expectOrderSubaccountKafkaMessage( producerSendMock, defaultOrder.orderId!.subaccountId!, diff --git a/indexer/services/ender/__tests__/handlers/stateful-order/conditional-order-triggered-handler.test.ts b/indexer/services/ender/__tests__/handlers/stateful-order/conditional-order-triggered-handler.test.ts index 9c5701b636..395301a856 100644 --- a/indexer/services/ender/__tests__/handlers/stateful-order/conditional-order-triggered-handler.test.ts +++ b/indexer/services/ender/__tests__/handlers/stateful-order/conditional-order-triggered-handler.test.ts @@ -39,8 +39,9 @@ import { ORDER_FLAG_CONDITIONAL } from '@dydxprotocol-indexer/v4-proto-parser'; import { ConditionalOrderTriggeredHandler } from '../../../src/handlers/stateful-order/conditional-order-triggered-handler'; import { defaultPerpetualMarket } from '@dydxprotocol-indexer/postgres/build/__tests__/helpers/constants'; import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions'; +import config from '../../../src/config'; -describe('statefulOrderRemovalHandler', () => { +describe('conditionalOrderTriggeredHandler', () => { beforeAll(async () => { await dbHelpers.migrate(); await createPostgresFunctions(); @@ -110,7 +111,14 @@ describe('statefulOrderRemovalHandler', () => { }); }); - it('successfully triggers order and sends to vulcan', async () => { + it.each([ + ['via knex', false], + ['via SQL function', true], + ])('successfully triggers order and sends to vulcan (%s)', async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION = useSqlFunction; await OrderTable.create({ ...testConstants.defaultOrderGoodTilBlockTime, orderFlags: conditionalOrderId.orderFlags.toString(), @@ -147,16 +155,25 @@ describe('statefulOrderRemovalHandler', () => { orderId: conditionalOrderId, offchainUpdate: expectedOffchainUpdate, }); - expectTimingStats(); + if (!useSqlFunction) { + expectTimingStats(); + } }); - it('throws error when attempting to trigger an order that does not exist', async () => { + it.each([ + ['via knex', false], + ['via SQL function', true], + ])('throws error when attempting to trigger an order that does not exist (%s)', async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION = useSqlFunction; const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent( defaultStatefulOrderEvent, ); await expect(onMessage(kafkaMessage)).rejects.toThrowError( - new Error(`Unable to update order status with orderId: ${orderId}`), + `Unable to update order status with orderId: ${orderId}`, ); }); }); diff --git a/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-placement-handler.test.ts b/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-placement-handler.test.ts index cea37d7762..62cdce5817 100644 --- a/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-placement-handler.test.ts +++ b/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-placement-handler.test.ts @@ -45,6 +45,7 @@ import { STATEFUL_ORDER_ORDER_FILL_EVENT_TYPE } from '../../../src/constants'; import { producer } from '@dydxprotocol-indexer/kafka'; import { ORDER_FLAG_LONG_TERM } from '@dydxprotocol-indexer/v4-proto-parser'; import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions'; +import config from '../../../src/config'; describe('statefulOrderPlacementHandler', () => { beforeAll(async () => { @@ -138,12 +139,16 @@ describe('statefulOrderPlacementHandler', () => { it.each([ // TODO(IND-334): Remove after deprecating StatefulOrderPlacementEvent - ['stateful order placement', defaultStatefulOrderEvent], - ['stateful long term order placement', defaultStatefulOrderLongTermEvent], + ['stateful order placement (via knex)', defaultStatefulOrderEvent, false], + ['stateful order placement (via SQL function)', defaultStatefulOrderEvent, true], + ['stateful long term order placement (via knex)', defaultStatefulOrderLongTermEvent, false], + ['stateful long term order placement (via SQL function)', defaultStatefulOrderLongTermEvent, true], ])('successfully places order with %s', async ( _name: string, statefulOrderEvent: StatefulOrderEventV1, + useSqlFunction: boolean, ) => { + config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION = useSqlFunction; const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent( statefulOrderEvent, ); @@ -172,7 +177,9 @@ describe('statefulOrderPlacementHandler', () => { updatedAt: defaultDateTime.toISO(), updatedAtHeight: defaultHeight.toString(), }); - expectTimingStats(); + if (!useSqlFunction) { + expectTimingStats(); + } const expectedOffchainUpdate: OffChainUpdateV1 = { orderPlace: { @@ -189,12 +196,16 @@ describe('statefulOrderPlacementHandler', () => { it.each([ // TODO(IND-334): Remove after deprecating StatefulOrderPlacementEvent - ['stateful order placement', defaultStatefulOrderEvent], - ['stateful long term order placement', defaultStatefulOrderLongTermEvent], + ['stateful order placement (via knex)', defaultStatefulOrderEvent, false], + ['stateful order placement (via SQL function)', defaultStatefulOrderEvent, true], + ['stateful long term order placement (via knex)', defaultStatefulOrderLongTermEvent, false], + ['stateful long term order placement (via SQL function)', defaultStatefulOrderLongTermEvent, true], ])('successfully upserts order with %s', async ( _name: string, statefulOrderEvent: StatefulOrderEventV1, + useSqlFunction: boolean, ) => { + config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION = useSqlFunction; const subaccountId: string = SubaccountTable.subaccountIdToUuid( defaultOrder.orderId!.subaccountId!, ); @@ -247,7 +258,9 @@ describe('statefulOrderPlacementHandler', () => { updatedAt: defaultDateTime.toISO(), updatedAtHeight: defaultHeight.toString(), }); - expectTimingStats(); + if (!useSqlFunction) { + expectTimingStats(); + } // TODO[IND-20]: Add tests for vulcan messages }); }); diff --git a/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-removal-handler.test.ts b/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-removal-handler.test.ts index da1c3a4782..c47f935af1 100644 --- a/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-removal-handler.test.ts +++ b/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-removal-handler.test.ts @@ -35,6 +35,7 @@ import { stats, STATS_FUNCTION_NAME } from '@dydxprotocol-indexer/base'; import { STATEFUL_ORDER_ORDER_FILL_EVENT_TYPE } from '../../../src/constants'; import { producer } from '@dydxprotocol-indexer/kafka'; import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions'; +import config from '../../../src/config'; describe('statefulOrderRemovalHandler', () => { beforeAll(async () => { @@ -104,7 +105,14 @@ describe('statefulOrderRemovalHandler', () => { }); }); - it('successfully cancels and removes order', async () => { + it.each([ + ['via knex', false], + ['via SQL function', true], + ])('successfully cancels and removes order (%s)', async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION = useSqlFunction; await OrderTable.create({ ...testConstants.defaultOrder, clientId: '0', @@ -121,7 +129,9 @@ describe('statefulOrderRemovalHandler', () => { updatedAt: defaultDateTime.toISO(), updatedAtHeight: defaultHeight.toString(), })); - expectTimingStats(); + if (!useSqlFunction) { + expectTimingStats(); + } const expectedOffchainUpdate: OffChainUpdateV1 = { orderRemove: { @@ -137,13 +147,20 @@ describe('statefulOrderRemovalHandler', () => { }); }); - it('throws error when attempting to cancel an order that does not exist', async () => { + it.each([ + ['via knex', false], + ['via SQL function', true], + ])('throws error when attempting to cancel an order that does not exist (%s)', async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION = useSqlFunction; const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent( defaultStatefulOrderEvent, ); await expect(onMessage(kafkaMessage)).rejects.toThrowError( - new Error(`Unable to update order status with orderId: ${orderId}`), + `Unable to update order status with orderId: ${orderId}`, ); }); }); diff --git a/indexer/services/ender/__tests__/scripts/scripts.test.ts b/indexer/services/ender/__tests__/scripts/scripts.test.ts index 359273343c..7be6875a34 100644 --- a/indexer/services/ender/__tests__/scripts/scripts.test.ts +++ b/indexer/services/ender/__tests__/scripts/scripts.test.ts @@ -7,7 +7,7 @@ import { IndexerTendermintEvent_BlockEvent, AssetCreateEventV1, SubaccountUpdateEventV1, - MarketEventV1, + MarketEventV1, IndexerOrder_ConditionType, } from '@dydxprotocol-indexer/v4-protos'; import { BUFFER_ENCODING_UTF_8, @@ -175,6 +175,16 @@ describe('SQL Function Tests', () => { expect(result).toEqual(protocolTranslations.protocolOrderTIFToTIF(value)); }); + it.each([ + ['LIMIT', IndexerOrder_ConditionType.UNRECOGNIZED], + ['LIMIT', IndexerOrder_ConditionType.CONDITION_TYPE_UNSPECIFIED], + ['TAKE_PROFIT', IndexerOrder_ConditionType.CONDITION_TYPE_TAKE_PROFIT], + ['STOP_LIMIT', IndexerOrder_ConditionType.CONDITION_TYPE_STOP_LOSS], + ])('dydx_protocol_condition_type_to_order_type (%s)', async (_name: string, value: IndexerOrder_ConditionType) => { + const result = await getSingleRawQueryResultRow(`SELECT dydx_protocol_condition_type_to_order_type('${value}') AS result`); + expect(result).toEqual(protocolTranslations.protocolConditionTypeToOrderType(value)); + }); + it.each([ '0', '1', '-1', '10000000000000000000000000000', '-20000000000000000000000000000', ])('dydx_from_serializable_int (%s)', async (value: string) => { diff --git a/indexer/services/ender/src/config.ts b/indexer/services/ender/src/config.ts index 33ea3ddf2f..e129dace57 100644 --- a/indexer/services/ender/src/config.ts +++ b/indexer/services/ender/src/config.ts @@ -50,6 +50,9 @@ export const configSchema = { USE_PERPETUAL_MARKET_HANDLER_SQL_FUNCTION: parseBoolean({ default: true, }), + USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION: parseBoolean({ + default: true, + }), USE_SUBACCOUNT_UPDATE_SQL_FUNCTION: parseBoolean({ default: true, }), diff --git a/indexer/services/ender/src/handlers/abstract-stateful-order-handler.ts b/indexer/services/ender/src/handlers/abstract-stateful-order-handler.ts index b535042a45..2f1ea7dcfd 100644 --- a/indexer/services/ender/src/handlers/abstract-stateful-order-handler.ts +++ b/indexer/services/ender/src/handlers/abstract-stateful-order-handler.ts @@ -1,11 +1,29 @@ import { logger } from '@dydxprotocol-indexer/base'; import { - OrderFromDatabase, OrderStatus, OrderTable, OrderUpdateObject, OrderCreateObject, SubaccountTable, - OrderSide, OrderType, protocolTranslations, + OrderFromDatabase, + OrderStatus, + OrderTable, + OrderUpdateObject, + OrderCreateObject, + SubaccountTable, + OrderSide, + OrderType, + protocolTranslations, PerpetualMarketFromDatabase, + storeHelpers, + OrderModel, + PerpetualMarketModel, + SubaccountFromDatabase, } from '@dydxprotocol-indexer/postgres'; -import { IndexerOrderId, IndexerOrder, IndexerOrder_Side } from '@dydxprotocol-indexer/v4-protos'; +import SubaccountModel from '@dydxprotocol-indexer/postgres/build/src/models/subaccount-model'; +import { + IndexerOrderId, + IndexerOrder, + IndexerOrder_Side, + StatefulOrderEventV1, +} from '@dydxprotocol-indexer/v4-protos'; import { DateTime } from 'luxon'; +import * as pg from 'pg'; import { STATEFUL_ORDER_ORDER_FILL_EVENT_TYPE } from '../constants'; import { getPrice, getSize } from '../lib/helper'; @@ -21,6 +39,37 @@ export abstract class AbstractStatefulOrderHandler extends Handler { ]; } + protected async handleEventViaSqlFunction(): + Promise<[OrderFromDatabase, + PerpetualMarketFromDatabase, + SubaccountFromDatabase | undefined]> { + const eventDataBinary: Uint8Array = this.indexerTendermintEvent.dataBytes; + const result: pg.QueryResult = await storeHelpers.rawQuery( + `SELECT dydx_stateful_order_handler( + ${this.block.height}, + '${this.block.time?.toISOString()}', + '${JSON.stringify(StatefulOrderEventV1.decode(eventDataBinary))}' + ) AS result;`, + { txId: this.txId }, + ).catch((error: Error) => { + logger.error({ + at: 'AbstractStatefulOrderHandler#handleEventViaSqlFunction', + message: 'Failed to handle StatefulOrderEventV1', + error, + }); + throw error; + }); + + return [ + OrderModel.fromJson(result.rows[0].result.order) as OrderFromDatabase, + PerpetualMarketModel.fromJson( + result.rows[0].result.perpetual_market) as PerpetualMarketFromDatabase, + result.rows[0].result.subaccount + ? SubaccountModel.fromJson(result.rows[0].result.subaccount) as SubaccountFromDatabase + : undefined, + ]; + } + protected async updateOrderStatus( orderIdProto: IndexerOrderId, status: OrderStatus, diff --git a/indexer/services/ender/src/handlers/stateful-order/conditional-order-placement-handler.ts b/indexer/services/ender/src/handlers/stateful-order/conditional-order-placement-handler.ts index fe4d6410c8..ba5acdbde6 100644 --- a/indexer/services/ender/src/handlers/stateful-order/conditional-order-placement-handler.ts +++ b/indexer/services/ender/src/handlers/stateful-order/conditional-order-placement-handler.ts @@ -5,7 +5,7 @@ import { OrderTable, PerpetualMarketFromDatabase, perpetualMarketRefresher, - protocolTranslations, + protocolTranslations, SubaccountFromDatabase, SubaccountMessageContents, } from '@dydxprotocol-indexer/postgres'; import { @@ -14,6 +14,7 @@ import { StatefulOrderEventV1, } from '@dydxprotocol-indexer/v4-protos'; +import config from '../../config'; import { generateOrderSubaccountMessage } from '../../helpers/kafka-helper'; import { getTriggerPrice } from '../../lib/helper'; import { ConsolidatedKafkaEvent } from '../../lib/types'; @@ -32,6 +33,24 @@ export class ConditionalOrderPlacementHandler extends // eslint-disable-next-line @typescript-eslint/require-await public async internalHandle(): Promise { + if (config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION) { + return this.handleViaSqlFunction(); + } + return this.handleViaKnex(); + } + + private async handleViaSqlFunction(): Promise { + const result: + [OrderFromDatabase, + PerpetualMarketFromDatabase, + SubaccountFromDatabase | undefined] = await this.handleEventViaSqlFunction(); + + const subaccountId: + IndexerSubaccountId = this.event.conditionalOrderPlacement!.order!.orderId!.subaccountId!; + return this.createKafkaEvents(subaccountId, result[0], result[1]); + } + + private async handleViaKnex(): Promise { const order: IndexerOrder = this.event.conditionalOrderPlacement!.order!; const subaccountId: IndexerSubaccountId = order.orderId!.subaccountId!; const clobPairId: string = order.orderId!.clobPairId.toString(); @@ -58,6 +77,14 @@ export class ConditionalOrderPlacementHandler extends this.generateTimingStatsOptions('upsert_order'), ); + return this.createKafkaEvents(subaccountId, conditionalOrder, perpetualMarket); + } + + private createKafkaEvents( + subaccountId: IndexerSubaccountId, + conditionalOrder: OrderFromDatabase, + perpetualMarket: PerpetualMarketFromDatabase): ConsolidatedKafkaEvent[] { + // Since the order isn't placed on the book, no message is sent to vulcan // ender needs to send the websocket message indicating the conditional order was placed const message: SubaccountMessageContents = { diff --git a/indexer/services/ender/src/handlers/stateful-order/conditional-order-triggered-handler.ts b/indexer/services/ender/src/handlers/stateful-order/conditional-order-triggered-handler.ts index b193565d69..5bc85e8885 100644 --- a/indexer/services/ender/src/handlers/stateful-order/conditional-order-triggered-handler.ts +++ b/indexer/services/ender/src/handlers/stateful-order/conditional-order-triggered-handler.ts @@ -6,6 +6,7 @@ import { PerpetualMarketFromDatabase, orderTranslations, perpetualMarketRefresher, + SubaccountFromDatabase, } from '@dydxprotocol-indexer/postgres'; import { getOrderIdHash } from '@dydxprotocol-indexer/v4-proto-parser'; import { @@ -16,6 +17,7 @@ import { StatefulOrderEventV1, } from '@dydxprotocol-indexer/v4-protos'; +import config from '../../config'; import { ConsolidatedKafkaEvent } from '../../lib/types'; import { AbstractStatefulOrderHandler } from '../abstract-stateful-order-handler'; @@ -32,6 +34,24 @@ export class ConditionalOrderTriggeredHandler extends // eslint-disable-next-line @typescript-eslint/require-await public async internalHandle(): Promise { + if (config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION) { + return this.handleViaSqlFunction(); + } + return this.handleViaKnex(); + } + + private async handleViaSqlFunction(): Promise { + const result: + [OrderFromDatabase, + PerpetualMarketFromDatabase, + SubaccountFromDatabase | undefined] = await this.handleEventViaSqlFunction(); + + const order: IndexerOrder = orderTranslations.convertToIndexerOrderWithSubaccount( + result[0], result[1], result[2]!); + return this.createKafkaEvents(order); + } + + private async handleViaKnex(): Promise { const orderIdProto: IndexerOrderId = this.event.conditionalOrderTriggered!.triggeredOrderId!; const orderFromDatabase: OrderFromDatabase = await this.runFuncWithTimingStatAndErrorLogging( this.updateOrderStatus(orderIdProto, OrderStatus.OPEN), @@ -56,6 +76,10 @@ export class ConditionalOrderTriggeredHandler extends orderFromDatabase, perpetualMarket, ); + return this.createKafkaEvents(order); + } + + private createKafkaEvents(order: IndexerOrder): ConsolidatedKafkaEvent[] { const offChainUpdate: OffChainUpdateV1 = OffChainUpdateV1.fromPartial({ orderPlace: { order, @@ -65,7 +89,7 @@ export class ConditionalOrderTriggeredHandler extends return [ this.generateConsolidatedVulcanKafkaEvent( - getOrderIdHash(orderIdProto), + getOrderIdHash(order.orderId!), offChainUpdate, ), ]; diff --git a/indexer/services/ender/src/handlers/stateful-order/stateful-order-placement-handler.ts b/indexer/services/ender/src/handlers/stateful-order/stateful-order-placement-handler.ts index f2066ead71..e941dee2f9 100644 --- a/indexer/services/ender/src/handlers/stateful-order/stateful-order-placement-handler.ts +++ b/indexer/services/ender/src/handlers/stateful-order/stateful-order-placement-handler.ts @@ -14,6 +14,7 @@ import { StatefulOrderEventV1, } from '@dydxprotocol-indexer/v4-protos'; +import config from '../../config'; import { ConsolidatedKafkaEvent } from '../../lib/types'; import { AbstractStatefulOrderHandler } from '../abstract-stateful-order-handler'; @@ -34,7 +35,28 @@ export class StatefulOrderPlacementHandler extends return this.getParallelizationIdsFromOrderId(orderId); } + // eslint-disable-next-line @typescript-eslint/require-await public async internalHandle(): Promise { + if (config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION) { + return this.handleViaSqlFunction(); + } + return this.handleViaKnex(); + } + + private async handleViaSqlFunction(): Promise { + await this.handleEventViaSqlFunction(); + + let order: IndexerOrder; + // TODO(IND-334): Remove after deprecating StatefulOrderPlacementEvent + if (this.event.orderPlace !== undefined) { + order = this.event.orderPlace!.order!; + } else { + order = this.event.longTermOrderPlacement!.order!; + } + return this.createKafkaEvents(order); + } + + private async handleViaKnex(): Promise { let order: IndexerOrder; // TODO(IND-334): Remove after deprecating StatefulOrderPlacementEvent if (this.event.orderPlace !== undefined) { @@ -60,6 +82,10 @@ export class StatefulOrderPlacementHandler extends this.generateTimingStatsOptions('upsert_order'), ); + return this.createKafkaEvents(order); + } + + private createKafkaEvents(order: IndexerOrder): ConsolidatedKafkaEvent[] { const kafakEvents: ConsolidatedKafkaEvent[] = []; const offChainUpdate: OffChainUpdateV1 = OffChainUpdateV1.fromPartial({ diff --git a/indexer/services/ender/src/handlers/stateful-order/stateful-order-removal-handler.ts b/indexer/services/ender/src/handlers/stateful-order/stateful-order-removal-handler.ts index df669f5a34..a835e74cba 100644 --- a/indexer/services/ender/src/handlers/stateful-order/stateful-order-removal-handler.ts +++ b/indexer/services/ender/src/handlers/stateful-order/stateful-order-removal-handler.ts @@ -10,6 +10,7 @@ import { StatefulOrderEventV1, } from '@dydxprotocol-indexer/v4-protos'; +import config from '../../config'; import { ConsolidatedKafkaEvent } from '../../lib/types'; import { AbstractStatefulOrderHandler } from '../abstract-stateful-order-handler'; @@ -23,13 +24,31 @@ export class StatefulOrderRemovalHandler extends return this.getParallelizationIdsFromOrderId(orderId); } + // eslint-disable-next-line @typescript-eslint/require-await public async internalHandle(): Promise { + if (config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION) { + return this.handleViaSqlFunction(); + } + return this.handleViaKnex(); + } + + private async handleViaSqlFunction(): Promise { + const orderIdProto: IndexerOrderId = this.event.orderRemoval!.removedOrderId!; + await this.handleEventViaSqlFunction(); + return this.createKafkaEvents(orderIdProto); + } + + private async handleViaKnex(): Promise { const orderIdProto: IndexerOrderId = this.event.orderRemoval!.removedOrderId!; await this.runFuncWithTimingStatAndErrorLogging( this.updateOrderStatus(orderIdProto, OrderStatus.CANCELED), this.generateTimingStatsOptions('cancel_order'), ); + return this.createKafkaEvents(orderIdProto); + } + + private createKafkaEvents(orderIdProto: IndexerOrderId): ConsolidatedKafkaEvent[] { const offChainUpdate: OffChainUpdateV1 = OffChainUpdateV1.fromPartial({ orderRemove: { removedOrderId: orderIdProto, diff --git a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts index 6193321c19..a32840e76b 100644 --- a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts +++ b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts @@ -50,6 +50,8 @@ const scripts: string[] = [ 'dydx_order_fill_handler_per_order.sql', 'dydx_perpetual_market_handler.sql', 'dydx_perpetual_position_and_order_side_matching.sql', + 'dydx_protocol_condition_type_to_order_type.sql', + 'dydx_stateful_order_handler.sql', 'dydx_subaccount_update_handler.sql', 'dydx_transfer_handler.sql', 'dydx_trim_scale.sql', diff --git a/indexer/services/ender/src/scripts/dydx_from_protocol_order_side.sql b/indexer/services/ender/src/scripts/dydx_from_protocol_order_side.sql index 300f3f0a92..f9d7571761 100644 --- a/indexer/services/ender/src/scripts/dydx_from_protocol_order_side.sql +++ b/indexer/services/ender/src/scripts/dydx_from_protocol_order_side.sql @@ -5,8 +5,10 @@ CREATE OR REPLACE FUNCTION dydx_from_protocol_order_side(order_side jsonb) RETURNS text AS $$ BEGIN CASE order_side - WHEN '1'::jsonb THEN RETURN 'BUY'; - ELSE RETURN 'SELL'; - END CASE; + WHEN '1'::jsonb THEN + RETURN 'BUY'; + ELSE + RETURN 'SELL'; + END CASE; END; $$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE; diff --git a/indexer/services/ender/src/scripts/dydx_protocol_condition_type_to_order_type.sql b/indexer/services/ender/src/scripts/dydx_protocol_condition_type_to_order_type.sql new file mode 100644 index 0000000000..8ca8a7bf99 --- /dev/null +++ b/indexer/services/ender/src/scripts/dydx_protocol_condition_type_to_order_type.sql @@ -0,0 +1,25 @@ +/** + Converts the 'ConditionType' enum from the IndexerOrder protobuf (https://github.com/dydxprotocol/v4-proto/blob/4b721881fdfe99485336e221def03dc5b86eb0a1/dydxprotocol/indexer/protocol/v1/clob.proto#L131) + to the 'OrderType' enum in postgres. + */ +CREATE OR REPLACE FUNCTION dydx_protocol_condition_type_to_order_type(condition_type jsonb) RETURNS text AS $$ +DECLARE + UNRECOGNIZED constant jsonb = '-1'::jsonb; + CONDITION_TYPE_UNSPECIFIED constant jsonb = '0'::jsonb; + CONDITION_TYPE_STOP_LOSS constant jsonb = '1'::jsonb; + CONDITION_TYPE_TAKE_PROFIT constant jsonb = '2'::jsonb; +BEGIN + CASE condition_type + WHEN UNRECOGNIZED THEN + RETURN 'LIMIT'; + WHEN CONDITION_TYPE_UNSPECIFIED THEN + RETURN 'LIMIT'; + WHEN CONDITION_TYPE_STOP_LOSS THEN + RETURN 'STOP_LIMIT'; + WHEN CONDITION_TYPE_TAKE_PROFIT THEN + RETURN 'TAKE_PROFIT'; + ELSE + RAISE EXCEPTION 'Unexpected ConditionType: %', condition_type; + END CASE; +END; +$$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE; diff --git a/indexer/services/ender/src/scripts/dydx_stateful_order_handler.sql b/indexer/services/ender/src/scripts/dydx_stateful_order_handler.sql new file mode 100644 index 0000000000..275a2a6185 --- /dev/null +++ b/indexer/services/ender/src/scripts/dydx_stateful_order_handler.sql @@ -0,0 +1,159 @@ +/** + Parameters: + - block_height: the height of the block being processing. + - block_time: the time of the block being processed. + - event_data: The 'data' field of the IndexerTendermintEvent (https://github.com/dydxprotocol/v4-proto/blob/8d35c86/dydxprotocol/indexer/indexer_manager/event.proto#L25) + converted to JSON format. Conversion to JSON is expected to be done by JSON.stringify. + Returns: JSON object containing fields: + - order: The upserted order in order-model format (https://github.com/dydxprotocol/indexer/blob/cc70982/packages/postgres/src/models/order-model.ts). +*/ +CREATE OR REPLACE FUNCTION dydx_stateful_order_handler( + block_height int, block_time timestamp, event_data jsonb) RETURNS jsonb AS $$ +DECLARE + QUOTE_CURRENCY_ATOMIC_RESOLUTION constant numeric = -6; + + order_ jsonb; + order_id jsonb; + clob_pair_id bigint; + subaccount_id uuid; + perpetual_market_record perpetual_markets%ROWTYPE; + order_record orders%ROWTYPE; + subaccount_record subaccounts%ROWTYPE; +BEGIN + /** TODO(IND-334): Remove after deprecating StatefulOrderPlacementEvent. */ + IF event_data->'orderPlace' IS NOT NULL OR event_data->'longTermOrderPlacement' IS NOT NULL OR event_data->'conditionalOrderPlacement' IS NOT NULL THEN + order_ = COALESCE(event_data->'orderPlace'->'order', event_data->'longTermOrderPlacement'->'order', event_data->'conditionalOrderPlacement'->'order'); + clob_pair_id = (order_->'orderId'->'clobPairId')::bigint; + + BEGIN + SELECT * INTO STRICT perpetual_market_record FROM perpetual_markets WHERE "clobPairId" = clob_pair_id; + EXCEPTION + WHEN NO_DATA_FOUND THEN + RAISE EXCEPTION 'Unable to find perpetual market with clobPairId: %', clob_pair_id; + WHEN TOO_MANY_ROWS THEN + /** This should never happen and if it ever were to would indicate that the table has malformed data. */ + RAISE EXCEPTION 'Found multiple perpetual markets with clobPairId: %', clob_pair_id; + END; + + /** + Calculate sizes, prices, and fill amounts. + + TODO(IND-238): Extract out calculation of quantums and subticks to their own SQL functions. + */ + order_record."id" = dydx_uuid_from_order_id(order_->'orderId'); + order_record."subaccountId" = dydx_uuid_from_subaccount_id(order_->'orderId'->'subaccountId'); + order_record."clientId" = jsonb_extract_path_text(order_, 'orderId', 'clientId')::bigint; + order_record."clobPairId" = clob_pair_id; + order_record."side" = dydx_from_protocol_order_side(order_->'side'); + order_record."size" = dydx_trim_scale(dydx_from_jsonlib_long(order_->'quantums') * + power(10, perpetual_market_record."atomicResolution")::numeric); + order_record."totalFilled" = 0; + order_record."price" = dydx_trim_scale(dydx_from_jsonlib_long(order_->'subticks') * + power(10, perpetual_market_record."quantumConversionExponent" + + QUOTE_CURRENCY_ATOMIC_RESOLUTION - + perpetual_market_record."atomicResolution")::numeric); + order_record."timeInForce" = dydx_from_protocol_time_in_force(order_->'timeInForce'); + order_record."reduceOnly" = (order_->>'reduceOnly')::boolean; + order_record."orderFlags" = (order_->'orderId'->'orderFlags')::bigint; + order_record."goodTilBlockTime" = to_timestamp((order_->'goodTilBlockTime')::double precision); + order_record."clientMetadata" = (order_->'clientMetadata')::bigint; + order_record."createdAtHeight" = block_height; + order_record."updatedAt" = block_time; + order_record."updatedAtHeight" = block_height; + + CASE + WHEN event_data->'conditionalOrderPlacement' IS NOT NULL THEN + order_record."type" = dydx_protocol_condition_type_to_order_type(order_->'conditionType'); + order_record."status" = 'UNTRIGGERED'; + order_record."triggerPrice" = dydx_trim_scale(dydx_from_jsonlib_long(order_->'conditionalOrderTriggerSubticks') * + power(10, perpetual_market_record."quantumConversionExponent" + + QUOTE_CURRENCY_ATOMIC_RESOLUTION - + perpetual_market_record."atomicResolution")::numeric); + ELSE + order_record."type" = 'LIMIT'; + order_record."status" = 'OPEN'; + END CASE; + + INSERT INTO orders VALUES (order_record.*) ON CONFLICT ("id") DO + UPDATE SET + "subaccountId" = order_record."subaccountId", + "clientId" = order_record."clientId", + "clobPairId" = order_record."clobPairId", + "side" = order_record."side", + "size" = order_record."size", + "totalFilled" = order_record."totalFilled", + "price" = order_record."price", + "timeInForce" = order_record."timeInForce", + "reduceOnly" = order_record."reduceOnly", + "orderFlags" = order_record."orderFlags", + "goodTilBlockTime" = order_record."goodTilBlockTime", + "clientMetadata" = order_record."clientMetadata", + "createdAtHeight" = order_record."createdAtHeight", + "updatedAt" = order_record."updatedAt", + "updatedAtHeight" = order_record."updatedAtHeight", + "type" = order_record."type", + "status" = order_record."status", + "triggerPrice" = order_record."triggerPrice" + RETURNING * INTO order_record; + + RETURN jsonb_build_object( + 'order', + dydx_to_jsonb(order_record), + 'perpetual_market', + dydx_to_jsonb(perpetual_market_record) + ); + ELSIF event_data->'conditionalOrderTriggered' IS NOT NULL OR event_data->'orderRemoval' IS NOT NULL THEN + CASE + WHEN event_data->'conditionalOrderTriggered' IS NOT NULL THEN + order_id = event_data->'conditionalOrderTriggered'->'triggeredOrderId'; + order_record."status" = 'OPEN'; + ELSE + order_id = event_data->'orderRemoval'->'removedOrderId'; + order_record."status" = 'CANCELED'; + END CASE; + + clob_pair_id = (order_id->'clobPairId')::bigint; + BEGIN + SELECT * INTO STRICT perpetual_market_record FROM perpetual_markets WHERE "clobPairId" = clob_pair_id; + EXCEPTION + WHEN NO_DATA_FOUND THEN + RAISE EXCEPTION 'Unable to find perpetual market with clobPairId: %', clob_pair_id; + WHEN TOO_MANY_ROWS THEN + /** This should never happen and if it ever were to would indicate that the table has malformed data. */ + RAISE EXCEPTION 'Found multiple perpetual markets with clobPairId: %', clob_pair_id; + END; + + subaccount_id = dydx_uuid_from_subaccount_id(order_id->'subaccountId'); + SELECT * INTO subaccount_record FROM subaccounts WHERE "id" = subaccount_id; + IF NOT FOUND THEN + RAISE EXCEPTION 'Subaccount for order not found: %', order_; + END IF; + + order_record."id" = dydx_uuid_from_order_id(order_id); + order_record."updatedAt" = block_time; + order_record."updatedAtHeight" = block_height; + UPDATE orders + SET + "status" = order_record."status", + "updatedAt" = order_record."updatedAt", + "updatedAtHeight" = order_record."updatedAtHeight" + WHERE "id" = order_record."id" + RETURNING * INTO order_record; + + IF NOT FOUND THEN + RAISE EXCEPTION 'Unable to update order status with orderId: %', dydx_uuid_from_order_id(order_id); + END IF; + + RETURN jsonb_build_object( + 'order', + dydx_to_jsonb(order_record), + 'perpetual_market', + dydx_to_jsonb(perpetual_market_record), + 'subaccount', + dydx_to_jsonb(subaccount_record) + ); + ELSE + RAISE EXCEPTION 'Unkonwn sub-event type %', event_data; + END IF; +END; +$$ LANGUAGE plpgsql;