Skip to content

Commit 61c67ef

Browse files
committed
Add new machine module for SAC coordinator v5
1 parent 2ffc07c commit 61c67ef

11 files changed

+1669
-144
lines changed

deps/rabbit/Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ PARALLEL_CT_SET_2_B = clustering_recovery crashing_queues deprecated_features di
268268
PARALLEL_CT_SET_2_C = disk_monitor dynamic_qq unit_disk_monitor unit_file_handle_cache unit_log_management unit_operator_policy
269269
PARALLEL_CT_SET_2_D = queue_length_limits queue_parallel quorum_queue_member_reconciliation rabbit_fifo rabbit_fifo_dlx rabbit_stream_coordinator
270270

271-
PARALLEL_CT_SET_3_A = definition_import per_user_connection_channel_limit_partitions per_vhost_connection_limit_partitions policy priority_queue_recovery rabbit_fifo_prop rabbit_fifo_v0 rabbit_stream_sac_coordinator unit_credit_flow unit_queue_consumers unit_queue_location unit_quorum_queue
271+
PARALLEL_CT_SET_3_A = definition_import per_user_connection_channel_limit_partitions per_vhost_connection_limit_partitions policy priority_queue_recovery rabbit_fifo_prop rabbit_fifo_v0 rabbit_stream_sac_coordinator_v4 rabbit_stream_sac_coordinator unit_credit_flow unit_queue_consumers unit_queue_location unit_quorum_queue
272272
PARALLEL_CT_SET_3_B = cluster_upgrade list_consumers_sanity_check list_queues_online_and_offline logging lqueue maintenance_mode rabbit_fifo_q
273273
PARALLEL_CT_SET_3_C = cli_forget_cluster_node feature_flags_v2 mc_unit message_containers_deaths_v2 message_size_limit metadata_store_migration
274274
PARALLEL_CT_SET_3_D = metadata_store_phase1 metrics mirrored_supervisor peer_discovery_classic_config proxy_protocol runtime_parameters unit_stats_and_metrics unit_supervisor2 unit_vm_memory_monitor

deps/rabbit/ct.test.spec

+1
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@
117117
, rabbit_local_random_exchange_SUITE
118118
, rabbit_message_interceptor_SUITE
119119
, rabbit_stream_coordinator_SUITE
120+
, rabbit_stream_sac_coordinator_v4_SUITE
120121
, rabbit_stream_sac_coordinator_SUITE
121122
, rabbitmq_4_0_deprecations_SUITE
122123
, rabbitmq_queues_cli_integration_SUITE

deps/rabbit/src/rabbit_stream_coordinator.erl

+18-6
Original file line numberDiff line numberDiff line change
@@ -520,13 +520,13 @@ reachable_coord_members() ->
520520
Nodes = rabbit_nodes:list_reachable(),
521521
[{?MODULE, Node} || Node <- Nodes].
522522

523-
version() -> 4.
523+
version() -> 5.
524524

525525
which_module(_) ->
526526
?MODULE.
527527

528528
init(_Conf) ->
529-
#?MODULE{single_active_consumer = rabbit_stream_sac_coordinator:init_state()}.
529+
#?MODULE{single_active_consumer = rabbit_stream_sac_coordinator_v4:init_state()}.
530530

