Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion docs/src/main/asciidoc/sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ SqsTemplate.builder().configure(options -> options.acknowledgementMode(TemplateA
```

If an error occurs during acknowledgement, a `SqsAcknowledgementException` is thrown, containing both the messages that were successfully acknowledged and those which failed.
See <<sqs-acknowledgement-result-callback>> for details on inspecting partial failure results and the fail-safe correlation behavior.

To acknowledge messages received with `MANUAL` acknowledgement, the `Acknowledgement#acknowledge` and `Acknowledgement#acknowledgeAsync` methods can be used.

Expand Down Expand Up @@ -2018,9 +2019,10 @@ NOTE: PARALLEL is the default for FIFO because ordering is guaranteed for proces
This assures no messages from a given `MessageGroup` will be polled until the previous batch is acknowledged.
Implementations of this interface will be executed after an acknowledgement execution completes with either success or failure.

[[sqs-acknowledgement-result-callback]]
==== Acknowledgement Result Callback

The framework offers the `AcknowledgementResultCallback` and `AsyncAcknowledgementCallback` interfaces that can be added to a `SqsMessageListenerContainer` or `SqsMessageListenerContainerFactory`.
The framework offers the `AcknowledgementResultCallback` and `AsyncAcknowledgementResultCallback` interfaces that can be added to a `SqsMessageListenerContainer` or `SqsMessageListenerContainerFactory`.

```java
public interface AcknowledgementResultCallback<T> {
Expand Down Expand Up @@ -2048,6 +2050,21 @@ public interface AsyncAcknowledgementResultCallback<T> {
}
```

If an acknowledgement operation partially fails, for example when `DeleteMessageBatch` returns failed entries, the callback `onFailure` receives a `SqsAcknowledgementException`.
Use `getSuccessfullyAcknowledgedMessages()` and `getFailedAcknowledgementMessages()` to inspect the acknowledgement result and retry only failed messages if needed.

```java
@Override
public void onFailure(Collection<Message<Object>> messages, Throwable t) {
if (t instanceof SqsAcknowledgementException ex) {
Collection<Message<?>> failedMessages = ex.getFailedAcknowledgementMessages();
// retry only failedMessages
}
}
```

NOTE: If the failure IDs returned by AWS cannot be correlated with the original request IDs, a fail-safe is applied: `getSuccessfullyAcknowledgedMessages()` returns an empty collection and `getFailedAcknowledgementMessages()` returns all messages in the batch to prevent silent misclassification.

```java
@Bean
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import io.awspring.cloud.sqs.listener.QueueAttributesAware;
import io.awspring.cloud.sqs.listener.SqsAsyncClientAware;
import io.awspring.cloud.sqs.listener.SqsHeaders;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
Expand All @@ -34,8 +36,10 @@
import org.springframework.util.Assert;
import org.springframework.util.StopWatch;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;

/**
* {@link AcknowledgementExecutor} implementation for SQS queues. Handle the messages deletion, usually requested by an
Expand Down Expand Up @@ -95,12 +99,61 @@ private CompletableFuture<Void> deleteMessages(Collection<Message<T>> messagesTo
StopWatch watch = new StopWatch();
watch.start();
return CompletableFutures.exceptionallyCompose(this.sqsAsyncClient
.deleteMessageBatch(createDeleteMessageBatchRequest(messagesToAck))
.thenRun(() -> {}),
t -> CompletableFutures.failedFuture(createAcknowledgementException(messagesToAck, t)))
.deleteMessageBatch(createDeleteMessageBatchRequest(messagesToAck)).thenCompose(
response -> handleDeleteMessageBatchResponse(messagesToAck, response)),
t -> toAcknowledgementFailure(messagesToAck, t))
.whenComplete((v, t) -> logAckResult(messagesToAck, t, watch));
}

private CompletableFuture<Void> handleDeleteMessageBatchResponse(Collection<Message<T>> messagesToAck,
DeleteMessageBatchResponse response) {
if (!response.failed().isEmpty()) {
return CompletableFutures.<Void>failedFuture(createPartialFailureException(messagesToAck, response));
}
return CompletableFuture.<Void>completedFuture(null);
}

private CompletableFuture<Void> toAcknowledgementFailure(Collection<Message<T>> messagesToAck, Throwable throwable) {
Throwable cause = throwable instanceof CompletionException && throwable.getCause() != null ? throwable.getCause()
: throwable;
if (cause instanceof SqsAcknowledgementException) {
return CompletableFutures.<Void>failedFuture(cause);
}
return CompletableFutures.<Void>failedFuture(createAcknowledgementException(messagesToAck, cause));
}

private SqsAcknowledgementException createPartialFailureException(Collection<Message<T>> messages,
DeleteMessageBatchResponse response) {
Set<String> messageIds = messages.stream().map(MessageHeaderUtils::getId).collect(Collectors.toSet());
Set<String> failedIds = response.failed().stream()
.map(BatchResultErrorEntry::id)
.collect(Collectors.toSet());

if (!messageIds.containsAll(failedIds)) {
logger.warn("Could not correlate all acknowledgement failure ids in queue {}: {}", this.queueName,
failedIds);
return new SqsAcknowledgementException("Could not correlate acknowledgement failure ids: " + failedIds,
Collections.emptyList(), messages.stream().map(msg -> (Message<?>) msg).collect(Collectors.toList()),
this.queueUrl, null);
}

List<Message<?>> successfulMessages = new ArrayList<>();
List<Message<?>> failedMessages = new ArrayList<>();

for(Message<T> msg : messages) {
if(failedIds.contains(MessageHeaderUtils.getId(msg))) {
failedMessages.add(msg);
} else {
successfulMessages.add(msg);
}
}

logger.warn("Some messages could not be acknowledged in queue {}: {}", this.queueName, failedIds);

return new SqsAcknowledgementException("Error acknowledging messages " + failedIds, successfulMessages,
failedMessages, this.queueUrl, null);
}

private DeleteMessageBatchRequest createDeleteMessageBatchRequest(Collection<Message<T>> messagesToAck) {
return DeleteMessageBatchRequest
.builder()
Expand All @@ -113,7 +166,7 @@ private DeleteMessageBatchRequestEntry toDeleteMessageEntry(Message<T> message)
return DeleteMessageBatchRequestEntry
.builder()
.receiptHandle(MessageHeaderUtils.getHeaderAsString(message, SqsHeaders.SQS_RECEIPT_HANDLE_HEADER))
.id(UUID.randomUUID().toString())
.id(MessageHeaderUtils.getId(message))
.build();
}
// @formatter:on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.awspring.cloud.sqs.listener.SqsHeaders;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.junit.jupiter.api.Test;
Expand All @@ -38,8 +39,10 @@
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;

/**
* Tests for {@link SqsAcknowledgementExecutor}.
Expand All @@ -58,23 +61,31 @@ class SqsAcknowledgementExecutorTests {
@Mock
Message<String> message;

@Mock
Message<String> secondMessage;

String queueName = "sqsAcknowledgementExecutorTestsQueueName";

String queueUrl = "sqsAcknowledgementExecutorTestsQueueUrl";

String receiptHandle = "sqsAcknowledgementExecutorTestsQueueReceiptHandle";

String secondReceiptHandle = "sqsAcknowledgementExecutorTestsQueueSecondReceiptHandle";

MessageHeaders messageHeaders = new MessageHeaders(
Collections.singletonMap(SqsHeaders.SQS_RECEIPT_HANDLE_HEADER, receiptHandle));

MessageHeaders secondMessageHeaders = new MessageHeaders(
Collections.singletonMap(SqsHeaders.SQS_RECEIPT_HANDLE_HEADER, secondReceiptHandle));

@Test
void shouldDeleteMessages() throws Exception {
Collection<Message<String>> messages = Collections.singletonList(message);
given(message.getHeaders()).willReturn(messageHeaders);
given(queueAttributes.getQueueName()).willReturn(queueName);
given(queueAttributes.getQueueUrl()).willReturn(queueUrl);
given(sqsAsyncClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class)))
.willReturn(CompletableFuture.completedFuture(null));
.willReturn(CompletableFuture.completedFuture(DeleteMessageBatchResponse.builder().build()));

SqsAcknowledgementExecutor<String> executor = new SqsAcknowledgementExecutor<>();
executor.setSqsAsyncClient(sqsAsyncClient);
Expand Down Expand Up @@ -127,4 +138,68 @@ void shouldWrapIfErrorIsThrown() {
.extracting(SqsAcknowledgementException::getQueue).isEqualTo(queueUrl);
}

@Test
void shouldWrapPartialBatchFailure() {
Message<String> failedMessage = message;
Message<String> successfulMessage = secondMessage;
MessageHeaders failedMessageHeaders = messageHeaders;
MessageHeaders successfulMessageHeaders = secondMessageHeaders;
Collection<Message<String>> messagesToAck = List.of(failedMessage, successfulMessage);

given(failedMessage.getHeaders()).willReturn(failedMessageHeaders);
given(successfulMessage.getHeaders()).willReturn(successfulMessageHeaders);
given(queueAttributes.getQueueName()).willReturn(queueName);
given(queueAttributes.getQueueUrl()).willReturn(queueUrl);

BatchResultErrorEntry failedEntry = BatchResultErrorEntry.builder().id(failedMessageHeaders.getId().toString())
.code("ReceiptHandleIsInvalid").message("Receipt handle expired").build();

DeleteMessageBatchResponse partialFailureResponse = DeleteMessageBatchResponse.builder().failed(failedEntry)
.build();

given(sqsAsyncClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class)))
.willReturn(CompletableFuture.completedFuture(partialFailureResponse));

SqsAcknowledgementExecutor<String> executor = new SqsAcknowledgementExecutor<>();
executor.setSqsAsyncClient(sqsAsyncClient);
executor.setQueueAttributes(queueAttributes);

assertThatThrownBy(() -> executor.execute(messagesToAck).join()).isInstanceOf(CompletionException.class)
.getCause().isInstanceOf(SqsAcknowledgementException.class)
.asInstanceOf(type(SqsAcknowledgementException.class)).satisfies(ex -> {
assertThat(ex.getFailedAcknowledgementMessages()).containsExactly(failedMessage);
assertThat(ex.getSuccessfullyAcknowledgedMessages()).containsExactly(successfulMessage);
});
}

@Test
void shouldTreatAllMessagesAsFailedIfAwsFailureIdCannotBeCorrelated() {
Collection<Message<String>> messagesToAck = List.of(message, secondMessage);

given(message.getHeaders()).willReturn(messageHeaders);
given(secondMessage.getHeaders()).willReturn(secondMessageHeaders);
given(queueAttributes.getQueueName()).willReturn(queueName);
given(queueAttributes.getQueueUrl()).willReturn(queueUrl);

BatchResultErrorEntry failedEntry = BatchResultErrorEntry.builder().id("unknown-id")
.code("ReceiptHandleIsInvalid").message("Receipt handle expired").build();

DeleteMessageBatchResponse partialFailureResponse = DeleteMessageBatchResponse.builder().failed(failedEntry)
.build();

given(sqsAsyncClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class)))
.willReturn(CompletableFuture.completedFuture(partialFailureResponse));

SqsAcknowledgementExecutor<String> executor = new SqsAcknowledgementExecutor<>();
executor.setSqsAsyncClient(sqsAsyncClient);
executor.setQueueAttributes(queueAttributes);

assertThatThrownBy(() -> executor.execute(messagesToAck).join()).isInstanceOf(CompletionException.class)
.getCause().isInstanceOf(SqsAcknowledgementException.class)
.asInstanceOf(type(SqsAcknowledgementException.class)).satisfies(ex -> {
assertThat(ex.getSuccessfullyAcknowledgedMessages()).isEmpty();
assertThat(ex.getFailedAcknowledgementMessages()).containsExactlyInAnyOrder(message, secondMessage);
});
}

}