Skip to content

Commit cc040f9

Browse files
authored
Merge pull request #1901 from EvheniyHlushko/fix-confirm-semaphore-cancellation
Fix unconditional semaphore release in BasicPublishAsync when Cancell…
2 parents 0c7fbd2 + c6c3799 commit cc040f9

File tree

3 files changed

+52
-59
lines changed

3 files changed

+52
-59
lines changed

projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
using System.Diagnostics;
3535
using System.Runtime.CompilerServices;
3636
using System.Threading;
37+
using System.Threading.RateLimiting;
3738
using System.Threading.Tasks;
3839
using RabbitMQ.Client.Framing;
3940

@@ -49,11 +50,12 @@ public async ValueTask BasicPublishAsync<TProperties>(string exchange, string ro
4950
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
5051
{
5152
PublisherConfirmationInfo? publisherConfirmationInfo = null;
53+
RateLimitLease? lease =
54+
await MaybeAcquirePublisherConfirmationLockAsync(cancellationToken)
55+
.ConfigureAwait(false);
5256
try
5357
{
54-
publisherConfirmationInfo =
55-
await MaybeStartPublisherConfirmationTrackingAsync(cancellationToken)
56-
.ConfigureAwait(false);
58+
publisherConfirmationInfo = MaybeStartPublisherConfirmationTracking();
5759

5860
await MaybeEnforceFlowControlAsync(cancellationToken)
5961
.ConfigureAwait(false);
@@ -93,6 +95,7 @@ await ModelSendAsync(in cmd, in props, body, cancellationToken)
9395
}
9496
finally
9597
{
98+
MaybeReleasePublisherConfirmationLock(lease);
9699
await MaybeEndPublisherConfirmationTrackingAsync(publisherConfirmationInfo, cancellationToken)
97100
.ConfigureAwait(false);
98101
}
@@ -104,11 +107,12 @@ public async ValueTask BasicPublishAsync<TProperties>(CachedString exchange, Cac
104107
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
105108
{
106109
PublisherConfirmationInfo? publisherConfirmationInfo = null;
110+
RateLimitLease? lease =
111+
await MaybeAcquirePublisherConfirmationLockAsync(cancellationToken)
112+
.ConfigureAwait(false);
107113
try
108114
{
109-
publisherConfirmationInfo =
110-
await MaybeStartPublisherConfirmationTrackingAsync(cancellationToken)
111-
.ConfigureAwait(false);
115+
publisherConfirmationInfo = MaybeStartPublisherConfirmationTracking();
112116

113117
await MaybeEnforceFlowControlAsync(cancellationToken)
114118
.ConfigureAwait(false);
@@ -148,6 +152,7 @@ await ModelSendAsync(in cmd, in props, body, cancellationToken)
148152
}
149153
finally
150154
{
155+
MaybeReleasePublisherConfirmationLock(lease);
151156
await MaybeEndPublisherConfirmationTrackingAsync(publisherConfirmationInfo, cancellationToken)
152157
.ConfigureAwait(false);
153158
}

projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs

Lines changed: 40 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -52,18 +52,15 @@ internal partial class Channel : IChannel, IRecoverable
5252
private readonly ConcurrentDictionary<ulong, TaskCompletionSource<bool>> _confirmsTaskCompletionSources = new();
5353
private RateLimiter? _outstandingPublisherConfirmationsRateLimiter;
5454

55-
private sealed class PublisherConfirmationInfo : IDisposable
55+
private sealed class PublisherConfirmationInfo
5656
{
5757
private TaskCompletionSource<bool>? _publisherConfirmationTcs;
58-
private readonly RateLimitLease? _lease;
5958

6059
internal PublisherConfirmationInfo(ulong publishSequenceNumber,
61-
TaskCompletionSource<bool>? publisherConfirmationTcs,
62-
RateLimitLease? lease)
60+
TaskCompletionSource<bool>? publisherConfirmationTcs)
6361
{
6462
PublishSequenceNumber = publishSequenceNumber;
6563
_publisherConfirmationTcs = publisherConfirmationTcs;
66-
_lease = lease;
6764
}
6865

6966
internal ulong PublishSequenceNumber { get; }
@@ -89,11 +86,6 @@ internal bool MaybeHandleException(Exception ex)
8986

9087
return exceptionWasHandled;
9188
}
92-
93-
public void Dispose()
94-
{
95-
_lease?.Dispose();
96-
}
9789
}
9890

9991
public async ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default)
@@ -289,29 +281,53 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken)
289281
}
290282

291283
[MethodImpl(MethodImplOptions.AggressiveInlining)]
292-
private async Task<PublisherConfirmationInfo?> MaybeStartPublisherConfirmationTrackingAsync(CancellationToken cancellationToken)
284+
private async Task<RateLimitLease?> MaybeAcquirePublisherConfirmationLockAsync(CancellationToken cancellationToken)
293285
{
294286
if (_publisherConfirmationsEnabled)
295287
{
296288
RateLimitLease? lease = null;
297-
if (_publisherConfirmationTrackingEnabled)
289+
if (_publisherConfirmationTrackingEnabled && _outstandingPublisherConfirmationsRateLimiter is not null)
298290
{
299-
if (_outstandingPublisherConfirmationsRateLimiter is not null)
291+
lease = await _outstandingPublisherConfirmationsRateLimiter.AcquireAsync(cancellationToken: cancellationToken)
292+
.ConfigureAwait(false);
293+
if (false == lease.IsAcquired)
300294
{
301-
lease = await _outstandingPublisherConfirmationsRateLimiter.AcquireAsync(
302-
cancellationToken: cancellationToken)
303-
.ConfigureAwait(false);
304-
305-
if (!lease.IsAcquired)
306-
{
307-
throw new InvalidOperationException("Could not acquire a lease from the rate limiter.");
308-
}
295+
throw new InvalidOperationException("Could not acquire a lease from the rate limiter.");
309296
}
310297
}
311298

312-
await _confirmSemaphore.WaitAsync(cancellationToken)
313-
.ConfigureAwait(false);
299+
try
300+
{
301+
await _confirmSemaphore.WaitAsync(cancellationToken)
302+
.ConfigureAwait(false);
303+
}
304+
catch (OperationCanceledException)
305+
{
306+
lease?.Dispose();
307+
throw;
308+
}
309+
310+
return lease;
311+
}
312+
313+
return null;
314+
}
315+
316+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
317+
private void MaybeReleasePublisherConfirmationLock(RateLimitLease? lease)
318+
{
319+
if (_publisherConfirmationsEnabled)
320+
{
321+
_confirmSemaphore.Release();
322+
lease?.Dispose();
323+
}
324+
}
314325

326+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
327+
private PublisherConfirmationInfo? MaybeStartPublisherConfirmationTracking()
328+
{
329+
if (_publisherConfirmationsEnabled)
330+
{
315331
ulong publishSequenceNumber = _nextPublishSeqNo;
316332

317333
TaskCompletionSource<bool>? publisherConfirmationTcs = null;
@@ -326,7 +342,7 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
326342

327343
_nextPublishSeqNo++;
328344

329-
return new PublisherConfirmationInfo(publishSequenceNumber, publisherConfirmationTcs, lease);
345+
return new PublisherConfirmationInfo(publishSequenceNumber, publisherConfirmationTcs);
330346
}
331347
else
332348
{
@@ -362,25 +378,6 @@ private async Task MaybeEndPublisherConfirmationTrackingAsync(PublisherConfirmat
362378
{
363379
if (_publisherConfirmationsEnabled)
364380
{
365-
try
366-
{
367-
_confirmSemaphore.Release();
368-
}
369-
catch (SemaphoreFullException ex)
370-
{
371-
/*
372-
* rabbitmq/rabbitmq-dotnet-client-1793
373-
* If MaybeStartPublisherConfirmationTracking throws an exception *prior* to acquiring
374-
* _confirmSemaphore, the above Release() call will throw SemaphoreFullException.
375-
* In "normal" cases, publisherConfirmationInfo will thus be null, but if not, throw
376-
* a "bug found" exception here.
377-
*/
378-
if (publisherConfirmationInfo is not null)
379-
{
380-
throw new InvalidOperationException(InternalConstants.BugFound, ex);
381-
}
382-
}
383-
384381
if (publisherConfirmationInfo is not null)
385382
{
386383
try
@@ -393,10 +390,6 @@ await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken)
393390
_confirmsTaskCompletionSources.TryRemove(publisherConfirmationInfo.PublishSequenceNumber, out _);
394391
throw;
395392
}
396-
finally
397-
{
398-
publisherConfirmationInfo.Dispose();
399-
}
400393
}
401394
}
402395
}

projects/Test/Integration/GH/TestGitHubIssues.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// This source code is dual-licensed under the Apache License, version
1+
// This source code is dual-licensed under the Apache License, version
22
// 2.0, and the Mozilla Public License, version 2.0.
33
//
44
// The APL v2.0:
@@ -370,11 +370,6 @@ await Assert.ThrowsAnyAsync<InvalidOperationException>(async () =>
370370
});
371371
break;
372372
}
373-
catch (SemaphoreFullException ex0)
374-
{
375-
_output.WriteLine("{0} ex: {1}", _testDisplayName, ex0);
376-
retryCount++;
377-
}
378373
catch (PublishException)
379374
{
380375
retryCount++;

0 commit comments

Comments
 (0)