diff --git a/indexer/packages/kafka/src/batch-kafka-producer.ts b/indexer/packages/kafka/src/batch-kafka-producer.ts
index 279fcf519c..6885e29ae9 100644
--- a/indexer/packages/kafka/src/batch-kafka-producer.ts
+++ b/indexer/packages/kafka/src/batch-kafka-producer.ts
@@ -27,6 +27,9 @@ export class BatchKafkaProducer {
   constructor(
     topic: KafkaTopics,
     producer: Producer,
+    // Note that default parameter values are bound at module load time, which makes them
+    // difficult to modify in a test, so we explicitly require callers to pass in
+    // config.KAFKA_MAX_BATCH_WEBSOCKET_MESSAGE_SIZE_BYTES.
     maxBatchSizeBytes: number,
   ) {
     this.maxBatchSizeBytes = maxBatchSizeBytes;
diff --git a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts
index 41a6838f15..94024ee20f 100644
--- a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts
+++ b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts
@@ -31,8 +31,8 @@ const scripts: string[] = [
   'create_extension_uuid_ossp.sql',
   'dydx_asset_create_handler.sql',
   'dydx_block_processor.sql',
-  'dydx_block_processor_batched_handlers.sql',
-  'dydx_block_processor_sync_handlers.sql',
+  'dydx_block_processor_ordered_handlers.sql',
+  'dydx_block_processor_unordered_handlers.sql',
   'dydx_clob_pair_status_to_market_status.sql',
   'dydx_deleveraging_handler.sql',
   'dydx_market_create_handler.sql',
diff --git a/indexer/services/ender/src/lib/block-processor.ts b/indexer/services/ender/src/lib/block-processor.ts
index 27f0878023..195456ddbf 100644
--- a/indexer/services/ender/src/lib/block-processor.ts
+++ b/indexer/services/ender/src/lib/block-processor.ts
@@ -70,7 +70,7 @@ type DecodedIndexerTendermintEvent = Omit<IndexerTendermintEvent, 'dataBytes'> &

 export class BlockProcessor {
   block: IndexerTendermintBlock;
-  sqlEvents: Promise<object>[];
+  sqlEventPromises: Promise<object>[];
   sqlBlock: DecodedIndexerTendermintBlock;
   txId: number;
   batchedHandlers: BatchedHandlers;
@@ -86,7 +86,7 @@ export class BlockProcessor {
       ...this.block,
       events: new Array(this.block.events.length),
     };
-    this.sqlEvents = new Array(this.block.events.length);
+    this.sqlEventPromises = new Array(this.block.events.length);
     this.batchedHandlers = new BatchedHandlers();
     this.syncHandlers = new SyncHandlers();
   }
@@ -200,7 +200,7 @@
     );
     validator.validate();
-    this.sqlEvents[eventProto.blockEventIndex] = validator.getEventForBlockProcessor();
+    this.sqlEventPromises[eventProto.blockEventIndex] = validator.getEventForBlockProcessor();
     const handlers: Handler[] = validator.createHandlers(
       eventProto.indexerTendermintEvent,
       this.txId,
@@ -218,7 +218,7 @@
   private async processEvents(): Promise<KafkaPublisher> {
     const kafkaPublisher: KafkaPublisher = new KafkaPublisher();

-    await Promise.all(this.sqlEvents).then((values) => {
+    await Promise.all(this.sqlEventPromises).then((values) => {
       for (let i: number = 0; i < this.block.events.length; i++) {
         const event: IndexerTendermintEvent = this.block.events[i];
         this.sqlBlock.events[i] = {
diff --git a/indexer/services/ender/src/scripts/dydx_block_processor.sql b/indexer/services/ender/src/scripts/dydx_block_processor.sql
index 7571d1abf6..c944c0adec 100644
--- a/indexer/services/ender/src/scripts/dydx_block_processor.sql
+++ b/indexer/services/ender/src/scripts/dydx_block_processor.sql
@@ -20,19 +20,23 @@ DECLARE
 BEGIN
     PERFORM dydx_create_initial_rows_for_tendermint_block(block_height, block_time, block->'txHashes', block->'events');

-    /** In genesis, handle sync events first, then batched events. In other blocks, handle batched events first, then sync events. */
+    /** In genesis, handle ordered events first, then unordered events. In other blocks, handle unordered events first, then ordered events. */
     IF NOT block_height = 0 THEN
-        rval = dydx_block_processor_batched_handlers(block);
-        rval_to_merge = dydx_block_processor_sync_handlers(block);
+        rval = dydx_block_processor_unordered_handlers(block);
+        rval_to_merge = dydx_block_processor_ordered_handlers(block);
     ELSE
-        rval = dydx_block_processor_sync_handlers(block);
-        rval_to_merge = dydx_block_processor_batched_handlers(block);
+        rval = dydx_block_processor_ordered_handlers(block);
+        rval_to_merge = dydx_block_processor_unordered_handlers(block);
     END IF;

-    /** Note that arrays are 1-indexed in PostgreSQL and empty arrays return NULL for array_length. */
-    FOR i in 1..coalesce(array_length(rval, 1), 0) LOOP
-        rval[i] = coalesce(rval[i], rval_to_merge[i]);
-    END LOOP;
+    /**
+      Merge the results of the two handlers together by taking the first non-null result for each event.
+
+      Note that arrays are 1-indexed in PostgreSQL and empty arrays return NULL for array_length.
+    */
+    FOR i in 1..coalesce(array_length(rval, 1), 0) LOOP
+        rval[i] = coalesce(rval[i], rval_to_merge[i]);
+    END LOOP;

     RETURN to_jsonb(rval);
 END;
diff --git a/indexer/services/ender/src/scripts/dydx_block_processor_sync_handlers.sql b/indexer/services/ender/src/scripts/dydx_block_processor_ordered_handlers.sql
similarity index 96%
rename from indexer/services/ender/src/scripts/dydx_block_processor_sync_handlers.sql
rename to indexer/services/ender/src/scripts/dydx_block_processor_ordered_handlers.sql
index cbc2e5904e..94d0ea30e0 100644
--- a/indexer/services/ender/src/scripts/dydx_block_processor_sync_handlers.sql
+++ b/indexer/services/ender/src/scripts/dydx_block_processor_ordered_handlers.sql
@@ -1,4 +1,4 @@
-CREATE OR REPLACE FUNCTION dydx_block_processor_sync_handlers(block jsonb) RETURNS jsonb[] AS $$
+CREATE OR REPLACE FUNCTION dydx_block_processor_ordered_handlers(block jsonb) RETURNS jsonb[] AS $$
 /**
   Processes each event that should be handled by the batched handler. This includes all synchronous types
   (https://github.com/dydxprotocol/v4-chain/blob/b5d4e8a7c5cc48c460731b21c47f22eabef8b2b7/indexer/services/ender/src/lib/sync-handlers.ts#L11).
diff --git a/indexer/services/ender/src/scripts/dydx_block_processor_batched_handlers.sql b/indexer/services/ender/src/scripts/dydx_block_processor_unordered_handlers.sql
similarity index 97%
rename from indexer/services/ender/src/scripts/dydx_block_processor_batched_handlers.sql
rename to indexer/services/ender/src/scripts/dydx_block_processor_unordered_handlers.sql
index 5f3f2aa320..87124ace11 100644
--- a/indexer/services/ender/src/scripts/dydx_block_processor_batched_handlers.sql
+++ b/indexer/services/ender/src/scripts/dydx_block_processor_unordered_handlers.sql
@@ -1,4 +1,4 @@
-CREATE OR REPLACE FUNCTION dydx_block_processor_batched_handlers(block jsonb) RETURNS jsonb[] AS $$
+CREATE OR REPLACE FUNCTION dydx_block_processor_unordered_handlers(block jsonb) RETURNS jsonb[] AS $$
 /**
   Processes each event that should be handled by the batched handler. This includes all supported non
   synchronous types (https://github.com/dydxprotocol/v4-chain/blob/b5d4e8a7c5cc48c460731b21c47f22eabef8b2b7/indexer/services/ender/src/lib/sync-handlers.ts#L11).
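The coalesce loop in dydx_block_processor.sql is the heart of this change: each handler pass returns a jsonb array aligned by event index, with NULL entries for events it did not handle, and the block processor keeps the first non-null result per index. A minimal TypeScript sketch of the same merge rule (the function name and types here are illustrative, not from the codebase):

```typescript
// Illustrative sketch only, not code from this PR. Mirrors the PL/pgSQL loop:
// rval[i] = coalesce(rval[i], rval_to_merge[i]).
function mergeHandlerResults(
  rval: (object | null)[],
  rvalToMerge: (object | null)[],
): (object | null)[] {
  // Each input is aligned by event index; an entry is null when that
  // handler pass did not process the event at that index.
  return rval.map((result, i) => result ?? rvalToMerge[i] ?? null);
}
```

Because each event is processed by exactly one of the two passes, at most one side of the coalesce is non-null for any index; the branch on block_height therefore only controls which pass executes first.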
diff --git a/indexer/services/ender/src/validators/order-fill-validator.ts b/indexer/services/ender/src/validators/order-fill-validator.ts
index ac6bcaab72..f9893b0ba6 100644
--- a/indexer/services/ender/src/validators/order-fill-validator.ts
+++ b/indexer/services/ender/src/validators/order-fill-validator.ts
@@ -38,6 +38,9 @@ export class OrderFillValidator extends Validator<OrderFillEventV1> {
   }

   public async getEventForBlockProcessor(): Promise<object> {
+    // If event.order is populated, this is not a liquidation order, so we
+    // need the canceled-order status stored in Redis to correctly update
+    // the database.
     if (this.event.order) {
       return Promise.all([
         CanceledOrdersCache.getOrderCanceledStatus(
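Tying the two halves of the change together: getEventForBlockProcessor starts the Redis lookup as soon as the event is validated, the resulting promise is stored in sqlEventPromises at the event's blockEventIndex, and processEvents awaits them all together. A hypothetical sketch of that pattern (collectEvents and its parameter type are invented for illustration):

```typescript
// Hypothetical sketch, not code from this PR. Each validator starts its async
// work (such as the Redis canceled-order lookup above) when the event is
// validated; the promise is stored by event index, and the results are
// awaited all at once via Promise.all.
async function collectEvents(
  validators: { getEventForBlockProcessor(): Promise<object> }[],
): Promise<object[]> {
  // Kick off every lookup immediately so they run concurrently.
  const pending: Promise<object>[] = validators.map(
    (validator) => validator.getEventForBlockProcessor(),
  );
  // Block only once, when all of the per-event data is required.
  return Promise.all(pending);
}
```

Storing the promises rather than awaiting them inline lets the per-event lookups overlap, so the block processor waits roughly as long as the slowest lookup instead of the sum of all of them.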