@@ -54,34 +54,21 @@ internal partial class Channel : IChannel, IRecoverable
5454
5555 private sealed class PublisherConfirmationInfo : IDisposable
5656 {
57- private readonly SemaphoreSlim _semaphore ;
5857 private readonly TaskCompletionSource < bool > ? _publisherConfirmationTcs ;
5958 private readonly RateLimitLease ? _lease ;
60- private bool _semaphoreReleased ;
6159
6260 internal PublisherConfirmationInfo (
63- SemaphoreSlim semaphore ,
6461 ulong publishSequenceNumber ,
6562 TaskCompletionSource < bool > ? publisherConfirmationTcs ,
6663 RateLimitLease ? lease )
6764 {
68- _semaphore = semaphore ;
6965 PublishSequenceNumber = publishSequenceNumber ;
7066 _publisherConfirmationTcs = publisherConfirmationTcs ;
7167 _lease = lease ;
7268 }
7369
7470 internal ulong PublishSequenceNumber { get ; }
7571
76- internal void ReleaseSemaphore ( )
77- {
78- if ( ! _semaphoreReleased )
79- {
80- _semaphoreReleased = true ;
81- _semaphore . Release ( ) ;
82- }
83- }
84-
8572 internal async Task MaybeWaitForConfirmationAsync ( CancellationToken cancellationToken )
8673 {
8774 if ( _publisherConfirmationTcs is not null )
@@ -106,7 +93,6 @@ internal bool MaybeHandleException(Exception ex)
10693
10794 public void Dispose ( )
10895 {
109- ReleaseSemaphore ( ) ;
11096 _lease ? . Dispose ( ) ;
11197 }
11298 }
@@ -323,9 +309,11 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken)
323309 }
324310 }
325311
312+ bool confirmSemaphoreAcquired ;
326313 try
327314 {
328315 await _confirmSemaphore . WaitAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
316+ confirmSemaphoreAcquired = true ;
329317 }
330318 catch ( OperationCanceledException )
331319 {
@@ -341,16 +329,18 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken)
341329 publisherConfirmationTcs = new TaskCompletionSource < bool > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
342330 if ( ! _confirmsTaskCompletionSources . TryAdd ( publishSequenceNumber , publisherConfirmationTcs ) )
343331 {
344- _confirmSemaphore . Release ( ) ;
332+ if ( confirmSemaphoreAcquired )
333+ {
334+ _confirmSemaphore . Release ( ) ;
335+ }
345336 lease ? . Dispose ( ) ;
346337 throw new InvalidOperationException ( $ "Failed to track the publisher confirmation for sequence number '{ publishSequenceNumber } ' because it already exists.") ;
347338 }
348339 }
349340
350341 _nextPublishSeqNo ++ ;
351342
352- return new PublisherConfirmationInfo ( _confirmSemaphore , publishSequenceNumber , publisherConfirmationTcs , lease ) ;
353-
343+ return new PublisherConfirmationInfo ( publishSequenceNumber , publisherConfirmationTcs , lease ) ;
354344 }
355345
356346 [ MethodImpl ( MethodImplOptions . AggressiveInlining ) ]
@@ -384,7 +374,7 @@ private async Task MaybeEndPublisherConfirmationTrackingAsync(PublisherConfirmat
384374 return ;
385375 }
386376
387- publisherConfirmationInfo . ReleaseSemaphore ( ) ;
377+ _confirmSemaphore . Release ( ) ;
388378
389379 try
390380 {
0 commit comments