10
10
-include_lib (" eunit/include/eunit.hrl" ).
11
11
-include_lib (" amqp_client/include/amqp_client.hrl" ).
12
12
13
+ -import (rabbit_ct_helpers , [eventually /1 ]).
14
+
13
15
-compile (export_all ).
14
16
15
17
-export ([spawn_suspender_proc /1 ]).
@@ -696,9 +698,11 @@ credit_flow(Config) ->
696
698
5000 ),
697
699
698
700
% % There should be only one process with a message buildup
699
- [{WriterPid , MQLen , _ }, {_ , 0 , _ } | _ ] =
701
+ Top = [{WriterPid , MQLen , _ }, {_ , P , _ } | _ ] =
700
702
rabbit_ct_broker_helpers :rpc (
701
703
Config , 0 , recon , proc_count , [message_queue_len , 10 ]),
704
+ ct :pal (" Top processes by message queue length: ~p " , [Top ]),
705
+ ? assert (P < 3 ),
702
706
703
707
% % The writer process should have only a limited
704
708
% % message queue. The shovel stops sending messages
@@ -725,9 +729,10 @@ credit_flow(Config) ->
725
729
end ,
726
730
5000 ),
727
731
#{messages := 1000 } = message_count (Config , <<" dest" >>),
728
- [{_ , 0 , _ }] =
732
+ [{_ , P , _ }] =
729
733
rabbit_ct_broker_helpers :rpc (
730
734
Config , 0 , recon , proc_count , [message_queue_len , 1 ]),
735
+ ? assert (P < 3 ),
731
736
732
737
% % Status only transitions from flow to running
733
738
% % after a 1 second state-change-interval
@@ -839,9 +844,12 @@ dest_resource_alarm(AckMode, Config) ->
839
844
MsgCnts = message_count (Config , <<" src" >>),
840
845
841
846
% % There should be no process with a message buildup
842
- [{_ , 0 , _ }] =
843
- rabbit_ct_broker_helpers :rpc (
844
- Config , 0 , recon , proc_count , [message_queue_len , 1 ]),
847
+ eventually (? _assertEqual (0 , begin
848
+ Top = [{_ , P , _ }] = rabbit_ct_broker_helpers :rpc (
849
+ Config , 0 , recon , proc_count , [message_queue_len , 1 ]),
850
+ ct :pal (" Top process by message queue length: ~p " , [Top ]),
851
+ P
852
+ end )),
845
853
846
854
% % Clear the resource alarm, all messages should
847
855
% % arrive to the dest queue
0 commit comments