@@ -37,10 +37,12 @@ public partial class HiveMQClient : IDisposable, IHiveMQClient
37
37
{
38
38
private static readonly NLog . Logger Logger = NLog . LogManager . GetCurrentClassLogger ( ) ;
39
39
40
- private ConnectState connectState = ConnectState . Disconnected ;
40
+ internal ConnectState ConnectState { get ; set ; }
41
41
42
42
public HiveMQClient ( HiveMQClientOptions ? options = null )
43
43
{
44
+ this . ConnectState = ConnectState . Disconnected ;
45
+
44
46
options ??= new HiveMQClientOptions ( ) ;
45
47
options . Validate ( ) ;
46
48
@@ -66,12 +68,12 @@ public HiveMQClient(HiveMQClientOptions? options = null)
66
68
public List < Subscription > Subscriptions { get ; } = new ( ) ;
67
69
68
70
/// <inheritdoc />
69
- public bool IsConnected ( ) => this . connectState == ConnectState . Connected ;
71
+ public bool IsConnected ( ) => this . ConnectState == ConnectState . Connected ;
70
72
71
73
/// <inheritdoc />
72
74
public async Task < ConnectResult > ConnectAsync ( )
73
75
{
74
- this . connectState = ConnectState . Connecting ;
76
+ this . ConnectState = ConnectState . Connecting ;
75
77
76
78
Logger . Info ( "Connecting to broker at {0}:{1}" , this . Options . Host , this . Options . Port ) ;
77
79
@@ -89,7 +91,7 @@ public async Task<ConnectResult> ConnectAsync()
89
91
// Construct the MQTT Connect packet and queue to send
90
92
var connPacket = new ConnectPacket ( this . Options ) ;
91
93
Logger . Trace ( $ "Queuing packet for send: { connPacket } ") ;
92
- this . sendQueue . Add ( connPacket ) ;
94
+ this . SendQueue . Add ( connPacket ) ;
93
95
94
96
// FIXME: Cancellation token and better timeout value
95
97
ConnAckPacket connAck ;
@@ -100,7 +102,7 @@ public async Task<ConnectResult> ConnectAsync()
100
102
}
101
103
catch ( TimeoutException )
102
104
{
103
- this . connectState = ConnectState . Disconnected ;
105
+ this . ConnectState = ConnectState . Disconnected ;
104
106
throw new HiveMQttClientException ( "Connect timeout. No response received in time." ) ;
105
107
}
106
108
finally
@@ -111,11 +113,11 @@ public async Task<ConnectResult> ConnectAsync()
111
113
112
114
if ( connAck . ReasonCode == ConnAckReasonCode . Success )
113
115
{
114
- this . connectState = ConnectState . Connected ;
116
+ this . ConnectState = ConnectState . Connected ;
115
117
}
116
118
else
117
119
{
118
- this . connectState = ConnectState . Disconnected ;
120
+ this . ConnectState = ConnectState . Disconnected ;
119
121
}
120
122
121
123
connectResult = new ConnectResult ( connAck . ReasonCode , connAck . SessionPresent , connAck . Properties ) ;
@@ -133,7 +135,7 @@ public async Task<ConnectResult> ConnectAsync()
133
135
/// <inheritdoc />
134
136
public async Task < bool > DisconnectAsync ( DisconnectOptions ? options = null )
135
137
{
136
- if ( this . connectState != ConnectState . Connected )
138
+ if ( this . ConnectState != ConnectState . Connected )
137
139
{
138
140
Logger . Warn ( "DisconnectAsync: Client is not connected." ) ;
139
141
return false ;
@@ -152,15 +154,15 @@ public async Task<bool> DisconnectAsync(DisconnectOptions? options = null)
152
154
} ;
153
155
154
156
// Once this is set, no more incoming packets or outgoing will be accepted
155
- this . connectState = ConnectState . Disconnecting ;
157
+ this . ConnectState = ConnectState . Disconnecting ;
156
158
157
159
var taskCompletionSource = new TaskCompletionSource < DisconnectPacket > ( ) ;
158
160
void TaskHandler ( object ? sender , OnDisconnectSentEventArgs args ) => taskCompletionSource . SetResult ( args . DisconnectPacket ) ;
159
161
EventHandler < OnDisconnectSentEventArgs > eventHandler = TaskHandler ;
160
162
this . OnDisconnectSent += eventHandler ;
161
163
162
164
Logger . Trace ( $ "Queuing packet for send: { disconnectPacket } ") ;
163
- this . sendQueue . Add ( disconnectPacket ) ;
165
+ this . SendQueue . Add ( disconnectPacket ) ;
164
166
165
167
try
166
168
{
@@ -176,26 +178,35 @@ public async Task<bool> DisconnectAsync(DisconnectOptions? options = null)
176
178
this . OnDisconnectSent -= eventHandler ;
177
179
}
178
180
179
- // Close the socket
181
+ this . HandleDisconnection ( ) ;
182
+
183
+ return true ;
184
+ }
185
+
186
+ /// <summary>
187
+ /// Close the socket and set the connect state to disconnected.
188
+ /// </summary>
189
+ private void HandleDisconnection ( )
190
+ {
191
+ Logger . Debug ( "HandleDisconnection: Connection lost. Handling Disconnection." ) ;
192
+
180
193
this . CloseSocket ( ) ;
181
194
182
195
// Fire the corresponding event
183
- this . AfterDisconnectEventLauncher ( true ) ;
196
+ this . AfterDisconnectEventLauncher ( false ) ;
184
197
185
- this . connectState = ConnectState . Disconnected ;
198
+ this . ConnectState = ConnectState . Disconnected ;
186
199
187
200
// FIXME
188
- if ( this . sendQueue . Count > 0 )
201
+ if ( this . SendQueue . Count > 0 )
189
202
{
190
- Logger . Warn ( "Disconnect : Send queue not empty. Packets pending but we are disconnecting.") ;
203
+ Logger . Warn ( $ "HandleDisconnection : Send queue not empty. { this . SendQueue . Count } packets pending but we are disconnecting (or were disconnected) .") ;
191
204
}
192
205
193
206
// We only clear the send queue on explicit disconnect
194
- while ( this . sendQueue . TryTake ( out _ ) )
207
+ while ( this . SendQueue . TryTake ( out _ ) )
195
208
{
196
209
}
197
-
198
- return true ;
199
210
}
200
211
201
212
/// <inheritdoc />
@@ -210,7 +221,7 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
210
221
if ( message . QoS == QualityOfService . AtMostOnceDelivery )
211
222
{
212
223
Logger . Trace ( $ "Queuing packet for send: { publishPacket } ") ;
213
- this . sendQueue . Add ( publishPacket ) ;
224
+ this . SendQueue . Add ( publishPacket ) ;
214
225
return new PublishResult ( publishPacket . Message ) ;
215
226
}
216
227
else if ( message . QoS == QualityOfService . AtLeastOnceDelivery )
@@ -223,7 +234,7 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
223
234
224
235
// Construct the MQTT Connect packet and queue to send
225
236
Logger . Trace ( $ "Queuing packet for send: { publishPacket } ") ;
226
- this . sendQueue . Add ( publishPacket ) ;
237
+ this . SendQueue . Add ( publishPacket ) ;
227
238
228
239
var pubAckPacket = await taskCompletionSource . Task . WaitAsync ( TimeSpan . FromSeconds ( 120 ) ) . ConfigureAwait ( false ) ;
229
240
@@ -239,7 +250,7 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
239
250
publishPacket . OnPublishQoS2Complete += eventHandler ;
240
251
241
252
// Construct the MQTT Connect packet and queue to send
242
- this . sendQueue . Add ( publishPacket ) ;
253
+ this . SendQueue . Add ( publishPacket ) ;
243
254
244
255
// Wait on the QoS 2 handshake
245
256
var packetList = await taskCompletionSource . Task . WaitAsync ( TimeSpan . FromSeconds ( 120 ) ) . ConfigureAwait ( false ) ;
@@ -322,7 +333,7 @@ public async Task<SubscribeResult> SubscribeAsync(SubscribeOptions options)
322
333
this . OnSubAckReceived += eventHandler ;
323
334
324
335
// Queue the constructed packet to be sent on the wire
325
- this . sendQueue . Add ( subscribePacket ) ;
336
+ this . SendQueue . Add ( subscribePacket ) ;
326
337
327
338
SubAckPacket subAck ;
328
339
SubscribeResult subscribeResult ;
@@ -426,7 +437,7 @@ public async Task<UnsubscribeResult> UnsubscribeAsync(UnsubscribeOptions unsubOp
426
437
EventHandler < OnUnsubAckReceivedEventArgs > eventHandler = TaskHandler ;
427
438
this . OnUnsubAckReceived += eventHandler ;
428
439
429
- this . sendQueue . Add ( unsubscribePacket ) ;
440
+ this . SendQueue . Add ( unsubscribePacket ) ;
430
441
431
442
// FIXME: Cancellation token and better timeout value
432
443
UnsubAckPacket unsubAck ;
0 commit comments