Skip to content

Commit 453ccd5

Browse files
committed
validate changes
1 parent f63ffb7 commit 453ccd5

File tree

2 files changed

+15
-28
lines changed

2 files changed

+15
-28
lines changed

pubsub/pubsub-gcp/src/main/java/com/salesforce/multicloudj/pubsub/gcp/GcpSubscription.java

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ public GcpSubscription() {
6262
public GcpSubscription(Builder builder) {
6363
super(builder);
6464
this.nackLazy = builder.nackLazy;
65-
this.receiveBatcherOptions = builder.receiveBatcherOptions;
6665
}
6766

6867
public GcpSubscription(Builder builder, SubscriptionAdminClient subscriptionAdminClient) {
@@ -228,9 +227,9 @@ protected List<Message> doReceiveBatch(int batchSize) {
228227
.setMaxMessages(Math.max(1, batchSize))
229228
.setReturnImmediately(true)
230229
.build();
231-
230+
232231
PullResponse resp = getOrCreateSubscriptionAdminClient().pullCallable().call(req);
233-
232+
234233
List<Message> receivedMessages = new ArrayList<>();
235234
for (ReceivedMessage rm : resp.getReceivedMessagesList()) {
236235
Message m = convertToMessage(rm);
@@ -311,8 +310,7 @@ public Builder builder() {
311310

312311
public static class Builder extends AbstractSubscription.Builder<GcpSubscription> {
313312
private boolean nackLazy = false;
314-
private Batcher.Options receiveBatcherOptions = null;
315-
313+
316314
public Builder() {
317315
this.providerId = GcpConstants.PROVIDER_ID;
318316
}
@@ -358,26 +356,14 @@ public GcpSubscription.Builder withCredentialsOverrider(CredentialsOverrider cre
358356
*
359357
* This is useful when you don't want immediate retry but prefer to wait for
360358
* the natural timeout before reprocessing the message.
361-
*
359+
*
362360
* @param nackLazy true to enable lazy NACK mode, false for immediate redelivery
363361
* @return this builder for method chaining
364362
*/
365363
public GcpSubscription.Builder withNackLazy(boolean nackLazy) {
366364
this.nackLazy = nackLazy;
367365
return this;
368366
}
369-
370-
/**
371-
* Sets custom receive batcher options.
372-
* This is primarily used in tests to control prefetch behavior.
373-
*
374-
* @param options the batcher options to use
375-
* @return this builder for method chaining
376-
*/
377-
public GcpSubscription.Builder withReceiveBatcherOptions(Batcher.Options options) {
378-
this.receiveBatcherOptions = options;
379-
return this;
380-
}
381367

382368
@Override
383369
public GcpSubscription build() {

pubsub/pubsub-gcp/src/test/java/com/salesforce/multicloudj/pubsub/gcp/GcpPubsubIT.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -152,17 +152,18 @@ public AbstractSubscription createSubscriptionDriverWithIndex(int index) {
152152
String subscriptionNameWithIndex = index > 0 ? subscriptionName + "-" + index : subscriptionName;
153153
String fullSubscriptionName = "projects/" + GcpPubsubIT.PROJECT_ID + "/subscriptions/" + subscriptionNameWithIndex;
154154

155-
// Use maxHandlers=1 for integration tests to avoid WireMock scenario state race conditions
156-
Batcher.Options testBatcherOptions = new Batcher.Options()
157-
.setMaxHandlers(1)
158-
.setMinBatchSize(1)
159-
.setMaxBatchSize(1)
160-
.setMaxBatchByteSize(0);
161-
162155
GcpSubscription.Builder subscriptionBuilder = new GcpSubscription.Builder()
163-
.withSubscriptionName(fullSubscriptionName)
164-
.withReceiveBatcherOptions(testBatcherOptions);
165-
GcpSubscription sub = new GcpSubscription(subscriptionBuilder, client);
156+
.withSubscriptionName(fullSubscriptionName);
157+
GcpSubscription sub = new GcpSubscription(subscriptionBuilder, client) {
158+
@Override
159+
protected Batcher.Options createReceiveBatcherOptions() {
160+
return new Batcher.Options()
161+
.setMaxHandlers(1)
162+
.setMinBatchSize(1)
163+
.setMaxBatchSize(1)
164+
.setMaxBatchByteSize(0);
165+
}
166+
};
166167

167168
if (index == 0) {
168169
subscription = sub;

0 commit comments

Comments
 (0)