Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions lib/src/mqtt_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ class MqttClient {
}

/// Unsubscribe from a topic.
///
/// Some brokers(AWS for instance) need to have each un subscription acknowledged, use
/// the [expectAcknowledge] parameter for this, default is false.
/// For a batch subscription provide the topic of the first subscription
Expand All @@ -517,6 +518,22 @@ class MqttClient {
);
}

/// Initiates a multiple unsubscription request to the broker.
///
/// This sends multiple unsubscription requests to the broker in a single
/// unsubscription message.
///
/// If you are using a batch subscription and you want to unsubscribe all
/// the topics in the batch us [unsubcribe] with the first batch topic.
///
/// If you want to unsubscribe a list of topics subscribed individually
/// and use only one unsubscription message use this method.
///
/// If the list contains any subscriptions marked as 'batch' subscriptions they will NOT
/// be removed.
void unsubscribeMulti(List<MultiUnsubscription> subscriptions) =>
subscriptionsManager?.unsubscribeMulti(subscriptions);

/// Gets the current status of a subscription by topic.
///
/// A batch subscription contains the status of each subscribed topic as returned
Expand Down
39 changes: 38 additions & 1 deletion lib/src/mqtt_client_subscription.dart
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ class Subscription extends observe.Observable<observe.ChangeRecord> {
/// Empty if a single subscription.
List<BatchSubscription> subscriptions = [];

/// List of pending unsubscriptions
List<MultiUnsubscription> unSubscriptions = [];

/// The requested subscription, used in re subscribe operations.
/// Empty if a single subscription.
List<BatchSubscription> requestedSubscriptions = [];
Expand Down Expand Up @@ -128,7 +131,7 @@ class Subscription extends observe.Observable<observe.ChangeRecord> {

/// A subscription used in batch subscription processing.
class BatchSubscription {
final String topic;
String topic;

/// Qos, default to failure.
MqttQos qos = MqttQos.failure;
Expand All @@ -138,6 +141,40 @@ class BatchSubscription {

BatchSubscription(this.topic, this.qos);

factory BatchSubscription.fromSubscription(Subscription subscription) =>
BatchSubscription(subscription.topic.rawTopic, subscription.qos);

@override
bool operator ==(Object other) =>
identical(this, other) ||
other is BatchSubscription &&
runtimeType == other.runtimeType &&
topic == other.topic &&
qos == other.qos;

@override
String toString() {
final sb = StringBuffer();
sb.writeln('BatchSubscription:: Topic: $topic, QoS: $qos');
return sb.toString();
}
}

/// A subscription used in multi unsubscription processing.
class MultiUnsubscription {
String topic;

/// Qos, default to failure.
MqttQos qos = MqttQos.failure;

@override
int get hashCode => topic.hashCode + qos.hashCode;

MultiUnsubscription(this.topic, this.qos);

factory MultiUnsubscription.fromSubscription(Subscription subscription) =>
MultiUnsubscription(subscription.topic.rawTopic, subscription.qos);

@override
bool operator ==(Object other) =>
identical(this, other) ||
Expand Down
64 changes: 59 additions & 5 deletions lib/src/mqtt_client_subscriptions_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,47 @@ class SubscriptionsManager {
return cn ??= createNewBatchSubscription(subscriptions);
}

/// Batch unsubscribes in a single UNSUBSCRIBE packet.
void unsubscribeMulti(
List<MultiUnsubscription> subscriptionsList, {
bool expectAcknowledge = false,
}) {
final messageIdentifier = messageIdentifierDispenser
.getNextMessageIdentifier();
final unsubscribeMsg = MqttUnsubscribeMessage().withMessageIdentifier(
messageIdentifier,
);

if (expectAcknowledge) {
unsubscribeMsg.expectAcknowledgement();
}

// Add each topic to the unsubscribe payload
for (final subscription in subscriptionsList) {
unsubscribeMsg.fromTopic(subscription.topic);
}

// Send the combined packet
connectionHandler!.sendMessage(unsubscribeMsg);

// Update local subscription state
if (expectAcknowledge) {
pendingUnsubscriptions[messageIdentifier] = Subscription()
..unSubscriptions = subscriptionsList;
} else {
for (var subscription in subscriptionsList) {
// Don't remove any batch subscriptions
subscriptions.removeWhere(
(_, sub) =>
((sub.topic.rawTopic == subscription.topic) && !sub.batch),
);
if (onUnsubscribed != null) {
onUnsubscribed!(subscription.topic);
}
}
}
}

/// Gets a view on the existing observable, if the subscription
/// already exists.
Subscription? tryGetExistingSubscription(String topic) {
Expand Down Expand Up @@ -335,8 +376,8 @@ class SubscriptionsManager {
return true;
}

/// Cleans up after an unsubscribe message is received from the broker.
/// returns true, always
/// Cleans up after an unsubscribe acknowledge message is received
/// from the broker returns true, always
bool confirmUnsubscribe(MqttMessage? msg) {
final unSubAck = msg as MqttUnsubscribeAckMessage;
final messageIdentifier = unSubAck.variableHeader.messageIdentifier;
Expand All @@ -346,10 +387,23 @@ class SubscriptionsManager {
subscriptions.remove(sub?.messageIdentifier);
}
if (sub != null) {
pendingUnsubscriptions.remove(messageIdentifier);
if (onUnsubscribed != null) {
onUnsubscribed!(sub.topic.rawTopic);
// Unsubscribe each topic if not part of a batch.
if (sub.unSubscriptions.isNotEmpty) {
for (var subscription in sub.unSubscriptions) {
subscriptions.removeWhere(
(_, sub) =>
(sub.topic.rawTopic == subscription.topic) && !sub.batch,
);
if (onUnsubscribed != null) {
onUnsubscribed!(subscription.topic);
}
}
} else {
if (onUnsubscribed != null) {
onUnsubscribed!(sub.topic.rawTopic);
}
}
pendingUnsubscriptions.remove(messageIdentifier);
} else {
MqttLogger.log(
'SubscriptionsManager::confirmUnsubscribe subscription not found in pending unsubscriptions',
Expand Down
172 changes: 172 additions & 0 deletions test/mqtt_client_subscription_manager_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,178 @@ void main() {
expect(subs.pendingUnsubscriptions.length, 0);
expect(cbCalled, isTrue);
});
test('Unsubscribe expect acknowledge multi with ack', () {
var cbCalledCount = 0;
void unsubCallback(String? topic) {
cbCalledCount++;
}

final clientEventBus = events.EventBus();
final testCHS = TestConnectionHandlerSend(
clientEventBus,
socketOptions: socketOptions,
);
final pm = PublishingManager(testCHS, clientEventBus);
pm.messageIdentifierDispenser.reset();
const topic1 = 'testtopic1';
const topic2 = 'testtopic2';
const topic3 = 'testtopic3';
const qos = MqttQos.atLeastOnce;
final subs = SubscriptionsManager(testCHS, pm, clientEventBus);
subs.onUnsubscribed = unsubCallback;
subs.registerSubscription(topic1, qos);
subs.registerSubscription(topic2, qos);
subs.registerSubscription(topic3, qos);
expect(
subs.getSubscriptionsStatus(topic1),
MqttSubscriptionStatus.pending,
);
expect(
subs.getSubscriptionsStatus(topic2),
MqttSubscriptionStatus.pending,
);
expect(
subs.getSubscriptionsStatus(topic3),
MqttSubscriptionStatus.pending,
);
// Confirm
var subAckMsg = MqttSubscribeAckMessage()
.withMessageIdentifier(1)
.addQosGrant(MqttQos.atLeastOnce);
subs.confirmSubscription(subAckMsg);
expect(
subs.getSubscriptionsStatus(topic1),
MqttSubscriptionStatus.active,
);
subAckMsg = MqttSubscribeAckMessage()
.withMessageIdentifier(2)
.addQosGrant(MqttQos.atLeastOnce);
subs.confirmSubscription(subAckMsg);
expect(
subs.getSubscriptionsStatus(topic2),
MqttSubscriptionStatus.active,
);
subAckMsg = MqttSubscribeAckMessage()
.withMessageIdentifier(3)
.addQosGrant(MqttQos.atLeastOnce);
subs.confirmSubscription(subAckMsg);
expect(
subs.getSubscriptionsStatus(topic3),
MqttSubscriptionStatus.active,
);
// Unsubscribe
subs.unsubscribeMulti([
MultiUnsubscription(topic1, qos),
MultiUnsubscription(topic2, qos),
MultiUnsubscription(topic3, qos),
], expectAcknowledge: true);
expect(
testCHS.sentMessages[3],
const TypeMatcher<MqttUnsubscribeMessage>(),
);

// Unsubscribe ack
final unsubAck = MqttUnsubscribeAckMessage().withMessageIdentifier(4);
subs.confirmUnsubscribe(unsubAck);
expect(
subs.getSubscriptionsStatus(topic1),
MqttSubscriptionStatus.doesNotExist,
);
expect(
subs.getSubscriptionsStatus(topic2),
MqttSubscriptionStatus.doesNotExist,
);
expect(
subs.getSubscriptionsStatus(topic3),
MqttSubscriptionStatus.doesNotExist,
);
expect(subs.pendingUnsubscriptions.length, 0);
expect(cbCalledCount, 3);
});
test('Unsubscribe expect acknowledge multi no ack', () {
var cbCalledCount = 0;
void unsubCallback(String? topic) {
cbCalledCount++;
}

final clientEventBus = events.EventBus();
final testCHS = TestConnectionHandlerSend(
clientEventBus,
socketOptions: socketOptions,
);
final pm = PublishingManager(testCHS, clientEventBus);
pm.messageIdentifierDispenser.reset();
const topic1 = 'testtopic1';
const topic2 = 'testtopic2';
const topic3 = 'testtopic3';
const qos = MqttQos.atLeastOnce;
final subs = SubscriptionsManager(testCHS, pm, clientEventBus);
subs.onUnsubscribed = unsubCallback;
subs.registerSubscription(topic1, qos);
subs.registerSubscription(topic2, qos);
subs.registerSubscription(topic3, qos);
expect(
subs.getSubscriptionsStatus(topic1),
MqttSubscriptionStatus.pending,
);
expect(
subs.getSubscriptionsStatus(topic2),
MqttSubscriptionStatus.pending,
);
expect(
subs.getSubscriptionsStatus(topic3),
MqttSubscriptionStatus.pending,
);
// Confirm
var subAckMsg = MqttSubscribeAckMessage()
.withMessageIdentifier(1)
.addQosGrant(MqttQos.atLeastOnce);
subs.confirmSubscription(subAckMsg);
expect(
subs.getSubscriptionsStatus(topic1),
MqttSubscriptionStatus.active,
);
subAckMsg = MqttSubscribeAckMessage()
.withMessageIdentifier(2)
.addQosGrant(MqttQos.atLeastOnce);
subs.confirmSubscription(subAckMsg);
expect(
subs.getSubscriptionsStatus(topic2),
MqttSubscriptionStatus.active,
);
subAckMsg = MqttSubscribeAckMessage()
.withMessageIdentifier(3)
.addQosGrant(MqttQos.atLeastOnce);
subs.confirmSubscription(subAckMsg);
expect(
subs.getSubscriptionsStatus(topic3),
MqttSubscriptionStatus.active,
);
// Unsubscribe
subs.unsubscribeMulti([
MultiUnsubscription(topic1, qos),
MultiUnsubscription(topic2, qos),
MultiUnsubscription(topic3, qos),
]);
expect(
testCHS.sentMessages[3],
const TypeMatcher<MqttUnsubscribeMessage>(),
);
expect(
subs.getSubscriptionsStatus(topic1),
MqttSubscriptionStatus.doesNotExist,
);
expect(
subs.getSubscriptionsStatus(topic2),
MqttSubscriptionStatus.doesNotExist,
);
expect(
subs.getSubscriptionsStatus(topic3),
MqttSubscriptionStatus.doesNotExist,
);
expect(subs.pendingUnsubscriptions.length, 0);
expect(cbCalledCount, 3);
});
test('Unsubscribe expect acknowledge batch', () {
var cbCalled = false;
void unsubCallback(String? topic) {
Expand Down