Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement limiting the number of entries #18

Merged
merged 2 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ jobs:
- run: rebar3 as test do cover, covertool generate
if: ${{ matrix.otp == '27' }}
- name: Upload code coverage
uses: codecov/codecov-action@v2
uses: codecov/codecov-action@v4
if: ${{ matrix.otp == '27' }}
with:
file: "_build/test/covertool/segmented_cache.covertool.xml"
files: _build/test/covertool/segmented_cache.covertool.xml
token: ${{ secrets.CODECOV_TOKEN }}
fail_ci_if_error: true
verbose: true
2 changes: 2 additions & 0 deletions rebar.config
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{minimum_otp_vsn, "21"}.

{erl_opts,
[warn_missing_doc, warn_missing_spec, warn_unused_import,
warn_export_vars, verbose, report, debug_info
Expand Down
5 changes: 4 additions & 1 deletion src/segmented_cache.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,20 @@ For more information, see the README, and the function documentation.
-type key() :: term().
?DOC("Dynamic type of _values_ from cache clients.").
-type value() :: term().
?DOC("Maximum number of entries per segment. When filled, rotation is ensued.").
-type entries_limit() :: infinity | non_neg_integer().
?DOC("Merging function to use for resolving conflicts").
-type merger_fun(Value) :: fun((Value, Value) -> Value).
?DOC("Configuration values for the cache.").
-type opts() :: #{scope => scope(),
strategy => strategy(),
entries_limit => entries_limit(),
segment_num => non_neg_integer(),
ttl => timeout() | {erlang:time_unit(), non_neg_integer()},
merger_fun => merger_fun(term())}.

-export_type([scope/0, name/0, key/0, value/0, hit/0, delete_error/1,
strategy/0, merger_fun/1, opts/0]).
entries_limit/0, strategy/0, merger_fun/1, opts/0]).

%%====================================================================
%% API
Expand Down
134 changes: 84 additions & 50 deletions src/segmented_cache_helpers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
-export([purge_last_segment_and_rotate/1]).

-record(segmented_cache, {scope :: segmented_cache:scope(),
name :: segmented_cache:name(),
strategy = fifo :: segmented_cache:strategy(),
entries_limit = infinity :: segmented_cache:entries_limit(),
index :: atomics:atomics_ref(),
segments :: tuple(),
merger_fun :: merger_fun(term())}).
Expand All @@ -26,16 +28,22 @@

-spec init_cache_config(segmented_cache:name(), segmented_cache:opts()) ->
#{scope := segmented_cache:scope(), ttl := timeout()}.
init_cache_config(Name, Opts) ->
{Scope, N, TTL, Strategy, MergerFun} = assert_parameters(Opts),
SegmentOpts = ets_settings(),
init_cache_config(Name, Opts0) ->
#{scope := Scope,
strategy := Strategy,
entries_limit := EntriesLimit,
segment_num := N,
ttl := TTL,
merger_fun := MergerFun} = Opts = assert_parameters(Opts0),
SegmentOpts = ets_settings(Opts),
SegmentsList = lists:map(fun(_) -> ets:new(undefined, SegmentOpts) end, lists:seq(1, N)),
Segments = list_to_tuple(SegmentsList),
Index = atomics:new(1, [{signed, false}]),
atomics:put(Index, 1, 1),
Config = #segmented_cache{scope = Scope, strategy = Strategy, index = Index,
segments = Segments, merger_fun = MergerFun},
set_cache_config(Name, Config),
Config = #segmented_cache{scope = Scope, name = Name, strategy = Strategy,
index = Index, entries_limit = EntriesLimit,
segments = Segments, merger_fun = MergerFun},
persist_cache_config(Name, Config),
#{scope => Scope, ttl => TTL}.

-spec get_cache_scope(segmented_cache:name()) -> segmented_cache:scope().
Expand All @@ -51,8 +59,8 @@ erase_cache_config(Name) ->
get_cache_config(Name) ->
persistent_term:get({?APP_KEY, Name}).

-spec set_cache_config(segmented_cache:name(), config()) -> ok.
set_cache_config(Name, Config) ->
-spec persist_cache_config(segmented_cache:name(), config()) -> ok.
persist_cache_config(Name, Config) ->
persistent_term:put({?APP_KEY, Name}, Config).

%%====================================================================
Expand All @@ -77,7 +85,7 @@ get_entry_span(Name, Key) when is_atom(Name) ->
-spec put_entry_front(segmented_cache:name(), segmented_cache:key(), segmented_cache:value()) -> boolean().
put_entry_front(Name, Key, Value) ->
SegmentRecord = get_cache_config(Name),
do_put_entry_front(SegmentRecord, Key, Value).
do_put_entry_front(SegmentRecord, Key, Value, 3).

