Description
Search before asking
- I searched in the issues and found nothing similar.
Read release policy
- I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
Version
Pulsar: master
Minimal reproduce step
@Test
public void testUnackedMessages() throws Exception {
    String topicName = "testUnackedMessages";
    String subscriptionName = "my-sub";
    String serviceUrl = "pulsar://localhost:6650";
    @Cleanup PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
    @Cleanup Consumer<byte[]> consumer =
            pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Exclusive).messageListener((c, message) -> {
                        // noop
                    }).subscribe();
    @Cleanup Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
    producer.send("1".getBytes());
    TopicStats stats = admin.topics().getStats(topicName);
    assertThat(stats.getSubscriptions().get(subscriptionName).getUnackedMessages()).isNotEqualTo(0);
}
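The test above reads the stats immediately after `send` returns. To rule out a timing race between dispatch and the stats read, the check could be retried for a short period. The following is only a sketch in plain Java: `UnackedPoll` and `pollUntilNonZero` are hypothetical names, not part of Pulsar, and the stats lookup is abstracted behind a `LongSupplier`.

```java
import java.util.function.LongSupplier;

class UnackedPoll {
    /**
     * Hypothetical helper: polls the supplier until it returns a non-zero
     * value or the timeout elapses, and returns the last observed value.
     */
    static long pollUntilNonZero(LongSupplier unackedSupplier, long timeoutMillis, long intervalMillis)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        long value = unackedSupplier.getAsLong();
        // Keep re-reading while the counter is still zero and time remains.
        while (value == 0 && System.nanoTime() < deadline) {
            Thread.sleep(intervalMillis);
            value = unackedSupplier.getAsLong();
        }
        return value;
    }
}
```

In the test, the supplier would wrap the same `getStats(...)` lookup used above (converting its checked exception to an unchecked one). If the helper still returns 0 after a few seconds, the result is unlikely to be a timing artifact.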
What did you expect to see?
unackedMessages for the subscription is not equal to 0, because the message listener receives the message but never acknowledges it.
What did you see instead?
unackedMessages for the subscription is equal to 0 in the topic stats.
Anything else?
No response
Are you willing to submit a PR?
- I'm willing to submit a PR!