1818import org .junit .jupiter .api .Timeout ;
1919
2020import java .util .ArrayList ;
21+ import java .util .Arrays ;
22+ import java .util .HashMap ;
2123import java .util .List ;
2224import java .util .Map ;
2325import java .util .concurrent .TimeUnit ;
@@ -29,7 +31,19 @@ public interface Harness extends AutoCloseable {
2931
3032 AbstractTopic createTopicDriver ();
3133
32- AbstractSubscription createSubscriptionDriver ();
34+ default AbstractSubscription createSubscriptionDriver () {
35+ return createSubscriptionDriverWithIndex (0 );
36+ }
37+
38+ /**
39+ * Create a subscription driver with an index suffix.
40+ * This allows creating multiple subscriptions to the same topic.
41+ */
42+ default AbstractSubscription createSubscriptionDriverWithIndex (int index ) {
43+ // Default implementation: just create a subscription driver
44+ // Providers that need index-based naming should override this method
45+ return createSubscriptionDriver ();
46+ }
3347
3448 String getPubsubEndpoint ();
3549
@@ -375,4 +389,124 @@ public void testMultipleSendReceiveWithoutBatch() throws Exception {
375389 }
376390 }
377391 }
392+
393+ /**
394+ * Receive from two subscriptions to the same topic.
395+ * Verify both get all the messages.
396+ */
397+ @ Test
398+ @ Timeout (120 ) // Integration test with multiple subscriptions - allow time for message delivery
399+ public void testSendReceiveTwo () throws Exception {
400+ // Create two subscriptions to the same topic
401+ AbstractSubscription subscription1 = harness .createSubscriptionDriverWithIndex (1 );
402+ AbstractSubscription subscription2 = harness .createSubscriptionDriverWithIndex (2 );
403+
404+ try (AbstractTopic topic = harness .createTopicDriver ();
405+ AbstractSubscription sub1 = subscription1 ;
406+ AbstractSubscription sub2 = subscription2 ) {
407+
408+ // Send 3 messages to the topic
409+ List <Message > messagesToSend = List .of (
410+ Message .builder ().withBody ("fanout-msg1" .getBytes ()).withMetadata (Map .of ("id" , "1" )).build (),
411+ Message .builder ().withBody ("fanout-msg2" .getBytes ()).withMetadata (Map .of ("id" , "2" )).build (),
412+ Message .builder ().withBody ("fanout-msg3" .getBytes ()).withMetadata (Map .of ("id" , "3" )).build ()
413+ );
414+
415+ for (Message message : messagesToSend ) {
416+ topic .send (message );
417+ }
418+
419+ TimeUnit .MILLISECONDS .sleep (500 );
420+
421+ // Receive messages from both subscriptions
422+ List <Message > received1 = receiveMessages (sub1 , messagesToSend .size ());
423+ List <Message > received2 = receiveMessages (sub2 , messagesToSend .size ());
424+
425+ // Verify both subscriptions received all messages
426+ Assertions .assertEquals (messagesToSend .size (), received1 .size (),
427+ "Subscription 1 should receive all " + messagesToSend .size () + " messages. Got: " + received1 .size ());
428+ Assertions .assertEquals (messagesToSend .size (), received2 .size (),
429+ "Subscription 2 should receive all " + messagesToSend .size () + " messages. Got: " + received2 .size ());
430+
431+ // Verify messages match for both subscriptions
432+ verifyMessages (received1 , messagesToSend , "Subscription 1" );
433+ verifyMessages (received2 , messagesToSend , "Subscription 2" );
434+
435+ // Ack all messages from both subscriptions
436+ ackMessages (sub1 , received1 );
437+ ackMessages (sub2 , received2 );
438+ }
439+ }
440+
441+ /**
442+ * Helper function: Receives messages from a subscription until the expected count is reached.
443+ */
444+ private List <Message > receiveMessages (AbstractSubscription subscription , int expectedCount ) throws InterruptedException {
445+ boolean isRecording = System .getProperty ("record" ) != null ;
446+ long timeoutSeconds = isRecording ? 120 : 60 ;
447+ long deadline = System .nanoTime () + TimeUnit .SECONDS .toNanos (timeoutSeconds );
448+
449+ List <Message > received = new ArrayList <>();
450+ while (received .size () < expectedCount && System .nanoTime () < deadline ) {
451+ try {
452+ Message r = subscription .receive ();
453+ if (r != null && r .getAckID () != null ) {
454+ received .add (r );
455+ } else {
456+ TimeUnit .MILLISECONDS .sleep (100 );
457+ }
458+ } catch (Exception e ) {
459+ TimeUnit .MILLISECONDS .sleep (100 );
460+ }
461+ }
462+ return received ;
463+ }
464+
465+ /**
466+ * Helper function: Verifies that received messages match the expected messages.
467+ */
468+ private void verifyMessages (List <Message > received , List <Message > expected , String subscriptionName ) {
469+ if (received .size () != expected .size ()) {
470+ Assertions .fail (String .format ("%s: got %d messages, expected %d" ,
471+ subscriptionName , received .size (), expected .size ()));
472+ }
473+ Map <String , Message > gotByBody = new HashMap <>();
474+ for (Message msg : received ) {
475+ gotByBody .put (new String (msg .getBody ()), msg );
476+ }
477+ for (Message exp : expected ) {
478+ String body = new String (exp .getBody ());
479+ Message got = gotByBody .get (body );
480+ if (got == null ) {
481+ Assertions .fail (subscriptionName + ": missing message: " + body );
482+ }
483+ if (!Arrays .equals (exp .getBody (), got .getBody ())) {
484+ Assertions .fail (subscriptionName + ": body mismatch for " + body );
485+ }
486+ if (exp .getMetadata () != null ) {
487+ for (Map .Entry <String , String > entry : exp .getMetadata ().entrySet ()) {
488+ String expValue = entry .getValue ();
489+ String gotValue = got .getMetadata () != null ? got .getMetadata ().get (entry .getKey ()) : null ;
490+ if (!expValue .equals (gotValue )) {
491+ Assertions .fail (String .format ("%s: metadata[%s] mismatch for %s: expected %s, got %s" ,
492+ subscriptionName , entry .getKey (), body , expValue , gotValue ));
493+ }
494+ }
495+ }
496+ }
497+ }
498+
499+
500+ /**
501+ * Helper function: Acknowledges all messages in the given list.
502+ */
503+ private void ackMessages (AbstractSubscription subscription , List <Message > messages ) {
504+ if (!messages .isEmpty ()) {
505+ List <AckID > ackIDs = new ArrayList <>();
506+ for (Message msg : messages ) {
507+ ackIDs .add (msg .getAckID ());
508+ }
509+ subscription .sendAcks (ackIDs ).join ();
510+ }
511+ }
378512}
0 commit comments