531531
-spec apply(ra_machine:command_meta_data(), command(), state()) ->
532532
{state(), term(), ra_machine:effects()}.
@@ -564,11 +564,12 @@ apply(#{index := _Idx, machine_version := MachineVersion} = Meta0,
564564
end;
565565
apply(Meta, {sac, SacCommand}, #?MODULE{single_active_consumer = SacState0,
566566
monitors = Monitors0} = State0) ->
567-
{SacState1, Reply, Effects0} = rabbit_stream_sac_coordinator:apply(SacCommand, SacState0),
567+
Mod = sac_module(Meta),
568+
{SacState1, Reply, Effects0} = Mod:apply(SacCommand, SacState0),
568569
{SacState2, Monitors1, Effects1} =
569-
rabbit_stream_sac_coordinator:ensure_monitors(SacCommand, SacState1, Monitors0, Effects0),
570+
Mod:ensure_monitors(SacCommand, SacState1, Monitors0, Effects0),
570571
return(Meta, State0#?MODULE{single_active_consumer = SacState2,
571-
monitors = Monitors1}, Reply, Effects1);
572+
monitors = Monitors1}, Reply, Effects1);
572573
apply(#{machine_version := MachineVersion} = Meta, {down, Pid, Reason} = Cmd,
573574
#?MODULE{streams = Streams0,
574575
monitors = Monitors0,
@@ -629,7 +630,8 @@ apply(#{machine_version := MachineVersion} = Meta, {down, Pid, Reason} = Cmd,
629630
monitors = Monitors1}, ok, Effects0)
630631
end;
631632
{sac, Monitors1} ->
632-
{SacState1, Effects} = rabbit_stream_sac_coordinator:handle_connection_down(Pid, SacState0),
633+
Mod = sac_module(Meta),
634+
{SacState1, Effects} = Mod:handle_connection_down(Pid, SacState0),
633635
return(Meta, State#?MODULE{single_active_consumer = SacState1,
634636
monitors = Monitors1}, ok, Effects);
635637
error ->
@@ -747,6 +749,11 @@ state_enter(leader, #?MODULE{streams = Streams,
747749
state_enter(_S, _) ->
748750
[].
749751

752+
sac_module(#{machine_version := MachineVersion}) when MachineVersion =< 4 ->
753+
rabbit_stream_sac_coordinator_v4;
754+
sac_module(_) ->
755+
rabbit_stream_sac_coordinator.
756+
750757
all_member_nodes(Streams) ->
751758
maps:keys(
752759
maps:fold(
@@ -2214,6 +2221,11 @@ machine_version(3, 4, #?MODULE{streams = Streams0} = State) ->
22142221
end, Members)}
22152222
end, Streams0),
22162223
{State#?MODULE{streams = Streams}, []};
2224+
machine_version(4 = From, 5, #?MODULE{single_active_consumer = Sac0} = State) ->
2225+
rabbit_log:info("Stream coordinator machine version changes from 4 to 5, updating state."),
2226+
SacExport = rabbit_stream_sac_coordinator_v4:state_to_map(Sac0),
2227+
Sac1 = rabbit_stream_sac_coordinator:import_state(From, SacExport),
2228+
{State#?MODULE{single_active_consumer = Sac1}, []};
22172229
machine_version(From, To, State) ->
22182230
rabbit_log:info("Stream coordinator machine version changes from ~tp to ~tp, no state changes required.",
22192231
[From, To]),

deps/rabbit/src/rabbit_stream_coordinator.hrl

+1
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
listeners = #{} :: undefined | #{stream_id() =>
6969
#{pid() := queue_ref()}},
7070
single_active_consumer = undefined :: undefined |
71+
rabbit_stream_sac_coordinator_v4:state() |
7172
rabbit_stream_sac_coordinator:state(),
7273
%% future extensibility
7374
reserved_2}).

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

+75-29
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,15 @@
3939
handle_connection_down/2,
4040
consumer_groups/3,
4141
group_consumers/5,
42-
overview/1]).
42+
overview/1,
43+
import_state/2]).
4344

4445
-import(rabbit_stream_coordinator, [ra_local_query/1]).
4546

