Skip to content

Commit 432d490

Browse files
committed
Introduce a CompositeBackPressureHandler allowing for composition of BackPressureHandlers (#1251)
1 parent 93cb447 commit 432d490

7 files changed

+201
-126
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java

+10-9
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import io.awspring.cloud.sqs.listener.source.AcknowledgementProcessingMessageSource;
3636
import io.awspring.cloud.sqs.listener.source.MessageSource;
3737
import io.awspring.cloud.sqs.listener.source.PollingMessageSource;
38+
import java.time.Duration;
3839
import java.util.ArrayList;
3940
import java.util.Collection;
4041
import java.util.List;
@@ -226,17 +227,17 @@ private TaskExecutor validateCustomExecutor(TaskExecutor taskExecutor) {
226227

227228
protected BackPressureHandler createBackPressureHandler() {
228229
O containerOptions = getContainerOptions();
229-
BatchAwareBackPressureHandler backPressureHandler = SemaphoreBackPressureHandler.builder()
230-
.batchSize(containerOptions.getMaxMessagesPerPoll())
231-
.totalPermits(containerOptions.getMaxConcurrentMessages())
232-
.acquireTimeout(containerOptions.getMaxDelayBetweenPolls())
233-
.throughputConfiguration(containerOptions.getBackPressureMode()).build();
230+
List<BackPressureHandler> backPressureHandlers = new ArrayList<>(2);
231+
Duration acquireTimeout = containerOptions.getMaxDelayBetweenPolls();
232+
int batchSize = containerOptions.getMaxMessagesPerPoll();
233+
backPressureHandlers.add(SemaphoreBackPressureHandler.builder().batchSize(batchSize)
234+
.totalPermits(containerOptions.getMaxConcurrentMessages()).acquireTimeout(acquireTimeout)
235+
.throughputConfiguration(containerOptions.getBackPressureMode()).build());
234236
if (containerOptions.getBackPressureLimiter() != null) {
235-
backPressureHandler = new BackPressureHandlerLimiter(backPressureHandler,
236-
containerOptions.getBackPressureLimiter(), containerOptions.getStandbyLimitPollingInterval(),
237-
containerOptions.getMaxDelayBetweenPolls());
237+
backPressureHandlers.add(new BackPressureHandlerLimiter(containerOptions.getBackPressureLimiter(),
238+
acquireTimeout, containerOptions.getStandbyLimitPollingInterval(), batchSize));
238239
}
239-
return backPressureHandler;
240+
return new CompositeBackPressureHandler(backPressureHandlers, batchSize);
240241
}
241242

242243
protected TaskExecutor createSourcesTaskExecutor() {

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandler.java

+37-4
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
public interface BackPressureHandler {
3030

3131
/**
32-
* Request a number of permits. Each obtained permit allows the
32+
* Requests a number of permits. Each obtained permit allows the
3333
* {@link io.awspring.cloud.sqs.listener.source.MessageSource} to retrieve one message.
3434
* @param amount the amount of permits to request.
3535
* @return the amount of permits obtained.
@@ -38,11 +38,24 @@ public interface BackPressureHandler {
3838
int request(int amount) throws InterruptedException;
3939

4040
/**
41-
* Release the specified amount of permits. Each message that has been processed should release one permit, whether
42-
* processing was successful or not.
41+
* Releases the specified amount of permits for processed messages. Each message that has been processed should
42+
* release one permit, whether processing was successful or not.
43+
* <p>
44+
* This method can is called in the following use cases:
45+
* <ul>
46+
* <li>{@link ReleaseReason#LIMITED}: permits were not used because another BackPressureHandler has a lower permits
47+
* limit and the difference in permits needs to be returned.</li>
48+
* <li>{@link ReleaseReason#NONE_FETCHED}: none of the permits were actually used because no messages were retrieved
49+
* from SQS. Permits need to be returned.</li>
50+
* <li>{@link ReleaseReason#PARTIAL_FETCH}: some of the permits were used (some messages were retrieved from SQS).
51+
* The unused ones need to be returned. The amount to be returned might be {@literal 0}, in which case it means all
52+
* the permits will be used as the same number of messages were fetched from SQS.</li>
53+
* <li>{@link ReleaseReason#PROCESSED}: a message processing finished, successfully or not.</li>
54+
* </ul>
4355
* @param amount the amount of permits to release.
56+
* @param reason the reason why the permits were released.
4457
*/
45-
void release(int amount);
58+
void release(int amount, ReleaseReason reason);
4659

4760
/**
4861
* Attempts to acquire all permits up to the specified timeout. If successful, means all permits were returned and
@@ -52,4 +65,24 @@ public interface BackPressureHandler {
5265
*/
5366
boolean drain(Duration timeout);
5467

68+
enum ReleaseReason {
69+
/**
70+
* Permits were not used because another BackPressureHandler has a lower permits limit and the difference need
71+
* to be aligned across all handlers.
72+
*/
73+
LIMITED,
74+
/**
75+
* No messages were retrieved from SQS, so all permits need to be returned.
76+
*/
77+
NONE_FETCHED,
78+
/**
79+
* Some messages were fetched from SQS. Unused permits need to be returned.
80+
*/
81+
PARTIAL_FETCH,
82+
/**
83+
* The processing of one or more messages finished, successfully or not.
84+
*/
85+
PROCESSED;
86+
}
87+
5588
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerLimiter.java

+24-44
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,7 @@
2727
*
2828
* @see BackPressureLimiter
2929
*/
30-
public class BackPressureHandlerLimiter implements BatchAwareBackPressureHandler {
31-
32-
/**
33-
* The {@link BatchAwareBackPressureHandler} which permits should be limited by the {@link #backPressureLimiter}.
34-
*/
35-
private final BatchAwareBackPressureHandler backPressureHandler;
30+
public class BackPressureHandlerLimiter implements BatchAwareBackPressureHandler, IdentifiableContainerComponent {
3631

3732
/**
3833
* The {@link BackPressureLimiter} which computes a limit on how many permits can be requested at a given moment.
@@ -59,50 +54,54 @@ public class BackPressureHandlerLimiter implements BatchAwareBackPressureHandler
5954

6055
private final ReducibleSemaphore semaphore = new ReducibleSemaphore(0);
6156

62-
public BackPressureHandlerLimiter(BatchAwareBackPressureHandler backPressureHandler,
63-
BackPressureLimiter backPressureLimiter, Duration standbyLimitPollingInterval, Duration acquireTimeout) {
64-
this.backPressureHandler = backPressureHandler;
57+
private final int batchSize;
58+
59+
private String id;
60+
61+
public BackPressureHandlerLimiter(BackPressureLimiter backPressureLimiter, Duration acquireTimeout,
62+
Duration standbyLimitPollingInterval, int batchSize) {
6563
this.backPressureLimiter = backPressureLimiter;
6664
this.acquireTimeout = acquireTimeout;
6765
this.standbyLimitPollingInterval = standbyLimitPollingInterval;
66+
this.batchSize = batchSize;
6867
}
6968

7069
@Override
71-
public int requestBatch() throws InterruptedException {
72-
int permits = updatePermitsLimit();
73-
int batchSize = getBatchSize();
74-
if (permits < batchSize) {
75-
return acquirePermits(permits, backPressureHandler::request);
76-
}
77-
return acquirePermits(batchSize, p -> backPressureHandler.requestBatch());
70+
public void setId(String id) {
71+
this.id = id;
7872
}
7973

8074
@Override
81-
public void releaseBatch() {
82-
semaphore.release(getBatchSize());
83-
backPressureHandler.releaseBatch();
75+
public String getId() {
76+
return id;
8477
}
8578

8679
@Override
87-
public int getBatchSize() {
88-
return backPressureHandler.getBatchSize();
80+
public int requestBatch() throws InterruptedException {
81+
return request(batchSize);
8982
}
9083

9184
@Override
9285
public int request(int amount) throws InterruptedException {
9386
int permits = Math.min(updatePermitsLimit(), amount);
94-
return acquirePermits(permits, backPressureHandler::request);
87+
if (permits == 0) {
88+
Thread.sleep(standbyLimitPollingInterval.toMillis());
89+
return 0;
90+
}
91+
if (semaphore.tryAcquire(permits, acquireTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
92+
return permits;
93+
}
94+
return 0;
9595
}
9696

9797
@Override
98-
public void release(int amount) {
98+
public void release(int amount, ReleaseReason reason) {
9999
semaphore.release(amount);
100-
backPressureHandler.release(amount);
101100
}
102101

103102
@Override
104103
public boolean drain(Duration timeout) {
105-
return backPressureHandler.drain(timeout);
104+
return true;
106105
}
107106

108107
private int updatePermitsLimit() {
@@ -120,25 +119,6 @@ else if (newLimit > oldLimit) {
120119
});
121120
}
122121

123-
private interface PermitsRequester {
124-
int request(int amount) throws InterruptedException;
125-
}
126-
127-
private int acquirePermits(int amount, PermitsRequester permitsRequester) throws InterruptedException {
128-
if (amount == 0) {
129-
Thread.sleep(standbyLimitPollingInterval.toMillis());
130-
return 0;
131-
}
132-
if (semaphore.tryAcquire(amount, acquireTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
133-
int obtained = permitsRequester.request(amount);
134-
if (obtained < amount) {
135-
semaphore.release(amount - obtained);
136-
}
137-
return obtained;
138-
}
139-
return 0;
140-
}
141-
142122
private static class ReducibleSemaphore extends Semaphore {
143123

144124
ReducibleSemaphore(int permits) {

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BatchAwareBackPressureHandler.java

-14
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,4 @@ public interface BatchAwareBackPressureHandler extends BackPressureHandler {
3030
* @throws InterruptedException if the Thread is interrupted while waiting for permits.
3131
*/
3232
int requestBatch() throws InterruptedException;
33-
34-
/**
35-
* Release a batch of permits. This has the semantics of letting the {@link BackPressureHandler} know that all
36-
* permits from a batch are being released, in opposition to {@link #release(int)} in which any number of permits
37-
* can be specified.
38-
*/
39-
void releaseBatch();
40-
41-
/**
42-
* Return the configured batch size for this handler.
43-
* @return the batch size.
44-
*/
45-
int getBatchSize();
46-
4733
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.listener;
17+
18+
import java.time.Duration;
19+
import java.util.List;
20+
21+
public class CompositeBackPressureHandler implements BatchAwareBackPressureHandler, IdentifiableContainerComponent {
22+
23+
private final List<BackPressureHandler> backPressureHandlers;
24+
25+
private final int batchSize;
26+
27+
private String id;
28+
29+
public CompositeBackPressureHandler(List<BackPressureHandler> backPressureHandlers, int batchSize) {
30+
this.backPressureHandlers = backPressureHandlers;
31+
this.batchSize = batchSize;
32+
}
33+
34+
@Override
35+
public void setId(String id) {
36+
this.id = id;
37+
backPressureHandlers.stream().filter(IdentifiableContainerComponent.class::isInstance)
38+
.map(IdentifiableContainerComponent.class::cast)
39+
.forEach(bph -> bph.setId(bph.getClass().getSimpleName() + "-" + id));
40+
}
41+
42+
@Override
43+
public String getId() {
44+
return id;
45+
}
46+
47+
@Override
48+
public int requestBatch() throws InterruptedException {
49+
return request(batchSize);
50+
}
51+
52+
@Override
53+
public int request(int amount) throws InterruptedException {
54+
int obtained = amount;
55+
int[] obtainedPerBph = new int[backPressureHandlers.size()];
56+
for (int i = 0; i < backPressureHandlers.size() && obtained > 0; i++) {
57+
obtainedPerBph[i] = backPressureHandlers.get(i).request(obtained);
58+
obtained = Math.min(obtained, obtainedPerBph[i]);
59+
}
60+
for (int i = 0; i < backPressureHandlers.size(); i++) {
61+
int obtainedForBph = obtainedPerBph[i];
62+
if (obtainedForBph > obtained) {
63+
backPressureHandlers.get(i).release(obtainedForBph - obtained, ReleaseReason.LIMITED);
64+
}
65+
}
66+
return obtained;
67+
}
68+
69+
@Override
70+
public void release(int amount, ReleaseReason reason) {
71+
for (BackPressureHandler handler : backPressureHandlers) {
72+
handler.release(amount, reason);
73+
}
74+
}
75+
76+
@Override
77+
public boolean drain(Duration timeout) {
78+
boolean result = true;
79+
for (BackPressureHandler handler : backPressureHandlers) {
80+
result &= !handler.drain(timeout);
81+
}
82+
return result;
83+
}
84+
}

0 commit comments

Comments
 (0)