Skip to content

Commit acee4d2

Browse files
authored
test: enable doubleAck & testMultipleSendReceiveWithoutBatch in pubsub (#212)
1 parent fcb0943 commit acee4d2

File tree

2 files changed

+32
-44
lines changed
  • multicloudj-common/src/test/java/com/salesforce/multicloudj/common/util/common
  • pubsub/pubsub-client/src/test/java/com/salesforce/multicloudj/pubsub/client

2 files changed

+32
-44
lines changed

multicloudj-common/src/test/java/com/salesforce/multicloudj/common/util/common/TestsUtil.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,4 +139,15 @@ public static void stopWireMockRecording() {
139139
wireMockServer.stopRecording();
140140
}
141141
}
142+
143+
/**
144+
* Resets all WireMock scenarios to their initial state.
145+
* This ensures each test starts with a clean scenario state, preventing
146+
* flaky tests caused by scenario state persisting across test runs.
147+
*/
148+
public static void resetWireMockScenarios() {
149+
if (wireMockServer != null) {
150+
wireMockServer.resetScenarios();
151+
}
152+
}
142153
}

pubsub/pubsub-client/src/test/java/com/salesforce/multicloudj/pubsub/client/AbstractPubsubIT.java

Lines changed: 21 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.junit.jupiter.api.Assumptions;
1515
import org.junit.jupiter.api.BeforeAll;
1616
import org.junit.jupiter.api.BeforeEach;
17-
import org.junit.jupiter.api.Disabled;
1817
import org.junit.jupiter.api.Test;
1918
import org.junit.jupiter.api.TestInstance;
2019
import org.junit.jupiter.api.Timeout;
@@ -74,6 +73,9 @@ public void shutdownWireMockServer() throws Exception {
7473
*/
7574
@BeforeEach
7675
public void setupTestEnvironment() {
76+
// Reset scenarios to ensure each test starts with a clean state
77+
// This prevents flaky tests caused by scenario state persisting across tests
78+
TestsUtil.resetWireMockScenarios();
7779
TestsUtil.startWireMockRecording(harness.getPubsubEndpoint());
7880
}
7981

@@ -206,21 +208,15 @@ public void testBatchAck() throws Exception {
206208
long timeoutSeconds = isRecording ? 120 : 60; // Increased timeout for integration tests
207209
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds);
208210

209-
System.out.println("Starting to collect " + toSend.size() + " messages with timeout: " + timeoutSeconds + "s");
210-
211211
while (ackIDs.size() < toSend.size() && System.nanoTime() < deadline) {
212212
try {
213213
Message r = subscription.receive();
214214
if (r != null && r.getAckID() != null) {
215215
ackIDs.add(r.getAckID());
216-
System.out.println("Received message " + ackIDs.size() + "/" + toSend.size() +
217-
" with AckID: " + r.getAckID());
218216
} else {
219-
System.out.println("Received null message, waiting...");
220217
TimeUnit.MILLISECONDS.sleep(100);
221218
}
222219
} catch (Exception e) {
223-
System.err.println("Error receiving message: " + e.getMessage());
224220
TimeUnit.MILLISECONDS.sleep(100);
225221
}
226222
}
@@ -251,21 +247,15 @@ public void testBatchNack() throws Exception {
251247
long timeoutSeconds = isRecording ? 120 : 60; // Increased timeout for integration tests
252248
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds);
253249

254-
System.out.println("Starting to collect " + toSend.size() + " messages with timeout: " + timeoutSeconds + "s");
255-
256250
while (ackIDs.size() < toSend.size() && System.nanoTime() < deadline) {
257251
try {
258252
Message r = subscription.receive();
259253
if (r != null && r.getAckID() != null) {
260254
ackIDs.add(r.getAckID());
261-
System.out.println("Received message " + ackIDs.size() + "/" + toSend.size() +
262-
" with AckID: " + r.getAckID());
263255
} else {
264-
System.out.println("Received null message, waiting...");
265256
TimeUnit.MILLISECONDS.sleep(100);
266257
}
267258
} catch (Exception e) {
268-
System.err.println("Error receiving message: " + e.getMessage());
269259
TimeUnit.MILLISECONDS.sleep(100);
270260
}
271261
}
@@ -283,9 +273,10 @@ public void testAckNullThrows() throws Exception {
283273
}
284274
}
285275

