Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions lib/src/mqtt_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,17 @@ class MqttClient {
: subscriptionsManager!.registerBatchSubscription(subscriptions);
}

/// Initiates a batch unsubscription request to the broker.
/// This sends multiple unsubscription requests to the broker in a single
/// unsubscription message.
void unsubscribeBatch(List<BatchSubscription> subscriptions) {
if (connectionStatus!.state != MqttConnectionState.connected) {
throw ConnectionException(connectionHandler?.connectionStatus.state);
}

subscriptionsManager?.registerBatchUnsubscription(subscriptions);
}

/// Re subscribe.
/// Unsubscribes all confirmed subscriptions and re subscribes them
/// without sending unsubscribe messages to the broker.
Expand Down
34 changes: 34 additions & 0 deletions lib/src/mqtt_client_subscriptions_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,40 @@ class SubscriptionsManager {
return cn ??= createNewBatchSubscription(subscriptions);
}

/// Batch unsubscribes in a single UNSUBSCRIBE packet.
void registerBatchUnsubscription(
List<BatchSubscription> subscriptionsList,
{bool expectAcknowledge = false}
) {
final messageIdentifier = messageIdentifierDispenser.getNextMessageIdentifier();
final unsubscribeMsg = MqttUnsubscribeMessage()
.withMessageIdentifier(messageIdentifier);

if (expectAcknowledge) {
unsubscribeMsg.expectAcknowledgement();
}

// Add each topic to the unsubscribe payload
for (final subscription in subscriptionsList) {
unsubscribeMsg.fromTopic(subscription.topic);
}

// Send the combined packet
connectionHandler!.sendMessage(unsubscribeMsg);

// Update local subscription state
if (expectAcknowledge) {
pendingUnsubscriptions[messageIdentifier] = Subscription()..subscriptions = subscriptionsList;
} else {
for (var subscription in subscriptionsList) {
subscriptions.removeWhere((_, sub) => sub.topic.rawTopic == subscription.topic);
if (onUnsubscribed != null) {
onUnsubscribed!(subscription.topic);
}
}
}
}

/// Gets a view on the existing observable, if the subscription
/// already exists.
Subscription? tryGetExistingSubscription(String topic) {
Expand Down
Loading