From 0c1478cdad9ad29c65226839bbf1c8bfb6c15947 Mon Sep 17 00:00:00 2001 From: "xiaolei.wang" Date: Fri, 25 Mar 2022 13:46:04 +0800 Subject: [PATCH] fix: Incorrect stored offset will be use after rebalanced. --- src/rdkafka_request.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 0ad58b4d5..e1600969e 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -824,7 +824,12 @@ rd_kafka_handle_OffsetFetch (rd_kafka_t *rk, /* Update toppar's committed offset */ rd_kafka_toppar_lock(rktp); rktp->rktp_committed_offset = rktpar->offset; - /** Update toppar's stored offset to INVALID, issue #3745 */ + + /* Update toppar's stored offset to INVALID. + * Reset the stored offset in rebalanced, + * so that it will not commit previous stored offset to broker, (issue #3745, #3710). + * And the codes in rdkafka_assignment.c -> rd_kafka_assignment_serve_removals() -> rd_kafka_offset_store0(rktp, RD_KAFKA_OFFSET_INVALID,RD_DONT_LOCK); + * can not fix the incorrect offset completely. */ rktp->rktp_stored_offset = RD_KAFKA_OFFSET_INVALID; rd_kafka_toppar_unlock(rktp); }