Skip to content

Commit 39cf7ff

Browse files
committed
Add Last-Value Queues support
1 parent 35c78f9 commit 39cf7ff

4 files changed

Lines changed: 52 additions & 1 deletion

File tree

src/ArtemisNetClient.Extensions.DependencyInjection/Options/QueueOptions.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
namespace ActiveMQ.Artemis.Client.Extensions.DependencyInjection
22
{
33
/// <summary>
4-
/// Provides programmatic configuration for a queue to be declared.
4+
/// Provides programmatic configuration for a queue to be declared.
55
/// </summary>
66
public class QueueOptions
77
{
@@ -18,5 +18,10 @@ public class QueueOptions
1818
public bool AutoCreateAddress { get; set; }
1919

2020
public string FilterExpression { get; set; }
21+
22+
/// <summary>
23+
/// Configures Last-Value key for a queue
24+
/// </summary>
25+
public string LastValueKey { get; set; }
2126
}
2227
}

src/ArtemisNetClient/Configuration/QueueConfiguration.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,5 +31,10 @@ public class QueueConfiguration
3131
/// Delay for auto-deleting queues that do not have any consumers attached.
3232
/// </summary>
3333
public TimeSpan? AutoDeleteDelay { get; set; }
34+
35+
/// <summary>
36+
/// Configures Last-Value key for a queue
37+
/// </summary>
38+
public string LastValueKey { get; set; }
3439
}
3540
}

src/ArtemisNetClient/Management/RequestSerializer.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ public static async ValueTask<string> QueueConfigurationToJson(QueueConfiguratio
4747
{
4848
writer.WriteNumber("auto-delete-delay", -1);
4949
}
50+
if (string.IsNullOrEmpty(configuration.LastValueKey) == false)
51+
{
52+
writer.WriteString("last-value-key", configuration.LastValueKey);
53+
}
5054
writer.WriteEndObject();
5155
}
5256

test/ArtemisNetClient.IntegrationTests/TopologyManagement/CreateQueueSpec.cs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,5 +151,42 @@ await topologyManager.CreateQueueAsync(new QueueConfiguration
151151

152152
Assert.Equal("foo2", (await newQueue1Consumer.ReceiveAsync(CancellationToken)).GetBody<string>());
153153
}
154+
155+
[Fact]
156+
public async Task Should_create_last_value_queue()
157+
{
158+
var connection = await CreateConnection();
159+
var address = Guid.NewGuid().ToString();
160+
var queue = Guid.NewGuid().ToString();
161+
var lastValueKey = "my-last-value-key";
162+
163+
var topologyManager = await connection.CreateTopologyManagerAsync();
164+
await topologyManager.CreateAddressAsync(address, RoutingType.Multicast);
165+
await topologyManager.CreateQueueAsync(new QueueConfiguration
166+
{
167+
Address = address,
168+
Name = queue,
169+
RoutingType = RoutingType.Multicast,
170+
LastValueKey = lastValueKey
171+
});
172+
173+
var producer = await connection.CreateProducerAsync(address, RoutingType.Multicast);
174+
175+
// Send series of messages with the same last value key
176+
await producer.SendAsync(new Message("foo1") { ApplicationProperties = { [lastValueKey] = "1" } });
177+
await producer.SendAsync(new Message("foo2") { ApplicationProperties = { [lastValueKey] = "1" } });
178+
await producer.SendAsync(new Message("foo3") { ApplicationProperties = { [lastValueKey] = "1" } });
179+
180+
// Send series of messages with another last value key
181+
await producer.SendAsync(new Message("foo4") { ApplicationProperties = { [lastValueKey] = "2" } });
182+
await producer.SendAsync(new Message("foo5") { ApplicationProperties = { [lastValueKey] = "2" } });
183+
await producer.SendAsync(new Message("foo6") { ApplicationProperties = { [lastValueKey] = "2" } });
184+
185+
var consumer = await connection.CreateConsumerAsync(address, queue, CancellationToken);
186+
187+
// Only messages foo3 and foo6 should survive
188+
Assert.Equal("foo3", (await consumer.ReceiveAsync(CancellationToken)).GetBody<string>());
189+
Assert.Equal("foo6", (await consumer.ReceiveAsync(CancellationToken)).GetBody<string>());
190+
}
154191
}
155192
}

0 commit comments

Comments
 (0)