Skip to content

Commit 657d5ee

Browse files
rdhabalialhotari
authored andcommitted
[improve][cli] Support additional msg metadata for V1 topic on peek message cmd (#23978)
(cherry picked from commit 626b211)
1 parent 50997af commit 657d5ee

File tree

2 files changed

+52
-66
lines changed

2 files changed

+52
-66
lines changed

Diff for: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java

+2-22
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.pulsar.admin.cli;
2020

2121
import static org.apache.commons.lang3.StringUtils.isNotBlank;
22+
import static org.apache.pulsar.admin.cli.CmdTopics.printMessages;
2223
import com.google.gson.Gson;
2324
import com.google.gson.GsonBuilder;
2425
import io.netty.buffer.ByteBuf;
@@ -37,8 +38,6 @@
3738
import org.apache.pulsar.client.admin.Topics;
3839
import org.apache.pulsar.client.api.Message;
3940
import org.apache.pulsar.client.api.MessageId;
40-
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
41-
import org.apache.pulsar.client.impl.MessageIdImpl;
4241
import picocli.CommandLine.Command;
4342
import picocli.CommandLine.Option;
4443
import picocli.CommandLine.Parameters;
@@ -589,26 +588,7 @@ private class PeekMessages extends CliCommand {
589588
void run() throws PulsarAdminException {
590589
String persistentTopic = validatePersistentTopic(topicName);
591590
List<Message<byte[]>> messages = getPersistentTopics().peekMessages(persistentTopic, subName, numMessages);
592-
int position = 0;
593-
for (Message<byte[]> msg : messages) {
594-
if (++position != 1) {
595-
System.out.println("-------------------------------------------------------------------------\n");
596-
}
597-
if (msg.getMessageId() instanceof BatchMessageIdImpl) {
598-
BatchMessageIdImpl msgId = (BatchMessageIdImpl) msg.getMessageId();
599-
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
600-
+ msgId.getBatchIndex());
601-
} else {
602-
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
603-
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
604-
}
605-
if (msg.getProperties().size() > 0) {
606-
System.out.println("Properties:");
607-
print(msg.getProperties());
608-
}
609-
ByteBuf data = Unpooled.wrappedBuffer(msg.getData());
610-
System.out.println(ByteBufUtil.prettyHexDump(data));
611-
}
591+
printMessages(messages, false, this);
612592
}
613593
}
614594

Diff for: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java

+50-44
Original file line numberDiff line numberDiff line change
@@ -1117,50 +1117,7 @@ void run() throws PulsarAdminException {
11171117
String persistentTopic = validatePersistentTopic(topicName);
11181118
List<Message<byte[]>> messages = getTopics().peekMessages(persistentTopic, subName, numMessages,
11191119
showServerMarker, transactionIsolationLevel);
1120-
int position = 0;
1121-
for (Message<byte[]> msg : messages) {
1122-
MessageImpl message = (MessageImpl) msg;
1123-
if (++position != 1) {
1124-
System.out.println("-------------------------------------------------------------------------\n");
1125-
}
1126-
if (message.getMessageId() instanceof BatchMessageIdImpl) {
1127-
BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId();
1128-
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
1129-
+ msgId.getBatchIndex());
1130-
} else {
1131-
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
1132-
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
1133-
}
1134-
1135-
System.out.println("Publish time: " + message.getPublishTime());
1136-
System.out.println("Event time: " + message.getEventTime());
1137-
1138-
if (message.getDeliverAtTime() != 0) {
1139-
System.out.println("Deliver at time: " + message.getDeliverAtTime());
1140-
}
1141-
MessageMetadata msgMetaData = message.getMessageBuilder();
1142-
if (showServerMarker && msgMetaData.hasMarkerType()) {
1143-
System.out.println("Marker Type: " + MarkerType.valueOf(msgMetaData.getMarkerType()));
1144-
}
1145-
1146-
if (message.getBrokerEntryMetadata() != null) {
1147-
if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) {
1148-
System.out.println("Broker entry metadata timestamp: "
1149-
+ message.getBrokerEntryMetadata().getBrokerTimestamp());
1150-
}
1151-
if (message.getBrokerEntryMetadata().hasIndex()) {
1152-
System.out.println("Broker entry metadata index: "
1153-
+ message.getBrokerEntryMetadata().getIndex());
1154-
}
1155-
}
1156-
1157-
if (message.getProperties().size() > 0) {
1158-
System.out.println("Properties:");
1159-
print(msg.getProperties());
1160-
}
1161-
ByteBuf data = Unpooled.wrappedBuffer(msg.getData());
1162-
System.out.println(ByteBufUtil.prettyHexDump(data));
1163-
}
1120+
printMessages(messages, showServerMarker, this);
11641121
}
11651122
}
11661123

@@ -1379,6 +1336,55 @@ static MessageId findFirstLedgerWithinThreshold(List<PersistentTopicInternalStat
13791336
return null;
13801337
}
13811338

1339+
public static void printMessages(List<Message<byte[]>> messages, boolean showServerMarker, CliCommand cli) {
1340+
if (messages == null) {
1341+
return;
1342+
}
1343+
int position = 0;
1344+
for (Message<byte[]> msg : messages) {
1345+
MessageImpl message = (MessageImpl) msg;
1346+
if (++position != 1) {
1347+
System.out.println("-------------------------------------------------------------------------\n");
1348+
}
1349+
if (message.getMessageId() instanceof BatchMessageIdImpl) {
1350+
BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId();
1351+
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
1352+
+ msgId.getBatchIndex());
1353+
} else {
1354+
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
1355+
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
1356+
}
1357+
1358+
System.out.println("Publish time: " + message.getPublishTime());
1359+
System.out.println("Event time: " + message.getEventTime());
1360+
1361+
if (message.getDeliverAtTime() != 0) {
1362+
System.out.println("Deliver at time: " + message.getDeliverAtTime());
1363+
}
1364+
MessageMetadata msgMetaData = message.getMessageBuilder();
1365+
if (showServerMarker && msgMetaData.hasMarkerType()) {
1366+
System.out.println("Marker Type: " + MarkerType.valueOf(msgMetaData.getMarkerType()));
1367+
}
1368+
1369+
if (message.getBrokerEntryMetadata() != null) {
1370+
if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) {
1371+
System.out.println("Broker entry metadata timestamp: "
1372+
+ message.getBrokerEntryMetadata().getBrokerTimestamp());
1373+
}
1374+
if (message.getBrokerEntryMetadata().hasIndex()) {
1375+
System.out.println("Broker entry metadata index: " + message.getBrokerEntryMetadata().getIndex());
1376+
}
1377+
}
1378+
1379+
if (message.getProperties().size() > 0) {
1380+
System.out.println("Properties:");
1381+
cli.print(msg.getProperties());
1382+
}
1383+
ByteBuf data = Unpooled.wrappedBuffer(msg.getData());
1384+
System.out.println(ByteBufUtil.prettyHexDump(data));
1385+
}
1386+
}
1387+
13821388
@Command(description = "Trigger offload of data from a topic to long-term storage (e.g. Amazon S3)")
13831389
private class Offload extends CliCommand {
13841390
@Option(names = { "-s", "--size-threshold" },

0 commit comments

Comments
 (0)