Skip to content

Latest commit

 

History

History
1776 lines (1320 loc) · 74.4 KB

sqs.adoc

File metadata and controls

1776 lines (1320 loc) · 74.4 KB

SQS Integration

Amazon Simple Queue Service is a messaging service that provides point-to-point communication with queues. Spring Cloud AWS SQS integration offers support to receive and send messages using common Spring abstractions such as @SqsListener, MessageListenerContainer and MessageListenerContainerFactory.

Compared to JMS or other message services Amazon SQS has limitations that should be taken into consideration.

  • Amazon SQS allows only String payloads, so any Object must be transformed into a String representation. Spring Cloud AWS has dedicated support to transfer Java objects with Amazon SQS messages by converting them to JSON.

  • Amazon SQS has a maximum message size of 256kb per message, so bigger messages will fail to be sent.

A Spring Boot starter is provided to auto-configure SQS integration beans. Maven coordinates, using Spring Cloud AWS BOM:

<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>

Sample Listener Application

Below is a minimal sample application leveraging auto-configuration from Spring Boot.

@SpringBootApplication
public class SqsApplication {

    public static void main(String[] args) {
        SpringApplication.run(SqsApplication.class, args);
    }

    @SqsListener("myQueue")
    public void listen(String message) {
        System.out.println(message);
    }
}

Without Spring Boot, it’s necessary to import the SqsBootstrapConfiguration class in a @Configuration, as well as declare a SqsMessageListenerContainerFactory bean.

public class Listener {

    @SqsListener("myQueue")
    public void listen(String message) {
        System.out.println(message);
    }

}

@Import(SqsBootstrapConfiguration.class)
@Configuration
public class SQSConfiguration {

    @Bean
    public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory() {
        return SqsMessageListenerContainerFactory
            .builder()
            .sqsAsyncClient(sqsAsyncClient())
            .build();
    }

    @Bean
    public SqsAsyncClient sqsAsyncClient() {
        return SqsAsyncClient.builder().build();
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }

}

Sending Messages

Spring Cloud AWS provides the SqsTemplate to send messages to SQS.

SqsTemplate

When using Spring Boot and autoconfiguration, a SqsTemplate instance is autowired by default in case no other template bean is found in the context. This template instance is backed by the autoconfigured SqsAsyncClient, with any configurations provided. SqsTemplate instances are immutable and thread-safe.

Note
The endpoint to which the message will be sent can be either the queue name or URL.

Creating a SqsTemplate Instance

SqsTemplate implements two Operations interfaces: SqsOperations contains blocking methods, and SqsAsyncOperations contains async methods that return CompletableFuture instances. In case only sync or async operations are to be used, the corresponding interface can be utilized to eliminate unnecessary methods.

The following methods can be used to create new SqsTemplate instances with default options:

SqsTemplate template = SqsTemplate.newTemplate(sqsAsyncClient);

SqsOperations blockingTemplate = SqsTemplate.newSyncTemplate(sqsAsyncClient);

SqsAsyncOperations asyncTemplate = SqsTemplate.newAsyncTemplate(sqsAsyncClient);
Note
The returned object is always the SqsTemplate, and the separate methods are only for convenience of the interface return type.

In case more complex configuration is required, a builder is also provided, and a set of options:

SqsTemplate template = SqsTemplate.builder()
    .sqsAsyncClient(this.asyncClient)
        .configure(options -> options
                .acknowledgementMode(TemplateAcknowledgementMode.MANUAL)
                .defaultEndpointName("my-queue"))
        .build();

The builder also offers the buildSyncTemplate() method to return the template as SqsOperations, and buildAsyncTemplate() to return it as SqsAsyncOperations.

Template Options

The following options can be configured through the options object. The defaults are applied in case no other value is provided as a parameter in the operation method.

TemplateOptions Descriptions
Name Type Default Description

acknowledgementMode

TemplateAcknowledgementMode

TemplateAcknowledgementMode #ACKNOWLEDGE_ON_RECEIVE

Whether messages should be acknowledged by the template after being received. Messages can be acknowledged later by using the Acknowledgement#acknowledge methods. See SqsTemplate Acknowledgement for more information.

sendBatchFailureHandlingStrategy

SendBatchFailureStrategy

SendBatchFailureStrategy #THROW

Whether a SendBatchOperationFailedException containing a SendResult.Batch instance should be thrown if at least one message from a sent batch fails to be delivered. With SendBatchFailureStrategy#DO_NOT_THROW, the SendResult.Batch object is returned.

defaultPollTimeout

Duration

10 seconds

The default maximum time to wait for messages when performing a receive request to SQS. See SqsTemplate for more information.

defaultMaxNumberOfMessages

Integer

10

The default maximum of messages to be returned by a receive request to SQS. See SqsTemplate for more information.

defaultEndpointName

String

blank

The default endpoint name to be used by the template. See SqsTemplate for more information.

defaultPayloadClass

Class

null

The default class to which payloads should be converted to. Note that messages sent with the SqsTemplate by default contains a header with the type information, so no configuration is needed. See Template Message Conversion for more information.

additionalHeaderForReceive

String, Object

empty

Set a single header to be added to all messages received by this template.

additionalHeadersForReceive

Map<String, Object>

empty

Set headers to be added to all messages received by this template.

queueNotFoundStrategy

QueueNotFoundStrategy

QueueNotFoundStrategy #CREATE

Set the strategy to use in case a queue is not found. With QueueNotFoundStrategy#FAIL, an exception is thrown in case a queue is not found.

queueAttributeNames

Collection<AttributeNames>

empty

Set the queue attribute names that will be retrieved. Such attributes are available as MessageHeaders in received messages.

messageAttributeNames

Collection<String>

All

Set the message attribute names that will be retrieved with messages on receive operations. Such attributes are available as MessageHeaders in received messages.

messageSystemAttributeNames

Collection <MessageSystemAttributeName>

All

Set the message system attribute names that will be retrieved with messages on receive operations. Such attributes are available as MessageHeaders in received messages.

contentBasedDeduplication

ContentBasedDeduplication

ContentBasedDeduplication #AUTO

Set the ContentBasedDeduplication queue attribute value of the queues the template is sending messages to. With ContentBasedDeduplication#AUTO, the queue attribute value will be resolved automatically.

Sending Messages

There are a number of available methods to send messages to SQS queues using the SqsTemplate. The following methods are available through the SqsOperations interface, with the respective async counterparts available in the SqsAsyncOperations.

// Send a message to the configured default endpoint.
SendResult<T> send(T payload);

// Send a message to the provided queue with the given payload.
SendResult<T> send(String queue, T payload);

// Send the given Message to the provided queue.
SendResult<T> send(String queue, Message<T> message);

// Send a message with the provided options.
SendResult<T> send(Consumer<SqsSendOptions> to);

// Send a batch of Messages to the provided queue
SendResult.Batch<T> sendMany(String queue, Collection<Message<T>> messages);
Note
To send a collection of objects, it is recommended to use sendMany(String queue, Collection<Message<T>> messages) to optimize throughput. To send a collection of objects in a single message, the collection must be wrapped in an object.

An example using the options variant follows:

SendResult<String> result = template.send(to -> to.queue("myQueue")
    .payload("myPayload")
    .header("myHeaderName", "myHeaderValue")
    .headers(Map.of("myOtherHeaderName", "myOtherHeaderValue"))
    .delaySeconds(10)
);
Note
To send messages to a Fifo queue, the options include messageDeduplicationId and messageGroupId properties. If messageGroupId is not provided, a random UUID is generated by the framework. If messageDeduplicationId is not provided and content deduplication is disabled on AWS, a random UUID is generated. The generated values can be retrieved in the headers of the Message contained in the SendResult object.
SendResult

The SendResult record contains useful information on the send operation.

