Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
423bd46
enable batchnack
roseyang62 Jan 24, 2026
f388c87
Empty commit to trigger CI
roseyang62 Jan 25, 2026
e71fb9b
Empty commit to trigger CI
roseyang62 Jan 25, 2026
b1e9df8
fail fast if mapping unmatch
roseyang62 Jan 25, 2026
f17a22c
throw exception
roseyang62 Jan 25, 2026
da2639c
empty commit to trigger ci to reproduce error
roseyang62 Jan 25, 2026
50607ec
empty commit to trigger ci to reproduce error
roseyang62 Jan 25, 2026
d2bbe2d
enable testBatchNack
roseyang62 Jan 25, 2026
7815d3b
empty commit to trigger ci to reproduce error
roseyang62 Jan 25, 2026
b526ee5
empty commit to trigger ci to reproduce error
roseyang62 Jan 25, 2026
e1ee8f3
empty commit to trigger ci to reproduce error
roseyang62 Jan 25, 2026
a0f7e19
Merge remote-tracking branch 'upstream/main' into investigate-pubsub-…
roseyang62 Jan 29, 2026
2a6075a
Add error handling
roseyang62 Jan 29, 2026
a549c94
empty commit to trigger ci to reproduce error
roseyang62 Jan 29, 2026
454b500
empty commit to trigger ci to reproduce error
roseyang62 Jan 29, 2026
e4eaa2f
Merge branch 'main' into investigate-pubsub-conformance-test
roseyang62 Jan 29, 2026
da36b54
update error handling
roseyang62 Jan 29, 2026
0a0549a
Merge branch 'investigate-pubsub-conformance-test' of https://github.…
roseyang62 Jan 29, 2026
e34c1af
empty commit to trigger ci to see if still have error
roseyang62 Jan 29, 2026
db95ac5
empty commit to trigger ci to reproduce error
roseyang62 Jan 29, 2026
f0b43e5
empty commit to trigger ci to reproduce error
roseyang62 Jan 29, 2026
ff3752b
move WireMock debugging to teardown phase
roseyang62 Jan 30, 2026
f7f91d9
empty commit to trigger ci to reproduce error
roseyang62 Jan 30, 2026
9e0d22c
empty commit to trigger ci to reproduce error
roseyang62 Jan 30, 2026
8c169f0
empty commit to trigger ci to reproduce error
roseyang62 Jan 30, 2026
ce35044
chore: trigger CI
roseyang62 Feb 2, 2026
84bf1ed
Merge remote-tracking branch 'upstream/main' into investigate-pubsub-…
roseyang62 Feb 2, 2026
94e9698
add resetWireMockScenarios
roseyang62 Feb 25, 2026
2893688
Merge branch 'main' into investigate-pubsub-conformance-test
roseyang62 Feb 25, 2026
db7113b
add resetWireMockScenarios
roseyang62 Feb 25, 2026
9b67bf5
add resetWireMockScenarios
roseyang62 Feb 25, 2026
c20280b
Use Assertions.fail instead of AssertionError
roseyang62 Feb 25, 2026
ef459aa
Merge branch 'main' into investigate-pubsub-conformance-test
roseyang62 Feb 25, 2026
199d364
remove resetscenarios
roseyang62 Feb 26, 2026
f33c0da
Merge remote-tracking branch 'upstream/main' into investigate-pubsub-…
roseyang62 Feb 26, 2026
6701ef4
empty commit to trigger CI
roseyang62 Feb 26, 2026
ccb29a0
Add detailed debug logging for pubsub pull requests to diagnose CI ti…
roseyang62 Feb 26, 2026
a67855c
empty commit to trigger CI and collect debug logs for pubsub timeout …
roseyang62 Feb 26, 2026
0431a68
investigate timeout
roseyang62 Feb 26, 2026
403d8ee
trigger ci
roseyang62 Feb 26, 2026
3dcc189
trigger ci
roseyang62 Feb 26, 2026
f692aca
trigger ci
roseyang62 Feb 26, 2026
de325d4
trigger ci
roseyang62 Feb 27, 2026
3b4ef9e
fix pubsub race condition by limiting prefetch concurrency
roseyang62 Feb 27, 2026
983abde
record
roseyang62 Feb 27, 2026
aa92da5
revert logger
roseyang62 Feb 27, 2026
01c0c82
log more info
roseyang62 Feb 27, 2026
4c29888
try to reproduce error
roseyang62 Feb 27, 2026
a105daa
upload test report
roseyang62 Feb 27, 2026
b57ff5f
upload test report
roseyang62 Feb 27, 2026
400af65
fix yml
roseyang62 Feb 27, 2026
0facde9
revert reports
roseyang62 Feb 27, 2026
89ef3ae
add log to find out which step is stuck
roseyang62 Feb 27, 2026
47ab0a4
revert mapping
roseyang62 Mar 2, 2026
a3cae92
revert mapping
roseyang62 Mar 2, 2026
509b048
trigger CI
roseyang62 Mar 2, 2026
189826d
print more test details
roseyang62 Mar 2, 2026
f63ffb7
re-run
roseyang62 Mar 2, 2026
453ccd5
validate changes
roseyang62 Mar 2, 2026
4caf0fc
resolve conflicts
roseyang62 Mar 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -73,7 +74,7 @@ public void shutdownWireMockServer() throws Exception {
harness.close();
}

