diff --git a/indexer/packages/postgres/src/db/migrations/migration_files/20231101173444_update_fills_type_with_deleverage.ts b/indexer/packages/postgres/src/db/migrations/migration_files/20231101173444_update_fills_type_with_deleverage.ts new file mode 100644 index 0000000000..2b2462a7c0 --- /dev/null +++ b/indexer/packages/postgres/src/db/migrations/migration_files/20231101173444_update_fills_type_with_deleverage.ts @@ -0,0 +1,19 @@ +import * as Knex from 'knex'; + +import { formatAlterTableEnumSql } from '../helpers'; + +export async function up(knex: Knex): Promise { + return knex.raw(formatAlterTableEnumSql( + 'fills', + 'type', + ['MARKET', 'LIMIT', 'LIQUIDATED', 'LIQUIDATION', 'DELEVERAGED', 'OFFSETTING'], + )); +} + +export async function down(knex: Knex): Promise { + return knex.raw(formatAlterTableEnumSql( + 'fills', + 'type', + ['MARKET', 'LIMIT', 'LIQUIDATED', 'LIQUIDATION'], + )); +} diff --git a/indexer/packages/postgres/src/types/fill-types.ts b/indexer/packages/postgres/src/types/fill-types.ts index b8006d1cab..a46aa2781d 100644 --- a/indexer/packages/postgres/src/types/fill-types.ts +++ b/indexer/packages/postgres/src/types/fill-types.ts @@ -23,6 +23,12 @@ export enum FillType { LIQUIDATED = 'LIQUIDATED', // LIQUIDATION is for the maker side of the fill, never used for orders LIQUIDATION = 'LIQUIDATION', + // DELEVERAGED is for the subaccount that was deleveraged in a deleveraging event. + // The fill type will be set to taker. + DELEVERAGED = 'DELEVERAGED', + // OFFSETTING is for the offsetting subaccount in a deleveraging event. + // The fill type will be set to maker. + OFFSETTING = 'OFFSETTING', } export interface FillCreateObject { diff --git a/indexer/packages/postgres/src/types/websocket-message-types.ts b/indexer/packages/postgres/src/types/websocket-message-types.ts index 51376e1f06..56b9c3f9b0 100644 --- a/indexer/packages/postgres/src/types/websocket-message-types.ts +++ b/indexer/packages/postgres/src/types/websocket-message-types.ts @@ -176,6 +176,7 @@ export interface TradeContent { side: string, createdAt: IsoString, liquidation: boolean, + deleveraging: boolean, } /* ------- MarketMessageContents ------- */ diff --git a/indexer/services/comlink/public/api-documentation.md b/indexer/services/comlink/public/api-documentation.md index cc2bbc80ed..82bea70654 100644 --- a/indexer/services/comlink/public/api-documentation.md +++ b/indexer/services/comlink/public/api-documentation.md @@ -2287,6 +2287,8 @@ This operation does not require authentication |*anonymous*|LIMIT| |*anonymous*|LIQUIDATED| |*anonymous*|LIQUIDATION| +|*anonymous*|DELEVERAGED| +|*anonymous*|OFFSETTING| ## MarketType diff --git a/indexer/services/comlink/public/swagger.json b/indexer/services/comlink/public/swagger.json index 080629ef8e..324e83501e 100644 --- a/indexer/services/comlink/public/swagger.json +++ b/indexer/services/comlink/public/swagger.json @@ -304,7 +304,9 @@ "MARKET", "LIMIT", "LIQUIDATED", - "LIQUIDATION" + "LIQUIDATION", + "DELEVERAGED", + "OFFSETTING" ], "type": "string" }, diff --git a/indexer/services/comlink/public/websocket-documentation.md b/indexer/services/comlink/public/websocket-documentation.md index fdeee2c9b9..e9c35463a6 100644 --- a/indexer/services/comlink/public/websocket-documentation.md +++ b/indexer/services/comlink/public/websocket-documentation.md @@ -260,6 +260,12 @@ export enum FillType { LIQUIDATED = 'LIQUIDATED', // LIQUIDATION is for the maker side of the fill, never used for orders LIQUIDATION = 'LIQUIDATION', + // DELEVERAGED is for the subaccount that was deleveraged in a deleveraging event. + // The fill type will be set to taker. + DELEVERAGED = 'DELEVERAGED', + // OFFSETTING is for the offsetting subaccount in a deleveraging event. + // The fill type will be set to maker. + OFFSETTING = 'OFFSETTING', } export interface TransferSubaccountMessageContents { diff --git a/indexer/services/ender/__tests__/handlers/order-fills/deleveraging-handler.test.ts b/indexer/services/ender/__tests__/handlers/order-fills/deleveraging-handler.test.ts new file mode 100644 index 0000000000..fbe6950292 --- /dev/null +++ b/indexer/services/ender/__tests__/handlers/order-fills/deleveraging-handler.test.ts @@ -0,0 +1,346 @@ +import { logger } from '@dydxprotocol-indexer/base'; +import { redis } from '@dydxprotocol-indexer/redis'; +import { + assetRefresher, + dbHelpers, + FillTable, + FillType, + Liquidity, + OrderSide, + PerpetualMarketFromDatabase, + perpetualMarketRefresher, + PerpetualPositionCreateObject, + PerpetualPositionStatus, + PerpetualPositionTable, + PositionSide, + SubaccountCreateObject, + SubaccountTable, + TendermintEventTable, + testConstants, + testMocks, +} from '@dydxprotocol-indexer/postgres'; +import { updateBlockCache } from '../../../src/caches/block-cache'; +import { defaultDeleveragingEvent, defaultPreviousHeight } from '../../helpers/constants'; +import { clearCandlesMap } from '../../../src/caches/candle-cache'; +import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions'; +import { redisClient } from '../../../src/helpers/redis/redis-controller'; +import { + DeleveragingEventV1, + IndexerSubaccountId, + IndexerTendermintBlock, + IndexerTendermintEvent, + Timestamp, +} from '@dydxprotocol-indexer/v4-protos'; +import { + createIndexerTendermintBlock, + createIndexerTendermintEvent, + createKafkaMessageFromDeleveragingEvent, + expectDefaultTradeKafkaMessageFromTakerFillId, + expectFillInDatabase, + expectFillSubaccountKafkaMessageFromLiquidationEvent, expectPerpetualPosition, +} from '../../helpers/indexer-proto-helpers'; +import { DydxIndexerSubtypes } from '../../../src/lib/types'; +import { + MILLIS_IN_NANOS, + SECONDS_IN_MILLIS, + SUBACCOUNT_ORDER_FILL_EVENT_TYPE, +} from '../../../src/constants'; +import { DateTime } from 'luxon'; +import Long from 'long'; +import { DeleveragingHandler } from '../../../src/handlers/order-fills/deleveraging-handler'; +import { KafkaMessage } from 'kafkajs'; +import { onMessage } from '../../../src/lib/on-message'; +import { producer } from '@dydxprotocol-indexer/kafka'; +import { createdDateTime, createdHeight } from '@dydxprotocol-indexer/postgres/build/__tests__/helpers/constants'; +import Big from 'big.js'; +import { getWeightedAverage } from '../../../src/lib/helper'; + +describe('DeleveragingHandler', () => { + const offsettingSubaccount: SubaccountCreateObject = { + address: defaultDeleveragingEvent.offsetting!.owner, + subaccountNumber: defaultDeleveragingEvent.offsetting!.number, + updatedAt: createdDateTime.toISO(), + updatedAtHeight: createdHeight, + }; + + const deleveragedSubaccount: SubaccountCreateObject = { + address: defaultDeleveragingEvent.liquidated!.owner, + subaccountNumber: defaultDeleveragingEvent.liquidated!.number, + updatedAt: createdDateTime.toISO(), + updatedAtHeight: createdHeight, + }; + + beforeAll(async () => { + await dbHelpers.migrate(); + await createPostgresFunctions(); + }); + + beforeEach(async () => { + await testMocks.seedData(); + await perpetualMarketRefresher.updatePerpetualMarkets(); + await assetRefresher.updateAssets(); + updateBlockCache(defaultPreviousHeight); + }); + + afterEach(async () => { + await dbHelpers.clearData(); + jest.clearAllMocks(); + clearCandlesMap(); + await redis.deleteAllAsync(redisClient); + }); + + afterAll(async () => { + await dbHelpers.teardown(); + jest.resetAllMocks(); + }); + + const defaultHeight: string = '3'; + const defaultDateTime: DateTime = DateTime.utc(2022, 6, 1, 12, 1, 1, 2); + const defaultTime: Timestamp = { + seconds: Long.fromValue(Math.floor(defaultDateTime.toSeconds()), true), + nanos: (defaultDateTime.toMillis() % SECONDS_IN_MILLIS) * MILLIS_IN_NANOS, + }; + const defaultTxHash: string = '0x32343534306431622d306461302d343831322d613730372d3965613162336162'; + const transactionIndex: number = 0; + const eventIndex: number = 0; + + const offsettingPerpetualPosition: PerpetualPositionCreateObject = { + subaccountId: SubaccountTable.subaccountIdToUuid(defaultDeleveragingEvent.offsetting!), + perpetualId: testConstants.defaultPerpetualMarket2.id, + side: PositionSide.LONG, + status: PerpetualPositionStatus.OPEN, + size: '10', + maxSize: '25', + sumOpen: '10', + entryPrice: '15000', + createdAt: DateTime.utc().toISO(), + createdAtHeight: '1', + openEventId: testConstants.defaultTendermintEventId, + lastEventId: testConstants.defaultTendermintEventId, + settledFunding: '200000', + }; + const deleveragedPerpetualPosition: PerpetualPositionCreateObject = { + ...offsettingPerpetualPosition, + subaccountId: SubaccountTable.subaccountIdToUuid(defaultDeleveragingEvent.liquidated!), + }; + + it('getParallelizationIds', () => { + const offsettingSubaccountId: IndexerSubaccountId = defaultDeleveragingEvent.offsetting!; + const deleveragedSubaccountId: IndexerSubaccountId = defaultDeleveragingEvent.liquidated!; + + const indexerTendermintEvent: IndexerTendermintEvent = createIndexerTendermintEvent( + DydxIndexerSubtypes.DELEVERAGING, + DeleveragingEventV1.encode(defaultDeleveragingEvent).finish(), + transactionIndex, + eventIndex, + ); + const block: IndexerTendermintBlock = createIndexerTendermintBlock( + 0, + defaultTime, + [indexerTendermintEvent], + [defaultTxHash], + ); + + const handler: DeleveragingHandler = new DeleveragingHandler( + block, + indexerTendermintEvent, + 0, + defaultDeleveragingEvent, + ); + + const offsettingSubaccountUuid: string = SubaccountTable.subaccountIdToUuid( + offsettingSubaccountId, + ); + const deleveragedSubaccountUuid: string = SubaccountTable.subaccountIdToUuid( + deleveragedSubaccountId, + ); + + const perpetualMarket: PerpetualMarketFromDatabase | undefined = perpetualMarketRefresher + .getPerpetualMarketFromId( + defaultDeleveragingEvent.perpetualId.toString(), + ); + expect(perpetualMarket).toBeDefined(); + + expect(handler.getParallelizationIds()).toEqual([ + `${handler.eventType}_${offsettingSubaccountUuid}_${perpetualMarket!.clobPairId}`, + `${handler.eventType}_${deleveragedSubaccountUuid}_${perpetualMarket!.clobPairId}`, + // To ensure that SubaccountUpdateEvents, OrderFillEvents, and DeleveragingEvents for + // the same subaccount are not processed in parallel + `${SUBACCOUNT_ORDER_FILL_EVENT_TYPE}_${offsettingSubaccountUuid}`, + `${SUBACCOUNT_ORDER_FILL_EVENT_TYPE}_${deleveragedSubaccountUuid}`, + ]); + }); + + it('DeleveragingEvent fails validation', async () => { + const deleveragingEvent: DeleveragingEventV1 = DeleveragingEventV1 + .fromPartial({ // no liquidated subaccount + ...defaultDeleveragingEvent, + liquidated: undefined, + }); + const kafkaMessage: KafkaMessage = createKafkaMessageFromDeleveragingEvent({ + deleveragingEvent, + transactionIndex, + eventIndex, + height: parseInt(defaultHeight, 10), + time: defaultTime, + txHash: defaultTxHash, + }); + const loggerCrit = jest.spyOn(logger, 'crit'); + await expect(onMessage(kafkaMessage)).rejects.toThrowError(); + + expect(loggerCrit).toHaveBeenCalledWith(expect.objectContaining({ + at: 'onMessage#onMessage', + message: 'Error: Unable to parse message, this must be due to a bug in V4 node', + })); + }); + + it('creates fills and updates perpetual positions', async () => { + const kafkaMessage: KafkaMessage = createKafkaMessageFromDeleveragingEvent({ + deleveragingEvent: defaultDeleveragingEvent, + transactionIndex, + eventIndex, + height: parseInt(defaultHeight, 10), + time: defaultTime, + txHash: defaultTxHash, + }); + + // create initial Subaccounts + await Promise.all([ + SubaccountTable.create(offsettingSubaccount), + SubaccountTable.create(deleveragedSubaccount), + ]); + // create initial PerpetualPositions + await Promise.all([ + PerpetualPositionTable.create(offsettingPerpetualPosition), + PerpetualPositionTable.create(deleveragedPerpetualPosition), + ]); + + const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send'); + await onMessage(kafkaMessage); + + const eventId: Buffer = TendermintEventTable.createEventId( + defaultHeight, + transactionIndex, + eventIndex, + ); + + // This size should be in fixed-point notation rather than exponential notation. + const quoteAmount: string = '0.00000000001'; // quote amount is price * fillAmount = 1e3 * 1e-14 = 1e-11 + const totalFilled: string = '0.00000000000001'; // fillAmount in human = 10^4*10^-18=10^-14 + const price: string = '1000'; // 10^9*10^-6=10^3 + const perpetualMarket: PerpetualMarketFromDatabase | undefined = perpetualMarketRefresher + .getPerpetualMarketFromId( + defaultDeleveragingEvent.perpetualId.toString(), + ); + + await Promise.all([ + expectFillInDatabase({ + subaccountId: SubaccountTable.subaccountIdToUuid(defaultDeleveragingEvent.offsetting!), + clientId: '0', + liquidity: Liquidity.MAKER, + size: totalFilled, + price, + quoteAmount, + eventId, + transactionHash: defaultTxHash, + createdAt: defaultDateTime.toISO(), + createdAtHeight: defaultHeight, + type: FillType.OFFSETTING, + clobPairId: perpetualMarket!.clobPairId, + side: OrderSide.BUY, + orderFlags: '0', + clientMetadata: null, + hasOrderId: false, + fee: '0', + }), + expectFillInDatabase({ + subaccountId: SubaccountTable.subaccountIdToUuid(defaultDeleveragingEvent.liquidated!), + clientId: '0', + liquidity: Liquidity.TAKER, + size: totalFilled, + price, + quoteAmount, + eventId, + transactionHash: defaultTxHash, + createdAt: defaultDateTime.toISO(), + createdAtHeight: defaultHeight, + type: FillType.DELEVERAGED, + clobPairId: perpetualMarket!.clobPairId, + side: OrderSide.SELL, + orderFlags: '0', + clientMetadata: null, + hasOrderId: false, + fee: '0', + }), + expectPerpetualPosition( + PerpetualPositionTable.uuid( + offsettingPerpetualPosition.subaccountId, + offsettingPerpetualPosition.openEventId, + ), + { + sumOpen: Big(offsettingPerpetualPosition.size).plus(totalFilled).toFixed(), + entryPrice: getWeightedAverage( + offsettingPerpetualPosition.entryPrice!, + offsettingPerpetualPosition.size, + price, + totalFilled, + ), + }, + ), + expectPerpetualPosition( + PerpetualPositionTable.uuid( + deleveragedPerpetualPosition.subaccountId, + deleveragedPerpetualPosition.openEventId, + ), + { + sumClose: Big(totalFilled).toFixed(), + exitPrice: price, + }, + ), + ]); + + await Promise.all([ + expectFillsAndPositionsSubaccountKafkaMessages( + producerSendMock, + eventId, + true, + ), + expectFillsAndPositionsSubaccountKafkaMessages( + producerSendMock, + eventId, + false, + ), + expectDefaultTradeKafkaMessageFromTakerFillId( + producerSendMock, + eventId, + ), + ]); + }); + + async function expectFillsAndPositionsSubaccountKafkaMessages( + producerSendMock: jest.SpyInstance, + eventId: Buffer, + deleveraged: boolean, + ) { + const subaccountId: IndexerSubaccountId = deleveraged + ? defaultDeleveragingEvent.liquidated! : defaultDeleveragingEvent.offsetting!; + const liquidity: Liquidity = deleveraged ? Liquidity.TAKER : Liquidity.MAKER; + const positionId: string = ( + await PerpetualPositionTable.findOpenPositionForSubaccountPerpetual( + SubaccountTable.subaccountIdToUuid(subaccountId), + testConstants.defaultPerpetualMarket2.id, + ) + )!.id; + + await expectFillSubaccountKafkaMessageFromLiquidationEvent( + producerSendMock, + subaccountId, + FillTable.uuid(eventId, liquidity), + positionId, + defaultHeight, + transactionIndex, + eventIndex, + testConstants.defaultPerpetualMarket2.ticker, + ); + } +}); diff --git a/indexer/services/ender/__tests__/helpers/constants.ts b/indexer/services/ender/__tests__/helpers/constants.ts index 751b52390d..1e4f8f5a2d 100644 --- a/indexer/services/ender/__tests__/helpers/constants.ts +++ b/indexer/services/ender/__tests__/helpers/constants.ts @@ -325,6 +325,7 @@ export const defaultTradeContent: TradeContent = { side: 'BUY', createdAt: 'createdAt', liquidation: true, + deleveraging: false, }; export const defaultTradeMessage: SingleTradeMessage = contentToSingleTradeMessage( defaultTradeContent, diff --git a/indexer/services/ender/__tests__/helpers/indexer-proto-helpers.ts b/indexer/services/ender/__tests__/helpers/indexer-proto-helpers.ts index 576ee0e89e..4f5eadcaf7 100644 --- a/indexer/services/ender/__tests__/helpers/indexer-proto-helpers.ts +++ b/indexer/services/ender/__tests__/helpers/indexer-proto-helpers.ts @@ -48,6 +48,7 @@ import { OffChainUpdateV1, IndexerOrderId, PerpetualMarketCreateEventV1, + DeleveragingEventV1, } from '@dydxprotocol-indexer/v4-protos'; import { Message, ProducerRecord } from 'kafkajs'; import _ from 'lodash'; @@ -57,6 +58,7 @@ import { generateFillSubaccountMessage, generatePerpetualMarketMessage, generatePerpetualPositionsContents, + isDeleveraging, isLiquidation, } from '../../src/helpers/kafka-helper'; import { protoTimestampToDate } from '../../src/lib/helper'; @@ -471,6 +473,41 @@ export function createKafkaMessageFromOrderFillEvent({ return createKafkaMessage(Buffer.from(binaryBlock)); } +export function createKafkaMessageFromDeleveragingEvent({ + deleveragingEvent, + transactionIndex, + eventIndex, + height, + time, + txHash, +}: { + deleveragingEvent: DeleveragingEventV1, + transactionIndex: number, + eventIndex: number, + height: number, + time: Timestamp, + txHash: string, +}) { + const events: IndexerTendermintEvent[] = [ + createIndexerTendermintEvent( + DydxIndexerSubtypes.DELEVERAGING, + Uint8Array.from(DeleveragingEventV1.encode(deleveragingEvent).finish()), + transactionIndex, + eventIndex, + ), + ]; + + const block: IndexerTendermintBlock = createIndexerTendermintBlock( + height, + time, + events, + [txHash], + ); + + const binaryBlock: Uint8Array = Uint8Array.from(IndexerTendermintBlock.encode(block).finish()); + return createKafkaMessage(Buffer.from(binaryBlock)); +} + export function liquidationOrderToOrderSide( liquidationOrder: LiquidationOrderV1, ): OrderSide { @@ -771,6 +808,7 @@ export async function expectDefaultTradeKafkaMessageFromTakerFillId( side: takerFill!.side.toString(), createdAt: takerFill!.createdAt, liquidation: isLiquidation(takerFill!), + deleveraging: isDeleveraging(takerFill!), }, ], }; diff --git a/indexer/services/ender/__tests__/lib/kafka-publisher.test.ts b/indexer/services/ender/__tests__/lib/kafka-publisher.test.ts index add0830794..f125aed5f4 100644 --- a/indexer/services/ender/__tests__/lib/kafka-publisher.test.ts +++ b/indexer/services/ender/__tests__/lib/kafka-publisher.test.ts @@ -414,6 +414,7 @@ describe('kafka-publisher', () => { side: 'side', createdAt: 'today', liquidation: false, + deleveraging: false, }; const singleTrade1: SingleTradeMessage = contentToSingleTradeMessage( tradeContent1, @@ -427,6 +428,7 @@ describe('kafka-publisher', () => { side: 'side', createdAt: 'today', liquidation: false, + deleveraging: false, }; const singleTrade2: SingleTradeMessage = contentToSingleTradeMessage( tradeContent2, @@ -441,6 +443,7 @@ describe('kafka-publisher', () => { side: 'side', createdAt: 'today', liquidation: false, + deleveraging: false, }; const singleTrade3: SingleTradeMessage = contentToSingleTradeMessage( tradeContent3, diff --git a/indexer/services/ender/__tests__/lib/on-message.test.ts b/indexer/services/ender/__tests__/lib/on-message.test.ts index 7b29eba4a4..8e6dd5acc9 100644 --- a/indexer/services/ender/__tests__/lib/on-message.test.ts +++ b/indexer/services/ender/__tests__/lib/on-message.test.ts @@ -51,12 +51,12 @@ import { updateBlockCache } from '../../src/caches/block-cache'; import { MarketModifyHandler } from '../../src/handlers/markets/market-modify-handler'; import Long from 'long'; import { createPostgresFunctions } from '../../src/helpers/postgres/postgres-functions'; -import { DeleveragingHandler } from '../../src/handlers/deleveraging-handler'; +import { DeleveragingHandler } from '../../src/handlers/order-fills/deleveraging-handler'; jest.mock('../../src/handlers/subaccount-update-handler'); jest.mock('../../src/handlers/transfer-handler'); jest.mock('../../src/handlers/funding-handler'); -jest.mock('../../src/handlers/deleveraging-handler'); +jest.mock('../../src/handlers/order-fills/deleveraging-handler'); jest.mock('../../src/handlers/markets/market-modify-handler'); describe('on-message', () => { diff --git a/indexer/services/ender/src/constants.ts b/indexer/services/ender/src/constants.ts index 1d090feeb9..bbfbb7e3b7 100644 --- a/indexer/services/ender/src/constants.ts +++ b/indexer/services/ender/src/constants.ts @@ -13,7 +13,3 @@ export const SUBACCOUNT_ORDER_FILL_EVENT_TYPE: string = 'subaccount_order_fill'; // StatefulOrder and OrderFill events for the same order are processed chronologically. export const STATEFUL_ORDER_ORDER_FILL_EVENT_TYPE: string = 'stateful_order_order_fill'; - -// Deleveraging, SubaccountUpdate, and OrderFill events for the same subaccount -// are processed chronologically. -export const DELEVERAGING_EVENT_TYPE: string = 'deleveraging'; diff --git a/indexer/services/ender/src/handlers/deleveraging-handler.ts b/indexer/services/ender/src/handlers/deleveraging-handler.ts deleted file mode 100644 index e5a2755f5c..0000000000 --- a/indexer/services/ender/src/handlers/deleveraging-handler.ts +++ /dev/null @@ -1,18 +0,0 @@ -import { DeleveragingEventV1 } from '@dydxprotocol-indexer/v4-protos'; - -import { ConsolidatedKafkaEvent } from '../lib/types'; -import { Handler } from './handler'; - -export class DeleveragingHandler extends Handler { - eventType: string = 'DeleveragingEvent'; - - public getParallelizationIds(): string[] { - return []; - } - - // eslint-disable-next-line @typescript-eslint/require-await - public async internalHandle(): Promise { - // Implement this - return []; - } -} diff --git a/indexer/services/ender/src/handlers/order-fills/abstract-order-fill-handler.ts b/indexer/services/ender/src/handlers/order-fills/abstract-order-fill-handler.ts index 52814bc5bf..0c0c864b67 100644 --- a/indexer/services/ender/src/handlers/order-fills/abstract-order-fill-handler.ts +++ b/indexer/services/ender/src/handlers/order-fills/abstract-order-fill-handler.ts @@ -45,6 +45,7 @@ import { generateFillSubaccountMessage, generateOrderSubaccountMessage, generatePerpetualPositionsContents, + isDeleveraging, isLiquidation, } from '../../helpers/kafka-helper'; import { @@ -428,6 +429,7 @@ export abstract class AbstractOrderFillHandler extends Handler { side: fill.side.toString(), createdAt: fill.createdAt, liquidation: isLiquidation(fill), + deleveraging: isDeleveraging(fill), }, ], }; diff --git a/indexer/services/ender/src/handlers/order-fills/deleveraging-handler.ts b/indexer/services/ender/src/handlers/order-fills/deleveraging-handler.ts new file mode 100644 index 0000000000..49408ad6e3 --- /dev/null +++ b/indexer/services/ender/src/handlers/order-fills/deleveraging-handler.ts @@ -0,0 +1,107 @@ +import { logger } from '@dydxprotocol-indexer/base'; +import { + FillFromDatabase, + FillModel, + PerpetualMarketFromDatabase, + PerpetualMarketModel, + perpetualMarketRefresher, + PerpetualPositionFromDatabase, + PerpetualPositionModel, + storeHelpers, + SubaccountTable, +} from '@dydxprotocol-indexer/postgres'; +import { DeleveragingEventV1 } from '@dydxprotocol-indexer/v4-protos'; +import * as pg from 'pg'; + +import { SUBACCOUNT_ORDER_FILL_EVENT_TYPE } from '../../constants'; +import { indexerTendermintEventToTransactionIndex } from '../../lib/helper'; +import { ConsolidatedKafkaEvent } from '../../lib/types'; +import { AbstractOrderFillHandler } from './abstract-order-fill-handler'; + +export class DeleveragingHandler extends AbstractOrderFillHandler { + eventType: string = 'DeleveragingEvent'; + + public getParallelizationIds(): string[] { + const perpetualMarket: PerpetualMarketFromDatabase | undefined = perpetualMarketRefresher + .getPerpetualMarketFromId(this.event.perpetualId.toString()); + if (perpetualMarket === undefined) { + logger.error({ + at: 'DeleveragingHandler#internalHandle', + message: 'Unable to find perpetual market', + perpetualId: this.event.perpetualId, + event: this.event, + }); + throw new Error(`Unable to find perpetual market with perpetualId: ${this.event.perpetualId}`); + } + const offsettingSubaccountUuid: string = SubaccountTable + .uuid(this.event.offsetting!.owner, this.event.offsetting!.number); + const deleveragedSubaccountUuid: string = SubaccountTable + .uuid(this.event.liquidated!.owner, this.event.liquidated!.number); + return [ + `${this.eventType}_${offsettingSubaccountUuid}_${perpetualMarket.clobPairId}`, + `${this.eventType}_${deleveragedSubaccountUuid}_${perpetualMarket.clobPairId}`, + // To ensure that SubaccountUpdateEvents, OrderFillEvents, and DeleveragingEvents for the same + // subaccount are not processed in parallel + `${SUBACCOUNT_ORDER_FILL_EVENT_TYPE}_${offsettingSubaccountUuid}`, + `${SUBACCOUNT_ORDER_FILL_EVENT_TYPE}_${deleveragedSubaccountUuid}`, + ]; + } + + // eslint-disable-next-line @typescript-eslint/require-await + public async internalHandle(): Promise { + const eventDataBinary: Uint8Array = this.indexerTendermintEvent.dataBytes; + const transactionIndex: number = indexerTendermintEventToTransactionIndex( + this.indexerTendermintEvent, + ); + const result: pg.QueryResult = await storeHelpers.rawQuery( + `SELECT dydx_deleveraging_handler( + ${this.block.height}, + '${this.block.time?.toISOString()}', + '${JSON.stringify(DeleveragingEventV1.decode(eventDataBinary))}', + ${this.indexerTendermintEvent.eventIndex}, + ${transactionIndex}, + '${this.block.txHashes[transactionIndex]}' + ) AS result;`, + { txId: this.txId }, + ).catch((error: Error) => { + logger.error({ + at: 'DeleveragingHandler#handleViaSqlFunction', + message: 'Failed to handle DeleveragingEventV1', + error, + }); + throw error; + }); + const liquidatedFill: FillFromDatabase = FillModel.fromJson( + result.rows[0].result.liquidated_fill) as FillFromDatabase; + const offsettingFill: FillFromDatabase = FillModel.fromJson( + result.rows[0].result.offsetting_fill) as FillFromDatabase; + const perpetualMarket: PerpetualMarketFromDatabase = PerpetualMarketModel.fromJson( + result.rows[0].result.perpetual_market) as PerpetualMarketFromDatabase; + const liquidatedPerpetualPosition: + PerpetualPositionFromDatabase = PerpetualPositionModel.fromJson( + result.rows[0].result.liquidated_perpetual_position) as PerpetualPositionFromDatabase; + const offsettingPerpetualPosition: + PerpetualPositionFromDatabase = PerpetualPositionModel.fromJson( + result.rows[0].result.offsetting_perpetual_position) as PerpetualPositionFromDatabase; + const kafkaEvents: ConsolidatedKafkaEvent[] = [ + this.generateConsolidatedKafkaEvent( + this.event.liquidated!, + undefined, + liquidatedPerpetualPosition, + liquidatedFill, + perpetualMarket, + ), + this.generateConsolidatedKafkaEvent( + this.event.offsetting!, + undefined, + offsettingPerpetualPosition, + offsettingFill, + perpetualMarket, + ), + this.generateTradeKafkaEventFromTakerOrderFill( + liquidatedFill, + ), + ]; + return kafkaEvents; + } +} diff --git a/indexer/services/ender/src/helpers/kafka-helper.ts b/indexer/services/ender/src/helpers/kafka-helper.ts index cfda119ef4..8e1f1fa579 100644 --- a/indexer/services/ender/src/helpers/kafka-helper.ts +++ b/indexer/services/ender/src/helpers/kafka-helper.ts @@ -278,6 +278,10 @@ export function isLiquidation(fill: FillFromDatabase): boolean { return fill.type === FillType.LIQUIDATION || fill.type === FillType.LIQUIDATED; } +export function isDeleveraging(fill: FillFromDatabase): boolean { + return fill.type === FillType.DELEVERAGED || fill.type === FillType.OFFSETTING; +} + export function generateFillSubaccountMessage( fill: FillFromDatabase, ticker: string, diff --git a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts index a32840e76b..058238fc15 100644 --- a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts +++ b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts @@ -31,6 +31,7 @@ const scripts: string[] = [ 'create_extension_uuid_ossp.sql', 'dydx_asset_create_handler.sql', 'dydx_clob_pair_status_to_market_status.sql', + 'dydx_deleveraging_handler.sql', 'dydx_market_create_handler.sql', 'dydx_market_modify_handler.sql', 'dydx_market_price_update_handler.sql', @@ -57,6 +58,7 @@ const scripts: string[] = [ 'dydx_trim_scale.sql', 'dydx_update_clob_pair_handler.sql', 'dydx_update_perpetual_handler.sql', + 'dydx_update_perpetual_position_aggregate_fields.sql', 'dydx_uuid.sql', 'dydx_uuid_from_asset_position_parts.sql', 'dydx_uuid_from_fill_event_parts.sql', diff --git a/indexer/services/ender/src/scripts/dydx_deleveraging_handler.sql b/indexer/services/ender/src/scripts/dydx_deleveraging_handler.sql new file mode 100644 index 0000000000..421e1232b0 --- /dev/null +++ b/indexer/services/ender/src/scripts/dydx_deleveraging_handler.sql @@ -0,0 +1,134 @@ +/** + Parameters: + - block_height: the height of the block being processing. + - block_time: the time of the block being processed. + - event_data: The 'data' field of the IndexerTendermintEvent (https://github.com/dydxprotocol/v4-proto/blob/8d35c86/dydxprotocol/indexer/indexer_manager/event.proto#L25) + converted to JSON format. Conversion to JSON is expected to be done by JSON.stringify. + - event_index: The 'event_index' of the IndexerTendermintEvent. + - transaction_index: The transaction_index of the IndexerTendermintEvent after the conversion that takes into + account the block_event (https://github.com/dydxprotocol/indexer/blob/cc70982/services/ender/src/lib/helper.ts#L33) + - transaction_hash: The transaction hash corresponding to this event from the IndexerTendermintBlock 'tx_hashes'. + Returns: JSON object containing fields: + - liquidated_fill: The created liquidated fill in fill-model format (https://github.com/dydxprotocol/indexer/blob/cc70982/packages/postgres/src/models/fill-model.ts). + - offsetting_fill: The created offsetting fill in fill-model format (https://github.com/dydxprotocol/indexer/blob/cc70982/packages/postgres/src/models/fill-model.ts). + - perpetual_market: The perpetual market for the deleveraging in perpetual-market-model format (https://github.com/dydxprotocol/indexer/blob/cc70982/packages/postgres/src/models/perpetual-market-model.ts). + - liquidated_perpetual_position: The updated liquidated perpetual position in perpetual-position-model format (https://github.com/dydxprotocol/indexer/blob/cc70982/packages/postgres/src/models/perpetual-position-model.ts). + - offsetting_perpetual_position: The updated offsetting perpetual position in perpetual-position-model format (https://github.com/dydxprotocol/indexer/blob/cc70982/packages/postgres/src/models/perpetual-position-model.ts). +*/ +CREATE OR REPLACE FUNCTION dydx_deleveraging_handler( + block_height int, block_time timestamp, event_data jsonb, event_index int, transaction_index int, + transaction_hash text) RETURNS jsonb AS $$ +DECLARE + QUOTE_CURRENCY_ATOMIC_RESOLUTION constant numeric = -6; + FEE constant numeric = 0; + perpetual_id bigint; + clob_pair_id bigint; + liquidated_subaccount_uuid uuid; + offsetting_subaccount_uuid uuid; + perpetual_market_record perpetual_markets%ROWTYPE; + liquidated_fill_record fills%ROWTYPE; + offsetting_fill_record fills%ROWTYPE; + liquidated_perpetual_position_record perpetual_positions%ROWTYPE; + offsetting_perpetual_position_record perpetual_positions%ROWTYPE; + liquidated_side text; + offsetting_side text; + size numeric; + price numeric; + event_id bytea; +BEGIN + perpetual_id = (event_data->'perpetualId')::bigint; + BEGIN + SELECT * INTO STRICT perpetual_market_record FROM perpetual_markets WHERE "id" = perpetual_id; + EXCEPTION + WHEN NO_DATA_FOUND THEN + RAISE EXCEPTION 'Unable to find perpetual market with perpetualId %', perpetual_id; + WHEN TOO_MANY_ROWS THEN + /** This should never happen and if it ever were to would indicate that the table has malformed data. */ + RAISE EXCEPTION 'Found multiple perpetual markets with perpetualId %', perpetual_id; + END; + /** + Calculate sizes, prices, and fill amounts. + + TODO(IND-238): Extract out calculation of quantums and subticks to their own SQL functions. + */ + size = dydx_trim_scale(dydx_from_jsonlib_long(event_data->'fillAmount') * + power(10, perpetual_market_record."atomicResolution")::numeric); + price = dydx_trim_scale(dydx_from_jsonlib_long(event_data->'price') * + power(10, QUOTE_CURRENCY_ATOMIC_RESOLUTION)::numeric); + + liquidated_subaccount_uuid = dydx_uuid_from_subaccount_id(event_data->'liquidated'); + offsetting_subaccount_uuid = dydx_uuid_from_subaccount_id(event_data->'offsetting'); + offsetting_side = CASE WHEN (event_data->'isBuy')::bool THEN 'BUY' ELSE 'SELL' END; + liquidated_side = CASE WHEN offsetting_side = 'BUY' THEN 'SELL' ELSE 'BUY' END; + clob_pair_id = perpetual_market_record."clobPairId"; + + /* Insert the associated fill records for this deleveraging event. */ + event_id = dydx_event_id_from_parts( + block_height, transaction_index, event_index); + INSERT INTO fills + ("id", "subaccountId", "side", "liquidity", "type", "clobPairId", "size", "price", "quoteAmount", + "eventId", "transactionHash", "createdAt", "createdAtHeight", "fee") + VALUES (dydx_uuid_from_fill_event_parts(event_id, 'TAKER'), + liquidated_subaccount_uuid, + liquidated_side, + 'TAKER', + 'DELEVERAGED', + clob_pair_id, + size, + price, + dydx_trim_scale(size * price), + event_id, + transaction_hash, + block_time, + block_height, + FEE) + RETURNING * INTO liquidated_fill_record; + + INSERT INTO fills + ("id", "subaccountId", "side", "liquidity", "type", "clobPairId", "size", "price", "quoteAmount", + "eventId", "transactionHash", "createdAt", "createdAtHeight", "fee") + VALUES (dydx_uuid_from_fill_event_parts(event_id, 'MAKER'), + offsetting_subaccount_uuid, + offsetting_side, + 'MAKER', + 'OFFSETTING', + clob_pair_id, + size, + price, + dydx_trim_scale(size * price), + event_id, + transaction_hash, + block_time, + block_height, + FEE) + RETURNING * INTO offsetting_fill_record; + + /* Upsert the perpetual_position records for this deleveraging event. */ + liquidated_perpetual_position_record = dydx_update_perpetual_position_aggregate_fields( + liquidated_subaccount_uuid, + perpetual_id, + liquidated_side, + size, + price); + offsetting_perpetual_position_record = dydx_update_perpetual_position_aggregate_fields( + offsetting_subaccount_uuid, + perpetual_id, + offsetting_side, + size, + price); + + + RETURN jsonb_build_object( + 'liquidated_fill', + dydx_to_jsonb(liquidated_fill_record), + 'offsetting_fill', + dydx_to_jsonb(offsetting_fill_record), + 'perpetual_market', + dydx_to_jsonb(perpetual_market_record), + 'liquidated_perpetual_position', + dydx_to_jsonb(liquidated_perpetual_position_record), + 'offsetting_perpetual_position', + dydx_to_jsonb(offsetting_perpetual_position_record) + ); +END; +$$ LANGUAGE plpgsql; diff --git a/indexer/services/ender/src/scripts/dydx_order_fill_handler_per_order.sql b/indexer/services/ender/src/scripts/dydx_order_fill_handler_per_order.sql index ddb45e55d4..ccc4413aff 100644 --- a/indexer/services/ender/src/scripts/dydx_order_fill_handler_per_order.sql +++ b/indexer/services/ender/src/scripts/dydx_order_fill_handler_per_order.sql @@ -165,42 +165,12 @@ BEGIN RETURNING * INTO fill_record; /* Upsert the perpetual_position record for this order_fill event. */ - SELECT * INTO perpetual_position_record FROM perpetual_positions WHERE "subaccountId" = subaccount_uuid - AND "perpetualId" = perpetual_market_record."id" - ORDER BY "createdAtHeight" DESC; - IF NOT FOUND THEN - RAISE EXCEPTION 'Unable to find existing perpetual position, subaccountId: %, perpetualId: %', subaccount_uuid, perpetual_market_record."id"; - END IF; - DECLARE - sum_open numeric = perpetual_position_record."sumOpen"; - entry_price numeric = perpetual_position_record."entryPrice"; - sum_close numeric = perpetual_position_record."sumClose"; - exit_price numeric = perpetual_position_record."exitPrice"; - BEGIN - IF dydx_perpetual_position_and_order_side_matching( - perpetual_position_record."side", order_side) THEN - sum_open = dydx_trim_scale(perpetual_position_record."sumOpen" + fill_amount); - entry_price = dydx_get_weighted_average( - perpetual_position_record."entryPrice", perpetual_position_record."sumOpen", - maker_price, fill_amount); - perpetual_position_record."sumOpen" = sum_open; - perpetual_position_record."entryPrice" = entry_price; - ELSE - sum_close = dydx_trim_scale(perpetual_position_record."sumClose" + fill_amount); - exit_price = dydx_get_weighted_average( - perpetual_position_record."exitPrice", perpetual_position_record."sumClose", - maker_price, fill_amount); - perpetual_position_record."sumClose" = sum_close; - perpetual_position_record."exitPrice" = exit_price; - END IF; - UPDATE perpetual_positions - SET - "sumOpen" = sum_open, - "entryPrice" = entry_price, - "sumClose" = sum_close, - "exitPrice" = exit_price - WHERE "id" = perpetual_position_record.id; - END; + perpetual_position_record = dydx_update_perpetual_position_aggregate_fields( + subaccount_uuid, + perpetual_market_record."id", + order_side, + fill_amount, + maker_price); RETURN jsonb_build_object( 'order', diff --git a/indexer/services/ender/src/scripts/dydx_update_perpetual_position_aggregate_fields.sql b/indexer/services/ender/src/scripts/dydx_update_perpetual_position_aggregate_fields.sql new file mode 100644 index 0000000000..3265c73d89 --- /dev/null +++ b/indexer/services/ender/src/scripts/dydx_update_perpetual_position_aggregate_fields.sql @@ -0,0 +1,71 @@ +/** + Parameters: + - subaccount_uuid: The subaccount uuid of the updated perpetual position. + - perpetual_id: The perpetual id of the updated perpetual position. + - side: The side of the fill. + - size: The size of the fill. + - price: The price of the fill. + Returns: the updated perpetual position. +*/ +CREATE OR REPLACE FUNCTION dydx_update_perpetual_position_aggregate_fields( + subaccount_uuid uuid, + perpetual_id bigint, + side text, + size numeric, + price numeric +) RETURNS perpetual_positions AS $$ +DECLARE + perpetual_position_record RECORD; + sum_open numeric; + entry_price numeric; + sum_close numeric; + exit_price numeric; +BEGIN + -- Retrieve the latest perpetual position record. + -- TODO(IND-485): Order by openEventId instead of createdAtHeight. + SELECT * INTO perpetual_position_record + FROM perpetual_positions + WHERE "subaccountId" = subaccount_uuid + AND "perpetualId" = perpetual_id + ORDER BY "createdAtHeight" DESC + LIMIT 1; + + IF NOT FOUND THEN + RAISE EXCEPTION 'Unable to find existing perpetual position, subaccountId: %, perpetualId: %', subaccount_uuid, perpetual_id; + END IF; + + sum_open = perpetual_position_record."sumOpen"; + entry_price = perpetual_position_record."entryPrice"; + sum_close = perpetual_position_record."sumClose"; + exit_price = perpetual_position_record."exitPrice"; + + -- Update the perpetual position record based on the side + IF dydx_perpetual_position_and_order_side_matching(perpetual_position_record."side", side) THEN + sum_open := dydx_trim_scale(perpetual_position_record."sumOpen" + size); + entry_price := dydx_get_weighted_average( + perpetual_position_record."entryPrice", perpetual_position_record."sumOpen", price, size + ); + perpetual_position_record."sumOpen" = sum_open; + perpetual_position_record."entryPrice" = entry_price; + ELSE + sum_close := dydx_trim_scale(perpetual_position_record."sumClose" + size); + exit_price := dydx_get_weighted_average( + perpetual_position_record."exitPrice", perpetual_position_record."sumClose", price, size + ); + perpetual_position_record."sumClose" = sum_close; + perpetual_position_record."exitPrice" = exit_price; + END IF; + + -- Perform the actual update in the database + UPDATE perpetual_positions + SET + "sumOpen" = sum_open, + "entryPrice" = entry_price, + "sumClose" = sum_close, + "exitPrice" = exit_price + WHERE "id" = perpetual_position_record."id"; + + -- Return the updated perpetual position record + RETURN perpetual_position_record; +END; +$$ LANGUAGE plpgsql; diff --git a/indexer/services/ender/src/validators/deleveraging-validator.ts b/indexer/services/ender/src/validators/deleveraging-validator.ts index 4dde432b07..c722b68159 100644 --- a/indexer/services/ender/src/validators/deleveraging-validator.ts +++ b/indexer/services/ender/src/validators/deleveraging-validator.ts @@ -1,7 +1,7 @@ import { IndexerTendermintEvent, DeleveragingEventV1 } from '@dydxprotocol-indexer/v4-protos'; -import { DeleveragingHandler } from '../handlers/deleveraging-handler'; import { Handler } from '../handlers/handler'; +import { DeleveragingHandler } from '../handlers/order-fills/deleveraging-handler'; import { Validator } from './validator'; export class DeleveragingValidator extends Validator {