Skip to content

Commit 0496557

Browse files
authored
Follow Broker Enforcements (#240)
1 parent 665fda0 commit 0496557

File tree

3 files changed

+389
-36
lines changed

3 files changed

+389
-36
lines changed

Source/HiveMQtt/Client/HiveMQClient.cs

+71
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,28 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message, Cance
212212
{
213213
message.Validate();
214214

215+
// Check if topic alias is used but not supported by broker
216+
var topicAliasMaximum = this.Connection?.ConnectionProperties?.TopicAliasMaximum ?? 0;
217+
if (message.TopicAlias.HasValue)
218+
{
219+
if (topicAliasMaximum == 0)
220+
{
221+
throw new HiveMQttClientException("Topic aliases are not supported by the broker");
222+
}
223+
224+
if (message.TopicAlias.Value > topicAliasMaximum)
225+
{
226+
throw new HiveMQttClientException($"Topic alias exceeds broker's maximum allowed value of {topicAliasMaximum}");
227+
}
228+
}
229+
230+
// Check if retain is used but not supported by broker
231+
var retainSupported = this.Connection?.ConnectionProperties?.RetainAvailable ?? true;
232+
if (!retainSupported && message.Retain)
233+
{
234+
throw new HiveMQttClientException("Retained messages are not supported by the broker");
235+
}
236+
215237
if (message.QoS.HasValue && this.Connection.ConnectionProperties.MaximumQoS.HasValue &&
216238
(ushort)message.QoS.Value > this.Connection.ConnectionProperties.MaximumQoS.Value)
217239
{
@@ -332,6 +354,55 @@ public async Task<SubscribeResult> SubscribeAsync(string topic, QualityOfService
332354
/// <inheritdoc />
333355
public async Task<SubscribeResult> SubscribeAsync(SubscribeOptions options)
334356
{
357+
// Check if subscription identifiers are used but not supported by broker
358+
var subscriptionIdentifiersSupported = this.Connection?.ConnectionProperties?.SubscriptionIdentifiersAvailable ?? true;
359+
if (!subscriptionIdentifiersSupported && options.SubscriptionIdentifier.HasValue)
360+
{
361+
throw new HiveMQttClientException("Subscription identifiers are not supported by the broker");
362+
}
363+
364+
// Check if retain is used but not supported by broker
365+
var retainSupported = this.Connection?.ConnectionProperties?.RetainAvailable ?? true;
366+
if (!retainSupported)
367+
{
368+
// Check if any topic filter has retainAsPublished set to true
369+
foreach (var topicFilter in options.TopicFilters)
370+
{
371+
if (topicFilter.RetainAsPublished is true)
372+
{
373+
throw new HiveMQttClientException("Retained messages are not supported by the broker");
374+
}
375+
}
376+
}
377+
378+
// Check if shared subscriptions are used but not supported by broker
379+
var sharedSubscriptionSupported = this.Connection?.ConnectionProperties?.SharedSubscriptionAvailable ?? true;
380+
if (!sharedSubscriptionSupported)
381+
{
382+
// Check if any topic filter contains shared subscription prefix ($share/)
383+
foreach (var topicFilter in options.TopicFilters)
384+
{
385+
if (topicFilter.Topic.StartsWith("$share/", StringComparison.Ordinal))
386+
{
387+
throw new HiveMQttClientException("Shared subscriptions are not supported by the broker");
388+
}
389+
}
390+
}
391+
392+
// Check if wildcards are used but not supported by broker
393+
var wildcardSupported = this.Connection?.ConnectionProperties?.WildcardSubscriptionAvailable ?? true;
394+
if (!wildcardSupported)
395+
{
396+
// Check if any topic filter contains wildcards (+ or #)
397+
foreach (var topicFilter in options.TopicFilters)
398+
{
399+
if (topicFilter.Topic.Contains('+') || topicFilter.Topic.Contains('#'))
400+
{
401+
throw new HiveMQttClientException("Wildcard subscriptions are not supported by the broker");
402+
}
403+
}
404+
}
405+
335406
// Fire the corresponding event
336407
this.BeforeSubscribeEventLauncher(options);
337408

Tests/HiveMQtt.Test/HiveMQClient/EventExceptionHandlingTest.cs

+9-36
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,7 @@ public async Task BeforeDisconnectExceptionDoesNotPreventDisconnectAsync()
2323
Assert.True(client.IsConnected());
2424

2525
// Add event handler that throws an exception
26-
client.BeforeDisconnect += (sender, args) =>
27-
{
28-
throw new Exception("Test exception in BeforeDisconnect");
29-
};
26+
client.BeforeDisconnect += (sender, args) => throw new InvalidOperationException("Test exception in BeforeDisconnect");
3027

3128
// Attempt to disconnect - should succeed despite the exception
3229
var disconnectResult = await client.DisconnectAsync().ConfigureAwait(false);
@@ -50,10 +47,7 @@ public async Task OnPublishSentExceptionDoesNotPreventPublishAsync()
5047
Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success);
5148

5249
// Add event handler that throws an exception
53-
client.OnPublishSent += (sender, args) =>
54-
{
55-
throw new Exception("Test exception in OnPublishSent");
56-
};
50+
client.OnPublishSent += (sender, args) => throw new InvalidOperationException("Test exception in OnPublishSent");
5751

5852
// Attempt to publish - should succeed despite the exception
5953
var publishResult = await client.PublishAsync(
@@ -85,10 +79,7 @@ public async Task BeforeSubscribeExceptionDoesNotPreventSubscribeAsync()
8579
Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success);
8680

8781
// Add event handler that throws an exception
88-
client.BeforeSubscribe += (sender, args) =>
89-
{
90-
throw new Exception("Test exception in BeforeSubscribe");
91-
};
82+
client.BeforeSubscribe += (sender, args) => throw new InvalidOperationException("Test exception in BeforeSubscribe");
9283

9384
// Attempt to subscribe - should succeed despite the exception
9485
var subscribeResult = await client.SubscribeAsync(
@@ -126,10 +117,7 @@ public async Task BeforeUnsubscribeExceptionDoesNotPreventUnsubscribeAsync()
126117
Assert.NotEmpty(subscribeResult.Subscriptions);
127118

128119
// Add event handler that throws an exception
129-
client.BeforeUnsubscribe += (sender, args) =>
130-
{
131-
throw new Exception("Test exception in BeforeUnsubscribe");
132-
};
120+
client.BeforeUnsubscribe += (sender, args) => throw new InvalidOperationException("Test exception in BeforeUnsubscribe");
133121

134122
// Attempt to unsubscribe - should succeed despite the exception
135123
var unsubscribeResult = await client.UnsubscribeAsync(
@@ -167,16 +155,10 @@ public async Task OnMessageReceivedExceptionDoesNotPreventMessageDeliveryAsync()
167155
Assert.NotEmpty(subscribeResult.Subscriptions);
168156

169157
// Add event handler that throws an exception
170-
client.OnMessageReceived += (sender, args) =>
171-
{
172-
throw new Exception("Test exception in OnMessageReceived");
173-
};
158+
client.OnMessageReceived += (sender, args) => throw new InvalidOperationException("Test exception in OnMessageReceived");
174159

175160
// Add another event handler to verify message was still delivered
176-
client.OnMessageReceived += (sender, args) =>
177-
{
178-
messageReceived = true;
179-
};
161+
client.OnMessageReceived += (sender, args) => messageReceived = true;
180162

181163
// Publish a message
182164
var publishResult = await client.PublishAsync(
@@ -207,10 +189,7 @@ public async Task BeforeConnectExceptionDoesNotPreventConnectAsync()
207189
var client = new HiveMQClient(options);
208190

209191
// Add event handler that throws an exception
210-
client.BeforeConnect += (sender, args) =>
211-
{
212-
throw new Exception("Test exception in BeforeConnect");
213-
};
192+
client.BeforeConnect += (sender, args) => throw new InvalidOperationException("Test exception in BeforeConnect");
214193

215194
// Attempt to connect - should succeed despite the exception
216195
var connectResult = await client.ConnectAsync().ConfigureAwait(false);
@@ -238,16 +217,10 @@ public async Task OnPubAckReceivedExceptionDoesNotPreventQoS1DeliveryAsync()
238217
Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success);
239218

240219
// Add event handler that throws an exception
241-
client.OnPubAckReceived += (sender, args) =>
242-
{
243-
throw new Exception("Test exception in OnPubAckReceived");
244-
};
220+
client.OnPubAckReceived += (sender, args) => throw new InvalidOperationException("Test exception in OnPubAckReceived");
245221

246222
// Add another event handler to verify PubAck was still received
247-
client.OnPubAckReceived += (sender, args) =>
248-
{
249-
pubAckReceived = true;
250-
};
223+
client.OnPubAckReceived += (sender, args) => pubAckReceived = true;
251224

252225
// Publish a QoS 1 message
253226
var publishResult = await client.PublishAsync(

0 commit comments

Comments
 (0)