diff --git a/CHANGELOG.md b/CHANGELOG.md index 37463d79..8f4ae04e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * Dependency updates (Kafka 3.9.0, Vert.x 4.5.11, Netty 4.1.115.Final) * Added support for creating a new topic via endpoint. +* Fixed metadata order on the HTTP "offsets" response when producing records. ## 0.30.0 diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java index 2f353087..76a81577 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpSourceBridgeEndpoint.java @@ -35,6 +35,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.stream.Collectors; /** * Represents an HTTP bridge source endpoint for the Kafka producer operations @@ -137,7 +138,6 @@ public void handle(RoutingContext routingContext, Handler ha return; } - List> results = new ArrayList<>(records.size()); // fulfilling the request of sending (multiple) record(s) sequentially but in a separate thread // this will free the Vert.x event loop still in place @@ -154,31 +154,27 @@ public void handle(RoutingContext routingContext, Handler ha return; } - @SuppressWarnings({ "rawtypes" }) - List promises = new ArrayList<>(records.size()); + List>> promises = new ArrayList<>(records.size()); for (ProducerRecord record : records) { - CompletionStage sendHandler = - // inside send method, the callback which completes the promise is executed in the kafka-producer-network-thread - // let's do the result handling in the same thread to keep the messages order delivery execution - this.kafkaBridgeProducer.send(record).handle((metadata, ex) -> { - LOGGER.trace("Handle thread {}", Thread.currentThread()); - if (ex == null) { - LOGGER.debug("Delivered record {} to Kafka on topic {} at partition {} [{}]", record, metadata.topic(), metadata.partition(), metadata.offset()); - results.add(new HttpBridgeResult<>(metadata)); - } else { - String msg = ex.getMessage(); - int code = handleError(ex); - LOGGER.error("Failed to deliver record {}", record, ex); - results.add(new HttpBridgeResult<>(new HttpBridgeError(code, msg))); - } - return metadata; - }); + CompletionStage> sendHandler = this.kafkaBridgeProducer.send(record).handle((metadata, ex) -> { + LOGGER.trace("Handle thread {}", Thread.currentThread()); + if (ex == null) { + LOGGER.debug("Delivered record {} to Kafka on topic {} at partition {} [{}]", record, metadata.topic(), metadata.partition(), metadata.offset()); + return new HttpBridgeResult<>(metadata); + } else { + String msg = ex.getMessage(); + int code = handleError(ex); + LOGGER.error("Failed to deliver record {}", record, ex); + return new HttpBridgeResult<>(new HttpBridgeError(code, msg)); + } + }); promises.add(sendHandler.toCompletableFuture()); } CompletableFuture.allOf(promises.toArray(new CompletableFuture[0])) // sending HTTP response asynchronously to free the kafka-producer-network-thread .whenCompleteAsync((v, t) -> { + List> results = promises.stream().map(CompletableFuture::join).collect(Collectors.toList()); LOGGER.trace("All sent thread {}", Thread.currentThread()); // always return OK, since failure cause is in the response, per message span.finish(HttpResponseStatus.OK.code()); diff --git a/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java b/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java index 252f9519..37fd5eb3 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java @@ -103,16 +103,67 @@ void sendSimpleMessage(VertxTestContext context) throws InterruptedException, Ex } @Test - void sendSimpleMessageToPartition(VertxTestContext context) throws InterruptedException, ExecutionException { - KafkaFuture future = adminClientFacade.createTopic(topic, 2, 1); + void sendMessagesToMultiplePartitions(VertxTestContext context) throws InterruptedException, ExecutionException { + KafkaFuture future = adminClientFacade.createTopic(topic, 3, 1); String value = "message-value"; - int partition = 1; JsonArray records = new JsonArray(); + records.add(valuePartitionRecord(value, 0)); + + records.add(valuePartitionRecord(value, 1)); + + records.add(valuePartitionRecord(value, 2)); + + JsonObject root = new JsonObject(); + root.put("records", records); + System.out.println(root); + + future.get(); + + producerService() + .sendRecordsRequest(topic, root, BridgeContentType.KAFKA_JSON_JSON) + .sendJsonObject(root, ar -> + context.verify(() -> { + assertThat(ar.succeeded(), is(true)); + HttpResponse response = ar.result(); + assertThat(response.statusCode(), is(HttpResponseStatus.OK.code())); + JsonObject bridgeResponse = response.body(); + System.out.println(bridgeResponse); + JsonArray offsets = bridgeResponse.getJsonArray("offsets"); + assertThat(offsets.size(), is(3)); + JsonObject metadata = offsets.getJsonObject(0); + assertThat(metadata.getInteger("partition"), is(0)); + assertThat(metadata.getLong("offset"), is(0L)); + + JsonObject metadata2 = offsets.getJsonObject(1); + assertThat(metadata2.getInteger("partition"), is(1)); + assertThat(metadata2.getLong("offset"), is(0L)); + + JsonObject metadata3 = offsets.getJsonObject(2); + assertThat(metadata3.getInteger("partition"), is(2)); + assertThat(metadata3.getLong("offset"), is(0L)); + context.completeNow(); + })); + assertThat(context.awaitCompletion(TEST_TIMEOUT, TimeUnit.SECONDS), is(true)); + } + + private static JsonObject valuePartitionRecord(String value, int partition) { JsonObject json = new JsonObject(); json.put("value", value); json.put("partition", partition); + return json; + } + + @Test + void sendSimpleMessageToPartition(VertxTestContext context) throws InterruptedException, ExecutionException { + KafkaFuture future = adminClientFacade.createTopic(topic, 2, 1); + + String value = "message-value"; + int partition = 1; + + JsonArray records = new JsonArray(); + JsonObject json = valuePartitionRecord(value, partition); records.add(json); JsonObject root = new JsonObject(); @@ -782,9 +833,7 @@ void sendToNonExistingPartitionsTest(VertxTestContext context) throws Interrupte int partition = 1000; JsonArray records = new JsonArray(); - JsonObject json = new JsonObject(); - json.put("value", value); - json.put("partition", partition); + JsonObject json = valuePartitionRecord(value, partition); records.add(json); JsonObject root = new JsonObject(); @@ -824,9 +873,7 @@ void sendToNonExistingTopicTest(VertxTestContext context) { int partition = 1; JsonArray records = new JsonArray(); - JsonObject json = new JsonObject(); - json.put("value", value); - json.put("partition", partition); + JsonObject json = valuePartitionRecord(value, partition); records.add(json); JsonObject root = new JsonObject(); @@ -1020,14 +1067,10 @@ void sendMultipleRecordsWithOneInvalidPartitionTest(VertxTestContext context) th int partition = 1; JsonArray records = new JsonArray(); - JsonObject json = new JsonObject(); - json.put("value", value); - json.put("partition", partition); + JsonObject json = valuePartitionRecord(value, partition); records.add(json); - JsonObject json2 = new JsonObject(); - json2.put("value", value + "invalid"); - json2.put("partition", 500); + JsonObject json2 = valuePartitionRecord(value + "invalid", 500); records.add(json2); JsonObject root = new JsonObject();