Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Threading.Tasks;
using ActiveMQ.Artemis.Client.Exceptions;
using ActiveMQ.Artemis.Client.Transactions;
using Amqp;
using Microsoft.Extensions.Logging;

namespace ActiveMQ.Artemis.Client.AutoRecovering
Expand All @@ -28,6 +29,13 @@ public async Task SendAsync(string address, RoutingType? routingType, Message me
await _producer.SendAsync(address, routingType, message, transaction, cancellationToken).ConfigureAwait(false);
return;
}
catch (ProducerClosedException e) when (e.ErrorCode is ErrorCode.UnauthorizedAccess)
{
await TerminateAsync(e).ConfigureAwait(false);
Comment on lines +32 to +34

// Producer cannot be recovered when broker denies authorization.
throw;
}
catch (ProducerClosedException)
{
HandleProducerClosed();
Expand All @@ -48,6 +56,13 @@ public void Send(string address, RoutingType? routingType, Message message, Canc
_producer.Send(address, routingType, message, cancellationToken);
return;
}
catch (ProducerClosedException e) when (e.ErrorCode is ErrorCode.UnauthorizedAccess)
{
TerminateAsync(e).GetAwaiter().GetResult();

// Producer cannot be recovered when broker denies authorization.
throw;
}
catch (ProducerClosedException)
{
HandleProducerClosed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ private Task StartRecoveryLoop()
}
catch (Exception e)
{
_recoverables.Remove(recoverable);
await recoverable.TerminateAsync(e).ConfigureAwait(false);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public async Task TerminateAsync(Exception exception)
_manualResetEvent.Set();
Log.ConsumerTerminated(_logger, exception);
await DisposeUnderlyingConsumerSafe(_consumer).ConfigureAwait(false);
Closed?.Invoke(this);
}

public async ValueTask DisposeAsync()
Expand Down
18 changes: 15 additions & 3 deletions src/ArtemisNetClient/AutoRecovering/AutoRecoveringProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ public async Task SendAsync(Message message, Transaction transaction, Cancellati
await producer.SendAsync(message, transaction, cancellationToken).ConfigureAwait(false);
return;
}
catch (ProducerClosedException e) when (e.ErrorCode == ErrorCode.UnauthorizedAccess)
catch (ProducerClosedException e) when (IsTerminalProducerException(e))
{
await TerminateAsync(e).ConfigureAwait(false);
Comment on lines +33 to 35
// Producer does not have permissions to send on specified address

// Producer cannot be recovered for terminal broker-side close reasons.
throw;
}
catch (ProducerClosedException)
Expand Down Expand Up @@ -64,6 +64,13 @@ public void Send(Message message, CancellationToken cancellationToken)
producer.Send(message, cancellationToken);
return;
}
catch (ProducerClosedException e) when (IsTerminalProducerException(e))
{
TerminateAsync(e).GetAwaiter().GetResult();
Comment on lines +67 to +69

// Producer cannot be recovered for terminal broker-side close reasons.
throw;
}
catch (ProducerClosedException)
{
HandleProducerClosed();
Expand All @@ -79,6 +86,11 @@ public void Send(Message message, CancellationToken cancellationToken)
}
}

private static bool IsTerminalProducerException(ProducerClosedException exception)
{
return exception.ErrorCode is ErrorCode.UnauthorizedAccess or ErrorCode.NotFound;
}

protected override IAsyncDisposable UnderlyingResource => _producer;

protected override async Task RecoverUnderlyingProducer(IConnection connection, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public async Task TerminateAsync(Exception exception)
_manualResetEvent.Set();
Log.ProducerTerminated(Logger, exception);
await DisposeResourceSafe(UnderlyingResource).ConfigureAwait(false);
Closed?.Invoke(this);
}

public async ValueTask DisposeAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,13 @@ public async Task TerminateAsync(Exception exception)
_failureCause = exception;
_manualResetEvent.Set();
await DisposeUnderlyingRpcClientSafe(_requestReplyClient).ConfigureAwait(false);
Closed?.Invoke(this);
}

public async ValueTask DisposeAsync()
{
await DisposeUnderlyingRpcClient(_requestReplyClient).ConfigureAwait(false);
Closed?.Invoke(this);
}

