Skip to content

Commit e19652f

Browse files
EvheniyHlushkodanielmarbach
authored andcommitted
Refactor: Move semaphore ownership to PublisherConfirmationInfo
Move the semaphore ownership and release responsibility from MaybeEndPublisherConfirmationTrackingAsync to the PublisherConfirmationInfo class itself. This improves encapsulation by making the PublisherConfirmationInfo responsible for managing the semaphore resource it owns. Changes: - Add _semaphore field and _semaphoreReleased flag to PublisherConfirmationInfo - Add ReleaseSemaphore() method with idempotence check - Update constructor to accept SemaphoreSlim parameter - Update Dispose() to call ReleaseSemaphore() - Simplify control flow in MaybeStartPublisherConfirmationTrackingAsync (early return pattern) - Update MaybeEndPublisherConfirmationTrackingAsync to use ReleaseSemaphore() from info object
1 parent c6f0592 commit e19652f

File tree

1 file changed

+71
-59
lines changed

1 file changed

+71
-59
lines changed

projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs

Lines changed: 71 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -54,20 +54,34 @@ internal partial class Channel : IChannel, IRecoverable
5454

5555
private sealed class PublisherConfirmationInfo : IDisposable
5656
{
57-
private TaskCompletionSource<bool>? _publisherConfirmationTcs;
57+
private readonly SemaphoreSlim _semaphore;
58+
private readonly TaskCompletionSource<bool>? _publisherConfirmationTcs;
5859
private readonly RateLimitLease? _lease;
60+
private bool _semaphoreReleased;
5961

60-
internal PublisherConfirmationInfo(ulong publishSequenceNumber,
62+
internal PublisherConfirmationInfo(
63+
SemaphoreSlim semaphore,
64+
ulong publishSequenceNumber,
6165
TaskCompletionSource<bool>? publisherConfirmationTcs,
6266
RateLimitLease? lease)
6367
{
68+
_semaphore = semaphore;
6469
PublishSequenceNumber = publishSequenceNumber;
6570
_publisherConfirmationTcs = publisherConfirmationTcs;
6671
_lease = lease;
6772
}
6873

6974
internal ulong PublishSequenceNumber { get; }
7075

76+
internal void ReleaseSemaphore()
77+
{
78+
if (!_semaphoreReleased)
79+
{
80+
_semaphoreReleased = true;
81+
_semaphore.Release();
82+
}
83+
}
84+
7185
internal async Task MaybeWaitForConfirmationAsync(CancellationToken cancellationToken)
7286
{
7387
if (_publisherConfirmationTcs is not null)
@@ -92,6 +106,7 @@ internal bool MaybeHandleException(Exception ex)
92106

93107
public void Dispose()
94108
{
109+
ReleaseSemaphore();
95110
_lease?.Dispose();
96111
}
97112
}
@@ -291,54 +306,51 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken)
291306
[MethodImpl(MethodImplOptions.AggressiveInlining)]
292307
private async Task<PublisherConfirmationInfo?> MaybeStartPublisherConfirmationTrackingAsync(CancellationToken cancellationToken)
293308
{
294-
if (_publisherConfirmationsEnabled)
309+
if (!_publisherConfirmationsEnabled)
295310
{
296-
RateLimitLease? lease = null;
297-
if (_publisherConfirmationTrackingEnabled)
298-
{
299-
if (_outstandingPublisherConfirmationsRateLimiter is not null)
300-
{
301-
lease = await _outstandingPublisherConfirmationsRateLimiter.AcquireAsync(
302-
cancellationToken: cancellationToken)
303-
.ConfigureAwait(false);
304-
305-
if (!lease.IsAcquired)
306-
{
307-
throw new InvalidOperationException("Could not acquire a lease from the rate limiter.");
308-
}
309-
}
310-
}
311+
return null;
312+
}
311313

312-
try
313-
{
314-
await _confirmSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
315-
}
316-
catch (OperationCanceledException)
314+
RateLimitLease? lease = null;
315+
if (_publisherConfirmationTrackingEnabled && _outstandingPublisherConfirmationsRateLimiter is not null)
316+
{
317+
lease = await _outstandingPublisherConfirmationsRateLimiter.AcquireAsync(
318+
cancellationToken: cancellationToken)
319+
.ConfigureAwait(false);
320+
if (!lease.IsAcquired)
317321
{
318-
lease?.Dispose();
319-
throw;
322+
throw new InvalidOperationException("Could not acquire a lease from the rate limiter.");
320323
}
324+
}
321325

322-
ulong publishSequenceNumber = _nextPublishSeqNo;
326+
try
327+
{
328+
await _confirmSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
329+
}
330+
catch (OperationCanceledException)
331+
{
332+
lease?.Dispose();
333+
throw;
334+
}
323335

324-
TaskCompletionSource<bool>? publisherConfirmationTcs = null;
325-
if (_publisherConfirmationTrackingEnabled)
336+
ulong publishSequenceNumber = _nextPublishSeqNo;
337+
338+
TaskCompletionSource<bool>? publisherConfirmationTcs = null;
339+
if (_publisherConfirmationTrackingEnabled)
340+
{
341+
publisherConfirmationTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
342+
if (!_confirmsTaskCompletionSources.TryAdd(publishSequenceNumber, publisherConfirmationTcs))
326343
{
327-
publisherConfirmationTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
328-
if (!_confirmsTaskCompletionSources.TryAdd(publishSequenceNumber, publisherConfirmationTcs))
329-
{
330-
throw new InvalidOperationException($"Failed to track the publisher confirmation for sequence number '{publishSequenceNumber}' because it already exists.");
331-
}
344+
_confirmSemaphore.Release();
345+
lease?.Dispose();
346+
throw new InvalidOperationException($"Failed to track the publisher confirmation for sequence number '{publishSequenceNumber}' because it already exists.");
332347
}
348+
}
333349

334-
_nextPublishSeqNo++;
350+
_nextPublishSeqNo++;
351+
352+
return new PublisherConfirmationInfo(_confirmSemaphore, publishSequenceNumber, publisherConfirmationTcs, lease);
335353

336-
return new PublisherConfirmationInfo(publishSequenceNumber, publisherConfirmationTcs, lease);
337-
}
338-
else
339-
{
340-
return null;
341-
}
342354
}
343355

344356
[MethodImpl(MethodImplOptions.AggressiveInlining)]
@@ -367,28 +379,28 @@ private bool MaybeHandleExceptionWithEnabledPublisherConfirmations(PublisherConf
367379
private async Task MaybeEndPublisherConfirmationTrackingAsync(PublisherConfirmationInfo? publisherConfirmationInfo,
368380
CancellationToken cancellationToken)
369381
{
370-
if (_publisherConfirmationsEnabled)
382+
if (publisherConfirmationInfo is null)
371383
{
372-
if (publisherConfirmationInfo is not null)
373-
{
374-
_confirmSemaphore.Release();
384+
return;
385+
}
375386

376-
try
377-
{
378-
await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken)
379-
.ConfigureAwait(false);
380-
}
381-
catch (OperationCanceledException)
382-
{
383-
_confirmsTaskCompletionSources.TryRemove(publisherConfirmationInfo.PublishSequenceNumber, out _);
384-
throw;
385-
}
386-
finally
387-
{
388-
publisherConfirmationInfo.Dispose();
389-
}
390-
}
387+
publisherConfirmationInfo.ReleaseSemaphore();
388+
389+
try
390+
{
391+
await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken)
392+
.ConfigureAwait(false);
393+
}
394+
catch (OperationCanceledException)
395+
{
396+
_confirmsTaskCompletionSources.TryRemove(publisherConfirmationInfo.PublishSequenceNumber, out _);
397+
throw;
391398
}
399+
finally
400+
{
401+
publisherConfirmationInfo.Dispose();
402+
}
403+
392404
}
393405

394406
[MethodImpl(MethodImplOptions.AggressiveInlining)]

0 commit comments

Comments
 (0)