Skip to content

Commit bd08a13

Browse files
committed
Remove BackPressureHandlerLimiter from the library and make it user-code in tests only (#1251)
1 parent 432d490 commit bd08a13

9 files changed

+243
-229
lines changed

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java

+10-9
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
2121
import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter;
2222
import java.time.Duration;
23+
import java.util.function.Supplier;
2324
import org.springframework.core.task.TaskExecutor;
2425
import org.springframework.lang.Nullable;
2526
import org.springframework.retry.backoff.BackOffPolicy;
@@ -55,7 +56,7 @@ public abstract class AbstractContainerOptions<O extends ContainerOptions<O, B>,
5556

5657
private final BackPressureMode backPressureMode;
5758

58-
private final BackPressureLimiter backPressureLimiter;
59+
private final Supplier<BackPressureHandler> backPressureHandlerSupplier;
5960

6061
private final ListenerMode listenerMode;
6162

@@ -89,7 +90,7 @@ protected AbstractContainerOptions(Builder<?, ?> builder) {
8990
this.listenerShutdownTimeout = builder.listenerShutdownTimeout;
9091
this.acknowledgementShutdownTimeout = builder.acknowledgementShutdownTimeout;
9192
this.backPressureMode = builder.backPressureMode;
92-
this.backPressureLimiter = builder.backPressureLimiter;
93+
this.backPressureHandlerSupplier = builder.backPressureHandlerSupplier;
9394
this.listenerMode = builder.listenerMode;
9495
this.messageConverter = builder.messageConverter;
9596
this.acknowledgementMode = builder.acknowledgementMode;
@@ -166,8 +167,8 @@ public BackPressureMode getBackPressureMode() {
166167
}
167168

168169
@Override
169-
public BackPressureLimiter getBackPressureLimiter() {
170-
return this.backPressureLimiter;
170+
public Supplier<BackPressureHandler> getBackPressureHandlerSupplier() {
171+
return this.backPressureHandlerSupplier;
171172
}
172173

173174
@Override
@@ -232,7 +233,7 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,
232233

233234
private static final BackPressureMode DEFAULT_THROUGHPUT_CONFIGURATION = BackPressureMode.AUTO;
234235

235-
private static final BackPressureLimiter DEFAULT_BACKPRESSURE_LIMITER = null;
236+
private static final Supplier<BackPressureHandler> DEFAULT_BACKPRESSURE_LIMITER = null;
236237

237238
private static final ListenerMode DEFAULT_MESSAGE_DELIVERY_STRATEGY = ListenerMode.SINGLE_MESSAGE;
238239

@@ -256,7 +257,7 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,
256257

257258
private BackPressureMode backPressureMode = DEFAULT_THROUGHPUT_CONFIGURATION;
258259

259-
private BackPressureLimiter backPressureLimiter = DEFAULT_BACKPRESSURE_LIMITER;
260+
private Supplier<BackPressureHandler> backPressureHandlerSupplier = DEFAULT_BACKPRESSURE_LIMITER;
260261

261262
private Duration listenerShutdownTimeout = DEFAULT_LISTENER_SHUTDOWN_TIMEOUT;
262263

@@ -296,7 +297,7 @@ protected Builder(AbstractContainerOptions<?, ?> options) {
296297
this.listenerShutdownTimeout = options.listenerShutdownTimeout;
297298
this.acknowledgementShutdownTimeout = options.acknowledgementShutdownTimeout;
298299
this.backPressureMode = options.backPressureMode;
299-
this.backPressureLimiter = options.backPressureLimiter;
300+
this.backPressureHandlerSupplier = options.backPressureHandlerSupplier;
300301
this.listenerMode = options.listenerMode;
301302
this.messageConverter = options.messageConverter;
302303
this.acknowledgementMode = options.acknowledgementMode;
@@ -397,8 +398,8 @@ public B backPressureMode(BackPressureMode backPressureMode) {
397398
}
398399

399400
@Override
400-
public B backPressureLimiter(BackPressureLimiter backPressureLimiter) {
401-
this.backPressureLimiter = backPressureLimiter;
401+
public B backPressureHandlerSupplier(Supplier<BackPressureHandler> backPressureHandlerSupplier) {
402+
this.backPressureHandlerSupplier = backPressureHandlerSupplier;
402403
return self();
403404
}
404405

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java

+5-8
Original file line numberDiff line numberDiff line change
@@ -227,17 +227,14 @@ private TaskExecutor validateCustomExecutor(TaskExecutor taskExecutor) {
227227

228228
protected BackPressureHandler createBackPressureHandler() {
229229
O containerOptions = getContainerOptions();
230-
List<BackPressureHandler> backPressureHandlers = new ArrayList<>(2);
230+
if (containerOptions.getBackPressureHandlerSupplier() != null) {
231+
return containerOptions.getBackPressureHandlerSupplier().get();
232+
}
231233
Duration acquireTimeout = containerOptions.getMaxDelayBetweenPolls();
232234
int batchSize = containerOptions.getMaxMessagesPerPoll();
233-
backPressureHandlers.add(SemaphoreBackPressureHandler.builder().batchSize(batchSize)
235+
return SemaphoreBackPressureHandler.builder().batchSize(batchSize)
234236
.totalPermits(containerOptions.getMaxConcurrentMessages()).acquireTimeout(acquireTimeout)
235-
.throughputConfiguration(containerOptions.getBackPressureMode()).build());
236-
if (containerOptions.getBackPressureLimiter() != null) {
237-
backPressureHandlers.add(new BackPressureHandlerLimiter(containerOptions.getBackPressureLimiter(),
238-
acquireTimeout, containerOptions.getStandbyLimitPollingInterval(), batchSize));
239-
}
240-
return new CompositeBackPressureHandler(backPressureHandlers, batchSize);
237+
.throughputConfiguration(containerOptions.getBackPressureMode()).build();
241238
}
242239

243240
protected TaskExecutor createSourcesTaskExecutor() {

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandler.java

+12
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,18 @@ public interface BackPressureHandler {
5757
*/
5858
void release(int amount, ReleaseReason reason);
5959

60+
/**
61+
* Release the specified amount of permits. Each message that has been processed should release one permit, whether
62+
* processing was successful or not.
63+
* @param amount the amount of permits to release.
64+
*
65+
* @deprecated This method is deprecated and will not be called by the Spring Cloud AWS SQS listener anymore.
66+
* Implement {@link #release(int, ReleaseReason)} instead.
67+
*/
68+
@Deprecated
69+
default void release(int amount) {
70+
}
71+
6072
/**
6173
* Attempts to acquire all permits up to the specified timeout. If successful, means all permits were returned and
6274
* thus no activity is left in the {@link io.awspring.cloud.sqs.listener.source.MessageSource}.

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureHandlerLimiter.java

-133
This file was deleted.

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureLimiter.java

-44
This file was deleted.

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BatchAwareBackPressureHandler.java

+24
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,28 @@ public interface BatchAwareBackPressureHandler extends BackPressureHandler {
3030
* @throws InterruptedException if the Thread is interrupted while waiting for permits.
3131
*/
3232
int requestBatch() throws InterruptedException;
33+
34+
/**
35+
* Release a batch of permits. This has the semantics of letting the {@link BackPressureHandler} know that all
36+
* permits from a batch are being released, in opposition to {@link #release(int)} in which any number of permits
37+
* can be specified.
38+
*
39+
* @deprecated This method is deprecated and will not be called by the Spring Cloud AWS SQS listener anymore.
40+
* Implement {@link BackPressureHandler#release(int, ReleaseReason)} instead.
41+
*/
42+
@Deprecated
43+
default void releaseBatch() {
44+
}
45+
46+
/**
47+
* Return the configured batch size for this handler.
48+
* @return the batch size.
49+
*
50+
* @deprecated This method is deprecated and will not be used by the Spring Cloud AWS SQS listener anymore.
51+
*/
52+
@Deprecated
53+
default int getBatchSize() {
54+
return 0;
55+
}
56+
3357
}

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
2121
import java.time.Duration;
2222
import java.util.Collection;
23+
import java.util.function.Supplier;
2324
import org.springframework.core.task.TaskExecutor;
2425
import org.springframework.lang.Nullable;
2526
import org.springframework.retry.backoff.BackOffPolicy;
@@ -61,8 +62,6 @@ public interface ContainerOptions<O extends ContainerOptions<O, B>, B extends Co
6162
/**
6263
* {@return the amount of time to wait before checking again for the current limit when the queue processing is on
6364
* standby} Default is 100 milliseconds.
64-
*
65-
* @see BackPressureLimiter#limit()
6665
*/
6766
Duration getStandbyLimitPollingInterval();
6867

@@ -136,10 +135,10 @@ default BackOffPolicy getPollBackOffPolicy() {
136135
BackPressureMode getBackPressureMode();
137136

138137
/**
139-
* Return the {@link BackPressureLimiter} for this container.
140-
* @return the backpressure limiter.
138+
* Return the a {@link Supplier} to create a {@link BackPressureHandler} for this container.
139+
* @return the BackPressureHandler supplier.
141140
*/
142-
BackPressureLimiter getBackPressureLimiter();
141+
Supplier<BackPressureHandler> getBackPressureHandlerSupplier();
143142

144143
/**
145144
* Return the {@link ListenerMode} mode for this container.

spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptionsBuilder.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode;
2020
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
2121
import java.time.Duration;
22+
import java.util.function.Supplier;
2223
import org.springframework.core.task.TaskExecutor;
2324
import org.springframework.retry.backoff.BackOffPolicy;
2425

@@ -62,7 +63,6 @@ public interface ContainerOptionsBuilder<B extends ContainerOptionsBuilder<B, O>
6263
*
6364
* @param standbyLimitPollingInterval the limit polling interval when the queue processing is on standby.
6465
* @return this instance.
65-
* @see BackPressureLimiter#limit()
6666
*/
6767
B standbyLimitPollingInterval(Duration standbyLimitPollingInterval);
6868

@@ -156,12 +156,12 @@ default B pollBackOffPolicy(BackOffPolicy pollBackOffPolicy) {
156156
B backPressureMode(BackPressureMode backPressureMode);
157157

158158
/**
159-
* Set the {@link BackPressureLimiter} for this container. Default is {@code null}.
159+
* Set the {@link Supplier} of {@link BackPressureHandler} for this container. Default is {@code null}.
160160
*
161-
* @param backPressureLimiter the backpressure limiter.
161+
* @param backPressureHandlerSupplier the BackPressureHandler supplier.
162162
* @return this instance.
163163
*/
164-
B backPressureLimiter(BackPressureLimiter backPressureLimiter);
164+
B backPressureHandlerSupplier(Supplier<BackPressureHandler> backPressureHandlerSupplier);
165165

166166
/**
167167
* Set the maximum interval between acknowledgements for batch acknowledgements. The default depends on the specific

0 commit comments

Comments
 (0)