From 7afe161d4e9919b5c5f237a71f0d984a077a6c3f Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Wed, 29 Jan 2025 11:48:35 +0800 Subject: [PATCH 1/2] Implementing delayed message cancellation in pulsar --- .../service/AbstractBaseDispatcher.java | 49 ++++++++++++++ .../persistent/DelayedDeliveryTest.java | 66 +++++++++++++++++++ .../pulsar/common/naming/Constants.java | 2 + 3 files changed, 117 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index fb5c457fcc874..919b811f002c8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -20,6 +20,8 @@ import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet; import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetEmpty; +import static org.apache.pulsar.common.naming.Constants.DELAY_CANCELED_MESSAGE_POSITION; +import static org.apache.pulsar.common.naming.Constants.IS_MARK_DELETE_DELAY_MESSAGE; import io.netty.buffer.ByteBuf; import io.prometheus.client.Gauge; import java.util.ArrayList; @@ -28,6 +30,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -46,10 +49,12 @@ import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.CommandAck.AckType; +import org.apache.pulsar.common.api.proto.KeyValue; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Markers; +import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap; import org.apache.pulsar.compaction.Compactor; import org.checkerframework.checker.nullness.qual.Nullable; @@ -69,10 +74,13 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen private final LongAdder filterRejectedMsgs = new LongAdder(); private final LongAdder filterRescheduledMsgs = new LongAdder(); + protected final ConcurrentLongLongPairHashMap delayedMessageMarkDeleteMap; + protected AbstractBaseDispatcher(Subscription subscription, ServiceConfiguration serviceConfig) { super(subscription); this.serviceConfig = serviceConfig; this.dispatchThrottlingOnBatchMessageEnabled = serviceConfig.isDispatchThrottlingOnBatchMessageEnabled(); + this.delayedMessageMarkDeleteMap = ConcurrentLongLongPairHashMap.newBuilder().autoShrink(true).build(); } @@ -221,6 +229,47 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i entries.set(i, null); entry.release(); continue; + } else if (delayedMessageMarkDeleteMap.containsKey(entry.getLedgerId(), entry.getEntryId())) { + // The delayed message is marked for delete. + ConcurrentLongLongPairHashMap.LongPair markMessageId = delayedMessageMarkDeleteMap + .get(entry.getLedgerId(), entry.getEntryId()); + List deleteDelayedMessageList = new ArrayList<>(); + deleteDelayedMessageList.add(entry.getPosition()); + deleteDelayedMessageList.add(PositionFactory.create(markMessageId.first, markMessageId.second)); + + delayedMessageMarkDeleteMap.remove(entry.getLedgerId(), entry.getEntryId()); + individualAcknowledgeMessageIfNeeded(deleteDelayedMessageList, Collections.emptyMap()); + entries.set(i, null); + entry.release(); + continue; + } + + List propertiesList = msgMetadata.getPropertiesList(); + if (!propertiesList.isEmpty()) { + Map propertiesMap = propertiesList.stream() + .filter(p -> p.getKey().equals(DELAY_CANCELED_MESSAGE_POSITION) + || p.getKey().equals(IS_MARK_DELETE_DELAY_MESSAGE)) + .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue, + (oldValue, newValue) -> newValue)); + + if (propertiesMap.containsKey(IS_MARK_DELETE_DELAY_MESSAGE)) { + if (propertiesMap.containsKey(DELAY_CANCELED_MESSAGE_POSITION)) { + String[] data = propertiesMap.get(DELAY_CANCELED_MESSAGE_POSITION).split(":"); + long ledgerId = Long.parseLong(data[0]); + long entryId = Long.parseLong(data[1]); + delayedMessageMarkDeleteMap.put(ledgerId, entryId, + entry.getLedgerId(), entry.getEntryId()); + entries.set(i, null); + entry.release(); + continue; + } else { + individualAcknowledgeMessageIfNeeded(Collections.singletonList(entry.getPosition()), + Collections.emptyMap()); + entries.set(i, null); + entry.release(); + continue; + } + } } if (hasFilter) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java index e47857e8ec60f..df676a79e11d7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.apache.pulsar.common.naming.Constants.DELAY_CANCELED_MESSAGE_POSITION; +import static org.apache.pulsar.common.naming.Constants.IS_MARK_DELETE_DELAY_MESSAGE; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -32,6 +34,8 @@ import java.util.Set; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import org.apache.bookkeeper.client.BKException; @@ -43,6 +47,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; @@ -686,4 +691,65 @@ public void testDelayedDeliveryExceedsMaxDelay() throws Exception { + maxDeliveryDelayInMillis + " milliseconds"); } } + + @Test + public void testDelayedMessageCancel() throws Exception { + String topic = BrokerTestUtil.newUniqueName("testDelayedMessageCancel"); + CountDownLatch latch = new CountDownLatch(9); + Set receivedMessages = ConcurrentHashMap.newKeySet(); + + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("shared-sub") + .subscriptionType(SubscriptionType.Shared) + .messageListener((Consumer c, Message msg) -> { + receivedMessages.add(msg.getValue()); + c.acknowledgeAsync(msg); + latch.countDown(); + }) + .subscribe(); + + final long tickTime = 1000L; + + admin.topicPolicies().setDelayedDeliveryPolicy(topic, + DelayedDeliveryPolicies.builder() + .active(true) + .tickTime(tickTime) + .maxDeliveryDelayInMillis(10000) + .build()); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + + for (int i = 0; i < 10; i++) { + final int n = i; + final long currentTime = System.currentTimeMillis(); + final long deliverAtTime = currentTime + 5000L; + producer.newMessage() + .key(String.valueOf(i)) + .value("msg-" + i) + .deliverAt(deliverAtTime) + .sendAsync().whenComplete((id, ex) -> { + if (n == 0) { + MessageIdAdv messageIdAdv = (MessageIdAdv) id; + String deleteDelayedMessageId = messageIdAdv.getLedgerId() + ":" + messageIdAdv.getEntryId(); + producer.newMessage() + .key(String.valueOf(n)) + .value("msg-0-mark") + .deliverAt(deliverAtTime - 2 * tickTime) + .property(IS_MARK_DELETE_DELAY_MESSAGE, "true") + .property(DELAY_CANCELED_MESSAGE_POSITION, deleteDelayedMessageId) + .sendAsync(); + } + }); + } + producer.flush(); + + assertTrue(latch.await(15, TimeUnit.SECONDS), "Not all messages were received in time"); + assertFalse(receivedMessages.contains("msg-0") || receivedMessages.contains("msg-0-mark"), + "msg-0 and msg-0-mark should have been cancelled but was received"); + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Constants.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Constants.java index ab71f2a43e5d4..944a35a779566 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Constants.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Constants.java @@ -24,6 +24,8 @@ public class Constants { public static final String GLOBAL_CLUSTER = "global"; + public static final String DELAY_CANCELED_MESSAGE_POSITION = "delayCanceledMsgPosition"; + public static final String IS_MARK_DELETE_DELAY_MESSAGE = "isMarkDeleteDelayMessage"; private Constants() {} } From 88ffdf9dc9d5536760d28d4f68bf87ad0941e790 Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Sun, 9 Feb 2025 00:43:06 +0800 Subject: [PATCH 2/2] feat(broker): Add support for canceling delayed messages in DelayedDeliveryTracker. --- .../delayed/DelayedDeliveryTracker.java | 23 ++++++++ .../InMemoryDelayedDeliveryTracker.java | 10 ++++ .../bucket/BucketDelayedDeliveryTracker.java | 54 +++++++++++++++++-- .../delayed/bucket/DelayedOperationType.java | 47 ++++++++++++++++ .../broker/delayed/bucket/MutableBucket.java | 29 +++++++++- .../service/AbstractBaseDispatcher.java | 49 ----------------- .../DelayedMessageIndexBucketSegment.proto | 5 ++ .../pulsar/common/naming/Constants.java | 2 - 8 files changed, 164 insertions(+), 55 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedOperationType.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java index 7c954879fe845..e0ca9252691bd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java @@ -22,6 +22,7 @@ import java.util.NavigableSet; import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.broker.delayed.bucket.DelayedOperationType; /** * Represent the tracker for the delayed delivery of messages for a particular subscription. @@ -74,6 +75,23 @@ public interface DelayedDeliveryTracker extends AutoCloseable { */ void resetTickTime(long tickTime); + /** + * Apply a delayed operation for a specific message. {@link DelayedOperationType} + *

+ * For {@link DelayedOperationType#DELAY} type: + * - Adds the message to the tracker with its scheduled delivery time. + *

+ * For {@link DelayedOperationType#CANCEL} type: + * - Removes the message from the tracker, preventing it from being delivered later. + * + * @param ledgerId the ledger ID of the message + * @param entryId the entry ID of the message + * @param deliverAt the scheduled delivery time (in milliseconds) for DELAY type, or ignored for CANCEL + * @param operationType the type of operation (DELAY/CANCEL) + * @return true if the operation was successfully applied, false if the message was already canceled or not found + */ + boolean applyDelayOperation(long ledgerId, long entryId, long deliverAt, DelayedOperationType operationType); + /** * Clear all delayed messages from the tracker. * @@ -122,6 +140,11 @@ public void resetTickTime(long tickTime) { } + @Override + public boolean applyDelayOperation(long ledgerId, long entryId, long deliverAt, DelayedOperationType opType) { + return false; + } + @Override public CompletableFuture clear() { return null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index 5796fcbd78550..fe5a682f3abfd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -28,12 +28,14 @@ import it.unimi.dsi.fastutil.longs.LongSet; import java.time.Clock; import java.util.NavigableSet; +import java.util.Objects; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.pulsar.broker.delayed.bucket.DelayedOperationType; import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; import org.roaringbitmap.longlong.Roaring64Bitmap; @@ -258,6 +260,14 @@ && getNumberOfDelayedMessages() >= fixedDelayDetectionLookahead && !hasMessageAvailable(); } + @Override + public boolean applyDelayOperation(long ledgerId, long entryId, long deliverAt, DelayedOperationType opType) { + if (Objects.requireNonNull(opType) == DelayedOperationType.DELAY) { + return addMessage(ledgerId, entryId, deliverAt); + } + return false; + } + protected long nextDeliveryTime() { return delayedMessageMap.firstLongKey(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index 08f3ae1fa6e8a..c1a390138c25e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -60,6 +60,7 @@ import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; import org.apache.pulsar.common.policies.data.stats.TopicMetricBean; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet; import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue; import org.roaringbitmap.RoaringBitmap; @@ -105,6 +106,8 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker private CompletableFuture pendingLoad = null; + private final ConcurrentLongPairSet canceledMessages; + public BucketDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis, boolean isDelayedDeliveryDeliverAtTimeStrict, @@ -137,6 +140,7 @@ public BucketDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumer bucketSnapshotStorage); this.stats = new BucketDelayedMessageIndexStats(); + this.canceledMessages = ConcurrentLongPairSet.newBuilder().autoShrink(true).build(); // Close the tracker if failed to recover. try { this.numberDelayedMessages = recoverBucketSnapshot(); @@ -206,8 +210,14 @@ private synchronized long recoverBucketSnapshot() throws RecoverDelayedDeliveryT this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(), lastDelayedIndex.getEntryId(), immutableBucket); for (DelayedIndex index : indexList) { - this.sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(), - index.getEntryId()); + long ledgerId = index.getLedgerId(); + long entryId = index.getEntryId(); + if (index.hasDelayedOperationType() + && index.getDelayedOperationType() == DelayedIndex.DelayedOperationType.CANCEL) { + this.canceledMessages.add(ledgerId, entryId); + } else if (!canceledMessages.contains(ledgerId, entryId)) { + this.sharedBucketPriorityQueue.add(index.getTimestamp(), ledgerId, entryId); + } } } } @@ -315,7 +325,7 @@ private void afterCreateImmutableBucket(Pair immu immutableBucket.asyncUpdateSnapshotLength(); log.info("[{}] Create bucket snapshot finish, bucketKey: {}", dispatcher.getName(), immutableBucket.bucketKey()); - + lastMutableBucket.clearCanceledOperations(); stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.create, System.currentTimeMillis() - startTime); @@ -610,6 +620,11 @@ public synchronized NavigableSet getScheduledMessages(int maxMessages) long ledgerId = sharedBucketPriorityQueue.peekN2(); long entryId = sharedBucketPriorityQueue.peekN3(); + if (canceledMessages.contains(ledgerId, entryId)) { + sharedBucketPriorityQueue.pop(); + removeIndexBit(ledgerId, entryId); + continue; + } ImmutableBucket bucket = snapshotSegmentLastIndexTable.get(ledgerId, entryId); if (bucket != null && immutableBuckets.asMapOfRanges().containsValue(bucket)) { @@ -710,6 +725,39 @@ public boolean shouldPauseAllDeliveries() { return false; } + public synchronized boolean applyDelayOperation(long ledgerId, long entryId, long deliverAt, + DelayedOperationType opType) { + switch (opType) { + case DELAY -> { + return addMessage(ledgerId, entryId, deliverAt); + } + case CANCEL -> { + return doCancelOperation(ledgerId, entryId, deliverAt); + } + default -> { + return false; + } + } + } + + private synchronized boolean doCancelOperation(long ledgerId, long entryId, long deliverAt) { + if (containsMessage(ledgerId, entryId)) { + removeIndexBit(ledgerId, entryId); + --numberDelayedMessages; + return true; + } + + if (deliverAt < 0 || deliverAt <= getCutoffTime()) { + return false; + } + + long cancelAheadTime = 2 * tickTimeMillis; + long cancelTime = Math.max(clock.millis(), deliverAt - cancelAheadTime); + + lastMutableBucket.addMessage(ledgerId, entryId, cancelTime, DelayedOperationType.CANCEL); + return true; + } + @Override public synchronized CompletableFuture clear() { CompletableFuture future = cleanImmutableBuckets(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedOperationType.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedOperationType.java new file mode 100644 index 0000000000000..242adea8cd4b1 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedOperationType.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed.bucket; + +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; + +@InterfaceAudience.Public +@InterfaceStability.Stable +public enum DelayedOperationType { + DELAY(0), + CANCEL(1); + + final int value; + + DelayedOperationType(int value) { + this.value = value; + } + + public int getValue() { + return this.value; + } + + public static DelayedOperationType valueOf(int value) { + return switch (value) { + case 0 -> DELAY; + case 1 -> CANCEL; + default -> throw new IllegalArgumentException("Invalid value for DelayedOperationType: " + value); + }; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java index 1173a401a8903..7464547d247fb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java @@ -35,6 +35,7 @@ import org.apache.pulsar.broker.delayed.proto.SnapshotSegment; import org.apache.pulsar.broker.delayed.proto.SnapshotSegmentMetadata; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet; import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue; import org.roaringbitmap.RoaringBitmap; @@ -42,11 +43,13 @@ class MutableBucket extends Bucket implements AutoCloseable { private final TripleLongPriorityQueue priorityQueue; + private final ConcurrentLongPairSet canceledOperations; MutableBucket(String dispatcherName, ManagedCursor cursor, FutureUtil.Sequencer sequencer, BucketSnapshotStorage bucketSnapshotStorage) { super(dispatcherName, cursor, sequencer, bucketSnapshotStorage, -1L, -1L); this.priorityQueue = new TripleLongPriorityQueue(); + this.canceledOperations = ConcurrentLongPairSet.newBuilder().autoShrink(true).build(); } Pair sealBucketAndAsyncPersistent( @@ -97,6 +100,12 @@ Pair createImmutableBucketAndAsyncPersistent( final long ledgerId = delayedIndex.getLedgerId(); final long entryId = delayedIndex.getEntryId(); + if (canceledOperations.contains(delayedIndex.getLedgerId(), delayedIndex.getEntryId())) { + delayedIndex.setDelayedOperationType(DelayedIndex.DelayedOperationType.CANCEL); + } else { + delayedIndex.setDelayedOperationType(DelayedIndex.DelayedOperationType.DELAY); + } + removeIndexBit(ledgerId, entryId); checkArgument(ledgerId >= startLedgerId && ledgerId <= endLedgerId); @@ -189,7 +198,9 @@ void moveScheduledMessageToSharedQueue(long cutoffTime, TripleLongPriorityQueue long ledgerId = priorityQueue.peekN2(); long entryId = priorityQueue.peekN3(); - sharedBucketPriorityQueue.add(timestamp, ledgerId, entryId); + if (!canceledOperations.contains(ledgerId, entryId)) { + sharedBucketPriorityQueue.add(timestamp, ledgerId, entryId); + } priorityQueue.pop(); } @@ -234,4 +245,20 @@ void addMessage(long ledgerId, long entryId, long deliverAt) { this.endLedgerId = ledgerId; putIndexBit(ledgerId, entryId); } + + void addMessage(long ledgerId, long entryId, long deliverAt, DelayedOperationType operationType) { + switch (operationType) { + case CANCEL -> { + priorityQueue.add(deliverAt, ledgerId, entryId); + canceledOperations.add(ledgerId, entryId); + putIndexBit(ledgerId, entryId); + } + case DELAY -> addMessage(ledgerId, entryId, deliverAt); + default -> throw new IllegalArgumentException("Unknown operation type: " + operationType); + } + } + + void clearCanceledOperations() { + canceledOperations.clear(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index 919b811f002c8..fb5c457fcc874 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -20,8 +20,6 @@ import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet; import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetEmpty; -import static org.apache.pulsar.common.naming.Constants.DELAY_CANCELED_MESSAGE_POSITION; -import static org.apache.pulsar.common.naming.Constants.IS_MARK_DELETE_DELAY_MESSAGE; import io.netty.buffer.ByteBuf; import io.prometheus.client.Gauge; import java.util.ArrayList; @@ -30,7 +28,6 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; -import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -49,12 +46,10 @@ import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.CommandAck.AckType; -import org.apache.pulsar.common.api.proto.KeyValue; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Markers; -import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap; import org.apache.pulsar.compaction.Compactor; import org.checkerframework.checker.nullness.qual.Nullable; @@ -74,13 +69,10 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen private final LongAdder filterRejectedMsgs = new LongAdder(); private final LongAdder filterRescheduledMsgs = new LongAdder(); - protected final ConcurrentLongLongPairHashMap delayedMessageMarkDeleteMap; - protected AbstractBaseDispatcher(Subscription subscription, ServiceConfiguration serviceConfig) { super(subscription); this.serviceConfig = serviceConfig; this.dispatchThrottlingOnBatchMessageEnabled = serviceConfig.isDispatchThrottlingOnBatchMessageEnabled(); - this.delayedMessageMarkDeleteMap = ConcurrentLongLongPairHashMap.newBuilder().autoShrink(true).build(); } @@ -229,47 +221,6 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i entries.set(i, null); entry.release(); continue; - } else if (delayedMessageMarkDeleteMap.containsKey(entry.getLedgerId(), entry.getEntryId())) { - // The delayed message is marked for delete. - ConcurrentLongLongPairHashMap.LongPair markMessageId = delayedMessageMarkDeleteMap - .get(entry.getLedgerId(), entry.getEntryId()); - List deleteDelayedMessageList = new ArrayList<>(); - deleteDelayedMessageList.add(entry.getPosition()); - deleteDelayedMessageList.add(PositionFactory.create(markMessageId.first, markMessageId.second)); - - delayedMessageMarkDeleteMap.remove(entry.getLedgerId(), entry.getEntryId()); - individualAcknowledgeMessageIfNeeded(deleteDelayedMessageList, Collections.emptyMap()); - entries.set(i, null); - entry.release(); - continue; - } - - List propertiesList = msgMetadata.getPropertiesList(); - if (!propertiesList.isEmpty()) { - Map propertiesMap = propertiesList.stream() - .filter(p -> p.getKey().equals(DELAY_CANCELED_MESSAGE_POSITION) - || p.getKey().equals(IS_MARK_DELETE_DELAY_MESSAGE)) - .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue, - (oldValue, newValue) -> newValue)); - - if (propertiesMap.containsKey(IS_MARK_DELETE_DELAY_MESSAGE)) { - if (propertiesMap.containsKey(DELAY_CANCELED_MESSAGE_POSITION)) { - String[] data = propertiesMap.get(DELAY_CANCELED_MESSAGE_POSITION).split(":"); - long ledgerId = Long.parseLong(data[0]); - long entryId = Long.parseLong(data[1]); - delayedMessageMarkDeleteMap.put(ledgerId, entryId, - entry.getLedgerId(), entry.getEntryId()); - entries.set(i, null); - entry.release(); - continue; - } else { - individualAcknowledgeMessageIfNeeded(Collections.singletonList(entry.getPosition()), - Collections.emptyMap()); - entries.set(i, null); - entry.release(); - continue; - } - } } if (hasFilter) { diff --git a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto index a6ed30cfe8cd4..4f2b5af40c1ec 100644 --- a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto +++ b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto @@ -24,9 +24,14 @@ option optimize_for = SPEED; option java_multiple_files = true; message DelayedIndex { + enum DelayedOperationType { + DELAY = 0; + CANCEL = 1; + } required uint64 timestamp = 1; required uint64 ledger_id = 2; required uint64 entry_id = 3; + optional DelayedOperationType delayed_operation_type = 4 [default = DELAY]; } message SnapshotSegment { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Constants.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Constants.java index 944a35a779566..ab71f2a43e5d4 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Constants.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Constants.java @@ -24,8 +24,6 @@ public class Constants { public static final String GLOBAL_CLUSTER = "global"; - public static final String DELAY_CANCELED_MESSAGE_POSITION = "delayCanceledMsgPosition"; - public static final String IS_MARK_DELETE_DELAY_MESSAGE = "isMarkDeleteDelayMessage"; private Constants() {} }