In Pulsar, consumers can subscribe to multiple topics by pattern or by passing a list of topics.
However, currently, if you need to modify this pattern, it often requires shutting down the existing consumers and creating new consumers with the new pattern.
This is because in MultiTopicConsumer
, each operation is applied to all consumers.
We hope to dynamically add or remove certain topics without affecting other subscribed topics.
We want a way to unsubscribe or close from certain consumers and add new ones without restarting the entire consumer.
Additionally, these changes should remain optional for users who do not need them.
This helps developers experiment with dynamic subscription capabilities without cluttering the core consumer interface.
- Introduce a low-level extension interface to manage dynamic subscription operations on multi-topic consumers.
- Provide optional methods to add or remove topic subscriptions at runtime.
The design focuses on providing an extension interface, tentatively named SelectiveConsumerHandler
, that exposes methods like closeTopicConsumer(Set<String> topic)
and addTopicConsumer(Set<String> topic)
.
Instead of forcing all consumers to implement these methods, we define a generic getExtension(Class<T> extensionInterface)
API.
If the consumer supports the interface, an Optional
containing the handle is returned; otherwise, it returns Optional.empty()
.
- Extension Retrieval
A generic method is introduced in the consumer:
public <T> Optional<T> getExtension(Class<T> extensionInterface) {
// Implementation
}
- If the consumer supports the requested extension, an instance is returned.
- If not, Optional.empty() is returned.
- Extension Interface Define an interface named SelectiveConsumerHandler with two methods:
public interface SelectiveConsumerHandler {
/**
* Closes the underlying consumer for the specified topics without affecting the consumers of other topics.
* If a topic does not exist in the current consumer's subscription list, it will be ignored.
*
* @param topicNames A collection of topic names to close the consumers for.
* These should include any necessary partition suffixes if applicable.
*/
void closeTopicConsumer(Collection<String> topicNames);
/**
* Registers and starts a consumer for each of the topics specified if they are not already being consumed.
* This method facilitates dynamic addition of topic subscriptions to an existing consumer, expanding
* its topic set without needing to recreate or restart the entire consumer.
*
* @param topicNames A collection of topic names for which to start consumers.
* Each topic name should include the necessary partition suffix if the topics are partitioned.
*/
void addTopicConsumer(Collection<String> topicNames);
}
- closeTopicConsumer allows closing consumers dynamically.
- addTopicConsumer allows creating a new consumer for a topic at runtime.
- Implementation
- In
MultiTopicsConsumerImpl
orPatternMultiTopicsConsumerImpl
, implement getExtension to return aSelectiveConsumerHandler
. - Other consumer implementations can simply return Optional.empty() if they do not support dynamic subscription changes.
- Extract TopicsChangedListener and PatternTopicsChangedListener from PatternMultiTopicsConsumerImpl, rename PatternTopicsChangedListener to MultiTopicsChangedListener, and place it in MultiTopicsConsumerImpl.
- MultiTopicsConsumerImpl and PatternMultiTopicsConsumerImpl implement the SelectiveConsumerHandler interface and call the TopicsChangedListener's onTopicsRemoved and onTopicsAdded methods.
A new method in `Consumer.
<V> Optional<V> getExtension(Class<V> extensionInterface);
A new optional interface SelectiveConsumerHandler
is introduced.
public interface SelectiveConsumerHandler {
/**
* Closes the underlying consumer for the specified topics without affecting the consumers of other topics.
* If a topic does not exist in the current consumer's subscription list, it will be ignored.
*
* @param topicNames A collection of topic names to close the consumers for.
* These should include any necessary partition suffixes if applicable.
*/
void closeTopicConsumer(Collection<String> topicNames);
/**
* Registers and starts a consumer for each of the topics specified if they are not already being consumed.
* This method facilitates dynamic addition of topic subscriptions to an existing consumer, expanding
* its topic set without needing to recreate or restart the entire consumer.
*
* @param topicNames A collection of topic names for which to start consumers.
* Each topic name should include the necessary partition suffix if the topics are partitioned.
*/
void addTopicConsumer(Collection<String> topicNames);
}
Developers may call:
Set<String> blockTopics = new HashSet<>();
blockTopics.add(baseTopicName + "-2");
blockTopics.add(baseTopicName + "-3");
Consumer<?> consumer = ...;
Optional<SelectiveConsumerHandler> extensionHandler = consumer.getExtension(SelectiveConsumerHandler.class);
extensionHandler.ifPresent(handler -> {
handler.addTopicConsumer(blockTopics);
handler.closeTopicConsumer(blockTopics);
});
- Implementing this functionality for non multi-topic consumers.
- Changing existing consumer interfaces in an incompatible way.
- Mailing List discussion thread:
- Mailing List voting thread: