-
Notifications
You must be signed in to change notification settings - Fork 1
Description
Hi,
I'm one of the Apache Pulsar developers and I was interested to see how Pulsar client is used in this code base.
I spotted a potential problem: Messages are acknowledged without taking message sending result into account.
Sending happens asynchronously here:
transitdata-hfp-deduplicator/src/main/java/fi/hsl/transitdata/hfp/Deduplicator.java
Lines 122 to 126 in 89510c4
| .sendAsync() | |
| .exceptionally(t -> { | |
| log.error("Failed to send Pulsar message", t); | |
| return null; | |
| }) .thenRun(() -> {}); |
Acknowledgement is done without taking message sending result into account:
transitdata-hfp-deduplicator/src/main/java/fi/hsl/transitdata/hfp/Deduplicator.java
Line 59 in 89510c4
| ack(received.getMessageId()); |
One possible solution is to ack the message in the thenRun callback in sendPulsarMessage. Detected duplicates could be acked immediately.