
Spring Infinite Loop For QueueDoesNotExistException #205

Open
@enkuru

Description

java.util.concurrent.CompletionException: software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException: The specified queue does not exist for this wsdl version. (Service: Sqs, Status Code: 400, Request ID: abf44cc8-bd4a-5abf-85a0-118a756161e8)
	at software.amazon.awssdk.utils.CompletableFutureUtils.errorAsCompletionException(CompletableFutureUtils.java:65)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncExecutionFailureExceptionReportingStage.lambda$execute$0(AsyncExecutionFailureExceptionReportingStage.java:51)
	at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
	at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
	at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeAttemptExecute(AsyncRetryableStage.java:103)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:184)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:170)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:105)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:163)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

The log entry above was repeated nearly 1 million times over a 25-minute period after the related queue was accidentally deleted. This drove CPU usage on the servers (in Elastic Beanstalk) to the maximum, and they stopped handling incoming requests.
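A rough back-of-the-envelope check, assuming each logged trace corresponds to one failed poll, shows that the retries happened with practically no delay between them:

$$
\frac{10^{6}\ \text{failed polls}}{25 \times 60\ \text{s}} = \frac{10^{6}}{1500\ \text{s}} \approx 667\ \text{polls/s},
\qquad
\frac{1500\ \text{s}}{10^{6}} = 1.5\ \text{ms per poll}
$$

Because the count was "nearly" one million, the real rate was slightly lower, but still on the order of hundreds of polls per second.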

Our message-consuming class is:

@Slf4j
@Service
@RequiredArgsConstructor
@ConditionalOnProperty(value = "transaction.queue.consumer.enabled")
public class TransactionMessageConsumer {
    private final TransactionService transactionService;

    private final Tracer tracer;

    @SqsListener(value = "${cloud.aws.sqs.transaction-queue}")
    public void receive(final TransactionMessageVo transactionMessageVo) {
        // ... message handling omitted from this report
    }
}

and our configuration class is:

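import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
import io.awspring.cloud.sqs.operations.SqsTemplate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

@Configuration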
@ConditionalOnProperty("transaction.queue.publisher.enabled")
public class SQSClientConfiguration {

    @Bean
    public SqsAsyncClient sqsAsyncClient() {
        return SqsAsyncClient.create();
    }

    @Bean
    public SqsTemplate sqsTemplate(final SqsAsyncClient sqsAsyncClient) {
        return SqsTemplate.newTemplate(sqsAsyncClient);
    }

    @Bean
    public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(final SqsAsyncClient sqsAsyncClient) {
        return SqsMessageListenerContainerFactory.builder()
                .sqsAsyncClient(sqsAsyncClient)
                .build();
    }
}

As you can see, our configuration is simple. We tried to find a way to slow down the polling rate after a failure, but did not succeed.
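One common way to slow down polling after failures is capped exponential backoff. The sketch below is a hypothetical standalone helper: `PollBackoff` is not a Spring Cloud AWS class, and the 1 s initial delay, factor 2 and 60 s cap are assumed example values. It computes the wait after N consecutive failed polls and estimates how many polls would start within a time window if every poll failed immediately.

```java
import java.time.Duration;

/**
 * Hypothetical helper (not part of Spring Cloud AWS): capped exponential backoff
 * for deciding how long to wait before the next poll after consecutive failures.
 */
public final class PollBackoff {
    private final long initialMillis;
    private final double multiplier;
    private final long maxMillis;

    public PollBackoff(Duration initial, double multiplier, Duration max) {
        this.initialMillis = initial.toMillis();
        this.multiplier = multiplier;
        this.maxMillis = max.toMillis();
        if (initialMillis <= 0) throw new IllegalArgumentException("initial delay must be at least 1 ms");
        if (multiplier < 1.0) throw new IllegalArgumentException("multiplier must be >= 1");
        if (maxMillis < initialMillis) throw new IllegalArgumentException("max delay must be >= initial delay");
    }

    /** Wait before the next poll after {@code consecutiveFailures} failures in a row (0 means no wait). */
    public long delayMillis(int consecutiveFailures) {
        if (consecutiveFailures <= 0) return 0L;
        double raw = initialMillis * Math.pow(multiplier, consecutiveFailures - 1);
        return (long) Math.min(raw, (double) maxMillis); // never wait longer than the cap
    }

    /** Number of polls started within {@code window} if every poll fails instantly. */
    public int attemptsWithin(Duration window) {
        long windowMillis = window.toMillis();
        long elapsed = 0;
        int attempts = 0;
        while (elapsed < windowMillis) {
            attempts++;                        // a poll starts at time 'elapsed' ...
            elapsed += delayMillis(attempts);  // ... fails, and we back off before the next one
        }
        return attempts;
    }

    public static void main(String[] args) {
        PollBackoff backoff = new PollBackoff(Duration.ofSeconds(1), 2.0, Duration.ofSeconds(60));
        for (int n = 1; n <= 8; n++) {
            System.out.println("failure " + n + " -> wait " + backoff.delayMillis(n) + " ms");
        }
        System.out.println("polls in 25 min: " + backoff.attemptsWithin(Duration.ofMinutes(25)));
    }
}
```

With these example parameters the waits grow 1 s, 2 s, 4 s, ... until they hit the 60 s cap, so a queue that fails on every poll would be polled about 30 times in 25 minutes rather than roughly a million times.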

I think the problem is in the following code, which does not account for situations like this one and keeps looping indefinitely:

AbstractPollingMessageSource.java, line 191:

	private void pollAndEmitMessages() {
		while (isRunning()) {
			try {
				if (!isRunning()) {
					continue;
				}
				logger.trace("Requesting permits for queue {}", this.pollingEndpointName);
				final int acquiredPermits = this.backPressureHandler.requestBatch();
				if (acquiredPermits == 0) {
					logger.trace("No permits acquired for queue {}", this.pollingEndpointName);
					continue;
				}
				logger.trace("{} permits acquired for queue {}", acquiredPermits, this.pollingEndpointName);
				if (!isRunning()) {
					logger.debug("MessageSource was stopped after permits where acquired. Returning {} permits",
							acquiredPermits);
					this.backPressureHandler.release(acquiredPermits);
					continue;
				}
				// @formatter:off
				managePollingFuture(doPollForMessages(acquiredPermits))
					.exceptionally(this::handlePollingException)
					.thenApply(msgs -> releaseUnusedPermits(acquiredPermits, msgs))
					.thenApply(this::convertMessages)
					.thenCompose(this::emitMessagesToPipeline)
					.exceptionally(this::handleSinkException);
				// @formatter:on
			}
			catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				throw new IllegalStateException(
						"MessageSource thread interrupted for endpoint " + this.pollingEndpointName, e);
			}
			catch (Exception e) {
				logger.error("Error in MessageSource for queue {}. Resuming", this.pollingEndpointName, e);
			}
		}
		logger.debug("Execution thread stopped for queue {}", this.pollingEndpointName);
	}
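If I read the loop above correctly, each poll is started asynchronously and the loop does not wait for its outcome. When the poll fails, `handlePollingException` turns the failure into an empty result, the unused permits are released, and the next iteration can acquire them and poll again right away, which matches the millisecond-level retry rate we saw. A minimal sketch of the behaviour I would expect, assuming polls may be awaited one at a time: the hypothetical `FailureAwarePoller` waits for each poll result, counts consecutive failures, and sleeps with a capped exponential backoff before the next attempt. The `poll` and `sink` functions are assumed stand-ins for the SQS receive call and the message pipeline, and `Sleeper` is injectable so the delays can be checked without really waiting. This is not the library's code, and blocking on `join()` gives up the concurrent polling that the permit mechanism provides.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical sketch (not Spring Cloud AWS code): a polling loop that backs off after failed polls. */
public final class FailureAwarePoller {
    /** Injectable so tests can record delays instead of really sleeping. */
    @FunctionalInterface
    public interface Sleeper {
        void sleep(long millis) throws InterruptedException;
    }

    private final Supplier<CompletableFuture<List<String>>> poll; // stand-in for the SQS receive call
    private final Consumer<List<String>> sink;                     // stand-in for the message pipeline
    private final Sleeper sleeper;
    private final long initialBackoffMillis;
    private final long maxBackoffMillis;
    private final List<Long> appliedDelays = new ArrayList<>();

    public FailureAwarePoller(Supplier<CompletableFuture<List<String>>> poll, Consumer<List<String>> sink,
                              Sleeper sleeper, long initialBackoffMillis, long maxBackoffMillis) {
        this.poll = poll;
        this.sink = sink;
        this.sleeper = sleeper;
        this.initialBackoffMillis = initialBackoffMillis;
        this.maxBackoffMillis = maxBackoffMillis;
    }

    /** Performs up to maxPolls polls and returns how many of them failed. */
    public int run(int maxPolls) throws InterruptedException {
        int failed = 0;
        int consecutiveFailures = 0;
        for (int i = 0; i < maxPolls && !Thread.currentThread().isInterrupted(); i++) {
            try {
                // join() waits for the poll, so a failure is observed before the next attempt starts.
                List<String> messages = poll.get().join();
                consecutiveFailures = 0; // a successful poll resets the backoff
                sink.accept(messages);
            } catch (CompletionException e) {
                failed++;
                consecutiveFailures++;
                // Capped exponential backoff: initial, 2x, 4x, ... up to maxBackoffMillis.
                long shift = Math.min(consecutiveFailures - 1, 30);
                long delay = Math.min(maxBackoffMillis, initialBackoffMillis << shift);
                appliedDelays.add(delay);
                sleeper.sleep(delay);
            }
        }
        return failed;
    }

    public List<Long> appliedDelays() {
        return List.copyOf(appliedDelays);
    }
}
```

With a 1 s initial delay and an 8 s cap, five failed polls in a row would wait 1 s, 2 s, 4 s, 8 s and 8 s, and the first successful poll resets the backoff.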
