Skip to content

Commit beaf28c

Browse files
committed
pubsub: integration test to pull atleast 1 message
1 parent 0ba9ce9 commit beaf28c

29 files changed

+939
-0
lines changed

pubsub/pubsub-client/src/test/java/com/salesforce/multicloudj/pubsub/client/AbstractPubsubIT.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,4 +347,58 @@ public void testGetAttributes() throws Exception {
347347
Assertions.assertNotNull(attributes.getTopic(), "Should have topic attribute");
348348
}
349349
}
350+
351+
@Test
352+
public void testMultipleSendReceiveWithoutBatch() throws Exception {
353+
try (AbstractTopic topic = harness.createTopicDriver();
354+
AbstractSubscription subscription = harness.createSubscriptionDriver()) {
355+
356+
int numMessages = 5;
357+
List<Message> sentMessages = new ArrayList<>();
358+
359+
// Send messages one by one (not in batch)
360+
for (int i = 0; i < numMessages; i++) {
361+
Message message = Message.builder()
362+
.withBody(("non-batch-msg-" + i).getBytes())
363+
.withMetadata(Map.of("index", String.valueOf(i)))
364+
.build();
365+
topic.send(message);
366+
sentMessages.add(message);
367+
TimeUnit.MILLISECONDS.sleep(100); // Small delay between sends
368+
}
369+
370+
TimeUnit.MILLISECONDS.sleep(500); // Allow time for messages to be available
371+
372+
// Receive and ack messages one by one (not in batch)
373+
List<Message> receivedMessages = new ArrayList<>();
374+
375+
while (receivedMessages.size() < numMessages) {
376+
try {
377+
Message received = subscription.receive();
378+
if (received != null && received.getAckID() != null) {
379+
receivedMessages.add(received);
380+
// Ack immediately after receiving (not in batch)
381+
subscription.sendAck(received.getAckID());
382+
System.out.println("Received and acked message " + receivedMessages.size() + "/" + numMessages);
383+
} else {
384+
TimeUnit.MILLISECONDS.sleep(100);
385+
}
386+
} catch (Exception e) {
387+
System.err.println("Error receiving message: " + e.getMessage());
388+
TimeUnit.MILLISECONDS.sleep(100);
389+
}
390+
}
391+
392+
Assertions.assertEquals(numMessages, receivedMessages.size(),
393+
"Should receive all messages. Expected: " + numMessages + ", Got: " + receivedMessages.size());
394+
395+
// Verify all messages were received
396+
for (int i = 0; i < receivedMessages.size(); i++) {
397+
Message received = receivedMessages.get(i);
398+
Assertions.assertNotNull(received, "Received message " + i + " should not be null");
399+
Assertions.assertNotNull(received.getBody(), "Received message " + i + " body should not be null");
400+
Assertions.assertNotNull(received.getAckID(), "Received message " + i + " should have AckID");
401+
}
402+
}
403+
}
350404
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
{
2+
"id" : "6cafbf0f-21f3-4939-ae7e-44e18a5115c1",
3+
"name" : "v1_projects_substrate-sdk-gcp-poc1_subscriptions_test-subscription",
4+
"request" : {
5+
"url" : "/v1/projects/substrate-sdk-gcp-poc1/subscriptions/test-subscription?$alt=json;enum-encoding%3Dint",
6+
"method" : "GET"
7+
},
8+
"response" : {
9+
"status" : 200,
10+
"body" : "{\n \"name\": \"projects/substrate-sdk-gcp-poc1/subscriptions/test-subscription\",\n \"topic\": \"projects/substrate-sdk-gcp-poc1/topics/test-topic\",\n \"pushConfig\": {},\n \"ackDeadlineSeconds\": 10,\n \"messageRetentionDuration\": \"604800s\",\n \"expirationPolicy\": {\n \"ttl\": \"2678400s\"\n },\n \"state\": 1\n}\n",
11+
"headers" : {
12+
"X-Frame-Options" : "SAMEORIGIN",
13+
"Alt-Svc" : "h3=\":443\"; ma=2592000,h3-29=\":443\"; ma=2592000",
14+
"Server" : "ESF",
15+
"X-Content-Type-Options" : "nosniff",
16+
"Vary" : [ "Origin", "X-Origin", "Referer" ],
17+
"X-XSS-Protection" : "0",
18+
"Date" : "Tue, 09 Dec 2025 22:26:35 GMT",
19+
"Content-Type" : "application/json; charset=UTF-8"
20+
}
21+
},
22+
"uuid" : "6cafbf0f-21f3-4939-ae7e-44e18a5115c1",
23+
"persistent" : true,
24+
"insertionIndex" : 74
25+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
{
2+
"id" : "9a9c8540-4ffb-43a3-b90f-450ce1c9a1ad",
3+
"name" : "v1_projects_substrate-sdk-gcp-poc1_topics_test-topicpublish",
4+
"request" : {
5+
"url" : "/v1/projects/substrate-sdk-gcp-poc1/topics/test-topic:publish?$alt=json;enum-encoding%3Dint",
6+
"method" : "POST",
7+
"bodyPatterns" : [ {
8+
"matchesJsonPath" : "$.messages"
9+
}, {
10+
"matchesJsonPath" : "$.messages[*]"
11+
} ]
12+
},
13+
"response" : {
14+
"status" : 200,
15+
"body" : "{\n \"messageIds\": [\n \"17294729671276765\"\n ]\n}\n",
16+
"headers" : {
17+
"X-Frame-Options" : "SAMEORIGIN",
18+
"Alt-Svc" : "h3=\":443\"; ma=2592000,h3-29=\":443\"; ma=2592000",
19+
"Server" : "ESF",
20+
"X-Content-Type-Options" : "nosniff",
21+
"Vary" : [ "Origin", "X-Origin", "Referer" ],
22+
"X-XSS-Protection" : "0",
23+
"Date" : "Tue, 09 Dec 2025 22:26:29 GMT",
24+
"Content-Type" : "application/json; charset=UTF-8"
25+
}
26+
},
27+
"uuid" : "9a9c8540-4ffb-43a3-b90f-450ce1c9a1ad",
28+
"persistent" : true,
29+
"insertionIndex" : 60
30+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
{
2+
"id" : "1fe0cf57-1ea9-4c96-9bb8-30b7f0f8738c",
3+
"name" : "v1_projects_substrate-sdk-gcp-poc1_topics_test-topicpublish",
4+
"request" : {
5+
"url" : "/v1/projects/substrate-sdk-gcp-poc1/topics/test-topic:publish?$alt=json;enum-encoding%3Dint",
6+
"method" : "POST",
7+
"bodyPatterns" : [ {
8+
"matchesJsonPath" : "$.messages"
9+
}, {
10+
"matchesJsonPath" : "$.messages[*]"
11+
} ]
12+
},
13+
"response" : {
14+
"status" : 200,
15+
"body" : "{\n \"messageIds\": [\n \"17295293079357176\"\n ]\n}\n",
16+
"headers" : {
17+
"X-Frame-Options" : "SAMEORIGIN",
18+
"Alt-Svc" : "h3=\":443\"; ma=2592000,h3-29=\":443\"; ma=2592000",
19+
"Server" : "ESF",
20+
"X-Content-Type-Options" : "nosniff",
21+
"Vary" : [ "Origin", "X-Origin", "Referer" ],
22+
"X-XSS-Protection" : "0",
23+
"Date" : "Tue, 09 Dec 2025 22:26:32 GMT",
24+
"Content-Type" : "application/json; charset=UTF-8"
25+
}
26+
},
27+
"uuid" : "1fe0cf57-1ea9-4c96-9bb8-30b7f0f8738c",
28+
"persistent" : true,
29+
"insertionIndex" : 68
30+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
{
2+
"id" : "00080b5e-ece0-4829-9f5e-a88eef8b0186",
3+
"name" : "v1_projects_substrate-sdk-gcp-poc1_subscriptions_test-subscriptionpull",
4+
"request" : {
5+
"url" : "/v1/projects/substrate-sdk-gcp-poc1/subscriptions/test-subscription:pull?$alt=json;enum-encoding%3Dint",
6+
"method" : "POST",
7+
"bodyPatterns" : [ {
8+
"matchesJsonPath" : "$.returnImmediately"
9+
}, {
10+
"matchesJsonPath" : "$.maxMessages"
11+
} ]
12+
},
13+
"response" : {
14+
"status" : 200,
15+
"body" : "{\n \"receivedMessages\": [\n {\n \"ackId\": \"SDofGScFTF5FLTg1aDwFUUZTBwcrHUQdDmINCncle09eb39kGmpaEwECRFosX14aXD1VXX0NUAcZYExhbcCuwfg0Z3J_WFkeCWpaXnQDUA4Zc3VhdEK2hL7b6ciffkEnNcKojowxcaGnnoVQZig9JxJLLD5-KTkORUZXWgEhHQwEUTsICjJSTl9HMWI2KjUaUBxRGQw7C0Rb\",\n \"message\": {\n \"data\": \"YmF0Y2gtYWNrLTE=\",\n \"attributes\": {\n \"batch\": \"ack\"\n },\n \"messageId\": \"17254262141301801\",\n \"publishTime\": \"2025-12-09T21:57:00Z\"\n }\n }\n ]\n}\n",
16+
"headers" : {
17+
"X-Frame-Options" : "SAMEORIGIN",
18+
"Alt-Svc" : "h3=\":443\"; ma=2592000,h3-29=\":443\"; ma=2592000",
19+
"Server" : "ESF",
20+
"X-Content-Type-Options" : "nosniff",
21+
"Vary" : [ "Origin", "X-Origin", "Referer" ],
22+
"X-XSS-Protection" : "0",
23+
"Date" : "Tue, 09 Dec 2025 22:26:34 GMT",
24+
"Content-Type" : "application/json; charset=UTF-8"
25+
}
26+
},
27+
"uuid" : "00080b5e-ece0-4829-9f5e-a88eef8b0186",
28+
"persistent" : true,
29+
"insertionIndex" : 71
30+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
{
2+
"id" : "9ef1eb4f-2fe4-437b-aa37-0e63d2c0a2d5",
3+
"name" : "v1_projects_substrate-sdk-gcp-poc1_topics_test-topicpublish",
4+
"request" : {
5+
"url" : "/v1/projects/substrate-sdk-gcp-poc1/topics/test-topic:publish?$alt=json;enum-encoding%3Dint",
6+
"method" : "POST",
7+
"bodyPatterns" : [ {
8+
"matchesJsonPath" : "$.messages"
9+
}, {
10+
"matchesJsonPath" : "$.messages[*]"
11+
} ]
12+
},
13+
"response" : {
14+
"status" : 200,
15+
"body" : "{\n \"messageIds\": [\n \"17294675313852863\"\n ]\n}\n",
16+
"headers" : {
17+
"X-Frame-Options" : "SAMEORIGIN",
18+
"Alt-Svc" : "h3=\":443\"; ma=2592000,h3-29=\":443\"; ma=2592000",
19+
"Server" : "ESF",
20+
"X-Content-Type-Options" : "nosniff",
21+
"Vary" : [ "Origin", "X-Origin", "Referer" ],
22+
"X-XSS-Protection" : "0",
23+
"Date" : "Tue, 09 Dec 2025 22:26:31 GMT",
24+
"Content-Type" : "application/json; charset=UTF-8"
25+
}
26+
},
27+
"uuid" : "9ef1eb4f-2fe4-437b-aa37-0e63d2c0a2d5",
28+
"persistent" : true,
29+
"scenarioName" : "scenario-1-v1-projects-substrate-sdk-gcp-poc1-topics-test-topic:publish",
30+
"requiredScenarioState" : "scenario-1-v1-projects-substrate-sdk-gcp-poc1-topics-test-topic:publish-2",
31+
"newScenarioState" : "scenario-1-v1-projects-substrate-sdk-gcp-poc1-topics-test-topic:publish-3",
32+
"insertionIndex" : 63
33+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
{
2+
"id" : "631b2695-c5b4-4c0a-8bb9-0333557d0d7d",
3+
"name" : "v1_projects_substrate-sdk-gcp-poc1_subscriptions_test-subscriptionmodifyackdeadline",
4+
"request" : {
5+
"url" : "/v1/projects/substrate-sdk-gcp-poc1/subscriptions/test-subscription:modifyAckDeadline?$alt=json;enum-encoding%3Dint",
6+
"method" : "POST",
7+
"bodyPatterns" : [ {
8+
"matchesJsonPath" : "$.ackIds"
9+
}, {
10+
"matchesJsonPath" : "$.ackIds[*]"
11+
} ]
12+
},
13+
"response" : {
14+
"status" : 200,
15+
"body" : "{}\n",
16+
"headers" : {
17+
"X-Frame-Options" : "SAMEORIGIN",
18+
"Alt-Svc" : "h3=\":443\"; ma=2592000,h3-29=\":443\"; ma=2592000",
19+
"Server" : "ESF",
20+
"X-Content-Type-Options" : "nosniff",
21+
"Vary" : [ "Origin", "X-Origin", "Referer" ],
22+
"X-XSS-Protection" : "0",
23+
"Date" : "Tue, 09 Dec 2025 22:26:33 GMT",
24+
"Content-Type" : "application/json; charset=UTF-8"
25+
}
26+
},
27+
"uuid" : "631b2695-c5b4-4c0a-8bb9-0333557d0d7d",
28+
"persistent" : true,
29+
"insertionIndex" : 66
30+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
{
2+
"id" : "d1c8a330-de53-4e43-b95c-4ef37ca67395",
3+
"name" : "v1_projects_substrate-sdk-gcp-poc1_topics_test-topicpublish",
4+
"request" : {
5+
"url" : "/v1/projects/substrate-sdk-gcp-poc1/topics/test-topic:publish?$alt=json;enum-encoding%3Dint",
6+
"method" : "POST",
7+
"bodyPatterns" : [ {
8+
"matchesJsonPath" : "$.messages"
9+
}, {
10+
"matchesJsonPath" : "$.messages[*]"
11+
} ]
12+
},
13+
"response" : {
14+
"status" : 200,
15+
"body" : "{\n \"messageIds\": [\n \"17294878368001388\"\n ]\n}\n",
16+
"headers" : {
17+
"X-Frame-Options" : "SAMEORIGIN",
18+
"Alt-Svc" : "h3=\":443\"; ma=2592000,h3-29=\":443\"; ma=2592000",
19+
"Server" : "ESF",
20+
"X-Content-Type-Options" : "nosniff",
21+
"Vary" : [ "Origin", "X-Origin", "Referer" ],
22+
"X-XSS-Protection" : "0",
23+
"Date" : "Tue, 09 Dec 2025 22:26:33 GMT",
24+
"Content-Type" : "application/json; charset=UTF-8"
25+
}
26+
},
27+
"uuid" : "d1c8a330-de53-4e43-b95c-4ef37ca67395",
28+
"persistent" : true,
29+
"insertionIndex" : 72
30+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
{
2+
"id" : "f40d0a90-be16-4157-8d75-299659e1c4e5",
3+
"name" : "v1_projects_substrate-sdk-gcp-poc1_subscriptions_test-subscriptionmodifyackdeadline",
4+
"request" : {
5+
"url" : "/v1/projects/substrate-sdk-gcp-poc1/subscriptions/test-subscription:modifyAckDeadline?$alt=json;enum-encoding%3Dint",
6+
"method" : "POST",
7+
"bodyPatterns" : [ {
8+
"matchesJsonPath" : "$.ackIds"
9+
}, {
10+
"matchesJsonPath" : "$.ackIds[*]"
11+
} ]
12+
},
13+
"response" : {
14+
"status" : 200,
15+
"body" : "{}\n",
16+
"headers" : {
17+
"X-Frame-Options" : "SAMEORIGIN",
18+
"Alt-Svc" : "h3=\":443\"; ma=2592000,h3-29=\":443\"; ma=2592000",
19+
"Server" : "ESF",
20+
"X-Content-Type-Options" : "nosniff",
21+
"Vary" : [ "Origin", "X-Origin", "Referer" ],
22+
"X-XSS-Protection" : "0",
23+
"Date" : "Tue, 09 Dec 2025 22:26:28 GMT",
24+
"Content-Type" : "application/json; charset=UTF-8"
25+
}
26+
},
27+
"uuid" : "f40d0a90-be16-4157-8d75-299659e1c4e5",
28+
"persistent" : true,
29+
"scenarioName" : "scenario-1-v1-projects-substrate-sdk-gcp-poc1-subscriptions-test-subscription:modifyAckDeadline",
30+
"requiredScenarioState" : "Started",
31+
"newScenarioState" : "scenario-1-v1-projects-substrate-sdk-gcp-poc1-subscriptions-test-subscription:modifyAckDeadline-2",
32+
"insertionIndex" : 51
33+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
{
2+
"id" : "a33fead8-1692-474d-beec-6c72cdc730a8",
3+
"name" : "v1_projects_substrate-sdk-gcp-poc1_topics_test-topicpublish",
4+
"request" : {
5+
"url" : "/v1/projects/substrate-sdk-gcp-poc1/topics/test-topic:publish?$alt=json;enum-encoding%3Dint",
6+
"method" : "POST",
7+
"bodyPatterns" : [ {
8+
"matchesJsonPath" : "$.messages"
9+
}, {
10+
"matchesJsonPath" : "$.messages[*]"
11+
} ]
12+
},
13+
"response" : {
14+
"status" : 200,
15+
"body" : "{\n \"messageIds\": [\n \"17295120101046542\"\n ]\n}\n",
16+
"headers" : {
17+
"X-Frame-Options" : "SAMEORIGIN",
18+
"Alt-Svc" : "h3=\":443\"; ma=2592000,h3-29=\":443\"; ma=2592000",
19+
"Server" : "ESF",
20+
"X-Content-Type-Options" : "nosniff",
21+
"Vary" : [ "Origin", "X-Origin", "Referer" ],
22+
"X-XSS-Protection" : "0",
23+
"Date" : "Tue, 09 Dec 2025 22:26:31 GMT",
24+
"Content-Type" : "application/json; charset=UTF-8"
25+
}
26+
},
27+
"uuid" : "a33fead8-1692-474d-beec-6c72cdc730a8",
28+
"persistent" : true,
29+
"scenarioName" : "scenario-1-v1-projects-substrate-sdk-gcp-poc1-topics-test-topic:publish",
30+
"requiredScenarioState" : "scenario-1-v1-projects-substrate-sdk-gcp-poc1-topics-test-topic:publish-3",
31+
"insertionIndex" : 62
32+
}

0 commit comments

Comments
 (0)