#4299 introduces the subscription replication feature on the consumer level:
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("topic").replicateSubscriptionState(false/*true*/)
.subscriptionName("sub").subscribe();
While this provides flexibility, it introduces overhead in managing replication for a large number of consumers. Users need to manually enable the replicateSubscriptionState flag for each consumer, which can become cumbersome in large-scale deployments.
The key motivation behind this PIP is to simplify subscription replication configuration, especially in failover scenarios. When a main cluster goes down and a backup cluster is activated, subscription states must already be consistently replicated across clusters. By extending the replication configuration to the broker, namespace and topic levels, the system reduces the need for explicit consumer-level configuration.
The PIP aims to provide management of subscription replication at the broker, namespace and topic levels using the Pulsar Admin CLI and API.
Introduce the replicateSubscriptionState configuration to enable subscription replication at the broker, namespace and topic levels. When enabled, all consumers under the broker/namespace/topic will automatically replicate their subscription states to remote clusters.
The priority for the subscription replication configuration is as follows:
- consumer level > topic level > namespace level > broker level.
- If replicateSubscriptionState is set at the consumer level, configurations at the topic, namespace, and broker levels are ignored.
- If set at the topic level, the namespace-level configuration is ignored.
- If set at the namespace level, the broker-level configuration is ignored.
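The priority chain above can be sketched as a small resolver. This is an illustration only, not the broker's implementation: the class and method names are hypothetical, the broker-side levels are modeled as nullable Boolean values where null means "not set", and the result is assumed to default to false when no level is set. It also assumes, as described later in this proposal, that only true counts as an explicit setting at the consumer level.

```java
/** Hypothetical helper for illustration; not part of the Pulsar code base. */
final class ReplicationStateResolver {

    private ReplicationStateResolver() {}

    /**
     * Resolves the effective replication setting for a subscription.
     * topicLevel, namespaceLevel and brokerLevel are nullable: null means "not set".
     * Assumption: consumerLevel == false is treated as "not set" and falls through.
     */
    static boolean resolve(boolean consumerLevel, Boolean topicLevel,
                           Boolean namespaceLevel, Boolean brokerLevel) {
        if (consumerLevel) {
            return true; // consumer level has the highest priority
        }
        if (topicLevel != null) {
            return topicLevel; // topic level hides namespace and broker levels
        }
        if (namespaceLevel != null) {
            return namespaceLevel; // namespace level hides the broker level
        }
        // Assumed default: no replication when nothing is configured anywhere.
        return Boolean.TRUE.equals(brokerLevel);
    }

    public static void main(String[] args) {
        System.out.println(resolve(true, false, false, false)); // true
        System.out.println(resolve(false, null, true, false));  // true
        System.out.println(resolve(false, false, true, true));  // false
        System.out.println(resolve(false, true, null, null));   // true
    }
}
```

The last call shows the special case discussed later: a consumer that passes false is still replicated when a broader level is set to true.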
Add the field Boolean replicate_subscriptions_state to the org.apache.pulsar.broker.ServiceConfiguration class to control subscription replication at the broker level:
    public class ServiceConfiguration implements PulsarConfiguration {
        @FieldContext(
            category = CATEGORY_SERVER,
            required = false,
            dynamic = true,
            doc = "The default value for replicating subscription state."
        )
        private Boolean replicate_subscriptions_state;
    }
- Add the field Boolean replicate_subscription_state to the org.apache.pulsar.common.policies.data.Policies class to control subscription replication at the namespace level:
    public class Policies {
        @SuppressWarnings("checkstyle:MemberName")
        public Boolean replicate_subscription_state;
    }
- Add the management methods to the org.apache.pulsar.client.admin.Namespaces interface:
    public interface Namespaces {
        void setReplicateSubscriptionState(String namespace, Boolean enabled) throws PulsarAdminException;
        CompletableFuture<Void> setReplicateSubscriptionStateAsync(String namespace, Boolean enabled);
        Boolean getReplicateSubscriptionState(String namespace) throws PulsarAdminException;
        CompletableFuture<Boolean> getReplicateSubscriptionStateAsync(String namespace);
    }
- Implement the management methods in the org.apache.pulsar.client.admin.internal.NamespacesImpl class.
- Add the field Boolean replicateSubscriptionState to the org.apache.pulsar.common.policies.data.TopicPolicies class to enable subscription replication at the topic level:
    public class TopicPolicies {
        public Boolean replicateSubscriptionState;
    }
- Add the management methods to the org.apache.pulsar.client.admin.TopicPolicies interface:
    public interface TopicPolicies {
        void setReplicateSubscriptionState(String topic, Boolean enabled) throws PulsarAdminException;
        CompletableFuture<Void> setReplicateSubscriptionStateAsync(String topic, Boolean enabled);
        Boolean getReplicateSubscriptionState(String topic, boolean applied) throws PulsarAdminException;
        CompletableFuture<Boolean> getReplicateSubscriptionStateAsync(String topic, boolean applied);
    }
- Implement the management methods in the org.apache.pulsar.client.admin.internal.TopicPoliciesImpl class.
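To make the intended semantics of these methods concrete, here is a minimal in-memory sketch. It is not the admin client and does not talk to a broker: the class and method names are hypothetical, passing null is assumed to remove the setting (mirroring the null body of the REST API), and topic names are assumed to use the short tenant/namespace/topic form. With applied=true the lookup falls back to the namespace-level value when the topic has none.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for the proposed admin methods. */
final class InMemoryReplicationPolicies {

    private final Map<String, Boolean> namespacePolicies = new HashMap<>();
    private final Map<String, Boolean> topicPolicies = new HashMap<>();

    void setNamespaceState(String namespace, Boolean enabled) {
        store(namespacePolicies, namespace, enabled);
    }

    Boolean getNamespaceState(String namespace) {
        return namespacePolicies.get(namespace); // null when not set
    }

    void setTopicState(String topic, Boolean enabled) {
        store(topicPolicies, topic, enabled);
    }

    Boolean getTopicState(String topic, boolean applied) {
        Boolean topicValue = topicPolicies.get(topic);
        if (topicValue != null || !applied) {
            return topicValue; // the topic's own value, or null if unset
        }
        // applied=true and nothing set on the topic: fall back to the namespace.
        return namespacePolicies.get(namespaceOf(topic));
    }

    // Assumption: a null value removes the setting instead of storing it.
    private static void store(Map<String, Boolean> policies, String key, Boolean enabled) {
        if (enabled == null) {
            policies.remove(key);
        } else {
            policies.put(key, enabled);
        }
    }

    // Assumes the short form <tenant>/<namespace>/<topic>.
    private static String namespaceOf(String topic) {
        return topic.substring(0, topic.lastIndexOf('/'));
    }
}
```

For example, after setNamespaceState("my-tenant/my-ns", true), a call to getTopicState("my-tenant/my-ns/my-topic", true) returns true, while the same call with applied=false returns null.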
No changes. When a consumer subscribes with replicateSubscriptionState=true, the subscription, whether existing or new, will be snapshotted. If false, no operation will be performed.
When the subscription replication is changed on the broker, namespace or topic level, the subscription replication will be applied to all consumers under the broker/namespace/topic.
Only the value true is treated as an explicit setting at the consumer level. If the consumer level is set to false, the replication configuration defined at the topic, namespace, or broker level will be applied to the subscription.
There is a special case here: if a user intentionally sets false at the consumer level and true is then set at the topic, namespace, or broker level, the subscription will be replicated, which may not be what the user expects. This approach is chosen to minimize changes to the Pulsar public API.
- POST /{tenant}/{namespace}/replicateSubscriptionState: enable/disable/remove the subscription replication on the namespace level.
  - Method: POST
  - Content-Type: application/json
  - Body:
    - true
    - false
    - null
- GET /{tenant}/{namespace}/replicateSubscriptionState: get the subscription replication configuration on the namespace level.
  - Method: GET
  - Response:
    - true
    - false
    - null
- POST /{tenant}/{namespace}/{topic}/replicateSubscriptionState: enable/disable/remove the subscription replication on the topic level.
  - Method: POST
  - Content-Type: application/json
  - Body:
    - true
    - false
    - null
- GET /{tenant}/{namespace}/{topic}/replicateSubscriptionState: get the subscription replication configuration on the topic level.
  - Method: GET
  - Parameters:
    - applied=true: return the applied subscription replication configuration; if it is not set on the topic, return the namespace-level configuration.
    - applied=false: return only the topic-level configuration, if it is set.
  - Response:
    - true
    - false
    - null
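As a rough illustration of the request shapes listed above, the following sketch builds (but does not send) two requests with the JDK's java.net.http API. The helper class and its method names are hypothetical, and the base URL is a caller-supplied placeholder, since this proposal lists only the path suffixes and not the full admin path.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper that only builds requests; nothing is sent to a broker. */
final class ReplicationStateRequests {

    private ReplicationStateRequests() {}

    /** Builds the namespace-level POST; the body is the JSON literal true, false, or null. */
    static HttpRequest setNamespaceState(String baseUrl, String tenant,
                                         String namespace, Boolean enabled) {
        String body = (enabled == null) ? "null" : enabled.toString();
        URI uri = URI.create(baseUrl + "/" + tenant + "/" + namespace
                + "/replicateSubscriptionState");
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    /** Builds the topic-level GET with the applied query parameter. */
    static HttpRequest getTopicState(String baseUrl, String tenant, String namespace,
                                     String topic, boolean applied) {
        URI uri = URI.create(baseUrl + "/" + tenant + "/" + namespace + "/" + topic
                + "/replicateSubscriptionState?applied=" + applied);
        return HttpRequest.newBuilder(uri).GET().build();
    }
}
```

The returned requests could then be passed to a java.net.http.HttpClient, together with whatever authentication the deployment requires.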
pulsar-admin namespaces set-replicate-subscription-state <tenant>/<namespace> --enabled true/false
to enable/disable the subscription replication on the namespace level.
pulsar-admin namespaces get-replicate-subscription-state <tenant>/<namespace>
to get the subscription replication configuration on the namespace level.
pulsar-admin namespaces remove-replicate-subscription-state <tenant>/<namespace>
to remove the subscription replication configuration on the namespace level.
pulsar-admin topicPolicies set-replicate-subscription-state <tenant>/<namespace>/<topic> --enabled true/false
to enable/disable the subscription replication on the topic level.
pulsar-admin topicPolicies get-replicate-subscription-state <tenant>/<namespace>/<topic>
to get the subscription replication configuration on the topic level.
pulsar-admin topicPolicies remove-replicate-subscription-state <tenant>/<namespace>/<topic>
to remove the subscription replication configuration on the topic level.
Both write and read operations require the necessary permissions, which already exist in Pulsar.
- Write the subscription replication configuration:
  - Required: PolicyName.REPLICATED_SUBSCRIPTION with WRITE permission.
- Read the subscription replication configuration:
  - Required: PolicyName.REPLICATED_SUBSCRIPTION with READ permission.
- Write the subscription replication configuration:
  - Required: TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS permission.
- Read the subscription replication configuration:
  - Required: TopicOperation.GET_REPLICATED_SUBSCRIPTION_STATUS permission.
None.
If the broker is downgraded, users need to reset the subscription replication configuration with the pulsar-admin CLI or API to maintain the correct replication behavior:
- CLI
    # enable
    pulsar-admin topics set-replicated-subscription-status <tenant>/<namespace>/<topic> -s <subName> --enable
    # disable
    pulsar-admin topics set-replicated-subscription-status <tenant>/<namespace>/<topic> -s <subName> --disable
- API
    // enable
    admin.topics().setReplicatedSubscriptionStatus(topic, subName, true);
    // disable
    admin.topics().setReplicatedSubscriptionStatus(topic, subName, false);
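When a topic has many subscriptions, the per-subscription reset above may need to be repeated for each one. The following is a minimal sketch of such a loop. The ReplicatedStatusSetter interface and the DowngradeReset class are hypothetical stand-ins introduced here so the example runs without a broker; the interface mirrors the shape of the admin.topics().setReplicatedSubscriptionStatus(topic, subName, enabled) call shown above.

```java
import java.util.List;

/** Hypothetical helper for illustration; not part of the Pulsar code base. */
final class DowngradeReset {

    private DowngradeReset() {}

    /** Stand-in with the same shape as setReplicatedSubscriptionStatus(topic, subName, enabled). */
    @FunctionalInterface
    interface ReplicatedStatusSetter {
        void set(String topic, String subName, boolean enabled) throws Exception;
    }

    /**
     * Applies the same replication status to every listed subscription of a topic.
     * Returns the number of subscriptions updated; stops at the first failure.
     */
    static int resetAll(ReplicatedStatusSetter setter, String topic,
                        List<String> subNames, boolean enabled) throws Exception {
        int updated = 0;
        for (String subName : subNames) {
            setter.set(topic, subName, enabled);
            updated++;
        }
        return updated;
    }
}
```

With a real PulsarAdmin instance, the setter could delegate to admin.topics().setReplicatedSubscriptionStatus, and the subscription names could come from admin.topics().getSubscriptions(topic).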
None.
None.
None.
- Mailing List discussion thread: https://lists.apache.org/thread/6tc51xwknypzl2q2d9rwr6z65ws5b0l0
- Mailing List voting thread: