Skip to content
17 changes: 11 additions & 6 deletions projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.RateLimiting;
using System.Threading.Tasks;
using RabbitMQ.Client.Framing;

Expand All @@ -49,11 +50,12 @@ public async ValueTask BasicPublishAsync<TProperties>(string exchange, string ro
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
{
PublisherConfirmationInfo? publisherConfirmationInfo = null;
RateLimitLease? lease =
await MaybeAcquirePublisherConfirmationLockAsync(cancellationToken)
.ConfigureAwait(false);
try
{
publisherConfirmationInfo =
await MaybeStartPublisherConfirmationTrackingAsync(cancellationToken)
.ConfigureAwait(false);
publisherConfirmationInfo = MaybeStartPublisherConfirmationTracking();

await MaybeEnforceFlowControlAsync(cancellationToken)
.ConfigureAwait(false);
Expand Down Expand Up @@ -93,6 +95,7 @@ await ModelSendAsync(in cmd, in props, body, cancellationToken)
}
finally
{
MaybeReleasePublisherConfirmationLock(lease);
await MaybeEndPublisherConfirmationTrackingAsync(publisherConfirmationInfo, cancellationToken)
.ConfigureAwait(false);
}
Expand All @@ -104,11 +107,12 @@ public async ValueTask BasicPublishAsync<TProperties>(CachedString exchange, Cac
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
{
PublisherConfirmationInfo? publisherConfirmationInfo = null;
RateLimitLease? lease =
await MaybeAcquirePublisherConfirmationLockAsync(cancellationToken)
.ConfigureAwait(false);
try
{
publisherConfirmationInfo =
await MaybeStartPublisherConfirmationTrackingAsync(cancellationToken)
.ConfigureAwait(false);
publisherConfirmationInfo = MaybeStartPublisherConfirmationTracking();

await MaybeEnforceFlowControlAsync(cancellationToken)
.ConfigureAwait(false);
Expand Down Expand Up @@ -148,6 +152,7 @@ await ModelSendAsync(in cmd, in props, body, cancellationToken)
}
finally
{
MaybeReleasePublisherConfirmationLock(lease);
await MaybeEndPublisherConfirmationTrackingAsync(publisherConfirmationInfo, cancellationToken)
.ConfigureAwait(false);
}
Expand Down
87 changes: 40 additions & 47 deletions projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,15 @@ internal partial class Channel : IChannel, IRecoverable
private readonly ConcurrentDictionary<ulong, TaskCompletionSource<bool>> _confirmsTaskCompletionSources = new();
private RateLimiter? _outstandingPublisherConfirmationsRateLimiter;

private sealed class PublisherConfirmationInfo : IDisposable
private sealed class PublisherConfirmationInfo
{
private TaskCompletionSource<bool>? _publisherConfirmationTcs;
private readonly RateLimitLease? _lease;

internal PublisherConfirmationInfo(ulong publishSequenceNumber,
TaskCompletionSource<bool>? publisherConfirmationTcs,
RateLimitLease? lease)
TaskCompletionSource<bool>? publisherConfirmationTcs)
{
PublishSequenceNumber = publishSequenceNumber;
_publisherConfirmationTcs = publisherConfirmationTcs;
_lease = lease;
}

internal ulong PublishSequenceNumber { get; }
Expand All @@ -89,11 +86,6 @@ internal bool MaybeHandleException(Exception ex)

return exceptionWasHandled;
}

public void Dispose()
{
_lease?.Dispose();
}
}

public async ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -289,29 +281,53 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken)
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private async Task<PublisherConfirmationInfo?> MaybeStartPublisherConfirmationTrackingAsync(CancellationToken cancellationToken)
private async Task<RateLimitLease?> MaybeAcquirePublisherConfirmationLockAsync(CancellationToken cancellationToken)
{
if (_publisherConfirmationsEnabled)
{
RateLimitLease? lease = null;
if (_publisherConfirmationTrackingEnabled)
if (_publisherConfirmationTrackingEnabled && _outstandingPublisherConfirmationsRateLimiter is not null)
{
if (_outstandingPublisherConfirmationsRateLimiter is not null)
lease = await _outstandingPublisherConfirmationsRateLimiter.AcquireAsync(cancellationToken: cancellationToken)
.ConfigureAwait(false);
if (false == lease.IsAcquired)
{
lease = await _outstandingPublisherConfirmationsRateLimiter.AcquireAsync(
cancellationToken: cancellationToken)
.ConfigureAwait(false);

if (!lease.IsAcquired)
{
throw new InvalidOperationException("Could not acquire a lease from the rate limiter.");
}
throw new InvalidOperationException("Could not acquire a lease from the rate limiter.");
}
}

await _confirmSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
try
{
await _confirmSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
lease?.Dispose();
throw;
}

return lease;
}

return null;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void MaybeReleasePublisherConfirmationLock(RateLimitLease? lease)
{
if (_publisherConfirmationsEnabled)
{
_confirmSemaphore.Release();
lease?.Dispose();
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private PublisherConfirmationInfo? MaybeStartPublisherConfirmationTracking()
{
if (_publisherConfirmationsEnabled)
{
ulong publishSequenceNumber = _nextPublishSeqNo;

TaskCompletionSource<bool>? publisherConfirmationTcs = null;
Expand All @@ -326,7 +342,7 @@ await _confirmSemaphore.WaitAsync(cancellationToken)

_nextPublishSeqNo++;

return new PublisherConfirmationInfo(publishSequenceNumber, publisherConfirmationTcs, lease);
return new PublisherConfirmationInfo(publishSequenceNumber, publisherConfirmationTcs);
}
else
{
Expand Down Expand Up @@ -362,25 +378,6 @@ private async Task MaybeEndPublisherConfirmationTrackingAsync(PublisherConfirmat
{
if (_publisherConfirmationsEnabled)
{
try
{
_confirmSemaphore.Release();
}
catch (SemaphoreFullException ex)
{
/*
* rabbitmq/rabbitmq-dotnet-client-1793
* If MaybeStartPublisherConfirmationTracking throws an exception *prior* to acquiring
* _confirmSemaphore, the above Release() call will throw SemaphoreFullException.
* In "normal" cases, publisherConfirmationInfo will thus be null, but if not, throw
* a "bug found" exception here.
*/
if (publisherConfirmationInfo is not null)
{
throw new InvalidOperationException(InternalConstants.BugFound, ex);
}
}

if (publisherConfirmationInfo is not null)
{
try
Expand All @@ -393,10 +390,6 @@ await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken)
_confirmsTaskCompletionSources.TryRemove(publisherConfirmationInfo.PublishSequenceNumber, out _);
throw;
}
finally
{
publisherConfirmationInfo.Dispose();
}
}
}
}
Expand Down
7 changes: 1 addition & 6 deletions projects/Test/Integration/GH/TestGitHubIssues.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// This source code is dual-licensed under the Apache License, version
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
Expand Down Expand Up @@ -370,11 +370,6 @@ await Assert.ThrowsAnyAsync<InvalidOperationException>(async () =>
});
break;
}
catch (SemaphoreFullException ex0)
{
_output.WriteLine("{0} ex: {1}", _testDisplayName, ex0);
retryCount++;
}
catch (PublishException)
{
retryCount++;
Expand Down
Loading