Consumer reset offset after rebalance - commit with explicit consumer result used #1792
Comments
Are there any error logs on any of the consumer pods when this happens? Perhaps something about offset resets?
Our log handlers are constructed like this
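Roughly like the following minimal sketch (illustrative only - the `ConsumerConfig` instance and the `ILogger` come from our own setup, and the actual handler bodies differ):

```csharp
using Confluent.Kafka;
using Microsoft.Extensions.Logging;

// Sketch: 'config' (ConsumerConfig) and 'logger' (ILogger) are assumed to exist.
static IConsumer<string, string> BuildConsumer(ConsumerConfig config, ILogger logger) =>
    new ConsumerBuilder<string, string>(config)
        .SetLogHandler((_, log) =>
            // LogMessage exposes the librdkafka facility, syslog level and message text.
            logger.LogInformation("librdkafka {Facility} [{Level}]: {Message}",
                log.Facility, log.Level, log.Message))
        .SetErrorHandler((_, err) =>
            logger.LogError("Kafka error {Code}: {Reason}", err.Code, err.Reason))
        .Build();
```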
Some of the logs we see are below (topic and consumer names have been overwritten). I can't see any logs from the client library about offset resets - are there any log handlers we are missing where this information could be gathered? From Kafka we see logs like this. The transitions to Dead are one-time consumers - these are generated because the client doesn't support subscription without a group id, hence no alarm there.
"Removed 79 expired offsets .." What is your offset commit retention time on the broker? E.g, |
offsets.retention.minutes: 7 days
offsets.retention.check.interval.ms: 10 minutes
The delete.retention.ms on __consumer_offsets is set to 1 day.
I have added links to two comments from issue #1672, which describe the same setup as this issue and which I wouldn't expect to be mitigated by flushing the offset storage:
Hi @edenhill, I have been able to catch the offset shift with full debug statements from the client library (consumer,cgrp,topic,fetch). The log statements below concern the affected topic TOPIC-XXX and partition [11]. All logs below are output from .SetLogHandler, and all messages other than heartbeats are logged. The messages below are filtered with the requirement that the log.KafkaMessage field contains 11 (offset) and the topic name (TOPIC-XXX).
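For reference, those debug contexts are enabled through the Debug property on the consumer configuration (a sketch; broker and group names are placeholders):

```csharp
using Confluent.Kafka;

var debugConfig = new ConsumerConfig
{
    BootstrapServers = "BROKER:9092",   // placeholder
    GroupId = "CONSUMER-GROUP-XXX",     // placeholder
    // librdkafka debug contexts; the resulting messages are delivered to the
    // handler registered with SetLogHandler.
    Debug = "consumer,cgrp,topic,fetch"
};
```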
One important log statement is the following:
I can see that the OffsetFetchRequest from the Kafka protocol actually receives an offset of -1, even though the latest fetch before the deployment rollout was correct:
At timestamp 21:49:03.748, I get assigned partitions from .SetPartitionsAssignedHandler. Here TOPIC-XXX [[11]] is assigned to pod POD-85d549d46d-7j5zk.
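For context, that handler is wired roughly as below (a sketch - our actual logging format differs; 'config' and 'logger' are as in the earlier snippet):

```csharp
var consumer = new ConsumerBuilder<string, string>(config)
    .SetPartitionsAssignedHandler((c, partitions) =>
        // Invoked on rebalance with the newly assigned partitions,
        // e.g. TOPIC-XXX [[11]] in the logs above.
        logger.LogInformation("Assigned: {Partitions}", string.Join(", ", partitions)))
    .SetPartitionsRevokedHandler((c, partitions) =>
        logger.LogInformation("Revoked: {Partitions}", string.Join(", ", partitions)))
    .Build();
```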
The first log messages after the assignment, which ask to get assigned to a broker, are the statements below - importantly, I can see that it is assigned to the same broker as before the deployment rollout. Hence I would expect Kafka to be fully in sync, and I find it quite strange that -1 gets returned. Could there be any errors related to the byte translation of the protocol received from Kafka? Like the TAG_BUFFER, COMPACT_NULLABLE_STRING or COMPACT_STRING (Apache Kafka protocol)?
I have gone through our Kafka logs over the rebalancing period. There is nothing which indicates removal of offsets for CONSUMER-GROUP-XXX and TOPIC-XXX, and there are no metrics which would support this either. I find the statements below a bit strange - they are from a pod which isn't the one that has the partition assigned after the assignment statement. Maybe it is because of some state shared between the consumers?
I have also looked at the logs around the OffsetFetchResponse (~150 lines before and after, filtered to only the pod which ends up with the partition assignment). They are given below. I can see the OffsetFetchResponse was part of a partition batch of 20 - all responses are returned with error code success.
To sum up, I get an offset of -1 in an OffsetFetchResponse from Kafka, which seems really strange given the successful fetch just before the former application closed and the absence of anything relevant in the Kafka logs. I also find it a bit strange that pod POD-85d549d46d-8npng has some actions related to TOPIC-XXX partition 11 after the assignment has been given to POD-85d549d46d-7j5zk; I don't know if this could cause some concurrency issues. Other than that, maybe the translation of the protocol could be an issue in some corner cases? I have tried to provide as extensive logs as possible, and we have a lot more without any filter - if it helps you, I would gladly arrange some time to share what we have. Kind regards,
Some more info - lately we experienced it at a deployment in the morning, where the evening before we had seen Kafka deleting a lot of old log segments. There are no logs from Kafka indicating removal of consumer group offsets. I would expect Kafka to return the earliest offset in that case, not -1.
More info from my side related to this issue ("Offset Reset"). Scenario:
In the same scenario but without PartitionAssignmentStrategy = CooperativeSticky we don't have the offset reset. It seems that a hard rebalance revoking all partitions avoids the offset reset, at least in our environment. In issue #1672 (comment) it is mentioned that the offset reset is only related to the use of offset storage; unfortunately we have seen the issue happening with an explicit commit like consumer.Commit(cr), always when PartitionAssignmentStrategy = CooperativeSticky is used.
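For clarity, the combination that shows the reset for us looks roughly like this (a sketch; broker, group and topic names are placeholders):

```csharp
using System;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "BROKER:9092",   // placeholder
    GroupId = "CONSUMER-GROUP-XXX",     // placeholder
    EnableAutoCommit = false,
    // Incremental rebalancing; with the default Range strategy we do not see the reset.
    PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("TOPIC-XXX");        // placeholder topic

while (true)
{
    var cr = consumer.Consume(TimeSpan.FromSeconds(1));
    if (cr == null) continue;

    // ... process cr.Message ...

    // Explicit commit of this consume result - no reliance on the offset store.
    consumer.Commit(cr);
}
```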
Hi, I'm also facing the same issue; can someone kindly suggest a solution?
Description
We have experienced some issues with our consumers in our Kafka environment. The issue we see is that when a consumer is assigned to some partitions, it starts from the first message - not the latest committed offset. Hence the offset gets reset on the given partition.
We host our cluster in Kubernetes, and a single consumer group exists across many different pods and gets assigned to many different topics. These topics often have many partitions, so we end up with many members (~500) in the consumer group.
We have experienced the issue in several scenarios, but always around rebalancing - either following rolling updates or scaling. When pods start, some partitions have their offsets reset.
We can see that Kafka itself also sees the offset as unset - the value of the metric kafka_consumergroup_current_offset is -1 just after the issue happens.
How to reproduce
We are currently not able to reproduce the issue. A simplified flow of our consumer follows that of the example confluent-kafka-dotnet/examples/Consumer/Program.cs (line 37 at commit 9baf187).
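A minimal sketch of that simplified flow, following the pattern from the linked example (broker, group and topic names are placeholders):

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "BROKER:9092",               // placeholder
    GroupId = "CONSUMER-GROUP-XXX",                 // placeholder
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = false
};

using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("TOPIC-XXX");                    // placeholder topic

try
{
    while (true)
    {
        var cr = consumer.Consume(cts.Token);
        // ... handle cr.Message ...
        consumer.Commit(cr);                        // commit with the explicit consume result
    }
}
catch (OperationCanceledException)
{
    // Ctrl+C: leave the group cleanly so assignments are handed over properly.
    consumer.Close();
}
```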
We have followed issue #1672; however, that issue was resolved by a fix which flushes the local offset storage between assignments.
There are comments on that issue from others who have the same setup with EnableAutoCommit=false and commit with an explicit consume result; they report the same behavior as we see, and I would expect they still have the issue after the change to flush the offset storage. We are not disabling auto offset storage - hence on every poll the storage would be populated. However, we explicitly use the consume result at commit - and from a small chat with @edenhill on the other issue, unfortunately explicit commits are not related to the fix of that issue.
I would only expect the other issue and its fix to be related to this issue if committing with a consume result has a side effect that both commits the explicit consume result and at the same time commits whatever is in the offset storage.
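To make the distinction concrete, this is roughly the difference I am referring to (a sketch of my understanding, not an authoritative description of the client internals):

```csharp
// Sketch: 'consumer' and 'ct' (CancellationToken) are assumed to exist.
// With the default EnableAutoOffsetStore = true, every Consume() also places the
// message's offset in the client-side offset store.
var cr = consumer.Consume(ct);

// What we do: commit exactly this consume result.
consumer.Commit(cr);

// What we do NOT do: commit the consumer's current offsets for its assignment
// without passing a specific result.
// consumer.Commit();
```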
Configurations
auto.offset.reset: earliest
enable.auto.commit: false
partition.assignment.strategy: range (default)
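In the .NET client these correspond to the following strongly typed configuration (a sketch; broker list and group id are placeholders):

```csharp
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "BROKER:9092",               // placeholder
    GroupId = "CONSUMER-GROUP-XXX",                 // placeholder
    AutoOffsetReset = AutoOffsetReset.Earliest,     // auto.offset.reset: earliest
    EnableAutoCommit = false,                       // enable.auto.commit: false
    // partition.assignment.strategy: range is the default and does not need to be set;
    // shown here only for completeness.
    PartitionAssignmentStrategy = PartitionAssignmentStrategy.Range
};
```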
The default.replication.factor is 3, which means all topics are replicated to three brokers.
If the leader doesn't hear from a follower within replica.lag.time.max.ms, the follower is removed from the ISR. We would expect to see this in the metric kafka_topic_partition_under_replicated_partition.
Just before the offset reset events happen we don't see any out-of-sync replicas - hence I would expect everything to be fully replicated.
Versions
Kafka: v2.8
.NET Confluent Kafka: 1.8.2