Skip to content

Commit c50db43

Browse files
committed
Fix CancellationTokenRegistration memory leak
1 parent 6607a2b commit c50db43

12 files changed

Lines changed: 42 additions & 32 deletions

File tree

src/ActiveMQ.Artemis.Client/Builders/AnonymousProducerBuilder.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public async Task<IAnonymousProducer> CreateAsync(AnonymousProducerConfiguration
2929
if (configuration == null) throw new ArgumentNullException(nameof(configuration));
3030

3131
cancellationToken.ThrowIfCancellationRequested();
32-
cancellationToken.Register(() => _tcs.TrySetCanceled());
32+
using var _ = cancellationToken.Register(() => _tcs.TrySetCanceled());
3333

3434
var target = new Target
3535
{

src/ActiveMQ.Artemis.Client/Builders/ConnectionBuilder.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public ConnectionBuilder(ILoggerFactory loggerFactory, Func<IMessageIdPolicy> me
2525
public async Task<IConnection> CreateAsync(Endpoint endpoint, CancellationToken cancellationToken)
2626
{
2727
cancellationToken.ThrowIfCancellationRequested();
28-
cancellationToken.Register(() => _tcs.TrySetCanceled());
28+
using var _ = cancellationToken.Register(() => _tcs.TrySetCanceled());
2929

3030
var connectionFactory = new Amqp.ConnectionFactory();
3131
try

src/ActiveMQ.Artemis.Client/Builders/ConsumerBuilder.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public async Task<IConsumer> CreateAsync(ConsumerConfiguration configuration, Ca
3030
CheckConfiguration(configuration);
3131

3232
cancellationToken.ThrowIfCancellationRequested();
33-
cancellationToken.Register(() => _tcs.TrySetCanceled());
33+
using var _ = cancellationToken.Register(() => _tcs.TrySetCanceled());
3434

3535
var source = new Source
3636
{

src/ActiveMQ.Artemis.Client/Builders/ProducerBuilder.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public async Task<IProducer> CreateAsync(ProducerConfiguration configuration, Ca
3434
if (string.IsNullOrWhiteSpace(configuration.Address)) throw new ArgumentNullException(nameof(configuration.Address), "The address cannot be empty.");
3535

3636
cancellationToken.ThrowIfCancellationRequested();
37-
cancellationToken.Register(() => _tcs.TrySetCanceled());
37+
using var _ = cancellationToken.Register(() => _tcs.TrySetCanceled());
3838

3939
var target = new Target
4040
{

src/ActiveMQ.Artemis.Client/Builders/RpcClientBuilder.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ public async Task<RpcClient> CreateAsync(string address, CancellationToken cance
2626

2727
private async Task<SenderLink> CreateSenderLink(string address, CancellationToken cancellationToken)
2828
{
29-
var tcs = TaskUtil.CreateTaskCompletionSource<bool>(cancellationToken);
29+
var (tcs, ctr) = TaskUtil.CreateTaskCompletionSource<bool>(ref cancellationToken);
30+
using var _ = ctr;
3031
var senderLink = new SenderLink(_session, Guid.NewGuid().ToString(), new Target
3132
{
3233
Address = address
@@ -36,7 +37,6 @@ private async Task<SenderLink> CreateSenderLink(string address, CancellationToke
3637
senderLink.Closed -= OnClosed;
3738
return senderLink;
3839

39-
4040
void OnAttached(ILink link, Attach attach)
4141
{
4242
if (attach != null)
@@ -56,7 +56,8 @@ void OnClosed(IAmqpObject sender, Error error)
5656

5757
private async Task<(ReceiverLink receiverLink, string address)> CreateReceiverLink(CancellationToken cancellationToken)
5858
{
59-
var tcs = TaskUtil.CreateTaskCompletionSource<string>(cancellationToken);
59+
var (tcs, ctr) = TaskUtil.CreateTaskCompletionSource<string>(ref cancellationToken);
60+
using var _ = ctr;
6061
var receiverLink = new ReceiverLink(_session, Guid.NewGuid().ToString(), new Source
6162
{
6263
Dynamic = true
@@ -68,7 +69,7 @@ void OnClosed(IAmqpObject sender, Error error)
6869

6970
void OnAttached(ILink link, Attach attach)
7071
{
71-
if (attach != null && attach.Source is Source source)
72+
if (attach is { Source: Source source })
7273
{
7374
tcs.TrySetResult(source.Address);
7475
}

src/ActiveMQ.Artemis.Client/Builders/SessionBuilder.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public SessionBuilder(Amqp.Connection connection)
2323
public async Task<Session> CreateAsync(CancellationToken cancellationToken)
2424
{
2525
cancellationToken.ThrowIfCancellationRequested();
26-
cancellationToken.Register(() => _tcs.TrySetCanceled());
26+
using var _ = cancellationToken.Register(() => _tcs.TrySetCanceled());
2727

2828
var begin = new Begin
2929
{

src/ActiveMQ.Artemis.Client/Builders/TransactionCoordinatorBuilder.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public TransactionCoordinatorBuilder(Session session)
2323
public async Task<TransactionCoordinator> CreateAsync(CancellationToken cancellationToken)
2424
{
2525
cancellationToken.ThrowIfCancellationRequested();
26-
cancellationToken.Register(() => _tcs.TrySetCanceled());
26+
using var _ = cancellationToken.Register(() => _tcs.TrySetCanceled());
2727

2828
var attach = new Attach
2929
{
Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,27 @@
1-
using System.Threading;
1+
using System;
2+
using System.Threading;
23
using System.Threading.Tasks;
34

45
namespace ActiveMQ.Artemis.Client.InternalUtilities
56
{
67
internal static class TaskUtil
78
{
8-
public static TaskCompletionSource<T> CreateTaskCompletionSource<T>(CancellationToken cancellationToken)
9+
public static (TaskCompletionSource<T> tcs, CancellationTokenRegistration ctr) CreateTaskCompletionSource<T>(ref CancellationToken cancellationToken)
10+
{
11+
return CreateTaskCompletionSource<T>(ref cancellationToken, null);
12+
}
13+
14+
public static (TaskCompletionSource<T> tcs, CancellationTokenRegistration ctr) CreateTaskCompletionSource<T>(ref CancellationToken cancellationToken, Action cleanup)
915
{
1016
var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
11-
if (cancellationToken != default)
17+
var ctr = cancellationToken != default ? cancellationToken.Register(() =>
1218
{
13-
cancellationToken.Register(() => tcs.TrySetCanceled());
14-
}
15-
16-
return tcs;
19+
if (tcs.TrySetCanceled())
20+
{
21+
cleanup?.Invoke();
22+
}
23+
}) : default;
24+
return (tcs, ctr);
1725
}
1826
}
1927
}

src/ActiveMQ.Artemis.Client/Management/RpcClient.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ public async Task<Message> SendAsync(Message message, CancellationToken cancella
4141
message.SetCorrelationId(correlationId);
4242
message.Properties.ReplyTo = _replyToAddress;
4343

44-
var tcs = TaskUtil.CreateTaskCompletionSource<Message>(cancellationToken);
44+
var (tcs, ctr) = TaskUtil.CreateTaskCompletionSource<Message>(ref cancellationToken);
45+
using var cancellationTokenRegistration = ctr;
46+
cancellationToken.Register(() => tcs.TrySetCanceled());
4547
try
4648
{
4749
_pendingRequests.TryAdd(correlationId, tcs);

src/ActiveMQ.Artemis.Client/ProducerBase.cs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,11 @@ protected async Task SendInternalAsync(string address, RoutingType? routingType,
3939

4040
var txnId = await _transactionsManager.GetTxnIdAsync(transaction, cancellationToken).ConfigureAwait(false);
4141
var transactionalState = txnId != null ? new TransactionalState { TxnId = txnId } : null;
42-
var tcs = TaskUtil.CreateTaskCompletionSource<bool>(cancellationToken);
43-
cancellationToken.Register(() =>
42+
var (tcs, ctr) = TaskUtil.CreateTaskCompletionSource<bool>(ref cancellationToken, () =>
4443
{
45-
if (tcs.TrySetCanceled())
46-
{
47-
_senderLink.Cancel(message.InnerMessage);
48-
}
44+
_senderLink.Cancel(message.InnerMessage);
4945
});
46+
using var _ = ctr;
5047
message.DurabilityMode ??= _configuration.MessageDurabilityMode ?? DurabilityMode.Durable;
5148
Send(address, routingType, message, transactionalState, _onOutcome, tcs);
5249
await tcs.Task.ConfigureAwait(false);

0 commit comments

Comments
 (0)