diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index 4156df4d27f02..e869d07a6f679 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -1055,8 +1055,7 @@ public void testDeadLetterTopicWithInitialSubscription() throws Exception { @Test() public void testDeadLetterTopicWithProducerBuilder() throws Exception { - final String suffix = "-with-producer-builder"; - final String topic = "persistent://my-property/my-ns/dead-letter-topic" + suffix; + final String topic = "persistent://my-property/my-ns/dead-letter-topic-with-producer-builder"; final int maxRedeliveryCount = 2; final int sendMessages = 100; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java index 2b897760b6f00..e9940ce424e6e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java @@ -32,6 +32,8 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Function; + import lombok.Cleanup; import lombok.Data; import org.apache.avro.AvroRuntimeException; @@ -136,6 +138,85 @@ public void testRetryTopic() throws Exception { checkConsumer.close(); } + @Test + public void testRetryTopicWithProducerBuilder() throws Exception { + final String topic = "persistent://my-property/my-ns/retry-topic-with-producer-builder"; + final int maxRedeliveryCount = 2; + final int sendMessages = 100; + + // enable batch + Function> producerBuilderFunction = (topicName) -> { + ProducerBuilder producerBuilder = pulsarClient.newProducer(Schema.BYTES); + producerBuilder.topic(topicName).enableBatching(true); + return producerBuilder; + }; + String subscriptionName = "my-subscription"; + String subscriptionNameDLQ = "my-subscription-DLQ"; + Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .enableRetry(true) + .deadLetterPolicy(DeadLetterPolicy.builder() + .maxRedeliverCount(maxRedeliveryCount) + .retryLetterProducerBuilder(producerBuilderFunction) + .build()) + .receiverQueueSize(100) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + @Cleanup + PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES) + .topic(topic + "-" + subscriptionName + "-DLQ") + .subscriptionName(subscriptionNameDLQ) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .create(); + + for (int i = 0; i < sendMessages; i++) { + producer.send(String.format("Hello Pulsar [%d]", i).getBytes()); + } + producer.close(); + + int totalReceived = 0; + do { + Message message = consumer.receive(); + log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + consumer.reconsumeLater(message, 1, TimeUnit.SECONDS); + totalReceived++; + } while (totalReceived < sendMessages * (maxRedeliveryCount + 1)); + + int totalInDeadLetter = 0; + do { + Message message = deadLetterConsumer.receive(); + log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + deadLetterConsumer.acknowledge(message); + totalInDeadLetter++; + } while (totalInDeadLetter < sendMessages); + + deadLetterConsumer.close(); + consumer.close(); + + Consumer checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Message checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS); + if (checkMessage != null) { + log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData())); + } + assertNull(checkMessage); + + checkConsumer.close(); + } + /** * Retry topic feature relies on the delay queue feature when consumer produce a delayed message * to the retry topic. The delay queue feature is only supported in shared and key-shared subscription type.