private async Task DisposeUnderlyingRpcClientSafe(IRequestReplyClient requestReplyClient)
Expand Down
5 changes: 4 additions & 1 deletion src/ArtemisNetClient/ProducerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ private static void OnOutcome(ILink sender, Amqp.Message message, Outcome outcom
}
else if (link.IsDetaching() || link.IsClosed)
{
tcs.TrySetException(new ProducerClosedException());
var error = link.Error;
tcs.TrySetException(error != null
? new ProducerClosedException(error.Description, error.Condition, null)
: new ProducerClosedException());
}
else if (outcome.Descriptor.Code == MessageOutcomes.Rejected.Descriptor.Code)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using ActiveMQ.Artemis.Client.Exceptions;
using ActiveMQ.Artemis.Client.InternalUtilities;
using ActiveMQ.Artemis.Client.UnitTests.Utils;
using Amqp;
using Amqp.Framing;
using Amqp.Handler;
using Amqp.Listener;
using Xunit;
using Xunit.Abstractions;

namespace ActiveMQ.Artemis.Client.UnitTests.AutoRecovering
{
public class AutoRecoveringAnonymousProducerSpec : ActiveMQNetSpec
{
public AutoRecoveringAnonymousProducerSpec(ITestOutputHelper output) : base(output)
{
}

[Fact]
public async Task Should_terminate_anonymous_producer_on_SendAsync_when_link_closed_with_unauthorized_access()
{
using var host = CreateOpenedContainerHost();
var linkProcessor = host.CreateTestLinkProcessor();

ListenerLink producerLink = null;
linkProcessor.SetHandler(context =>
{
producerLink = context.Link;
return false;
});

await using var connection = await CreateConnection(host.Endpoint);
var producer = await connection.CreateAnonymousProducerAsync();

try
{
await producerLink.CloseAsync(Timeout, new Error(ErrorCode.UnauthorizedAccess) { Description = "Unauthorized" });
}
catch (Exception)
{
// ignored
}

var exception = await Assert.ThrowsAsync<ProducerClosedException>(() =>
producer.SendAsync("a1", null, new Message("foo"), null, CancellationToken));
Assert.Equal(ErrorCode.UnauthorizedAccess, exception.ErrorCode);

await Assert.ThrowsAsync<ProducerClosedException>(() =>
producer.SendAsync("a2", null, new Message("bar"), null, CancellationToken));
}

[Fact]
public async Task Should_terminate_anonymous_producer_on_Send_when_link_closed_with_unauthorized_access()
{
using var host = CreateOpenedContainerHost();
var linkProcessor = host.CreateTestLinkProcessor();

ListenerLink producerLink = null;
linkProcessor.SetHandler(context =>
{
producerLink = context.Link;
return false;
});

await using var connection = await CreateConnection(host.Endpoint);
var producer = await connection.CreateAnonymousProducerAsync();

try
{
await producerLink.CloseAsync(Timeout, new Error(ErrorCode.UnauthorizedAccess) { Description = "Unauthorized" });
}
catch (Exception)
{
// ignored
}

var exception = Assert.Throws<ProducerClosedException>(() =>
producer.Send("a1", null, new Message("foo"), CancellationToken));
Assert.Equal(ErrorCode.UnauthorizedAccess, exception.ErrorCode);

Assert.Throws<ProducerClosedException>(() =>
producer.Send("a2", null, new Message("bar"), CancellationToken));
}
[Fact]
public async Task Should_not_recreate_anonymous_producer_on_connection_recovery_after_terminal_unauthorized_access_error()
{
var endpoint = GetUniqueEndpoint();
var producerAttached = new ManualResetEvent(false);
var testHandler = new TestHandler(@event =>
{
switch (@event.Id)
{
case EventId.LinkRemoteOpen when @event.Context is Attach attach && attach.Role:
producerAttached.Set();
break;
}
});

var host1 = CreateOpenedContainerHost(endpoint, testHandler);
var linkProcessor = host1.CreateTestLinkProcessor();

ListenerLink producerLink = null;
linkProcessor.SetHandler(context =>
{
producerLink = context.Link;
return false;
});

var connection = await CreateConnection(endpoint);
var producer = await connection.CreateAnonymousProducerAsync();

Assert.True(producerAttached.WaitOne(Timeout));
producerAttached.Reset();

try
{
await producerLink.CloseAsync(Timeout, new Error(ErrorCode.UnauthorizedAccess) { Description = "Unauthorized" });
}
catch (Exception)
{
// ignored
}

// SendAsync triggers TerminateAsync which should remove the producer from the recovery set.
await Assert.ThrowsAsync<ProducerClosedException>(() =>
producer.SendAsync("a1", null, new Message("foo"), null, CancellationToken));

host1.Dispose();
using var host2 = CreateOpenedContainerHost(endpoint, testHandler);

// Producer should NOT be recreated after connection recovery.
Assert.False(producerAttached.WaitOne(ShortTimeout));

await DisposeUtil.DisposeAll(connection, host2);
}
}
}
Loading
Loading