Skip to content
Open
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
423bd46
enable batchnack
roseyang62 Jan 24, 2026
f388c87
Empty commit to trigger CI
roseyang62 Jan 25, 2026
e71fb9b
Empty commit to trigger CI
roseyang62 Jan 25, 2026
b1e9df8
fail fast if mapping unmatch
roseyang62 Jan 25, 2026
f17a22c
throw exception
roseyang62 Jan 25, 2026
da2639c
empty commit to trigger ci to reproduce error
roseyang62 Jan 25, 2026
50607ec
empty commit to trigger ci to reproduce error
roseyang62 Jan 25, 2026
d2bbe2d
enable testBatchNack
roseyang62 Jan 25, 2026
7815d3b
empty commit to trigger ci to reproduce error
roseyang62 Jan 25, 2026
b526ee5
empty commit to trigger ci to reproduce error
roseyang62 Jan 25, 2026
e1ee8f3
empty commit to trigger ci to reproduce error
roseyang62 Jan 25, 2026
a0f7e19
Merge remote-tracking branch 'upstream/main' into investigate-pubsub-…
roseyang62 Jan 29, 2026
2a6075a
Add error handling
roseyang62 Jan 29, 2026
a549c94
empty commit to trigger ci to reproduce error
roseyang62 Jan 29, 2026
454b500
empty commit to trigger ci to reproduce error
roseyang62 Jan 29, 2026
e4eaa2f
Merge branch 'main' into investigate-pubsub-conformance-test
roseyang62 Jan 29, 2026
da36b54
update error handling
roseyang62 Jan 29, 2026
0a0549a
Merge branch 'investigate-pubsub-conformance-test' of https://github.…
roseyang62 Jan 29, 2026
e34c1af
empty commit to trigger ci to see if still have error
roseyang62 Jan 29, 2026
db95ac5
empty commit to trigger ci to reproduce error
roseyang62 Jan 29, 2026
f0b43e5
empty commit to trigger ci to reproduce error
roseyang62 Jan 29, 2026
ff3752b
move WireMock debugging to teardown phase
roseyang62 Jan 30, 2026
f7f91d9
empty commit to trigger ci to reproduce error
roseyang62 Jan 30, 2026
9e0d22c
empty commit to trigger ci to reproduce error
roseyang62 Jan 30, 2026
8c169f0
empty commit to trigger ci to reproduce error
roseyang62 Jan 30, 2026
ce35044
chore: trigger CI
roseyang62 Feb 2, 2026
84bf1ed
Merge remote-tracking branch 'upstream/main' into investigate-pubsub-…
roseyang62 Feb 2, 2026
94e9698
add resetWireMockScenarios
roseyang62 Feb 25, 2026
2893688
Merge branch 'main' into investigate-pubsub-conformance-test
roseyang62 Feb 25, 2026
db7113b
add resetWireMockScenarios
roseyang62 Feb 25, 2026
9b67bf5
add resetWireMockScenarios
roseyang62 Feb 25, 2026
c20280b
Use Assertions.fail instead of AssertionError
roseyang62 Feb 25, 2026
ef459aa
Merge branch 'main' into investigate-pubsub-conformance-test
roseyang62 Feb 25, 2026
199d364
remove resetscenarios
roseyang62 Feb 26, 2026
f33c0da
Merge remote-tracking branch 'upstream/main' into investigate-pubsub-…
roseyang62 Feb 26, 2026
6701ef4
empty commit to trigger CI
roseyang62 Feb 26, 2026
ccb29a0
Add detailed debug logging for pubsub pull requests to diagnose CI ti…
roseyang62 Feb 26, 2026
a67855c
empty commit to trigger CI and collect debug logs for pubsub timeout …
roseyang62 Feb 26, 2026
0431a68
investigate timeout
roseyang62 Feb 26, 2026
403d8ee
trigger ci
roseyang62 Feb 26, 2026
3dcc189
trigger ci
roseyang62 Feb 26, 2026
f692aca
trigger ci
roseyang62 Feb 26, 2026
de325d4
trigger ci
roseyang62 Feb 27, 2026
3b4ef9e
fix pubsub race condition by limiting prefetch concurrency
roseyang62 Feb 27, 2026
983abde
record
roseyang62 Feb 27, 2026
aa92da5
revert logger
roseyang62 Feb 27, 2026
01c0c82
log more info
roseyang62 Feb 27, 2026
4c29888
try to reproduce error
roseyang62 Feb 27, 2026
a105daa
upload test report
roseyang62 Feb 27, 2026
b57ff5f
upload test report
roseyang62 Feb 27, 2026
400af65
fix yml
roseyang62 Feb 27, 2026
0facde9
revert reports
roseyang62 Feb 27, 2026
89ef3ae
add log to find out which step is stuck
roseyang62 Feb 27, 2026
47ab0a4
revert mapping
roseyang62 Mar 2, 2026
a3cae92
revert mapping
roseyang62 Mar 2, 2026
509b048
trigger CI
roseyang62 Mar 2, 2026
189826d
print more test details
roseyang62 Mar 2, 2026
f63ffb7
re-run
roseyang62 Mar 2, 2026
453ccd5
validate changes
roseyang62 Mar 2, 2026
4caf0fc
resolve conflicts
roseyang62 Mar 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,14 @@ public static void stopWireMockRecording() {
wireMockServer.stopRecording();
}
}

