diff --git a/.gitignore b/.gitignore index 28a87c99..311ef313 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,4 @@ doc/api/ .idea .DS_Store .directory +/test/issues/issue602/venv/ diff --git a/example/mqtt_browser_client.dart b/example/mqtt_browser_client.dart index 4daed38d..61ce2c2d 100644 --- a/example/mqtt_browser_client.dart +++ b/example/mqtt_browser_client.dart @@ -152,7 +152,7 @@ Future main() async { /// Ok, we will now sleep a while, in this gap you will see ping request/response /// messages being exchanged by the keep alive mechanism. print('EXAMPLE::Sleeping....'); - await MqttUtilities.asyncSleep(60); + await MqttUtilities.asyncSleep(20); /// Finally, unsubscribe and exit gracefully print('EXAMPLE::Unsubscribing'); diff --git a/lib/mqtt_client.dart b/lib/mqtt_client.dart index ec838035..1abdbe94 100644 --- a/lib/mqtt_client.dart +++ b/lib/mqtt_client.dart @@ -22,6 +22,8 @@ part 'src/mqtt_client_constants.dart'; part 'src/mqtt_client_protocol.dart'; +part 'src/mqtt_client_environment.dart'; + part 'src/mqtt_client_events.dart'; part 'src/exception/mqtt_client_client_identifier_exception.dart'; diff --git a/lib/src/mqtt_browser_client.dart b/lib/src/mqtt_browser_client.dart index 255fb9f6..8cfe0bdc 100644 --- a/lib/src/mqtt_browser_client.dart +++ b/lib/src/mqtt_browser_client.dart @@ -46,6 +46,8 @@ class MqttBrowserClient extends MqttClient { String? username, String? password, ]) async { + // A browser client + MqttClientEnvironment.isWebClient = true; instantiationCorrect = true; clientEventBus = events.EventBus(); clientEventBus?.on().listen( diff --git a/lib/src/mqtt_client_environment.dart b/lib/src/mqtt_client_environment.dart new file mode 100644 index 00000000..eaa88c19 --- /dev/null +++ b/lib/src/mqtt_client_environment.dart @@ -0,0 +1,14 @@ +/* + * Package : mqtt_client + * Author : S. Hamblett + * Date : 09/06/2025 + * Copyright : S.Hamblett + */ + +part of '../mqtt_client.dart'; + +/// Client Environment +class MqttClientEnvironment { + /// Browser or server client + static bool isWebClient = false; +} diff --git a/lib/src/mqtt_server_client.dart b/lib/src/mqtt_server_client.dart index c3ef8ba5..b986bb18 100644 --- a/lib/src/mqtt_server_client.dart +++ b/lib/src/mqtt_server_client.dart @@ -91,6 +91,8 @@ class MqttServerClient extends MqttClient { String? username, String? password, ]) async { + // A server client + MqttClientEnvironment.isWebClient = false; instantiationCorrect = true; clientEventBus = events.EventBus(); clientEventBus?.on().listen( diff --git a/lib/src/utility/mqtt_client_byte_buffer.dart b/lib/src/utility/mqtt_client_byte_buffer.dart index f364fca1..f154ae22 100644 --- a/lib/src/utility/mqtt_client_byte_buffer.dart +++ b/lib/src/utility/mqtt_client_byte_buffer.dart @@ -100,9 +100,18 @@ class MqttByteBuffer { 'length $length, count $count, position $_position, buffer $buffer', ); } - _position += count; - return typed.Uint8Buffer() - ..addAll(buffer!.getRange(_position - count, _position)); + if (MqttClientEnvironment.isWebClient) { + final tmp = typed.Uint8Buffer(); + tmp.addAll(buffer!.getRange(_position, _position + count)); + _position += count; + final tmp2 = typed.Uint8Buffer(); + tmp2.addAll(tmp); + return tmp2; + } else { + _position += count; + return typed.Uint8Buffer() + ..addAll(buffer!.getRange(_position - count, _position)); + } } /// Reads a sequence of bytes from the current diff --git a/test/issues/issue608.dart b/test/issues/issue608.dart new file mode 100644 index 00000000..26833cb5 --- /dev/null +++ b/test/issues/issue608.dart @@ -0,0 +1,118 @@ +import 'dart:async'; + +import 'package:mqtt_client/mqtt_browser_client.dart'; +import 'package:mqtt_client/mqtt_client.dart'; + +class MqttParams { + final url = 'ws://test.mosquitto.org'; + final port = 8080; + final keepAlivePeriod = 60; + final username = null; + final password = null; +} + +class MqttWebInitializer { + late MqttBrowserClient _client; + + void initClient({ + required MqttParams params, + String willTopic = 'willTopic', + String willMessage = 'will message', + }) { + _client = MqttBrowserClient(params.url, ''); + final clientId = 'Geppo-Web}'; + _client.logging(on: false); + _client.port = params.port; + _client.setProtocolV311(); + _client.websocketProtocols = MqttClientConstants.protocolsSingleDefault; + // Keep alive period (in secondi) a 0 = disabilitato + if (params.keepAlivePeriod > 0) { + _client.keepAlivePeriod = params.keepAlivePeriod; + } + _client.onDisconnected = onDisconnected; + _client.onConnected = onConnected; + _client.onSubscribed = onSubscribed; + _client.onUnsubscribed = onUnsubscribed; + + final MqttConnectMessage connectMessage = MqttConnectMessage(); + if (params.username != null && params.password != null) { + connectMessage.authenticateAs(params.username!, params.password!); + } + connectMessage + .withClientIdentifier(clientId) + .withWillTopic(willTopic) + .withWillMessage(willMessage) + .withWillQos(MqttQos.atLeastOnce) + .startClean(); + + connectMessage.withWillQos(MqttQos.exactlyOnce); + print('EXECUTING - MQTT CONNECT...'); + _client.connectionMessage = connectMessage; + } + + MqttClient get client => _client; + + Future connect([ + String? username, + String? password, + ]) async { + MqttClientConnectionStatus? status; + try { + print('EXECUTING - MQTT CONNECTING...'); + + if (username == null && password == null) { + status = await _client.connect(username, password); + } else { + status = await _client.connect(); + } + } catch (e) { + print('EXCEPTION - MQTT CONNECT: $e'); + } + return status; + } + + void disconnect() { + print('EXECUTING - MQTT DISCONNECTING...'); + _client.disconnect(); + } + + void onConnected() { + print('EXECUTING - MQTT CONNECTED'); + } + + void onDisconnected() { + print('EXECUTING - MQTT DISCONNECTED'); + } + + void onSubscribed(String topic) { + print('EXECUTING - MQTT SUBSCRIBED TO TOPIC: $topic'); + } + + void onUnsubscribed(String? topic) { + print('EXECUTING - MQTT UNSUBSCRIBED FROM TOPIC: $topic'); + } + + void subscribe(String topic, {MqttQos qos = MqttQos.exactlyOnce}) { + print('EXECUTING - MQTT SUBSCRIBING TO TOPIC: $topic'); + _client.subscribe(topic, qos); + } + + void unsubscribe(String topic) { + print('EXECUTING - MQTT UNSUBSCRIBED FROM TOPIC: $topic'); + _client.unsubscribe(topic); + } +} + +void main() async { + final browser = MqttWebInitializer(); + browser.initClient(params: MqttParams()); + print('EXECUTING - Connecting'); + await browser.connect(); + print('EXECUTING - Waiting before subscription'); + await Future.delayed(Duration(seconds: 5)); + browser.subscribe('my/dashboard/topic/chatList/#', qos: MqttQos.exactlyOnce); + print('EXECUTING - Subscribed - waiting for disconnect'); + await Future.delayed(Duration(seconds: 10)); + browser.unsubscribe('my/dashboard/topic/chatList/#'); + browser.disconnect(); +} diff --git a/test/manual/mqtt_client_mosquitto_ws_browser_test_manual.dart b/test/manual/mqtt_client_mosquitto_ws_browser_test_manual.dart index 5e240da6..0a984834 100644 --- a/test/manual/mqtt_client_mosquitto_ws_browser_test_manual.dart +++ b/test/manual/mqtt_client_mosquitto_ws_browser_test_manual.dart @@ -45,12 +45,14 @@ void main() { .withWillQos(MqttQos.atLeastOnce); client.connectionMessage = connMess; var ok = true; + expect(MqttClientEnvironment.isWebClient, isFalse); try { await client.connect(); var connectionOK = false; if (client.connectionStatus!.state == MqttConnectionState.connected) { print('Browser client connected locally'); connectionOK = true; + expect(MqttClientEnvironment.isWebClient, isTrue); } else { print( 'Browser client connection failed - disconnecting, status is ${client.connectionStatus}', diff --git a/test/manual/mqtt_client_mosquitto_wss_browser_test_manual.dart b/test/manual/mqtt_client_mosquitto_wss_browser_test_manual.dart index 6aefdfd2..7e2b7b3f 100644 --- a/test/manual/mqtt_client_mosquitto_wss_browser_test_manual.dart +++ b/test/manual/mqtt_client_mosquitto_wss_browser_test_manual.dart @@ -48,9 +48,11 @@ void main() { try { await client.connect(); var connectionOK = false; + expect(MqttClientEnvironment.isWebClient, isFalse); if (client.connectionStatus!.state == MqttConnectionState.connected) { print('Browser client connected locally'); connectionOK = true; + expect(MqttClientEnvironment.isWebClient, isTrue); } else { print( 'Browser client connection failed - disconnecting, status is ${client.connectionStatus}', diff --git a/test/mqtt_client_connection_unsecure_test.dart b/test/mqtt_client_connection_unsecure_test.dart index 79f7e01a..3c3d0e0b 100644 --- a/test/mqtt_client_connection_unsecure_test.dart +++ b/test/mqtt_client_connection_unsecure_test.dart @@ -365,11 +365,13 @@ void main() { socketTimeout: null, ); ch.onConnected = connectCb; + expect(MqttClientEnvironment.isWebClient, isFalse); final status = await ch.connect( mockBrokerAddress, mockBrokerPort, MqttConnectMessage().withClientIdentifier(testClientId), ); + expect(MqttClientEnvironment.isWebClient, isFalse); expect(ch.connectionStatus.state, MqttConnectionState.connected); expect( ch.connectionStatus.returnCode,