diff --git a/docs/src/main/asciidoc/sqs.adoc b/docs/src/main/asciidoc/sqs.adoc index 2bc3fbb6d..720ebaf8a 100644 --- a/docs/src/main/asciidoc/sqs.adoc +++ b/docs/src/main/asciidoc/sqs.adoc @@ -2024,7 +2024,8 @@ Implementations of this interface will be executed after an acknowledgement exec The framework offers the `AcknowledgementResultCallback` and `AsyncAcknowledgementResultCallback` interfaces that can be added to a `SqsMessageListenerContainer` or `SqsMessageListenerContainerFactory`. -```java +[source, java] +---- public interface AcknowledgementResultCallback { default void onSuccess(Collection> messages) { @@ -2034,9 +2035,10 @@ public interface AcknowledgementResultCallback { } } -``` +---- -```java +[source, java] +---- public interface AsyncAcknowledgementResultCallback { default CompletableFuture onSuccess(Collection> messages) { @@ -2048,38 +2050,83 @@ public interface AsyncAcknowledgementResultCallback { } } -``` +---- -If an acknowledgement operation partially fails, for example when `DeleteMessageBatch` returns failed entries, the callback `onFailure` receives a `SqsAcknowledgementException`. -Use `getSuccessfullyAcknowledgedMessages()` and `getFailedAcknowledgementMessages()` to inspect the acknowledgement result and retry only failed messages if needed. +Since 4.1.0, the auto-configured factory automatically injects any `AcknowledgementResultCallback` or `AsyncAcknowledgementResultCallback` beans declared in the application context: -```java -@Override -public void onFailure(Collection> messages, Throwable t) { - if (t instanceof SqsAcknowledgementException ex) { - Collection> failedMessages = ex.getFailedAcknowledgementMessages(); - // retry only failedMessages - } +[source, java] +---- +@Bean +public AcknowledgementResultCallback acknowledgementResultCallback() { + return new AcknowledgementResultCallback<>() { + @Override + public void onSuccess(Collection> messages) { + // Custom logic after successful acknowledgement + } + + @Override + public void onFailure(Collection> messages, Throwable t) { + // Custom logic after acknowledgement failure + } + }; } -``` +---- -NOTE: If the failure IDs returned by AWS cannot be correlated with the original request IDs, a fail-safe is applied: `getSuccessfullyAcknowledgedMessages()` returns an empty collection and `getFailedAcknowledgementMessages()` returns all messages in the batch to prevent silent misclassification. +Or using the async variant: -```java +[source, java] +---- +@Bean +public AsyncAcknowledgementResultCallback asyncAcknowledgementResultCallback() { + return new AsyncAcknowledgementResultCallback<>() { + @Override + public CompletableFuture onSuccess(Collection> messages) { + // Custom async logic after successful acknowledgement + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture onFailure(Collection> messages, Throwable t) { + // Custom async logic after acknowledgement failure + return CompletableFuture.completedFuture(null); + } + }; +} +---- + +Alternatively, implementations can be set in the `MessageListenerContainerFactory` or directly in the `MessageListenerContainer`: + +[source, java] +---- @Bean public SqsMessageListenerContainerFactory defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) { return SqsMessageListenerContainerFactory .builder() .sqsAsyncClient(sqsAsyncClient) - .acknowledgementResultCallback(getAcknowledgementResultCallback()) + .acknowledgementResultCallback(acknowledgementResultCallback()) .build(); } -``` +---- NOTE: When `immediate acknowledgement` is set, as is the default for `FIFO` queues, the callback will be executed **before** the next message in the batch is processed, and next message processing will wait for the callback completion. This can be useful for taking action such as retrying to delete the messages, or stopping the container to prevent duplicate processing in case an acknowledgement fails in a FIFO queue. For `batch parallel processing`, as is the default for `Standard` queues the callback execution happens asynchronously. +If an acknowledgement operation partially fails, for example when `DeleteMessageBatch` returns failed entries, the callback `onFailure` receives a `SqsAcknowledgementException`. +Use `getSuccessfullyAcknowledgedMessages()` and `getFailedAcknowledgementMessages()` to inspect the acknowledgement result and retry only failed messages if needed. + +[source, java] +---- +@Override +public void onFailure(Collection> messages, Throwable t) { + if (t instanceof SqsAcknowledgementException ex) { + Collection> failedMessages = ex.getFailedAcknowledgementMessages(); + // retry only failedMessages + } +} +---- + +NOTE: If the failure IDs returned by AWS cannot be correlated with the original request IDs, a fail-safe is applied: `getSuccessfullyAcknowledgedMessages()` returns an empty collection and `getFailedAcknowledgementMessages()` returns all messages in the batch to prevent silent misclassification. === Global Configuration for @SqsListeners diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java index c6c89301e..98d713804 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java @@ -25,6 +25,8 @@ import io.awspring.cloud.sqs.config.SqsListenerConfigurer; import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory; import io.awspring.cloud.sqs.listener.SqsContainerOptionsBuilder; +import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementResultCallback; +import io.awspring.cloud.sqs.listener.acknowledgement.AsyncAcknowledgementResultCallback; import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler; import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler; import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor; @@ -123,6 +125,8 @@ public SqsMessageListenerContainerFactory defaultSqsListenerContainerFac ObjectProvider observationRegistry, ObjectProvider observationConventionProvider, ObjectProvider> interceptors, + ObjectProvider> acknowledgementResultCallback, + ObjectProvider> asyncAcknowledgementResultCallback, ObjectProvider messageConverterFactory, MessagingMessageConverter messagingMessageConverter) { @@ -133,6 +137,8 @@ public SqsMessageListenerContainerFactory defaultSqsListenerContainerFac errorHandler.ifAvailable(factory::setErrorHandler); interceptors.forEach(factory::addMessageInterceptor); asyncInterceptors.forEach(factory::addMessageInterceptor); + acknowledgementResultCallback.ifAvailable(factory::setAcknowledgementResultCallback); + asyncAcknowledgementResultCallback.ifAvailable(factory::setAcknowledgementResultCallback); messageConverterFactory.ifAvailable(mcf -> mcf.configureLegacyObjectMapper(messagingMessageConverter)); if (this.sqsProperties.isObservationEnabled()) { observationRegistry diff --git a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java index f4ef3ffcf..45f9fcce3 100644 --- a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java +++ b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java @@ -28,6 +28,8 @@ import io.awspring.cloud.sqs.listener.ContainerOptions; import io.awspring.cloud.sqs.listener.ContainerOptionsBuilder; import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy; +import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementResultCallback; +import io.awspring.cloud.sqs.listener.acknowledgement.AsyncAcknowledgementResultCallback; import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler; import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor; import io.awspring.cloud.sqs.operations.SqsTemplate; @@ -206,6 +208,8 @@ void configuresFactoryComponentsAndOptionsWithDefaults() { var factory = context.getBean(SqsMessageListenerContainerFactory.class); assertThat(factory).hasFieldOrProperty("errorHandler").extracting("asyncMessageInterceptors").asList() .isEmpty(); + assertThat(factory).extracting("acknowledgementResultCallback").isNull(); + assertThat(factory).extracting("asyncAcknowledgementResultCallback").isNull(); assertThat(factory).extracting("containerOptionsBuilder").asInstanceOf(type(ContainerOptionsBuilder.class)) .extracting(ContainerOptionsBuilder::build) .isInstanceOfSatisfying(ContainerOptions.class, options -> { @@ -221,6 +225,36 @@ void configuresFactoryComponentsAndOptionsWithDefaults() { } // @formatter:on + @Test + void configuresFactoryWithBlockingAcknowledgementCallback() { + this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true") + .withUserConfiguration(BlockingAcknowledgementCallbackConfiguration.class).run(context -> { + assertThat(context).hasSingleBean(SqsMessageListenerContainerFactory.class); + assertThat(context).hasSingleBean(AcknowledgementResultCallback.class); + + SqsMessageListenerContainerFactory factory = context + .getBean(SqsMessageListenerContainerFactory.class); + + assertThat(factory).extracting("acknowledgementResultCallback") + .isEqualTo(context.getBean(AcknowledgementResultCallback.class)); + }); + } + + @Test + void configuresFactoryWithAsyncAcknowledgementCallback() { + this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true") + .withUserConfiguration(AsyncAcknowledgementCallbackConfiguration.class).run(context -> { + assertThat(context).hasSingleBean(SqsMessageListenerContainerFactory.class); + assertThat(context).hasSingleBean(AsyncAcknowledgementResultCallback.class); + + SqsMessageListenerContainerFactory factory = context + .getBean(SqsMessageListenerContainerFactory.class); + + assertThat(factory).extracting("asyncAcknowledgementResultCallback") + .isEqualTo(context.getBean(AsyncAcknowledgementResultCallback.class)); + }); + } + @Test void configuresMessageConverter() { this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true") @@ -344,4 +378,26 @@ JsonMapper jsonMapper() { } + @Configuration(proxyBeanMethods = false) + static class BlockingAcknowledgementCallbackConfiguration { + + @Bean + AcknowledgementResultCallback acknowledgementResultCallback() { + return new AcknowledgementResultCallback<>() { + }; + } + + } + + @Configuration(proxyBeanMethods = false) + static class AsyncAcknowledgementCallbackConfiguration { + + @Bean + AsyncAcknowledgementResultCallback asyncAcknowledgementResultCallback() { + return new AsyncAcknowledgementResultCallback<>() { + }; + } + + } + } diff --git a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/jackson2/LegacySqsAutoConfigurationTest.java b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/jackson2/LegacySqsAutoConfigurationTest.java index 22336aede..4a42bde13 100644 --- a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/jackson2/LegacySqsAutoConfigurationTest.java +++ b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/jackson2/LegacySqsAutoConfigurationTest.java @@ -33,6 +33,8 @@ import io.awspring.cloud.sqs.listener.ContainerOptions; import io.awspring.cloud.sqs.listener.ContainerOptionsBuilder; import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy; +import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementResultCallback; +import io.awspring.cloud.sqs.listener.acknowledgement.AsyncAcknowledgementResultCallback; import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler; import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor; import io.awspring.cloud.sqs.operations.SqsTemplate; @@ -224,6 +226,8 @@ void configuresFactoryComponentsAndOptions() { .getBean(SqsMessageListenerContainerFactory.class); assertThat(factory).hasFieldOrProperty("errorHandler").extracting("asyncMessageInterceptors") .asList().isNotEmpty(); + assertThat(factory).extracting("acknowledgementResultCallback").isNull(); + assertThat(factory).extracting("asyncAcknowledgementResultCallback").isNull(); assertThat(factory).extracting("containerOptionsBuilder") .asInstanceOf(type(ContainerOptionsBuilder.class)) .extracting(ContainerOptionsBuilder::build) @@ -245,6 +249,36 @@ void configuresFactoryComponentsAndOptions() { }); } + @Test + void configuresFactoryWithBlockingAcknowledgementCallback() { + this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true") + .withUserConfiguration(BlockingAcknowledgementCallbackConfiguration.class).run(context -> { + assertThat(context).hasSingleBean(SqsMessageListenerContainerFactory.class); + assertThat(context).hasSingleBean(AcknowledgementResultCallback.class); + + SqsMessageListenerContainerFactory factory = context + .getBean(SqsMessageListenerContainerFactory.class); + + assertThat(factory).extracting("acknowledgementResultCallback") + .isEqualTo(context.getBean(AcknowledgementResultCallback.class)); + }); + } + + @Test + void configuresFactoryWithAsyncAcknowledgementCallback() { + this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true") + .withUserConfiguration(AsyncAcknowledgementCallbackConfiguration.class).run(context -> { + assertThat(context).hasSingleBean(SqsMessageListenerContainerFactory.class); + assertThat(context).hasSingleBean(AsyncAcknowledgementResultCallback.class); + + SqsMessageListenerContainerFactory factory = context + .getBean(SqsMessageListenerContainerFactory.class); + + assertThat(factory).extracting("asyncAcknowledgementResultCallback") + .isEqualTo(context.getBean(AsyncAcknowledgementResultCallback.class)); + }); + } + @Test void configuresObjectMapper() { this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true") @@ -364,4 +398,25 @@ MessagingMessageConverter messageConverter() { } + @Configuration(proxyBeanMethods = false) + static class BlockingAcknowledgementCallbackConfiguration { + + @Bean + AcknowledgementResultCallback acknowledgementResultCallback() { + return new AcknowledgementResultCallback<>() { + }; + } + + } + + @Configuration(proxyBeanMethods = false) + static class AsyncAcknowledgementCallbackConfiguration { + + @Bean + AsyncAcknowledgementResultCallback asyncAcknowledgementResultCallback() { + return new AsyncAcknowledgementResultCallback<>() { + }; + } + + } }