Skip to content

Commit 11a615e

Browse files
authored
[fix][broker] Apply dispatcherMaxReadSizeBytes also for replay reads for Shared and Key_Shared subscriptions (#23894)
1 parent 2a9d4ac commit 11a615e

File tree

3 files changed

+157
-4
lines changed

3 files changed

+157
-4
lines changed

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ public synchronized void readMoreEntries() {
384384
}
385385

386386
Set<Position> messagesToReplayNow =
387-
canReplayMessages() ? getMessagesToReplayNow(messagesToRead) : Collections.emptySet();
387+
canReplayMessages() ? getMessagesToReplayNow(messagesToRead, bytesToRead) : Collections.emptySet();
388388
if (!messagesToReplayNow.isEmpty()) {
389389
if (log.isDebugEnabled()) {
390390
log.debug("[{}] Schedule replay of {} messages for {} consumers", name,
@@ -1343,15 +1343,20 @@ public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata
13431343
}
13441344
}
13451345

1346-
protected synchronized NavigableSet<Position> getMessagesToReplayNow(int maxMessagesToRead) {
1346+
protected synchronized NavigableSet<Position> getMessagesToReplayNow(int maxMessagesToRead, long bytesToRead) {
1347+
int cappedMaxMessagesToRead = cursor.applyMaxSizeCap(maxMessagesToRead, bytesToRead);
1348+
if (cappedMaxMessagesToRead < maxMessagesToRead && log.isDebugEnabled()) {
1349+
log.debug("[{}] Capped max messages to read from redelivery list to {} (max was {})",
1350+
name, cappedMaxMessagesToRead, maxMessagesToRead);
1351+
}
13471352
if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) {
13481353
delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
13491354
NavigableSet<Position> messagesAvailableNow =
1350-
delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
1355+
delayedDeliveryTracker.get().getScheduledMessages(cappedMaxMessagesToRead);
13511356
messagesAvailableNow.forEach(p -> redeliveryMessages.add(p.getLedgerId(), p.getEntryId()));
13521357
}
13531358
if (!redeliveryMessages.isEmpty()) {
1354-
return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead, createFilterForReplay());
1359+
return redeliveryMessages.getMessagesToReplayNow(cappedMaxMessagesToRead, createFilterForReplay());
13551360
} else {
13561361
return Collections.emptyNavigableSet();
13571362
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service.persistent;
20+
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
import static org.mockito.ArgumentMatchers.any;
23+
import static org.mockito.Mockito.doAnswer;
24+
import java.util.List;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicInteger;
27+
import lombok.Cleanup;
28+
import lombok.extern.slf4j.Slf4j;
29+
import org.apache.bookkeeper.mledger.Entry;
30+
import org.apache.commons.lang3.RandomUtils;
31+
import org.apache.pulsar.broker.BrokerTestUtil;
32+
import org.apache.pulsar.broker.testinterceptor.BrokerTestInterceptor;
33+
import org.apache.pulsar.client.api.Consumer;
34+
import org.apache.pulsar.client.api.Message;
35+
import org.apache.pulsar.client.api.Producer;
36+
import org.apache.pulsar.client.api.ProducerConsumerBase;
37+
import org.apache.pulsar.client.api.SubscriptionType;
38+
import org.testng.Assert;
39+
import org.testng.annotations.AfterClass;
40+
import org.testng.annotations.AfterMethod;
41+
import org.testng.annotations.BeforeClass;
42+
import org.testng.annotations.Test;
43+
44+
@Slf4j
45+
@Test(groups = "broker-api")
46+
public class PersistentDispatcherMultipleConsumersReadLimitsTest extends ProducerConsumerBase {
47+
48+
@BeforeClass(alwaysRun = true)
49+
@Override
50+
protected void setup() throws Exception {
51+
super.internalSetup();
52+
super.producerBaseSetup();
53+
}
54+
55+
@Override
56+
protected void doInitConf() throws Exception {
57+
super.doInitConf();
58+
// start at max batch size to reproduce the issue more easily
59+
conf.setDispatcherMinReadBatchSize(conf.getDispatcherMaxReadBatchSize());
60+
BrokerTestInterceptor.INSTANCE.configure(conf);
61+
}
62+
63+
@AfterClass(alwaysRun = true)
64+
@Override
65+
protected void cleanup() throws Exception {
66+
super.internalCleanup();
67+
}
68+
69+
@AfterMethod(alwaysRun = true)
70+
protected void resetInterceptors() throws Exception {
71+
BrokerTestInterceptor.INSTANCE.reset();
72+
}
73+
74+
@Test(timeOut = 30 * 1000)
75+
public void testDispatcherMaxReadSizeBytes() throws Exception {
76+
final String topicName = BrokerTestUtil.newUniqueName(
77+
"persistent://public/default/testDispatcherMaxReadSizeBytes");
78+
final String subscription = "sub";
79+
80+
AtomicInteger entriesReadMax = new AtomicInteger(0);
81+
BrokerTestInterceptor.INSTANCE.applyDispatcherSpyDecorator(PersistentDispatcherMultipleConsumers.class,
82+
spy -> {
83+
doAnswer(invocation -> {
84+
List<Entry> entries = invocation.getArgument(0);
85+
PersistentDispatcherMultipleConsumers.ReadType readType = invocation.getArgument(1);
86+
int numberOfEntries = entries.size();
87+
log.info("intercepted readEntriesComplete with {} entries, read type {}", numberOfEntries,
88+
readType);
89+
entriesReadMax.updateAndGet(current -> Math.max(current, numberOfEntries));
90+
return invocation.callRealMethod();
91+
}).when(spy).readEntriesComplete(any(), any());
92+
}
93+
);
94+
95+
// Create two consumers on a shared subscription
96+
@Cleanup
97+
Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
98+
.consumerName("c1")
99+
.topic(topicName)
100+
.subscriptionName(subscription)
101+
.subscriptionType(SubscriptionType.Shared)
102+
.receiverQueueSize(10000)
103+
.subscribe();
104+
105+
@Cleanup
106+
Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
107+
.consumerName("c2")
108+
.topic(topicName)
109+
.subscriptionName(subscription)
110+
.subscriptionType(SubscriptionType.Shared)
111+
.startPaused(true)
112+
.receiverQueueSize(10000)
113+
.subscribe();
114+
115+
@Cleanup
116+
Producer<byte[]> producer =
117+
pulsarClient.newProducer().enableBatching(false).topic(topicName).create();
118+
int numberOfMessages = 200;
119+
int payLoadSizeBytes = 1025 * 1024; // 1025kB
120+
byte[] payload = RandomUtils.nextBytes(payLoadSizeBytes);
121+
for (int i = 0; i < numberOfMessages; i++) {
122+
producer.send(payload);
123+
}
124+
125+
// Consume messages with consumer1 but don't ack
126+
for (int i = 0; i < numberOfMessages; i++) {
127+
consumer1.receive();
128+
}
129+
130+
// Close consumer1 and resume consumer2 to replay the messages
131+
consumer1.close();
132+
consumer2.resume();
133+
134+
// Verify that consumer2 can receive the messages
135+
for (int i = 0; i < numberOfMessages; i++) {
136+
Message<byte[]> msg = consumer2.receive(1, TimeUnit.SECONDS);
137+
Assert.assertNotNull(msg, "Consumer2 should receive the message");
138+
consumer2.acknowledge(msg);
139+
}
140+
141+
int expectedMaxEntriesInRead = conf.getDispatcherMaxReadSizeBytes() / payLoadSizeBytes;
142+
assertThat(entriesReadMax.get()).isLessThanOrEqualTo(expectedMaxEntriesInRead);
143+
}
144+
}

Diff for: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java

+4
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,10 @@ public void setup() throws Exception {
191191
doReturn(null).when(cursorMock).getLastIndividualDeletedRange();
192192
doReturn(subscriptionName).when(cursorMock).getName();
193193
doReturn(ledgerMock).when(cursorMock).getManagedLedger();
194+
doAnswer(invocation -> {
195+
int max = invocation.getArgument(0);
196+
return max;
197+
}).when(cursorMock).applyMaxSizeCap(anyInt(), anyLong());
194198

195199
consumerMock = createMockConsumer();
196200
channelMock = mock(ChannelPromise.class);

0 commit comments

Comments
 (0)