From 262290e61ad66b4c9a69e20661f9d39df0675eb9 Mon Sep 17 00:00:00 2001 From: kedixa <1204837541@qq.com> Date: Wed, 8 Jan 2025 20:03:01 +0800 Subject: [PATCH 1/2] fix KafkaMessage::parse_record_batch --- src/protocol/KafkaMessage.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/protocol/KafkaMessage.cc b/src/protocol/KafkaMessage.cc index af06e7ec086..d7208dfc873 100644 --- a/src/protocol/KafkaMessage.cc +++ b/src/protocol/KafkaMessage.cc @@ -1450,8 +1450,9 @@ int KafkaMessage::parse_record_batch(void **buf, size_t *size, return 1; KafkaBlock block; + bool compressed = ((hdr.attributes & 7) != 0); - if (hdr.attributes & 7) + if (compressed) { if (uncompress_buf(*buf, hdr.length - 61 + 12, &block, hdr.attributes & 7) < 0) return -1; @@ -1492,7 +1493,7 @@ int KafkaMessage::parse_record_batch(void **buf, size_t *size, } } - if (hdr.attributes == 0) + if (!compressed) { *buf = p; *size = n; From b3fe8d7583cb73dddc60d036a012ac3eca5a0940 Mon Sep 17 00:00:00 2001 From: kedixa <1204837541@qq.com> Date: Wed, 8 Jan 2025 20:37:29 +0800 Subject: [PATCH 2/2] use compress_type instead of compressed --- src/protocol/KafkaMessage.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/protocol/KafkaMessage.cc b/src/protocol/KafkaMessage.cc index d7208dfc873..05c184eff48 100644 --- a/src/protocol/KafkaMessage.cc +++ b/src/protocol/KafkaMessage.cc @@ -1450,11 +1450,11 @@ int KafkaMessage::parse_record_batch(void **buf, size_t *size, return 1; KafkaBlock block; - bool compressed = ((hdr.attributes & 7) != 0); + int compress_type = hdr.attributes & 7; - if (compressed) + if (compress_type != 0) { - if (uncompress_buf(*buf, hdr.length - 61 + 12, &block, hdr.attributes & 7) < 0) + if (uncompress_buf(*buf, hdr.length - 61 + 12, &block, compress_type) < 0) return -1; *buf = (char *)*buf + hdr.length - 61 + 12; @@ -1493,7 +1493,7 @@ int KafkaMessage::parse_record_batch(void **buf, size_t *size, } } - if (!compressed) + if (compress_type == 0) { *buf = p; *size = n;