public record SendResult<T>(UUID messageId, String endpoint, Message<T> message, Map<String, Object> additionalInformation) {

	public record Batch<T>(Collection<SendResult<T>> successful, Collection<SendResult.Failed<T>> failed) {}

	public record Failed<T> (String errorMessage, String endpoint, Message<T> message, Map<String, Object> additionalInformation) {}

}

When the send operation is successful, the SendResult object is created with:

  • the messageId returned from SQS for the message

  • the endpoint the message was sent to

  • the Message instance that was sent, with any additional headers that might have been added by the framework

  • an additionalInformation map with the sequenceNumber generated for the message in Fifo queues.

When the send operation fails for single message operations, a MessagingOperationFailedException containing the message is thrown.

For Batch send operations, a SendResult.Batch object is returned. This object contains a Collection of successful and failed results.

In case there are messages that failed to be sent within a batch, corresponding SendResult.Failed objects are generated. The SendBatch.Failed object contains:

  • the errorMessage returned by SQS

  • the endpoint the message was to be sent to

  • the Message instance that was tried to be sent, with any additional headers that might have been added by the framework

  • an additionalInformation map with the code and senderFault parameters returned by SQS.

By default, if there’s at least one failed message in a send batch operation, a SendBatchOperationFailedException will be thrown. Such exception contains a SendResult.Batch<?> property containing both successful and failed messages.

This behavior can be configured using the sendBatchFailureHandlingStrategy option when creating the template. If SendBatchFailureStrategy#DO_NOT_THROW is configured, no exception is thrown and a SendResult.Batch object containing both successful and failed messages is returned.

For convenience, the additionalInformation parameters can be found as constants in the SqsTemplateParameters class.

Template Message Conversion

Message conversion by default is handled by a SqsMessagingMessageConverter instance, which contains:

  • SqsHeaderMapper for mapping headers to and from messageAttributes

  • CompositeMessageConverter with a StringMessageConverter and a MappingJackson2MessageConverter for converting payloads to and from JSON.

A custom MessagingMessageConverter implementation can be provided in the SqsTemplate.builder():

SqsOperations template = SqsTemplate
    .builder()
    .sqsAsyncClient(sqsAsyncClient)
    .messageConverter(converter)
    .buildSyncTemplate();

The default SqsMessagingMessageConverter instance can also be configured in the builder:

SqsOperations template = SqsTemplate
    .builder()
    .sqsAsyncClient(sqsAsyncClient)
    .configureDefaultConverter(converter -> {
            converter.setObjectMapper(objectMapper);
            converter.setHeaderMapper(headerMapper);
            converter.setPayloadTypeHeader("my-custom-type-header");
        }
    )
    .buildSyncTemplate();
Specifying a Payload Class for Receive Operations

By default, the SqsTemplate adds a header with name JavaType containing the fully qualified name of the payload class to all messages sent. Such header is used in receive operations by the SqsTemplate, SqsMessageListenerContainer and @SqsListener to identify to which class the payload should be deserialized to.

This behavior can be configured in the SqsMessagingMessageConverter using the setPayloadTypeHeaderValueFunction method. The function receives a Message object and returns a String with the value to be used in the header, the payload’s class FQCN by default. If null is returned by the function, no header with type information is added.

The typeHeaderName can be configured using the setPayloadTypeHeader method.

In case type mapping information is not available, the payload class can be specified either in the Template Options or in the receive() method variants:

Optional<Message<SampleRecord>> receivedMessage = template
			.receive(queue, SampleRecord.class);

Receiving Messages

The framework offers the following options to receive messages from a queue.

SqsTemplate

The SqsTemplate offers convenient methods to receive messages from Standard and Fifo SQS queues. These methods are separated into two interfaces that are implemented by SqsTemplate: SqsOperations and SqsAsyncOperations. If only sync or async operations are to be used, using the specific interface can narrow down the methods.

See SqsTemplate for more information on the interfaces, Creating a SqsTemplate Instance and Template Options.

The following methods are available through the SqsOperations interface, with the respective async counterparts available in the SqsAsyncOperations.

// Receive a message from the configured default endpoint and options.
Optional<Message<?>> receive();

// Receive a message from the provided queue and convert the payload to the provided class.
<T> Optional<Message<T>> receive(String queue, Class<T> payloadClass);

// Receive a message with the provided options.
Optional<Message<?>> receive(Consumer<SqsReceiveOptions> from);

// Receive a message with the provided options and convert the payload to the provided class.
<T> Optional<Message<T>> receive(Consumer<SqsReceiveOptions> from, Class<T> payloadClass);

// Receive a batch of messages from the configured default endpoint and options.
Collection<Message<?>> receiveMany();

// Receive a batch of messages from the provided queue and convert the payloads to the provided class.
<T> Collection<Message<T>> receiveMany(String queue, Class<T> payloadClass);

// Receive a batch of messages with the provided options.
Collection<Message<?>> receiveMany(Consumer<SqsReceiveOptions> from);

// Receive a batch of messages with the provided options and convert the payloads to the provided class.
<T> Collection<Message<T>> receiveMany(Consumer<SqsReceiveOptions> from, Class<T> payloadClass);

The following is an example for receiving a message with options:

Optional<Message<SampleRecord>> receivedMessage = template
        .receive(from -> from.queue("my-queue")
                .visibilityTimeout(Duration.ofSeconds(10))
                .pollTimeout(Duration.ofSeconds(5))
                .additionalHeaders(Map.of("my-custom-header-name", "my-custom-header-value")),
		SampleRecord.class);
Note
To receive messages from a Fifo queue, the options include a receiveRequestAttemptId parameter. If no such parameter is provided, a random one is generated.
SqsTemplate Acknowledgement

The SqsTemplate by default acknowledges all received messages, which can be changed by setting TemplateAcknowledgementMode.MANUAL in the template options:

SqsTemplate.builder().configure(options -> options.acknowledgementMode(TemplateAcknowledgementMode.MANUAL));

If an error occurs during acknowledgement, a SqsAcknowledgementException is thrown, containing both the messages that were successfully acknowledged and those which failed.

To acknowledge messages received with MANUAL acknowledgement, the Acknowledgement#acknowledge and Acknowledgement#acknowledgeAsync methods can be used.

Message Listeners

To receive messages in a manually created container, a MessageListener or AsyncMessageListener must be provided. Both interfaces come with single message and a batch methods. These are functional interfaces and a lambda or method reference can be provided for the single message methods.

Single message / batch modes and message payload conversion can be configured via SqsContainerOptions. See Message Conversion and Payload Deserialization for more information.

@FunctionalInterface
public interface MessageListener<T> {

	void onMessage(Message<T> message);

	default void onMessage(Collection<Message<T>> messages) {
		throw new UnsupportedOperationException("Batch not implemented by this MessageListener");
	}

}
@FunctionalInterface
public interface AsyncMessageListener<T> {

	CompletableFuture<Void> onMessage(Message<T> message);

	default CompletableFuture<Void> onMessage(Collection<Message<T>> messages) {
		return CompletableFutures
				.failedFuture(new UnsupportedOperationException("Batch not implemented by this AsyncMessageListener"));
	}

}

SqsMessageListenerContainer

