Skip to content

Commit 35efb24

Browse files
CopilotHavret
andauthored
Fix TerminateAsync to fire Closed event so producer is unregistered from recovery set
Agent-Logs-Url: https://github.com/Havret/dotnet-activemq-artemis-client/sessions/8bc2f1b9-864e-4a9f-a756-be8df181d7e6 Co-authored-by: Havret <9103861+Havret@users.noreply.github.com>
1 parent b25ba1c commit 35efb24

3 files changed

Lines changed: 107 additions & 0 deletions

File tree

src/ArtemisNetClient/AutoRecovering/AutoRecoveringProducerBase.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ public async Task TerminateAsync(Exception exception)
8080
_manualResetEvent.Set();
8181
Log.ProducerTerminated(Logger, exception);
8282
await DisposeResourceSafe(UnderlyingResource).ConfigureAwait(false);
83+
Closed?.Invoke(this);
8384
}
8485

8586
public async ValueTask DisposeAsync()

test/ArtemisNetClient.UnitTests/AutoRecovering/AutoRecoveringAnonymousProducerSpec.cs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
using System;
2+
using System.Threading;
23
using System.Threading.Tasks;
34
using ActiveMQ.Artemis.Client.Exceptions;
5+
using ActiveMQ.Artemis.Client.InternalUtilities;
46
using ActiveMQ.Artemis.Client.UnitTests.Utils;
57
using Amqp;
68
using Amqp.Framing;
@@ -82,5 +84,57 @@ public async Task Should_terminate_anonymous_producer_on_Send_when_link_closed_w
8284
Assert.Throws<ProducerClosedException>(() =>
8385
producer.Send("a2", null, new Message("bar"), CancellationToken));
8486
}
87+
[Fact]
88+
public async Task Should_not_recreate_anonymous_producer_on_connection_recovery_after_terminal_unauthorized_access_error()
89+
{
90+
var endpoint = GetUniqueEndpoint();
91+
var producerAttached = new ManualResetEvent(false);
92+
var testHandler = new TestHandler(@event =>
93+
{
94+
switch (@event.Id)
95+
{
96+
case EventId.LinkRemoteOpen when @event.Context is Attach attach && attach.Role:
97+
producerAttached.Set();
98+
break;
99+
}
100+
});
101+
102+
var host1 = CreateOpenedContainerHost(endpoint, testHandler);
103+
var linkProcessor = host1.CreateTestLinkProcessor();
104+
105+
ListenerLink producerLink = null;
106+
linkProcessor.SetHandler(context =>
107+
{
108+
producerLink = context.Link;
109+
return false;
110+
});
111+
112+
var connection = await CreateConnection(endpoint);
113+
var producer = await connection.CreateAnonymousProducerAsync();
114+
115+
Assert.True(producerAttached.WaitOne(Timeout));
116+
producerAttached.Reset();
117+
118+
try
119+
{
120+
await producerLink.CloseAsync(Timeout, new Error(ErrorCode.UnauthorizedAccess) { Description = "Unauthorized" });
121+
}
122+
catch (Exception)
123+
{
124+
// ignored
125+
}
126+
127+
// SendAsync triggers TerminateAsync which should remove the producer from the recovery set.
128+
await Assert.ThrowsAsync<ProducerClosedException>(() =>
129+
producer.SendAsync("a1", null, new Message("foo"), null, CancellationToken));
130+
131+
host1.Dispose();
132+
using var host2 = CreateOpenedContainerHost(endpoint, testHandler);
133+
134+
// Producer should NOT be recreated after connection recovery.
135+
Assert.False(producerAttached.WaitOne(ShortTimeout));
136+
137+
await DisposeUtil.DisposeAll(connection, host2);
138+
}
85139
}
86140
}

test/ArtemisNetClient.UnitTests/AutoRecovering/AutoRecoveringProducerSpec.cs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,58 @@ public async Task Should_not_retry_Send_when_producer_link_closed_with_not_found
276276
Assert.Throws<ProducerClosedException>(() => producer.Send(new Message("bar"), CancellationToken));
277277
}
278278

279+
[Fact]
280+
public async Task Should_not_recreate_producer_on_connection_recovery_after_terminal_not_found_error()
281+
{
282+
var endpoint = GetUniqueEndpoint();
283+
var producerAttached = new ManualResetEvent(false);
284+
var testHandler = new TestHandler(@event =>
285+
{
286+
switch (@event.Id)
287+
{
288+
case EventId.LinkRemoteOpen when @event.Context is Attach attach && attach.Role:
289+
producerAttached.Set();
290+
break;
291+
}
292+
});
293+
294+
var host1 = CreateOpenedContainerHost(endpoint, testHandler);
295+
var linkProcessor = host1.CreateTestLinkProcessor();
296+
297+
ListenerLink producerLink = null;
298+
linkProcessor.SetHandler(context =>
299+
{
300+
producerLink = context.Link;
301+
return false;
302+
});
303+
304+
var connection = await CreateConnection(endpoint);
305+
var producer = await connection.CreateProducerAsync("a1", RoutingType.Anycast);
306+
307+
Assert.True(producerAttached.WaitOne(Timeout));
308+
producerAttached.Reset();
309+
310+
try
311+
{
312+
await producerLink.CloseAsync(Timeout, new Error(Amqp.ErrorCode.NotFound) { Description = "AMQ119002: target address a1 does not exist" });
313+
}
314+
catch (Exception)
315+
{
316+
// ignored
317+
}
318+
319+
// SendAsync triggers TerminateAsync which should remove the producer from the recovery set.
320+
await Assert.ThrowsAsync<ProducerClosedException>(() => producer.SendAsync(new Message("foo"), CancellationToken));
321+
322+
host1.Dispose();
323+
using var host2 = CreateOpenedContainerHost(endpoint, testHandler);
324+
325+
// Producer should NOT be recreated after connection recovery.
326+
Assert.False(producerAttached.WaitOne(ShortTimeout));
327+
328+
await DisposeUtil.DisposeAll(connection, host2);
329+
}
330+
279331
private async Task<(IProducer producer, MessageProcessor messageProcessor, TestContainerHost host, IConnection connection)> CreateReattachedProducer()
280332
{
281333
var endpoint = GetUniqueEndpoint();

0 commit comments

Comments
 (0)