[IND-387] add script to parse block and print json #692

Merged · 10 commits · Oct 26, 2023
8 changes: 8 additions & 0 deletions indexer/pnpm-lock.yaml

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions indexer/services/scripts/.env
@@ -1 +1,3 @@
SERVICE_NAME=scripts

KAFKA_ENABLE_UNIQUE_CONSUMER_GROUP_IDS=true
7 changes: 6 additions & 1 deletion indexer/services/scripts/package.json
@@ -9,6 +9,7 @@
"build:prod": "pnpm run build",
"build:watch": "pnpm run build -- --watch",
"validate-pnl": "ts-node src/validate-pnl.ts",
"print-block": "ts-node src/print-block.ts",
"coverage": "pnpm test -- --coverage",
"lint": "eslint --ext .ts,.js .",
"lint:fix": "eslint --ext .ts,.js . --fix",
Expand All @@ -19,11 +20,15 @@
"dependencies": {
"@dydxprotocol-indexer/base": "workspace:^0.0.1",
"@dydxprotocol-indexer/postgres": "workspace:^0.0.1",
"@dydxprotocol-indexer/kafka": "workspace:^0.0.1",
"@dydxprotocol-indexer/v4-proto-parser": "workspace:^0.0.1",
"@dydxprotocol-indexer/v4-protos": "workspace:^0.0.1",
"dotenv-flow": "^3.2.0",
"long": "^5.2.1",
"big.js": "^6.0.2",
"lodash": "^4.17.21",
"yargs": "^13.3.0"
"yargs": "^13.3.0",
"kafkajs": "^2.1.0"
},
"devDependencies": {
"@dydxprotocol-indexer/dev": "workspace:^0.0.1",
Expand Down
11 changes: 11 additions & 0 deletions indexer/services/scripts/src/config.ts
@@ -0,0 +1,11 @@
import { baseConfigSchema, parseSchema } from '@dydxprotocol-indexer/base';
import { kafkaConfigSchema } from '@dydxprotocol-indexer/kafka';
import { postgresConfigSchema } from '@dydxprotocol-indexer/postgres';

export const configSchema = {
  ...baseConfigSchema,
  ...postgresConfigSchema,
  ...kafkaConfigSchema,
};

export default parseSchema(configSchema);
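The config module composes its schema by object spread, so later spreads win on duplicate keys. A minimal sketch of that semantics, using hypothetical placeholder fields rather than the real schema entries:

```typescript
// Placeholder fields, not the real schema entries: if two schemas declare the
// same key, the schema spread last takes precedence in the merged object.
const baseSchema = { NODE_ENV: 'development', LOG_LEVEL: 'info' };
const kafkaSchema = { KAFKA_BROKER_URLS: 'localhost:9092', LOG_LEVEL: 'debug' };

// kafkaSchema is spread last, so its LOG_LEVEL overrides baseSchema's.
const mergedSchema = { ...baseSchema, ...kafkaSchema };
```

Ordering the spreads from most generic to most specific keeps overrides predictable.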
110 changes: 110 additions & 0 deletions indexer/services/scripts/src/helpers/block-helpers.ts
@@ -0,0 +1,110 @@
import { logger } from '@dydxprotocol-indexer/base';
import {
  AssetCreateEventV1,
  FundingEventV1,
  IndexerTendermintEvent,
  LiquidityTierUpsertEventV1,
  MarketEventV1,
  OrderFillEventV1,
  PerpetualMarketCreateEventV1,
  StatefulOrderEventV1,
  SubaccountUpdateEventV1,
  TransferEventV1,
  UpdateClobPairEventV1,
  UpdatePerpetualEventV1,
} from '@dydxprotocol-indexer/v4-protos';

import { AnnotatedIndexerTendermintEvent, DydxIndexerSubtypes } from './types';

export function annotateIndexerTendermintEvent(
  event: IndexerTendermintEvent,
): AnnotatedIndexerTendermintEvent | undefined {
  const eventDataBinary: Uint8Array = event.dataBytes;
  switch (event.subtype) {
    case (DydxIndexerSubtypes.ORDER_FILL.toString()): {
      return {
        ...event,
        dataBytes: new Uint8Array(),
        data: JSON.stringify(OrderFillEventV1.decode(eventDataBinary)),
      };
    }
    case (DydxIndexerSubtypes.SUBACCOUNT_UPDATE.toString()): {
      return {
        ...event,
        dataBytes: new Uint8Array(),
        data: JSON.stringify(SubaccountUpdateEventV1.decode(eventDataBinary)),
      };
    }
    case (DydxIndexerSubtypes.TRANSFER.toString()): {
      return {
        ...event,
        dataBytes: new Uint8Array(),
        data: JSON.stringify(TransferEventV1.decode(eventDataBinary)),
      };
    }
    case (DydxIndexerSubtypes.MARKET.toString()): {
      return {
        ...event,
        dataBytes: new Uint8Array(),
        data: JSON.stringify(MarketEventV1.decode(eventDataBinary)),
      };
    }
    case (DydxIndexerSubtypes.STATEFUL_ORDER.toString()): {
      return {
        ...event,
        dataBytes: new Uint8Array(),
        data: JSON.stringify(StatefulOrderEventV1.decode(eventDataBinary)),
      };
    }
    case (DydxIndexerSubtypes.FUNDING.toString()): {
      return {
        ...event,
        dataBytes: new Uint8Array(),
        data: JSON.stringify(FundingEventV1.decode(eventDataBinary)),
      };
    }
    case (DydxIndexerSubtypes.ASSET.toString()): {
      return {
        ...event,
        dataBytes: new Uint8Array(),
        data: JSON.stringify(AssetCreateEventV1.decode(eventDataBinary)),
      };
    }
    case (DydxIndexerSubtypes.PERPETUAL_MARKET.toString()): {
      return {
        ...event,
        dataBytes: new Uint8Array(),
        data: JSON.stringify(PerpetualMarketCreateEventV1.decode(eventDataBinary)),
      };
    }
    case (DydxIndexerSubtypes.LIQUIDITY_TIER.toString()): {
      return {
        ...event,
        dataBytes: new Uint8Array(),
        data: JSON.stringify(LiquidityTierUpsertEventV1.decode(eventDataBinary)),
      };
    }
    case (DydxIndexerSubtypes.UPDATE_PERPETUAL.toString()): {
      return {
        ...event,
        dataBytes: new Uint8Array(),
        data: JSON.stringify(UpdatePerpetualEventV1.decode(eventDataBinary)),
      };
    }
    case (DydxIndexerSubtypes.UPDATE_CLOB_PAIR.toString()): {
      return {
        ...event,
        dataBytes: new Uint8Array(),
        data: JSON.stringify(UpdateClobPairEventV1.decode(eventDataBinary)),
      };
    }
    default: {
      const message: string = `Unable to parse event subtype: ${event.subtype}`;
      logger.error({
        at: 'block-helpers#annotateIndexerTendermintEvent',
        message,
      });
      return undefined;
    }
  }
}
Comment on lines +19 to +110
Contributor
The annotateIndexerTendermintEvent function is decoding the dataBytes of the event based on its subtype and converting it to a JSON string. This is a good approach for handling different types of events. However, there is a potential performance issue here. Each time a new event comes in, a new Uint8Array is created and assigned to dataBytes, even though the original dataBytes is not used anymore. This could lead to unnecessary memory allocation and garbage collection. Consider setting dataBytes to null instead to free up memory.

-        dataBytes: new Uint8Array(),
+        dataBytes: null,
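The eleven near-identical switch cases in `annotateIndexerTendermintEvent` invite a table-driven alternative. The sketch below is not part of the PR; it uses hypothetical stub decoders where the real map would hold `OrderFillEventV1.decode`, `SubaccountUpdateEventV1.decode`, and so on:

```typescript
// Hypothetical stand-ins for the v4-protos decoders, included only so the
// sketch is self-contained; each returns a plain object instead of a proto.
type Decoder = (bytes: Uint8Array) => unknown;

const decoders: Record<string, Decoder> = {
  order_fill: (bytes: Uint8Array) => ({ kind: 'order_fill', length: bytes.length }),
  subaccount_update: (bytes: Uint8Array) => ({ kind: 'subaccount_update', length: bytes.length }),
};

// Mirrors the switch: decode by subtype, returning undefined for an unknown
// subtype so the caller can log the failure and skip the event.
function annotateBySubtype(subtype: string, dataBytes: Uint8Array): string | undefined {
  const decode: Decoder | undefined = decoders[subtype];
  if (decode === undefined) {
    return undefined;
  }
  return JSON.stringify(decode(dataBytes));
}
```

Adding a new event subtype then becomes a one-line map entry rather than a new switch case.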

23 changes: 23 additions & 0 deletions indexer/services/scripts/src/helpers/types.ts
@@ -0,0 +1,23 @@
import { IndexerTendermintBlock, IndexerTendermintEvent } from '@dydxprotocol-indexer/v4-protos';

export interface AnnotatedIndexerTendermintEvent extends IndexerTendermintEvent {
  data: string;
}

export interface AnnotatedIndexerTendermintBlock extends IndexerTendermintBlock {
  annotatedEvents: AnnotatedIndexerTendermintEvent[];
}

export enum DydxIndexerSubtypes {
  ORDER_FILL = 'order_fill',
  SUBACCOUNT_UPDATE = 'subaccount_update',
  TRANSFER = 'transfer',
  MARKET = 'market',
  STATEFUL_ORDER = 'stateful_order',
  FUNDING = 'funding_values',
  ASSET = 'asset',
  PERPETUAL_MARKET = 'perpetual_market',
  LIQUIDITY_TIER = 'liquidity_tier',
  UPDATE_PERPETUAL = 'update_perpetual',
  UPDATE_CLOB_PAIR = 'update_clob_pair',
}
169 changes: 169 additions & 0 deletions indexer/services/scripts/src/print-block.ts
@@ -0,0 +1,169 @@
import { logger, wrapBackgroundTask } from '@dydxprotocol-indexer/base';
import {
  addOnMessageFunction,
  consumer,
  producer,
  startConsumer,
  stopConsumer,
  TO_ENDER_TOPIC,
} from '@dydxprotocol-indexer/kafka';
import { IndexerTendermintBlock, IndexerTendermintEvent } from '@dydxprotocol-indexer/v4-protos';
import { KafkaMessage } from 'kafkajs';
import _ from 'lodash';
import yargs from 'yargs';

import config from './config';
import { annotateIndexerTendermintEvent } from './helpers/block-helpers';
import { AnnotatedIndexerTendermintBlock, AnnotatedIndexerTendermintEvent } from './helpers/types';

/**
 * Decodes a KafkaMessage into an IndexerTendermintBlock.
 * Throws an error if the message or its value is empty.
 */
function getIndexerTendermintBlock(
  message: KafkaMessage,
): IndexerTendermintBlock | undefined {
  if (!message || !message.value || !message.timestamp) {
    throw Error('Empty message');
  }
  const messageValueBinary: Uint8Array = new Uint8Array(message.value);

  const block: IndexerTendermintBlock = IndexerTendermintBlock.decode(
    messageValueBinary,
  );

  return block;
}
Comment on lines +26 to +36
Contributor
The function getIndexerTendermintBlock throws an error if the message, its value, or its timestamp is not present. However, it does not handle the case where the IndexerTendermintBlock.decode function fails to decode the message value. This could lead to unhandled exceptions if the message value is not a valid IndexerTendermintBlock. Consider adding error handling for this case.

  const block: IndexerTendermintBlock = IndexerTendermintBlock.decode(
    messageValueBinary,
  );
Suggested change:
  try {
    const block: IndexerTendermintBlock = IndexerTendermintBlock.decode(
      messageValueBinary,
    );
    return block;
  } catch (error) {
    logger.error({
      at: 'getIndexerTendermintBlock',
      message: 'Failed to decode message value',
      error,
    });
    return undefined;
  }


export function seek(offset: bigint): void {
  logger.info({
    at: 'consumer#seek',
    message: 'Seeking...',
    offset: offset.toString(),
  });

  consumer.seek({
    topic: TO_ENDER_TOPIC,
    partition: 0,
    offset: offset.toString(),
  });

  logger.info({
    at: 'consumer#seek',
    message: 'Seeked.',
    offset: offset.toString(),
  });
}

export async function connect(height: number): Promise<void> {
  await Promise.all([
    consumer.connect(),
    producer.connect(),
  ]);

  await consumer.subscribe({
    topic: TO_ENDER_TOPIC,
    fromBeginning: true,
  });

  addOnMessageFunction((_topic: string, message: KafkaMessage): Promise<void> => {
    return printMessageAtHeight(message, height);
  });

  logger.info({
    at: 'consumers#connect',
    message: 'Connected to Kafka',
  });
}

export async function printMessageAtHeight(
  currentMessage: KafkaMessage,
  targetHeight: number,
): Promise<void> {
  const indexerTendermintBlock: IndexerTendermintBlock | undefined = getIndexerTendermintBlock(
    currentMessage,
  );
  if (indexerTendermintBlock === undefined) {
    return;
  }

  const currentBlockHeight: number = parseInt(indexerTendermintBlock.height.toString(), 10);
  if (currentBlockHeight < targetHeight) {
    const offsetToSeek: number = targetHeight - currentBlockHeight + Number(currentMessage.offset);
    await seek(BigInt(offsetToSeek));
  } else if (currentBlockHeight === targetHeight) {
    const annotatedEvents: AnnotatedIndexerTendermintEvent[] = [];
    _.forEach(indexerTendermintBlock.events, (event: IndexerTendermintEvent) => {
      const annotatedEvent:
      AnnotatedIndexerTendermintEvent | undefined = annotateIndexerTendermintEvent(
        event,
      );
      if (annotatedEvent === undefined) {
        logger.error({
          at: 'printMessageAtHeight',
          message: 'Failed to parse event',
          event,
        });
        throw Error('Failed to parse event');
      }
      annotatedEvents.push(annotatedEvent);
    });
Comment on lines +96 to +110
Contributor
The function printMessageAtHeight throws an error if it fails to annotate an event. However, it does not handle the case where the annotateIndexerTendermintEvent function returns undefined. This could lead to unhandled exceptions if the function fails to annotate an event. Consider adding error handling for this case.

      if (annotatedEvent === undefined) {
        logger.error({
          at: 'printMessageAtHeight',
          message: 'Failed to parse event',
          event,
        });
        throw Error('Failed to parse event');
      }
Suggested change:
      if (annotatedEvent === undefined) {
        logger.error({
          at: 'printMessageAtHeight',
          message: 'Failed to annotate event',
          event,
        });
        return;
      }

    const annotatedBlock: AnnotatedIndexerTendermintBlock = {
      ...indexerTendermintBlock,
      events: [],
      annotatedEvents,
    };
    logger.info({
      at: 'printMessageAtHeight',
      message: 'Printing block',
      block: annotatedBlock,
    });
    await stopConsumer();
  }
}
Comment on lines +83 to +122
Contributor
The function printMessageAtHeight still does not handle the case where the current block height is greater than the target height. This could lead to unexpected behavior if the target height is less than the current block height. Consider adding a condition to handle this case.

  if (currentBlockHeight < targetHeight) {
    const offsetToSeek: number = targetHeight - currentBlockHeight + Number(currentMessage.offset);
    await seek(BigInt(offsetToSeek));
  } else if (currentBlockHeight === targetHeight) {
Committable suggestion (Beta)
Suggested change
const indexerTendermintBlock: IndexerTendermintBlock | undefined = getIndexerTendermintBlock(
currentMessage,
);
if (indexerTendermintBlock === undefined) {
return;
}
const currentBlockHeight: number = parseInt(indexerTendermintBlock.height.toString(), 10);
if (currentBlockHeight < targetHeight) {
const offsetToSeek: number = targetHeight - currentBlockHeight + Number(currentMessage.offset);
await seek(BigInt(offsetToSeek));
} else if (currentBlockHeight === targetHeight) {
const annotatedEvents: AnnotatedIndexerTendermintEvent[] = [];
_.forEach(indexerTendermintBlock.events, (event: IndexerTendermintEvent) => {
const annotatedEvent:
AnnotatedIndexerTendermintEvent | undefined = annotateIndexerTendermintEvent(
event,
);
if (annotatedEvent === undefined) {
logger.error({
at: 'printMessageAtHeight',
message: 'Failed to parse event',
event,
});
throw Error('Failed to parse event');
}
annotatedEvents.push(annotatedEvent);
});
const annotatedBlock: AnnotatedIndexerTendermintBlock = {
...indexerTendermintBlock,
events: [],
annotatedEvents,
};
logger.info({
at: 'printMessageAtHeight',
message: 'Printing block',
block: annotatedBlock,
});
await stopConsumer();
}
  if (currentBlockHeight < targetHeight) {
    const offsetToSeek: number = targetHeight - currentBlockHeight + Number(currentMessage.offset);
    await seek(BigInt(offsetToSeek));
  } else if (currentBlockHeight > targetHeight) {
    logger.info({
      at: 'printMessageAtHeight',
      message: 'Current block height is greater than target height',
      currentBlockHeight,
      targetHeight,
    });
    await stopConsumer();
  } else if (currentBlockHeight === targetHeight) {
    // ... existing handling unchanged ...
  }
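The seek target in `printMessageAtHeight` is derived from the height gap. A minimal sketch of that arithmetic, assuming (as the script does) a single partition with exactly one block per Kafka offset:

```typescript
// Assumes one block per offset on a single partition, so a gap of N heights
// equals a gap of N offsets. This mirrors the script's expression
// `targetHeight - currentBlockHeight + Number(currentMessage.offset)`.
function offsetForHeight(
  currentOffset: bigint,
  currentHeight: number,
  targetHeight: number,
): bigint {
  return currentOffset + BigInt(targetHeight - currentHeight);
}
```

If the topic ever carried messages other than blocks, or more than one partition, this direct arithmetic would no longer land on the target block.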

async function startKafka(height: number): Promise<void> {
  logger.info({
    at: 'index#start',
    message: `Starting in env ${config.NODE_ENV}`,
  });

  await connect(height);
  await startConsumer();

  logger.info({
    at: 'index#start',
    message: 'Successfully started',
  });
}

process.on('SIGTERM', async () => {
  logger.info({
    at: 'index#SIGTERM',
    message: 'Received SIGTERM, shutting down',
  });
  await stopConsumer();
});

async function start(height: number): Promise<void> {
  logger.info({
    at: 'index#start',
    message: `Connecting to kafka brokers: ${config.KAFKA_BROKER_URLS}`,
  });
  await startKafka(height);
  logger.info({
    at: 'index#start',
    message: `Successfully connected to kafka brokers: ${config.KAFKA_BROKER_URLS}`,
  });
}
Comment on lines +148 to +158
Contributor
The start function logs the Kafka broker URLs before and after starting Kafka. This could potentially expose sensitive information in the logs. Consider removing or obfuscating these logs.

  logger.info({
    at: 'index#start',
    message: `Connecting to kafka brokers: ${config.KAFKA_BROKER_URLS}`,
  });
  await startKafka(height);
  logger.info({
    at: 'index#start',
    message: `Successfully connected to kafka brokers: ${config.KAFKA_BROKER_URLS}`,
  });
Suggested change:
  logger.info({
    at: 'index#start',
    message: 'Connecting to kafka brokers',
  });
  await startKafka(height);
  logger.info({
    at: 'index#start',
    message: 'Successfully connected to kafka brokers',
  });


const args = yargs.options({
Contributor
nit: only if it's easy to do, can we also add support to just parse a block at a particular offset

Contributor Author

I don't think this is needed, since all Ender logs printing out offset should also print out block height

  height: {
    type: 'number',
    alias: 'h',
    description: 'Height to print block at',
    required: true,
  },
}).argv;

wrapBackgroundTask(start(args.height), false, 'main');
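The script takes a single required `--height` (alias `-h`) numeric option via yargs; with the `print-block` entry added to package.json, it would be invoked as `pnpm run print-block -- --height 1000` (the height value is illustrative). A dependency-free sketch of the same CLI contract, with a hand-rolled parser standing in for yargs:

```typescript
// Hand-rolled equivalent of the yargs contract above, kept dependency-free for
// the sketch: --height/-h is required and must parse as a finite number.
function parseHeight(argv: string[]): number {
  const i: number = argv.findIndex((a: string) => a === '--height' || a === '-h');
  if (i === -1 || i + 1 >= argv.length) {
    throw new Error('--height is required');
  }
  const height: number = Number(argv[i + 1]);
  if (!Number.isFinite(height)) {
    throw new Error('--height must be a number');
  }
  return height;
}
```

In the real script yargs also generates help text and enforces `required: true` automatically; the sketch only captures the parsing behavior.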