The MessageListenerContainer manages the entire messages` lifecycle, from polling, to processing, to acknowledging.

It can be instantiated directly, using a SqsMessageListenerContainerFactory, or using @SqsListener annotations. If declared as a @Bean, the Spring context will manage its lifecycle, starting the container on application startup and stopping it on application shutdown. See Container Lifecycle for more information.

It implements the MessageListenerContainer interface:

public interface MessageListenerContainer<T> extends SmartLifecycle {

	String getId();

	void setId(String id);

	void setMessageListener(MessageListener<T> messageListener);

	void setAsyncMessageListener(AsyncMessageListener<T> asyncMessageListener);

}
Note
The generic parameter <T> stands for the payload type of messages to be consumed by this container. This allows ensuring at compile-time that all components used with the container are for the same type. If more than one payload type is to be used by the same container or factory, simply type it as Object. This type is not considered for payload conversion.

A container can be instantiated in a familiar Spring way in a @Configuration annotated class. For example:

@Bean
MessageListenerContainer<Object> listenerContainer(SqsAsyncClient sqsAsyncClient) {
    SqsMessageListenerContainer<Object> container = new SqsMessageListenerContainer<>(sqsAsyncClient);
    container.setMessageListener(System.out::println);
    container.setQueueNames("myTestQueue");
    return container;
}

This framework also provides a convenient Builder that allows a different approach, such as:

@Bean
MessageListenerContainer<Object> listenerContainer(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainer
            .builder()
            .sqsAsyncClient(sqsAsyncClient)
            .messageListener(System.out::println)
            .queueNames("myTestQueue")
            .build();
}

The container’s lifecycle can also be managed manually:

void myMethod(SqsAsyncClient sqsAsyncClient) {
    SqsMessageListenerContainer<Object> container = SqsMessageListenerContainer
            .builder()
            .sqsAsyncClient(sqsAsyncClient)
            .messageListener(System.out::println)
            .queueNames("myTestQueue")
            .build();
    container.start();
    container.stop();
}

SqsMessageListenerContainerFactory

A MessageListenerContainerFactory can be used to create MessageListenerContainer instances, both directly or through @SqsListener annotations.

It can be created in a familiar Spring way, such as:

@Bean
SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
    SqsMessageListenerContainerFactory<Object> factory = new SqsMessageListenerContainerFactory<>();
    factory.setSqsAsyncClient(sqsAsyncClient);
    return factory;
}

Or through the Builder:

@Bean
SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainerFactory
            .builder()
            .sqsAsyncClient(sqsAsyncClient)
            .build();
}
Important
Using this method for setting the SqsAsyncClient instance in the factory, all containers created by this factory will share the same SqsAsyncClient instance. For high-throughput applications, a Supplier<SqsAsyncClient> can be provided instead through the factory’s setSqsAsyncClientSupplier or the builder’s sqsAsyncSupplier methods. In this case each container will receive a SqsAsyncClient instance. Alternatively, a single SqsAsyncClient instance can be configured for higher throughput. See the AWS documentation for more information on tradeoffs of each approach.

The factory can also be used to create a container directly, such as:

@Bean
MessageListenerContainer<Object> myListenerContainer(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainerFactory
            .builder()
            .sqsAsyncClient(sqsAsyncClient)
            .messageListener(System.out::println)
            .build()
            .createContainer("myQueue");
}

@SqsListener Annotation

The simplest way to consume SQS messages is by annotating a method in a @Component class with the @SqsListener annotation. The framework will then create the MessageListenerContainer and set a MessagingMessageListenerAdapter to invoke the method when a message is received.

When using Spring Boot with auto-configuration, no configuration is necessary.

Most attributes on the annotation can be resolved from SpEL (#{…​}) or property placeholders (${…​}).

Queue Names

One or more queues can be specified in the annotation through the queueNames or value properties - there’s no distinction between the two properties.

Instead of queue names, queue urls can also be provided. Using urls instead of queue names can result in slightly faster startup times since it prevents the framework from looking up the queue url when the containers start.

@SqsListener({"${my.queue.url}", "myOtherQueue"})
public void listenTwoQueues(String message) {
    System.out.println(message);
}

Any number of @SqsListener annotations can be used in a bean class, and each annotated method will be handled by a separate MessageListenerContainer.

Note
Queues declared in the same annotation will share the container, though each will have separate throughput and acknowledgement controls.
SNS Messages

Since 3.1.1, when receiving SNS messages through the @SqsListener, the message includes all attributes of the SnsNotification. To only receive need the Message part of the payload, you can utilize the @SnsNotificationMessage annotation.

For handling individual message processing, the @SnsNotificationMessage annotation can be used in the following manner:

@SqsListener("my-queue")
public void listen(@SnsNotificationMessage Pojo pojo) {
	System.out.println(pojo.field);
}

For batch message processing, use the @SnsNotificationMessage annotation with a List<Pojo> parameter.

@SqsListener("my-queue")
public void listen(@SnsNotificationMessage List<Pojo> pojos) {
	System.out.println(pojos.size());
}

EventBridge Messages

Since 3.3.0, similar to the @SnsNotificationMessage, there is also a @EventBridgeMessage annotation that can be used to receive EventBridge messages. To only receive the detail part of the payload, you can utilize the @EventBridgeMessage annotation. As with @SnsNotificationMessage, this supports both individual messages, or lists.

@SqsListener("my-queue")
public void listen(@EventBridgeMessage Pojo pojo) {
	System.out.println(pojo.field);
}

For batch message processing, use the @EventBridgeMessage annotation with a List<Pojo> parameter.

@SqsListener("my-queue")
public void listen(@EventBridgeMessage List<Pojo> pojos) {
	System.out.println(pojos.size());
}
Specifying a MessageListenerContainerFactory

A MessageListenerContainerFactory can be specified through the factory property. Such factory will then be used to create the container for the annotated method.

If not specified, a factory with the defaultSqsListenerContainerFactory name will be looked up. For changing this default name, see Global Configuration for @SqsListeners.

@SqsListener(queueNames = "myQueue", factory = "myFactory")
public void listen(String message) {
    System.out.println(message);
}

When using a Spring Boot application with auto-configuration, a default factory is provided if there are no other factory beans declared in the context.

Other Annotation Properties

The following properties can be specified in the @SqsListener annotation. Such properties override the equivalent SqsContainerOptions for the resulting MessageListenerContainer.

  • id - Specify the resulting container’s id. This can be used for fetching the container from the MessageListenerContainerRegistry, and is used by the container and its components for general logging and thread naming.

  • maxConcurrentMessages - Set the maximum number of messages that can be inflight at any given moment. See Message Processing Throughput for more information.

  • pollTimeoutSeconds - Set the maximum time to wait before a poll returns from SQS. Note that if there are messages available the call may return earlier than this setting.

  • messageVisibilitySeconds - Set the minimum visibility for the messages retrieved in a poll. Note that for FIFO single message listener methods, this visibility is applied to the whole batch before each message is sent to the listener. See FIFO Support for more information.

  • acknowledgementMode - Set the acknowledgement mode for the container. If any value is set, it will take precedence over the acknowledgement mode defined for the container factory options. See Acknowledgement Mode for more information.

Listener Method Arguments

A number of possible argument types are allowed in the listener method’s signature.

  • MyPojo - POJO types are automatically deserialized from JSON.

  • Message<MyPojo> - Provides a Message<MyPojo> instance with the deserialized payload and MessageHeaders.

  • List<MyPojo> - Enables batch mode and receives the batch that was polled from SQS.

  • List<Message<MyPojo>> - Enables batch mode and receives the batch that was polled from SQS along with headers.

  • @Header(String headerName) - provides the specified header.

  • @Headers - provides the MessageHeaders or a Map<String, Object>

  • Acknowledgement - provides methods for manually acknowledging messages for single message listeners. AcknowledgementMode must be set to MANUAL (see Acknowledging Messages)

  • BatchAcknowledgement - provides methods for manually acknowledging partial or whole message batches for batch listeners. AcknowledgementMode must be set to MANUAL (see Acknowledging Messages)

  • Visibility - provides the changeTo() method that enables changing the message’s visibility to the provided value.

  • BatchVisibility - provides changeTo() methods that enables changing partial or whole message batches visibility to the provided value.

  • QueueAttributes - provides queue attributes for the queue that received the message. See Retrieving Attributes from SQS for how to specify the queue attributes that will be fetched from SQS

  • software.amazon.awssdk.services.sqs.model.Message - provides the original Message from SQS

Note
To receive a collection of objects in a single message, the collection must be wrapped in an object. See Sending Messages.

Here’s a sample with many arguments:

@SqsListener("${my-queue-name}")
public void listen(Message<MyPojo> message, MyPojo pojo, MessageHeaders headers, Acknowledgement ack, Visibility visibility, QueueAttributes queueAttributes, software.amazon.awssdk.services.sqs.model.Message originalMessage) {
    Assert.notNull(message);
    Assert.notNull(pojo);
    Assert.notNull(headers);
    Assert.notNull(ack);
    Assert.notNull(visibility);
    Assert.notNull(queueAttributes);
    Assert.notNull(originalMessage);
}
Important
Batch listeners support a single List<MyPojo> and List<Message<MyPojo>> method arguments, and optional BatchAcknowledgement (or AsyncBatchAcknowledgement) and BatchVisibility arguments. MessageHeaders should be extracted from the Message instances through the getHeaders() method.

Batch Processing

All message processing interfaces have both single message and batch methods. This means the same set of components can be used to process both single and batch methods, and share logic where applicable.

When batch mode is enabled, the framework will serve the entire result of a poll to the listener. If a value greater than 10 is provided for maxMessagesPerPoll, the result of multiple polls will be combined and up to the respective amount of messages will be served to the listener.

To enable batch processing using @SqsListener, a single List<MyPojo> or List<Message<MyPojo>> method argument should be provided in the listener method. The listener method can also have: - an optional BatchAcknowledgement argument for AcknowledgementMode.MANUAL - an optional BatchVisibility argument

Alternatively, SqsContainerOptions can be set to ListenerMode.BATCH in the SqsContainerOptions in the factory or container.

Note
The same factory can be used to create both single message and batch containers for @SqsListener methods.
Important
In case the same factory is shared by both delivery methods, any supplied ErrorHandler, MessageInterceptor or MessageListener should implement the proper methods.

Container Options

Each MessageListenerContainer can have a different set of options. MessageListenerContainerFactory instances have a SqsContainerOptions.Builder instance property that is used as a template for the containers it creates.

Both factory and container offer a configure method that can be used to change the options:

@Bean
SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainerFactory
            .builder()
            .configure(options -> options
                    .messagesPerPoll(5)
                    .pollTimeout(Duration.ofSeconds(10)))
            .sqsAsyncClient(sqsAsyncClient)
            .build();
}
@Bean
MessageListenerContainer<Object> listenerContainer(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainer
            .builder()
            .configure(options -> options
                    .messagesPerPoll(5)
                    .pollTimeout(Duration.ofSeconds(10)))
            .sqsAsyncClient(sqsAsyncClient)
            .messageListener(System.out::println)
            .queueNames("myTestQueue")
            .build();
}

The SqsContainerOptions instance is immutable and can be retrieved via the container.getContainerOptions() method. If more complex configurations are necessary, the toBuilder and fromBuilder methods provide ways to create a new copy of the options, and then set it back to the factory or container:

void myMethod(MessageListenerContainer<Object> container) {
    SqsContainerOptions.Builder modifiedOptions = container.getContainerOptions()
            .toBuilder()
            .pollTimeout(Duration.ofSeconds(5))
            .shutdownTimeout(Duration.ofSeconds(20));
    container.configure(options -> options.fromBuilder(modifiedOptions));
}

A copy of the options can also be created with containerOptions.createCopy() or containerOptionsBuilder.createCopy().

Using Auto-Configuration

The Spring Boot Starter for SQS provides the following auto-configuration properties:

Name

Description

Required

Default value

spring.cloud.aws.sqs.enabled

Enables the SQS integration.

No

true

spring.cloud.aws.sqs.endpoint

Configures endpoint used by SqsAsyncClient.

No

http://localhost:4566

spring.cloud.aws.sqs.region

Configures region used by SqsAsyncClient.

No

eu-west-1

spring.cloud.aws.sqs.listener.max-inflight-messages-per-queue

Maximum number of inflight messages per queue.

No

10

spring.cloud.aws.sqs.listener.max-messages-per-poll

Maximum number of messages to be received per poll.

No

10

spring.cloud.aws.sqs.listener.poll-timeout

Maximum amount of time to wait for messages in a poll.

No

10 seconds

spring.cloud.aws.sqs.queue-not-found-strategy

The strategy to be used by SqsTemplate and SqsListeners when a queue does not exist.

No

CREATE

SqsContainerOptions Descriptions
Property Range Default Description

maxConcurrentMessages

1 - Integer.MAX_VALUE

10

The maximum number of messages from each queue that can be processed simultaneously in this container. This number will be used for defining the thread pool size for the container following (maxConcurrentMessages * number of queues). For batching acknowledgements a message is considered as no longer inflight when it’s handed to the acknowledgement queue. See Acknowledging Messages.

maxMessagesPerPoll

1 - Integer.MAX_VALUE

10

The maximum number of messages that will be received by a poll to a SQS queue in this container. If a value greater than 10 is provided, the result of multiple polls will be combined, which can be useful for batch listeners.

See AWS documentation for more information.

pollTimeout

1 - 10 seconds

10 seconds

The maximum duration for a poll to a SQS queue before returning empty. Longer polls decrease the chance of empty polls when messages are available. See AWS documentation for more information.

maxDelayBetweenPolls

1 - 10 seconds

10 seconds

The maximum time the framework will wait for permits to be available for a queue before attempting the next poll. After that period, the framework will try to perform a partial acquire with the available permits, resulting in a poll for less than maxMessagesPerPoll messages, unless otherwise configured. See Message Processing Throughput.

pollBackOffPolicy

Any valid BackOffPolicy implementation

ExponentialBackOffPolicy

The back off policy to be applied when a polling thread throws an error. The default is an exponential policy with a delay of 1s, a multiplier of 2.0, and a maximum of 10s.

autoStartup

true, false

true

Determines wherever container should start automatically. When set to false the container will not launch on startup, requiring manual intervention to start it. See Container Lifecycle.

listenerShutdownTimeout

0 - undefined

20 seconds

The amount of time the container will wait for a queue to complete message processing before attempting to forcefully shutdown. See Container Lifecycle.

acknowledgementShutdownTimeout

0 - undefined

20 seconds

The amount of time the container will wait for acknowledgements to complete for a queue after message processing has ended. See Container Lifecycle.

backPressureMode

AUTO, ALWAYS_POLL_MAX_MESSAGES, FIXED_HIGH_THROUGHPUT

AUTO

Configures the backpressure strategy to be used by the container. See Configuring BackPressureMode.

listenerMode

SINGLE_MESSAGE, BATCH

SINGLE_MESSAGE

Configures whether this container will use single message or batch listeners. This value is overriden by @SqsListener depending on whether the listener method contains a List argument. See Batch Processing.

queueAttributeNames

Collection<QueueAttributeName>

Empty list

Configures the QueueAttributes that will be retrieved from SQS when a container starts. See Retrieving Attributes from SQS.

messageAttributeNames

Collection<String>

ALL

Configures the MessageAttributes that will be retrieved from SQS for each message. See Retrieving Attributes from SQS.

messageSystemAttributeNames

Collection<String>

ALL

Configures the MessageSystemAttribute that will be retrieved from SQS for each message. See Retrieving Attributes from SQS.

fifoBatchGroupingStrategy

PROCESS_MESSAGE_GROUPS_IN_PARALLEL_BATCHES, PROCESS_MULTIPLE_GROUPS_IN_SAME_BATCH

PROCESS_MESSAGE_GROUPS_IN_PARALLEL_BATCHES

Specifies how messages from FIFO queues should be grouped when retrieved by the container when listener mode is batch. See FIFO Support.

messageConverter

MessagingMessageConverter

SqsMessagingMessageConverter

Configures the MessagingMessageConverter that will be used to convert SQS messages into Spring Messaging Messages. See Message Conversion and Payload Deserialization.

acknowledgementMode

ON_SUCCESS, ALWAYS, MANUAL

ON_SUCCESS

Configures the processing outcomes that will trigger automatic acknowledging of messages. See Acknowledging Messages.

acknowledgementInterval

0 - undefined

1 second for Standard SQS, Duration.ZERO for FIFO SQS

Configures the interval between acknowledges for batching. Set to Duration.ZERO along with acknowledgementThreshold to zero to enable immediate acknowledgement See Acknowledging Messages.

acknowledgementThreshold

0 - undefined

10 for Standard SQS, 0 for FIFO SQS

Configures the minimal amount of messages in the acknowledgement queue to trigger acknowledgement of a batch. Set to zero along with acknowledgementInterval to Duration.ZERO to enable immediate acknowledgement. See Acknowledging Messages.

acknowledgementOrdering

PARALLEL, ORDERED

PARALLEL for Standard SQS and FIFO queues with immediate acknowledgement, ORDERED for FIFO queues with acknowledgement batching enabled.

Configures the order acknowledgements should be made. Fifo queues can be acknowledged in parallel for immediate acknowledgement since the next message for a message group will only start being processed after the previous one has been acknowledged. See Acknowledging Messages.

componentsTaskExecutor

TaskExecutor

null

Provides a TaskExecutor instance to be used by the MessageListenerContainer internal components. See Providing a TaskExecutor.

acknowledgementResultTaskExecutor

TaskExecutor

null

Provides a TaskExecutor instance to be used by the AcknowledgementProcessor for blocking AcknowledgementResultCallback. See Providing a TaskExecutor.

messageVisibility

Duration

null

Specify the message visibility duration for messages polled in this container. For FIFO queues, visibility is extended for all messages in a message group before each message is processed. See FIFO Support. Otherwise, visibility is specified once when polling SQS.

queueNotFoundStrategy

FAIL, CREATE

CREATE

Configures the behavior when a queue is not found at container startup. See Container Lifecycle.

Retrieving Attributes from SQS

QueueAttributes, MessageAttributes and MessageSystemAttributes can be retrieved from SQS. These can be configured using the SqsContainerOptions queueAttributeNames, messageAttributeNames and messageSystemAttributeNames methods.

QueueAttributes for a queue are retrieved when containers start, and can be looked up by adding the QueueAttributes method parameter in a @SqsListener method, or by getting the SqsHeaders.SQS_QUEUE_ATTRIBUTES_HEADER header.

MessageAttributes and MessageSystemAttributes are retrieved with each message, and are mapped to message headers. Those can be retrieved with @Header parameters, or directly in the Message. The message headers are prefixed with SqsHeaders.SQS_MA_HEADER_PREFIX ("Sqs_MA_") for message attributes and SqsHeaders.SQS_MSA_HEADER_PREFIX ("Sqs_MSA_") for message system attributes.

Note
By default, no QueueAttributes and ALL MessageAttributes and MessageSystemAttributes are retrieved.

Container Lifecycle

The MessageListenerContainer interface extends SmartLifecycle, which provides methods to control the container’s lifecycle.

Containers created from @SqsListener annotations are registered in a MessageListenerContainerRegistry bean that is registered by the framework. The containers themselves are not Spring-managed beans, and the registry is responsible for managing these containers` lifecycle in application startup and shutdown.

