Skip to content

Commit f7291db

Browse files
committed
test: add rocketmq_client_SUITE covering state transitions
Unit-level CT suite for the rocketmq_client state machine, using a tiny gen_tcp listener helper (no real broker needed) so each case can deterministically exercise connect / drop / reconnect. Covers: - connect refused -> {disconnected, {tcp_connect_error, _}} - successful connect -> connected - socket drop flips status to connecting, then {disconnected, _} - get_connection_state drives async reconnect on poll - reconnecting flag dedups rapid polls (bounded mailbox) - unreachable -> reachable recovery - get_status/1 backwards-compatible return shape
1 parent 9e65362 commit f7291db

1 file changed

Lines changed: 265 additions & 0 deletions

File tree

test/rocketmq_client_SUITE.erl

Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
%%--------------------------------------------------------------------
2+
%% Copyright (c) 2026 EMQ Technologies Co., Ltd. All Rights Reserved.
3+
%%
4+
%% Unit-level tests for rocketmq_client state machine. Uses a tiny
5+
%% gen_tcp listener helper instead of a real RocketMQ broker so the
6+
%% tests can deterministically exercise connect / drop / reconnect
7+
%% transitions.
8+
%%--------------------------------------------------------------------
9+
10+
-module(rocketmq_client_SUITE).
11+
12+
-include_lib("common_test/include/ct.hrl").
13+
-include_lib("eunit/include/eunit.hrl").
14+
15+
-export([all/0, init_per_suite/1, end_per_suite/1,
16+
init_per_testcase/2, end_per_testcase/2]).
17+
18+
-export([t_connect_refused_reports_disconnected_with_reason/1,
19+
t_connect_success_reports_connected/1,
20+
t_socket_drop_flips_to_connecting_with_reason/1,
21+
t_reconnect_driven_by_get_connection_state_poll/1,
22+
t_reconnecting_flag_dedups_polls/1,
23+
t_unreachable_then_reachable_recovers/1,
24+
t_get_status_unchanged_for_backwards_compat/1]).
25+
26+
all() ->
27+
[t_connect_refused_reports_disconnected_with_reason,
28+
t_connect_success_reports_connected,
29+
t_socket_drop_flips_to_connecting_with_reason,
30+
t_reconnect_driven_by_get_connection_state_poll,
31+
t_reconnecting_flag_dedups_polls,
32+
t_unreachable_then_reachable_recovers,
33+
t_get_status_unchanged_for_backwards_compat].
34+
35+
init_per_suite(Config) ->
36+
Config.
37+
38+
end_per_suite(_Config) -> ok.
39+
40+
init_per_testcase(TC, Config) ->
41+
%% Each case gets a unique registered ClientId so parallel
42+
%% start_link calls don't collide via the local name registry.
43+
[{client_id, list_to_atom("rocketmq_client_test_" ++ atom_to_list(TC))} | Config].
44+
45+
end_per_testcase(_TC, Config) ->
46+
ClientId = ?config(client_id, Config),
47+
case whereis(ClientId) of
48+
undefined -> ok;
49+
Pid -> ok = stop(Pid)
50+
end,
51+
ok.
52+
53+
%%--------------------------------------------------------------------
54+
%% Cases
55+
56+
t_connect_refused_reports_disconnected_with_reason(Config) ->
57+
%% Pick a free port and never bind to it -> immediate econnrefused.
58+
Port = free_port(),
59+
{ok, Pid} = start_client(?config(client_id, Config),
60+
[{"127.0.0.1", Port}], #{}),
61+
%% handle_continue fires the first attempt synchronously before the
62+
%% gen_server starts accepting external calls; it fails immediately
63+
%% and bumps reconnect_attempts.
64+
?assertMatch({disconnected, {tcp_connect_error, {_, _, econnrefused}}},
65+
rocketmq_client:get_connection_state(Pid)),
66+
ok.
67+
68+
t_connect_success_reports_connected(Config) ->
69+
{ok, Listener, Port} = start_listener(),
70+
{ok, Pid} = start_client(?config(client_id, Config),
71+
[{"127.0.0.1", Port}], #{}),
72+
ok = wait_for(fun() -> rocketmq_client:get_connection_state(Pid) =:= connected end, 2000),
73+
?assertEqual(connected, rocketmq_client:get_connection_state(Pid)),
74+
stop_listener(Listener),
75+
ok.
76+
77+
t_socket_drop_flips_to_connecting_with_reason(Config) ->
78+
{ok, Listener, Port} = start_listener(),
79+
{ok, Pid} = start_client(?config(client_id, Config),
80+
[{"127.0.0.1", Port}], #{}),
81+
ok = wait_for(fun() -> rocketmq_client:get_connection_state(Pid) =:= connected end, 2000),
82+
%% Close the listener and all its accepted sockets to simulate a drop.
83+
stop_listener(Listener),
84+
ok = wait_for(
85+
fun() ->
86+
case rocketmq_client:get_connection_state(Pid) of
87+
connecting -> true;
88+
{disconnected, _} -> true;
89+
_ -> false
90+
end
91+
end, 2000),
92+
%% After the drop but before the next failed retry, the status is
93+
%% `connecting'. After the next retry fails (listener is gone) it
94+
%% flips to {disconnected, _}.
95+
ok = wait_for(
96+
fun() ->
97+
case rocketmq_client:get_connection_state(Pid) of
98+
{disconnected, _} -> true;
99+
_ -> false
100+
end
101+
end, 5000),
102+
{disconnected, Reason} = rocketmq_client:get_connection_state(Pid),
103+
%% Reason is either the original drop (tcp_closed) or the follow-up
104+
%% failed reconnect. Either way, a concrete tagged reason rather
105+
%% than `undefined'.
106+
?assertMatch(R when R =/= undefined, Reason),
107+
ok.
108+
109+
t_reconnect_driven_by_get_connection_state_poll(Config) ->
110+
%% Start with no server, client stays disconnected.
111+
Port = free_port(),
112+
{ok, Pid} = start_client(?config(client_id, Config),
113+
[{"127.0.0.1", Port}], #{}),
114+
?assertMatch({disconnected, _}, rocketmq_client:get_connection_state(Pid)),
115+
%% Bring up a listener on that port, then poll a few times to
116+
%% confirm get_connection_state drives the reconnect itself (no
117+
%% producer traffic required).
118+
{ok, Listener} = listen_on(Port),
119+
ok = wait_for(fun() ->
120+
_ = rocketmq_client:get_connection_state(Pid),
121+
rocketmq_client:get_connection_state(Pid) =:= connected
122+
end, 3000),
123+
?assertEqual(connected, rocketmq_client:get_connection_state(Pid)),
124+
stop_listener(Listener),
125+
ok.
126+
127+
t_reconnecting_flag_dedups_polls(Config) ->
128+
%% With a dead port, reconnect attempts are slow (TCP connect
129+
%% timeout). Fire a burst of get_connection_state calls and assert
130+
%% only one `try_reconnect' message is in flight at any moment --
131+
%% the `reconnecting' flag must prevent mailbox buildup.
132+
Port = free_port(),
133+
{ok, Pid} = start_client(?config(client_id, Config),
134+
[{"127.0.0.1", Port}], #{}),
135+
_ = rocketmq_client:get_connection_state(Pid),
136+
%% Fire 50 polls in quick succession. If dedup wasn't working we'd
137+
%% accumulate 50 try_reconnect messages in the mailbox.
138+
[rocketmq_client:get_connection_state(Pid) || _ <- lists:seq(1, 50)],
139+
{message_queue_len, QLen} = erlang:process_info(Pid, message_queue_len),
140+
%% Allow for at most one queued try_reconnect (the in-flight one is
141+
%% not counted in message_queue_len since it's mid-handler).
142+
?assert(QLen =< 1, {too_many_pending, QLen}),
143+
ok.
144+
145+
t_unreachable_then_reachable_recovers(Config) ->
146+
%% End-to-end recovery: unreachable -> reachable -> connected.
147+
Port = free_port(),
148+
{ok, Pid} = start_client(?config(client_id, Config),
149+
[{"127.0.0.1", Port}], #{}),
150+
ok = wait_for(
151+
fun() ->
152+
case rocketmq_client:get_connection_state(Pid) of
153+
{disconnected, _} -> true;
154+
_ -> false
155+
end
156+
end, 2000),
157+
{ok, Listener} = listen_on(Port),
158+
%% Polling drives recovery; within ~3s of bringing the listener up,
159+
%% the client should flip to connected.
160+
ok = wait_for(
161+
fun() ->
162+
_ = rocketmq_client:get_connection_state(Pid),
163+
rocketmq_client:get_connection_state(Pid) =:= connected
164+
end, 3000),
165+
stop_listener(Listener),
166+
ok.
167+
168+
t_get_status_unchanged_for_backwards_compat(Config) ->
169+
{ok, Listener, Port} = start_listener(),
170+
{ok, Pid} = start_client(?config(client_id, Config),
171+
[{"127.0.0.1", Port}], #{}),
172+
ok = wait_for(fun() -> rocketmq_client:get_status(Pid) =:= true end, 2000),
173+
?assertEqual(true, rocketmq_client:get_status(Pid)),
174+
stop_listener(Listener),
175+
%% After the server goes away and sock drops, get_status should
176+
%% attempt an inline reconnect and return false (preserving the
177+
%% pre-v0.7.0 return shape).
178+
ok = wait_for(fun() -> rocketmq_client:get_status(Pid) =:= false end, 5000),
179+
ok.
180+
181+
%%--------------------------------------------------------------------
182+
%% helpers
183+
184+
start_client(ClientId, Servers, ExtraOpts) ->
185+
Opts = maps:merge(#{connect_timeout => 1000}, ExtraOpts),
186+
rocketmq_client:start_link(ClientId, Servers, Opts).
187+
188+
stop(Pid) when is_pid(Pid) ->
189+
%% Unlink first so the test process isn't taken down by the
190+
%% start_link relationship when we terminate the client.
191+
unlink(Pid),
192+
try gen_server:stop(Pid, shutdown, 2000) of
193+
ok -> ok
194+
catch
195+
exit:noproc -> ok;
196+
exit:{noproc, _} -> ok
197+
end.
198+
199+
free_port() ->
200+
{ok, L} = gen_tcp:listen(0, [binary, {active, false}, {reuseaddr, true}]),
201+
{ok, Port} = inet:port(L),
202+
ok = gen_tcp:close(L),
203+
Port.
204+
205+
start_listener() ->
206+
Port = free_port(),
207+
{ok, L} = listen_on(Port),
208+
{ok, L, Port}.
209+
210+
%% Returns an opaque listener handle that tracks both the listen socket
211+
%% and every accepted socket, so stop_listener/1 can close them all
212+
%% and deterministically drop whatever the client connected through.
213+
-record(listener, {acceptor, ctrl}).
214+
215+
listen_on(Port) ->
216+
{ok, LSock} = gen_tcp:listen(Port, [binary, {active, false}, {reuseaddr, true}]),
217+
Parent = self(),
218+
Ctrl = spawn_link(fun() -> listener_ctrl_loop(LSock, Parent, []) end),
219+
{ok, #listener{acceptor = LSock, ctrl = Ctrl}}.
220+
221+
listener_ctrl_loop(LSock, Parent, Accepted) ->
222+
receive
223+
stop ->
224+
catch gen_tcp:close(LSock),
225+
[catch gen_tcp:close(S) || S <- Accepted],
226+
ok
227+
after 0 ->
228+
case gen_tcp:accept(LSock, 100) of
229+
{ok, Sock} ->
230+
gen_tcp:controlling_process(Sock, self()),
231+
listener_ctrl_loop(LSock, Parent, [Sock | Accepted]);
232+
{error, timeout} ->
233+
listener_ctrl_loop(LSock, Parent, Accepted);
234+
{error, closed} ->
235+
ok;
236+
{error, _} ->
237+
ok
238+
end
239+
end.
240+
241+
stop_listener(#listener{ctrl = Ctrl}) ->
242+
Ref = erlang:monitor(process, Ctrl),
243+
Ctrl ! stop,
244+
receive
245+
{'DOWN', Ref, process, Ctrl, _} -> ok
246+
after 2000 ->
247+
erlang:demonitor(Ref, [flush]),
248+
ok
249+
end.
250+
251+
wait_for(Pred, Timeout) ->
252+
Deadline = erlang:monotonic_time(millisecond) + Timeout,
253+
wait_loop(Pred, Deadline).
254+
255+
wait_loop(Pred, Deadline) ->
256+
case Pred() of
257+
true -> ok;
258+
false ->
259+
case erlang:monotonic_time(millisecond) of
260+
T when T > Deadline -> {error, timeout};
261+
_ ->
262+
timer:sleep(50),
263+
wait_loop(Pred, Deadline)
264+
end
265+
end.

0 commit comments

Comments
 (0)