No ack sent on message redelivery after a reconnect #796

Description

@patrickdmw

🐛 Bug Report

Same as #526

When I
connect using the hivemq-mqtt-client's Mqtt5AsyncClient to a HiveMQ 4 broker
with manual acknowledgement enabled
and fail to ack a single incoming message,
the client receives no further messages.
After a reconnect, the client receives the same unacked message again.
Even if it is acked this time,
the client still receives no further messages,
and after another reconnect the client receives the same initially unacked message again.

🔬 How To Reproduce

Steps to reproduce the behavior:

  • Create an Mqtt5AsyncClient
  • Connect the client to a broker
  • Subscribe to a topic using qos(MqttQos.AT_LEAST_ONCE) and manualAcknowledgement(true)
  • Receive some messages
  • Do not ack one of them
  • No further messages are received
  • Disconnect the client from the HiveMQ CC portal
  • Client receives the first not acked message
  • Ack the message this time
  • Client does not receive more messages
  • Disconnect the client from the HiveMQ CC portal
  • Client receives the first not acked message
  • Ack the message again
  • repeat forever

Restarting the application does resolve the issue. Then the first not acked message is received again, but this time, after acknowledging it, the other messages are also received.

Code sample

https://github.com/patrickdmw/mqtt-transformer

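public class BridgeListener {
    // LOG, publisher, sessionExpiryInterval and subscribeTopic are omitted from this excerpt.
    // The client is kept in a field because subscribe() below uses it.
    private Mqtt5AsyncClient mqttClient;

    public void start() {
        var builder = MqttClient.builder()
                .useMqttVersion5()
                .identifier("bridge")
                .serverHost("localhost")
                .serverPort(1883)
                .automaticReconnectWithDefaultConfig();

        mqttClient = builder.buildAsync();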

        mqttClient
                .publishes(MqttGlobalPublishFilter.REMAINING, this::handleMessage, true);

        mqttClient
                .connectWith()
                .cleanStart(false)
                .sessionExpiryInterval(sessionExpiryInterval)
                .send();

        LOG.info("Successfully connected to MQTT broker");

        subscribe(subscribeTopic);
    }

    private void subscribe(String topicFilter) {
        LOG.infof("Subscribing to MQTT topic: %s", topicFilter);
        mqttClient.subscribeWith()
                .topicFilter(topicFilter)
                .qos(MqttQos.AT_LEAST_ONCE)
                .callback(this::handleMessage)
                .manualAcknowledgement(true)
                .send()
                .whenComplete((subAck, ex) -> {
                    if (ex != null) {
                        LOG.errorf(ex, "Failed to subscribe to topic: %s", topicFilter);
                    } else {
                        LOG.infof("Successfully subscribed to topic: %s (reasonCodes=%s)", topicFilter, subAck.getReasonCodes());
                    }
                });
    }

    void handleMessage(Mqtt5Publish publish) {
        byte[] payload = publish.getPayloadAsBytes();
        String message = new String(payload, StandardCharsets.UTF_8);
        LOG.debugf("Received message: %s", message);
        try {
            boolean validate = true;
            if (validate && message.contains("failure")) {
                throw new IllegalArgumentException("Validation failed: " + "bad message");
            }
            publisher.publishMessage(payload);
            publish.acknowledge();
        } catch (Exception e) {
            LOG.errorf(e, "Failed to process message: %s", message);
        }
    }
}
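
The ack-on-success logic in handleMessage can be isolated from the MQTT client so that it is easy to check which payloads end up unacknowledged. The following is only a sketch under assumptions: `AckOnSuccessHandler`, its `forwarder` parameter, and the `Runnable` standing in for `publish.acknowledge()` are hypothetical names and are not part of the HiveMQ client API or of the linked repository.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

/** Hypothetical helper that mirrors handleMessage without depending on the MQTT client. */
class AckOnSuccessHandler {
    // Stand-in for publisher.publishMessage(payload) in the sample above.
    private final Consumer<byte[]> forwarder;

    AckOnSuccessHandler(Consumer<byte[]> forwarder) {
        this.forwarder = forwarder;
    }

    /**
     * Validates and forwards the payload, then runs the ack callback.
     * Returns true if the message was acknowledged, false if it was left unacked.
     */
    boolean handle(byte[] payload, Runnable ack) {
        String message = new String(payload, StandardCharsets.UTF_8);
        try {
            if (message.contains("failure")) {
                throw new IllegalArgumentException("Validation failed: bad message");
            }
            forwarder.accept(payload);
            ack.run(); // stand-in for publish.acknowledge()
            return true;
        } catch (RuntimeException e) {
            // As in the sample, a failed message is deliberately not acknowledged.
            return false;
        }
    }
}
```

In the real handler the callback would presumably be `publish::acknowledge`. With this shape, a payload containing "failure" is the only one that stays unacknowledged, which matches the "do not ack one of them" step of the reproduction.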

Environment

Where are you running/using this client? Locally using ./gradlew quarkusDev

Hardware or Device? Developer machine

What version of this client are you using? 1.3.13

JVM version? 26.0.1

Operating System? Ubuntu 22.04

Which MQTT protocol version is being used? 5 (the code sample calls useMqttVersion5())

Which MQTT broker (name and version)? hivemq/hivemq4:4.51.0

📈 Expected behavior

After a reconnect, the client should receive all queued messages, and be able to acknowledge them normally, then receive the next message.
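
The expected sequence can be written down as a toy model of a persistent QoS 1 session. This is a sketch for illustration only and does not describe how HiveMQ or the client is implemented: `Qos1SessionModel` and its methods are hypothetical, and the in-flight window of 1 used in the usage note is an assumption chosen to mirror the "no further messages" observation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a persistent QoS 1 session (hypothetical, not HiveMQ code). */
class Qos1SessionModel {
    // Messages that have not been acknowledged yet, oldest first.
    private final Deque<String> queue = new ArrayDeque<>();
    // Maximum number of delivered-but-unacked messages (flow-control window).
    private final int receiveMaximum;
    // How many messages at the head of the queue were delivered on the current connection.
    private int inFlight = 0;
    // Every delivery the client would observe, in order.
    private final List<String> deliveries = new ArrayList<>();

    Qos1SessionModel(int receiveMaximum) {
        this.receiveMaximum = receiveMaximum;
    }

    void enqueue(String message) {
        queue.addLast(message);
        pump();
    }

    /** Acknowledges the oldest in-flight message, which frees a slot for the next one. */
    void ackOldest() {
        if (inFlight == 0) {
            throw new IllegalStateException("nothing in flight");
        }
        queue.removeFirst();
        inFlight--;
        pump();
    }

    /** A reconnect resets the window, so unacked messages are delivered again first. */
    void reconnect() {
        inFlight = 0;
        pump();
    }

    List<String> deliveries() {
        return List.copyOf(deliveries);
    }

    // Delivers queued messages that are not in flight yet, until the window is full.
    private void pump() {
        int index = 0;
        for (String message : queue) {
            if (index >= inFlight && inFlight < receiveMaximum) {
                deliveries.add(message);
                inFlight++;
            }
            index++;
        }
    }
}
```

With a window of 1 and messages m1, m2, m3 enqueued, the model delivers only m1 until it is acknowledged. After `reconnect()` it delivers m1 again, and each following `ackOldest()` releases the next message (m2, then m3). The bug described here corresponds to the last step never happening until the application is restarted.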

📎 Additional context

Restarting the application does fix the issue
