From b54c308749db9f32aae42da7066536eefd8eb6b3 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 26 Sep 2024 13:23:08 +0800 Subject: [PATCH 1/5] [fix] [broker] Fix delayed msg metric if enabled delayedDeliveryFixedDelayDetectionLookahead --- .../org/apache/pulsar/broker/service/Dispatcher.java | 4 ++++ .../PersistentDispatcherMultipleConsumers.java | 5 +++++ .../service/persistent/PersistentSubscription.java | 11 +++++++++-- .../policies/data/stats/SubscriptionStatsImpl.java | 2 ++ 4 files changed, 20 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java index d1d44709a9c52..9a107ab827a49 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java @@ -151,6 +151,10 @@ default boolean checkAndUnblockIfStuck() { */ default void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion){} + default boolean isAllMessagesAreFixedDelayed() { + return false; + } + /** * Trigger a new "readMoreEntries" if the dispatching has been paused before. This method is only implemented in * {@link org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers} right now, other diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 73ad2cf0a3dee..589160d64d674 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -1334,6 +1334,11 @@ protected synchronized boolean shouldPauseDeliveryForDelayTracker() { return delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().shouldPauseAllDeliveries(); } + @Override + public boolean isAllMessagesAreFixedDelayed() { + return shouldPauseDeliveryForDelayTracker(); + } + @Override public long getNumberOfDelayedMessages() { return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 9a0545e6f0ab2..a76f4907f45a8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -1269,16 +1269,23 @@ public CompletableFuture getStatsAsync(GetStatsOptions ge ((PersistentDispatcherMultipleConsumers) dispatcher).getBucketDelayedIndexStats(); } + subStats.msgBacklog = getNumberOfEntriesInBacklog(getStatsOptions.isGetPreciseBacklog()); if (Subscription.isIndividualAckMode(subType)) { if (dispatcher instanceof PersistentDispatcherMultipleConsumers) { PersistentDispatcherMultipleConsumers d = (PersistentDispatcherMultipleConsumers) dispatcher; subStats.unackedMessages = d.getTotalUnackedMessages(); subStats.blockedSubscriptionOnUnackedMsgs = d.isBlockedDispatcherOnUnackedMsgs(); - subStats.msgDelayed = d.getNumberOfDelayedMessages(); subStats.msgInReplay = d.getNumberOfMessagesInReplay(); + if (d.isAllMessagesAreFixedDelayed()) { + subStats.msgDelayed = subStats.msgBacklog; + subStats.msgDelayedInMemory = d.getNumberOfDelayedMessages(); + } else { + subStats.msgDelayed = d.getNumberOfDelayedMessages(); + subStats.msgDelayedInMemory = subStats.msgDelayed; + } } } - subStats.msgBacklog = getNumberOfEntriesInBacklog(getStatsOptions.isGetPreciseBacklog()); + if (getStatsOptions.isSubscriptionBacklogSize()) { subStats.backlogSize = topic.getManagedLedger() .getEstimatedBacklogSize(cursor.getMarkDeletedPosition()); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java index 977ed28e86814..d9112db05341d 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java @@ -73,6 +73,8 @@ public class SubscriptionStatsImpl implements SubscriptionStats { /** Number of delayed messages currently being tracked. */ public long msgDelayed; + /** Number of delayed messages currently being tracked. */ + public long msgDelayedInMemory; /** Number of messages registered for replay. */ public long msgInReplay; From 3173e8ce5ca8bc5264e8fe2ad94240919b6168df Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 26 Sep 2024 13:53:49 +0800 Subject: [PATCH 2/5] add test --- .../persistent/DelayedDeliveryTest.java | 58 +++++++++++++++++++ .../policies/data/SubscriptionStats.java | 5 +- .../data/stats/SubscriptionStatsImpl.java | 3 +- 3 files changed, 64 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java index 3ca966d210886..b0e5fa0fd01ba 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java @@ -36,6 +36,7 @@ import lombok.Cleanup; import org.apache.bookkeeper.client.BKException; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory; import org.apache.pulsar.broker.service.Dispatcher; import org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil; import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats; @@ -43,14 +44,17 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; +import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; import org.awaitility.Awaitility; +import org.awaitility.reflect.WhiteboxImpl; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -684,4 +688,58 @@ public void testDelayedDeliveryExceedsMaxDelay() throws Exception { + maxDeliveryDelayInMillis + " milliseconds"); } } + + @Test + public void testMsgBacklogNoDelayedWhenFixedDelay() throws Exception { + final long originalDelayedDeliveryFixedDelayDetectionLookahead = + pulsar.getConfig().getDelayedDeliveryFixedDelayDetectionLookahead();; + final int delayedDeliveryFixedDelayDetectionLookahead = 10; + final int entries = delayedDeliveryFixedDelayDetectionLookahead * 10; + final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String sName = "delayed_s1"; + DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory = + WhiteboxImpl.getInternalState(pulsar.getBrokerService(), "delayedDeliveryTrackerFactory"); + if (delayedDeliveryTrackerFactory != null) { + pulsar.getConfig() + .setDelayedDeliveryFixedDelayDetectionLookahead(delayedDeliveryFixedDelayDetectionLookahead); + delayedDeliveryTrackerFactory.initialize(pulsar); + } + admin.topics().createNonPartitionedTopic(topic); + admin.topics().createSubscription(topic, sName, MessageId.earliest); + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get(); + + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + for (int i = 0; i < entries; i++) { + producer.newMessage().deliverAfter(1, TimeUnit.HOURS).value(i + "").send(); + } + Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName(sName) + .receiverQueueSize(20).subscriptionType(SubscriptionType.Shared).subscribe(); + + PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) persistentTopic + .getSubscription(sName).getDispatcher(); + Awaitility.await().untilAsserted(() -> { + assertTrue(dispatcher.shouldPauseDeliveryForDelayTracker()); + assertTrue(dispatcher.getNumberOfDelayedMessages() >= delayedDeliveryFixedDelayDetectionLookahead); + }); + + SubscriptionStats subscriptionStats = persistentTopic.getStats(true, false, false) + .getSubscriptions().get(sName); + assertEquals(subscriptionStats.getMsgBacklog(), entries); + assertEquals(subscriptionStats.getMsgDelayed(), entries); + assertEquals(subscriptionStats.getMsgBacklogNoDelayed(), 0); + assertTrue(subscriptionStats.getMsgDelayedInMemory() < entries); + + // cleanup. + DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory2 = + WhiteboxImpl.getInternalState(pulsar.getBrokerService(), "delayedDeliveryTrackerFactory"); + if (delayedDeliveryTrackerFactory2 != null) { + pulsar.getConfig().setDelayedDeliveryFixedDelayDetectionLookahead( + originalDelayedDeliveryFixedDelayDetectionLookahead); + delayedDeliveryTrackerFactory2.initialize(pulsar); + } + consumer.close(); + producer.close(); + admin.topics().delete(topic, false); + } } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java index e307e41862e74..52d53a61b9d8c 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java @@ -63,9 +63,12 @@ public interface SubscriptionStats { /** Flag to verify if subscription is blocked due to reaching threshold of unacked messages. */ boolean isBlockedSubscriptionOnUnackedMsgs(); - /** Number of delayed messages currently being tracked. */ + /** Number of delayed messages. */ long getMsgDelayed(); + /** Number of delayed messages currently being tracked. */ + long getMsgDelayedInMemory(); + /** Number of messages registered for replay. */ long getMsgInReplay(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java index d9112db05341d..bbd843e720445 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java @@ -71,8 +71,9 @@ public class SubscriptionStatsImpl implements SubscriptionStats { /** Flag to verify if subscription is blocked due to reaching threshold of unacked messages. */ public boolean blockedSubscriptionOnUnackedMsgs; - /** Number of delayed messages currently being tracked. */ + /** Number of delayed messages. */ public long msgDelayed; + /** Number of delayed messages currently being tracked. */ public long msgDelayedInMemory; From 19939ac3902532452648dd131a0f5f3d4796013c Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 26 Sep 2024 14:00:16 +0800 Subject: [PATCH 3/5] add test --- .../pulsar/broker/service/persistent/DelayedDeliveryTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java index b0e5fa0fd01ba..715abe08d6fc7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java @@ -714,7 +714,8 @@ public void testMsgBacklogNoDelayedWhenFixedDelay() throws Exception { producer.newMessage().deliverAfter(1, TimeUnit.HOURS).value(i + "").send(); } Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName(sName) - .receiverQueueSize(20).subscriptionType(SubscriptionType.Shared).subscribe(); + .receiverQueueSize(delayedDeliveryFixedDelayDetectionLookahead * 2) + .subscriptionType(SubscriptionType.Shared).subscribe(); PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) persistentTopic .getSubscription(sName).getDispatcher(); From a824a7a8611ce16b2499e75c89d369e42175842b Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 26 Sep 2024 14:21:29 +0800 Subject: [PATCH 4/5] fix bug --- .../persistent/PersistentSubscription.java | 6 +- .../persistent/DelayedDeliveryTest.java | 57 +++++++++++++++---- 2 files changed, 51 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index a76f4907f45a8..244d41be12ff2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -1277,7 +1277,11 @@ public CompletableFuture getStatsAsync(GetStatsOptions ge subStats.blockedSubscriptionOnUnackedMsgs = d.isBlockedDispatcherOnUnackedMsgs(); subStats.msgInReplay = d.getNumberOfMessagesInReplay(); if (d.isAllMessagesAreFixedDelayed()) { - subStats.msgDelayed = subStats.msgBacklog; + long msgDeliveredOut = 0; + for (Consumer c : dispatcher.getConsumers()){ + msgDeliveredOut += c.getUnackedMessages(); + } + subStats.msgDelayed = subStats.msgBacklog - msgDeliveredOut - subStats.msgInReplay; subStats.msgDelayedInMemory = d.getNumberOfDelayedMessages(); } else { subStats.msgDelayed = d.getNumberOfDelayedMessages(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java index 715abe08d6fc7..3ae55358992ef 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java @@ -25,6 +25,7 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import io.opentelemetry.api.common.Attributes; +import java.time.Duration; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -52,6 +53,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; @@ -694,7 +696,9 @@ public void testMsgBacklogNoDelayedWhenFixedDelay() throws Exception { final long originalDelayedDeliveryFixedDelayDetectionLookahead = pulsar.getConfig().getDelayedDeliveryFixedDelayDetectionLookahead();; final int delayedDeliveryFixedDelayDetectionLookahead = 10; - final int entries = delayedDeliveryFixedDelayDetectionLookahead * 10; + final int msgNoDelayed = 15; + final int receiverQueueSize = 10; + final int msgDelayed = delayedDeliveryFixedDelayDetectionLookahead * 10; final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); final String sName = "delayed_s1"; DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory = @@ -709,27 +713,57 @@ public void testMsgBacklogNoDelayedWhenFixedDelay() throws Exception { PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get(); + // Send some message no-delayed. + // Send many messages delayed. Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); - for (int i = 0; i < entries; i++) { + for (int i = 0; i < msgNoDelayed; i++) { + producer.newMessage().send(); + } + for (int i = 0; i < msgDelayed; i++) { producer.newMessage().deliverAfter(1, TimeUnit.HOURS).value(i + "").send(); } - Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName(sName) - .receiverQueueSize(delayedDeliveryFixedDelayDetectionLookahead * 2) + Consumer consumer1 = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName(sName) + .receiverQueueSize(receiverQueueSize) + .subscriptionType(SubscriptionType.Shared).subscribe(); + Consumer consumer2 = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName(sName) + .receiverQueueSize(receiverQueueSize) .subscriptionType(SubscriptionType.Shared).subscribe(); + // Wait for the checker that named "shouldPauseDeliveryForDelayTracker". PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) persistentTopic .getSubscription(sName).getDispatcher(); Awaitility.await().untilAsserted(() -> { assertTrue(dispatcher.shouldPauseDeliveryForDelayTracker()); assertTrue(dispatcher.getNumberOfDelayedMessages() >= delayedDeliveryFixedDelayDetectionLookahead); }); - - SubscriptionStats subscriptionStats = persistentTopic.getStats(true, false, false) + // Verify: backlog stats + SubscriptionStats subscriptionStats1 = persistentTopic.getStats(true, false, false) .getSubscriptions().get(sName); - assertEquals(subscriptionStats.getMsgBacklog(), entries); - assertEquals(subscriptionStats.getMsgDelayed(), entries); - assertEquals(subscriptionStats.getMsgBacklogNoDelayed(), 0); - assertTrue(subscriptionStats.getMsgDelayedInMemory() < entries); + assertEquals(subscriptionStats1.getMsgBacklog(), msgDelayed + msgNoDelayed); + assertEquals(subscriptionStats1.getMsgDelayed(), msgDelayed); + assertEquals(subscriptionStats1.getMsgBacklogNoDelayed(), msgNoDelayed); + assertEquals(subscriptionStats1.getMsgInReplay(), 0); + assertTrue(subscriptionStats1.getMsgDelayedInMemory() < msgDelayed); + + // Let some messages being pushed into replay queue. + consumer2.close(); + // Wait for the checker that named "shouldPauseDeliveryForDelayTracker". + Awaitility.await().untilAsserted(() -> { + assertTrue(dispatcher.shouldPauseDeliveryForDelayTracker()); + assertTrue(dispatcher.getNumberOfDelayedMessages() >= delayedDeliveryFixedDelayDetectionLookahead); + }); + // And verify: backlog stats + Awaitility.await().atMost(Duration.ofSeconds(3600)).untilAsserted(() -> { + SubscriptionStats subscriptionStats2 = persistentTopic.getStats(true, false, false) + .getSubscriptions().get(sName); + assertEquals(subscriptionStats2.getMsgBacklog(), msgDelayed + msgNoDelayed); + assertEquals(subscriptionStats2.getMsgDelayed(), msgDelayed); + assertEquals(subscriptionStats2.getMsgBacklogNoDelayed(), msgNoDelayed); + GrowableArrayBlockingQueue incomingMessages = + WhiteboxImpl.getInternalState(consumer1, "incomingMessages"); + assertEquals(subscriptionStats2.getMsgInReplay(), msgNoDelayed - incomingMessages.size()); + assertTrue(subscriptionStats2.getMsgDelayedInMemory() < msgDelayed); + }); // cleanup. DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory2 = @@ -739,7 +773,8 @@ public void testMsgBacklogNoDelayedWhenFixedDelay() throws Exception { originalDelayedDeliveryFixedDelayDetectionLookahead); delayedDeliveryTrackerFactory2.initialize(pulsar); } - consumer.close(); + consumer1.close(); + consumer2.close(); producer.close(); admin.topics().delete(topic, false); } From 85e5b230ce40e84ac6b46f8e356f941ecb26a0f4 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 26 Sep 2024 14:23:04 +0800 Subject: [PATCH 5/5] fix bug --- .../main/java/org/apache/pulsar/broker/service/Dispatcher.java | 2 +- .../persistent/PersistentDispatcherMultipleConsumers.java | 2 +- .../broker/service/persistent/PersistentSubscription.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java index 9a107ab827a49..f9661f195a997 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java @@ -151,7 +151,7 @@ default boolean checkAndUnblockIfStuck() { */ default void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion){} - default boolean isAllMessagesAreFixedDelayed() { + default boolean isAllWaitingReadMessagesAreFixedDelayed() { return false; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 589160d64d674..fb44498f4da21 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -1335,7 +1335,7 @@ protected synchronized boolean shouldPauseDeliveryForDelayTracker() { } @Override - public boolean isAllMessagesAreFixedDelayed() { + public boolean isAllWaitingReadMessagesAreFixedDelayed() { return shouldPauseDeliveryForDelayTracker(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 244d41be12ff2..eb4a5d78e22d4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -1276,7 +1276,7 @@ public CompletableFuture getStatsAsync(GetStatsOptions ge subStats.unackedMessages = d.getTotalUnackedMessages(); subStats.blockedSubscriptionOnUnackedMsgs = d.isBlockedDispatcherOnUnackedMsgs(); subStats.msgInReplay = d.getNumberOfMessagesInReplay(); - if (d.isAllMessagesAreFixedDelayed()) { + if (d.isAllWaitingReadMessagesAreFixedDelayed()) { long msgDeliveredOut = 0; for (Consumer c : dispatcher.getConsumers()){ msgDeliveredOut += c.getUnackedMessages();