Skip to content

Commit b0e3afd

Browse files
committed
Add non-destructive queues support
1 parent fd80e17 commit b0e3afd

5 files changed

Lines changed: 49 additions & 1 deletion

File tree

src/ArtemisNetClient.Extensions.DependencyInjection/ActiveMqArtemisClientDependencyInjectionExtensions.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,8 @@ public static IActiveMqBuilder AddConsumer(this IActiveMqBuilder builder, string
236236
MaxConsumers = queueOptions.MaxConsumers,
237237
AutoCreateAddress = queueOptions.AutoCreateAddress,
238238
PurgeOnNoConsumers = queueOptions.PurgeOnNoConsumers,
239-
LastValueKey = queueOptions.LastValueKey
239+
LastValueKey = queueOptions.LastValueKey,
240+
NonDestructive = queueOptions.NonDestructive
240241
});
241242
if (options.AddressConfigurations.TryGetValue(address, out var routingTypes))
242243
{

src/ArtemisNetClient.Extensions.DependencyInjection/Options/QueueOptions.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,10 @@ public class QueueOptions
2323
/// Configures Last-Value key for a queue
2424
/// </summary>
2525
public string LastValueKey { get; set; }
26+
27+
/// <summary>
28+
/// True if the queue is non-destructive
29+
/// </summary>
30+
public bool NonDestructive { get; set; }
2631
}
2732
}

src/ArtemisNetClient/Configuration/QueueConfiguration.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,10 @@ public class QueueConfiguration
3636
/// Configures Last-Value key for a queue
3737
/// </summary>
3838
public string LastValueKey { get; set; }
39+
40+
/// <summary>
41+
/// True if the queue is non-destructive
42+
/// </summary>
43+
public bool NonDestructive { get; set; }
3944
}
4045
}

src/ArtemisNetClient/Management/RequestSerializer.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public static async ValueTask<string> QueueConfigurationToJson(QueueConfiguratio
3939
writer.WriteString("filter-string", configuration.FilterExpression ?? string.Empty);
4040
writer.WriteBoolean("auto-delete", configuration.AutoDelete);
4141
writer.WriteNumber("auto-delete-message-count", configuration.AutoDeleteMessageCount);
42+
writer.WriteBoolean("non-destructive", configuration.NonDestructive);
4243
if (configuration.AutoDeleteDelay.HasValue)
4344
{
4445
writer.WriteNumber("auto-delete-delay", Convert.ToInt64(configuration.AutoDeleteDelay.Value.TotalMilliseconds));

test/ArtemisNetClient.IntegrationTests/TopologyManagement/CreateQueueSpec.cs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,5 +188,41 @@ await topologyManager.CreateQueueAsync(new QueueConfiguration
188188
Assert.Equal("foo3", (await consumer.ReceiveAsync(CancellationToken)).GetBody<string>());
189189
Assert.Equal("foo6", (await consumer.ReceiveAsync(CancellationToken)).GetBody<string>());
190190
}
191+
192+
[Fact]
193+
public async Task Should_create_non_destructive_queue()
194+
{
195+
var connection = await CreateConnection();
196+
var address = Guid.NewGuid().ToString();
197+
var queue = Guid.NewGuid().ToString();
198+
199+
var topologyManager = await connection.CreateTopologyManagerAsync();
200+
await topologyManager.CreateAddressAsync(address, RoutingType.Multicast);
201+
await topologyManager.CreateQueueAsync(new QueueConfiguration
202+
{
203+
Address = address,
204+
Name = queue,
205+
RoutingType = RoutingType.Multicast,
206+
NonDestructive = true
207+
});
208+
209+
var producer = await connection.CreateProducerAsync(address, RoutingType.Multicast);
210+
211+
await producer.SendAsync(new Message("foo"));
212+
213+
var consumer = await connection.CreateConsumerAsync(address, queue, CancellationToken);
214+
var msg = await consumer.ReceiveAsync(CancellationToken);
215+
await consumer.AcceptAsync(msg);
216+
Assert.Equal("foo", msg.GetBody<string>());
217+
218+
// close consumer
219+
await consumer.DisposeAsync();
220+
221+
// recreate consumer, and the message should be redelivered as the queue was created as non-destructive
222+
consumer = await connection.CreateConsumerAsync(address, queue, CancellationToken);
223+
msg = await consumer.ReceiveAsync(CancellationToken);
224+
await consumer.AcceptAsync(msg);
225+
Assert.Equal("foo", msg.GetBody<string>());
226+
}
191227
}
192228
}

0 commit comments

Comments
 (0)