diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 6041cfc6bab6..168771405837 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -552,7 +552,7 @@ reachable_coord_members() -> Nodes = rabbit_nodes:list_reachable(), [{?MODULE, Node} || Node <- Nodes]. -version() -> 7. +version() -> 8. which_module(_) -> ?MODULE. @@ -2349,6 +2349,8 @@ machine_version(4 = From, 5, #?MODULE{single_active_consumer = Sac0} = State) -> machine_version(6, 7, #?MODULE{monitors = Monitors0} = State) -> Monitors = maps:filter(fun(_Key, Value) -> Value =/= sac end, Monitors0), {State#?MODULE{monitors = Monitors}, []}; +machine_version(7, 8, State) -> + {State, []}; machine_version(From, To, State) -> ?LOG_INFO("Stream coordinator machine version changes from ~tp to ~tp, no state changes required.", [From, To]), diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index c0d937d71ff4..54d20cb0971d 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -1607,7 +1607,40 @@ drain(TransferCandidates) -> case whereis(rabbit_stream_coordinator) of undefined -> ok; _Pid -> transfer_leadership_of_stream_coordinator(TransferCandidates) - end. + end, + transfer_leadership_of_local_stream_leaders(TransferCandidates). + +-spec transfer_leadership_of_local_stream_leaders([node()]) -> ok. +transfer_leadership_of_local_stream_leaders([]) -> + ok; +transfer_leadership_of_local_stream_leaders(TransferCandidates) -> + LocalLeaderQueues = list_local_leaders(), + ?LOG_INFO("transferring leadership of ~b local stream leaders...", + [length(LocalLeaderQueues)]), + QueuesChunked = ra_lib:lists_chunk(256, LocalLeaderQueues), + [begin + Refs = [begin + Nodes = get_nodes(Q), + case rabbit_maintenance:random_primary_replica_transfer_candidate_node(TransferCandidates, Nodes) of + {ok, PreferredNode} -> + {_, Ref} = spawn_monitor(fun() -> + rabbit_stream_coordinator:restart_stream(Q, #{preferred_leader_node => PreferredNode}) + end), + Ref; + undefined -> + undefined + end + end || Q <- Queues], + [receive {'DOWN', Ref, process, _, _} -> ok end || Ref <- Refs, Ref =/= undefined], + timer:sleep(1000) + end || Queues <- QueuesChunked], + ok. + +list_local_leaders() -> + [Q || Q <- rabbit_amqqueue:list(), + ?amqqueue_type_is(Q, ?MODULE), + amqqueue:get_state(Q) =/= crashed, + amqqueue:get_leader_node(Q) =:= node()]. revive() -> ok. diff --git a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl index eafac43f0f70..b452cb781ee4 100644 --- a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl @@ -92,6 +92,7 @@ C1#consumer.pid =:= C2#consumer.pid andalso C1#consumer.subscription_id =:= C2#consumer.subscription_id)). -define(V6_OR_MORE(Vsn), (Vsn >= 6)). +-define(V8_OR_MORE(Vsn), (Vsn >= 8)). %% Single Active Consumer API -spec register_consumer(binary(), @@ -849,6 +850,21 @@ handle_connection_down0(Pid, State, Groups) -> -spec handle_connection_node_disconnected(ra_machine:command_meta_data(), connection_pid(), state()) -> {state(), ra_machine:effects()}. +handle_connection_node_disconnected(#{machine_version := Vsn} = Meta, ConnPid, + #?MODULE{pids_groups = PidsGroups0} = State0) + when ?V8_OR_MORE(Vsn) -> + case maps:get(ConnPid, PidsGroups0, error) of + error -> + {State0, []}; + Groups -> + {State2, Eff} = + maps:fold(fun(G, _, Acc) -> + handle_group_after_connection_node_disconnected( + Meta, ConnPid, Acc, G) + end, {State0, []}, Groups), + T = disconnected_timeout(State2), + {State2, [node_disconnected_timer_effect(ConnPid, T) | Eff]} + end; handle_connection_node_disconnected(Meta, ConnPid, #?MODULE{pids_groups = PidsGroups0} = State0) -> case maps:take(ConnPid, PidsGroups0) of @@ -884,13 +900,26 @@ handle_node_reconnected(Node, -spec presume_connection_down(ra_machine:command_meta_data(), connection_pid(), state()) -> {state(), ra_machine:effects()}. -presume_connection_down(Meta, Pid, #?MODULE{groups = Groups} = State0) -> - {State1, Eff} = - maps:fold(fun(G, _, {St, Eff}) -> - handle_group_connection_presumed_down(Meta, Pid, St, - Eff, G) - end, {State0, []}, Groups), - {State1, Eff}. +presume_connection_down(#{machine_version := Vsn} = Meta, Pid, #?MODULE{groups = Groups0, pids_groups = PidsGroups0} = State0) + when ?V8_OR_MORE(Vsn) -> + case maps:take(Pid, PidsGroups0) of + error -> + maps:fold(fun(G, _, {St, Eff}) -> + handle_group_connection_presumed_down(Meta, Pid, St, + Eff, G) + end, {State0, []}, Groups0); + {Groups, PidsGroups1} -> + State1 = State0#?MODULE{pids_groups = PidsGroups1}, + maps:fold(fun(G, _, {St, Eff}) -> + handle_group_connection_presumed_down(Meta, Pid, St, + Eff, G) + end, {State1, []}, Groups) + end; +presume_connection_down(Meta, Pid, #?MODULE{groups = Groups0} = State0) -> + maps:fold(fun(G, _, {St, Eff}) -> + handle_group_connection_presumed_down(Meta, Pid, St, + Eff, G) + end, {State0, []}, Groups0). handle_group_connection_presumed_down(#{machine_version := Vsn}, Pid, #?MODULE{groups = Groups0} = S0,