Skip to content

Commit bb998d4

Browse files
committed
Introduce a CompositeBackPressureHandler allowing for composition of BackPressureHandlers (#1251)
1 parent 51bad1a commit bb998d4

7 files changed

+113
-103
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ protected BackPressureHandler createBackPressureHandler() {
235235
.throughputConfiguration(containerOptions.getBackPressureMode()).build());
236236
if (containerOptions.getBackPressureLimiter() != null) {
237237
backPressureHandlers.add(new BackPressureHandlerLimiter(containerOptions.getBackPressureLimiter(),
238-
acquireTimeout, containerOptions.getStandbyLimitPollingInterval()));
238+
acquireTimeout, containerOptions.getStandbyLimitPollingInterval(), batchSize));
239239
}
240240
return new CompositeBackPressureHandler(backPressureHandlers, batchSize);
241241
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandler.java

+37-4
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
public interface BackPressureHandler {
3030

3131
/**
32-
* Request a number of permits. Each obtained permit allows the
32+
* Requests a number of permits. Each obtained permit allows the
3333
* {@link io.awspring.cloud.sqs.listener.source.MessageSource} to retrieve one message.
3434
* @param amount the amount of permits to request.
3535
* @return the amount of permits obtained.
@@ -38,11 +38,24 @@ public interface BackPressureHandler {
3838
int request(int amount) throws InterruptedException;
3939

4040
/**
41-
* Release the specified amount of permits. Each message that has been processed should release one permit, whether
42-
* processing was successful or not.
41+
* Releases the specified amount of permits for processed messages. Each message that has been processed should
42+
* release one permit, whether processing was successful or not.
43+
* <p>
44+
* This method can is called in the following use cases:
45+
* <ul>
46+
* <li>{@link ReleaseReason#LIMITED}: permits were not used because another BackPressureHandler has a lower permits
47+
* limit and the difference in permits needs to be returned.</li>
48+
* <li>{@link ReleaseReason#NONE_FETCHED}: none of the permits were actually used because no messages were retrieved
49+
* from SQS. Permits need to be returned.</li>
50+
* <li>{@link ReleaseReason#PARTIAL_FETCH}: some of the permits were used (some messages were retrieved from SQS).
51+
* The unused ones need to be returned. The amount to be returned might be {@literal 0}, in which case it means all
52+
* the permits will be used as the same number of messages were fetched from SQS.</li>
53+
* <li>{@link ReleaseReason#PROCESSED}: a message processing finished, successfully or not.</li>
54+
* </ul>
4355
* @param amount the amount of permits to release.
56+
* @param reason the reason why the permits were released.
4457
*/
45-
void release(int amount);
58+
void release(int amount, ReleaseReason reason);
4659

4760
/**
4861
* Attempts to acquire all permits up to the specified timeout. If successful, means all permits were returned and
@@ -52,4 +65,24 @@ public interface BackPressureHandler {
5265
*/
5366
boolean drain(Duration timeout);
5467

68+
enum ReleaseReason {
69+
/**
70+
* Permits were not used because another BackPressureHandler has a lower permits limit and the difference need
71+
* to be aligned across all handlers.
72+
*/
73+
LIMITED,
74+
/**
75+
* No messages were retrieved from SQS, so all permits need to be returned.
76+
*/
77+
NONE_FETCHED,
78+
/**
79+
* Some messages were fetched from SQS. Unused permits need to be returned.
80+
*/
81+
PARTIAL_FETCH,
82+
/**
83+
* The processing of one or more messages finished, successfully or not.
84+
*/
85+
PROCESSED;
86+
}
87+
5588
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerLimiter.java

+23-3
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
*
2828
* @see BackPressureLimiter
2929
*/
30-
public class BackPressureHandlerLimiter implements BackPressureHandler {
30+
public class BackPressureHandlerLimiter implements BatchAwareBackPressureHandler, IdentifiableContainerComponent {
3131

3232
/**
3333
* The {@link BackPressureLimiter} which computes a limit on how many permits can be requested at a given moment.
@@ -54,11 +54,31 @@ public class BackPressureHandlerLimiter implements BackPressureHandler {
5454

5555
private final ReducibleSemaphore semaphore = new ReducibleSemaphore(0);
5656

57+
private final int batchSize;
58+
59+
private String id;
60+
5761
public BackPressureHandlerLimiter(BackPressureLimiter backPressureLimiter, Duration acquireTimeout,
58-
Duration standbyLimitPollingInterval) {
62+
Duration standbyLimitPollingInterval, int batchSize) {
5963
this.backPressureLimiter = backPressureLimiter;
6064
this.acquireTimeout = acquireTimeout;
6165
this.standbyLimitPollingInterval = standbyLimitPollingInterval;
66+
this.batchSize = batchSize;
67+
}
68+
69+
@Override
70+
public void setId(String id) {
71+
this.id = id;
72+
}
73+
74+
@Override
75+
public String getId() {
76+
return id;
77+
}
78+
79+
@Override
80+
public int requestBatch() throws InterruptedException {
81+
return request(batchSize);
6282
}
6383

6484
@Override
@@ -75,7 +95,7 @@ public int request(int amount) throws InterruptedException {
7595
}
7696

7797
@Override
78-
public void release(int amount) {
98+
public void release(int amount, ReleaseReason reason) {
7999
semaphore.release(amount);
80100
}
81101

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BatchAwareBackPressureHandler.java

-14
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,4 @@ public interface BatchAwareBackPressureHandler extends BackPressureHandler {
3030
* @throws InterruptedException if the Thread is interrupted while waiting for permits.
3131
*/
3232
int requestBatch() throws InterruptedException;
33-
34-
/**
35-
* Release a batch of permits. This has the semantics of letting the {@link BackPressureHandler} know that all
36-
* permits from a batch are being released, in opposition to {@link #release(int)} in which any number of permits
37-
* can be specified.
38-
*/
39-
void releaseBatch();
40-
41-
/**
42-
* Return the configured batch size for this handler.
43-
* @return the batch size.
44-
*/
45-
int getBatchSize();
46-
4733
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/CompositeBackPressureHandler.java

+7-20
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ public class CompositeBackPressureHandler implements BatchAwareBackPressureHandl
2323
private final List<BackPressureHandler> backPressureHandlers;
2424

2525
private final int batchSize;
26+
2627
private String id;
2728

2829
public CompositeBackPressureHandler(List<BackPressureHandler> backPressureHandlers, int batchSize) {
@@ -33,6 +34,9 @@ public CompositeBackPressureHandler(List<BackPressureHandler> backPressureHandle
3334
@Override
3435
public void setId(String id) {
3536
this.id = id;
37+
backPressureHandlers.stream().filter(IdentifiableContainerComponent.class::isInstance)
38+
.map(IdentifiableContainerComponent.class::cast)
39+
.forEach(bph -> bph.setId(bph.getClass().getSimpleName() + "-" + id));
3640
}
3741

3842
@Override
@@ -45,16 +49,6 @@ public int requestBatch() throws InterruptedException {
4549
return request(batchSize);
4650
}
4751

48-
@Override
49-
public void releaseBatch() {
50-
release(batchSize);
51-
}
52-
53-
@Override
54-
public int getBatchSize() {
55-
return batchSize;
56-
}
57-
5852
@Override
5953
public int request(int amount) throws InterruptedException {
6054
int obtained = amount;
@@ -66,23 +60,16 @@ public int request(int amount) throws InterruptedException {
6660
for (int i = 0; i < backPressureHandlers.size(); i++) {
6761
int obtainedForBph = obtainedPerBph[i];
6862
if (obtainedForBph > obtained) {
69-
if (amount == batchSize) {
70-
backPressureHandlers.get(i).release(amount);
71-
// FIXME what if we cannot acquire 'obtained' (< 'amount') permits?
72-
backPressureHandlers.get(i).request(obtained);
73-
}
74-
else {
75-
backPressureHandlers.get(i).release(obtainedForBph - obtained);
76-
}
63+
backPressureHandlers.get(i).release(obtainedForBph - obtained, ReleaseReason.LIMITED);
7764
}
7865
}
7966
return obtained;
8067
}
8168

8269
@Override
83-
public void release(int amount) {
70+
public void release(int amount, ReleaseReason reason) {
8471
for (BackPressureHandler handler : backPressureHandlers) {
85-
handler.release(amount);
72+
handler.release(amount, reason);
8673
}
8774
}
8875

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SemaphoreBackPressureHandler.java

+36-50
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import java.util.Arrays;
2020
import java.util.concurrent.Semaphore;
2121
import java.util.concurrent.TimeUnit;
22-
import java.util.concurrent.atomic.AtomicBoolean;
22+
import java.util.concurrent.atomic.AtomicInteger;
2323
import org.slf4j.Logger;
2424
import org.slf4j.LoggerFactory;
2525
import org.springframework.util.Assert;
@@ -47,7 +47,7 @@ public class SemaphoreBackPressureHandler implements BatchAwareBackPressureHandl
4747

4848
private volatile CurrentThroughputMode currentThroughputMode;
4949

50-
private final AtomicBoolean hasAcquiredFullPermits = new AtomicBoolean(false);
50+
private final AtomicInteger lowThroughputPermitsAcquired = new AtomicInteger(0);
5151

5252
private String id;
5353

@@ -79,42 +79,39 @@ public String getId() {
7979
}
8080

8181
@Override
82-
public int request(int amount) throws InterruptedException {
83-
if (amount == batchSize) {
84-
return requestBatch();
85-
}
86-
return tryAcquire(amount, this.currentThroughputMode) ? amount : 0;
82+
public int requestBatch() throws InterruptedException {
83+
return request(batchSize);
8784
}
8885

8986
// @formatter:off
9087
@Override
91-
public int requestBatch() throws InterruptedException {
88+
public int request(int amount) throws InterruptedException {
9289
return CurrentThroughputMode.LOW.equals(this.currentThroughputMode)
93-
? requestInLowThroughputMode()
94-
: requestInHighThroughputMode();
90+
? requestInLowThroughputMode(amount)
91+
: requestInHighThroughputMode(amount);
9592
}
9693

97-
private int requestInHighThroughputMode() throws InterruptedException {
98-
return tryAcquire(this.batchSize, CurrentThroughputMode.HIGH)
99-
? this.batchSize
100-
: tryAcquirePartial();
94+
private int requestInHighThroughputMode(int amount) throws InterruptedException {
95+
return tryAcquire(amount, CurrentThroughputMode.HIGH)
96+
? amount
97+
: tryAcquirePartial(amount);
10198
}
10299
// @formatter:on
103100

104-
private int tryAcquirePartial() throws InterruptedException {
101+
private int tryAcquirePartial(int max) throws InterruptedException {
105102
int availablePermits = this.semaphore.availablePermits();
106103
if (availablePermits == 0 || BackPressureMode.ALWAYS_POLL_MAX_MESSAGES.equals(this.backPressureConfiguration)) {
107104
return 0;
108105
}
109-
int permitsToRequest = Math.min(availablePermits, this.batchSize);
106+
int permitsToRequest = Math.min(availablePermits, max);
110107
CurrentThroughputMode currentThroughputModeNow = this.currentThroughputMode;
111108
logger.trace("Trying to acquire partial batch of {} permits from {} available for {} in TM {}",
112109
permitsToRequest, availablePermits, this.id, currentThroughputModeNow);
113110
boolean hasAcquiredPartial = tryAcquire(permitsToRequest, currentThroughputModeNow);
114111
return hasAcquiredPartial ? permitsToRequest : 0;
115112
}
116113

117-
private int requestInLowThroughputMode() throws InterruptedException {
114+
private int requestInLowThroughputMode(int amount) throws InterruptedException {
118115
// Although LTM can be set / unset by many processes, only the MessageSource thread gets here,
119116
// so no actual concurrency
120117
logger.debug("Trying to acquire full permits for {}. Permits left: {}", this.id,
@@ -123,11 +120,11 @@ private int requestInLowThroughputMode() throws InterruptedException {
123120
if (hasAcquired) {
124121
logger.debug("Acquired full permits for {}. Permits left: {}", this.id, this.semaphore.availablePermits());
125122
// We've acquired all permits - there's no other process currently processing messages
126-
if (!this.hasAcquiredFullPermits.compareAndSet(false, true)) {
123+
if (this.lowThroughputPermitsAcquired.getAndSet(amount) != 0) {
127124
logger.warn("hasAcquiredFullPermits was already true. Permits left: {}",
128125
this.semaphore.availablePermits());
129126
}
130-
return this.batchSize;
127+
return amount;
131128
}
132129
else {
133130
return 0;
@@ -150,19 +147,22 @@ private boolean tryAcquire(int amount, CurrentThroughputMode currentThroughputMo
150147
}
151148

152149
@Override
153-
public void releaseBatch() {
154-
maybeSwitchToLowThroughputMode();
155-
int permitsToRelease = getPermitsToRelease(this.batchSize);
150+
public void release(int amount, ReleaseReason reason) {
151+
logger.trace("Releasing {} permits ({}) for {}. Permits left: {}", amount, reason, this.id,
152+
this.semaphore.availablePermits());
153+
switch (reason) {
154+
case NONE_FETCHED -> maybeSwitchToLowThroughputMode();
155+
case PARTIAL_FETCH -> maybeSwitchToHighThroughputMode(amount);
156+
case PROCESSED, LIMITED -> {
157+
// No need to switch throughput mode
158+
}
159+
}
160+
int permitsToRelease = getPermitsToRelease(amount);
156161
this.semaphore.release(permitsToRelease);
157-
logger.trace("Released {} permits for {}. Permits left: {}", permitsToRelease, this.id,
162+
logger.debug("Released {} permits ({}) for {}. Permits left: {}", permitsToRelease, reason, this.id,
158163
this.semaphore.availablePermits());
159164
}
160165

161-
@Override
162-
public int getBatchSize() {
163-
return this.batchSize;
164-
}
165-
166166
private void maybeSwitchToLowThroughputMode() {
167167
if (!BackPressureMode.FIXED_HIGH_THROUGHPUT.equals(this.backPressureConfiguration)
168168
&& CurrentThroughputMode.HIGH.equals(this.currentThroughputMode)) {
@@ -172,37 +172,23 @@ private void maybeSwitchToLowThroughputMode() {
172172
}
173173
}
174174

175-
@Override
176-
public void release(int amount) {
177-
if (amount == batchSize) {
178-
releaseBatch();
179-
return;
175+
private void maybeSwitchToHighThroughputMode(int amount) {
176+
if (CurrentThroughputMode.LOW.equals(this.currentThroughputMode)) {
177+
logger.debug("{} unused permit(s), setting TM HIGH for {}. Permits left: {}", amount, this.id,
178+
this.semaphore.availablePermits());
179+
this.currentThroughputMode = CurrentThroughputMode.HIGH;
180180
}
181-
logger.trace("Releasing {} permits for {}. Permits left: {}", amount, this.id,
182-
this.semaphore.availablePermits());
183-
maybeSwitchToHighThroughputMode(amount);
184-
int permitsToRelease = getPermitsToRelease(amount);
185-
this.semaphore.release(permitsToRelease);
186-
logger.trace("Released {} permits for {}. Permits left: {}", permitsToRelease, this.id,
187-
this.semaphore.availablePermits());
188181
}
189182

190183
private int getPermitsToRelease(int amount) {
191-
return this.hasAcquiredFullPermits.compareAndSet(true, false)
184+
int lowThroughputPermits = this.lowThroughputPermitsAcquired.getAndSet(0);
185+
return lowThroughputPermits > 0
192186
// The first process that gets here should release all permits except for inflight messages
193187
// We can have only one batch of messages at this point since we have all permits
194-
? this.totalPermits - (this.batchSize - amount)
188+
? this.totalPermits - (lowThroughputPermits - amount)
195189
: amount;
196190
}
197191

198-
private void maybeSwitchToHighThroughputMode(int amount) {
199-
if (CurrentThroughputMode.LOW.equals(this.currentThroughputMode)) {
200-
logger.debug("{} unused permit(s), setting TM HIGH for {}. Permits left: {}", amount, this.id,
201-
this.semaphore.availablePermits());
202-
this.currentThroughputMode = CurrentThroughputMode.HIGH;
203-
}
204-
}
205-
206192
@Override
207193
public boolean drain(Duration timeout) {
208194
logger.debug("Waiting for up to {} seconds for approx. {} permits to be released for {}", timeout.getSeconds(),

0 commit comments

Comments
 (0)