Skip to content

Commit dda9ee1

Browse files
authored
Fix MQTTProxyProtocolMethodProcessor processPingReq not respond bug (#1814)
1 parent 06b9763 commit dda9ee1

File tree

1 file changed

+6
-0
lines changed

1 file changed

+6
-0
lines changed

mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import static io.streamnative.pulsar.handlers.mqtt.common.utils.MqttMessageUtils.createMqtt5ConnectMessage;
1717
import static io.streamnative.pulsar.handlers.mqtt.common.utils.MqttMessageUtils.createMqttPublishMessage;
1818
import static io.streamnative.pulsar.handlers.mqtt.common.utils.MqttMessageUtils.createMqttSubscribeMessage;
19+
import static io.streamnative.pulsar.handlers.mqtt.common.utils.MqttMessageUtils.pingResp;
1920
import com.google.common.collect.Lists;
2021
import io.netty.channel.ChannelHandlerContext;
2122
import io.netty.handler.codec.mqtt.MqttConnectMessage;
@@ -62,6 +63,7 @@
6263
import lombok.Getter;
6364
import lombok.extern.slf4j.Slf4j;
6465
import org.apache.commons.collections4.CollectionUtils;
66+
import org.apache.commons.collections4.MapUtils;
6567
import org.apache.commons.lang3.StringUtils;
6668
import org.apache.pulsar.broker.PulsarService;
6769
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -243,6 +245,10 @@ public void processPubAck(MqttAdapterMessage adapter) {
243245
@Override
244246
public void processPingReq(final MqttAdapterMessage msg) {
245247
String clientId = connection.getClientId();
248+
if (MapUtils.isEmpty(topicBrokers)) {
249+
connection.send(pingResp());
250+
return;
251+
}
246252
topicBrokers.values().forEach(adapterChannel -> {
247253
adapterChannel.thenAccept(channel -> {
248254
msg.setClientId(clientId);

0 commit comments

Comments
 (0)