Note
The DefaultListenerContainerRegistry ` implementation provided by the framework allows the phase value to be set through the `setPhase method. The default value is MessageListenerContainer.DEFAULT_PHASE.

At startup, the containers will make requests to SQS to retrieve the queues` urls for the provided queue names or ARNs, and for retrieving QueueAttributes if so configured. Providing queue urls instead of names and not requesting queue attributes can result in slightly better startup times since there’s no need for such requests.

Note
If retrieving the queue url fails due to the queue not existing, the framework can be configured to either create the queue or fail. If a URL is provided instead of a queue name the framework will not make this request at startup, and thus if the queue does not exist it will fail at runtime. This configuration is available in SqsContainerOptions queueNotFoundStrategy.

At shutdown, by default containers will wait for all polling, processing and acknowledging operations to finish, up to SqsContainerOptions.getShutdownTimeout(). After this period, operations will be canceled and the container will attempt to forcefully shutdown.

Containers as Spring Beans

Manually created containers can be registered as beans, e.g. by declaring a @Bean in a @Configuration annotated class. In these cases the containers lifecycle will be managed by the Spring context at application startup and shutdown.

@Bean
MessageListenerContainer<Object> listenerContainer(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainer
            .builder()
            .sqsAsyncClient(sqsAsyncClient)
            .messageListener(System.out::println)
            .queueNames("myTestQueue")
            .build();
}
Note
The SqsMessageListenerContainer.builder() allows to specify the SmartLifecycle.phase, to override the default value defined in MessageListenerContainer.DEFAULT_PHASE
Retrieving Containers from the Registry

Containers can be retrieved by fetching the MessageListenerContainer bean from the container and using the getListenerContainers and getContainerById methods. Then lifecycle methods can be used to start and stop instances.

@Autowired
MessageListenerContainerRegistry registry;

public void myLifecycleMethod() {
    MessageListenerContainer container = registry.getContainerById("myId");
    container.stop();
    container.start();
}
Lifecycle Execution

By default, all lifecycle actions performed by the MessageListenerContainerRegistry and internally by the MessageListenerContainer instances are executed in parallel.

This behavior can be disabled by setting LifecycleHandler.get().setParallelLifecycle(false).

Note
Spring-managed MessageListenerContainer beans' lifecycle actions are always performed sequentially.

FIFO Support

FIFO SQS queues are fully supported for receiving messages - queues with names that ends in .fifo will automatically be setup as such.

  • Messages are polled with a receiveRequestAttemptId, and the received batch of messages is split according to the message`s MessageGroupId.

  • Each message from a given group will then be processed in order, while each group is processed in parallel.

  • To receive messages from multiple groups in a batch, set fifoBatchGroupingStrategy to PROCESS_MULTIPLE_GROUPS_IN_SAME_BATCH in SqsContainerOptions.

  • If processing fails for a message, the following messages from the same message group are discarded so they will be served again after their message visibility expires.

  • Messages which were already successfully processed and acknowledged will not be served again.

  • FIFO queues also have different defaults for acknowledging messages, see Acknowledgement Defaults for more information.

  • If a message visibility is set through @SqsListener or SqsContainerOptions, visibility will be extended for all messages in the message group before each message is processed.

