From f0e22ccb75f5ae20066193d9baad2e1956a7ab06 Mon Sep 17 00:00:00 2001
From: Lukasz Cwik <lcwik@apache.org>
Date: Wed, 8 Nov 2023 10:39:45 -0800
Subject: [PATCH] [IND-477, IND-478, IND-479, IND-480] Update all stateful
 order handlers to use a SQL function to perform reads & updates.

---
 .../postgres/src/lib/order-translations.ts    |  34 +++-
 ...onditional-order-placement-handler.test.ts |  27 ++-
 ...onditional-order-triggered-handler.test.ts |  27 ++-
 .../stateful-order-placement-handler.test.ts  |  25 ++-
 .../stateful-order-removal-handler.test.ts    |  25 ++-
 .../ender/__tests__/scripts/scripts.test.ts   |  12 +-
 indexer/services/ender/src/config.ts          |   3 +
 .../abstract-stateful-order-handler.ts        |  55 +++++-
 .../conditional-order-placement-handler.ts    |  29 +++-
 .../conditional-order-triggered-handler.ts    |  26 ++-
 .../stateful-order-placement-handler.ts       |  26 +++
 .../stateful-order-removal-handler.ts         |  19 +++
 .../helpers/postgres/postgres-functions.ts    |   2 +
 .../scripts/dydx_from_protocol_order_side.sql |   8 +-
 ..._protocol_condition_type_to_order_type.sql |  25 +++
 .../scripts/dydx_stateful_order_handler.sql   | 159 ++++++++++++++++++
 16 files changed, 469 insertions(+), 33 deletions(-)
 create mode 100644 indexer/services/ender/src/scripts/dydx_protocol_condition_type_to_order_type.sql
 create mode 100644 indexer/services/ender/src/scripts/dydx_stateful_order_handler.sql

