Skip to content

Commit 04711d6

Browse files
committed
Add auto-delete queues support #249
1 parent b30e652 commit 04711d6

5 files changed

Lines changed: 179 additions & 2 deletions

File tree

src/ActiveMQ.Artemis.Client/Configuration/QueueConfiguration.cs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
namespace ActiveMQ.Artemis.Client
1+
using System;
2+
3+
namespace ActiveMQ.Artemis.Client
24
{
35
public class QueueConfiguration
46
{
@@ -13,5 +15,21 @@ public class QueueConfiguration
1315
public bool PurgeOnNoConsumers { get; set; }
1416
public bool AutoCreateAddress { get; set; }
1517
public string FilterExpression { get; set; }
18+
19+
/// <summary>
20+
/// Delete created queues automatically when there are no consumers attached.
21+
/// </summary>
22+
public bool AutoDelete { get; set; } = false;
23+
24+
/// <summary>
25+
/// The message count that the queue must be less than or equal to before auto-deleting it.
26+
/// To disable the message count check, -1 can be set. Default is 0 (empty queue).
27+
/// </summary>
28+
public int AutoDeleteMessageCount { get; set; }
29+
30+
/// <summary>
31+
/// Delay for auto-deleting queues that do not have any consumers attached.
32+
/// </summary>
33+
public TimeSpan? AutoDeleteDelay { get; set; }
1634
}
1735
}

