Skip to content

Commit eb863b3

Browse files
ansdmergify[bot]
authored andcommitted
Fix MQTT test flake
Prior to this commit, test ``` make -C deps/rabbitmq_mqtt ct-mqtt_shared t=[mqtt,cluster_size_1,v4]:non_clean_sess_reconnect_qos0_and_qos1 ``` flaked in CI with error: ``` {mqtt_shared_SUITE,non_clean_sess_reconnect_qos0_and_qos1,972} {badmatch,{publish_not_received,<<"msg-0">>}} ``` The problem was the following race condition: * The MQTT v4 client sends an async DISCONNECT * The global MQTT consumer metric got decremented. However, the classic queue still has the MQTT connection proc registered as consumer. * The test case sends a message * The classic queue checks out the message to the old connection instead of checking out the message to the new connection. The solution in this commit is to check the consumer count of the classic queue before proceeding to send the message after disconnection. (cherry picked from commit 40bf778)
1 parent d98000e commit eb863b3

File tree

1 file changed

+25
-19
lines changed

1 file changed

+25
-19
lines changed

deps/rabbitmq_mqtt/test/shared_SUITE.erl

+25-19
Original file line numberDiff line numberDiff line change
@@ -878,34 +878,30 @@ session_expiry(Config) ->
878878
ok = rpc(Config, application, set_env, [App, Par, DefaultVal]).
879879

880880
non_clean_sess_reconnect_qos1(Config) ->
881-
non_clean_sess_reconnect(Config, qos1).
881+
non_clean_sess_reconnect(Config, 1).
882882

883883
non_clean_sess_reconnect_qos0(Config) ->
884-
non_clean_sess_reconnect(Config, qos0).
884+
non_clean_sess_reconnect(Config, 0).
885885

886886
non_clean_sess_reconnect(Config, SubscriptionQoS) ->
887887
Pub = connect(<<"publisher">>, Config),
888888
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
889889

890890
C1 = connect(ClientId, Config, non_clean_sess_opts()),
891-
{ok, _, _} = emqtt:subscribe(C1, Topic, SubscriptionQoS),
892-
?assertMatch(#{consumers := 1},
893-
get_global_counters(Config)),
891+
{ok, _, [SubscriptionQoS]} = emqtt:subscribe(C1, Topic, SubscriptionQoS),
892+
ok = await_consumer_count(1, ClientId, SubscriptionQoS, Config),
894893

895894
ok = emqtt:disconnect(C1),
896-
eventually(?_assertMatch(#{consumers := 0},
897-
get_global_counters(Config))),
895+
ok = await_consumer_count(0, ClientId, SubscriptionQoS, Config),
898896

899-
timer:sleep(20),
900897
ok = emqtt:publish(Pub, Topic, <<"msg-3-qos0">>, qos0),
901898
{ok, _} = emqtt:publish(Pub, Topic, <<"msg-4-qos1">>, qos1),
902899

903900
C2 = connect(ClientId, Config, non_clean_sess_opts()),
904901
%% Server should reply in CONNACK that it has session state.
905902
?assertEqual({session_present, 1},
906903
proplists:lookup(session_present, emqtt:info(C2))),
907-
?assertMatch(#{consumers := 1},
908-
get_global_counters(Config)),
904+
ok = await_consumer_count(1, ClientId, SubscriptionQoS, Config),
909905

910906
ok = emqtt:publish(Pub, Topic, <<"msg-5-qos0">>, qos0),
911907
{ok, _} = emqtt:publish(Pub, Topic, <<"msg-6-qos1">>, qos1),
@@ -938,21 +934,20 @@ non_clean_sess_reconnect_qos0_and_qos1(Config) ->
938934
ClientId = ?FUNCTION_NAME,
939935

940936
C1 = connect(ClientId, Config, non_clean_sess_opts()),
941-
{ok, _, [1, 0]} = emqtt:subscribe(C1, [{Topic1, qos1}, {Topic0, qos0}]),
942-
?assertMatch(#{consumers := 1},
943-
get_global_counters(Config)),
937+
{ok, _, [1, 0]} = emqtt:subscribe(C1, [{Topic1, qos1},
938+
{Topic0, qos0}]),
939+
ok = await_consumer_count(1, ClientId, 0, Config),
940+
ok = await_consumer_count(1, ClientId, 1, Config),
944941

945942
ok = emqtt:disconnect(C1),
946-
eventually(?_assertMatch(#{consumers := 0},
947-
get_global_counters(Config))),
948-
943+
ok = await_consumer_count(0, ClientId, 0, Config),
944+
ok = await_consumer_count(0, ClientId, 1, Config),
949945
{ok, _} = emqtt:publish(Pub, Topic0, <<"msg-0">>, qos1),
950946
{ok, _} = emqtt:publish(Pub, Topic1, <<"msg-1">>, qos1),
951947

952948
C2 = connect(ClientId, Config, non_clean_sess_opts()),
953-
?assertMatch(#{consumers := 1},
954-
get_global_counters(Config)),
955-
949+
ok = await_consumer_count(1, ClientId, 0, Config),
950+
ok = await_consumer_count(1, ClientId, 1, Config),
956951
ok = expect_publishes(C2, Topic0, [<<"msg-0">>]),
957952
ok = expect_publishes(C2, Topic1, [<<"msg-1">>]),
958953

@@ -1868,6 +1863,17 @@ await_confirms_unordered(From, Left) ->
18681863
ct:fail("~b confirms are missing", [Left])
18691864
end.
18701865

1866+
await_consumer_count(ConsumerCount, ClientId, QoS, Config) ->
1867+
Ch = rabbit_ct_client_helpers:open_channel(Config),
1868+
QueueName = rabbit_mqtt_util:queue_name_bin(
1869+
rabbit_data_coercion:to_binary(ClientId), QoS),
1870+
eventually(
1871+
?_assertMatch(
1872+
#'queue.declare_ok'{consumer_count = ConsumerCount},
1873+
amqp_channel:call(Ch, #'queue.declare'{queue = QueueName,
1874+
passive = true})), 500, 10),
1875+
ok = rabbit_ct_client_helpers:close_channel(Ch).
1876+
18711877
declare_queue(Ch, QueueName, Args)
18721878
when is_pid(Ch), is_binary(QueueName), is_list(Args) ->
18731879
#'queue.declare_ok'{} = amqp_channel:call(

0 commit comments

Comments
 (0)