Skip to content

Commit d8a0d1a

Browse files
authored
Add Last Will and Testament Support (#85)
1 parent 71e468a commit d8a0d1a

28 files changed

+665
-168
lines changed

Directory.Build.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
<ItemGroup Label="Package References">
2727
<PackageReference Include="Microsoft.VisualStudio.Threading.Analyzers" PrivateAssets="all" Version="17.4.33" />
2828
<PackageReference Include="MinVer" PrivateAssets="all" Version="4.2.0" />
29-
<PackageReference Include="StyleCop.Analyzers" PrivateAssets="all" Version="1.2.0-beta.435" />
29+
<PackageReference Include="StyleCop.Analyzers" PrivateAssets="all" Version="1.1.118" />
3030
</ItemGroup>
3131

3232
</Project>

README.md

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,47 @@ await client.PublishAsync(
111111
).ConfigureAwait(false);
112112
```
113113

114+
### Last Will and Testament
115+
116+
The Last Will and Testament support of MQTT can be used to notify subscribers that your client is offline.
117+
118+
For a more in-depth explanation, see [What is MQTT Last Will and Testament (LWT)? – MQTT Essentials: Part 9](https://www.hivemq.com/blog/mqtt-essentials-part-9-last-will-and-testament/).
119+
120+
```C#
121+
// Specify the Last Will and Testament specifics in HiveMQClientOptions
122+
var options = new HiveMQClientOptions
123+
{
124+
LastWillAndTestament = new LastWillAndTestament("last/will", QualityOfService.AtLeastOnceDelivery, "last will message"),
125+
};
126+
127+
// Optionally set extended properties on the Last Will and Testament message
128+
options.LastWillAndTestament.WillDelayInterval = 1;
129+
options.LastWillAndTestament.PayloadFormatIndicator = 1;
130+
options.LastWillAndTestament.MessageExpiryInterval = 100;
131+
options.LastWillAndTestament.ContentType = "application/text";
132+
options.LastWillAndTestament.ResponseTopic = "response/topic";
133+
options.LastWillAndTestament.CorrelationData = new byte[] { 1, 2, 3, 4, 5 };
134+
options.LastWillAndTestament.UserProperties.Add("userPropertyKey", "userPropertyValue");
135+
136+
// ConnectAsync will transmit the Last Will and Testament configuration.
137+
var client = new HiveMQClient(options);
138+
connectResult = await client.ConnectAsync().ConfigureAwait(false);
139+
140+
// The Last Will and Testament message will be sent to the "last/will" topic if your clients get
141+
// unexpectedly disconnected or alternatively, if your client disconnects with `DisconnectWithWillMessage`
142+
var disconnectOptions = new DisconnectOptions { ReasonCode = DisconnectReasonCode.DisconnectWithWillMessage };
143+
var disconnectResult = await client.DisconnectAsync(disconnectOptions).ConfigureAwait(false);
144+
``````
145+
146+
Because the client above disconnected with `DisconnectReasonCode.DisconnectWithWillMessage`, subscribers to the `last/will` topic will receive the Last Will and Testament message as specified above.
147+
148+
### More
149+
114150
For more examples that you can easily copy/paste, see our [Examples](https://github.com/hivemq/hivemq-mqtt-client-dotnet/blob/main/Documentation/Examples.md).
115151
116152
There is even an https://github.com/hivemq/hivemq-mqtt-client-dotnet/tree/main/Examples/HiveMQtt-CLI to demonstrate usage of the package.
117153
154+
118155
## Other MQTT Clients
119156

120157
* [Java](https://github.com/hivemq/hivemq-mqtt-client)

Source/Directory.Build.props

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,12 @@
3030
<None Include="..\..\README.md" Pack="true" PackagePath="\" />
3131
</ItemGroup>
3232

33-
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
34-
<CodeAnalysisRuleSet>..\..\StyleCop.Analyzers.ruleset</CodeAnalysisRuleSet>
33+
<!-- <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> -->
34+
<PropertyGroup>
35+
<CodeAnalysisRuleSet>./../../StyleCop.Analyzers.ruleset</CodeAnalysisRuleSet>
3536
</PropertyGroup>
37+
<ItemGroup>
38+
<AdditionalFiles Include="$(MSBuildThisFileDirectory)./../stylecop.json" Link="stylecop.json" />
39+
</ItemGroup>
3640

3741
</Project>

Source/HiveMQtt/Client/HiveMQClient.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
* limitations under the License.
1515
*/
1616

17-
1817
namespace HiveMQtt.Client;
1918

2019
using System;
@@ -109,7 +108,7 @@ public async Task<ConnectResult> ConnectAsync()
109108

110109
// Data massage: This class is used for end users. Let's prep the data so it's easily understandable.
111110
// If the Session Expiry Interval is absent the value in the CONNECT Packet used.
112-
connectResult.Properties.SessionExpiryInterval ??= (UInt32)this.Options.SessionExpiryInterval;
111+
connectResult.Properties.SessionExpiryInterval ??= (uint)this.Options.SessionExpiryInterval;
113112

114113
// Fire the corresponding event
115114
this.AfterConnectEventLauncher(connectResult);

Source/HiveMQtt/Client/HiveMQClientEvents.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-present HiveMQ and the HiveMQ Community
2+
* Copyright 2023-present HiveMQ and the HiveMQ Community
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

Source/HiveMQtt/Client/HiveMQClientSocket.cs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ internal async Task<bool> ConnectSocketAsync()
7979
ipAddress = address;
8080
break;
8181
}
82+
8283
if (address.AddressFamily == AddressFamily.InterNetwork)
8384
{
8485
ipAddress = address;
@@ -87,12 +88,9 @@ internal async Task<bool> ConnectSocketAsync()
8788
}
8889
}
8990

90-
if (ipAddress == null)
91-
{
92-
// We have multiple address returned, but none of them match the PreferIPv6 option.
93-
// Use the first one whatever it is.
94-
ipAddress = ipHostInfo.AddressList[0];
95-
}
91+
// We have multiple address returned, but none of them match the PreferIPv6 option.
92+
// Use the first one whatever it is.
93+
ipAddress ??= ipHostInfo.AddressList[0];
9694

9795
IPEndPoint ipEndPoint = new(ipAddress, this.Options.Port);
9896

@@ -126,11 +124,14 @@ internal async Task<bool> ConnectSocketAsync()
126124
return socketConnected;
127125
}
128126

