diff --git a/indexer/packages/postgres/src/constants.ts b/indexer/packages/postgres/src/constants.ts index 953ea0cfc7..e716af81bb 100644 --- a/indexer/packages/postgres/src/constants.ts +++ b/indexer/packages/postgres/src/constants.ts @@ -3,6 +3,7 @@ import { CandleMessage_Resolution, ClobPairStatus } from '@dydxprotocol-indexer/ import config from './config'; import AssetPositionModel from './models/asset-position-model'; import FillModel from './models/fill-model'; +import MarketModel from './models/market-model'; import OrderModel from './models/order-model'; import PerpetualMarketModel from './models/perpetual-market-model'; import PerpetualPositionModel from './models/perpetual-position-model'; @@ -83,6 +84,7 @@ export const TIME_IN_FORCE_TO_API_TIME_IN_FORCE: Record { stats.timing(`${config.SERVICE_NAME}.loops.update_markets`, Date.now() - startTime); } +/** + * Updates the markets map with the specified market. + */ +export function updateMarket(market: MarketFromDatabase): void { + idToMarket[market.id] = market; +} + /** * Gets the market for a given id. */ diff --git a/indexer/packages/postgres/src/models/market-model.ts b/indexer/packages/postgres/src/models/market-model.ts index d60c196f2b..b4f5eeac57 100644 --- a/indexer/packages/postgres/src/models/market-model.ts +++ b/indexer/packages/postgres/src/models/market-model.ts @@ -51,6 +51,22 @@ export default class MarketModel extends Model { }; } + /** + * A mapping from column name to JSON conversion expected. + * See getSqlConversionForDydxModelTypes for valid conversions. + * + * TODO(IND-239): Ensure that jsonSchema() / sqlToJsonConversions() / model fields match. + */ + static get sqlToJsonConversions() { + return { + id: 'integer', + pair: 'string', + exponent: 'integer', + minPriceChangePpm: 'integer', + oraclePrice: 'string', + }; + } + id!: number; pair!: string; diff --git a/indexer/services/ender/__tests__/handlers/markets/market-create-handler.test.ts b/indexer/services/ender/__tests__/handlers/markets/market-create-handler.test.ts index 839626cdd5..3495d574a5 100644 --- a/indexer/services/ender/__tests__/handlers/markets/market-create-handler.test.ts +++ b/indexer/services/ender/__tests__/handlers/markets/market-create-handler.test.ts @@ -23,6 +23,7 @@ import { } from '../../helpers/indexer-proto-helpers'; import Long from 'long'; import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions'; +import config from '../../../src/config'; describe('marketCreateHandler', () => { beforeAll(async () => { @@ -86,67 +87,97 @@ describe('marketCreateHandler', () => { }); }); - it('creates new market', async () => { - const transactionIndex: number = 0; + it.each([ + [ + 'via knex', + false, + ], + [ + 'via SQL function', + true, + ], + ])( + 'creates new market (%s)', + async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_MARKET_CREATE_HANDLER_SQL_FUNCTION = useSqlFunction; + const transactionIndex: number = 0; - const marketCreate: MarketEventV1 = { - marketId: 3, - marketCreate: { - base: { - pair: 'DYDX-USD', - minPriceChangePpm: 500, + const marketCreate: MarketEventV1 = { + marketId: 3, + marketCreate: { + base: { + pair: 'DYDX-USD', + minPriceChangePpm: 500, + }, + exponent: -5, }, - exponent: -5, - }, - }; - - const kafkaMessage: KafkaMessage = createKafkaMessageFromMarketEvent({ - marketEvents: [marketCreate], - transactionIndex, - height: defaultHeight, - time: defaultTime, - txHash: defaultTxHash, - }); + }; - await onMessage(kafkaMessage); + const kafkaMessage: KafkaMessage = createKafkaMessageFromMarketEvent({ + marketEvents: [marketCreate], + transactionIndex, + height: defaultHeight, + time: defaultTime, + txHash: defaultTxHash, + }); - const market: MarketFromDatabase = await MarketTable.findById( - marketCreate.marketId, - ) as MarketFromDatabase; + await onMessage(kafkaMessage); - expectMarketMatchesEvent(marketCreate as MarketCreateEventMessage, market); - }); + const market: MarketFromDatabase = await MarketTable.findById( + marketCreate.marketId, + ) as MarketFromDatabase; + + expectMarketMatchesEvent(marketCreate as MarketCreateEventMessage, market); + }); + + it.each([ + [ + 'via knex', + false, + ], + [ + 'via SQL function', + true, + ], + ])( + 'errors when attempting to create an existing market (%s)', + async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_MARKET_CREATE_HANDLER_SQL_FUNCTION = useSqlFunction; + const transactionIndex: number = 0; - it('errors when attempting to create an existing market', async () => { - const transactionIndex: number = 0; + const kafkaMessage: KafkaMessage = createKafkaMessageFromMarketEvent({ + marketEvents: [defaultMarketCreate], + transactionIndex, + height: defaultHeight, + time: defaultTime, + txHash: defaultTxHash, + }); + await expect(onMessage(kafkaMessage)).rejects.toThrowError( + new ParseMessageError('Market in MarketCreate already exists'), + ); - const kafkaMessage: KafkaMessage = createKafkaMessageFromMarketEvent({ - marketEvents: [defaultMarketCreate], - transactionIndex, - height: defaultHeight, - time: defaultTime, - txHash: defaultTxHash, + // Check that market in database is the old market. + const market: MarketFromDatabase = await MarketTable.findById( + defaultMarketCreate.marketId, + ) as MarketFromDatabase; + expect(market.minPriceChangePpm).toEqual(50); + + expect(loggerError).toHaveBeenCalledWith(expect.objectContaining({ + at: 'MarketCreateHandler#logAndThrowParseMessageError', + message: 'Market in MarketCreate already exists', + })); + expect(loggerCrit).toHaveBeenCalledWith(expect.objectContaining({ + at: 'onMessage#onMessage', + message: 'Error: Unable to parse message, this must be due to a bug in V4 node', + })); + expect(producerSendMock.mock.calls.length).toEqual(0); }); - await expect(onMessage(kafkaMessage)).rejects.toThrowError( - new ParseMessageError('Market in MarketCreate already exists'), - ); - - // Check that market in database is the old market. - const market: MarketFromDatabase = await MarketTable.findById( - defaultMarketCreate.marketId, - ) as MarketFromDatabase; - expect(market.minPriceChangePpm).toEqual(50); - - expect(loggerError).toHaveBeenCalledWith(expect.objectContaining({ - at: 'MarketCreateHandler#logAndThrowParseMessageError', - message: 'Market in MarketCreate already exists', - })); - expect(loggerCrit).toHaveBeenCalledWith(expect.objectContaining({ - at: 'onMessage#onMessage', - message: 'Error: Unable to parse message, this must be due to a bug in V4 node', - })); - expect(producerSendMock.mock.calls.length).toEqual(0); - }); }); function expectMarketMatchesEvent( diff --git a/indexer/services/ender/__tests__/lib/sync-handlers.test.ts b/indexer/services/ender/__tests__/lib/sync-handlers.test.ts index 4967a7b28d..f68f6a0f97 100644 --- a/indexer/services/ender/__tests__/lib/sync-handlers.test.ts +++ b/indexer/services/ender/__tests__/lib/sync-handlers.test.ts @@ -30,6 +30,7 @@ import { Transaction, } from '@dydxprotocol-indexer/postgres'; import { KafkaPublisher } from '../../src/lib/kafka-publisher'; +import { createPostgresFunctions } from '../../src/helpers/postgres/postgres-functions'; const defaultMarketEventBinary: Uint8Array = Uint8Array.from(MarketEventV1.encode( defaultMarketCreate, @@ -63,6 +64,10 @@ describe('syncHandler', () => { ); describe('addHandler/process', () => { + beforeAll(async () => { + await createPostgresFunctions(); + }); + beforeEach(async () => { await BlockTable.create({ blockHeight: '1', @@ -86,6 +91,10 @@ describe('syncHandler', () => { await dbHelpers.clearData(); }); + afterAll(async () => { + await dbHelpers.teardown(); + }); + it('successfully adds handler', async () => { const synchHandlers: SyncHandlers = new SyncHandlers(); const txId: number = await Transaction.start(); diff --git a/indexer/services/ender/__tests__/scripts/scripts.test.ts b/indexer/services/ender/__tests__/scripts/scripts.test.ts index 17fbe45943..3d33e0c6a7 100644 --- a/indexer/services/ender/__tests__/scripts/scripts.test.ts +++ b/indexer/services/ender/__tests__/scripts/scripts.test.ts @@ -478,7 +478,7 @@ describe('SQL Function Tests', () => { }); async function getSingleRawQueryResultRow(query: string): Promise { - const queryResult = await storeHelpers.rawQuery(query, {}).catch((error) => { + const queryResult = await storeHelpers.rawQuery(query, {}).catch((error: Error) => { throw error; }); return queryResult.rows[0].result; diff --git a/indexer/services/ender/src/config.ts b/indexer/services/ender/src/config.ts index 850515af37..bdf40b85e4 100644 --- a/indexer/services/ender/src/config.ts +++ b/indexer/services/ender/src/config.ts @@ -29,6 +29,9 @@ export const configSchema = { USE_LIQUIDATION_HANDLER_SQL_FUNCTION: parseBoolean({ default: true, }), + USE_MARKET_CREATE_HANDLER_SQL_FUNCTION: parseBoolean({ + default: true, + }), USE_SUBACCOUNT_UPDATE_SQL_FUNCTION: parseBoolean({ default: true, }), diff --git a/indexer/services/ender/src/handlers/markets/market-create-handler.ts b/indexer/services/ender/src/handlers/markets/market-create-handler.ts index 4b33da3223..8c56eb1880 100644 --- a/indexer/services/ender/src/handlers/markets/market-create-handler.ts +++ b/indexer/services/ender/src/handlers/markets/market-create-handler.ts @@ -1,7 +1,15 @@ import { logger } from '@dydxprotocol-indexer/base'; -import { MarketFromDatabase, MarketTable, marketRefresher } from '@dydxprotocol-indexer/postgres'; +import { + MarketFromDatabase, + MarketModel, + MarketTable, + marketRefresher, + storeHelpers, +} from '@dydxprotocol-indexer/postgres'; import { MarketEventV1 } from '@dydxprotocol-indexer/v4-protos'; +import * as pg from 'pg'; +import config from '../../config'; import { ConsolidatedKafkaEvent, MarketCreateEventMessage } from '../../lib/types'; import { Handler } from '../handler'; @@ -13,13 +21,20 @@ export class MarketCreateHandler extends Handler { return [`${this.eventType}_${this.event.marketId}`]; } - // eslint-disable-next-line @typescript-eslint/require-await public async internalHandle(): Promise { logger.info({ at: 'MarketCreateHandler#handle', message: 'Received MarketEvent with MarketCreate.', event: this.event, }); + if (config.USE_MARKET_CREATE_HANDLER_SQL_FUNCTION) { + return this.handleViaSqlFunction(); + } + return this.handleViaKnexQueries(); + } + + // eslint-disable-next-line @typescript-eslint/require-await + public async handleViaKnexQueries(): Promise { // MarketHandler already makes sure the event has 'marketCreate' as the oneofKind. const marketCreate: MarketCreateEventMessage = this.event as MarketCreateEventMessage; @@ -39,6 +54,37 @@ export class MarketCreateHandler extends Handler { return []; } + private async handleViaSqlFunction(): Promise { + const eventDataBinary: Uint8Array = this.indexerTendermintEvent.dataBytes; + const result: pg.QueryResult = await storeHelpers.rawQuery( + `SELECT dydx_market_create_handler( + '${JSON.stringify(MarketEventV1.decode(eventDataBinary))}' + ) AS result;`, + { txId: this.txId }, + ).catch((error: Error) => { + logger.error({ + at: 'MarketCreateHandler#handleViaSqlFunction', + message: 'Failed to handle MarketEventV1', + error, + }); + + if (error.message.includes('Market in MarketCreate already exists')) { + const marketCreate: MarketCreateEventMessage = this.event as MarketCreateEventMessage; + this.logAndThrowParseMessageError( + 'Market in MarketCreate already exists', + { marketCreate }, + ); + } + + throw error; + }); + + const market: MarketFromDatabase = MarketModel.fromJson( + result.rows[0].result.market) as MarketFromDatabase; + marketRefresher.updateMarket(market); + return []; + } + private async createMarket(marketCreate: MarketCreateEventMessage): Promise { await MarketTable.create({ id: marketCreate.marketId, diff --git a/indexer/services/ender/src/handlers/order-fills/liquidation-handler.ts b/indexer/services/ender/src/handlers/order-fills/liquidation-handler.ts index 5c6c4a2354..cf4ea31a8b 100644 --- a/indexer/services/ender/src/handlers/order-fills/liquidation-handler.ts +++ b/indexer/services/ender/src/handlers/order-fills/liquidation-handler.ts @@ -113,9 +113,9 @@ export class LiquidationHandler extends AbstractOrderFillHandler { + ).catch((error: Error) => { logger.error({ - at: 'orderHandler#handleViaSqlFunction', + at: 'liquidationHandler#handleViaSqlFunction', message: 'Failed to handle OrderFillEventV1', error, }); diff --git a/indexer/services/ender/src/handlers/order-fills/order-handler.ts b/indexer/services/ender/src/handlers/order-fills/order-handler.ts index edd649ee90..7db67fbc22 100644 --- a/indexer/services/ender/src/handlers/order-fills/order-handler.ts +++ b/indexer/services/ender/src/handlers/order-fills/order-handler.ts @@ -93,7 +93,7 @@ export class OrderHandler extends AbstractOrderFillHandler { + ).catch((error: Error) => { logger.error({ at: 'orderHandler#handleViaSqlFunction', message: 'Failed to handle OrderFillEventV1', diff --git a/indexer/services/ender/src/handlers/subaccount-update-handler.ts b/indexer/services/ender/src/handlers/subaccount-update-handler.ts index 7cb13406b1..279c3546cf 100644 --- a/indexer/services/ender/src/handlers/subaccount-update-handler.ts +++ b/indexer/services/ender/src/handlers/subaccount-update-handler.ts @@ -75,7 +75,7 @@ export class SubaccountUpdateHandler extends Handler { ${this.indexerTendermintEvent.eventIndex}, ${transactionIndex}) AS result;`, { txId: this.txId }, - ).catch((error) => { + ).catch((error: Error) => { logger.error({ at: 'subaccountUpdateHandler#handleViaSqlFunction', message: 'Failed to handle SubaccountUpdateEventV1', diff --git a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts index b8e4b43b54..79d61ab776 100644 --- a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts +++ b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts @@ -29,6 +29,7 @@ function newScript(name: string, scriptPath: string): PostgresFunction { const scripts: string[] = [ 'create_extension_pg_stat_statements.sql', 'create_extension_uuid_ossp.sql', + 'dydx_market_create_handler.sql', 'dydx_event_id_from_parts.sql', 'dydx_event_to_transaction_index.sql', 'dydx_from_jsonlib_long.sql', @@ -63,7 +64,7 @@ export async function createPostgresFunctions(): Promise { await Promise.all([ dbHelpers.createModelToJsonFunctions(), ...scripts.map((script: string) => storeHelpers.rawQuery(newScript(script, `../../scripts/${script}`).script, {}) - .catch((error) => { + .catch((error: Error) => { logger.error({ at: 'dbHelpers#createModelToJsonFunctions', message: `Failed to create or replace function contained in ${script}`, diff --git a/indexer/services/ender/src/lib/on-message.ts b/indexer/services/ender/src/lib/on-message.ts index bb49082581..5b9722b0f1 100644 --- a/indexer/services/ender/src/lib/on-message.ts +++ b/indexer/services/ender/src/lib/on-message.ts @@ -277,7 +277,7 @@ async function createInitialRowsViaSqlFunction( await storeHelpers.rawQuery( queryString, { txId }, - ).catch((error) => { + ).catch((error: Error) => { logger.error({ at: 'on-message#createInitialRowsViaSqlFunction', message: 'Failed to create initial rows', diff --git a/indexer/services/ender/src/scripts/dydx_market_create_handler.sql b/indexer/services/ender/src/scripts/dydx_market_create_handler.sql new file mode 100644 index 0000000000..db130c4ec5 --- /dev/null +++ b/indexer/services/ender/src/scripts/dydx_market_create_handler.sql @@ -0,0 +1,32 @@ +/** + Parameters: + - event_data: The 'data' field of the IndexerTendermintEvent (https://github.com/dydxprotocol/v4-proto/blob/8d35c86/dydxprotocol/indexer/indexer_manager/event.proto#L25) + converted to JSON format. Conversion to JSON is expected to be done by JSON.stringify. + Returns: JSON object containing fields: + - market: The created market in market-model format (https://github.com/dydxprotocol/indexer/blob/cc70982/packages/postgres/src/models/market-model.ts). +*/ +CREATE OR REPLACE FUNCTION dydx_market_create_handler(event_data jsonb) RETURNS jsonb AS $$ +DECLARE + market_record_id integer; + market_record markets%ROWTYPE; +BEGIN + market_record_id = (event_data->'marketId')::integer; + SELECT * INTO market_record FROM markets WHERE "id" = market_record_id; + + IF FOUND THEN + RAISE EXCEPTION 'Market in MarketCreate already exists. Record: %', market_record; + END IF; + + market_record."id" = market_record_id; + market_record."pair" = event_data->'marketCreate'->'base'->>'pair'; + market_record."exponent" = (event_data->'marketCreate'->'exponent')::integer; + market_record."minPriceChangePpm" = (event_data->'marketCreate'->'base'->'minPriceChangePpm')::integer; + + INSERT INTO markets VALUES (market_record.*); + + RETURN jsonb_build_object( + 'market', + dydx_to_jsonb(market_record) + ); +END; +$$ LANGUAGE plpgsql; \ No newline at end of file