@@ -492,11 +492,11 @@ list_group_consumers_run(Config) ->
492492 #{<<" single-active-consumer" >> => <<" true" >>,
493493 <<" name" >> => ConsumerReference },
494494
495- create_stream (S , Stream1 , C ),
496- subscribe (S , 0 , Stream1 , SubProperties , C ),
497- handle_consumer_update (S , C , 0 ),
498- subscribe (S , 1 , Stream1 , SubProperties , C ),
499- subscribe (S , 2 , Stream1 , SubProperties , C ),
495+ C1 = create_stream (S , Stream1 , C ),
496+ C2 = subscribe (S , 0 , Stream1 , SubProperties , C1 ),
497+ C3 = handle_consumer_update (S , C2 , 0 ),
498+ C4 = subscribe (S , 1 , Stream1 , SubProperties , C3 ),
499+ C5 = subscribe (S , 2 , Stream1 , SubProperties , C4 ),
500500
501501 ? awaitMatch (3 , consumer_count (Config ), ? WAIT ),
502502
@@ -512,11 +512,11 @@ list_group_consumers_run(Config) ->
512512 maps :merge (#{stream => Stream2 , reference => ConsumerReference },
513513 Opts ),
514514
515- create_stream (S , Stream2 , C ),
516- subscribe (S , 3 , Stream2 , SubProperties , C ),
517- handle_consumer_update (S , C , 3 ),
518- subscribe (S , 4 , Stream2 , SubProperties , C ),
519- subscribe (S , 5 , Stream2 , SubProperties , C ),
515+ C6 = create_stream (S , Stream2 , C5 ),
516+ C7 = subscribe (S , 3 , Stream2 , SubProperties , C6 ),
517+ C8 = handle_consumer_update (S , C7 , 3 ),
518+ C9 = subscribe (S , 4 , Stream2 , SubProperties , C8 ),
519+ C10 = subscribe (S , 5 , Stream2 , SubProperties , C9 ),
520520
521521 ? awaitMatch (3 + 3 , consumer_count (Config ), ? WAIT ),
522522
@@ -527,13 +527,13 @@ list_group_consumers_run(Config) ->
527527 [{subscription_id , 5 }, {state , " waiting (connected)" }]],
528528 Consumers2 ),
529529
530- delete_stream (S , Stream1 , C ),
531- delete_stream (S , Stream2 , C ),
530+ C11 = delete_stream (S , Stream1 , C10 ),
531+ C12 = delete_stream (S , Stream2 , C11 ),
532532
533533 {error , not_found } =
534534 ? COMMAND_LIST_GROUP_CONSUMERS :run (Args , OptsGroup2 ),
535535
536- close (S , C ),
536+ close (S , C12 ),
537537 ok .
538538
539539activate_consumer_validate (_ ) ->
@@ -583,18 +583,18 @@ activate_consumer_run(Config) ->
583583 SubProperties = #{<<" single-active-consumer" >> => <<" true" >>,
584584 <<" name" >> => ConsumerReference },
585585
586- create_stream (S , St , C ),
587- subscribe (S , 0 , St , SubProperties , C ),
588- handle_consumer_update (S , C , 0 ),
589- subscribe (S , 1 , St , SubProperties , C ),
590- subscribe (S , 2 , St , SubProperties , C ),
586+ C1 = create_stream (S , St , C ),
587+ C2 = subscribe (S , 0 , St , SubProperties , C1 ),
588+ C3 = handle_consumer_update (S , C2 , 0 ),
589+ C4 = subscribe (S , 1 , St , SubProperties , C3 ),
590+ C5 = subscribe (S , 2 , St , SubProperties , C4 ),
591591
592592 ? awaitMatch (3 , consumer_count (Config ), ? WAIT ),
593593
594594 ? assertEqual (ok , Cmd :run (Args , OptsGroup )),
595595
596- delete_stream (S , St , C ),
597- close (S , C ),
596+ C6 = delete_stream (S , St , C5 ),
597+ close (S , C6 ),
598598 ok .
599599
600600handle_consumer_update (S , C0 , SubId ) ->
@@ -663,30 +663,30 @@ list_stream_tracking_run(Config) ->
663663 {S , C } = start_stream_connection (StreamPort ),
664664 ? awaitMatch (1 , connection_count (Config ), ? WAIT ),
665665
666- create_stream (S , Stream , C ),
666+ C1 = create_stream (S , Stream , C ),
667667
668668 ? assertMatch ([],
669669 ? COMMAND_LIST_STREAM_TRACKING :run (Args , Opts #{all => true })),
670670
671- store_offset (S , Stream , ConsumerReference , 42 , C ),
671+ { ok , C1_1 } = store_offset (S , Stream , ConsumerReference , 42 , C1 ),
672672
673673 ? assertMatch ([[{type ,offset }, {name , ConsumerReference }, {tracking_value , 42 }]],
674674 ? COMMAND_LIST_STREAM_TRACKING :run (Args , Opts #{all => true })),
675675
676676 ? assertMatch ([[{type ,offset }, {name , ConsumerReference }, {tracking_value , 42 }]],
677677 ? COMMAND_LIST_STREAM_TRACKING :run (Args , Opts #{offset => true })),
678678
679- ok = store_offset (S , Stream , ConsumerReference , 55 , C ),
679+ { ok , C1_2 } = store_offset (S , Stream , ConsumerReference , 55 , C1_1 ),
680680 ? assertMatch ([[{type ,offset }, {name , ConsumerReference }, {tracking_value , 55 }]],
681681 ? COMMAND_LIST_STREAM_TRACKING :run (Args , Opts #{offset => true })),
682682
683683
684684 PublisherId = 1 ,
685- rabbit_stream_SUITE :test_declare_publisher (gen_tcp , S , PublisherId ,
686- PublisherReference , Stream , C ),
687- rabbit_stream_SUITE :test_publish_confirm (gen_tcp , S , PublisherId , 42 , <<" " >>, C ),
685+ C2 = rabbit_stream_SUITE :test_declare_publisher (gen_tcp , S , PublisherId ,
686+ PublisherReference , Stream , C1_2 ),
687+ C3 = rabbit_stream_SUITE :test_publish_confirm (gen_tcp , S , PublisherId , 42 , <<" " >>, C2 ),
688688
689- ok = check_publisher_sequence (S , Stream , PublisherReference , 42 , C ),
689+ { ok , C3_1 } = check_publisher_sequence (S , Stream , PublisherReference , 42 , C3 ),
690690
691691 ? assertMatch ([
692692 [{type ,writer },{name ,<<" bar" >>},{tracking_value , 42 }],
@@ -699,18 +699,18 @@ list_stream_tracking_run(Config) ->
699699 ],
700700 ? COMMAND_LIST_STREAM_TRACKING :run (Args , Opts #{writer => true })),
701701
702- rabbit_stream_SUITE :test_publish_confirm (gen_tcp , S , PublisherId , 66 , <<" " >>, C ),
702+ C4 = rabbit_stream_SUITE :test_publish_confirm (gen_tcp , S , PublisherId , 66 , <<" " >>, C3_1 ),
703703
704- ok = check_publisher_sequence (S , Stream , PublisherReference , 66 , C ),
704+ { ok , C4_1 } = check_publisher_sequence (S , Stream , PublisherReference , 66 , C4 ),
705705
706706 ? assertMatch ([
707707 [{type ,writer },{name ,<<" bar" >>},{tracking_value , 66 }]
708708 ],
709709 ? COMMAND_LIST_STREAM_TRACKING :run (Args , Opts #{writer => true })),
710710
711- delete_stream (S , Stream , C ),
711+ C5 = delete_stream (S , Stream , C4_1 ),
712712
713- close (S , C ),
713+ close (S , C5 ),
714714 ok .
715715
716716reset_offset_validate (_ ) ->
@@ -759,17 +759,17 @@ reset_offset_run(Config) ->
759759
760760 Port = rabbit_stream_SUITE :get_stream_port (Config ),
761761 {S , C } = start_stream_connection (Port ),
762- create_stream (S , St , C ),
762+ C1 = create_stream (S , St , C ),
763763
764764 ? assertEqual ({error , no_reference }, Cmd :run (Args , OptsGroup )),
765- store_offset (S , St , Ref , 42 , C ),
765+ { ok , C1_1 } = store_offset (S , St , Ref , 42 , C1 ),
766766
767- check_stored_offset (S , St , Ref , 42 , C ),
767+ { ok , C1_2 } = check_stored_offset (S , St , Ref , 42 , C1_1 ),
768768 ? assertMatch (ok , Cmd :run (Args , OptsGroup )),
769- check_stored_offset (S , St , Ref , 0 , C ),
769+ { ok , C1_3 } = check_stored_offset (S , St , Ref , 0 , C1_2 ),
770770
771- delete_stream (S , St , C ),
772- close (S , C ),
771+ C2 = delete_stream (S , St , C1_3 ),
772+ close (S , C2 ),
773773 ok .
774774
775775add_super_stream_merge_defaults (_Config ) ->
@@ -1082,8 +1082,8 @@ store_offset(S, Stream, Reference, Value, C) ->
10821082 rabbit_stream_core :frame ({store_offset , Reference , Stream , Value }),
10831083 ok = gen_tcp :send (S , StoreOffsetFrame ),
10841084 case check_stored_offset (S , Stream , Reference , Value , C , 20 ) of
1085- ok ->
1086- ok ;
1085+ { ok , CNew } ->
1086+ { ok , CNew } ;
10871087 _ ->
10881088 {error , offset_not_stored }
10891089 end .
@@ -1098,15 +1098,15 @@ check_stored_offset(S, Stream, Reference, Expected, C, Attempt) ->
10981098 QueryOffsetFrame =
10991099 rabbit_stream_core :frame ({request , 1 , {query_offset , Reference , Stream }}),
11001100 ok = gen_tcp :send (S , QueryOffsetFrame ),
1101- {Cmd , _ } = rabbit_stream_SUITE :receive_commands (gen_tcp , S , C ),
1101+ {Cmd , CNew } = rabbit_stream_SUITE :receive_commands (gen_tcp , S , C ),
11021102 ? assertMatch ({response , 1 , {query_offset , ? RESPONSE_CODE_OK , _ }}, Cmd ),
11031103 {response , 1 , {query_offset , ? RESPONSE_CODE_OK , StoredValue }} = Cmd ,
11041104 case StoredValue of
11051105 Expected ->
1106- ok ;
1106+ { ok , CNew } ;
11071107 _ ->
11081108 timer :sleep (50 ),
1109- check_stored_offset (S , Stream , Reference , Expected , C , Attempt - 1 )
1109+ check_stored_offset (S , Stream , Reference , Expected , CNew , Attempt - 1 )
11101110 end .
11111111
11121112check_publisher_sequence (S , Stream , Reference , Expected , C ) ->
@@ -1118,15 +1118,15 @@ check_publisher_sequence(S, Stream, Reference, Expected, C, Attempt) ->
11181118 QueryFrame =
11191119 rabbit_stream_core :frame ({request , 1 , {query_publisher_sequence , Reference , Stream }}),
11201120 ok = gen_tcp :send (S , QueryFrame ),
1121- {Cmd , _ } = rabbit_stream_SUITE :receive_commands (gen_tcp , S , C ),
1121+ {Cmd , CNew } = rabbit_stream_SUITE :receive_commands (gen_tcp , S , C ),
11221122 ? assertMatch ({response , 1 , {query_publisher_sequence , _ , _ }}, Cmd ),
11231123 {response , 1 , {query_publisher_sequence , _ , StoredValue }} = Cmd ,
11241124 case StoredValue of
11251125 Expected ->
1126- ok ;
1126+ { ok , CNew } ;
11271127 _ ->
11281128 timer :sleep (50 ),
1129- check_publisher_sequence (S , Stream , Reference , Expected , C , Attempt - 1 )
1129+ check_publisher_sequence (S , Stream , Reference , Expected , CNew , Attempt - 1 )
11301130 end .
11311131
11321132gen_bin (L ) ->
0 commit comments