Important
A MessageListenerContainer can either have only Standard queues or FIFO queues - not both. This is valid both for manually created containers and @SqsListener annotated methods.

Message Interceptor

The framework offers the MessageInterceptor and the AsyncMessageInterceptor interfaces:

public interface MessageInterceptor<T> {

	default Message<T> intercept(Message<T> message) {
		return message;
	}

	default Collection<Message<T>> intercept(Collection<Message<T>> messages) {
		return messages;
	}

	default void afterProcessing(Message<T> message, Throwable t) {
	}

	default void afterProcessing(Collection<Message<T>> messages, Throwable t) {
	}

}
public interface AsyncMessageInterceptor<T> {

	default CompletableFuture<Message<T>> intercept(Message<T> message) {
		return CompletableFuture.completedFuture(message);
	}

	default CompletableFuture<Collection<Message<T>>> intercept(Collection<Message<T>> messages) {
		return CompletableFuture.completedFuture(messages);
	}

	default CompletableFuture<Void> afterProcessing(Message<T> message, Throwable t) {
		return CompletableFuture.completedFuture(null);
	}

	default CompletableFuture<Void> afterProcessing(Collection<Message<T>> messages, Throwable t) {
		return CompletableFuture.completedFuture(null);
	}

}

When using the auto-configured factory, simply declare a @Bean and the interceptor will be set:

@Bean
public MessageInterceptor<Object> messageInterceptor() {
    return new MessageInterceptor<Object>() {
            @Override
            public Message<Object> intercept(Message<Object> message) {
                return MessageBuilder
                    .fromMessage(message)
                    .setHeader("newHeader", "newValue")
                    .build();
            }
        };
}

Alternatively, implementations can be set in the MessageListenerContainerFactory or directly in the MessageListenerContainer:

@Bean
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory() {
    return SqsMessageListenerContainerFactory
        .builder()
        .sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient)
        .messageInterceptor(new MessageInterceptor<Object>() {
            @Override
            public Message<Object> intercept(Message<Object> message) {
                return MessageBuilder
                    .fromMessage(message)
                    .setHeader("newHeader", "newValue")
                    .build();
            }
        })
        .build();
}
Note
Multiple interceptors can be added to the same factory or container.

The intercept methods are executed before a message is processed, and a different message can be returned.

Important
In case a different message is returned, it’s important to add the SqsHeaders.SQS_RECEIPT_HANDLE_HEADER with the value of the original handler so the original message is acknowledged after processing. Also, a SqsHeaders.SQS_MESSAGE_ID_HEADER must always be present.
Important
The intercept methods must not return null.

The afterProcessing methods are executed after message is processed and the ErrorHandler is invoked, but before the message is acknowledged.

Error Handling

By default, messages that have an error thrown by the listener will not be acknowledged, and the message can be polled again after visibility timeout expires.

Alternatively, the framework offers the ErrorHandler and AsyncErrorHandler interfaces, which are invoked after a listener execution fails.

public interface ErrorHandler<T> {

	default void handle(Message<T> message, Throwable t) {
	}

	default void handle(Collection<Message<T>> messages, Throwable t) {
	}

}
public interface AsyncErrorHandler<T> {

	default CompletableFuture<Void> handle(Message<T> message, Throwable t) {
		return CompletableFutures.failedFuture(t);
	}

	default CompletableFuture<Void> handle(Collection<Message<T>> messages, Throwable t) {
		return CompletableFutures.failedFuture(t);
	}

}

When using the auto-configured factory, simply declare a @Bean and the error handler will be set:

@Bean
public ErrorHandler<Object> errorHandler() {
    return new ErrorHandler<Object>() {
        @Override
        public void handle(Message<Object> message, Throwable t) {
            // error handling logic
            // throw if the message should not be acknowledged
        }
    }}

Alternatively, implementations can be set in the MessageListenerContainerFactory or directly in the MessageListenerContainer:

@Bean
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory() {
    return SqsMessageListenerContainerFactory
        .builder()
        .sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient)
        .errorHandler(new ErrorHandler<Object>() {
            @Override
            public void handle(Message<Object> message, Throwable t) {
                // error handling logic
            }
        })
        .build();
}

If the error handler execution succeeds, i.e. no error is thrown from the error handler, the message is considered to be recovered and is acknowledged according to the acknowledgement configuration.

Important
If the message should not be acknowledged and the ON_SUCCESS acknowledgement mode is set, it’s important to propagate the error. For simply executing an action in case of errors, an interceptor should be used instead, checking the presence of the throwable argument for detecting a failed execution.

Message Conversion and Payload Deserialization

Payloads are automatically deserialized from JSON for @SqsListener annotated methods using a MappingJackson2MessageConverter.

Note
When using Spring Boot’s auto-configuration, if there’s a single ObjectMapper in Spring Context, such object mapper will be used for converting messages. This includes the one provided by Spring Boot’s auto-configuration itself. For configuring a different ObjectMapper, see Global Configuration for @SqsListeners.

For manually created MessageListeners, MessageInterceptor and ErrorHandler components, or more fine-grained conversion such as using interfaces or inheritance in listener methods, type mapping is required for payload deserialization.

By default, the framework looks for a MessageHeader named Sqs_MA_JavaType containing the fully qualified class name (FQCN) for which the payload should be deserialized to. If such header is found, the message is automatically deserialized to the provided class.

Further configuration can be achieved by providing a configured MessagingMessageConverter instance in the SqsContainerOptions.

Note
If type mapping is setup or type information is added to the headers, payloads are deserialized right after the message is polled. Otherwise, for @SqsListener annotated methods, payloads are deserialized right before the message is sent to the listener. For providing custom MessageConverter instances to be used by @SqsListener methods, see Global Configuration for @SqsListeners

Configuring a MessagingMessageConverter

The framework provides the SqsMessagingMessageConverter, which implements the MessagingMessageConverter interface.

public interface MessagingMessageConverter<S> {

	Message<?> toMessagingMessage(S source);

	S fromMessagingMessage(Message<?> message);

}

The default header-based type mapping can be configured to use a different header name by using the setPayloadTypeHeader method.

It is also possible not to include payload type information in the header by using the doNotSendPayloadTypeHeader method.

More complex mapping can be achieved by using the setPayloadTypeMapper method, which overrides the default header-based mapping. This method receives a Function<Message<?>, Class<?>> payloadTypeMapper that will be applied to incoming messages.

The default MappingJackson2MessageConverter can be replaced by using the setPayloadMessageConverter method.

The framework also provides the SqsHeaderMapper, which implements the HeaderMapper interface and is invoked by the SqsMessagingMessageConverter. To provide a different HeaderMapper implementation, use the setHeaderMapper method.

An example of such configuration follows:

// Create converter instance
SqsMessagingMessageConverter messageConverter = new SqsMessagingMessageConverter();

// Configure Type Header
messageConverter.setPayloadTypeHeader("myTypeHeader");

// Do not send Type Header
messageConverter.doNotSendPayloadTypeHeader();

// Configure Header Mapper
SqsHeaderMapper headerMapper = new SqsHeaderMapper();
headerMapper.setAdditionalHeadersFunction(((sqsMessage, accessor) -> {
    accessor.setHeader("myCustomHeader", "myValue");
    return accessor.toMessageHeaders();
}));
messageConverter.setHeaderMapper(headerMapper);

// Configure Payload Converter
MappingJackson2MessageConverter payloadConverter = new MappingJackson2MessageConverter();
payloadConverter.setPrettyPrint(true);
messageConverter.setPayloadMessageConverter(payloadConverter);

// Set MessageConverter to the factory or container
factory.configure(options -> options.messageConverter(messageConverter));

Interfaces and Subclasses in Listener Methods

Interfaces and subclasses can be used in @SqsListener annotated methods by configuring a type mapper:

messageConverter.setPayloadTypeMapper(message -> {
    String eventTypeHeader = message.getHeaders().get("myEventTypeHeader", String.class);
    return "eventTypeA".equals(eventTypeHeader)
        ? MyTypeA.class
        : MyTypeB.class;
});

And then, in the listener method:

@SpringBootApplication
public class SqsApplication {

    public static void main(String[] args) {
        SpringApplication.run(SqsApplication.class, args);
    }

    // Retrieve the converted payload
    @SqsListener("myQueue")
    public void listen(MyInterface message) {
        System.out.println(message);
    }

    // Or retrieve a Message with the converted payload
    @SqsListener("myOtherQueue")
    public void listen(Message<MyInterface> message) {
        System.out.println(message);
    }
}

Acknowledging Messages

In SQS acknowledging a message is the same as deleting the message from the queue. A number of Acknowledgement strategies are available and can be configured via SqsContainerOptions. Optionally, a callback action can be added to be executed after either a successful or failed acknowledgement.

Here’s an example of a possible configuration:

@Bean
SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainerFactory
            .builder()
            .configure(options -> options
                    .acknowledgementMode(AcknowledgementMode.ALWAYS)
                    .acknowledgementInterval(Duration.ofSeconds(3))
                    .acknowledgementThreshold(5)
                    .acknowledgementOrdering(AcknowledgementOrdering.ORDERED)
            )
            .sqsAsyncClient(sqsAsyncClient)
            .build();
}

Each option is explained in the following sections.

Note
All options are available for both single message and batch message listeners.

Acknowledgement Mode

  • ON_SUCCESS - Acknowledges a message or batch of messages after successful processing.

  • ALWAYS - Acknowledges a message or batch of messages after processing returns success or error.

  • MANUAL - The framework won’t acknowledge messages automatically and Acknowledgement objects can be received in the listener method.

The Acknowledgement strategy can be configured in the SqsContainerOptions or in the @SqsListener annotation.

Acknowledgement Batching

The acknowledgementInterval and acknowledgementThreshold options enable acknowledgement batching. Acknowledgements will be executed after either the amount of time specified in the interval or the number of messages to acknowledge reaches the threshold.

Setting acknowledgementInterval to Duration.ZERO will disable the periodic acknowledgement, which will be executed only when the number of messages to acknowledge reaches the specified acknowledgementThreshold.

Setting acknowledgementThreshold to 0 will disable acknowledging per number of messages, and messages will be acknowledged only on the specified acknowldgementInterval

Important
When using acknowledgement batching messages stay inflight for SQS purposes until their respective batch is acknowledged. MessageVisibility should be taken into consideration when configuring this strategy.
Immediate Acknowledging

Setting both acknowledgementInterval and acknowledgementThreshold to Duration.ZERO and 0 respectively enables Immediate Acknowledging.

With this configuration, messages are acknowledged sequentially after being processed, and the message is only considered processed after the message is successfully acknowledged.

Important
If an immediate acknowledging triggers an error, message processing is considered failed and will be retried after the specified visibilityTimeout.

Manual Acknowledgement

Acknowledgements can be handled manually by setting AcknowledgementMode.MANUAL in the SqsContainerOptions. Manual acknowledgement can be used in conjunction with acknowledgement batching - the message will be queued for acknowledgement but won’t be executed until one of the acknowledgement thresholds is reached. It can also be used in conjunction with immediate acknowledgement.

The Acknowledgement#acknowledge and Acknowledgement#acknowledgeAsync methods are also available to acknowledge messages received in MANUAL acknowledgement mode.

The following arguments can be used in listener methods to manually acknowledge:

Acknowledgement

The Acknowledgement interface can be used to acknowledge messages in ListenerMode.SINGLE_MESSAGE.

public interface Acknowledgement {

	/**
	 * Acknowledge the message.
	 */
	void acknowledge();

	/**
	 * Asynchronously acknowledge the message.
	 */
	CompletableFuture<Void> acknowledgeAsync();

}
BatchAcknowledgement

The BatchAcknowledgement interface can be used to acknowledge messages in ListenerMode.BATCH.

The acknowledge(Collection<Message<T>) method enables acknowledging partial batches.

public interface BatchAcknowledgement<T> {

	/**
	 * Acknowledge all messages from the batch.
	 */
	void acknowledge();

	/**
	 * Asynchronously acknowledge all messages from the batch.
	 */
	CompletableFuture<Void> acknowledgeAsync();

	/**
	 * Acknowledge the provided messages.
	 */
	void acknowledge(Collection<Message<T>> messagesToAcknowledge);

	/**
	 * Asynchronously acknowledge the provided messages.
	 */
	CompletableFuture<Void> acknowledgeAsync(Collection<Message<T>> messagesToAcknowledge);

}

Acknowledgement Ordering

  • PARALLEL - Acknowledges the messages as soon as one of the above criteria is met - many acknowledgement calls can be made in parallel.

  • ORDERED - One batch of acknowledgements will be executed after the previous one is completed, ensuring FIFO ordering for batching acknowledgements.

  • ORDERED_BY_GROUP - One batch of acknowledgements will be executed after the previous one for the same group is completed, ensuring FIFO ordering of acknowledgements with parallelism between message groups. Only available for FIFO queues.

Acknowledgement Defaults

The defaults for acknowledging differ for Standard and FIFO SQS queues.

Standard SQS
  • Acknowledgement Interval: One second

  • Acknowledgement Threshold: Ten messages

  • Acknowledgement Ordering: PARALLEL

FIFO SQS
  • Acknowledgement Interval: Zero (Immediate)

  • Acknowledgement Threshold: Zero (Immediate)

  • Acknowledgement Ordering: PARALLEL if immediate acknowledgement, ORDERED if batching is enabled (one or both above defaults are overridden).

Note
PARALLEL is the default for FIFO because ordering is guaranteed for processing. This assures no messages from a given MessageGroup will be polled until the previous batch is acknowledged. Implementations of this interface will be executed after an acknowledgement execution completes with either success or failure.

Acknowledgement Result Callback

The framework offers the AcknowledgementResultCallback and AsyncAcknowledgementCallback interfaces that can be added to a SqsMessageListenerContainer or SqsMessageListenerContainerFactory.

public interface AcknowledgementResultCallback<T> {

	default void onSuccess(Collection<Message<T>> messages) {
	}

	default void onFailure(Collection<Message<T>> messages, Throwable t) {
	}

}
public interface AsyncAcknowledgementResultCallback<T> {

	default CompletableFuture<Void> onSuccess(Collection<Message<T>> messages) {
		return CompletableFuture.completedFuture(null);
	}

	default CompletableFuture<Void> onFailure(Collection<Message<T>> messages, Throwable t) {
		return CompletableFuture.completedFuture(null);
	}

}
@Bean
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
	return SqsMessageListenerContainerFactory
		.builder()
		.sqsAsyncClient(sqsAsyncClient)
		.acknowledgementResultCallback(getAcknowledgementResultCallback())
		.build();
}
Note
When immediate acknowledgement is set, as is the default for FIFO queues, the callback will be executed before the next message in the batch is processed, and next message processing will wait for the callback completion. This can be useful for taking action such as retrying to delete the messages, or stopping the container to prevent duplicate processing in case an acknowledgement fails in a FIFO queue. For batch parallel processing, as is the default for Standard queues the callback execution happens asynchronously.

Global Configuration for @SqsListeners

A set of configurations can be set for all containers from @SqsListener by providing SqsListenerConfigurer beans.

@FunctionalInterface
public interface SqsListenerConfigurer {

	void configure(EndpointRegistrar registrar);

}

The following attributes can be configured in the registrar:

  • setMessageHandlerMethodFactory - provide a different factory to be used to create the invocableHandlerMethod instances that wrap the listener methods.

  • setListenerContainerRegistry - provide a different MessageListenerContainerRegistry implementation to be used to register the MessageListenerContainers

  • setMessageListenerContainerRegistryBeanName - provide a different bean name to be used to retrieve the MessageListenerContainerRegistry

  • setObjectMapper - set the ObjectMapper instance that will be used to deserialize payloads in listener methods. See Message Conversion and Payload Deserialization for more information on where this is used.

  • setValidator - set the Validator instance that will be used for payload validation in listener methods.

  • manageMessageConverters - gives access to the list of message converters that will be used to convert messages. By default, StringMessageConverter, SimpleMessageConverter and MappingJackson2MessageConverter are used.

  • manageArgumentResolvers - gives access to the list of argument resolvers that will be used to resolve the listener method arguments. The order of resolvers is important - PayloadMethodArgumentResolver should generally be last since it’s used as default.

A simple example would be:

@Bean
SqsListenerConfigurer configurer(ObjectMapper objectMapper) {
    return registrar -> registrar.setObjectMapper(objectMapper);
}
Note
Any number of SqsListenerConfigurer beans can be registered in the context. All instances will be looked up at application startup and iterated through.

Message Processing Throughput