-spec merge_entry(segmented_cache:name(), segmented_cache:key(), segmented_cache:value()) -> boolean().
merge_entry(Name, Key, Value) when is_atom(Name) ->
Expand All @@ -91,7 +99,7 @@ merge_entry(Name, Key, Value) when is_atom(Name) ->
end,
case iterate_fun_in_tables(Name, Key, F) of
true -> true;
false -> do_put_entry_front(SegmentRecord, Key, Value)
false -> do_put_entry_front(SegmentRecord, Key, Value, 3)
end.

-spec delete_entry(segmented_cache:name(), segmented_cache:key()) -> true.
Expand Down Expand Up @@ -187,27 +195,49 @@ apply_strategy(lru, _CurrentIndex, FoundIndex, Key, SegmentRecord) ->
Segments = SegmentRecord#segmented_cache.segments,
FoundInSegment = element(FoundIndex, Segments),
try [{_, Value}] = ets:lookup(FoundInSegment, Key),
do_put_entry_front(SegmentRecord, Key, Value)
do_put_entry_front(SegmentRecord, Key, Value, 3)
catch _:_ -> false
end.

-spec do_put_entry_front(#segmented_cache{}, segmented_cache:key(), segmented_cache:value()) ->
-spec do_put_entry_front(#segmented_cache{}, segmented_cache:key(), segmented_cache:value(), 0..3) ->
boolean().
do_put_entry_front(SegmentRecord, Key, Value) ->
Atomic = SegmentRecord#segmented_cache.index,
do_put_entry_front(_, _, _, 0) -> false;
do_put_entry_front(#segmented_cache{
name = Name,
entries_limit = EntriesLimit,
index = Atomic,
segments = Segments,
merger_fun = MergerFun
} = SegmentRecord, Key, Value, Retry) ->
Index = atomics:get(Atomic, 1),
Segments = SegmentRecord#segmented_cache.segments,
FrontSegment = element(Index, Segments),
Inserted = case ets:insert_new(FrontSegment, {Key, Value}) of
true -> true;
false ->
MergerFun = SegmentRecord#segmented_cache.merger_fun,
compare_and_swap(3, FrontSegment, Key, Value, MergerFun)
end,
MaybeMovedIndex = atomics:get(Atomic, 1),
case post_insert_check_should_retry(Inserted, Index, MaybeMovedIndex) of
false -> Inserted;
true -> do_put_entry_front(SegmentRecord, Key, Value)
case insert_new(FrontSegment, Key, Value, EntriesLimit, Name) of
retry ->
do_put_entry_front(SegmentRecord, Key, Value, Retry - 1);
true ->
MaybeMovedIndex = atomics:get(Atomic, 1),
case post_insert_check_should_retry(true, Index, MaybeMovedIndex) of
false -> true;
true -> do_put_entry_front(SegmentRecord, Key, Value, Retry - 1)
end;
false ->
Inserted = compare_and_swap(3, FrontSegment, Key, Value, MergerFun),
MaybeMovedIndex = atomics:get(Atomic, 1),
case post_insert_check_should_retry(Inserted, Index, MaybeMovedIndex) of
false -> Inserted;
true -> do_put_entry_front(SegmentRecord, Key, Value, Retry - 1)
end
end.

insert_new(Table, Key, Value, infinity, _) ->
ets:insert_new(Table, {Key, Value});
insert_new(Table, Key, Value, EntriesLimit, Name) ->
case EntriesLimit =< ets:info(Table, size) of
false ->
ets:insert_new(Table, {Key, Value});
true ->
purge_last_segment_and_rotate(Name),
retry
end.

-spec post_insert_check_should_retry(boolean(), integer(), integer()) -> boolean().
Expand Down Expand Up @@ -254,12 +284,14 @@ purge_last_segment_and_rotate(Name) ->
atomics:put(SegmentRecord#segmented_cache.index, 1, NewIndex),
NewIndex.

-spec assert_parameters(segmented_cache:opts()) ->
{segmented_cache:name(), pos_integer(), timeout(), segmented_cache:strategy(), merger_fun(term())}.
assert_parameters(Opts) when is_map(Opts) ->
N = maps:get(segment_num, Opts, 3),
true = is_integer(N) andalso N > 0,
TTL0 = maps:get(ttl, Opts, {hours, 8}),
-spec assert_parameters(segmented_cache:opts()) -> segmented_cache:opts().
assert_parameters(Opts0) when is_map(Opts0) ->
#{scope := Scope,
strategy := Strategy,
entries_limit := EntriesLimit,
segment_num := N,
ttl := TTL0,
merger_fun := MergerFun} = Opts = maps:merge(defaults(), Opts0),
TTL = case TTL0 of
infinity -> infinity;
{milliseconds, S} -> S;
Expand All @@ -268,31 +300,33 @@ assert_parameters(Opts) when is_map(Opts) ->
{hours, H} -> timer:hours(H);
T when is_integer(T) -> timer:minutes(T)
end,
true = is_integer(N) andalso N > 0,
true = (EntriesLimit =:= infinity) orelse (is_integer(EntriesLimit) andalso EntriesLimit > 0),
true = (TTL =:= infinity) orelse (is_integer(TTL) andalso N > 0),
Strategy = maps:get(strategy, Opts, fifo),
true = (Strategy =:= fifo) orelse (Strategy =:= lru),
MergerFun = maps:get(merger_fun, Opts, fun segmented_cache_callbacks:default_merger_fun/2),
true = is_function(MergerFun, 2),
Scope = maps:get(scope, Opts, pg),
true = (undefined =/= whereis(Scope)),
{Scope, N, TTL, Strategy, MergerFun}.
Opts#{ttl := TTL}.

defaults() ->
#{scope => pg,
strategy => fifo,
entries_limit => infinity,
segment_num => 3,
ttl => {hours, 8},
merger_fun => fun segmented_cache_callbacks:default_merger_fun/2}.

