Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 243 additions & 23 deletions deps/rabbit/src/mirrored_supervisor.erl

Large diffs are not rendered by default.

37 changes: 36 additions & 1 deletion deps/rabbit/test/mirrored_supervisor_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ groups() ->
start_idempotence,
unsupported,
ignore,
startup_failure
startup_failure,
reclaim_orphan
]}
].

Expand Down Expand Up @@ -304,6 +305,34 @@ test_startup_failure(Fail, Group) ->
process_flag(trap_exit, false),
ok.

%% Regression test for orphan recovery: a dynamic child may remain recorded in
%% Khepri under a mirroring process that no longer exists (for instance after
%% a takeover failed while Khepri was briefly unavailable). The periodic
%% reconciliation run by the sorted-first member has to reclaim that child and
%% start it locally. The scenario itself runs on broker node 0.
reclaim_orphan(Config) ->
    Group = ?config(sup_prefix, Config),
    passed = rabbit_ct_broker_helpers:rpc(
               Config, 0, ?MODULE, reclaim_orphan1, [Group]).

%% Body of reclaim_orphan/1, executed on the broker node with a single
%% supervisor in the group.
reclaim_orphan1(Group) ->
    Scenario =
        fun([Sup]) ->
            Spec = childspec(worker),
            %% Create the orphaned state to recover from: the child is
            %% recorded in Khepri as owned by a process that is already dead,
            %% and it is not started anywhere.
            Gone = dead_pid(),
            start = rabbit_db_msup:create_or_update(
                      Group, Gone, Gone, Spec, id(worker)),
            %% Ask for a reconciliation right away instead of waiting for the
            %% periodic timer to fire.
            MirroringPid = ?MS:child(Sup, mirroring),
            MirroringPid ! reconcile,
            %% Expected outcome: the orphan has been reclaimed and started
            %% locally, and Khepri now records the live member as its owner.
            true = is_pid(pid_of(worker)),
            {ok, Sup} = rabbit_db_msup:find_mirror(Group, id(worker))
        end,
    with_sups(Scenario, [sup(Group, 1)], Group).

%% ---------------------------------------------------------------------------

with_sups(Fun, Sups, Group) ->
Expand Down Expand Up @@ -387,3 +416,9 @@ init({Strategy, ChildSpecs}) ->

%% Builds a supervisor name by printing Prefix and Number back to back with
%% ~p and converting the resulting string to an atom.
sup(Prefix, Number) ->
    Name = lists:flatten(io_lib:format("~p~p", [Prefix, Number])),
    rabbit_data_coercion:to_atom(Name).

%% Returns the pid of a process that has already terminated. The process is
%% spawned with a monitor attached and we block until its 'DOWN' message
%% arrives, so the pid is guaranteed to be dead when it is returned.
dead_pid() ->
    {Pid, MRef} = spawn_monitor(fun() -> ok end),
    receive
        {'DOWN', MRef, process, Pid, _Reason} -> ok
    end,
    Pid.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ init([]) ->
XLinkSupSup = #{
id => x_links,
start => {rabbit_federation_exchange_link_sup_sup, start_link, []},
restart => transient,
restart => permanent,
shutdown => ?SUPERVISOR_WAIT,
type => supervisor,
modules =>[rabbit_federation_exchange_link_sup_sup]
Expand Down
68 changes: 57 additions & 11 deletions deps/rabbitmq_federation_common/src/rabbit_federation_status.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
-define(SERVER, ?MODULE).
-define(ETS_NAME, ?MODULE).

-record(state, {}).
-record(state, {monitors = #{}}).
-record(entry, {key, uri, status, timestamp, id, supervisor, upstream}).

start_link() ->
Expand Down Expand Up @@ -52,19 +52,26 @@ lookup(Id) ->
init([]) ->
?ETS_NAME = ets:new(?ETS_NAME,
[named_table, {keypos, #entry.key}, private]),
{ok, #state{}}.
{ok, #state{monitors = #{}}}.

handle_call({remove_exchange_or_queue, XorQName}, _From, State) ->
[link_gone(Entry)
|| Entry <- ets:match_object(?ETS_NAME, match_entry(xorqkey(XorQName)))],
{reply, ok, State};
Entries = ets:match_object(?ETS_NAME, match_entry(xorqkey(XorQName))),
[link_gone(Entry) || Entry <- Entries],
NewState = lists:foldl(
fun(#entry{supervisor = Sup}, StateAcc) ->
maybe_demonitor(Sup, StateAcc)
end, State, Entries),
{reply, ok, NewState};

handle_call({remove, Upstream, XorQName}, _From, State) ->
case ets:match_object(?ETS_NAME, match_entry(key(XorQName, Upstream))) of
[Entry] -> link_gone(Entry);
[] -> ok
end,
{reply, ok, State};
[Entry] ->
link_gone(Entry),
NewState = maybe_demonitor(Entry#entry.supervisor, State),
{reply, ok, NewState};
[] ->
{reply, ok, State}
end;

handle_call({lookup, Id}, _From, State) ->
Link = case ets:match_object(?ETS_NAME, match_id(Id)) of
Expand Down Expand Up @@ -96,8 +103,24 @@ handle_cast({report, Supervisor, Upstream, #upstream_params{safe_uri = URI},
id = unique_id(Key)},
true = ets:insert(?ETS_NAME, Entry),
rabbit_event:notify(federation_link_status, format(Entry)),
{noreply, State}.

NewState = case maps:is_key(Supervisor, State#state.monitors) of
false ->
Ref = erlang:monitor(process, Supervisor),
State#state{monitors = maps:put(Supervisor, Ref, State#state.monitors)};
true ->
State
end,
{noreply, NewState}.

%% Handles the 'DOWN' message of a monitored link supervisor.
%%
%% Every ETS entry whose `supervisor` field is the dead pid is passed to
%% link_gone/1 (defined elsewhere in this module), and the pid is dropped from
%% the monitor map. maps:remove/2 is a no-op when the pid is not tracked, so
%% no separate branch is needed for an unknown pid or a different monitor
%% reference. No demonitor call is made: the 'DOWN' message means the monitor
%% has already fired.
%%
%% Any other message is ignored.
handle_info({'DOWN', _Ref, process, Pid, _Reason}, State) ->
    Entries = ets:match_object(?ETS_NAME, match_supervisor(Pid)),
    lists:foreach(fun link_gone/1, Entries),
    Monitors = maps:remove(Pid, State#state.monitors),
    {noreply, State#state{monitors = Monitors}};
handle_info(_Info, State) ->
    {noreply, State}.

Expand Down Expand Up @@ -176,3 +199,26 @@ match_id(Id) ->
id = Id,
supervisor = '_',
upstream = '_'}.

%% Match pattern for ets:match_object/2 selecting every entry reported by
%% Supervisor: the `supervisor` field is pinned and all remaining fields are
%% wildcards.
match_supervisor(Supervisor) ->
    #entry{supervisor = Supervisor, _ = '_'}.

%% Stops monitoring Supervisor once the ETS table holds no entry that
%% references it. While at least one entry remains the state is returned
%% unchanged.
maybe_demonitor(Supervisor, State) ->
    case ets:match_object(?ETS_NAME, match_supervisor(Supervisor)) of
        [] -> drop_monitor(Supervisor, State);
        _StillReported -> State
    end.

%% Removes Supervisor from the monitor map, if present, and cancels its
%% monitor. The `flush` option also discards a 'DOWN' message that may already
%% be waiting in the mailbox.
drop_monitor(Supervisor, State) ->
    case maps:take(Supervisor, State#state.monitors) of
        {MRef, Remaining} ->
            erlang:demonitor(MRef, [flush]),
            State#state{monitors = Remaining};
        error ->
            State
    end.
118 changes: 117 additions & 1 deletion deps/rabbitmq_federation_common/test/unit_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ all() -> [
terminate_all_shutdown_kills_non_trapping_processes,
terminate_all_timeout_force_kill,
terminate_all_paced_batching,
terminate_all_timeout_kills_remaining_batches
terminate_all_timeout_kills_remaining_batches,
federation_status_removed_when_supervisor_down,
federation_status_demonitors_on_remove,
federation_status_one_monitor_per_supervisor
].

init_per_suite(Config) ->
Expand Down Expand Up @@ -174,3 +177,116 @@ await_all_dead(Pids, AttemptsLeft) ->
timer:sleep(10),
await_all_dead(Pids, AttemptsLeft - 1)
end.

%% When a link disappears without removing itself (its supervisor crashed, for
%% example), the status server must drop the stale status entry and forget the
%% monitor of that supervisor.
federation_status_removed_when_supervisor_down(_Config) ->
    Scenario =
        fun() ->
            FakeSup = spawn_fake_supervisor(),
            Upstream = exchange_upstream(<<"upstream-1">>),
            XName = exchange_resource(<<"fed.x">>),
            report_link(FakeSup, Upstream, XName),
            ?assertEqual(1, length(rabbit_federation_status:status())),
            kill_and_wait(FakeSup),
            %% The 'DOWN' message is handled asynchronously by the status
            %% server, so poll until the entry is gone.
            NoStatusLeft = fun() -> rabbit_federation_status:status() =:= [] end,
            ?assertEqual(ok, await(NoStatusLeft)),
            ?assertEqual(0, monitor_count())
        end,
    with_status_server(Scenario).

%% Removing a link explicitly must also drop the monitor of its supervisor, so
%% the monitor table cannot grow without bound.
federation_status_demonitors_on_remove(_Config) ->
    with_status_server(
      fun() ->
              Sup = spawn_fake_supervisor(),
              try
                  Upstream = exchange_upstream(<<"upstream-1">>),
                  XName = exchange_resource(<<"fed.x">>),
                  report_link(Sup, Upstream, XName),
                  ?assertEqual(1, monitor_count()),
                  ok = rabbit_federation_status:remove(Upstream, XName),
                  ?assertEqual([], rabbit_federation_status:status()),
                  ?assertEqual(0, monitor_count())
              after
                  %% This test never kills the fake supervisor as part of the
                  %% scenario, and it would otherwise block in `receive`
                  %% forever. Stop it here, whether the assertions passed or
                  %% not, so the process does not outlive the test case.
                  kill_and_wait(Sup)
              end
      end).

%% A supervisor that reports several links is monitored exactly once, and all
%% of its entries are removed when it goes down.
federation_status_one_monitor_per_supervisor(_Config) ->
    Scenario =
        fun() ->
            FakeSup = spawn_fake_supervisor(),
            report_link(FakeSup,
                        exchange_upstream(<<"u1">>),
                        exchange_resource(<<"fed.x1">>)),
            report_link(FakeSup,
                        exchange_upstream(<<"u2">>),
                        exchange_resource(<<"fed.x2">>)),
            ?assertEqual(2, length(rabbit_federation_status:status())),
            ?assertEqual(1, monitor_count()),
            kill_and_wait(FakeSup),
            %% Cleanup happens asynchronously in the status server.
            NoStatusLeft = fun() -> rabbit_federation_status:status() =:= [] end,
            ?assertEqual(ok, await(NoStatusLeft)),
            ?assertEqual(0, monitor_count())
        end,
    with_status_server(Scenario).

%% Exchange resource called Name in the "/" virtual host.
exchange_resource(Name) ->
    #resource{kind = exchange,
              name = Name,
              virtual_host = <<"/">>}.

%% Upstream record called UpstreamName whose exchange name is fixed to
%% <<"upstream-x">>.
exchange_upstream(UpstreamName) ->
    #upstream{exchange_name = <<"upstream-x">>,
              name = UpstreamName}.

%% Sends a `running` status report for the given link to the status server on
%% behalf of Sup, and returns once the server has handled it.
report_link(Sup, Upstream, XorQName) ->
    Params = #upstream_params{safe_uri = <<"amqp://localhost">>},
    Timestamp = {{2024, 1, 1}, {0, 0, 0}},
    Report = {report, Sup, Upstream, Params, XorQName, running, Timestamp},
    gen_server:cast(rabbit_federation_status, Report),
    %% The server handles messages from this process in the order they were
    %% sent, so by the time this synchronous call returns the cast above has
    %% been processed.
    _ = rabbit_federation_status:status(),
    ok.

%% Number of supervisors currently monitored by the status server. Its state
%% record has a single field, the map of supervisor monitors, so the size of
%% that map is the count.
monitor_count() ->
    ServerState = sys:get_state(rabbit_federation_status),
    {state, MonitorsBySup} = ServerState,
    map_size(MonitorsBySup).

%% Spawns an unlinked stand-in for a link supervisor: a process that idles
%% until it receives `stop` (or is killed).
spawn_fake_supervisor() ->
    WaitForStop = fun() ->
                          receive
                              stop -> ok
                          end
                  end,
    spawn(WaitForStop).

%% Kills Pid and blocks until its termination has been observed through a
%% monitor. Exits with `timeout_killing_fake_supervisor` if no 'DOWN' message
%% arrives within five seconds.
kill_and_wait(Pid) ->
    Monitor = erlang:monitor(process, Pid),
    exit(Pid, kill),
    receive
        {'DOWN', Monitor, process, Pid, _Reason} ->
            ok
    after 5000 ->
        exit(timeout_killing_fake_supervisor)
    end.

%% Runs Fun against a freshly started status server. A server left over from
%% an earlier test is stopped first, and the new one is always stopped
%% afterwards, even when Fun raises.
with_status_server(Fun) ->
    stop_status_server(),
    {ok, ServerPid} = rabbit_federation_status:start_link(),
    try
        Fun()
    after
        stop_status_server(ServerPid)
    end.

%% Stops whatever is registered as rabbit_federation_status, if anything.
stop_status_server() ->
    Registered = whereis(rabbit_federation_status),
    if
        Registered =:= undefined -> ok;
        true -> stop_status_server(Registered)
    end.

%% Kills the status server Pid and waits up to five seconds for it to go down.
%% The caller is unlinked first so that the kill does not propagate to it.
%% Always returns ok, including when the wait times out.
stop_status_server(Pid) ->
    unlink(Pid),
    Monitor = erlang:monitor(process, Pid),
    exit(Pid, kill),
    receive
        {'DOWN', Monitor, process, Pid, _Reason} ->
            ok
    after 5000 ->
        ok
    end.

%% Polls Fun until it returns true, using the default of 500 retries.
await(Fun) ->
    await(Fun, 500).

%% Polls Fun until it returns true, sleeping 10 ms between attempts. Fun is
%% evaluated once more when no retries are left; if it is still false the
%% result is {error, timeout}.
await(Fun, RetriesLeft) ->
    case {Fun(), RetriesLeft} of
        {true, _} ->
            ok;
        {false, 0} ->
            {error, timeout};
        {false, _} ->
            timer:sleep(10),
            await(Fun, RetriesLeft - 1)
    end.
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ status(Chs, ReqData, Context, Filter) ->

status(Node, Chs, Filter) ->
case rpc:call(Node, rabbit_federation_status, status, [], infinity) of
{badrpc, {'EXIT', {undef, _}}} -> [];
{badrpc, {'EXIT', {noproc, _}}} -> [];
Status -> [format(Node, I, Chs) || I <- Status,
filter_status(I, Filter)]
Status when is_list(Status) ->
[format(Node, I, Chs) || I <- Status, filter_status(I, Filter)];
_Error ->
[]
end.

filter_status(_, all) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ init([]) ->
QLinkSupSup = #{
id => q_links,
start => {rabbit_federation_queue_link_sup_sup, start_link, []},
restart => transient,
restart => permanent,
shutdown => ?SUPERVISOR_WAIT,
type => supervisor,
modules => [rabbit_federation_queue_link_sup_sup]
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_shovel/src/rabbit_shovel_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ supervisor_tree_child_specs(standard) ->
#{
id => rabbit_shovel_dyn_worker_sup_sup,
start => {rabbit_shovel_dyn_worker_sup_sup, start_link, []},
restart => transient,
restart => permanent,
shutdown => 16#ffffffff,
type => supervisor,
modules => [rabbit_shovel_dyn_worker_sup_sup]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ status(ReqData, Context) ->

status(Node) ->
case rpc:call(Node, rabbit_shovel_status, status, [], ?SHOVEL_CALLS_TIMEOUT_MS) of
{badrpc, {'EXIT', _}} ->
[];
Status ->
[format(Node, I) || I <- Status]
Status when is_list(Status) ->
[format(Node, I) || I <- Status];
_Error ->
[]
end.

format(Node, {Name, Type, Info, Metrics, TS}) ->
Expand Down
Loading