diff --git a/lib/src/messages/mqtt_client_mqtt_message.dart b/lib/src/messages/mqtt_client_mqtt_message.dart index 36b1e03..b1d954e 100644 --- a/lib/src/messages/mqtt_client_mqtt_message.dart +++ b/lib/src/messages/mqtt_client_mqtt_message.dart @@ -40,22 +40,13 @@ class MqttMessage { // Pass the input stream sequentially through the component // deserialization(create) methods to build a full MqttMessage. header = MqttHeader.fromByteBuffer(messageStream); - //expected position after reading payload - final expectedPos = messageStream.position + header.messageSize; - if (messageStream.availableBytes < header.messageSize) { messageStream.reset(); throw InvalidMessageException( 'Available bytes is less than the message size', ); } - final message = MqttMessageFactory.getMessage(header, messageStream); - - if (messageStream.position < expectedPos) { - messageStream.skipBytes = expectedPos - messageStream.position; - } - - return message; + return MqttMessageFactory.getMessage(header, messageStream); } on Exception catch (e, stack) { Error.throwWithStackTrace( InvalidMessageException( diff --git a/lib/src/messages/publish/mqtt_client_mqtt_publish_message.dart b/lib/src/messages/publish/mqtt_client_mqtt_publish_message.dart index 6d87786..15ae54e 100644 --- a/lib/src/messages/publish/mqtt_client_mqtt_publish_message.dart +++ b/lib/src/messages/publish/mqtt_client_mqtt_publish_message.dart @@ -101,7 +101,9 @@ class MqttPublishMessage extends MqttMessage { sb.writeln(variableHeader.toString()); MqttLogger.logPayloads ? sb.writeln(payload.toString()) - : sb.writeln(' ---> Payload logging is off <--- '); + : sb.writeln( + ' ---> Payload logging is off, payload size is ${payload.message.length} bytes <--- ', + ); return sb.toString(); } } diff --git a/lib/src/messages/publish/mqtt_client_mqtt_publish_payload.dart b/lib/src/messages/publish/mqtt_client_mqtt_publish_payload.dart index 36e1e6d..60dbd41 100644 --- a/lib/src/messages/publish/mqtt_client_mqtt_publish_payload.dart +++ b/lib/src/messages/publish/mqtt_client_mqtt_publish_payload.dart @@ -40,7 +40,7 @@ class MqttPublishPayload extends MqttPayload { // The length of the bytes is the length specified in the header, // minus any bytes spent in the variable header. final messageBytes = header!.messageSize - variableHeader!.length; - message = payloadStream.read(messageBytes); + message = payloadStream.readPayload(messageBytes); } /// Writes the payload to the supplied stream. diff --git a/lib/src/utility/mqtt_client_byte_buffer.dart b/lib/src/utility/mqtt_client_byte_buffer.dart index 4f6c425..f364fca 100644 --- a/lib/src/utility/mqtt_client_byte_buffer.dart +++ b/lib/src/utility/mqtt_client_byte_buffer.dart @@ -11,6 +11,11 @@ part of '../../mqtt_client.dart'; /// This class is in effect a cut-down implementation of the C# NET /// System.IO class with Mqtt client specific extensions. class MqttByteBuffer { + /// Large payload handling. + /// If count of bytes to read for a payload is larger than this then + /// large payload handling is invoked. + static const largePayload = 32767; + /// The underlying byte buffer typed.Uint8Buffer? buffer; @@ -50,7 +55,9 @@ class MqttByteBuffer { /// Shrink the buffer void shrink() { - buffer!.removeRange(0, _position); + _position < buffer!.length + ? buffer!.removeRange(0, _position) + : buffer!.clear(); _position = 0; } @@ -98,6 +105,53 @@ class MqttByteBuffer { ..addAll(buffer!.getRange(_position - count, _position)); } + /// Reads a sequence of bytes from the current + /// buffer and advances the position within the buffer + /// by the number of bytes read. + /// + /// Specifically intended for reading payload data from publish messages which can + /// be quite large. + typed.Uint8Buffer readPayload(int count) { + if ((length < count) || (_position + count) > length) { + throw Exception( + 'mqtt_client::ByteBuffer::readPayload: The buffer does not have ' + 'enough bytes for the read operation ' + 'length $length, count $count, position $_position, buffer $buffer', + ); + } + // If not a large payload use the normal buffer read method. + if (count <= largePayload) { + return read(count); + } + // See where the position is, if not 0 we can remove the range 0.._position + // as we know we are looking for a payload. + if (_position != 0) { + buffer!.removeRange(0, _position); + _position = 0; + } + // _position is now guaranteed to be 0 and at the start of the payload data. + // If the length of the buffer is equal to count then just return it. + final savedData = typed.Uint8Buffer(); + if (buffer!.length == count) { + _position = buffer!.length; + return typed.Uint8Buffer()..addAll(buffer!); + } else { + // Trailing data, save it. + savedData.addAll(buffer!.getRange(_position + count, length).toList()); + // Remove it, leaving just the payload + buffer!.removeRange(_position + count, length); + // Save the payload data + final tmp = typed.Uint8Buffer()..addAll(buffer!); + // Clear the buffer + buffer!.clear(); + // Restore the trailing data and set the position to zero + buffer!.addAll(savedData); + _position = 0; + // Return the payload + return tmp; + } + } + /// Writes a byte to the current position in the buffer /// and advances the position within the buffer by one byte. void writeByte(int byte) { diff --git a/test/issues/issue602/main.dart b/test/issues/issue602/main.dart index a6b5b29..20fdf5e 100644 --- a/test/issues/issue602/main.dart +++ b/test/issues/issue602/main.dart @@ -6,7 +6,7 @@ import 'package:mqtt_client/mqtt_server_client.dart'; void main() async { final client = MqttServerClient('localhost', 'dart_client1111'); client.port = 1883; - client.logging(on: false); + client.logging(on: false, logPayloads: false); final connMess = MqttConnectMessage(); client.connectionMessage = connMess; diff --git a/test/issues/issue602/timings.txt b/test/issues/issue602/timings.txt index 5bd2b05..77fabb6 100644 --- a/test/issues/issue602/timings.txt +++ b/test/issues/issue602/timings.txt @@ -29,4 +29,57 @@ Publish Variable Header: TopicName={station1/all}, MessageIdentifier={0}, VH Len 1-2025-06-04 11:06:12.966009 -- MqttServerConnection::_onData - message available event fired - ---> From top to bottom this is taking 102.744ms to receive this complete message. \ No newline at end of file + ---> From top to bottom this is taking 102.744ms to receive this complete message. + + ---> Phase 2 + + ---> Staring point + + 1-2025-06-06 09:34:42.843495 -- MqttConnection::onData + 1-2025-06-06 09:34:42.844101 -- MqttServerConnection::_ondata - message is not yet valid, waiting for more data ... + 1-2025-06-06 09:34:42.849468 -- MqttConnection::onData + 1-2025-06-06 09:34:42.851880 -- MqttServerConnection::_ondata - message is not yet valid, waiting for more data ... + 1-2025-06-06 09:34:42.856728 -- MqttConnection::onData + 1-2025-06-06 09:34:42.860684 -- MqttServerConnection::_ondata - message is not yet valid, waiting for more data ... + 1-2025-06-06 09:34:42.860917 -- MqttConnection::onData + 1-2025-06-06 09:34:43.006837 -- MqttServerConnection::_onData - message received MQTTMessage of type MqttMessageType.publish + Header: MessageType = MqttMessageType.publish, Duplicate = false, Retain = false, Qos = MqttQos.atMostOnce, Size = 9681327 + Publish Variable Header: TopicName={station1/all}, MessageIdentifier={0}, VH Length={14} + ---> Payload logging is off, payload size is 9681313 bytes <--- + + 1-2025-06-06 09:34:43.007226 -- MqttServerConnection::_onData - message available event fired + 1-2025-06-06 09:34:43.007272 -- MqttConnectionHandlerBase::messageAvailable - message type is MqttMessageType.publish + +---> Showing 145.920 ms publish message creation + +---> Adding toList() on the getRange call + +1-2025-06-06 09:39:53.065715 -- MqttConnection::onData +1-2025-06-06 09:39:53.066209 -- MqttServerConnection::_ondata - message is not yet valid, waiting for more data ... +1-2025-06-06 09:39:53.070209 -- MqttConnection::onData +1-2025-06-06 09:39:53.072963 -- MqttServerConnection::_ondata - message is not yet valid, waiting for more data ... +1-2025-06-06 09:39:53.076327 -- MqttConnection::onData +1-2025-06-06 09:39:53.251499 -- MqttServerConnection::_onData - message received MQTTMessage of type MqttMessageType.publish +Header: MessageType = MqttMessageType.publish, Duplicate = false, Retain = false, Qos = MqttQos.atMostOnce, Size = 9681327 +Publish Variable Header: TopicName={station1/all}, MessageIdentifier={0}, VH Length={14} + ---> Payload logging is off, payload size is 9681313 bytes <--- + +1-2025-06-06 09:39:53.251780 -- MqttServerConnection::_onData - message available event fired +1-2025-06-06 09:39:53.251822 -- MqttConnectionHandlerBase::messageAvailable - message type is MqttMessageType.publish + +---> Gives 175.172ms, considerably worse. + +---> New payload specific buffer code + +1-2025-06-06 10:31:37.163618 -- MqttConnection::onData +1-2025-06-06 10:31:37.164452 -- MqttServerConnection::_ondata - message is not yet valid, waiting for more data ... +1-2025-06-06 10:31:37.165975 -- MqttConnection::onData +1-2025-06-06 10:31:37.225203 -- MqttServerConnection::_onData - message received MQTTMessage of type MqttMessageType.publish +Header: MessageType = MqttMessageType.publish, Duplicate = false, Retain = false, Qos = MqttQos.atMostOnce, Size = 9681329 +Publish Variable Header: TopicName={station1/all}, MessageIdentifier={0}, VH Length={14} + ---> Payload logging is off, payload size is 9681315 bytes <--- +1-2025-06-06 10:31:37.225349 -- MqttServerConnection::_onData - message available event fired +1-2025-06-06 10:31:37.225421 -- MqttConnectionHandlerBase::messageAvailable - message type is MqttMessageType.publish + +---> Sizes look correct, time is now 59.228ms + diff --git a/test/mqtt_client_connection_unsecure_test.dart b/test/mqtt_client_connection_unsecure_test.dart index 4987866..79f7e01 100644 --- a/test/mqtt_client_connection_unsecure_test.dart +++ b/test/mqtt_client_connection_unsecure_test.dart @@ -587,7 +587,7 @@ void main() { print('Sleeping....'); await MqttUtilities.asyncSleep(2); expect( - MqttLogger.testOutput.contains('---> Payload logging is off <---'), + MqttLogger.testOutput.contains('---> Payload logging is off'), isTrue, ); expect(!MqttLogger.testOutput.contains('hello'), isTrue); diff --git a/test/mqtt_client_message_test.dart b/test/mqtt_client_message_test.dart index 26ba545..0308fc9 100644 --- a/test/mqtt_client_message_test.dart +++ b/test/mqtt_client_message_test.dart @@ -1022,6 +1022,91 @@ void main() { } expect(raised, isTrue); }); + test('Deserialisation - Large payload', () { + // Payload larger than large payload limit + final largePayload = List.filled(32800, 0); + largePayload.first = 0xde; + largePayload.last = 0xad; + // Tests basic message deserialization from a raw byte array. + // Message Specs________________ + // <30><0C><00><04>fred + final sampleMessage = [ + 0x30, + 0xA6, + 0x80, + 0x02, + 0x00, + 0x04, + 'f'.codeUnitAt(0), + 'r'.codeUnitAt(0), + 'e'.codeUnitAt(0), + 'd'.codeUnitAt(0), + ]; + sampleMessage.addAll(largePayload); + final buff = typed.Uint8Buffer(); + buff.addAll(sampleMessage); + final byteBuffer = MqttByteBuffer(buff); + final baseMessage = MqttMessage.createFrom(byteBuffer); + // Check that the message was correctly identified as a publish message. + expect(baseMessage, const TypeMatcher()); + // Validate the message deserialization + expect(baseMessage.header!.duplicate, isFalse); + expect(baseMessage.header!.retain, isFalse); + expect(baseMessage.header!.qos, MqttQos.atMostOnce); + expect(baseMessage.header!.messageType, MqttMessageType.publish); + expect(baseMessage.header!.messageSize, 32806); + final pm = baseMessage as MqttPublishMessage; + // Check the payload + expect(pm.payload.message.length, 32800); + expect(pm.payload.message.first, 0xde); + expect(pm.payload.message.last, 0xad); + }); + test('Deserialisation - Large payload - trailing bytes', () { + // Payload larger that large payload limit + final largePayload = List.filled(32800, 0); + largePayload.first = 0xde; + largePayload.last = 0xad; + // Tests basic message deserialization from a raw byte array. + // Message Specs________________ + // <30><0C><00><04>fred + final sampleMessage = [ + 0x30, + 0xA6, + 0x80, + 0x02, + 0x00, + 0x04, + 'f'.codeUnitAt(0), + 'r'.codeUnitAt(0), + 'e'.codeUnitAt(0), + 'd'.codeUnitAt(0), + ]; + sampleMessage.addAll(largePayload); + // Ping response + sampleMessage.addAll([0xD0, 0x0]); + final buff = typed.Uint8Buffer(); + buff.addAll(sampleMessage); + final byteBuffer = MqttByteBuffer(buff); + final baseMessage = MqttMessage.createFrom(byteBuffer); + // Check that the message was correctly identified as a publish message. + expect(baseMessage, const TypeMatcher()); + // Validate the message deserialization + expect(baseMessage.header!.duplicate, isFalse); + expect(baseMessage.header!.retain, isFalse); + expect(baseMessage.header!.qos, MqttQos.atMostOnce); + expect(baseMessage.header!.messageType, MqttMessageType.publish); + expect(baseMessage.header!.messageSize, 32806); + final pm = baseMessage as MqttPublishMessage; + // Check the payload + expect(pm.payload.message.length, 32800); + expect(pm.payload.message.first, 0xde); + expect(pm.payload.message.last, 0xad); + byteBuffer.shrink(); + expect(byteBuffer.length, 2); + final secondMessage = MqttMessage.createFrom(byteBuffer); + // Check that the message was correctly identified as a ping response. + expect(secondMessage, const TypeMatcher()); + }); test('Serialisation - Qos Level 2 Exactly Once', () { final expected = [ 0x34, @@ -1163,6 +1248,14 @@ void main() { expect(actual[12], expected[12]); // o expect(actual[13], expected[13]); // ! }); + test('Serialisation - Large payload', () { + final payload = List.filled(32800, 0); + final payloadBuff = typed.Uint8Buffer()..addAll(payload); + final msg = MqttPublishMessage().toTopic('fred').publishData(payloadBuff); + final actual = MessageSerializationHelper.getMessageBytes(msg); + expect(actual.length, 32810); + }); + test('Serialisation - With non-default Qos', () { final msg = MqttPublishMessage() .toTopic('mark')