Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
7d752f6
Issue 611
shamblett Jun 25, 2025
8279ea6
Issue 611
shamblett Jun 25, 2025
8f822ad
Issue 611
shamblett Jun 25, 2025
1299af1
Issue 611
shamblett Jun 25, 2025
4b48d10
Issue 611
shamblett Jun 25, 2025
811b171
Issue 611
shamblett Jun 25, 2025
1732738
Issue 611
shamblett Jun 25, 2025
6cecd52
Issue 611
shamblett Jun 25, 2025
1a08168
Issue 611
shamblett Jun 25, 2025
056a827
Issue 611
shamblett Jun 25, 2025
903dc61
Issue 611
shamblett Jun 25, 2025
b815c7c
Issue 611
shamblett Jun 25, 2025
abd68b8
Issue 611
shamblett Jun 26, 2025
6f09954
Issue 611
shamblett Jun 26, 2025
59021a0
Issue 611 - Subscription tests
shamblett Jun 26, 2025
1d34336
Issue 611 - all sub manager tests running
shamblett Jun 26, 2025
f678d1c
Issue 611 - Subscription tests all running
shamblett Jun 26, 2025
4958540
Issue 611 - Subscription Manager updates complete
shamblett Jun 27, 2025
c697690
Issue 611 - Subscription Manager updates complete
shamblett Jun 27, 2025
e21004d
Issue 611 - Subscription Manager updates complete
shamblett Jun 27, 2025
13c5248
Issue 611 - Subscription Manager tests
shamblett Jun 27, 2025
45590b6
Issue 611 - Subscription Manager tests
shamblett Jun 27, 2025
54a7068
Issue 611 - Subscription Manager tests
shamblett Jun 28, 2025
6ccc4c3
Issue 611 - Subscription Manager tests complete, all tests running
shamblett Jun 28, 2025
f19dc4b
Issue 611 - Example updated
shamblett Jun 28, 2025
7b1b97c
Issue 611 - QoS examples updated
shamblett Jun 29, 2025
206848f
Issue 611 - Read through
shamblett Jun 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 39 additions & 20 deletions example/mqtt_client_publish_qos1.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ import 'dart:io';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';

bool unsubscribed = false;

