diff --git a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs index 06d92d357..ddfc31a4e 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs @@ -34,6 +34,7 @@ using System.Diagnostics; using System.Runtime.CompilerServices; using System.Threading; +using System.Threading.RateLimiting; using System.Threading.Tasks; using RabbitMQ.Client.Framing; @@ -49,11 +50,12 @@ public async ValueTask BasicPublishAsync(string exchange, string ro where TProperties : IReadOnlyBasicProperties, IAmqpHeader { PublisherConfirmationInfo? publisherConfirmationInfo = null; + RateLimitLease? lease = + await MaybeAcquirePublisherConfirmationLockAsync(cancellationToken) + .ConfigureAwait(false); try { - publisherConfirmationInfo = - await MaybeStartPublisherConfirmationTrackingAsync(cancellationToken) - .ConfigureAwait(false); + publisherConfirmationInfo = MaybeStartPublisherConfirmationTracking(); await MaybeEnforceFlowControlAsync(cancellationToken) .ConfigureAwait(false); @@ -93,6 +95,7 @@ await ModelSendAsync(in cmd, in props, body, cancellationToken) } finally { + MaybeReleasePublisherConfirmationLock(lease); await MaybeEndPublisherConfirmationTrackingAsync(publisherConfirmationInfo, cancellationToken) .ConfigureAwait(false); } @@ -104,11 +107,12 @@ public async ValueTask BasicPublishAsync(CachedString exchange, Cac where TProperties : IReadOnlyBasicProperties, IAmqpHeader { PublisherConfirmationInfo? publisherConfirmationInfo = null; + RateLimitLease? lease = + await MaybeAcquirePublisherConfirmationLockAsync(cancellationToken) + .ConfigureAwait(false); try { - publisherConfirmationInfo = - await MaybeStartPublisherConfirmationTrackingAsync(cancellationToken) - .ConfigureAwait(false); + publisherConfirmationInfo = MaybeStartPublisherConfirmationTracking(); await MaybeEnforceFlowControlAsync(cancellationToken) .ConfigureAwait(false); @@ -148,6 +152,7 @@ await ModelSendAsync(in cmd, in props, body, cancellationToken) } finally { + MaybeReleasePublisherConfirmationLock(lease); await MaybeEndPublisherConfirmationTrackingAsync(publisherConfirmationInfo, cancellationToken) .ConfigureAwait(false); } diff --git a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs index 485df71fa..69f199bf1 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs @@ -52,18 +52,15 @@ internal partial class Channel : IChannel, IRecoverable private readonly ConcurrentDictionary> _confirmsTaskCompletionSources = new(); private RateLimiter? _outstandingPublisherConfirmationsRateLimiter; - private sealed class PublisherConfirmationInfo : IDisposable + private sealed class PublisherConfirmationInfo { private TaskCompletionSource? _publisherConfirmationTcs; - private readonly RateLimitLease? _lease; internal PublisherConfirmationInfo(ulong publishSequenceNumber, - TaskCompletionSource? publisherConfirmationTcs, - RateLimitLease? lease) + TaskCompletionSource? publisherConfirmationTcs) { PublishSequenceNumber = publishSequenceNumber; _publisherConfirmationTcs = publisherConfirmationTcs; - _lease = lease; } internal ulong PublishSequenceNumber { get; } @@ -89,11 +86,6 @@ internal bool MaybeHandleException(Exception ex) return exceptionWasHandled; } - - public void Dispose() - { - _lease?.Dispose(); - } } public async ValueTask GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default) @@ -289,29 +281,53 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken) } [MethodImpl(MethodImplOptions.AggressiveInlining)] - private async Task MaybeStartPublisherConfirmationTrackingAsync(CancellationToken cancellationToken) + private async Task MaybeAcquirePublisherConfirmationLockAsync(CancellationToken cancellationToken) { if (_publisherConfirmationsEnabled) { RateLimitLease? lease = null; - if (_publisherConfirmationTrackingEnabled) + if (_publisherConfirmationTrackingEnabled && _outstandingPublisherConfirmationsRateLimiter is not null) { - if (_outstandingPublisherConfirmationsRateLimiter is not null) + lease = await _outstandingPublisherConfirmationsRateLimiter.AcquireAsync(cancellationToken: cancellationToken) + .ConfigureAwait(false); + if (false == lease.IsAcquired) { - lease = await _outstandingPublisherConfirmationsRateLimiter.AcquireAsync( - cancellationToken: cancellationToken) - .ConfigureAwait(false); - - if (!lease.IsAcquired) - { - throw new InvalidOperationException("Could not acquire a lease from the rate limiter."); - } + throw new InvalidOperationException("Could not acquire a lease from the rate limiter."); } } - await _confirmSemaphore.WaitAsync(cancellationToken) - .ConfigureAwait(false); + try + { + await _confirmSemaphore.WaitAsync(cancellationToken) + .ConfigureAwait(false); + } + catch (OperationCanceledException) + { + lease?.Dispose(); + throw; + } + + return lease; + } + + return null; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void MaybeReleasePublisherConfirmationLock(RateLimitLease? lease) + { + if (_publisherConfirmationsEnabled) + { + _confirmSemaphore.Release(); + lease?.Dispose(); + } + } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private PublisherConfirmationInfo? MaybeStartPublisherConfirmationTracking() + { + if (_publisherConfirmationsEnabled) + { ulong publishSequenceNumber = _nextPublishSeqNo; TaskCompletionSource? publisherConfirmationTcs = null; @@ -326,7 +342,7 @@ await _confirmSemaphore.WaitAsync(cancellationToken) _nextPublishSeqNo++; - return new PublisherConfirmationInfo(publishSequenceNumber, publisherConfirmationTcs, lease); + return new PublisherConfirmationInfo(publishSequenceNumber, publisherConfirmationTcs); } else { @@ -362,25 +378,6 @@ private async Task MaybeEndPublisherConfirmationTrackingAsync(PublisherConfirmat { if (_publisherConfirmationsEnabled) { - try - { - _confirmSemaphore.Release(); - } - catch (SemaphoreFullException ex) - { - /* - * rabbitmq/rabbitmq-dotnet-client-1793 - * If MaybeStartPublisherConfirmationTracking throws an exception *prior* to acquiring - * _confirmSemaphore, the above Release() call will throw SemaphoreFullException. - * In "normal" cases, publisherConfirmationInfo will thus be null, but if not, throw - * a "bug found" exception here. - */ - if (publisherConfirmationInfo is not null) - { - throw new InvalidOperationException(InternalConstants.BugFound, ex); - } - } - if (publisherConfirmationInfo is not null) { try @@ -393,10 +390,6 @@ await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken) _confirmsTaskCompletionSources.TryRemove(publisherConfirmationInfo.PublishSequenceNumber, out _); throw; } - finally - { - publisherConfirmationInfo.Dispose(); - } } } } diff --git a/projects/Test/Integration/GH/TestGitHubIssues.cs b/projects/Test/Integration/GH/TestGitHubIssues.cs index 7eb9bfd2a..9718f76a5 100644 --- a/projects/Test/Integration/GH/TestGitHubIssues.cs +++ b/projects/Test/Integration/GH/TestGitHubIssues.cs @@ -1,4 +1,4 @@ -// This source code is dual-licensed under the Apache License, version +// This source code is dual-licensed under the Apache License, version // 2.0, and the Mozilla Public License, version 2.0. // // The APL v2.0: @@ -370,11 +370,6 @@ await Assert.ThrowsAnyAsync(async () => }); break; } - catch (SemaphoreFullException ex0) - { - _output.WriteLine("{0} ex: {1}", _testDisplayName, ex0); - retryCount++; - } catch (PublishException) { retryCount++;