/**
* Resets all WireMock scenario states to their initial "Started" state.
* ensuring each test starts with a clean scenario state.
*/
public static void resetWireMockScenarios() {
if (wireMockServer != null) {
wireMockServer.resetScenarios();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was never needed before with huge fleet conf tests in all services, why it's required now ?

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,12 @@ public void shutdownWireMockServer() throws Exception {
}

/**
* Initialize the harness and start recording
* Initialize the harness and start recording.
* Resets WireMock scenario states to prevent state pollution between tests.
*/
@BeforeEach
public void setupTestEnvironment() {
TestsUtil.resetWireMockScenarios();
TestsUtil.startWireMockRecording(harness.getPubsubEndpoint());
}

Expand Down Expand Up @@ -234,8 +236,7 @@ public void testBatchAck() throws Exception {
subscription.sendAcks(ackIDs).join();
}
}

@Disabled

@Test
@Timeout(120) // Integration test with batch operations - allow time for message delivery
public void testBatchNack() throws Exception {
Expand All @@ -251,23 +252,10 @@ public void testBatchNack() throws Exception {

TimeUnit.MILLISECONDS.sleep(500);

List<AckID> ackIDs = new java.util.ArrayList<>();
boolean isRecording = System.getProperty("record") != null;
long timeoutSeconds = isRecording ? 120 : 60; // Increased timeout for integration tests
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds);

while (ackIDs.size() < toSend.size() && System.nanoTime() < deadline) {
try {
Message r = subscription.receive();
if (r != null && r.getAckID() != null) {
ackIDs.add(r.getAckID());
} else {
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (Exception e) {
TimeUnit.MILLISECONDS.sleep(100);
}
}
List<Message> received = receiveMessages(subscription, toSend.size());
List<AckID> ackIDs = received.stream()
.map(Message::getAckID)
.collect(java.util.stream.Collectors.toList());

Assertions.assertEquals(toSend.size(), ackIDs.size(),
"Should collect all AckIDs. Expected: " + toSend.size() + ", Got: " + ackIDs.size());
Expand Down Expand Up @@ -396,7 +384,6 @@ public void testMultipleSendReceiveWithoutBatch() throws Exception {
*/
@Test
@Timeout(120) // Integration test with multiple subscriptions - allow time for message delivery
@Disabled
public void testSendReceiveTwo() throws Exception {
// Create two subscriptions to the same topic
AbstractSubscription subscription1 = harness.createSubscriptionDriverWithIndex(1);
Expand Down Expand Up @@ -443,22 +430,28 @@ public void testSendReceiveTwo() throws Exception {
* Helper function: Receives messages from a subscription until the expected count is reached.
*/
private List<Message> receiveMessages(AbstractSubscription subscription, int expectedCount) throws InterruptedException {
boolean isRecording = System.getProperty("record") != null;
long timeoutSeconds = isRecording ? 120 : 60;
long timeoutSeconds = 60;
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds);

List<Message> received = new ArrayList<>();
while (received.size() < expectedCount && System.nanoTime() < deadline) {
try {
try {
while (received.size() < expectedCount && System.nanoTime() < deadline) {
Message r = subscription.receive();
if (r != null && r.getAckID() != null) {
received.add(r);
} else {
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (Exception e) {
TimeUnit.MILLISECONDS.sleep(100);
// receive() either returns a Message or throws an exception
received.add(r);
}
} catch (Exception e) {
String errorMsg = String.format(
"Failed to receive messages: Got exception after receiving %d/%d messages.",
received.size(), expectedCount);
Assertions.fail(errorMsg, e);
}
// If the loop exits but received count is less than expected, it indicates a timeout occurred.
if (received.size() < expectedCount) {
String errorMsg = String.format(
"Timeout waiting for messages: Received %d/%d messages.",
received.size(), expectedCount);
Assertions.fail(errorMsg);
}
return received;
}
Expand Down
Loading