diff --git a/indexer/packages/postgres/src/lib/order-translations.ts b/indexer/packages/postgres/src/lib/order-translations.ts
index d1bbe6f34e..dd8ae4677c 100644
--- a/indexer/packages/postgres/src/lib/order-translations.ts
+++ b/indexer/packages/postgres/src/lib/order-translations.ts
@@ -22,13 +22,11 @@ import {
  *
  * @param order
  */
-export async function convertToIndexerOrder(
+export function convertToIndexerOrderWithSubaccount(
   order: OrderFromDatabase,
   perpetualMarket: PerpetualMarketFromDatabase,
-): Promise<IndexerOrder> {
-  const subaccount: SubaccountFromDatabase | undefined = await SubaccountTable.findById(
-    order.subaccountId,
-  );
+  subaccount: SubaccountFromDatabase,
+): IndexerOrder {
   if (!OrderTable.isLongTermOrConditionalOrder(order.orderFlags)) {
     logger.error({
       at: 'protocol-translations#convertToIndexerOrder',
@@ -77,3 +75,29 @@ export async function convertToIndexerOrder(
 
   return indexerOrder;
 }
+
+/**
+ * Converts an order from the database to an IndexerOrder proto.
+ * This is used to resend open stateful orders to Vulcan during Indexer fast sync
+ * to uncross the orderbook.
+ *
+ * @param order
+ */
+export async function convertToIndexerOrder(
+  order: OrderFromDatabase,
+  perpetualMarket: PerpetualMarketFromDatabase,
+): Promise<IndexerOrder> {
+  const subaccount: SubaccountFromDatabase | undefined = await SubaccountTable.findById(
+    order.subaccountId,
+  );
+
+  if (!subaccount === undefined) {
+    logger.error({
+      at: 'protocol-translations#convertToIndexerOrder',
+      message: 'Subaccount for order not found',
+      order,
+    });
+    throw new Error(`Subaccount for order not found: ${order.subaccountId}`);
+  }
+  return convertToIndexerOrderWithSubaccount(order, perpetualMarket, subaccount!);
+}
diff --git a/indexer/services/ender/__tests__/handlers/stateful-order/conditional-order-placement-handler.test.ts b/indexer/services/ender/__tests__/handlers/stateful-order/conditional-order-placement-handler.test.ts
index 3c74bafe73..c995bee8a0 100644
--- a/indexer/services/ender/__tests__/handlers/stateful-order/conditional-order-placement-handler.test.ts
+++ b/indexer/services/ender/__tests__/handlers/stateful-order/conditional-order-placement-handler.test.ts
@@ -45,6 +45,7 @@ import Long from 'long';
 import { producer } from '@dydxprotocol-indexer/kafka';
 import { ConditionalOrderPlacementHandler } from '../../../src/handlers/stateful-order/conditional-order-placement-handler';
 import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions';
+import config from '../../../src/config';
 
 describe('conditionalOrderPlacementHandler', () => {
   beforeAll(async () => {
@@ -125,7 +126,14 @@ describe('conditionalOrderPlacementHandler', () => {
     });
   });
 
-  it('successfully places order', async () => {
+  it.each([
+    ['via knex', false],
+    ['via SQL function', true],
+  ])('successfully places order (%s)', async (
+    _name: string,
+    useSqlFunction: boolean,
+  ) => {
+    config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION = useSqlFunction;
     const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent(
       defaultStatefulOrderEvent,
     );
@@ -154,7 +162,9 @@ describe('conditionalOrderPlacementHandler', () => {
       updatedAt: defaultDateTime.toISO(),
       updatedAtHeight: defaultHeight.toString(),
     });
-    expectTimingStats();
+    if (!useSqlFunction) {
+      expectTimingStats();
+    }
     expectOrderSubaccountKafkaMessage(
       producerSendMock,
       defaultOrder.orderId!.subaccountId!,
@@ -162,7 +172,14 @@ describe('conditionalOrderPlacementHandler', () => {
     );
   });
 
-  it('successfully upserts order', async () => {
+  it.each([
+    ['via knex', false],
+    ['via SQL function', true],
+  ])('successfully upserts order (%s)', async (
+    _name: string,
+    useSqlFunction: boolean,
+  ) => {
+    config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION = useSqlFunction;
     const subaccountId: string = SubaccountTable.subaccountIdToUuid(
       defaultOrder.orderId!.subaccountId!,
     );
@@ -215,7 +232,9 @@ describe('conditionalOrderPlacementHandler', () => {
       updatedAt: defaultDateTime.toISO(),
       updatedAtHeight: defaultHeight.toString(),
     });
-    expectTimingStats();
+    if (!useSqlFunction) {
+      expectTimingStats();
+    }
     expectOrderSubaccountKafkaMessage(
       producerSendMock,
       defaultOrder.orderId!.subaccountId!,
diff --git a/indexer/services/ender/__tests__/handlers/stateful-order/conditional-order-triggered-handler.test.ts b/indexer/services/ender/__tests__/handlers/stateful-order/conditional-order-triggered-handler.test.ts
index 9c5701b636..395301a856 100644
--- a/indexer/services/ender/__tests__/handlers/stateful-order/conditional-order-triggered-handler.test.ts
+++ b/indexer/services/ender/__tests__/handlers/stateful-order/conditional-order-triggered-handler.test.ts
@@ -39,8 +39,9 @@ import { ORDER_FLAG_CONDITIONAL } from '@dydxprotocol-indexer/v4-proto-parser';
 import { ConditionalOrderTriggeredHandler } from '../../../src/handlers/stateful-order/conditional-order-triggered-handler';
 import { defaultPerpetualMarket } from '@dydxprotocol-indexer/postgres/build/__tests__/helpers/constants';
 import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions';
+import config from '../../../src/config';
 
-describe('statefulOrderRemovalHandler', () => {
+describe('conditionalOrderTriggeredHandler', () => {
   beforeAll(async () => {
     await dbHelpers.migrate();
     await createPostgresFunctions();
@@ -110,7 +111,14 @@ describe('statefulOrderRemovalHandler', () => {
     });
   });
 
-  it('successfully triggers order and sends to vulcan', async () => {
+  it.each([
+    ['via knex', false],
+    ['via SQL function', true],
+  ])('successfully triggers order and sends to vulcan (%s)', async (
+    _name: string,
+    useSqlFunction: boolean,
+  ) => {
+    config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION = useSqlFunction;
     await OrderTable.create({
       ...testConstants.defaultOrderGoodTilBlockTime,
       orderFlags: conditionalOrderId.orderFlags.toString(),
@@ -147,16 +155,25 @@ describe('statefulOrderRemovalHandler', () => {
       orderId: conditionalOrderId,
       offchainUpdate: expectedOffchainUpdate,
     });
-    expectTimingStats();
+    if (!useSqlFunction) {
+      expectTimingStats();
+    }
   });
 
-  it('throws error when attempting to trigger an order that does not exist', async () => {
+  it.each([
+    ['via knex', false],
+    ['via SQL function', true],
+  ])('throws error when attempting to trigger an order that does not exist (%s)', async (
+    _name: string,
+    useSqlFunction: boolean,
+  ) => {
+    config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION = useSqlFunction;
     const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent(
       defaultStatefulOrderEvent,
     );
 
     await expect(onMessage(kafkaMessage)).rejects.toThrowError(
-      new Error(`Unable to update order status with orderId: ${orderId}`),
+      `Unable to update order status with orderId: ${orderId}`,
     );
   });
 });
diff --git a/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-placement-handler.test.ts b/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-placement-handler.test.ts
index cea37d7762..62cdce5817 100644
--- a/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-placement-handler.test.ts
+++ b/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-placement-handler.test.ts
@@ -45,6 +45,7 @@ import { STATEFUL_ORDER_ORDER_FILL_EVENT_TYPE } from '../../../src/constants';
 import { producer } from '@dydxprotocol-indexer/kafka';
 import { ORDER_FLAG_LONG_TERM } from '@dydxprotocol-indexer/v4-proto-parser';
 import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions';
+import config from '../../../src/config';
 
 describe('statefulOrderPlacementHandler', () => {
   beforeAll(async () => {
@@ -138,12 +139,16 @@ describe('statefulOrderPlacementHandler', () => {
 
   it.each([
     // TODO(IND-334): Remove after deprecating StatefulOrderPlacementEvent
-    ['stateful order placement', defaultStatefulOrderEvent],
-    ['stateful long term order placement', defaultStatefulOrderLongTermEvent],
+    ['stateful order placement (via knex)', defaultStatefulOrderEvent, false],
+    ['stateful order placement (via SQL function)', defaultStatefulOrderEvent, true],
+    ['stateful long term order placement (via knex)', defaultStatefulOrderLongTermEvent, false],
+    ['stateful long term order placement (via SQL function)', defaultStatefulOrderLongTermEvent, true],
   ])('successfully places order with %s', async (
     _name: string,
     statefulOrderEvent: StatefulOrderEventV1,
+    useSqlFunction: boolean,
   ) => {
+    config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION = useSqlFunction;
     const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent(
       statefulOrderEvent,
     );
@@ -172,7 +177,9 @@ describe('statefulOrderPlacementHandler', () => {
       updatedAt: defaultDateTime.toISO(),
       updatedAtHeight: defaultHeight.toString(),
     });
-    expectTimingStats();
+    if (!useSqlFunction) {
+      expectTimingStats();
+    }
 
     const expectedOffchainUpdate: OffChainUpdateV1 = {
       orderPlace: {
@@ -189,12 +196,16 @@ describe('statefulOrderPlacementHandler', () => {
 
   it.each([
     // TODO(IND-334): Remove after deprecating StatefulOrderPlacementEvent
-    ['stateful order placement', defaultStatefulOrderEvent],
-    ['stateful long term order placement', defaultStatefulOrderLongTermEvent],
+    ['stateful order placement (via knex)', defaultStatefulOrderEvent, false],
+    ['stateful order placement (via SQL function)', defaultStatefulOrderEvent, true],
+    ['stateful long term order placement (via knex)', defaultStatefulOrderLongTermEvent, false],
+    ['stateful long term order placement (via SQL function)', defaultStatefulOrderLongTermEvent, true],
   ])('successfully upserts order with %s', async (
     _name: string,
     statefulOrderEvent: StatefulOrderEventV1,
+    useSqlFunction: boolean,
   ) => {
+    config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION = useSqlFunction;
     const subaccountId: string = SubaccountTable.subaccountIdToUuid(
       defaultOrder.orderId!.subaccountId!,
     );
@@ -247,7 +258,9 @@ describe('statefulOrderPlacementHandler', () => {
       updatedAt: defaultDateTime.toISO(),
       updatedAtHeight: defaultHeight.toString(),
     });
-    expectTimingStats();
+    if (!useSqlFunction) {
+      expectTimingStats();
+    }
     // TODO[IND-20]: Add tests for vulcan messages
   });
 });
diff --git a/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-removal-handler.test.ts b/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-removal-handler.test.ts
index da1c3a4782..c47f935af1 100644
--- a/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-removal-handler.test.ts
+++ b/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-removal-handler.test.ts
@@ -35,6 +35,7 @@ import { stats, STATS_FUNCTION_NAME } from '@dydxprotocol-indexer/base';
 import { STATEFUL_ORDER_ORDER_FILL_EVENT_TYPE } from '../../../src/constants';
 import { producer } from '@dydxprotocol-indexer/kafka';
 import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions';
+import config from '../../../src/config';
 
 describe('statefulOrderRemovalHandler', () => {
   beforeAll(async () => {
@@ -104,7 +105,14 @@ describe('statefulOrderRemovalHandler', () => {
     });
   });
 
-  it('successfully cancels and removes order', async () => {
+  it.each([
+    ['via knex', false],
+    ['via SQL function', true],
+  ])('successfully cancels and removes order (%s)', async (
+    _name: string,
+    useSqlFunction: boolean,
+  ) => {
+    config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION = useSqlFunction;
     await OrderTable.create({
       ...testConstants.defaultOrder,
       clientId: '0',
@@ -121,7 +129,9 @@ describe('statefulOrderRemovalHandler', () => {
       updatedAt: defaultDateTime.toISO(),
       updatedAtHeight: defaultHeight.toString(),
     }));
-    expectTimingStats();
+    if (!useSqlFunction) {
+      expectTimingStats();
+    }
 
     const expectedOffchainUpdate: OffChainUpdateV1 = {
       orderRemove: {
@@ -137,13 +147,20 @@ describe('statefulOrderRemovalHandler', () => {
     });
   });
 
-  it('throws error when attempting to cancel an order that does not exist', async () => {
+  it.each([
+    ['via knex', false],
+    ['via SQL function', true],
+  ])('throws error when attempting to cancel an order that does not exist (%s)', async (
+    _name: string,
+    useSqlFunction: boolean,
+  ) => {
+    config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION = useSqlFunction;
     const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent(
       defaultStatefulOrderEvent,
     );
 
     await expect(onMessage(kafkaMessage)).rejects.toThrowError(
-      new Error(`Unable to update order status with orderId: ${orderId}`),
+      `Unable to update order status with orderId: ${orderId}`,
     );
   });
 });
diff --git a/indexer/services/ender/__tests__/scripts/scripts.test.ts b/indexer/services/ender/__tests__/scripts/scripts.test.ts
index 359273343c..7be6875a34 100644
--- a/indexer/services/ender/__tests__/scripts/scripts.test.ts
+++ b/indexer/services/ender/__tests__/scripts/scripts.test.ts
@@ -7,7 +7,7 @@ import {
   IndexerTendermintEvent_BlockEvent,
   AssetCreateEventV1,
   SubaccountUpdateEventV1,
-  MarketEventV1,
+  MarketEventV1, IndexerOrder_ConditionType,
 } from '@dydxprotocol-indexer/v4-protos';
 import {
   BUFFER_ENCODING_UTF_8,
@@ -175,6 +175,16 @@ describe('SQL Function Tests', () => {
     expect(result).toEqual(protocolTranslations.protocolOrderTIFToTIF(value));
   });
 
+  it.each([
+    ['LIMIT', IndexerOrder_ConditionType.UNRECOGNIZED],
+    ['LIMIT', IndexerOrder_ConditionType.CONDITION_TYPE_UNSPECIFIED],
+    ['TAKE_PROFIT', IndexerOrder_ConditionType.CONDITION_TYPE_TAKE_PROFIT],
+    ['STOP_LIMIT', IndexerOrder_ConditionType.CONDITION_TYPE_STOP_LOSS],
+  ])('dydx_protocol_condition_type_to_order_type (%s)', async (_name: string, value: IndexerOrder_ConditionType) => {
+    const result = await getSingleRawQueryResultRow(`SELECT dydx_protocol_condition_type_to_order_type('${value}') AS result`);
+    expect(result).toEqual(protocolTranslations.protocolConditionTypeToOrderType(value));
+  });
+
   it.each([
     '0', '1', '-1', '10000000000000000000000000000', '-20000000000000000000000000000',
   ])('dydx_from_serializable_int (%s)', async (value: string) => {
diff --git a/indexer/services/ender/src/config.ts b/indexer/services/ender/src/config.ts
index 33ea3ddf2f..e129dace57 100644
--- a/indexer/services/ender/src/config.ts
+++ b/indexer/services/ender/src/config.ts
@@ -50,6 +50,9 @@ export const configSchema = {
   USE_PERPETUAL_MARKET_HANDLER_SQL_FUNCTION: parseBoolean({
     default: true,
   }),
+  USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION: parseBoolean({
+    default: true,
+  }),
   USE_SUBACCOUNT_UPDATE_SQL_FUNCTION: parseBoolean({
     default: true,
   }),
diff --git a/indexer/services/ender/src/handlers/abstract-stateful-order-handler.ts b/indexer/services/ender/src/handlers/abstract-stateful-order-handler.ts
index b535042a45..2f1ea7dcfd 100644
--- a/indexer/services/ender/src/handlers/abstract-stateful-order-handler.ts
+++ b/indexer/services/ender/src/handlers/abstract-stateful-order-handler.ts
@@ -1,11 +1,29 @@
 import { logger } from '@dydxprotocol-indexer/base';
 import {
-  OrderFromDatabase, OrderStatus, OrderTable, OrderUpdateObject, OrderCreateObject, SubaccountTable,
-  OrderSide, OrderType, protocolTranslations,
+  OrderFromDatabase,
+  OrderStatus,
+  OrderTable,
+  OrderUpdateObject,
+  OrderCreateObject,
+  SubaccountTable,
+  OrderSide,
+  OrderType,
+  protocolTranslations,
   PerpetualMarketFromDatabase,
+  storeHelpers,
+  OrderModel,
+  PerpetualMarketModel,
+  SubaccountFromDatabase,
 } from '@dydxprotocol-indexer/postgres';
-import { IndexerOrderId, IndexerOrder, IndexerOrder_Side } from '@dydxprotocol-indexer/v4-protos';
+import SubaccountModel from '@dydxprotocol-indexer/postgres/build/src/models/subaccount-model';
+import {
+  IndexerOrderId,
+  IndexerOrder,
+  IndexerOrder_Side,
+  StatefulOrderEventV1,
+} from '@dydxprotocol-indexer/v4-protos';
 import { DateTime } from 'luxon';
+import * as pg from 'pg';
 
 import { STATEFUL_ORDER_ORDER_FILL_EVENT_TYPE } from '../constants';
 import { getPrice, getSize } from '../lib/helper';
@@ -21,6 +39,37 @@ export abstract class AbstractStatefulOrderHandler<T> extends Handler<T> {
     ];
   }
 
+  protected async handleEventViaSqlFunction():
+  Promise<[OrderFromDatabase,
+    PerpetualMarketFromDatabase,
+    SubaccountFromDatabase | undefined]> {
+    const eventDataBinary: Uint8Array = this.indexerTendermintEvent.dataBytes;
+    const result: pg.QueryResult = await storeHelpers.rawQuery(
+      `SELECT dydx_stateful_order_handler(
+        ${this.block.height},
+        '${this.block.time?.toISOString()}',
+        '${JSON.stringify(StatefulOrderEventV1.decode(eventDataBinary))}'
+      ) AS result;`,
+      { txId: this.txId },
+    ).catch((error: Error) => {
+      logger.error({
+        at: 'AbstractStatefulOrderHandler#handleEventViaSqlFunction',
+        message: 'Failed to handle StatefulOrderEventV1',
+        error,
+      });
+      throw error;
+    });
+
+    return [
+      OrderModel.fromJson(result.rows[0].result.order) as OrderFromDatabase,
+      PerpetualMarketModel.fromJson(
+        result.rows[0].result.perpetual_market) as PerpetualMarketFromDatabase,
+      result.rows[0].result.subaccount
+        ? SubaccountModel.fromJson(result.rows[0].result.subaccount) as SubaccountFromDatabase
+        : undefined,
+    ];
+  }
+
   protected async updateOrderStatus(
     orderIdProto: IndexerOrderId,
     status: OrderStatus,
diff --git a/indexer/services/ender/src/handlers/stateful-order/conditional-order-placement-handler.ts b/indexer/services/ender/src/handlers/stateful-order/conditional-order-placement-handler.ts
index fe4d6410c8..ba5acdbde6 100644
--- a/indexer/services/ender/src/handlers/stateful-order/conditional-order-placement-handler.ts
+++ b/indexer/services/ender/src/handlers/stateful-order/conditional-order-placement-handler.ts
@@ -5,7 +5,7 @@ import {
   OrderTable,
   PerpetualMarketFromDatabase,
   perpetualMarketRefresher,
-  protocolTranslations,
+  protocolTranslations, SubaccountFromDatabase,
   SubaccountMessageContents,
 } from '@dydxprotocol-indexer/postgres';
 import {
@@ -14,6 +14,7 @@ import {
   StatefulOrderEventV1,
 } from '@dydxprotocol-indexer/v4-protos';
 
+import config from '../../config';
 import { generateOrderSubaccountMessage } from '../../helpers/kafka-helper';
 import { getTriggerPrice } from '../../lib/helper';
 import { ConsolidatedKafkaEvent } from '../../lib/types';
@@ -32,6 +33,24 @@ export class ConditionalOrderPlacementHandler extends
 
   // eslint-disable-next-line @typescript-eslint/require-await
   public async internalHandle(): Promise<ConsolidatedKafkaEvent[]> {
+    if (config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION) {
+      return this.handleViaSqlFunction();
+    }
+    return this.handleViaKnex();
+  }
+
+  private async handleViaSqlFunction(): Promise<ConsolidatedKafkaEvent[]> {
+    const result:
+    [OrderFromDatabase,
+      PerpetualMarketFromDatabase,
+      SubaccountFromDatabase | undefined] = await this.handleEventViaSqlFunction();
+
+    const subaccountId:
+    IndexerSubaccountId = this.event.conditionalOrderPlacement!.order!.orderId!.subaccountId!;
+    return this.createKafkaEvents(subaccountId, result[0], result[1]);
+  }
+
+  private async handleViaKnex(): Promise<ConsolidatedKafkaEvent[]> {
     const order: IndexerOrder = this.event.conditionalOrderPlacement!.order!;
     const subaccountId: IndexerSubaccountId = order.orderId!.subaccountId!;
     const clobPairId: string = order.orderId!.clobPairId.toString();
@@ -58,6 +77,14 @@ export class ConditionalOrderPlacementHandler extends
       this.generateTimingStatsOptions('upsert_order'),
     );
 
+    return this.createKafkaEvents(subaccountId, conditionalOrder, perpetualMarket);
+  }
+
+  private createKafkaEvents(
+    subaccountId: IndexerSubaccountId,
+    conditionalOrder: OrderFromDatabase,
+    perpetualMarket: PerpetualMarketFromDatabase): ConsolidatedKafkaEvent[] {
+
     // Since the order isn't placed on the book, no message is sent to vulcan
     // ender needs to send the websocket message indicating the conditional order was placed
     const message: SubaccountMessageContents = {
diff --git a/indexer/services/ender/src/handlers/stateful-order/conditional-order-triggered-handler.ts b/indexer/services/ender/src/handlers/stateful-order/conditional-order-triggered-handler.ts
index b193565d69..5bc85e8885 100644
--- a/indexer/services/ender/src/handlers/stateful-order/conditional-order-triggered-handler.ts
+++ b/indexer/services/ender/src/handlers/stateful-order/conditional-order-triggered-handler.ts
@@ -6,6 +6,7 @@ import {
   PerpetualMarketFromDatabase,
   orderTranslations,
   perpetualMarketRefresher,
+  SubaccountFromDatabase,
 } from '@dydxprotocol-indexer/postgres';
 import { getOrderIdHash } from '@dydxprotocol-indexer/v4-proto-parser';
 import {
@@ -16,6 +17,7 @@ import {
   StatefulOrderEventV1,
 } from '@dydxprotocol-indexer/v4-protos';
 
+import config from '../../config';
 import { ConsolidatedKafkaEvent } from '../../lib/types';
 import { AbstractStatefulOrderHandler } from '../abstract-stateful-order-handler';
 
@@ -32,6 +34,24 @@ export class ConditionalOrderTriggeredHandler extends
 
   // eslint-disable-next-line @typescript-eslint/require-await
   public async internalHandle(): Promise<ConsolidatedKafkaEvent[]> {
+    if (config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION) {
+      return this.handleViaSqlFunction();
+    }
+    return this.handleViaKnex();
+  }
+
+  private async handleViaSqlFunction(): Promise<ConsolidatedKafkaEvent[]> {
+    const result:
+    [OrderFromDatabase,
+      PerpetualMarketFromDatabase,
+      SubaccountFromDatabase | undefined] = await this.handleEventViaSqlFunction();
+
+    const order: IndexerOrder = orderTranslations.convertToIndexerOrderWithSubaccount(
+      result[0], result[1], result[2]!);
+    return this.createKafkaEvents(order);
+  }
+
+  private async handleViaKnex(): Promise<ConsolidatedKafkaEvent[]> {
     const orderIdProto: IndexerOrderId = this.event.conditionalOrderTriggered!.triggeredOrderId!;
     const orderFromDatabase: OrderFromDatabase = await this.runFuncWithTimingStatAndErrorLogging(
       this.updateOrderStatus(orderIdProto, OrderStatus.OPEN),
@@ -56,6 +76,10 @@ export class ConditionalOrderTriggeredHandler extends
       orderFromDatabase,
       perpetualMarket,
     );
+    return this.createKafkaEvents(order);
+  }
+
+  private createKafkaEvents(order: IndexerOrder): ConsolidatedKafkaEvent[] {
     const offChainUpdate: OffChainUpdateV1 = OffChainUpdateV1.fromPartial({
       orderPlace: {
         order,
@@ -65,7 +89,7 @@ export class ConditionalOrderTriggeredHandler extends
 
     return [
       this.generateConsolidatedVulcanKafkaEvent(
-        getOrderIdHash(orderIdProto),
+        getOrderIdHash(order.orderId!),
         offChainUpdate,
       ),
     ];
diff --git a/indexer/services/ender/src/handlers/stateful-order/stateful-order-placement-handler.ts b/indexer/services/ender/src/handlers/stateful-order/stateful-order-placement-handler.ts
index f2066ead71..e941dee2f9 100644
--- a/indexer/services/ender/src/handlers/stateful-order/stateful-order-placement-handler.ts
+++ b/indexer/services/ender/src/handlers/stateful-order/stateful-order-placement-handler.ts
@@ -14,6 +14,7 @@ import {
   StatefulOrderEventV1,
 } from '@dydxprotocol-indexer/v4-protos';
 
+import config from '../../config';
 import { ConsolidatedKafkaEvent } from '../../lib/types';
 import { AbstractStatefulOrderHandler } from '../abstract-stateful-order-handler';
 
@@ -34,7 +35,28 @@ export class StatefulOrderPlacementHandler extends
     return this.getParallelizationIdsFromOrderId(orderId);
   }
 
+  // eslint-disable-next-line @typescript-eslint/require-await
   public async internalHandle(): Promise<ConsolidatedKafkaEvent[]> {
+    if (config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION) {
+      return this.handleViaSqlFunction();
+    }
+    return this.handleViaKnex();
+  }
+
+  private async handleViaSqlFunction(): Promise<ConsolidatedKafkaEvent[]> {
+    await this.handleEventViaSqlFunction();
+
+    let order: IndexerOrder;
+    // TODO(IND-334): Remove after deprecating StatefulOrderPlacementEvent
+    if (this.event.orderPlace !== undefined) {
+      order = this.event.orderPlace!.order!;
+    } else {
+      order = this.event.longTermOrderPlacement!.order!;
+    }
+    return this.createKafkaEvents(order);
+  }
+
+  private async handleViaKnex(): Promise<ConsolidatedKafkaEvent[]> {
     let order: IndexerOrder;
     // TODO(IND-334): Remove after deprecating StatefulOrderPlacementEvent
     if (this.event.orderPlace !== undefined) {
@@ -60,6 +82,10 @@ export class StatefulOrderPlacementHandler extends
       this.generateTimingStatsOptions('upsert_order'),
     );
 
+    return this.createKafkaEvents(order);
+  }
+
+  private createKafkaEvents(order: IndexerOrder): ConsolidatedKafkaEvent[] {
     const kafakEvents: ConsolidatedKafkaEvent[] = [];
 
     const offChainUpdate: OffChainUpdateV1 = OffChainUpdateV1.fromPartial({
diff --git a/indexer/services/ender/src/handlers/stateful-order/stateful-order-removal-handler.ts b/indexer/services/ender/src/handlers/stateful-order/stateful-order-removal-handler.ts
index df669f5a34..a835e74cba 100644
--- a/indexer/services/ender/src/handlers/stateful-order/stateful-order-removal-handler.ts
+++ b/indexer/services/ender/src/handlers/stateful-order/stateful-order-removal-handler.ts
@@ -10,6 +10,7 @@ import {
   StatefulOrderEventV1,
 } from '@dydxprotocol-indexer/v4-protos';
 
+import config from '../../config';
 import { ConsolidatedKafkaEvent } from '../../lib/types';
 import { AbstractStatefulOrderHandler } from '../abstract-stateful-order-handler';
 
@@ -23,13 +24,31 @@ export class StatefulOrderRemovalHandler extends
     return this.getParallelizationIdsFromOrderId(orderId);
   }
 
+  // eslint-disable-next-line @typescript-eslint/require-await
   public async internalHandle(): Promise<ConsolidatedKafkaEvent[]> {
+    if (config.USE_STATEFUL_ORDER_HANDLER_SQL_FUNCTION) {
+      return this.handleViaSqlFunction();
+    }
+    return this.handleViaKnex();
+  }
+
+  private async handleViaSqlFunction(): Promise<ConsolidatedKafkaEvent[]> {
+    const orderIdProto: IndexerOrderId = this.event.orderRemoval!.removedOrderId!;
+    await this.handleEventViaSqlFunction();
+    return this.createKafkaEvents(orderIdProto);
+  }
+
+  private async handleViaKnex(): Promise<ConsolidatedKafkaEvent[]> {
     const orderIdProto: IndexerOrderId = this.event.orderRemoval!.removedOrderId!;
     await this.runFuncWithTimingStatAndErrorLogging(
       this.updateOrderStatus(orderIdProto, OrderStatus.CANCELED),
       this.generateTimingStatsOptions('cancel_order'),
     );
 
+    return this.createKafkaEvents(orderIdProto);
+  }
+
+  private createKafkaEvents(orderIdProto: IndexerOrderId): ConsolidatedKafkaEvent[] {
     const offChainUpdate: OffChainUpdateV1 = OffChainUpdateV1.fromPartial({
       orderRemove: {
         removedOrderId: orderIdProto,
diff --git a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts
index 6193321c19..a32840e76b 100644
--- a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts
+++ b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts
@@ -50,6 +50,8 @@ const scripts: string[] = [
   'dydx_order_fill_handler_per_order.sql',
   'dydx_perpetual_market_handler.sql',
   'dydx_perpetual_position_and_order_side_matching.sql',
+  'dydx_protocol_condition_type_to_order_type.sql',
+  'dydx_stateful_order_handler.sql',
   'dydx_subaccount_update_handler.sql',
   'dydx_transfer_handler.sql',
   'dydx_trim_scale.sql',
diff --git a/indexer/services/ender/src/scripts/dydx_from_protocol_order_side.sql b/indexer/services/ender/src/scripts/dydx_from_protocol_order_side.sql
index 300f3f0a92..f9d7571761 100644
--- a/indexer/services/ender/src/scripts/dydx_from_protocol_order_side.sql
+++ b/indexer/services/ender/src/scripts/dydx_from_protocol_order_side.sql
@@ -5,8 +5,10 @@
 CREATE OR REPLACE FUNCTION dydx_from_protocol_order_side(order_side jsonb) RETURNS text AS $$
 BEGIN
     CASE order_side
-        WHEN '1'::jsonb THEN RETURN 'BUY';
-        ELSE RETURN 'SELL';
-        END CASE;
+    WHEN '1'::jsonb THEN
+        RETURN 'BUY';
+    ELSE
+        RETURN 'SELL';
+    END CASE;
 END;
 $$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE;
diff --git a/indexer/services/ender/src/scripts/dydx_protocol_condition_type_to_order_type.sql b/indexer/services/ender/src/scripts/dydx_protocol_condition_type_to_order_type.sql
new file mode 100644
index 0000000000..8ca8a7bf99
--- /dev/null
+++ b/indexer/services/ender/src/scripts/dydx_protocol_condition_type_to_order_type.sql
@@ -0,0 +1,25 @@
+/**
+ Converts the 'ConditionType' enum from the IndexerOrder protobuf (https://github.com/dydxprotocol/v4-proto/blob/4b721881fdfe99485336e221def03dc5b86eb0a1/dydxprotocol/indexer/protocol/v1/clob.proto#L131)
+ to the 'OrderType' enum in postgres.
+ */
+CREATE OR REPLACE FUNCTION dydx_protocol_condition_type_to_order_type(condition_type jsonb) RETURNS text AS $$
+DECLARE
+    UNRECOGNIZED constant jsonb = '-1'::jsonb;
+    CONDITION_TYPE_UNSPECIFIED constant jsonb = '0'::jsonb;
+    CONDITION_TYPE_STOP_LOSS constant jsonb = '1'::jsonb;
+    CONDITION_TYPE_TAKE_PROFIT constant jsonb = '2'::jsonb;
+BEGIN
+    CASE condition_type
+    WHEN UNRECOGNIZED THEN
+            RETURN 'LIMIT';
+    WHEN CONDITION_TYPE_UNSPECIFIED THEN
+        RETURN 'LIMIT';
+    WHEN CONDITION_TYPE_STOP_LOSS THEN
+        RETURN 'STOP_LIMIT';
+    WHEN CONDITION_TYPE_TAKE_PROFIT THEN
+        RETURN 'TAKE_PROFIT';
+    ELSE
+        RAISE EXCEPTION 'Unexpected ConditionType: %', condition_type;
+    END CASE;
+END;
+$$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE;
diff --git a/indexer/services/ender/src/scripts/dydx_stateful_order_handler.sql b/indexer/services/ender/src/scripts/dydx_stateful_order_handler.sql
new file mode 100644
index 0000000000..275a2a6185
--- /dev/null
+++ b/indexer/services/ender/src/scripts/dydx_stateful_order_handler.sql
@@ -0,0 +1,159 @@
+/**
+  Parameters:
+    - block_height: the height of the block being processing.
+    - block_time: the time of the block being processed.
+    - event_data: The 'data' field of the IndexerTendermintEvent (https://github.com/dydxprotocol/v4-proto/blob/8d35c86/dydxprotocol/indexer/indexer_manager/event.proto#L25)
+        converted to JSON format. Conversion to JSON is expected to be done by JSON.stringify.
+  Returns: JSON object containing fields:
+    - order: The upserted order in order-model format (https://github.com/dydxprotocol/indexer/blob/cc70982/packages/postgres/src/models/order-model.ts).
+*/
+CREATE OR REPLACE FUNCTION dydx_stateful_order_handler(
+    block_height int, block_time timestamp, event_data jsonb) RETURNS jsonb AS $$
+DECLARE
+    QUOTE_CURRENCY_ATOMIC_RESOLUTION constant numeric = -6;
+
+    order_ jsonb;
+    order_id jsonb;
+    clob_pair_id bigint;
+    subaccount_id uuid;
+    perpetual_market_record perpetual_markets%ROWTYPE;
+    order_record orders%ROWTYPE;
+    subaccount_record subaccounts%ROWTYPE;
+BEGIN
+    /** TODO(IND-334): Remove after deprecating StatefulOrderPlacementEvent. */
+    IF event_data->'orderPlace' IS NOT NULL OR event_data->'longTermOrderPlacement' IS NOT NULL OR event_data->'conditionalOrderPlacement' IS NOT NULL THEN
+        order_ = COALESCE(event_data->'orderPlace'->'order', event_data->'longTermOrderPlacement'->'order', event_data->'conditionalOrderPlacement'->'order');
+        clob_pair_id = (order_->'orderId'->'clobPairId')::bigint;
+
+        BEGIN
+            SELECT * INTO STRICT perpetual_market_record FROM perpetual_markets WHERE "clobPairId" = clob_pair_id;
+        EXCEPTION
+            WHEN NO_DATA_FOUND THEN
+                RAISE EXCEPTION 'Unable to find perpetual market with clobPairId: %', clob_pair_id;
+            WHEN TOO_MANY_ROWS THEN
+                /** This should never happen and if it ever were to would indicate that the table has malformed data. */
+                RAISE EXCEPTION 'Found multiple perpetual markets with clobPairId: %', clob_pair_id;
+        END;
+
+        /**
+          Calculate sizes, prices, and fill amounts.
+
+          TODO(IND-238): Extract out calculation of quantums and subticks to their own SQL functions.
+        */
+        order_record."id" = dydx_uuid_from_order_id(order_->'orderId');
+        order_record."subaccountId" = dydx_uuid_from_subaccount_id(order_->'orderId'->'subaccountId');
+        order_record."clientId" = jsonb_extract_path_text(order_, 'orderId', 'clientId')::bigint;
+        order_record."clobPairId" = clob_pair_id;
+        order_record."side" = dydx_from_protocol_order_side(order_->'side');
+        order_record."size" = dydx_trim_scale(dydx_from_jsonlib_long(order_->'quantums') *
+                                              power(10, perpetual_market_record."atomicResolution")::numeric);
+        order_record."totalFilled" = 0;
+        order_record."price" = dydx_trim_scale(dydx_from_jsonlib_long(order_->'subticks') *
+                                               power(10, perpetual_market_record."quantumConversionExponent" +
+                                                         QUOTE_CURRENCY_ATOMIC_RESOLUTION -
+                                                         perpetual_market_record."atomicResolution")::numeric);
+        order_record."timeInForce" = dydx_from_protocol_time_in_force(order_->'timeInForce');
+        order_record."reduceOnly" = (order_->>'reduceOnly')::boolean;
+        order_record."orderFlags" = (order_->'orderId'->'orderFlags')::bigint;
+        order_record."goodTilBlockTime" = to_timestamp((order_->'goodTilBlockTime')::double precision);
+        order_record."clientMetadata" = (order_->'clientMetadata')::bigint;
+        order_record."createdAtHeight" = block_height;
+        order_record."updatedAt" = block_time;
+        order_record."updatedAtHeight" = block_height;
+
+        CASE
+            WHEN event_data->'conditionalOrderPlacement' IS NOT NULL THEN
+                order_record."type" = dydx_protocol_condition_type_to_order_type(order_->'conditionType');
+                order_record."status" = 'UNTRIGGERED';
+                order_record."triggerPrice" = dydx_trim_scale(dydx_from_jsonlib_long(order_->'conditionalOrderTriggerSubticks') *
+                                                              power(10, perpetual_market_record."quantumConversionExponent" +
+                                                                        QUOTE_CURRENCY_ATOMIC_RESOLUTION -
+                                                                        perpetual_market_record."atomicResolution")::numeric);
+            ELSE
+                order_record."type" = 'LIMIT';
+                order_record."status" = 'OPEN';
+        END CASE;
+
+        INSERT INTO orders VALUES (order_record.*) ON CONFLICT ("id") DO
+            UPDATE SET
+                       "subaccountId" = order_record."subaccountId",
+                       "clientId" = order_record."clientId",
+                       "clobPairId" = order_record."clobPairId",
+                       "side" = order_record."side",
+                       "size" = order_record."size",
+                       "totalFilled" = order_record."totalFilled",
+                       "price" = order_record."price",
+                       "timeInForce" = order_record."timeInForce",
+                       "reduceOnly" = order_record."reduceOnly",
+                       "orderFlags" = order_record."orderFlags",
+                       "goodTilBlockTime" = order_record."goodTilBlockTime",
+                       "clientMetadata" = order_record."clientMetadata",
+                       "createdAtHeight" = order_record."createdAtHeight",
+                       "updatedAt" = order_record."updatedAt",
+                       "updatedAtHeight" = order_record."updatedAtHeight",
+                       "type" = order_record."type",
+                       "status" = order_record."status",
+                       "triggerPrice" = order_record."triggerPrice"
+        RETURNING * INTO order_record;
+
+        RETURN jsonb_build_object(
+                'order',
+                dydx_to_jsonb(order_record),
+                'perpetual_market',
+                dydx_to_jsonb(perpetual_market_record)
+            );
+    ELSIF event_data->'conditionalOrderTriggered' IS NOT NULL OR event_data->'orderRemoval' IS NOT NULL THEN
+        CASE
+            WHEN event_data->'conditionalOrderTriggered' IS NOT NULL THEN
+                order_id = event_data->'conditionalOrderTriggered'->'triggeredOrderId';
+                order_record."status" = 'OPEN';
+            ELSE
+                order_id = event_data->'orderRemoval'->'removedOrderId';
+                order_record."status" = 'CANCELED';
+        END CASE;
+
+        clob_pair_id = (order_id->'clobPairId')::bigint;
+        BEGIN
+            SELECT * INTO STRICT perpetual_market_record FROM perpetual_markets WHERE "clobPairId" = clob_pair_id;
+        EXCEPTION
+            WHEN NO_DATA_FOUND THEN
+                RAISE EXCEPTION 'Unable to find perpetual market with clobPairId: %', clob_pair_id;
+            WHEN TOO_MANY_ROWS THEN
+                /** This should never happen and if it ever were to would indicate that the table has malformed data. */
+                RAISE EXCEPTION 'Found multiple perpetual markets with clobPairId: %', clob_pair_id;
+        END;
+
+        subaccount_id = dydx_uuid_from_subaccount_id(order_id->'subaccountId');
+        SELECT * INTO subaccount_record FROM subaccounts WHERE "id" = subaccount_id;
+        IF NOT FOUND THEN
+            RAISE EXCEPTION 'Subaccount for order not found: %', order_;
+        END IF;
+
+        order_record."id" = dydx_uuid_from_order_id(order_id);
+        order_record."updatedAt" = block_time;
+        order_record."updatedAtHeight" = block_height;
+        UPDATE orders
+        SET
+            "status" = order_record."status",
+            "updatedAt" = order_record."updatedAt",
+            "updatedAtHeight" = order_record."updatedAtHeight"
+        WHERE "id" = order_record."id"
+        RETURNING * INTO order_record;
+
+        IF NOT FOUND THEN
+            RAISE EXCEPTION 'Unable to update order status with orderId: %', dydx_uuid_from_order_id(order_id);
+        END IF;
+
+        RETURN jsonb_build_object(
+                'order',
+                dydx_to_jsonb(order_record),
+                'perpetual_market',
+                dydx_to_jsonb(perpetual_market_record),
+                'subaccount',
+                dydx_to_jsonb(subaccount_record)
+            );
+    ELSE
+        RAISE EXCEPTION 'Unkonwn sub-event type %', event_data;
+    END IF;
+END;
+$$ LANGUAGE plpgsql;