From c939b3e4c895e26db3bef2d0d04ab0d16f10fdb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vin=C3=ADcius=20Souza?= Date: Fri, 20 Feb 2026 09:50:52 -0300 Subject: [PATCH 1/3] add async/sync acknowledgement result callback in sqs auto configuration --- .../sqs/SqsAutoConfiguration.java | 6 ++ .../sqs/SqsAutoConfigurationTest.java | 56 +++++++++++++++++++ .../LegacySqsAutoConfigurationTest.java | 55 ++++++++++++++++++ 3 files changed, 117 insertions(+) diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java index c6c89301e..98d713804 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java @@ -25,6 +25,8 @@ import io.awspring.cloud.sqs.config.SqsListenerConfigurer; import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory; import io.awspring.cloud.sqs.listener.SqsContainerOptionsBuilder; +import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementResultCallback; +import io.awspring.cloud.sqs.listener.acknowledgement.AsyncAcknowledgementResultCallback; import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler; import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler; import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor; @@ -123,6 +125,8 @@ public SqsMessageListenerContainerFactory defaultSqsListenerContainerFac ObjectProvider observationRegistry, ObjectProvider observationConventionProvider, ObjectProvider> interceptors, + ObjectProvider> acknowledgementResultCallback, + ObjectProvider> asyncAcknowledgementResultCallback, ObjectProvider messageConverterFactory, MessagingMessageConverter messagingMessageConverter) { @@ -133,6 +137,8 @@ public SqsMessageListenerContainerFactory defaultSqsListenerContainerFac errorHandler.ifAvailable(factory::setErrorHandler); interceptors.forEach(factory::addMessageInterceptor); asyncInterceptors.forEach(factory::addMessageInterceptor); + acknowledgementResultCallback.ifAvailable(factory::setAcknowledgementResultCallback); + asyncAcknowledgementResultCallback.ifAvailable(factory::setAcknowledgementResultCallback); messageConverterFactory.ifAvailable(mcf -> mcf.configureLegacyObjectMapper(messagingMessageConverter)); if (this.sqsProperties.isObservationEnabled()) { observationRegistry diff --git a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java index f4ef3ffcf..45f9fcce3 100644 --- a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java +++ b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java @@ -28,6 +28,8 @@ import io.awspring.cloud.sqs.listener.ContainerOptions; import io.awspring.cloud.sqs.listener.ContainerOptionsBuilder; import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy; +import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementResultCallback; +import io.awspring.cloud.sqs.listener.acknowledgement.AsyncAcknowledgementResultCallback; import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler; import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor; import io.awspring.cloud.sqs.operations.SqsTemplate; @@ -206,6 +208,8 @@ void configuresFactoryComponentsAndOptionsWithDefaults() { var factory = context.getBean(SqsMessageListenerContainerFactory.class); assertThat(factory).hasFieldOrProperty("errorHandler").extracting("asyncMessageInterceptors").asList() .isEmpty(); + assertThat(factory).extracting("acknowledgementResultCallback").isNull(); + assertThat(factory).extracting("asyncAcknowledgementResultCallback").isNull(); assertThat(factory).extracting("containerOptionsBuilder").asInstanceOf(type(ContainerOptionsBuilder.class)) .extracting(ContainerOptionsBuilder::build) .isInstanceOfSatisfying(ContainerOptions.class, options -> { @@ -221,6 +225,36 @@ void configuresFactoryComponentsAndOptionsWithDefaults() { } // @formatter:on + @Test + void configuresFactoryWithBlockingAcknowledgementCallback() { + this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true") + .withUserConfiguration(BlockingAcknowledgementCallbackConfiguration.class).run(context -> { + assertThat(context).hasSingleBean(SqsMessageListenerContainerFactory.class); + assertThat(context).hasSingleBean(AcknowledgementResultCallback.class); + + SqsMessageListenerContainerFactory factory = context + .getBean(SqsMessageListenerContainerFactory.class); + + assertThat(factory).extracting("acknowledgementResultCallback") + .isEqualTo(context.getBean(AcknowledgementResultCallback.class)); + }); + } + + @Test + void configuresFactoryWithAsyncAcknowledgementCallback() { + this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true") + .withUserConfiguration(AsyncAcknowledgementCallbackConfiguration.class).run(context -> { + assertThat(context).hasSingleBean(SqsMessageListenerContainerFactory.class); + assertThat(context).hasSingleBean(AsyncAcknowledgementResultCallback.class); + + SqsMessageListenerContainerFactory factory = context + .getBean(SqsMessageListenerContainerFactory.class); + + assertThat(factory).extracting("asyncAcknowledgementResultCallback") + .isEqualTo(context.getBean(AsyncAcknowledgementResultCallback.class)); + }); + } + @Test void configuresMessageConverter() { this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true") @@ -344,4 +378,26 @@ JsonMapper jsonMapper() { } + @Configuration(proxyBeanMethods = false) + static class BlockingAcknowledgementCallbackConfiguration { + + @Bean + AcknowledgementResultCallback acknowledgementResultCallback() { + return new AcknowledgementResultCallback<>() { + }; + } + + } + + @Configuration(proxyBeanMethods = false) + static class AsyncAcknowledgementCallbackConfiguration { + + @Bean + AsyncAcknowledgementResultCallback asyncAcknowledgementResultCallback() { + return new AsyncAcknowledgementResultCallback<>() { + }; + } + + } + } diff --git a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/jackson2/LegacySqsAutoConfigurationTest.java b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/jackson2/LegacySqsAutoConfigurationTest.java index 22336aede..4a42bde13 100644 --- a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/jackson2/LegacySqsAutoConfigurationTest.java +++ b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/jackson2/LegacySqsAutoConfigurationTest.java @@ -33,6 +33,8 @@ import io.awspring.cloud.sqs.listener.ContainerOptions; import io.awspring.cloud.sqs.listener.ContainerOptionsBuilder; import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy; +import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementResultCallback; +import io.awspring.cloud.sqs.listener.acknowledgement.AsyncAcknowledgementResultCallback; import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler; import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor; import io.awspring.cloud.sqs.operations.SqsTemplate; @@ -224,6 +226,8 @@ void configuresFactoryComponentsAndOptions() { .getBean(SqsMessageListenerContainerFactory.class); assertThat(factory).hasFieldOrProperty("errorHandler").extracting("asyncMessageInterceptors") .asList().isNotEmpty(); + assertThat(factory).extracting("acknowledgementResultCallback").isNull(); + assertThat(factory).extracting("asyncAcknowledgementResultCallback").isNull(); assertThat(factory).extracting("containerOptionsBuilder") .asInstanceOf(type(ContainerOptionsBuilder.class)) .extracting(ContainerOptionsBuilder::build) @@ -245,6 +249,36 @@ void configuresFactoryComponentsAndOptions() { }); } + @Test + void configuresFactoryWithBlockingAcknowledgementCallback() { + this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true") + .withUserConfiguration(BlockingAcknowledgementCallbackConfiguration.class).run(context -> { + assertThat(context).hasSingleBean(SqsMessageListenerContainerFactory.class); + assertThat(context).hasSingleBean(AcknowledgementResultCallback.class); + + SqsMessageListenerContainerFactory factory = context + .getBean(SqsMessageListenerContainerFactory.class); + + assertThat(factory).extracting("acknowledgementResultCallback") + .isEqualTo(context.getBean(AcknowledgementResultCallback.class)); + }); + } + + @Test + void configuresFactoryWithAsyncAcknowledgementCallback() { + this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true") + .withUserConfiguration(AsyncAcknowledgementCallbackConfiguration.class).run(context -> { + assertThat(context).hasSingleBean(SqsMessageListenerContainerFactory.class); + assertThat(context).hasSingleBean(AsyncAcknowledgementResultCallback.class); + + SqsMessageListenerContainerFactory factory = context + .getBean(SqsMessageListenerContainerFactory.class); + + assertThat(factory).extracting("asyncAcknowledgementResultCallback") + .isEqualTo(context.getBean(AsyncAcknowledgementResultCallback.class)); + }); + } + @Test void configuresObjectMapper() { this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true") @@ -364,4 +398,25 @@ MessagingMessageConverter messageConverter() { } + @Configuration(proxyBeanMethods = false) + static class BlockingAcknowledgementCallbackConfiguration { + + @Bean + AcknowledgementResultCallback acknowledgementResultCallback() { + return new AcknowledgementResultCallback<>() { + }; + } + + } + + @Configuration(proxyBeanMethods = false) + static class AsyncAcknowledgementCallbackConfiguration { + + @Bean + AsyncAcknowledgementResultCallback asyncAcknowledgementResultCallback() { + return new AsyncAcknowledgementResultCallback<>() { + }; + } + + } } From 3f8d93b7fccf3f0132c28fd8af8bce29259d6f46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vin=C3=ADcius=20Rodrigues?= Date: Sat, 28 Feb 2026 19:21:27 -0300 Subject: [PATCH 2/3] docs(sqs): document auto-injection of AcknowledgementResultCallback --- docs/src/main/asciidoc/sqs.adoc | 63 ++++++++++++++++++++++++++++----- 1 file changed, 55 insertions(+), 8 deletions(-) diff --git a/docs/src/main/asciidoc/sqs.adoc b/docs/src/main/asciidoc/sqs.adoc index bf120b162..1f29c0805 100644 --- a/docs/src/main/asciidoc/sqs.adoc +++ b/docs/src/main/asciidoc/sqs.adoc @@ -2020,9 +2020,10 @@ Implementations of this interface will be executed after an acknowledgement exec ==== Acknowledgement Result Callback -The framework offers the `AcknowledgementResultCallback` and `AsyncAcknowledgementCallback` interfaces that can be added to a `SqsMessageListenerContainer` or `SqsMessageListenerContainerFactory`. +The framework offers the `AcknowledgementResultCallback` and `AsyncAcknowledgementResultCallback` interfaces that can be added to a `SqsMessageListenerContainer` or `SqsMessageListenerContainerFactory`. -```java +[source, java] +---- public interface AcknowledgementResultCallback { default void onSuccess(Collection> messages) { @@ -2032,9 +2033,10 @@ public interface AcknowledgementResultCallback { } } -``` +---- -```java +[source, java] +---- public interface AsyncAcknowledgementResultCallback { default CompletableFuture onSuccess(Collection> messages) { @@ -2046,18 +2048,63 @@ public interface AsyncAcknowledgementResultCallback { } } -``` +---- -```java +The auto-configured factory automatically injects any `AcknowledgementResultCallback` or `AsyncAcknowledgementResultCallback` beans declared in the application context: + +[source, java] +---- +@Bean +public AcknowledgementResultCallback acknowledgementResultCallback() { + return new AcknowledgementResultCallback<>() { + @Override + public void onSuccess(Collection> messages) { + // Custom logic after successful acknowledgement + } + + @Override + public void onFailure(Collection> messages, Throwable t) { + // Custom logic after acknowledgement failure + } + }; +} +---- + +Or using the async variant: + +[source, java] +---- +@Bean +public AsyncAcknowledgementResultCallback asyncAcknowledgementResultCallback() { + return new AsyncAcknowledgementResultCallback<>() { + @Override + public CompletableFuture onSuccess(Collection> messages) { + // Custom async logic after successful acknowledgement + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture onFailure(Collection> messages, Throwable t) { + // Custom async logic after acknowledgement failure + return CompletableFuture.completedFuture(null); + } + }; +} +---- + +Alternatively, implementations can be set in the `MessageListenerContainerFactory` or directly in the `MessageListenerContainer`: + +[source, java] +---- @Bean public SqsMessageListenerContainerFactory defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) { return SqsMessageListenerContainerFactory .builder() .sqsAsyncClient(sqsAsyncClient) - .acknowledgementResultCallback(getAcknowledgementResultCallback()) + .acknowledgementResultCallback(acknowledgementResultCallback()) .build(); } -``` +---- NOTE: When `immediate acknowledgement` is set, as is the default for `FIFO` queues, the callback will be executed **before** the next message in the batch is processed, and next message processing will wait for the callback completion. This can be useful for taking action such as retrying to delete the messages, or stopping the container to prevent duplicate processing in case an acknowledgement fails in a FIFO queue. From 28f2712c7d61c0b71bca25deabfcf145a46e5ca5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vin=C3=ADcius=20Rodrigues?= Date: Sun, 15 Mar 2026 13:18:53 -0300 Subject: [PATCH 3/3] chore: Add since 4.1.0 tag to declare the start of the new feature. --- docs/src/main/asciidoc/sqs.adoc | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/docs/src/main/asciidoc/sqs.adoc b/docs/src/main/asciidoc/sqs.adoc index 1f29c0805..8e8d8b727 100644 --- a/docs/src/main/asciidoc/sqs.adoc +++ b/docs/src/main/asciidoc/sqs.adoc @@ -2050,7 +2050,7 @@ public interface AsyncAcknowledgementResultCallback { } ---- -The auto-configured factory automatically injects any `AcknowledgementResultCallback` or `AsyncAcknowledgementResultCallback` beans declared in the application context: +Since 4.1.0, the auto-configured factory automatically injects any `AcknowledgementResultCallback` or `AsyncAcknowledgementResultCallback` beans declared in the application context: [source, java] ---- @@ -2110,6 +2110,21 @@ NOTE: When `immediate acknowledgement` is set, as is the default for `FIFO` queu This can be useful for taking action such as retrying to delete the messages, or stopping the container to prevent duplicate processing in case an acknowledgement fails in a FIFO queue. For `batch parallel processing`, as is the default for `Standard` queues the callback execution happens asynchronously. +If an acknowledgement operation partially fails, for example when `DeleteMessageBatch` returns failed entries, the callback `onFailure` receives a `SqsAcknowledgementException`. +Use `getSuccessfullyAcknowledgedMessages()` and `getFailedAcknowledgementMessages()` to inspect the acknowledgement result and retry only failed messages if needed. + +[source, java] +---- +@Override +public void onFailure(Collection> messages, Throwable t) { + if (t instanceof SqsAcknowledgementException ex) { + Collection> failedMessages = ex.getFailedAcknowledgementMessages(); + // retry only failedMessages + } +} +---- + +NOTE: If the failure IDs returned by AWS cannot be correlated with the original request IDs, a fail-safe is applied: `getSuccessfullyAcknowledgedMessages()` returns an empty collection and `getFailedAcknowledgementMessages()` returns all messages in the batch to prevent silent misclassification. === Global Configuration for @SqsListeners