Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion docs/src/main/asciidoc/sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ Spring Cloud AWS has dedicated support to transfer Java objects with Amazon SQS

NOTE: Since 4.1.0 Spring Cloud AWS SQS also supports SQS-compatible brokers such as Yandex Message Queue. See <<sqs-compatible-brokers-support>>.

NOTE: Since 4.1.0, the framework supports <<virtual-threads, virtual threads>> (JDK 21+) for message listener containers.

A Spring Boot starter is provided to auto-configure SQS integration beans.
Maven coordinates, using <<index.adoc#bill-of-materials, Spring Cloud AWS BOM>>:

Expand Down Expand Up @@ -2333,6 +2335,8 @@ NOTE: In order to achieve higher throughput, it's encouraged that, at least for

==== Threading and Blocking Components

NOTE: See <<virtual-threads>> for virtual thread support.

Message processing always starts in a framework thread from the default or provided `TaskExecutor`.

If an async component is invoked and the execution returns to the framework on a different thread, such thread will be used until a `blocking` component is found, when the execution switches back to a `TaskExecutor` thread to avoid blocking i.e. `SqsAsyncClient` or `HttpClient` threads.
Expand All @@ -2349,8 +2353,34 @@ The default `TaskExecutor` is a `ThreadPoolTaskExecutor`, and a different `compo

When providing a custom executor, it's important that it's configured to support all threads that will be created, which should be (maxConcurrentMessages * total number of queues).

IMPORTANT: To avoid unnecessary thread hopping between blocking components, a `MessageExecutionThreadFactory` MUST be set to the executor.
Since 4.1.0, the framework supports <<virtual-threads, virtual threads>>. If platform threads are used, a `MessageExecutionThreadFactory` MUST be set to the executor to avoid unnecessary thread hopping between blocking components.

[[virtual-threads]]
==== Virtual Threads Support

Since 4.1.0, the framework supports virtual threads (JDK 21+). To use virtual threads, provide a virtual thread executor as the `componentsTaskExecutor`:

[source,java]
----
@Bean
SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
executor.setVirtualThreads(true);
SqsMessageListenerContainerFactory<Object> factory = new SqsMessageListenerContainerFactory<>();
factory.configure(options -> options
.componentsTaskExecutor(executor)
.maxConcurrentMessages(5000));
factory.setSqsAsyncClient(sqsAsyncClient);
return factory;
}
----

When virtual threads are detected, the framework automatically:

* Skips the `MessageExecutionThreadFactory` requirement
* Detects when a `CompletableFuture` completes on a platform thread and hops back to a virtual thread before continuing execution

Virtual threads allow higher `maxConcurrentMessages` values compared to platform threads, since each virtual thread has minimal memory overhead. This is particularly beneficial for I/O-bound workloads where listeners spend time waiting on databases, HTTP calls, or other services.

=== Client Customization

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2013-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.sqs;

import java.lang.reflect.Method;
import org.jspecify.annotations.Nullable;

/**
* Utility class for virtual thread detection. Uses reflection to access {@code Thread.isVirtual()} (JDK 21+) while
* maintaining compatibility with JDK 17.
*
* @author Tomaz Fernandes
* @since 4.1.0
* @see MessageExecutionThread
*/
public class VirtualThreadUtils {

@Nullable
private static final Method IS_VIRTUAL;

static {
Method m = null;
try {
m = Thread.class.getMethod("isVirtual");
}
catch (NoSuchMethodException e) {
// JDK < 21
}
IS_VIRTUAL = m;
}

private VirtualThreadUtils() {
}

/**
* Check whether the given thread is a virtual thread.
* @param thread the thread to check.
* @return {@code true} if the thread is virtual, {@code false} if not or if running on JDK &lt; 21.
*/
public static boolean isVirtual(Thread thread) {
if (IS_VIRTUAL == null) {
return false;
}
try {
return (boolean) IS_VIRTUAL.invoke(thread);
}
catch (Exception e) {
return false;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.awspring.cloud.sqs.MessageExecutionThread;
import io.awspring.cloud.sqs.MessageExecutionThreadFactory;
import io.awspring.cloud.sqs.UnsupportedThreadFactoryException;
import io.awspring.cloud.sqs.VirtualThreadUtils;
import io.awspring.cloud.sqs.listener.backpressure.BackPressureHandler;
import io.awspring.cloud.sqs.listener.backpressure.BackPressureHandlerFactory;
import io.awspring.cloud.sqs.listener.pipeline.AcknowledgementHandlerExecutionStage;
Expand Down Expand Up @@ -145,10 +146,14 @@ private void configureComponents(ContainerComponentFactory<T, O> componentFactor
}

private void verifyThreadType() {
if (!MessageExecutionThread.class.isAssignableFrom(Thread.currentThread().getClass())) {
throw new UnsupportedThreadFactoryException("Custom TaskExecutors must use a %s."
.formatted(MessageExecutionThreadFactory.class.getSimpleName()));
if (Thread.currentThread() instanceof MessageExecutionThread) {
return;
}
if (VirtualThreadUtils.isVirtual(Thread.currentThread())) {
return;
}
throw new UnsupportedThreadFactoryException("Custom TaskExecutors must use a %s or virtual threads."
.formatted(MessageExecutionThreadFactory.class.getSimpleName()));
}
// @formatter:on

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.awspring.cloud.sqs.CompletableFutures;
import io.awspring.cloud.sqs.MessageExecutionThread;
import io.awspring.cloud.sqs.MessageHeaderUtils;
import io.awspring.cloud.sqs.VirtualThreadUtils;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementResultCallback;
import io.awspring.cloud.sqs.listener.acknowledgement.AsyncAcknowledgementResultCallback;
import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler;
Expand Down Expand Up @@ -112,25 +113,30 @@ public void setTaskExecutor(TaskExecutor taskExecutor) {
}

protected <T> CompletableFuture<T> execute(Supplier<T> executable) {
if (Thread.currentThread() instanceof MessageExecutionThread) {
logger.trace("Already in a {}, not switching", MessageExecutionThread.class.getSimpleName());
if (isOnExpectedThread()) {
logger.trace("Already on expected thread, not switching");
return supplyInSameThread(executable);
}
logger.trace("Not in a {}, submitting to executor", MessageExecutionThread.class.getSimpleName());
logger.trace("Not on expected thread, submitting to executor");
Assert.notNull(this.taskExecutor, "Task executor not set");
return supplyInNewThread(executable);
}

protected CompletableFuture<Void> execute(Runnable executable) {
if (Thread.currentThread() instanceof MessageExecutionThread) {
logger.trace("Already in a {}, not switching", MessageExecutionThread.class.getSimpleName());
if (isOnExpectedThread()) {
logger.trace("Already on expected thread, not switching");
return runInSameThread(executable);
}
logger.trace("Not in a {}, submitting to executor", MessageExecutionThread.class.getSimpleName());
logger.trace("Not on expected thread, submitting to executor");
Assert.notNull(this.taskExecutor, "Task executor not set");
return runInNewThread(executable);
}

private boolean isOnExpectedThread() {
return Thread.currentThread() instanceof MessageExecutionThread
|| VirtualThreadUtils.isVirtual(Thread.currentThread());
}

private CompletableFuture<Void> runInSameThread(Runnable blockingProcess) {
try {
blockingProcess.run();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2013-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.sqs;

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;

/**
* Tests for {@link VirtualThreadUtils}.
*
* @author Tomaz Fernandes
*/
class VirtualThreadUtilsTests {

@Test
void shouldReturnFalseForPlatformThread() {
assertThat(VirtualThreadUtils.isVirtual(Thread.currentThread())).isFalse();
}

@Test
void shouldReturnFalseForMessageExecutionThread() {
MessageExecutionThread thread = new MessageExecutionThread();
assertThat(VirtualThreadUtils.isVirtual(thread)).isFalse();
}

}
Loading
Loading