Skip to content

Commit 685a5c4

Browse files
Merge branch 'dev'
2 parents 0dd5891 + e58dabd commit 685a5c4

13 files changed

Lines changed: 1265 additions & 20 deletions

File tree

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
## Release 2026-03-31
2+
3+
### AWS.Messaging (1.2.0)
4+
* Added SendBatchAsync API
5+
16
## Release 2026-02-25
27

38
### AWS.Messaging (1.1.2)

README.md

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,100 @@ await _eventBridgePublisher.PublishAsync(message, new EventBridgeOptions
162162
});
163163
```
164164

165+
## Batch publishing to SQS
166+
167+
The `ISQSPublisher` also supports sending messages in batches using the SQS [`SendMessageBatch`](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessageBatch.html) API. This can significantly increase throughput — from 300 messages per second with individual sends to up to 3,000 messages per second with batching.
168+
169+
The `SendBatchAsync` method accepts a collection of messages and automatically chunks them into groups of 10 (the SQS maximum per batch request).
170+
171+
### Simple batch — no per-message options
172+
For standard (non-FIFO) queues where no per-message options are needed, you can pass the messages as a simple collection:
173+
174+
```csharp
175+
var sqsPublisher = serviceProvider.GetRequiredService<ISQSPublisher>();
176+
177+
var messages = new List<ChatMessage>
178+
{
179+
new ChatMessage { MessageDescription = "Hello" },
180+
new ChatMessage { MessageDescription = "World" },
181+
new ChatMessage { MessageDescription = "Batch!" }
182+
};
183+
184+
// Send all messages in a batch
185+
var response = await sqsPublisher.SendBatchAsync(messages);
186+
187+
// Check the results
188+
Console.WriteLine($"Successful: {response.Successful.Count}");
189+
Console.WriteLine($"Failed: {response.Failed.Count}");
190+
```
191+
192+
### Per-message options using `SQSBatchEntry`
193+
When each message in the batch needs its own options (e.g., different `MessageGroupId` values for FIFO queues), use `SQSBatchEntry<T>` to pair each message with its own `SQSMessageOptions`:
194+
195+
```csharp
196+
var entries = new List<SQSBatchEntry<ChatMessage>>
197+
{
198+
new SQSBatchEntry<ChatMessage>(
199+
new ChatMessage { MessageDescription = "User A's message" },
200+
new SQSMessageOptions { MessageGroupId = "userA" }),
201+
new SQSBatchEntry<ChatMessage>(
202+
new ChatMessage { MessageDescription = "User B's message" },
203+
new SQSMessageOptions { MessageGroupId = "userB" }),
204+
};
205+
206+
// Send with per-message options
207+
var response = await sqsPublisher.SendBatchAsync(entries);
208+
```
209+
210+
You can also override the queue URL or SQS client for the entire batch using `SQSBatchOptions`:
211+
212+
```csharp
213+
var response = await sqsPublisher.SendBatchAsync(entries, new SQSBatchOptions
214+
{
215+
QueueUrl = "https://sqs.us-west-2.amazonaws.com/012345678910/AnotherQueue"
216+
});
217+
```
218+
219+
### Handling the batch response
220+
`SendBatchAsync` returns a `SQSSendBatchResponse` that contains:
221+
* `Successful` — a list of `SQSSendBatchResponseEntry` with the `Id` and `MessageId` (SQS-assigned ID) for each successfully sent message.
222+
* `Failed` — a list of `SQSSendBatchResponseFailedEntry` with `Id`, `Code`, `Message`, and `SenderFault` for each message that failed.
223+
224+
The `Id` on each response entry corresponds to the `MessageEnvelope.Id` that the framework generated for that message, so you can correlate successes and failures back to your original inputs.
225+
226+
```csharp
227+
var response = await sqsPublisher.SendBatchAsync(messages);
228+
229+
foreach (var success in response.Successful)
230+
{
231+
Console.WriteLine($"Sent message {success.Id} with SQS MessageId: {success.MessageId}");
232+
}
233+
234+
foreach (var failure in response.Failed)
235+
{
236+
Console.WriteLine($"Failed to send {failure.Id}: [{failure.Code}] {failure.Message}");
237+
}
238+
```
239+
240+
> **Note:** Messages are automatically chunked into groups of 10. If you send 25 messages, the framework will make 3 SQS `SendMessageBatch` API calls (10 + 10 + 5) and aggregate the results into a single `SQSSendBatchResponse`.
241+
242+
### Error handling and partial results
243+
If an `AmazonSQSException` is thrown during a batch send (e.g., on the second chunk after the first chunk already succeeded), the framework throws a `FailedToPublishBatchException`. This exception includes a `PartialResponse` property containing any results from chunks that were sent before the failure occurred, so you can determine what was already published and avoid duplicate retries.
244+
245+
```csharp
246+
try
247+
{
248+
var response = await sqsPublisher.SendBatchAsync(messages);
249+
}
250+
catch (FailedToPublishBatchException ex)
251+
{
252+
// Some messages from earlier chunks may have been sent successfully
253+
var alreadySent = ex.PartialResponse?.Successful ?? new List<SQSSendBatchResponseEntry>();
254+
Console.WriteLine($"{alreadySent.Count} message(s) were sent before the failure.");
255+
Console.WriteLine($"Error: {ex.InnerException?.Message}");
256+
}
257+
```
258+
165259
# Consuming Messages
166260

167261
To consume messages, implement a message handler using the `IMessageHandler` interface for each message type you wish to process. The mapping between message types and message handlers is configured in the project startup.
@@ -438,7 +532,8 @@ To use the AWS Message Processing Framework for .NET to publish a message to an
438532
"Sid": "Statement1",
439533
"Effect": "Allow",
440534
"Action": [
441-
"sqs:sendmessage"
535+
"sqs:sendmessage",
536+
"sqs:sendmessagebatch"
442537
],
443538
"Resource": [
444539
"arn:aws:sqs:<region>:<account>:<queue>"

src/AWS.Messaging/AWS.Messaging.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
<NoWarn>CS1591</NoWarn>
1919
<SignAssembly>true</SignAssembly>
2020
<AssemblyOriginatorKeyFile>..\..\public.snk</AssemblyOriginatorKeyFile>
21-
<Version>1.1.2</Version>
21+
<Version>1.2.0</Version>
2222
<PublishRepositoryUrl>true</PublishRepositoryUrl>
2323
<EmbedUntrackedSources>true</EmbedUntrackedSources>
2424
<IncludeSymbols>true</IncludeSymbols>

src/AWS.Messaging/Exceptions.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,28 @@ public class InvalidFifoPublishingRequestException : AWSMessagingException
273273
public InvalidFifoPublishingRequestException(string message, Exception? innerException = null) : base(message, innerException) { }
274274
}
275275

276+
/// <summary>
277+
/// Thrown if an exception occurs while publishing a batch of messages.
278+
/// Contains partial results from chunks that may have already succeeded before the failure.
279+
/// </summary>
280+
public class FailedToPublishBatchException : FailedToPublishException
281+
{
282+
/// <summary>
283+
/// The partial response containing results from batch chunks that were sent before the failure occurred.
284+
/// Earlier chunks may have succeeded even though a later chunk failed.
285+
/// </summary>
286+
public Publishers.SQS.SQSSendBatchResponse? PartialResponse { get; }
287+
288+
/// <summary>
289+
/// Creates an instance of <see cref="FailedToPublishBatchException"/>.
290+
/// </summary>
291+
public FailedToPublishBatchException(string message, Exception? innerException = null, Publishers.SQS.SQSSendBatchResponse? partialResponse = null)
292+
: base(message, innerException)
293+
{
294+
PartialResponse = partialResponse;
295+
}
296+
}
297+
276298
/// <summary>
277299
/// Thrown if the <see cref="EventBridgePublishResponse"/> contains a message with an error code
278300
/// </summary>

src/AWS.Messaging/Publishers/SQS/ISQSPublisher.cs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ namespace AWS.Messaging.Publishers.SQS
88
/// <summary>
99
/// This interface allows sending messages from application code to Amazon SQS.
1010
/// It exposes the <see cref="SendAsync{T}(T, SQSOptions?, CancellationToken)"/> method which takes in a user-defined message, and <see cref="SQSOptions"/> to set additional parameters while sending messages to SQS.
11+
/// It also exposes <see cref="SendBatchAsync{T}(IEnumerable{T}, CancellationToken)"/> and
12+
/// <see cref="SendBatchAsync{T}(IEnumerable{SQSBatchEntry{T}}, SQSBatchOptions?, CancellationToken)"/> methods
13+
/// for sending up to multiple messages in batches using the SQS SendMessageBatch API.
1114
/// Using dependency injection, this interface is available to inject anywhere in the code.
1215
/// </summary>
1316
public interface ISQSPublisher : ICommandPublisher
@@ -19,5 +22,34 @@ public interface ISQSPublisher : ICommandPublisher
1922
/// <param name="sqsOptions">Contains additional parameters that can be set while sending a message to an SQS queue</param>
2023
/// <param name="token">The cancellation token used to cancel the request.</param>
2124
Task<SQSSendResponse> SendAsync<T>(T message, SQSOptions? sqsOptions, CancellationToken token = default);
25+
26+
/// <summary>
27+
/// Sends a batch of application messages to SQS using the SendMessageBatch API.
28+
/// Messages are automatically chunked into groups of 10 (the SQS maximum per batch request).
29+
/// <para>
30+
/// This is a convenience overload for sending simple batches where no per-message options
31+
/// are needed. For FIFO queues or when per-message options are required, use
32+
/// <see cref="SendBatchAsync{T}(IEnumerable{SQSBatchEntry{T}}, SQSBatchOptions?, CancellationToken)"/> instead.
33+
/// </para>
34+
/// </summary>
35+
/// <typeparam name="T">The .NET type of the application message.</typeparam>
36+
/// <param name="messages">The application messages that will be serialized and sent to an SQS queue.</param>
37+
/// <param name="token">The cancellation token used to cancel the request.</param>
38+
/// <returns>A <see cref="SQSSendBatchResponse"/> containing the results for each message in the batch.</returns>
39+
Task<SQSSendBatchResponse> SendBatchAsync<T>(IEnumerable<T> messages, CancellationToken token = default);
40+
41+
/// <summary>
42+
/// Sends a batch of application messages to SQS using the SendMessageBatch API,
43+
/// with per-message <see cref="SQSMessageOptions"/> allowing different options (e.g. <see cref="SQSMessageOptions.MessageGroupId"/>)
44+
/// for each message in the batch.
45+
/// Messages are automatically chunked into groups of 10 (the SQS maximum per batch request).
46+
/// </summary>
47+
/// <typeparam name="T">The .NET type of the application message.</typeparam>
48+
/// <param name="entries">The batch entries, each containing a message and optional per-message options.</param>
49+
/// <param name="batchOptions">Optional batch-level parameters such as <see cref="SQSBatchOptions.QueueUrl"/>
50+
/// and <see cref="SQSBatchOptions.OverrideClient"/>.</param>
51+
/// <param name="token">The cancellation token used to cancel the request.</param>
52+
/// <returns>A <see cref="SQSSendBatchResponse"/> containing the results for each message in the batch.</returns>
53+
Task<SQSSendBatchResponse> SendBatchAsync<T>(IEnumerable<SQSBatchEntry<T>> entries, SQSBatchOptions? batchOptions = null, CancellationToken token = default);
2254
}
2355
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
namespace AWS.Messaging.Publishers.SQS;
5+
6+
/// <summary>
7+
/// Represents a single entry in a batch send operation to SQS.
8+
/// Pairs an application message with optional per-message <see cref="SQSMessageOptions"/>.
9+
/// </summary>
10+
/// <typeparam name="T">The .NET type of the application message.</typeparam>
11+
public class SQSBatchEntry<T>
12+
{
13+
/// <summary>
14+
/// Creates an instance of <see cref="SQSBatchEntry{T}"/>.
15+
/// </summary>
16+
public SQSBatchEntry()
17+
{
18+
}
19+
20+
/// <summary>
21+
/// Creates an instance of <see cref="SQSBatchEntry{T}"/> with the specified message and options.
22+
/// </summary>
23+
/// <param name="message">The application message to send.</param>
24+
/// <param name="options">Optional per-message SQS options.</param>
25+
public SQSBatchEntry(T message, SQSMessageOptions? options = null)
26+
{
27+
Message = message;
28+
Options = options;
29+
}
30+
31+
/// <summary>
32+
/// The application message that will be serialized and sent to an SQS queue.
33+
/// </summary>
34+
public T Message { get; set; } = default!;
35+
36+
/// <summary>
37+
/// Optional per-message options such as <see cref="SQSMessageOptions.MessageGroupId"/>,
38+
/// <see cref="SQSMessageOptions.MessageDeduplicationId"/>, <see cref="SQSMessageOptions.DelaySeconds"/>,
39+
/// and <see cref="SQSMessageOptions.MessageAttributes"/>.
40+
/// </summary>
41+
public SQSMessageOptions? Options { get; set; }
42+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
using Amazon.SQS;
5+
6+
namespace AWS.Messaging.Publishers.SQS;
7+
8+
/// <summary>
9+
/// Contains batch-level properties that apply to an entire batch send operation to SQS.
10+
/// <para>
11+
/// This class is used on the <see cref="ISQSPublisher.SendBatchAsync{T}(IEnumerable{SQSBatchEntry{T}}, SQSBatchOptions?, CancellationToken)"/>
12+
/// method to override the queue URL or SQS client for the batch. Per-message properties such as
13+
/// <see cref="SQSMessageOptions.MessageGroupId"/> should be set on each <see cref="SQSBatchEntry{T}.Options"/> instead.
14+
/// </para>
15+
/// </summary>
16+
public class SQSBatchOptions
17+
{
18+
/// <summary>
19+
/// The SQS queue URL which the publisher will use for the batch. This can be used to override the queue URL
20+
/// that is configured for a given message type when publishing a batch.
21+
/// </summary>
22+
public string? QueueUrl { get; set; }
23+
24+
/// <summary>
25+
/// An alternative SQS client that can be used to publish this batch,
26+
/// instead of the client provided by the registered <see cref="Configuration.IAWSClientProvider"/> implementation.
27+
/// </summary>
28+
public IAmazonSQS? OverrideClient { get; set; }
29+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
using Amazon.SQS.Model;
5+
6+
namespace AWS.Messaging.Publishers.SQS;
7+
8+
/// <summary>
9+
/// Contains per-message properties that can be set when sending individual messages
10+
/// within a batch to an SQS queue.
11+
/// <para>
12+
/// This class is used on <see cref="SQSBatchEntry{T}.Options"/> to specify message-level
13+
/// settings such as <see cref="MessageGroupId"/> and <see cref="MessageDeduplicationId"/>.
14+
/// </para>
15+
/// </summary>
16+
public class SQSMessageOptions
17+
{
18+
/// <summary>
19+
/// The length of time, in seconds, for which to delay a specific message.
20+
/// Its valid values are between 0 to 900.
21+
/// Messages with a positive DelaySeconds value become available for processing after the delay period is finished.
22+
/// If you don't specify a value, the default value for the queue applies.
23+
/// When you set FifoQueue, you can't set DelaySeconds per message. You can set this parameter only on a queue level.
24+
/// </summary>
25+
public int? DelaySeconds { get; set; }
26+
27+
/// <summary>
28+
/// Each message attribute consists of a Name, Type, and Value.
29+
/// For more information, see <see href="https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes">the Amazon SQS developer guide.</see>
30+
/// </summary>
31+
public Dictionary<string, MessageAttributeValue>? MessageAttributes { get; set; }
32+
33+
/// <summary>
34+
/// This parameter applies only to FIFO(first-in-first-out) queues and is used for deduplication of sent messages.
35+
/// If a message with a particular MessageDeduplicationId is sent successfully, any messages sent with the same
36+
/// MessageDeduplicationId are accepted successfully but aren't delivered during the 5-minute deduplication interval.
37+
/// For more information, see <see href="https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues-exactly-once-processing.html">Exactly-once processing</see>
38+
/// in the Amazon SQS Developer Guide.
39+
/// <para>
40+
/// Each message in a batch should have a unique MessageDeduplicationId. Setting the same deduplication ID
41+
/// across multiple messages in a batch would incorrectly indicate they are duplicates of each other.
42+
/// </para>
43+
/// </summary>
44+
public string? MessageDeduplicationId { get; set; }
45+
46+
/// <summary>
47+
/// This parameter applies only to FIFO(first-in-first-out) queues and specifies that a message belongs to a specific message group.
48+
/// Messages that belong to the same message group are processed in a FIFO manner
49+
/// (however, messages in different message groups might be processed out of order).
50+
/// To interleave multiple ordered streams within a single queue, use MessageGroupId values
51+
/// (for example, session data for multiple users).
52+
/// </summary>
53+
public string? MessageGroupId { get; set; }
54+
}

0 commit comments

Comments
 (0)