From 1be8234442d1dcec9f46b317dd5466b5dcda394c Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Mon, 8 Jun 2026 18:26:41 +0200 Subject: [PATCH 01/12] federation: remove status entries when a link goes down uncleanly The federation status server keeps an ETS entry per link, reported by the link supervisor. When a link went away without removing itself - for example because its supervisor crashed - the entry was never deleted, so the link was left behind in the status table (typically stuck in a "starting" state) and kept being reported by the status API. Monitor each supervisor that reports a status, once per supervisor, and on its 'DOWN' drop every status entry it owns. Conversely, demonitor a supervisor once its last entry has been removed explicitly, so the monitor table does not grow without bound. --- .../src/rabbit_federation_status.erl | 68 ++++++++-- .../test/unit_SUITE.erl | 118 +++++++++++++++++- 2 files changed, 174 insertions(+), 12 deletions(-) diff --git a/deps/rabbitmq_federation_common/src/rabbit_federation_status.erl b/deps/rabbitmq_federation_common/src/rabbit_federation_status.erl index 5a2aac82afd1..9629d79cb539 100644 --- a/deps/rabbitmq_federation_common/src/rabbit_federation_status.erl +++ b/deps/rabbitmq_federation_common/src/rabbit_federation_status.erl @@ -23,7 +23,7 @@ -define(SERVER, ?MODULE). -define(ETS_NAME, ?MODULE). --record(state, {}). +-record(state, {monitors = #{}}). -record(entry, {key, uri, status, timestamp, id, supervisor, upstream}). start_link() -> @@ -52,19 +52,26 @@ lookup(Id) -> init([]) -> ?ETS_NAME = ets:new(?ETS_NAME, [named_table, {keypos, #entry.key}, private]), - {ok, #state{}}. + {ok, #state{monitors = #{}}}. 
handle_call({remove_exchange_or_queue, XorQName}, _From, State) -> - [link_gone(Entry) - || Entry <- ets:match_object(?ETS_NAME, match_entry(xorqkey(XorQName)))], - {reply, ok, State}; + Entries = ets:match_object(?ETS_NAME, match_entry(xorqkey(XorQName))), + [link_gone(Entry) || Entry <- Entries], + NewState = lists:foldl( + fun(#entry{supervisor = Sup}, StateAcc) -> + maybe_demonitor(Sup, StateAcc) + end, State, Entries), + {reply, ok, NewState}; handle_call({remove, Upstream, XorQName}, _From, State) -> case ets:match_object(?ETS_NAME, match_entry(key(XorQName, Upstream))) of - [Entry] -> link_gone(Entry); - [] -> ok - end, - {reply, ok, State}; + [Entry] -> + link_gone(Entry), + NewState = maybe_demonitor(Entry#entry.supervisor, State), + {reply, ok, NewState}; + [] -> + {reply, ok, State} + end; handle_call({lookup, Id}, _From, State) -> Link = case ets:match_object(?ETS_NAME, match_id(Id)) of @@ -96,8 +103,24 @@ handle_cast({report, Supervisor, Upstream, #upstream_params{safe_uri = URI}, id = unique_id(Key)}, true = ets:insert(?ETS_NAME, Entry), rabbit_event:notify(federation_link_status, format(Entry)), - {noreply, State}. - + NewState = case maps:is_key(Supervisor, State#state.monitors) of + false -> + Ref = erlang:monitor(process, Supervisor), + State#state{monitors = maps:put(Supervisor, Ref, State#state.monitors)}; + true -> + State + end, + {noreply, NewState}. + +handle_info({'DOWN', Ref, process, Pid, _Reason}, State) -> + Entries = ets:match_object(?ETS_NAME, match_supervisor(Pid)), + [link_gone(Entry) || Entry <- Entries], + NewState = case maps:take(Pid, State#state.monitors) of + {Ref, Monitors} -> State#state{monitors = Monitors}; + {_OtherRef, Monitors} -> State#state{monitors = Monitors}; + error -> State + end, + {noreply, NewState}; handle_info(_Info, State) -> {noreply, State}. @@ -176,3 +199,26 @@ match_id(Id) -> id = Id, supervisor = '_', upstream = '_'}. 
+ +match_supervisor(Supervisor) -> + #entry{key = '_', + uri = '_', + status = '_', + timestamp = '_', + id = '_', + supervisor = Supervisor, + upstream = '_'}. + +maybe_demonitor(Supervisor, State) -> + case ets:match_object(?ETS_NAME, match_supervisor(Supervisor)) of + [] -> + case maps:take(Supervisor, State#state.monitors) of + {Ref, NewMonitors} -> + erlang:demonitor(Ref, [flush]), + State#state{monitors = NewMonitors}; + error -> + State + end; + _ -> + State + end. diff --git a/deps/rabbitmq_federation_common/test/unit_SUITE.erl b/deps/rabbitmq_federation_common/test/unit_SUITE.erl index 4197ef0e87e3..f4f2d02a9143 100644 --- a/deps/rabbitmq_federation_common/test/unit_SUITE.erl +++ b/deps/rabbitmq_federation_common/test/unit_SUITE.erl @@ -26,7 +26,10 @@ all() -> [ terminate_all_shutdown_kills_non_trapping_processes, terminate_all_timeout_force_kill, terminate_all_paced_batching, - terminate_all_timeout_kills_remaining_batches + terminate_all_timeout_kills_remaining_batches, + federation_status_removed_when_supervisor_down, + federation_status_demonitors_on_remove, + federation_status_one_monitor_per_supervisor ]. init_per_suite(Config) -> @@ -174,3 +177,116 @@ await_all_dead(Pids, AttemptsLeft) -> timer:sleep(10), await_all_dead(Pids, AttemptsLeft - 1) end. + +%% A link that goes down without removing itself (for example because its +%% supervisor crashed) must not leave a stale status entry behind. +federation_status_removed_when_supervisor_down(_Config) -> + with_status_server( + fun() -> + Sup = spawn_fake_supervisor(), + report_link(Sup, exchange_upstream(<<"upstream-1">>), + exchange_resource(<<"fed.x">>)), + ?assertEqual(1, length(rabbit_federation_status:status())), + kill_and_wait(Sup), + ?assertEqual(ok, await(fun() -> + rabbit_federation_status:status() =:= [] + end)), + ?assertEqual(0, monitor_count()) + end). + +%% Removing a link explicitly must also drop the monitor of its supervisor, so +%% the monitor table cannot grow without bound. 
+federation_status_demonitors_on_remove(_Config) -> + with_status_server( + fun() -> + Sup = spawn_fake_supervisor(), + Upstream = exchange_upstream(<<"upstream-1">>), + XName = exchange_resource(<<"fed.x">>), + report_link(Sup, Upstream, XName), + ?assertEqual(1, monitor_count()), + ok = rabbit_federation_status:remove(Upstream, XName), + ?assertEqual([], rabbit_federation_status:status()), + ?assertEqual(0, monitor_count()) + end). + +%% Several links reported by the same supervisor are monitored only once, and +%% that supervisor going down removes all of its entries. +federation_status_one_monitor_per_supervisor(_Config) -> + with_status_server( + fun() -> + Sup = spawn_fake_supervisor(), + report_link(Sup, exchange_upstream(<<"u1">>), + exchange_resource(<<"fed.x1">>)), + report_link(Sup, exchange_upstream(<<"u2">>), + exchange_resource(<<"fed.x2">>)), + ?assertEqual(2, length(rabbit_federation_status:status())), + ?assertEqual(1, monitor_count()), + kill_and_wait(Sup), + ?assertEqual(ok, await(fun() -> + rabbit_federation_status:status() =:= [] + end)), + ?assertEqual(0, monitor_count()) + end). + +exchange_resource(Name) -> + #resource{virtual_host = <<"/">>, kind = exchange, name = Name}. + +exchange_upstream(UpstreamName) -> + #upstream{name = UpstreamName, exchange_name = <<"upstream-x">>}. + +report_link(Sup, Upstream, XorQName) -> + UParams = #upstream_params{safe_uri = <<"amqp://localhost">>}, + gen_server:cast(rabbit_federation_status, + {report, Sup, Upstream, UParams, XorQName, running, + {{2024, 1, 1}, {0, 0, 0}}}), + %% Casts from this process are handled in order, so a synchronous call + %% guarantees the report above has been processed. + _ = rabbit_federation_status:status(), + ok. + +%% The status server keeps its supervisor monitors in the single field of its +%% state record, so its size is the number of monitored supervisors. +monitor_count() -> + {state, Monitors} = sys:get_state(rabbit_federation_status), + maps:size(Monitors). 
+ +spawn_fake_supervisor() -> + spawn(fun() -> receive stop -> ok end end). + +kill_and_wait(Pid) -> + MRef = erlang:monitor(process, Pid), + exit(Pid, kill), + receive {'DOWN', MRef, process, Pid, _} -> ok + after 5000 -> exit(timeout_killing_fake_supervisor) + end. + +with_status_server(Fun) -> + stop_status_server(), + {ok, Pid} = rabbit_federation_status:start_link(), + try Fun() + after stop_status_server(Pid) + end. + +stop_status_server() -> + case whereis(rabbit_federation_status) of + undefined -> ok; + Pid -> stop_status_server(Pid) + end. + +stop_status_server(Pid) -> + unlink(Pid), + MRef = erlang:monitor(process, Pid), + exit(Pid, kill), + receive {'DOWN', MRef, process, Pid, _} -> ok + after 5000 -> ok + end. + +await(Fun) -> await(Fun, 500). + +await(Fun, 0) -> + case Fun() of true -> ok; false -> {error, timeout} end; +await(Fun, N) -> + case Fun() of + true -> ok; + false -> timer:sleep(10), await(Fun, N - 1) + end. From a03464ec11709b1e5f5a0b820de8a2e76f00cee6 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Thu, 11 Jun 2026 11:02:42 +0200 Subject: [PATCH 02/12] federation: always restart the exchange and queue link supervisors The top-level federation link supervisors (`x_links` and `q_links`) were registered as `transient` children of their respective federation supervisors. A `transient` child is only restarted after an abnormal exit, so whenever one of these supervisors stopped with reason `normal` or `shutdown` - for example when a peer asked it to die as part of the mirrored supervisor failover protocol - it was not restarted. The node was then left with no federation links until the next full restart. Mark both supervisors as `permanent` so they are always restarted regardless of the exit reason. 
--- .../src/rabbit_exchange_federation_sup.erl | 2 +- .../src/rabbit_queue_federation_sup.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/deps/rabbitmq_exchange_federation/src/rabbit_exchange_federation_sup.erl b/deps/rabbitmq_exchange_federation/src/rabbit_exchange_federation_sup.erl index 715043545bd3..5020f1bb1147 100644 --- a/deps/rabbitmq_exchange_federation/src/rabbit_exchange_federation_sup.erl +++ b/deps/rabbitmq_exchange_federation/src/rabbit_exchange_federation_sup.erl @@ -55,7 +55,7 @@ init([]) -> XLinkSupSup = #{ id => x_links, start => {rabbit_federation_exchange_link_sup_sup, start_link, []}, - restart => transient, + restart => permanent, shutdown => ?SUPERVISOR_WAIT, type => supervisor, modules =>[rabbit_federation_exchange_link_sup_sup] diff --git a/deps/rabbitmq_queue_federation/src/rabbit_queue_federation_sup.erl b/deps/rabbitmq_queue_federation/src/rabbit_queue_federation_sup.erl index 006e1d5a2add..39848a1678ba 100644 --- a/deps/rabbitmq_queue_federation/src/rabbit_queue_federation_sup.erl +++ b/deps/rabbitmq_queue_federation/src/rabbit_queue_federation_sup.erl @@ -55,7 +55,7 @@ init([]) -> QLinkSupSup = #{ id => q_links, start => {rabbit_federation_queue_link_sup_sup, start_link, []}, - restart => transient, + restart => permanent, shutdown => ?SUPERVISOR_WAIT, type => supervisor, modules => [rabbit_federation_queue_link_sup_sup] From 35cb3e65c83175487aecd4cc8f5f58d0c60fa558 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Thu, 11 Jun 2026 11:03:10 +0200 Subject: [PATCH 03/12] mirrored_supervisor: reclaim orphaned children after a failed takeover When a mirrored supervisor process goes down, the "first" surviving member of the group takes over its children in the `DOWN` handler. If that takeover fails - for example because Khepri is temporarily unavailable or electing a new leader - the child specs are left in Khepri still owned by the dead process. 
The restarted supervisor only boots its static `initial_childspecs`, so dynamic children such as federation links stay orphaned and are never restarted, sometimes leaving a link stuck without a running process. Add a periodic reconciliation loop. On the sorted-first active member of each group it scans Khepri for child specs whose owner is no longer a group member, takes over their ownership and restarts them locally. It also stops any local child the store says is owned by another node, so that exactly one node runs each child. The loop runs on every node and must tolerate Khepri being briefly unavailable - precisely the condition it recovers from - so the whole body is wrapped in a `try/catch`: any error is logged and retried on the next tick rather than crashing the supervisor. --- deps/rabbit/src/mirrored_supervisor.erl | 88 ++++++++++++++++++- .../rabbit/test/mirrored_supervisor_SUITE.erl | 37 +++++++- 2 files changed, 123 insertions(+), 2 deletions(-) diff --git a/deps/rabbit/src/mirrored_supervisor.erl b/deps/rabbit/src/mirrored_supervisor.erl index 6b1f09a1545d..d2f97e435a97 100644 --- a/deps/rabbit/src/mirrored_supervisor.erl +++ b/deps/rabbit/src/mirrored_supervisor.erl @@ -7,7 +7,12 @@ -module(mirrored_supervisor). +-compile(nowarn_deprecated_catch). + +-include_lib("khepri/include/khepri.hrl"). +-include("include/rabbit_khepri.hrl"). -include_lib("kernel/include/logger.hrl"). +-include("mirrored_supervisor.hrl"). 
%% Mirrored Supervisor %% =================== @@ -273,7 +278,9 @@ handle_call({init, Overall}, _From, Results = [maybe_start(Group, Overall, Delegate, S) || S <- ChildSpecs], mirrored_supervisor_locks:unlock(LockId), case errors(Results) of - [] -> {reply, ok, State1}; + [] -> + erlang:send_after(10000, self(), reconcile), + {reply, ok, State1}; Errors -> {stop, {shutdown, Errors}, State1} end; @@ -363,9 +370,88 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason}, Errors -> {stop, {shutdown, Errors}, State} end; +handle_info(reconcile, State = #state{overall = Overall, + delegate = Delegate, + group = Group}) when Overall =/= undefined -> + reconcile_children(Group, Overall, Delegate), + erlang:send_after(30000, self(), reconcile), + {noreply, State}; + handle_info(Info, State) -> {stop, {unexpected_info, Info}, State}. +reconcile_children(Group, Overall, Delegate) -> + %% This runs periodically on every node, so it must tolerate Khepri being + %% temporarily unavailable (for example during a leader election or a + %% partition), which is precisely when this code is most needed. Any error + %% is logged and swallowed; the next reconcile tick retries. + try + do_reconcile_children(Group, Overall, Delegate) + catch + Class:Reason:Stacktrace -> + ?LOG_WARNING("Mirrored supervisor: reconciliation of group ~tp failed, will retry on the next tick: ~tp:~tp~n~tp", + [Group, Class, Reason, Stacktrace]), + ok + end. + +do_reconcile_children(Group, Overall, Delegate) -> + case ?SUPERVISOR:which_children(Delegate) of + Children when is_list(Children) -> + %% Stop any local child the metadata store says is owned by another + %% node, so that exactly one node runs each child. 
+ [case rabbit_db_msup:find_mirror(Group, Id) of + {ok, Owner} when Owner =/= Overall -> + ?LOG_WARNING("Mirrored supervisor: child ~tp in group ~tp is owned by another node ~tp (we are ~tp), stopping local instance", + [Id, Group, node(Owner), node()]), + catch ?SUPERVISOR:terminate_child(Delegate, Id), + catch ?SUPERVISOR:delete_child(Delegate, Id); + _ -> + ok + end + || {Id, Pid, _, _} <- Children, is_pid(Pid)], + reclaim_orphans(Group, Overall, Delegate); + _ -> + ok + end. + +reclaim_orphans(Group, Overall, Delegate) -> + ActiveMembers = pg:get_members(Group), + case lists:sort(ActiveMembers) of + [Overall | _] -> + Pattern = #mirrored_sup_childspec{key = {Group, '_'}, _ = '_'}, + Conditions = [?KHEPRI_WILDCARD_STAR_STAR, #if_data_matches{pattern = Pattern}], + PathPattern = rabbit_db_msup:khepri_mirrored_supervisor_path( + ?KHEPRI_WILDCARD_STAR, + #if_all{conditions = Conditions}), + case rabbit_khepri:get_many(PathPattern) of + {ok, Map} -> + [case S0 of + #mirrored_sup_childspec{mirroring_pid = OwnerPid, childspec = ChildSpec, key = {Group, Id}} -> + case lists:member(OwnerPid, ActiveMembers) of + false -> + ?LOG_NOTICE("Mirrored supervisor: reclaiming orphan child ~tp in group ~tp (previous owner ~tp was dead/unreachable)", + [Id, Group, OwnerPid]), + NewS = S0#mirrored_sup_childspec{mirroring_pid = Overall}, + case rabbit_khepri:put(Path, NewS) of + ok -> + catch ?SUPERVISOR:start_child(Delegate, ChildSpec); + _ -> + ok + end; + true -> + ok + end; + _ -> + ok + end + || {Path, S0} <- maps:to_list(Map)]; + _ -> + ok + end; + _ -> + ok + end. + terminate(_Reason, _State) -> ok. diff --git a/deps/rabbit/test/mirrored_supervisor_SUITE.erl b/deps/rabbit/test/mirrored_supervisor_SUITE.erl index 680b6ed83d97..5e5d77e0e599 100644 --- a/deps/rabbit/test/mirrored_supervisor_SUITE.erl +++ b/deps/rabbit/test/mirrored_supervisor_SUITE.erl @@ -36,7 +36,8 @@ groups() -> start_idempotence, unsupported, ignore, - startup_failure + startup_failure, + reclaim_orphan ]} ]. 
@@ -304,6 +305,34 @@ test_startup_failure(Fail, Group) -> process_flag(trap_exit, false), ok. +%% A dynamic child can be left orphaned in Khepri under a dead mirroring +%% process, for example when a takeover failed because Khepri was briefly +%% unavailable. The periodic reconciliation on the sorted-first member must +%% reclaim such a child and start it locally. +reclaim_orphan(Config) -> + passed = rabbit_ct_broker_helpers:rpc( + Config, 0, ?MODULE, reclaim_orphan1, [?config(sup_prefix, Config)]). + +reclaim_orphan1(Group) -> + with_sups( + fun([A]) -> + ChildSpec = childspec(worker), + %% Record the child in Khepri as owned by a dead process, without + %% starting it anywhere: this is the orphaned state to recover. + DeadPid = dead_pid(), + start = rabbit_db_msup:create_or_update( + Group, DeadPid, DeadPid, ChildSpec, id(worker)), + %% Trigger reconciliation explicitly rather than waiting for the + %% periodic timer. + Mirroring = ?MS:child(A, mirroring), + Mirroring ! reconcile, + %% The orphan is reclaimed, started locally, and its ownership in + %% Khepri is transferred to the live member. + Pid = pid_of(worker), + true = is_pid(Pid), + {ok, A} = rabbit_db_msup:find_mirror(Group, id(worker)) + end, [sup(Group, 1)], Group). + %% --------------------------------------------------------------------------- with_sups(Fun, Sups, Group) -> @@ -387,3 +416,9 @@ init({Strategy, ChildSpecs}) -> sup(Prefix, Number) -> rabbit_data_coercion:to_atom(lists:flatten(io_lib:format("~p~p", [Prefix, Number]))). + +dead_pid() -> + Pid = spawn(fun() -> ok end), + Ref = erlang:monitor(process, Pid), + receive {'DOWN', Ref, process, Pid, _} -> ok end, + Pid. 
From 16701259ae766124045d6b0bd96685a9154c435f Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Thu, 11 Jun 2026 11:03:25 +0200 Subject: [PATCH 04/12] mirrored_supervisor: stop children on the minority side of a partition During a network partition both sides of a mirrored supervisor group kept their children running, producing duplicates - for example two copies of the same federation link draining the same upstream. The majority side should own the children; the minority side should give them up. Extend the reconciliation loop to detect when the local node is in a minority partition, that is when fewer than a strict majority of the cluster members are reachable, and stop its local children in that case. The majority side then reclaims them through the orphan reconciliation. Clusters with fewer than three members never stop children this way, since a split of such a cluster leaves no majority side to take the children over. In larger clusters an even split (for example 2|2 of four nodes) stops the children on both sides until the partition heals. --- deps/rabbit/src/mirrored_supervisor.erl | 45 ++++++++++++++++++------- 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/deps/rabbit/src/mirrored_supervisor.erl b/deps/rabbit/src/mirrored_supervisor.erl index d2f97e435a97..f8f5946b8557 100644 --- a/deps/rabbit/src/mirrored_supervisor.erl +++ b/deps/rabbit/src/mirrored_supervisor.erl @@ -397,19 +397,38 @@ reconcile_children(Group, Overall, Delegate) -> do_reconcile_children(Group, Overall, Delegate) -> case ?SUPERVISOR:which_children(Delegate) of Children when is_list(Children) -> - %% Stop any local child the metadata store says is owned by another - %% node, so that exactly one node runs each child.
- [case rabbit_db_msup:find_mirror(Group, Id) of - {ok, Owner} when Owner =/= Overall -> - ?LOG_WARNING("Mirrored supervisor: child ~tp in group ~tp is owned by another node ~tp (we are ~tp), stopping local instance", - [Id, Group, node(Owner), node()]), - catch ?SUPERVISOR:terminate_child(Delegate, Id), - catch ?SUPERVISOR:delete_child(Delegate, Id); - _ -> - ok - end - || {Id, Pid, _, _} <- Children, is_pid(Pid)], - reclaim_orphans(Group, Overall, Delegate); + Members = rabbit_nodes:list_members(), + TotalSize = length(Members), + Reachable = rabbit_nodes:list_reachable(), + %% A strict majority of cluster members must be reachable to keep + %% children running locally. On an even split (for example 2|2 of + %% four nodes) neither side reaches the majority, so both sides stop + %% their children; the majority side, once one exists again, reclaims + %% them on partition heal. Clusters with fewer than three members + %% never stop children this way, since they cannot have a majority. + IsMinority = (TotalSize >= 3) andalso (length(Reachable) < (TotalSize div 2) + 1), + case IsMinority of + true -> + [begin + ?LOG_WARNING("Mirrored supervisor: node is in a minority partition (~tp/~tp nodes reachable), stopping local instance of child ~tp in group ~tp", + [length(Reachable), TotalSize, Id, Group]), + catch ?SUPERVISOR:terminate_child(Delegate, Id), + catch ?SUPERVISOR:delete_child(Delegate, Id) + end + || {Id, Pid, _, _} <- Children, is_pid(Pid)]; + false -> + [case rabbit_db_msup:find_mirror(Group, Id) of + {ok, Owner} when Owner =/= Overall -> + ?LOG_WARNING("Mirrored supervisor: child ~tp in group ~tp is owned by another node ~tp (we are ~tp), stopping local instance", + [Id, Group, node(Owner), node()]), + catch ?SUPERVISOR:terminate_child(Delegate, Id), + catch ?SUPERVISOR:delete_child(Delegate, Id); + _ -> + ok + end + || {Id, Pid, _, _} <- Children, is_pid(Pid)], + reclaim_orphans(Group, Overall, Delegate) + end; _ -> ok end. 
From c4f870239316788d46ceaa480b95cd0ecb5933a2 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Thu, 11 Jun 2026 12:51:04 +0200 Subject: [PATCH 05/12] mirrored_supervisor: only delete all child records on a genuine single-node start On initialisation, when a mirroring process found no peers in its process group, it deleted every child record for the group from Khepri. Because Khepri is cluster-wide, this removed the records backing children running on all other nodes, not just stale local ones. An empty group is not a reliable signal that this node is alone: a peer's mirroring process may simply not have joined the group yet, or this node's supervisor may be restarting while the rest of the cluster keeps running its children. In particular, when a supervisor restarts after reaching its maximum restart intensity (for example following Khepri timeouts during a partition), it re-initialises with an empty group for a moment and wipes every node's records. The children are then gone from the store, so orphan reconciliation has nothing left to restart and federation links stop running cluster-wide. Only delete the records when this node is the sole reachable cluster member, i.e. a genuine single-node (re)start. Otherwise leave any genuinely orphaned records to the reconciliation loop, which restarts them on the majority side. 
--- deps/rabbit/src/mirrored_supervisor.erl | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/deps/rabbit/src/mirrored_supervisor.erl b/deps/rabbit/src/mirrored_supervisor.erl index f8f5946b8557..0c33c603f023 100644 --- a/deps/rabbit/src/mirrored_supervisor.erl +++ b/deps/rabbit/src/mirrored_supervisor.erl @@ -262,11 +262,18 @@ handle_call({init, Overall}, _From, Rest = pg:get_members(Group) -- [Overall], Nodes = [node(M) || M <- Rest], ?LOG_DEBUG("Mirrored supervisor: known group ~tp members: ~tp on nodes ~tp", [Group, Rest, Nodes]), - case Rest of - [] -> - ?LOG_DEBUG("Mirrored supervisor: no known peer members in group ~tp, will delete all child records for it", [Group]), + %% An empty group is not enough to justify deleting all child records: a + %% peer's mirroring process may simply not have (re)joined the group yet, + %% and the records (owned by other nodes) may back children that are still + %% running cluster-wide. Deleting them here would wipe those children's + %% records on every node. Only clear the records when this node is the sole + %% reachable cluster member, i.e. a genuine single-node (re)start; otherwise + %% leave any genuinely orphaned records to the reconciliation loop. + case Rest =:= [] andalso rabbit_nodes:list_reachable() =:= [node()] of + true -> + ?LOG_DEBUG("Mirrored supervisor: no known peer members in group ~tp and no other reachable cluster members, will delete all child records for it", [Group]), delete_all(Group); - _ -> ok + false -> ok end, [begin ?GEN_SERVER:cast(mirroring(Pid), {ensure_monitoring, Overall}), From fb9995fff214943f6999a4456743a4718fcdda7e Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Thu, 11 Jun 2026 12:54:02 +0200 Subject: [PATCH 06/12] mirrored_supervisor: do not crash failover on transient Khepri errors When a peer mirroring process goes down, the first member of the group takes over its children, which requires writing to Khepri. 
If that write failed or timed out, the handler stopped the mirroring process with `{shutdown, [timeout]}`. This is most likely precisely when a peer goes down: losing a node can trigger a Khepri leader election during which writes time out. The mirroring process then restarts, fails again, and quickly reaches its maximum restart intensity, tearing down the whole link supervisor for the group. Wrap the failover in a `try` and treat update or child-start failures as transient: log them and keep running rather than stopping. The periodic reconciliation loop takes over the orphaned children once Khepri is reachable again. The successful failover path is unchanged, so children still migrate immediately in the common case. --- deps/rabbit/src/mirrored_supervisor.erl | 53 +++++++++++++++++-------- 1 file changed, 36 insertions(+), 17 deletions(-) diff --git a/deps/rabbit/src/mirrored_supervisor.erl b/deps/rabbit/src/mirrored_supervisor.erl index 0c33c603f023..50a504583933 100644 --- a/deps/rabbit/src/mirrored_supervisor.erl +++ b/deps/rabbit/src/mirrored_supervisor.erl @@ -358,24 +358,43 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason}, group = Group, overall = O, child_order = ChildOrder}) -> + %% A peer mirroring process went down. If we sort first in the group, take + %% over its children. This touches Khepri, which can be temporarily + %% unavailable, for example during the leader election triggered by the + %% very partition that caused this 'DOWN'. We must not crash here: crashing + %% restarts the mirroring process and, on repeated failures, exhausts the + %% supervisor's restart intensity and tears down the whole group. Instead + %% we log and let the periodic reconciliation loop take over the orphaned + %% children once Khepri is reachable again. + %% %% No guarantee pg will have received the DOWN before us. 
- R = case lists:sort(pg:get_members(Group)) -- [Pid] of - [O | _] -> ChildSpecs = update_all(O, Pid), - case ChildSpecs of - _ when is_list(ChildSpecs) -> - [start(Delegate, ChildSpec) - || ChildSpec <- restore_child_order( - ChildSpecs, - ChildOrder)]; - {error, _} -> - [ChildSpecs] - end; - _ -> [] - end, - case errors(R) of - [] -> {noreply, State}; - Errors -> {stop, {shutdown, Errors}, State} - end; + try + case lists:sort(pg:get_members(Group)) -- [Pid] of + [O | _] -> + case update_all(O, Pid) of + ChildSpecs when is_list(ChildSpecs) -> + Results = [start(Delegate, ChildSpec) + || ChildSpec <- restore_child_order( + ChildSpecs, ChildOrder)], + case errors(Results) of + [] -> ok; + Errors -> + ?LOG_WARNING("Mirrored supervisor: failover in group ~tp could not start some children, reconciliation will retry: ~tp", + [Group, Errors]) + end; + {error, UpdateError} -> + ?LOG_WARNING("Mirrored supervisor: failover in group ~tp failed (~tp), reconciliation will retry", + [Group, UpdateError]) + end; + _ -> + ok + end + catch + Class:CatchReason:Stacktrace -> + ?LOG_WARNING("Mirrored supervisor: failover in group ~tp raised ~tp:~tp, reconciliation will retry~n~tp", + [Group, Class, CatchReason, Stacktrace]) + end, + {noreply, State}; handle_info(reconcile, State = #state{overall = Overall, delegate = Delegate, From 66621662bb89d7d0136734f54003576e91f641c6 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Thu, 11 Jun 2026 16:02:41 +0200 Subject: [PATCH 07/12] mirrored_supervisor: address Dialyzer warnings in reconciliation code Clean up the reconciliation code added for orphan reclamation and minority handling: - Drop the `is_list/1` guard and the dead `_ -> ok` clause on `supervisor2:which_children/1`, whose return is always a list. - Use `lists:foreach/2` instead of side-effecting list comprehensions whose list result was discarded, so the functions return `ok` and no longer trigger `unmatched_returns`. No behaviour change. 
--- deps/rabbit/src/mirrored_supervisor.erl | 116 ++++++++++++------------ 1 file changed, 60 insertions(+), 56 deletions(-) diff --git a/deps/rabbit/src/mirrored_supervisor.erl b/deps/rabbit/src/mirrored_supervisor.erl index 50a504583933..a9c6c5e61886 100644 --- a/deps/rabbit/src/mirrored_supervisor.erl +++ b/deps/rabbit/src/mirrored_supervisor.erl @@ -421,42 +421,41 @@ reconcile_children(Group, Overall, Delegate) -> end. do_reconcile_children(Group, Overall, Delegate) -> - case ?SUPERVISOR:which_children(Delegate) of - Children when is_list(Children) -> - Members = rabbit_nodes:list_members(), - TotalSize = length(Members), - Reachable = rabbit_nodes:list_reachable(), - %% A strict majority of cluster members must be reachable to keep - %% children running locally. On an even split (for example 2|2 of - %% four nodes) neither side reaches the majority, so both sides stop - %% their children; the majority side, once one exists again, reclaims - %% them on partition heal. Clusters with fewer than three members - %% never stop children this way, since they cannot have a majority. 
- IsMinority = (TotalSize >= 3) andalso (length(Reachable) < (TotalSize div 2) + 1), - case IsMinority of - true -> - [begin - ?LOG_WARNING("Mirrored supervisor: node is in a minority partition (~tp/~tp nodes reachable), stopping local instance of child ~tp in group ~tp", - [length(Reachable), TotalSize, Id, Group]), - catch ?SUPERVISOR:terminate_child(Delegate, Id), - catch ?SUPERVISOR:delete_child(Delegate, Id) - end - || {Id, Pid, _, _} <- Children, is_pid(Pid)]; - false -> - [case rabbit_db_msup:find_mirror(Group, Id) of - {ok, Owner} when Owner =/= Overall -> - ?LOG_WARNING("Mirrored supervisor: child ~tp in group ~tp is owned by another node ~tp (we are ~tp), stopping local instance", - [Id, Group, node(Owner), node()]), - catch ?SUPERVISOR:terminate_child(Delegate, Id), - catch ?SUPERVISOR:delete_child(Delegate, Id); - _ -> - ok - end - || {Id, Pid, _, _} <- Children, is_pid(Pid)], - reclaim_orphans(Group, Overall, Delegate) - end; - _ -> - ok + Children = ?SUPERVISOR:which_children(Delegate), + RunningIds = [Id || {Id, Pid, _, _} <- Children, is_pid(Pid)], + Members = rabbit_nodes:list_members(), + TotalSize = length(Members), + Reachable = rabbit_nodes:list_reachable(), + %% A strict majority of cluster members must be reachable to keep children + %% running locally. On an even split (for example 2|2 of four nodes) neither + %% side reaches the majority, so both sides stop their children; the + %% majority side, once one exists again, reclaims them on partition heal. + %% Clusters with fewer than three members never stop children this way, + %% since they cannot have a majority. 
+ IsMinority = (TotalSize >= 3) andalso (length(Reachable) < (TotalSize div 2) + 1), + case IsMinority of + true -> + lists:foreach( + fun(Id) -> + ?LOG_WARNING("Mirrored supervisor: node is in a minority partition (~tp/~tp nodes reachable), stopping local instance of child ~tp in group ~tp", + [length(Reachable), TotalSize, Id, Group]), + catch ?SUPERVISOR:terminate_child(Delegate, Id), + catch ?SUPERVISOR:delete_child(Delegate, Id) + end, RunningIds); + false -> + lists:foreach( + fun(Id) -> + case rabbit_db_msup:find_mirror(Group, Id) of + {ok, Owner} when Owner =/= Overall -> + ?LOG_WARNING("Mirrored supervisor: child ~tp in group ~tp is owned by another node ~tp (we are ~tp), stopping local instance", + [Id, Group, node(Owner), node()]), + catch ?SUPERVISOR:terminate_child(Delegate, Id), + catch ?SUPERVISOR:delete_child(Delegate, Id); + _ -> + ok + end + end, RunningIds), + reclaim_orphans(Group, Overall, Delegate) end. reclaim_orphans(Group, Overall, Delegate) -> @@ -470,26 +469,31 @@ reclaim_orphans(Group, Overall, Delegate) -> #if_all{conditions = Conditions}), case rabbit_khepri:get_many(PathPattern) of {ok, Map} -> - [case S0 of - #mirrored_sup_childspec{mirroring_pid = OwnerPid, childspec = ChildSpec, key = {Group, Id}} -> - case lists:member(OwnerPid, ActiveMembers) of - false -> - ?LOG_NOTICE("Mirrored supervisor: reclaiming orphan child ~tp in group ~tp (previous owner ~tp was dead/unreachable)", - [Id, Group, OwnerPid]), - NewS = S0#mirrored_sup_childspec{mirroring_pid = Overall}, - case rabbit_khepri:put(Path, NewS) of - ok -> - catch ?SUPERVISOR:start_child(Delegate, ChildSpec); - _ -> - ok - end; - true -> - ok - end; - _ -> - ok - end - || {Path, S0} <- maps:to_list(Map)]; + lists:foreach( + %% get_many already restricts results to this group via + %% the #if_data_matches condition, so the key's group + %% element is always Group here. 
+ fun({Path, #mirrored_sup_childspec{mirroring_pid = OwnerPid, + childspec = ChildSpec, + key = {_, Id}} = S0}) -> + case lists:member(OwnerPid, ActiveMembers) of + false -> + ?LOG_NOTICE("Mirrored supervisor: reclaiming orphan child ~tp in group ~tp (previous owner ~tp was dead/unreachable)", + [Id, Group, OwnerPid]), + NewS = S0#mirrored_sup_childspec{mirroring_pid = Overall}, + case rabbit_khepri:put(Path, NewS) of + ok -> + catch ?SUPERVISOR:start_child(Delegate, ChildSpec), + ok; + _ -> + ok + end; + true -> + ok + end; + (_) -> + ok + end, maps:to_list(Map)); _ -> ok end; From 910fc0ca32343b40fc54d1b97e54f98d14f5d82c Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Fri, 12 Jun 2026 10:44:08 +0200 Subject: [PATCH 08/12] mirrored_supervisor: restart children a node owns but is not running The reconciliation loop reclaimed children owned by dead or unreachable nodes, but never handled a child a node owns in the store yet is not actually running locally. A node ends up in this state after leaving a minority partition: while in the minority it stops its children but, having no quorum, cannot give up their ownership in the store. If no majority node reclaims them before the partition heals, the owner is alive again, so the records are no longer orphans and nothing ever restarts the children. The result is records owned by a live node with no running child anywhere - observed with Shovel after a partition healed: every shovel was owned by the formerly-partitioned node but none were running. Add a reconciliation step, run on every node, that starts any child this node owns in the store but is not currently running locally. This complements the existing orphan reclamation (dead owners) and conflict resolution (children owned by another node) to keep the store and the running children consistent. 
--- deps/rabbit/src/mirrored_supervisor.erl | 91 ++++++++++++++++--------- 1 file changed, 59 insertions(+), 32 deletions(-) diff --git a/deps/rabbit/src/mirrored_supervisor.erl b/deps/rabbit/src/mirrored_supervisor.erl index a9c6c5e61886..b287d23685bc 100644 --- a/deps/rabbit/src/mirrored_supervisor.erl +++ b/deps/rabbit/src/mirrored_supervisor.erl @@ -455,48 +455,75 @@ do_reconcile_children(Group, Overall, Delegate) -> ok end end, RunningIds), + %% Restart children we own but are not running, then reclaim those + %% owned by dead or unreachable nodes. + restart_owned_children(Group, Overall, Delegate, RunningIds), reclaim_orphans(Group, Overall, Delegate) end. +%% Reads all child spec records for the group from the store, as {Path, Record} +%% pairs. The #if_data_matches condition restricts the results to this group, so +%% the key's group element is always Group. +group_childspecs(Group) -> + Pattern = #mirrored_sup_childspec{key = {Group, '_'}, _ = '_'}, + Conditions = [?KHEPRI_WILDCARD_STAR_STAR, #if_data_matches{pattern = Pattern}], + PathPattern = rabbit_db_msup:khepri_mirrored_supervisor_path( + ?KHEPRI_WILDCARD_STAR, + #if_all{conditions = Conditions}), + case rabbit_khepri:get_many(PathPattern) of + {ok, Map} -> + [{Path, S} || {Path, S} <- maps:to_list(Map), + is_record(S, mirrored_sup_childspec)]; + _ -> + [] + end. + +%% Ensure every child this node owns in the store is actually running locally. +%% A node can own a child without running it after it has left a minority +%% partition: while in the minority it stopped its children but could not give +%% up ownership in the store (it has no quorum), and if no majority node +%% reclaimed them before the partition healed, the owner is alive again so they +%% are not orphans and would otherwise never be restarted. 
+restart_owned_children(Group, Overall, Delegate, RunningIds) -> + lists:foreach( + fun({_Path, #mirrored_sup_childspec{mirroring_pid = OwnerPid, + childspec = ChildSpec, + key = {_, Id}}}) -> + case OwnerPid =:= Overall andalso not lists:member(Id, RunningIds) of + true -> + ?LOG_NOTICE("Mirrored supervisor: restarting child ~tp in group ~tp which this node owns but is not running locally", + [Id, Group]), + catch ?SUPERVISOR:start_child(Delegate, ChildSpec), + ok; + false -> + ok + end + end, group_childspecs(Group)). + reclaim_orphans(Group, Overall, Delegate) -> ActiveMembers = pg:get_members(Group), case lists:sort(ActiveMembers) of [Overall | _] -> - Pattern = #mirrored_sup_childspec{key = {Group, '_'}, _ = '_'}, - Conditions = [?KHEPRI_WILDCARD_STAR_STAR, #if_data_matches{pattern = Pattern}], - PathPattern = rabbit_db_msup:khepri_mirrored_supervisor_path( - ?KHEPRI_WILDCARD_STAR, - #if_all{conditions = Conditions}), - case rabbit_khepri:get_many(PathPattern) of - {ok, Map} -> - lists:foreach( - %% get_many already restricts results to this group via - %% the #if_data_matches condition, so the key's group - %% element is always Group here. 
- fun({Path, #mirrored_sup_childspec{mirroring_pid = OwnerPid, - childspec = ChildSpec, - key = {_, Id}} = S0}) -> - case lists:member(OwnerPid, ActiveMembers) of - false -> - ?LOG_NOTICE("Mirrored supervisor: reclaiming orphan child ~tp in group ~tp (previous owner ~tp was dead/unreachable)", - [Id, Group, OwnerPid]), - NewS = S0#mirrored_sup_childspec{mirroring_pid = Overall}, - case rabbit_khepri:put(Path, NewS) of - ok -> - catch ?SUPERVISOR:start_child(Delegate, ChildSpec), - ok; - _ -> - ok - end; - true -> + lists:foreach( + fun({Path, #mirrored_sup_childspec{mirroring_pid = OwnerPid, + childspec = ChildSpec, + key = {_, Id}} = S0}) -> + case lists:member(OwnerPid, ActiveMembers) of + false -> + ?LOG_NOTICE("Mirrored supervisor: reclaiming orphan child ~tp in group ~tp (previous owner ~tp was dead/unreachable)", + [Id, Group, OwnerPid]), + NewS = S0#mirrored_sup_childspec{mirroring_pid = Overall}, + case rabbit_khepri:put(Path, NewS) of + ok -> + catch ?SUPERVISOR:start_child(Delegate, ChildSpec), + ok; + _ -> ok end; - (_) -> + true -> ok - end, maps:to_list(Map)); - _ -> - ok - end; + end + end, group_childspecs(Group)); _ -> ok end. From d307c969969e83bb22e317b711d8f718bd2baa7b Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Fri, 12 Jun 2026 10:44:19 +0200 Subject: [PATCH 09/12] shovel: always restart the dynamic shovel supervisor The dynamic Shovel supervisor (rabbit_shovel_dyn_worker_sup_sup, a mirrored supervisor) was a transient child of rabbit_shovel_sup. As with the federation link supervisors, a transient child is not restarted after a normal or shutdown exit, so if it stopped that way - for example after reaching its restart intensity during Khepri timeouts in a partition, or because a peer asked it to die - it would not come back and the node would run no shovels until a full restart. Its restart intensity is low ({one_for_one, 3, 10}), making this more likely than for federation. 
Mark it permanent so it is always restarted, mirroring the federation fix. --- deps/rabbitmq_shovel/src/rabbit_shovel_sup.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_sup.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_sup.erl index 25204a59ea68..abdb399aaf51 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_sup.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_sup.erl @@ -58,7 +58,7 @@ supervisor_tree_child_specs(standard) -> #{ id => rabbit_shovel_dyn_worker_sup_sup, start => {rabbit_shovel_dyn_worker_sup_sup, start_link, []}, - restart => transient, + restart => permanent, shutdown => 16#ffffffff, type => supervisor, modules => [rabbit_shovel_dyn_worker_sup_sup] From 249d0a984d9022b1ce666a632233ef0a57ba7e58 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Fri, 12 Jun 2026 11:32:22 +0200 Subject: [PATCH 10/12] mirrored_supervisor: reconcile promptly on group membership changes Failover is event-driven: when a peer goes down, the first surviving member takes over its children immediately. Minority-stop and conflict resolution, however, only ran on the 30s periodic reconciliation. So after a partition healed, a formerly-isolated node kept running (and reporting) its children until its next periodic tick noticed they were now owned by another node and stopped them - a window of up to 30s during which a federation link or shovel ran on two nodes at once. For shovels in particular this means duplicate message movement, not just duplicate reporting. Monitor the process group with pg:monitor/1 and, on a join (partition heal) or leave (partition), schedule a single debounced reconciliation. A short debounce coalesces bursts of membership events, and the membership-triggered reconciliation does not disturb the periodic timer, which remains as a backstop. 
As a result the minority side sheds its children shortly after losing quorum, and the rejoining side drops any duplicate shortly after healing, shrinking the duplicate window from up to the periodic interval to a couple of seconds. This lives in the shared mirrored_supervisor, so federation and shovel both benefit. --- deps/rabbit/src/mirrored_supervisor.erl | 46 +++++++++++++++++++++++-- 1 file changed, 43 insertions(+), 3 deletions(-) diff --git a/deps/rabbit/src/mirrored_supervisor.erl b/deps/rabbit/src/mirrored_supervisor.erl index b287d23685bc..0e4855a99b2d 100644 --- a/deps/rabbit/src/mirrored_supervisor.erl +++ b/deps/rabbit/src/mirrored_supervisor.erl @@ -103,6 +103,11 @@ -define(GEN_SERVER, gen_server2). -define(SUP_MODULE, mirrored_supervisor_sups). +%% Periodic reconciliation interval, and a short debounce used to coalesce +%% reconciliations triggered by process group membership changes. +-define(RECONCILE_INTERVAL, 30000). +-define(RECONCILE_DEBOUNCE, 2000). + -export([start_link/3, start_link/4, start_child/2, restart_child/2, delete_child/2, terminate_child/2, @@ -121,7 +126,12 @@ group, tx_fun, initial_childspecs, - child_order}). + child_order, + %% Reference returned by pg:monitor/1 for the group. + pg_ref, + %% Whether a debounced, membership-triggered reconciliation is + %% already scheduled, to coalesce bursts of pg events. + reconcile_scheduled = false}). %%-------------------------------------------------------------------------- %% Callback behaviour @@ -258,6 +268,11 @@ handle_call({init, Overall}, _From, LockId = mirrored_supervisor_locks:lock(Group), maybe_log_lock_acquisition_failure(LockId, Group), ok = pg:join(Group, Overall), + %% Watch group membership so that partitions and heals trigger a prompt + %% reconciliation instead of waiting for the next periodic tick. This + %% shrinks the window during which a child runs (and is reported) on more + %% than one node after a partition heals. 
+ {PgRef, _} = pg:monitor(Group), ?LOG_DEBUG("Mirrored supervisor: initializing, overall supervisor ~tp joined group ~tp", [Overall, Group]), Rest = pg:get_members(Group) -- [Overall], Nodes = [node(M) || M <- Rest], @@ -281,7 +296,7 @@ handle_call({init, Overall}, _From, end || Pid <- Rest], Delegate = delegate(Overall), erlang:monitor(process, Delegate), - State1 = State#state{overall = Overall, delegate = Delegate}, + State1 = State#state{overall = Overall, delegate = Delegate, pg_ref = PgRef}, Results = [maybe_start(Group, Overall, Delegate, S) || S <- ChildSpecs], mirrored_supervisor_locks:unlock(LockId), case errors(Results) of @@ -400,9 +415,34 @@ handle_info(reconcile, State = #state{overall = Overall, delegate = Delegate, group = Group}) when Overall =/= undefined -> reconcile_children(Group, Overall, Delegate), - erlang:send_after(30000, self(), reconcile), + erlang:send_after(?RECONCILE_INTERVAL, self(), reconcile), {noreply, State}; +%% A debounced reconciliation triggered by a group membership change. Unlike +%% the periodic 'reconcile' it does not reschedule the periodic timer; it just +%% reconciles once and clears the debounce flag. +handle_info(reconcile_now, State = #state{overall = Overall, + delegate = Delegate, + group = Group}) when Overall =/= undefined -> + reconcile_children(Group, Overall, Delegate), + {noreply, State#state{reconcile_scheduled = false}}; + +%% Group membership changed (a peer joined on partition heal, or left on +%% partition). Schedule a single debounced reconciliation so that the minority +%% side sheds its children promptly and a rejoining node stops any duplicate +%% now owned by another node, instead of waiting for the next periodic tick. 
+handle_info({PgRef, Event, _Group, _Pids}, + State = #state{pg_ref = PgRef, overall = Overall, + reconcile_scheduled = Scheduled}) + when (Event =:= join orelse Event =:= leave) -> + case Overall =/= undefined andalso not Scheduled of + true -> + erlang:send_after(?RECONCILE_DEBOUNCE, self(), reconcile_now), + {noreply, State#state{reconcile_scheduled = true}}; + false -> + {noreply, State} + end; + handle_info(Info, State) -> {stop, {unexpected_info, Info}, State}. From 15274dcd857f7832fed7c1b81508a4ef50b6859f Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Fri, 12 Jun 2026 12:44:05 +0200 Subject: [PATCH 11/12] mirrored_supervisor: skip failover on the minority side of a partition When a peer's mirroring process goes down, the first surviving member takes over its children via update_all/2, which writes to Khepri. On the minority side of a partition there is no quorum, so that write blocks the mirroring gen_server until it times out (~30s). While blocked, the process handles no other message - including the reconciliation that is meant to stop this node's children because it is in the minority. So minority-stop was delayed by the full Khepri timeout, and if the partition healed sooner it never ran at all, leaving the isolated node running duplicates of children the majority had already taken over (observed: a partitioned node kept all its shovels and federation links, and they stayed on it after the partition healed). Check for a minority partition before attempting the takeover and skip it when in the minority: such a node must not take over children (the majority owns them) and cannot (no quorum). This frees the gen_server to process the membership-triggered reconciliation promptly, so the minority side sheds its children within the debounce window instead of after a 30s stall or not at all. The minority test is factored into partition_status/0 and shared with the reconciliation loop. 
It relies only on Erlang distribution and locally-known membership, both of which were confirmed to stay responsive and accurate on a partitioned node (unlike Khepri writes). --- deps/rabbit/src/mirrored_supervisor.erl | 68 ++++++++++++++++--------- 1 file changed, 43 insertions(+), 25 deletions(-) diff --git a/deps/rabbit/src/mirrored_supervisor.erl b/deps/rabbit/src/mirrored_supervisor.erl index 0e4855a99b2d..20245a66541c 100644 --- a/deps/rabbit/src/mirrored_supervisor.erl +++ b/deps/rabbit/src/mirrored_supervisor.erl @@ -386,20 +386,33 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason}, try case lists:sort(pg:get_members(Group)) -- [Pid] of [O | _] -> - case update_all(O, Pid) of - ChildSpecs when is_list(ChildSpecs) -> - Results = [start(Delegate, ChildSpec) - || ChildSpec <- restore_child_order( - ChildSpecs, ChildOrder)], - case errors(Results) of - [] -> ok; - Errors -> - ?LOG_WARNING("Mirrored supervisor: failover in group ~tp could not start some children, reconciliation will retry: ~tp", - [Group, Errors]) - end; - {error, UpdateError} -> - ?LOG_WARNING("Mirrored supervisor: failover in group ~tp failed (~tp), reconciliation will retry", - [Group, UpdateError]) + case partition_status() of + {true, _, _} -> + %% We are in a minority partition. We must not take over + %% the children (the majority side owns them), and we + %% cannot: update_all writes to Khepri, which has no + %% quorum here, so it would block this process until it + %% times out - stalling every other message, including + %% the reconciliation that is supposed to stop our local + %% children. Skip the failover; reconciliation handles + %% the minority case. 
+ ok; + {false, _, _} -> + case update_all(O, Pid) of + ChildSpecs when is_list(ChildSpecs) -> + Results = [start(Delegate, ChildSpec) + || ChildSpec <- restore_child_order( + ChildSpecs, ChildOrder)], + case errors(Results) of + [] -> ok; + Errors -> + ?LOG_WARNING("Mirrored supervisor: failover in group ~tp could not start some children, reconciliation will retry: ~tp", + [Group, Errors]) + end; + {error, UpdateError} -> + ?LOG_WARNING("Mirrored supervisor: failover in group ~tp failed (~tp), reconciliation will retry", + [Group, UpdateError]) + end end; _ -> ok @@ -446,6 +459,20 @@ handle_info({PgRef, Event, _Group, _Pids}, handle_info(Info, State) -> {stop, {unexpected_info, Info}, State}. +%% Determine whether this node is in a minority partition, along with the +%% reachable and total cluster member counts (for logging). A strict majority +%% of cluster members must be reachable; clusters smaller than three members +%% never qualify, since they cannot establish a majority. On an even split +%% (for example 2|2 of four nodes) neither side reaches the majority, so both +%% sides treat themselves as minority and stop their children. These counts +%% come from Erlang distribution and the locally-known cluster membership, both +%% of which stay available on a partitioned node (unlike Khepri writes). +partition_status() -> + TotalSize = length(rabbit_nodes:list_members()), + ReachableCount = length(rabbit_nodes:list_reachable()), + IsMinority = (TotalSize >= 3) andalso (ReachableCount < (TotalSize div 2) + 1), + {IsMinority, ReachableCount, TotalSize}. 
+ reconcile_children(Group, Overall, Delegate) -> %% This runs periodically on every node, so it must tolerate Khepri being %% temporarily unavailable (for example during a leader election or a @@ -463,22 +490,13 @@ reconcile_children(Group, Overall, Delegate) -> do_reconcile_children(Group, Overall, Delegate) -> Children = ?SUPERVISOR:which_children(Delegate), RunningIds = [Id || {Id, Pid, _, _} <- Children, is_pid(Pid)], - Members = rabbit_nodes:list_members(), - TotalSize = length(Members), - Reachable = rabbit_nodes:list_reachable(), - %% A strict majority of cluster members must be reachable to keep children - %% running locally. On an even split (for example 2|2 of four nodes) neither - %% side reaches the majority, so both sides stop their children; the - %% majority side, once one exists again, reclaims them on partition heal. - %% Clusters with fewer than three members never stop children this way, - %% since they cannot have a majority. - IsMinority = (TotalSize >= 3) andalso (length(Reachable) < (TotalSize div 2) + 1), + {IsMinority, ReachableCount, TotalSize} = partition_status(), case IsMinority of true -> lists:foreach( fun(Id) -> ?LOG_WARNING("Mirrored supervisor: node is in a minority partition (~tp/~tp nodes reachable), stopping local instance of child ~tp in group ~tp", - [length(Reachable), TotalSize, Id, Group]), + [ReachableCount, TotalSize, Id, Group]), catch ?SUPERVISOR:terminate_child(Delegate, Id), catch ?SUPERVISOR:delete_child(Delegate, Id) end, RunningIds); From 3ab7ed406e4f3558400fac65f8d3ac6f134a58b4 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Fri, 12 Jun 2026 14:41:20 +0200 Subject: [PATCH 12/12] shovel, federation management: tolerate unreachable nodes in status Fix a 500 error from the Management UI during a network partition: ``` crasher: initial call: cowboy_stream_h:request_process/3 pid: <0.4677.0> registered_name: [] exception error: bad generator {badrpc,timeout} in function 
rabbit_shovel_mgmt_util:'-status/1-lc$^0/1-0-'/2 (rabbit_shovel_mgmt_util.erl:44) in call from rabbit_shovel_mgmt_util:'-status/2-lc$^0/1-0-'/1 (rabbit_shovel_mgmt_util.erl:34) in call from rabbit_shovel_mgmt_util:'-status/2-lc$^0/1-0-'/1 (rabbit_shovel_mgmt_util.erl:34) in call from rabbit_shovel_mgmt_util:status/2 (rabbit_shovel_mgmt_util.erl:34) in call from rabbit_shovel_mgmt_shovels:to_json/2 (rabbit_shovel_mgmt_shovels.erl:47) in call from cowboy_rest:call/3 (src/cowboy_rest.erl:1577) in call from cowboy_rest:set_resp_body/2 (src/cowboy_rest.erl:1455) in call from cowboy_rest:upgrade/4 (src/cowboy_rest.erl:281) ``` --- .../src/rabbit_federation_mgmt.erl | 8 ++++---- .../src/rabbit_shovel_mgmt_util.erl | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/deps/rabbitmq_federation_management/src/rabbit_federation_mgmt.erl b/deps/rabbitmq_federation_management/src/rabbit_federation_mgmt.erl index 86ddbe058243..fdfcfe4942e2 100644 --- a/deps/rabbitmq_federation_management/src/rabbit_federation_mgmt.erl +++ b/deps/rabbitmq_federation_management/src/rabbit_federation_mgmt.erl @@ -89,10 +89,10 @@ status(Chs, ReqData, Context, Filter) -> status(Node, Chs, Filter) -> case rpc:call(Node, rabbit_federation_status, status, [], infinity) of - {badrpc, {'EXIT', {undef, _}}} -> []; - {badrpc, {'EXIT', {noproc, _}}} -> []; - Status -> [format(Node, I, Chs) || I <- Status, - filter_status(I, Filter)] + Status when is_list(Status) -> + [format(Node, I, Chs) || I <- Status, filter_status(I, Filter)]; + _Error -> + [] end. 
filter_status(_, all) -> diff --git a/deps/rabbitmq_shovel_management/src/rabbit_shovel_mgmt_util.erl b/deps/rabbitmq_shovel_management/src/rabbit_shovel_mgmt_util.erl index 2187c7ad144e..e079da752ae8 100644 --- a/deps/rabbitmq_shovel_management/src/rabbit_shovel_mgmt_util.erl +++ b/deps/rabbitmq_shovel_management/src/rabbit_shovel_mgmt_util.erl @@ -38,10 +38,10 @@ status(ReqData, Context) -> status(Node) -> case rpc:call(Node, rabbit_shovel_status, status, [], ?SHOVEL_CALLS_TIMEOUT_MS) of - {badrpc, {'EXIT', _}} -> - []; - Status -> - [format(Node, I) || I <- Status] + Status when is_list(Status) -> + [format(Node, I) || I <- Status]; + _Error -> + [] end. format(Node, {Name, Type, Info, Metrics, TS}) ->