Skip to content

Commit e517705

Browse files
authored
[fix][client] PIP-409 retry/dead letter topic producer config don't take effect. (#24071)
1 parent 2eff40f commit e517705

File tree

4 files changed

+38
-4
lines changed

4 files changed

+38
-4
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java

+22
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.testng.Assert.assertNull;
2525
import static org.testng.Assert.assertTrue;
2626
import static org.testng.Assert.fail;
27+
2728
import java.time.Duration;
2829
import java.time.Instant;
2930
import java.util.ArrayList;
@@ -45,6 +46,9 @@
4546
import org.apache.pulsar.broker.BrokerTestUtil;
4647
import org.apache.pulsar.client.api.schema.GenericRecord;
4748
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
49+
import org.apache.pulsar.client.impl.ConsumerImpl;
50+
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
51+
import org.apache.pulsar.client.impl.ProducerImpl;
4852
import org.apache.pulsar.client.util.RetryMessageUtil;
4953
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
5054
import org.awaitility.Awaitility;
@@ -1123,6 +1127,7 @@ public void testDeadLetterTopicWithProducerBuilder() throws Exception {
11231127
// enable batch
11241128
DeadLetterProducerBuilderCustomizer producerBuilderCustomizer = (context, producerBuilder) -> {
11251129
producerBuilder.enableBatching(true);
1130+
producerBuilder.enableChunking(false);
11261131
};
11271132
String subscriptionName = "my-subscription";
11281133
String subscriptionNameDLQ = "my-subscription-DLQ";
@@ -1183,6 +1188,23 @@ public void testDeadLetterTopicWithProducerBuilder() throws Exception {
11831188
} while (totalInDeadLetter < sendMessages);
11841189
assertTrue(messageContent.isEmpty());
11851190

1191+
// check the retry topic producer enable batch
1192+
List<ConsumerImpl<byte[]>> consumers = ((MultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();
1193+
for (ConsumerImpl<byte[]> consumerImpl : consumers) {
1194+
Producer<byte[]> retryProducer = consumerImpl.getRetryLetterProducer();
1195+
Producer<byte[]> deadLetterProducer = consumerImpl.getDeadLetterProducer();
1196+
// there are two type of consumers in MultiTopicsConsumerImpl when enableRetry is true
1197+
// 1. consumer subscribe to original topic
1198+
// 2. consumer subscribe to retry topic
1199+
if (consumerImpl.getTopic().endsWith(RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX)) {
1200+
assertTrue(((ProducerImpl<byte[]>) retryProducer).isBatchMessagingEnabled());
1201+
assertTrue(((ProducerImpl<byte[]>) deadLetterProducer).isBatchMessagingEnabled());
1202+
} else {
1203+
assertTrue(((ProducerImpl<byte[]>) retryProducer).isBatchMessagingEnabled());
1204+
assertNull(deadLetterProducer);
1205+
}
1206+
}
1207+
11861208
deadLetterConsumer.close();
11871209
consumer.close();
11881210

pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ public void testRetryTopicWithProducerBuilder() throws Exception {
146146
// enable batch
147147
DeadLetterProducerBuilderCustomizer producerBuilderCustomizer = (context, producerBuilder) -> {
148148
producerBuilder.enableBatching(true);
149+
producerBuilder.enableChunking(false);
149150
};
150151
String subscriptionName = "my-subscription";
151152
String subscriptionNameDLQ = "my-subscription-DLQ";

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

+14-3
Original file line numberDiff line numberDiff line change
@@ -400,20 +400,21 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
400400
topic, subscription))
401401
.build();
402402
}
403-
404403
if (StringUtils.isNotBlank(conf.getDeadLetterPolicy().getRetryLetterTopic())) {
405404
this.deadLetterPolicy.setRetryLetterTopic(conf.getDeadLetterPolicy().getRetryLetterTopic());
406405
} else {
407406
this.deadLetterPolicy.setRetryLetterTopic(String.format(
408407
"%s-%s" + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX,
409408
topic, subscription));
410409
}
411-
412410
if (StringUtils.isNotBlank(conf.getDeadLetterPolicy().getInitialSubscriptionName())) {
413411
this.deadLetterPolicy.setInitialSubscriptionName(
414412
conf.getDeadLetterPolicy().getInitialSubscriptionName());
415413
}
416-
414+
this.deadLetterPolicy.setRetryLetterProducerBuilderCustomizer(
415+
conf.getDeadLetterPolicy().getRetryLetterProducerBuilderCustomizer());
416+
this.deadLetterPolicy.setDeadLetterProducerBuilderCustomizer(
417+
conf.getDeadLetterPolicy().getDeadLetterProducerBuilderCustomizer());
417418
} else {
418419
deadLetterPolicy = null;
419420
possibleSendToDeadLetterTopicMessages = null;
@@ -3229,4 +3230,14 @@ enum SeekStatus {
32293230
IN_PROGRESS,
32303231
COMPLETED
32313232
}
3233+
3234+
@VisibleForTesting
3235+
public Producer<byte[]> getRetryLetterProducer() {
3236+
return (retryLetterProducer == null || !retryLetterProducer.isDone()) ? null : retryLetterProducer.join();
3237+
}
3238+
3239+
@VisibleForTesting
3240+
public Producer<byte[]> getDeadLetterProducer() throws ExecutionException, InterruptedException {
3241+
return (deadLetterProducer == null || !deadLetterProducer.isDone()) ? null : deadLetterProducer.get();
3242+
}
32323243
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ public ConnectionHandler getConnectionHandler() {
350350
return connectionHandler;
351351
}
352352

353-
private boolean isBatchMessagingEnabled() {
353+
public boolean isBatchMessagingEnabled() {
354354
return conf.isBatchingEnabled();
355355
}
356356

0 commit comments

Comments
 (0)