Skip to content

Commit 4826c4d

Browse files
committed
MQTT: Handle per-queue-type disk alarms
This includes regular MQTT and MQTT-over-WebSockets.
1 parent a1bebc1 commit 4826c4d

File tree

3 files changed

+25
-12
lines changed

3 files changed

+25
-12
lines changed

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@
9999
-record(state,
100100
{cfg :: #cfg{},
101101
queue_states = rabbit_queue_type:init() :: rabbit_queue_type:state(),
102+
queue_types_published = sets:new([{version, 2}]) ::
103+
sets:set(rabbit_queue_type:queue_type()),
102104
%% Packet IDs published to queues but not yet confirmed.
103105
unacked_client_pubs = rabbit_mqtt_confirms:init() :: rabbit_mqtt_confirms:state(),
104106
%% Packet IDs published to MQTT subscribers but not yet acknowledged.
@@ -1702,14 +1704,19 @@ deliver_to_queues(Message,
17021704
Options,
17031705
RoutedToQNames,
17041706
State0 = #state{queue_states = QStates0,
1707+
queue_types_published = QTs0,
17051708
cfg = #cfg{proto_ver = ProtoVer}}) ->
17061709
Qs0 = rabbit_db_queue:get_targets(RoutedToQNames),
17071710
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
17081711
case rabbit_queue_type:deliver(Qs, Message, Options, QStates0) of
17091712
{ok, QStates, Actions} ->
17101713
rabbit_global_counters:messages_routed(ProtoVer, length(Qs)),
1711-
State = process_routing_confirm(Options, Qs,
1712-
State0#state{queue_states = QStates}),
1714+
QTs1 = sets:from_list(rabbit_amqqueue:queue_types(Qs),
1715+
[{version, 2}]),
1716+
QTs = sets:union(QTs0, QTs1),
1717+
State1 = State0#state{queue_states = QStates,
1718+
queue_types_published = QTs},
1719+
State = process_routing_confirm(Options, Qs, State1),
17131720
%% Actions must be processed after registering confirms as actions may
17141721
%% contain rejections of publishes.
17151722
{ok, handle_queue_actions(Actions, State)};
@@ -2389,10 +2396,18 @@ is_socket_busy(Socket) ->
23892396
false
23902397
end.
23912398

2392-
-spec throttle(boolean(), state()) -> boolean().
2393-
throttle(Conserve, #state{queues_soft_limit_exceeded = QSLE,
2394-
cfg = #cfg{published = Published}}) ->
2395-
Conserve andalso Published orelse
2399+
-spec throttle(sets:set(rabbit_alarm:resource_alarm_source()), state()) ->
2400+
boolean().
2401+
throttle(BlockedBy, #state{queues_soft_limit_exceeded = QSLE,
2402+
queue_types_published = QTs,
2403+
cfg = #cfg{published = Published}}) ->
2404+
Alarmed = sets:fold(
2405+
fun ({disk, QT}, Acc) ->
2406+
Acc orelse sets:is_element(QT, QTs);
2407+
(_, _) ->
2408+
true
2409+
end, false, BlockedBy),
2410+
Alarmed andalso Published orelse
23962411
not sets:is_empty(QSLE) orelse
23972412
credit_flow:blocked().
23982413

deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -428,10 +428,9 @@ control_throttle(State = #state{connection_state = ConnState,
428428
proc_state = PState,
429429
keepalive = KState
430430
}) ->
431-
Conserve = not sets:is_empty(BlockedBy),
432431
Throttle = case PState of
433-
connect_packet_unprocessed -> Conserve;
434-
_ -> rabbit_mqtt_processor:throttle(Conserve, PState)
432+
connect_packet_unprocessed -> not sets:is_empty(BlockedBy);
433+
_ -> rabbit_mqtt_processor:throttle(BlockedBy, PState)
435434
end,
436435
case {ConnState, Throttle} of
437436
{running, true} ->

deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -414,10 +414,9 @@ control_throttle(State = #state{connection_state = ConnState,
414414
proc_state = PState,
415415
keepalive = KState
416416
}) ->
417-
Conserve = not sets:is_empty(BlockedBy),
418417
Throttle = case PState of
419-
connect_packet_unprocessed -> Conserve;
420-
_ -> rabbit_mqtt_processor:throttle(Conserve, PState)
418+
connect_packet_unprocessed -> not sets:is_empty(BlockedBy);
419+
_ -> rabbit_mqtt_processor:throttle(BlockedBy, PState)
421420
end,
422421
case {ConnState, Throttle} of
423422
{running, true} ->

0 commit comments

Comments
 (0)