Skip to content

Commit ad5b14c

Browse files
SantanuKar43mukesh-ctds
authored andcommitted
[fix][client] Copy eventTime to retry letter topic and DLQ messages (apache#24059)
(cherry picked from commit 8303b96) (cherry picked from commit 9e0b720)
1 parent a087c6e commit ad5b14c

File tree

4 files changed

+75
-1
lines changed

4 files changed

+75
-1
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java

+62
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.testng.Assert.assertTrue;
2626
import static org.testng.Assert.fail;
2727
import java.time.Duration;
28+
import java.time.Instant;
2829
import java.util.ArrayList;
2930
import java.util.HashMap;
3031
import java.util.HashSet;
@@ -260,6 +261,67 @@ public void testDeadLetterTopicMessagesWithOrderingKey() throws Exception {
260261
consumer.close();
261262
}
262263

264+
@Test
265+
public void testDeadLetterTopicMessagesWithEventTime() throws Exception {
266+
final String topic = "persistent://my-property/my-ns/dead-letter-topic";
267+
268+
final int maxRedeliveryCount = 1;
269+
270+
final int sendMessages = 100;
271+
272+
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
273+
.topic(topic)
274+
.subscriptionName("my-subscription")
275+
.subscriptionType(SubscriptionType.Shared)
276+
.ackTimeout(1, TimeUnit.SECONDS)
277+
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
278+
.receiverQueueSize(100)
279+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
280+
.subscribe();
281+
282+
@Cleanup
283+
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
284+
Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
285+
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
286+
.subscriptionName("my-subscription")
287+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
288+
.subscribe();
289+
290+
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
291+
.topic(topic)
292+
.create();
293+
294+
long testEventTime = Instant.now().toEpochMilli();
295+
for (int i = 0; i < sendMessages; i++) {
296+
producer.newMessage()
297+
.eventTime(testEventTime)
298+
.value(String.format("Hello Pulsar, eventTime: [%d]", testEventTime).getBytes())
299+
.send();
300+
}
301+
302+
producer.close();
303+
304+
int totalReceived = 0;
305+
do {
306+
Message<byte[]> message = consumer.receive();
307+
log.info("consumer received message : {} {}", message.getMessageId(),
308+
new String(message.getData()));
309+
totalReceived++;
310+
} while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
311+
312+
int totalInDeadLetter = 0;
313+
do {
314+
Message<byte[]> message = deadLetterConsumer.receive();
315+
assertEquals(message.getEventTime(), testEventTime);
316+
log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
317+
deadLetterConsumer.acknowledge(message);
318+
totalInDeadLetter++;
319+
} while (totalInDeadLetter < sendMessages);
320+
321+
deadLetterConsumer.close();
322+
consumer.close();
323+
}
324+
263325
@DataProvider(name = "produceLargeMessages")
264326
public Object[][] produceLargeMessages() {
265327
return new Object[][] { { false }, { true } };

pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java

+5
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.testng.Assert.assertNull;
2525
import static org.testng.Assert.assertTrue;
2626
import static org.testng.Assert.fail;
27+
import java.time.Instant;
2728
import java.util.HashMap;
2829
import java.util.HashSet;
2930
import java.util.List;
@@ -300,6 +301,7 @@ public void testRetryTopicProperties() throws Exception {
300301

301302
byte[] key = "key".getBytes();
302303
byte[] orderingKey = "orderingKey".getBytes();
304+
long eventTime = Instant.now().toEpochMilli();
303305

304306
final int maxRedeliveryCount = 3;
305307

@@ -333,6 +335,7 @@ public void testRetryTopicProperties() throws Exception {
333335
.value(String.format("Hello Pulsar [%d]", i).getBytes())
334336
.keyBytes(key)
335337
.orderingKey(orderingKey)
338+
.eventTime(eventTime)
336339
.send();
337340
originMessageIds.add(msgId.toString());
338341
}
@@ -350,6 +353,7 @@ public void testRetryTopicProperties() throws Exception {
350353
assertEquals(message.getKeyBytes(), key);
351354
assertTrue(message.hasOrderingKey());
352355
assertEquals(message.getOrderingKey(), orderingKey);
356+
assertEquals(message.getEventTime(), eventTime);
353357
retryMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID));
354358
}
355359
consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
@@ -373,6 +377,7 @@ public void testRetryTopicProperties() throws Exception {
373377
assertEquals(message.getKeyBytes(), key);
374378
assertTrue(message.hasOrderingKey());
375379
assertEquals(message.getOrderingKey(), orderingKey);
380+
assertEquals(message.getEventTime(), eventTime);
376381
deadLetterMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID));
377382
}
378383
deadLetterConsumer.acknowledge(message);

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

+8
Original file line numberDiff line numberDiff line change
@@ -673,6 +673,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
673673
.value(retryMessage.getData())
674674
.properties(propertiesMap);
675675
copyMessageKeysIfNeeded(message, typedMessageBuilderNew);
676+
copyMessageEventTime(message, typedMessageBuilderNew);
676677
typedMessageBuilderNew.sendAsync().thenAccept(msgId -> {
677678
doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null).thenAccept(v -> {
678679
result.complete(null);
@@ -703,6 +704,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
703704
typedMessageBuilderNew.deliverAfter(delayTime, unit);
704705
}
705706
copyMessageKeysIfNeeded(message, typedMessageBuilderNew);
707+
copyMessageEventTime(message, typedMessageBuilderNew);
706708
typedMessageBuilderNew.sendAsync()
707709
.thenCompose(
708710
__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null))
@@ -777,6 +779,11 @@ private MessageImpl<?> getMessageImpl(Message<?> message) {
777779
return null;
778780
}
779781

782+
private static void copyMessageEventTime(Message<?> message,
783+
TypedMessageBuilder<byte[]> typedMessageBuilderNew) {
784+
typedMessageBuilderNew.eventTime(message.getEventTime());
785+
}
786+
780787
@Override
781788
public void negativeAcknowledge(MessageId messageId) {
782789
negativeAcksTracker.add(messageId);
@@ -2164,6 +2171,7 @@ private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdAdv messageId)
21642171
.value(message.getData())
21652172
.properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr));
21662173
copyMessageKeysIfNeeded(message, typedMessageBuilderNew);
2174+
copyMessageEventTime(message, typedMessageBuilderNew);
21672175
typedMessageBuilderNew.sendAsync()
21682176
.thenAccept(messageIdInDLQ -> {
21692177
possibleSendToDeadLetterTopicMessages.remove(messageId);

pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java

-1
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,6 @@ public TypedMessageBuilder<T> properties(Map<String, String> properties) {
186186

187187
@Override
188188
public TypedMessageBuilder<T> eventTime(long timestamp) {
189-
checkArgument(timestamp > 0, "Invalid timestamp : '%s'", timestamp);
190189
msgMetadata.setEventTime(timestamp);
191190
return this;
192191
}

0 commit comments

Comments
 (0)