Skip to content

Commit 2badcf6

Browse files
authored
[fix][broker] Fix NPE causing dispatching to stop when using Key_Shared mode and allowOutOfOrderDelivery=true (apache#22533)
1 parent 7aedb6b commit 2badcf6

File tree

2 files changed

+26
-2
lines changed

2 files changed

+26
-2
lines changed

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java

+10
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,11 @@ private int getAvailablePermits(Consumer c) {
457457

458458
@Override
459459
protected synchronized NavigableSet<PositionImpl> filterOutEntriesWillBeDiscarded(NavigableSet<PositionImpl> src) {
460+
// The variable "hashesToBeBlocked" and "recentlyJoinedConsumers" will be null if "isAllowOutOfOrderDelivery()",
461+
// So skip this filter out.
462+
if (isAllowOutOfOrderDelivery()) {
463+
return src;
464+
}
460465
if (src.isEmpty()) {
461466
return src;
462467
}
@@ -501,6 +506,11 @@ protected synchronized NavigableSet<PositionImpl> filterOutEntriesWillBeDiscarde
501506
*/
502507
@Override
503508
protected boolean hasConsumersNeededNormalRead() {
509+
// The variable "hashesToBeBlocked" and "recentlyJoinedConsumers" will be null if "isAllowOutOfOrderDelivery()",
510+
// So the method "filterOutEntriesWillBeDiscarded" will filter out nothing, just return "true" here.
511+
if (isAllowOutOfOrderDelivery()) {
512+
return true;
513+
}
504514
for (Consumer consumer : consumerList) {
505515
if (consumer == null || consumer.isBlocked()) {
506516
continue;

Diff for: pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java

+16-2
Original file line numberDiff line numberDiff line change
@@ -1741,6 +1741,14 @@ public void testNoRepeatedReadAndDiscard() throws Exception {
17411741
admin.topics().delete(topic, false);
17421742
}
17431743

1744+
@DataProvider(name = "allowKeySharedOutOfOrder")
1745+
public Object[][] allowKeySharedOutOfOrder() {
1746+
return new Object[][]{
1747+
{true},
1748+
{false}
1749+
};
1750+
}
1751+
17441752
/**
17451753
* This test is in order to guarantee the feature added by https://github.com/apache/pulsar/pull/7105.
17461754
* 1. Start 3 consumers:
@@ -1755,8 +1763,8 @@ public void testNoRepeatedReadAndDiscard() throws Exception {
17551763
* - no repeated Read-and-discard.
17561764
* - at last, all messages will be received.
17571765
*/
1758-
@Test(timeOut = 180 * 1000) // the test will be finished in 60s.
1759-
public void testRecentJoinedPosWillNotStuckOtherConsumer() throws Exception {
1766+
@Test(timeOut = 180 * 1000, dataProvider = "allowKeySharedOutOfOrder") // the test will be finished in 60s.
1767+
public void testRecentJoinedPosWillNotStuckOtherConsumer(boolean allowKeySharedOutOfOrder) throws Exception {
17601768
final int messagesSentPerTime = 100;
17611769
final Set<Integer> totalReceivedMessages = new TreeSet<>();
17621770
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
@@ -1775,6 +1783,8 @@ public void testRecentJoinedPosWillNotStuckOtherConsumer() throws Exception {
17751783
log.info("Published message :{}", messageId);
17761784
}
17771785

1786+
KeySharedPolicy keySharedPolicy = KeySharedPolicy.autoSplitHashRange()
1787+
.setAllowOutOfOrderDelivery(allowKeySharedOutOfOrder);
17781788
// 1. Start 3 consumers and make ack holes.
17791789
// - one consumer will be closed and trigger a messages redeliver.
17801790
// - one consumer will not ack any messages to make the new consumer joined late will be stuck due to the
@@ -1785,18 +1795,21 @@ public void testRecentJoinedPosWillNotStuckOtherConsumer() throws Exception {
17851795
.subscriptionName(subName)
17861796
.receiverQueueSize(10)
17871797
.subscriptionType(SubscriptionType.Key_Shared)
1798+
.keySharedPolicy(keySharedPolicy)
17881799
.subscribe();
17891800
Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
17901801
.topic(topic)
17911802
.subscriptionName(subName)
17921803
.receiverQueueSize(10)
17931804
.subscriptionType(SubscriptionType.Key_Shared)
1805+
.keySharedPolicy(keySharedPolicy)
17941806
.subscribe();
17951807
Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
17961808
.topic(topic)
17971809
.subscriptionName(subName)
17981810
.receiverQueueSize(10)
17991811
.subscriptionType(SubscriptionType.Key_Shared)
1812+
.keySharedPolicy(keySharedPolicy)
18001813
.subscribe();
18011814
List<Message> msgList1 = new ArrayList<>();
18021815
List<Message> msgList2 = new ArrayList<>();
@@ -1845,6 +1858,7 @@ public void testRecentJoinedPosWillNotStuckOtherConsumer() throws Exception {
18451858
.subscriptionName(subName)
18461859
.receiverQueueSize(1000)
18471860
.subscriptionType(SubscriptionType.Key_Shared)
1861+
.keySharedPolicy(keySharedPolicy)
18481862
.subscribe();
18491863
consumerWillBeClose.close();
18501864

0 commit comments

Comments
 (0)