Skip to content

Commit 17a273c

Browse files
authored
Improved Wait Strategy for Individual Packets (#164)
1 parent 9221aa7 commit 17a273c

File tree

2 files changed

+61
-24
lines changed

2 files changed

+61
-24
lines changed

Source/HiveMQtt/Client/HiveMQClient.cs

+3-18
Original file line numberDiff line numberDiff line change
@@ -207,37 +207,25 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
207207
else if (message.QoS == QualityOfService.AtLeastOnceDelivery)
208208
{
209209
// QoS 1: Acknowledged Delivery
210-
var taskCompletionSource = new TaskCompletionSource<PubAckPacket>();
211-
void TaskHandler(object? sender, OnPublishQoS1CompleteEventArgs args) => taskCompletionSource.SetResult(args.PubAckPacket);
212-
EventHandler<OnPublishQoS1CompleteEventArgs> eventHandler = TaskHandler;
213-
publishPacket.OnPublishQoS1Complete += eventHandler;
214-
215210
Logger.Trace($"Queuing packet for send: {publishPacket.GetType().Name} id={publishPacket.PacketIdentifier}");
216211
this.OutgoingPublishQueue.Enqueue(publishPacket);
217212

218-
var pubAckPacket = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
219-
220-
publishPacket.OnPublishQoS1Complete -= eventHandler;
213+
// Wait on the QoS 1 handshake
214+
var pubAckPacket = await publishPacket.OnPublishQoS1CompleteTCS.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
221215
return new PublishResult(publishPacket.Message, pubAckPacket);
222216
}
223217
else if (message.QoS == QualityOfService.ExactlyOnceDelivery)
224218
{
225219
// QoS 2: Assured Delivery
226220
PublishResult? publishResult = null;
227-
var taskCompletionSource = new TaskCompletionSource<List<ControlPacket>>();
228-
void TaskHandler(object? sender, OnPublishQoS2CompleteEventArgs args) => taskCompletionSource.SetResult(args.PacketList);
229-
EventHandler<OnPublishQoS2CompleteEventArgs> eventHandler = TaskHandler;
230-
publishPacket.OnPublishQoS2Complete += eventHandler;
231-
232221
Logger.Trace($"Queuing packet for send: {publishPacket.GetType().Name} id={publishPacket.PacketIdentifier}");
233222
this.OutgoingPublishQueue.Enqueue(publishPacket);
234223

235224
List<ControlPacket> packetList;
236225
try
237226
{
238227
// Wait on the QoS 2 handshake
239-
// FIXME: Timeout value
240-
packetList = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
228+
packetList = await publishPacket.OnPublishQoS2CompleteTCS.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
241229
}
242230
catch (TimeoutException)
243231
{
@@ -254,7 +242,6 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
254242
{
255243
QoS2ReasonCode = null,
256244
};
257-
publishPacket.OnPublishQoS2Complete -= eventHandler;
258245
return publishResult;
259246
}
260247

@@ -271,8 +258,6 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
271258
throw new HiveMQttClientException("PublishAsync: QoS 2 complete but no PubRec packet received.");
272259
}
273260

274-
// Remove our wait handler
275-
publishPacket.OnPublishQoS2Complete -= eventHandler;
276261
return publishResult;
277262
}
278263

Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs

+58-6
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,20 @@ public PublishPacket(MQTT5PublishMessage message, int packetIdentifier)
3939
{
4040
this.PacketIdentifier = (ushort)packetIdentifier;
4141
this.Message = message;
42+
43+
// Setup the QoS 1 TaskCompletionSource so users can simply call
44+
//
45+
// await PublishPacket.OnPublishQoS1CompleteTCS
46+
//
47+
// to wait for the QoS 1 publish to complete.
48+
this.OnPublishQoS1Complete += (sender, args) => this.OnPublishQoS1CompleteTCS.SetResult(args.PubAckPacket);
49+
50+
// Setup the QoS 2 TaskCompletionSource so users can simply call
51+
//
52+
// await PublishPacket.OnPublishQoS2CompleteTCS
53+
//
54+
// to wait for the QoS 2 publish to complete.
55+
this.OnPublishQoS2Complete += (sender, args) => this.OnPublishQoS2CompleteTCS.SetResult(args.PacketList);
4256
}
4357

4458
/// <summary>
@@ -66,23 +80,61 @@ public PublishPacket(ReadOnlySequence<byte> packetData)
6680

6781
internal virtual void OnPublishQoS1CompleteEventLauncher(PubAckPacket packet)
6882
{
69-
var eventArgs = new OnPublishQoS1CompleteEventArgs(packet);
70-
Logger.Trace("OnPublishQoS1CompleteEventLauncher");
71-
this.OnPublishQoS1Complete?.Invoke(this, eventArgs);
83+
if (this.OnPublishQoS1Complete != null && this.OnPublishQoS1Complete.GetInvocationList().Length > 0)
84+
{
85+
var eventArgs = new OnPublishQoS1CompleteEventArgs(packet);
86+
Logger.Trace("OnPublishQoS1CompleteEventLauncher");
87+
_ = Task.Run(() => this.OnPublishQoS1Complete?.Invoke(this, eventArgs)).ContinueWith(
88+
t =>
89+
{
90+
if (t.IsFaulted)
91+
{
92+
Logger.Error("OnPublishQoS1CompleteEventLauncher exception: " + t.Exception.Message);
93+
}
94+
},
95+
TaskScheduler.Default);
96+
}
7297
}
7398

99+
/// <summary>
100+
/// Gets the awaitable TaskCompletionSource for the QoS 1 publish transaction.
101+
/// <para>
102+
/// Valid for outgoing Publish messages QoS 1. A TaskCompletionSource that is set when the QoS 1 publish transaction is complete.
103+
/// </para>
104+
/// </summary>
105+
public TaskCompletionSource<PubAckPacket> OnPublishQoS1CompleteTCS { get; } = new();
106+
74107
/// <summary>
75108
/// Valid for outgoing Publish messages QoS 2. An event that is fired after the the QoS 2 PubComp is received.
76109
/// </summary>
77110
public event EventHandler<OnPublishQoS2CompleteEventArgs> OnPublishQoS2Complete = new((client, e) => { });
78111

79112
internal virtual void OnPublishQoS2CompleteEventLauncher(List<ControlPacket> packetList)
80113
{
81-
var eventArgs = new OnPublishQoS2CompleteEventArgs(packetList);
82-
Logger.Trace("OnPublishQoS2CompleteEventLauncher");
83-
this.OnPublishQoS2Complete?.Invoke(this, eventArgs);
114+
if (this.OnPublishQoS2Complete != null && this.OnPublishQoS2Complete.GetInvocationList().Length > 0)
115+
{
116+
var eventArgs = new OnPublishQoS2CompleteEventArgs(packetList);
117+
Logger.Trace("OnPublishQoS2CompleteEventLauncher");
118+
_ = Task.Run(() => this.OnPublishQoS2Complete?.Invoke(this, eventArgs)).ContinueWith(
119+
t =>
120+
{
121+
if (t.IsFaulted)
122+
{
123+
Logger.Error("OnPublishQoS2CompleteEventLauncher exception: " + t.Exception.Message);
124+
}
125+
},
126+
TaskScheduler.Default);
127+
}
84128
}
85129

130+
/// <summary>
131+
/// Gets the awaitable TaskCompletionSource for the QoS 2 publish transaction.
132+
/// <para>
133+
/// Valid for outgoing Publish messages QoS 2. A TaskCompletionSource that is set when the QoS 2 publish transaction is complete.
134+
/// </para>
135+
/// </summary>
136+
public TaskCompletionSource<List<ControlPacket>> OnPublishQoS2CompleteTCS { get; } = new();
137+
86138
/// <summary>
87139
/// Decode the received MQTT Publish packet.
88140
/// </summary>

0 commit comments

Comments
 (0)