Skip to content

Commit 31a7508

Browse files
committed
AMQP 1.0: Handle per-queue-type disk alarms
1 parent 4826c4d commit 31a7508

File tree

3 files changed

+147
-34
lines changed

3 files changed

+147
-34
lines changed

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 65 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,8 @@
384384
stashed_eol = [] :: [rabbit_amqqueue:name()],
385385

386386
queue_states = rabbit_queue_type:init() :: rabbit_queue_type:state(),
387+
queue_types_published = sets:new([{version, 2}]) ::
388+
sets:set(rabbit_queue_type:queue_type()),
387389
permission_cache = [] :: permission_cache(),
388390
topic_permission_cache = [] :: topic_permission_cache()
389391
}).
@@ -451,9 +453,14 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ContainerId,
451453
true = is_valid_max(MaxLinkCredit),
452454
true = is_valid_max(MaxQueueCredit),
453455
true = is_valid_max(MaxIncomingWindow),
454-
IncomingWindow = case sets:is_empty(Alarms) of
455-
true -> MaxIncomingWindow;
456-
false -> 0
456+
InResourceAlarm = sets:fold(fun ({disk, _}, Acc) ->
457+
Acc;
458+
(_, _) ->
459+
true
460+
end, false, Alarms),
461+
IncomingWindow = case InResourceAlarm of
462+
true -> 0;
463+
false -> MaxIncomingWindow
457464
end,
458465
NextOutgoingId = ?INITIAL_OUTGOING_TRANSFER_ID,
459466

