Skip to content

Commit

Permalink
Fix metadata order in response when producing records (#945)
Browse files Browse the repository at this point in the history
* Add integration test checking result ordering

Signed-off-by: Robert Young <[email protected]>

* fix: preserve record send order in response metadata

This uses the send CompletionStages to keep the original ordering of
records.

kafka-clients Producer#send only guarantees that callbacks are executed
in order per partition, the ordering of partitions is not guaranteed.

Signed-off-by: Robert Young <[email protected]>

* Updated CHANGELOG with metadata fix description

Signed-off-by: Paolo Patierno <[email protected]>

---------

Signed-off-by: Robert Young <[email protected]>
Signed-off-by: Paolo Patierno <[email protected]>
Co-authored-by: Paolo Patierno <[email protected]>
  • Loading branch information
robobario and ppatierno committed Nov 25, 2024
1 parent 38da89e commit bf51ecf
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 34 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

* Dependency updates (Kafka 3.9.0, Vert.x 4.5.11, Netty 4.1.115.Final)
* Added support for creating a new topic via endpoint.
* Fixed metadata order on the HTTP "offsets" response when producing records.

## 0.30.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;

/**
* Represents an HTTP bridge source endpoint for the Kafka producer operations
Expand Down Expand Up @@ -137,7 +138,6 @@ public void handle(RoutingContext routingContext, Handler<HttpBridgeEndpoint> ha

return;
}
List<HttpBridgeResult<?>> results = new ArrayList<>(records.size());

// fulfilling the request of sending (multiple) record(s) sequentially but in a separate thread
// this will free the Vert.x event loop still in place
Expand All @@ -154,31 +154,27 @@ public void handle(RoutingContext routingContext, Handler<HttpBridgeEndpoint> ha
return;
}

@SuppressWarnings({ "rawtypes" })
List<CompletableFuture> promises = new ArrayList<>(records.size());
List<CompletableFuture<HttpBridgeResult<?>>> promises = new ArrayList<>(records.size());
for (ProducerRecord<K, V> record : records) {
CompletionStage<RecordMetadata> sendHandler =
// inside send method, the callback which completes the promise is executed in the kafka-producer-network-thread
// let's do the result handling in the same thread to keep the messages order delivery execution
this.kafkaBridgeProducer.send(record).handle((metadata, ex) -> {
LOGGER.trace("Handle thread {}", Thread.currentThread());
if (ex == null) {
LOGGER.debug("Delivered record {} to Kafka on topic {} at partition {} [{}]", record, metadata.topic(), metadata.partition(), metadata.offset());
results.add(new HttpBridgeResult<>(metadata));
} else {
String msg = ex.getMessage();
int code = handleError(ex);
LOGGER.error("Failed to deliver record {}", record, ex);
results.add(new HttpBridgeResult<>(new HttpBridgeError(code, msg)));
}
return metadata;
});
CompletionStage<HttpBridgeResult<?>> sendHandler = this.kafkaBridgeProducer.send(record).handle((metadata, ex) -> {
LOGGER.trace("Handle thread {}", Thread.currentThread());
if (ex == null) {
LOGGER.debug("Delivered record {} to Kafka on topic {} at partition {} [{}]", record, metadata.topic(), metadata.partition(), metadata.offset());
return new HttpBridgeResult<>(metadata);
} else {
String msg = ex.getMessage();
int code = handleError(ex);
LOGGER.error("Failed to deliver record {}", record, ex);
return new HttpBridgeResult<>(new HttpBridgeError(code, msg));
}
});
promises.add(sendHandler.toCompletableFuture());
}

CompletableFuture.allOf(promises.toArray(new CompletableFuture[0]))
// sending HTTP response asynchronously to free the kafka-producer-network-thread
.whenCompleteAsync((v, t) -> {
List<HttpBridgeResult<?>> results = promises.stream().map(CompletableFuture::join).collect(Collectors.toList());
LOGGER.trace("All sent thread {}", Thread.currentThread());
// always return OK, since failure cause is in the response, per message
span.finish(HttpResponseStatus.OK.code());
Expand Down
73 changes: 58 additions & 15 deletions src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,67 @@ void sendSimpleMessage(VertxTestContext context) throws InterruptedException, Ex
}

@Test
void sendSimpleMessageToPartition(VertxTestContext context) throws InterruptedException, ExecutionException {
KafkaFuture<Void> future = adminClientFacade.createTopic(topic, 2, 1);
void sendMessagesToMultiplePartitions(VertxTestContext context) throws InterruptedException, ExecutionException {
KafkaFuture<Void> future = adminClientFacade.createTopic(topic, 3, 1);

String value = "message-value";
int partition = 1;

JsonArray records = new JsonArray();
records.add(valuePartitionRecord(value, 0));

records.add(valuePartitionRecord(value, 1));

records.add(valuePartitionRecord(value, 2));

JsonObject root = new JsonObject();
root.put("records", records);
System.out.println(root);

future.get();

producerService()
.sendRecordsRequest(topic, root, BridgeContentType.KAFKA_JSON_JSON)
.sendJsonObject(root, ar ->
context.verify(() -> {
assertThat(ar.succeeded(), is(true));
HttpResponse<JsonObject> response = ar.result();
assertThat(response.statusCode(), is(HttpResponseStatus.OK.code()));
JsonObject bridgeResponse = response.body();
System.out.println(bridgeResponse);
JsonArray offsets = bridgeResponse.getJsonArray("offsets");
assertThat(offsets.size(), is(3));
JsonObject metadata = offsets.getJsonObject(0);
assertThat(metadata.getInteger("partition"), is(0));
assertThat(metadata.getLong("offset"), is(0L));

JsonObject metadata2 = offsets.getJsonObject(1);
assertThat(metadata2.getInteger("partition"), is(1));
assertThat(metadata2.getLong("offset"), is(0L));

JsonObject metadata3 = offsets.getJsonObject(2);
assertThat(metadata3.getInteger("partition"), is(2));
assertThat(metadata3.getLong("offset"), is(0L));
context.completeNow();
}));
assertThat(context.awaitCompletion(TEST_TIMEOUT, TimeUnit.SECONDS), is(true));
}

private static JsonObject valuePartitionRecord(String value, int partition) {
JsonObject json = new JsonObject();
json.put("value", value);
json.put("partition", partition);
return json;
}

@Test
void sendSimpleMessageToPartition(VertxTestContext context) throws InterruptedException, ExecutionException {
KafkaFuture<Void> future = adminClientFacade.createTopic(topic, 2, 1);

String value = "message-value";
int partition = 1;

JsonArray records = new JsonArray();
JsonObject json = valuePartitionRecord(value, partition);
records.add(json);

JsonObject root = new JsonObject();
Expand Down Expand Up @@ -782,9 +833,7 @@ void sendToNonExistingPartitionsTest(VertxTestContext context) throws Interrupte
int partition = 1000;

JsonArray records = new JsonArray();
JsonObject json = new JsonObject();
json.put("value", value);
json.put("partition", partition);
JsonObject json = valuePartitionRecord(value, partition);
records.add(json);

JsonObject root = new JsonObject();
Expand Down Expand Up @@ -824,9 +873,7 @@ void sendToNonExistingTopicTest(VertxTestContext context) {
int partition = 1;

JsonArray records = new JsonArray();
JsonObject json = new JsonObject();
json.put("value", value);
json.put("partition", partition);
JsonObject json = valuePartitionRecord(value, partition);
records.add(json);

JsonObject root = new JsonObject();
Expand Down Expand Up @@ -1020,14 +1067,10 @@ void sendMultipleRecordsWithOneInvalidPartitionTest(VertxTestContext context) th
int partition = 1;

JsonArray records = new JsonArray();
JsonObject json = new JsonObject();
json.put("value", value);
json.put("partition", partition);
JsonObject json = valuePartitionRecord(value, partition);
records.add(json);

JsonObject json2 = new JsonObject();
json2.put("value", value + "invalid");
json2.put("partition", 500);
JsonObject json2 = valuePartitionRecord(value + "invalid", 500);
records.add(json2);

JsonObject root = new JsonObject();
Expand Down

0 comments on commit bf51ecf

Please sign in to comment.