129-
internal bool CloseSocket()
127+
internal bool CloseSocket(bool? shutdownPipeline = true)
130128
{
131-
// Shutdown the pipeline
132-
this.reader = null;
133-
this.writer = null;
129+
if (shutdownPipeline == true)
130+
{
131+
// Shutdown the pipeline
132+
this.reader = null;
133+
this.writer = null;
134+
}
134135

135136
// Shutdown the socket
136137
this.socket?.Shutdown(SocketShutdown.Both);

Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ private Task<bool> TrafficOutflowProcessorAsync() => Task.Run(async () =>
5555
if (elapsed > TimeSpan.FromSeconds(keepAlivePeriod))
5656
{
5757
// Send PingReq
58-
var writeResult = await this.writer.WriteAsync(PingReqPacket.Encode()).ConfigureAwait(false);
58+
var writeResult = await this.WriteAsync(PingReqPacket.Encode()).ConfigureAwait(false);
5959
this.OnPingReqSentEventLauncher(new PingReqPacket());
6060
stopWatch.Restart();
6161
}
@@ -82,22 +82,22 @@ private Task<bool> TrafficOutflowProcessorAsync() => Task.Run(async () =>
8282
// FIXME: Only one connect, subscribe or unsubscribe packet can be sent at a time.
8383
case ConnectPacket connectPacket:
8484
Trace.WriteLine("--> ConnectPacket");
85-
writeResult = await this.writer.WriteAsync(connectPacket.Encode()).ConfigureAwait(false);
85+
writeResult = await this.WriteAsync(connectPacket.Encode()).ConfigureAwait(false);
8686
this.OnConnectSentEventLauncher(connectPacket);
8787
break;
8888
case DisconnectPacket disconnectPacket:
8989
Trace.WriteLine("--> DisconnectPacket");
90-
writeResult = await this.writer.WriteAsync(DisconnectPacket.Encode()).ConfigureAwait(false);
90+
writeResult = await this.WriteAsync(disconnectPacket.Encode()).ConfigureAwait(false);
9191
this.OnDisconnectSentEventLauncher(disconnectPacket);
9292
break;
9393
case SubscribePacket subscribePacket:
9494
Trace.WriteLine("--> SubscribePacket");
95-
writeResult = await this.writer.WriteAsync(subscribePacket.Encode()).ConfigureAwait(false);
95+
writeResult = await this.WriteAsync(subscribePacket.Encode()).ConfigureAwait(false);
9696
this.OnSubscribeSentEventLauncher(subscribePacket);
9797
break;
9898
case UnsubscribePacket unsubscribePacket:
9999
Trace.WriteLine("--> UnsubscribePacket");
100-
writeResult = await this.writer.WriteAsync(unsubscribePacket.Encode()).ConfigureAwait(false);
100+
writeResult = await this.WriteAsync(unsubscribePacket.Encode()).ConfigureAwait(false);
101101
this.OnUnsubscribeSentEventLauncher(unsubscribePacket);
102102
break;
103103
case PublishPacket publishPacket:
@@ -112,38 +112,39 @@ private Task<bool> TrafficOutflowProcessorAsync() => Task.Run(async () =>
112112
}
113113
}
114114

115-
writeResult = await this.writer.WriteAsync(publishPacket.Encode()).ConfigureAwait(false);
115+
writeResult = await this.WriteAsync(publishPacket.Encode()).ConfigureAwait(false);
116116

117117
this.OnPublishSentEventLauncher(publishPacket);
118118
break;
119119
case PubAckPacket pubAckPacket:
120120
// This is in response to a received Publish packet. Communication chain management
121121
// was done in the receiver code. Just send the response.
122122
Trace.WriteLine("--> PubAckPacket");
123-
writeResult = await this.writer.WriteAsync(pubAckPacket.Encode()).ConfigureAwait(false);
123+
writeResult = await this.WriteAsync(pubAckPacket.Encode()).ConfigureAwait(false);
124124
this.OnPubAckSentEventLauncher(pubAckPacket);
125125
break;
126126
case PubRecPacket pubRecPacket:
127127
// This is in response to a received Publish packet. Communication chain management
128128
// was done in the receiver code. Just send the response.
129129
Trace.WriteLine("--> PubRecPacket");
130-
writeResult = await this.writer.WriteAsync(pubRecPacket.Encode()).ConfigureAwait(false);
130+
writeResult = await this.WriteAsync(pubRecPacket.Encode()).ConfigureAwait(false);
131131
this.OnPubRecSentEventLauncher(pubRecPacket);
132132
break;
133133
case PubRelPacket pubRelPacket:
134134
// This is in response to a received PubRec packet. Communication chain management
135135
// was done in the receiver code. Just send the response.
136136
Trace.WriteLine("--> PubRelPacket");
137-
writeResult = await this.writer.WriteAsync(pubRelPacket.Encode()).ConfigureAwait(false);
137+
writeResult = await this.WriteAsync(pubRelPacket.Encode()).ConfigureAwait(false);
138138
this.OnPubRelSentEventLauncher(pubRelPacket);
139139
break;
140140
case PubCompPacket pubCompPacket:
141141
// This is in response to a received PubRel packet. Communication chain management
142142
// was done in the receiver code. Just send the response.
143143
Trace.WriteLine("--> PubCompPacket");
144-
writeResult = await this.writer.WriteAsync(pubCompPacket.Encode()).ConfigureAwait(false);
144+
writeResult = await this.WriteAsync(pubCompPacket.Encode()).ConfigureAwait(false);
145145
this.OnPubCompSentEventLauncher(pubCompPacket);
146146
break;
147+
147148
/* case AuthPacket authPacket:
148149
/* writeResult = await this.writer.WriteAsync(authPacket.Encode()).ConfigureAwait(false);
149150
/* this.OnAuthSentEventLauncher(authPacket);
@@ -174,7 +175,7 @@ private Task<bool> TrafficInflowProcessorAsync() => Task.Run(async () =>
174175

175176
while (this.connectState is ConnectState.Connecting or ConnectState.Connected)
176177
{
177-
readResult = await this.reader.ReadAsync().ConfigureAwait(false);
178+
readResult = await this.ReadAsync().ConfigureAwait(false);
178179

179180
if (readResult.IsCanceled)
180181
{
@@ -350,4 +351,22 @@ private Task<bool> TrafficInflowProcessorAsync() => Task.Run(async () =>
350351

351352
return true;
352353
});
354+
355+
internal ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
356+
{
357+
if (this.writer is null)
358+
{
359+
throw new HiveMQttClientException("Writer is null");
360+
}
361+
return this.writer.WriteAsync(source, cancellationToken);
362+
}
363+
364+
internal ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
365+
{
366+
if (this.reader is null)
367+
{
368+
throw new HiveMQttClientException("Reader is null");
369+
}
370+
return this.reader.ReadAsync(cancellationToken);
371+
}
353372
}

Source/HiveMQtt/Client/HiveMQClientUtil.cs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,6 @@ public partial class HiveMQClient : IDisposable, IHiveMQClient
2121
private bool disposed = false;
2222
private int lastPacketId = 0;
2323

24-
/// <summary>
25-
/// Generate a packet identifier.
26-
/// </summary>
27-
/// <returns>A valid packet identifier.</returns>
28-
protected int GeneratePacketIdentifier()
29-
{
30-
if (this.lastPacketId == ushort.MaxValue)
31-
{
32-
this.lastPacketId = 0;
33-
}
34-
35-
return Interlocked.Increment(ref this.lastPacketId);
36-
}
37-
3824
/// <summary>
3925
/// https://learn.microsoft.com/en-us/dotnet/api/system.idisposable?view=net-6.0.
4026
/// </summary>
@@ -51,6 +37,20 @@ from executing a second time.
5137
GC.SuppressFinalize(this);
5238
}
5339

40+
/// <summary>
41+
/// Generate a packet identifier.
42+
/// </summary>
43+
/// <returns>A valid packet identifier.</returns>
44+
protected int GeneratePacketIdentifier()
45+
{
46+
if (this.lastPacketId == ushort.MaxValue)
47+
{
48+
this.lastPacketId = 0;
49+
}
50+
51+
return Interlocked.Increment(ref this.lastPacketId);
52+
}
53+
5454
/// <summary>
5555
/// https://learn.microsoft.com/en-us/dotnet/api/system.idisposable?view=net-6.0
5656
/// Dispose(bool disposing) executes in two distinct scenarios.

0 commit comments

Comments
 (0)