Skip to content

Commit 181efdf

Browse files
committed
[fix][broker]fix Repeated messages of shared dispatcher
1 parent b193051 commit 181efdf

File tree

3 files changed

+100
-0
lines changed

3 files changed

+100
-0
lines changed

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java

+4
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ public void add(long ledgerId, long entryId, long stickyKeyHash) {
5151
messagesToRedeliver.add(ledgerId, entryId);
5252
}
5353

54+
public boolean contains(long ledgerId, long entryId) {
55+
return messagesToRedeliver.contains(ledgerId, entryId);
56+
}
57+
5458
public void remove(long ledgerId, long entryId) {
5559
if (hashesToBeBlocked != null) {
5660
hashesToBeBlocked.remove(ledgerId, entryId);

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java

+3
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,9 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
583583
if (needTrimAckedMessages()) {
584584
cursor.trimDeletedEntries(entries);
585585
}
586+
if (readType == ReadType.Replay) {
587+
entries.removeIf(entry -> !redeliveryMessages.contains(entry.getLedgerId(), entry.getEntryId()));
588+
}
586589

587590
int entriesToDispatch = entries.size();
588591
// Trigger read more messages

Diff for: pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java

+93
Original file line numberDiff line numberDiff line change
@@ -2383,6 +2383,99 @@ public void testPriorityConsumer() throws Exception {
23832383
log.info("-- Exiting {} test --", methodName);
23842384
}
23852385

2386+
@Test(timeOut = 30000)
2387+
public void concurrentlyReceiveDeliveriedMessages() throws Exception {
2388+
String topic = "persistent://my-property/my-ns/tp-" + UUID.randomUUID().toString();
2389+
String subName = "sub";
2390+
final int totalPublishMessages = 50;
2391+
// Make many messages in delivery queue.
2392+
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
2393+
.enableBatching(false)
2394+
.messageRoutingMode(MessageRoutingMode.SinglePartition)
2395+
.create();
2396+
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);
2397+
Consumer<byte[]> consumer = newPulsarClient.newConsumer()
2398+
.topic(topic).subscriptionName(subName)
2399+
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(10)
2400+
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
2401+
for (int i = 0; i < totalPublishMessages; i++) {
2402+
final String message = "my-message-" + i;
2403+
producer.send(message.getBytes());
2404+
}
2405+
consumer.receive();
2406+
consumer.redeliverUnacknowledgedMessages();
2407+
consumer.close();
2408+
2409+
// create consumer tasks.
2410+
List<ConsumerTask> taskList = new ArrayList<>(10);
2411+
List<Thread> threadList = new ArrayList<>(10);
2412+
for (int i = 0; i < 10; i++){
2413+
PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
2414+
ConsumerTask consumerTask = new ConsumerTask(pulsarClient, topic, subName);
2415+
taskList.add(consumerTask);
2416+
threadList.add(new Thread(consumerTask, consumerTask.consumerName));
2417+
}
2418+
// wait tasks finished.
2419+
threadList.forEach(thread -> thread.start());
2420+
for (Thread t : threadList){
2421+
t.join();
2422+
}
2423+
// Verify.
2424+
for (ConsumerTask task : taskList){
2425+
assertFalse(task.fail);
2426+
}
2427+
Assert.assertEquals(taskList.stream().map(t -> t.receivedEntries.size())
2428+
.reduce((a, b) -> a + b).get().intValue(), totalPublishMessages);
2429+
// cleanup.
2430+
producer.close();
2431+
admin.topics().delete(topic, false);
2432+
}
2433+
2434+
private static class ConsumerTask implements Runnable {
2435+
2436+
private static final AtomicInteger consumerNamSequence = new AtomicInteger();
2437+
2438+
private final List<Long> receivedEntries = new ArrayList<>();
2439+
private final PulsarClient pulsarClient;
2440+
private final String consumerName;
2441+
private final String topicName;
2442+
private final String subName;
2443+
private volatile boolean fail = false;
2444+
2445+
public ConsumerTask(PulsarClient newPulsarClient, String topicName, String subName){
2446+
this.pulsarClient = newPulsarClient;
2447+
this.topicName = topicName;
2448+
this.subName = subName;
2449+
this.consumerName = subName + "-" + consumerNamSequence.incrementAndGet();
2450+
}
2451+
2452+
@Override
2453+
public void run() {
2454+
try {
2455+
Consumer<byte[]> consumer = pulsarClient.newConsumer()
2456+
.topic(topicName).subscriptionName(subName)
2457+
.subscriptionType(SubscriptionType.Shared).receiverQueueSize(10)
2458+
.consumerName(consumerName)
2459+
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
2460+
while(true){
2461+
Message message = consumer.receive(2, TimeUnit.SECONDS);
2462+
if (message == null){
2463+
break;
2464+
}
2465+
MessageIdImpl messageId = (MessageIdImpl)message.getMessageId();
2466+
consumer.acknowledge(messageId);
2467+
System.out.println("===>12 GET " + messageId.getLedgerId() + ":" + messageId.getEntryId() + ", Consumer: " + consumerName);
2468+
receivedEntries.add(messageId.getEntryId());
2469+
}
2470+
consumer.close();
2471+
pulsarClient.close();
2472+
} catch (Exception e) {
2473+
fail = true;
2474+
throw new RuntimeException(e);
2475+
}
2476+
}
2477+
}
2478+
23862479
/**
23872480
* <pre>
23882481
* Verifies Dispatcher dispatches messages properly with shared-subscription consumers with combination of blocked

0 commit comments

Comments
 (0)