Skip to content

Commit 302c1d5

Browse files
thetumbledlhotari
andauthored
[improve][client] PIP-409: support producer configuration for retry/dead letter topic producer (#24020)
Co-authored-by: Lari Hotari <[email protected]>
1 parent 3f45154 commit 302c1d5

File tree

8 files changed

+433
-20
lines changed

8 files changed

+433
-20
lines changed

pip/pip-409.md

Lines changed: 79 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,25 +33,90 @@ retry/dead letter topic.
3333

3434
## Design & Implementation Details
3535

36-
- Add two new configurations in `DeadLetterPolicy`:
36+
- Add `DeadLetterProducerBuilderContext` and `DeadLetterProducerBuilderCustomizer` interface:
37+
```java
38+
public interface DeadLetterProducerBuilderContext {
39+
/**
40+
* Returns the default name of topic for the dead letter or retry letter producer. This topic name is used
41+
* unless the DeadLetterProducerBuilderCustomizer overrides it.
42+
*
43+
* @return a {@code String} representing the input topic name
44+
*/
45+
String getDefaultTopicName();
46+
47+
/**
48+
* Returns the name of the input topic for which the dead letter or retry letter producer is being configured.
49+
*
50+
* @return a {@code String} representing the input topic name
51+
*/
52+
String getInputTopicName();
53+
54+
/**
55+
* Returns the name of the subscription for which the dead letter or retry letter producer is being configured.
56+
*
57+
* @return a {@code String} representing the subscription name
58+
*/
59+
String getInputTopicSubscriptionName();
60+
61+
/**
62+
* Returns the name of the consumer for which the dead letter or
63+
* retry letter producer is being configured.
64+
* @return a {@code String} representing the consumer name
65+
*/
66+
String getInputTopicConsumerName();
67+
}
68+
69+
public interface DeadLetterProducerBuilderCustomizer {
70+
/**
71+
* Customize the given producer builder with settings specific to the topic context provided.
72+
*
73+
* @param context the context containing information about the input topic and the subscription
74+
* @param producerBuilder the producer builder instance to be customized
75+
*/
76+
void customize(DeadLetterProducerBuilderContext context, ProducerBuilder<byte[]> producerBuilder);
77+
}
78+
```
79+
80+
- Add two fields in `DeadLetterPolicy`:
3781
```java
3882
public class DeadLetterPolicy implements Serializable {
39-
/**
40-
* Function to build the producer for the retry letter topic.
41-
* The input parameter is the topic name.
42-
*/
43-
private Function<String, ProducerBuilder<byte[]>> retryLetterProducerBuilder;
44-
45-
/**
46-
* Function to build the producer for the dead letter topic.
47-
* The input parameter is the topic name.
48-
*/
49-
private Function<String, ProducerBuilder<byte[]>> deadLetterProducerBuilder;
83+
/**
84+
* Customizer for configuring the producer builder for the retry letter topic.
85+
*
86+
* <p>This field holds a function that allows the caller to customize the producer builder
87+
* settings for the retry letter topic before the producer is created. The customization logic
88+
* can use the provided context (which includes input topic and subscription details) to adjust
89+
* configurations such as timeouts, batching, or message routing.
90+
*/
91+
private DeadLetterProducerBuilderCustomizer retryLetterProducerBuilderCustomizer;
92+
/**
93+
* Customizer for configuring the producer builder for the dead letter topic.
94+
*
95+
* <p>This field holds a function that allows the caller to customize the producer builder
96+
* settings for the dead letter topic before the producer is created. Using the provided context,
97+
* implementations can perform specific adjustments that ensure the dead letter queue operates
98+
* with the appropriate configurations tailored for handling undeliverable messages.
99+
*/
100+
private DeadLetterProducerBuilderCustomizer deadLetterProducerBuilderCustomizer;
50101
}
51102
```
52103

53-
- use the `retryLetterProducerBuilder` to build the producer for retry topic, and use the
54-
`deadLetterProducerBuilder` to build the producer for dead letter topic.
104+
- use the `DeadLetterProducerBuilderCustomizer` to customize the producer of retry/dead letter topic like this:
105+
```java
106+
// enable batch
107+
DeadLetterProducerBuilderCustomizer producerBuilderCustomizer = (context, producerBuilder) -> {
108+
producerBuilder.enableBatching(true);
109+
};
110+
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
111+
.topic(topic)
112+
.subscriptionName(subscriptionName)
113+
.enableRetry(true)
114+
.deadLetterPolicy(DeadLetterPolicy.builder()
115+
.maxRedeliverCount(maxRedeliveryCount)
116+
.retryLetterProducerBuilderCustomizer(producerBuilderCustomizer)
117+
.build())
118+
.subscribe();
119+
```
55120

56121

57122
# Backward & Forward Compatibility

pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1114,6 +1114,94 @@ public void testDeadLetterTopicWithInitialSubscription() throws Exception {
11141114
consumer.close();
11151115
}
11161116

1117+
@Test()
1118+
public void testDeadLetterTopicWithProducerBuilder() throws Exception {
1119+
final String topic = "persistent://my-property/my-ns/dead-letter-topic-with-producer-builder";
1120+
final int maxRedeliveryCount = 2;
1121+
final int sendMessages = 100;
1122+
1123+
// enable batch
1124+
DeadLetterProducerBuilderCustomizer producerBuilderCustomizer = (context, producerBuilder) -> {
1125+
producerBuilder.enableBatching(true);
1126+
};
1127+
String subscriptionName = "my-subscription";
1128+
String subscriptionNameDLQ = "my-subscription-DLQ";
1129+
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
1130+
.topic(topic)
1131+
.subscriptionName(subscriptionName)
1132+
.subscriptionType(SubscriptionType.Shared)
1133+
.enableRetry(true)
1134+
.deadLetterPolicy(DeadLetterPolicy.builder()
1135+
.maxRedeliverCount(maxRedeliveryCount)
1136+
.initialSubscriptionName(subscriptionNameDLQ)
1137+
.deadLetterProducerBuilderCustomizer(producerBuilderCustomizer)
1138+
.retryLetterProducerBuilderCustomizer(producerBuilderCustomizer)
1139+
.build())
1140+
.receiverQueueSize(100)
1141+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
1142+
.subscribe();
1143+
1144+
@Cleanup
1145+
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
1146+
Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
1147+
.topic(topic + "-" + subscriptionName + "-DLQ")
1148+
.subscriptionName(subscriptionNameDLQ)
1149+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
1150+
.subscribe();
1151+
1152+
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
1153+
.topic(topic)
1154+
.enableBatching(true)
1155+
.create();
1156+
1157+
Map<Integer, String> messageContent = new HashMap<>();
1158+
for (int i = 0; i < sendMessages; i++) {
1159+
String data = String.format("Hello Pulsar [%d]", i);
1160+
producer.newMessage().key(String.valueOf(i)).value(data.getBytes()).send();
1161+
messageContent.put(i, data);
1162+
}
1163+
producer.close();
1164+
1165+
int totalReceived = 0;
1166+
do {
1167+
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
1168+
assertNotNull(message, "The consumer should be able to receive messages.");
1169+
log.info("consumer received message : {}", message.getMessageId());
1170+
totalReceived++;
1171+
consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
1172+
} while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
1173+
1174+
int totalInDeadLetter = 0;
1175+
do {
1176+
Message message = deadLetterConsumer.receive(5, TimeUnit.SECONDS);
1177+
assertNotNull(message, "the deadLetterConsumer should receive messages.");
1178+
assertEquals(new String(message.getData()), messageContent.get(Integer.parseInt(message.getKey())));
1179+
messageContent.remove(Integer.parseInt(message.getKey()));
1180+
log.info("dead letter consumer received message : {}", message.getMessageId());
1181+
deadLetterConsumer.acknowledge(message);
1182+
totalInDeadLetter++;
1183+
} while (totalInDeadLetter < sendMessages);
1184+
assertTrue(messageContent.isEmpty());
1185+
1186+
deadLetterConsumer.close();
1187+
consumer.close();
1188+
1189+
Consumer<byte[]> checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES)
1190+
.topic(topic)
1191+
.subscriptionName("my-subscription")
1192+
.subscriptionType(SubscriptionType.Shared)
1193+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
1194+
.subscribe();
1195+
1196+
Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
1197+
if (checkMessage != null) {
1198+
log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData()));
1199+
}
1200+
assertNull(checkMessage);
1201+
1202+
checkConsumer.close();
1203+
}
1204+
11171205
private CompletableFuture<Void> consumerReceiveForDLQ(Consumer<byte[]> consumer, AtomicInteger totalReceived,
11181206
int sendMessages, int maxRedeliveryCount) {
11191207
return CompletableFuture.runAsync(() -> {

pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,83 @@ public void testRetryTopic() throws Exception {
137137
checkConsumer.close();
138138
}
139139

140+
@Test
141+
public void testRetryTopicWithProducerBuilder() throws Exception {
142+
final String topic = "persistent://my-property/my-ns/retry-topic-with-producer-builder";
143+
final int maxRedeliveryCount = 2;
144+
final int sendMessages = 100;
145+
146+
// enable batch
147+
DeadLetterProducerBuilderCustomizer producerBuilderCustomizer = (context, producerBuilder) -> {
148+
producerBuilder.enableBatching(true);
149+
};
150+
String subscriptionName = "my-subscription";
151+
String subscriptionNameDLQ = "my-subscription-DLQ";
152+
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
153+
.topic(topic)
154+
.subscriptionName(subscriptionName)
155+
.subscriptionType(SubscriptionType.Shared)
156+
.enableRetry(true)
157+
.deadLetterPolicy(DeadLetterPolicy.builder()
158+
.maxRedeliverCount(maxRedeliveryCount)
159+
.retryLetterProducerBuilderCustomizer(producerBuilderCustomizer)
160+
.build())
161+
.receiverQueueSize(100)
162+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
163+
.subscribe();
164+
165+
@Cleanup
166+
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
167+
Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
168+
.topic(topic + "-" + subscriptionName + "-DLQ")
169+
.subscriptionName(subscriptionNameDLQ)
170+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
171+
.subscribe();
172+
173+
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
174+
.topic(topic)
175+
.create();
176+
177+
for (int i = 0; i < sendMessages; i++) {
178+
producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
179+
}
180+
producer.close();
181+
182+
int totalReceived = 0;
183+
do {
184+
Message<byte[]> message = consumer.receive();
185+
log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
186+
consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
187+
totalReceived++;
188+
} while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
189+
190+
int totalInDeadLetter = 0;
191+
do {
192+
Message<byte[]> message = deadLetterConsumer.receive();
193+
log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
194+
deadLetterConsumer.acknowledge(message);
195+
totalInDeadLetter++;
196+
} while (totalInDeadLetter < sendMessages);
197+
198+
deadLetterConsumer.close();
199+
consumer.close();
200+
201+
Consumer<byte[]> checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES)
202+
.topic(topic)
203+
.subscriptionName(subscriptionName)
204+
.subscriptionType(SubscriptionType.Shared)
205+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
206+
.subscribe();
207+
208+
Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
209+
if (checkMessage != null) {
210+
log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData()));
211+
}
212+
assertNull(checkMessage);
213+
214+
checkConsumer.close();
215+
}
216+
140217
/**
141218
* Retry topic feature relies on the delay queue feature when consumer produce a delayed message
142219
* to the retry topic. The delay queue feature is only supported in shared and key-shared subscription type.

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,24 @@ public class DeadLetterPolicy implements Serializable {
6262
* to be created.
6363
*/
6464
private String initialSubscriptionName;
65+
66+
/**
67+
* Customizer for configuring the producer builder for the retry letter topic.
68+
*
69+
* <p>This field holds a function that allows the caller to customize the producer builder
70+
* settings for the retry letter topic before the producer is created. The customization logic
71+
* can use the provided context (which includes input topic and subscription details) to adjust
72+
* configurations such as timeouts, batching, or message routing.
73+
*/
74+
private DeadLetterProducerBuilderCustomizer retryLetterProducerBuilderCustomizer;
75+
76+
/**
77+
* Customizer for configuring the producer builder for the dead letter topic.
78+
*
79+
* <p>This field holds a function that allows the caller to customize the producer builder
80+
* settings for the dead letter topic before the producer is created. Using the provided context,
81+
* implementations can perform specific adjustments that ensure the dead letter queue operates
82+
* with the appropriate configurations tailored for handling undeliverable messages.
83+
*/
84+
private DeadLetterProducerBuilderCustomizer deadLetterProducerBuilderCustomizer;
6585
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.api;
20+
21+
/**
22+
* Provides context information required for customizing a producer builder.
23+
*
24+
* <p>This interface supplies relevant details such as the name of the input topic and associated subscription name.
25+
* This contextual information helps in correctly configuring the producer for the appropriate topic.
26+
*/
27+
public interface DeadLetterProducerBuilderContext {
28+
/**
29+
* Returns the default name of topic for the dead letter or retry letter producer. This topic name is used
30+
* unless the ProducerBuilderCustomizer overrides it.
31+
*
32+
* @return a {@code String} representing the input topic name
33+
*/
34+
String getDefaultTopicName();
35+
36+
/**
37+
* Returns the name of the input topic for which the dead letter or retry letter producer is being configured.
38+
*
39+
* @return a {@code String} representing the input topic name
40+
*/
41+
String getInputTopicName();
42+
43+
/**
44+
* Returns the name of the subscription for which the dead letter or retry letter producer is being configured.
45+
*
46+
* @return a {@code String} representing the subscription name
47+
*/
48+
String getInputTopicSubscriptionName();
49+
50+
/**
51+
* Returns the name of the consumer for which the dead letter or
52+
* retry letter producer is being configured.
53+
* @return a {@code String} representing the consumer name
54+
*/
55+
String getInputTopicConsumerName();
56+
}
57+
58+

0 commit comments

Comments
 (0)