Skip to content

Commit

Permalink
add code.
Browse files Browse the repository at this point in the history
  • Loading branch information
thetumbled committed Feb 27, 2025
1 parent f2e4ac4 commit 03e5279
Showing 1 changed file with 92 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.regex.Pattern;
import lombok.Cleanup;
import lombok.Data;
Expand Down Expand Up @@ -1052,6 +1053,97 @@ public void testDeadLetterTopicWithInitialSubscription() throws Exception {
consumer.close();
}

@Test()
public void testDeadLetterTopicWithProducerBuilder() throws Exception {
final String suffix = "-with-producer-builder";
final String topic = "persistent://my-property/my-ns/dead-letter-topic" + suffix;
final int maxRedeliveryCount = 2;
final int sendMessages = 100;

// enable batch
Function<String, ProducerBuilder<byte[]>> producerBuilderFunction = (topicName) -> {
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer(Schema.BYTES);
producerBuilder.topic(topicName).enableBatching(true);
return producerBuilder;
};
String subscriptionName = "my-subscription";
String subscriptionNameDLQ = "my-subscription-DLQ";
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.initialSubscriptionName(subscriptionNameDLQ)
.deadLetterProducerBuilder(producerBuilderFunction)
.retryLetterProducerBuilder(producerBuilderFunction)
.build())
.receiverQueueSize(100)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

@Cleanup
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
.topic(topic + "-" + subscriptionName + "-DLQ")
.subscriptionName(subscriptionNameDLQ)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topic)
.enableBatching(true)
.create();

Map<Integer, String> messageContent = new HashMap<>();
for (int i = 0; i < sendMessages; i++) {
String data = String.format("Hello Pulsar [%d]", i);
producer.newMessage().key(String.valueOf(i)).value(data.getBytes()).send();
messageContent.put(i, data);
}
producer.close();

int totalReceived = 0;
do {
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
assertNotNull(message, "The consumer should be able to receive messages.");
log.info("consumer received message : {}", message.getMessageId());
totalReceived++;
consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
} while (totalReceived < sendMessages * (maxRedeliveryCount + 1));

int totalInDeadLetter = 0;
do {
Message message = deadLetterConsumer.receive(5, TimeUnit.SECONDS);
assertNotNull(message, "the deadLetterConsumer should receive messages.");
assertEquals(new String(message.getData()), messageContent.get(Integer.parseInt(message.getKey())));
messageContent.remove(Integer.parseInt(message.getKey()));
log.info("dead letter consumer received message : {}", message.getMessageId());
deadLetterConsumer.acknowledge(message);
totalInDeadLetter++;
} while (totalInDeadLetter < sendMessages);
assertTrue(messageContent.isEmpty());

deadLetterConsumer.close();
consumer.close();

Consumer<byte[]> checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
if (checkMessage != null) {
log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData()));
}
assertNull(checkMessage);

checkConsumer.close();
}

private CompletableFuture<Void> consumerReceiveForDLQ(Consumer<byte[]> consumer, AtomicInteger totalReceived,
int sendMessages, int maxRedeliveryCount) {
return CompletableFuture.runAsync(() -> {
Expand Down

0 comments on commit 03e5279

Please sign in to comment.