Skip to content
Closed
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.salesforce.multicloudj.pubsub.driver.AbstractTopic;
import com.salesforce.multicloudj.pubsub.driver.Message;
import com.salesforce.multicloudj.pubsub.driver.AckID;
import com.salesforce.multicloudj.pubsub.client.GetAttributeResult;
import com.salesforce.multicloudj.common.util.common.TestsUtil;
import com.salesforce.multicloudj.common.exceptions.InvalidArgumentException;

Expand All @@ -17,12 +16,10 @@
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class AbstractPubsubIT {
Expand Down Expand Up @@ -109,7 +106,6 @@ public void testSendBatchMessages() throws Exception {
}

@Test
@Timeout(30) // Integration test that calls receive() - fail fast if recordings are missing
public void testReceiveAfterSend() throws Exception {
try (AbstractTopic topic = harness.createTopicDriver();
AbstractSubscription subscription = harness.createSubscriptionDriver()) {
Expand All @@ -120,14 +116,7 @@ public void testReceiveAfterSend() throws Exception {
.build();
topic.send(toSend);

Message received = null;
for (int i = 0; i < 50; i++) {
received = subscription.receive();
if (received != null) {
break;
}
TimeUnit.MILLISECONDS.sleep(200);
}
Message received = subscription.receive();

Assertions.assertNotNull(received, "Should receive a message within timeout");
Assertions.assertNotNull(received.getBody(), "Received message body should not be null");
Expand All @@ -136,7 +125,6 @@ public void testReceiveAfterSend() throws Exception {
}

@Test
@Timeout(30) // Integration test that calls receive() - fail fast if recordings are missing
public void testAckAfterReceive() throws Exception {
try (AbstractTopic topic = harness.createTopicDriver();
AbstractSubscription subscription = harness.createSubscriptionDriver()) {
Expand All @@ -147,12 +135,7 @@ public void testAckAfterReceive() throws Exception {
.build();
topic.send(toSend);

Message received = null;
for (int i = 0; i < 50; i++) {
received = subscription.receive();
if (received != null) break;
TimeUnit.MILLISECONDS.sleep(200);
}
Message received = subscription.receive();

Assertions.assertNotNull(received, "Should receive a message to ack");
Assertions.assertNotNull(received.getAckID(), "AckID must not be null");
Expand All @@ -162,7 +145,6 @@ public void testAckAfterReceive() throws Exception {
}

@Test
@Timeout(30) // Integration test that calls receive() - fail fast if recordings are missing
public void testNackAfterReceive() throws Exception {
try (AbstractTopic topic = harness.createTopicDriver();
AbstractSubscription subscription = harness.createSubscriptionDriver()) {
Expand All @@ -173,12 +155,7 @@ public void testNackAfterReceive() throws Exception {
.build();
topic.send(toSend);

Message received = null;
for (int i = 0; i < 50; i++) {
received = subscription.receive();
if (received != null) break;
TimeUnit.MILLISECONDS.sleep(200);
}
Message received = subscription.receive();

Assertions.assertNotNull(received, "Should receive a message to nack");
Assertions.assertNotNull(received.getAckID(), "AckID must not be null");
Expand All @@ -199,29 +176,12 @@ public void testBatchAck() throws Exception {
);
for (Message m : toSend) topic.send(m);

TimeUnit.MILLISECONDS.sleep(500);

List<AckID> ackIDs = new java.util.ArrayList<>();
boolean isRecording = System.getProperty("record") != null;
long timeoutSeconds = isRecording ? 120 : 60; // Increased timeout for integration tests
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds);

System.out.println("Starting to collect " + toSend.size() + " messages with timeout: " + timeoutSeconds + "s");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let’s remove this System.out.println from the code and use the logger instead.


while (ackIDs.size() < toSend.size() && System.nanoTime() < deadline) {
try {
Message r = subscription.receive();
if (r != null && r.getAckID() != null) {
ackIDs.add(r.getAckID());
System.out.println("Received message " + ackIDs.size() + "/" + toSend.size() +
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

" with AckID: " + r.getAckID());
} else {
System.out.println("Received null message, waiting...");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

TimeUnit.MILLISECONDS.sleep(100);
}
} catch (Exception e) {
System.err.println("Error receiving message: " + e.getMessage());
TimeUnit.MILLISECONDS.sleep(100);

while (ackIDs.size() < toSend.size()) {
Message r = subscription.receive();
if (r != null && r.getAckID() != null) {
ackIDs.add(r.getAckID());
}
}

Expand All @@ -232,7 +192,6 @@ public void testBatchAck() throws Exception {
}

@Test
@Timeout(120) // Integration test with batch operations - allow time for message delivery
public void testBatchNack() throws Exception {
try (AbstractTopic topic = harness.createTopicDriver();
AbstractSubscription subscription = harness.createSubscriptionDriver()) {
Expand All @@ -244,29 +203,12 @@ public void testBatchNack() throws Exception {
);
for (Message m : toSend) topic.send(m);

TimeUnit.MILLISECONDS.sleep(500);

List<AckID> ackIDs = new java.util.ArrayList<>();
boolean isRecording = System.getProperty("record") != null;
long timeoutSeconds = isRecording ? 120 : 60; // Increased timeout for integration tests
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds);

System.out.println("Starting to collect " + toSend.size() + " messages with timeout: " + timeoutSeconds + "s");

while (ackIDs.size() < toSend.size() && System.nanoTime() < deadline) {
try {
Message r = subscription.receive();
if (r != null && r.getAckID() != null) {
ackIDs.add(r.getAckID());
System.out.println("Received message " + ackIDs.size() + "/" + toSend.size() +
" with AckID: " + r.getAckID());
} else {
System.out.println("Received null message, waiting...");
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (Exception e) {
System.err.println("Error receiving message: " + e.getMessage());
TimeUnit.MILLISECONDS.sleep(100);

while (ackIDs.size() < toSend.size()) {
Message r = subscription.receive();
if (r != null && r.getAckID() != null) {
ackIDs.add(r.getAckID());
}
}

Expand Down Expand Up @@ -300,14 +242,11 @@ public void testDoubleAck() throws Exception {
}

List<Message> receivedMessages = new ArrayList<>();
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(30);

while (receivedMessages.size() < 3 && System.nanoTime() < deadline) {
while (receivedMessages.size() < 3) {
Message received = subscription.receive();
if (received != null) {
receivedMessages.add(received);
} else {
TimeUnit.MILLISECONDS.sleep(100);
}
}

Expand Down Expand Up @@ -351,14 +290,12 @@ public void testGetAttributes() throws Exception {
}

@Test
@Timeout(30)
public void testMultipleSendReceiveWithoutBatch() throws Exception {
Assumptions.assumeFalse(AWS_PROVIDER_ID.equals(harness.getProviderId()));
try (AbstractTopic topic = harness.createTopicDriver();
AbstractSubscription subscription = harness.createSubscriptionDriver()) {

int numMessages = 5;
List<Message> sentMessages = new ArrayList<>();

// Send messages one by one (not in batch)
for (int i = 0; i < numMessages; i++) {
Expand All @@ -367,29 +304,17 @@ public void testMultipleSendReceiveWithoutBatch() throws Exception {
.withMetadata(Map.of("index", String.valueOf(i)))
.build();
topic.send(message);
sentMessages.add(message);
TimeUnit.MILLISECONDS.sleep(100); // Small delay between sends
}

TimeUnit.MILLISECONDS.sleep(500); // Allow time for messages to be available

// Receive and ack messages one by one (not in batch)
List<Message> receivedMessages = new ArrayList<>();

while (receivedMessages.size() < numMessages) {
try {
Message received = subscription.receive();
if (received != null && received.getAckID() != null) {
receivedMessages.add(received);
// Ack immediately after receiving (not in batch)
subscription.sendAck(received.getAckID());
System.out.println("Received and acked message " + receivedMessages.size() + "/" + numMessages);
} else {
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (Exception e) {
System.err.println("Error receiving message: " + e.getMessage());
TimeUnit.MILLISECONDS.sleep(100);
Message received = subscription.receive();
if (received != null && received.getAckID() != null) {
receivedMessages.add(received);
// Ack immediately after receiving (not in batch)
subscription.sendAck(received.getAckID());
}
}

Expand Down
Loading