47+
-define(ACTIVE, active).
48+
-define(WAITING, waiting).
49+
-define(DEACTIVATING, deactivating).
50+
4651
%% Single Active Consumer API
4752
-spec register_consumer(binary(),
4853
binary(),
@@ -231,7 +236,7 @@ apply(#command_unregister_consumer{vhost = VirtualHost,
231236
G1 = remove_from_group(Consumer, Group0),
232237
handle_consumer_removal(
233238
G1, Stream, ConsumerName,
234-
is_active(Consumer#consumer.active));
239+
is_active(Consumer#consumer.status));
235240
false ->
236241
{Group0, []}
237242
end,
@@ -255,10 +260,10 @@ apply(#command_activate_consumer{vhost = VirtualHost,
255260
[{VirtualHost, Stream, ConsumerName}]),
256261
{undefined, []};
257262
Group0 ->
258-
Group1 = update_consumers(Group0, waiting),
263+
Group1 = update_consumers(Group0, ?WAITING),
259264
#consumer{pid = Pid, subscription_id = SubId} =
260265
evaluate_active_consumer(Group1),
261-
Group2 = update_consumer_state_in_group(Group1, Pid, SubId, active),
266+
Group2 = update_consumer_state_in_group(Group1, Pid, SubId, ?ACTIVE),
262267
{Group2, [notify_consumer_effect(Pid, SubId, Stream, ConsumerName, true)]}
263268
end,
264269
StreamGroups1 =
@@ -314,7 +319,7 @@ group_consumers(VirtualHost,
314319
#{GroupId := #group{consumers = Consumers}} ->
315320
Cs = lists:foldr(fun(#consumer{subscription_id = SubId,
316321
owner = Owner,
317-
active = Active},
322+
status = Status},
318323
Acc) ->
319324
Record =
320325
lists:foldr(fun (subscription_id, RecAcc) ->
@@ -326,7 +331,7 @@ group_consumers(VirtualHost,
326331
Owner}
327332
| RecAcc];
328333
(state, RecAcc) ->
329-
[{state, Active}
334+
[{state, cli_consumer_status_label(Status)}
330335
| RecAcc];
331336
(Unknown, RecAcc) ->
332337
[{Unknown,
@@ -342,6 +347,11 @@ group_consumers(VirtualHost,
342347
{error, not_found}
343348
end.
344349

350+
cli_consumer_status_label(?ACTIVE) ->
351+
active;
352+
cli_consumer_status_label(_) ->
353+
inactive.
354+
345355
-spec ensure_monitors(command(),
346356
state(),
347357
map(),
@@ -432,7 +442,7 @@ handle_group_after_connection_down(Pid,
432442
%% keep flags to know what happened
433443
{Consumers1, ActiveRemoved, AnyRemoved} =
434444
lists:foldl(
435-
fun(#consumer{pid = P, active = S}, {L, ActiveFlag, _})
445+
fun(#consumer{pid = P, status = S}, {L, ActiveFlag, _})
436446
when P == Pid ->
437447
{L, is_active(S) or ActiveFlag, true};
438448
(C, {L, ActiveFlag, AnyFlag}) ->
@@ -454,6 +464,42 @@ handle_group_after_connection_down(Pid,
454464
end
455465
end.
456466

467+
-spec import_state(ra_machine:version(), map()) -> state().
468+
import_state(4, #{<<"groups">> := Groups, <<"pids_groups">> := PidsGroups}) ->
469+
#?MODULE{groups = map_to_groups(Groups),
470+
pids_groups = map_to_pids_groups(PidsGroups)}.
471+
472+
map_to_groups(Groups) when is_map(Groups) ->
473+
maps:fold(fun(K, V, Acc) ->
474+
Acc#{K => map_to_group(V)}
475+
end, #{}, Groups);
476+
map_to_groups(_) ->
477+
#{}.
478+
479+
map_to_pids_groups(PidsGroups) when is_map(PidsGroups) ->
480+
PidsGroups;
481+
map_to_pids_groups(_) ->
482+
#{}.
483+
484+
map_to_group(#{<<"consumers">> := Consumers, <<"partition_index">> := Index}) ->
485+
C = lists:foldl(fun(V, Acc) ->
486+
Acc ++ [map_to_consumer(V)]
487+
end, [], Consumers),
488+
#group{consumers = C,
489+
partition_index = Index}.
490+
491+
map_to_consumer(#{<<"pid">> := Pid, <<"subscription_id">> := SubId,
492+
<<"owner">> := Owner, <<"active">> := Active}) ->
493+
#consumer{pid = Pid,
494+
subscription_id = SubId,
495+
owner = Owner,
496+
status = active_to_status(Active)}.
497+
498+
active_to_status(true) ->
499+
?ACTIVE;
500+
active_to_status(false) ->
501+
?WAITING.
502+
457503
is_active(waiting) ->
458504
false;
459505
is_active(_) ->
@@ -476,12 +522,12 @@ do_register_consumer(VirtualHost,
476522
#consumer{pid = ConnectionPid,
477523
owner = Owner,
478524
subscription_id = SubscriptionId,
479-
active = waiting};
525+
status = ?WAITING};
480526
false ->
481527
#consumer{pid = ConnectionPid,
482528
subscription_id = SubscriptionId,
483529
owner = Owner,
484-
active = active}
530+
status = ?ACTIVE}
485531
end,
486532
Group1 = add_to_group(Consumer, Group0),
487533
StreamGroups1 =
@@ -491,17 +537,17 @@ do_register_consumer(VirtualHost,
491537
Group1,
492538
StreamGroups0),
493539

494-
#consumer{active = Active} = Consumer,
540+
#consumer{status = Status} = Consumer,
495541
Effects =
496-
case Active of
497-
active ->
542+
case Status of
543+
?ACTIVE ->
498544
[notify_consumer_effect(ConnectionPid, SubscriptionId,
499-
Stream, ConsumerName, is_active(Active))];
545+
Stream, ConsumerName, is_active(Status))];
500546
_ ->
501547
[]
502548
end,
503549

504-
{State#?MODULE{groups = StreamGroups1}, {ok, is_active(Active)}, Effects};
550+
{State#?MODULE{groups = StreamGroups1}, {ok, is_active(Status)}, Effects};
505551
do_register_consumer(VirtualHost,
506552
Stream,
507553
_PartitionIndex,
@@ -521,7 +567,7 @@ do_register_consumer(VirtualHost,
521567
#consumer{pid = ConnectionPid,
522568
owner = Owner,
523569
subscription_id = SubscriptionId,
524-
active = active},
570+
status = ?ACTIVE},
525571
G1 = add_to_group(Consumer0, Group0),
526572
{G1,
527573
[notify_consumer_effect(ConnectionPid, SubscriptionId,
@@ -532,7 +578,7 @@ do_register_consumer(VirtualHost,
532578
#consumer{pid = ConnectionPid,
533579
owner = Owner,
534580
subscription_id = SubscriptionId,
535-
active = waiting},
581+
status = ?WAITING},
536582
G1 = add_to_group(Consumer0, Group0),
537583

538584
case lookup_active_consumer(G1) of
@@ -548,7 +594,7 @@ do_register_consumer(VirtualHost,
548594
{update_consumer_state_in_group(G1,
549595
ActPid,
550596
ActSubId,
551-
deactivating),
597+
?DEACTIVATING),
552598
[notify_consumer_effect(ActPid,
553599
ActSubId,
554600
Stream,
@@ -568,9 +614,9 @@ do_register_consumer(VirtualHost,
568614
ConsumerName,
569615
Group1,
570616
StreamGroups0),
571-
{value, #consumer{active = Active}} =
617+
{value, #consumer{status = Status}} =
572618
lookup_consumer(ConnectionPid, SubscriptionId, Group1),
573-
{State#?MODULE{groups = StreamGroups1}, {ok, is_active(Active)}, Effects}.
619+
{State#?MODULE{groups = StreamGroups1}, {ok, is_active(Status)}, Effects}.
574620

575621
handle_consumer_removal(#group{consumers = []} = G, _, _, _) ->
576622
{G, []};
@@ -606,7 +652,7 @@ handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) ->
606652
{update_consumer_state_in_group(Group0,
607653
ActPid,
608654
ActSubId,
609-
deactivating),
655+
?DEACTIVATING),
610656
[notify_consumer_effect(ActPid, ActSubId,
611657
Stream, ConsumerName, false, true)]}
612658
end;
@@ -616,7 +662,7 @@ handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) ->
616662
%% the active one is going away, picking a new one
617663
#consumer{pid = P, subscription_id = SID} =
618664
evaluate_active_consumer(Group0),
619-
{update_consumer_state_in_group(Group0, P, SID, active),
665+
{update_consumer_state_in_group(Group0, P, SID, ?ACTIVE),
620666
[notify_consumer_effect(P, SID,
621667
Stream, ConsumerName, true)]};
622668
false ->
@@ -683,13 +729,13 @@ compute_active_consumer(#group{consumers = Crs,
683729
compute_active_consumer(#group{partition_index = -1,
684730
consumers = [Consumer0]} =
685731
Group0) ->
686-
Consumer1 = Consumer0#consumer{active = active},
732+
Consumer1 = Consumer0#consumer{status = ?ACTIVE},
687733
Group0#group{consumers = [Consumer1]};
688734
compute_active_consumer(#group{partition_index = -1,
689735
consumers = [Consumer0 | T]} =
690736
Group0) ->
691-
Consumer1 = Consumer0#consumer{active = active},
692-
Consumers = lists:map(fun(C) -> C#consumer{active = waiting} end, T),
737+
Consumer1 = Consumer0#consumer{status = ?ACTIVE},
738+
Consumers = lists:map(fun(C) -> C#consumer{status = ?WAITING} end, T),
693739
Group0#group{consumers = [Consumer1] ++ Consumers}.
694740

695741
evaluate_active_consumer(#group{partition_index = PartitionIndex,
@@ -706,7 +752,7 @@ lookup_consumer(ConnectionPid, SubscriptionId,
706752
Consumers).
707753

708754
lookup_active_consumer(#group{consumers = Consumers}) ->
709-
lists:search(fun(#consumer{active = Active}) -> is_active(Active) end,
755+
lists:search(fun(#consumer{status = Status}) -> is_active(Status) end,
710756
Consumers).
711757

712758
update_groups(_VirtualHost,
@@ -732,20 +778,20 @@ update_groups(VirtualHost,
732778
update_consumer_state_in_group(#group{consumers = Consumers0} = G,
733779
Pid,
734780
SubId,
735-
NewState) ->
781+
NewStatus) ->
736782
CS1 = lists:map(fun(C0) ->
737783
case C0 of
738784
#consumer{pid = Pid, subscription_id = SubId} ->
739-
C0#consumer{active = NewState};
785+
C0#consumer{status = NewStatus};
740786
C -> C
741787
end
742788
end,
743789
Consumers0),
744790
G#group{consumers = CS1}.
745791

746-
update_consumers(#group{consumers = Consumers0} = G, NewState) ->
792+
update_consumers(#group{consumers = Consumers0} = G, NewStatus) ->
747793
Consumers1 = lists:map(fun(C) ->
748-
C#consumer{active = NewState}
794+
C#consumer{status = NewStatus}
749795
end, Consumers0),
750796
G#group{consumers = Consumers1}.
751797

deps/rabbit/src/rabbit_stream_sac_coordinator.hrl

+2-1
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,13 @@
2222
-type subscription_id() :: byte().
2323
-type group_id() :: {vhost(), stream(), consumer_name()}.
2424
-type owner() :: binary().
25+
-type consumer_status() :: active | waiting | deactivating.
2526

2627
-record(consumer,
2728
{pid :: pid(),
2829
subscription_id :: subscription_id(),
2930
owner :: owner(), %% just a label
30-
active :: active | waiting | deactivating}).
31+
status :: consumer_status()}).
3132
-record(group,
3233
{consumers :: [#consumer{}], partition_index :: integer()}).
3334
-record(rabbit_stream_sac_coordinator,

0 commit comments

Comments
 (0)