Skip to content

Commit 8303b96

Browse files
authored
[fix][client] Copy eventTime to retry letter topic and DLQ messages (#24059)
1 parent 0cc266d commit 8303b96

File tree

4 files changed

+75
-1
lines changed

4 files changed

+75
-1
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java

+62
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.testng.Assert.assertTrue;
2626
import static org.testng.Assert.fail;
2727
import java.time.Duration;
28+
import java.time.Instant;
2829
import java.util.ArrayList;
2930
import java.util.HashMap;
3031
import java.util.HashSet;
@@ -261,6 +262,67 @@ public void testDeadLetterTopicMessagesWithOrderingKey() throws Exception {
261262
consumer.close();
262263
}
263264

265+
@Test
266+
public void testDeadLetterTopicMessagesWithEventTime() throws Exception {
267+
final String topic = "persistent://my-property/my-ns/dead-letter-topic";
268+
269+
final int maxRedeliveryCount = 1;
270+
271+
final int sendMessages = 100;
272+
273+
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
274+
.topic(topic)
275+
.subscriptionName("my-subscription")
276+
.subscriptionType(SubscriptionType.Shared)
277+
.ackTimeout(1, TimeUnit.SECONDS)
278+
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
279+
.receiverQueueSize(100)
280+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
281+
.subscribe();
282+
283+
@Cleanup
284+
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
285+
Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
286+
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
287+
.subscriptionName("my-subscription")
288+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
289+
.subscribe();
290+
291+
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
292+
.topic(topic)
293+
.create();
294+
295+
long testEventTime = Instant.now().toEpochMilli();
296+
for (int i = 0; i < sendMessages; i++) {
297+
producer.newMessage()
298+
.eventTime(testEventTime)
299+
.value(String.format("Hello Pulsar, eventTime: [%d]", testEventTime).getBytes())
300+
.send();
301+
}
302+
303+
producer.close();
304+
305+
int totalReceived = 0;
306+
do {
307+
Message<byte[]> message = consumer.receive();
308+
log.info("consumer received message : {} {}", message.getMessageId(),
309+
new String(message.getData()));
310+
totalReceived++;
311+
} while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
312+
313+
int totalInDeadLetter = 0;
314+
do {
315+
Message<byte[]> message = deadLetterConsumer.receive();
316+
assertEquals(message.getEventTime(), testEventTime);
317+
log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
318+
deadLetterConsumer.acknowledge(message);
319+
totalInDeadLetter++;
320+
} while (totalInDeadLetter < sendMessages);
321+
322+
deadLetterConsumer.close();
323+
consumer.close();
324+
}
325+
264326
public void testDeadLetterTopicWithProducerName() throws Exception {
265327
final String topic = "persistent://my-property/my-ns/dead-letter-topic";
266328
final String subscription = "my-subscription";

pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java

+5
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.testng.Assert.assertNull;
2525
import static org.testng.Assert.assertTrue;
2626
import static org.testng.Assert.fail;
27+
import java.time.Instant;
2728
import java.util.HashMap;
2829
import java.util.HashSet;
2930
import java.util.List;
@@ -300,6 +301,7 @@ public void testRetryTopicProperties() throws Exception {
300301

301302
byte[] key = "key".getBytes();
302303
byte[] orderingKey = "orderingKey".getBytes();
304+
long eventTime = Instant.now().toEpochMilli();
303305

304306
final int maxRedeliveryCount = 3;
305307

@@ -333,6 +335,7 @@ public void testRetryTopicProperties() throws Exception {
333335
.value(String.format("Hello Pulsar [%d]", i).getBytes())
334336
.keyBytes(key)
335337
.orderingKey(orderingKey)
338+
.eventTime(eventTime)
336339
.send();
337340
originMessageIds.add(msgId.toString());
338341
}
@@ -350,6 +353,7 @@ public void testRetryTopicProperties() throws Exception {
350353
assertEquals(message.getKeyBytes(), key);
351354
assertTrue(message.hasOrderingKey());
352355
assertEquals(message.getOrderingKey(), orderingKey);
356+
assertEquals(message.getEventTime(), eventTime);
353357
retryMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID));
354358
}
355359
consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
@@ -373,6 +377,7 @@ public void testRetryTopicProperties() throws Exception {
373377
assertEquals(message.getKeyBytes(), key);
374378
assertTrue(message.hasOrderingKey());
375379
assertEquals(message.getOrderingKey(), orderingKey);
380+
assertEquals(message.getEventTime(), eventTime);
376381
deadLetterMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID));
377382
}
378383
deadLetterConsumer.acknowledge(message);

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

+8
Original file line numberDiff line numberDiff line change
@@ -717,6 +717,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
717717
.value(retryMessage.getData())
718718
.properties(propertiesMap);
719719
copyMessageKeysIfNeeded(message, typedMessageBuilderNew);
720+
copyMessageEventTime(message, typedMessageBuilderNew);
720721
typedMessageBuilderNew.sendAsync().thenAccept(msgId -> {
721722
consumerDlqMessagesCounter.increment();
722723

@@ -749,6 +750,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
749750
typedMessageBuilderNew.deliverAfter(delayTime, unit);
750751
}
751752
copyMessageKeysIfNeeded(message, typedMessageBuilderNew);
753+
copyMessageEventTime(message, typedMessageBuilderNew);
752754
typedMessageBuilderNew.sendAsync()
753755
.thenCompose(
754756
__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null))
@@ -824,6 +826,11 @@ private MessageImpl<?> getMessageImpl(Message<?> message) {
824826
return null;
825827
}
826828

829+
private static void copyMessageEventTime(Message<?> message,
830+
TypedMessageBuilder<byte[]> typedMessageBuilderNew) {
831+
typedMessageBuilderNew.eventTime(message.getEventTime());
832+
}
833+
827834
@Override
828835
public void negativeAcknowledge(MessageId messageId) {
829836
consumerNacksCounter.increment();
@@ -2242,6 +2249,7 @@ private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdAdv messageId)
22422249
.value(message.getData())
22432250
.properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr));
22442251
copyMessageKeysIfNeeded(message, typedMessageBuilderNew);
2252+
copyMessageEventTime(message, typedMessageBuilderNew);
22452253
typedMessageBuilderNew.sendAsync()
22462254
.thenAccept(messageIdInDLQ -> {
22472255
possibleSendToDeadLetterTopicMessages.remove(messageId);

pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java

-1
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,6 @@ public TypedMessageBuilder<T> properties(Map<String, String> properties) {
186186

187187
@Override
188188
public TypedMessageBuilder<T> eventTime(long timestamp) {
189-
checkArgument(timestamp > 0, "Invalid timestamp : '%s'", timestamp);
190189
msgMetadata.setEventTime(timestamp);
191190
return this;
192191
}

0 commit comments

Comments
 (0)