@@ -585,36 +592,17 @@ handle_cast({queue_event, _, _} = QEvent, State0) ->
585592
log_error_and_close_session(Error, State0)
586593
end;
587594
handle_cast({conserve_resources, Alarm, Conserve},
588-
#state{incoming_window = IncomingWindow0,
589-
cfg = #cfg{resource_alarms = Alarms0,
590-
incoming_window_margin = Margin0,
595+
#state{cfg = #cfg{resource_alarms = Alarms0,
591596
writer_pid = WriterPid,
592-
channel_num = Ch,
593-
max_incoming_window = MaxIncomingWindow
597+
channel_num = Ch
594598
} = Cfg
595599
} = State0) ->
596600
Alarms = case Conserve of
597601
true -> sets:add_element(Alarm, Alarms0);
598602
false -> sets:del_element(Alarm, Alarms0)
599603
end,
600-
{SendFlow, IncomingWindow, Margin} =
601-
case {sets:is_empty(Alarms0), sets:is_empty(Alarms)} of
602-
{true, false} ->
603-
%% Alarm kicked in.
604-
%% Notify the client to not send us any more TRANSFERs. Since we decrease
605-
%% our incoming window dynamically, there might be incoming in-flight
606-
%% TRANSFERs. So, let's be lax and allow for some excess TRANSFERs.
607-
{true, 0, MaxIncomingWindow};
608-
{false, true} ->
609-
%% All alarms cleared.
610-
%% Notify the client that it can resume sending us TRANSFERs.
611-
{true, MaxIncomingWindow, 0};
612-
_ ->
613-
{false, IncomingWindow0, Margin0}
614-
end,
615-
State = State0#state{incoming_window = IncomingWindow,
616-
cfg = Cfg#cfg{resource_alarms = Alarms,
617-
incoming_window_margin = Margin}},
604+
State1 = State0#state{cfg = Cfg#cfg{resource_alarms = Alarms}},
605+
{SendFlow, State} = check_resource_alarm(State0, State1),
618606
case SendFlow of
619607
true ->
620608
Flow = session_flow_fields(#'v1_0.flow'{}, State),
@@ -640,6 +628,41 @@ handle_cast({reset_authz, User}, #state{cfg = Cfg} = State0) ->
640628
handle_cast(shutdown, State) ->
641629
{stop, normal, State}.
642630

631+
is_in_resource_alarm(#state{cfg = #cfg{resource_alarms = Alarms},
632+
queue_types_published = QTs}) ->
633+
sets:fold(
634+
fun ({disk, QT}, Acc) ->
635+
Acc orelse sets:is_element(QT, QTs);
636+
(_, _) ->
637+
true
638+
end, false, Alarms).
639+
640+
check_resource_alarm(State0,
641+
#state{incoming_window = IncomingWindow0,
642+
cfg = #cfg{incoming_window_margin = Margin0,
643+
max_incoming_window = MaxIncomingWindow
644+
} = Cfg} = State1) ->
645+
WasBlocked = is_in_resource_alarm(State0),
646+
IsBlocked = is_in_resource_alarm(State1),
647+
{SendFlow, IncomingWindow, Margin} =
648+
case IsBlocked of
649+
true when not WasBlocked ->
650+
%% Alarm kicked in.
651+
%% Notify the client to not send us any more TRANSFERs. Since we decrease
652+
%% our incoming window dynamically, there might be incoming in-flight
653+
%% TRANSFERs. So, let's be lax and allow for some excess TRANSFERs.
654+
{true, 0, MaxIncomingWindow};
655+
false when WasBlocked ->
656+
%% All alarms cleared.
657+
%% Notify the client that it can resume sending us TRANSFERs.
658+
{true, MaxIncomingWindow, 0};
659+
_ ->
660+
{false, IncomingWindow0, Margin0}
661+
end,
662+
State = State1#state{incoming_window = IncomingWindow,
663+
cfg = Cfg#cfg{incoming_window_margin = Margin}},
664+
{SendFlow, State}.
665+
643666
log_error_and_close_session(
644667
Error, State = #state{cfg = #cfg{reader_pid = ReaderPid,
645668
writer_pid = WriterPid,
@@ -1958,7 +1981,6 @@ session_flow_control_received_transfer(
19581981
incoming_window = InWindow0,
19591982
remote_outgoing_window = RemoteOutgoingWindow,
19601983
cfg = #cfg{incoming_window_margin = Margin,
1961-
resource_alarms = Alarms,
19621984
max_incoming_window = MaxIncomingWindow}
19631985
} = State) ->
19641986
InWindow1 = InWindow0 - 1,
@@ -1972,7 +1994,7 @@ session_flow_control_received_transfer(
19721994
ok
19731995
end,
19741996
{Flows, InWindow} = case InWindow1 =< (MaxIncomingWindow div 2) andalso
1975-
sets:is_empty(Alarms) of
1997+
not is_in_resource_alarm(State) of
19761998
true ->
19771999
%% We've reached halfway and there is no
19782000
%% disk or memory alarm, open the window.
@@ -2388,6 +2410,7 @@ incoming_link_transfer(
23882410
multi_transfer_msg = MultiTransfer
23892411
} = Link0,
23902412
State0 = #state{queue_states = QStates0,
2413+
queue_types_published = QTs0,
23912414
permission_cache = PermCache0,
23922415
topic_permission_cache = TopicPermCache0,
23932416
cfg = #cfg{user = User = #user{username = Username},
@@ -2435,19 +2458,26 @@ incoming_link_transfer(
24352458
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
24362459
case rabbit_queue_type:deliver(Qs, Mc, Opts, QStates0) of
24372460
{ok, QStates, Actions} ->
2461+
QTs1 = sets:from_list(rabbit_amqqueue:queue_types(Qs),
2462+
[{version, 2}]),
2463+
QTs = sets:union(QTs0, QTs1),
24382464
State1 = State0#state{queue_states = QStates,
2465+
queue_types_published = QTs,
24392466
permission_cache = PermCache,
24402467
topic_permission_cache = TopicPermCache},
24412468
%% Confirms must be registered before processing actions
24422469
%% because actions may contain rejections of publishes.
24432470
{U, Reply0} = process_routing_confirm(
24442471
Qs, Settled, DeliveryId, U0),
2445-
State = handle_queue_actions(Actions, State1),
2472+
State2 = handle_queue_actions(Actions, State1),
2473+
{SendAlarmFlow, State} = check_resource_alarm(
2474+
State0, State2),
24462475
DeliveryCount = add(DeliveryCount0, 1),
24472476
Credit1 = Credit0 - 1,
24482477
{Credit, Reply1} = maybe_grant_link_credit(
24492478
Credit1, MaxLinkCredit,
2450-
DeliveryCount, map_size(U), Handle),
2479+
DeliveryCount, map_size(U), Handle,
2480+
SendAlarmFlow),
24512481
Reply = Reply0 ++ Reply1,
24522482
Link = Link0#incoming_link{
24532483
delivery_count = DeliveryCount,
@@ -2482,7 +2512,8 @@ incoming_link_transfer(
24822512
Credit1 = Credit0 - 1,
24832513
{Credit, Reply0} = maybe_grant_link_credit(
24842514
Credit1, MaxLinkCredit,
2485-
DeliveryCount, map_size(U0), Handle),
2515+
DeliveryCount, map_size(U0), Handle,
2516+
false),
24862517
Reply = [Disposition | Reply0],
24872518
Link = Link0#incoming_link{
24882519
delivery_count = DeliveryCount,
@@ -2613,8 +2644,9 @@ rejected(QNameBin, down) ->
26132644
{{symbol, <<"reason">>}, {symbol, <<"unavailable">>}}]}}}.
26142645

