Skip to content

Commit

Permalink
[IND-467] Push ender market create logic to use a single sql function
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwik committed Nov 2, 2023
1 parent f892a43 commit b0e31dd
Show file tree
Hide file tree
Showing 14 changed files with 194 additions and 62 deletions.
2 changes: 2 additions & 0 deletions indexer/packages/postgres/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { CandleMessage_Resolution, ClobPairStatus } from '@dydxprotocol-indexer/
import config from './config';
import AssetPositionModel from './models/asset-position-model';
import FillModel from './models/fill-model';
import MarketModel from './models/market-model';
import OrderModel from './models/order-model';
import PerpetualMarketModel from './models/perpetual-market-model';
import PerpetualPositionModel from './models/perpetual-position-model';
Expand Down Expand Up @@ -83,6 +84,7 @@ export const TIME_IN_FORCE_TO_API_TIME_IN_FORCE: Record<TimeInForce, APITimeInFo
export const SQL_TO_JSON_DEFINED_MODELS = [
AssetPositionModel,
FillModel,
MarketModel,
OrderModel,
PerpetualMarketModel,
PerpetualPositionModel,
Expand Down
1 change: 1 addition & 0 deletions indexer/packages/postgres/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export { default as Transaction } from './helpers/transaction';
export { postgresConfigSchema } from './config';
export { default as AssetPositionModel } from './models/asset-position-model';
export { default as FillModel } from './models/fill-model';
export { default as MarketModel } from './models/market-model';
export { default as OrderModel } from './models/order-model';
export { default as PerpetualMarketModel } from './models/perpetual-market-model';
export { default as PerpetualPositionModel } from './models/perpetual-position-model';
Expand Down
7 changes: 7 additions & 0 deletions indexer/packages/postgres/src/loops/market-refresher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ export async function updateMarkets(options?: Options): Promise<void> {
stats.timing(`${config.SERVICE_NAME}.loops.update_markets`, Date.now() - startTime);
}

/**
* Updates the markets map with the specified market.
*/
export function updateMarket(market: MarketFromDatabase): void {
idToMarket[market.id] = market;
}

/**
* Gets the market for a given id.
*/
Expand Down
16 changes: 16 additions & 0 deletions indexer/packages/postgres/src/models/market-model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,22 @@ export default class MarketModel extends Model {
};
}

/**
* A mapping from column name to JSON conversion expected.
* See getSqlConversionForDydxModelTypes for valid conversions.
*
* TODO(IND-239): Ensure that jsonSchema() / sqlToJsonConversions() / model fields match.
*/
static get sqlToJsonConversions() {
return {
id: 'integer',
pair: 'string',
exponent: 'integer',
minPriceChangePpm: 'integer',
oraclePrice: 'string',
};
}

id!: number;

pair!: string;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
} from '../../helpers/indexer-proto-helpers';
import Long from 'long';
import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions';
import config from '../../../src/config';

