Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,15 @@ public static void stopWireMockRecording() {
wireMockServer.stopRecording();
}
}

/**
* Resets all WireMock scenarios to their initial state.
* This ensures each test starts with a clean scenario state, preventing
* flaky tests caused by scenario state persisting across test runs.
*/
public static void resetWireMockScenarios() {
if (wireMockServer != null) {
wireMockServer.resetScenarios();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
Expand Down Expand Up @@ -74,6 +73,9 @@ public void shutdownWireMockServer() throws Exception {
*/
@BeforeEach
public void setupTestEnvironment() {
// Reset scenarios to ensure each test starts with a clean state
// This prevents flaky tests caused by scenario state persisting across tests
TestsUtil.resetWireMockScenarios();
Comment on lines +76 to +78
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please elaborate why scenarios are resulting in flakiness?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since scenarios were not being reset, the tests were passing only when executed in a particular order. Doing a scenario reset helps start from clean slate before every run and facilitates Wiremock matching. For example, if scenarios are not reset, the test may look for pull-5 on the second pull call instead of pull-2

TestsUtil.startWireMockRecording(harness.getPubsubEndpoint());
}

Expand Down Expand Up @@ -206,21 +208,15 @@ public void testBatchAck() throws Exception {
long timeoutSeconds = isRecording ? 120 : 60; // Increased timeout for integration tests
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds);

System.out.println("Starting to collect " + toSend.size() + " messages with timeout: " + timeoutSeconds + "s");

while (ackIDs.size() < toSend.size() && System.nanoTime() < deadline) {
try {
Message r = subscription.receive();
if (r != null && r.getAckID() != null) {
ackIDs.add(r.getAckID());
System.out.println("Received message " + ackIDs.size() + "/" + toSend.size() +
" with AckID: " + r.getAckID());
} else {
System.out.println("Received null message, waiting...");
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (Exception e) {
System.err.println("Error receiving message: " + e.getMessage());
TimeUnit.MILLISECONDS.sleep(100);
}
}
Expand Down Expand Up @@ -251,21 +247,15 @@ public void testBatchNack() throws Exception {
long timeoutSeconds = isRecording ? 120 : 60; // Increased timeout for integration tests
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds);

System.out.println("Starting to collect " + toSend.size() + " messages with timeout: " + timeoutSeconds + "s");

while (ackIDs.size() < toSend.size() && System.nanoTime() < deadline) {
try {
Message r = subscription.receive();
if (r != null && r.getAckID() != null) {
ackIDs.add(r.getAckID());
System.out.println("Received message " + ackIDs.size() + "/" + toSend.size() +
" with AckID: " + r.getAckID());
} else {
System.out.println("Received null message, waiting...");
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (Exception e) {
System.err.println("Error receiving message: " + e.getMessage());
TimeUnit.MILLISECONDS.sleep(100);
}
}
Expand All @@ -283,9 +273,10 @@ public void testAckNullThrows() throws Exception {
}
}

@Disabled
@Test
@Timeout(30) // Integration test that calls receive() - fail fast if recordings are missing
public void testDoubleAck() throws Exception {
Assumptions.assumeFalse(AWS_PROVIDER_ID.equals(harness.getProviderId()));
try (AbstractTopic topic = harness.createTopicDriver();
AbstractSubscription subscription = harness.createSubscriptionDriver()) {

Expand All @@ -299,19 +290,17 @@ public void testDoubleAck() throws Exception {
topic.send(message);
}

// Receive all messages
List<Message> receivedMessages = new ArrayList<>();
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(30);

while (receivedMessages.size() < 3 && System.nanoTime() < deadline) {
for (int i = 0; i < messages.size(); i++) {
Message received = subscription.receive();
if (received != null) {
receivedMessages.add(received);
} else {
TimeUnit.MILLISECONDS.sleep(100);
}
Assertions.assertNotNull(received, "Should receive message " + (i + 1));
Assertions.assertNotNull(received.getAckID(), "Received message should have AckID");
receivedMessages.add(received);
}

Assertions.assertEquals(3, receivedMessages.size(), "Should receive all 3 messages within timeout");
Assertions.assertEquals(messages.size(), receivedMessages.size(),
"Should receive all " + messages.size() + " messages");

// Ack the first two messages
List<AckID> firstTwoAcks = List.of(
Expand Down Expand Up @@ -351,7 +340,7 @@ public void testGetAttributes() throws Exception {
}

@Test
@Timeout(30)
@Timeout(30) // Integration test that calls receive() - fail fast if recordings are missing
public void testMultipleSendReceiveWithoutBatch() throws Exception {
Assumptions.assumeFalse(AWS_PROVIDER_ID.equals(harness.getProviderId()));
try (AbstractTopic topic = harness.createTopicDriver();
Expand All @@ -368,33 +357,21 @@ public void testMultipleSendReceiveWithoutBatch() throws Exception {
.build();
topic.send(message);
sentMessages.add(message);
TimeUnit.MILLISECONDS.sleep(100); // Small delay between sends
}

TimeUnit.MILLISECONDS.sleep(500); // Allow time for messages to be available

// Receive and ack messages one by one (not in batch)
List<Message> receivedMessages = new ArrayList<>();

while (receivedMessages.size() < numMessages) {
try {
Message received = subscription.receive();
if (received != null && received.getAckID() != null) {
receivedMessages.add(received);
// Ack immediately after receiving (not in batch)
subscription.sendAck(received.getAckID());
System.out.println("Received and acked message " + receivedMessages.size() + "/" + numMessages);
} else {
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (Exception e) {
System.err.println("Error receiving message: " + e.getMessage());
TimeUnit.MILLISECONDS.sleep(100);
}
for (int i = 0; i < numMessages; i++) {
Message received = subscription.receive();
Assertions.assertNotNull(received, "Should receive message " + (i + 1));
Assertions.assertNotNull(received.getAckID(), "Received message should have AckID");
receivedMessages.add(received);
// Ack immediately after receiving (not in batch)
subscription.sendAck(received.getAckID());
}

Assertions.assertEquals(numMessages, receivedMessages.size(),
"Should receive all messages. Expected: " + numMessages + ", Got: " + receivedMessages.size());
"Should receive all " + numMessages + " messages");

// Verify all messages were received
for (int i = 0; i < receivedMessages.size(); i++) {
Expand Down
Loading