/// A QOS1 publishing example, two QOS one topics are subscribed to and published in quick succession,
/// tests QOS1 protocol handling.
/// tests QOS1 protocol handling. This example also shows how to use batch topic subscription.
/// This sample also uses the Hive MQ broker.
Future<int> main() async {
final client = MqttServerClient('test.mosquitto.org', '');
final client = MqttServerClient('broker.hivemq.com', '');

/// Set the correct MQTT protocol for mosquito
/// Set the correct MQTT protocol
client.setProtocolV311();
client.logging(on: false);
client.keepAlivePeriod = 20;
Expand All @@ -27,7 +30,7 @@ Future<int> main() async {
.withWillMessage('My Will message')
.startClean() // Non persistent session for testing
.withWillQos(MqttQos.atLeastOnce);
print('EXAMPLE::Mosquitto client connecting....');
print('EXAMPLE::Hive client connecting....');
client.connectionMessage = connMess;

try {
Expand All @@ -39,35 +42,33 @@ Future<int> main() async {

/// Check we are connected
if (client.connectionStatus!.state == MqttConnectionState.connected) {
print('EXAMPLE::Mosquitto client connected');
print('EXAMPLE::Hive client connected');
} else {
print(
'EXAMPLE::ERROR Mosquitto client connection failed - disconnecting, state is ${client.connectionStatus!.state}',
'EXAMPLE::ERROR Hive client connection failed - disconnecting, state is ${client.connectionStatus!.state}',
);
client.disconnect();
exit(-1);
}

/// Lets try our subscriptions
print('EXAMPLE:: <<<< SUBSCRIBE 1 >>>>');
print('EXAMPLE:: <<<< SUBSCRIBING >>>>');
const topic1 = 'SJHTopic1'; // Not a wildcard topic
client.subscribe(topic1, MqttQos.atLeastOnce);
print('EXAMPLE:: <<<< SUBSCRIBE 2 >>>>');
final sub1 = BatchSubscription(topic1, MqttQos.atLeastOnce);
const topic2 = 'SJHTopic2'; // Not a wildcard topic
client.subscribe(topic2, MqttQos.atLeastOnce);
final sub2 = BatchSubscription(topic2, MqttQos.atMostOnce);
client.subscribeBatch([sub1, sub2]);
const topic3 = 'SJHTopic3'; // Not a wildcard topic - no subscription

client.updates!.listen((messageList) {
final recMess = messageList[0];
if (recMess is! MqttReceivedMessage<MqttPublishMessage>) return;
final pubMess = recMess.payload;
final pubMess = recMess.payload as MqttPublishMessage;
final pt = MqttPublishPayload.bytesToStringAsString(
pubMess.payload.message,
);
print(
'EXAMPLE::Change notification:: topic is <${recMess.topic}>, payload is <-- $pt -->',
);
print('');
});

/// If needed you can listen for published messages that have completed the publishing
Expand All @@ -87,32 +88,50 @@ Future<int> main() async {
final builder2 = MqttClientPayloadBuilder();
builder2.addString('Hello from mqtt_client topic 2');
print('EXAMPLE:: <<<< PUBLISH 2 >>>>');
client.publishMessage(topic2, MqttQos.atLeastOnce, builder2.payload!);
client.publishMessage(topic2, MqttQos.atMostOnce, builder2.payload!);

final builder3 = MqttClientPayloadBuilder();
builder3.addString('Hello from mqtt_client topic 3');
print('EXAMPLE:: <<<< PUBLISH 3 - NO SUBSCRIPTION >>>>');
client.publishMessage(topic3, MqttQos.atLeastOnce, builder3.payload!);
client.publishMessage(topic3, MqttQos.exactlyOnce, builder3.payload!);

print('EXAMPLE::Sleeping....');
await MqttUtilities.asyncSleep(60);
await MqttUtilities.asyncSleep(20);

print('EXAMPLE::Unsubscribing');
client.unsubscribe(topic1);
client.unsubscribe(topic2);

await MqttUtilities.asyncSleep(10);
await MqttUtilities.asyncSleep(5);
unsubscribed = true;
final status = client.getSubscriptionsStatus(topic1);
if (status != MqttSubscriptionStatus.doesNotExist) {
print('EXAMPLE::Unsubscribing - failed to unsubscribe batch topic $topic1');
}
print('EXAMPLE::Disconnecting');
client.disconnect();
return 0;
}

/// The subscribed callback
void onSubscribed(String topic) {
print('EXAMPLE::Subscription confirmed for topic $topic');
if (topic == 'SJHTopic1') {
print('EXAMPLE::Subscription confirmed for topic $topic, this is correct');
} else {
print(
'EXAMPLE::Subscription confirmed for topic $topic, this is incorrect',
);
}
}

/// The unsolicited disconnect callback
void onDisconnected() {
print('EXAMPLE::OnDisconnected client callback - Client disconnection');
if (unsubscribed) {
print(
'EXAMPLE::OnDisconnected client callback - Client disconnection - this is correct',
);
} else {
print(
'EXAMPLE::OnDisconnected client callback - Client disconnection - not unsubscribed - this is correct',
);
}
}
15 changes: 7 additions & 8 deletions example/mqtt_client_publish_qos1_manually_acknowledge.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import 'package:mqtt_client/mqtt_server_client.dart';
/// A QOS1 publishing example, two QOS one topics are subscribed to and published in quick succession,
/// tests QOS1 protocol handling when manual acknowledgement is in force.
Future<int> main() async {
final client = MqttServerClient('test.mosquitto.org', '');
final client = MqttServerClient('broker.hivemq.com', '');

/// Set the correct MQTT protocol for mosquito
client.setProtocolV311();
Expand All @@ -28,7 +28,7 @@ Future<int> main() async {
.withWillMessage('My Will message')
.startClean() // Non persistent session for testing
.withWillQos(MqttQos.atLeastOnce);
print('EXAMPLE::Mosquitto client connecting....');
print('EXAMPLE::Hive client connecting....');
client.connectionMessage = connMess;

try {
Expand All @@ -40,10 +40,10 @@ Future<int> main() async {

/// Check we are connected
if (client.connectionStatus!.state == MqttConnectionState.connected) {
print('EXAMPLE::Mosquitto client connected');
print('EXAMPLE::Hive client connected');
} else {
print(
'EXAMPLE::ERROR Mosquitto client connection failed - disconnecting, state is ${client.connectionStatus!.state}',
'EXAMPLE::ERROR Hive client connection failed - disconnecting, state is ${client.connectionStatus!.state}',
);
client.disconnect();
exit(-1);
Expand All @@ -61,8 +61,7 @@ Future<int> main() async {
try {
client.updates!.listen((messageList) {
final recMess = messageList[0];
if (recMess is! MqttReceivedMessage<MqttPublishMessage>) return;
final pubMess = recMess.payload;
final pubMess = recMess.payload as MqttPublishMessage;
final pt = MqttPublishPayload.bytesToStringAsString(
pubMess.payload.message,
);
Expand Down Expand Up @@ -119,13 +118,13 @@ Future<int> main() async {
client.publishMessage(topic3, MqttQos.atLeastOnce, builder3.payload!);

print('EXAMPLE::Sleeping....');
await MqttUtilities.asyncSleep(60);
await MqttUtilities.asyncSleep(20);

print('EXAMPLE::Unsubscribing');
client.unsubscribe(topic1);
client.unsubscribe(topic2);

await MqttUtilities.asyncSleep(10);
await MqttUtilities.asyncSleep(5);
print('EXAMPLE::Disconnecting');
client.disconnect();
return 0;
Expand Down
16 changes: 7 additions & 9 deletions example/mqtt_client_publish_qos2.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import 'package:mqtt_client/mqtt_server_client.dart';
/// A QOS2 publishing example, two QOS two topics are subscribed to and published in quick succession,
/// tests QOS2 protocol handling.
Future<int> main() async {
final client = MqttServerClient('test.mosquitto.org', '');
final client = MqttServerClient('broker.hivemq.com', '');

/// Set the correct MQTT protocol for mosquito
client.setProtocolV311();
Expand All @@ -27,7 +27,7 @@ Future<int> main() async {
.withWillMessage('My Will message')
.startClean() // Non persistent session for testing
.withWillQos(MqttQos.atLeastOnce);
print('EXAMPLE::Mosquitto client connecting....');
print('EXAMPLE::Hive client connecting....');
client.connectionMessage = connMess;

try {
Expand All @@ -39,10 +39,10 @@ Future<int> main() async {

/// Check we are connected
if (client.connectionStatus!.state == MqttConnectionState.connected) {
print('EXAMPLE::Mosquitto client connected');
print('EXAMPLE::Hive client connected');
} else {
print(
'EXAMPLE::ERROR Mosquitto client connection failed - disconnecting, state is ${client.connectionStatus!.state}',
'EXAMPLE::ERROR Hive client connection failed - disconnecting, state is ${client.connectionStatus!.state}',
);
client.disconnect();
exit(-1);
Expand All @@ -58,15 +58,13 @@ Future<int> main() async {

client.updates!.listen((messageList) {
final recMess = messageList[0];
if (recMess is! MqttReceivedMessage<MqttPublishMessage>) return;
final pubMess = recMess.payload;
final pubMess = recMess.payload as MqttPublishMessage;
final pt = MqttPublishPayload.bytesToStringAsString(
pubMess.payload.message,
);
print(
'EXAMPLE::Change notification:: topic is <${recMess.topic}>, payload is <-- $pt -->',
);
print('');
});

/// If needed you can listen for published messages that have completed the publishing
Expand All @@ -90,13 +88,13 @@ Future<int> main() async {
client.publishMessage(topic2, MqttQos.exactlyOnce, builder2.payload!);

print('EXAMPLE::Sleeping....');
await MqttUtilities.asyncSleep(60);
await MqttUtilities.asyncSleep(20);

print('EXAMPLE::Unsubscribing');
client.unsubscribe(topic1);
client.unsubscribe(topic2);

await MqttUtilities.asyncSleep(2);
await MqttUtilities.asyncSleep(5);
print('EXAMPLE::Disconnecting');
client.disconnect();
return 0;
Expand Down
Loading