Skip to content

Commit d0082cb

Browse files
committed
consumewr timeouts mqtt
1 parent 702758f commit d0082cb

File tree

3 files changed

+84
-16
lines changed

3 files changed

+84
-16
lines changed

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2026,20 +2026,30 @@ handle_queue_event({queue_event, QName, Evt},
20262026
case rabbit_queue_type:handle_event(QName, Evt, QStates0) of
20272027
{ok, QStates, Actions} ->
20282028
State1 = State0#state{queue_states = QStates},
2029-
State = handle_queue_actions(Actions, State1),
2030-
{ok, State};
2029+
try handle_queue_actions(Actions, State1) of
2030+
State ->
2031+
{ok, State}
2032+
catch throw:Reason when Reason =:= consuming_queue_down;
2033+
Reason =:= consumer_timeout ->
2034+
{error, Reason, State1}
2035+
end;
20312036
{eol, Actions} ->
2032-
State1 = handle_queue_actions(Actions, State0),
2033-
{ConfirmPktIds, U} = rabbit_mqtt_confirms:remove_queue(QName, U0),
2034-
QStates = rabbit_queue_type:remove(QName, QStates0),
2035-
State = State1#state{queue_states = QStates,
2036-
unacked_client_pubs = U},
2037-
send_puback(ConfirmPktIds, ?RC_SUCCESS, State),
2038-
try handle_queue_down(QName, State) of
2039-
State2 ->
2040-
{ok, State2}
2041-
catch throw:consuming_queue_down ->
2042-
{error, consuming_queue_down, State}
2037+
try handle_queue_actions(Actions, State0) of
2038+
State1 ->
2039+
{ConfirmPktIds, U} = rabbit_mqtt_confirms:remove_queue(QName, U0),
2040+
QStates = rabbit_queue_type:remove(QName, QStates0),
2041+
State = State1#state{queue_states = QStates,
2042+
unacked_client_pubs = U},
2043+
send_puback(ConfirmPktIds, ?RC_SUCCESS, State),
2044+
try handle_queue_down(QName, State) of
2045+
State2 ->
2046+
{ok, State2}
2047+
catch throw:consuming_queue_down ->
2048+
{error, consuming_queue_down, State}
2049+
end
2050+
catch throw:Reason when Reason =:= consuming_queue_down;
2051+
Reason =:= consumer_timeout ->
2052+
{error, Reason, State0}
20432053
end;
20442054
{protocol_error, _Type, _Reason, _ReasonArgs} = Error ->
20452055
{error, Error, State0}
@@ -2071,7 +2081,11 @@ handle_queue_actions(Actions, #state{} = State0) ->
20712081
({unblock, QName}, S = #state{queues_soft_limit_exceeded = QSLE}) ->
20722082
S#state{queues_soft_limit_exceeded = sets:del_element(QName, QSLE)};
20732083
({queue_down, QName}, S) ->
2074-
handle_queue_down(QName, S)
2084+
handle_queue_down(QName, S);
2085+
({released, QName, _CTag, _MsgSeqNos, timeout}, _S) ->
2086+
?LOG_INFO("Terminating MQTT connection because consumer on ~s timed out",
2087+
[rabbit_misc:rs(QName)]),
2088+
throw(consumer_timeout)
20752089
end, State0, Actions).
20762090

20772091
handle_queue_down(QName, State0 = #state{cfg = #cfg{client_id = ClientId}}) ->

deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,8 @@ handle_cast(QueueEvent = {queue_event, _, _},
131131
try rabbit_mqtt_processor:handle_queue_event(QueueEvent, PState0) of
132132
{ok, PState} ->
133133
maybe_process_deferred_recv(control_throttle(pstate(State, PState)));
134-
{error, consuming_queue_down = Reason, PState} ->
134+
{error, Reason, PState} when Reason =:= consuming_queue_down;
135+
Reason =:= consumer_timeout ->
135136
{stop, {shutdown, Reason}, pstate(State, PState)};
136137
{error, Reason0, PState} ->
137138
{stop, Reason0, pstate(State, PState)}

deps/rabbitmq_mqtt/test/v5_SUITE.erl

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,8 @@ cluster_size_1_tests() ->
126126
topic_alias_disallowed_retained_message,
127127
extended_auth,
128128
headers_exchange,
129-
consistent_hash_exchange
129+
consistent_hash_exchange,
130+
consumer_timeout_quorum_queue
130131
].
131132

132133
cluster_size_3_tests() ->
@@ -2158,6 +2159,58 @@ consistent_hash_exchange(Config) ->
21582159
[#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = Q}) || Q <- Qs],
21592160
ok = rabbit_ct_client_helpers:close_channels_and_connection(Config, 0).
21602161

2162+
%% Test that consumer timeout on a quorum queue terminates the MQTT connection.
2163+
%% Consumer timeouts are only supported for quorum queues.
2164+
%%
2165+
%% We use {auto_ack, never} to prevent emqtt from automatically sending PUBACK,
2166+
%% which allows the consumer timeout to trigger.
2167+
consumer_timeout_quorum_queue(Config) ->
2168+
Topic = atom_to_binary(?FUNCTION_NAME),
2169+
PolicyName = <<"consumer-timeout-policy">>,
2170+
2171+
ok = rpc(Config, application, set_env, [?APP, durable_queue_type, quorum]),
2172+
%% Set a short consumer timeout policy for quorum queues
2173+
ok = rabbit_ct_broker_helpers:set_policy(
2174+
Config, 0, PolicyName, <<".*">>, <<"quorum_queues">>,
2175+
[{<<"consumer-timeout">>, 1000}]),
2176+
2177+
%% Use two clients: one for publishing (normal auto_ack), one for subscribing (auto_ack=never)
2178+
Pub = connect(<<"publisher">>, Config),
2179+
Sub = connect(?FUNCTION_NAME, Config,
2180+
non_clean_sess_opts() ++ [{auto_ack, never}]),
2181+
{ok, _, [1]} = emqtt:subscribe(Sub, Topic, qos1),
2182+
2183+
%% Publish a message from the publisher client
2184+
{ok, _} = emqtt:publish(Pub, Topic, <<"test message">>, [{qos, 1}]),
2185+
2186+
%% Receive the message on subscriber but do NOT acknowledge it (auto_ack=never)
2187+
receive
2188+
{publish, #{client_pid := Sub,
2189+
topic := Topic,
2190+
payload := <<"test message">>}} ->
2191+
ok
2192+
after 5000 ->
2193+
ct:fail("did not receive message")
2194+
end,
2195+
2196+
ok = rpc(Config, application, unset_env, [?APP, durable_queue_type]),
2197+
2198+
%% Trap exits so we receive EXIT message instead of crashing when Sub exits
2199+
process_flag(trap_exit, true),
2200+
2201+
%% The subscriber connection should be terminated due to consumer timeout
2202+
%% Wait for the consumer timeout (1s) plus some margin
2203+
util:await_exit(Sub),
2204+
2205+
process_flag(trap_exit, false),
2206+
2207+
%% Cleanup
2208+
ok = emqtt:disconnect(Pub),
2209+
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, PolicyName),
2210+
%% Clean up the session by connecting with clean_start
2211+
C2 = connect(?FUNCTION_NAME, Config, [{clean_start, true}]),
2212+
ok = emqtt:disconnect(C2).
2213+
21612214
%% -------------------------------------------------------------------
21622215
%% Helpers
21632216
%% -------------------------------------------------------------------

0 commit comments

Comments
 (0)