-ifdef(OTP_RELEASE).
-if(?OTP_RELEASE >= 25).
ets_settings() ->
[set, public,
{read_concurrency, true},
{write_concurrency, auto},
{decentralized_counters, true}].
-elif(?OTP_RELEASE >= 21).
ets_settings() ->
[set, public,
{read_concurrency, true},
{write_concurrency, true},
{decentralized_counters, true}].
-endif.
-if(?OTP_RELEASE >= 25).
ets_settings(#{entries_limit := infinity}) ->
[set, public,
{read_concurrency, true},
{write_concurrency, auto}];
ets_settings(#{entries_limit := _}) ->
[set, public,
{read_concurrency, true},
{write_concurrency, true}].
-else.
ets_settings() ->
ets_settings(_Opts) ->
[set, public,
{read_concurrency, true},
{write_concurrency, true},
Expand Down
37 changes: 31 additions & 6 deletions test/segmented_cache_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
init_per_testcase/2,
end_per_testcase/2]).

-include_lib("stdlib/include/assert.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("proper/include/proper.hrl").

Expand All @@ -22,6 +23,7 @@
all() ->
[
{group, basic_api},
{group, cache_limits},
{group, short_fifo},
{group, lru}
].
Expand All @@ -36,6 +38,10 @@ groups() ->
put_entry_then_delete_it_then_not_member,
stateful_property
]},
{cache_limits, [sequence],
[
ensure_configured_size_is_respected
]},
{short_fifo, [sequence],
[
put_entry_wait_and_check_false
Expand Down Expand Up @@ -72,15 +78,25 @@ end_per_suite(_Config) ->
%%%===================================================================
init_per_group(lru, Config) ->
print_and_restart_counters(),
{ok, Cleaner} = segmented_cache:start(?CACHE_NAME, #{strategy => lru,
segment_num => 2,
ttl => {milliseconds, 100}}),
Opts = #{strategy => lru,
segment_num => 2,
ttl => {milliseconds, 100}},
{ok, Cleaner} = segmented_cache:start(?CACHE_NAME, Opts),
[{cleaner, Cleaner} | Config];
init_per_group(short_fifo, Config) ->
print_and_restart_counters(),
{ok, Cleaner} = segmented_cache:start(?CACHE_NAME, #{strategy => fifo,
segment_num => 2,
ttl => {milliseconds, 5}}),
Opts = #{strategy => fifo,
segment_num => 2,
ttl => {milliseconds, 5}},
{ok, Cleaner} = segmented_cache:start(?CACHE_NAME, Opts),
[{cleaner, Cleaner} | Config];
init_per_group(cache_limits, Config) ->
print_and_restart_counters(),
Opts = #{entries_limit => 1,
strategy => fifo,
segment_num => 2,
ttl => {seconds, 60}},
{ok, Cleaner} = segmented_cache:start(?CACHE_NAME, Opts),
[{cleaner, Cleaner} | Config];
init_per_group(_Groupname, Config) ->
print_and_restart_counters(),
Expand Down Expand Up @@ -109,6 +125,15 @@ end_per_testcase(_TestCase, _Config) ->
%%% Stateful property Test Case
%%%===================================================================

ensure_configured_size_is_respected(_Config) ->
%% We have 2 tables with 1 element each
?assert(segmented_cache:put_entry(?CACHE_NAME, one, make_ref())),
?assert(segmented_cache:put_entry(?CACHE_NAME, two, make_ref())),
?assert(segmented_cache:put_entry(?CACHE_NAME, three, make_ref())),
?assert(segmented_cache:is_member(?CACHE_NAME, three)),
?assert(segmented_cache:is_member(?CACHE_NAME, two)),
?assertNot(segmented_cache:is_member(?CACHE_NAME, one)).

stateful_property(_Config) ->
Prop =
?FORALL(Cmds, commands(?CMD_MODULE),
Expand Down