Closed
Description
Search before asking
- I searched in the issues and found nothing similar.
Read release policy
- I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
Version
Pulsar Version: 3.0.7
Minimal reproduce step
Note: The issue is coming intermittently
- Create a topic and subscription or use an existing topic/subscription
- Set dispatch rate policy at subscription level using below command
3.bin\pulsar-admin topicPolicies set-subscription-dispatch-rate -bd 34323 -s <<subname>> <<topicfqn>>
What did you expect to see?
The subscription dispatch rate should get set with 200 response.
What did you see instead?
Getting below exception
Message: null
Stacktrace:
**java.lang.UnsupportedOperationException**
at java.base/java.util.Collections$EmptyMap.**computeIfAbsent**(Collections.java:4764)
at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.lambda$**internalSetSubscriptionLevelDispatchRate**$487(PersistentTopicsBase.java:4960)
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
at org.apache.pulsar.client.util.RetryUtil.lambda$executeWithRetry$2(RetryUtil.java:62)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887)
at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2325)
at org.apache.pulsar.client.util.RetryUtil.executeWithRetry(RetryUtil.java:48)
at org.apache.pulsar.client.util.RetryUtil.lambda$retryAsynchronously$0(RetryUtil.java:42)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:840)
Anything else?
The error is coming when subscriptionPolicies
is null; it's returning Collections.emptyMap()
which is immutable, and internalSetSubscriptionLevelDispatchRate
is trying to add the element in that map.
Fix would be instead of returning an immutable map, it should be a mutable map.
Ref: https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java#L4778
Are you willing to submit a PR?
- I'm willing to submit a PR!