Skip to content

Commit 5dd9bb2

Browse files
acogoluegnesmergify[bot]
authored andcommitted
Use infinity timout for RA local query in stream coordinator
The 5-second default timeout is too short. (cherry picked from commit 1634adb)
1 parent 6d4e1c2 commit 5dd9bb2

File tree

2 files changed

+12
-10
lines changed

2 files changed

+12
-10
lines changed

deps/rabbit/src/rabbit_stream_coordinator.erl

+8-4
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@
5555
-export([query_local_pid/3,
5656
query_writer_pid/2,
5757
query_members/2,
58-
query_stream_overview/2]).
58+
query_stream_overview/2,
59+
ra_local_query/1]).
5960

6061

6162
-export([log_overview/1,
@@ -271,7 +272,7 @@ sac_state(#?MODULE{single_active_consumer = SacState}) ->
271272

272273
%% for debugging
273274
state() ->
274-
case ra:local_query({?MODULE, node()}, fun(State) -> State end) of
275+
case ra_local_query(fun(State) -> State end) of
275276
{ok, {_, Res}, _} ->
276277
Res;
277278
Any ->
@@ -289,7 +290,7 @@ local_pid(StreamId) when is_list(StreamId) ->
289290
query_pid(StreamId, MFA).
290291

291292
query_pid(StreamId, MFA) when is_list(StreamId) ->
292-
case ra:local_query({?MODULE, node()}, MFA) of
293+
case ra_local_query(MFA) of
293294
{ok, {_, {ok, Pid}}, _} ->
294295
case erpc:call(node(Pid), erlang, is_process_alive, [Pid]) of
295296
true ->
@@ -380,7 +381,7 @@ query_writer_pid(StreamId, #?MODULE{streams = Streams}) ->
380381
end.
381382

382383
do_query(MFA) ->
383-
case ra:local_query({?MODULE, node()}, MFA) of
384+
case ra_local_query(MFA) of
384385
{ok, {_, {ok, _} = Result}, _} ->
385386
Result;
386387
{ok, {_, {error, not_found}}, _} ->
@@ -2337,3 +2338,6 @@ key_metrics_rpc(ServerId) ->
23372338

23382339
maps_to_list(M) ->
23392340
lists:sort(maps:to_list(M)).
2341+
2342+
ra_local_query(QueryFun) ->
2343+
ra:local_query({?MODULE, node()}, QueryFun, infinity).

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

+4-6
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
group_consumers/5,
4242
overview/1]).
4343

44+
-import(rabbit_stream_coordinator, [ra_local_query/1]).
45+
4446
%% Single Active Consumer API
4547
-spec register_consumer(binary(),
4648
binary(),
@@ -129,9 +131,7 @@ process_command(Cmd) ->
129131
{ok,
130132
[term()] | {error, atom()}}.
131133
consumer_groups(VirtualHost, InfoKeys) ->
132-
case ra:local_query({rabbit_stream_coordinator,
133-
node()},
134-
fun(State) ->
134+
case ra_local_query(fun(State) ->
135135
SacState =
136136
rabbit_stream_coordinator:sac_state(State),
137137
consumer_groups(VirtualHost,
@@ -152,9 +152,7 @@ consumer_groups(VirtualHost, InfoKeys) ->
152152
{ok, [term()]} |
153153
{error, atom()}.
154154
group_consumers(VirtualHost, Stream, Reference, InfoKeys) ->
155-
case ra:local_query({rabbit_stream_coordinator,
156-
node()},
157-
fun(State) ->
155+
case ra_local_query(fun(State) ->
158156
SacState =
159157
rabbit_stream_coordinator:sac_state(State),
160158
group_consumers(VirtualHost,

0 commit comments

Comments
 (0)