Description
Search before asking
- I searched in the issues and found nothing similar.
Read release policy
- I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
Version
3.3.1
3.0.5
Minimal reproduce step
- Execute the following Java program:
package org.example;

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;

public class Main {
    public static void main(String[] args) throws PulsarClientException, InterruptedException {
        AtomicBoolean running = new AtomicBoolean(true);
        String topic = "tbce0";
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:61723")
                .build();
        Producer<byte[]> producer = client.newProducer()
                .topic(topic)
                .create();
        String message = "Hello, Pulsar!";
        for (int i = 0; i < 1000; i++) {
            producer.send(message.getBytes());
        }
        System.out.println("Sent messages: " + message);
        producer.close();
        Thread.ofVirtual().start(() -> {
            try {
                Consumer<byte[]> consumer = client.newConsumer()
                        .topic(topic)
                        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                        .subscriptionName("bce-subscription-0")
                        .subscriptionMode(SubscriptionMode.NonDurable)
                        //.readCompacted(true)
                        .subscribe();
                int counter = 1;
                while (running.get()) {
                    Message<byte[]> receivedMessage = consumer.receive();
                    System.out.println(
                            "Received " + counter++ + " message: " + new String(receivedMessage.getData()));
                    consumer.acknowledge(receivedMessage);
                }
                consumer.close();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Exiting...");
            running.set(false);
            try {
                client.close();
            } catch (PulsarClientException e) {
                throw new RuntimeException(e);
            }
        }));
        Thread.sleep(600_000);
    }
}
- Wait for the results:
...
Received 993 message: Hello, Pulsar!
Received 994 message: Hello, Pulsar!
Received 995 message: Hello, Pulsar!
Received 996 message: Hello, Pulsar!
Received 997 message: Hello, Pulsar!
Received 998 message: Hello, Pulsar!
Received 999 message: Hello, Pulsar!
Received 1000 message: Hello, Pulsar!
- Run the following script (if on Kubernetes):
TOPIC=tbce0
kubectl exec -n kaap pulsar-broker-0 -- ./bin/pulsar-admin topics stats $TOPIC | grep msgBacklog
kubectl exec -n kaap pulsar-broker-0 -- ./bin/pulsar-admin topics unload $TOPIC
kubectl exec -n kaap pulsar-broker-0 -- ./bin/pulsar-admin topics stats $TOPIC | grep msgBacklog
What did you expect to see?
The cursor should stay at the end of the topic, and the backlog should be 0 both before and after unloading.
Before unloading
"msgBacklog" : 0,
"msgBacklogNoDelayed" : 0,
After unloading
"msgBacklog" : 0,
"msgBacklogNoDelayed" : 0,
What did you see instead?
The backlog is not empty after unloading:
Before unloading
"msgBacklog" : 0,
"msgBacklogNoDelayed" : 0,
After unloading
"msgBacklog" : 1,
"msgBacklogNoDelayed" : 1,
Anything else?
- You may want to increase the message retention so that the messages are not lost between checks; however, this is not required to reproduce the issue.
- Unloading is just the easiest way to reproduce the issue; a broker restart may cause the same behavior.
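For anyone who wants to script the before/after comparison instead of reading the grep output by eye, here is a minimal sketch. It assumes the stats output contains lines in the `"msgBacklog" : N,` form shown above and only looks at the first match. The class and method names (`BacklogCheck`, `parseBacklog`, `backlogReappeared`) are hypothetical and not part of Pulsar.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BacklogCheck {
    // Matches a line such as: "msgBacklog" : 1,
    // The closing quote keeps "msgBacklogNoDelayed" from matching.
    private static final Pattern MSG_BACKLOG =
            Pattern.compile("\"msgBacklog\"\\s*:\\s*(\\d+)");

    /** Returns the first msgBacklog value found in the stats output, or -1 if none is present. */
    static long parseBacklog(String statsOutput) {
        Matcher m = MSG_BACKLOG.matcher(statsOutput);
        return m.find() ? Long.parseLong(m.group(1)) : -1;
    }

    /** True when the backlog was 0 before the unload and is greater than 0 afterwards. */
    static boolean backlogReappeared(String before, String after) {
        return parseBacklog(before) == 0 && parseBacklog(after) > 0;
    }

    public static void main(String[] args) {
        // Sample inputs copied from the output reported in this issue.
        String before = "\"msgBacklog\" : 0,\n\"msgBacklogNoDelayed\" : 0,";
        String after = "\"msgBacklog\" : 1,\n\"msgBacklogNoDelayed\" : 1,";
        System.out.println("before=" + parseBacklog(before)
                + " after=" + parseBacklog(after)
                + " reappeared=" + backlogReappeared(before, after));
    }
}
```

In practice the two strings would come from capturing the output of the `topics stats` command before and after the unload. If the topic has several subscriptions, the first match may not be the one of interest, so the pattern would need to be narrowed.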
Are you willing to submit a PR?
- I'm willing to submit a PR!