26152646

2616-
maybe_grant_link_credit(Credit, MaxLinkCredit, DeliveryCount, NumUnconfirmed, Handle) ->
2617-
case grant_link_credit(Credit, MaxLinkCredit, NumUnconfirmed) of
2647+
maybe_grant_link_credit(Credit, MaxLinkCredit, DeliveryCount, NumUnconfirmed,
2648+
Handle, AlarmFlow) ->
2649+
case grant_link_credit(Credit, MaxLinkCredit, NumUnconfirmed) orelse AlarmFlow of
26182650
true ->
26192651
{MaxLinkCredit, [flow(Handle, DeliveryCount, MaxLinkCredit)]};
26202652
false ->

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ groups() ->
115115
resource_alarm_before_session_begin,
116116
resource_alarm_after_session_begin,
117117
resource_alarm_send_many,
118+
per_queue_type_disk_alarm,
118119
max_message_size_client_to_server,
119120
max_message_size_server_to_client,
120121
global_counters,
@@ -229,7 +230,13 @@ init_per_suite(Config) ->
229230
rabbit_ct_helpers:log_environment(),
230231
rabbit_ct_helpers:merge_app_env(
231232
Config, {rabbit, [{quorum_tick_interval, 1000},
232-
{stream_tick_interval, 1000}
233+
{stream_tick_interval, 1000},
234+
%% Imaginary mount-point for per-queue-type disk alarms
235+
{disk_free_limits,
236+
#{1 => #{name => <<"streaming">>,
237+
mount => "/does/not/exist",
238+
limit => "2GB",
239+
queue_types => [<<"stream">>]}}}
233240
]}).
234241

235242
end_per_suite(Config) ->
@@ -3483,6 +3490,76 @@ auth_attempt_metrics(Config) ->
34833490
?assertEqual(0, proplists:get_value(auth_attempts_failed, Attempt2)),
34843491
?assertEqual(1, proplists:get_value(auth_attempts_succeeded, Attempt2)).
34853492

