diff --git a/pubsub/pubsub-client/src/test/java/com/salesforce/multicloudj/pubsub/client/AbstractPubsubIT.java b/pubsub/pubsub-client/src/test/java/com/salesforce/multicloudj/pubsub/client/AbstractPubsubIT.java index 95f0fad89..b330bd11c 100644 --- a/pubsub/pubsub-client/src/test/java/com/salesforce/multicloudj/pubsub/client/AbstractPubsubIT.java +++ b/pubsub/pubsub-client/src/test/java/com/salesforce/multicloudj/pubsub/client/AbstractPubsubIT.java @@ -12,6 +12,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -73,7 +74,7 @@ public void shutdownWireMockServer() throws Exception { harness.close(); } - /** Initialize the harness and start recording */ + /** Initialize the harness and start recording. */ @BeforeEach public void setupTestEnvironment(TestInfo testInfo) { String testClassName = testInfo.getTestClass().map(Class::getSimpleName).orElse("Unknown"); @@ -255,7 +256,6 @@ public void testBatchAck() throws Exception { } } - @Disabled @Test @Timeout(120) // Integration test with batch operations - allow time for message delivery public void testBatchNack() throws Exception { @@ -282,23 +282,10 @@ public void testBatchNack() throws Exception { TimeUnit.MILLISECONDS.sleep(500); - List ackIDs = new java.util.ArrayList<>(); - boolean isRecording = System.getProperty("record") != null; - long timeoutSeconds = isRecording ? 120 : 60; // Increased timeout for integration tests - long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds); - - while (ackIDs.size() < toSend.size() && System.nanoTime() < deadline) { - try { - Message r = subscription.receive(); - if (r != null && r.getAckID() != null) { - ackIDs.add(r.getAckID()); - } else { - TimeUnit.MILLISECONDS.sleep(100); - } - } catch (Exception e) { - TimeUnit.MILLISECONDS.sleep(100); - } - } + List received = receiveMessages(subscription, toSend.size(), "subscription"); + List ackIDs = received.stream() + .map(Message::getAckID) + .collect(Collectors.toList()); Assertions.assertEquals( toSend.size(), @@ -429,43 +416,50 @@ public void testMultipleSendReceiveWithoutBatch() throws Exception { /** Receive from two subscriptions to the same topic. Verify both get all the messages. */ @Test @Timeout(120) // Integration test with multiple subscriptions - allow time for message delivery - @Disabled public void testSendReceiveTwo() throws Exception { // Create two subscriptions to the same topic AbstractSubscription subscription1 = harness.createSubscriptionDriverWithIndex(1); AbstractSubscription subscription2 = harness.createSubscriptionDriverWithIndex(2); - try (AbstractTopic topic = harness.createTopicDriver(); - AbstractSubscription sub1 = subscription1; - AbstractSubscription sub2 = subscription2) { - - // Send 3 messages to the topic - List messagesToSend = - List.of( - Message.builder() - .withBody("fanout-msg1".getBytes()) - .withMetadata(Map.of("id", "1")) - .build(), - Message.builder() - .withBody("fanout-msg2".getBytes()) - .withMetadata(Map.of("id", "2")) - .build(), - Message.builder() - .withBody("fanout-msg3".getBytes()) - .withMetadata(Map.of("id", "3")) - .build()); + // Send 3 messages to the topic + List messagesToSend = + List.of( + Message.builder() + .withBody("fanout-msg1".getBytes()) + .withMetadata(Map.of("id", "1")) + .build(), + Message.builder() + .withBody("fanout-msg2".getBytes()) + .withMetadata(Map.of("id", "2")) + .build(), + Message.builder() + .withBody("fanout-msg3".getBytes()) + .withMetadata(Map.of("id", "3")) + .build()); - for (Message message : messagesToSend) { + try (AbstractTopic topic = harness.createTopicDriver()) { + for (int i = 0; i < messagesToSend.size(); i++) { + Message message = messagesToSend.get(i); + System.out.printf("[send] Sending message %d/%d: body=%s, metadata=%s%n", + i + 1, messagesToSend.size(), new String(message.getBody()), message.getMetadata()); topic.send(message); + System.out.printf("[send] Message %d sent successfully%n", i + 1); } + } // Close topic here to flush pending messages - TimeUnit.MILLISECONDS.sleep(500); + System.out.println("[wait] Sleeping 500ms for message delivery..."); + TimeUnit.MILLISECONDS.sleep(500); + System.out.println("[wait] Sleep done, starting receive phase"); + try (AbstractSubscription sub1 = subscription1; + AbstractSubscription sub2 = subscription2) { // Receive messages from both subscriptions - List received1 = receiveMessages(sub1, messagesToSend.size()); - List received2 = receiveMessages(sub2, messagesToSend.size()); + List received1 = receiveMessages(sub1, messagesToSend.size(), "sub1"); + List received2 = receiveMessages(sub2, messagesToSend.size(), "sub2"); // Verify both subscriptions received all messages + System.out.printf("[verify] sub1 received %d messages, sub2 received %d messages%n", + received1.size(), received2.size()); Assertions.assertEquals( messagesToSend.size(), received1.size(), @@ -482,35 +476,65 @@ public void testSendReceiveTwo() throws Exception { + received2.size()); // Verify messages match for both subscriptions + System.out.println("[verify] Verifying sub1 messages..."); verifyMessages(received1, messagesToSend, "Subscription 1"); + System.out.println("[verify] sub1 messages OK"); + System.out.println("[verify] Verifying sub2 messages..."); verifyMessages(received2, messagesToSend, "Subscription 2"); + System.out.println("[verify] sub2 messages OK"); // Ack all messages from both subscriptions + System.out.println("[ack] Acking sub1 messages..."); ackMessages(sub1, received1); + System.out.println("[ack] sub1 ack done"); + System.out.println("[ack] Acking sub2 messages..."); ackMessages(sub2, received2); + System.out.println("[ack] sub2 ack done"); } } /** Helper function: Receives messages from a subscription until the expected count is reached. */ - private List receiveMessages(AbstractSubscription subscription, int expectedCount) + private List receiveMessages( + AbstractSubscription subscription, int expectedCount, String subscriptionId) throws InterruptedException { - boolean isRecording = System.getProperty("record") != null; - long timeoutSeconds = isRecording ? 120 : 60; + long timeoutSeconds = 60; long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds); + System.out.printf("[%s] Starting to receive %d messages (timeout=%ds)%n", + subscriptionId, expectedCount, timeoutSeconds); + List received = new ArrayList<>(); - while (received.size() < expectedCount && System.nanoTime() < deadline) { - try { + try { + while (received.size() < expectedCount && System.nanoTime() < deadline) { + long remainingSec = TimeUnit.NANOSECONDS.toSeconds(deadline - System.nanoTime()); + System.out.printf("[%s] Calling receive() - got %d/%d so far, %ds remaining%n", + subscriptionId, received.size(), expectedCount, remainingSec); Message r = subscription.receive(); - if (r != null && r.getAckID() != null) { - received.add(r); - } else { - TimeUnit.MILLISECONDS.sleep(100); - } - } catch (Exception e) { - TimeUnit.MILLISECONDS.sleep(100); + System.out.printf("[%s] receive() returned message: body=%s, ackID=%s%n", + subscriptionId, + r != null ? new String(r.getBody()) : "null", + r != null ? r.getAckID() : "null"); + received.add(r); } + } catch (Exception e) { + String errorMsg = String.format( + "[%s] Failed to receive messages: Got exception after receiving %d/%d messages." + + " Exception: %s - %s", + subscriptionId, received.size(), expectedCount, + e.getClass().getSimpleName(), e.getMessage()); + System.out.println(errorMsg); + e.printStackTrace(System.out); + Assertions.fail(errorMsg, e); + } + if (received.size() < expectedCount) { + String errorMsg = String.format( + "[%s] Timeout waiting for messages: Received %d/%d messages.", + subscriptionId, received.size(), expectedCount); + System.out.println(errorMsg); + Assertions.fail(errorMsg); } + System.out.printf("[%s] Successfully received all %d messages%n", + subscriptionId, received.size()); return received; } diff --git a/pubsub/pubsub-gcp/src/test/java/com/salesforce/multicloudj/pubsub/gcp/GcpPubsubIT.java b/pubsub/pubsub-gcp/src/test/java/com/salesforce/multicloudj/pubsub/gcp/GcpPubsubIT.java index b3c0c58d0..37c7040ab 100644 --- a/pubsub/pubsub-gcp/src/test/java/com/salesforce/multicloudj/pubsub/gcp/GcpPubsubIT.java +++ b/pubsub/pubsub-gcp/src/test/java/com/salesforce/multicloudj/pubsub/gcp/GcpPubsubIT.java @@ -12,6 +12,7 @@ import com.salesforce.multicloudj.common.gcp.GcpConstants; import com.salesforce.multicloudj.common.gcp.util.MockGoogleCredentialsFactory; import com.salesforce.multicloudj.common.gcp.util.TestsUtilGcp; +import com.salesforce.multicloudj.pubsub.batcher.Batcher; import com.salesforce.multicloudj.pubsub.client.AbstractPubsubIT; import com.salesforce.multicloudj.pubsub.driver.AbstractSubscription; import com.salesforce.multicloudj.pubsub.driver.AbstractTopic; @@ -153,7 +154,16 @@ public AbstractSubscription createSubscriptionDriverWithIndex(int index) { GcpSubscription.Builder subscriptionBuilder = new GcpSubscription.Builder().withSubscriptionName(fullSubscriptionName); - GcpSubscription sub = new GcpSubscription(subscriptionBuilder, client); + GcpSubscription sub = new GcpSubscription(subscriptionBuilder, client) { + @Override + protected Batcher.Options createReceiveBatcherOptions() { + return new Batcher.Options() + .setMaxHandlers(1) + .setMinBatchSize(1) + .setMaxBatchSize(1) + .setMaxBatchByteSize(0); + } + }; if (index == 0) { subscription = sub; diff --git a/pubsub/pubsub-gcp/src/test/java/com/salesforce/multicloudj/pubsub/gcp/util/AckMatcherRelaxingTransformer.java b/pubsub/pubsub-gcp/src/test/java/com/salesforce/multicloudj/pubsub/gcp/util/AckMatcherRelaxingTransformer.java index 2e0038014..d4d35427a 100644 --- a/pubsub/pubsub-gcp/src/test/java/com/salesforce/multicloudj/pubsub/gcp/util/AckMatcherRelaxingTransformer.java +++ b/pubsub/pubsub-gcp/src/test/java/com/salesforce/multicloudj/pubsub/gcp/util/AckMatcherRelaxingTransformer.java @@ -45,15 +45,6 @@ public StubMapping transform(StubMapping stub, FileSource files, Parameters para .anyMatch(s -> s.contains("/subscriptions/") && !s.contains(":")); boolean isPutMethod = stub.getRequest().getMethod() == RequestMethod.PUT; - // During recording, if the Recorder detects repeated calls to the same endpoint, - // it will usually auto-add `scenarioName`, `requiredScenarioState`, and `newScenarioState` to - // those stubs. - // The call sequence and count during recording often differ from replay - // (due to batching, retries, and timing differences). - stub.setScenarioName(null); - stub.setRequiredScenarioState(null); - stub.setNewScenarioState(null); - if (isAck || isMod) { List> bodyPatterns = stub.getRequest().getBodyPatterns(); if (bodyPatterns != null) {