Skip to content

Commit

Permalink
[IND-476] Update ender update perpetual handler to execute updates vi…
Browse files Browse the repository at this point in the history
…a a SQL function. (#755)

* [IND-476] Update ender update perpetual handler to execute updates via a SQL function.
  • Loading branch information
lcwik authored Nov 6, 2023
1 parent 65f7f8d commit 78614e2
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
dbHelpers,
liquidityTierRefresher,
perpetualMarketRefresher,
protocolTranslations,
testMocks,
} from '@dydxprotocol-indexer/postgres';
import { updateBlockCache } from '../../src/caches/block-cache';
Expand Down Expand Up @@ -125,10 +126,15 @@ describe('update-clob-pair-handler', () => {
PerpetualMarketFromDatabase | undefined = await PerpetualMarketTable.findById(
perpetualMarketId,
);

expect(perpetualMarket).toEqual(expect.objectContaining({
clobPairId: defaultUpdateClobPairEvent.clobPairId.toString(),
status: protocolTranslations.clobStatusToMarketStatus(defaultUpdateClobPairEvent.status),
quantumConversionExponent: defaultUpdateClobPairEvent.quantumConversionExponent,
subticksPerTick: defaultUpdateClobPairEvent.subticksPerTick,
stepBaseQuantums: defaultUpdateClobPairEvent.stepBaseQuantums.toNumber(),
}));
expect(perpetualMarket).toEqual(
perpetualMarketRefresher.getPerpetualMarketFromId(perpetualMarketId));

if (!useSqlFunction) {
expectTimingStats();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import { createKafkaMessage, producer } from '@dydxprotocol-indexer/kafka';
import { KafkaMessage } from 'kafkajs';
import { onMessage } from '../../src/lib/on-message';
import { createPostgresFunctions } from '../../src/helpers/postgres/postgres-functions';
import config from '../../src/config';

describe('update-perpetual-handler', () => {
beforeAll(async () => {
Expand Down Expand Up @@ -90,32 +91,52 @@ describe('update-perpetual-handler', () => {
});
});

it('updates an existing perpetual market', async () => {
const transactionIndex: number = 0;
const kafkaMessage: KafkaMessage = createKafkaMessageFromUpdatePerpetualEvent({
updatePerpetualEvent: defaultUpdatePerpetualEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});
const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send');
await onMessage(kafkaMessage);
it.each([
[
'via knex',
false,
],
[
'via SQL function',
true,
],
])(
'updates an existing perpetual market (%s)',
async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_UPDATE_PERPETUAL_HANDLER_SQL_FUNCTION = useSqlFunction;
const transactionIndex: number = 0;
const kafkaMessage: KafkaMessage = createKafkaMessageFromUpdatePerpetualEvent({
updatePerpetualEvent: defaultUpdatePerpetualEvent,
transactionIndex,
height: defaultHeight,
time: defaultTime,
txHash: defaultTxHash,
});
const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send');
await onMessage(kafkaMessage);

const perpetualMarket:
PerpetualMarketFromDatabase | undefined = await PerpetualMarketTable.findById(
defaultUpdatePerpetualEvent.id.toString(),
);
expect(perpetualMarket).toEqual(expect.objectContaining({
id: defaultUpdatePerpetualEvent.id.toString(),
ticker: defaultUpdatePerpetualEvent.ticker,
marketId: defaultUpdatePerpetualEvent.marketId,
atomicResolution: defaultUpdatePerpetualEvent.atomicResolution,
liquidityTierId: defaultUpdatePerpetualEvent.liquidityTier,
}));
expectTimingStats();
expectPerpetualMarketKafkaMessage(producerSendMock, [perpetualMarket!]);
});
const perpetualMarket:
PerpetualMarketFromDatabase | undefined = await PerpetualMarketTable.findById(
defaultUpdatePerpetualEvent.id.toString(),
);
expect(perpetualMarket).toEqual(expect.objectContaining({
id: defaultUpdatePerpetualEvent.id.toString(),
ticker: defaultUpdatePerpetualEvent.ticker,
marketId: defaultUpdatePerpetualEvent.marketId,
atomicResolution: defaultUpdatePerpetualEvent.atomicResolution,
liquidityTierId: defaultUpdatePerpetualEvent.liquidityTier,
}));
expect(perpetualMarket).toEqual(
perpetualMarketRefresher.getPerpetualMarketFromId(
defaultUpdatePerpetualEvent.id.toString()));
if (!useSqlFunction) {
expectTimingStats();
}
expectPerpetualMarketKafkaMessage(producerSendMock, [perpetualMarket!]);
});
});

function expectTimingStats() {
Expand Down
3 changes: 3 additions & 0 deletions indexer/services/ender/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ export const configSchema = {
USE_UPDATE_CLOB_PAIR_HANDLER_SQL_FUNCTION: parseBoolean({
default: true,
}),
USE_UPDATE_PERPETUAL_HANDLER_SQL_FUNCTION: parseBoolean({
default: true,
}),
USE_SQL_FUNCTION_TO_CREATE_INITIAL_ROWS: parseBoolean({
default: true,
}),
Expand Down
45 changes: 44 additions & 1 deletion indexer/services/ender/src/handlers/update-perpetual-handler.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
import assert from 'assert';

import { logger } from '@dydxprotocol-indexer/base';
import {
PerpetualMarketFromDatabase, PerpetualMarketTable, perpetualMarketRefresher,
PerpetualMarketFromDatabase,
PerpetualMarketTable,
perpetualMarketRefresher,
storeHelpers,
PerpetualMarketModel,
} from '@dydxprotocol-indexer/postgres';
import { UpdatePerpetualEventV1 } from '@dydxprotocol-indexer/v4-protos';
import * as pg from 'pg';

import config from '../config';
import { generatePerpetualMarketMessage } from '../helpers/kafka-helper';
import { ConsolidatedKafkaEvent } from '../lib/types';
import { Handler } from './handler';
Expand All @@ -18,6 +25,42 @@ export class UpdatePerpetualHandler extends Handler<UpdatePerpetualEventV1> {

// eslint-disable-next-line @typescript-eslint/require-await
public async internalHandle(): Promise<ConsolidatedKafkaEvent[]> {
if (config.USE_UPDATE_PERPETUAL_HANDLER_SQL_FUNCTION) {
return this.handleViaSqlFunction();
}
return this.handleViaKnex();
}

private async handleViaSqlFunction(): Promise<ConsolidatedKafkaEvent[]> {
const eventDataBinary: Uint8Array = this.indexerTendermintEvent.dataBytes;
const result: pg.QueryResult = await storeHelpers.rawQuery(
`SELECT dydx_update_perpetual_handler(
'${JSON.stringify(UpdatePerpetualEventV1.decode(eventDataBinary))}'
) AS result;`,
{ txId: this.txId },
).catch((error: Error) => {
logger.error({
at: 'UpdatePerpetualHandler#handleViaSqlFunction',
message: 'Failed to handle UpdatePerpetualEventV1',
error,
});

throw error;
});

const perpetualMarket: PerpetualMarketFromDatabase = PerpetualMarketModel.fromJson(
result.rows[0].result.perpetual_market) as PerpetualMarketFromDatabase;

await perpetualMarketRefresher.upsertPerpetualMarket(perpetualMarket);

return [
this.generateConsolidatedMarketKafkaEvent(
JSON.stringify(generatePerpetualMarketMessage([perpetualMarket])),
),
];
}

private async handleViaKnex(): Promise<ConsolidatedKafkaEvent[]> {
const perpetualMarket:
PerpetualMarketFromDatabase = await this.runFuncWithTimingStatAndErrorLogging(
this.updatePerpetual(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const scripts: string[] = [
'dydx_subaccount_update_handler.sql',
'dydx_trim_scale.sql',
'dydx_update_clob_pair_handler.sql',
'dydx_update_perpetual_handler.sql',
'dydx_uuid.sql',
'dydx_uuid_from_asset_position_parts.sql',
'dydx_uuid_from_fill_event_parts.sql',
Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
*/
CREATE OR REPLACE FUNCTION dydx_update_clob_pair_handler(event_data jsonb) RETURNS jsonb AS $$
DECLARE
row_count integer;
clob_pair_id bigint;
perpetual_market_record perpetual_markets%ROWTYPE;
BEGIN
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
Parameters:
- event_data: The 'data' field of the IndexerTendermintEvent (https://github.com/dydxprotocol/v4-proto/blob/8d35c86/dydxprotocol/indexer/indexer_manager/event.proto#L25)
converted to JSON format. Conversion to JSON is expected to be done by JSON.stringify.
Returns: JSON object containing fields:
- perpetual_market: The updated perpetual market in perpetual-market-model format (https://github.com/dydxprotocol/indexer/blob/cc70982/packages/postgres/src/models/perpetual-market-model.ts).
*/
CREATE OR REPLACE FUNCTION dydx_update_perpetual_handler(event_data jsonb) RETURNS jsonb AS $$
DECLARE
perpetual_market_id bigint;
perpetual_market_record perpetual_markets%ROWTYPE;
BEGIN
perpetual_market_id = (event_data->'id')::bigint;
perpetual_market_record."ticker" = event_data->>'ticker';
perpetual_market_record."marketId" = (event_data->'marketId')::integer;
perpetual_market_record."atomicResolution" = (event_data->'atomicResolution')::integer;
perpetual_market_record."liquidityTierId" = (event_data->'liquidityTier')::integer;

UPDATE perpetual_markets
SET
"ticker" = perpetual_market_record."ticker",
"marketId" = perpetual_market_record."marketId",
"atomicResolution" = perpetual_market_record."atomicResolution",
"liquidityTierId" = perpetual_market_record."liquidityTierId"
WHERE "id" = perpetual_market_id
RETURNING * INTO perpetual_market_record;

IF NOT FOUND THEN
RAISE EXCEPTION 'Could not find perpetual market with corresponding id %', perpetual_market_id;
END IF;

RETURN jsonb_build_object(
'perpetual_market',
dydx_to_jsonb(perpetual_market_record)
);
END;
$$ LANGUAGE plpgsql;

0 comments on commit 78614e2

Please sign in to comment.