Skip to content

Commit

Permalink
Address PR comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwik committed Dec 1, 2023
1 parent 49176d0 commit cb63976
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 17 deletions.
3 changes: 3 additions & 0 deletions indexer/packages/kafka/src/batch-kafka-producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ export class BatchKafkaProducer {
constructor(
topic: KafkaTopics,
producer: Producer,
// Note that default parameters are bound during module load time making it difficult
// to modify the parameter during a test so we explicitly require callers to pass in
// config.KAFKA_MAX_BATCH_WEBSOCKET_MESSAGE_SIZE_BYTES.
maxBatchSizeBytes: number,
) {
this.maxBatchSizeBytes = maxBatchSizeBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ const scripts: string[] = [
'create_extension_uuid_ossp.sql',
'dydx_asset_create_handler.sql',
'dydx_block_processor.sql',
'dydx_block_processor_batched_handlers.sql',
'dydx_block_processor_sync_handlers.sql',
'dydx_block_processor_ordered_handlers.sql',
'dydx_block_processor_unordered_handlers.sql',
'dydx_clob_pair_status_to_market_status.sql',
'dydx_deleveraging_handler.sql',
'dydx_market_create_handler.sql',
Expand Down
8 changes: 4 additions & 4 deletions indexer/services/ender/src/lib/block-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type DecodedIndexerTendermintEvent = Omit<IndexerTendermintEvent, 'dataBytes'> &

export class BlockProcessor {
block: IndexerTendermintBlock;
sqlEvents: Promise<object>[];
sqlEventPromises: Promise<object>[];
sqlBlock: DecodedIndexerTendermintBlock;
txId: number;
batchedHandlers: BatchedHandlers;
Expand All @@ -86,7 +86,7 @@ export class BlockProcessor {
...this.block,
events: new Array(this.block.events.length),
};
this.sqlEvents = new Array(this.block.events.length);
this.sqlEventPromises = new Array(this.block.events.length);
this.batchedHandlers = new BatchedHandlers();
this.syncHandlers = new SyncHandlers();
}
Expand Down Expand Up @@ -200,7 +200,7 @@ export class BlockProcessor {
);

validator.validate();
this.sqlEvents[eventProto.blockEventIndex] = validator.getEventForBlockProcessor();
this.sqlEventPromises[eventProto.blockEventIndex] = validator.getEventForBlockProcessor();
const handlers: Handler<EventMessage>[] = validator.createHandlers(
eventProto.indexerTendermintEvent,
this.txId,
Expand All @@ -218,7 +218,7 @@ export class BlockProcessor {
private async processEvents(): Promise<KafkaPublisher> {
const kafkaPublisher: KafkaPublisher = new KafkaPublisher();

await Promise.all(this.sqlEvents).then((values) => {
await Promise.all(this.sqlEventPromises).then((values) => {
for (let i: number = 0; i < this.block.events.length; i++) {
const event: IndexerTendermintEvent = this.block.events[i];
this.sqlBlock.events[i] = {
Expand Down
22 changes: 13 additions & 9 deletions indexer/services/ender/src/scripts/dydx_block_processor.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,23 @@ DECLARE
BEGIN
PERFORM dydx_create_initial_rows_for_tendermint_block(block_height, block_time, block->'txHashes', block->'events');

/** In genesis, handle sync events first, then batched events. In other blocks, handle batched events first, then sync events. */
/** In genesis, handle ordered events first, then unordered events. In other blocks, handle unordered events first, then ordered events. */
IF NOT block_height = 0 THEN
rval = dydx_block_processor_batched_handlers(block);
rval_to_merge = dydx_block_processor_sync_handlers(block);
rval = dydx_block_processor_unordered_handlers(block);
rval_to_merge = dydx_block_processor_ordered_handlers(block);
ELSE
rval = dydx_block_processor_sync_handlers(block);
rval_to_merge = dydx_block_processor_batched_handlers(block);
rval = dydx_block_processor_ordered_handlers(block);
rval_to_merge = dydx_block_processor_unordered_handlers(block);
END IF;

/** Note that arrays are 1-indexed in PostgreSQL and empty arrays return NULL for array_length. */
FOR i in 1..coalesce(array_length(rval, 1), 0) LOOP
rval[i] = coalesce(rval[i], rval_to_merge[i]);
END LOOP;
/**
Merge the results of the two handlers together by taking the first non-null result of each.
Note that arrays are 1-indexed in PostgreSQL and empty arrays return NULL for array_length.
*/
FOR i in 1..coalesce(array_length(rval, 1), 0) LOOP
rval[i] = coalesce(rval[i], rval_to_merge[i]);
END LOOP;

RETURN to_jsonb(rval);
END;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE OR REPLACE FUNCTION dydx_block_processor_sync_handlers(block jsonb) RETURNS jsonb[] AS $$
CREATE OR REPLACE FUNCTION dydx_block_processor_ordered_handlers(block jsonb) RETURNS jsonb[] AS $$
/**
Processes each event that should be handled by the ordered handler. This includes all synchronous types
(https://github.com/dydxprotocol/v4-chain/blob/b5d4e8a7c5cc48c460731b21c47f22eabef8b2b7/indexer/services/ender/src/lib/sync-handlers.ts#L11).
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE OR REPLACE FUNCTION dydx_block_processor_batched_handlers(block jsonb) RETURNS jsonb[] AS $$
CREATE OR REPLACE FUNCTION dydx_block_processor_unordered_handlers(block jsonb) RETURNS jsonb[] AS $$
/**
Processes each event that should be handled by the unordered handler. This includes all supported non synchronous types
(https://github.com/dydxprotocol/v4-chain/blob/b5d4e8a7c5cc48c460731b21c47f22eabef8b2b7/indexer/services/ender/src/lib/sync-handlers.ts#L11).
Expand Down
3 changes: 3 additions & 0 deletions indexer/services/ender/src/validators/order-fill-validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ export class OrderFillValidator extends Validator<OrderFillEventV1> {
}

public async getEventForBlockProcessor(): Promise<object> {
// If event.order is populated then this means it is not a liquidation
// order, and therefore we need to know the canceled order status stored
// in redis to correctly update the database.
if (this.event.order) {
return Promise.all([
CanceledOrdersCache.getOrderCanceledStatus(
Expand Down

0 comments on commit cb63976

Please sign in to comment.