Skip to content

Conversation

@sbmpost
Copy link

@sbmpost sbmpost commented Sep 2, 2025

case (topicFilter, topicFilterFlags) =>
topicFilter.encode(packetBsb)
packetBsb.putByte(topicFilterFlags.underlying.toByte)
packetBsb.putByte(topicFilterFlags.underlying.toByte >> 1)
Copy link
Author

@sbmpost sbmpost Sep 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pjfanning
Copy link
Member

This is a general purpose repo. Few if any of the Pekko team are MQTT experts.
It would really help if

  • the PR was written in plain English and not a set of links to barely decipherable specs
  • the PR included test coverage

@pjfanning
Copy link
Member

pjfanning commented Sep 2, 2025

PR code does not even compile.

[error] /home/runner/work/pekko-connectors/pekko-connectors/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala:458:74: value ReservedUnsubAck is not a member of object org.apache.pekko.stream.connectors.mqtt.streaming.ControlPacketFlags
[error]     extends ControlPacket(ControlPacketType.UNSUBACK, ControlPacketFlags.ReservedUnsubAck)
[error]                                                                          ^
[error] /home/runner/work/pekko-connectors/pekko-connectors/mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/model.scala:714:64: type mismatch;
[error]  found   : Int
[error]  required: Byte
[error]           packetBsb.putByte(topicFilterFlags.underlying.toByte >> 1)

@sbmpost
Copy link
Author

sbmpost commented Sep 2, 2025

@pjfanning

Compilation should have been fixed now.

For more context see:
akka/alpakka#2731
akka/alpakka#2963

This was already reported by me years ago but never made it into pekko.

case (topicFilter, topicFilterFlags) =>
topicFilter.encode(packetBsb)
packetBsb.putByte(topicFilterFlags.underlying.toByte)
packetBsb.putByte((topicFilterFlags.underlying >> 1).toByte)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*/
final case class UnsubAck(packetId: PacketId)
extends ControlPacket(ControlPacketType.UNSUBACK, ControlPacketFlags.ReservedUnsubAck)
extends ControlPacket(ControlPacketType.UNSUBACK, ControlPacketFlags.ReservedGeneral)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pjfanning
Copy link
Member

pjfanning commented Sep 2, 2025

@pjfanning

Compilation should have been fixed now.

For more context see: akka/alpakka#2731 akka/alpakka#2963

This was already reported by me years ago but never made it into pekko.

For licensing reasons, we cannot accept code based on Alpakka commits.
The code changes will only be accepted if they were written by the person submitting the PR. If you wrote the Alpakka PRs with no assistance from the Akka team, we may be able to accept them but we cannot accept anything based on code submitted to Akpakka otherwise.

@sbmpost
Copy link
Author

sbmpost commented Sep 2, 2025

@pjfanning
Link: https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718022
shows the reserved bits for UnsubAck and PingReq:

UNSUBACK | Reserved | 0 | 0 | 0 | 0
PINGREQ | Reserved | 0 | 0 | 0 | 0

Both the same value. However the code as is, uses UnsubAckReserved for UnsubAck and ReservedGeneral for PingReq which doesn't make sense. When I created issue akka/alpakka#2963, I pointed out exactly how and where in the code to fix this. So if there is any copyright involved, it would be mine and I am sharing it now with Pekko.

Link: https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349311
shows the lower two bits of the Subscribe message are to be used for QoS service: 0 0 0 0 0 0 q q
The code however assumes the ControlPacketFlags for QoS are: 0 0 0 0 0 q q 0
which is correct for the Publish message, but not for the Subscribe message

For this one there is issue akka/alpakka#2731, in which I explained how the bit shift is required for the Subscribe message. The Alpakka developers decided to fix this differently, so I am quite sure this fix is OK.

@sbmpost
Copy link
Author

sbmpost commented Sep 2, 2025

@pjfanning
I see it failed on binary compatibility checks this time...
Will fix this as well...

@sbmpost
Copy link
Author

sbmpost commented Sep 2, 2025

@pjfanning

