diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index c681bd1a7bca1..821d13d77eb7c 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.admin; +import java.io.InputStream; import java.util.List; import java.util.Map; import java.util.Optional; @@ -1206,6 +1207,26 @@ default CompletableFuture getStatsAsync(String topic) { */ PersistentTopicInternalStats getInternalStats(String topic, boolean metadata) throws PulsarAdminException; + + /** + * Get the internal stats for the topic in a streaming fashion. + *

+ * + * @param topic + * topic name + * @param metadata + * flag to include ledger metadata + * @return the topic statistics + * + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Topic does not exist + * @throws PulsarAdminException + * Unexpected error + */ + InputStream streamInternalStats(String topic, boolean metadata) throws PulsarAdminException; + /** * Get the internal stats for the topic. *

@@ -1235,6 +1256,17 @@ default CompletableFuture getStatsAsync(String topic) { */ CompletableFuture getInternalStatsAsync(String topic, boolean metadata); + /** + * Get the internal stats for the topic asynchronously in a streaming fashion. + * + * @param topic + * topic Name + * @param metadata + * flag to include ledger metadata + * @return a future that can be used to track when the internal topic statistics are returned + */ + CompletableFuture streamInternalStatsAsync(String topic, boolean metadata); + /** * Get the internal stats for the topic asynchronously. * @@ -1405,6 +1437,27 @@ PartitionedTopicInternalStats getPartitionedInternalStats(String topic) */ CompletableFuture getPartitionedInternalStatsAsync(String topic); + + /** + * Get the stats for the partitioned topic in a streaming fashion. + * + * @param topic + * topic name + * @return + * @throws PulsarAdminException + */ + InputStream streamPartitionedInternalStats(String topic) + throws PulsarAdminException; + + /** + * Get the stats-internal for the partitioned topic asynchronously in a streaming fashion. + * + * @param topic + * topic Name + * @return a future that can be used to track when the partitioned topic statistics are returned + */ + CompletableFuture streamPartitionedInternalStatsAsync(String topic); + /** * Delete a subscription. *

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 9c4a6eef753de..de66e99dff3bb 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -674,6 +674,19 @@ public CompletableFuture getInternalStatsAsync(Str return asyncGetRequest(path, new FutureCallback(){}); } + @Override + public InputStream streamInternalStats(String topic, boolean metadata) throws PulsarAdminException { + return sync(() -> streamInternalStatsAsync(topic, metadata)); + } + + @Override + public CompletableFuture streamInternalStatsAsync(String topic, boolean metadata) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "internalStats"); + path = path.queryParam("metadata", metadata); + return asyncGetRequest(path, new FutureCallback() {}); + } + @Override public String getInternalInfo(String topic) throws PulsarAdminException { return sync(() -> getInternalInfoAsync(topic)); @@ -776,6 +789,18 @@ public CompletableFuture getPartitionedInternalSt return asyncGetRequest(path, new FutureCallback(){}); } + @Override + public InputStream streamPartitionedInternalStats(String topic) throws PulsarAdminException { + return sync(() -> streamPartitionedInternalStatsAsync(topic)); + } + + @Override + public CompletableFuture streamPartitionedInternalStatsAsync(String topic) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "partitioned-internalStats"); + return asyncGetRequest(path, new FutureCallback(){}); + } + @Override public void deleteSubscription(String topic, String subName) throws PulsarAdminException { sync(() -> deleteSubscriptionAsync(topic, subName)); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 5f8c9f49d65d1..46c6b2dfa0e1f 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -38,8 +38,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import com.google.common.collect.Sets; + +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.InputStream; import java.io.PrintStream; import java.io.PrintWriter; import java.net.URL; @@ -1560,6 +1563,11 @@ public void topics() throws Exception { Lookup mockLookup = mock(Lookup.class); when(admin.lookups()).thenReturn(mockLookup); + String statsString = "{}"; + InputStream stream = new ByteArrayInputStream(statsString.getBytes(StandardCharsets.UTF_8)); + when(mockTopics.streamInternalStats("persistent://myprop/clust/ns1/ds1", false)).thenReturn(stream); + when(mockTopics.streamPartitionedInternalStats("persistent://myprop/clust/ns1/ds1")).thenReturn(stream); + CmdTopics cmdTopics = new CmdTopics(() -> admin); cmdTopics.run(split("truncate persistent://myprop/clust/ns1/ds1")); @@ -1607,6 +1615,9 @@ public void topics() throws Exception { cmdTopics.run(split("stats-internal persistent://myprop/clust/ns1/ds1")); verify(mockTopics).getInternalStats("persistent://myprop/clust/ns1/ds1", false); + cmdTopics.run(split("stats-internal persistent://myprop/clust/ns1/ds1 --streaming")); + verify(mockTopics).streamInternalStats("persistent://myprop/clust/ns1/ds1", false); + cmdTopics.run(split("get-backlog-quotas persistent://myprop/clust/ns1/ds1 -ap")); verify(mockTopics).getBacklogQuotaMap("persistent://myprop/clust/ns1/ds1", true); cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -l 10 -p producer_request_hold")); @@ -1655,6 +1666,9 @@ public void topics() throws Exception { cmdTopics.run(split("partitioned-stats-internal persistent://myprop/clust/ns1/ds1")); verify(mockTopics).getPartitionedInternalStats("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("partitioned-stats-internal persistent://myprop/clust/ns1/ds1 --streaming")); + verify(mockTopics).streamPartitionedInternalStats("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("clear-backlog persistent://myprop/clust/ns1/ds1 -s sub1")); verify(mockTopics).skipAllMessages("persistent://myprop/clust/ns1/ds1", "sub1"); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 261bd81a5b7bd..4cf60dd269b28 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -29,6 +29,7 @@ import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import java.io.IOException; +import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -747,11 +748,24 @@ private class GetInternalStats extends CliCommand { @Option(names = { "-m", "--metadata" }, description = "Flag to include ledger metadata") private boolean metadata = false; + @Option(names = "--streaming", description = "Streaming the output directly to the standard output without " + + "parsing the response in memory.") + private boolean streaming = false; @Override - void run() throws PulsarAdminException { + void run() throws PulsarAdminException, IOException { String topic = validateTopicName(topicName); - print(getTopics().getInternalStats(topic, metadata)); + if (streaming) { + try (InputStream in = getTopics().streamInternalStats(topic, metadata);) { + int size; + byte[] buffer = new byte[2048]; + while ((size = in.read(buffer)) != -1) { + System.out.write(buffer, 0, size); + } + } + } else { + print(getTopics().getInternalStats(topic, metadata)); + } } } @@ -811,11 +825,25 @@ void run() throws Exception { private class GetPartitionedStatsInternal extends CliCommand { @Parameters(description = "persistent://tenant/namespace/topic", arity = "1") private String topicName; + @Option(names = "--streaming", description = "Streaming the output directly to the standard output without " + + "parsing the response in memory.") + private boolean streaming = false; @Override void run() throws Exception { String topic = validateTopicName(topicName); - print(getTopics().getPartitionedInternalStats(topic)); + if (streaming) { + try (InputStream in = getTopics().streamPartitionedInternalStats(topic)) { + int size; + byte[] buffer = new byte[2048]; + while ((size = in.read(buffer)) != -1) { + System.out.write(buffer, 0, size); + } + } + } else { + print(getTopics().getPartitionedInternalStats(topic)); + } + } }