Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 65 additions & 18 deletions docs/src/main/asciidoc/sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2024,7 +2024,8 @@ Implementations of this interface will be executed after an acknowledgement exec

The framework offers the `AcknowledgementResultCallback` and `AsyncAcknowledgementResultCallback` interfaces that can be added to a `SqsMessageListenerContainer` or `SqsMessageListenerContainerFactory`.

```java
[source, java]
----
public interface AcknowledgementResultCallback<T> {

default void onSuccess(Collection<Message<T>> messages) {
Expand All @@ -2034,9 +2035,10 @@ public interface AcknowledgementResultCallback<T> {
}

}
```
----

```java
[source, java]
----
public interface AsyncAcknowledgementResultCallback<T> {

default CompletableFuture<Void> onSuccess(Collection<Message<T>> messages) {
Expand All @@ -2048,38 +2050,83 @@ public interface AsyncAcknowledgementResultCallback<T> {
}

}
```
----

If an acknowledgement operation partially fails, for example when `DeleteMessageBatch` returns failed entries, the callback `onFailure` receives a `SqsAcknowledgementException`.
Use `getSuccessfullyAcknowledgedMessages()` and `getFailedAcknowledgementMessages()` to inspect the acknowledgement result and retry only failed messages if needed.
Since 4.1.0, the auto-configured factory automatically injects any `AcknowledgementResultCallback` or `AsyncAcknowledgementResultCallback` beans declared in the application context:

```java
@Override
public void onFailure(Collection<Message<Object>> messages, Throwable t) {
if (t instanceof SqsAcknowledgementException ex) {
Collection<Message<?>> failedMessages = ex.getFailedAcknowledgementMessages();
// retry only failedMessages
}
[source, java]
----
@Bean
public AcknowledgementResultCallback<Object> acknowledgementResultCallback() {
return new AcknowledgementResultCallback<>() {
@Override
public void onSuccess(Collection<Message<Object>> messages) {
// Custom logic after successful acknowledgement
}

@Override
public void onFailure(Collection<Message<Object>> messages, Throwable t) {
// Custom logic after acknowledgement failure
}
};
}
```
----

NOTE: If the failure IDs returned by AWS cannot be correlated with the original request IDs, a fail-safe is applied: `getSuccessfullyAcknowledgedMessages()` returns an empty collection and `getFailedAcknowledgementMessages()` returns all messages in the batch to prevent silent misclassification.
Or using the async variant:

```java
[source, java]
----
@Bean
public AsyncAcknowledgementResultCallback<Object> asyncAcknowledgementResultCallback() {
return new AsyncAcknowledgementResultCallback<>() {
@Override
public CompletableFuture<Void> onSuccess(Collection<Message<Object>> messages) {
// Custom async logic after successful acknowledgement
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Void> onFailure(Collection<Message<Object>> messages, Throwable t) {
// Custom async logic after acknowledgement failure
return CompletableFuture.completedFuture(null);
}
};
}
----

Alternatively, implementations can be set in the `MessageListenerContainerFactory` or directly in the `MessageListenerContainer`:

[source, java]
----
@Bean
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
return SqsMessageListenerContainerFactory
.builder()
.sqsAsyncClient(sqsAsyncClient)
.acknowledgementResultCallback(getAcknowledgementResultCallback())
.acknowledgementResultCallback(acknowledgementResultCallback())
.build();
}
```
----

NOTE: When `immediate acknowledgement` is set, as is the default for `FIFO` queues, the callback will be executed **before** the next message in the batch is processed, and next message processing will wait for the callback completion.
This can be useful for taking action such as retrying to delete the messages, or stopping the container to prevent duplicate processing in case an acknowledgement fails in a FIFO queue.
For `batch parallel processing`, as is the default for `Standard` queues the callback execution happens asynchronously.

If an acknowledgement operation partially fails, for example when `DeleteMessageBatch` returns failed entries, the callback `onFailure` receives a `SqsAcknowledgementException`.
Use `getSuccessfullyAcknowledgedMessages()` and `getFailedAcknowledgementMessages()` to inspect the acknowledgement result and retry only failed messages if needed.

