From f2587ef635322964f5c7e4a9173e8b37a49e8a70 Mon Sep 17 00:00:00 2001 From: Lukasz Cwik <126621805+lcwik@users.noreply.github.com> Date: Mon, 6 Nov 2023 08:17:28 -0800 Subject: [PATCH] [IND-476] Update ender update perpetual handler to execute updates via a SQL function. (#755) * [IND-476] Update ender update perpetual handler to execute updates via a SQL function. --- .../handlers/update-clob-pair-handler.test.ts | 10 ++- .../handlers/update-perpetual-handler.test.ts | 71 ++++++++++++------- indexer/services/ender/src/config.ts | 3 + .../src/handlers/update-perpetual-handler.ts | 45 +++++++++++- .../helpers/postgres/postgres-functions.ts | 1 + .../scripts/dydx_perpetual_market_handler.sql | 0 .../scripts/dydx_update_clob_pair_handler.sql | 1 - .../scripts/dydx_update_perpetual_handler.sql | 37 ++++++++++ 8 files changed, 139 insertions(+), 29 deletions(-) create mode 100644 indexer/services/ender/src/scripts/dydx_perpetual_market_handler.sql create mode 100644 indexer/services/ender/src/scripts/dydx_update_perpetual_handler.sql diff --git a/indexer/services/ender/__tests__/handlers/update-clob-pair-handler.test.ts b/indexer/services/ender/__tests__/handlers/update-clob-pair-handler.test.ts index 1d5e5b25e3..23b0fbd255 100644 --- a/indexer/services/ender/__tests__/handlers/update-clob-pair-handler.test.ts +++ b/indexer/services/ender/__tests__/handlers/update-clob-pair-handler.test.ts @@ -5,6 +5,7 @@ import { dbHelpers, liquidityTierRefresher, perpetualMarketRefresher, + protocolTranslations, testMocks, } from '@dydxprotocol-indexer/postgres'; import { updateBlockCache } from '../../src/caches/block-cache'; @@ -125,10 +126,15 @@ describe('update-clob-pair-handler', () => { PerpetualMarketFromDatabase | undefined = await PerpetualMarketTable.findById( perpetualMarketId, ); - + expect(perpetualMarket).toEqual(expect.objectContaining({ + clobPairId: defaultUpdateClobPairEvent.clobPairId.toString(), + status: protocolTranslations.clobStatusToMarketStatus(defaultUpdateClobPairEvent.status), + quantumConversionExponent: defaultUpdateClobPairEvent.quantumConversionExponent, + subticksPerTick: defaultUpdateClobPairEvent.subticksPerTick, + stepBaseQuantums: defaultUpdateClobPairEvent.stepBaseQuantums.toNumber(), + })); expect(perpetualMarket).toEqual( perpetualMarketRefresher.getPerpetualMarketFromId(perpetualMarketId)); - if (!useSqlFunction) { expectTimingStats(); } diff --git a/indexer/services/ender/__tests__/handlers/update-perpetual-handler.test.ts b/indexer/services/ender/__tests__/handlers/update-perpetual-handler.test.ts index af6f99c904..dcc3103023 100644 --- a/indexer/services/ender/__tests__/handlers/update-perpetual-handler.test.ts +++ b/indexer/services/ender/__tests__/handlers/update-perpetual-handler.test.ts @@ -32,6 +32,7 @@ import { createKafkaMessage, producer } from '@dydxprotocol-indexer/kafka'; import { KafkaMessage } from 'kafkajs'; import { onMessage } from '../../src/lib/on-message'; import { createPostgresFunctions } from '../../src/helpers/postgres/postgres-functions'; +import config from '../../src/config'; describe('update-perpetual-handler', () => { beforeAll(async () => { @@ -90,32 +91,52 @@ describe('update-perpetual-handler', () => { }); }); - it('updates an existing perpetual market', async () => { - const transactionIndex: number = 0; - const kafkaMessage: KafkaMessage = createKafkaMessageFromUpdatePerpetualEvent({ - updatePerpetualEvent: defaultUpdatePerpetualEvent, - transactionIndex, - height: defaultHeight, - time: defaultTime, - txHash: defaultTxHash, - }); - const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send'); - await onMessage(kafkaMessage); + it.each([ + [ + 'via knex', + false, + ], + [ + 'via SQL function', + true, + ], + ])( + 'updates an existing perpetual market (%s)', + async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_UPDATE_PERPETUAL_HANDLER_SQL_FUNCTION = useSqlFunction; + const transactionIndex: number = 0; + const kafkaMessage: KafkaMessage = createKafkaMessageFromUpdatePerpetualEvent({ + updatePerpetualEvent: defaultUpdatePerpetualEvent, + transactionIndex, + height: defaultHeight, + time: defaultTime, + txHash: defaultTxHash, + }); + const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send'); + await onMessage(kafkaMessage); - const perpetualMarket: - PerpetualMarketFromDatabase | undefined = await PerpetualMarketTable.findById( - defaultUpdatePerpetualEvent.id.toString(), - ); - expect(perpetualMarket).toEqual(expect.objectContaining({ - id: defaultUpdatePerpetualEvent.id.toString(), - ticker: defaultUpdatePerpetualEvent.ticker, - marketId: defaultUpdatePerpetualEvent.marketId, - atomicResolution: defaultUpdatePerpetualEvent.atomicResolution, - liquidityTierId: defaultUpdatePerpetualEvent.liquidityTier, - })); - expectTimingStats(); - expectPerpetualMarketKafkaMessage(producerSendMock, [perpetualMarket!]); - }); + const perpetualMarket: + PerpetualMarketFromDatabase | undefined = await PerpetualMarketTable.findById( + defaultUpdatePerpetualEvent.id.toString(), + ); + expect(perpetualMarket).toEqual(expect.objectContaining({ + id: defaultUpdatePerpetualEvent.id.toString(), + ticker: defaultUpdatePerpetualEvent.ticker, + marketId: defaultUpdatePerpetualEvent.marketId, + atomicResolution: defaultUpdatePerpetualEvent.atomicResolution, + liquidityTierId: defaultUpdatePerpetualEvent.liquidityTier, + })); + expect(perpetualMarket).toEqual( + perpetualMarketRefresher.getPerpetualMarketFromId( + defaultUpdatePerpetualEvent.id.toString())); + if (!useSqlFunction) { + expectTimingStats(); + } + expectPerpetualMarketKafkaMessage(producerSendMock, [perpetualMarket!]); + }); }); function expectTimingStats() { diff --git a/indexer/services/ender/src/config.ts b/indexer/services/ender/src/config.ts index 485611ab0a..d18fb3ee96 100644 --- a/indexer/services/ender/src/config.ts +++ b/indexer/services/ender/src/config.ts @@ -47,6 +47,9 @@ export const configSchema = { USE_UPDATE_CLOB_PAIR_HANDLER_SQL_FUNCTION: parseBoolean({ default: true, }), + USE_UPDATE_PERPETUAL_HANDLER_SQL_FUNCTION: parseBoolean({ + default: true, + }), USE_SQL_FUNCTION_TO_CREATE_INITIAL_ROWS: parseBoolean({ default: true, }), diff --git a/indexer/services/ender/src/handlers/update-perpetual-handler.ts b/indexer/services/ender/src/handlers/update-perpetual-handler.ts index 2c921f7e02..d7787fff28 100644 --- a/indexer/services/ender/src/handlers/update-perpetual-handler.ts +++ b/indexer/services/ender/src/handlers/update-perpetual-handler.ts @@ -1,10 +1,17 @@ import assert from 'assert'; +import { logger } from '@dydxprotocol-indexer/base'; import { - PerpetualMarketFromDatabase, PerpetualMarketTable, perpetualMarketRefresher, + PerpetualMarketFromDatabase, + PerpetualMarketTable, + perpetualMarketRefresher, + storeHelpers, + PerpetualMarketModel, } from '@dydxprotocol-indexer/postgres'; import { UpdatePerpetualEventV1 } from '@dydxprotocol-indexer/v4-protos'; +import * as pg from 'pg'; +import config from '../config'; import { generatePerpetualMarketMessage } from '../helpers/kafka-helper'; import { ConsolidatedKafkaEvent } from '../lib/types'; import { Handler } from './handler'; @@ -18,6 +25,42 @@ export class UpdatePerpetualHandler extends Handler { // eslint-disable-next-line @typescript-eslint/require-await public async internalHandle(): Promise { + if (config.USE_UPDATE_PERPETUAL_HANDLER_SQL_FUNCTION) { + return this.handleViaSqlFunction(); + } + return this.handleViaKnex(); + } + + private async handleViaSqlFunction(): Promise { + const eventDataBinary: Uint8Array = this.indexerTendermintEvent.dataBytes; + const result: pg.QueryResult = await storeHelpers.rawQuery( + `SELECT dydx_update_perpetual_handler( + '${JSON.stringify(UpdatePerpetualEventV1.decode(eventDataBinary))}' + ) AS result;`, + { txId: this.txId }, + ).catch((error: Error) => { + logger.error({ + at: 'UpdatePerpetualHandler#handleViaSqlFunction', + message: 'Failed to handle UpdatePerpetualEventV1', + error, + }); + + throw error; + }); + + const perpetualMarket: PerpetualMarketFromDatabase = PerpetualMarketModel.fromJson( + result.rows[0].result.perpetual_market) as PerpetualMarketFromDatabase; + + await perpetualMarketRefresher.upsertPerpetualMarket(perpetualMarket); + + return [ + this.generateConsolidatedMarketKafkaEvent( + JSON.stringify(generatePerpetualMarketMessage([perpetualMarket])), + ), + ]; + } + + private async handleViaKnex(): Promise { const perpetualMarket: PerpetualMarketFromDatabase = await this.runFuncWithTimingStatAndErrorLogging( this.updatePerpetual(), diff --git a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts index 3ca32a7e1a..5ebb47045e 100644 --- a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts +++ b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts @@ -50,6 +50,7 @@ const scripts: string[] = [ 'dydx_subaccount_update_handler.sql', 'dydx_trim_scale.sql', 'dydx_update_clob_pair_handler.sql', + 'dydx_update_perpetual_handler.sql', 'dydx_uuid.sql', 'dydx_uuid_from_asset_position_parts.sql', 'dydx_uuid_from_fill_event_parts.sql', diff --git a/indexer/services/ender/src/scripts/dydx_perpetual_market_handler.sql b/indexer/services/ender/src/scripts/dydx_perpetual_market_handler.sql new file mode 100644 index 0000000000..e69de29bb2 diff --git a/indexer/services/ender/src/scripts/dydx_update_clob_pair_handler.sql b/indexer/services/ender/src/scripts/dydx_update_clob_pair_handler.sql index 9bbd5b670e..3ed74ac0cd 100644 --- a/indexer/services/ender/src/scripts/dydx_update_clob_pair_handler.sql +++ b/indexer/services/ender/src/scripts/dydx_update_clob_pair_handler.sql @@ -7,7 +7,6 @@ */ CREATE OR REPLACE FUNCTION dydx_update_clob_pair_handler(event_data jsonb) RETURNS jsonb AS $$ DECLARE - row_count integer; clob_pair_id bigint; perpetual_market_record perpetual_markets%ROWTYPE; BEGIN diff --git a/indexer/services/ender/src/scripts/dydx_update_perpetual_handler.sql b/indexer/services/ender/src/scripts/dydx_update_perpetual_handler.sql new file mode 100644 index 0000000000..c1df39c358 --- /dev/null +++ b/indexer/services/ender/src/scripts/dydx_update_perpetual_handler.sql @@ -0,0 +1,37 @@ +/** + Parameters: + - event_data: The 'data' field of the IndexerTendermintEvent (https://github.com/dydxprotocol/v4-proto/blob/8d35c86/dydxprotocol/indexer/indexer_manager/event.proto#L25) + converted to JSON format. Conversion to JSON is expected to be done by JSON.stringify. + Returns: JSON object containing fields: + - perpetual_market: The updated perpetual market in perpetual-market-model format (https://github.com/dydxprotocol/indexer/blob/cc70982/packages/postgres/src/models/perpetual-market-model.ts). +*/ +CREATE OR REPLACE FUNCTION dydx_update_perpetual_handler(event_data jsonb) RETURNS jsonb AS $$ +DECLARE + perpetual_market_id bigint; + perpetual_market_record perpetual_markets%ROWTYPE; +BEGIN + perpetual_market_id = (event_data->'id')::bigint; + perpetual_market_record."ticker" = event_data->>'ticker'; + perpetual_market_record."marketId" = (event_data->'marketId')::integer; + perpetual_market_record."atomicResolution" = (event_data->'atomicResolution')::integer; + perpetual_market_record."liquidityTierId" = (event_data->'liquidityTier')::integer; + + UPDATE perpetual_markets + SET + "ticker" = perpetual_market_record."ticker", + "marketId" = perpetual_market_record."marketId", + "atomicResolution" = perpetual_market_record."atomicResolution", + "liquidityTierId" = perpetual_market_record."liquidityTierId" + WHERE "id" = perpetual_market_id + RETURNING * INTO perpetual_market_record; + + IF NOT FOUND THEN + RAISE EXCEPTION 'Could not find perpetual market with corresponding id %', perpetual_market_id; + END IF; + + RETURN jsonb_build_object( + 'perpetual_market', + dydx_to_jsonb(perpetual_market_record) + ); +END; +$$ LANGUAGE plpgsql; \ No newline at end of file