src/ActiveMQ.Artemis.Client/Management/RequestSerializer.cs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System.Collections.Generic;
1+
using System;
2+
using System.Collections.Generic;
23
using System.IO;
34
using System.Linq;
45
using System.Text;
@@ -35,6 +36,16 @@ public static async ValueTask<string> QueueConfigurationToJson(QueueConfiguratio
3536
writer.WriteBoolean("purge-on-no-consumers", configuration.PurgeOnNoConsumers);
3637
writer.WriteBoolean("auto-create-address", configuration.AutoCreateAddress);
3738
writer.WriteString("filter-string", configuration.FilterExpression ?? string.Empty);
39+
writer.WriteBoolean("auto-delete", configuration.AutoDelete);
40+
writer.WriteNumber("auto-delete-message-count", configuration.AutoDeleteMessageCount);
41+
if (configuration.AutoDeleteDelay.HasValue)
42+
{
43+
writer.WriteNumber("auto-delete-delay", Convert.ToInt64(configuration.AutoDeleteDelay.Value.TotalMilliseconds));
44+
}
45+
else
46+
{
47+
writer.WriteNumber("auto-delete-delay", -1);
48+
}
3849
writer.WriteEndObject();
3950
}
4051

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
using System;
2+
using System.Linq;
3+
using System.Threading.Tasks;
4+
using ActiveMQ.Artemis.Client.TestUtils;
5+
using Xunit;
6+
using Xunit.Abstractions;
7+
8+
namespace ActiveMQ.Artemis.Client.IntegrationTests.TopologyManagement
9+
{
10+
public class AutoDeletingQueuesSpec : ActiveMQNetIntegrationSpec
11+
{
12+
public AutoDeletingQueuesSpec(ITestOutputHelper output) : base(output)
13+
{
14+
}
15+
16+
[Fact]
17+
public async Task Should_auto_delete_queue_when_no_consumers_attached()
18+
{
19+
await using var connection = await CreateConnection();
20+
await using var topologyManager = await connection.CreateTopologyManagerAsync();
21+
22+
var address = Guid.NewGuid().ToString();
23+
var queueName = Guid.NewGuid().ToString();
24+
await topologyManager.CreateQueueAsync(new QueueConfiguration
25+
{
26+
Name = queueName,
27+
RoutingType = RoutingType.Multicast,
28+
Address = address,
29+
Durable = true,
30+
AutoCreateAddress = true,
31+
AutoDelete = true,
32+
}, CancellationToken);
33+
34+
await using var producer = await connection.CreateProducerAsync(address);
35+
var consumer = await connection.CreateConsumerAsync(address, queueName);
36+
37+
await producer.SendAsync(new Message("foo"));
38+
var msg = await consumer.ReceiveAsync();
39+
await consumer.AcceptAsync(msg);
40+
await consumer.DisposeAsync();
41+
42+
Assert.DoesNotContain(queueName, await topologyManager.GetQueueNamesAsync());
43+
}
44+
45+
[Fact]
46+
public async Task Should_delete_queue_after_specified_delay()
47+
{
48+
await using var connection = await CreateConnection();
49+
await using var topologyManager = await connection.CreateTopologyManagerAsync();
50+
51+
var address = Guid.NewGuid().ToString();
52+
var queueName = Guid.NewGuid().ToString();
53+
await topologyManager.CreateQueueAsync(new QueueConfiguration
54+
{
55+
Name = queueName,
56+
RoutingType = RoutingType.Multicast,
57+
Address = address,
58+
Durable = true,
59+
AutoCreateAddress = true,
60+
AutoDelete = true,
61+
AutoDeleteDelay = TimeSpan.FromMilliseconds(500)
62+
}, CancellationToken);
63+
64+
await using var producer = await connection.CreateProducerAsync(address);
65+
var consumer = await connection.CreateConsumerAsync(address, queueName);
66+
67+
await producer.SendAsync(new Message("foo"));
68+
var msg = await consumer.ReceiveAsync();
69+
await consumer.AcceptAsync(msg);
70+
await consumer.DisposeAsync();
71+
72+
var queues = await Retry.RetryUntil(
73+
() => topologyManager.GetQueueNamesAsync(),
74+
x => !x.Contains(queueName),
75+
TimeSpan.FromMinutes(1));
76+
77+
Assert.DoesNotContain(queueName, queues);
78+
}
79+
80+
[Fact]
81+
public async Task Should_not_delete_queue_when_AutoDeleteMessageCount_not_reached()
82+
{
83+
await using var connection = await CreateConnection();
84+
await using var topologyManager = await connection.CreateTopologyManagerAsync();
85+
86+
var address = Guid.NewGuid().ToString();
87+
var queueName = Guid.NewGuid().ToString();
88+
await topologyManager.CreateQueueAsync(new QueueConfiguration
89+
{
90+
Name = queueName,
91+
RoutingType = RoutingType.Multicast,
92+
Address = address,
93+
Durable = true,
94+
AutoCreateAddress = true,
95+
AutoDelete = true,
96+
AutoDeleteMessageCount = 1
97+
}, CancellationToken);
98+
99+
await using var producer = await connection.CreateProducerAsync(address);
100+
var consumer = await connection.CreateConsumerAsync(address, queueName);
101+
102+
await producer.SendAsync(new Message("foo"));
103+
await producer.SendAsync(new Message("foo1"));
104+
await producer.SendAsync(new Message("foo2"));
105+
106+
var msg = await consumer.ReceiveAsync();
107+
await consumer.AcceptAsync(msg);
108+
await consumer.DisposeAsync();
109+
110+
Assert.Contains(queueName, await topologyManager.GetQueueNamesAsync());
111+
112+
consumer = await connection.CreateConsumerAsync(address, queueName);
113+
msg = await consumer.ReceiveAsync();
114+
await consumer.AcceptAsync(msg);
115+
await consumer.DisposeAsync();
116+
117+
// one unacknowledged message remained, thus broker should remove the queue
118+
Assert.DoesNotContain(queueName, await topologyManager.GetQueueNamesAsync());
119+
}
120+
}
121+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
5+
namespace ActiveMQ.Artemis.Client.TestUtils
6+
{
7+
public static class Retry
8+
{
9+
public static async Task<T> RetryUntil<T>(Func<Task<T>> func, Func<T, bool> until, TimeSpan timeout)
10+
{
11+
var cts = new CancellationTokenSource(timeout);
12+
while (true)
13+
{
14+
var result = await func();
15+
if (until(result))
16+
return result;
17+
if (cts.IsCancellationRequested)
18+
return result;
19+
20+
// ReSharper disable once MethodSupportsCancellation
21+
await Task.Delay(TimeSpan.FromMilliseconds(100));
22+
}
23+
}
24+
}
25+
}

test/artemis/etc-override/broker-00.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,7 @@
1818
</anycast>
1919
</address>
2020
</addresses>
21+
22+
<message-expiry-scan-period>1000</message-expiry-scan-period>
2123
</core>
2224
</configuration>

0 commit comments

Comments
 (0)