Skip to content

Commit a97edce

Browse files
Fix batching Azure ServiceBus messages to partitioned endpoints
Fixes #2000
1 parent f050756 commit a97edce

File tree

6 files changed

+111
-1
lines changed

6 files changed

+111
-1
lines changed

src/Transports/Azure/Wolverine.AzureServiceBus.Tests/end_to_end.cs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using Wolverine.AzureServiceBus.Internal;
55
using Wolverine.Configuration;
66
using Wolverine.Tracking;
7+
using Wolverine.Transports;
78
using Xunit;
89

910
namespace Wolverine.AzureServiceBus.Tests;
@@ -44,6 +45,18 @@ public async Task InitializeAsync()
4445
// Require sessions on this subscription
4546
.RequireSessions(1)
4647

48+
.ProcessInline();
49+
50+
opts.PublishMessage<AsbMessage4>().ToAzureServiceBusTopic("asb4").BufferedInMemory();
51+
opts.ListenToAzureServiceBusSubscription("asb4")
52+
.FromTopic("asb4", cfg =>
53+
{
54+
cfg.EnablePartitioning = true;
55+
})
56+
57+
// Require sessions on this subscription
58+
.RequireSessions(1)
59+
4760
.ProcessInline();
4861
}).StartAsync();
4962

@@ -155,11 +168,39 @@ public async Task send_and_receive_multiple_messages_to_subscription_with_sessio
155168
session.Received.MessagesOf<AsbMessage3>().Select(x => x.Name)
156169
.ShouldBe(new string[]{"Red", "Green", "Refactor"});
157170
}
171+
172+
[Fact]
173+
public async Task split_messages_with_different_sessionids_into_separate_batches()
174+
{
175+
Func<IMessageContext, Task> sendMany = async bus =>
176+
{
177+
await bus.SendAsync(new AsbMessage4("Dummy 1.1"), new DeliveryOptions { GroupId = "1" });
178+
await bus.SendAsync(new AsbMessage4("Dummy 1.2"), new DeliveryOptions { GroupId = "1" });
179+
await bus.SendAsync(new AsbMessage4("Dummy 2.1"), new DeliveryOptions { GroupId = "2" });
180+
await bus.SendAsync(new AsbMessage4("Dummy 3.1"), new DeliveryOptions { GroupId = "3" });
181+
await bus.SendAsync(new AsbMessage4("Dummy 4.1"), new DeliveryOptions { GroupId = "4" });
182+
await bus.SendAsync(new AsbMessage4("Dummy 4.2"), new DeliveryOptions { GroupId = "4" });
183+
};
184+
185+
var session = await _host.TrackActivity()
186+
.IncludeExternalTransports()
187+
.Timeout(30.Seconds())
188+
.ExecuteAndWaitAsync(sendMany);
189+
190+
// Verify that all messages were received and processed in order inside the session
191+
var names = session.Received.MessagesOf<AsbMessage4>().Select(x => x.Name).ToList();
192+
names.Count.ShouldBe(6);
193+
names.Where(x => x.StartsWith("Dummy 1")).ShouldBe(["Dummy 1.1", "Dummy 1.2"]);
194+
names.Where(x => x.StartsWith("Dummy 2")).ShouldBe(["Dummy 2.1"]);
195+
names.Where(x => x.StartsWith("Dummy 3")).ShouldBe(["Dummy 3.1"]);
196+
names.Where(x => x.StartsWith("Dummy 4")).ShouldBe(["Dummy 4.1", "Dummy 4.2"]);
197+
}
158198
}
159199

160200
public record AsbMessage1(string Name);
161201
public record AsbMessage2(string Name);
162202
public record AsbMessage3(string Name);
203+
public record AsbMessage4(string Name);
163204

164205
public static class AsbMessageHandler
165206
{
@@ -177,4 +218,9 @@ public static void Handle(AsbMessage3 message)
177218
{
178219
// nothing
179220
}
221+
222+
public static void Handle(AsbMessage4 message)
223+
{
224+
// nothing
225+
}
180226
}

src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusEndpoint.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public AzureServiceBusEndpoint(AzureServiceBusTransport parent, Uri uri, Endpoin
5050
public abstract ValueTask<bool> CheckAsync();
5151
public abstract ValueTask TeardownAsync(ILogger logger);
5252
public abstract ValueTask SetupAsync(ILogger logger);
53+
public abstract bool IsPartitioned { get; }
5354

5455
protected override bool supportsMode(EndpointMode mode)
5556
{

src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusQueue.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ await Parent.WithServiceBusClientAsync(async client =>
102102
});
103103
}
104104

