Skip to content

Commit 2f8c83c

Browse files
authored
Refactoring, Cleanup & More Code Documentation (#76)
1 parent 46c1655 commit 2f8c83c

20 files changed

+167
-206
lines changed

Source/HiveMQtt/Client/HiveMQClient.cs

Lines changed: 14 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,9 @@ public async Task<ConnectResult> ConnectAsync()
6969
var socketIsConnected = await this.ConnectSocketAsync().ConfigureAwait(false);
7070

7171
var taskCompletionSource = new TaskCompletionSource<ConnAckPacket>();
72+
void TaskHandler(object? sender, OnConnAckReceivedEventArgs args) => taskCompletionSource.SetResult(args.ConnAckPacket);
7273

73-
EventHandler<OnConnAckReceivedEventArgs> eventHandler = (sender, args) =>
74-
{
75-
taskCompletionSource.SetResult(args.ConnAckPacket);
76-
};
74+
EventHandler<OnConnAckReceivedEventArgs> eventHandler = TaskHandler;
7775
this.OnConnAckReceived += eventHandler;
7876

7977
// Construct the MQTT Connect packet and queue to send
@@ -138,10 +136,8 @@ public async Task<bool> DisconnectAsync(DisconnectOptions? options = null)
138136
this.connectState = ConnectState.Disconnecting;
139137

140138
var taskCompletionSource = new TaskCompletionSource<DisconnectPacket>();
141-
EventHandler<OnDisconnectSentEventArgs> eventHandler = (sender, args) =>
142-
{
143-
taskCompletionSource.SetResult(args.DisconnectPacket);
144-
};
139+
void TaskHandler(object? sender, OnDisconnectSentEventArgs args) => taskCompletionSource.SetResult(args.DisconnectPacket);
140+
EventHandler<OnDisconnectSentEventArgs> eventHandler = TaskHandler;
145141
this.OnDisconnectSent += eventHandler;
146142

147143
this.sendQueue.Enqueue(disconnectPacket);
@@ -188,11 +184,8 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
188184
{
189185
// QoS 1: Acknowledged Delivery
190186
var taskCompletionSource = new TaskCompletionSource<PubAckPacket>();
191-
192-
EventHandler<OnPublishQoS1CompleteEventArgs> eventHandler = (sender, args) =>
193-
{
194-
taskCompletionSource.SetResult(args.PubAckPacket);
195-
};
187+
void TaskHandler(object? sender, OnPublishQoS1CompleteEventArgs args) => taskCompletionSource.SetResult(args.PubAckPacket);
188+
EventHandler<OnPublishQoS1CompleteEventArgs> eventHandler = TaskHandler;
196189
publishPacket.OnPublishQoS1Complete += eventHandler;
197190

198191
// Construct the MQTT Connect packet and queue to send
@@ -207,11 +200,8 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
207200
{
208201
// QoS 2: Assured Delivery
209202
var taskCompletionSource = new TaskCompletionSource<PubRecPacket>();
210-
211-
EventHandler<OnPublishQoS2CompleteEventArgs> eventHandler = (sender, args) =>
212-
{
213-
taskCompletionSource.SetResult(args.PubRecPacket);
214-
};
203+
void TaskHandler(object? sender, OnPublishQoS2CompleteEventArgs args) => taskCompletionSource.SetResult(args.PubRecPacket);
204+
EventHandler<OnPublishQoS2CompleteEventArgs> eventHandler = TaskHandler;
215205
publishPacket.OnPublishQoS2Complete += eventHandler;
216206

217207
// Construct the MQTT Connect packet and queue to send
@@ -274,12 +264,10 @@ public async Task<SubscribeResult> SubscribeAsync(SubscribeOptions options)
274264
var subscribePacket = new SubscribePacket(options, (ushort)packetIdentifier);
275265

276266
var taskCompletionSource = new TaskCompletionSource<SubAckPacket>();
267+
void TaskHandler(object? sender, OnSubAckReceivedEventArgs args) => taskCompletionSource.SetResult(args.SubAckPacket);
277268

278269
// FIXME: We should only ever have one subscribe in flight at any time (for now)
279-
EventHandler<OnSubAckReceivedEventArgs> eventHandler = (sender, args) =>
280-
{
281-
taskCompletionSource.SetResult(args.SubAckPacket);
282-
};
270+
EventHandler<OnSubAckReceivedEventArgs> eventHandler = TaskHandler;
283271
this.OnSubAckReceived += eventHandler;
284272

285273
// Construct the MQTT Connect packet and queue to send
@@ -297,7 +285,7 @@ public async Task<SubscribeResult> SubscribeAsync(SubscribeOptions options)
297285
catch (System.TimeoutException ex)
298286
{
299287
// log.Error(string.Format("Connect timeout. No response received in time.", ex);
300-
throw;
288+
throw ex;
301289
}
302290
finally
303291
{
@@ -352,10 +340,9 @@ public async Task<UnsubscribeResult> UnsubscribeAsync(List<Subscription> subscri
352340
var unsubscribePacket = new UnsubscribePacket(subscriptions, (ushort)packetIdentifier);
353341

354342
var taskCompletionSource = new TaskCompletionSource<UnsubAckPacket>();
355-
EventHandler<OnUnsubAckReceivedEventArgs> eventHandler = (sender, args) =>
356-
{
357-
taskCompletionSource.SetResult(args.UnsubAckPacket);
358-
};
343+
344+
void TaskHandler(object? sender, OnUnsubAckReceivedEventArgs args) => taskCompletionSource.SetResult(args.UnsubAckPacket);
345+
EventHandler<OnUnsubAckReceivedEventArgs> eventHandler = TaskHandler;
359346
this.OnUnsubAckReceived += eventHandler;
360347

361348
this.sendQueue.Enqueue(unsubscribePacket);

Source/HiveMQtt/Client/HiveMQClientSocket.cs

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,23 @@ public partial class HiveMQClient : IDisposable, IHiveMQClient
3131
private PipeReader? reader;
3232
private PipeWriter? writer;
3333

34+
internal static bool ValidateServerCertificate(
35+
object sender,
36+
X509Certificate certificate,
37+
X509Chain chain,
38+
SslPolicyErrors sslPolicyErrors)
39+
{
40+
if (sslPolicyErrors == SslPolicyErrors.None)
41+
{
42+
return true;
43+
}
44+
45+
Console.WriteLine("Certificate error: {0}", sslPolicyErrors);
46+
47+
// Do not allow this client to communicate with unauthenticated servers.
48+
return false;
49+
}
50+
3451
/// <summary>
3552
/// Make a TCP connection to a remote broker.
3653
/// </summary>
@@ -102,19 +119,15 @@ internal async Task<bool> ConnectSocketAsync()
102119
this.writer = PipeWriter.Create(this.stream);
103120

104121
// Start the traffic processors
105-
this.trafficOutflowProcessor = this.TrafficOutflowProcessorAsync();
106-
this.trafficInflowProcessor = this.TrafficInflowProcessorAsync();
122+
_ = this.TrafficOutflowProcessorAsync();
123+
_ = this.TrafficInflowProcessorAsync();
107124

108125
// Console.WriteLine($"Socket connected to {this.socket.RemoteEndPoint}");
109126
return socketConnected;
110127
}
111128

112129
internal bool CloseSocket()
113130
{
114-
// Shutdown the traffic processors
115-
this.trafficOutflowProcessor = null;
116-
this.trafficInflowProcessor = null;
117-
118131
// Shutdown the pipeline
119132
this.reader = null;
120133
this.writer = null;
@@ -125,21 +138,4 @@ internal bool CloseSocket()
125138

126139
return true;
127140
}
128-
129-
internal static bool ValidateServerCertificate(
130-
object sender,
131-
X509Certificate certificate,
132-
X509Chain chain,
133-
SslPolicyErrors sslPolicyErrors)
134-
{
135-
if (sslPolicyErrors == SslPolicyErrors.None)
136-
{
137-
return true;
138-
}
139-
140-
Console.WriteLine("Certificate error: {0}", sslPolicyErrors);
141-
142-
// Do not allow this client to communicate with unauthenticated servers.
143-
return false;
144-
}
145141
}

Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,6 @@ public partial class HiveMQClient : IDisposable, IHiveMQClient
3535
// Transactional packets indexed by packet identifer
3636
private readonly ConcurrentDictionary<int, List<ControlPacket>> transactionQueue = new();
3737

38-
private Task<bool>? trafficOutflowProcessor;
39-
40-
private Task<bool>? trafficInflowProcessor;
41-
4238
/// <summary>
4339
/// Asynchronous background task that handles the outgoing traffic of packets queued in the sendQueue.
4440
/// </summary>
@@ -161,7 +157,6 @@ private Task<bool> TrafficOutflowProcessorAsync() => Task.Run(async () =>
161157

162158
stopWatch.Restart();
163159
}
164-
165160
} // while
166161

167162
Trace.WriteLine($"{Environment.CurrentManagedThreadId}: TrafficOutflowProcessor Exiting...{this.connectState}");

Source/HiveMQtt/Client/HiveMQClientUtil.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ from executing a second time.
6161
/// runtime from inside the finalizer and you should not reference
6262
/// other objects. Only unmanaged resources can be disposed.
6363
/// </summary>
64-
/// <param name="disposing">fixme.</param>
64+
/// <param name="disposing">True if called from user code.</param>
6565
protected virtual void Dispose(bool disposing)
6666
{
6767
// Check to see if Dispose has already been called.
@@ -72,14 +72,14 @@ protected virtual void Dispose(bool disposing)
7272
if (disposing)
7373
{
7474
// Dispose managed resources.
75-
{ }
75+
// { }
7676
}
7777

7878
// Call the appropriate methods to clean up
7979
// unmanaged resources here.
8080
// If disposing is false,
8181
// only the following code is executed.
82-
{ }
82+
// { }
8383

8484
// Note disposing has been done.
8585
this.disposed = true;

Source/HiveMQtt/Client/Options/SubscribeOptions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
*/
1616
namespace HiveMQtt.Client.Options;
1717

18-
using HiveMQtt.MQTT5.Types;
1918
using HiveMQtt.Client.Exceptions;
19+
using HiveMQtt.MQTT5.Types;
2020

2121
public class SubscribeOptions
2222
{

Source/HiveMQtt/Client/Options/UnsubscribeOptions.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
*/
1616
namespace HiveMQtt.Client.Options;
1717

18-
using System.Collections;
19-
2018
public class UnsubscribeOptions
2119
{
2220
public UnsubscribeOptions() => this.UserProperties = new Dictionary<string, string>();

Source/HiveMQtt/Client/Results/SubscribeResult.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,9 @@ internal SubscribeResult(SubscribeOptions options, SubAckPacket subAckPacket)
6767
/// <para>
6868
/// The topic filter must be the same as one used in the Subscribe operation.
6969
/// </para>
70-
/// <returns>The Subscription for the given topic filter or null if not found.</returns>
7170
/// </summary>
71+
/// <param name="topicFilter">The topic filter to get the Subscription for.</param>
72+
/// <returns>The Subscription for the given topic filter or null if not found.</returns>
7273
public Subscription? GetSubscription(string topicFilter)
7374
{
7475
foreach (var subscription in this.Subscriptions)
@@ -78,14 +79,13 @@ internal SubscribeResult(SubscribeOptions options, SubAckPacket subAckPacket)
7879
return subscription;
7980
}
8081
}
82+
8183
return null;
8284
}
8385

8486
/// <summary>
8587
/// Gets the first Subscription in the list of Subscriptions or null if the list is empty.
8688
/// </summary>
87-
public Subscription? GetFirstSubscription()
88-
{
89-
return this.Subscriptions.FirstOrDefault();
90-
}
89+
/// <returns>The first Subscription in the list of Subscriptions or null if the list is empty.</returns>
90+
public Subscription? GetFirstSubscription() => this.Subscriptions.FirstOrDefault();
9191
}

Source/HiveMQtt/MQTT5/ControlPacket.cs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,23 @@ public static int DecodeVariableByteInteger(ref SequenceReader<byte> reader, out
9797
/// </summary>
9898
internal ushort PacketIdentifier { get; set; }
9999

100+
/// <summary>
101+
/// Decodes a MQTT packet identifier as a two byte integer from the given <c>MemoryStream</c>. It then
102+
/// does basic validation and range checking on the decoded value.
103+
/// </summary>
104+
/// <param name="reader">SequenceReader containing the packet data to be decoded.</param>
105+
/// <returns>The packet identifier as a two byte integer.</returns>
106+
protected static int DecodePacketIdentifier(ref SequenceReader<byte> reader)
107+
{
108+
var packetIdentifier = DecodeTwoByteInteger(ref reader);
109+
if (packetIdentifier == null || packetIdentifier.Value < 0 || packetIdentifier.Value > ushort.MaxValue)
110+
{
111+
throw new MQTTProtocolException("Invalid packet identifier");
112+
}
113+
114+
return packetIdentifier.Value;
115+
}
116+
100117
/// <summary>
101118
/// Encode a UTF-8 string into a <c>MemoryStream</c>.
102119
///
@@ -128,7 +145,6 @@ protected static int EncodeUTF8String(MemoryStream stream, string s)
128145
/// <returns>A string containing the UTF-8 string.</returns>
129146
protected static string? DecodeUTF8String(ref SequenceReader<byte> reader)
130147
{
131-
132148
if (reader.TryReadBigEndian(out Int16 stringLength))
133149
{
134150
var array = new byte[stringLength];
@@ -269,7 +285,6 @@ protected static int EncodeBinaryData(MemoryStream writer, byte[] binaryData)
269285
/// <returns>A byte[] containing the binary data.</returns>
270286
protected static byte[]? DecodeBinaryData(ref SequenceReader<byte> reader)
271287
{
272-
273288
if (reader.TryReadBigEndian(out Int16 stringLength))
274289
{
275290
var array = new byte[stringLength];
@@ -393,7 +408,7 @@ protected void EncodeProperties(MemoryStream writer)
393408

394409
if (writer.Length > int.MaxValue)
395410
{
396-
throw new ArgumentOutOfRangeException("writer", "The writer stream is too large to encode.");
411+
throw new ArgumentOutOfRangeException(nameof(writer), "The writer stream is too large to encode.");
397412
}
398413

399414
var propertyStream = new MemoryStream((int)writer.Length);
@@ -514,8 +529,8 @@ protected void EncodeProperties(MemoryStream writer)
514529
foreach (var property in this.Properties.UserProperties)
515530
{
516531
propertiesLength += EncodeVariableByteInteger(propertyStream, (int)MQTT5PropertyType.UserProperty);
517-
propertiesLength += EncodeUTF8String(propertyStream, (string)property.Key);
518-
propertiesLength += EncodeUTF8String(propertyStream, (string)property.Value);
532+
propertiesLength += EncodeUTF8String(propertyStream, property.Key);
533+
propertiesLength += EncodeUTF8String(propertyStream, property.Value);
519534
}
520535
}
521536

@@ -567,7 +582,7 @@ protected bool DecodeProperties(ref SequenceReader<byte> reader, int length)
567582
this.Properties.CorrelationData = DecodeBinaryData(ref reader);
568583
break;
569584
case MQTT5PropertyType.SubscriptionIdentifier:
570-
this.Properties.SubscriptionIdentifier = (Int32)DecodeVariableByteInteger(ref reader);
585+
this.Properties.SubscriptionIdentifier = DecodeVariableByteInteger(ref reader);
571586
break;
572587
case MQTT5PropertyType.SessionExpiryInterval:
573588
this.Properties.SessionExpiryInterval = DecodeFourByteInteger(ref reader);
@@ -648,4 +663,5 @@ protected bool DecodeProperties(ref SequenceReader<byte> reader, int length)
648663

649664
return true;
650665
}
666+
651667
}

Source/HiveMQtt/MQTT5/PacketDecoder.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public static ControlPacket Decode(ReadOnlySequence<byte> buffer, out SequencePo
4040

4141
// Byte 1: Control Packet Type
4242
srBuffer.TryRead(out var cpByte);
43-
var controlPacketType = (int)cpByte >> 4;
43+
var controlPacketType = cpByte >> 4;
4444

4545
// Byte 2-5: Remaining Length of the Variable Header
4646
// Size of VBI in vbiLengthInBytes

0 commit comments

Comments
 (0)