Skip to content

[improve][pip] PIP-405: Enhanced Dynamic Handling of Selective Consumers in MultiTopicConsumers #23895

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 165 additions & 0 deletions pip/pip-405.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
# PIP-405: Enhanced Dynamic Handling of Selective Consumers in MultiTopicConsumers

# Background knowledge

In Pulsar, consumers can subscribe to multiple topics by pattern or by passing a list of topics.
However, currently, if you need to modify this pattern, it often requires shutting down the existing consumers and creating new consumers with the new pattern.
This is because in `MultiTopicConsumer`, each operation is applied to all consumers.
We hope to dynamically add or remove certain topics without affecting other subscribed topics.

# Motivation

We want a way to unsubscribe or close from certain consumers and add new ones without restarting the entire consumer.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here might not only be "close" or "unsubscribe," "subscribe." It seems that "pause" and "resume" can also be added? Everyone take a look at this section?


Additionally, these changes should remain optional for users who do not need them.

This helps developers experiment with dynamic subscription capabilities without cluttering the core consumer interface.

# Goals

## In Scope

- Introduce a low-level extension interface to manage dynamic subscription operations on multi-topic consumers.
- Provide optional methods to add or remove topic subscriptions at runtime.

## Out of Scope

# High Level Design

The design focuses on providing an extension interface, tentatively named `SelectiveConsumerHandler`, that exposes methods like `closeTopicConsumer(Set<String> topic)` and `addTopicConsumer(Set<String> topic)`.
Instead of forcing all consumers to implement these methods, we define a generic `getExtension(Class<T> extensionInterface)` API.
If the consumer supports the interface, an `Optional` containing the handle is returned; otherwise, it returns `Optional.empty()`.

# Detailed Design

## Design & Implementation Details

1. **Extension Retrieval**
A generic method is introduced in the consumer:
```java
public <T> Optional<T> getExtension(Class<T> extensionInterface) {
// Implementation
}
```

- If the consumer supports the requested extension, an instance is returned.
- If not, Optional.empty() is returned.

2. **Extension Interface**
Define an interface named SelectiveConsumerHandler with two methods:
```java
public interface SelectiveConsumerHandler {

/**
* Closes the underlying consumer for the specified topics without affecting the consumers of other topics.
* If a topic does not exist in the current consumer's subscription list, it will be ignored.
*
* @param topicNames A collection of topic names to close the consumers for.
* These should include any necessary partition suffixes if applicable.
*/
void closeTopicConsumer(Collection<String> topicNames);

/**
* Registers and starts a consumer for each of the topics specified if they are not already being consumed.
* This method facilitates dynamic addition of topic subscriptions to an existing consumer, expanding
* its topic set without needing to recreate or restart the entire consumer.
*
* @param topicNames A collection of topic names for which to start consumers.
* Each topic name should include the necessary partition suffix if the topics are partitioned.
*/
void addTopicConsumer(Collection<String> topicNames);
}
```

- closeTopicConsumer allows closing consumers dynamically.
- addTopicConsumer allows creating a new consumer for a topic at runtime.

3. **Implementation**
- In `MultiTopicsConsumerImpl` or `PatternMultiTopicsConsumerImpl`, implement getExtension to return a `SelectiveConsumerHandler`.
- Other consumer implementations can simply return Optional.empty() if they do not support dynamic subscription changes.
- Extract TopicsChangedListener and PatternTopicsChangedListener from PatternMultiTopicsConsumerImpl, rename PatternTopicsChangedListener to MultiTopicsChangedListener, and place it in MultiTopicsConsumerImpl.
- MultiTopicsConsumerImpl and PatternMultiTopicsConsumerImpl implement the SelectiveConsumerHandler interface and call the TopicsChangedListener's onTopicsRemoved and onTopicsAdded methods.

## Public-facing Changes

### Public API

A new method in `Consumer<T>.

```java
<V> Optional<V> getExtension(Class<V> extensionInterface);
```

A new optional interface `SelectiveConsumerHandler` is introduced.

```java
public interface SelectiveConsumerHandler {

/**
* Closes the underlying consumer for the specified topics without affecting the consumers of other topics.
* If a topic does not exist in the current consumer's subscription list, it will be ignored.
*
* @param topicNames A collection of topic names to close the consumers for.
* These should include any necessary partition suffixes if applicable.
*/
void closeTopicConsumer(Collection<String> topicNames);

/**
* Registers and starts a consumer for each of the topics specified if they are not already being consumed.
* This method facilitates dynamic addition of topic subscriptions to an existing consumer, expanding
* its topic set without needing to recreate or restart the entire consumer.
*
* @param topicNames A collection of topic names for which to start consumers.
* Each topic name should include the necessary partition suffix if the topics are partitioned.
*/
void addTopicConsumer(Collection<String> topicNames);
}
```

Developers may call:

```java
Set<String> blockTopics = new HashSet<>();
blockTopics.add(baseTopicName + "-2");
blockTopics.add(baseTopicName + "-3");

Consumer<?> consumer = ...;
Optional<SelectiveConsumerHandler> extensionHandler = consumer.getExtension(SelectiveConsumerHandler.class);

extensionHandler.ifPresent(handler -> {
handler.addTopicConsumer(blockTopics);
handler.closeTopicConsumer(blockTopics);
});
```

### Binary protocol

### Configuration

### CLI

### Metrics

# Monitoring

# Security Considerations

# Backward & Forward Compatibility

## Upgrade

## Downgrade / Rollback

## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations

# Alternatives

- Implementing this functionality for non multi-topic consumers.
- Changing existing consumer interfaces in an incompatible way.

# General Notes

# Links

* Mailing List discussion thread:
* Mailing List voting thread:
Loading