Skip to content

Commit a1ec795

Browse files
Merge pull request #13657 from rabbitmq/stream-sac-re-evaluate-group-after-connection-down
Re-evaluate stream SAC group after connection down event
2 parents 12e600a + 602b6ac commit a1ec795

File tree

3 files changed

+222
-75
lines changed

3 files changed

+222
-75
lines changed

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

+45-51
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ apply(#command_unregister_consumer{vhost = VirtualHost,
229229
of
230230
{value, Consumer} ->
231231
G1 = remove_from_group(Consumer, Group0),
232-
handle_consumer_removal(G1, Consumer, Stream, ConsumerName);
232+
handle_consumer_removal(G1, Stream, ConsumerName, Consumer#consumer.active);
233233
false ->
234234
{Group0, []}
235235
end,
@@ -414,50 +414,44 @@ handle_connection_down(Pid,
414414
{State0, []};
415415
{Groups, PidsGroups1} ->
416416
State1 = State0#?MODULE{pids_groups = PidsGroups1},
417-
%% iterate other the groups that this PID affects
418-
maps:fold(fun({VirtualHost, Stream, ConsumerName}, _,
419-
{#?MODULE{groups = ConsumerGroups} = S0, Eff0}) ->
420-
case lookup_group(VirtualHost,
421-
Stream,
422-
ConsumerName,
423-
ConsumerGroups)
424-
of
425-
undefined -> {S0, Eff0};
426-
#group{consumers = Consumers} ->
427-
%% iterate over the consumers of the group
428-
%% and unregister the ones from this PID.
429-
%% It may not be optimal, computing the new active consumer
430-
%% from the purged group and notifying the remaining consumers
431-
%% appropriately should avoid unwanted notifications and even rebalancing.
432-
lists:foldl(fun (#consumer{pid = P,
433-
subscription_id =
434-
SubId},
435-
{StateSub0, EffSub0})
436-
when P == Pid ->
437-
{StateSub1, ok, E} =
438-
?MODULE:apply(#command_unregister_consumer{vhost
439-
=
440-
VirtualHost,
441-
stream
442-
=
443-
Stream,
444-
consumer_name
445-
=
446-
ConsumerName,
447-
connection_pid
448-
=
449-
Pid,
450-
subscription_id
451-
=
452-
SubId},
453-
StateSub0),
454-
{StateSub1, EffSub0 ++ E};
455-
(_Consumer, Acc) -> Acc
456-
end,
457-
{S0, Eff0}, Consumers)
458-
end
459-
end,
460-
{State1, []}, Groups)
417+
maps:fold(fun(G, _, Acc) ->
418+
handle_group_after_connection_down(Pid, Acc, G)
419+
end, {State1, []}, Groups)
420+
end.
421+
422+
handle_group_after_connection_down(Pid,
423+
{#?MODULE{groups = Groups0} = S0, Eff0},
424+
{VirtualHost, Stream, ConsumerName}) ->
425+
case lookup_group(VirtualHost,
426+
Stream,
427+
ConsumerName,
428+
Groups0) of
429+
undefined ->
430+
{S0, Eff0};
431+
#group{consumers = Consumers0} = G0 ->
432+
%% remove the connection consumers from the group state
433+
%% keep flags to know what happened
434+
{Consumers1, ActiveRemoved, AnyRemoved} =
435+
lists:foldl(
436+
fun(#consumer{pid = P, active = S}, {L, ActiveFlag, _}) when P == Pid ->
437+
{L, S or ActiveFlag, true};
438+
(C, {L, ActiveFlag, AnyFlag}) ->
439+
{L ++ [C], ActiveFlag, AnyFlag}
440+
end, {[], false, false}, Consumers0),
441+
442+
case AnyRemoved of
443+
true ->
444+
G1 = G0#group{consumers = Consumers1},
445+
{G2, Effects} = handle_consumer_removal(G1, Stream, ConsumerName, ActiveRemoved),
446+
Groups1 = update_groups(VirtualHost,
447+
Stream,
448+
ConsumerName,
449+
G2,
450+
Groups0),
451+
{S0#?MODULE{groups = Groups1}, Effects ++ Eff0};
452+
false ->
453+
{S0, Eff0}
454+
end
461455
end.
462456

463457
do_register_consumer(VirtualHost,
@@ -576,9 +570,9 @@ do_register_consumer(VirtualHost,
576570
handle_consumer_removal(#group{consumers = []} = G, _, _, _) ->
577571
{G, []};
578572
handle_consumer_removal(#group{partition_index = -1} = Group0,
579-
Consumer, Stream, ConsumerName) ->
580-
case Consumer of
581-
#consumer{active = true} ->
573+
Stream, ConsumerName, ActiveRemoved) ->
574+
case ActiveRemoved of
575+
true ->
582576
%% this is the active consumer we remove, computing the new one
583577
Group1 = compute_active_consumer(Group0),
584578
case lookup_active_consumer(Group1) of
@@ -589,11 +583,11 @@ handle_consumer_removal(#group{partition_index = -1} = Group0,
589583
%% no active consumer found in the group, nothing to do
590584
{Group1, []}
591585
end;
592-
#consumer{active = false} ->
586+
false ->
593587
%% not the active consumer, nothing to do.
594588
{Group0, []}
595589
end;
596-
handle_consumer_removal(Group0, Consumer, Stream, ConsumerName) ->
590+
handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) ->
597591
case lookup_active_consumer(Group0) of
598592
{value,
599593
#consumer{pid = ActPid, subscription_id = ActSubId} =
@@ -612,7 +606,7 @@ handle_consumer_removal(Group0, Consumer, Stream, ConsumerName) ->
612606
Stream, ConsumerName, false, true)]}
613607
end;
614608
false ->
615-
case Consumer#consumer.active of
609+
case ActiveRemoved of
616610
true ->
617611
%% the active one is going away, picking a new one
618612
#consumer{pid = P, subscription_id = SID} =

deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl

+175-12
Original file line numberDiff line numberDiff line change
@@ -312,29 +312,27 @@ ensure_monitors_test(_) ->
312312

313313
ok.
314314

315-
handle_connection_down_test(_) ->
315+
handle_connection_down_sac_should_get_activated_test(_) ->
316316
Stream = <<"stream">>,
317317
ConsumerName = <<"app">>,
318318
GroupId = {<<"/">>, Stream, ConsumerName},
319319
Pid0 = self(),
320320
Pid1 = spawn(fun() -> ok end),
321-
Group =
322-
cgroup([consumer(Pid0, 0, true), consumer(Pid1, 1, false),
323-
consumer(Pid0, 2, false)]),
324-
State0 =
325-
state(#{GroupId => Group},
326-
#{Pid0 => maps:from_list([{GroupId, true}]),
327-
Pid1 => maps:from_list([{GroupId, true}])}),
321+
Group = cgroup([consumer(Pid0, 0, true),
322+
consumer(Pid1, 1, false),
323+
consumer(Pid0, 2, false)]),
324+
State0 = state(#{GroupId => Group},
325+
#{Pid0 => maps:from_list([{GroupId, true}]),
326+
Pid1 => maps:from_list([{GroupId, true}])}),
328327

329328
{#?STATE{pids_groups = PidsGroups1, groups = Groups1} = State1,
330329
Effects1} =
331-
rabbit_stream_sac_coordinator:handle_connection_down(Pid0, State0),
330+
rabbit_stream_sac_coordinator:handle_connection_down(Pid0, State0),
332331
assertSize(1, PidsGroups1),
333332
assertSize(1, maps:get(Pid1, PidsGroups1)),
334333
assertSendMessageEffect(Pid1, 1, Stream, ConsumerName, true, Effects1),
335-
?assertEqual(#{GroupId => cgroup([consumer(Pid1, 1, true)])},
336-
Groups1),
337-
{#?STATE{pids_groups = PidsGroups2, groups = Groups2} = _State2,
334+
assertHasGroup(GroupId, cgroup([consumer(Pid1, 1, true)]), Groups1),
335+
{#?STATE{pids_groups = PidsGroups2, groups = Groups2},
338336
Effects2} =
339337
rabbit_stream_sac_coordinator:handle_connection_down(Pid1, State1),
340338
assertEmpty(PidsGroups2),
@@ -343,6 +341,168 @@ handle_connection_down_test(_) ->
343341

344342
ok.
345343

344+
handle_connection_down_sac_active_does_not_change_test(_) ->
345+
Stream = <<"stream">>,
346+
ConsumerName = <<"app">>,
347+
GroupId = {<<"/">>, Stream, ConsumerName},
348+
Pid0 = self(),
349+
Pid1 = spawn(fun() -> ok end),
350+
Group = cgroup([consumer(Pid1, 0, true),
351+
consumer(Pid0, 1, false),
352+
consumer(Pid0, 2, false)]),
353+
State = state(#{GroupId => Group},
354+
#{Pid0 => maps:from_list([{GroupId, true}]),
355+
Pid1 => maps:from_list([{GroupId, true}])}),
356+
357+
{#?STATE{pids_groups = PidsGroups, groups = Groups},
358+
Effects} =
359+
rabbit_stream_sac_coordinator:handle_connection_down(Pid0, State),
360+
assertSize(1, PidsGroups),
361+
assertSize(1, maps:get(Pid1, PidsGroups)),
362+
assertEmpty(Effects),
363+
assertHasGroup(GroupId, cgroup([consumer(Pid1, 0, true)]), Groups),
364+
ok.
365+
366+
handle_connection_down_sac_no_more_consumers_test(_) ->
367+
Stream = <<"stream">>,
368+
ConsumerName = <<"app">>,
369+
GroupId = {<<"/">>, Stream, ConsumerName},
370+
Pid0 = self(),
371+
Group = cgroup([consumer(Pid0, 0, true),
372+
consumer(Pid0, 1, false)]),
373+
State = state(#{GroupId => Group},
374+
#{Pid0 => maps:from_list([{GroupId, true}])}),
375+
376+
{#?STATE{pids_groups = PidsGroups, groups = Groups},
377+
Effects} =
378+
rabbit_stream_sac_coordinator:handle_connection_down(Pid0, State),
379+
assertEmpty(PidsGroups),
380+
assertEmpty(Groups),
381+
assertEmpty(Effects),
382+
ok.
383+
384+
handle_connection_down_sac_no_consumers_in_down_connection_test(_) ->
385+
Stream = <<"stream">>,
386+
ConsumerName = <<"app">>,
387+
GroupId = {<<"/">>, Stream, ConsumerName},
388+
Pid0 = self(),
389+
Pid1 = spawn(fun() -> ok end),
390+
Group = cgroup([consumer(Pid1, 0, true),
391+
consumer(Pid1, 1, false)]),
392+
State = state(#{GroupId => Group},
393+
#{Pid0 => maps:from_list([{GroupId, true}]), %% should not be there
394+
Pid1 => maps:from_list([{GroupId, true}])}),
395+
396+
{#?STATE{pids_groups = PidsGroups, groups = Groups},
397+
Effects} =
398+
rabbit_stream_sac_coordinator:handle_connection_down(Pid0, State),
399+
400+
assertSize(1, PidsGroups),
401+
assertSize(1, maps:get(Pid1, PidsGroups)),
402+
assertEmpty(Effects),
403+
assertHasGroup(GroupId, cgroup([consumer(Pid1, 0, true), consumer(Pid1, 1, false)]),
404+
Groups),
405+
ok.
406+
407+
handle_connection_down_super_stream_active_stays_test(_) ->
408+
Stream = <<"stream">>,
409+
ConsumerName = <<"app">>,
410+
GroupId = {<<"/">>, Stream, ConsumerName},
411+
Pid0 = self(),
412+
Pid1 = spawn(fun() -> ok end),
413+
Group = cgroup(1, [consumer(Pid0, 0, false),
414+
consumer(Pid0, 1, true),
415+
consumer(Pid1, 2, false),
416+
consumer(Pid1, 3, false)]),
417+
State = state(#{GroupId => Group},
418+
#{Pid0 => maps:from_list([{GroupId, true}]),
419+
Pid1 => maps:from_list([{GroupId, true}])}),
420+
421+
{#?STATE{pids_groups = PidsGroups, groups = Groups},
422+
Effects} =
423+
rabbit_stream_sac_coordinator:handle_connection_down(Pid1, State),
424+
assertSize(1, PidsGroups),
425+
assertSize(1, maps:get(Pid0, PidsGroups)),
426+
assertEmpty(Effects),
427+
assertHasGroup(GroupId, cgroup(1, [consumer(Pid0, 0, false), consumer(Pid0, 1, true)]),
428+
Groups),
429+
ok.
430+
431+
handle_connection_down_super_stream_active_changes_test(_) ->
432+
Stream = <<"stream">>,
433+
ConsumerName = <<"app">>,
434+
GroupId = {<<"/">>, Stream, ConsumerName},
435+
Pid0 = self(),
436+
Pid1 = spawn(fun() -> ok end),
437+
Group = cgroup(1, [consumer(Pid0, 0, false),
438+
consumer(Pid1, 1, true),
439+
consumer(Pid0, 2, false),
440+
consumer(Pid1, 3, false)]),
441+
State = state(#{GroupId => Group},
442+
#{Pid0 => maps:from_list([{GroupId, true}]),
443+
Pid1 => maps:from_list([{GroupId, true}])}),
444+
445+
{#?STATE{pids_groups = PidsGroups, groups = Groups},
446+
Effects} =
447+
rabbit_stream_sac_coordinator:handle_connection_down(Pid0, State),
448+
assertSize(1, PidsGroups),
449+
assertSize(1, maps:get(Pid1, PidsGroups)),
450+
assertSendMessageSteppingDownEffect(Pid1, 1, Stream, ConsumerName, Effects),
451+
assertHasGroup(GroupId, cgroup(1, [consumer(Pid1, 1, false), consumer(Pid1, 3, false)]),
452+
Groups),
453+
ok.
454+
455+
handle_connection_down_super_stream_activate_in_remaining_connection_test(_) ->
456+
Stream = <<"stream">>,
457+
ConsumerName = <<"app">>,
458+
GroupId = {<<"/">>, Stream, ConsumerName},
459+
Pid0 = self(),
460+
Pid1 = spawn(fun() -> ok end),
461+
Group = cgroup(1, [consumer(Pid0, 0, false),
462+
consumer(Pid0, 1, true),
463+
consumer(Pid1, 2, false),
464+
consumer(Pid1, 3, false)]),
465+
State = state(#{GroupId => Group},
466+
#{Pid0 => maps:from_list([{GroupId, true}]),
467+
Pid1 => maps:from_list([{GroupId, true}])}),
468+
469+
{#?STATE{pids_groups = PidsGroups, groups = Groups},
470+
Effects} =
471+
rabbit_stream_sac_coordinator:handle_connection_down(Pid0, State),
472+
assertSize(1, PidsGroups),
473+
assertSize(1, maps:get(Pid1, PidsGroups)),
474+
assertSendMessageEffect(Pid1, 3, Stream, ConsumerName, true, Effects),
475+
assertHasGroup(GroupId, cgroup(1, [consumer(Pid1, 2, false), consumer(Pid1, 3, true)]),
476+
Groups),
477+
ok.
478+
479+
handle_connection_down_super_stream_no_active_removed_or_present_test(_) ->
480+
Stream = <<"stream">>,
481+
ConsumerName = <<"app">>,
482+
GroupId = {<<"/">>, Stream, ConsumerName},
483+
Pid0 = self(),
484+
Pid1 = spawn(fun() -> ok end),
485+
%% this is a weird case that should not happen in the wild,
486+
%% we test the logic in the code nevertheless.
487+
%% No active consumer in the group
488+
Group = cgroup(1, [consumer(Pid0, 0, false),
489+
consumer(Pid0, 1, false),
490+
consumer(Pid1, 2, false),
491+
consumer(Pid1, 3, false)]),
492+
State = state(#{GroupId => Group},
493+
#{Pid0 => maps:from_list([{GroupId, true}]),
494+
Pid1 => maps:from_list([{GroupId, true}])}),
495+
496+
{#?STATE{pids_groups = PidsGroups, groups = Groups},
497+
Effects} =
498+
rabbit_stream_sac_coordinator:handle_connection_down(Pid0, State),
499+
assertSize(1, PidsGroups),
500+
assertSize(1, maps:get(Pid1, PidsGroups)),
501+
assertEmpty(Effects),
502+
assertHasGroup(GroupId, cgroup(1, [consumer(Pid1, 2, false), consumer(Pid1, 3, false)]),
503+
Groups),
504+
ok.
505+
346506
assertSize(Expected, []) ->
347507
?assertEqual(Expected, 0);
348508
assertSize(Expected, Map) when is_map(Map) ->
@@ -353,6 +513,9 @@ assertSize(Expected, List) when is_list(List) ->
353513
assertEmpty(Data) ->
354514
assertSize(0, Data).
355515

