-
Notifications
You must be signed in to change notification settings - Fork 133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Indexer e2e latency round 2 #1314
Conversation
* fwd through message times * use the var i made * post processing stat emission * post-forwarding timestamp * pass through event type from vulcan * event type to stat emissions * test fix function calls * WIP WIP WIP * fix tests * unused import * test that kafka messages are threaded
WalkthroughThe update streamlines timestamp handling across services by introducing a utility for converting protobuf timestamps to JavaScript Changes
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Review Status
Configuration used: CodeRabbit UI
Files selected for processing (16)
- indexer/packages/v4-protos/src/index.ts (1 hunks)
- indexer/packages/v4-protos/src/utils.ts (1 hunks)
- indexer/services/ender/tests/helpers/indexer-proto-helpers.ts (2 hunks)
- indexer/services/ender/src/lib/helper.ts (3 hunks)
- indexer/services/socks/src/lib/message-forwarder.ts (3 hunks)
- indexer/services/vulcan/tests/handlers/order-place-handler.test.ts (3 hunks)
- indexer/services/vulcan/tests/handlers/order-remove-handler.test.ts (24 hunks)
- indexer/services/vulcan/tests/helpers/constants.ts (1 hunks)
- indexer/services/vulcan/tests/helpers/helpers.ts (3 hunks)
- indexer/services/vulcan/tests/helpers/websocket-helpers.ts (1 hunks)
- indexer/services/vulcan/tests/lib/on-message.test.ts (1 hunks)
- indexer/services/vulcan/src/handlers/handler.ts (2 hunks)
- indexer/services/vulcan/src/handlers/order-place-handler.ts (7 hunks)
- indexer/services/vulcan/src/handlers/order-remove-handler.ts (11 hunks)
- indexer/services/vulcan/src/handlers/order-update-handler.ts (3 hunks)
- indexer/services/vulcan/src/lib/on-message.ts (2 hunks)
Additional Context Used
Path-based Instructions (16)
indexer/packages/v4-protos/src/utils.ts (1)
Pattern
**/**
: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.indexer/services/vulcan/__tests__/helpers/constants.ts (1)
Pattern
**/**
: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.indexer/packages/v4-protos/src/index.ts (1)
Pattern
**/**
: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.indexer/services/vulcan/__tests__/helpers/websocket-helpers.ts (1)
Pattern
**/**
: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.indexer/services/vulcan/src/handlers/handler.ts (1)
Pattern
**/**
: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.indexer/services/vulcan/__tests__/helpers/helpers.ts (1)
Pattern
**/**
: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.indexer/services/vulcan/src/lib/on-message.ts (1)
Pattern
**/**
: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.indexer/services/vulcan/__tests__/lib/on-message.test.ts (1)
Pattern
**/**
: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.indexer/services/ender/src/lib/helper.ts (1)
Pattern
**/**
: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.indexer/services/socks/src/lib/message-forwarder.ts (1)
Pattern
**/**
: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.indexer/services/vulcan/src/handlers/order-update-handler.ts (1)
Pattern
**/**
: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.indexer/services/vulcan/src/handlers/order-place-handler.ts (1)
Pattern
**/**
: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.indexer/services/ender/__tests__/helpers/indexer-proto-helpers.ts (1)
Pattern
**/**
: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.indexer/services/vulcan/src/handlers/order-remove-handler.ts (1)
Pattern
**/**
: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.indexer/services/vulcan/__tests__/handlers/order-place-handler.test.ts (1)
Pattern
**/**
: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.indexer/services/vulcan/__tests__/handlers/order-remove-handler.test.ts (1)
Pattern
**/**
: Do not include a walkthrough. For all comments on line changes that are longer than 15 lines, add a prefix to the comment of what lines you are commenting on (for example: "The changes from lines 100 to 200...". Add your comment to be on the last 15 lines of the chunk rather than adding your comment to the entire chunk.
Additional comments not posted (34)
indexer/packages/v4-protos/src/utils.ts (2)
3-4
: The constantsMILLIS_IN_NANOS
andSECONDS_IN_MILLIS
are correctly defined and named, making their purpose clear and their usage intuitive.
5-11
: TheprotoTimestampToDate
function correctly converts a protobufTimestamp
to a JavaScriptDate
object. The calculation for time in milliseconds is accurate, considering both seconds and nanoseconds. This utility function is essential for handling time conversions consistently across the application.indexer/services/vulcan/__tests__/helpers/constants.ts (1)
11-19
: The constantsdefaultDateTime
,defaultTime
, anddefaultKafkaHeaders
are well-defined and serve their purpose in testing scenarios. The use ofprotoTimestampToDate
for settingmessage_received_timestamp
indefaultKafkaHeaders
aligns with the PR's objective to handle Kafka message headers correctly.indexer/packages/v4-protos/src/index.ts (1)
19-19
: The addition ofexport * from './utils';
correctly exports the utility functions and constants defined inutils.ts
, making them available for use across the application. This change supports the PR's objective of enhancing utility functions and their usage.indexer/services/vulcan/__tests__/helpers/websocket-helpers.ts (1)
3-18
: The addition of theexpectedHeaders
parameter to theexpectWebsocketSubaccountMessage
function is a meaningful enhancement. It allows for the validation of Kafka message headers in the testing framework, aligning with the PR's objective to ensure correct handling of Kafka message headers.indexer/services/vulcan/src/handlers/handler.ts (1)
19-23
: The addition of theheaders
parameter to thehandle
andhandleUpdate
methods in theHandler
class is correctly implemented. This change ensures that Kafka message headers are passed through the handling process, aligning with the PR's objective to improve Kafka header management.indexer/services/vulcan/__tests__/helpers/helpers.ts (1)
24-30
: > 📝 NOTEThis review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [16-41]
The assignment of
defaultKafkaHeaders
tomessage.headers
in bothhandleInitialOrderPlace
andhandleOrderUpdate
functions is correctly implemented. This ensures consistent handling of Kafka message headers in testing scenarios, aligning with the PR's objective to improve Kafka header management.indexer/services/vulcan/src/lib/on-message.ts (1)
99-118
: > 📝 NOTEThis review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [65-115]
The changes introduce logic to track message processing time before and after handling, utilizing timestamps from message headers. This enhancement is correctly implemented and aligns with the PR's objective to improve end-to-end latency and ensure robust handling of Kafka message headers. It's important to ensure that the timestamp values are always in the expected format to avoid potential errors during the conversion to numbers.
indexer/services/vulcan/__tests__/lib/on-message.test.ts (1)
67-67
: The modification of the assertion to includemessage.headers ?? {}
as an additional argument in theonMessage
function test is correctly implemented. This change ensures that the test accurately reflects the updated behavior of theonMessage
function, aligning with the PR's objective to improve Kafka header management.indexer/services/ender/src/lib/helper.ts (1)
8-13
: > 📝 NOTEThis review was outside the diff hunks, and no overlapping diff hunk was found. Original lines [1-1]
The removal of imports for
Timestamp
,MILLIS_IN_NANOS
, andSECONDS_IN_MILLIS
, along with the functionprotoTimestampToDate
, suggests a refactoring to centralize time conversion logic. Ensure that all references to these entities have been updated across the codebase to use the new utility function from thev4-protos
package.indexer/services/socks/src/lib/message-forwarder.ts (4)
6-6
: The addition ofSTATS_NO_SAMPLING
import is used to ensure that certain statistics are always recorded without sampling. This is a good practice for critical metrics that need to be accurately tracked.
89-92
: The changes from lines 89 to 92 improve the calculation of message time in queue by using a start timestamp. This is a more accurate method compared to previous versions and helps in better understanding the latency characteristics of the system.
130-135
: The calculation of forwarding message time has been refined to use a start timestamp, similar to the improvement made for calculating time in queue. This consistency in approach is beneficial for maintaining code readability and accuracy in metrics.
143-154
: The addition of tracking message time since received, if available, is a valuable enhancement. It allows for more detailed analysis of message processing times and can help identify bottlenecks or inefficiencies in the message handling pipeline.indexer/services/vulcan/src/handlers/order-update-handler.ts (2)
54-54
: The addition of theheaders
parameter to thehandle
method allows for passing through Kafka message headers, aligning with the PR's objective to improve Kafka header management. This change enhances the flexibility and capability of the handler to process messages with varying header requirements.
174-174
: Ensuring that theheaders
parameter is passed along with the message in thesendMessageWrapper
call is a good practice. It ensures that any relevant headers are preserved and forwarded as part of the message processing flow.indexer/services/vulcan/src/handlers/order-place-handler.ts (6)
55-55
: The addition of theheaders
parameter to thehandle
method in theOrderPlaceHandler
class is consistent with the changes made in other handlers. This allows for the flexible handling of Kafka message headers, which is crucial for maintaining the integrity of message metadata throughout the processing pipeline.
145-145
: Ensuring that theheaders
parameter is passed to thesendCachedOrderUpdate
method is a good practice. It allows for the preservation and forwarding of message headers, which is essential for maintaining context and metadata associated with Kafka messages.
154-154
: Passing theheaders
parameter along with the message in thesendMessageWrapper
call ensures that any relevant headers are preserved and forwarded. This is crucial for maintaining the integrity and context of messages as they are processed and forwarded to other services.
168-168
: The inclusion of theheaders
parameter in the message construction for order book updates is consistent with the approach taken in other parts of the code. This ensures that message headers are properly managed and forwarded, aligning with the PR's objectives.
339-339
: The addition of theheaders
parameter to thesendCachedOrderUpdate
method enhances the handler's ability to manage Kafka message headers effectively. This change is part of a broader effort to improve header management across the system.
357-357
: Passing theheaders
parameter along with the message in thesendMessageWrapper
call is a good practice. It ensures that any relevant headers are preserved and forwarded as part of the message processing flow, maintaining the integrity and context of Kafka messages.indexer/services/ender/__tests__/helpers/indexer-proto-helpers.ts (1)
53-53
: The change to importprotoTimestampToDate
from@dydxprotocol-indexer/v4-protos
is a good practice, as it reduces code duplication and leverages shared utilities across the project. This enhances maintainability and consistency in utility function usage.indexer/services/vulcan/src/handlers/order-remove-handler.ts (4)
71-71
: The addition of theheaders
parameter to thehandle
method and its subsequent passing to other methods is a positive change. It ensures that Kafka message headers are correctly handled, which is crucial for message processing and compatibility.
196-196
: The update to include theheaders
parameter in thehandleStatefulOrderCancelation
method is consistent with the overall improvements in Kafka header management. This ensures that headers are correctly passed and handled throughout the message processing flow.
264-264
: Including theheaders
parameter in thehandleOrderRemoval
method aligns with the PR's goal of improving Kafka header handling. This update is crucial for ensuring that headers are correctly managed and passed through the message processing flow.
346-346
: The modification to include theheaders
parameter in theupdateOrderbook
method is a positive step towards consistent Kafka header management. This ensures that headers are correctly handled throughout the message processing flow.indexer/services/vulcan/__tests__/handlers/order-place-handler.test.ts (3)
65-65
: The import ofdefaultKafkaHeaders
is added to support the new approach of handling Kafka message headers. This change aligns with the PR objectives to ensure consistent handling of Kafka message headers across different components.
200-204
: The changes from lines 200 to 204 assigndefaultKafkaHeaders
to theheaders
property of each Kafka message in a loop. This ensures that all Kafka messages use the standardized headers, addressing the issue of Kafka's inability to handle undefined values for message headers. This modification is crucial for the smooth operation of the indexer and aligns with the PR's primary objective.However, it's important to ensure that
defaultKafkaHeaders
contains all necessary headers and does not inadvertently overwrite any headers that might be set individually for specific messages. If there's a possibility that some messages might require unique headers, consider mergingdefaultKafkaHeaders
with any message-specific headers to preserve them.
1235-1239
: The changes from lines 1235 to 1239 update theexpectWebsocketSubaccountMessage
call to includedefaultKafkaHeaders
, ensuring that the standardized Kafka message headers are used in the test assertions. This change is consistent with the PR's goal of improving Kafka message header handling and ensures that the tests accurately reflect the new header management approach.indexer/services/vulcan/__tests__/handlers/order-remove-handler.test.ts (4)
8-10
: The import ofdefaultTime
from../helpers/constants
and its usage in constructingdefaultKafkaHeaders
is a good practice for ensuring consistency in test data. This approach helps in maintaining the readability and maintainability of the test suite.
141-143
: The definition ofdefaultKafkaHeaders
usingprotoTimestampToDate(defaultTime)
demonstrates a clear and consistent approach to handling Kafka message headers in tests. This setup ensures that tests accurately reflect the handling of timestamps and headers in the actual implementation.
191-194
: IncludingdefaultKafkaHeaders
in the calls tohandleUpdate
within the test cases is crucial for ensuring that the tests accurately simulate the handling of Kafka message headers by theOrderRemoveHandler
. This addition enhances the test coverage for scenarios involving Kafka headers.
2116-2120
: The use ofexpectWebsocketSubaccountMessage
to validate the contents of the WebSocket message sent for subaccount updates is a good practice. It ensures that the message contents, including Kafka headers, are as expected. This level of detail in testing is commendable for ensuring the reliability of message forwarding functionality.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Review Status
Configuration used: CodeRabbit UI
Files selected for processing (1)
- indexer/services/vulcan/src/lib/on-message.ts (2 hunks)
Files skipped from review as they are similar to previous changes (1)
- indexer/services/vulcan/src/lib/on-message.ts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Review Status
Configuration used: CodeRabbit UI
Files selected for processing (1)
- indexer/services/vulcan/src/lib/on-message.ts (2 hunks)
Files skipped from review as they are similar to previous changes (1)
- indexer/services/vulcan/src/lib/on-message.ts
https://github.com/Mergifyio backport release/indexer/v5.x |
✅ Backports have been created
|
* [CT-708] Indexer track e2e latency (#1237) * fwd through message times * use the var i made * post processing stat emission * post-forwarding timestamp * pass through event type from vulcan * event type to stat emissions * test fix function calls * WIP WIP WIP * fix tests * unused import * test that kafka messages are threaded * pass through message headers verbatim * test logs for on message * short term order event types (cherry picked from commit 4daa11d) # Conflicts: # indexer/services/socks/src/lib/message-forwarder.ts
Co-authored-by: Jonathan Fung <[email protected]> Co-authored-by: Will Liu <[email protected]>
await handler.handleUpdate(update, headers); | ||
|
||
const postProcessingTime: number = Date.now(); | ||
if (originalMessageTimestamp !== undefined) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this supposed to be using originalMessageTimestamp? Shouldn't we be grabbing the timestamp again due to what we're doing at L109 to L120?
First PR failing because kafka doesn't like undefined values for kafka message headers. Pass through the headers verbatim.