Skip to content

Commit 20e0384

Browse files
committed
fix
1 parent 1e4eced commit 20e0384

File tree

1 file changed

+6
-13
lines changed

1 file changed

+6
-13
lines changed

Diff for: pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java

+6-13
Original file line numberDiff line numberDiff line change
@@ -262,8 +262,10 @@ public void testNonDurableSubscriptionBackLogAfterTopicUnload() throws Exception
262262
String subName = "test-sub";
263263

264264
admin.topics().createNonPartitionedTopic(topicName);
265+
@Cleanup
265266
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
266267

268+
@Cleanup
267269
Consumer<byte[]> consumer = pulsarClient.newConsumer()
268270
.topic(topicName)
269271
.subscriptionName(subName)
@@ -274,23 +276,14 @@ public void testNonDurableSubscriptionBackLogAfterTopicUnload() throws Exception
274276
String message = "my-message-" + i;
275277
producer.send(message.getBytes());
276278
}
277-
producer.close();
278279

279280
assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getMsgBacklog(), 10);
280281

281282
// 2. receive the message
282-
Thread t = new Thread(() -> {
283-
while (true) {
284-
Message<byte[]> msg;
285-
try {
286-
msg = consumer.receive();
287-
consumer.acknowledge(msg);
288-
} catch (PulsarClientException e) {
289-
throw new RuntimeException(e);
290-
}
291-
}
292-
});
293-
t.start();
283+
for (int i = 0; i < 10; i++) {
284+
Message<byte[]> msg = consumer.receive();
285+
consumer.acknowledge(msg);
286+
}
294287

295288
// 3. consumed all messages and the msgBacklog is 0
296289
Awaitility.await().untilAsserted(() ->

0 commit comments

Comments
 (0)