describe('marketCreateHandler', () => {
beforeAll(async () => {
Expand Down Expand Up @@ -86,67 +87,97 @@ describe('marketCreateHandler', () => {
});
});

it('creates new market', async () => {
const transactionIndex: number = 0;
it.each([
[
'via knex',
false,
],
[
'via SQL function',
true,
],
])(
'creates new market (%s)',
async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_MARKET_CREATE_HANDLER_SQL_FUNCTION = useSqlFunction;
const transactionIndex: number = 0;

const marketCreate: MarketEventV1 = {
marketId: 3,
marketCreate: {
base: {
pair: 'DYDX-USD',
minPriceChangePpm: 500,
const marketCreate: MarketEventV1 = {
marketId: 3,
marketCreate: {
base: {
pair: 'DYDX-USD',
minPriceChangePpm: 500,
},
exponent: -5,
},
exponent: -5,
},
};

const kafkaMessage: KafkaMessage = createKafkaMessageFromMarketEvent({
marketEvents: [marketCreate],
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});
};

await onMessage(kafkaMessage);
const kafkaMessage: KafkaMessage = createKafkaMessageFromMarketEvent({
marketEvents: [marketCreate],
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});

const market: MarketFromDatabase = await MarketTable.findById(
marketCreate.marketId,
) as MarketFromDatabase;
await onMessage(kafkaMessage);

expectMarketMatchesEvent(marketCreate as MarketCreateEventMessage, market);
});
const market: MarketFromDatabase = await MarketTable.findById(
marketCreate.marketId,
) as MarketFromDatabase;

expectMarketMatchesEvent(marketCreate as MarketCreateEventMessage, market);
});

it.each([
[
'via knex',
false,
],
[
'via SQL function',
true,
],
])(
'errors when attempting to create an existing market (%s)',
async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_MARKET_CREATE_HANDLER_SQL_FUNCTION = useSqlFunction;
const transactionIndex: number = 0;

it('errors when attempting to create an existing market', async () => {
const transactionIndex: number = 0;
const kafkaMessage: KafkaMessage = createKafkaMessageFromMarketEvent({
marketEvents: [defaultMarketCreate],
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});
await expect(onMessage(kafkaMessage)).rejects.toThrowError(
new ParseMessageError('Market in MarketCreate already exists'),
);

const kafkaMessage: KafkaMessage = createKafkaMessageFromMarketEvent({
marketEvents: [defaultMarketCreate],
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
// Check that market in database is the old market.
const market: MarketFromDatabase = await MarketTable.findById(
defaultMarketCreate.marketId,
) as MarketFromDatabase;
expect(market.minPriceChangePpm).toEqual(50);

expect(loggerError).toHaveBeenCalledWith(expect.objectContaining({
at: 'MarketCreateHandler#logAndThrowParseMessageError',
message: 'Market in MarketCreate already exists',
}));
expect(loggerCrit).toHaveBeenCalledWith(expect.objectContaining({
at: 'onMessage#onMessage',
message: 'Error: Unable to parse message, this must be due to a bug in V4 node',
}));
expect(producerSendMock.mock.calls.length).toEqual(0);
});
await expect(onMessage(kafkaMessage)).rejects.toThrowError(
new ParseMessageError('Market in MarketCreate already exists'),
);

// Check that market in database is the old market.
const market: MarketFromDatabase = await MarketTable.findById(
defaultMarketCreate.marketId,
) as MarketFromDatabase;
expect(market.minPriceChangePpm).toEqual(50);

expect(loggerError).toHaveBeenCalledWith(expect.objectContaining({
at: 'MarketCreateHandler#logAndThrowParseMessageError',
message: 'Market in MarketCreate already exists',
}));
expect(loggerCrit).toHaveBeenCalledWith(expect.objectContaining({
at: 'onMessage#onMessage',
message: 'Error: Unable to parse message, this must be due to a bug in V4 node',
}));
expect(producerSendMock.mock.calls.length).toEqual(0);
});
});

function expectMarketMatchesEvent(
Expand Down
2 changes: 1 addition & 1 deletion indexer/services/ender/__tests__/scripts/scripts.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ describe('SQL Function Tests', () => {
});

async function getSingleRawQueryResultRow(query: string): Promise<object> {
const queryResult = await storeHelpers.rawQuery(query, {}).catch((error) => {
const queryResult = await storeHelpers.rawQuery(query, {}).catch((error: Error) => {
throw error;
});
return queryResult.rows[0].result;
Expand Down
3 changes: 3 additions & 0 deletions indexer/services/ender/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ export const configSchema = {
USE_LIQUIDATION_HANDLER_SQL_FUNCTION: parseBoolean({
default: true,
}),
USE_MARKET_CREATE_HANDLER_SQL_FUNCTION: parseBoolean({
default: true,
}),
USE_SUBACCOUNT_UPDATE_SQL_FUNCTION: parseBoolean({
default: true,
}),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import { logger } from '@dydxprotocol-indexer/base';
import { MarketFromDatabase, MarketTable, marketRefresher } from '@dydxprotocol-indexer/postgres';
import {
MarketFromDatabase,
MarketModel,
MarketTable,
marketRefresher,
storeHelpers,
} from '@dydxprotocol-indexer/postgres';
import { MarketEventV1 } from '@dydxprotocol-indexer/v4-protos';
import * as pg from 'pg';

import config from '../../config';
import { ConsolidatedKafkaEvent, MarketCreateEventMessage } from '../../lib/types';
import { Handler } from '../handler';

Expand All @@ -13,13 +21,20 @@ export class MarketCreateHandler extends Handler<MarketEventV1> {
return [`${this.eventType}_${this.event.marketId}`];
}

// eslint-disable-next-line @typescript-eslint/require-await
public async internalHandle(): Promise<ConsolidatedKafkaEvent[]> {
logger.info({
at: 'MarketCreateHandler#handle',
message: 'Received MarketEvent with MarketCreate.',
event: this.event,
});
if (config.USE_MARKET_CREATE_HANDLER_SQL_FUNCTION) {
return this.handleViaSqlFunction();
}
return this.handleViaKnexQueries();
}

// eslint-disable-next-line @typescript-eslint/require-await
public async handleViaKnexQueries(): Promise<ConsolidatedKafkaEvent[]> {
// MarketHandler already makes sure the event has 'marketCreate' as the oneofKind.
const marketCreate: MarketCreateEventMessage = this.event as MarketCreateEventMessage;

Expand All @@ -39,6 +54,37 @@ export class MarketCreateHandler extends Handler<MarketEventV1> {
return [];
}

private async handleViaSqlFunction(): Promise<ConsolidatedKafkaEvent[]> {
const eventDataBinary: Uint8Array = this.indexerTendermintEvent.dataBytes;
const result: pg.QueryResult = await storeHelpers.rawQuery(
`SELECT dydx_market_create_handler(
'${JSON.stringify(MarketEventV1.decode(eventDataBinary))}'
) AS result;`,
{ txId: this.txId },
).catch((error: Error) => {
logger.error({
at: 'MarketCreateHandler#handleViaSqlFunction',
message: 'Failed to handle MarketEventV1',
error,
});

if (error.message.includes('Market in MarketCreate already exists')) {
const marketCreate: MarketCreateEventMessage = this.event as MarketCreateEventMessage;
this.logAndThrowParseMessageError(
'Market in MarketCreate already exists',
{ marketCreate },
);
}

throw error;
});

const market: MarketFromDatabase = MarketModel.fromJson(
result.rows[0].result.market) as MarketFromDatabase;
marketRefresher.updateMarket(market);
return [];
}

private async createMarket(marketCreate: MarketCreateEventMessage): Promise<void> {
await MarketTable.create({
id: marketCreate.marketId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ export class LiquidationHandler extends AbstractOrderFillHandler<OrderFillWithLi
'${USDC_ASSET_ID}'
) AS result;`,
{ txId: this.txId },
).catch((error) => {
).catch((error: Error) => {
logger.error({
at: 'orderHandler#handleViaSqlFunction',
at: 'liquidationHandler#handleViaSqlFunction',
message: 'Failed to handle OrderFillEventV1',
error,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ export class OrderHandler extends AbstractOrderFillHandler<OrderFillWithLiquidit
'${isOrderCanceled}'
) AS result;`,
{ txId: this.txId },
).catch((error) => {
).catch((error: Error) => {
logger.error({
at: 'orderHandler#handleViaSqlFunction',
message: 'Failed to handle OrderFillEventV1',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export class SubaccountUpdateHandler extends Handler<SubaccountUpdate> {
${this.indexerTendermintEvent.eventIndex},
${transactionIndex}) AS result;`,
{ txId: this.txId },
).catch((error) => {
).catch((error: Error) => {
logger.error({
at: 'subaccountUpdateHandler#handleViaSqlFunction',
message: 'Failed to handle SubaccountUpdateEventV1',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ function newScript(name: string, scriptPath: string): PostgresFunction {
const scripts: string[] = [
'create_extension_pg_stat_statements.sql',
'create_extension_uuid_ossp.sql',
'dydx_market_create_handler.sql',
'dydx_event_id_from_parts.sql',
'dydx_event_to_transaction_index.sql',
'dydx_from_jsonlib_long.sql',
Expand Down Expand Up @@ -63,7 +64,7 @@ export async function createPostgresFunctions(): Promise<void> {
await Promise.all([
dbHelpers.createModelToJsonFunctions(),
...scripts.map((script: string) => storeHelpers.rawQuery(newScript(script, `../../scripts/${script}`).script, {})
.catch((error) => {
.catch((error: Error) => {
logger.error({
at: 'dbHelpers#createModelToJsonFunctions',
message: `Failed to create or replace function contained in ${script}`,
Expand Down
2 changes: 1 addition & 1 deletion indexer/services/ender/src/lib/on-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ async function createInitialRowsViaSqlFunction(
await storeHelpers.rawQuery(
queryString,
{ txId },
).catch((error) => {
).catch((error: Error) => {
logger.error({
at: 'on-message#createInitialRowsViaSqlFunction',
message: 'Failed to create initial rows',
Expand Down
25 changes: 25 additions & 0 deletions indexer/services/ender/src/scripts/dydx_market_create_handler.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
CREATE OR REPLACE FUNCTION dydx_market_create_handler(event_data jsonb) RETURNS jsonb AS $$
DECLARE
market_record_id integer;
market_record markets%ROWTYPE;
BEGIN
market_record_id = (event_data->'marketId')::integer;
SELECT * INTO market_record FROM markets WHERE "id" = market_record_id;

IF FOUND THEN
RAISE EXCEPTION 'Market in MarketCreate already exists. Record: %', market_record;
END IF;

market_record."id" = market_record_id;
market_record."pair" = event_data->'marketCreate'->'base'->>'pair';
market_record."exponent" = (event_data->'marketCreate'->'exponent')::integer;
market_record."minPriceChangePpm" = (event_data->'marketCreate'->'base'->'minPriceChangePpm')::integer;

INSERT INTO markets VALUES (market_record.*);

RETURN jsonb_build_object(
'market',
dydx_to_jsonb(market_record)
);
END;
$$ LANGUAGE plpgsql;

0 comments on commit b0e31dd

Please sign in to comment.