From 86a73f10a49b01ef9651aaaf3c40e1755d44c2e7 Mon Sep 17 00:00:00 2001 From: Robert Young Date: Fri, 19 May 2023 11:00:52 +1200 Subject: [PATCH] Upgrade to kafka-clients 3.4.0 This is an enabler for integration with the kroxylicious proxy framework because it depends on kafka-clients too, causing problems at runtime if we try to call the EncryptionModule built against an older kafka-clients. Note: topic-encryption with the vertx-proxy will still requires the proxied Kafka cluster or client to be less that version 3.1.0 because it does not yet work with topic ids in the FetchResponse. Signed-off-by: Robert Young --- .../io/strimzi/kafka/topicenc/EncryptionModule.java | 10 +++++----- pom.xml | 2 +- .../io/strimzi/kafka/proxy/vertx/MessageHandler.java | 2 +- .../java/io/strimzi/kafka/proxy/vertx/msg/MsgUtil.java | 4 ++-- .../java/io/strimzi/kafka/proxy/vertx/EncModTest.java | 6 +++--- 5 files changed, 12 insertions(+), 12 deletions(-) diff --git a/encmod/src/main/java/io/strimzi/kafka/topicenc/EncryptionModule.java b/encmod/src/main/java/io/strimzi/kafka/topicenc/EncryptionModule.java index 0e6299f..6a3577d 100644 --- a/encmod/src/main/java/io/strimzi/kafka/topicenc/EncryptionModule.java +++ b/encmod/src/main/java/io/strimzi/kafka/topicenc/EncryptionModule.java @@ -11,7 +11,7 @@ import javax.crypto.SecretKey; -import org.apache.kafka.common.message.FetchResponseData.FetchablePartitionResponse; +import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.message.FetchResponseData.FetchableTopicResponse; import org.apache.kafka.common.message.ProduceRequestData.PartitionProduceData; import org.apache.kafka.common.message.ProduceRequestData.TopicProduceData; @@ -112,18 +112,18 @@ public boolean decrypt(FetchableTopicResponse fetchRsp) // If this far, the data was encrypted. // Navigate into each record and decrypt. - for (FetchablePartitionResponse partitionData : fetchRsp.partitionResponses()) { + for (FetchResponseData.PartitionData partitionData : fetchRsp.partitions()) { if (LOGGER.isDebugEnabled()) { String msg = String.format( "partition: %d, logStartOffset: %08X, lastStableOffset: %08X, " + "partition leader epoch: %04X", - partitionData.partition(), partitionData.currentLeader().leaderEpoch(), + partitionData.partitionIndex(), partitionData.currentLeader().leaderEpoch(), partitionData.logStartOffset(), partitionData.lastStableOffset()); LOGGER.debug(msg); } - MemoryRecords recs = (MemoryRecords) partitionData.recordSet(); + MemoryRecords recs = (MemoryRecords) partitionData.records(); long firstOffset = getFirstOffset(recs); MemoryRecordsBuilder builder = createMemoryRecsBuilder(recs.sizeInBytes(), @@ -147,7 +147,7 @@ public boolean decrypt(FetchableTopicResponse fetchRsp) } // overwrite the partition's memoryrecords with the decrypted records: MemoryRecords newRecs = builder.build(); - partitionData.setRecordSet(newRecs); + partitionData.setRecords(newRecs); } return true; } diff --git a/pom.xml b/pom.xml index 6a210ba..3c3e212 100644 --- a/pom.xml +++ b/pom.xml @@ -72,7 +72,7 @@ 1.3.2 1.17.2 - 2.8.0 + 3.4.0 4.7.1 2.13.3 diff --git a/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/MessageHandler.java b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/MessageHandler.java index 6d49f94..42a3bc4 100644 --- a/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/MessageHandler.java +++ b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/MessageHandler.java @@ -366,7 +366,7 @@ public Buffer processFetchResponse(Buffer buffer, RequestHeader reqHeader) throws EncSerDerException, GeneralSecurityException, KmsException { // instantiate FetchResponse instance KafkaRspMsg rsp = new KafkaRspMsg(buffer, reqHeader.apiVersion()); - FetchResponse fetch = (FetchResponse) AbstractResponse.parseResponse(rsp.getPayload(), + FetchResponse fetch = (FetchResponse) AbstractResponse.parseResponse(rsp.getPayload(), reqHeader); // iterate through response records, decrypting where needed diff --git a/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/msg/MsgUtil.java b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/msg/MsgUtil.java index 6a46f71..6b06484 100644 --- a/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/msg/MsgUtil.java +++ b/vertx-proxy/src/main/java/io/strimzi/kafka/proxy/vertx/msg/MsgUtil.java @@ -106,7 +106,7 @@ public static Buffer toSendBuffer(byte[] header, AbstractRequest req) { * @param req * @return */ - public static Buffer toSendBuffer(FetchResponse fetchRsp, RequestHeader reqHeader) { + public static Buffer toSendBuffer(FetchResponse fetchRsp, RequestHeader reqHeader) { ByteBuffer serializedRsp = serialize(fetchRsp, reqHeader); byte[] rspBytes = serializedRsp.array(); int bufLen = Integer.BYTES + rspBytes.length; @@ -124,7 +124,7 @@ public static Buffer toSendBuffer(FetchResponse fetchRsp, RequestHeader reqHe * @param fetchRsp * @return */ - private static ByteBuffer serialize(FetchResponse fetchRsp, RequestHeader reqHeader) { + private static ByteBuffer serialize(FetchResponse fetchRsp, RequestHeader reqHeader) { ResponseHeader rspHeader = reqHeader.toResponseHeader(); //System.out.println("****** req: " + reqHeader.apiVersion() + " rsp: " + rspHeader.headerVersion()); return RequestUtils.serialize(rspHeader.data(), rspHeader.headerVersion(), fetchRsp.data(), reqHeader.apiVersion()); diff --git a/vertx-proxy/src/test/java/io/strimzi/kafka/proxy/vertx/EncModTest.java b/vertx-proxy/src/test/java/io/strimzi/kafka/proxy/vertx/EncModTest.java index c69fb75..21e926a 100644 --- a/vertx-proxy/src/test/java/io/strimzi/kafka/proxy/vertx/EncModTest.java +++ b/vertx-proxy/src/test/java/io/strimzi/kafka/proxy/vertx/EncModTest.java @@ -90,7 +90,7 @@ private void testDecryption(File rspMsgFile) // instantiate the decrypted fetch response KafkaRspMsg rsp = new KafkaRspMsg(fetchRspBuf, reqHeader.apiVersion()); - FetchResponse fetch = (FetchResponse) AbstractResponse.parseResponse(rsp.getPayload(), + FetchResponse fetch = (FetchResponse) AbstractResponse.parseResponse(rsp.getPayload(), reqHeader); FetchResponseData data = fetch.data(); @@ -102,8 +102,8 @@ private void navigate(FetchResponseData data) { // This tests the integrity of the decrypted response. List responses = data.responses(); for (FetchableTopicResponse topicRsp : responses) { - topicRsp.partitionResponses().forEach(pd -> { - MemoryRecords recs = (MemoryRecords) pd.recordSet(); + topicRsp.partitions().forEach(pd -> { + MemoryRecords recs = (MemoryRecords) pd.records(); recs.records().forEach(r -> { if (r.hasValue()) { byte[] recordData = new byte[r.valueSize()];