diff --git a/conf/broker.conf b/conf/broker.conf index af335c141534f..6d72fb969784f 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -204,6 +204,9 @@ allowAutoSubscriptionCreation=true # The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned. defaultNumPartitions=1 +# Default number of partitions for the system topics +systemTopicDefaultNumPartitions=1 + # Enable the deletion of inactive topics. This parameter need to cooperate with the allowAutoTopicCreation parameter. # If brokerDeleteInactiveTopicsEnabled is set to true, we should ensure that allowAutoTopicCreation is also set to true. brokerDeleteInactiveTopicsEnabled=true diff --git a/conf/standalone.conf b/conf/standalone.conf index 90cf3b57ff941..c27bf12637ec0 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -1187,6 +1187,9 @@ allowAutoSubscriptionCreation=true # The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned. defaultNumPartitions=1 +# Default number of partitions for the system topics +systemTopicDefaultNumPartitions=1 + ### --- Transaction config variables --- ### # Enable transaction coordinator in broker transactionCoordinatorEnabled=false diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 0f7ae00713dce..394e02196a5c1 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2167,6 +2167,12 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece + " if allowAutoTopicCreationType is partitioned." ) private int defaultNumPartitions = 1; + @FieldContext( + category = CATEGORY_STORAGE_ML, + dynamic = true, + doc = "Default number of partitions for the system topics." + ) + private int systemTopicDefaultNumPartitions = 1; @FieldContext( category = CATEGORY_STORAGE_ML, doc = "The class of the managed ledger storage" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index c79d839097e68..3f54b8add723f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -3571,9 +3571,11 @@ public boolean isDefaultTopicTypePartitioned(final TopicName topicName, final Op public int getDefaultNumPartitions(final TopicName topicName, final Optional policies) { AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName, policies); if (autoTopicCreationOverride != null) { - return autoTopicCreationOverride.getDefaultNumPartitions(); + return isSystemTopic(topicName) ? autoTopicCreationOverride.getSystemTopicDefaultNumPartitions() + : autoTopicCreationOverride.getDefaultNumPartitions(); } else { - return pulsar.getConfiguration().getDefaultNumPartitions(); + return isSystemTopic(topicName) ? pulsar.getConfiguration().getSystemTopicDefaultNumPartitions() + : pulsar.getConfiguration().getDefaultNumPartitions(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index e7bfa3278e36d..265d10300c48a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -80,6 +80,7 @@ protected void setup() throws Exception { conf.setAllowAutoTopicCreation(false); conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED); conf.setDefaultNumPartitions(PARTITIONS); + conf.setSystemTopicDefaultNumPartitions(PARTITIONS); conf.setManagedLedgerMaxEntriesPerLedger(1); conf.setBrokerDeleteInactiveTopicsEnabled(false); conf.setTransactionCoordinatorEnabled(true); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AutoTopicCreationOverride.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AutoTopicCreationOverride.java index 24e3a57d2a33c..42e44b24be59a 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AutoTopicCreationOverride.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/AutoTopicCreationOverride.java @@ -30,6 +30,8 @@ public interface AutoTopicCreationOverride { Integer getDefaultNumPartitions(); + Integer getSystemTopicDefaultNumPartitions(); + interface Builder { Builder allowAutoTopicCreation(boolean allowTopicCreation); @@ -37,6 +39,8 @@ interface Builder { Builder defaultNumPartitions(Integer defaultNumPartition); + Builder systemTopicDefaultNumPartitions(Integer systemTopicDefaultNumPartitions); + AutoTopicCreationOverride build(); } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/AutoTopicCreationOverrideImpl.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/AutoTopicCreationOverrideImpl.java index 52cf1f1829b8d..ce6133dd05f25 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/AutoTopicCreationOverrideImpl.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/AutoTopicCreationOverrideImpl.java @@ -35,6 +35,7 @@ public final class AutoTopicCreationOverrideImpl implements AutoTopicCreationOve private boolean allowAutoTopicCreation; private String topicType; private Integer defaultNumPartitions; + private Integer systemTopicDefaultNumPartitions; public static ValidateResult validateOverride(AutoTopicCreationOverride override) { if (override == null) { @@ -51,6 +52,14 @@ public static ValidateResult validateOverride(AutoTopicCreationOverride override if (override.getDefaultNumPartitions() <= 0) { return ValidateResult.fail("[defaultNumPartitions] cannot be less than 1 for partition type."); } + if (override.getSystemTopicDefaultNumPartitions() == null) { + return ValidateResult.fail( + "[systemTopicDefaultNumPartitions] cannot be null when the type is partitioned."); + } + if (override.getSystemTopicDefaultNumPartitions() <= 0) { + return ValidateResult.fail( + "[systemTopicDefaultNumPartitions] cannot be less than 1 for partition type."); + } } else if (TopicType.NON_PARTITIONED.toString().equals(override.getTopicType())) { if (override.getDefaultNumPartitions() != null) { return ValidateResult.fail("[defaultNumPartitions] is not allowed to be" @@ -69,6 +78,7 @@ public static class AutoTopicCreationOverrideImplBuilder implements AutoTopicCre private boolean allowAutoTopicCreation; private String topicType; private Integer defaultNumPartitions; + private Integer systemTopicDefaultNumPartitions; public AutoTopicCreationOverrideImplBuilder allowAutoTopicCreation(boolean allowAutoTopicCreation) { this.allowAutoTopicCreation = allowAutoTopicCreation; @@ -85,8 +95,15 @@ public AutoTopicCreationOverrideImplBuilder defaultNumPartitions(Integer default return this; } + public AutoTopicCreationOverrideImplBuilder systemTopicDefaultNumPartitions( + Integer systemTopicDefaultNumPartitions) { + this.systemTopicDefaultNumPartitions = systemTopicDefaultNumPartitions; + return this; + } + public AutoTopicCreationOverrideImpl build() { - return new AutoTopicCreationOverrideImpl(allowAutoTopicCreation, topicType, defaultNumPartitions); + return new AutoTopicCreationOverrideImpl(allowAutoTopicCreation, topicType, defaultNumPartitions, + systemTopicDefaultNumPartitions); } } }