diff --git a/pip/pip-405.md b/pip/pip-405.md new file mode 100644 index 0000000000000..d01ee3e3c7f90 --- /dev/null +++ b/pip/pip-405.md @@ -0,0 +1,165 @@ +# PIP-405: Enhanced Dynamic Handling of Selective Consumers in MultiTopicConsumers + +# Background knowledge + +In Pulsar, consumers can subscribe to multiple topics by pattern or by passing a list of topics. +However, currently, if you need to modify this pattern, it often requires shutting down the existing consumers and creating new consumers with the new pattern. +This is because in `MultiTopicConsumer`, each operation is applied to all consumers. +We hope to dynamically add or remove certain topics without affecting other subscribed topics. + +# Motivation + +We want a way to unsubscribe or close from certain consumers and add new ones without restarting the entire consumer. + +Additionally, these changes should remain optional for users who do not need them. + +This helps developers experiment with dynamic subscription capabilities without cluttering the core consumer interface. + +# Goals + +## In Scope + +- Introduce a low-level extension interface to manage dynamic subscription operations on multi-topic consumers. +- Provide optional methods to add or remove topic subscriptions at runtime. + +## Out of Scope + +# High Level Design + +The design focuses on providing an extension interface, tentatively named `SelectiveConsumerHandler`, that exposes methods like `closeTopicConsumer(Set topic)` and `addTopicConsumer(Set topic)`. +Instead of forcing all consumers to implement these methods, we define a generic `getExtension(Class extensionInterface)` API. +If the consumer supports the interface, an `Optional` containing the handle is returned; otherwise, it returns `Optional.empty()`. + +# Detailed Design + +## Design & Implementation Details + +1. **Extension Retrieval** + A generic method is introduced in the consumer: +```java + public Optional getExtension(Class extensionInterface) { + // Implementation + } +``` + +- If the consumer supports the requested extension, an instance is returned. +- If not, Optional.empty() is returned. + +2. **Extension Interface** + Define an interface named SelectiveConsumerHandler with two methods: +```java +public interface SelectiveConsumerHandler { + + /** + * Closes the underlying consumer for the specified topics without affecting the consumers of other topics. + * If a topic does not exist in the current consumer's subscription list, it will be ignored. + * + * @param topicNames A collection of topic names to close the consumers for. + * These should include any necessary partition suffixes if applicable. + */ + void closeTopicConsumer(Collection topicNames); + + /** + * Registers and starts a consumer for each of the topics specified if they are not already being consumed. + * This method facilitates dynamic addition of topic subscriptions to an existing consumer, expanding + * its topic set without needing to recreate or restart the entire consumer. + * + * @param topicNames A collection of topic names for which to start consumers. + * Each topic name should include the necessary partition suffix if the topics are partitioned. + */ + void addTopicConsumer(Collection topicNames); +} +``` + +- closeTopicConsumer allows closing consumers dynamically. +- addTopicConsumer allows creating a new consumer for a topic at runtime. + +3. **Implementation** +- In `MultiTopicsConsumerImpl` or `PatternMultiTopicsConsumerImpl`, implement getExtension to return a `SelectiveConsumerHandler`. +- Other consumer implementations can simply return Optional.empty() if they do not support dynamic subscription changes. +- Extract TopicsChangedListener and PatternTopicsChangedListener from PatternMultiTopicsConsumerImpl, rename PatternTopicsChangedListener to MultiTopicsChangedListener, and place it in MultiTopicsConsumerImpl. +- MultiTopicsConsumerImpl and PatternMultiTopicsConsumerImpl implement the SelectiveConsumerHandler interface and call the TopicsChangedListener's onTopicsRemoved and onTopicsAdded methods. + +## Public-facing Changes + +### Public API + +A new method in `Consumer. + +```java + Optional getExtension(Class extensionInterface); +``` + +A new optional interface `SelectiveConsumerHandler` is introduced. + +```java +public interface SelectiveConsumerHandler { + + /** + * Closes the underlying consumer for the specified topics without affecting the consumers of other topics. + * If a topic does not exist in the current consumer's subscription list, it will be ignored. + * + * @param topicNames A collection of topic names to close the consumers for. + * These should include any necessary partition suffixes if applicable. + */ + void closeTopicConsumer(Collection topicNames); + + /** + * Registers and starts a consumer for each of the topics specified if they are not already being consumed. + * This method facilitates dynamic addition of topic subscriptions to an existing consumer, expanding + * its topic set without needing to recreate or restart the entire consumer. + * + * @param topicNames A collection of topic names for which to start consumers. + * Each topic name should include the necessary partition suffix if the topics are partitioned. + */ + void addTopicConsumer(Collection topicNames); +} +``` + +Developers may call: + +```java +Set blockTopics = new HashSet<>(); +blockTopics.add(baseTopicName + "-2"); +blockTopics.add(baseTopicName + "-3"); + +Consumer consumer = ...; +Optional extensionHandler = consumer.getExtension(SelectiveConsumerHandler.class); + +extensionHandler.ifPresent(handler -> { + handler.addTopicConsumer(blockTopics); + handler.closeTopicConsumer(blockTopics); +}); +``` + +### Binary protocol + +### Configuration + +### CLI + +### Metrics + +# Monitoring + +# Security Considerations + +# Backward & Forward Compatibility + +## Upgrade + +## Downgrade / Rollback + +## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations + +# Alternatives + +- Implementing this functionality for non multi-topic consumers. +- Changing existing consumer interfaces in an incompatible way. + +# General Notes + +# Links + +* Mailing List discussion thread: +* Mailing List voting thread: