From 0d9b59bc450441b730724e36d934591fe02ba45b Mon Sep 17 00:00:00 2001 From: Rajan Dhabalia Date: Mon, 24 Feb 2025 22:52:08 -0800 Subject: [PATCH] [improve][cli] Support additional msg metadata for V1 topic on peek message cmd (#23978) (cherry picked from commit 626b211f91fd8d1e9821ae8e2b9b29520e72ac63) (cherry picked from commit 657d5ee17e59e2cd72d1f4dce703e35b3fe2422a) --- .../pulsar/admin/cli/CmdPersistentTopics.java | 24 +---- .../apache/pulsar/admin/cli/CmdTopics.java | 94 ++++++++++--------- 2 files changed, 52 insertions(+), 66 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java index 3dc0ba7b6f24a..7b86e2af7f523 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java @@ -19,6 +19,7 @@ package org.apache.pulsar.admin.cli; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.admin.cli.CmdTopics.printMessages; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import io.netty.buffer.ByteBuf; @@ -37,8 +38,6 @@ import org.apache.pulsar.client.admin.Topics; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.impl.BatchMessageIdImpl; -import org.apache.pulsar.client.impl.MessageIdImpl; import picocli.CommandLine.Command; import picocli.CommandLine.Option; import picocli.CommandLine.Parameters; @@ -589,26 +588,7 @@ private class PeekMessages extends CliCommand { void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(topicName); List> messages = getPersistentTopics().peekMessages(persistentTopic, subName, numMessages); - int position = 0; - for (Message msg : messages) { - if (++position != 1) { - System.out.println("-------------------------------------------------------------------------\n"); - } - if (msg.getMessageId() instanceof BatchMessageIdImpl) { - BatchMessageIdImpl msgId = (BatchMessageIdImpl) msg.getMessageId(); - System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" - + msgId.getBatchIndex()); - } else { - MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); - System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId()); - } - if (msg.getProperties().size() > 0) { - System.out.println("Properties:"); - print(msg.getProperties()); - } - ByteBuf data = Unpooled.wrappedBuffer(msg.getData()); - System.out.println(ByteBufUtil.prettyHexDump(data)); - } + printMessages(messages, false, this); } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 382e6c2522a2f..614970341851e 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -1150,50 +1150,7 @@ void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(topicName); List> messages = getTopics().peekMessages(persistentTopic, subName, numMessages, showServerMarker, transactionIsolationLevel); - int position = 0; - for (Message msg : messages) { - MessageImpl message = (MessageImpl) msg; - if (++position != 1) { - System.out.println("-------------------------------------------------------------------------\n"); - } - if (message.getMessageId() instanceof BatchMessageIdImpl) { - BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId(); - System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" - + msgId.getBatchIndex()); - } else { - MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); - System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId()); - } - - System.out.println("Publish time: " + message.getPublishTime()); - System.out.println("Event time: " + message.getEventTime()); - - if (message.getDeliverAtTime() != 0) { - System.out.println("Deliver at time: " + message.getDeliverAtTime()); - } - MessageMetadata msgMetaData = message.getMessageBuilder(); - if (showServerMarker && msgMetaData.hasMarkerType()) { - System.out.println("Marker Type: " + MarkerType.valueOf(msgMetaData.getMarkerType())); - } - - if (message.getBrokerEntryMetadata() != null) { - if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) { - System.out.println("Broker entry metadata timestamp: " - + message.getBrokerEntryMetadata().getBrokerTimestamp()); - } - if (message.getBrokerEntryMetadata().hasIndex()) { - System.out.println("Broker entry metadata index: " - + message.getBrokerEntryMetadata().getIndex()); - } - } - - if (message.getProperties().size() > 0) { - System.out.println("Properties:"); - print(msg.getProperties()); - } - ByteBuf data = Unpooled.wrappedBuffer(msg.getData()); - System.out.println(ByteBufUtil.prettyHexDump(data)); - } + printMessages(messages, showServerMarker, this); } } @@ -1412,6 +1369,55 @@ static MessageId findFirstLedgerWithinThreshold(List> messages, boolean showServerMarker, CliCommand cli) { + if (messages == null) { + return; + } + int position = 0; + for (Message msg : messages) { + MessageImpl message = (MessageImpl) msg; + if (++position != 1) { + System.out.println("-------------------------------------------------------------------------\n"); + } + if (message.getMessageId() instanceof BatchMessageIdImpl) { + BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId(); + System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" + + msgId.getBatchIndex()); + } else { + MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); + System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId()); + } + + System.out.println("Publish time: " + message.getPublishTime()); + System.out.println("Event time: " + message.getEventTime()); + + if (message.getDeliverAtTime() != 0) { + System.out.println("Deliver at time: " + message.getDeliverAtTime()); + } + MessageMetadata msgMetaData = message.getMessageBuilder(); + if (showServerMarker && msgMetaData.hasMarkerType()) { + System.out.println("Marker Type: " + MarkerType.valueOf(msgMetaData.getMarkerType())); + } + + if (message.getBrokerEntryMetadata() != null) { + if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) { + System.out.println("Broker entry metadata timestamp: " + + message.getBrokerEntryMetadata().getBrokerTimestamp()); + } + if (message.getBrokerEntryMetadata().hasIndex()) { + System.out.println("Broker entry metadata index: " + message.getBrokerEntryMetadata().getIndex()); + } + } + + if (message.getProperties().size() > 0) { + System.out.println("Properties:"); + cli.print(msg.getProperties()); + } + ByteBuf data = Unpooled.wrappedBuffer(msg.getData()); + System.out.println(ByteBufUtil.prettyHexDump(data)); + } + } + @Command(description = "Trigger offload of data from a topic to long-term storage (e.g. Amazon S3)") private class Offload extends CliCommand { @Option(names = { "-s", "--size-threshold" },