Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.mqtt.streaming.ControlPacketFlags.ReservedUnsubAck")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move this to a 1.2.x.backward.excludes instead of 1.1.x.backward.excludes?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sbmpost could you make this change? We released 1.2.0 a week or 2 ago and now that diff is really with 1.2 as opposed to 1.1

Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ object ControlPacketFlags {
val ReservedPubRel = ControlPacketFlags(1 << 1)
val ReservedSubscribe = ControlPacketFlags(1 << 1)
val ReservedUnsubscribe = ControlPacketFlags(1 << 1)
val ReservedUnsubAck = ControlPacketFlags(1 << 1)
val DUP = ControlPacketFlags(1 << 3)
val QoSAtMostOnceDelivery = ControlPacketFlags(0)
val QoSAtLeastOnceDelivery = ControlPacketFlags(1 << 1)
Expand Down Expand Up @@ -456,7 +455,7 @@ final case class Unsubscribe @InternalApi private[streaming] (packetId: PacketId
* http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
*/
final case class UnsubAck(packetId: PacketId)
extends ControlPacket(ControlPacketType.UNSUBACK, ControlPacketFlags.ReservedUnsubAck)
extends ControlPacket(ControlPacketType.UNSUBACK, ControlPacketFlags.ReservedGeneral)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


/**
* 3.12 PINGREQ – PING request
Expand Down Expand Up @@ -712,7 +711,7 @@ object MqttCodec {
v.topicFilters.foreach {
case (topicFilter, topicFilterFlags) =>
topicFilter.encode(packetBsb)
packetBsb.putByte(topicFilterFlags.underlying.toByte)
packetBsb.putByte((topicFilterFlags.underlying >> 1).toByte)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link

Copilot AI Oct 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The QoS encoding appears to be shifting in the wrong direction. According to MQTT v3.1.1 specification section 3.8.3.1, QoS values (0, 1, 2) should be encoded in bits 1-0 of the byte. Right-shifting by 1 would place QoS values in bits 2-1 instead. This should likely be a left shift by 1 or no shift at all depending on how topicFilterFlags.underlying represents the QoS value.

Suggested change
packetBsb.putByte((topicFilterFlags.underlying >> 1).toByte)
packetBsb.putByte(topicFilterFlags.underlying.toByte)

Copilot uses AI. Check for mistakes.
}
// Fixed header
(v: ControlPacket).encode(bsb, packetBsb.length)
Expand Down Expand Up @@ -819,7 +818,7 @@ object MqttCodec {
v.decodeSubAck(l)
case (ControlPacketType.UNSUBSCRIBE, ControlPacketFlags.ReservedUnsubscribe) =>
v.decodeUnsubscribe(l)
case (ControlPacketType.UNSUBACK, ControlPacketFlags.ReservedUnsubAck) =>
case (ControlPacketType.UNSUBACK, ControlPacketFlags.ReservedGeneral) =>
v.decodeUnsubAck()
case (ControlPacketType.PINGREQ, ControlPacketFlags.ReservedGeneral) =>
Right(PingReq)
Expand Down Expand Up @@ -978,7 +977,7 @@ object MqttCodec {
: Vector[(Either[DecodeError, String], ControlPacketFlags)] =
if (remainingLen > 0) {
val packetLenAtTopicFilter = v.len
val topicFilter = (v.decodeString(), ControlPacketFlags(v.getByte & 0xFF))
val topicFilter = (v.decodeString(), ControlPacketFlags((v.getByte << 1) & 0xFF))
Copy link

Copilot AI Oct 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The QoS decoding is left-shifting by 1 bit, but this appears inconsistent with typical MQTT QoS encoding where QoS values occupy the least significant bits (bits 1-0). If the encoding is right-shifting by 1, then decoding should also right-shift by 1 to extract the QoS from the correct bit positions. The shift directions in encode and decode operations should be inverse operations.

Suggested change
val topicFilter = (v.decodeString(), ControlPacketFlags((v.getByte << 1) & 0xFF))
val topicFilter = (v.decodeString(), ControlPacketFlags(v.getByte & 0x03))

Copilot uses AI. Check for mistakes.
decodeTopicFilters(remainingLen - (packetLenAtTopicFilter - v.len), topicFilters :+ topicFilter)
} else {
topicFilters
Expand Down