Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ doc/api/
.idea
.DS_Store
.directory
/test/issues/issue602/venv/
2 changes: 1 addition & 1 deletion example/mqtt_browser_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ Future<int> main() async {
/// Ok, we will now sleep a while, in this gap you will see ping request/response
/// messages being exchanged by the keep alive mechanism.
print('EXAMPLE::Sleeping....');
await MqttUtilities.asyncSleep(60);
await MqttUtilities.asyncSleep(20);

/// Finally, unsubscribe and exit gracefully
print('EXAMPLE::Unsubscribing');
Expand Down
2 changes: 2 additions & 0 deletions lib/mqtt_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ part 'src/mqtt_client_constants.dart';

part 'src/mqtt_client_protocol.dart';

part 'src/mqtt_client_environment.dart';

part 'src/mqtt_client_events.dart';

part 'src/exception/mqtt_client_client_identifier_exception.dart';
Expand Down
2 changes: 2 additions & 0 deletions lib/src/mqtt_browser_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class MqttBrowserClient extends MqttClient {
String? username,
String? password,
]) async {
// A browser client
MqttClientEnvironment.isWebClient = true;
instantiationCorrect = true;
clientEventBus = events.EventBus();
clientEventBus?.on<DisconnectOnNoPingResponse>().listen(
Expand Down
14 changes: 14 additions & 0 deletions lib/src/mqtt_client_environment.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Package : mqtt_client
* Author : S. Hamblett <[email protected]>
* Date : 09/06/2025
* Copyright : S.Hamblett
*/

part of '../mqtt_client.dart';

/// Client Environment
class MqttClientEnvironment {
/// Browser or server client
static bool isWebClient = false;
}
2 changes: 2 additions & 0 deletions lib/src/mqtt_server_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ class MqttServerClient extends MqttClient {
String? username,
String? password,
]) async {
// A server client
MqttClientEnvironment.isWebClient = false;
instantiationCorrect = true;
clientEventBus = events.EventBus();
clientEventBus?.on<DisconnectOnNoPingResponse>().listen(
Expand Down
15 changes: 12 additions & 3 deletions lib/src/utility/mqtt_client_byte_buffer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,18 @@ class MqttByteBuffer {
'length $length, count $count, position $_position, buffer $buffer',
);
}
_position += count;
return typed.Uint8Buffer()
..addAll(buffer!.getRange(_position - count, _position));
if (MqttClientEnvironment.isWebClient) {
final tmp = typed.Uint8Buffer();
tmp.addAll(buffer!.getRange(_position, _position + count));
_position += count;
final tmp2 = typed.Uint8Buffer();
tmp2.addAll(tmp);
return tmp2;
} else {
_position += count;
return typed.Uint8Buffer()
..addAll(buffer!.getRange(_position - count, _position));
}
}

/// Reads a sequence of bytes from the current
Expand Down
118 changes: 118 additions & 0 deletions test/issues/issue608.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import 'dart:async';

import 'package:mqtt_client/mqtt_browser_client.dart';
import 'package:mqtt_client/mqtt_client.dart';

class MqttParams {
final url = 'ws://test.mosquitto.org';
final port = 8080;
final keepAlivePeriod = 60;
final username = null;
final password = null;
}

class MqttWebInitializer {
late MqttBrowserClient _client;

void initClient({
required MqttParams params,
String willTopic = 'willTopic',
String willMessage = 'will message',
}) {
_client = MqttBrowserClient(params.url, '');
final clientId = 'Geppo-Web}';
_client.logging(on: false);
_client.port = params.port;
_client.setProtocolV311();
_client.websocketProtocols = MqttClientConstants.protocolsSingleDefault;
// Keep alive period (in secondi) a 0 = disabilitato
if (params.keepAlivePeriod > 0) {
_client.keepAlivePeriod = params.keepAlivePeriod;
}
_client.onDisconnected = onDisconnected;
_client.onConnected = onConnected;
_client.onSubscribed = onSubscribed;
_client.onUnsubscribed = onUnsubscribed;

final MqttConnectMessage connectMessage = MqttConnectMessage();
if (params.username != null && params.password != null) {
connectMessage.authenticateAs(params.username!, params.password!);
}
connectMessage
.withClientIdentifier(clientId)
.withWillTopic(willTopic)
.withWillMessage(willMessage)
.withWillQos(MqttQos.atLeastOnce)
.startClean();

connectMessage.withWillQos(MqttQos.exactlyOnce);
print('EXECUTING - MQTT CONNECT...');
_client.connectionMessage = connectMessage;
}

MqttClient get client => _client;

Future<MqttClientConnectionStatus?> connect([
String? username,
String? password,
]) async {
MqttClientConnectionStatus? status;
try {
print('EXECUTING - MQTT CONNECTING...');

if (username == null && password == null) {
status = await _client.connect(username, password);
} else {
status = await _client.connect();
}
} catch (e) {
print('EXCEPTION - MQTT CONNECT: $e');
}
return status;
}

void disconnect() {
print('EXECUTING - MQTT DISCONNECTING...');
_client.disconnect();
}

void onConnected() {
print('EXECUTING - MQTT CONNECTED');
}

void onDisconnected() {
print('EXECUTING - MQTT DISCONNECTED');
}

void onSubscribed(String topic) {
print('EXECUTING - MQTT SUBSCRIBED TO TOPIC: $topic');
}

void onUnsubscribed(String? topic) {
print('EXECUTING - MQTT UNSUBSCRIBED FROM TOPIC: $topic');
}

void subscribe(String topic, {MqttQos qos = MqttQos.exactlyOnce}) {
print('EXECUTING - MQTT SUBSCRIBING TO TOPIC: $topic');
_client.subscribe(topic, qos);
}

void unsubscribe(String topic) {
print('EXECUTING - MQTT UNSUBSCRIBED FROM TOPIC: $topic');
_client.unsubscribe(topic);
}
}

void main() async {
final browser = MqttWebInitializer();
browser.initClient(params: MqttParams());
print('EXECUTING - Connecting');
await browser.connect();
print('EXECUTING - Waiting before subscription');
await Future.delayed(Duration(seconds: 5));
browser.subscribe('my/dashboard/topic/chatList/#', qos: MqttQos.exactlyOnce);
print('EXECUTING - Subscribed - waiting for disconnect');
await Future.delayed(Duration(seconds: 10));
browser.unsubscribe('my/dashboard/topic/chatList/#');
browser.disconnect();
}
2 changes: 2 additions & 0 deletions test/manual/mqtt_client_mosquitto_ws_browser_test_manual.dart
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ void main() {
.withWillQos(MqttQos.atLeastOnce);
client.connectionMessage = connMess;
var ok = true;
expect(MqttClientEnvironment.isWebClient, isFalse);
try {
await client.connect();
var connectionOK = false;
if (client.connectionStatus!.state == MqttConnectionState.connected) {
print('Browser client connected locally');
connectionOK = true;
expect(MqttClientEnvironment.isWebClient, isTrue);
} else {
print(
'Browser client connection failed - disconnecting, status is ${client.connectionStatus}',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@ void main() {
try {
await client.connect();
var connectionOK = false;
expect(MqttClientEnvironment.isWebClient, isFalse);
if (client.connectionStatus!.state == MqttConnectionState.connected) {
print('Browser client connected locally');
connectionOK = true;
expect(MqttClientEnvironment.isWebClient, isTrue);
} else {
print(
'Browser client connection failed - disconnecting, status is ${client.connectionStatus}',
Expand Down
2 changes: 2 additions & 0 deletions test/mqtt_client_connection_unsecure_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -365,11 +365,13 @@ void main() {
socketTimeout: null,
);
ch.onConnected = connectCb;
expect(MqttClientEnvironment.isWebClient, isFalse);
final status = await ch.connect(
mockBrokerAddress,
mockBrokerPort,
MqttConnectMessage().withClientIdentifier(testClientId),
);
expect(MqttClientEnvironment.isWebClient, isFalse);
expect(ch.connectionStatus.state, MqttConnectionState.connected);
expect(
ch.connectionStatus.returnCode,
Expand Down