You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
For eachBatch(), call to resolveOffset for each message followed by commitOffsetsIfNecessary does not commit messages unless autoCommitInterval or autoCommitThreshold is set
#1729
Open
emranho95 opened this issue
Jan 2, 2025
· 0 comments
Describe the bug
Call to resolveOffset() for each message followed by commitOffsetsIfNecessary() does not commit messages unless autoCommitInterval or autoCommitThreshold is set. But with the same configuration, if I set autoCommitInterval to a lower value that is less than the message processing time in the consumer side, ex: 10ms, the messages will be committed.
To Reproduce
Consumer config
Use eachBatch() to handle messages 2. autoCommit = false, eachBatchAutoResolve = false, eachBatchAutoResolve = false
Resolve messages after each message processing and then call commitOffsetsIfNecessary()
Inside eachBatch() method ->
for (const message of messages) {
resolveOffset(message.rawMessage.offset);
await heartbeat();
}
await commitOffsetsIfNecessary();
Produce small amount of messages than will be consumed within few milliseconds, ex: 5
Consume the messages using one consumer
Expected behavior
The committed offsets fetched by admin.fetchOffsets() should match the length of the messages.
Observed behavior
None of the messages are committed and so it returns -1.
Environment:
OS: Mac OS
KafkaJS version [e.g. 2.2.4]
NodeJS version [npm v10.5.0]
The text was updated successfully, but these errors were encountered:
Describe the bug
Call to resolveOffset() for each message followed by commitOffsetsIfNecessary() does not commit messages unless autoCommitInterval or autoCommitThreshold is set. But with the same configuration, if I set autoCommitInterval to a lower value that is less than the message processing time in the consumer side, ex: 10ms, the messages will be committed.
To Reproduce
Consumer config
eachBatch
() to handle messages2. autoCommit = false, eachBatchAutoResolve = false, eachBatchAutoResolve = false
Resolve messages after each message processing and then call commitOffsetsIfNecessary()
Expected behavior
admin.fetchOffsets()
should match the length of the messages.Observed behavior
None of the messages are committed and so it returns -1.
Environment:
The text was updated successfully, but these errors were encountered: