Skip to content

Commit 676fdb1

Browse files
authored
[improve][broker] PIP-379: Enable the use of the classic implementation of Key_Shared / Shared with feature flag (#23424)
1 parent 5aadec0 commit 676fdb1

File tree

45 files changed

+3211
-263
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+3211
-263
lines changed

Diff for: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

+16
Original file line numberDiff line numberDiff line change
@@ -817,6 +817,22 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
817817
+ "The higher the number, the more equal the assignment of keys to consumers")
818818
private int subscriptionKeySharedConsistentHashingReplicaPoints = 100;
819819

820+
@FieldContext(
821+
category = CATEGORY_POLICIES,
822+
doc = "For persistent Key_Shared subscriptions, enables the use of the classic implementation of the "
823+
+ "Key_Shared subscription that was used before Pulsar 4.0.0 and PIP-379.",
824+
dynamic = true
825+
)
826+
private boolean subscriptionKeySharedUseClassicPersistentImplementation = false;
827+
828+
@FieldContext(
829+
category = CATEGORY_POLICIES,
830+
doc = "For persistent Shared subscriptions, enables the use of the classic implementation of the Shared "
831+
+ "subscription that was used before Pulsar 4.0.0.",
832+
dynamic = true
833+
)
834+
private boolean subscriptionSharedUseClassicPersistentImplementation = false;
835+
820836
@FieldContext(
821837
category = CATEGORY_POLICIES,
822838
doc = "Set the default behavior for message deduplication in the broker.\n\n"

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@
2424
import java.time.Clock;
2525
import java.util.concurrent.TimeUnit;
2626
import lombok.extern.slf4j.Slf4j;
27-
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
27+
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
2828

2929
@Slf4j
3030
public abstract class AbstractDelayedDeliveryTracker implements DelayedDeliveryTracker, TimerTask {
3131

32-
protected final PersistentDispatcherMultipleConsumers dispatcher;
32+
protected final AbstractPersistentDispatcherMultipleConsumers dispatcher;
3333

3434
// Reference to the shared (per-broker) timer for delayed delivery
3535
protected final Timer timer;
@@ -49,13 +49,13 @@ public abstract class AbstractDelayedDeliveryTracker implements DelayedDeliveryT
4949

5050
private final boolean isDelayedDeliveryDeliverAtTimeStrict;
5151

52-
public AbstractDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer,
52+
public AbstractDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer,
5353
long tickTimeMillis,
5454
boolean isDelayedDeliveryDeliverAtTimeStrict) {
5555
this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict);
5656
}
5757

58-
public AbstractDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer,
58+
public AbstractDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer,
5959
long tickTimeMillis, Clock clock,
6060
boolean isDelayedDeliveryDeliverAtTimeStrict) {
6161
this.dispatcher = dispatcher;

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage;
3737
import org.apache.pulsar.broker.delayed.bucket.RecoverDelayedDeliveryTrackerException;
3838
import org.apache.pulsar.broker.service.BrokerService;
39-
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
39+
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
4040
import org.apache.pulsar.common.util.FutureUtil;
4141
import org.slf4j.Logger;
4242
import org.slf4j.LoggerFactory;
@@ -78,7 +78,7 @@ public void initialize(PulsarService pulsarService) throws Exception {
7878
}
7979

8080
@Override
81-
public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
81+
public DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
8282
String topicName = dispatcher.getTopic().getName();
8383
String subscriptionName = dispatcher.getSubscription().getName();
8484
BrokerService brokerService = dispatcher.getTopic().getBrokerService();
@@ -97,7 +97,7 @@ public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers d
9797
}
9898

9999
@VisibleForTesting
100-
BucketDelayedDeliveryTracker newTracker0(PersistentDispatcherMultipleConsumers dispatcher)
100+
BucketDelayedDeliveryTracker newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher)
101101
throws RecoverDelayedDeliveryTrackerException {
102102
return new BucketDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
103103
isDelayedDeliveryDeliverAtTimeStrict, bucketSnapshotStorage, delayedDeliveryMinIndexCountPerBucket,

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactory.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import com.google.common.annotations.Beta;
2222
import org.apache.pulsar.broker.PulsarService;
23-
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
23+
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
2424

2525
/**
2626
* Factory of InMemoryDelayedDeliveryTracker objects. This is the entry point for implementations.
@@ -42,7 +42,7 @@ public interface DelayedDeliveryTrackerFactory extends AutoCloseable {
4242
* @param dispatcher
4343
* a multi-consumer dispatcher instance
4444
*/
45-
DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher);
45+
DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher);
4646

4747
/**
4848
* Close the factory and release all the resources.

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import lombok.extern.slf4j.Slf4j;
2929
import org.apache.bookkeeper.mledger.Position;
3030
import org.apache.bookkeeper.mledger.PositionFactory;
31-
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
31+
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
3232
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
3333

3434
@Slf4j
@@ -52,17 +52,18 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack
5252
// Track whether we have seen all messages with fixed delay so far.
5353
private boolean messagesHaveFixedDelay = true;
5454

55-
InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis,
55+
InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer,
56+
long tickTimeMillis,
5657
boolean isDelayedDeliveryDeliverAtTimeStrict,
5758
long fixedDelayDetectionLookahead) {
5859
this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict,
5960
fixedDelayDetectionLookahead);
6061
}
6162

62-
public InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer,
63-
long tickTimeMillis, Clock clock,
64-
boolean isDelayedDeliveryDeliverAtTimeStrict,
65-
long fixedDelayDetectionLookahead) {
63+
public InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer,
64+
long tickTimeMillis, Clock clock,
65+
boolean isDelayedDeliveryDeliverAtTimeStrict,
66+
long fixedDelayDetectionLookahead) {
6667
super(dispatcher, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict);
6768
this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
6869
}

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import java.util.concurrent.TimeUnit;
2626
import org.apache.pulsar.broker.PulsarService;
2727
import org.apache.pulsar.broker.ServiceConfiguration;
28-
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
28+
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
2929
import org.slf4j.Logger;
3030
import org.slf4j.LoggerFactory;
3131

@@ -51,7 +51,7 @@ public void initialize(PulsarService pulsarService) {
5151
}
5252

5353
@Override
54-
public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
54+
public DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
5555
String topicName = dispatcher.getTopic().getName();
5656
String subscriptionName = dispatcher.getSubscription().getName();
5757
DelayedDeliveryTracker tracker = DelayedDeliveryTracker.DISABLE;
@@ -66,7 +66,7 @@ public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers d
6666
}
6767

6868
@VisibleForTesting
69-
InMemoryDelayedDeliveryTracker newTracker0(PersistentDispatcherMultipleConsumers dispatcher) {
69+
InMemoryDelayedDeliveryTracker newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
7070
return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
7171
isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead);
7272
}

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
5858
import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
5959
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
60-
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
60+
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
6161
import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
6262
import org.apache.pulsar.common.util.FutureUtil;
6363
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
@@ -105,7 +105,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
105105

106106
private CompletableFuture<Void> pendingLoad = null;
107107

108-
public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher,
108+
public BucketDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher,
109109
Timer timer, long tickTimeMillis,
110110
boolean isDelayedDeliveryDeliverAtTimeStrict,
111111
BucketSnapshotStorage bucketSnapshotStorage,
@@ -117,7 +117,7 @@ public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispat
117117
maxIndexesPerBucketSnapshotSegment, maxNumBuckets);
118118
}
119119

120-
public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher,
120+
public BucketDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher,
121121
Timer timer, long tickTimeMillis, Clock clock,
122122
boolean isDelayedDeliveryDeliverAtTimeStrict,
123123
BucketSnapshotStorage bucketSnapshotStorage,

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java

-6
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,7 @@
2424
import java.util.concurrent.ThreadLocalRandom;
2525
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2626
import org.apache.pulsar.broker.ServiceConfiguration;
27-
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
2827
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
29-
import org.slf4j.Logger;
30-
import org.slf4j.LoggerFactory;
3128

3229
/**
3330
*
@@ -239,7 +236,4 @@ private int getFirstConsumerIndexOfPriority(int targetPriority) {
239236
return -1;
240237
}
241238

242-
private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);
243-
244-
245239
}

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,8 @@
129129
import org.apache.pulsar.broker.service.TopicEventsListener.EventStage;
130130
import org.apache.pulsar.broker.service.TopicEventsListener.TopicEvent;
131131
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
132+
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
132133
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
133-
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
134134
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
135135
import org.apache.pulsar.broker.service.persistent.SystemTopic;
136136
import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
@@ -301,7 +301,7 @@ public class BrokerService implements Closeable {
301301
private final int maxUnackedMessages;
302302
public final int maxUnackedMsgsPerDispatcher;
303303
private final AtomicBoolean blockedDispatcherOnHighUnackedMsgs = new AtomicBoolean(false);
304-
private final Set<PersistentDispatcherMultipleConsumers> blockedDispatchers = ConcurrentHashMap.newKeySet();
304+
private final Set<AbstractPersistentDispatcherMultipleConsumers> blockedDispatchers = ConcurrentHashMap.newKeySet();
305305
private final ReadWriteLock lock = new ReentrantReadWriteLock();
306306
@VisibleForTesting
307307
private final DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory;
@@ -3328,7 +3328,7 @@ public OrderedExecutor getTopicOrderedExecutor() {
33283328
* @param dispatcher
33293329
* @param numberOfMessages
33303330
*/
3331-
public void addUnAckedMessages(PersistentDispatcherMultipleConsumers dispatcher, int numberOfMessages) {
3331+
public void addUnAckedMessages(AbstractPersistentDispatcherMultipleConsumers dispatcher, int numberOfMessages) {
33323332
// don't block dispatchers if maxUnackedMessages = 0
33333333
if (maxUnackedMessages > 0) {
33343334
totalUnackedMessages.add(numberOfMessages);
@@ -3387,10 +3387,10 @@ private void blockDispatchersWithLargeUnAckMessages() {
33873387
try {
33883388
forEachTopic(topic -> {
33893389
topic.getSubscriptions().forEach((subName, persistentSubscription) -> {
3390-
if (persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
3391-
PersistentDispatcherMultipleConsumers dispatcher =
3392-
(PersistentDispatcherMultipleConsumers) persistentSubscription
3393-
.getDispatcher();
3390+
if (persistentSubscription.getDispatcher()
3391+
instanceof AbstractPersistentDispatcherMultipleConsumers) {
3392+
AbstractPersistentDispatcherMultipleConsumers dispatcher =
3393+
(AbstractPersistentDispatcherMultipleConsumers) persistentSubscription.getDispatcher();
33943394
int dispatcherUnAckMsgs = dispatcher.getTotalUnackedMessages();
33953395
if (dispatcherUnAckMsgs > maxUnackedMsgsPerDispatcher) {
33963396
log.info("[{}] Blocking dispatcher due to reached max broker limit {}",
@@ -3411,7 +3411,7 @@ private void blockDispatchersWithLargeUnAckMessages() {
34113411
*
34123412
* @param dispatcherList
34133413
*/
3414-
public void unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleConsumers> dispatcherList) {
3414+
public void unblockDispatchersOnUnAckMessages(List<AbstractPersistentDispatcherMultipleConsumers> dispatcherList) {
34153415
lock.writeLock().lock();
34163416
try {
34173417
dispatcherList.forEach(dispatcher -> {

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ public class Consumer {
146146

147147
private static final double avgPercent = 0.9;
148148
private boolean preciseDispatcherFlowControl;
149-
private Position lastSentPositionWhenJoining;
149+
private Position readPositionWhenJoining;
150150
private final String clientAddress; // IP address only, no port number included
151151
private final MessageId startMessageId;
152152
private final boolean isAcknowledgmentAtBatchIndexLevelEnabled;
@@ -973,8 +973,8 @@ public ConsumerStatsImpl getStats() {
973973
stats.unackedMessages = unackedMessages;
974974
stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs;
975975
stats.avgMessagesPerEntry = getAvgMessagesPerEntry();
976-
if (lastSentPositionWhenJoining != null) {
977-
stats.lastSentPositionWhenJoining = lastSentPositionWhenJoining.toString();
976+
if (readPositionWhenJoining != null) {
977+
stats.readPositionWhenJoining = readPositionWhenJoining.toString();
978978
}
979979
return stats;
980980
}
@@ -1189,8 +1189,8 @@ public boolean isPreciseDispatcherFlowControl() {
11891189
return preciseDispatcherFlowControl;
11901190
}
11911191

1192-
public void setLastSentPositionWhenJoining(Position lastSentPositionWhenJoining) {
1193-
this.lastSentPositionWhenJoining = lastSentPositionWhenJoining;
1192+
public void setReadPositionWhenJoining(Position readPositionWhenJoining) {
1193+
this.readPositionWhenJoining = readPositionWhenJoining;
11941194
}
11951195

11961196
public int getMaxUnackedMessages() {

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,8 @@ default void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion){}
153153

154154
/**
155155
* Trigger a new "readMoreEntries" if the dispatching has been paused before. This method is only implemented in
156-
* {@link org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers} right now, other
157-
* implements are not necessary to implement this method.
156+
* {@link org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers} right now,
157+
* other implementations do not necessary implement this method.
158158
* @return did a resume.
159159
*/
160160
default boolean checkAndResumeIfPaused(){
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service;
20+
21+
import java.util.LinkedHashMap;
22+
import java.util.List;
23+
import java.util.Map;
24+
import org.apache.bookkeeper.mledger.Position;
25+
import org.apache.pulsar.client.api.Range;
26+
import org.apache.pulsar.common.api.proto.KeySharedMeta;
27+
import org.apache.pulsar.common.api.proto.KeySharedMode;
28+
29+
public interface StickyKeyDispatcher extends Dispatcher {
30+
31+
boolean hasSameKeySharedPolicy(KeySharedMeta ksm);
32+
33+
Map<Consumer, List<Range>> getConsumerKeyHashRanges();
34+
35+
boolean isAllowOutOfOrderDelivery();
36+
37+
KeySharedMode getKeySharedMode();
38+
39+
StickyKeyConsumerSelector getSelector();
40+
41+
long getNumberOfMessagesInReplay();
42+
43+
default LinkedHashMap<Consumer, Position> getRecentlyJoinedConsumers() {
44+
return null;
45+
}
46+
47+
boolean isClassic();
48+
}

0 commit comments

Comments
 (0)