-
Notifications
You must be signed in to change notification settings - Fork 3.6k
[improve][pip] PIP-398: Subscription replication on the broker, namespace and topic levels #23770
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
nodece
wants to merge
6
commits into
apache:master
Choose a base branch
from
nodece:subscription-replication-on-namespace-topic
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
da3f425
[improve][pip] PIP-398: Subscription replication on the namespace and…
nodece 8cba4bc
Upgrade Compatibility
nodece 5935c9b
Add broker config and update compatibility
nodece b335e5b
Compatible with existing designs as much as possible
nodece 377007d
Update replicateSubscriptionState
nodece 022a975
Update subscription replication applied
nodece File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,253 @@ | ||
# PIP-398: Subscription replication on the broker, namespace and topic levels | ||
|
||
# Background knowledge | ||
|
||
https://github.com/apache/pulsar/pull/4299 introduces the subscription replication feature on the consumer level: | ||
|
||
```java | ||
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("topic").replicateSubscriptionState(false/*true*/) | ||
.subscriptionName("sub").subscribe(); | ||
``` | ||
|
||
While this provides flexibility, it introduces overhead in managing replication for a large number of consumers. Users | ||
need to manually enable the `replicateSubscriptionState` flag for each consumer, which can become cumbersome in | ||
large-scale deployments. | ||
|
||
# Motivation | ||
|
||
The key motivation behind this PIP is to simplify subscription replication configuration, especially in failover | ||
scenarios. When a main cluster goes down and a backup cluster is activated, ensuring that subscription states are | ||
consistently replicated across clusters is critical for failover scenarios. By extending the replication configuration | ||
to the broker, namespace and topic levels, the system reduces the need for explicit consumer-level configuration. | ||
|
||
# Goals | ||
|
||
## In Scope | ||
|
||
The PIP aims to provide management of subscription replication at the broker, namespace and topic levels using the | ||
Pulsar Admin CLI and API. | ||
|
||
# High Level Design | ||
|
||
Introduces the `replicateSubscriptionState` configuration to enabling subscription replication on the broker, namespace | ||
and topic levels, when enabled, all consumers under the broker/namespace/topic will automatically replicate their | ||
subscription states to remote clusters. | ||
|
||
The priority for the subscription replication configuration is as follows: | ||
|
||
- consumer level > topic level > namespace level > broker level. | ||
- If `replicateSubscriptionState` is set at the consumer level, configurations at the topic, namespace, and broker levels are | ||
ignored. | ||
- If set at the topic level, the namespace-level configuration is ignored. | ||
- If set at the namespace level, the broker-level configuration is ignored. | ||
|
||
# Detailed Design | ||
|
||
## Design & Implementation Details | ||
|
||
### Broker level | ||
|
||
Add the field `Boolean replicate_subscriptions_state` to the `org.apache.pulsar.broker.ServiceConfiguration` class | ||
to control subscription replication at the broker level: | ||
```java | ||
public class ServiceConfiguration implements PulsarConfiguration { | ||
@FieldContext( | ||
category = CATEGORY_SERVER, | ||
required = false, | ||
dynamic = true, | ||
doc = "The default value for replicating subscription state." | ||
) | ||
private Boolean replicate_subscriptions_state; | ||
} | ||
``` | ||
|
||
### Namespace level | ||
|
||
1. Add the field `Boolean replicate_subscriptions_state` to the `org.apache.pulsar.common.policies.data.Policies` class | ||
to control subscription replication at the namespace level: | ||
```java | ||
public class Policies { | ||
@SuppressWarnings("checkstyle:MemberName") | ||
public Boolean replicate_subscription_state; | ||
} | ||
``` | ||
2. Add the management methods to the `org.apache.pulsar.client.admin.Namespaces` interface: | ||
```java | ||
public interface Namespaces { | ||
void setReplicateSubscriptionState(String namespace, Boolean enabled) throws PulsarAdminException; | ||
CompletableFuture<Void> setReplicateSubscriptionStateAsync(String namespace, Boolean enabled); | ||
Boolean getReplicateSubscriptionState(String namespace) throws PulsarAdminException; | ||
CompletableFuture<Boolean> getReplicateSubscriptionStateAsync(String namespace); | ||
} | ||
``` | ||
3. Implement the management methods in the `org.apache.pulsar.client.admin.internal.NamespacesImpl` class. | ||
|
||
### Topic level | ||
|
||
1. Add the field `Boolean replicateSubscriptionState` to the `org.apache.pulsar.common.policies.data.TopicPolicies` | ||
class to enable subscription replication at the topic level: | ||
```java | ||
public class TopicPolicies { | ||
public Boolean replicateSubscriptionState; | ||
} | ||
``` | ||
2. Add the management methods to the `org.apache.pulsar.client.admin.TopicPolicies` interface: | ||
```java | ||
public interface TopicPolicies { | ||
void setReplicateSubscriptionState(String topic, Boolean enabled) throws PulsarAdminException; | ||
CompletableFuture<Void> setReplicateSubscriptionStateAsync(String topic, Boolean enabled); | ||
Boolean getReplicateSubscriptionState(String topic, boolean applied) throws PulsarAdminException; | ||
CompletableFuture<Boolean> getReplicateSubscriptionStateAsync(String topic, boolean applied); | ||
} | ||
``` | ||
3. Implement the management methods in the `org.apache.pulsar.client.admin.internal.TopicPoliciesImpl` class. | ||
|
||
### Consumer level | ||
|
||
No changes. When the consumer with `replicateSubscriptionState=true`, the old/new subscription will be snapshot. If `false`, no operation will | ||
be performed. | ||
|
||
### Subscription replication applied | ||
|
||
When the subscription replication is changed on the broker, namespace or topic level, the subscription replication will | ||
be applied to all consumers under the broker/namespace/topic. | ||
|
||
Since the consumer enables the subscription replication when the `replicateSubscriptionState` is `true` in the consumer | ||
level. Prioritizing `true` over `false` ensures consistency, this approach should be applied to topic, namespace, and | ||
broker as | ||
well. By doing so, we can avoid unnecessary back-and-forth state changes and maintain smoother transitions: | ||
|
||
1. If the consumer level is present and its value is `true`, enable subscription replication. | ||
2. If the consumer level is `false`, and the topic level is present with its value set to `true`, enable subscription | ||
replication. | ||
3. If the consumer level is `false`, the topic level is not set, but the namespace level is present and its value is | ||
`true`, enable subscription replication. | ||
4. If the consumer level is `false`, and both the topic and namespace levels are not set, but the broker level is | ||
present with its value set to `true`, enable subscription replication. | ||
|
||
The administrator is permitted to enable the subscription replication and subsequently disable it. | ||
|
||
## Public-facing Changes | ||
|
||
### Public API | ||
|
||
##### Namespace level | ||
|
||
- `/{tenant}/{namespace}/replicateSubscriptionState`: enable/disable/remove the subscription replication on the | ||
namespace level. | ||
- Method: `POST` | ||
- Content-Type: `application/json` | ||
- Body: | ||
- true | ||
- false | ||
- null | ||
- `GET /{tenant}/{namespace}/replicateSubscriptionState` to get subscription replication configuration on the namespace | ||
level. | ||
- Method: `GET` | ||
- Response: | ||
- true | ||
- false | ||
- null | ||
nodece marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
##### Topic level | ||
|
||
- `/{tenant}/{namespace}/{topic}/replicateSubscriptionState`: enable/disable/remove the subscription replication on the | ||
topic level. | ||
- Method: `POST` | ||
- Content-Type: `application/json` | ||
- Body: | ||
- true | ||
- false | ||
- null | ||
- `/{tenant}/{namespace}/{topic}/replicateSubscriptionState` to get subscription replication configuration on the topic | ||
level. | ||
- Method: `GET` | ||
- Parameters: | ||
- `applied=true`: get the applied subscription replication configuration, if topic is not set, return the | ||
namespace level configuration. | ||
- `applied=false`: get the applied subscription replication configuration, if topic is set, return the topic | ||
level configuration. | ||
- Response: | ||
- true | ||
- false | ||
- null | ||
nodece marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
### CLI | ||
|
||
#### Namespace level | ||
|
||
- `pulsar-admin namespaces set-replicate-subscription-state <tenant>/<namespace> --enabled true/false` to | ||
enable/disable the subscription replication on the namespace level. | ||
- `pulsar-admin namespaces get-replicate-subscription-state <tenant>/<namespace>` to get the subscription | ||
replication configuration on the namespace level. | ||
- `pulsar-admin namespaces remove-replicate-subscription-state <tenant>/<namespace>` to remove the subscription | ||
replication configuration on the namespace level. | ||
|
||
#### Topic level | ||
|
||
- `pulsar-admin topicPolicies set-replicate-subscription-state <tenant>/<namespace>/<topic> --enabled true/false` | ||
to enable/disable the subscription replication on the topic level. | ||
- `pulsar-admin topicPolicies get-replicate-subscription-state <tenant>/<namespace>/<topic>` to get the | ||
subscription replication configuration on the topic level. | ||
- `pulsar-admin topicPolicies remove-replicate-subscription-state <tenant>/<namespace>/<topic>` to remove the | ||
subscription replication configuration on the topic level. | ||
|
||
# Security Considerations | ||
|
||
Both write and read operations require the necessary permissions, which already exist in Pulsar. | ||
|
||
## Namespace level | ||
|
||
- Write the subscription replication configuration: | ||
- Required: `PolicyName.REPLICATED_SUBSCRIPTION` with `WRITE` permission. | ||
- Read the subscription replication configuration: | ||
- Required: `PolicyName.REPLICATED_SUBSCRIPTION` with `READ` permission. | ||
|
||
## Topic level | ||
|
||
- Write the subscription replication configuration | ||
- Required: `TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS` permission. | ||
- Read the subscription replication configuration: | ||
- Required: `TopicOperation.GET_REPLICATED_SUBSCRIPTION_STATUS` permission. | ||
|
||
# Backward & Forward Compatibility | ||
|
||
## Upgrade | ||
|
||
None. | ||
|
||
## Downgrade / Rollback | ||
|
||
If the broker is downgrade, users need to ensure that the subscription replication configuration is reset to maintain | ||
the correct replication behavior by the pulsar-admin CLI or API: | ||
- CLI | ||
```shell | ||
# enable | ||
pulsar-admin topics set-replicated-subscription-status <tenant>/<namespace>/<topic> -s <subName> --enable | ||
# disable | ||
pulsar-admin topics set-replicated-subscription-status <tenant>/<namespace>/<topic> -s <subName> --disable | ||
``` | ||
- API | ||
```java | ||
// enable | ||
admin.topics().setReplicatedSubscriptionStatus(topic, subName, true); | ||
// disable | ||
admin.topics().setReplicatedSubscriptionStatus(topic, subName, false); | ||
``` | ||
|
||
## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations | ||
|
||
None. | ||
|
||
# Alternatives | ||
|
||
None. | ||
|
||
# General Notes | ||
|
||
None. | ||
|
||
# Links | ||
|
||
* Mailing List discussion thread: https://lists.apache.org/thread/6tc51xwknypzl2q2d9rwr6z65ws5b0l0 | ||
* Mailing List voting thread: |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that there could be a need to override any value set on the consumer side. I also added some comments that controlling the behavior at namespace and consumer level should perhaps be a policy concern instead of creating a separate concept for handling the setting.
If the setting at namespace and topic level would be more than just a true/false. It would be possible to add a separate setting that would allow overriding the consumer level setting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's also possible.
Yes. This config is present in the broker/namespace policies/topic policies/cursor property.
I don't want to introduce more config, so when the consume level is false, the broker/namespace policies/topic policies can override the consume level setting.