diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index d437cb5929c7f..9efef66fc3d74 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -259,6 +259,10 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) { if (log.isDebugEnabled()) { log.debug("[{}]updateTopicPolicyByNamespacePolicy,data={}", topic, namespacePolicies); } + if (!isSystemTopic()) { + updateNamespacePublishRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName()); + updateNamespaceDispatchRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName()); + } topicPolicies.getRetentionPolicies().updateNamespaceValue(namespacePolicies.retention_policies); topicPolicies.getCompactionThreshold().updateNamespaceValue(namespacePolicies.compaction_threshold); topicPolicies.getReplicationClusters().updateNamespaceValue( @@ -281,7 +285,6 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) { topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled); topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateNamespaceValue( namespacePolicies.deduplicationSnapshotIntervalSeconds); - updateNamespacePublishRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName()); topicPolicies.getDelayedDeliveryEnabled().updateNamespaceValue( Optional.ofNullable(namespacePolicies.delayed_delivery_policies) .map(DelayedDeliveryPolicies::isActive).orElse(null)); @@ -299,7 +302,6 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) { updateNamespaceSubscriptionDispatchRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName()); updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies); - updateNamespaceDispatchRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName()); topicPolicies.getSchemaValidationEnforced().updateNamespaceValue(namespacePolicies.schema_validation_enforced); topicPolicies.getEntryFilters().updateNamespaceValue(namespacePolicies.entryFilters); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java index f2cec2138a3a0..2eb957353010c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java @@ -19,15 +19,19 @@ package org.apache.pulsar.broker.service.persistent; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import javax.annotation.Nonnull; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.PublishRateLimiter; import org.apache.pulsar.broker.service.plugin.EntryFilter; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.EntryFilters; +import org.apache.pulsar.common.policies.data.Policies; public class SystemTopic extends PersistentTopic { @@ -111,4 +115,19 @@ public EntryFilters getEntryFiltersPolicy() { public List getEntryFilters() { return null; } + + @Override + public PublishRateLimiter getBrokerPublishRateLimiter() { + return PublishRateLimiter.DISABLED_RATE_LIMITER; + } + + @Override + public void updateResourceGroupLimiter(@Nonnull Policies namespacePolicies) { + // nothing todo. + } + + @Override + public Optional getBrokerDispatchRateLimiter() { + return Optional.empty(); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java index b13f150387bee..b8cff6f838264 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java @@ -27,6 +27,8 @@ import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.util.RateLimiter; @@ -115,6 +117,23 @@ public void testProducerBlockedByPrecisTopicPublishRateLimiting() throws Excepti super.internalCleanup(); } + @Test + public void testSystemTopicPublishNonBlock() throws Exception { + super.baseSetup(); + PublishRate publishRate = new PublishRate(1,10); + admin.namespaces().setPublishRate("prop/ns-abc", publishRate); + final String topic = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp"); + PulsarAdmin admin1 = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null + ? brokerUrl.toString() : brokerUrlTls.toString()).readTimeout(5, TimeUnit.SECONDS).build(); + admin1.topics().createNonPartitionedTopic(topic); + admin1.topicPolicies().setDeduplicationStatus(topic, true); + admin1.topicPolicies().setDeduplicationStatus(topic, false); + // cleanup. + admin.namespaces().removePublishRate("prop/ns-abc"); + admin1.close(); + super.internalCleanup(); + } + @Test public void testPrecisTopicPublishRateLimitingProduceRefresh() throws Exception { PublishRate publishRate = new PublishRate(1,10); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java index 4f4affc39d316..fc82cdc5badf5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -48,6 +49,8 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.PublishRate; +import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl; import org.awaitility.Awaitility; import org.slf4j.Logger; @@ -211,6 +214,42 @@ public void testMessageRateDynamicallyChange() throws Exception { producer.close(); } + @SuppressWarnings("deprecation") + @Test + public void testSystemTopicDeliveryNonBlock() throws Exception { + final String namespace = "my-property/throttling_ns"; + admin.namespaces().createNamespace(namespace, Sets.newHashSet("test")); + final String topicName = "persistent://" + namespace + "/" + UUID.randomUUID().toString().replaceAll("-", ""); + admin.topics().createNonPartitionedTopic(topicName); + // Set a rate limitation. + DispatchRate dispatchRate = DispatchRate.builder() + .dispatchThrottlingRateInMsg(1) + .dispatchThrottlingRateInByte(-1) + .ratePeriodInSecond(360) + .build(); + admin.namespaces().setDispatchRate(namespace, dispatchRate); + + // Verify the limitation does not take effect. in other words, the topic policies should takes effect. + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get(); + admin.topicPolicies().setPublishRate(topicName, new PublishRate(1000, 1000)); + Awaitility.await().untilAsserted(() -> { + assertNotNull(persistentTopic.getHierarchyTopicPolicies().getPublishRate().getTopicValue()); + }); + admin.topicPolicies().setRetention(topicName, new RetentionPolicies(1000, 1000)); + Awaitility.await().untilAsserted(() -> { + assertNotNull(persistentTopic.getHierarchyTopicPolicies().getRetentionPolicies().getTopicValue()); + }); + admin.topicPolicies().setMessageTTL(topicName, 1000); + Awaitility.await().untilAsserted(() -> { + assertNotNull(persistentTopic.getHierarchyTopicPolicies().getMessageTTLInSeconds().getTopicValue()); + }); + + // cleanup. + admin.topics().delete(topicName); + admin.namespaces().removeDispatchRate(namespace); + } + /** * verify: consumer should not receive all messages due to message-rate throttling *