From c44e8f38e46e42c188ce0c2e2aadc50762a5ffa4 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 2 Jan 2025 18:56:29 +0800 Subject: [PATCH 1/4] [fix] [ml] fix key_shared mode delivery are order out order after a consumers reconnection --- ...entDispatcherMultipleConsumersClassic.java | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java index 910491e60b2cf..22b82c51885ae 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java @@ -98,7 +98,8 @@ public class PersistentDispatcherMultipleConsumersClassic extends AbstractPersis protected volatile boolean havePendingRead = false; protected volatile boolean havePendingReplayRead = false; protected volatile Position minReplayedPosition = null; - protected boolean shouldRewindBeforeReadingOrReplaying = false; + protected boolean shouldRewindBeforeReading = false; + protected boolean shouldSkipNextReplaying = false; protected final String name; private boolean sendInProgress = false; protected static final AtomicIntegerFieldUpdater @@ -173,12 +174,15 @@ public synchronized CompletableFuture addConsumer(Consumer consumer) { return CompletableFuture.completedFuture(null); } if (consumerList.isEmpty()) { - if (havePendingRead || havePendingReplayRead) { + if (havePendingReplayRead) { + shouldSkipNextReplaying = true; + } + if (havePendingRead) { // There is a pending read from previous run. We must wait for it to complete and then rewind - shouldRewindBeforeReadingOrReplaying = true; + shouldRewindBeforeReading = true; } else { cursor.rewind(); - shouldRewindBeforeReadingOrReplaying = false; + shouldRewindBeforeReading = false; } redeliveryMessages.clear(); delayedDeliveryTracker.ifPresent(tracker -> { @@ -633,6 +637,10 @@ public final synchronized void readEntriesComplete(List entries, Object c havePendingRead = false; } else { havePendingReplayRead = false; + if (shouldSkipNextReplaying && readType == ReadType.Replay) { + shouldSkipNextReplaying = false; + return; + } } if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) { @@ -646,11 +654,11 @@ public final synchronized void readEntriesComplete(List entries, Object c readFailureBackoff.reduceToHalf(); - if (shouldRewindBeforeReadingOrReplaying && readType == ReadType.Normal) { + if (shouldRewindBeforeReading && readType == ReadType.Normal) { // All consumers got disconnected before the completion of the read operation entries.forEach(Entry::release); cursor.rewind(); - shouldRewindBeforeReadingOrReplaying = false; + shouldRewindBeforeReading = false; readMoreEntries(); return; } @@ -931,8 +939,10 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj } } - if (shouldRewindBeforeReadingOrReplaying) { - shouldRewindBeforeReadingOrReplaying = false; + if (shouldSkipNextReplaying && readType == ReadType.Replay) { + shouldSkipNextReplaying = false; + } else if (shouldRewindBeforeReading && readType == ReadType.Normal) { + shouldRewindBeforeReading = false; cursor.rewind(); } From d6735049cb23db3c4031a534556c4f601269f330 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 3 Jan 2025 12:20:01 +0800 Subject: [PATCH 2/4] fix for 4.0 --- ...PersistentDispatcherMultipleConsumers.java | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index fa03a260e131e..b8fa2b66f4208 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -97,7 +97,8 @@ public class PersistentDispatcherMultipleConsumers extends AbstractPersistentDis protected volatile boolean havePendingRead = false; protected volatile boolean havePendingReplayRead = false; protected volatile Position minReplayedPosition = null; - protected boolean shouldRewindBeforeReadingOrReplaying = false; + protected boolean shouldRewindBeforeReading = false; + protected boolean shouldSkipNextReplaying = false; protected final String name; private boolean sendInProgress = false; protected static final AtomicIntegerFieldUpdater @@ -185,12 +186,15 @@ public synchronized CompletableFuture addConsumer(Consumer consumer) { return CompletableFuture.completedFuture(null); } if (consumerList.isEmpty()) { - if (havePendingRead || havePendingReplayRead) { + if (havePendingReplayRead) { + shouldSkipNextReplaying = true; + } + if (havePendingRead) { // There is a pending read from previous run. We must wait for it to complete and then rewind - shouldRewindBeforeReadingOrReplaying = true; + shouldRewindBeforeReading = true; } else { cursor.rewind(); - shouldRewindBeforeReadingOrReplaying = false; + shouldRewindBeforeReading = false; } redeliveryMessages.clear(); delayedDeliveryTracker.ifPresent(tracker -> { @@ -720,6 +724,10 @@ public final synchronized void readEntriesComplete(List entries, Object c havePendingRead = false; } else { havePendingReplayRead = false; + if (shouldSkipNextReplaying && readType == ReadType.Replay) { + shouldSkipNextReplaying = false; + return; + } } if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) { @@ -733,11 +741,11 @@ public final synchronized void readEntriesComplete(List entries, Object c readFailureBackoff.reduceToHalf(); - if (shouldRewindBeforeReadingOrReplaying && readType == ReadType.Normal) { + if (shouldRewindBeforeReading && readType == ReadType.Normal) { // All consumers got disconnected before the completion of the read operation entries.forEach(Entry::release); cursor.rewind(); - shouldRewindBeforeReadingOrReplaying = false; + shouldRewindBeforeReading = false; readMoreEntriesAsync(); return; } @@ -1061,8 +1069,10 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj } } - if (shouldRewindBeforeReadingOrReplaying) { - shouldRewindBeforeReadingOrReplaying = false; + if (shouldSkipNextReplaying && readType == ReadType.Replay) { + shouldSkipNextReplaying = false; + } else if (shouldRewindBeforeReading && readType == ReadType.Normal) { + shouldRewindBeforeReading = false; cursor.rewind(); } From 8e0947da7895f07b3ca3dcab7d9591d6c9738ff6 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 23 Jan 2025 12:15:15 +0800 Subject: [PATCH 3/4] add tests --- ...KeyDispatcherMultipleConsumersClassic.java | 2 +- ...ntryCacheKeySharedSubscriptionV30Test.java | 309 ++++++++++++++++++ .../client/api/ProducerConsumerBase.java | 2 + 3 files changed, 312 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java index 56161d8dd1544..053e90c1b1095 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java @@ -158,7 +158,7 @@ public synchronized CompletableFuture addConsumer(Consumer consumer) { // If there is a delayed "cursor.rewind" after the pending read, the consumers that will be // added before the "cursor.rewind" will have a same "recent joined position", which is the // same as "mark deleted position +1", so we can skip this adding. - && !shouldRewindBeforeReadingOrReplaying) { + && !shouldRewindBeforeReading) { recentlyJoinedConsumers.put(consumer, readPositionWhenJoining); sortRecentlyJoinedConsumersIfNeeded(); } diff --git a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/NonEntryCacheKeySharedSubscriptionV30Test.java b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/NonEntryCacheKeySharedSubscriptionV30Test.java index 0de3d4edfad64..ae201b4ef1914 100644 --- a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/NonEntryCacheKeySharedSubscriptionV30Test.java +++ b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/NonEntryCacheKeySharedSubscriptionV30Test.java @@ -21,9 +21,14 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import io.netty.util.concurrent.DefaultThreadFactory; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -31,26 +36,32 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumersClassic; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; import org.awaitility.Awaitility; import org.mockito.stubbing.Answer; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Slf4j @@ -78,6 +89,304 @@ protected void doInitConf() throws Exception { this.conf.setSubscriptionKeySharedUseClassicPersistentImplementation(true); } + @Test(timeOut = 180 * 1000) + public void testNormalReadAfterPendingReplay() throws Exception { + // The message with "k1" will be sent to consumer1. + String k1 = "11"; + // The message with "k2" will be sent to consumer2. + String k2 = "12"; + // The message with "k3" will be sent to consumer3, and will be sent to consumer2 if consumer3 is offline. + String k3 = "3"; + + final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String subName = "my-sub"; + final DefaultThreadFactory threadFactory = + new DefaultThreadFactory(BrokerTestUtil.newUniqueName("thread")); + admin.topics().createNonPartitionedTopic(topic); + admin.topics().createSubscription(topic, subName, MessageId.earliest); + Producer producer = pulsarClient.newProducer(Schema.INT32).topic(topic).enableBatching(false).create(); + + // Make a scenario: + // - consumer1: stuck + // - consumer2: acked all messages that it received. + ConsumerImpl consumer1 = (ConsumerImpl) pulsarClient.newConsumer(Schema.INT32) // 1 + .topic(topic) + .subscriptionName(subName) + .receiverQueueSize(1) + .subscriptionType(SubscriptionType.Key_Shared) + .consumerName("c1") + .subscribe(); + ConsumerImpl consumer2 = (ConsumerImpl) pulsarClient.newConsumer(Schema.INT32) // 4 + .topic(topic) + .subscriptionName(subName) + .receiverQueueSize(10) + .subscriptionType(SubscriptionType.Key_Shared) + .consumerName("c2") + .subscribe(); + AtomicInteger msgGeneratorForK1 = new AtomicInteger(0); + AtomicInteger msgGeneratorForK2 = new AtomicInteger(10000); + producer.newMessage().key(k1).value(msgGeneratorForK1.incrementAndGet()).send(); + for (int i = 0; i < 20; i++) { + producer.newMessage().key(k2).value(msgGeneratorForK2.incrementAndGet()).send(); + } + Awaitility.await().untilAsserted(() -> { + log.info("c1 queue size: {}", consumer1.getTotalIncomingMessages()); + log.info("c2 queue size: {}", consumer2.getTotalIncomingMessages()); + assertTrue(1 <= consumer1.getTotalIncomingMessages()); + assertTrue(1 <= consumer2.getTotalIncomingMessages()); + }); + ReceivedMessages receivedMessages1 = ackAllMessages(consumer2); + assertEquals(receivedMessages1.getMessagesReceived().size(), 20); + final PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get(); + final ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + final PersistentSubscription persistentSubscription = persistentTopic.getSubscription(subName); + final PersistentStickyKeyDispatcherMultipleConsumersClassic dispatcher = + (PersistentStickyKeyDispatcherMultipleConsumersClassic) persistentSubscription.getDispatcher(); + // Verify: consumer2 is waiting for new messages. + Awaitility.await().untilAsserted(() -> { + assertTrue(dispatcher.isHavePendingRead()); + }); + + // Trigger a replay reading through close consumer1 + // - Inject a delay for the next replay read. + LedgerHandle firstLedger = ml.currentLedger; + LedgerHandle spyFirstLedger = spy(firstLedger); + CountDownLatch replyReadSignal = new CountDownLatch(1); + AtomicBoolean replayReadWasTriggered = new AtomicBoolean(); + Answer answer = invocation -> { + long firstEntry = (long) invocation.getArguments()[0]; + log.info("replay reading: {}", firstEntry); + if (firstEntry == 0) { + replayReadWasTriggered.set(true); + final CompletableFuture res = new CompletableFuture<>(); + threadFactory.newThread(() -> { + try { + replyReadSignal.await(); + CompletableFuture future = + (CompletableFuture) invocation.callRealMethod(); + future.thenAccept(v -> { + res.complete(v); + }).exceptionally(ex -> { + res.completeExceptionally(ex); + return null; + }); + } catch (Throwable ex) { + res.completeExceptionally(ex); + } + }).start(); + return res; + } else { + return invocation.callRealMethod(); + } + }; + doAnswer(answer).when(spyFirstLedger).readAsync(anyLong(), anyLong()); + doAnswer(answer).when(spyFirstLedger).readUnconfirmedAsync(anyLong(), anyLong()); + ml.currentLedger = spyFirstLedger; + consumer1.close(); + Awaitility.await().until(() -> replayReadWasTriggered.get()); + + // Verify: the next normal reading will be skipped because of there is a pending replay read. + for (int i = 0; i < 20; i++) { + producer.newMessage().key(k1).value(msgGeneratorForK1.incrementAndGet()).send(); + } + ReceivedMessages receivedMessages2 = ackAllMessages(consumer2); + assertEquals(receivedMessages2.getMessagesReceived().size(), 0); + + // Verify: all messages with the key "k1" will be received in order. + replyReadSignal.countDown(); + Thread.sleep(1000 * 3); + ReceivedMessages receivedMessages3 = ackAllMessages(consumer2); + List messagesReceived = + receivedMessages3.getMessagesReceived().stream().map(e -> e.getRight()).collect(Collectors.toList()); + assertEquals(messagesReceived.size(), 21); + List messagesReceivedSorted = new ArrayList<>(messagesReceived); + Collections.sort(messagesReceivedSorted); + assertEquals(messagesReceivedSorted, messagesReceivedSorted); + + // Cleanup. + producer.close(); + consumer2.close(); + admin.topics().delete(topic, false); + } + + @DataProvider + public Object[][] testCasesAfterClosedAllConsumers() { + return new Object[][] { + {"testCancelPendingReadWillNotCancelReplay"}, + {"testRewindImmediately"} + }; + } + + @Test(timeOut = 180 * 1000, dataProvider = "testCasesAfterClosedAllConsumers") + public void testCancelPendingReadWillNotCancelReplay(String testCase) throws Exception { + // The message with "k1" will be sent to consumer1. + String k1 = "11"; + // The message with "k2" will be sent to consumer2. + String k2 = "12"; + // The message with "k3" will be sent to consumer3, and will be sent to consumer2 if consumer3 is offline. + String k3 = "3"; + + final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String subName = "my-sub"; + final DefaultThreadFactory threadFactory = + new DefaultThreadFactory(BrokerTestUtil.newUniqueName("thread")); + admin.topics().createNonPartitionedTopic(topic); + admin.topics().createSubscription(topic, subName, MessageId.earliest); + Producer producer = pulsarClient.newProducer(Schema.INT32).topic(topic).enableBatching(false).create(); + + // Make a scenario: + // - consumer1: stuck + // - consumer2: acked all messages that it received. + List messagesThatWillBeAckedAtLast = new ArrayList<>(); + ConsumerImpl consumer1 = (ConsumerImpl) pulsarClient.newConsumer(Schema.INT32) // 1 + .topic(topic) + .subscriptionName(subName) + .receiverQueueSize(1) + .subscriptionType(SubscriptionType.Key_Shared) + .consumerName("c1") + .subscribe(); + ConsumerImpl consumer2 = (ConsumerImpl) pulsarClient.newConsumer(Schema.INT32) // 4 + .topic(topic) + .subscriptionName(subName) + .receiverQueueSize(10) + .subscriptionType(SubscriptionType.Key_Shared) + .consumerName("c2") + .subscribe(); + AtomicInteger msgGeneratorForK1 = new AtomicInteger(0); + AtomicInteger msgGeneratorForK2 = new AtomicInteger(1000); + AtomicInteger msgGeneratorForK3 = new AtomicInteger(1000_000); + producer.newMessage().key(k1).value(msgGeneratorForK1.incrementAndGet()).send(); + messagesThatWillBeAckedAtLast.add(msgGeneratorForK1.get()); + for (int i = 0; i < 20; i++) { + producer.newMessage().key(k2).value(msgGeneratorForK2.incrementAndGet()).send(); + } + Awaitility.await().untilAsserted(() -> { + log.info("c1 queue size: {}", consumer1.getTotalIncomingMessages()); + log.info("c2 queue size: {}", consumer2.getTotalIncomingMessages()); + assertTrue(1 <= consumer1.getTotalIncomingMessages()); + assertTrue(1 <= consumer2.getTotalIncomingMessages()); + }); + ReceivedMessages receivedMessages1 = ackAllMessages(consumer2); + assertEquals(receivedMessages1.getMessagesReceived().size(), 20); + final PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get(); + final ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + final PersistentSubscription persistentSubscription = persistentTopic.getSubscription(subName); + final PersistentStickyKeyDispatcherMultipleConsumersClassic dispatcher = + (PersistentStickyKeyDispatcherMultipleConsumersClassic) persistentSubscription.getDispatcher(); + // Verify: consumer2 is waiting for new messages. + Awaitility.await().untilAsserted(() -> { + assertTrue(dispatcher.isHavePendingRead()); + }); + // Complete the pending read. + for (int i = 0; i < 20; i++) { + producer.newMessage().key(k3).value(msgGeneratorForK3.incrementAndGet()).send(); + messagesThatWillBeAckedAtLast.add(msgGeneratorForK3.get()); + } + Awaitility.await().untilAsserted(() -> { + assertFalse(dispatcher.isHavePendingRead()); + }); + + // Trigger a replay reading through close consumer1 + // - Inject a delay for the next replay read. + LedgerHandle firstLedger = ml.currentLedger; + LedgerHandle spyFirstLedger = spy(firstLedger); + CountDownLatch replyReadSignal = new CountDownLatch(1); + AtomicBoolean replayReadWasTriggered = new AtomicBoolean(); + Answer answer = invocation -> { + long firstEntry = (long) invocation.getArguments()[0]; + log.info("replay reading: {}", firstEntry); + if (firstEntry == 0) { + replayReadWasTriggered.set(true); + final CompletableFuture res = new CompletableFuture<>(); + threadFactory.newThread(() -> { + try { + replyReadSignal.await(); + CompletableFuture future = + (CompletableFuture) invocation.callRealMethod(); + future.thenAccept(v -> { + res.complete(v); + }).exceptionally(ex -> { + res.completeExceptionally(ex); + return null; + }); + } catch (Throwable ex) { + res.completeExceptionally(ex); + } + }).start(); + return res; + } else { + return invocation.callRealMethod(); + } + }; + doAnswer(answer).when(spyFirstLedger).readAsync(anyLong(), anyLong()); + doAnswer(answer).when(spyFirstLedger).readUnconfirmedAsync(anyLong(), anyLong()); + + // Trigger a "cancelPendingRead", but the operation will do nothing because of there is no pending read. + // - after removing all consumers and adding a new consumer, broker will call a "cursor.cancelPendingRead" + // and "cursor.rewind". + consumer1.close(); + consumer2.close(); + int queueSize3 = "testRewindImmediately".equals(testCase) ? 0 : 100; + ConsumerImpl consumer3 = (ConsumerImpl) pulsarClient.newConsumer(Schema.INT32) // 4 + .topic(topic) + .subscriptionName(subName) + .receiverQueueSize(queueSize3) + .subscriptionType(SubscriptionType.Key_Shared) + .consumerName("c3") + .subscribe(); + + // Verify 1: "cursor.cancelPendingReadRequest" will not cancel the pending replay read. + // This motivation of this verify is here: https://github.com/apache/pulsar/pull/23855#issuecomment-2597522865. + if ("testCancelPendingReadWillNotCancelReplay".equals(testCase)) { + Awaitility.await().untilAsserted(() -> { + assertTrue(dispatcher.isHavePendingRead()); + }); + } + // Verify 2: "cursor.rewind" will be called immediately, without wait for the next replay read. + if ("testRewindImmediately".equals(testCase)) { + Awaitility.await().untilAsserted(() -> { + log.info("cursor rd-pos: {}, md-pos: {}", dispatcher.getCursor().getReadPosition(), + dispatcher.getCursor() + .getMarkDeletedPosition()); + assertEquals(dispatcher.getCursor().getReadPosition(), ml.getNextValidPosition(dispatcher.getCursor() + .getMarkDeletedPosition())); + }); + } + + // Verify 3: all messages(including "replay red" and "normal read") will be received in order. + for (int i = 0; i < 20; i++) { + producer.newMessage().key(k1).value(msgGeneratorForK1.incrementAndGet()).send(); + messagesThatWillBeAckedAtLast.add(msgGeneratorForK1.get()); + } + replyReadSignal.countDown(); + if (queueSize3 > 2) { + Awaitility.await().untilAsserted(() -> { + assertTrue(consumer3.numMessagesInQueue() > 2); + }); + } + Thread.sleep(1000 * 3); + List messagesReceived = new ArrayList<>(); + while (true) { + if (messagesReceived.size() < messagesThatWillBeAckedAtLast.size()) { + Message msg = consumer3.receive(); + messagesReceived.add(msg.getValue()); + consumer3.acknowledge(msg); + } else { + break; + } + } + assertEquals(messagesReceived, messagesThatWillBeAckedAtLast); + Awaitility.await().untilAsserted(() -> { + assertEquals(dispatcher.getCursor().getMarkDeletedPosition(), ml.getLastConfirmedEntry()); + }); + + // cleanup. + producer.close(); + consumer3.close(); + admin.topics().delete(topic, false); + } @Test(timeOut = 180 * 1000, invocationCount = 1) public void testRecentJoinQueueIsInOrderAfterRewind() throws Exception { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java index 0cf2e49d35bee..ed43ab41ae55e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java @@ -28,6 +28,7 @@ import java.util.Random; import java.util.Set; import java.util.function.BiFunction; +import lombok.Getter; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.common.policies.data.ClusterData; @@ -97,6 +98,7 @@ protected ReceivedMessages ackAllMessages(Consumer...consumers) throws protected static class ReceivedMessages { + @Getter List> messagesReceived = Collections.synchronizedList(new ArrayList<>()); List> messagesAcked = Collections.synchronizedList(new ArrayList<>()); From 3cbf13d53d65d553523cb40ea0d1bc5eadc2ab54 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 23 Jan 2025 17:22:32 +0800 Subject: [PATCH 4/4] fix issue --- ...entDispatcherMultipleConsumersClassic.java | 1 + ...ntryCacheKeySharedSubscriptionV30Test.java | 41 +++++++++++-------- 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java index 22b82c51885ae..f69bd4c945ea8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java @@ -639,6 +639,7 @@ public final synchronized void readEntriesComplete(List entries, Object c havePendingReplayRead = false; if (shouldSkipNextReplaying && readType == ReadType.Replay) { shouldSkipNextReplaying = false; + readMoreEntriesAsync(); return; } } diff --git a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/NonEntryCacheKeySharedSubscriptionV30Test.java b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/NonEntryCacheKeySharedSubscriptionV30Test.java index ae201b4ef1914..a4823f923bda7 100644 --- a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/NonEntryCacheKeySharedSubscriptionV30Test.java +++ b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/NonEntryCacheKeySharedSubscriptionV30Test.java @@ -214,12 +214,13 @@ public void testNormalReadAfterPendingReplay() throws Exception { public Object[][] testCasesAfterClosedAllConsumers() { return new Object[][] { {"testCancelPendingReadWillNotCancelReplay"}, - {"testRewindImmediately"} + {"testRewindImmediately"}, + {"testRepeatedDelivery"} }; } @Test(timeOut = 180 * 1000, dataProvider = "testCasesAfterClosedAllConsumers") - public void testCancelPendingReadWillNotCancelReplay(String testCase) throws Exception { + public void testMixedReplayReadingAndNormalReading(String testCase) throws Exception { // The message with "k1" will be sent to consumer1. String k1 = "11"; // The message with "k2" will be sent to consumer2. @@ -279,14 +280,6 @@ public void testCancelPendingReadWillNotCancelReplay(String testCase) throws Exc Awaitility.await().untilAsserted(() -> { assertTrue(dispatcher.isHavePendingRead()); }); - // Complete the pending read. - for (int i = 0; i < 20; i++) { - producer.newMessage().key(k3).value(msgGeneratorForK3.incrementAndGet()).send(); - messagesThatWillBeAckedAtLast.add(msgGeneratorForK3.get()); - } - Awaitility.await().untilAsserted(() -> { - assertFalse(dispatcher.isHavePendingRead()); - }); // Trigger a replay reading through close consumer1 // - Inject a delay for the next replay read. @@ -322,13 +315,28 @@ public void testCancelPendingReadWillNotCancelReplay(String testCase) throws Exc }; doAnswer(answer).when(spyFirstLedger).readAsync(anyLong(), anyLong()); doAnswer(answer).when(spyFirstLedger).readUnconfirmedAsync(anyLong(), anyLong()); + ml.currentLedger = spyFirstLedger; // Trigger a "cancelPendingRead", but the operation will do nothing because of there is no pending read. // - after removing all consumers and adding a new consumer, broker will call a "cursor.cancelPendingRead" // and "cursor.rewind". consumer1.close(); + Awaitility.await().untilAsserted(() -> { + assertTrue(dispatcher.isHavePendingReplayRead()); + }); + // Complete the pending read. + for (int i = 0; i < 20; i++) { + producer.newMessage().key(k3).value(msgGeneratorForK3.incrementAndGet()).send(); + messagesThatWillBeAckedAtLast.add(msgGeneratorForK3.get()); + } + Awaitility.await().untilAsserted(() -> { + assertFalse(dispatcher.isHavePendingRead()); + }); consumer2.close(); - int queueSize3 = "testRewindImmediately".equals(testCase) ? 0 : 100; + int queueSize3 = 1000; + if ("testRewindImmediately".equals(testCase)) { + queueSize3 = 0; + } ConsumerImpl consumer3 = (ConsumerImpl) pulsarClient.newConsumer(Schema.INT32) // 4 .topic(topic) .subscriptionName(subName) @@ -341,7 +349,7 @@ public void testCancelPendingReadWillNotCancelReplay(String testCase) throws Exc // This motivation of this verify is here: https://github.com/apache/pulsar/pull/23855#issuecomment-2597522865. if ("testCancelPendingReadWillNotCancelReplay".equals(testCase)) { Awaitility.await().untilAsserted(() -> { - assertTrue(dispatcher.isHavePendingRead()); + assertTrue(dispatcher.isHavePendingReplayRead()); }); } // Verify 2: "cursor.rewind" will be called immediately, without wait for the next replay read. @@ -356,23 +364,24 @@ public void testCancelPendingReadWillNotCancelReplay(String testCase) throws Exc } // Verify 3: all messages(including "replay red" and "normal read") will be received in order. + // This check is then main purpose of the PR https://github.com/apache/pulsar/pull/23803. for (int i = 0; i < 20; i++) { producer.newMessage().key(k1).value(msgGeneratorForK1.incrementAndGet()).send(); messagesThatWillBeAckedAtLast.add(msgGeneratorForK1.get()); } replyReadSignal.countDown(); - if (queueSize3 > 2) { + if ("testRepeatedDelivery".equals(testCase)) { Awaitility.await().untilAsserted(() -> { - assertTrue(consumer3.numMessagesInQueue() > 2); + assertTrue(consumer3.numMessagesInQueue() >= messagesThatWillBeAckedAtLast.size()); }); } - Thread.sleep(1000 * 3); List messagesReceived = new ArrayList<>(); while (true) { + log.info("received msg count: {}", messagesReceived.size()); if (messagesReceived.size() < messagesThatWillBeAckedAtLast.size()) { Message msg = consumer3.receive(); messagesReceived.add(msg.getValue()); - consumer3.acknowledge(msg); + consumer3.acknowledgeAsync(msg); } else { break; }