Skip to content

Commit 41e7306

Browse files
Merge pull request #13820 from rabbitmq/fix-shovel-test-flakes
Shovel test flakes and logging fixes
2 parents 005bb2c + 0f36610 commit 41e7306

File tree

3 files changed

+19
-6
lines changed

3 files changed

+19
-6
lines changed

deps/amqp_client/src/amqp_channel.erl

+4
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,10 @@ init([Driver, Connection, ChannelNumber, Consumer, Identity]) ->
384384
handle_call(open, From, State) ->
385385
{noreply, rpc_top_half(#'channel.open'{}, none, From, none, noflow, State)};
386386
%% @private
387+
handle_call(flush, _From, State) ->
388+
flush_writer(State),
389+
{noreply, State};
390+
%% @private
387391
handle_call({close, Code, Text}, From, State) ->
388392
handle_close(Code, Text, From, State);
389393
%% @private

deps/amqp_client/src/amqp_direct_connection.erl

+2-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ handle_message({'DOWN', _MRef, process, _ConnSup, shutdown}, State) ->
7272
handle_message({'DOWN', _MRef, process, _ConnSup, Reason}, State) ->
7373
{stop, {remote_node_down, Reason}, State};
7474
handle_message({'EXIT', Pid, Reason}, State) ->
75-
{stop, rabbit_misc:format("stopping because dependent process ~tp died: ~tp", [Pid, Reason]), State};
75+
?LOG_INFO("stopping because dependent process ~tp died: ~tp", [Pid, Reason]),
76+
{stop, normal, State};
7677
handle_message(Msg, State) ->
7778
{stop, {unexpected_msg, Msg}, State}.
7879

deps/rabbitmq_shovel/test/dynamic_SUITE.erl

+13-5
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
-include_lib("eunit/include/eunit.hrl").
1111
-include_lib("amqp_client/include/amqp_client.hrl").
1212

13+
-import(rabbit_ct_helpers, [eventually/1]).
14+
1315
-compile(export_all).
1416

1517
-export([spawn_suspender_proc/1]).
@@ -696,9 +698,11 @@ credit_flow(Config) ->
696698
5000),
697699

698700
%% There should be only one process with a message buildup
699-
[{WriterPid, MQLen, _}, {_, 0, _} | _] =
701+
Top = [{WriterPid, MQLen, _}, {_, P, _} | _] =
700702
rabbit_ct_broker_helpers:rpc(
701703
Config, 0, recon, proc_count, [message_queue_len, 10]),
704+
ct:pal("Top processes by message queue length: ~p", [Top]),
705+
?assert(P < 3),
702706

703707
%% The writer process should have only a limited
704708
%% message queue. The shovel stops sending messages
@@ -725,9 +729,10 @@ credit_flow(Config) ->
725729
end,
726730
5000),
727731
#{messages := 1000} = message_count(Config, <<"dest">>),
728-
[{_, 0, _}] =
732+
[{_, P, _}] =
729733
rabbit_ct_broker_helpers:rpc(
730734
Config, 0, recon, proc_count, [message_queue_len, 1]),
735+
?assert(P < 3),
731736

732737
%% Status only transitions from flow to running
733738
%% after a 1 second state-change-interval
@@ -839,9 +844,12 @@ dest_resource_alarm(AckMode, Config) ->
839844
MsgCnts = message_count(Config, <<"src">>),
840845

841846
%% There should be no process with a message buildup
842-
[{_, 0, _}] =
843-
rabbit_ct_broker_helpers:rpc(
844-
Config, 0, recon, proc_count, [message_queue_len, 1]),
847+
eventually(?_assertEqual(0, begin
848+
Top = [{_, P, _}] = rabbit_ct_broker_helpers:rpc(
849+
Config, 0, recon, proc_count, [message_queue_len, 1]),
850+
ct:pal("Top process by message queue length: ~p", [Top]),
851+
P
852+
end)),
845853

846854
%% Clear the resource alarm, all messages should
847855
%% arrive to the dest queue

0 commit comments

Comments
 (0)