Skip to content
This repository has been archived by the owner on Jul 1, 2024. It is now read-only.

Upgrade to kafka-clients 3.4.0 #57

Merged
merged 1 commit into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import javax.crypto.SecretKey;

import org.apache.kafka.common.message.FetchResponseData.FetchablePartitionResponse;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.FetchResponseData.FetchableTopicResponse;
import org.apache.kafka.common.message.ProduceRequestData.PartitionProduceData;
import org.apache.kafka.common.message.ProduceRequestData.TopicProduceData;
Expand Down Expand Up @@ -112,18 +112,18 @@ public boolean decrypt(FetchableTopicResponse fetchRsp)

// If this far, the data was encrypted.
// Navigate into each record and decrypt.
for (FetchablePartitionResponse partitionData : fetchRsp.partitionResponses()) {
for (FetchResponseData.PartitionData partitionData : fetchRsp.partitions()) {

if (LOGGER.isDebugEnabled()) {
String msg = String.format(
"partition: %d, logStartOffset: %08X, lastStableOffset: %08X, "
+ "partition leader epoch: %04X",
partitionData.partition(), partitionData.currentLeader().leaderEpoch(),
partitionData.partitionIndex(), partitionData.currentLeader().leaderEpoch(),
partitionData.logStartOffset(), partitionData.lastStableOffset());
LOGGER.debug(msg);
}

MemoryRecords recs = (MemoryRecords) partitionData.recordSet();
MemoryRecords recs = (MemoryRecords) partitionData.records();

long firstOffset = getFirstOffset(recs);
MemoryRecordsBuilder builder = createMemoryRecsBuilder(recs.sizeInBytes(),
Expand All @@ -147,7 +147,7 @@ public boolean decrypt(FetchableTopicResponse fetchRsp)
}
// overwrite the partition's memoryrecords with the decrypted records:
MemoryRecords newRecs = builder.build();
partitionData.setRecordSet(newRecs);
partitionData.setRecords(newRecs);
}
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
<junit-platform-surefire-provider.version>1.3.2</junit-platform-surefire-provider.version>
<testcontainers.version>1.17.2</testcontainers.version>

<kafka.version>2.8.0</kafka.version>
<kafka.version>3.4.0</kafka.version>

<spotbugs.version>4.7.1</spotbugs.version>
<fasterxml.jackson-core.version>2.13.3</fasterxml.jackson-core.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ public Buffer processFetchResponse(Buffer buffer, RequestHeader reqHeader)
throws EncSerDerException, GeneralSecurityException, KmsException {
// instantiate FetchResponse instance
KafkaRspMsg rsp = new KafkaRspMsg(buffer, reqHeader.apiVersion());
FetchResponse<?> fetch = (FetchResponse<?>) AbstractResponse.parseResponse(rsp.getPayload(),
FetchResponse fetch = (FetchResponse) AbstractResponse.parseResponse(rsp.getPayload(),
reqHeader);

// iterate through response records, decrypting where needed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public static Buffer toSendBuffer(byte[] header, AbstractRequest req) {
* @param req
* @return
*/
public static Buffer toSendBuffer(FetchResponse<?> fetchRsp, RequestHeader reqHeader) {
public static Buffer toSendBuffer(FetchResponse fetchRsp, RequestHeader reqHeader) {
ByteBuffer serializedRsp = serialize(fetchRsp, reqHeader);
byte[] rspBytes = serializedRsp.array();
int bufLen = Integer.BYTES + rspBytes.length;
Expand All @@ -124,7 +124,7 @@ public static Buffer toSendBuffer(FetchResponse<?> fetchRsp, RequestHeader reqHe
* @param fetchRsp
* @return
*/
private static ByteBuffer serialize(FetchResponse<?> fetchRsp, RequestHeader reqHeader) {
private static ByteBuffer serialize(FetchResponse fetchRsp, RequestHeader reqHeader) {
ResponseHeader rspHeader = reqHeader.toResponseHeader();
//System.out.println("****** req: " + reqHeader.apiVersion() + " rsp: " + rspHeader.headerVersion());
return RequestUtils.serialize(rspHeader.data(), rspHeader.headerVersion(), fetchRsp.data(), reqHeader.apiVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private void testDecryption(File rspMsgFile)

// instantiate the decrypted fetch response
KafkaRspMsg rsp = new KafkaRspMsg(fetchRspBuf, reqHeader.apiVersion());
FetchResponse<?> fetch = (FetchResponse<?>) AbstractResponse.parseResponse(rsp.getPayload(),
FetchResponse fetch = (FetchResponse) AbstractResponse.parseResponse(rsp.getPayload(),
reqHeader);

FetchResponseData data = fetch.data();
Expand All @@ -102,8 +102,8 @@ private void navigate(FetchResponseData data) {
// This tests the integrity of the decrypted response.
List<FetchableTopicResponse> responses = data.responses();
for (FetchableTopicResponse topicRsp : responses) {
topicRsp.partitionResponses().forEach(pd -> {
MemoryRecords recs = (MemoryRecords) pd.recordSet();
topicRsp.partitions().forEach(pd -> {
MemoryRecords recs = (MemoryRecords) pd.records();
recs.records().forEach(r -> {
if (r.hasValue()) {
byte[] recordData = new byte[r.valueSize()];
Expand Down