@@ -97,7 +97,7 @@ public async Task<ConnectResult> ConnectAsync()
97
97
98
98
// Construct the MQTT Connect packet and queue to send
99
99
var connPacket = new ConnectPacket ( this . Options ) ;
100
- Logger . Trace ( $ "Queuing packet for send: { connPacket . GetType ( ) . Name } id= { connPacket . PacketIdentifier } ") ;
100
+ Logger . Trace ( $ "Queuing CONNECT packet for send. ") ;
101
101
this . SendQueue . Enqueue ( connPacket ) ;
102
102
103
103
ConnAckPacket connAck ;
@@ -174,7 +174,7 @@ public async Task<bool> DisconnectAsync(DisconnectOptions? options = null)
174
174
EventHandler < OnDisconnectSentEventArgs > eventHandler = TaskHandler ;
175
175
this . OnDisconnectSent += eventHandler ;
176
176
177
- Logger . Trace ( $ "Queuing packet for send: { disconnectPacket . GetType ( ) . Name } id= { disconnectPacket . PacketIdentifier } ") ;
177
+ Logger . Trace ( $ "Queuing DISCONNECT packet for send. ") ;
178
178
this . SendQueue . Enqueue ( disconnectPacket ) ;
179
179
180
180
try
@@ -201,23 +201,25 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message, Cance
201
201
{
202
202
message . Validate ( ) ;
203
203
204
- var packetIdentifier = this . GeneratePacketIdentifier ( ) ;
205
- var publishPacket = new PublishPacket ( message , ( ushort ) packetIdentifier ) ;
206
-
207
204
// QoS 0: Fast Service
208
205
if ( message . QoS == QualityOfService . AtMostOnceDelivery )
209
206
{
210
- Logger . Trace ( $ "Queuing packet for send: { publishPacket . GetType ( ) . Name } id={ publishPacket . PacketIdentifier } ") ;
207
+ var publishPacket = new PublishPacket ( message , 0 ) ;
208
+ Logger . Trace ( $ "Queuing QoS 0 publish packet for send: { publishPacket . GetType ( ) . Name } ") ;
209
+
211
210
this . OutgoingPublishQueue . Enqueue ( publishPacket ) ;
212
211
return new PublishResult ( publishPacket . Message ) ;
213
212
}
214
213
else if ( message . QoS == QualityOfService . AtLeastOnceDelivery )
215
214
{
216
215
// QoS 1: Acknowledged Delivery
217
- Logger . Trace ( $ "Queuing packet for send: { publishPacket . GetType ( ) . Name } id={ publishPacket . PacketIdentifier } ") ;
216
+ var packetIdentifier = await this . PacketIDManager . GetAvailablePacketIDAsync ( ) . ConfigureAwait ( false ) ;
217
+ var publishPacket = new PublishPacket ( message , ( ushort ) packetIdentifier ) ;
218
+ PubAckPacket pubAckPacket ;
219
+
220
+ Logger . Trace ( $ "Queuing QoS 1 publish packet for send: { publishPacket . GetType ( ) . Name } id={ publishPacket . PacketIdentifier } ") ;
218
221
this . OutgoingPublishQueue . Enqueue ( publishPacket ) ;
219
222
220
- PubAckPacket pubAckPacket ;
221
223
try
222
224
{
223
225
// Wait on the QoS 1 handshake
@@ -236,8 +238,11 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message, Cance
236
238
else if ( message . QoS == QualityOfService . ExactlyOnceDelivery )
237
239
{
238
240
// QoS 2: Assured Delivery
241
+ var packetIdentifier = await this . PacketIDManager . GetAvailablePacketIDAsync ( ) . ConfigureAwait ( false ) ;
242
+ var publishPacket = new PublishPacket ( message , ( ushort ) packetIdentifier ) ;
239
243
PublishResult ? publishResult = null ;
240
- Logger . Trace ( $ "Queuing packet for send: { publishPacket . GetType ( ) . Name } id={ publishPacket . PacketIdentifier } ") ;
244
+
245
+ Logger . Trace ( $ "Queuing QoS 2 publish packet for send: { publishPacket . GetType ( ) . Name } id={ publishPacket . PacketIdentifier } ") ;
241
246
this . OutgoingPublishQueue . Enqueue ( publishPacket ) ;
242
247
243
248
List < ControlPacket > packetList ;
@@ -314,7 +319,7 @@ public async Task<SubscribeResult> SubscribeAsync(SubscribeOptions options)
314
319
315
320
// FIXME: We should only ever have one subscribe in flight at any time (for now)
316
321
// Construct the MQTT Subscribe packet
317
- var packetIdentifier = this . GeneratePacketIdentifier ( ) ;
322
+ var packetIdentifier = await this . PacketIDManager . GetAvailablePacketIDAsync ( ) . ConfigureAwait ( false ) ;
318
323
var subscribePacket = new SubscribePacket ( options , ( ushort ) packetIdentifier ) ;
319
324
320
325
// Setup the task completion source to wait for the SUBACK
@@ -422,7 +427,7 @@ public async Task<UnsubscribeResult> UnsubscribeAsync(UnsubscribeOptions unsubOp
422
427
// Fire the corresponding event
423
428
this . BeforeUnsubscribeEventLauncher ( unsubOptions . Subscriptions ) ;
424
429
425
- var packetIdentifier = this . GeneratePacketIdentifier ( ) ;
430
+ var packetIdentifier = await this . PacketIDManager . GetAvailablePacketIDAsync ( ) . ConfigureAwait ( false ) ;
426
431
var unsubscribePacket = new UnsubscribePacket ( unsubOptions , ( ushort ) packetIdentifier ) ;
427
432
428
433
var taskCompletionSource = new TaskCompletionSource < UnsubAckPacket > ( ) ;
0 commit comments