3493+
per_queue_type_disk_alarm(Config) ->
3494+
Prefix = atom_to_binary(?FUNCTION_NAME),
3495+
Resource = {disk, rabbit_stream_queue},
3496+
CQ = <<Prefix/binary, "-classic">>,
3497+
SQ = <<Prefix/binary, "-stream">>,
3498+
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
3499+
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = CQ}),
3500+
#'queue.declare_ok'{} = amqp_channel:call(
3501+
Ch, #'queue.declare'{
3502+
queue = SQ,
3503+
durable = true,
3504+
arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]}),
3505+
3506+
OpnConf = connection_config(Config),
3507+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
3508+
3509+
%% Set the alarm for the stream queue type.
3510+
ok = rabbit_ct_broker_helpers:set_alarm(Config, 0, Resource),
3511+
3512+
%% Attach one sender to the CQ and one to the SQ.
3513+
{ok, Session1} = amqp10_client:begin_session_sync(Connection),
3514+
{ok, Sender1} = amqp10_client:attach_sender_link(
3515+
Session1, <<Prefix/binary, "-cq-sender">>,
3516+
rabbitmq_amqp_address:queue(CQ), unsettled),
3517+
{ok, Session2} = amqp10_client:begin_session_sync(Connection),
3518+
{ok, Sender2} = amqp10_client:attach_sender_link(
3519+
Session2, <<Prefix/binary, "-sq-sender">>,
3520+
rabbitmq_amqp_address:queue(SQ), unsettled),
3521+
3522+
%% Both senders initially have link and session credit.
3523+
ok = wait_for_credit(Sender1),
3524+
ok = wait_for_credit(Sender2),
3525+
Tag1 = <<"tag1">>,
3526+
Msg1 = amqp10_msg:new(Tag1, <<"m1">>, false),
3527+
?assertEqual(ok,
3528+
amqp10_client:send_msg(Sender1, Msg1)),
3529+
ok = wait_for_accepted(Tag1),
3530+
?assertEqual(ok,
3531+
amqp10_client:send_msg(Sender2, Msg1)),
3532+
ok = wait_for_accepted(Tag1),
3533+
3534+
%% Once the SQ sender has delivered to a stream, it becomes blocked by
3535+
%% session flow control.
3536+
Tag2 = <<"tag2">>,
3537+
Msg2 = amqp10_msg:new(Tag2, <<"m2">>, false),
3538+
?assertEqual(ok,
3539+
amqp10_client:send_msg(Sender1, Msg2)),
3540+
ok = wait_for_accepted(Tag2),
3541+
?assertEqual({error, remote_incoming_window_exceeded},
3542+
amqp10_client:send_msg(Sender2, Msg2)),
3543+
3544+
%% Clear the alarm and the SQ sender can then send transfers.
3545+
ok = rabbit_ct_broker_helpers:clear_alarm(Config, 0, Resource),
3546+
Tag3 = <<"tag3">>,
3547+
Msg3 = amqp10_msg:new(Tag3, <<"m3">>, false),
3548+
?assertEqual(ok,
3549+
amqp10_client:send_msg(Sender1, Msg3)),
3550+
ok = wait_for_accepted(Tag3),
3551+
?assertEqual(ok,
3552+
amqp10_client:send_msg(Sender2, Msg3)),
3553+
ok = wait_for_accepted(Tag3),
3554+
3555+
ok = amqp10_client:detach_link(Sender1),
3556+
ok = end_session_sync(Session1),
3557+
ok = amqp10_client:detach_link(Sender2),
3558+
ok = end_session_sync(Session2),
3559+
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = CQ}),
3560+
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = SQ}),
3561+
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
3562+
34863563
max_message_size_client_to_server(Config) ->
34873564
DefaultMaxMessageSize = rpc(Config, persistent_term, get, [max_message_size]),
34883565
%% Limit the server to only accept messages up to 2KB.

deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1793,6 +1793,8 @@ set_alarm(Config, Node, file_descriptor_limit = Resource) ->
17931793
set_alarm(Config, Node, memory = Resource) ->
17941794
rpc(Config, Node, rabbit_alarm, set_alarm, [{{resource_limit, Resource, Node}, []}]);
17951795
set_alarm(Config, Node, disk = Resource) ->
1796+
rpc(Config, Node, rabbit_alarm, set_alarm, [{{resource_limit, Resource, Node}, []}]);
1797+
set_alarm(Config, Node, {disk, QueueType} = Resource) when is_atom(QueueType) ->
17961798
rpc(Config, Node, rabbit_alarm, set_alarm, [{{resource_limit, Resource, Node}, []}]).
17971799

17981800
get_alarms(Config, Node) ->
@@ -1806,6 +1808,8 @@ clear_alarm(Config, Node, file_descriptor_limit = Resource) ->
18061808
clear_alarm(Config, Node, memory = Resource) ->
18071809
rpc(Config, Node, rabbit_alarm, clear_alarm, [{resource_limit, Resource, Node}]);
18081810
clear_alarm(Config, Node, disk = Resource) ->
1811+
rpc(Config, Node, rabbit_alarm, clear_alarm, [{resource_limit, Resource, Node}]);
1812+
clear_alarm(Config, Node, {disk, QueueType} = Resource) when is_atom(QueueType) ->
18091813
rpc(Config, Node, rabbit_alarm, clear_alarm, [{resource_limit, Resource, Node}]).
18101814

18111815
clear_all_alarms(Config, Node) ->

0 commit comments

Comments
 (0)