Skip to content

Commit c3ec6fb

Browse files
committed
Add option to configure broker topology in DI extensions
1 parent fd36100 commit c3ec6fb

4 files changed

Lines changed: 76 additions & 6 deletions

File tree

src/ArtemisNetClient.Extensions.DependencyInjection/ActiveMqArtemisClientDependencyInjectionExtensions.cs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public static IActiveMqBuilder AddActiveMq(this IServiceCollection services, str
6060
var queueConfigurations = activeMqOptions.EnableQueueDeclaration ? activeMqOptions.QueueConfigurations : new List<QueueConfiguration>(0);
6161
var addressConfigurations = activeMqOptions.EnableAddressDeclaration ? activeMqOptions.AddressConfigurations : new Dictionary<string, HashSet<RoutingType>>(0);
6262
var lazyConnection = provider.GetConnection(name);
63-
return new ActiveMqTopologyManager(lazyConnection, queueConfigurations, addressConfigurations);
63+
return new ActiveMqTopologyManager(provider, lazyConnection, queueConfigurations, addressConfigurations, activeMqOptions.ConfigureTopologyActions);
6464
});
6565
builder.Services.AddSingleton(provider =>
6666
{
@@ -107,7 +107,7 @@ public static IActiveMqBuilder ConfigureConnectionFactory(this IActiveMqBuilder
107107
}
108108

109109
/// <summary>
110-
/// Adds action to configure a <see cref="IConnection"/>.
110+
/// Adds an action to configure a <see cref="IConnection"/>.
111111
/// </summary>
112112
/// <param name="builder">The <see cref="IActiveMqBuilder"/>.</param>
113113
/// <param name="configureConnectionAction">A delegate that is used to configure a <see cref="IConnection"/>.</param>
@@ -118,6 +118,18 @@ public static IActiveMqBuilder ConfigureConnection(this IActiveMqBuilder builder
118118
return builder;
119119
}
120120

121+
/// <summary>
122+
/// Adds an action to configure the broker topology.
123+
/// </summary>
124+
/// <param name="builder">The <see cref="IActiveMqBuilder"/>.</param>
125+
/// <param name="configureTopologyAction">A delegate that can be used configure the broker topology.</param>
126+
/// <returns>The <see cref="IActiveMqBuilder"/> that can be used to configure ActiveMQ Artemis Client.</returns>
127+
public static IActiveMqBuilder ConfigureTopology(this IActiveMqBuilder builder, Func<IServiceProvider, ITopologyManager, Task> configureTopologyAction)
128+
{
129+
builder.Services.Configure<ActiveMqOptions>(builder.Name, options => options.ConfigureTopologyActions.Add(configureTopologyAction));
130+
return builder;
131+
}
132+
121133
/// <summary>
122134
/// Adds the <see cref="IConsumer"/>.
123135
/// </summary>

src/ArtemisNetClient.Extensions.DependencyInjection/ActiveMqTopologyManager.cs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System.Collections.Generic;
1+
using System;
2+
using System.Collections.Generic;
23
using System.Linq;
34
using System.Threading;
45
using System.Threading.Tasks;
@@ -8,20 +9,28 @@ namespace ActiveMQ.Artemis.Client.Extensions.DependencyInjection
89
{
910
internal class ActiveMqTopologyManager
1011
{
12+
private readonly IServiceProvider _serviceProvider;
1113
private readonly AsyncValueLazy<IConnection> _lazyConnection;
1214
private readonly IReadOnlyList<QueueConfiguration> _queueConfigurations;
1315
private readonly IReadOnlyDictionary<string, HashSet<RoutingType>> _addressConfigurations;
16+
private readonly List<Func<IServiceProvider, ITopologyManager, Task>> _configureTopologyActions;
1417

15-
public ActiveMqTopologyManager(AsyncValueLazy<IConnection> lazyConnection, IReadOnlyList<QueueConfiguration> queueConfigurations, IReadOnlyDictionary<string, HashSet<RoutingType>> addressConfigurations)
18+
public ActiveMqTopologyManager(IServiceProvider serviceProvider,
19+
AsyncValueLazy<IConnection> lazyConnection,
20+
IReadOnlyList<QueueConfiguration> queueConfigurations,
21+
IReadOnlyDictionary<string, HashSet<RoutingType>> addressConfigurations,
22+
List<Func<IServiceProvider, ITopologyManager, Task>> configureTopologyActions)
1623
{
24+
_serviceProvider = serviceProvider;
1725
_lazyConnection = lazyConnection;
1826
_queueConfigurations = queueConfigurations;
1927
_addressConfigurations = addressConfigurations;
28+
_configureTopologyActions = configureTopologyActions;
2029
}
2130

2231
public async Task CreateTopologyAsync(CancellationToken cancellationToken)
2332
{
24-
if (_queueConfigurations.Count == 0 && _addressConfigurations.Count == 0)
33+
if (_queueConfigurations.Count == 0 && _addressConfigurations.Count == 0 && _configureTopologyActions.Count == 0)
2534
{
2635
return;
2736
}
@@ -30,6 +39,11 @@ public async Task CreateTopologyAsync(CancellationToken cancellationToken)
3039
var topologyManager = await connection.CreateTopologyManagerAsync(cancellationToken).ConfigureAwait(false);
3140
await using var _ = topologyManager.ConfigureAwait(false);
3241

42+
foreach (var configureTopologyAction in _configureTopologyActions)
43+
{
44+
await configureTopologyAction(_serviceProvider, topologyManager).ConfigureAwait(false);
45+
}
46+
3347
foreach (var addressConfiguration in _addressConfigurations)
3448
{
3549
await topologyManager.DeclareAddressAsync(addressConfiguration.Key, addressConfiguration.Value, cancellationToken).ConfigureAwait(false);

src/ArtemisNetClient.Extensions.DependencyInjection/Options/ActiveMqOptions.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Threading.Tasks;
34

45
namespace ActiveMQ.Artemis.Client.Extensions.DependencyInjection
56
{
@@ -8,8 +9,9 @@ internal class ActiveMqOptions
89
public bool EnableQueueDeclaration { get; set; }
910
public bool EnableAddressDeclaration { get; set; }
1011
public List<QueueConfiguration> QueueConfigurations { get; } = new List<QueueConfiguration>();
11-
public Dictionary<string, HashSet<RoutingType>> AddressConfigurations { get; set; } = new Dictionary<string, HashSet<RoutingType>>();
12+
public Dictionary<string, HashSet<RoutingType>> AddressConfigurations { get; } = new Dictionary<string, HashSet<RoutingType>>();
1213
public List<Action<IServiceProvider, ConnectionFactory>> ConnectionFactoryActions { get; } = new List<Action<IServiceProvider, ConnectionFactory>>();
14+
public List<Func<IServiceProvider, ITopologyManager, Task>> ConfigureTopologyActions { get; } = new List<Func<IServiceProvider, ITopologyManager, Task>>();
1315
public List<Action<IServiceProvider, IConnection>> ConnectionActions { get; } = new List<Action<IServiceProvider, IConnection>>();
1416
public List<SendObserverRegistration> SendObserverRegistrations { get; } = new List<SendObserverRegistration>();
1517
public List<ReceiveObserverRegistration> ReceiveObserverRegistrations { get; } = new List<ReceiveObserverRegistration>();
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using ActiveMQ.Artemis.Client.Extensions.DependencyInjection;
4+
using Xunit;
5+
using Xunit.Abstractions;
6+
7+
namespace ActiveMQ.Artemis.Client.Extensions.AspNetCore.IntegrationTests;
8+
9+
public class ConfigureTopologySpec
10+
{
11+
private readonly ITestOutputHelper _testOutputHelper;
12+
13+
public ConfigureTopologySpec(ITestOutputHelper testOutputHelper)
14+
{
15+
_testOutputHelper = testOutputHelper;
16+
}
17+
18+
[Fact]
19+
public async Task Should_configure_broker_topology_using_registered_delegate()
20+
{
21+
var addressName = Guid.NewGuid().ToString();
22+
var queueName = Guid.NewGuid().ToString();
23+
24+
await using var testFixture = await TestFixture.CreateAsync(_testOutputHelper, activeMqBuilder =>
25+
{
26+
activeMqBuilder.ConfigureTopology(async (_, topologyManager) =>
27+
{
28+
await topologyManager.CreateQueueAsync(new QueueConfiguration
29+
{
30+
Address = addressName,
31+
Name = queueName,
32+
AutoCreateAddress = true
33+
});
34+
});
35+
});
36+
37+
var topologyManager = await testFixture.Connection.CreateTopologyManagerAsync(testFixture.CancellationToken);
38+
var queueNames = await topologyManager.GetQueueNamesAsync(testFixture.CancellationToken);
39+
40+
Assert.Contains(queueName, queueNames);
41+
}
42+
}

0 commit comments

Comments
 (0)