-
Notifications
You must be signed in to change notification settings - Fork 55
Open
Description
Describe the bug
When bridging another MQTT with MOP, I'm getting the following error.
2024-07-11T20:21:17,698+0000 [mqtt-redirect-io-46-5] ERROR io.streamnative.pulsar.handlers.mqtt.MQTTCommonInboundHandler - Exception was caught while processing MQTT message,
java.lang.IllegalStateException: MQTT is an unknown protocol name
at io.streamnative.pulsar.handlers.mqtt.utils.MqttMessageUtils.checkState(MqttMessageUtils.java:48) ~[3h-3vkPU0Yfb3TTJDBOvGQ/:?]
at io.streamnative.pulsar.handlers.mqtt.MQTTCommonInboundHandler.channelRead(MQTTCommonInboundHandler.java:62) ~[3h-3vkPU0Yfb3TTJDBOvGQ/:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.streamnative.pulsar.handlers.mqtt.adapter.CombineHandler.channelRead(CombineHandler.java:31) ~[3h-3vkPU0Yfb3TTJDBOvGQ/:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
at io.netty.handler.codec.ReplayingDecoder.callDecode(ReplayingDecoder.java:349) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:535) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
at io.netty.handler.codec.ReplayingDecoder.callDecode(ReplayingDecoder.java:366) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289) ~[io.netty-netty-handler-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:801) ~[io.netty-netty-transport-classes-epoll-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509) ~[io.netty-netty-transport-classes-epoll-4.1.108.Final.jar:4.1.108.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) ~[io.netty-netty-transport-classes-epoll-4.1.108.Final.jar:4.1.108.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[io.netty-netty-common-4.1.108.Final.jar:4.1.108.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.108.Final.jar:4.1.108.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.108.Final.jar:4.1.108.Final]
at java.lang.Thread.run(Thread.java:840) ~[?:?]
To Reproduce
Steps to reproduce the behavior:
- Configure MOP on Pulsar with following options;
messagingProtocols: mqtt
protocolHandlerDirectory: "/pulsar/protocols"
mqttListeners: "mqtt://127.0.0.1:1883"
mqttProxyEnabled: "true"
mqttProxyPort: "5682"
- Setup a new mosquitto instance locally (or via docker) with following config (updating the address of the MOP)
listener 1883
allow_anonymous true
persistence true
persistence_location /mosquitto/data/
connection alroze
address MOP:1883
topic # both 0
- Publish a message on the mosquitto instance
Expected behavior
The messages that are synced between MOP and Mosquitto
Desktop (please complete the following information):
- MOP: Kubernetes v1.28.3
- Pulsar: v3.0.5
- Mosquitto: 2.0.18
Additional context
N/A
Metadata
Metadata
Assignees
Labels
No labels