Skip to content

Commit 93cb447

Browse files
committed
Use a wrapper approach for dynamically limit the permits of SemaphoreBackPressureHandler (#1251)
1 parent 281baf7 commit 93cb447

File tree

3 files changed

+190
-143
lines changed

3 files changed

+190
-143
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -226,12 +226,17 @@ private TaskExecutor validateCustomExecutor(TaskExecutor taskExecutor) {
226226

227227
protected BackPressureHandler createBackPressureHandler() {
228228
O containerOptions = getContainerOptions();
229-
return SemaphoreBackPressureHandler.builder().batchSize(containerOptions.getMaxMessagesPerPoll())
229+
BatchAwareBackPressureHandler backPressureHandler = SemaphoreBackPressureHandler.builder()
230+
.batchSize(containerOptions.getMaxMessagesPerPoll())
230231
.totalPermits(containerOptions.getMaxConcurrentMessages())
231-
.standbyLimitPollingInterval(containerOptions.getStandbyLimitPollingInterval())
232232
.acquireTimeout(containerOptions.getMaxDelayBetweenPolls())
233-
.throughputConfiguration(containerOptions.getBackPressureMode())
234-
.backPressureLimiter(containerOptions.getBackPressureLimiter()).build();
233+
.throughputConfiguration(containerOptions.getBackPressureMode()).build();
234+
if (containerOptions.getBackPressureLimiter() != null) {
235+
backPressureHandler = new BackPressureHandlerLimiter(backPressureHandler,
236+
containerOptions.getBackPressureLimiter(), containerOptions.getStandbyLimitPollingInterval(),
237+
containerOptions.getMaxDelayBetweenPolls());
238+
}
239+
return backPressureHandler;
235240
}
236241

237242
protected TaskExecutor createSourcesTaskExecutor() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.listener;
17+
18+
import java.time.Duration;
19+
import java.util.concurrent.Semaphore;
20+
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.atomic.AtomicInteger;
22+
23+
/**
24+
* A {@link BatchAwareBackPressureHandler} implementation that uses an internal {@link Semaphore} for adapting the
25+
* maximum number of permits that can be acquired by the {@link #backPressureHandler} based on the downstream
26+
* backpressure limit computed by the {@link #backPressureLimiter}.
27+
*
28+
* @see BackPressureLimiter
29+
*/
30+
public class BackPressureHandlerLimiter implements BatchAwareBackPressureHandler {
31+
32+
/**
33+
* The {@link BatchAwareBackPressureHandler} which permits should be limited by the {@link #backPressureLimiter}.
34+
*/
35+
private final BatchAwareBackPressureHandler backPressureHandler;
36+
37+
/**
38+
* The {@link BackPressureLimiter} which computes a limit on how many permits can be requested at a given moment.
39+
*/
40+
private final BackPressureLimiter backPressureLimiter;
41+
42+
/**
43+
* The duration to wait for permits to be acquired.
44+
*/
45+
private final Duration acquireTimeout;
46+
47+
/**
48+
* The duration to sleep when the queue processing is in standby.
49+
*/
50+
private final Duration standbyLimitPollingInterval;
51+
52+
/**
53+
* The limit of permits that can be acquired at the current time. The permits limit is defined in the [0,
54+
* Integer.MAX_VALUE] interval. A value of {@literal 0} means that no permits can be acquired.
55+
* <p>
56+
* This value is updated based on the downstream backpressure reported by the {@link #backPressureLimiter}.
57+
*/
58+
private final AtomicInteger permitsLimit = new AtomicInteger(0);
59+
60+
private final ReducibleSemaphore semaphore = new ReducibleSemaphore(0);
61+
62+
public BackPressureHandlerLimiter(BatchAwareBackPressureHandler backPressureHandler,
63+
BackPressureLimiter backPressureLimiter, Duration standbyLimitPollingInterval, Duration acquireTimeout) {
64+
this.backPressureHandler = backPressureHandler;
65+
this.backPressureLimiter = backPressureLimiter;
66+
this.acquireTimeout = acquireTimeout;
67+
this.standbyLimitPollingInterval = standbyLimitPollingInterval;
68+
}
69+
70+
@Override
71+
public int requestBatch() throws InterruptedException {
72+
int permits = updatePermitsLimit();
73+
int batchSize = getBatchSize();
74+
if (permits < batchSize) {
75+
return acquirePermits(permits, backPressureHandler::request);
76+
}
77+
return acquirePermits(batchSize, p -> backPressureHandler.requestBatch());
78+
}
79+
80+
@Override
81+
public void releaseBatch() {
82+
semaphore.release(getBatchSize());
83+
backPressureHandler.releaseBatch();
84+
}
85+
86+
@Override
87+
public int getBatchSize() {
88+
return backPressureHandler.getBatchSize();
89+
}
90+
91+
@Override
92+
public int request(int amount) throws InterruptedException {
93+
int permits = Math.min(updatePermitsLimit(), amount);
94+
return acquirePermits(permits, backPressureHandler::request);
95+
}
96+
97+
@Override
98+
public void release(int amount) {
99+
semaphore.release(amount);
100+
backPressureHandler.release(amount);
101+
}
102+
103+
@Override
104+
public boolean drain(Duration timeout) {
105+
return backPressureHandler.drain(timeout);
106+
}
107+
108+
private int updatePermitsLimit() {
109+
return permitsLimit.updateAndGet(oldLimit -> {
110+
int newLimit = Math.max(0, backPressureLimiter.limit());
111+
if (newLimit < oldLimit) {
112+
int blockedPermits = oldLimit - newLimit;
113+
semaphore.reducePermits(blockedPermits);
114+
}
115+
else if (newLimit > oldLimit) {
116+
int releasedPermits = newLimit - oldLimit;
117+
semaphore.release(releasedPermits);
118+
}
119+
return newLimit;
120+
});
121+
}
122+
123+
private interface PermitsRequester {
124+
int request(int amount) throws InterruptedException;
125+
}
126+
127+
private int acquirePermits(int amount, PermitsRequester permitsRequester) throws InterruptedException {
128+
if (amount == 0) {
129+
Thread.sleep(standbyLimitPollingInterval.toMillis());
130+
return 0;
131+
}
132+
if (semaphore.tryAcquire(amount, acquireTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
133+
int obtained = permitsRequester.request(amount);
134+
if (obtained < amount) {
135+
semaphore.release(amount - obtained);
136+
}
137+
return obtained;
138+
}
139+
return 0;
140+
}
141+
142+
private static class ReducibleSemaphore extends Semaphore {
143+
144+
ReducibleSemaphore(int permits) {
145+
super(permits);
146+
}
147+
148+
@Override
149+
public void reducePermits(int reduction) {
150+
super.reducePermits(reduction);
151+
}
152+
}
153+
}

0 commit comments

Comments
 (0)