Skip to content

Commit 50b5d74

Browse files
Merge pull request #12637 from rabbitmq/mergify/bp/v4.0.x/pr-12636
QQ: handle case where a stale read request results in member crash. (backport #12636)
2 parents 0b4ed27 + 05aa3f1 commit 50b5d74

File tree

2 files changed

+87
-9
lines changed

2 files changed

+87
-9
lines changed

deps/rabbit/src/rabbit_fifo.erl

+24-8
Original file line numberDiff line numberDiff line change
@@ -1126,8 +1126,11 @@ handle_aux(_, _, garbage_collection, Aux, RaAux) ->
11261126
handle_aux(_RaState, _, force_checkpoint,
11271127
#?AUX{last_checkpoint = Check0} = Aux, RaAux) ->
11281128
Ts = erlang:system_time(millisecond),
1129+
#?STATE{cfg = #cfg{resource = QR}} = ra_aux:machine_state(RaAux),
1130+
rabbit_log:debug("~ts: rabbit_fifo: forcing checkpoint at ~b",
1131+
[rabbit_misc:rs(QR), ra_aux:last_applied(RaAux)]),
11291132
{Check, Effects} = do_checkpoints(Ts, Check0, RaAux, true),
1130-
{no_reply, Aux#?AUX{last_checkpoint= Check}, RaAux, Effects};
1133+
{no_reply, Aux#?AUX{last_checkpoint = Check}, RaAux, Effects};
11311134
handle_aux(RaState, _, {dlx, _} = Cmd, Aux0, RaAux) ->
11321135
#?STATE{dlx = DlxState,
11331136
cfg = #cfg{dead_letter_handler = DLH,
@@ -2052,25 +2055,38 @@ delivery_effect(ConsumerKey, [{MsgId, ?MSG(Idx, Header)}],
20522055
{CTag, CPid} = consumer_id(ConsumerKey, State),
20532056
{send_msg, CPid, {delivery, CTag, [{MsgId, {Header, RawMsg}}]},
20542057
?DELIVERY_SEND_MSG_OPTS};
2055-
delivery_effect(ConsumerKey, Msgs, State) ->
2058+
delivery_effect(ConsumerKey, Msgs,
2059+
#?STATE{cfg = #cfg{resource = QR}} = State) ->
20562060
{CTag, CPid} = consumer_id(ConsumerKey, State),
2057-
RaftIdxs = lists:foldr(fun ({_, ?MSG(I, _)}, Acc) ->
2058-
[I | Acc]
2059-
end, [], Msgs),
2061+
{RaftIdxs, Num} = lists:foldr(fun ({_, ?MSG(I, _)}, {Acc, N}) ->
2062+
{[I | Acc], N+1}
2063+
end, {[], 0}, Msgs),
20602064
{log, RaftIdxs,
2061-
fun(Log) ->
2065+
fun (Commands)
2066+
when length(Commands) < Num ->
2067+
%% the mandatory length/1 guard is a bit :(
2068+
rabbit_log:info("~ts: requested read consumer tag '~ts' of ~b "
2069+
"indexes ~w but only ~b were returned. "
2070+
"This is most likely a stale read request "
2071+
"and can be ignored",
2072+
[rabbit_misc:rs(QR), CTag, Num, RaftIdxs,
2073+
length(Commands)]),
2074+
[];
2075+
(Commands) ->
20622076
DelMsgs = lists:zipwith(
20632077
fun (Cmd, {MsgId, ?MSG(_Idx, Header)}) ->
20642078
{MsgId, {Header, get_msg(Cmd)}}
2065-
end, Log, Msgs),
2079+
end, Commands, Msgs),
20662080
[{send_msg, CPid, {delivery, CTag, DelMsgs},
20672081
?DELIVERY_SEND_MSG_OPTS}]
20682082
end,
20692083
{local, node(CPid)}}.
20702084

20712085
reply_log_effect(RaftIdx, MsgId, Header, Ready, From) ->
20722086
{log, [RaftIdx],
2073-
fun ([Cmd]) ->
2087+
fun ([]) ->
2088+
[];
2089+
([Cmd]) ->
20742090
[{reply, From, {wrap_reply,
20752091
{dequeue, {MsgId, {Header, get_msg(Cmd)}}, Ready}}}]
20762092
end}.

deps/rabbit/test/quorum_queue_SUITE.erl

+63-1
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,8 @@ groups() ->
9595
single_active_consumer_priority,
9696
force_shrink_member_to_current_member,
9797
force_all_queues_shrink_member_to_current_member,
98-
force_vhost_queues_shrink_member_to_current_member
98+
force_vhost_queues_shrink_member_to_current_member,
99+
gh_12635
99100
]
100101
++ all_tests()},
101102
{cluster_size_5, [], [start_queue,
@@ -1302,6 +1303,67 @@ force_vhost_queues_shrink_member_to_current_member(Config) ->
13021303
?assertEqual(3, length(Nodes0))
13031304
end || Q <- QQs, VHost <- VHosts].
13041305

1306+
gh_12635(Config) ->
1307+
% https://github.com/rabbitmq/rabbitmq-server/issues/12635
1308+
[Server0, _Server1, Server2] =
1309+
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1310+
1311+
ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,
1312+
[rabbit, quorum_min_checkpoint_interval, 1]),
1313+
1314+
Ch0 = rabbit_ct_client_helpers:open_channel(Config, Server0),
1315+
#'confirm.select_ok'{} = amqp_channel:call(Ch0, #'confirm.select'{}),
1316+
QQ = ?config(queue_name, Config),
1317+
RaName = ra_name(QQ),
1318+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
1319+
declare(Ch0, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
1320+
1321+
%% stop member to simulate slow or down member
1322+
ok = rpc:call(Server2, ra, stop_server, [quorum_queues, {RaName, Server2}]),
1323+
1324+
publish_confirm(Ch0, QQ),
1325+
publish_confirm(Ch0, QQ),
1326+
1327+
%% force a checkpoint on leader
1328+
ok = rpc:call(Server0, ra, cast_aux_command, [{RaName, Server0}, force_checkpoint]),
1329+
rabbit_ct_helpers:await_condition(
1330+
fun () ->
1331+
{ok, #{log := Log}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
1332+
undefined =/= maps:get(latest_checkpoint_index, Log)
1333+
end),
1334+
1335+
%% publish 1 more message
1336+
publish_confirm(Ch0, QQ),
1337+
1338+
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2),
1339+
%% subscribe then cancel, this will assign the messages against the consumer
1340+
%% but as the member is down they will not be delivered
1341+
qos(Ch2, 100, false),
1342+
subscribe(Ch2, QQ, false),
1343+
rabbit_ct_client_helpers:close_channel(Ch2),
1344+
flush(100),
1345+
%% purge
1346+
#'queue.purge_ok'{} = amqp_channel:call(Ch0, #'queue.purge'{queue = QQ}),
1347+
1348+
rabbit_ct_helpers:await_condition(
1349+
fun () ->
1350+
{ok, #{log := Log}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
1351+
undefined =/= maps:get(snapshot_index, Log)
1352+
end),
1353+
%% restart the down member
1354+
ok = rpc:call(Server2, ra, restart_server, [quorum_queues, {RaName, Server2}]),
1355+
Pid2 = rpc:call(Server2, erlang, whereis, [RaName]),
1356+
?assert(is_pid(Pid2)),
1357+
Ref = erlang:monitor(process, Pid2),
1358+
receive
1359+
{'DOWN',Ref, process,_, _} ->
1360+
ct:fail("unexpected DOWN")
1361+
after 500 ->
1362+
ok
1363+
end,
1364+
flush(1),
1365+
ok.
1366+
13051367
priority_queue_fifo(Config) ->
13061368
%% testing: if hi priority messages are published before lo priority
13071369
%% messages they are always consumed first (fifo)

0 commit comments

Comments
 (0)