286-
@Disabled
287276
@Test
277+
@Timeout(30) // Integration test that calls receive() - fail fast if recordings are missing
288278
public void testDoubleAck() throws Exception {
279+
Assumptions.assumeFalse(AWS_PROVIDER_ID.equals(harness.getProviderId()));
289280
try (AbstractTopic topic = harness.createTopicDriver();
290281
AbstractSubscription subscription = harness.createSubscriptionDriver()) {
291282

@@ -299,19 +290,17 @@ public void testDoubleAck() throws Exception {
299290
topic.send(message);
300291
}
301292

293+
// Receive all messages
302294
List<Message> receivedMessages = new ArrayList<>();
303-
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(30);
304-
305-
while (receivedMessages.size() < 3 && System.nanoTime() < deadline) {
295+
for (int i = 0; i < messages.size(); i++) {
306296
Message received = subscription.receive();
307-
if (received != null) {
308-
receivedMessages.add(received);
309-
} else {
310-
TimeUnit.MILLISECONDS.sleep(100);
311-
}
297+
Assertions.assertNotNull(received, "Should receive message " + (i + 1));
298+
Assertions.assertNotNull(received.getAckID(), "Received message should have AckID");
299+
receivedMessages.add(received);
312300
}
313301

314-
Assertions.assertEquals(3, receivedMessages.size(), "Should receive all 3 messages within timeout");
302+
Assertions.assertEquals(messages.size(), receivedMessages.size(),
303+
"Should receive all " + messages.size() + " messages");
315304

316305
// Ack the first two messages
317306
List<AckID> firstTwoAcks = List.of(
@@ -351,7 +340,7 @@ public void testGetAttributes() throws Exception {
351340
}
352341

353342
@Test
354-
@Timeout(30)
343+
@Timeout(30) // Integration test that calls receive() - fail fast if recordings are missing
355344
public void testMultipleSendReceiveWithoutBatch() throws Exception {
356345
Assumptions.assumeFalse(AWS_PROVIDER_ID.equals(harness.getProviderId()));
357346
try (AbstractTopic topic = harness.createTopicDriver();
@@ -368,33 +357,21 @@ public void testMultipleSendReceiveWithoutBatch() throws Exception {
368357
.build();
369358
topic.send(message);
370359
sentMessages.add(message);
371-
TimeUnit.MILLISECONDS.sleep(100); // Small delay between sends
372360
}
373361

374-
TimeUnit.MILLISECONDS.sleep(500); // Allow time for messages to be available
375-
376362
// Receive and ack messages one by one (not in batch)
377363
List<Message> receivedMessages = new ArrayList<>();
378-
379-
while (receivedMessages.size() < numMessages) {
380-
try {
381-
Message received = subscription.receive();
382-
if (received != null && received.getAckID() != null) {
383-
receivedMessages.add(received);
384-
// Ack immediately after receiving (not in batch)
385-
subscription.sendAck(received.getAckID());
386-
System.out.println("Received and acked message " + receivedMessages.size() + "/" + numMessages);
387-
} else {
388-
TimeUnit.MILLISECONDS.sleep(100);
389-
}
390-
} catch (Exception e) {
391-
System.err.println("Error receiving message: " + e.getMessage());
392-
TimeUnit.MILLISECONDS.sleep(100);
393-
}
364+
for (int i = 0; i < numMessages; i++) {
365+
Message received = subscription.receive();
366+
Assertions.assertNotNull(received, "Should receive message " + (i + 1));
367+
Assertions.assertNotNull(received.getAckID(), "Received message should have AckID");
368+
receivedMessages.add(received);
369+
// Ack immediately after receiving (not in batch)
370+
subscription.sendAck(received.getAckID());
394371
}
395372

396373
Assertions.assertEquals(numMessages, receivedMessages.size(),
397-
"Should receive all messages. Expected: " + numMessages + ", Got: " + receivedMessages.size());
374+
"Should receive all " + numMessages + " messages");
398375

399376
// Verify all messages were received
400377
for (int i = 0; i < receivedMessages.size(); i++) {

0 commit comments

Comments
 (0)