diff --git a/indexer/README.md b/indexer/README.md index 26fab47508..7598e2c9d9 100644 --- a/indexer/README.md +++ b/indexer/README.md @@ -230,4 +230,5 @@ Other example subscription events: { "type": "subscribe", "channel": "v4_markets" } { "type": "subscribe", "channel": "v4_orderbook", "id": "BTC-USD" } { "type": "subscribe", "channel": "v4_subaccounts", "id": "address/0" } +{ "type": "subscribe", "channel": "v4_block_height" } ``` diff --git a/indexer/docker-compose-local-deployment.yml b/indexer/docker-compose-local-deployment.yml index 832c27fa85..075ee7a4d5 100644 --- a/indexer/docker-compose-local-deployment.yml +++ b/indexer/docker-compose-local-deployment.yml @@ -13,7 +13,8 @@ services: to-websockets-subaccounts:1:1,\ to-websockets-trades:1:1,\ to-websockets-markets:1:1,\ - to-websockets-candles:1:1" + to-websockets-candles:1:1,\ + to-websockets-block-height:1:1" KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL_SAME_HOST://:29092 KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL_SAME_HOST://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_SAME_HOST:PLAINTEXT diff --git a/indexer/docker-compose.yml b/indexer/docker-compose.yml index 141505046e..306469544b 100644 --- a/indexer/docker-compose.yml +++ b/indexer/docker-compose.yml @@ -7,14 +7,15 @@ services: environment: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_ADVERTISED_HOST_NAME: localhost - KAFKA_CREATE_TOPICS: + KAFKA_CREATE_TOPICS: "to-ender:1:1,\ to-vulcan:1:1,\ to-websockets-orderbooks:1:1,\ to-websockets-subaccounts:1:1,\ to-websockets-trades:1:1,\ to-websockets-markets:1:1,\ - to-websockets-candles:1:1" + to-websockets-candles:1:1,\ + to-websockets-block-height:1:1" postgres-test: build: context: . diff --git a/indexer/packages/kafka/src/constants.ts b/indexer/packages/kafka/src/constants.ts index 9a02cb1b3e..01b5a3712b 100644 --- a/indexer/packages/kafka/src/constants.ts +++ b/indexer/packages/kafka/src/constants.ts @@ -5,3 +5,4 @@ export const SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION: string = '3.0.0'; export const TRADES_WEBSOCKET_MESSAGE_VERSION: string = '2.1.0'; export const MARKETS_WEBSOCKET_MESSAGE_VERSION: string = '1.0.0'; export const CANDLES_WEBSOCKET_MESSAGE_VERSION: string = '1.0.0'; +export const BLOCK_HEIGHT_WEBSOCKET_MESSAGE_VERSION: string = '1.0.0'; diff --git a/indexer/packages/kafka/src/types.ts b/indexer/packages/kafka/src/types.ts index d18c465995..27b0b3e993 100644 --- a/indexer/packages/kafka/src/types.ts +++ b/indexer/packages/kafka/src/types.ts @@ -4,6 +4,7 @@ export enum WebsocketTopics { TO_WEBSOCKETS_TRADES = 'to-websockets-trades', TO_WEBSOCKETS_MARKETS = 'to-websockets-markets', TO_WEBSOCKETS_CANDLES = 'to-websockets-candles', + TO_WEBSOCKETS_BLOCK_HEIGHT = 'to-websockets-block-height', } export enum KafkaTopics { @@ -14,4 +15,5 @@ export enum KafkaTopics { TO_WEBSOCKETS_TRADES = 'to-websockets-trades', TO_WEBSOCKETS_MARKETS = 'to-websockets-markets', TO_WEBSOCKETS_CANDLES = 'to-websockets-candles', + TO_WEBSOCKETS_BLOCK_HEIGHT = 'to-websockets-block-height', } diff --git a/indexer/packages/v4-protos/src/codegen/dydxprotocol/indexer/socks/messages.ts b/indexer/packages/v4-protos/src/codegen/dydxprotocol/indexer/socks/messages.ts index 6a0f8fd6f5..9563487bd9 100644 --- a/indexer/packages/v4-protos/src/codegen/dydxprotocol/indexer/socks/messages.ts +++ b/indexer/packages/v4-protos/src/codegen/dydxprotocol/indexer/socks/messages.ts @@ -265,6 +265,30 @@ export interface CandleMessageSDKType { version: string; } +/** Message to be sent through the 'to-websockets-block-height` kafka topic. */ + +export interface BlockHeightMessage { + /** Block height where the contents occur. */ + blockHeight: string; + /** ISO formatted time of the block height. */ + + time: string; + /** Version of the websocket message. */ + + version: string; +} +/** Message to be sent through the 'to-websockets-block-height` kafka topic. */ + +export interface BlockHeightMessageSDKType { + /** Block height where the contents occur. */ + block_height: string; + /** ISO formatted time of the block height. */ + + time: string; + /** Version of the websocket message. */ + + version: string; +} function createBaseOrderbookMessage(): OrderbookMessage { return { @@ -629,4 +653,69 @@ export const CandleMessage = { return message; } +}; + +function createBaseBlockHeightMessage(): BlockHeightMessage { + return { + blockHeight: "", + time: "", + version: "" + }; +} + +export const BlockHeightMessage = { + encode(message: BlockHeightMessage, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.blockHeight !== "") { + writer.uint32(10).string(message.blockHeight); + } + + if (message.time !== "") { + writer.uint32(18).string(message.time); + } + + if (message.version !== "") { + writer.uint32(26).string(message.version); + } + + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): BlockHeightMessage { + const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseBlockHeightMessage(); + + while (reader.pos < end) { + const tag = reader.uint32(); + + switch (tag >>> 3) { + case 1: + message.blockHeight = reader.string(); + break; + + case 2: + message.time = reader.string(); + break; + + case 3: + message.version = reader.string(); + break; + + default: + reader.skipType(tag & 7); + break; + } + } + + return message; + }, + + fromPartial(object: DeepPartial<BlockHeightMessage>): BlockHeightMessage { + const message = createBaseBlockHeightMessage(); + message.blockHeight = object.blockHeight ?? ""; + message.time = object.time ?? ""; + message.version = object.version ?? ""; + return message; + } + }; \ No newline at end of file diff --git a/indexer/services/auxo/src/constants.ts b/indexer/services/auxo/src/constants.ts index 39f9efedb1..dc5cc4ce4e 100644 --- a/indexer/services/auxo/src/constants.ts +++ b/indexer/services/auxo/src/constants.ts @@ -10,6 +10,13 @@ export const BAZOOKA_DB_MIGRATION_PAYLOAD: Uint8Array = new TextEncoder().encode }), ); +export const BAZOOKA_DB_MIGRATION_AND_CREATE_KAFKA_PAYLOAD: Uint8Array = new TextEncoder().encode( + JSON.stringify({ + migrate: true, + create_kafka_topics: true, + }), +); + export const ECS_SERVICE_NAMES: EcsServiceNames[] = [ EcsServiceNames.COMLINK, EcsServiceNames.ENDER, diff --git a/indexer/services/auxo/src/index.ts b/indexer/services/auxo/src/index.ts index 8bd286e778..2965d5861d 100644 --- a/indexer/services/auxo/src/index.ts +++ b/indexer/services/auxo/src/index.ts @@ -30,6 +30,7 @@ import _ from 'lodash'; import config from './config'; import { + BAZOOKA_DB_MIGRATION_AND_CREATE_KAFKA_PAYLOAD, BAZOOKA_DB_MIGRATION_PAYLOAD, BAZOOKA_LAMBDA_FUNCTION_NAME, ECS_SERVICE_NAMES, @@ -40,7 +41,7 @@ import { AuxoEventJson, EcsServiceNames, TaskDefinitionArnMap } from './types'; /** * Upgrades all services and run migrations * 1. Upgrade Bazooka - * 2. Run db migration in Bazooka + * 2. Run db migration in Bazooka, and update kafka topics * 3. Create new ECS Task Definition for ECS Services with new image * 4. Upgrade all ECS Services (comlink, ender, roundtable, socks, vulcan) */ @@ -66,8 +67,9 @@ export async function handler( // 1. Upgrade Bazooka await upgradeBazooka(lambda, ecr, event); - // 2. Run db migration in Bazooka - await runDbMigration(lambda); + // 2. Run db migration in Bazooka, + // boolean flag used to determine if new kafka topics should be created + await runDbAndKafkaMigration(event.addNewKafkaTopics, lambda); // 3. Create new ECS Task Definition for ECS Services with new image const taskDefinitionArnMap: TaskDefinitionArnMap = await createNewEcsTaskDefinitions( @@ -192,16 +194,20 @@ async function getImageDetail( } -async function runDbMigration( +async function runDbAndKafkaMigration( + createNewKafkaTopics: boolean, lambda: ECRClient, ): Promise<void> { logger.info({ at: 'index#runDbMigration', message: 'Running db migration', }); + const payload = createNewKafkaTopics + ? BAZOOKA_DB_MIGRATION_AND_CREATE_KAFKA_PAYLOAD + : BAZOOKA_DB_MIGRATION_PAYLOAD; const response: InvokeCommandOutput = await lambda.send(new InvokeCommand({ FunctionName: BAZOOKA_LAMBDA_FUNCTION_NAME, - Payload: BAZOOKA_DB_MIGRATION_PAYLOAD, + Payload: payload, // RequestResponse means that the lambda is synchronously invoked InvocationType: 'RequestResponse', })); diff --git a/indexer/services/auxo/src/types.ts b/indexer/services/auxo/src/types.ts index 315f38ae0b..7fff098cc0 100644 --- a/indexer/services/auxo/src/types.ts +++ b/indexer/services/auxo/src/types.ts @@ -13,6 +13,7 @@ export interface AuxoEventJson { region: string; // In our naming we often times use the appreviated region name regionAbbrev: string; + addNewKafkaTopics: boolean; } // EcsServiceName to task definition arn mapping diff --git a/indexer/services/bazooka/src/index.ts b/indexer/services/bazooka/src/index.ts index 5d78cc85f8..91d691ecb3 100644 --- a/indexer/services/bazooka/src/index.ts +++ b/indexer/services/bazooka/src/index.ts @@ -18,6 +18,7 @@ const KAFKA_TOPICS: KafkaTopics[] = [ KafkaTopics.TO_WEBSOCKETS_TRADES, KafkaTopics.TO_WEBSOCKETS_MARKETS, KafkaTopics.TO_WEBSOCKETS_CANDLES, + KafkaTopics.TO_WEBSOCKETS_BLOCK_HEIGHT, ]; const DEFAULT_NUM_REPLICAS: number = 3; @@ -30,6 +31,7 @@ const KAFKA_TOPICS_TO_PARTITIONS: { [key in KafkaTopics]: number } = { [KafkaTopics.TO_WEBSOCKETS_TRADES]: 1, [KafkaTopics.TO_WEBSOCKETS_MARKETS]: 1, [KafkaTopics.TO_WEBSOCKETS_CANDLES]: 1, + [KafkaTopics.TO_WEBSOCKETS_BLOCK_HEIGHT]: 1, }; export interface BazookaEventJson { @@ -196,7 +198,7 @@ async function createKafkaTopics( _.forEach(KAFKA_TOPICS, (kafkaTopic: KafkaTopics) => { if (_.includes(existingKafkaTopics, kafkaTopic)) { logger.info({ - at: 'index#clearKafkaTopics', + at: 'index#createKafkaTopics', message: `Cannot create kafka topic that does exist: ${kafkaTopic}`, }); return; diff --git a/indexer/services/ender/__tests__/lib/block-processor.test.ts b/indexer/services/ender/__tests__/lib/block-processor.test.ts index 6f8c18a6f3..310b435215 100644 --- a/indexer/services/ender/__tests__/lib/block-processor.test.ts +++ b/indexer/services/ender/__tests__/lib/block-processor.test.ts @@ -24,6 +24,7 @@ import { BatchedHandlers } from '../../src/lib/batched-handlers'; import { SyncHandlers } from '../../src/lib/sync-handlers'; import { mock, MockProxy } from 'jest-mock-extended'; import { createPostgresFunctions } from '../../src/helpers/postgres/postgres-functions'; +import { BLOCK_HEIGHT_WEBSOCKET_MESSAGE_VERSION, KafkaTopics } from '@dydxprotocol-indexer/kafka'; describe('block-processor', () => { let batchedHandlers: MockProxy<BatchedHandlers>; @@ -162,4 +163,60 @@ describe('block-processor', () => { batchedHandlers.process.mock.invocationCallOrder[0], ); }); + + it('Adds a block height message to the Kafka publisher', async () => { + const block: IndexerTendermintBlock = createIndexerTendermintBlock( + defaultHeight, + defaultTime, + events, + [ + defaultTxHash, + defaultTxHash2, + ], + ); + + const txId: number = await Transaction.start(); + const blockProcessor: BlockProcessor = new BlockProcessor( + block, + txId, + defaultDateTime.toString(), + ); + const processor = await blockProcessor.process(); + await Transaction.commit(txId); + expect(processor.blockHeightMessages).toHaveLength(1); + expect(processor.blockHeightMessages[0].blockHeight).toEqual(String(defaultHeight)); + expect(processor.blockHeightMessages[0].version) + .toEqual(BLOCK_HEIGHT_WEBSOCKET_MESSAGE_VERSION); + expect(processor.blockHeightMessages[0].time).toEqual(defaultDateTime.toString()); + }); + + it('createBlockHeightMsg creates a BlockHeightMessage', async () => { + const block: IndexerTendermintBlock = createIndexerTendermintBlock( + defaultHeight, + defaultTime, + events, + [ + defaultTxHash, + defaultTxHash2, + ], + ); + + const txId: number = await Transaction.start(); + const blockProcessor: BlockProcessor = new BlockProcessor( + block, + txId, + defaultDateTime.toString(), + ); + await Transaction.commit(txId); + + const msg = blockProcessor.createBlockHeightMsg(); + expect(msg).toEqual({ + topic: KafkaTopics.TO_WEBSOCKETS_BLOCK_HEIGHT, + message: { + blockHeight: String(defaultHeight), + time: defaultDateTime.toString(), + version: BLOCK_HEIGHT_WEBSOCKET_MESSAGE_VERSION, + }, + }); + }); }); diff --git a/indexer/services/ender/__tests__/lib/kafka-publisher.test.ts b/indexer/services/ender/__tests__/lib/kafka-publisher.test.ts index 3656be98b3..a73eaef01c 100644 --- a/indexer/services/ender/__tests__/lib/kafka-publisher.test.ts +++ b/indexer/services/ender/__tests__/lib/kafka-publisher.test.ts @@ -17,13 +17,16 @@ import { TradeType, TransferFromDatabase, } from '@dydxprotocol-indexer/postgres'; -import { IndexerSubaccountId, SubaccountMessage, TradeMessage } from '@dydxprotocol-indexer/v4-protos'; +import { + BlockHeightMessage, IndexerSubaccountId, SubaccountMessage, TradeMessage, +} from '@dydxprotocol-indexer/v4-protos'; import Big from 'big.js'; import _ from 'lodash'; import { AnnotatedSubaccountMessage, ConsolidatedKafkaEvent, SingleTradeMessage } from '../../src/lib/types'; import { KafkaPublisher } from '../../src/lib/kafka-publisher'; import { + defaultDateTime, defaultSubaccountMessage, defaultTradeContent, defaultTradeKafkaEvent, @@ -43,6 +46,7 @@ import { } from '../../src/helpers/kafka-helper'; import { DateTime } from 'luxon'; import { convertToSubaccountMessage } from '../../src/lib/helper'; +import { defaultBlock } from '@dydxprotocol-indexer/postgres/build/__tests__/helpers/constants'; describe('kafka-publisher', () => { let producerSendMock: jest.SpyInstance; @@ -105,6 +109,30 @@ describe('kafka-publisher', () => { }); }); + it('successfully publishes block height messages', async () => { + const message: BlockHeightMessage = { + blockHeight: String(defaultBlock), + version: '1.0.0', + time: defaultDateTime.toString(), + }; + const blockHeightEvent: ConsolidatedKafkaEvent = { + topic: KafkaTopics.TO_WEBSOCKETS_BLOCK_HEIGHT, + message, + }; + + const publisher: KafkaPublisher = new KafkaPublisher(); + publisher.addEvents([blockHeightEvent]); + + await publisher.publish(); + expect(producerSendMock).toHaveBeenCalledTimes(1); + expect(producerSendMock).toHaveBeenCalledWith({ + topic: blockHeightEvent.topic, + messages: [{ + value: Buffer.from(BlockHeightMessage.encode(blockHeightEvent.message).finish()), + }], + }); + }); + describe('sortTradeEvents', () => { const trade: SingleTradeMessage = contentToSingleTradeMessage( {} as TradeContent, diff --git a/indexer/services/ender/__tests__/lib/on-message.test.ts b/indexer/services/ender/__tests__/lib/on-message.test.ts index 973830b5f5..ae521b3351 100644 --- a/indexer/services/ender/__tests__/lib/on-message.test.ts +++ b/indexer/services/ender/__tests__/lib/on-message.test.ts @@ -671,7 +671,7 @@ describe('on-message', () => { expectBlock(defaultHeight.toString(), defaultDateTime.toISO()), ]); - expect(producerSendMock).toHaveBeenCalledTimes(2); + expect(producerSendMock).toHaveBeenCalledTimes(3); // First message batch sent should contain the first message expect(producerSendMock.mock.calls[0][0].messages).toHaveLength(1); // Second message batch should contain the second message diff --git a/indexer/services/ender/src/lib/block-processor.ts b/indexer/services/ender/src/lib/block-processor.ts index c54fa8e14b..4e7287110f 100644 --- a/indexer/services/ender/src/lib/block-processor.ts +++ b/indexer/services/ender/src/lib/block-processor.ts @@ -1,9 +1,11 @@ /* eslint-disable max-len */ import { logger, stats, STATS_NO_SAMPLING } from '@dydxprotocol-indexer/base'; +import { BLOCK_HEIGHT_WEBSOCKET_MESSAGE_VERSION, KafkaTopics } from '@dydxprotocol-indexer/kafka'; import { storeHelpers, } from '@dydxprotocol-indexer/postgres'; import { + BlockHeightMessage, IndexerTendermintBlock, IndexerTendermintEvent, } from '@dydxprotocol-indexer/v4-protos'; @@ -33,6 +35,7 @@ import { indexerTendermintEventToEventProtoWithType, indexerTendermintEventToTra import { KafkaPublisher } from './kafka-publisher'; import { SyncHandlers, SYNCHRONOUS_SUBTYPES } from './sync-handlers'; import { + ConsolidatedKafkaEvent, DydxIndexerSubtypes, EventMessage, EventProtoWithTypeAndVersion, GroupedEvents, } from './types'; @@ -225,6 +228,18 @@ export class BlockProcessor { }); } + createBlockHeightMsg(): ConsolidatedKafkaEvent { + const message: BlockHeightMessage = { + blockHeight: String(this.block.height), + version: BLOCK_HEIGHT_WEBSOCKET_MESSAGE_VERSION, + time: this.block.time?.toISOString() ?? '', + }; + return { + topic: KafkaTopics.TO_WEBSOCKETS_BLOCK_HEIGHT, + message, + }; + } + private async processEvents(): Promise<KafkaPublisher> { const kafkaPublisher: KafkaPublisher = new KafkaPublisher(); @@ -271,6 +286,9 @@ export class BlockProcessor { ); } + // Create a block message from the current block + kafkaPublisher.addEvent(this.createBlockHeightMsg()); + // in genesis, handle sync events first, then batched events. // in other blocks, handle batched events first, then sync events. if (this.block.height === 0) { diff --git a/indexer/services/ender/src/lib/kafka-publisher.ts b/indexer/services/ender/src/lib/kafka-publisher.ts index 8be583c0cc..8295ca5539 100644 --- a/indexer/services/ender/src/lib/kafka-publisher.ts +++ b/indexer/services/ender/src/lib/kafka-publisher.ts @@ -8,6 +8,7 @@ import { } from '@dydxprotocol-indexer/kafka'; import { FillSubaccountMessageContents, TradeMessageContents } from '@dydxprotocol-indexer/postgres'; import { + BlockHeightMessage, CandleMessage, MarketMessage, OffChainUpdateV1, @@ -20,7 +21,10 @@ import _ from 'lodash'; import config from '../config'; import { convertToSubaccountMessage } from './helper'; import { - AnnotatedSubaccountMessage, ConsolidatedKafkaEvent, SingleTradeMessage, VulcanMessage, + AnnotatedSubaccountMessage, + ConsolidatedKafkaEvent, + SingleTradeMessage, + VulcanMessage, } from './types'; type TopicKafkaMessages = { @@ -31,9 +35,10 @@ type TopicKafkaMessages = { type OrderedMessage = AnnotatedSubaccountMessage | SingleTradeMessage; type Message = AnnotatedSubaccountMessage | SingleTradeMessage | MarketMessage | -CandleMessage | VulcanMessage; +CandleMessage | VulcanMessage | BlockHeightMessage; export class KafkaPublisher { + blockHeightMessages: BlockHeightMessage[]; subaccountMessages: AnnotatedSubaccountMessage[]; tradeMessages: SingleTradeMessage[]; marketMessages: MarketMessage[]; @@ -41,6 +46,7 @@ export class KafkaPublisher { vulcanMessages: VulcanMessage[]; constructor() { + this.blockHeightMessages = []; this.subaccountMessages = []; this.tradeMessages = []; this.marketMessages = []; @@ -76,6 +82,8 @@ export class KafkaPublisher { return this.candleMessages; case KafkaTopics.TO_VULCAN: return this.vulcanMessages; + case KafkaTopics.TO_WEBSOCKETS_BLOCK_HEIGHT: + return this.blockHeightMessages; default: throw new Error('Invalid Topic'); } @@ -199,6 +207,17 @@ export class KafkaPublisher { private generateAllTopicKafkaMessages(): TopicKafkaMessages[] { const allTopicKafkaMessages: TopicKafkaMessages[] = []; + if (this.blockHeightMessages.length > 0) { + allTopicKafkaMessages.push({ + topic: KafkaTopics.TO_WEBSOCKETS_BLOCK_HEIGHT, + messages: _.map(this.blockHeightMessages, (message: BlockHeightMessage) => { + return { + value: Buffer.from(Uint8Array.from(BlockHeightMessage.encode(message).finish())), + }; + }), + }); + } + if (this.subaccountMessages.length > 0) { this.aggregateFillEventsForSubaccountMessages(); allTopicKafkaMessages.push({ diff --git a/indexer/services/ender/src/lib/types.ts b/indexer/services/ender/src/lib/types.ts index ce381dbd33..96b2ccea00 100644 --- a/indexer/services/ender/src/lib/types.ts +++ b/indexer/services/ender/src/lib/types.ts @@ -35,6 +35,7 @@ import { DeleveragingEventV1, TradingRewardsEventV1, OpenInterestUpdateEventV1, + BlockHeightMessage, } from '@dydxprotocol-indexer/v4-protos'; import { IHeaders } from 'kafkajs'; import Long from 'long'; @@ -258,6 +259,9 @@ export type ConsolidatedKafkaEvent = { } | { topic: KafkaTopics.TO_VULCAN, message: VulcanMessage, +} | { + topic: KafkaTopics.TO_WEBSOCKETS_BLOCK_HEIGHT, + message: BlockHeightMessage }; export enum TransferEventType { diff --git a/indexer/services/socks/__tests__/constants.ts b/indexer/services/socks/__tests__/constants.ts index 3c33c31ebf..ef8bc17d02 100644 --- a/indexer/services/socks/__tests__/constants.ts +++ b/indexer/services/socks/__tests__/constants.ts @@ -11,6 +11,7 @@ import { MAX_PARENT_SUBACCOUNTS, } from '@dydxprotocol-indexer/postgres'; import { + BlockHeightMessage, CandleMessage, CandleMessage_Resolution, MarketMessage, @@ -121,3 +122,9 @@ export const defaultTransferContents: TransferSubaccountMessageContents = { createdAt: '2023-10-05T14:48:00.000Z', createdAtHeight: '10', }; + +export const defaultBlockHeightMessage: BlockHeightMessage = { + blockHeight: defaultBlockHeight, + time: '2023-10-05T14:48:00.000Z', + version: '1.0.0', +}; diff --git a/indexer/services/socks/__tests__/helpers/from-kafka-helpers.test.ts b/indexer/services/socks/__tests__/helpers/from-kafka-helpers.test.ts index d4ba610d6c..ebd241b86e 100644 --- a/indexer/services/socks/__tests__/helpers/from-kafka-helpers.test.ts +++ b/indexer/services/socks/__tests__/helpers/from-kafka-helpers.test.ts @@ -1,4 +1,4 @@ -import { getChannels, getMessageToForward } from '../../src/helpers/from-kafka-helpers'; +import { getChannels, getMessageToForward, getMessagesToForward } from '../../src/helpers/from-kafka-helpers'; import { InvalidForwardMessageError, InvalidTopicError } from '../../src/lib/errors'; import { Channel, @@ -21,10 +21,12 @@ import { tradesMessage, defaultChildAccNumber, defaultTransferContents, + defaultBlockHeightMessage, } from '../constants'; import { KafkaMessage } from 'kafkajs'; import { createKafkaMessage } from './kafka'; import { + BlockHeightMessage, CandleMessage, MarketMessage, OrderbookMessage, @@ -52,6 +54,7 @@ describe('from-kafka-helpers', () => { [Channel.V4_ACCOUNTS, Channel.V4_PARENT_ACCOUNTS], ], [WebsocketTopics.TO_WEBSOCKETS_TRADES, [Channel.V4_TRADES]], + [WebsocketTopics.TO_WEBSOCKETS_BLOCK_HEIGHT, [Channel.V4_BLOCK_HEIGHT]], ])('gets correct channel for topic %s', (topic: WebsocketTopics, channels: Channel[]) => { expect(getChannels(topic)).toEqual(channels); }); @@ -156,6 +159,24 @@ describe('from-kafka-helpers', () => { expect(messageToForward.subaccountNumber).toEqual(defaultChildAccNumber); }); + it('gets correct MessageToForward for BlockHeight message', () => { + const message: KafkaMessage = createKafkaMessage( + Buffer.from(Uint8Array.from(BlockHeightMessage.encode(defaultBlockHeightMessage).finish())), + ); + const messageToForward: MessageToForward = getMessagesToForward( + WebsocketTopics.TO_WEBSOCKETS_BLOCK_HEIGHT, + message, + ).pop()!; + expect(messageToForward.channel).toEqual(Channel.V4_BLOCK_HEIGHT); + expect(messageToForward.version).toEqual(defaultBlockHeightMessage.version); + expect(messageToForward.contents).toEqual( + { + blockHeight: defaultBlockHeightMessage.blockHeight, + time: defaultBlockHeightMessage.time, + }, + ); + }); + it('filters out transfers between child subaccounts for parent subaccount channel', () => { const transferContents: SubaccountMessageContents = { transfers: { diff --git a/indexer/services/socks/__tests__/lib/message-forwarder.test.ts b/indexer/services/socks/__tests__/lib/message-forwarder.test.ts index a7c1ad75b0..ccdbd4c263 100644 --- a/indexer/services/socks/__tests__/lib/message-forwarder.test.ts +++ b/indexer/services/socks/__tests__/lib/message-forwarder.test.ts @@ -13,6 +13,7 @@ import { startConsumer, TRADES_WEBSOCKET_MESSAGE_VERSION, SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION, + BLOCK_HEIGHT_WEBSOCKET_MESSAGE_VERSION, } from '@dydxprotocol-indexer/kafka'; import { MessageForwarder } from '../../src/lib/message-forwarder'; import WebSocket from 'ws'; @@ -27,7 +28,7 @@ import { WebsocketEvents, } from '../../src/types'; import { Admin } from 'kafkajs'; -import { SubaccountMessage, TradeMessage } from '@dydxprotocol-indexer/v4-protos'; +import { BlockHeightMessage, SubaccountMessage, TradeMessage } from '@dydxprotocol-indexer/v4-protos'; import { dbHelpers, testMocks, @@ -44,6 +45,7 @@ import { defaultSubaccountId, ethClobPairId, ethTicker, + defaultBlockHeightMessage, } from '../constants'; import _ from 'lodash'; import { axiosRequest } from '../../src/lib/axios'; @@ -544,6 +546,97 @@ describe('message-forwarder', () => { })); }); }); + + it('forwards block height messages', (done: jest.DoneCallback) => { + const channel: Channel = Channel.V4_BLOCK_HEIGHT; + const id: string = 'v4_block_height'; + + const blockHeightMessage2 = { + ...defaultBlockHeightMessage, + blockHeight: '1', + }; + + const messageForwarder: MessageForwarder = new MessageForwarder(subscriptions, index); + subscriptions.start(messageForwarder.forwardToClient); + messageForwarder.start(); + + const ws = new WebSocket(WS_HOST); + let connectionId: string; + + ws.on(WebsocketEvents.MESSAGE, async (message) => { + const msg: OutgoingMessage = JSON.parse(message.toString()) as OutgoingMessage; + if (msg.message_id === 0) { + connectionId = msg.connection_id; + } + if (msg.message_id === 1) { + // Check that the initial message is correct. + checkInitialMessage( + msg as SubscribedMessage, + connectionId, + channel, + id, + mockAxiosResponse, + ); + + // Send a couple of block height messages + for (const blockHeightMessage of _.concat( + defaultBlockHeightMessage, + blockHeightMessage2, + )) { + await producer.send({ + topic: WebsocketTopics.TO_WEBSOCKETS_BLOCK_HEIGHT, + messages: [{ + value: Buffer.from( + Uint8Array.from(BlockHeightMessage.encode(blockHeightMessage).finish()), + ), + partition: 0, + timestamp: `${Date.now()}`, + }], + }); + } + } + + const forwardedMsg: ChannelDataMessage = JSON.parse( + message.toString(), + ) as ChannelDataMessage; + + if (msg.message_id >= 2) { + expect(forwardedMsg.connection_id).toBe(connectionId); + expect(forwardedMsg.type).toBe(OutgoingMessageType.CHANNEL_DATA); + expect(forwardedMsg.channel).toBe(channel); + expect(forwardedMsg.id).toBe(id); + expect(forwardedMsg.version).toEqual(BLOCK_HEIGHT_WEBSOCKET_MESSAGE_VERSION); + } + + if (msg.message_id === 2) { + expect(forwardedMsg.contents) + .toEqual( + { + blockHeight: defaultBlockHeightMessage.blockHeight, + time: defaultBlockHeightMessage.time, + }); + } + + if (msg.message_id === 3) { + expect(forwardedMsg.contents) + .toEqual( + { + blockHeight: blockHeightMessage2.blockHeight, + time: blockHeightMessage2.time, + }); + done(); + } + }); + + ws.on('open', () => { + ws.send(JSON.stringify({ + type: IncomingMessageType.SUBSCRIBE, + channel, + id, + batched: false, + })); + }); + }); }); function checkInitialMessage( diff --git a/indexer/services/socks/__tests__/lib/subscriptions.test.ts b/indexer/services/socks/__tests__/lib/subscriptions.test.ts index b721c2bc82..6f38ab1fe6 100644 --- a/indexer/services/socks/__tests__/lib/subscriptions.test.ts +++ b/indexer/services/socks/__tests__/lib/subscriptions.test.ts @@ -41,8 +41,10 @@ describe('Subscriptions', () => { [Channel.V4_ORDERBOOK]: btcTicker, [Channel.V4_TRADES]: btcTicker, [Channel.V4_PARENT_ACCOUNTS]: mockSubaccountId, + [Channel.V4_BLOCK_HEIGHT]: defaultId, }; - const invalidIdsMap: Record<Exclude<Channel, Channel.V4_MARKETS>, string[]> = { + const invalidIdsMap: + Record<Exclude<Channel, Channel.V4_MARKETS | Channel.V4_BLOCK_HEIGHT>, string[]> = { [Channel.V4_ACCOUNTS]: [invalidTicker], [Channel.V4_CANDLES]: [ `${invalidTicker}/${CandleResolution.ONE_DAY}`, @@ -66,6 +68,7 @@ describe('Subscriptions', () => { '/v4/addresses/.+/parentSubaccountNumber/.+', '/v4/orders/parentSubaccountNumber?.+parentSubaccountNumber.+OPEN,UNTRIGGERED,BEST_EFFORT_OPENED,BEST_EFFORT_CANCELED', ], + [Channel.V4_BLOCK_HEIGHT]: ['v4/height'], }; const initialMessage: Object = { a: 'b' }; const country: string = 'AR'; @@ -105,6 +108,7 @@ describe('Subscriptions', () => { [Channel.V4_ORDERBOOK, validIds[Channel.V4_ORDERBOOK]], [Channel.V4_TRADES, validIds[Channel.V4_TRADES]], [Channel.V4_PARENT_ACCOUNTS, validIds[Channel.V4_PARENT_ACCOUNTS]], + [Channel.V4_BLOCK_HEIGHT, validIds[Channel.V4_BLOCK_HEIGHT]], ])('handles valid subscription request to channel %s', async ( channel: Channel, id: string, @@ -316,6 +320,7 @@ describe('Subscriptions', () => { [Channel.V4_MARKETS, validIds[Channel.V4_MARKETS]], [Channel.V4_ORDERBOOK, validIds[Channel.V4_ORDERBOOK]], [Channel.V4_TRADES, validIds[Channel.V4_TRADES]], + [Channel.V4_BLOCK_HEIGHT, validIds[Channel.V4_BLOCK_HEIGHT]], ])('handles valid unsubscription request to channel %s', async ( channel: Channel, id: string, diff --git a/indexer/services/socks/__tests__/websocket/index.test.ts b/indexer/services/socks/__tests__/websocket/index.test.ts index 22d510f7e4..83676f57df 100644 --- a/indexer/services/socks/__tests__/websocket/index.test.ts +++ b/indexer/services/socks/__tests__/websocket/index.test.ts @@ -174,7 +174,9 @@ describe('Index', () => { ALL_CHANNELS.map((channel: Channel) => { return [channel]; }), )('handles valid subscription message for channel: %s', (channel: Channel) => { // Test that markets work with a missing id. - const id: string | undefined = channel === Channel.V4_MARKETS ? undefined : subId; + const id: string | undefined = ( + channel === Channel.V4_MARKETS || channel === Channel.V4_BLOCK_HEIGHT + ) ? undefined : subId; const isBatched: boolean = false; const subMessage: IncomingMessage = createIncomingMessage({ type: IncomingMessageType.SUBSCRIBE, @@ -200,7 +202,9 @@ describe('Index', () => { ALL_CHANNELS.map((channel: Channel) => { return [channel]; }), )('handles valid unsubscribe message for channel: %s', (channel: Channel) => { // Test that markets work with a missing id. - const id: string | undefined = channel === Channel.V4_MARKETS ? undefined : subId; + const id: string | undefined = ( + channel === Channel.V4_MARKETS || channel === Channel.V4_BLOCK_HEIGHT + ) ? undefined : subId; const unSubMessage: IncomingMessage = createIncomingMessage({ type: IncomingMessageType.UNSUBSCRIBE, channel, diff --git a/indexer/services/socks/src/helpers/from-kafka-helpers.ts b/indexer/services/socks/src/helpers/from-kafka-helpers.ts index 581879c7c2..4138f18587 100644 --- a/indexer/services/socks/src/helpers/from-kafka-helpers.ts +++ b/indexer/services/socks/src/helpers/from-kafka-helpers.ts @@ -7,6 +7,7 @@ import { } from '@dydxprotocol-indexer/postgres'; import { getParentSubaccountNum } from '@dydxprotocol-indexer/postgres/build/src/lib/parent-subaccount-helpers'; import { + BlockHeightMessage, CandleMessage, CandleMessage_Resolution, MarketMessage, @@ -16,7 +17,7 @@ import { } from '@dydxprotocol-indexer/v4-protos'; import { KafkaMessage } from 'kafkajs'; -import { TOPIC_TO_CHANNEL, V4_MARKETS_ID } from '../lib/constants'; +import { TOPIC_TO_CHANNEL, V4_BLOCK_HEIGHT_ID, V4_MARKETS_ID } from '../lib/constants'; import { InvalidForwardMessageError, InvalidTopicError } from '../lib/errors'; import { Channel, MessageToForward, WebsocketTopics } from '../types'; @@ -163,6 +164,18 @@ export function getMessagesToForward(topic: string, message: KafkaMessage): Mess version: subaccountMessage.version, }]; } + case WebsocketTopics.TO_WEBSOCKETS_BLOCK_HEIGHT: { + const blockHeightMessage: BlockHeightMessage = BlockHeightMessage.decode(messageBinary); + return [{ + channel: Channel.V4_BLOCK_HEIGHT, + id: V4_BLOCK_HEIGHT_ID, + version: blockHeightMessage.version, + contents: { + blockHeight: blockHeightMessage.blockHeight, + time: blockHeightMessage.time, + }, + }]; + } default: throw new InvalidForwardMessageError(`Unknown topic: ${topic}`); } diff --git a/indexer/services/socks/src/lib/constants.ts b/indexer/services/socks/src/lib/constants.ts index acf32e73f8..b675948333 100644 --- a/indexer/services/socks/src/lib/constants.ts +++ b/indexer/services/socks/src/lib/constants.ts @@ -19,6 +19,7 @@ export const ERR_INVALID_WEBSOCKET_FRAME: string = 'Invalid WebSocket frame'; export const WEBSOCKET_NOT_OPEN: string = 'ws not open'; export const V4_MARKETS_ID: string = 'v4_markets'; +export const V4_BLOCK_HEIGHT_ID: string = 'v4_block_height'; export const TOPIC_TO_CHANNEL: Record<WebsocketTopics, Channel[]> = { [WebsocketTopics.TO_WEBSOCKETS_CANDLES]: [Channel.V4_CANDLES], @@ -26,6 +27,7 @@ export const TOPIC_TO_CHANNEL: Record<WebsocketTopics, Channel[]> = { [WebsocketTopics.TO_WEBSOCKETS_ORDERBOOKS]: [Channel.V4_ORDERBOOK], [WebsocketTopics.TO_WEBSOCKETS_SUBACCOUNTS]: [Channel.V4_ACCOUNTS, Channel.V4_PARENT_ACCOUNTS], [WebsocketTopics.TO_WEBSOCKETS_TRADES]: [Channel.V4_TRADES], + [WebsocketTopics.TO_WEBSOCKETS_BLOCK_HEIGHT]: [Channel.V4_BLOCK_HEIGHT], }; export const MAX_TIMEOUT_INTEGER: number = 2147483647; diff --git a/indexer/services/socks/src/lib/subscription.ts b/indexer/services/socks/src/lib/subscription.ts index 9cae74b842..c684367f59 100644 --- a/indexer/services/socks/src/lib/subscription.ts +++ b/indexer/services/socks/src/lib/subscription.ts @@ -26,7 +26,7 @@ import { SubscriptionInfo, } from '../types'; import { axiosRequest } from './axios'; -import { V4_MARKETS_ID, WS_CLOSE_CODE_POLICY_VIOLATION } from './constants'; +import { V4_BLOCK_HEIGHT_ID, V4_MARKETS_ID, WS_CLOSE_CODE_POLICY_VIOLATION } from './constants'; import { BlockedError, InvalidChannelError } from './errors'; import { RateLimiter } from './rate-limit'; @@ -112,7 +112,7 @@ export class Subscriptions { return; } - const subscriptionId: string = this.normalizeSubscriptionId(id); + const subscriptionId: string = this.normalizeSubscriptionId(channel, id); const duration: number = this.subscribeRateLimiter.rateLimit({ connectionId, key: channel + subscriptionId, @@ -303,7 +303,7 @@ export class Subscriptions { channel: Channel, id?: string, ): void { - const subscriptionId: string = this.normalizeSubscriptionId(id); + const subscriptionId: string = this.normalizeSubscriptionId(channel, id); if (this.subscriptionLists[connectionId]) { this.subscriptionLists[connectionId] = this.subscriptionLists[connectionId].filter( (e: Subscription) => (e.channel !== channel || e.id !== subscriptionId), @@ -388,8 +388,9 @@ export class Subscriptions { * @returns */ private validateSubscription(channel: Channel, id?: string): boolean { - // Only markets channel does not require an id to subscribe to. - if (channel !== Channel.V4_MARKETS && id === undefined) { + // Only markets & block height channels do not require an id to subscribe to. + if ((channel !== Channel.V4_MARKETS && channel !== Channel.V4_BLOCK_HEIGHT) && + id === undefined) { return false; } switch (channel) { @@ -399,6 +400,7 @@ export class Subscriptions { MAX_PARENT_SUBACCOUNTS * CHILD_SUBACCOUNT_MULTIPLIER, ); } + case (Channel.V4_BLOCK_HEIGHT): case (Channel.V4_MARKETS): { return true; } @@ -437,12 +439,16 @@ export class Subscriptions { /** * Normalizes subscription ids. If the id is undefined, returns the default id for the markets - * channel, which is the only channel that does not have specific ids to subscribe to. + * channel or block height channel which are the only channels that don't + * have specific ids to subscribe to. * NOTE: Validation of the id and channel will happen in other functions. * @param id Subscription id to normalize. * @returns Normalized subscription id. */ - private normalizeSubscriptionId(id?: string): string { + private normalizeSubscriptionId(channel: Channel, id?: string): string { + if (channel === Channel.V4_BLOCK_HEIGHT) { + return id ?? V4_BLOCK_HEIGHT_ID; + } return id ?? V4_MARKETS_ID; } @@ -486,6 +492,9 @@ export class Subscriptions { case (Channel.V4_MARKETS): { return `${COMLINK_URL}/v4/perpetualMarkets`; } + case (Channel.V4_BLOCK_HEIGHT): { + return `${COMLINK_URL}/v4/height`; + } case (Channel.V4_ORDERBOOK): { if (id === undefined) { throw new Error('Invalid undefined channel'); diff --git a/indexer/services/socks/src/types.ts b/indexer/services/socks/src/types.ts index 0f66f7784e..9b42042cca 100644 --- a/indexer/services/socks/src/types.ts +++ b/indexer/services/socks/src/types.ts @@ -22,6 +22,7 @@ export enum Channel { V4_MARKETS = 'v4_markets', V4_CANDLES = 'v4_candles', V4_PARENT_ACCOUNTS = 'v4_parent_subaccounts', + V4_BLOCK_HEIGHT = 'v4_block_height', } export const ALL_CHANNELS = Object.values(Channel); @@ -144,6 +145,7 @@ export enum WebsocketTopics { TO_WEBSOCKETS_TRADES = 'to-websockets-trades', TO_WEBSOCKETS_MARKETS = 'to-websockets-markets', TO_WEBSOCKETS_CANDLES = 'to-websockets-candles', + TO_WEBSOCKETS_BLOCK_HEIGHT = 'to-websockets-block-height', } export enum WebsocketEvents { diff --git a/indexer/services/socks/src/websocket/index.ts b/indexer/services/socks/src/websocket/index.ts index 914e898299..ad2d7edf83 100644 --- a/indexer/services/socks/src/websocket/index.ts +++ b/indexer/services/socks/src/websocket/index.ts @@ -461,7 +461,7 @@ export class Index { private validateSubscriptionForChannel( message: SubscribeMessage | UnsubscribeMessage, ): boolean { - if (message.channel === Channel.V4_MARKETS) { + if (message.channel === Channel.V4_MARKETS || message.channel === Channel.V4_BLOCK_HEIGHT) { return true; } return message.id !== undefined && typeof message.id === 'string'; diff --git a/proto/dydxprotocol/indexer/socks/messages.proto b/proto/dydxprotocol/indexer/socks/messages.proto index 36c99cfe39..b3577388a3 100644 --- a/proto/dydxprotocol/indexer/socks/messages.proto +++ b/proto/dydxprotocol/indexer/socks/messages.proto @@ -92,3 +92,15 @@ message CandleMessage { // Version of the websocket message. string version = 4; } + +// Message to be sent through the 'to-websockets-block-height` kafka topic. +message BlockHeightMessage { + // Block height where the contents occur. + string block_height = 1; + + // ISO formatted time of the block height. + string time = 2; + + // Version of the websocket message. + string version = 3; +}