516+
assertHasGroup(GroupId, Group, Groups) ->
517+
?assertEqual(#{GroupId => Group}, Groups).
518+
356519
consumer(Pid, SubId, Active) ->
357520
#consumer{pid = Pid,
358521
subscription_id = SubId,

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

+2-12
Original file line numberDiff line numberDiff line change
@@ -598,26 +598,16 @@ augment_infos_with_user_provided_connection_name(Infos,
598598
end.
599599

600600
close(Transport,
601-
#stream_connection{socket = S, virtual_host = VirtualHost,
602-
outstanding_requests = Requests},
601+
#stream_connection{socket = S},
603602
#stream_connection_state{consumers = Consumers}) ->
604603
[begin
605-
%% we discard the result (updated requests) because they are no longer used
606-
_ = maybe_unregister_consumer(VirtualHost, Consumer,
607-
single_active_consumer(Properties),
608-
Requests),
609604
case Log of
610605
undefined ->
611606
ok; %% segment may not be defined on subscription (single active consumer)
612607
L ->
613608
osiris_log:close(L)
614609
end
615-
end
616-
|| #consumer{log = Log,
617-
configuration =
618-
#consumer_configuration{properties = Properties}} =
619-
Consumer
620-
<- maps:values(Consumers)],
610+
end || #consumer{log = Log} <- maps:values(Consumers)],
621611
Transport:shutdown(S, write),
622612
Transport:close(S).
623613

0 commit comments

Comments
 (0)