Skip to content

Commit 7159543

Browse files
shibdnodece
authored andcommitted
[fix][broker] Pattern subscription doesn't work when the pattern excludes the topic domain. (apache#24072)
(cherry picked from commit 3bae1d1)
1 parent ccff29e commit 7159543

File tree

6 files changed

+45
-7
lines changed

6 files changed

+45
-7
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -3022,7 +3022,8 @@ protected void handleCommandWatchTopicList(CommandWatchTopicList commandWatchTop
30223022
final NamespaceName namespaceName = NamespaceName.get(commandWatchTopicList.getNamespace());
30233023

30243024
Pattern topicsPattern = Pattern.compile(commandWatchTopicList.hasTopicsPattern()
3025-
? commandWatchTopicList.getTopicsPattern() : TopicList.ALL_TOPICS_PATTERN);
3025+
? TopicList.removeTopicDomainScheme(commandWatchTopicList.getTopicsPattern())
3026+
: TopicList.ALL_TOPICS_PATTERN);
30263027
String topicsHash = commandWatchTopicList.hasTopicsHash()
30273028
? commandWatchTopicList.getTopicsHash() : null;
30283029

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ public List<String> getMatchingTopics() {
7070
*/
7171
@Override
7272
public void accept(String topicName, NotificationType notificationType) {
73-
if (topicsPattern.matcher(TopicName.get(topicName).getPartitionedTopicName()).matches()) {
73+
String partitionedTopicName = TopicName.get(topicName).getPartitionedTopicName();
74+
if (topicsPattern.matcher(TopicList.removeTopicDomainScheme(partitionedTopicName)).matches()) {
7475
List<String> newTopics;
7576
List<String> deletedTopics;
7677
if (notificationType == NotificationType.Deleted) {

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class TopicListWatcherTest {
4040
);
4141

4242
private static final long ID = 7;
43-
private static final Pattern PATTERN = Pattern.compile("persistent://tenant/ns/topic\\d+");
43+
private static final Pattern PATTERN = Pattern.compile("tenant/ns/topic\\d+");
4444

4545

4646
private TopicListService topicListService;

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java

+34
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,40 @@ public void testAutoSubscribePatterConsumerFromBrokerWatcher(boolean delayWatchi
683683
}
684684
}
685685

686+
@Test(timeOut = testTimeout)
687+
public void testSubscribePatterWithOutTopicDomain() throws Exception {
688+
final String key = "testSubscribePatterWithOutTopicDomain";
689+
final String subscriptionName = "my-ex-subscription-" + key;
690+
final Pattern pattern = Pattern.compile("my-property/my-ns/test-pattern.*");
691+
692+
Consumer<byte[]> consumer = pulsarClient.newConsumer()
693+
.topicsPattern(pattern)
694+
.subscriptionName(subscriptionName)
695+
.subscriptionType(SubscriptionType.Shared)
696+
.receiverQueueSize(4)
697+
.subscribe();
698+
699+
// 0. Need make sure topic watcher started
700+
waitForTopicListWatcherStarted(consumer);
701+
702+
// 1. create partition topic
703+
String topicName = "persistent://my-property/my-ns/test-pattern" + key;
704+
admin.topics().createPartitionedTopic(topicName, 4);
705+
706+
// 2. verify broker will push the changes to update(CommandWatchTopicUpdate).
707+
assertSame(pattern.pattern(), ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern().pattern());
708+
Awaitility.await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> {
709+
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 4);
710+
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 4);
711+
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 1);
712+
});
713+
714+
// cleanup.
715+
consumer.close();
716+
admin.topics().deletePartitionedTopic(topicName);
717+
pulsarClient.close();
718+
}
719+
686720
@DataProvider(name= "regexpConsumerArgs")
687721
public Object[][] regexpConsumerArgs(){
688722
return new Object[][]{

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@ public interface ConsumerBuilder<T> extends Cloneable {
128128
/**
129129
* Specify a pattern for topics(not contains the partition suffix) that this consumer subscribes to.
130130
*
131+
* <p>Will ignore the topic domain("persistent://" or "non-persistent://") when pattern matching.
132+
*
131133
* <p>The pattern is applied to subscribe to all topics, within a single namespace, that match the
132134
* pattern.
133135
*
@@ -143,7 +145,9 @@ public interface ConsumerBuilder<T> extends Cloneable {
143145
* Specify a pattern for topics(not contains the partition suffix) that this consumer subscribes to.
144146
*
145147
* <p>It accepts a regular expression that is compiled into a pattern internally. E.g.,
146-
* "persistent://public/default/pattern-topic-.*"
148+
* "persistent://public/default/pattern-topic-.*" or "public/default/pattern-topic-.*"
149+
*
150+
* <p>Will ignore the topic domain("persistent://" or "non-persistent://") when pattern matching.
147151
*
148152
* <p>The pattern is applied to subscribe to all topics, within a single namespace, that match the
149153
* pattern.

pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.apache.pulsar.common.topics;
2020

21-
import com.google.common.annotations.VisibleForTesting;
2221
import com.google.common.hash.Hashing;
2322
import java.nio.charset.StandardCharsets;
2423
import java.util.Collection;
@@ -84,8 +83,7 @@ public static Set<String> minus(Collection<String> list1, Collection<String> lis
8483
return s1;
8584
}
8685

87-
@VisibleForTesting
88-
static String removeTopicDomainScheme(String originalRegexp) {
86+
public static String removeTopicDomainScheme(String originalRegexp) {
8987
if (!originalRegexp.toString().contains(SCHEME_SEPARATOR)) {
9088
return originalRegexp;
9189
}

0 commit comments

Comments
 (0)