[improve][pip] PIP-398: Subscription replication on the broker, namespace and topic levels #23770

Open
wants to merge 6 commits into
base: master
245 changes: 245 additions & 0 deletions pip/pip-398.md
@@ -0,0 +1,245 @@
# PIP-398: Subscription replication on the broker, namespace and topic levels

# Background knowledge

https://github.com/apache/pulsar/pull/4299 introduced the subscription replication feature at the consumer level:

```java
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("topic").subscriptionName("sub")
        .replicateSubscriptionState(true) // pass false to leave replication off for this consumer
        .subscribe();
```

While this provides flexibility, it introduces overhead in managing replication for a large number of consumers. Users
need to manually enable the `replicateSubscriptionState` flag for each consumer, which can become cumbersome in
large-scale deployments.

# Motivation

The key motivation behind this PIP is to simplify subscription replication configuration, especially for failover.
When a main cluster goes down and a backup cluster is activated, it is critical that subscription states have been
consistently replicated across clusters. Extending the replication configuration to the broker, namespace and topic
levels reduces the need for explicit consumer-level configuration.

# Goals

## In Scope

The PIP aims to provide management of subscription replication at the broker, namespace and topic levels using the
Pulsar Admin CLI and API.

# High Level Design

Introduce the `replicateSubscriptionState` configuration to enable subscription replication at the broker, namespace
and topic levels. When it is enabled, all consumers under the broker/namespace/topic automatically replicate their
subscription states to remote clusters.

The priority for the subscription replication configuration is as follows:

- consumer level > topic level > namespace level > broker level.
- If `replicateSubscriptionState` is set to `true` at the consumer level, configurations at the topic, namespace, and
broker levels are ignored.
- If set at the topic level, the namespace-level configuration is ignored.
- If set at the namespace level, the broker-level configuration is ignored.
Comment on lines +39 to +42
Member

I think that there could be a need to override any value set on the consumer side. I also added some comments that controlling the behavior at namespace and consumer level should perhaps be a policy concern instead of creating a separate concept for handling the setting.

If the setting at namespace and topic level would be more than just a true/false. It would be possible to add a separate setting that would allow overriding the consumer level setting.

Member Author

> I think that there could be a need to override any value set on the consumer side.

It's also possible.

> I also added some comments that controlling the behavior at namespace and consumer level should perhaps be a policy concern instead of creating a separate concept for handling the setting.

Yes. This configuration is present in the broker configuration, the namespace policies, the topic policies and the cursor properties.

> If the setting at namespace and topic level would be more than just a true/false. It would be possible to add a separate setting that would allow overriding the consumer level setting.

I don't want to introduce more configuration, so when the consumer level is `false`, the broker configuration, namespace policies or topic policies can override the consumer-level setting.


# Detailed Design

## Design & Implementation Details

### Broker level

Add the field `Boolean replicate_subscriptions_state` to the `org.apache.pulsar.broker.ServiceConfiguration` class
to control subscription replication at the broker level:
```java
public class ServiceConfiguration implements PulsarConfiguration {
    @FieldContext(
        category = CATEGORY_SERVER,
        required = false,
        dynamic = true,
        doc = "The default value for replicating subscription state."
    )
    private Boolean replicate_subscriptions_state;
}
```

### Namespace level

1. Add the field `Boolean replicate_subscription_state` to the `org.apache.pulsar.common.policies.data.Policies` class
to control subscription replication at the namespace level:
```java
public class Policies {
    @SuppressWarnings("checkstyle:MemberName")
    public Boolean replicate_subscription_state;
}
```
2. Add the management methods to the `org.apache.pulsar.client.admin.Namespaces` interface:
```java
public interface Namespaces {
    void setReplicateSubscriptionState(String namespace, Boolean enabled) throws PulsarAdminException;
    CompletableFuture<Void> setReplicateSubscriptionStateAsync(String namespace, Boolean enabled);
    Boolean getReplicateSubscriptionState(String namespace) throws PulsarAdminException;
    CompletableFuture<Boolean> getReplicateSubscriptionStateAsync(String namespace);
}
```
3. Implement the management methods in the `org.apache.pulsar.client.admin.internal.NamespacesImpl` class.

### Topic level

1. Add the field `Boolean replicateSubscriptionState` to the `org.apache.pulsar.common.policies.data.TopicPolicies`
class to enable subscription replication at the topic level:
```java
public class TopicPolicies {
    public Boolean replicateSubscriptionState;
}
```
2. Add the management methods to the `org.apache.pulsar.client.admin.TopicPolicies` interface:
```java
public interface TopicPolicies {
    void setReplicateSubscriptionState(String topic, Boolean enabled) throws PulsarAdminException;
    CompletableFuture<Void> setReplicateSubscriptionStateAsync(String topic, Boolean enabled);
    Boolean getReplicateSubscriptionState(String topic, boolean applied) throws PulsarAdminException;
    CompletableFuture<Boolean> getReplicateSubscriptionStateAsync(String topic, boolean applied);
}
```
3. Implement the management methods in the `org.apache.pulsar.client.admin.internal.TopicPoliciesImpl` class.

### Consumer level

No changes. When a consumer subscribes with `replicateSubscriptionState=true`, the subscription, whether it already
exists or is newly created, will be snapshotted. If `false`, no operation will be performed.

### Subscription replication applied

When the subscription replication configuration is changed at the broker, namespace or topic level, the new value is
applied to all consumers under that broker/namespace/topic.

Only `true` is treated as an explicit choice at the consumer level. If the consumer level is set to `false`, the
replication configuration defined at the topic, namespace, or broker level is applied to the subscription.

This has one side effect: if a user intentionally sets `false` at the consumer level and `true` is then set at the
topic/namespace/broker level, the user expects subscription replication to be disabled but finds it enabled. This
trade-off is accepted in order to minimize changes to the Pulsar public API as much as possible.
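
The resolution rule described in this section can be sketched as follows. This is only an illustration, not the broker implementation: the class `ReplicationResolver` and the method `resolve` are hypothetical names, a `null` argument stands for "not set at that level", and the result `false` when nothing is set anywhere is an assumption.

```java
import java.util.Objects;
import java.util.stream.Stream;

// Hypothetical helper, not part of Pulsar: illustrates the precedence described above.
final class ReplicationResolver {

    /** Each argument may be null, meaning "not set at that level". */
    static boolean resolve(Boolean consumer, Boolean topic, Boolean namespace, Boolean broker) {
        // Only an explicit consumer-level `true` is honored directly.
        if (Boolean.TRUE.equals(consumer)) {
            return true;
        }
        // Otherwise the first level that is set wins: topic > namespace > broker.
        // Assumption: replication stays off when no level is set.
        return Stream.of(topic, namespace, broker)
                .filter(Objects::nonNull)
                .findFirst()
                .orElse(false);
    }

    public static void main(String[] args) {
        System.out.println(resolve(true, false, null, null));  // true: consumer-level true wins
        System.out.println(resolve(false, true, null, null));  // true: the side effect described above
        System.out.println(resolve(null, null, false, true));  // false: namespace overrides broker
        System.out.println(resolve(null, null, null, null));   // false: nothing set (assumed default)
    }
}
```

The second call shows the side effect discussed above: a consumer that passes `false` still gets replication once the topic level is set to `true`.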
Member

It would be good to explain what "disrupt the user's behavior" means. Perhaps rewording with the help of Generative AI could be useful since "disruption" is typically used in a different context.

Member Author

The user expected subscription replication to be disabled, but found that the feature had been enabled.


## Public-facing Changes

### Public API

#### Namespace level

- `/{tenant}/{namespace}/replicateSubscriptionState`: enable/disable/remove the subscription replication on the
  namespace level.
  - Method: `POST`
  - Content-Type: `application/json`
  - Body:
    - true
    - false
    - null
- `/{tenant}/{namespace}/replicateSubscriptionState`: get the subscription replication configuration on the namespace
  level.
  - Method: `GET`
  - Response:
    - true
    - false
    - null
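
As a rough illustration of the namespace-level endpoints above, the following sketch builds the two requests with the JDK HTTP client without sending them. The base URL `http://localhost:8080` and the `/admin/v2/namespaces` prefix are assumptions, since the proposal only gives the path suffix. The class and method names are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper: builds, but does not send, the namespace-level requests.
final class ReplicateSubscriptionStateRequests {

    // Assumption: admin base URL and REST prefix; only the path suffix comes from the proposal.
    static final String BASE = "http://localhost:8080/admin/v2/namespaces";

    static URI uri(String tenant, String namespace) {
        return URI.create(BASE + "/" + tenant + "/" + namespace + "/replicateSubscriptionState");
    }

    /** enabled may be true, false, or null (null is sent as the JSON literal null). */
    static HttpRequest set(String tenant, String namespace, Boolean enabled) {
        String body = String.valueOf(enabled); // "true", "false" or "null"
        return HttpRequest.newBuilder(uri(tenant, namespace))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    static HttpRequest get(String tenant, String namespace) {
        return HttpRequest.newBuilder(uri(tenant, namespace)).GET().build();
    }

    public static void main(String[] args) {
        HttpRequest enable = set("public", "default", true);
        System.out.println(enable.method() + " " + enable.uri());
        HttpRequest read = get("public", "default");
        System.out.println(read.method() + " " + read.uri());
    }
}
```

Sending the requests would additionally need an `HttpClient` and whatever authentication the cluster requires, both of which are left out here.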

#### Topic level

- `/{tenant}/{namespace}/{topic}/replicateSubscriptionState`: enable/disable/remove the subscription replication on the
  topic level.
  - Method: `POST`
  - Content-Type: `application/json`
  - Body:
    - true
    - false
    - null
- `/{tenant}/{namespace}/{topic}/replicateSubscriptionState`: get the subscription replication configuration on the
  topic level.
  - Method: `GET`
  - Parameters:
    - `applied=true`: get the applied subscription replication configuration. If the topic level is not set, return
      the namespace level configuration.
    - `applied=false`: get only the configuration set on the topic level, without falling back to the namespace level.
  - Response:
    - true
    - false
    - null
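
One way to read the `applied` parameter is sketched below. The class `AppliedLookup` and its `get` method are hypothetical names, and treating `applied=false` as "topic-level value only, with no fallback" is an interpretation of the description above rather than confirmed broker behavior.

```java
// Hypothetical helper, not part of Pulsar: mirrors the `applied` query parameter.
final class AppliedLookup {

    /** A null level means "not set"; the result may be null as well. */
    static Boolean get(Boolean topicLevel, Boolean namespaceLevel, boolean applied) {
        if (!applied) {
            // Assumption: only the value stored on the topic itself is returned.
            return topicLevel;
        }
        // applied=true: fall back to the namespace level when the topic level is unset.
        return topicLevel != null ? topicLevel : namespaceLevel;
    }

    public static void main(String[] args) {
        System.out.println(get(null, true, true));   // true: falls back to the namespace value
        System.out.println(get(null, true, false));  // null: the topic level is not set
        System.out.println(get(false, true, true));  // false: the topic value takes precedence
    }
}
```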

### CLI

#### Namespace level

- `pulsar-admin namespaces set-replicate-subscription-state <tenant>/<namespace> --enabled true/false` to
enable/disable the subscription replication on the namespace level.
- `pulsar-admin namespaces get-replicate-subscription-state <tenant>/<namespace>` to get the subscription
replication configuration on the namespace level.
- `pulsar-admin namespaces remove-replicate-subscription-state <tenant>/<namespace>` to remove the subscription
replication configuration on the namespace level.

#### Topic level

- `pulsar-admin topicPolicies set-replicate-subscription-state <tenant>/<namespace>/<topic> --enabled true/false`
to enable/disable the subscription replication on the topic level.
- `pulsar-admin topicPolicies get-replicate-subscription-state <tenant>/<namespace>/<topic>` to get the
subscription replication configuration on the topic level.
- `pulsar-admin topicPolicies remove-replicate-subscription-state <tenant>/<namespace>/<topic>` to remove the
subscription replication configuration on the topic level.

# Security Considerations

Both write and read operations require the necessary permissions, which already exist in Pulsar.

## Namespace level

- Write the subscription replication configuration:
  - Required: `PolicyName.REPLICATED_SUBSCRIPTION` with `WRITE` permission.
- Read the subscription replication configuration:
  - Required: `PolicyName.REPLICATED_SUBSCRIPTION` with `READ` permission.

## Topic level

- Write the subscription replication configuration:
  - Required: `TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS` permission.
- Read the subscription replication configuration:
  - Required: `TopicOperation.GET_REPLICATED_SUBSCRIPTION_STATUS` permission.

# Backward & Forward Compatibility

## Upgrade

None.

## Downgrade / Rollback

If the broker is downgraded, users need to reset the subscription replication configuration of each subscription,
using the pulsar-admin CLI or the admin API, to maintain the correct replication behavior:
- CLI
```shell
# enable
pulsar-admin topics set-replicated-subscription-status <tenant>/<namespace>/<topic> -s <subName> --enable
# disable
pulsar-admin topics set-replicated-subscription-status <tenant>/<namespace>/<topic> -s <subName> --disable
```
- API
```java
// enable
admin.topics().setReplicatedSubscriptionStatus(topic, subName, true);
// disable
admin.topics().setReplicatedSubscriptionStatus(topic, subName, false);
```

## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations

None.

# Alternatives

None.

# General Notes

None.

# Links

* Mailing List discussion thread: https://lists.apache.org/thread/6tc51xwknypzl2q2d9rwr6z65ws5b0l0
* Mailing List voting thread: