Skip to content

Commit

Permalink
fix nits and merge
Browse files Browse the repository at this point in the history
Signed-off-by: Antonio Pedro <[email protected]>
  • Loading branch information
antonio-pedro99 committed Jul 22, 2024
2 parents 53b5b37 + 3d0e4eb commit 31c82be
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 18 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
## 0.30.0

* Dependency updates (Kafka 3.7.1, Vert.x 4.5.9, Netty 4.1.111.Final, Prometheus JMX Collector 1.0.1, Prometheus Client 1.3.1)
* Added support for message timestamp.
1. Implemented support for interpreting the timestamp parameter in `ProducerRecord`objects sent to Kafka topics via the bridge.
2. Allow users to read the `ConsumerRecord`'s timestamp on the request's response.

## 0.29.0

Expand Down
36 changes: 18 additions & 18 deletions src/test/java/io/strimzi/kafka/bridge/clients/BasicKafkaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ public int sendStringMessagesPlain(long timeoutMs, String topicName, int message
properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.PLAINTEXT.name);

try (Producer plainProducer = new Producer.ProducerBuilder(resultPromise, msgCntPredicate, topicName, message, partition, timestamp)
.withProperties(properties)
.withHeaders(headers)
.withNullKeyRecord(withNullKeyRecord)
.build()) {
.withProperties(properties)
.withHeaders(headers)
.withNullKeyRecord(withNullKeyRecord)
.build()) {

plainProducer.getVertx().deployVerticle(plainProducer);

Expand All @@ -84,7 +84,7 @@ public int sendStringMessagesPlain(long timeoutMs, String topicName, int message
*/
public int sendStringMessagesPlain(String topicName, int messageCount) {
return sendStringMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount,
List.of(), "\"Hello\" : \"World\"", 0, null, false);
List.of(), "\"Hello\" : \"World\"", 0, null, false);
}

/**
Expand All @@ -93,7 +93,7 @@ public int sendStringMessagesPlain(String topicName, int messageCount) {
* @param topicName topic name where messages are send
* @param message content to be sent
* @param messageCount message count
* @param partition partition, which will be selected
* @param partition partition, which will be selected
* @return sent message count
*/
public int sendStringMessagesPlain(String topicName, String message, int messageCount, int partition) {
Expand Down Expand Up @@ -133,10 +133,10 @@ public int sendJsonMessagesPlain(long timeoutMs, String topicName, int messageCo
properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "producer-sender-plain-" + new Random().nextInt(Integer.MAX_VALUE));
properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.PLAINTEXT.name);
try (Producer plainProducer = new Producer.ProducerBuilder(resultPromise, msgCntPredicate, topicName, message, partition, timestamp)
.withProperties(properties)
.withHeaders(headers)
.withNullKeyRecord(withNullKeyRecord)
.build()) {
.withProperties(properties)
.withHeaders(headers)
.withNullKeyRecord(withNullKeyRecord)
.build()) {
plainProducer.getVertx().deployVerticle(plainProducer);

return plainProducer.getResultPromise().get(timeoutMs, TimeUnit.MILLISECONDS);
Expand All @@ -157,7 +157,7 @@ public int sendJsonMessagesPlain(long timeoutMs, String topicName, int messageCo
*/
public int sendJsonMessagesPlain(String topicName, int messageCount, String message, Long timestamp) {
return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(),
message, 0, timestamp, false);
message, 0, timestamp, false);
}

/**
Expand All @@ -172,7 +172,7 @@ public int sendJsonMessagesPlain(String topicName, int messageCount, String mess
*/
public int sendJsonMessagesPlain(String topicName, int messageCount, String message, int partition, boolean withNullKeyRecord) {
return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(),
message, partition, null, withNullKeyRecord);
message, partition, null, withNullKeyRecord);
}

/**
Expand All @@ -195,26 +195,26 @@ public int sendJsonMessagesPlain(String topicName, int messageCount, List<KafkaH
*
* @param topicName topic name where messages are send
* @param messageCount message count
* @param message specific message to send
* @param partition partition count, how many shards/partitions will topic have
* @param message specific message to send
* @param partition partition count, how many shards/partitions will topic have
* @return sent message count
*/
public int sendJsonMessagesPlain(String topicName, int messageCount, String message, int partition) {
return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(),
message, partition, null, false);
message, partition, null, false);
}

/**
* Send messages to entry-point of the kafka cluster with PLAINTEXT security protocol setting
*
* @param topicName topic name where messages are send
* @param messageCount message count
* @param message specific message to send
* @param message specific message to send
* @return sent message count
*/
public int sendJsonMessagesPlain(String topicName, int messageCount, String message) {
return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(),
message, 0, null, false);
message, 0, null, false);
}

/**
Expand All @@ -226,7 +226,7 @@ public int sendJsonMessagesPlain(String topicName, int messageCount, String mess
*/
public int sendJsonMessagesPlain(String topicName, int messageCount) {
return sendJsonMessagesPlain(Duration.ofMinutes(2).toMillis(), topicName, messageCount, List.of(),
"{\"Hello\" : \"World\"}", 0, null, false);
"{\"Hello\" : \"World\"}", 0, null, false);
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ void sendSimpleMessageWithKey(VertxTestContext context) throws InterruptedExcept
KafkaFuture<Void> future = adminClientFacade.createTopic(topic, 2, 1);

String value = "message-value";

String key = "my-key";

JsonArray records = new JsonArray();
Expand Down

0 comments on commit 31c82be

Please sign in to comment.