The following options are available for tuning the application’s throughput. When a configuration is available both in the SqsContainerOptions and @SqsListener annotation, the annotation value takes precedence, if any.

SqsContainerOptions and @SqsListener properties

maxConcurrentMessages

Can be set in either the SqsContainerOptions or the @SqsListener annotation. Represents the maximum number of messages being processed by the container at a given time. Defaults to 10.

This value is enforced per queue, meaning the number of inflight messages in a container can be up to (number of queues in container * maxConcurrentMessages).

Note
When using acknowledgement batching, a message is considered as no longer inflight when it’s delivered to the acknowledgement queue. In this case, the actual number of inflight messages on AWS SQS console can be higher than the configured value. When using immediate acknowledgement, a message is considered as no longer inflight after it’s been acknowledged or throws an error.
maxMessagesPerPoll

Set in SqsContainerOptions or the @SqsListener annotation. Represents the maximum number of messages returned by a single poll to a SQS queue, to a maximum of 10. This value has to be less than or equal to maxConcurrentMessages. Defaults to 10.

Note that even if the queue has more messages, a poll can return less messages than specified. See the AWS documentation for more information.

pollTimeout

Can be set in either the SqsContainerOptions or the @SqsListener annotation. Represents the maximum duration of a poll. Higher values represent long polls and increase the probability of receiving full batches of messages. Defaults to 10 seconds.

maxDelayBetweenPolls

Set in SqsContainerOptions. Represents the maximum amount of time the container will wait for maxMessagesPerPoll permits to be available before trying to acquire a partial batch if so configured. This wait is applied per queue and one queue has no interference in another in this regard. Defaults to 10 seconds.

pollBackOffPolicy

Since 3.2 it’s possible to specify a BackOffPolicy which will be applied when a polling thread throws an exception. The default policy is an exponential back off with a delay of 1000ms, a 2.0 multiplier, and a 10000ms maximum delay. Note that in highly concurrent environments with many polling threads it may happen that a successful poll cancels the next scheduled backoff before it happens, and as such no back offs need to be executed.

Default Polling Behavior

By default, the framework starts all queues in low throughput mode, where it will perform one poll for messages at a time. When a poll returns at least one message, the queue enters a high throughput mode where it will try to fulfill maxConcurrentMessages messages by making (maxConcurrentMessages / maxMessagesPerPoll) parallel polls to the queue. Any poll that returns no messages will trigger a low throughput mode again, until at least one message is returned, triggering high throughput mode again, and so forth.

After maxDelayBetweenPolls, if maxMessagesPerPoll permits are not available, it’ll poll for the difference, i.e. as many messages as have been processed so far, if any.

E.g. Let’s consider a scenario where the container is configured for: maxConcurrentMessages = 20, maxMessagesPerPoll = 10, maxDelayBetweenPolls = 5 seconds, and a pollTimeout = 10 seconds.

The container starts in low throughput mode, meaning it’ll attempt a single poll for 10 messages. If any messages are returned, it’ll switch to high throughput mode, and will make up to 2 simultaneous polls for 10 messages each. If all 20 messages are retrieved, it’ll not attempt any more polls until messages are processed. If after the 5 seconds for maxDelayBetweenPolls 6 messages have been processed, the framework will poll for the 6 messages. If the queue is depleted and a poll returns no messages, it’ll enter low throughput mode again and perform only one poll at a time.

Configuring BackPressureMode

The following BackPressureMode values can be set in SqsContainerOptions to configure polling behavior:

  • AUTO - The default mode, as described in the previous section.

  • ALWAYS_POLL_MAX_MESSAGES - Disables partial batch polling, i.e. if the container is configured for 10 messages per poll, it’ll wait for 10 messages to be processed before attempting to poll for the next 10 messages. Useful for optimizing for fewer polls at the expense of throughput.

  • FIXED_HIGH_THROUGHPUT - Disables low throughput mode, while still attempting partial batch polling as described in the previous section. Useful for really high throughput scenarios where the risk of making parallel polls to an idle queue is preferable to an eventual switch to low throughput mode .

Note
The AUTO setting should be balanced for most use cases, including high throughput ones.

Blocking and Non-Blocking (Async) Components

The SQS integration leverages the CompletableFuture-based async capabilities of AWS SDK 2.0 to deliver a fully non-blocking infrastructure. All processing involved in polling for messages, changing message visibilities and acknowledging messages is done in an async, non-blocking fashion. This allows a higher overall throughput for the application.

When a MessageListener, MessageInterceptor, and ErrorHandler implementation is set to a MesssageListenerContainer or MesssageListenerContainerFactory these are adapted by the framework. This way, blocking and non-blocking components can be used in conjunction with each other.

Listener methods annotated with @SqsListener can either return a simple value, e.g. void, or a CompletableFuture<Void>. The listener method will then be wrapped in either a MessagingMessageListenerAdapter or a AsyncMessagingMessageListenerAdapter respectively.

Note
In order to achieve higher throughput, it’s encouraged that, at least for simpler logic in message listeners, interceptors and error handlers, the async variants are used.

Threading and Blocking Components

Message processing always starts in a framework thread from the default or provided TaskExecutor.

If an async component is invoked and the execution returns to the framework on a different thread, such thread will be used until a blocking component is found, when the execution switches back to a TaskExecutor thread to avoid blocking i.e. SqsAsyncClient or HttpClient threads.

If by the time the execution reaches a blocking component it’s already on a framework thread, it remains in the same thread to avoid excessive thread allocation and hopping.

Important
When using async methods it’s critical not to block the incoming thread, which might be very detrimental to overall performance. If thread-blocking logic has to be used, the blocking logic should be executed on another thread, e.g. using CompletableFuture.supplyAsync(() → myLogic(), myExecutor). Otherwise, a sync interface should be used.

Providing a TaskExecutor

The default TaskExecutor is a ThreadPoolTaskExecutor, and a different componentTaskExecutor supplier can be set in the SqsContainerOptions.

When providing a custom executor, it’s important that it’s configured to support all threads that will be created, which should be (maxConcurrentMessages * total number of queues).

Important
To avoid unnecessary thread hopping between blocking components, a MessageExecutionThreadFactory MUST be set to the executor.

Client Customization

SqsAsyncClient can be further customized by providing a bean of type SqsAsyncClientCustomizer:

@Bean
SqsAsyncClientCustomizer customizer() {
	return builder -> {
		builder.overrideConfiguration(builder.overrideConfiguration().copy(c -> {
			c.apiCallTimeout(Duration.ofMillis(1500));
		}));
	};
}
Warning

builder.overrideConfiguration(..) replaces the configuration object, so always make sure to use builder.overrideConfiguration().copy(c → ..) to configure only certain properties and keep the already pre-configured values for others.

SqsAsyncClientCustomizer is a functional interface that enables configuring SqsAsyncClientBuilder before the SqsAsyncClient is built in auto-configuration.

There can be multiple SqsAsyncClientCustomizer beans present in single application context. @Order(..) annotation can be used to define the order of the execution.

Note that SqsAsyncClientCustomizer beans are applied after AwsAsyncClientCustomizer beans and therefore can overwrite previously set configurations.

IAM Permissions

Following IAM permissions are required by Spring Cloud AWS SQS:

Send message to Queue

sqs:SendMessage

Receive message from queue

sqs:ReceiveMessage

Delete message from queue

sqs:DeleteMessage

To use sqsListener with SimpleMessageListenerContainerFactory you will need to add as well

sqs:GetQueueAttributes

To use SqsListener with Sqs name instead of ARN you will need

sqs:GetQueueUrl

Sample IAM policy granting access to SQS:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ReceiveMessage",
                "sqs:SendMessage",
                "sqs:GetQueueAttributes",
                "sqs:GetQueueUrl"
            ],
            "Resource": "yourARN"
        }