Skip to content

Commit

Permalink
Indexer e2e latency round 2 (#1314)
Browse files Browse the repository at this point in the history
* [CT-708] Indexer track e2e latency (#1237)

* fwd through message times

* use the var i made

* post processing stat emission

* post-forwarding timestamp

* pass through event type from vulcan

* event type to stat emissions

* test fix function calls

* WIP WIP WIP

* fix tests

* unused import

* test that kafka messages are threaded

* pass through message headers verbatim

* test logs for on message

* short term order event types

(cherry picked from commit 4daa11d)

# Conflicts:
#	indexer/services/socks/src/lib/message-forwarder.ts
  • Loading branch information
jonfung-dydx authored and mergify[bot] committed May 21, 2024
1 parent e5841c3 commit 0697816
Show file tree
Hide file tree
Showing 16 changed files with 252 additions and 56 deletions.
1 change: 1 addition & 0 deletions indexer/packages/v4-protos/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ export * from './codegen/google/protobuf/timestamp';
export * from './codegen/dydxprotocol/indexer/protocol/v1/clob';
export * from './codegen/dydxprotocol/indexer/protocol/v1/subaccount';
export * from './codegen/dydxprotocol/indexer/shared/removal_reason';
export * from './utils';
12 changes: 12 additions & 0 deletions indexer/packages/v4-protos/src/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { Timestamp } from './codegen/google/protobuf/timestamp';

export const MILLIS_IN_NANOS: number = 1_000_000;
export const SECONDS_IN_MILLIS: number = 1_000;
export function protoTimestampToDate(
protoTime: Timestamp,
): Date {
const timeInMillis: number = Number(protoTime.seconds) * SECONDS_IN_MILLIS +
Math.floor(protoTime.nanos / MILLIS_IN_NANOS);

return new Date(timeInMillis);
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import {
PerpetualMarketCreateEventV1,
PerpetualMarketCreateEventV2,
DeleveragingEventV1,
protoTimestampToDate,
} from '@dydxprotocol-indexer/v4-protos';
import {
PerpetualMarketType,
Expand All @@ -64,7 +65,6 @@ import {
generatePerpetualMarketMessage,
generatePerpetualPositionsContents,
} from '../../src/helpers/kafka-helper';
import { protoTimestampToDate } from '../../src/lib/helper';
import { DydxIndexerSubtypes, VulcanMessage } from '../../src/lib/types';

// TX Hash is SHA256, so is of length 64 hexadecimal without the '0x'.
Expand Down
14 changes: 0 additions & 14 deletions indexer/services/ender/src/lib/helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import {
import {
IndexerTendermintEvent,
IndexerTendermintEvent_BlockEvent,
Timestamp,
OrderFillEventV1,
MarketEventV1,
SubaccountUpdateEventV1,
Expand All @@ -32,10 +31,6 @@ import Big from 'big.js';
import _ from 'lodash';
import { DateTime } from 'luxon';

import {
MILLIS_IN_NANOS,
SECONDS_IN_MILLIS,
} from '../constants';
import {
AnnotatedSubaccountMessage,
DydxIndexerSubtypes,
Expand Down Expand Up @@ -73,15 +68,6 @@ export function convertToSubaccountMessage(
return subaccountMessage;
}

export function protoTimestampToDate(
protoTime: Timestamp,
): Date {
const timeInMillis: number = Number(protoTime.seconds) * SECONDS_IN_MILLIS +
Math.floor(protoTime.nanos / MILLIS_IN_NANOS);

return new Date(timeInMillis);
}

export function dateToDateTime(
protoTime: Date,
): DateTime {
Expand Down
32 changes: 31 additions & 1 deletion indexer/services/socks/src/lib/message-forwarder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,10 @@ export class MessageForwarder {
}

public onMessage(topic: string, message: KafkaMessage): void {
const start: number = Date.now();
stats.timing(
`${config.SERVICE_NAME}.message_time_in_queue`,
Date.now() - Number(message.timestamp),
start - Number(message.timestamp),
config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE,
{
topic,
Expand Down Expand Up @@ -154,6 +155,35 @@ export class MessageForwarder {
);
}
}
<<<<<<< HEAD
=======

const startForwardMessage: number = Date.now();
this.forwardMessage(messageToForward);
const end: number = Date.now();
stats.timing(
`${config.SERVICE_NAME}.forward_message`,
end - startForwardMessage,
config.MESSAGE_FORWARDER_STATSD_SAMPLE_RATE,
{
topic,
channel: String(channel),
},
);

const originalMessageTimestamp = message.headers?.message_received_timestamp;
if (originalMessageTimestamp !== undefined) {
stats.timing(
`${config.SERVICE_NAME}.message_time_since_received`,
startForwardMessage - Number(originalMessageTimestamp),
STATS_NO_SAMPLING,
{
topic,
event_type: String(message.headers?.event_type),
},
);
}
>>>>>>> 4daa11de (Indexer e2e latency round 2 (#1314))
}

public forwardMessage(message: MessageToForward): void {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import { expectCanceledOrderStatus, expectOpenOrderIds, handleInitialOrderPlace
import { expectOffchainUpdateMessage, expectWebsocketOrderbookMessage, expectWebsocketSubaccountMessage } from '../helpers/websocket-helpers';
import { OrderbookSide } from '../../src/lib/types';
import { getOrderIdHash, isLongTermOrder, isStatefulOrder } from '@dydxprotocol-indexer/v4-proto-parser';
import { defaultKafkaHeaders } from '../helpers/constants';
import config from '../../src/config';

jest.mock('@dydxprotocol-indexer/base', () => ({
Expand Down Expand Up @@ -196,6 +197,12 @@ describe('order-place-handler', () => {
const replacementMessageIoc: KafkaMessage = createKafkaMessage(
Buffer.from(Uint8Array.from(OffChainUpdateV1.encode(replacementUpdateIoc).finish())),
);
[replacementMessage, replacementMessageGoodTilBlockTime, replacementMessageConditional,
replacementMessageFok, replacementMessageIoc].forEach((message) => {
// eslint-disable-next-line no-param-reassign
message.headers = defaultKafkaHeaders;
});

const dbDefaultOrder: OrderFromDatabase = {
...testConstants.defaultOrder,
id: testConstants.defaultOrderId,
Expand Down Expand Up @@ -1225,7 +1232,11 @@ function expectWebsocketMessagesSent(
version: SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION,
});

expectWebsocketSubaccountMessage(producerSendSpy.mock.calls[callIndex][0], subaccountMessage);
expectWebsocketSubaccountMessage(
producerSendSpy.mock.calls[callIndex][0],
subaccountMessage,
defaultKafkaHeaders,
);
callIndex += 1;
}

Expand Down
Loading

0 comments on commit 0697816

Please sign in to comment.