55
55
-export ([query_local_pid /3 ,
56
56
query_writer_pid /2 ,
57
57
query_members /2 ,
58
- query_stream_overview /2 ]).
58
+ query_stream_overview /2 ,
59
+ ra_local_query /1 ]).
59
60
60
61
61
62
-export ([log_overview /1 ,
@@ -271,7 +272,7 @@ sac_state(#?MODULE{single_active_consumer = SacState}) ->
271
272
272
273
% % for debugging
273
274
state () ->
274
- case ra : local_query ({ ? MODULE , node ()}, fun (State ) -> State end ) of
275
+ case ra_local_query ( fun (State ) -> State end ) of
275
276
{ok , {_ , Res }, _ } ->
276
277
Res ;
277
278
Any ->
@@ -289,7 +290,7 @@ local_pid(StreamId) when is_list(StreamId) ->
289
290
query_pid (StreamId , MFA ).
290
291
291
292
query_pid (StreamId , MFA ) when is_list (StreamId ) ->
292
- case ra : local_query ({ ? MODULE , node ()}, MFA ) of
293
+ case ra_local_query ( MFA ) of
293
294
{ok , {_ , {ok , Pid }}, _ } ->
294
295
case erpc :call (node (Pid ), erlang , is_process_alive , [Pid ]) of
295
296
true ->
@@ -380,7 +381,7 @@ query_writer_pid(StreamId, #?MODULE{streams = Streams}) ->
380
381
end .
381
382
382
383
do_query (MFA ) ->
383
- case ra : local_query ({ ? MODULE , node ()}, MFA ) of
384
+ case ra_local_query ( MFA ) of
384
385
{ok , {_ , {ok , _ } = Result }, _ } ->
385
386
Result ;
386
387
{ok , {_ , {error , not_found }}, _ } ->
@@ -2337,3 +2338,6 @@ key_metrics_rpc(ServerId) ->
2337
2338
2338
2339
maps_to_list (M ) ->
2339
2340
lists :sort (maps :to_list (M )).
2341
+
2342
+ ra_local_query (QueryFun ) ->
2343
+ ra :local_query ({? MODULE , node ()}, QueryFun , infinity ).
0 commit comments