Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions lib/src/messages/mqtt_client_mqtt_message.dart
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,13 @@ class MqttMessage {
// Pass the input stream sequentially through the component
// deserialization(create) methods to build a full MqttMessage.
header = MqttHeader.fromByteBuffer(messageStream);
//expected position after reading payload
final expectedPos = messageStream.position + header.messageSize;

if (messageStream.availableBytes < header.messageSize) {
messageStream.reset();
throw InvalidMessageException(
'Available bytes is less than the message size',
);
}
final message = MqttMessageFactory.getMessage(header, messageStream);

if (messageStream.position < expectedPos) {
messageStream.skipBytes = expectedPos - messageStream.position;
}

return message;
return MqttMessageFactory.getMessage(header, messageStream);
} on Exception catch (e, stack) {
Error.throwWithStackTrace(
InvalidMessageException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ class MqttPublishMessage extends MqttMessage {
sb.writeln(variableHeader.toString());
MqttLogger.logPayloads
? sb.writeln(payload.toString())
: sb.writeln(' ---> Payload logging is off <--- ');
: sb.writeln(
' ---> Payload logging is off, payload size is ${payload.message.length} bytes <--- ',
);
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class MqttPublishPayload extends MqttPayload {
// The length of the bytes is the length specified in the header,
// minus any bytes spent in the variable header.
final messageBytes = header!.messageSize - variableHeader!.length;
message = payloadStream.read(messageBytes);
message = payloadStream.readPayload(messageBytes);
}

/// Writes the payload to the supplied stream.
Expand Down
56 changes: 55 additions & 1 deletion lib/src/utility/mqtt_client_byte_buffer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ part of '../../mqtt_client.dart';
/// This class is in effect a cut-down implementation of the C# NET
/// System.IO class with Mqtt client specific extensions.
class MqttByteBuffer {
/// Large payload handling.
/// If count of bytes to read for a payload is larger than this then
/// large payload handling is invoked.
static const largePayload = 32767;

/// The underlying byte buffer
typed.Uint8Buffer? buffer;

Expand Down Expand Up @@ -50,7 +55,9 @@ class MqttByteBuffer {

/// Shrink the buffer
void shrink() {
buffer!.removeRange(0, _position);
_position < buffer!.length
? buffer!.removeRange(0, _position)
: buffer!.clear();
_position = 0;
}

Expand Down Expand Up @@ -98,6 +105,53 @@ class MqttByteBuffer {
..addAll(buffer!.getRange(_position - count, _position));
}

/// Reads a sequence of bytes from the current
/// buffer and advances the position within the buffer
/// by the number of bytes read.
///
/// Specifically intended for reading payload data from publish messages which can
/// be quite large.
typed.Uint8Buffer readPayload(int count) {
if ((length < count) || (_position + count) > length) {
throw Exception(
'mqtt_client::ByteBuffer::readPayload: The buffer does not have '
'enough bytes for the read operation '
'length $length, count $count, position $_position, buffer $buffer',
);
}
// If not a large payload use the normal buffer read method.
if (count <= largePayload) {
return read(count);
}
// See where the position is, if not 0 we can remove the range 0.._position
// as we know we are looking for a payload.
if (_position != 0) {
buffer!.removeRange(0, _position);
_position = 0;
}
// _position is now guaranteed to be 0 and at the start of the payload data.
// If the length of the buffer is equal to count then just return it.
final savedData = typed.Uint8Buffer();
if (buffer!.length == count) {
_position = buffer!.length;
return typed.Uint8Buffer()..addAll(buffer!);
} else {
// Trailing data, save it.
savedData.addAll(buffer!.getRange(_position + count, length).toList());
// Remove it, leaving just the payload
buffer!.removeRange(_position + count, length);
// Save the payload data
final tmp = typed.Uint8Buffer()..addAll(buffer!);
// Clear the buffer
buffer!.clear();
// Restore the trailing data and set the position to zero
buffer!.addAll(savedData);
_position = 0;
// Return the payload
return tmp;
}
}

/// Writes a byte to the current position in the buffer
/// and advances the position within the buffer by one byte.
void writeByte(int byte) {
Expand Down
2 changes: 1 addition & 1 deletion test/issues/issue602/main.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import 'package:mqtt_client/mqtt_server_client.dart';
void main() async {
final client = MqttServerClient('localhost', 'dart_client1111');
client.port = 1883;
client.logging(on: false);
client.logging(on: false, logPayloads: false);
final connMess = MqttConnectMessage();

client.connectionMessage = connMess;
Expand Down
55 changes: 54 additions & 1 deletion test/issues/issue602/timings.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,57 @@ Publish Variable Header: TopicName={station1/all}, MessageIdentifier={0}, VH Len

1-2025-06-04 11:06:12.966009 -- MqttServerConnection::_onData - message available event fired

---> From top to bottom this is taking 102.744ms to receive this complete message.
---> From top to bottom this is taking 102.744ms to receive this complete message.

---> Phase 2

---> Staring point

1-2025-06-06 09:34:42.843495 -- MqttConnection::onData
1-2025-06-06 09:34:42.844101 -- MqttServerConnection::_ondata - message is not yet valid, waiting for more data ...
1-2025-06-06 09:34:42.849468 -- MqttConnection::onData
1-2025-06-06 09:34:42.851880 -- MqttServerConnection::_ondata - message is not yet valid, waiting for more data ...
1-2025-06-06 09:34:42.856728 -- MqttConnection::onData
1-2025-06-06 09:34:42.860684 -- MqttServerConnection::_ondata - message is not yet valid, waiting for more data ...
1-2025-06-06 09:34:42.860917 -- MqttConnection::onData
1-2025-06-06 09:34:43.006837 -- MqttServerConnection::_onData - message received MQTTMessage of type MqttMessageType.publish
Header: MessageType = MqttMessageType.publish, Duplicate = false, Retain = false, Qos = MqttQos.atMostOnce, Size = 9681327
Publish Variable Header: TopicName={station1/all}, MessageIdentifier={0}, VH Length={14}
---> Payload logging is off, payload size is 9681313 bytes <---

1-2025-06-06 09:34:43.007226 -- MqttServerConnection::_onData - message available event fired
1-2025-06-06 09:34:43.007272 -- MqttConnectionHandlerBase::messageAvailable - message type is MqttMessageType.publish

---> Showing 145.920 ms publish message creation

---> Adding toList() on the getRange call

1-2025-06-06 09:39:53.065715 -- MqttConnection::onData
1-2025-06-06 09:39:53.066209 -- MqttServerConnection::_ondata - message is not yet valid, waiting for more data ...
1-2025-06-06 09:39:53.070209 -- MqttConnection::onData
1-2025-06-06 09:39:53.072963 -- MqttServerConnection::_ondata - message is not yet valid, waiting for more data ...
1-2025-06-06 09:39:53.076327 -- MqttConnection::onData
1-2025-06-06 09:39:53.251499 -- MqttServerConnection::_onData - message received MQTTMessage of type MqttMessageType.publish
Header: MessageType = MqttMessageType.publish, Duplicate = false, Retain = false, Qos = MqttQos.atMostOnce, Size = 9681327
Publish Variable Header: TopicName={station1/all}, MessageIdentifier={0}, VH Length={14}
---> Payload logging is off, payload size is 9681313 bytes <---

1-2025-06-06 09:39:53.251780 -- MqttServerConnection::_onData - message available event fired
1-2025-06-06 09:39:53.251822 -- MqttConnectionHandlerBase::messageAvailable - message type is MqttMessageType.publish

---> Gives 175.172ms, considerably worse.

---> New payload specific buffer code

1-2025-06-06 10:31:37.163618 -- MqttConnection::onData
1-2025-06-06 10:31:37.164452 -- MqttServerConnection::_ondata - message is not yet valid, waiting for more data ...
1-2025-06-06 10:31:37.165975 -- MqttConnection::onData
1-2025-06-06 10:31:37.225203 -- MqttServerConnection::_onData - message received MQTTMessage of type MqttMessageType.publish
Header: MessageType = MqttMessageType.publish, Duplicate = false, Retain = false, Qos = MqttQos.atMostOnce, Size = 9681329
Publish Variable Header: TopicName={station1/all}, MessageIdentifier={0}, VH Length={14}
---> Payload logging is off, payload size is 9681315 bytes <---
1-2025-06-06 10:31:37.225349 -- MqttServerConnection::_onData - message available event fired
1-2025-06-06 10:31:37.225421 -- MqttConnectionHandlerBase::messageAvailable - message type is MqttMessageType.publish

---> Sizes look correct, time is now 59.228ms

2 changes: 1 addition & 1 deletion test/mqtt_client_connection_unsecure_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ void main() {
print('Sleeping....');
await MqttUtilities.asyncSleep(2);
expect(
MqttLogger.testOutput.contains('---> Payload logging is off <---'),
MqttLogger.testOutput.contains('---> Payload logging is off'),
isTrue,
);
expect(!MqttLogger.testOutput.contains('hello'), isTrue);
Expand Down
93 changes: 93 additions & 0 deletions test/mqtt_client_message_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,91 @@ void main() {
}
expect(raised, isTrue);
});
test('Deserialisation - Large payload', () {
// Payload larger than large payload limit
final largePayload = List<int>.filled(32800, 0);
largePayload.first = 0xde;
largePayload.last = 0xad;
// Tests basic message deserialization from a raw byte array.
// Message Specs________________
// <30><0C><00><04>fred
final sampleMessage = <int>[
0x30,
0xA6,
0x80,
0x02,
0x00,
0x04,
'f'.codeUnitAt(0),
'r'.codeUnitAt(0),
'e'.codeUnitAt(0),
'd'.codeUnitAt(0),
];
sampleMessage.addAll(largePayload);
final buff = typed.Uint8Buffer();
buff.addAll(sampleMessage);
final byteBuffer = MqttByteBuffer(buff);
final baseMessage = MqttMessage.createFrom(byteBuffer);
// Check that the message was correctly identified as a publish message.
expect(baseMessage, const TypeMatcher<MqttPublishMessage>());
// Validate the message deserialization
expect(baseMessage.header!.duplicate, isFalse);
expect(baseMessage.header!.retain, isFalse);
expect(baseMessage.header!.qos, MqttQos.atMostOnce);
expect(baseMessage.header!.messageType, MqttMessageType.publish);
expect(baseMessage.header!.messageSize, 32806);
final pm = baseMessage as MqttPublishMessage;
// Check the payload
expect(pm.payload.message.length, 32800);
expect(pm.payload.message.first, 0xde);
expect(pm.payload.message.last, 0xad);
});
test('Deserialisation - Large payload - trailing bytes', () {
// Payload larger that large payload limit
final largePayload = List<int>.filled(32800, 0);
largePayload.first = 0xde;
largePayload.last = 0xad;
// Tests basic message deserialization from a raw byte array.
// Message Specs________________
// <30><0C><00><04>fred
final sampleMessage = <int>[
0x30,
0xA6,
0x80,
0x02,
0x00,
0x04,
'f'.codeUnitAt(0),
'r'.codeUnitAt(0),
'e'.codeUnitAt(0),
'd'.codeUnitAt(0),
];
sampleMessage.addAll(largePayload);
// Ping response
sampleMessage.addAll([0xD0, 0x0]);
final buff = typed.Uint8Buffer();
buff.addAll(sampleMessage);
final byteBuffer = MqttByteBuffer(buff);
final baseMessage = MqttMessage.createFrom(byteBuffer);
// Check that the message was correctly identified as a publish message.
expect(baseMessage, const TypeMatcher<MqttPublishMessage>());
// Validate the message deserialization
expect(baseMessage.header!.duplicate, isFalse);
expect(baseMessage.header!.retain, isFalse);
expect(baseMessage.header!.qos, MqttQos.atMostOnce);
expect(baseMessage.header!.messageType, MqttMessageType.publish);
expect(baseMessage.header!.messageSize, 32806);
final pm = baseMessage as MqttPublishMessage;
// Check the payload
expect(pm.payload.message.length, 32800);
expect(pm.payload.message.first, 0xde);
expect(pm.payload.message.last, 0xad);
byteBuffer.shrink();
expect(byteBuffer.length, 2);
final secondMessage = MqttMessage.createFrom(byteBuffer);
// Check that the message was correctly identified as a ping response.
expect(secondMessage, const TypeMatcher<MqttPingResponseMessage>());
});
test('Serialisation - Qos Level 2 Exactly Once', () {
final expected = <int>[
0x34,
Expand Down Expand Up @@ -1163,6 +1248,14 @@ void main() {
expect(actual[12], expected[12]); // o
expect(actual[13], expected[13]); // !
});
test('Serialisation - Large payload', () {
final payload = List<int>.filled(32800, 0);
final payloadBuff = typed.Uint8Buffer()..addAll(payload);
final msg = MqttPublishMessage().toTopic('fred').publishData(payloadBuff);
final actual = MessageSerializationHelper.getMessageBytes(msg);
expect(actual.length, 32810);
});

test('Serialisation - With non-default Qos', () {
final msg = MqttPublishMessage()
.toTopic('mark')
Expand Down