[source, java]
----
@Override
public void onFailure(Collection<Message<Object>> messages, Throwable t) {
if (t instanceof SqsAcknowledgementException ex) {
Collection<Message<?>> failedMessages = ex.getFailedAcknowledgementMessages();
// retry only failedMessages
}
}
----

NOTE: If the failure IDs returned by AWS cannot be correlated with the original request IDs, a fail-safe is applied: `getSuccessfullyAcknowledgedMessages()` returns an empty collection and `getFailedAcknowledgementMessages()` returns all messages in the batch to prevent silent misclassification.

=== Global Configuration for @SqsListeners

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import io.awspring.cloud.sqs.config.SqsListenerConfigurer;
import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
import io.awspring.cloud.sqs.listener.SqsContainerOptionsBuilder;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementResultCallback;
import io.awspring.cloud.sqs.listener.acknowledgement.AsyncAcknowledgementResultCallback;
import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler;
import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler;
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
Expand Down Expand Up @@ -123,6 +125,8 @@ public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFac
ObjectProvider<ObservationRegistry> observationRegistry,
ObjectProvider<SqsListenerObservation.Convention> observationConventionProvider,
ObjectProvider<MessageInterceptor<Object>> interceptors,
ObjectProvider<AcknowledgementResultCallback<Object>> acknowledgementResultCallback,
ObjectProvider<AsyncAcknowledgementResultCallback<Object>> asyncAcknowledgementResultCallback,
ObjectProvider<JacksonMessageConverterMigration> messageConverterFactory,
MessagingMessageConverter<?> messagingMessageConverter) {

Expand All @@ -133,6 +137,8 @@ public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFac
errorHandler.ifAvailable(factory::setErrorHandler);
interceptors.forEach(factory::addMessageInterceptor);
asyncInterceptors.forEach(factory::addMessageInterceptor);
acknowledgementResultCallback.ifAvailable(factory::setAcknowledgementResultCallback);
asyncAcknowledgementResultCallback.ifAvailable(factory::setAcknowledgementResultCallback);
messageConverterFactory.ifAvailable(mcf -> mcf.configureLegacyObjectMapper(messagingMessageConverter));
if (this.sqsProperties.isObservationEnabled()) {
observationRegistry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import io.awspring.cloud.sqs.listener.ContainerOptions;
import io.awspring.cloud.sqs.listener.ContainerOptionsBuilder;
import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementResultCallback;
import io.awspring.cloud.sqs.listener.acknowledgement.AsyncAcknowledgementResultCallback;
import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler;
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
import io.awspring.cloud.sqs.operations.SqsTemplate;
Expand Down Expand Up @@ -206,6 +208,8 @@ void configuresFactoryComponentsAndOptionsWithDefaults() {
var factory = context.getBean(SqsMessageListenerContainerFactory.class);
assertThat(factory).hasFieldOrProperty("errorHandler").extracting("asyncMessageInterceptors").asList()
.isEmpty();
assertThat(factory).extracting("acknowledgementResultCallback").isNull();
assertThat(factory).extracting("asyncAcknowledgementResultCallback").isNull();
assertThat(factory).extracting("containerOptionsBuilder").asInstanceOf(type(ContainerOptionsBuilder.class))
.extracting(ContainerOptionsBuilder::build)
.isInstanceOfSatisfying(ContainerOptions.class, options -> {
Expand All @@ -221,6 +225,36 @@ void configuresFactoryComponentsAndOptionsWithDefaults() {
}
// @formatter:on

@Test
void configuresFactoryWithBlockingAcknowledgementCallback() {
this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true")
.withUserConfiguration(BlockingAcknowledgementCallbackConfiguration.class).run(context -> {
assertThat(context).hasSingleBean(SqsMessageListenerContainerFactory.class);
assertThat(context).hasSingleBean(AcknowledgementResultCallback.class);

SqsMessageListenerContainerFactory<?> factory = context
.getBean(SqsMessageListenerContainerFactory.class);

assertThat(factory).extracting("acknowledgementResultCallback")
.isEqualTo(context.getBean(AcknowledgementResultCallback.class));
});
}

@Test
void configuresFactoryWithAsyncAcknowledgementCallback() {
this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true")
.withUserConfiguration(AsyncAcknowledgementCallbackConfiguration.class).run(context -> {
assertThat(context).hasSingleBean(SqsMessageListenerContainerFactory.class);
assertThat(context).hasSingleBean(AsyncAcknowledgementResultCallback.class);

SqsMessageListenerContainerFactory<?> factory = context
.getBean(SqsMessageListenerContainerFactory.class);

assertThat(factory).extracting("asyncAcknowledgementResultCallback")
.isEqualTo(context.getBean(AsyncAcknowledgementResultCallback.class));
});
}

@Test
void configuresMessageConverter() {
this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true")
Expand Down Expand Up @@ -344,4 +378,26 @@ JsonMapper jsonMapper() {

}

@Configuration(proxyBeanMethods = false)
static class BlockingAcknowledgementCallbackConfiguration {

@Bean
AcknowledgementResultCallback<Object> acknowledgementResultCallback() {
return new AcknowledgementResultCallback<>() {
};
}

}

@Configuration(proxyBeanMethods = false)
static class AsyncAcknowledgementCallbackConfiguration {

@Bean
AsyncAcknowledgementResultCallback<Object> asyncAcknowledgementResultCallback() {
return new AsyncAcknowledgementResultCallback<>() {
};
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import io.awspring.cloud.sqs.listener.ContainerOptions;
import io.awspring.cloud.sqs.listener.ContainerOptionsBuilder;
import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementResultCallback;
import io.awspring.cloud.sqs.listener.acknowledgement.AsyncAcknowledgementResultCallback;
import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler;
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
import io.awspring.cloud.sqs.operations.SqsTemplate;
Expand Down Expand Up @@ -224,6 +226,8 @@ void configuresFactoryComponentsAndOptions() {
.getBean(SqsMessageListenerContainerFactory.class);
assertThat(factory).hasFieldOrProperty("errorHandler").extracting("asyncMessageInterceptors")
.asList().isNotEmpty();
assertThat(factory).extracting("acknowledgementResultCallback").isNull();
assertThat(factory).extracting("asyncAcknowledgementResultCallback").isNull();
assertThat(factory).extracting("containerOptionsBuilder")
.asInstanceOf(type(ContainerOptionsBuilder.class))
.extracting(ContainerOptionsBuilder::build)
Expand All @@ -245,6 +249,36 @@ void configuresFactoryComponentsAndOptions() {
});
}

@Test
void configuresFactoryWithBlockingAcknowledgementCallback() {
this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true")
.withUserConfiguration(BlockingAcknowledgementCallbackConfiguration.class).run(context -> {
assertThat(context).hasSingleBean(SqsMessageListenerContainerFactory.class);
assertThat(context).hasSingleBean(AcknowledgementResultCallback.class);

SqsMessageListenerContainerFactory<?> factory = context
.getBean(SqsMessageListenerContainerFactory.class);

assertThat(factory).extracting("acknowledgementResultCallback")
.isEqualTo(context.getBean(AcknowledgementResultCallback.class));
});
}

@Test
void configuresFactoryWithAsyncAcknowledgementCallback() {
this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true")
.withUserConfiguration(AsyncAcknowledgementCallbackConfiguration.class).run(context -> {
assertThat(context).hasSingleBean(SqsMessageListenerContainerFactory.class);
assertThat(context).hasSingleBean(AsyncAcknowledgementResultCallback.class);

SqsMessageListenerContainerFactory<?> factory = context
.getBean(SqsMessageListenerContainerFactory.class);

assertThat(factory).extracting("asyncAcknowledgementResultCallback")
.isEqualTo(context.getBean(AsyncAcknowledgementResultCallback.class));
});
}

@Test
void configuresObjectMapper() {
this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.enabled:true")
Expand Down Expand Up @@ -364,4 +398,25 @@ MessagingMessageConverter<Message> messageConverter() {

}

@Configuration(proxyBeanMethods = false)
static class BlockingAcknowledgementCallbackConfiguration {

@Bean
AcknowledgementResultCallback<Object> acknowledgementResultCallback() {
return new AcknowledgementResultCallback<>() {
};
}

}

@Configuration(proxyBeanMethods = false)
static class AsyncAcknowledgementCallbackConfiguration {

@Bean
AsyncAcknowledgementResultCallback<Object> asyncAcknowledgementResultCallback() {
return new AsyncAcknowledgementResultCallback<>() {
};
}

}
}