105+
public override bool IsPartitioned { get => Options.EnablePartitioning; }
106+
105107
private async Task purgeWithSessions(ServiceBusClient client)
106108
{
107109
var cancellation = new CancellationTokenSource();

src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusSenderProtocol.cs

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,14 @@ public async Task SendBatchAsync(ISenderCallback callback, OutgoingMessageBatch
4646
}
4747
}
4848

49+
if (_endpoint.IsPartitioned)
50+
await sendPartitionedBatches(callback, messages, batch);
51+
else
52+
await sendBatches(callback, messages, batch);
53+
}
54+
55+
private async Task sendBatches(ISenderCallback callback, List<ServiceBusMessage> messages, OutgoingMessageBatch batch)
56+
{
4957
try
5058
{
5159
var serviceBusMessageBatch = await _sender.CreateMessageBatchAsync();
@@ -55,7 +63,7 @@ public async Task SendBatchAsync(ISenderCallback callback, OutgoingMessageBatch
5563
if (!serviceBusMessageBatch.TryAddMessage(message))
5664
{
5765
_logger.LogInformation("Wolverine had to break up outgoing message batches at {Uri}, you may want to reduce the MaximumMessagesToReceive configuration. No messages were lost, this is strictly informative", _endpoint.Uri);
58-
66+
5967
// Send the currently full batch
6068
await _sender.SendMessagesAsync(serviceBusMessageBatch, _runtime.Cancellation);
6169
serviceBusMessageBatch.Dispose();
@@ -77,4 +85,53 @@ public async Task SendBatchAsync(ISenderCallback callback, OutgoingMessageBatch
7785
await callback.MarkProcessingFailureAsync(batch, e);
7886
}
7987
}
88+
89+
private async Task sendPartitionedBatches(ISenderCallback callback, List<ServiceBusMessage> messages, OutgoingMessageBatch batch)
90+
{
91+
try
92+
{
93+
var serviceBusMessageBatch = await _sender.CreateMessageBatchAsync();
94+
95+
var groupedMessages = messages
96+
.GroupBy(x => x.SessionId)
97+
.ToDictionary(x => x.Key, x => x.ToList());
98+
99+
foreach (var (sessionId, messageGroup) in groupedMessages)
100+
{
101+
_logger.LogDebug("Processing batch with session id '{SessionId}'", sessionId);
102+
103+
foreach (var message in messageGroup)
104+
{
105+
if (!serviceBusMessageBatch.TryAddMessage(message))
106+
{
107+
_logger.LogInformation("Wolverine had to break up outgoing message batches at {Uri}, you may want to reduce the MaximumMessagesToReceive configuration. No messages were lost, this is strictly informative", _endpoint.Uri);
108+
109+
// Send the currently full batch
110+
await _sender.SendMessagesAsync(serviceBusMessageBatch, _runtime.Cancellation);
111+
serviceBusMessageBatch.Dispose();
112+
113+
// Create a new batch and add the message to it
114+
serviceBusMessageBatch = await _sender.CreateMessageBatchAsync();
115+
serviceBusMessageBatch.TryAddMessage(message);
116+
}
117+
}
118+
119+
await _sender.SendMessagesAsync(serviceBusMessageBatch, _runtime.Cancellation);
120+
serviceBusMessageBatch = await _sender.CreateMessageBatchAsync();
121+
}
122+
123+
// Send the final batch
124+
if (serviceBusMessageBatch.Count > 0)
125+
{
126+
await _sender.SendMessagesAsync(serviceBusMessageBatch, _runtime.Cancellation);
127+
}
128+
serviceBusMessageBatch.Dispose();
129+
130+
await callback.MarkSuccessfulAsync(batch);
131+
}
132+
catch (Exception e)
133+
{
134+
await callback.MarkProcessingFailureAsync(batch, e);
135+
}
136+
}
80137
}

src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusSubscription.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,8 @@ public override async ValueTask InitializeAsync(ILogger logger)
175175
_hasInitialized = true;
176176
}
177177

178+
public override bool IsPartitioned { get => Topic.IsPartitioned; }
179+
178180
internal async ValueTask InitializeAsync(ServiceBusAdministrationClient client, ILogger logger)
179181
{
180182
if (Parent.AutoProvision)

src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusTopic.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,4 +116,6 @@ public AzureServiceBusSubscription FindOrCreateSubscription(string subscriptionN
116116

117117
return subscription;
118118
}
119+
120+
public override bool IsPartitioned { get => Options.EnablePartitioning; }
119121
}

0 commit comments

Comments
 (0)