44import com .salesforce .multicloudj .pubsub .driver .AbstractTopic ;
55import com .salesforce .multicloudj .pubsub .driver .Message ;
66import com .salesforce .multicloudj .pubsub .driver .AckID ;
7- import com .salesforce .multicloudj .pubsub .client .GetAttributeResult ;
87import com .salesforce .multicloudj .common .util .common .TestsUtil ;
98import com .salesforce .multicloudj .common .exceptions .InvalidArgumentException ;
109
2120import java .util .ArrayList ;
2221import java .util .List ;
2322import java .util .Map ;
24- import java .util .concurrent .TimeUnit ;
2523
2624@ TestInstance (TestInstance .Lifecycle .PER_CLASS )
2725public abstract class AbstractPubsubIT {
@@ -118,14 +116,7 @@ public void testReceiveAfterSend() throws Exception {
118116 .build ();
119117 topic .send (toSend );
120118
121- Message received = null ;
122- for (int i = 0 ; i < 50 ; i ++) {
123- received = subscription .receive ();
124- if (received != null ) {
125- break ;
126- }
127- TimeUnit .MILLISECONDS .sleep (200 );
128- }
119+ Message received = subscription .receive ();
129120
130121 Assertions .assertNotNull (received , "Should receive a message within timeout" );
131122 Assertions .assertNotNull (received .getBody (), "Received message body should not be null" );
@@ -144,12 +135,7 @@ public void testAckAfterReceive() throws Exception {
144135 .build ();
145136 topic .send (toSend );
146137
147- Message received = null ;
148- for (int i = 0 ; i < 50 ; i ++) {
149- received = subscription .receive ();
150- if (received != null ) break ;
151- TimeUnit .MILLISECONDS .sleep (200 );
152- }
138+ Message received = subscription .receive ();
153139
154140 Assertions .assertNotNull (received , "Should receive a message to ack" );
155141 Assertions .assertNotNull (received .getAckID (), "AckID must not be null" );
@@ -169,12 +155,7 @@ public void testNackAfterReceive() throws Exception {
169155 .build ();
170156 topic .send (toSend );
171157
172- Message received = null ;
173- for (int i = 0 ; i < 50 ; i ++) {
174- received = subscription .receive ();
175- if (received != null ) break ;
176- TimeUnit .MILLISECONDS .sleep (200 );
177- }
158+ Message received = subscription .receive ();
178159
179160 Assertions .assertNotNull (received , "Should receive a message to nack" );
180161 Assertions .assertNotNull (received .getAckID (), "AckID must not be null" );
@@ -195,29 +176,12 @@ public void testBatchAck() throws Exception {
195176 );
196177 for (Message m : toSend ) topic .send (m );
197178
198- TimeUnit .MILLISECONDS .sleep (500 );
199-
200179 List <AckID > ackIDs = new java .util .ArrayList <>();
201- boolean isRecording = System .getProperty ("record" ) != null ;
202- long timeoutSeconds = isRecording ? 120 : 60 ; // Increased timeout for integration tests
203- long deadline = System .nanoTime () + TimeUnit .SECONDS .toNanos (timeoutSeconds );
204-
205- System .out .println ("Starting to collect " + toSend .size () + " messages with timeout: " + timeoutSeconds + "s" );
206-
207- while (ackIDs .size () < toSend .size () && System .nanoTime () < deadline ) {
208- try {
209- Message r = subscription .receive ();
210- if (r != null && r .getAckID () != null ) {
211- ackIDs .add (r .getAckID ());
212- System .out .println ("Received message " + ackIDs .size () + "/" + toSend .size () +
213- " with AckID: " + r .getAckID ());
214- } else {
215- System .out .println ("Received null message, waiting..." );
216- TimeUnit .MILLISECONDS .sleep (100 );
217- }
218- } catch (Exception e ) {
219- System .err .println ("Error receiving message: " + e .getMessage ());
220- TimeUnit .MILLISECONDS .sleep (100 );
180+
181+ while (ackIDs .size () < toSend .size ()) {
182+ Message r = subscription .receive ();
183+ if (r != null && r .getAckID () != null ) {
184+ ackIDs .add (r .getAckID ());
221185 }
222186 }
223187
@@ -239,29 +203,12 @@ public void testBatchNack() throws Exception {
239203 );
240204 for (Message m : toSend ) topic .send (m );
241205
242- TimeUnit .MILLISECONDS .sleep (500 );
243-
244206 List <AckID > ackIDs = new java .util .ArrayList <>();
245- boolean isRecording = System .getProperty ("record" ) != null ;
246- long timeoutSeconds = isRecording ? 120 : 60 ; // Increased timeout for integration tests
247- long deadline = System .nanoTime () + TimeUnit .SECONDS .toNanos (timeoutSeconds );
248-
249- System .out .println ("Starting to collect " + toSend .size () + " messages with timeout: " + timeoutSeconds + "s" );
250-
251- while (ackIDs .size () < toSend .size () && System .nanoTime () < deadline ) {
252- try {
253- Message r = subscription .receive ();
254- if (r != null && r .getAckID () != null ) {
255- ackIDs .add (r .getAckID ());
256- System .out .println ("Received message " + ackIDs .size () + "/" + toSend .size () +
257- " with AckID: " + r .getAckID ());
258- } else {
259- System .out .println ("Received null message, waiting..." );
260- TimeUnit .MILLISECONDS .sleep (100 );
261- }
262- } catch (Exception e ) {
263- System .err .println ("Error receiving message: " + e .getMessage ());
264- TimeUnit .MILLISECONDS .sleep (100 );
207+
208+ while (ackIDs .size () < toSend .size ()) {
209+ Message r = subscription .receive ();
210+ if (r != null && r .getAckID () != null ) {
211+ ackIDs .add (r .getAckID ());
265212 }
266213 }
267214
@@ -295,14 +242,11 @@ public void testDoubleAck() throws Exception {
295242 }
296243
297244 List <Message > receivedMessages = new ArrayList <>();
298- long deadline = System .nanoTime () + TimeUnit .SECONDS .toNanos (30 );
299245
300- while (receivedMessages .size () < 3 && System . nanoTime () < deadline ) {
246+ while (receivedMessages .size () < 3 ) {
301247 Message received = subscription .receive ();
302248 if (received != null ) {
303249 receivedMessages .add (received );
304- } else {
305- TimeUnit .MILLISECONDS .sleep (100 );
306250 }
307251 }
308252
@@ -352,7 +296,6 @@ public void testMultipleSendReceiveWithoutBatch() throws Exception {
352296 AbstractSubscription subscription = harness .createSubscriptionDriver ()) {
353297
354298 int numMessages = 5 ;
355- List <Message > sentMessages = new ArrayList <>();
356299
357300 // Send messages one by one (not in batch)
358301 for (int i = 0 ; i < numMessages ; i ++) {
@@ -361,29 +304,17 @@ public void testMultipleSendReceiveWithoutBatch() throws Exception {
361304 .withMetadata (Map .of ("index" , String .valueOf (i )))
362305 .build ();
363306 topic .send (message );
364- sentMessages .add (message );
365- TimeUnit .MILLISECONDS .sleep (100 ); // Small delay between sends
366307 }
367308
368- TimeUnit .MILLISECONDS .sleep (500 ); // Allow time for messages to be available
369-
370309 // Receive and ack messages one by one (not in batch)
371310 List <Message > receivedMessages = new ArrayList <>();
372311
373312 while (receivedMessages .size () < numMessages ) {
374- try {
375- Message received = subscription .receive ();
376- if (received != null && received .getAckID () != null ) {
377- receivedMessages .add (received );
378- // Ack immediately after receiving (not in batch)
379- subscription .sendAck (received .getAckID ());
380- System .out .println ("Received and acked message " + receivedMessages .size () + "/" + numMessages );
381- } else {
382- TimeUnit .MILLISECONDS .sleep (100 );
383- }
384- } catch (Exception e ) {
385- System .err .println ("Error receiving message: " + e .getMessage ());
386- TimeUnit .MILLISECONDS .sleep (100 );
313+ Message received = subscription .receive ();
314+ if (received != null && received .getAckID () != null ) {
315+ receivedMessages .add (received );
316+ // Ack immediately after receiving (not in batch)
317+ subscription .sendAck (received .getAckID ());
387318 }
388319 }
389320
0 commit comments