From d07f3c74324f96f1a5d3eac5e2e98b205a19a7c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Tue, 7 May 2024 14:55:58 +0200 Subject: [PATCH 1/4] [improve][client]: add --streaming option to topics partitioned-stats-internal (#261) (cherry picked from commit 471669ae594243fa50066b55f4ab3b99beeff015) --- .../apache/pulsar/client/admin/Topics.java | 53 +++++++++++++++++++ .../client/admin/internal/TopicsImpl.java | 27 ++++++++++ .../apache/pulsar/admin/cli/CmdTopics.java | 34 ++++++++++-- 3 files changed, 110 insertions(+), 4 deletions(-) diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index c681bd1a7bca1..821d13d77eb7c 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.admin; +import java.io.InputStream; import java.util.List; import java.util.Map; import java.util.Optional; @@ -1206,6 +1207,26 @@ default CompletableFuture getStatsAsync(String topic) { */ PersistentTopicInternalStats getInternalStats(String topic, boolean metadata) throws PulsarAdminException; + + /** + * Get the internal stats for the topic in a streaming fashion. + *

+ * + * @param topic + * topic name + * @param metadata + * flag to include ledger metadata + * @return the topic statistics + * + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Topic does not exist + * @throws PulsarAdminException + * Unexpected error + */ + InputStream streamInternalStats(String topic, boolean metadata) throws PulsarAdminException; + /** * Get the internal stats for the topic. *

@@ -1235,6 +1256,17 @@ default CompletableFuture getStatsAsync(String topic) { */ CompletableFuture getInternalStatsAsync(String topic, boolean metadata); + /** + * Get the internal stats for the topic asynchronously in a streaming fashion. + * + * @param topic + * topic Name + * @param metadata + * flag to include ledger metadata + * @return a future that can be used to track when the internal topic statistics are returned + */ + CompletableFuture streamInternalStatsAsync(String topic, boolean metadata); + /** * Get the internal stats for the topic asynchronously. * @@ -1405,6 +1437,27 @@ PartitionedTopicInternalStats getPartitionedInternalStats(String topic) */ CompletableFuture getPartitionedInternalStatsAsync(String topic); + + /** + * Get the stats for the partitioned topic in a streaming fashion. + * + * @param topic + * topic name + * @return + * @throws PulsarAdminException + */ + InputStream streamPartitionedInternalStats(String topic) + throws PulsarAdminException; + + /** + * Get the stats-internal for the partitioned topic asynchronously in a streaming fashion. + * + * @param topic + * topic Name + * @return a future that can be used to track when the partitioned topic statistics are returned + */ + CompletableFuture streamPartitionedInternalStatsAsync(String topic); + /** * Delete a subscription. *

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 9c4a6eef753de..025dc0312fa15 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -44,6 +44,7 @@ import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; + import org.apache.pulsar.client.admin.GetStatsOptions; import org.apache.pulsar.client.admin.ListTopicsOptions; import org.apache.pulsar.client.admin.LongRunningProcessStatus; @@ -674,6 +675,20 @@ public CompletableFuture getInternalStatsAsync(Str return asyncGetRequest(path, new FutureCallback(){}); } + @Override + public InputStream streamInternalStats(String topic, boolean metadata) throws PulsarAdminException { + return sync(() -> streamInternalStatsAsync(topic, metadata)); + } + + @Override + public CompletableFuture streamInternalStatsAsync(String topic, boolean metadata) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "internalStats"); + path = path.queryParam("metadata", metadata); + CompletableFuture future = new CompletableFuture<>(); + return asyncGetRequest(path, new FutureCallback() {}); + } + @Override public String getInternalInfo(String topic) throws PulsarAdminException { return sync(() -> getInternalInfoAsync(topic)); @@ -776,6 +791,18 @@ public CompletableFuture getPartitionedInternalSt return asyncGetRequest(path, new FutureCallback(){}); } + @Override + public InputStream streamPartitionedInternalStats(String topic) throws PulsarAdminException { + return sync(() -> streamPartitionedInternalStatsAsync(topic)); + } + + @Override + public CompletableFuture streamPartitionedInternalStatsAsync(String topic) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "partitioned-internalStats"); + return asyncGetRequest(path, new FutureCallback(){}); + } + @Override public void deleteSubscription(String topic, String subName) throws PulsarAdminException { sync(() -> deleteSubscriptionAsync(topic, subName)); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 261bd81a5b7bd..4d9caf5035473 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -28,7 +28,10 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; +import java.io.File; import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -747,11 +750,22 @@ private class GetInternalStats extends CliCommand { @Option(names = { "-m", "--metadata" }, description = "Flag to include ledger metadata") private boolean metadata = false; + @Parameter(names = "--streaming", description = "Streaming the output directly to the standard output without " + + "parsing the response in memory.") + private boolean streaming = false; @Override - void run() throws PulsarAdminException { - String topic = validateTopicName(topicName); - print(getTopics().getInternalStats(topic, metadata)); + void run() throws PulsarAdminException, IOException { + String persistentTopic = validatePersistentTopic(params); + if (streaming) { + try (InputStream in = getTopics().streamInternalStats(persistentTopic, metadata);) { + int size; + byte[] buffer = new byte[2048]; + while ((size = in.read(buffer)) != -1) System.out.write(buffer, 0, size); + } + } else { + print(getTopics().getInternalStats(persistentTopic, metadata)); + } } } @@ -811,11 +825,23 @@ void run() throws Exception { private class GetPartitionedStatsInternal extends CliCommand { @Parameters(description = "persistent://tenant/namespace/topic", arity = "1") private String topicName; + @Parameter(names = "--streaming", description = "Streaming the output directly to the standard output without " + + "parsing the response in memory.") + private boolean streaming = false; @Override void run() throws Exception { String topic = validateTopicName(topicName); - print(getTopics().getPartitionedInternalStats(topic)); + if (streaming) { + try (InputStream in = getTopics().streamPartitionedInternalStats(topic);) { + int size; + byte[] buffer = new byte[2048]; + while ((size = in.read(buffer)) != -1) System.out.write(buffer, 0, size); + } + } else { + print(getTopics().getPartitionedInternalStats(topic)); + } + } } From 9e9d4a3572b12c2a51701bc4bb5f426e045ae442 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Fri, 27 Sep 2024 10:57:22 -0700 Subject: [PATCH 2/4] checkstyle, picocli lib, etc --- .../client/admin/internal/TopicsImpl.java | 1 - .../apache/pulsar/admin/cli/CmdTopics.java | 20 ++++++++++--------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 025dc0312fa15..5f23ad0cdb274 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -44,7 +44,6 @@ import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; - import org.apache.pulsar.client.admin.GetStatsOptions; import org.apache.pulsar.client.admin.ListTopicsOptions; import org.apache.pulsar.client.admin.LongRunningProcessStatus; diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 4d9caf5035473..6eb0c3aee32cd 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -28,10 +28,8 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; -import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -750,18 +748,20 @@ private class GetInternalStats extends CliCommand { @Option(names = { "-m", "--metadata" }, description = "Flag to include ledger metadata") private boolean metadata = false; - @Parameter(names = "--streaming", description = "Streaming the output directly to the standard output without " + - "parsing the response in memory.") + @Option(names = "--streaming", description = "Streaming the output directly to the standard output without " + + "parsing the response in memory.") private boolean streaming = false; @Override void run() throws PulsarAdminException, IOException { - String persistentTopic = validatePersistentTopic(params); + String persistentTopic = validatePersistentTopic(topicName); if (streaming) { try (InputStream in = getTopics().streamInternalStats(persistentTopic, metadata);) { int size; byte[] buffer = new byte[2048]; - while ((size = in.read(buffer)) != -1) System.out.write(buffer, 0, size); + while ((size = in.read(buffer)) != -1) { + System.out.write(buffer, 0, size); + } } } else { print(getTopics().getInternalStats(persistentTopic, metadata)); @@ -825,8 +825,8 @@ void run() throws Exception { private class GetPartitionedStatsInternal extends CliCommand { @Parameters(description = "persistent://tenant/namespace/topic", arity = "1") private String topicName; - @Parameter(names = "--streaming", description = "Streaming the output directly to the standard output without " + - "parsing the response in memory.") + @Option(names = "--streaming", description = "Streaming the output directly to the standard output without " + + "parsing the response in memory.") private boolean streaming = false; @Override @@ -836,7 +836,9 @@ void run() throws Exception { try (InputStream in = getTopics().streamPartitionedInternalStats(topic);) { int size; byte[] buffer = new byte[2048]; - while ((size = in.read(buffer)) != -1) System.out.write(buffer, 0, size); + while ((size = in.read(buffer)) != -1) { + System.out.write(buffer, 0, size); + } } } else { print(getTopics().getPartitionedInternalStats(topic)); From b9c578673f3f607dd46d66bd2a7a1cb126c09957 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Fri, 27 Sep 2024 11:53:09 -0700 Subject: [PATCH 3/4] test --- .../pulsar/client/admin/internal/TopicsImpl.java | 1 - .../pulsar/admin/cli/PulsarAdminToolTest.java | 14 ++++++++++++++ .../org/apache/pulsar/admin/cli/CmdTopics.java | 2 +- 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 5f23ad0cdb274..de66e99dff3bb 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -684,7 +684,6 @@ public CompletableFuture streamInternalStatsAsync(String topic, boo TopicName tn = validateTopic(topic); WebTarget path = topicPath(tn, "internalStats"); path = path.queryParam("metadata", metadata); - CompletableFuture future = new CompletableFuture<>(); return asyncGetRequest(path, new FutureCallback() {}); } diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 5f8c9f49d65d1..46c6b2dfa0e1f 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -38,8 +38,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import com.google.common.collect.Sets; + +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.InputStream; import java.io.PrintStream; import java.io.PrintWriter; import java.net.URL; @@ -1560,6 +1563,11 @@ public void topics() throws Exception { Lookup mockLookup = mock(Lookup.class); when(admin.lookups()).thenReturn(mockLookup); + String statsString = "{}"; + InputStream stream = new ByteArrayInputStream(statsString.getBytes(StandardCharsets.UTF_8)); + when(mockTopics.streamInternalStats("persistent://myprop/clust/ns1/ds1", false)).thenReturn(stream); + when(mockTopics.streamPartitionedInternalStats("persistent://myprop/clust/ns1/ds1")).thenReturn(stream); + CmdTopics cmdTopics = new CmdTopics(() -> admin); cmdTopics.run(split("truncate persistent://myprop/clust/ns1/ds1")); @@ -1607,6 +1615,9 @@ public void topics() throws Exception { cmdTopics.run(split("stats-internal persistent://myprop/clust/ns1/ds1")); verify(mockTopics).getInternalStats("persistent://myprop/clust/ns1/ds1", false); + cmdTopics.run(split("stats-internal persistent://myprop/clust/ns1/ds1 --streaming")); + verify(mockTopics).streamInternalStats("persistent://myprop/clust/ns1/ds1", false); + cmdTopics.run(split("get-backlog-quotas persistent://myprop/clust/ns1/ds1 -ap")); verify(mockTopics).getBacklogQuotaMap("persistent://myprop/clust/ns1/ds1", true); cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -l 10 -p producer_request_hold")); @@ -1655,6 +1666,9 @@ public void topics() throws Exception { cmdTopics.run(split("partitioned-stats-internal persistent://myprop/clust/ns1/ds1")); verify(mockTopics).getPartitionedInternalStats("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("partitioned-stats-internal persistent://myprop/clust/ns1/ds1 --streaming")); + verify(mockTopics).streamPartitionedInternalStats("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("clear-backlog persistent://myprop/clust/ns1/ds1 -s sub1")); verify(mockTopics).skipAllMessages("persistent://myprop/clust/ns1/ds1", "sub1"); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 6eb0c3aee32cd..a5b151bc6d0a8 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -833,7 +833,7 @@ private class GetPartitionedStatsInternal extends CliCommand { void run() throws Exception { String topic = validateTopicName(topicName); if (streaming) { - try (InputStream in = getTopics().streamPartitionedInternalStats(topic);) { + try (InputStream in = getTopics().streamPartitionedInternalStats(topic)) { int size; byte[] buffer = new byte[2048]; while ((size = in.read(buffer)) != -1) { From cd8810902ec6b7df10ced189295ec64a293647a8 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Fri, 27 Sep 2024 14:39:02 -0700 Subject: [PATCH 4/4] clean up --- .../main/java/org/apache/pulsar/admin/cli/CmdTopics.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index a5b151bc6d0a8..4cf60dd269b28 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -754,9 +754,9 @@ private class GetInternalStats extends CliCommand { @Override void run() throws PulsarAdminException, IOException { - String persistentTopic = validatePersistentTopic(topicName); + String topic = validateTopicName(topicName); if (streaming) { - try (InputStream in = getTopics().streamInternalStats(persistentTopic, metadata);) { + try (InputStream in = getTopics().streamInternalStats(topic, metadata);) { int size; byte[] buffer = new byte[2048]; while ((size = in.read(buffer)) != -1) { @@ -764,7 +764,7 @@ void run() throws PulsarAdminException, IOException { } } } else { - print(getTopics().getInternalStats(persistentTopic, metadata)); + print(getTopics().getInternalStats(topic, metadata)); } } }