Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 46 additions & 52 deletions src/osiris_retention.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,16 @@

-module(osiris_retention).

-behaviour(gen_server).
-behaviour(gen_batch_server).

-include("osiris.hrl").
%% API functions
-export([start_link/0,
eval/4]).
%% gen_server callbacks
%% gen_batch_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
handle_batch/2,
terminate/2]).

-define(DEFAULT_SCHEDULED_EVAL_TIME, 1000 * 60 * 60). %% 1HR

Expand All @@ -29,44 +26,59 @@
%%% API functions
%%%===================================================================

%% @spec start_link() -> {ok, Pid} | ignore | {error, Error}
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
gen_batch_server:start_link({local, ?MODULE}, ?MODULE, [], [{reversed_batch, true}]).

-spec eval(osiris:name(), file:name_all(), [osiris:retention_spec()],
fun((osiris_log:range()) -> ok)) ->
ok.
eval(_Name, _Dir, [], _Fun) ->
ok;
eval(Name, Dir, Specs, Fun) ->
gen_server:cast(?MODULE, {eval, self(), Name, Dir, Specs, Fun}).
gen_batch_server:cast(?MODULE, {eval, self(), Name, Dir, Specs, Fun}).

%%%===================================================================
%%% gen_server callbacks
%%% gen_batch_server callbacks
%%%===================================================================

% @spec init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
-spec init([]) -> {ok, #state{}}.
init([]) ->
{ok, #state{}}.

%% @spec handle_call(Request, From, State) ->
%% {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} |
%% {stop, Reason, State}
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.

%% @spec handle_cast(Msg, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
handle_cast({eval, Pid, Name, Dir, Specs, Fun} = Eval, State) ->
-spec handle_batch([gen_batch_server:op()], #state{}) -> {ok, #state{}}.
handle_batch(Ops, State0) ->
%% Ops are in reverse order of arrival. Process newest first.
{State1, _Seen} = lists:foldl(fun process_op/2, {State0, sets:new()}, Ops),
{ok, State1}.

-spec terminate(term(), #state{}) -> ok.
terminate(_Reason, _State) ->
ok.

%%%===================================================================
%%% Internal functions
%%%===================================================================

%% Multiple requests from the same stream might have arrived while
%% processing the previous batch.
%% Only process newest request for each stream (with the newest
%% retention config), as the config could have changed.
process_op({cast, {eval, _Pid, Name, _Dir, _Specs, _Fun} = Eval}, {StateAcc, Seen}) ->
case sets:is_element(Name, Seen) of
true ->
%% Retention for this stream already evaluated
{StateAcc, Seen};
false ->
{evaluate_retention(Eval, StateAcc), sets:add_element(Name, Seen)}
end;
process_op({call, From, _}, {StateAcc, Seen}) ->
gen:reply(From, ok),
{StateAcc, Seen};
process_op(_, {StateAcc, Seen}) ->
{StateAcc, Seen}.

evaluate_retention({eval, Pid, Name, Dir, Specs, Fun} = Eval, State) ->
%% only do retention evaluation for stream processes that are
%% alive as the callback Fun passed in would update a shared atomic
%% value and this atomic is new per process incarnation
Expand All @@ -75,32 +87,15 @@ handle_cast({eval, Pid, Name, Dir, Specs, Fun} = Eval, State) ->
try osiris_log:evaluate_retention(Dir, Specs) of
Result ->
_ = Fun(Result),
{noreply, schedule(Eval, Result, State)}
schedule(Eval, Result, State)
catch _:Err ->
?DEBUG_(Name, "retention evaluation failed with ~w", [Err]),
{noreply, State}
?DEBUG_(Name, "retention evaluation failed with ~w", [Err]),
State
end;
false ->
{noreply, State}
State
end.

%% @spec handle_info(Info, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
handle_info(_Info, State) ->
{noreply, State}.

%% @spec terminate(Reason, State) -> void()
terminate(_Reason, _State) ->
ok.

%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%%%===================================================================
%%% Internal functions
%%%===================================================================
schedule({eval, _Pid, Name, _Dir, Specs, _Fun} = Eval,
{_, _, NumSegmentRemaining},
#state{scheduled = Scheduled0} = State) ->
Expand All @@ -123,4 +118,3 @@ schedule({eval, _Pid, Name, _Dir, Specs, _Fun} = Eval,
false ->
State#state{scheduled = Scheduled}
end.