Description
What happened?
This bug is quite a pickle and is something I've been looking at for a couple of hours now.
Non-retained messages (of every QoS) are not forwarded by `MqttIO.Read` after restoring from a checkpoint following a specific error: a `StackOverflowError` in `org.fusesource.mqtt.client`.
I have found the cause of the error, namely a shortage of messages received from the MQTT broker. When `connection.receive(1, TimeUnit.SECONDS)` is called, it expects to receive a `Message` within the next second. If no message arrives in that window, a promise with a callback is created and added to a queue in `org.fusesource.mqtt.client.FutureConnection`. When a message does arrive, one of two things happens: either the receive is handled before the await completes, in which case `receive` returns the `Message`, or the receive is handled after the await completes, in which case the message is added to the receive buffer for the next receive request. However, this "putback" method first wants to clear the accumulated promises before it can finally add the message to the buffer. Each promise then invokes its callback, which calls the "putback" method again, and this recursion results in a `StackOverflowError` once the queue contains around 600 promise entries. With one promise being added per second, a `StackOverflowError` is reached upon receiving a message after a stale period of roughly 10 minutes.
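The recursion described above can be sketched as a minimal, self-contained model. All names here are hypothetical and this is not the actual fusesource code: each timed-out `receive` leaves a promise (callback) in a queue, and delivering a single message drains the queue through mutually recursive callback calls, growing the stack by a pair of frames per queued entry.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

public class PutbackOverflow {

    // Queue of pending promises, one per timed-out receive() call.
    static final Deque<Consumer<String>> promises = new ArrayDeque<>();

    // Analogue of the "putback" path: hand the message to the oldest pending
    // promise; that promise's callback re-enters putback, once per entry.
    static void putback(String message) {
        Consumer<String> promise = promises.poll();
        if (promise != null) {
            promise.accept(message); // callback recurses back into putback
        }
    }

    // Fill the queue with n promises whose callbacks re-enter putback, then
    // deliver one message and report whether the recursion blew the stack.
    public static boolean overflows(int n) {
        promises.clear();
        for (int i = 0; i < n; i++) {
            promises.add(msg -> putback(msg));
        }
        try {
            putback("first message after the stale period");
            return false;
        } catch (StackOverflowError e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // ~600 entries suffice in the real client; the frames in this toy
        // model are tiny, so a much larger queue makes the overflow certain.
        System.out.println(overflows(1_000_000)
                ? "StackOverflowError after deep callback chain"
                : "no overflow");
    }
}
```

The exact queue depth at which the real client overflows depends on the JVM's thread stack size, which is why the ~600-entry / ~10-minute figure is an observation of this setup rather than a constant.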
This error is mostly due to the implementation of the fusesource client, but it is something to be aware of, since the last commits to their GitHub repo were more than 5 years ago and issues are not currently being handled.
However, this wouldn't be a big issue if it didn't cause something else to happen: after the checkpoint is restored, messages that are not retained on the broker are no longer received. I haven't found the cause of this yet, but since a new instance of `MQTT` is created on every start, the only things that can be shared are the static variables, with the `blockingThreadPool` (and therefore the `blockingExecutor`) being the only one.
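As a sanity check on that reasoning, here is a hypothetical sketch (the names merely mirror the client's; this is not the actual code) of why a static thread pool is the one candidate for state that survives a restore: a fresh client object gets fresh instance state, but a static field lives for the lifetime of the classloader and is shared by every instance created after it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ClientSketch {

    // Shared by every instance and every "restart", analogous to the static
    // blockingThreadPool in the MQTT client class.
    static final ExecutorService blockingThreadPool = Executors.newCachedThreadPool();

    ExecutorService pool() {
        return blockingThreadPool;
    }

    public static void main(String[] args) {
        ClientSketch first = new ClientSketch();
        ClientSketch second = new ClientSketch(); // fresh instance after a "restart"
        // Instance state is new, but the executor is the very same object,
        // so any stale state it holds carries over the restore.
        System.out.println(first.pool() == second.pool()); // prints "true"
        blockingThreadPool.shutdown();
    }
}
```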
I'll have a further look to see if I can pinpoint the exact place where it's going wrong. Updates will be posted in this thread.
Additional info about the setup that I'm running:
- Broker: Mosquitto (standard protocol, TCP)
- Beam version: 2.55.0 (not the most up to date, but the code that causes this error should be the same)
- Runner: Flink (but likely a problem for all runners)
- MQTT client version: 1.15 (both on 2.55 and current)
Issue Priority
Priority: 1 (data loss / total loss of function)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam YAML
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Infrastructure
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner