Skip to content

Commit af2333b

Browse files
authored
Add Backpressure Support (#171)
1 parent 580b99a commit af2333b

File tree

8 files changed

+199
-121
lines changed

8 files changed

+199
-121
lines changed

Source/HiveMQtt/Client/HiveMQClient.cs

+43-19
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ public HiveMQClient(HiveMQClientOptions? options = null)
5757

5858
this.Options = options;
5959
this.cancellationTokenSource = new CancellationTokenSource();
60+
this.ClientReceiveSemaphore = new SemaphoreSlim(this.Options.ClientReceiveMaximum);
61+
62+
// Set protocol default until ConnAck is received
63+
this.BrokerReceiveSemaphore = new SemaphoreSlim(65535);
6064
}
6165

6266
/// <inheritdoc />
@@ -167,7 +171,9 @@ public async Task<bool> DisconnectAsync(DisconnectOptions? options = null)
167171

168172
try
169173
{
170-
disconnectPacket = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
174+
disconnectPacket = await taskCompletionSource.Task
175+
.WaitAsync(TimeSpan.FromMilliseconds(this.Options.ResponseTimeoutInMs))
176+
.ConfigureAwait(false);
171177
}
172178
catch (TimeoutException)
173179
{
@@ -179,9 +185,7 @@ public async Task<bool> DisconnectAsync(DisconnectOptions? options = null)
179185
this.OnDisconnectSent -= eventHandler;
180186
}
181187

182-
await this.HandleDisconnectionAsync().ConfigureAwait(false);
183-
184-
return true;
188+
return await this.HandleDisconnectionAsync().ConfigureAwait(false);
185189
}
186190

187191
/// <inheritdoc />
@@ -210,8 +214,25 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
210214
Logger.Trace($"Queuing packet for send: {publishPacket.GetType().Name} id={publishPacket.PacketIdentifier}");
211215
this.OutgoingPublishQueue.Enqueue(publishPacket);
212216

213-
// Wait on the QoS 1 handshake
214-
var pubAckPacket = await publishPacket.OnPublishQoS1CompleteTCS.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
217+
PubAckPacket pubAckPacket;
218+
try
219+
{
220+
// Wait on the QoS 1 handshake
221+
pubAckPacket = await publishPacket.OnPublishQoS1CompleteTCS.Task
222+
.WaitAsync(TimeSpan.FromMilliseconds(this.Options.ResponseTimeoutInMs))
223+
.ConfigureAwait(false);
224+
}
225+
catch (TimeoutException)
226+
{
227+
Logger.Error("PublishAsync: QoS 1 timeout. No PUBACK response received in time.");
228+
var disconnectOptions = new DisconnectOptions
229+
{
230+
ReasonCode = DisconnectReasonCode.UnspecifiedError,
231+
};
232+
await this.DisconnectAsync(disconnectOptions).ConfigureAwait(false);
233+
throw;
234+
}
235+
215236
return new PublishResult(publishPacket.Message, pubAckPacket);
216237
}
217238
else if (message.QoS == QualityOfService.ExactlyOnceDelivery)
@@ -225,24 +246,20 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
225246
try
226247
{
227248
// Wait on the QoS 2 handshake
228-
packetList = await publishPacket.OnPublishQoS2CompleteTCS.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
249+
packetList = await publishPacket.OnPublishQoS2CompleteTCS.Task
250+
.WaitAsync(TimeSpan.FromMilliseconds(this.Options.ResponseTimeoutInMs))
251+
.ConfigureAwait(false);
229252
}
230253
catch (TimeoutException)
231254
{
232255
Logger.Error("PublishAsync: QoS 2 timeout. No response received in time.");
233256

234-
// Remove the transaction chain
235-
if (this.TransactionQueue.Remove(publishPacket.PacketIdentifier, out var publishQoS2Chain))
257+
var disconnectOptions = new DisconnectOptions
236258
{
237-
Logger.Debug($"PublishAsync: QoS 2 timeout. Removing transaction chain for packet identifier {publishPacket.PacketIdentifier}.");
238-
}
239-
240-
// Prepare PublishResult
241-
publishResult = new PublishResult(publishPacket.Message)
242-
{
243-
QoS2ReasonCode = null,
259+
ReasonCode = DisconnectReasonCode.UnspecifiedError,
244260
};
245-
return publishResult;
261+
await this.DisconnectAsync(disconnectOptions).ConfigureAwait(false);
262+
throw;
246263
}
247264

248265
foreach (var packet in packetList)
@@ -331,7 +348,9 @@ public async Task<SubscribeResult> SubscribeAsync(SubscribeOptions options)
331348
SubscribeResult subscribeResult;
332349
try
333350
{
334-
subAck = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
351+
subAck = await taskCompletionSource.Task
352+
.WaitAsync(TimeSpan.FromMilliseconds(this.Options.ResponseTimeoutInMs))
353+
.ConfigureAwait(false);
335354
}
336355
catch (TimeoutException)
337356
{
@@ -441,7 +460,9 @@ public async Task<UnsubscribeResult> UnsubscribeAsync(UnsubscribeOptions unsubOp
441460
UnsubscribeResult unsubscribeResult;
442461
try
443462
{
444-
unsubAck = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
463+
unsubAck = await taskCompletionSource.Task
464+
.WaitAsync(TimeSpan.FromMilliseconds(this.Options.ResponseTimeoutInMs))
465+
.ConfigureAwait(false);
445466

446467
// FIXME: Validate that the packet identifier matches
447468
}
@@ -488,6 +509,9 @@ private async Task<bool> HandleDisconnectionAsync(bool clean = true)
488509

489510
// Cancel all background tasks and close the socket
490511
this.ConnectState = ConnectState.Disconnected;
512+
513+
// Don't use CancelAsync here to maintain backwards compatibility
514+
// with >=.net6.0. CancelAsync was introduced in .net8.0
491515
this.cancellationTokenSource.Cancel();
492516
this.CloseSocket();
493517

0 commit comments

Comments
 (0)