From ee0b9935a11e4ba9a5eb40cc112a2fafa599fdd8 Mon Sep 17 00:00:00 2001 From: Will Liu Date: Tue, 24 Oct 2023 12:46:15 -0400 Subject: [PATCH 1/9] script --- indexer/pnpm-lock.yaml | 8 + indexer/services/scripts/.env | 2 + indexer/services/scripts/package.json | 6 +- indexer/services/scripts/src/config.ts | 11 ++ indexer/services/scripts/src/print-block.ts | 162 ++++++++++++++++++++ 5 files changed, 188 insertions(+), 1 deletion(-) create mode 100644 indexer/services/scripts/src/config.ts create mode 100644 indexer/services/scripts/src/print-block.ts diff --git a/indexer/pnpm-lock.yaml b/indexer/pnpm-lock.yaml index 58bfa8ad93..1736e463a7 100644 --- a/indexer/pnpm-lock.yaml +++ b/indexer/pnpm-lock.yaml @@ -644,13 +644,17 @@ importers: specifiers: '@dydxprotocol-indexer/base': workspace:^0.0.1 '@dydxprotocol-indexer/dev': workspace:^0.0.1 + '@dydxprotocol-indexer/kafka': workspace:^0.0.1 '@dydxprotocol-indexer/postgres': workspace:^0.0.1 + '@dydxprotocol-indexer/v4-proto-parser': workspace:^0.0.1 + '@dydxprotocol-indexer/v4-protos': workspace:^0.0.1 '@types/jest': ^28.1.4 '@types/node': ^18.0.3 '@types/yargs': ^16.0.0 big.js: ^6.0.2 dotenv-flow: ^3.2.0 jest: ^28.1.2 + kafkajs: ^2.1.0 lodash: ^4.17.21 long: ^5.2.1 ts-node: ^10.8.2 @@ -659,9 +663,13 @@ importers: yargs: ^13.3.0 dependencies: '@dydxprotocol-indexer/base': link:../../packages/base + '@dydxprotocol-indexer/kafka': link:../../packages/kafka '@dydxprotocol-indexer/postgres': link:../../packages/postgres + '@dydxprotocol-indexer/v4-proto-parser': link:../../packages/v4-proto-parser + '@dydxprotocol-indexer/v4-protos': link:../../packages/v4-protos big.js: 6.2.1 dotenv-flow: 3.2.0 + kafkajs: 2.2.3 lodash: 4.17.21 long: 5.2.1 yargs: 13.3.2 diff --git a/indexer/services/scripts/.env b/indexer/services/scripts/.env index 4ac5190a99..c9e44cbf1d 100644 --- a/indexer/services/scripts/.env +++ b/indexer/services/scripts/.env @@ -1 +1,3 @@ SERVICE_NAME=scripts + +KAFKA_ENABLE_UNIQUE_CONSUMER_GROUP_IDS=true diff --git a/indexer/services/scripts/package.json b/indexer/services/scripts/package.json index 5e13c2e0ae..76cf25856a 100644 --- a/indexer/services/scripts/package.json +++ b/indexer/services/scripts/package.json @@ -19,11 +19,15 @@ "dependencies": { "@dydxprotocol-indexer/base": "workspace:^0.0.1", "@dydxprotocol-indexer/postgres": "workspace:^0.0.1", + "@dydxprotocol-indexer/kafka": "workspace:^0.0.1", + "@dydxprotocol-indexer/v4-proto-parser": "workspace:^0.0.1", + "@dydxprotocol-indexer/v4-protos": "workspace:^0.0.1", "dotenv-flow": "^3.2.0", "long": "^5.2.1", "big.js": "^6.0.2", "lodash": "^4.17.21", - "yargs": "^13.3.0" + "yargs": "^13.3.0", + "kafkajs": "^2.1.0" }, "devDependencies": { "@dydxprotocol-indexer/dev": "workspace:^0.0.1", diff --git a/indexer/services/scripts/src/config.ts b/indexer/services/scripts/src/config.ts new file mode 100644 index 0000000000..8369f10af4 --- /dev/null +++ b/indexer/services/scripts/src/config.ts @@ -0,0 +1,11 @@ +import { baseConfigSchema, parseSchema } from '@dydxprotocol-indexer/base'; +import { kafkaConfigSchema } from '@dydxprotocol-indexer/kafka'; +import { postgresConfigSchema } from '@dydxprotocol-indexer/postgres'; + +export const configSchema = { + ...baseConfigSchema, + ...postgresConfigSchema, + ...kafkaConfigSchema, +}; + +export default parseSchema(configSchema); diff --git a/indexer/services/scripts/src/print-block.ts b/indexer/services/scripts/src/print-block.ts new file mode 100644 index 0000000000..eb1c675363 --- /dev/null +++ b/indexer/services/scripts/src/print-block.ts @@ -0,0 +1,162 @@ +import { logger } from '@dydxprotocol-indexer/base'; +import { + addOnMessageFunction, + consumer, + producer, + startConsumer, + stopConsumer, + TO_ENDER_TOPIC, +} from '@dydxprotocol-indexer/kafka'; +import { IndexerTendermintBlock } from '@dydxprotocol-indexer/v4-protos'; +import { KafkaMessage } from 'kafkajs'; +import yargs from 'yargs'; + +import config from './config'; +import { runAsyncScript } from './helpers/util'; + +/** + * Creates an IndexerTendermintBlock from a KafkaMessage. + * Throws an error if there's an issue. + */ +function getIndexerTendermintBlock( + message: KafkaMessage, +): IndexerTendermintBlock | undefined { + if (!message || !message.value || !message.timestamp) { + throw Error('Empty message'); + } + const messageValueBinary: Uint8Array = new Uint8Array(message.value); + logger.info({ + at: 'onMessage#getIndexerTendermintBlock', + message: 'Received message', + offset: message.offset, + }); + + const block: IndexerTendermintBlock = IndexerTendermintBlock.decode( + messageValueBinary, + ); + logger.info({ + at: 'onMessage#getIndexerTendermintBlock', + message: 'Parsed message', + offset: message.offset, + height: block.height, + block, + }); + + return block; +} + +export function seek(offset: bigint): void { + logger.info({ + at: 'consumer#seek', + message: 'Seeking...', + offset: offset.toString(), + }); + + consumer.seek({ + topic: TO_ENDER_TOPIC, + partition: 0, + offset: offset.toString(), + }); + + logger.info({ + at: 'consumer#seek', + message: 'Seeked.', + offset: offset.toString(), + }); +} + +export async function connect(height: number): Promise { + await Promise.all([ + consumer.connect(), + producer.connect(), + ]); + + await consumer.subscribe({ + topic: TO_ENDER_TOPIC, + // Need to set fromBeginning to true, so when ender restarts, it will consume all messages + // rather than ignoring the messages in queue that were produced before ender was started. + fromBeginning: true, + }); + + addOnMessageFunction((_topic: string, message: KafkaMessage): Promise => { + return printMessageAtHeight(message, height); + }); + + logger.info({ + at: 'consumers#connect', + message: 'Connected to Kafka', + }); +} + +export async function printMessageAtHeight( + currentMessage: KafkaMessage, + targetHeight: number, +): Promise { + const indexerTendermintBlock: IndexerTendermintBlock | undefined = getIndexerTendermintBlock( + currentMessage, + ); + if (indexerTendermintBlock === undefined) { + return; + } + + const currentBlockHeight: number = parseInt(indexerTendermintBlock.height.toString(), 10); + console.log(`Current block height: ${currentBlockHeight}`); + if (currentBlockHeight < targetHeight) { + const offsetToSeek = targetHeight - currentBlockHeight; + console.log(`Seeking ${offsetToSeek} blocks ahead`); + const desiredMessage = await seek(BigInt(offsetToSeek)); + console.log(JSON.stringify(desiredMessage)); + } else if (currentBlockHeight === targetHeight) { + console.log(JSON.stringify(currentMessage)); + } else { + throw Error(`Current block height ${currentBlockHeight} is greater than target height ${targetHeight}`); + } +} + +async function startKafka(height: number): Promise { + logger.info({ + at: 'index#start', + message: `Starting in env ${config.NODE_ENV}`, + }); + + await connect(height); + await startConsumer(); + + logger.info({ + at: 'index#start', + message: 'Successfully started', + }); +} + +process.on('SIGTERM', async () => { + logger.info({ + at: 'index#SIGTERM', + message: 'Received SIGTERM, shutting down', + }); + await stopConsumer(); +}); + +async function start(height: number): Promise { + logger.info({ + at: 'index#start', + message: `Connecting to kafka brokers: ${config.KAFKA_BROKER_URLS}`, + }); + await startKafka(height); + logger.info({ + at: 'index#start', + message: `Successfully connected to kafka brokers: ${config.KAFKA_BROKER_URLS}`, + }); +} + +const args = yargs.options({ + height: { + type: 'number', + alias: 'h', + description: 'Height to print message for', + required: true, + }, +}).argv; + +runAsyncScript(async () => { + await start(args.height); +}); From 080659d75280baad7628344ff5241e968c542167 Mon Sep 17 00:00:00 2001 From: Will Liu Date: Tue, 24 Oct 2023 13:07:39 -0400 Subject: [PATCH 2/9] test --- indexer/services/scripts/src/print-block.ts | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/indexer/services/scripts/src/print-block.ts b/indexer/services/scripts/src/print-block.ts index eb1c675363..be74adb971 100644 --- a/indexer/services/scripts/src/print-block.ts +++ b/indexer/services/scripts/src/print-block.ts @@ -82,6 +82,10 @@ export async function connect(height: number): Promise { return printMessageAtHeight(message, height); }); + logger.info({ + at: 'consumer#connect', + message: 'Added onMessage function', + }); logger.info({ at: 'consumers#connect', message: 'Connected to Kafka', @@ -92,6 +96,10 @@ export async function printMessageAtHeight( currentMessage: KafkaMessage, targetHeight: number, ): Promise { + logger.info({ + at: 'consumer#printMessageAtHeight', + message: 'Received message', + }); const indexerTendermintBlock: IndexerTendermintBlock | undefined = getIndexerTendermintBlock( currentMessage, ); From 05b4c380049a8bb76a09ab85659bcb168dd3a9a2 Mon Sep 17 00:00:00 2001 From: Will Liu Date: Tue, 24 Oct 2023 13:09:28 -0400 Subject: [PATCH 3/9] test --- indexer/services/scripts/src/print-block.ts | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/indexer/services/scripts/src/print-block.ts b/indexer/services/scripts/src/print-block.ts index be74adb971..911506cfa4 100644 --- a/indexer/services/scripts/src/print-block.ts +++ b/indexer/services/scripts/src/print-block.ts @@ -1,4 +1,4 @@ -import { logger } from '@dydxprotocol-indexer/base'; +import { logger, wrapBackgroundTask } from '@dydxprotocol-indexer/base'; import { addOnMessageFunction, consumer, @@ -165,6 +165,4 @@ const args = yargs.options({ }, }).argv; -runAsyncScript(async () => { - await start(args.height); -}); +wrapBackgroundTask(start(args.height), true, 'main'); From 7fd798c30a931601a5a4c6192c26d0192bc33c0f Mon Sep 17 00:00:00 2001 From: Will Liu Date: Tue, 24 Oct 2023 13:14:56 -0400 Subject: [PATCH 4/9] seek fix --- indexer/services/scripts/src/print-block.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/indexer/services/scripts/src/print-block.ts b/indexer/services/scripts/src/print-block.ts index 911506cfa4..22e29ce88d 100644 --- a/indexer/services/scripts/src/print-block.ts +++ b/indexer/services/scripts/src/print-block.ts @@ -110,8 +110,8 @@ export async function printMessageAtHeight( const currentBlockHeight: number = parseInt(indexerTendermintBlock.height.toString(), 10); console.log(`Current block height: ${currentBlockHeight}`); if (currentBlockHeight < targetHeight) { - const offsetToSeek = targetHeight - currentBlockHeight; - console.log(`Seeking ${offsetToSeek} blocks ahead`); + const offsetToSeek = BigInt(targetHeight - currentBlockHeight) + currentMessage.offset; + console.log(`Seeking to offset: ${offsetToSeek}`); const desiredMessage = await seek(BigInt(offsetToSeek)); console.log(JSON.stringify(desiredMessage)); } else if (currentBlockHeight === targetHeight) { From 1ab360cd56ef428ca63bf301a6e37b516efa873f Mon Sep 17 00:00:00 2001 From: Will Liu Date: Tue, 24 Oct 2023 13:38:59 -0400 Subject: [PATCH 5/9] latest --- .../scripts/src/helpers/block-helpers.ts | 113 ++++++++++++++++++ indexer/services/scripts/src/helpers/types.ts | 9 ++ indexer/services/scripts/src/print-block.ts | 70 ++++++----- 3 files changed, 162 insertions(+), 30 deletions(-) create mode 100644 indexer/services/scripts/src/helpers/block-helpers.ts create mode 100644 indexer/services/scripts/src/helpers/types.ts diff --git a/indexer/services/scripts/src/helpers/block-helpers.ts b/indexer/services/scripts/src/helpers/block-helpers.ts new file mode 100644 index 0000000000..1b1e0ea414 --- /dev/null +++ b/indexer/services/scripts/src/helpers/block-helpers.ts @@ -0,0 +1,113 @@ +import { logger } from '@dydxprotocol-indexer/base'; +import { + AssetCreateEventV1, + FundingEventV1, + IndexerTendermintEvent, + LiquidityTierUpsertEventV1, + MarketEventV1, + OrderFillEventV1, + PerpetualMarketCreateEventV1, + StatefulOrderEventV1, + SubaccountUpdateEventV1, + TransferEventV1, + UpdateClobPairEventV1, + UpdatePerpetualEventV1, +} from '@dydxprotocol-indexer/v4-protos'; + +import { AnnotatedIndexerTendermintEvent } from './types'; + +export enum DydxIndexerSubtypes { + ORDER_FILL = 'order_fill', + SUBACCOUNT_UPDATE = 'subaccount_update', + TRANSFER = 'transfer', + MARKET = 'market', + STATEFUL_ORDER = 'stateful_order', + FUNDING = 'funding_values', + ASSET = 'asset', + PERPETUAL_MARKET = 'perpetual_market', + LIQUIDITY_TIER = 'liquidity_tier', + UPDATE_PERPETUAL = 'update_perpetual', + UPDATE_CLOB_PAIR = 'update_clob_pair', +} + +export function annotateIndexerTendermintEvent( + event: IndexerTendermintEvent, +): AnnotatedIndexerTendermintEvent | undefined { + const eventDataBinary: Uint8Array = event.dataBytes; + switch (event.subtype) { + case (DydxIndexerSubtypes.ORDER_FILL.toString()): { + return { + ...event, + data: JSON.stringify(OrderFillEventV1.decode(eventDataBinary)), + }; + } + case (DydxIndexerSubtypes.SUBACCOUNT_UPDATE.toString()): { + return { + ...event, + data: JSON.stringify(SubaccountUpdateEventV1.decode(eventDataBinary)), + }; + } + case (DydxIndexerSubtypes.TRANSFER.toString()): { + return { + ...event, + data: JSON.stringify(TransferEventV1.decode(eventDataBinary)), + }; + } + case (DydxIndexerSubtypes.MARKET.toString()): { + return { + ...event, + data: JSON.stringify(MarketEventV1.decode(eventDataBinary)), + }; + } + case (DydxIndexerSubtypes.STATEFUL_ORDER.toString()): { + return { + ...event, + data: JSON.stringify(StatefulOrderEventV1.decode(eventDataBinary)), + }; + } + case (DydxIndexerSubtypes.FUNDING.toString()): { + return { + ...event, + data: JSON.stringify(FundingEventV1.decode(eventDataBinary)), + }; + } + case (DydxIndexerSubtypes.ASSET.toString()): { + return { + ...event, + data: JSON.stringify(AssetCreateEventV1.decode(eventDataBinary)), + }; + } + case (DydxIndexerSubtypes.PERPETUAL_MARKET.toString()): { + return { + ...event, + data: JSON.stringify(PerpetualMarketCreateEventV1.decode(eventDataBinary)), + }; + } + case (DydxIndexerSubtypes.LIQUIDITY_TIER.toString()): { + return { + ...event, + data: JSON.stringify(LiquidityTierUpsertEventV1.decode(eventDataBinary)), + }; + } + case (DydxIndexerSubtypes.UPDATE_PERPETUAL.toString()): { + return { + ...event, + data: JSON.stringify(UpdatePerpetualEventV1.decode(eventDataBinary)), + }; + } + case (DydxIndexerSubtypes.UPDATE_CLOB_PAIR.toString()): { + return { + ...event, + data: JSON.stringify(UpdateClobPairEventV1.decode(eventDataBinary)), + }; + } + default: { + const message: string = `Unable to parse event subtype: ${event.subtype}`; + logger.error({ + at: 'block-helpers#annotateIndexerTendermintEvent', + message, + }); + return undefined; + } + } +} diff --git a/indexer/services/scripts/src/helpers/types.ts b/indexer/services/scripts/src/helpers/types.ts new file mode 100644 index 0000000000..f728b82454 --- /dev/null +++ b/indexer/services/scripts/src/helpers/types.ts @@ -0,0 +1,9 @@ +import { IndexerTendermintBlock, IndexerTendermintEvent } from '@dydxprotocol-indexer/v4-protos'; + +export interface AnnotatedIndexerTendermintEvent extends IndexerTendermintEvent { + data: string; +} + +export interface AnnotatedIndexerTendermintBlock extends IndexerTendermintBlock { + annotatedEvents: AnnotatedIndexerTendermintEvent[]; +} diff --git a/indexer/services/scripts/src/print-block.ts b/indexer/services/scripts/src/print-block.ts index 22e29ce88d..0c550c11d3 100644 --- a/indexer/services/scripts/src/print-block.ts +++ b/indexer/services/scripts/src/print-block.ts @@ -7,12 +7,14 @@ import { stopConsumer, TO_ENDER_TOPIC, } from '@dydxprotocol-indexer/kafka'; -import { IndexerTendermintBlock } from '@dydxprotocol-indexer/v4-protos'; +import { IndexerTendermintBlock, IndexerTendermintEvent } from '@dydxprotocol-indexer/v4-protos'; import { KafkaMessage } from 'kafkajs'; +import _ from 'lodash'; import yargs from 'yargs'; import config from './config'; -import { runAsyncScript } from './helpers/util'; +import { annotateIndexerTendermintEvent } from './helpers/block-helpers'; +import { AnnotatedIndexerTendermintBlock, AnnotatedIndexerTendermintEvent } from './helpers/types'; /** * Creates an IndexerTendermintBlock from a KafkaMessage. @@ -25,22 +27,10 @@ function getIndexerTendermintBlock( throw Error('Empty message'); } const messageValueBinary: Uint8Array = new Uint8Array(message.value); - logger.info({ - at: 'onMessage#getIndexerTendermintBlock', - message: 'Received message', - offset: message.offset, - }); const block: IndexerTendermintBlock = IndexerTendermintBlock.decode( messageValueBinary, ); - logger.info({ - at: 'onMessage#getIndexerTendermintBlock', - message: 'Parsed message', - offset: message.offset, - height: block.height, - block, - }); return block; } @@ -82,10 +72,6 @@ export async function connect(height: number): Promise { return printMessageAtHeight(message, height); }); - logger.info({ - at: 'consumer#connect', - message: 'Added onMessage function', - }); logger.info({ at: 'consumers#connect', message: 'Connected to Kafka', @@ -96,10 +82,6 @@ export async function printMessageAtHeight( currentMessage: KafkaMessage, targetHeight: number, ): Promise { - logger.info({ - at: 'consumer#printMessageAtHeight', - message: 'Received message', - }); const indexerTendermintBlock: IndexerTendermintBlock | undefined = getIndexerTendermintBlock( currentMessage, ); @@ -108,16 +90,44 @@ export async function printMessageAtHeight( } const currentBlockHeight: number = parseInt(indexerTendermintBlock.height.toString(), 10); - console.log(`Current block height: ${currentBlockHeight}`); if (currentBlockHeight < targetHeight) { - const offsetToSeek = BigInt(targetHeight - currentBlockHeight) + currentMessage.offset; - console.log(`Seeking to offset: ${offsetToSeek}`); - const desiredMessage = await seek(BigInt(offsetToSeek)); - console.log(JSON.stringify(desiredMessage)); + const offsetToSeek: number = targetHeight - currentBlockHeight + Number(currentMessage.offset); + await seek(BigInt(offsetToSeek)); } else if (currentBlockHeight === targetHeight) { - console.log(JSON.stringify(currentMessage)); + const annotatedEvents: AnnotatedIndexerTendermintEvent[] = []; + _.forEach(indexerTendermintBlock.events, (event: IndexerTendermintEvent) => { + const annotatedEvent: + AnnotatedIndexerTendermintEvent | undefined = annotateIndexerTendermintEvent( + event, + ); + if (annotatedEvent === undefined) { + logger.error({ + at: 'printMessageAtHeight', + message: 'Failed to parse event', + event, + }); + throw Error('Failed to parse event'); + } + annotatedEvents.push(annotatedEvent); + }); + const annotatedBlock: AnnotatedIndexerTendermintBlock = { + ...indexerTendermintBlock, + events: [], + annotatedEvents, + }; + logger.info({ + at: 'printMessageAtHeight', + message: 'Printing block', + block: annotatedBlock, + }); } else { - throw Error(`Current block height ${currentBlockHeight} is greater than target height ${targetHeight}`); + logger.info({ + at: 'printMessageAtHeight', + message: 'Overshot target height', + currentBlockHeight, + targetHeight, + }); + await stopConsumer(); } } @@ -165,4 +175,4 @@ const args = yargs.options({ }, }).argv; -wrapBackgroundTask(start(args.height), true, 'main'); +wrapBackgroundTask(start(args.height), false, 'main'); From 31da9388daeeb49ed0bd30112cb5a276d5887cbd Mon Sep 17 00:00:00 2001 From: Will Liu Date: Tue, 24 Oct 2023 13:41:46 -0400 Subject: [PATCH 6/9] fix --- indexer/services/scripts/src/helpers/block-helpers.ts | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/indexer/services/scripts/src/helpers/block-helpers.ts b/indexer/services/scripts/src/helpers/block-helpers.ts index 1b1e0ea414..edc353a8de 100644 --- a/indexer/services/scripts/src/helpers/block-helpers.ts +++ b/indexer/services/scripts/src/helpers/block-helpers.ts @@ -38,66 +38,77 @@ export function annotateIndexerTendermintEvent( case (DydxIndexerSubtypes.ORDER_FILL.toString()): { return { ...event, + dataBytes: new Uint8Array(), data: JSON.stringify(OrderFillEventV1.decode(eventDataBinary)), }; } case (DydxIndexerSubtypes.SUBACCOUNT_UPDATE.toString()): { return { ...event, + dataBytes: new Uint8Array(), data: JSON.stringify(SubaccountUpdateEventV1.decode(eventDataBinary)), }; } case (DydxIndexerSubtypes.TRANSFER.toString()): { return { ...event, + dataBytes: new Uint8Array(), data: JSON.stringify(TransferEventV1.decode(eventDataBinary)), }; } case (DydxIndexerSubtypes.MARKET.toString()): { return { ...event, + dataBytes: new Uint8Array(), data: JSON.stringify(MarketEventV1.decode(eventDataBinary)), }; } case (DydxIndexerSubtypes.STATEFUL_ORDER.toString()): { return { ...event, + dataBytes: new Uint8Array(), data: JSON.stringify(StatefulOrderEventV1.decode(eventDataBinary)), }; } case (DydxIndexerSubtypes.FUNDING.toString()): { return { ...event, + dataBytes: new Uint8Array(), data: JSON.stringify(FundingEventV1.decode(eventDataBinary)), }; } case (DydxIndexerSubtypes.ASSET.toString()): { return { ...event, + dataBytes: new Uint8Array(), data: JSON.stringify(AssetCreateEventV1.decode(eventDataBinary)), }; } case (DydxIndexerSubtypes.PERPETUAL_MARKET.toString()): { return { ...event, + dataBytes: new Uint8Array(), data: JSON.stringify(PerpetualMarketCreateEventV1.decode(eventDataBinary)), }; } case (DydxIndexerSubtypes.LIQUIDITY_TIER.toString()): { return { ...event, + dataBytes: new Uint8Array(), data: JSON.stringify(LiquidityTierUpsertEventV1.decode(eventDataBinary)), }; } case (DydxIndexerSubtypes.UPDATE_PERPETUAL.toString()): { return { ...event, + dataBytes: new Uint8Array(), data: JSON.stringify(UpdatePerpetualEventV1.decode(eventDataBinary)), }; } case (DydxIndexerSubtypes.UPDATE_CLOB_PAIR.toString()): { return { ...event, + dataBytes: new Uint8Array(), data: JSON.stringify(UpdateClobPairEventV1.decode(eventDataBinary)), }; } From 1ba769d5c800b72950482404f6fc435262e02810 Mon Sep 17 00:00:00 2001 From: Will Liu Date: Tue, 24 Oct 2023 13:48:24 -0400 Subject: [PATCH 7/9] fix --- indexer/services/scripts/src/print-block.ts | 7 ------- 1 file changed, 7 deletions(-) diff --git a/indexer/services/scripts/src/print-block.ts b/indexer/services/scripts/src/print-block.ts index 0c550c11d3..a94900078c 100644 --- a/indexer/services/scripts/src/print-block.ts +++ b/indexer/services/scripts/src/print-block.ts @@ -120,13 +120,6 @@ export async function printMessageAtHeight( message: 'Printing block', block: annotatedBlock, }); - } else { - logger.info({ - at: 'printMessageAtHeight', - message: 'Overshot target height', - currentBlockHeight, - targetHeight, - }); await stopConsumer(); } } From 75e8f176a84a1ccc2efbf349d48811b9651054a2 Mon Sep 17 00:00:00 2001 From: Will Liu Date: Tue, 24 Oct 2023 13:55:43 -0400 Subject: [PATCH 8/9] cleanup --- indexer/services/scripts/package.json | 1 + indexer/services/scripts/src/print-block.ts | 4 +--- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/indexer/services/scripts/package.json b/indexer/services/scripts/package.json index cb084b1fb3..8bd0c3f51c 100644 --- a/indexer/services/scripts/package.json +++ b/indexer/services/scripts/package.json @@ -9,6 +9,7 @@ "build:prod": "pnpm run build", "build:watch": "pnpm run build -- --watch", "validate-pnl": "ts-node src/validate-pnl.ts", + "print-block": "ts-node src/print-block.ts", "coverage": "pnpm test -- --coverage", "lint": "eslint --ext .ts,.js .", "lint:fix": "eslint --ext .ts,.js . --fix", diff --git a/indexer/services/scripts/src/print-block.ts b/indexer/services/scripts/src/print-block.ts index a94900078c..b4eee6d106 100644 --- a/indexer/services/scripts/src/print-block.ts +++ b/indexer/services/scripts/src/print-block.ts @@ -63,8 +63,6 @@ export async function connect(height: number): Promise { await consumer.subscribe({ topic: TO_ENDER_TOPIC, - // Need to set fromBeginning to true, so when ender restarts, it will consume all messages - // rather than ignoring the messages in queue that were produced before ender was started. fromBeginning: true, }); @@ -163,7 +161,7 @@ const args = yargs.options({ height: { type: 'number', alias: 'h', - description: 'Height to print message for', + description: 'Height to print block at', required: true, }, }).argv; From be8ed43e133b49181bafd93032faadd98930ce28 Mon Sep 17 00:00:00 2001 From: Will Liu Date: Thu, 26 Oct 2023 11:51:53 -0400 Subject: [PATCH 9/9] address cmts --- .../scripts/src/helpers/block-helpers.ts | 16 +--------------- indexer/services/scripts/src/helpers/types.ts | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/indexer/services/scripts/src/helpers/block-helpers.ts b/indexer/services/scripts/src/helpers/block-helpers.ts index edc353a8de..97ecbe06d7 100644 --- a/indexer/services/scripts/src/helpers/block-helpers.ts +++ b/indexer/services/scripts/src/helpers/block-helpers.ts @@ -14,21 +14,7 @@ import { UpdatePerpetualEventV1, } from '@dydxprotocol-indexer/v4-protos'; -import { AnnotatedIndexerTendermintEvent } from './types'; - -export enum DydxIndexerSubtypes { - ORDER_FILL = 'order_fill', - SUBACCOUNT_UPDATE = 'subaccount_update', - TRANSFER = 'transfer', - MARKET = 'market', - STATEFUL_ORDER = 'stateful_order', - FUNDING = 'funding_values', - ASSET = 'asset', - PERPETUAL_MARKET = 'perpetual_market', - LIQUIDITY_TIER = 'liquidity_tier', - UPDATE_PERPETUAL = 'update_perpetual', - UPDATE_CLOB_PAIR = 'update_clob_pair', -} +import { AnnotatedIndexerTendermintEvent, DydxIndexerSubtypes } from './types'; export function annotateIndexerTendermintEvent( event: IndexerTendermintEvent, diff --git a/indexer/services/scripts/src/helpers/types.ts b/indexer/services/scripts/src/helpers/types.ts index f728b82454..3bd3ea8659 100644 --- a/indexer/services/scripts/src/helpers/types.ts +++ b/indexer/services/scripts/src/helpers/types.ts @@ -7,3 +7,17 @@ export interface AnnotatedIndexerTendermintEvent extends IndexerTendermintEvent export interface AnnotatedIndexerTendermintBlock extends IndexerTendermintBlock { annotatedEvents: AnnotatedIndexerTendermintEvent[]; } + +export enum DydxIndexerSubtypes { + ORDER_FILL = 'order_fill', + SUBACCOUNT_UPDATE = 'subaccount_update', + TRANSFER = 'transfer', + MARKET = 'market', + STATEFUL_ORDER = 'stateful_order', + FUNDING = 'funding_values', + ASSET = 'asset', + PERPETUAL_MARKET = 'perpetual_market', + LIQUIDITY_TIER = 'liquidity_tier', + UPDATE_PERPETUAL = 'update_perpetual', + UPDATE_CLOB_PAIR = 'update_clob_pair', +}