Skip to content

Commit 4302dc4

Browse files
committed
[fix][broker]fix Repeated messages of shared dispatcher
1 parent 927a00e commit 4302dc4

File tree

3 files changed

+237
-0
lines changed

3 files changed

+237
-0
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java

+4
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ public void add(long ledgerId, long entryId, long stickyKeyHash) {
5151
messagesToRedeliver.add(ledgerId, entryId);
5252
}
5353

54+
public boolean contains(long ledgerId, long entryId) {
55+
return messagesToRedeliver.contains(ledgerId, entryId);
56+
}
57+
5458
public void remove(long ledgerId, long entryId) {
5559
if (hashesToBeBlocked != null) {
5660
hashesToBeBlocked.remove(ledgerId, entryId);

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java

+3
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,9 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
583583
if (needTrimAckedMessages()) {
584584
cursor.trimDeletedEntries(entries);
585585
}
586+
if (readType == ReadType.Replay) {
587+
entries.removeIf(entry -> !redeliveryMessages.contains(entry.getLedgerId(), entry.getEntryId()));
588+
}
586589

587590
int entriesToDispatch = entries.size();
588591
// Trigger read more messages

pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java

+230
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import io.netty.util.Timeout;
4343
import java.io.ByteArrayInputStream;
4444
import java.io.IOException;
45+
import java.lang.reflect.Field;
4546
import java.nio.ByteBuffer;
4647
import java.nio.file.Files;
4748
import java.nio.file.Paths;
@@ -76,14 +77,21 @@
7677
import lombok.EqualsAndHashCode;
7778
import org.apache.avro.Schema.Parser;
7879
import org.apache.bookkeeper.common.concurrent.FutureUtils;
80+
import org.apache.bookkeeper.mledger.AsyncCallbacks;
81+
import org.apache.bookkeeper.mledger.Entry;
82+
import org.apache.bookkeeper.mledger.ManagedLedgerException;
7983
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
84+
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
8085
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
86+
import org.apache.bookkeeper.mledger.impl.PositionImpl;
8187
import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
8288
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
8389
import org.apache.commons.lang3.RandomUtils;
8490
import org.apache.commons.lang3.reflect.FieldUtils;
8591
import org.apache.commons.lang3.tuple.Pair;
8692
import org.apache.pulsar.broker.PulsarService;
93+
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
94+
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
8795
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
8896
import org.apache.pulsar.client.admin.PulsarAdminException;
8997
import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -109,6 +117,8 @@
109117
import org.apache.pulsar.common.schema.SchemaType;
110118
import org.apache.pulsar.common.util.FutureUtil;
111119
import org.awaitility.Awaitility;
120+
import org.mockito.invocation.InvocationOnMock;
121+
import org.mockito.stubbing.Answer;
112122
import org.slf4j.Logger;
113123
import org.slf4j.LoggerFactory;
114124
import org.testng.Assert;
@@ -2383,6 +2393,226 @@ public void testPriorityConsumer() throws Exception {
23832393
log.info("-- Exiting {} test --", methodName);
23842394
}
23852395

2396+
@Test(timeOut = 300000)
2397+
public void concurrentlyReceiveDeliveredMessages() throws Exception {
2398+
String topic = "persistent://my-property/my-ns/tp-" + UUID.randomUUID().toString();
2399+
String subName = "sub";
2400+
final int totalPublishMessages = 50;
2401+
2402+
// create consumer tasks.
2403+
List<ConsumerTask> taskList = new ArrayList<>(10);
2404+
List<Thread> threadList = new ArrayList<>(10);
2405+
for (int i = 0; i < 10; i++){
2406+
PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
2407+
ConsumerTask consumerTask = new ConsumerTask(pulsarClient, topic, subName);
2408+
taskList.add(consumerTask);
2409+
threadList.add(new Thread(consumerTask, consumerTask.consumerName));
2410+
}
2411+
2412+
// create topic, subscription and mock dispatcher.
2413+
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
2414+
.enableBatching(false)
2415+
.messageRoutingMode(MessageRoutingMode.SinglePartition)
2416+
.create();
2417+
Consumer<byte[]> consumerForCreateSubscription = pulsarClient.newConsumer()
2418+
.topic(topic).subscriptionName(subName).consumerName("for_create_dispatcher")
2419+
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(0)
2420+
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
2421+
final PersistentDispatcherMultipleConsumers shardDispatcher = getShardDispatcher(topic, subName);
2422+
2423+
// Make redeliver messages.
2424+
for (int i = 0; i < totalPublishMessages; i++) {
2425+
final String message = "my-message-" + i;
2426+
producer.send(message.getBytes());
2427+
}
2428+
List<PositionImpl> redeliveryPositions = new ArrayList<>();
2429+
Consumer<byte[]> consumerForRedeliver = pulsarClient.newConsumer()
2430+
.topic(topic).subscriptionName(subName).consumerName("for_redeliver")
2431+
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(totalPublishMessages)
2432+
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
2433+
for (int i = 0; i < totalPublishMessages; i++) {
2434+
Message message = consumerForRedeliver.receive();
2435+
MessageIdImpl messageId = (MessageIdImpl) message.getMessageId();
2436+
redeliveryPositions.add(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId()));
2437+
}
2438+
consumerForRedeliver.close();
2439+
Awaitility.await().until(() -> shardDispatcher.getConsumers().size() == 1);
2440+
2441+
// Start all consumers.
2442+
threadList.forEach(thread -> thread.start());
2443+
2444+
// Wait all task finish.
2445+
for (Thread t : threadList){
2446+
t.join();
2447+
}
2448+
2449+
// Verify.
2450+
for (ConsumerTask task : taskList){
2451+
assertFalse(task.fail);
2452+
}
2453+
Assert.assertEquals(taskList.stream().map(t -> t.receivedEntries.size())
2454+
.reduce((a, b) -> a + b).get().intValue(), totalPublishMessages);
2455+
2456+
// cleanup.
2457+
consumerForCreateSubscription.close();
2458+
producer.close();
2459+
admin.topics().delete(topic, false);
2460+
}
2461+
2462+
private void makeReadEntryDelay20MillisSeconds(String topic, String subName) throws Exception {
2463+
PersistentTopic persistentTopic =
2464+
(PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
2465+
persistentTopic.getManagedLedger();
2466+
PersistentSubscription persistentSubscription = persistentTopic.getSubscription(subName);
2467+
ManagedCursorImpl managedCursor = (ManagedCursorImpl) persistentSubscription.getCursor();
2468+
final ManagedLedgerImpl originalManagedLedger = (ManagedLedgerImpl) managedCursor.getManagedLedger();
2469+
ManagedLedgerImpl spyManagedLedger = spy(originalManagedLedger);
2470+
doAnswer(new Answer() {
2471+
@Override
2472+
public Object answer(InvocationOnMock invocation) throws Throwable {
2473+
PositionImpl position = (PositionImpl) invocation.getArguments()[0];
2474+
final AsyncCallbacks.ReadEntryCallback originalCallback =
2475+
(AsyncCallbacks.ReadEntryCallback) invocation.getArguments()[1];
2476+
Object ctx = invocation.getArguments()[2];
2477+
originalManagedLedger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback(){
2478+
2479+
@Override
2480+
public void readEntryComplete(Entry entry, Object ctx) {
2481+
try {
2482+
Thread.sleep(20);
2483+
} catch (InterruptedException e) {
2484+
}
2485+
originalCallback.readEntryComplete(entry, ctx);
2486+
}
2487+
2488+
@Override
2489+
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
2490+
originalCallback.readEntryFailed(exception, ctx);
2491+
}
2492+
}, ctx);
2493+
return null;
2494+
}
2495+
}).when(spyManagedLedger)
2496+
.asyncReadEntry(any(PositionImpl.class), any(AsyncCallbacks.ReadEntryCallback.class), any());
2497+
Field field_ledger = ManagedCursorImpl.class.getDeclaredField("ledger");
2498+
field_ledger.setAccessible(true);
2499+
field_ledger.set(managedCursor, spyManagedLedger);
2500+
}
2501+
2502+
@Test(timeOut = 30000)
2503+
public void concurrentlyRedeliverAndCloseLastConsumer() throws Exception {
2504+
String topic = "persistent://my-property/my-ns/tp-" + UUID.randomUUID().toString();
2505+
String subName = "sub";
2506+
final int totalPublishMessages = 50;
2507+
// Trigger subscription create.
2508+
Consumer<byte[]> consumerForCreateSubscription = pulsarClient.newConsumer()
2509+
.topic(topic).subscriptionName(subName).consumerName("for_create_dispatcher")
2510+
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(0)
2511+
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
2512+
consumerForCreateSubscription.close();
2513+
2514+
// Make many messages in delivery queue.
2515+
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
2516+
.enableBatching(false)
2517+
.messageRoutingMode(MessageRoutingMode.SinglePartition)
2518+
.create();
2519+
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);
2520+
Consumer<byte[]> consumer = newPulsarClient.newConsumer()
2521+
.topic(topic).subscriptionName(subName)
2522+
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(10)
2523+
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
2524+
for (int i = 0; i < totalPublishMessages; i++) {
2525+
final String message = "my-message-" + i;
2526+
producer.send(message.getBytes());
2527+
}
2528+
consumer.receive();
2529+
// Make read entry delay, increase the probability of problems, see:#18289.
2530+
makeReadEntryDelay20MillisSeconds(topic, subName);
2531+
2532+
// Make "close last consumer" and "redeliver messages" execute concurrently.
2533+
log.info("===> trigger redeliver");
2534+
consumer.redeliverUnacknowledgedMessages();
2535+
consumer.close();
2536+
log.info("===> close first consumer");
2537+
2538+
// Receive messages and wait task finish.
2539+
List<ConsumerTask> taskList = new ArrayList<>(10);
2540+
List<Thread> threadList = new ArrayList<>(10);
2541+
for (int i = 0; i < 10; i++){
2542+
PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
2543+
ConsumerTask consumerTask = new ConsumerTask(pulsarClient, topic, subName);
2544+
taskList.add(consumerTask);
2545+
threadList.add(new Thread(consumerTask, consumerTask.consumerName));
2546+
}
2547+
// wait tasks finished.
2548+
threadList.forEach(thread -> thread.start());
2549+
for (Thread t : threadList){
2550+
t.join();
2551+
}
2552+
// Verify.
2553+
for (ConsumerTask task : taskList){
2554+
assertFalse(task.fail);
2555+
}
2556+
Assert.assertEquals(taskList.stream().map(t -> t.receivedEntries.size())
2557+
.reduce((a, b) -> a + b).get().intValue(), totalPublishMessages);
2558+
// cleanup.
2559+
producer.close();
2560+
admin.topics().delete(topic, false);
2561+
}
2562+
2563+
private PersistentDispatcherMultipleConsumers getShardDispatcher(String topic, String subName) {
2564+
PersistentTopic persistentTopic =
2565+
(PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
2566+
PersistentSubscription persistentSubscription = persistentTopic.getSubscription(subName);
2567+
return (PersistentDispatcherMultipleConsumers) persistentSubscription.getDispatcher();
2568+
}
2569+
2570+
private static class ConsumerTask implements Runnable {
2571+
2572+
private static final AtomicInteger consumerNamSequence = new AtomicInteger();
2573+
2574+
private final List<Long> receivedEntries = new ArrayList<>();
2575+
private final PulsarClient pulsarClient;
2576+
private final String consumerName;
2577+
private final String topicName;
2578+
private final String subName;
2579+
private volatile boolean fail = false;
2580+
2581+
public ConsumerTask(PulsarClient newPulsarClient, String topicName, String subName){
2582+
this.pulsarClient = newPulsarClient;
2583+
this.topicName = topicName;
2584+
this.subName = subName;
2585+
this.consumerName = subName + "-" + consumerNamSequence.incrementAndGet();
2586+
}
2587+
2588+
@Override
2589+
public void run() {
2590+
try {
2591+
log.info("===> start consumer task");
2592+
Consumer<byte[]> consumer = pulsarClient.newConsumer()
2593+
.topic(topicName).subscriptionName(subName)
2594+
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(10)
2595+
.consumerName(consumerName)
2596+
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
2597+
while(true){
2598+
Message message = consumer.receive(2, TimeUnit.SECONDS);
2599+
if (message == null){
2600+
break;
2601+
}
2602+
MessageIdImpl messageId = (MessageIdImpl)message.getMessageId();
2603+
consumer.acknowledge(messageId);
2604+
log.info("===> GET " + messageId.getLedgerId() + ":" + messageId.getEntryId() + ", Consumer: " + consumerName);
2605+
receivedEntries.add(messageId.getEntryId());
2606+
}
2607+
consumer.close();
2608+
pulsarClient.close();
2609+
} catch (Exception e) {
2610+
fail = true;
2611+
throw new RuntimeException(e);
2612+
}
2613+
}
2614+
}
2615+
23862616
/**
23872617
* <pre>
23882618
* Verifies Dispatcher dispatches messages properly with shared-subscription consumers with combination of blocked

0 commit comments

Comments
 (0)