/** Initialize the harness and start recording */
/** Initialize the harness and start recording. */
@BeforeEach
public void setupTestEnvironment(TestInfo testInfo) {
String testClassName = testInfo.getTestClass().map(Class::getSimpleName).orElse("Unknown");
Expand Down Expand Up @@ -255,7 +256,6 @@ public void testBatchAck() throws Exception {
}
}

@Disabled
@Test
@Timeout(120) // Integration test with batch operations - allow time for message delivery
public void testBatchNack() throws Exception {
Expand All @@ -282,23 +282,10 @@ public void testBatchNack() throws Exception {

TimeUnit.MILLISECONDS.sleep(500);

List<AckID> ackIDs = new java.util.ArrayList<>();
boolean isRecording = System.getProperty("record") != null;
long timeoutSeconds = isRecording ? 120 : 60; // Increased timeout for integration tests
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds);

while (ackIDs.size() < toSend.size() && System.nanoTime() < deadline) {
try {
Message r = subscription.receive();
if (r != null && r.getAckID() != null) {
ackIDs.add(r.getAckID());
} else {
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (Exception e) {
TimeUnit.MILLISECONDS.sleep(100);
}
}
List<Message> received = receiveMessages(subscription, toSend.size(), "subscription");
List<AckID> ackIDs = received.stream()
.map(Message::getAckID)
.collect(Collectors.toList());

Assertions.assertEquals(
toSend.size(),
Expand Down Expand Up @@ -429,43 +416,50 @@ public void testMultipleSendReceiveWithoutBatch() throws Exception {
/** Receive from two subscriptions to the same topic. Verify both get all the messages. */
@Test
@Timeout(120) // Integration test with multiple subscriptions - allow time for message delivery
@Disabled
public void testSendReceiveTwo() throws Exception {
// Create two subscriptions to the same topic
AbstractSubscription subscription1 = harness.createSubscriptionDriverWithIndex(1);
AbstractSubscription subscription2 = harness.createSubscriptionDriverWithIndex(2);

try (AbstractTopic topic = harness.createTopicDriver();
AbstractSubscription sub1 = subscription1;
AbstractSubscription sub2 = subscription2) {

// Send 3 messages to the topic
List<Message> messagesToSend =
List.of(
Message.builder()
.withBody("fanout-msg1".getBytes())
.withMetadata(Map.of("id", "1"))
.build(),
Message.builder()
.withBody("fanout-msg2".getBytes())
.withMetadata(Map.of("id", "2"))
.build(),
Message.builder()
.withBody("fanout-msg3".getBytes())
.withMetadata(Map.of("id", "3"))
.build());
// Send 3 messages to the topic
List<Message> messagesToSend =
List.of(
Message.builder()
.withBody("fanout-msg1".getBytes())
.withMetadata(Map.of("id", "1"))
.build(),
Message.builder()
.withBody("fanout-msg2".getBytes())
.withMetadata(Map.of("id", "2"))
.build(),
Message.builder()
.withBody("fanout-msg3".getBytes())
.withMetadata(Map.of("id", "3"))
.build());

for (Message message : messagesToSend) {
try (AbstractTopic topic = harness.createTopicDriver()) {
for (int i = 0; i < messagesToSend.size(); i++) {
Message message = messagesToSend.get(i);
System.out.printf("[send] Sending message %d/%d: body=%s, metadata=%s%n",
i + 1, messagesToSend.size(), new String(message.getBody()), message.getMetadata());
topic.send(message);
System.out.printf("[send] Message %d sent successfully%n", i + 1);
}
} // Close topic here to flush pending messages

TimeUnit.MILLISECONDS.sleep(500);
System.out.println("[wait] Sleeping 500ms for message delivery...");
TimeUnit.MILLISECONDS.sleep(500);
System.out.println("[wait] Sleep done, starting receive phase");

try (AbstractSubscription sub1 = subscription1;
AbstractSubscription sub2 = subscription2) {
// Receive messages from both subscriptions
List<Message> received1 = receiveMessages(sub1, messagesToSend.size());
List<Message> received2 = receiveMessages(sub2, messagesToSend.size());
List<Message> received1 = receiveMessages(sub1, messagesToSend.size(), "sub1");
List<Message> received2 = receiveMessages(sub2, messagesToSend.size(), "sub2");

// Verify both subscriptions received all messages
System.out.printf("[verify] sub1 received %d messages, sub2 received %d messages%n",
received1.size(), received2.size());
Assertions.assertEquals(
messagesToSend.size(),
received1.size(),
Expand All @@ -482,35 +476,65 @@ public void testSendReceiveTwo() throws Exception {
+ received2.size());

// Verify messages match for both subscriptions
System.out.println("[verify] Verifying sub1 messages...");
verifyMessages(received1, messagesToSend, "Subscription 1");
System.out.println("[verify] sub1 messages OK");
System.out.println("[verify] Verifying sub2 messages...");
verifyMessages(received2, messagesToSend, "Subscription 2");
System.out.println("[verify] sub2 messages OK");

// Ack all messages from both subscriptions
System.out.println("[ack] Acking sub1 messages...");
ackMessages(sub1, received1);
System.out.println("[ack] sub1 ack done");
System.out.println("[ack] Acking sub2 messages...");
ackMessages(sub2, received2);
System.out.println("[ack] sub2 ack done");
}
}

/** Helper function: Receives messages from a subscription until the expected count is reached. */
private List<Message> receiveMessages(AbstractSubscription subscription, int expectedCount)
private List<Message> receiveMessages(
AbstractSubscription subscription, int expectedCount, String subscriptionId)
throws InterruptedException {
boolean isRecording = System.getProperty("record") != null;
long timeoutSeconds = isRecording ? 120 : 60;
long timeoutSeconds = 60;
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds);

System.out.printf("[%s] Starting to receive %d messages (timeout=%ds)%n",
subscriptionId, expectedCount, timeoutSeconds);

List<Message> received = new ArrayList<>();
while (received.size() < expectedCount && System.nanoTime() < deadline) {
try {
try {
while (received.size() < expectedCount && System.nanoTime() < deadline) {
long remainingSec = TimeUnit.NANOSECONDS.toSeconds(deadline - System.nanoTime());
System.out.printf("[%s] Calling receive() - got %d/%d so far, %ds remaining%n",
subscriptionId, received.size(), expectedCount, remainingSec);
Message r = subscription.receive();
if (r != null && r.getAckID() != null) {
received.add(r);
} else {
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (Exception e) {
TimeUnit.MILLISECONDS.sleep(100);
System.out.printf("[%s] receive() returned message: body=%s, ackID=%s%n",
subscriptionId,
r != null ? new String(r.getBody()) : "null",
r != null ? r.getAckID() : "null");
received.add(r);
}
} catch (Exception e) {
String errorMsg = String.format(
"[%s] Failed to receive messages: Got exception after receiving %d/%d messages."
+ " Exception: %s - %s",
subscriptionId, received.size(), expectedCount,
e.getClass().getSimpleName(), e.getMessage());
System.out.println(errorMsg);
e.printStackTrace(System.out);
Assertions.fail(errorMsg, e);
}
if (received.size() < expectedCount) {
String errorMsg = String.format(
"[%s] Timeout waiting for messages: Received %d/%d messages.",
subscriptionId, received.size(), expectedCount);
System.out.println(errorMsg);
Assertions.fail(errorMsg);
}
System.out.printf("[%s] Successfully received all %d messages%n",
subscriptionId, received.size());
return received;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.salesforce.multicloudj.common.gcp.GcpConstants;
import com.salesforce.multicloudj.common.gcp.util.MockGoogleCredentialsFactory;
import com.salesforce.multicloudj.common.gcp.util.TestsUtilGcp;
import com.salesforce.multicloudj.pubsub.batcher.Batcher;
import com.salesforce.multicloudj.pubsub.client.AbstractPubsubIT;
import com.salesforce.multicloudj.pubsub.driver.AbstractSubscription;
import com.salesforce.multicloudj.pubsub.driver.AbstractTopic;
Expand Down Expand Up @@ -153,7 +154,16 @@ public AbstractSubscription createSubscriptionDriverWithIndex(int index) {

GcpSubscription.Builder subscriptionBuilder =
new GcpSubscription.Builder().withSubscriptionName(fullSubscriptionName);
GcpSubscription sub = new GcpSubscription(subscriptionBuilder, client);
GcpSubscription sub = new GcpSubscription(subscriptionBuilder, client) {
@Override
protected Batcher.Options createReceiveBatcherOptions() {
return new Batcher.Options()
.setMaxHandlers(1)
.setMinBatchSize(1)
.setMaxBatchSize(1)
.setMaxBatchByteSize(0);
}
};

if (index == 0) {
subscription = sub;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,6 @@ public StubMapping transform(StubMapping stub, FileSource files, Parameters para
.anyMatch(s -> s.contains("/subscriptions/") && !s.contains(":"));
boolean isPutMethod = stub.getRequest().getMethod() == RequestMethod.PUT;

// During recording, if the Recorder detects repeated calls to the same endpoint,
// it will usually auto-add `scenarioName`, `requiredScenarioState`, and `newScenarioState` to
// those stubs.
// The call sequence and count during recording often differ from replay
// (due to batching, retries, and timing differences).
stub.setScenarioName(null);
stub.setRequiredScenarioState(null);
stub.setNewScenarioState(null);

if (isAck || isMod) {
List<ContentPattern<?>> bodyPatterns = stub.getRequest().getBodyPatterns();
if (bodyPatterns != null) {
Expand Down
Loading