Skip to content

Commit 9dc60c0

Browse files
committed
Deduplicate retention evaluation requests
Prevents osiris_retention gen_server from building up a long message queue when stream writes outperform processing retention evaluations. Related to #204
1 parent 8e0b786 commit 9dc60c0

File tree

1 file changed

+45
-53
lines changed

1 file changed

+45
-53
lines changed

src/osiris_retention.erl

Lines changed: 45 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,16 @@
77

88
-module(osiris_retention).
99

10-
-behaviour(gen_server).
10+
-behaviour(gen_batch_server).
1111

1212
-include("osiris.hrl").
1313
%% API functions
1414
-export([start_link/0,
1515
eval/4]).
16-
%% gen_server callbacks
16+
%% gen_batch_server callbacks
1717
-export([init/1,
18-
handle_call/3,
19-
handle_cast/2,
20-
handle_info/2,
21-
terminate/2,
22-
code_change/3]).
18+
handle_batch/2,
19+
terminate/2]).
2320

2421
-define(DEFAULT_SCHEDULED_EVAL_TIME, 1000 * 60 * 60). %% 1HR
2522

@@ -29,44 +26,57 @@
2926
%%% API functions
3027
%%%===================================================================
3128

32-
%% @spec start_link() -> {ok, Pid} | ignore | {error, Error}
29+
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
3330
start_link() ->
34-
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
31+
gen_batch_server:start_link({local, ?MODULE}, ?MODULE, [], []).
3532

3633
-spec eval(osiris:name(), file:name_all(), [osiris:retention_spec()],
3734
fun((osiris_log:range()) -> ok)) ->
3835
ok.
3936
eval(_Name, _Dir, [], _Fun) ->
4037
ok;
4138
eval(Name, Dir, Specs, Fun) ->
42-
gen_server:cast(?MODULE, {eval, self(), Name, Dir, Specs, Fun}).
39+
gen_batch_server:cast(?MODULE, {eval, self(), Name, Dir, Specs, Fun}).
4340

4441
%%%===================================================================
45-
%%% gen_server callbacks
42+
%%% gen_batch_server callbacks
4643
%%%===================================================================
4744

48-
% @spec init(Args) -> {ok, State} |
49-
%% {ok, State, Timeout} |
50-
%% ignore |
51-
%% {stop, Reason}
45+
-spec init([]) -> {ok, #state{}}.
5246
init([]) ->
5347
{ok, #state{}}.
5448

55-
%% @spec handle_call(Request, From, State) ->
56-
%% {reply, Reply, State} |
57-
%% {reply, Reply, State, Timeout} |
58-
%% {noreply, State} |
59-
%% {noreply, State, Timeout} |
60-
%% {stop, Reason, Reply, State} |
61-
%% {stop, Reason, State}
62-
handle_call(_Request, _From, State) ->
63-
Reply = ok,
64-
{reply, Reply, State}.
65-
66-
%% @spec handle_cast(Msg, State) -> {noreply, State} |
67-
%% {noreply, State, Timeout} |
68-
%% {stop, Reason, State}
69-
handle_cast({eval, Pid, Name, Dir, Specs, Fun} = Eval, State) ->
49+
-spec handle_batch([gen_batch_server:op()], #state{}) -> {ok, #state{}}.
50+
handle_batch(Ops, State0) ->
51+
Evals = deduplicate_evals(Ops),
52+
State1 = lists:foldl(fun evaluate_retention/2, State0, Evals),
53+
{ok, State1}.
54+
55+
-spec terminate(term(), #state{}) -> ok.
56+
terminate(_Reason, _State) ->
57+
ok.
58+
59+
%%%===================================================================
60+
%%% Internal functions
61+
%%%===================================================================
62+
63+
%% Deduplicate by Name as multiple requests from the same stream might
64+
%% have arrived while processing the previous batch.
65+
%% Keeping last occurrence for each Name, as retention config could
66+
%% have changed.
67+
deduplicate_evals(Ops) ->
68+
Map = lists:foldl(
69+
fun({cast, {eval, _Pid, Name, _Dir, _Specs, _Fun} = Eval}, Acc) ->
70+
maps:put(Name, Eval, Acc);
71+
({call, From, _}, Acc) ->
72+
gen:reply(From, ok),
73+
Acc;
74+
(_, Acc) ->
75+
Acc
76+
end, #{}, Ops),
77+
maps:values(Map).
78+
79+
evaluate_retention({eval, Pid, Name, Dir, Specs, Fun} = Eval, State) ->
7080
%% only do retention evaluation for stream processes that are
7181
%% alive as the callback Fun passed in would update a shared atomic
7282
%% value and this atomic is new per process incarnation
@@ -75,32 +85,15 @@ handle_cast({eval, Pid, Name, Dir, Specs, Fun} = Eval, State) ->
7585
try osiris_log:evaluate_retention(Dir, Specs) of
7686
Result ->
7787
_ = Fun(Result),
78-
{noreply, schedule(Eval, Result, State)}
88+
schedule(Eval, Result, State)
7989
catch _:Err ->
80-
?DEBUG_(Name, "retention evaluation failed with ~w", [Err]),
81-
{noreply, State}
90+
?DEBUG_(Name, "retention evaluation failed with ~w", [Err]),
91+
State
8292
end;
8393
false ->
84-
{noreply, State}
94+
State
8595
end.
8696

87-
%% @spec handle_info(Info, State) -> {noreply, State} |
88-
%% {noreply, State, Timeout} |
89-
%% {stop, Reason, State}
90-
handle_info(_Info, State) ->
91-
{noreply, State}.
92-
93-
%% @spec terminate(Reason, State) -> void()
94-
terminate(_Reason, _State) ->
95-
ok.
96-
97-
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
98-
code_change(_OldVsn, State, _Extra) ->
99-
{ok, State}.
100-
101-
%%%===================================================================
102-
%%% Internal functions
103-
%%%===================================================================
10497
schedule({eval, _Pid, Name, _Dir, Specs, _Fun} = Eval,
10598
{_, _, NumSegmentRemaining},
10699
#state{scheduled = Scheduled0} = State) ->
@@ -118,9 +111,8 @@ schedule({eval, _Pid, Name, _Dir, Specs, _Fun} = Eval,
118111
true ->
119112
EvalInterval = application:get_env(osiris, retention_eval_interval,
120113
?DEFAULT_SCHEDULED_EVAL_TIME),
121-
Ref = erlang:send_after(EvalInterval, self(), {'$gen_cast', Eval}),
114+
Ref = erlang:send_after(EvalInterval, self(), {cast, Eval}),
122115
State#state{scheduled = Scheduled#{Name => Ref}};
123116
false ->
124117
State#state{scheduled = Scheduled}
125118
end.
126-

0 commit comments

Comments
 (0)