Skip to content

Commit df6f60f

Browse files
authored
Websocket Support (#190)
1 parent 62c89a3 commit df6f60f

22 files changed

+3270
-1350
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
/*
2+
* Copyright 2024-present HiveMQ and the HiveMQ Community
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
namespace HiveMQtt.Client.Connection;
17+
18+
using System.Diagnostics;
19+
using HiveMQtt.Client.Internal;
20+
using HiveMQtt.Client.Transport;
21+
using HiveMQtt.MQTT5;
22+
using HiveMQtt.MQTT5.Packets;
23+
using HiveMQtt.MQTT5.Types;
24+
25+
/// <summary>
26+
/// Represents a connection manager for the MQTT client.
27+
/// </summary>
28+
public partial class ConnectionManager : IDisposable
29+
{
30+
private static readonly NLog.Logger Logger = NLog.LogManager.GetCurrentClassLogger();
31+
32+
// The HiveMQClient this ConnectionManager is associated with
33+
internal HiveMQClient Client { get; }
34+
35+
// This is how we kill innocent and not so innocent Tasks
36+
private CancellationTokenSource cancellationTokenSource;
37+
38+
// The state of the connection
39+
internal ConnectState State { get; set; }
40+
41+
// The protocol specific transport layer (TCP, WebSocket, etc.)
42+
internal BaseTransport Transport { get; set; }
43+
44+
// The MQTT Properties for the active connection.
45+
internal MQTT5Properties ConnectionProperties { get; set; } = new();
46+
47+
// The outgoing publish packets queue. Publish packets are separated from other control packets
48+
// so that we can correctly respect the Broker's flow control.
49+
internal AwaitableQueueX<PublishPacket> OutgoingPublishQueue { get; } = new();
50+
51+
// Non-publish control packets queue; everything else
52+
internal AwaitableQueueX<ControlPacket> SendQueue { get; } = new();
53+
54+
// Received control packets queue
55+
internal AwaitableQueueX<ControlPacket> ReceivedQueue { get; } = new();
56+
57+
// Incoming Publish QoS > 0 in-flight transactions indexed by packet identifier
58+
internal BoundedDictionaryX<int, List<ControlPacket>> IPubTransactionQueue { get; set; }
59+
60+
// Outgoing Publish QoS > 0 in-flight transactions indexed by packet identifier
61+
internal BoundedDictionaryX<int, List<ControlPacket>> OPubTransactionQueue { get; set; }
62+
63+
// We generate new PacketIDs here.
64+
internal PacketIDManager PacketIDManager { get; } = new();
65+
66+
// This is used to know if and when we need to send a MQTT PingReq
67+
private readonly Stopwatch lastCommunicationTimer = new();
68+
69+
/// <summary>
70+
/// Initializes a new instance of the <see cref="ConnectionManager"/> class.
71+
/// </summary>
72+
/// <param name="client">The HiveMQClient this ConnectionManager is associated with.</param>
73+
public ConnectionManager(HiveMQClient client)
74+
{
75+
this.Client = client;
76+
this.cancellationTokenSource = new CancellationTokenSource();
77+
this.IPubTransactionQueue = new BoundedDictionaryX<int, List<ControlPacket>>(this.Client.Options.ClientReceiveMaximum);
78+
this.OPubTransactionQueue = new BoundedDictionaryX<int, List<ControlPacket>>(65535);
79+
this.State = ConnectState.Disconnected;
80+
81+
// Connect the appropriate transport
82+
if (this.Client.Options.Host.StartsWith("ws://", StringComparison.OrdinalIgnoreCase) ||
83+
this.Client.Options.Host.StartsWith("wss://", StringComparison.OrdinalIgnoreCase))
84+
{
85+
// this.Transport = new WebSocketTransport(this.Client.Options);
86+
this.Transport = new TCPTransport(this.Client.Options);
87+
}
88+
else
89+
{
90+
this.Transport = new TCPTransport(this.Client.Options);
91+
}
92+
93+
Logger.Trace("Trace Level Logging Legend:");
94+
Logger.Trace(" -(W)- == ConnectionWriter");
95+
Logger.Trace(" -(PW)- == ConnectionPublishWriter");
96+
Logger.Trace(" -(R)- == ConnectionReader");
97+
Logger.Trace(" -(CM)- == ConnectionMonitor");
98+
Logger.Trace(" -(RPH)- == ReceivedPacketsHandler");
99+
}
100+
101+
internal async Task<bool> ConnectAsync()
102+
{
103+
// Connect the appropriate transport
104+
if (this.Client.Options.WebSocketServer.Length > 0)
105+
{
106+
this.Transport = new WebSocketTransport(this.Client.Options);
107+
}
108+
else
109+
{
110+
this.Transport = new TCPTransport(this.Client.Options);
111+
}
112+
113+
// Reset the CancellationTokenSource in case this is a reconnect
114+
this.cancellationTokenSource.Dispose();
115+
this.cancellationTokenSource = new CancellationTokenSource();
116+
117+
var connected = await this.Transport.ConnectAsync().ConfigureAwait(false);
118+
119+
if (!connected)
120+
{
121+
Logger.Error("Failed to connect to broker");
122+
return false;
123+
}
124+
125+
// Start the traffic processors
126+
this.ConnectionPublishWriterTask = this.ConnectionPublishWriterAsync(this.cancellationTokenSource.Token);
127+
this.ConnectionWriterTask = this.ConnectionWriterAsync(this.cancellationTokenSource.Token);
128+
this.ConnectionReaderTask = this.ConnectionReaderAsync(this.cancellationTokenSource.Token);
129+
this.ReceivedPacketsHandlerTask = this.ReceivedPacketsHandlerAsync(this.cancellationTokenSource.Token);
130+
this.ConnectionMonitorTask = this.ConnectionMonitorAsync(this.cancellationTokenSource.Token);
131+
132+
return true;
133+
}
134+
135+
/// <summary>
136+
/// Cancel all background tasks.
137+
/// </summary>
138+
/// <returns>A task representing the asynchronous operation.</returns>
139+
internal async Task CancelBackgroundTasksAsync()
140+
{
141+
// Don't use CancelAsync here to maintain backwards compatibility
142+
// with >=.net6.0. CancelAsync was introduced in .net8.0
143+
this.cancellationTokenSource.Cancel();
144+
145+
// Delay for a short period to allow the tasks to cancel
146+
await Task.Delay(1000).ConfigureAwait(false);
147+
148+
// Reset the tasks
149+
if (this.ConnectionPublishWriterTask is not null && this.ConnectionPublishWriterTask.IsCompleted)
150+
{
151+
this.ConnectionPublishWriterTask = null;
152+
}
153+
else
154+
{
155+
Logger.Error("ConnectionPublishWriterTask did not complete");
156+
}
157+
158+
if (this.ConnectionWriterTask is not null && this.ConnectionWriterTask.IsCompleted)
159+
{
160+
this.ConnectionWriterTask = null;
161+
}
162+
else
163+
{
164+
Logger.Error("ConnectionWriterTask did not complete");
165+
}
166+
167+
if (this.ConnectionReaderTask is not null && this.ConnectionReaderTask.IsCompleted)
168+
{
169+
this.ConnectionReaderTask = null;
170+
}
171+
else
172+
{
173+
Logger.Error("ConnectionReaderTask did not complete");
174+
}
175+
176+
if (this.ReceivedPacketsHandlerTask is not null && this.ReceivedPacketsHandlerTask.IsCompleted)
177+
{
178+
this.ReceivedPacketsHandlerTask = null;
179+
}
180+
else
181+
{
182+
Logger.Error("ReceivedPacketsHandlerTask did not complete");
183+
}
184+
185+
if (this.ConnectionMonitorTask is not null && this.ConnectionMonitorTask.IsCompleted)
186+
{
187+
this.ConnectionMonitorTask = null;
188+
}
189+
else
190+
{
191+
Logger.Error("ConnectionMonitorTask did not complete");
192+
}
193+
}
194+
195+
/// <summary>
196+
/// https://learn.microsoft.com/en-us/dotnet/api/system.idisposable?view=net-6.0.
197+
/// </summary>
198+
public void Dispose()
199+
{
200+
this.Dispose();
201+
/*
202+
This object will be cleaned up by the Dispose method.
203+
Therefore, you should call GC.SuppressFinalize to
204+
take this object off the finalization queue
205+
and prevent finalization code for this object
206+
from executing a second time.
207+
*/
208+
GC.SuppressFinalize(this);
209+
}
210+
}

0 commit comments

Comments
 (0)