You were right about the tests. They are failing. I know how to fix them, but it is not going to be as clean as I would like it to be. I am going to take some time to think about a proper solution. I suppose we can leave this PR open for now?

build.sbt Outdated
},
mimaBinaryIssueFilters ++= Seq(
ProblemFilters.exclude[Problem]("*.impl.*"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.mqtt.streaming.ControlPacketFlags.ReservedUnsubAck"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you have a look in the repo? We tend to use files in src/mima-filters to add the excludes - a new file with each PR that breaks bin compat.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked the repo, but couldn't find a file for mqtt-streaming folder. Should I create a new one?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pjfanning I created the file.

@sbmpost
Copy link
Author

sbmpost commented Sep 2, 2025

@pjfanning

PR should be good now. Instead of introducing new QoS flags for the Subscribe message, I thought it was best to make it work with the existing QoS flags (see below). In this way existing code shouldn't need any modifications. I suppose someone will want to have a critical look at this PR before it is merged. I totally understand if this is going to take a while.

val QoSAtMostOnceDelivery = ControlPacketFlags(0)
val QoSAtLeastOnceDelivery = ControlPacketFlags(1 << 1)
val QoSExactlyOnceDelivery = ControlPacketFlags(2 << 1)
val QoSReserved = ControlPacketFlags(3 << 1)

@pjfanning pjfanning added this to the 2.0.0 milestone Sep 17, 2025
@pjfanning
Copy link
Member

@sbmpost would it be feasible to add test coverage?

@pjfanning
Copy link
Member

Also as a matter of interest, have you tried the mqtt v5 support that is in pekko-connectors 1.2.0? The release should be out this week but we have snapshot versions and the 1.2.0 candidate is in a staging area for testing.

# specific language governing permissions and limitations
# under the License.

ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.mqtt.streaming.ControlPacketFlags.ReservedUnsubAck")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move this to a 1.2.x.backward.excludes instead of 1.1.x.backward.excludes?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sbmpost could you make this change? We released 1.2.0 a week or 2 ago and now that diff is really with 1.2 as opposed to 1.1

@sbmpost
Copy link
Author

sbmpost commented Sep 18, 2025

@pjfanning

Test coverage could be nice indeed. For this to make sense we would need to test against a mqtt broker that is known to be strict on following the specification. In fact I found out about these bugs because I was connecting with IoT core from AWS, which didn't work as expected. To summarize: yes test coverage would be a good idea, but it requires a (independent) broker to test against. I am not sure if this is feasible, but I am open to suggestions of course.

@sbmpost
Copy link
Author

sbmpost commented Sep 18, 2025

@pjfanning

As for mqtt 5: For my work we rely on mqtt 3. Part of my job is to make sure everything runs stable and if there is a problem, I do a deep dive deep to find the root cause. Here is another example of this:

akka/alpakka#2905

In conclusion: I learned quite a bit about mqtt 3 out of necessity but I am not so much involved with mqtt 5.

@pjfanning
Copy link
Member

pjfanning commented Oct 2, 2025

@He-Pin @mdedetrich @raboof what do you think? Should we merge this as is or do we need to spend time beefing up the mqtt testing? I suspect that there really isn't going to be someone who has the expertise and bandwidth to improve the test situation and that we are likely just going to have to accept our contributors' assertions.

@sbmpost
Copy link
Author

sbmpost commented Oct 5, 2025

@pjfanning

I think I would like to test with AWS IoT just once more, to be absolutely sure. I will post the results when I am done.

@He-Pin He-Pin requested a review from Copilot October 10, 2025 03:25
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This pull request fixes two MQTT protocol compliance issues related to QoS encoding in Subscribe packets and flag handling in UnsubAck packets, ensuring adherence to the MQTT v3.1.1 specification.

  • Removes the unused ReservedUnsubAck flag and replaces it with the standard ReservedGeneral flag
  • Corrects Subscribe QoS encoding by right-shifting the QoS value by 1 bit during encoding
  • Fixes Subscribe QoS decoding by left-shifting the QoS value by 1 bit during decoding

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

File Description
model.scala Removes ReservedUnsubAck flag, updates UnsubAck to use ReservedGeneral, and fixes QoS bit manipulation in Subscribe encode/decode
mqtt-streaming.backwards.excludes Adds MiMa exclusion for the removed ReservedUnsubAck method to maintain binary compatibility

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

case (topicFilter, topicFilterFlags) =>
topicFilter.encode(packetBsb)
packetBsb.putByte(topicFilterFlags.underlying.toByte)
packetBsb.putByte((topicFilterFlags.underlying >> 1).toByte)
Copy link

Copilot AI Oct 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The QoS encoding appears to be shifting in the wrong direction. According to MQTT v3.1.1 specification section 3.8.3.1, QoS values (0, 1, 2) should be encoded in bits 1-0 of the byte. Right-shifting by 1 would place QoS values in bits 2-1 instead. This should likely be a left shift by 1 or no shift at all depending on how topicFilterFlags.underlying represents the QoS value.

Suggested change
packetBsb.putByte((topicFilterFlags.underlying >> 1).toByte)
packetBsb.putByte(topicFilterFlags.underlying.toByte)

Copilot uses AI. Check for mistakes.
if (remainingLen > 0) {
val packetLenAtTopicFilter = v.len
val topicFilter = (v.decodeString(), ControlPacketFlags(v.getByte & 0xFF))
val topicFilter = (v.decodeString(), ControlPacketFlags((v.getByte << 1) & 0xFF))
Copy link

Copilot AI Oct 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The QoS decoding is left-shifting by 1 bit, but this appears inconsistent with typical MQTT QoS encoding where QoS values occupy the least significant bits (bits 1-0). If the encoding is right-shifting by 1, then decoding should also right-shift by 1 to extract the QoS from the correct bit positions. The shift directions in encode and decode operations should be inverse operations.

Suggested change
val topicFilter = (v.decodeString(), ControlPacketFlags((v.getByte << 1) & 0xFF))
val topicFilter = (v.decodeString(), ControlPacketFlags(v.getByte & 0x03))

Copilot uses AI. Check for mistakes.
@sbmpost
Copy link
Author

sbmpost commented Oct 13, 2025

@pjfanning

I have tested with AWS IoT. The results are below.

Subscribing with QoSAtLeastOnceDelivery without the encoding fix gives "Upstream finished":

[info] {"level":"DEBUG","origin":"org.apache.pekko.stream.connectors.mqtt.streaming.scaladsl.ActorMqttClientSession","sc":{},"title":"[client-commandFlow] Element: Right(Subscribe(PacketId(1),Vector((22574/lwt,ControlPacketFlags(2)), (13182/lwt,ControlPacketFlags(2)), (67782/lwt,ControlPacketFlags(2)), (13185/lwt,ControlPacketFlags(2)), (114855/lwt,ControlPacketFlags(2)), (13187/lwt,ControlPacketFlags(2)))))"}
[info] {"level":"DEBUG","origin":"org.apache.pekko.stream.connectors.mqtt.streaming.scaladsl.ActorMqttClientSession","sc":{},"title":"[client-events] Upstream finished."}

With QoS encoding fix we get a "SubAck":

[info] {"level":"DEBUG","origin":"org.apache.pekko.stream.connectors.mqtt.streaming.scaladsl.ActorMqttClientSession","sc":{},"title":"[client-commandFlow] Element: Right(Subscribe(PacketId(1),Vector((22574/lwt,ControlPacketFlags(2)), (13182/lwt,ControlPacketFlags(2)), (67782/lwt,ControlPacketFlags(2)), (13185/lwt,ControlPacketFlags(2)), (114855/lwt,ControlPacketFlags(2)), (13187/lwt,ControlPacketFlags(2)))))"}
[info] {"level":"DEBUG","origin":"org.apache.pekko.stream.connectors.mqtt.streaming.scaladsl.ActorMqttClientSession","sc":{},"title":"[client-events] Element: Right(SubAck(PacketId(1),Vector(ControlPacketFlags(1), ControlPacketFlags(1), ControlPacketFlags(1), ControlPacketFlags(1), ControlPacketFlags(1), ControlPacketFlags(1))))"}

And for an Unsubscribe without the decode fix we get "UnknownPacketType":

[info] {"level":"DEBUG","origin":"MqttSource","sc":{},"title":"MQTT: sending unsubscribe command: 22574/lwt"}
[info] {"level":"DEBUG","origin":"org.apache.pekko.stream.connectors.mqtt.streaming.scaladsl.ActorMqttClientSession","sc":{},"title":"[client-commandFlow] Element: Right(Unsubscribe(PacketId(1),Vector(22574/lwt)))"}
[info] {"level":"DEBUG","origin":"org.apache.pekko.stream.connectors.mqtt.streaming.scaladsl.ActorMqttClientSession","sc":{},"title":"[client-events] Element: Left(UnknownPacketType(ControlPacketType(11),ControlPacketFlags(0)))"}

However after fixing the decode we see a proper "UnsubAck":

[info] {"level":"DEBUG","origin":"MqttSource","sc":{},"title":"MQTT: sending unsubscribe command: 22574/lwt"}
[info] {"level":"DEBUG","origin":"org.apache.pekko.stream.connectors.mqtt.streaming.scaladsl.ActorMqttClientSession","sc":{},"title":"[client-commandFlow] Element: Right(Unsubscribe(PacketId(1),Vector(22574/lwt)))"}
[info] {"level":"DEBUG","origin":"org.apache.pekko.stream.connectors.mqtt.streaming.scaladsl.ActorMqttClientSession","sc":{},"title":"[client-events] Element: Right(UnsubAck(PacketId(1)))"}

Ah, and about the AI review... that review is painfully incorrect:

The QoS encoding appears to be shifting in the wrong direction. According to MQTT v3.1.1 specification section 3.8.3.1, QoS values (0, 1, 2) should be encoded in bits 1-0 of the byte. Right-shifting by 1 would place QoS values in bits 2-1 instead

The AI doesn't take into account that the Pekko QoS constants have been defined as (0, 2, 4). Also it thinks that shifting to the right means that bits are moving to the left.

If the encoding is right-shifting by 1, then decoding should also right-shift by 1 to extract the QoS from the correct bit positions. The shift directions in encode and decode operations should be inverse operations.

Euhm... what? Since when is the inverse of a right-shift another right-shift? The lower 2 bits of a Subscribe message encode the QoS which is one of the values (0, 1, 2). Upon receiving such a message, shifting those bits to the left gives us back the Pekko QoS constants (0, 2, 4).

@mdedetrich
Copy link
Contributor

@pjfanning

Test coverage could be nice indeed. For this to make sense we would need to test against a mqtt broker that is known to be strict on following the specification. In fact I found out about these bugs because I was connecting with IoT core from AWS, which didn't work as expected. To summarize: yes test coverage would be a good idea, but it requires a (independent) broker to test against. I am not sure if this is feasible, but I am open to suggestions of course.

So one option we can do is that like with the S3 connector, we can use the ASF infra provided AWS account and then we can designate some tests to be run against AWS IoT (these tests would only run on main and not in PR's but its better than nothing).

@pjfanning thoughts?

@pjfanning
Copy link
Member

@mdedetrich I'm happy enough with testing against AWS. This is not a requirement for this PR though - that is, I wouldn't expect @sbmpost to write the tests because it would be easier for someone who had access to the AWS account that we test with to be able to set up any resources that we need for testing.

@mdedetrich
Copy link
Contributor

mdedetrich commented Oct 24, 2025

@mdedetrich I'm happy enough with testing against AWS. This is not a requirement for this PR though - that is, I wouldn't expect @sbmpost to write the tests because it would be easier for someone who had access to the AWS account that we test with to be able to set up any resources that we need for testing.

Oh agreed, @sbmpost should make sure that these PR changes work as expected on the AWS account that he is testing against and then we can split out the test suites in the same way its done with s3 so that we can run extra tests against a provided AWS test account from ASF infra.

I have made an issue for this

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants