Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion deps/rabbit/src/rabbit_stream_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ reachable_coord_members() ->
Nodes = rabbit_nodes:list_reachable(),
[{?MODULE, Node} || Node <- Nodes].

%% Version number of the stream coordinator state machine.
%% Both sides of the change are shown in this hunk: the value 7 followed by
%% the value 8. A machine_version(7, 8, State) clause that returns the state
%% unchanged appears further down in the same diff.
version() -> 7.
version() -> 8.

%% Returns this module regardless of the argument, which is ignored.
%% NOTE(review): presumably the argument is a machine version (ra_machine
%% callback) -- confirm against the behaviour definition.
which_module(_Ignored) ->
    ?MODULE.
Expand Down Expand Up @@ -2349,6 +2349,8 @@ machine_version(4 = From, 5, #?MODULE{single_active_consumer = Sac0} = State) ->
machine_version(6, 7, #?MODULE{monitors = Monitors0} = State) ->
Monitors = maps:filter(fun(_Key, Value) -> Value =/= sac end, Monitors0),
{State#?MODULE{monitors = Monitors}, []};
machine_version(7, 8, State) ->
{State, []};
machine_version(From, To, State) ->
?LOG_INFO("Stream coordinator machine version changes from ~tp to ~tp, no state changes required.",
[From, To]),
Expand Down
35 changes: 34 additions & 1 deletion deps/rabbit/src/rabbit_stream_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1607,7 +1607,40 @@ drain(TransferCandidates) ->
case whereis(rabbit_stream_coordinator) of
undefined -> ok;
_Pid -> transfer_leadership_of_stream_coordinator(TransferCandidates)
end.
end,
transfer_leadership_of_local_stream_leaders(TransferCandidates).

%% Asks the stream coordinator to restart every stream whose leader is on
%% this node, preferring a leader node picked from TransferCandidates.
%%
%% With no candidates this is a no-op. Otherwise the queues returned by
%% list_local_leaders/0 are processed in the chunks produced by
%% ra_lib:lists_chunk(256, ...): one monitored process is spawned per queue
%% that has a usable candidate, all of them are awaited, and then the caller
%% sleeps for one second before moving on to the next chunk.
-spec transfer_leadership_of_local_stream_leaders([node()]) -> ok.
transfer_leadership_of_local_stream_leaders([]) ->
    ok;
transfer_leadership_of_local_stream_leaders(TransferCandidates) ->
    Leaders = list_local_leaders(),
    ?LOG_INFO("transferring leadership of ~b local stream leaders...",
              [length(Leaders)]),
    lists:foreach(
      fun(Batch) ->
              %% Start every transfer of the batch first, then wait for all
              %% of them, so the restarts within a batch run concurrently.
              Pending = lists:filtermap(
                          fun(Q) ->
                                  spawn_local_leader_transfer(
                                    Q, TransferCandidates)
                          end, Batch),
              lists:foreach(fun await_local_leader_transfer/1, Pending),
              timer:sleep(1000)
      end, ra_lib:lists_chunk(256, Leaders)).

%% Spawns a monitored process that restarts the stream of Q with a preferred
%% leader node chosen among TransferCandidates. Returns {true, MonitorRef},
%% or false when no candidate node was returned for this queue.
spawn_local_leader_transfer(Q, TransferCandidates) ->
    Members = get_nodes(Q),
    case rabbit_maintenance:random_primary_replica_transfer_candidate_node(
           TransferCandidates, Members) of
        {ok, Target} ->
            {_Pid, MRef} =
                spawn_monitor(
                  fun() ->
                          rabbit_stream_coordinator:restart_stream(
                            Q, #{preferred_leader_node => Target})
                  end),
            {true, MRef};
        undefined ->
            false
    end.

%% Blocks until the process monitored through MRef has terminated; the exit
%% reason is ignored.
await_local_leader_transfer(MRef) ->
    receive
        {'DOWN', MRef, process, _Pid, _Reason} -> ok
    end.

%% Lists the queues of this module's queue type that are not in the
%% `crashed' state and whose leader node is the local node.
list_local_leaders() ->
    [Queue || Queue <- rabbit_amqqueue:list(),
              %% kept as a standalone filter so the macro is evaluated
              %% exactly as before
              ?amqqueue_type_is(Queue, ?MODULE),
              amqqueue:get_state(Queue) =/= crashed
                  andalso amqqueue:get_leader_node(Queue) =:= node()].

%% No-op: takes no arguments and always returns ok.
revive() ->
ok.
Expand Down
43 changes: 36 additions & 7 deletions deps/rabbit/src/rabbit_stream_sac_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
C1#consumer.pid =:= C2#consumer.pid andalso
C1#consumer.subscription_id =:= C2#consumer.subscription_id)).
-define(V6_OR_MORE(Vsn), (Vsn >= 6)).
-define(V8_OR_MORE(Vsn), (Vsn >= 8)).

%% Single Active Consumer API
-spec register_consumer(binary(),
Expand Down Expand Up @@ -849,6 +850,21 @@ handle_connection_down0(Pid, State, Groups) ->
-spec handle_connection_node_disconnected(ra_machine:command_meta_data(),
connection_pid(), state()) ->
{state(), ra_machine:effects()}.
handle_connection_node_disconnected(#{machine_version := Vsn} = Meta, ConnPid,
#?MODULE{pids_groups = PidsGroups0} = State0)
when ?V8_OR_MORE(Vsn) ->
case maps:get(ConnPid, PidsGroups0, error) of
error ->
{State0, []};
Groups ->
{State2, Eff} =
maps:fold(fun(G, _, Acc) ->
handle_group_after_connection_node_disconnected(
Meta, ConnPid, Acc, G)
end, {State0, []}, Groups),
T = disconnected_timeout(State2),
{State2, [node_disconnected_timer_effect(ConnPid, T) | Eff]}
end;
handle_connection_node_disconnected(Meta, ConnPid,
#?MODULE{pids_groups = PidsGroups0} = State0) ->
case maps:take(ConnPid, PidsGroups0) of
Expand Down Expand Up @@ -884,13 +900,26 @@ handle_node_reconnected(Node,
%% Folds over every entry of the state's `groups' map, calling
%% handle_group_connection_presumed_down/5 for each group key with the
%% running state and effects, starting from {State0, []}.
%% Returns the resulting {State, Effects} pair.
-spec presume_connection_down(ra_machine:command_meta_data(), connection_pid(),
state()) ->
{state(), ra_machine:effects()}.
presume_connection_down(Meta, Pid, #?MODULE{groups = Groups} = State0) ->
{State1, Eff} =
maps:fold(fun(G, _, {St, Eff}) ->
handle_group_connection_presumed_down(Meta, Pid, St,
Eff, G)
end, {State0, []}, Groups),
{State1, Eff}.
%% Treats the connection Pid as down and returns {State, Effects}.
%%
%% Machine version 8 or later: when Pid has an entry in `pids_groups', that
%% entry is removed from the state and only the groups stored under it are
%% visited; when it has none, every group in `groups' is visited.
%% Any other case (earlier versions): every group in `groups' is visited and
%% `pids_groups' is left untouched.
presume_connection_down(#{machine_version := Vsn} = Meta, Pid,
                        #?MODULE{groups = AllGroups,
                                 pids_groups = PidsGroups} = State)
  when ?V8_OR_MORE(Vsn) ->
    case maps:take(Pid, PidsGroups) of
        {GroupsOfPid, RemainingPidsGroups} ->
            Pruned = State#?MODULE{pids_groups = RemainingPidsGroups},
            presume_down_in_groups(Meta, Pid, Pruned, GroupsOfPid);
        error ->
            presume_down_in_groups(Meta, Pid, State, AllGroups)
    end;
presume_connection_down(Meta, Pid, #?MODULE{groups = AllGroups} = State) ->
    presume_down_in_groups(Meta, Pid, State, AllGroups).

%% Calls handle_group_connection_presumed_down/5 for every key of Groups,
%% threading the state and the effects through the fold; the map values are
%% not used. Starts from {State, []}.
presume_down_in_groups(Meta, Pid, State, Groups) ->
    maps:fold(fun(GroupId, _, {StateAcc, EffectsAcc}) ->
                      handle_group_connection_presumed_down(
                        Meta, Pid, StateAcc, EffectsAcc, GroupId)
              end, {State, []}, Groups).

handle_group_connection_presumed_down(#{machine_version := Vsn}, Pid,
#?MODULE{groups = Groups0} = S0,
Expand Down
Loading