From da3f425b4ccd1dc59e2bd4e863185b138edb6dfd Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Thu, 26 Dec 2024 12:02:02 +0800 Subject: [PATCH 1/6] [improve][pip] PIP-398: Subscription replication on the namespace and topic levels Signed-off-by: Zixuan Liu --- pip/pip-398.md | 237 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 237 insertions(+) create mode 100644 pip/pip-398.md diff --git a/pip/pip-398.md b/pip/pip-398.md new file mode 100644 index 0000000000000..51866d1edf7dd --- /dev/null +++ b/pip/pip-398.md @@ -0,0 +1,237 @@ +# PIP-398: Subscription replication on the namespace and topic levels + +# Background knowledge + +https://github.com/apache/pulsar/pull/4299 introduces the subscription replication feature on the consumer level: + +```java +Consumer consumer = pulsarClient.newConsumer().topic("topic").replicateSubscriptionState(false/*true*/) + .subscriptionName("sub").subscribe(); +``` + +While this provides flexibility, it introduces overhead in managing replication for a large number of consumers. Users +need to manually enable the `replicateSubscriptionState` flag for each consumer, which can become cumbersome in +large-scale deployments. + +# Motivation + +The key motivation behind this PIP is to simplify subscription replication configuration, especially in failover +scenarios. When a main cluster goes down and a backup cluster is activated, ensuring that subscription states are +consistently replicated across clusters is critical for failover scenarios. By extending the replication configuration +to the namespace and topic levels, the system reduces the need for explicit consumer-level configuration. + +# Goals + +## In Scope + +The PIP aims to provide management of subscription replication at the namespace and topic levels using the Pulsar Admin +CLI and API. + +# Out scope + +This feature depends on https://github.com/apache/pulsar/pull/23757, which makes the subscription replication nullable +on the consumer level. Otherwise, the namespace and topic levels will be ignored. + +# High Level Design + +Introduces a configuration used for enabling subscription replication on the namespace and topic levels, when enabled, +all consumers under the topic will automatically replicate their subscription states to remote clusters. + +The priority for the subscription replication configuration is as follows: + +- consumer level > topic level > namespace level. +- If `replicateSubscriptionState` is set at the consumer level, configurations at the topic and namespace levels are + ignored. +- If set at the topic level, the namespace-level configuration is ignored. + +# Detailed Design + +## Design & Implementation Details + +### Namespace level + +1. Add the field `Boolean replicate_subscriptions_state` to the `org.apache.pulsar.common.policies.data.Policies` class + to control subscription replication at the namespace level: + ```java + public class Policies { + @SuppressWarnings("checkstyle:MemberName") + public Boolean replicate_subscription_state; + } + ``` +2. Add the management methods to the `org.apache.pulsar.client.admin.Namespaces` interface: + ```java + public interface Namespaces { + void setReplicateSubscriptionState(String namespace, Boolean enabled) throws PulsarAdminException; + CompletableFuture setReplicateSubscriptionStateAsync(String namespace, Boolean enabled); + Boolean getReplicateSubscriptionState(String namespace) throws PulsarAdminException; + CompletableFuture getReplicateSubscriptionStateAsync(String namespace); + } + ``` +3. Implement the management methods in the `org.apache.pulsar.client.admin.internal.NamespacesImpl` class. + +### Topic level + +1. Add the field `Boolean replicateSubscriptionState` to the `org.apache.pulsar.common.policies.data.TopicPolicies` + class to enable subscription replication at the topic level: + ```java + public class TopicPolicies { + public Boolean replicateSubscriptionState; + } + ``` +2. Add the management methods to the `org.apache.pulsar.client.admin.TopicPolicies` interface: + ```java + public interface TopicPolicies { + void setReplicateSubscriptionState(String topic, Boolean enabled) throws PulsarAdminException; + CompletableFuture setReplicateSubscriptionStateAsync(String topic, Boolean enabled); + Boolean getReplicateSubscriptionState(String topic, boolean applied) throws PulsarAdminException; + CompletableFuture getReplicateSubscriptionStateAsync(String topic, boolean applied); + } + ``` +3. Implement the management methods in the `org.apache.pulsar.client.admin.internal.TopicPoliciesImpl` class. + +### Consumer level + +We currently support the `replicateSubscriptionState` flag at the consumer level, which can be set to `true`, `false`. +If `true`, we persist `pulsar.replicated.subscription=1L` to the cursor properties, and if `false`, we remove that. + +To keep the consumer-level configuration, we need to consider the `false` case, so propose using +`pulsar.replicated.subscription=0L` instead of removing the property. + +The consumer-level configuration takes precedence over both the topic and namespace levels, ensuring that +consumer-specific configurations can still override the more general settings. + +### Subscription replication applied + +When the subscription replication is changed on the namespace or topic level, the subscription replication will be +applied to all consumers under the topic. + +Please notice that the `org.apache.pulsar.client.admin.Topics.setReplicatedSubscriptionStatus()` can override the +subscription replication configuration for the special subscription, and equals the consumer level. + +## Public-facing Changes + +### Public API + +### Client API + +Change the set replicated subscription status method in the `org.apache.pulsar.client.admin.Topics` interface to accept the Boolean type: + +```java +void setReplicatedSubscriptionStatus(String topic, String subName, Boolean enabled) throws PulsarAdminException; +CompletableFuture setReplicatedSubscriptionStatusAsync(String topic, String subName, Boolean enabled); +``` + +#### REST API + +##### Namespace level + +- `/{tenant}/{namespace}/replicateSubscriptionState`: enable/disable/remove the subscription replication on the + namespace level. + - Method: `POST` + - Content-Type: `application/json` + - Body: + - true + - false + - null +- `GET /{tenant}/{namespace}/replicateSubscriptionState` to get subscription replication configuration on the namespace + level. + - Method: `GET` + - Response: + - true + - false + - null + +##### Topic level + +- `/{tenant}/{namespace}/{topic}/replicateSubscriptionState`: enable/disable/remove the subscription replication on the + topic level. + - Method: `POST` + - Content-Type: `application/json` + - Body: + - true + - false + - null +- `/{tenant}/{namespace}/{topic}/replicateSubscriptionState` to get subscription replication configuration on the topic + level. + - Method: `GET` + - Parameters: + - `applied=true`: get the applied subscription replication configuration, if topic is not set, return the + namespace level configuration. + - `applied=false`: get the applied subscription replication configuration, if topic is set, return the topic + level configuration. + - Response: + - true + - false + - null +- `/{tenant}/{namespace}/{topic}/subscription/{subName}/replicatedSubscriptionStatus`: Change the body type from boolean + to Boolean. + +### CLI + +#### Namespace level + +- `pulsar-admin namespaces set-replicate-subscription-state / --enabled true/false` to + enable/disable the subscription replication on the namespace level. +- `pulsar-admin namespaces get-replicate-subscription-state /` to get the subscription + replication configuration on the namespace level. +- `pulsar-admin namespaces remove-replicate-subscription-state /` to remove the subscription + replication configuration on the namespace level. + +#### Topic level + +- `pulsar-admin topicPolicies set-replicate-subscription-state // --enabled true/false` + to enable/disable the subscription replication on the topic level. +- `pulsar-admin topicPolicies get-replicate-subscription-state //` to get the + subscription replication configuration on the topic level. +- `pulsar-admin topicPolicies remove-replicate-subscription-state //` to remove the + subscription replication configuration on the topic level. +- `pulsar-admin topicPolicies remove-replicate-subscription-state //` to remove the + subscription replication configuration on the topic level. +- `pulsar-admin topics remove-replicated-subscription-status //` to remove the + subscription replication configuration on the consumer(subscription) level. + +# Security Considerations + +Both write and read operations require the necessary permissions, which already exist in Pulsar. + +## Namespace level + +- Write the subscription replication configuration: + - Required: `PolicyName.REPLICATED_SUBSCRIPTION` with `WRITE` permission. +- Read the subscription replication configuration: + - Required: `PolicyName.REPLICATED_SUBSCRIPTION` with `READ` permission. + +## Topic level + +- Write the subscription replication configuration + - Required: `TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS` permission. +- Read the subscription replication configuration: + - Required: `TopicOperation.GET_REPLICATED_SUBSCRIPTION_STATUS` permission. + +# Backward & Forward Compatibility + +## Upgrade + +None. + +## Downgrade / Rollback + +If the broker is downgrade, users will need to ensure that the consumer-level configuration is re-enabled to maintain +the correct replication behavior. + +## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations + +None. + +# Alternatives + +None. + +# General Notes + +None. + +# Links + +* Mailing List discussion thread: https://lists.apache.org/thread/6tc51xwknypzl2q2d9rwr6z65ws5b0l0 +* Mailing List voting thread: From 8cba4bc99e927bc068479bd98320cb57bcd7630a Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Mon, 30 Dec 2024 12:23:08 +0800 Subject: [PATCH 2/6] Upgrade Compatibility Signed-off-by: Zixuan Liu --- pip/pip-398.md | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/pip/pip-398.md b/pip/pip-398.md index 51866d1edf7dd..a56705779a544 100644 --- a/pip/pip-398.md +++ b/pip/pip-398.md @@ -212,12 +212,22 @@ Both write and read operations require the necessary permissions, which already ## Upgrade -None. +The existing subscription with `replicateSubscriptionSate=false`,the broker will use the policy from the topic or namespace level. + +New subscriptions will be followed as usual. ## Downgrade / Rollback -If the broker is downgrade, users will need to ensure that the consumer-level configuration is re-enabled to maintain -the correct replication behavior. +If the broker is downgrade, users will need to ensure that the subscription replication configuration is re-enabled to maintain +the correct replication behavior by the admin CLI or API: +- CLI: + ```shell + pulsar-admin topics set-replicated-subscription-status // -s -e + ``` +- API: + ```java + admin.topics().setReplicatedSubscriptionStatus(topic, subName, true); + ``` ## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations From 5935c9ba18a8fc1178ccda600c688bbac2922406 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Wed, 15 Jan 2025 12:27:34 +0800 Subject: [PATCH 3/6] Add broker config and update compatibility --- pip/pip-398.md | 81 ++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 58 insertions(+), 23 deletions(-) diff --git a/pip/pip-398.md b/pip/pip-398.md index a56705779a544..400221e56fdff 100644 --- a/pip/pip-398.md +++ b/pip/pip-398.md @@ -1,4 +1,4 @@ -# PIP-398: Subscription replication on the namespace and topic levels +# PIP-398: Subscription replication on the broker, namespace and topic levels # Background knowledge @@ -18,36 +18,53 @@ large-scale deployments. The key motivation behind this PIP is to simplify subscription replication configuration, especially in failover scenarios. When a main cluster goes down and a backup cluster is activated, ensuring that subscription states are consistently replicated across clusters is critical for failover scenarios. By extending the replication configuration -to the namespace and topic levels, the system reduces the need for explicit consumer-level configuration. +to the broker, namespace and topic levels, the system reduces the need for explicit consumer-level configuration. # Goals ## In Scope -The PIP aims to provide management of subscription replication at the namespace and topic levels using the Pulsar Admin -CLI and API. +The PIP aims to provide management of subscription replication at the broker, namespace and topic levels using the +Pulsar Admin CLI and API. # Out scope This feature depends on https://github.com/apache/pulsar/pull/23757, which makes the subscription replication nullable -on the consumer level. Otherwise, the namespace and topic levels will be ignored. +on the consumer level. Otherwise, the broker, namespace and topic levels will be ignored. # High Level Design -Introduces a configuration used for enabling subscription replication on the namespace and topic levels, when enabled, -all consumers under the topic will automatically replicate their subscription states to remote clusters. +Introduces a configuration used for enabling subscription replication on the broker, namespace and topic levels, when +enabled, all consumers under the topic will automatically replicate their subscription states to remote clusters. The priority for the subscription replication configuration is as follows: -- consumer level > topic level > namespace level. -- If `replicateSubscriptionState` is set at the consumer level, configurations at the topic and namespace levels are +- consumer level > topic level > namespace level > broker level. +- If `replicateSubscriptionState` is set at the consumer level, configurations at the topic, namespace, and broker levels are ignored. - If set at the topic level, the namespace-level configuration is ignored. +- If set at the namespace level, the broker-level configuration is ignored. # Detailed Design ## Design & Implementation Details +### Broker level + +Add the field `Boolean replicate_subscriptions_state` to the `org.apache.pulsar.broker.ServiceConfiguration` class +to control subscription replication at the broker level: +```java +public class ServiceConfiguration implements PulsarConfiguration { + @FieldContext( + category = CATEGORY_SERVER, + required = false, + dynamic = true, + doc = "The default value for replicating subscription state." + ) + private Boolean replicate_subscriptions_state; +} +``` + ### Namespace level 1. Add the field `Boolean replicate_subscriptions_state` to the `org.apache.pulsar.common.policies.data.Policies` class @@ -91,19 +108,20 @@ The priority for the subscription replication configuration is as follows: ### Consumer level -We currently support the `replicateSubscriptionState` flag at the consumer level, which can be set to `true`, `false`. -If `true`, we persist `pulsar.replicated.subscription=1L` to the cursor properties, and if `false`, we remove that. +We currently support the `replicateSubscriptionState` flag for new subscription at the consumer level, which can be set +to `true` or `false`. If `true`, we persist `pulsar.replicated.subscription=1L` to the cursor properties, and if +`false`, we remove that. To keep the consumer-level configuration, we need to consider the `false` case, so propose using `pulsar.replicated.subscription=0L` instead of removing the property. -The consumer-level configuration takes precedence over both the topic and namespace levels, ensuring that +The consumer-level configuration takes precedence over both the topic, broker and namespace levels, ensuring that consumer-specific configurations can still override the more general settings. ### Subscription replication applied -When the subscription replication is changed on the namespace or topic level, the subscription replication will be -applied to all consumers under the topic. +When the subscription replication is changed on the broker, namespace or topic level, the subscription replication will +be applied to all consumers under the topic. Please notice that the `org.apache.pulsar.client.admin.Topics.setReplicatedSubscriptionStatus()` can override the subscription replication configuration for the special subscription, and equals the consumer level. @@ -114,7 +132,7 @@ subscription replication configuration for the special subscription, and equals ### Client API -Change the set replicated subscription status method in the `org.apache.pulsar.client.admin.Topics` interface to accept the Boolean type: +Change the set replicated subscription status method in the `org.apache.pulsar.client.admin.Topics` interface to accept the `Boolean` type: ```java void setReplicatedSubscriptionStatus(String topic, String subName, Boolean enabled) throws PulsarAdminException; @@ -212,21 +230,38 @@ Both write and read operations require the necessary permissions, which already ## Upgrade -The existing subscription with `replicateSubscriptionSate=false`,the broker will use the policy from the topic or namespace level. - -New subscriptions will be followed as usual. +Because the consumer's `replicateSubscriptionSate=false` does not persist to the cursor properties, it leads to +compatibility issues: + +- When the existing subscription with `replicateSubscriptionSate=false` on the consumer level, which uses the policy + from the topic, namespace, or broker level. If you want to use the initial configuration, please use the pulsar-admin + API orCLI: + - CLI + ```shell + bin/pulsar-admin topics set-replicated-subscription-status // -s --disable + ``` + - API + ```java + admin.topics().setReplicatedSubscriptionStatus(topic, subName, false); + ``` ## Downgrade / Rollback -If the broker is downgrade, users will need to ensure that the subscription replication configuration is re-enabled to maintain -the correct replication behavior by the admin CLI or API: -- CLI: +If the broker is downgrade, users need to ensure that the subscription replication configuration is reset to maintain +the correct replication behavior by the pulsar-admin CLI or API: +- CLI ```shell - pulsar-admin topics set-replicated-subscription-status // -s -e + # enable + pulsar-admin topics set-replicated-subscription-status // -s --enable + # disable + pulsar-admin topics set-replicated-subscription-status // -s --disable ``` -- API: +- API ```java + // enable admin.topics().setReplicatedSubscriptionStatus(topic, subName, true); + // disable + admin.topics().setReplicatedSubscriptionStatus(topic, subName, false); ``` ## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations From b335e5b7f116c8b0c2176f18ccf61726c2af540e Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Tue, 18 Feb 2025 18:46:41 +0800 Subject: [PATCH 4/6] Compatible with existing designs as much as possible Signed-off-by: Zixuan Liu --- pip/pip-398.md | 61 ++++++++++---------------------------------------- 1 file changed, 12 insertions(+), 49 deletions(-) diff --git a/pip/pip-398.md b/pip/pip-398.md index 400221e56fdff..59c9c1b1a4adb 100644 --- a/pip/pip-398.md +++ b/pip/pip-398.md @@ -27,15 +27,11 @@ to the broker, namespace and topic levels, the system reduces the need for expli The PIP aims to provide management of subscription replication at the broker, namespace and topic levels using the Pulsar Admin CLI and API. -# Out scope - -This feature depends on https://github.com/apache/pulsar/pull/23757, which makes the subscription replication nullable -on the consumer level. Otherwise, the broker, namespace and topic levels will be ignored. - # High Level Design Introduces a configuration used for enabling subscription replication on the broker, namespace and topic levels, when -enabled, all consumers under the topic will automatically replicate their subscription states to remote clusters. +enabled, all consumers under the broker/namespace/topic will automatically replicate their subscription states to remote +clusters. The priority for the subscription replication configuration is as follows: @@ -108,39 +104,25 @@ public class ServiceConfiguration implements PulsarConfiguration { ### Consumer level -We currently support the `replicateSubscriptionState` flag for new subscription at the consumer level, which can be set -to `true` or `false`. If `true`, we persist `pulsar.replicated.subscription=1L` to the cursor properties, and if -`false`, we remove that. - -To keep the consumer-level configuration, we need to consider the `false` case, so propose using -`pulsar.replicated.subscription=0L` instead of removing the property. - -The consumer-level configuration takes precedence over both the topic, broker and namespace levels, ensuring that -consumer-specific configurations can still override the more general settings. +No changes. When the consumer with `replicateSubscriptionState=true`, the old/new subscription will be snapshot. If `false`, no operation will +be performed. ### Subscription replication applied When the subscription replication is changed on the broker, namespace or topic level, the subscription replication will -be applied to all consumers under the topic. +be applied to all consumers under the broker/namespace/topic. -Please notice that the `org.apache.pulsar.client.admin.Topics.setReplicatedSubscriptionStatus()` can override the -subscription replication configuration for the special subscription, and equals the consumer level. +We exclusively consider the `true` case for replication. If the consumer level is set to `false`, the +replication configuration defined at the topic, namespace, or broker level will be applied to the subscription. + +There is a special case here, if the user intentionally sets `false` at the consumer level, and then set `true` at the +topic/ns/broker level, this may disrupt the user's behavior. This way, we can minimize changes to the Pulsar public API +as much as possible. ## Public-facing Changes ### Public API -### Client API - -Change the set replicated subscription status method in the `org.apache.pulsar.client.admin.Topics` interface to accept the `Boolean` type: - -```java -void setReplicatedSubscriptionStatus(String topic, String subName, Boolean enabled) throws PulsarAdminException; -CompletableFuture setReplicatedSubscriptionStatusAsync(String topic, String subName, Boolean enabled); -``` - -#### REST API - ##### Namespace level - `/{tenant}/{namespace}/replicateSubscriptionState`: enable/disable/remove the subscription replication on the @@ -181,8 +163,6 @@ CompletableFuture setReplicatedSubscriptionStatusAsync(String topic, Strin - true - false - null -- `/{tenant}/{namespace}/{topic}/subscription/{subName}/replicatedSubscriptionStatus`: Change the body type from boolean - to Boolean. ### CLI @@ -203,10 +183,6 @@ CompletableFuture setReplicatedSubscriptionStatusAsync(String topic, Strin subscription replication configuration on the topic level. - `pulsar-admin topicPolicies remove-replicate-subscription-state //` to remove the subscription replication configuration on the topic level. -- `pulsar-admin topicPolicies remove-replicate-subscription-state //` to remove the - subscription replication configuration on the topic level. -- `pulsar-admin topics remove-replicated-subscription-status //` to remove the - subscription replication configuration on the consumer(subscription) level. # Security Considerations @@ -230,20 +206,7 @@ Both write and read operations require the necessary permissions, which already ## Upgrade -Because the consumer's `replicateSubscriptionSate=false` does not persist to the cursor properties, it leads to -compatibility issues: - -- When the existing subscription with `replicateSubscriptionSate=false` on the consumer level, which uses the policy - from the topic, namespace, or broker level. If you want to use the initial configuration, please use the pulsar-admin - API orCLI: - - CLI - ```shell - bin/pulsar-admin topics set-replicated-subscription-status // -s --disable - ``` - - API - ```java - admin.topics().setReplicatedSubscriptionStatus(topic, subName, false); - ``` +None. ## Downgrade / Rollback From 377007d75412fd1b5f3e76249bfeeddb067c51a0 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Tue, 18 Feb 2025 19:54:28 +0800 Subject: [PATCH 5/6] Update replicateSubscriptionState Signed-off-by: Zixuan Liu --- pip/pip-398.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pip/pip-398.md b/pip/pip-398.md index 59c9c1b1a4adb..01b944a8a870e 100644 --- a/pip/pip-398.md +++ b/pip/pip-398.md @@ -29,9 +29,9 @@ Pulsar Admin CLI and API. # High Level Design -Introduces a configuration used for enabling subscription replication on the broker, namespace and topic levels, when -enabled, all consumers under the broker/namespace/topic will automatically replicate their subscription states to remote -clusters. +Introduces the `replicateSubscriptionState` configuration to enabling subscription replication on the broker, namespace +and topic levels, when enabled, all consumers under the broker/namespace/topic will automatically replicate their +subscription states to remote clusters. The priority for the subscription replication configuration is as follows: From 022a975e277a0395497c406bec2664702b86b859 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Sat, 22 Feb 2025 16:28:52 +0800 Subject: [PATCH 6/6] Update subscription replication applied Signed-off-by: Zixuan Liu --- pip/pip-398.md | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/pip/pip-398.md b/pip/pip-398.md index 01b944a8a870e..b915175d7c87c 100644 --- a/pip/pip-398.md +++ b/pip/pip-398.md @@ -112,12 +112,20 @@ be performed. When the subscription replication is changed on the broker, namespace or topic level, the subscription replication will be applied to all consumers under the broker/namespace/topic. -We exclusively consider the `true` case for replication. If the consumer level is set to `false`, the -replication configuration defined at the topic, namespace, or broker level will be applied to the subscription. - -There is a special case here, if the user intentionally sets `false` at the consumer level, and then set `true` at the -topic/ns/broker level, this may disrupt the user's behavior. This way, we can minimize changes to the Pulsar public API -as much as possible. +Since the consumer enables the subscription replication when the `replicateSubscriptionState` is `true` in the consumer +level. Prioritizing `true` over `false` ensures consistency, this approach should be applied to topic, namespace, and +broker as +well. By doing so, we can avoid unnecessary back-and-forth state changes and maintain smoother transitions: + +1. If the consumer level is present and its value is `true`, enable subscription replication. +2. If the consumer level is `false`, and the topic level is present with its value set to `true`, enable subscription + replication. +3. If the consumer level is `false`, the topic level is not set, but the namespace level is present and its value is + `true`, enable subscription replication. +4. If the consumer level is `false`, and both the topic and namespace levels are not set, but the broker level is + present with its value set to `true`, enable subscription replication. + +The administrator is permitted to enable the subscription replication and subsequently disable it. ## Public-facing Changes