9
9
-export ([purge_last_segment_and_rotate /1 ]).
10
10
11
11
-record (segmented_cache , {scope :: segmented_cache :scope (),
12
+ name :: segmented_cache :name (),
12
13
strategy = fifo :: segmented_cache :strategy (),
14
+ entries_limit = infinity :: segmented_cache :entries_limit (),
13
15
index :: atomics :atomics_ref (),
14
16
segments :: tuple (),
15
17
merger_fun :: merger_fun (term ())}).
26
28
27
29
-spec init_cache_config (segmented_cache :name (), segmented_cache :opts ()) ->
28
30
#{scope := segmented_cache :scope (), ttl := timeout ()}.
29
- init_cache_config (Name , Opts ) ->
30
- {Scope , N , TTL , Strategy , MergerFun } = assert_parameters (Opts ),
31
- SegmentOpts = ets_settings (),
31
+ init_cache_config (Name , Opts0 ) ->
32
+ #{scope := Scope ,
33
+ strategy := Strategy ,
34
+ entries_limit := EntriesLimit ,
35
+ segment_num := N ,
36
+ ttl := TTL ,
37
+ merger_fun := MergerFun } = Opts = assert_parameters (Opts0 ),
38
+ SegmentOpts = ets_settings (Opts ),
32
39
SegmentsList = lists :map (fun (_ ) -> ets :new (undefined , SegmentOpts ) end , lists :seq (1 , N )),
33
40
Segments = list_to_tuple (SegmentsList ),
34
41
Index = atomics :new (1 , [{signed , false }]),
35
42
atomics :put (Index , 1 , 1 ),
36
- Config = # segmented_cache {scope = Scope , strategy = Strategy , index = Index ,
37
- segments = Segments , merger_fun = MergerFun },
38
- set_cache_config (Name , Config ),
43
+ Config = # segmented_cache {scope = Scope , name = Name , strategy = Strategy ,
44
+ index = Index , entries_limit = EntriesLimit ,
45
+ segments = Segments , merger_fun = MergerFun },
46
+ persist_cache_config (Name , Config ),
39
47
#{scope => Scope , ttl => TTL }.
40
48
41
49
-spec get_cache_scope (segmented_cache :name ()) -> segmented_cache :scope ().
@@ -51,8 +59,8 @@ erase_cache_config(Name) ->
51
59
get_cache_config (Name ) ->
52
60
persistent_term :get ({? APP_KEY , Name }).
53
61
54
- -spec set_cache_config (segmented_cache :name (), config ()) -> ok .
55
- set_cache_config (Name , Config ) ->
62
+ -spec persist_cache_config (segmented_cache :name (), config ()) -> ok .
63
+ persist_cache_config (Name , Config ) ->
56
64
persistent_term :put ({? APP_KEY , Name }, Config ).
57
65
58
66
% %====================================================================
@@ -77,7 +85,7 @@ get_entry_span(Name, Key) when is_atom(Name) ->
77
85
-spec put_entry_front (segmented_cache :name (), segmented_cache :key (), segmented_cache :value ()) -> boolean ().
78
86
put_entry_front (Name , Key , Value ) ->
79
87
SegmentRecord = get_cache_config (Name ),
80
- do_put_entry_front (SegmentRecord , Key , Value ).
88
+ do_put_entry_front (SegmentRecord , Key , Value , 3 ).
81
89
82
90
-spec merge_entry (segmented_cache :name (), segmented_cache :key (), segmented_cache :value ()) -> boolean ().
83
91
merge_entry (Name , Key , Value ) when is_atom (Name ) ->
@@ -91,7 +99,7 @@ merge_entry(Name, Key, Value) when is_atom(Name) ->
91
99
end ,
92
100
case iterate_fun_in_tables (Name , Key , F ) of
93
101
true -> true ;
94
- false -> do_put_entry_front (SegmentRecord , Key , Value )
102
+ false -> do_put_entry_front (SegmentRecord , Key , Value , 3 )
95
103
end .
96
104
97
105
-spec delete_entry (segmented_cache :name (), segmented_cache :key ()) -> true .
@@ -187,27 +195,49 @@ apply_strategy(lru, _CurrentIndex, FoundIndex, Key, SegmentRecord) ->
187
195
Segments = SegmentRecord # segmented_cache .segments ,
188
196
FoundInSegment = element (FoundIndex , Segments ),
189
197
try [{_ , Value }] = ets :lookup (FoundInSegment , Key ),
190
- do_put_entry_front (SegmentRecord , Key , Value )
198
+ do_put_entry_front (SegmentRecord , Key , Value , 3 )
191
199
catch _ :_ -> false
192
200
end .
193
201
194
- -spec do_put_entry_front (# segmented_cache {}, segmented_cache :key (), segmented_cache :value ()) ->
202
+ -spec do_put_entry_front (# segmented_cache {}, segmented_cache :key (), segmented_cache :value (), 0 .. 3 ) ->
195
203
boolean ().
196
- do_put_entry_front (SegmentRecord , Key , Value ) ->
197
- Atomic = SegmentRecord # segmented_cache .index ,
204
+ do_put_entry_front (_ , _ , _ , 0 ) -> false ;
205
+ do_put_entry_front (# segmented_cache {
206
+ name = Name ,
207
+ entries_limit = EntriesLimit ,
208
+ index = Atomic ,
209
+ segments = Segments ,
210
+ merger_fun = MergerFun
211
+ } = SegmentRecord , Key , Value , Retry ) ->
198
212
Index = atomics :get (Atomic , 1 ),
199
- Segments = SegmentRecord # segmented_cache .segments ,
200
213
FrontSegment = element (Index , Segments ),
201
- Inserted = case ets :insert_new (FrontSegment , {Key , Value }) of
202
- true -> true ;
203
- false ->
204
- MergerFun = SegmentRecord # segmented_cache .merger_fun ,
205
- compare_and_swap (3 , FrontSegment , Key , Value , MergerFun )
206
- end ,
207
- MaybeMovedIndex = atomics :get (Atomic , 1 ),
208
- case post_insert_check_should_retry (Inserted , Index , MaybeMovedIndex ) of
209
- false -> Inserted ;
210
- true -> do_put_entry_front (SegmentRecord , Key , Value )
214
+ case insert_new (FrontSegment , Key , Value , EntriesLimit , Name ) of
215
+ retry ->
216
+ do_put_entry_front (SegmentRecord , Key , Value , Retry - 1 );
217
+ true ->
218
+ MaybeMovedIndex = atomics :get (Atomic , 1 ),
219
+ case post_insert_check_should_retry (true , Index , MaybeMovedIndex ) of
220
+ false -> true ;
221
+ true -> do_put_entry_front (SegmentRecord , Key , Value , Retry - 1 )
222
+ end ;
223
+ false ->
224
+ Inserted = compare_and_swap (3 , FrontSegment , Key , Value , MergerFun ),
225
+ MaybeMovedIndex = atomics :get (Atomic , 1 ),
226
+ case post_insert_check_should_retry (Inserted , Index , MaybeMovedIndex ) of
227
+ false -> Inserted ;
228
+ true -> do_put_entry_front (SegmentRecord , Key , Value , Retry - 1 )
229
+ end
230
+ end .
231
+
232
+ insert_new (Table , Key , Value , infinity , _ ) ->
233
+ ets :insert_new (Table , {Key , Value });
234
+ insert_new (Table , Key , Value , EntriesLimit , Name ) ->
235
+ case EntriesLimit =< ets :info (Table , size ) of
236
+ false ->
237
+ ets :insert_new (Table , {Key , Value });
238
+ true ->
239
+ purge_last_segment_and_rotate (Name ),
240
+ retry
211
241
end .
212
242
213
243
-spec post_insert_check_should_retry (boolean (), integer (), integer ()) -> boolean ().
@@ -254,12 +284,14 @@ purge_last_segment_and_rotate(Name) ->
254
284
atomics :put (SegmentRecord # segmented_cache .index , 1 , NewIndex ),
255
285
NewIndex .
256
286
257
- -spec assert_parameters (segmented_cache :opts ()) ->
258
- {segmented_cache :name (), pos_integer (), timeout (), segmented_cache :strategy (), merger_fun (term ())}.
259
- assert_parameters (Opts ) when is_map (Opts ) ->
260
- N = maps :get (segment_num , Opts , 3 ),
261
- true = is_integer (N ) andalso N > 0 ,
262
- TTL0 = maps :get (ttl , Opts , {hours , 8 }),
287
+ -spec assert_parameters (segmented_cache :opts ()) -> segmented_cache :opts ().
288
+ assert_parameters (Opts0 ) when is_map (Opts0 ) ->
289
+ #{scope := Scope ,
290
+ strategy := Strategy ,
291
+ entries_limit := EntriesLimit ,
292
+ segment_num := N ,
293
+ ttl := TTL0 ,
294
+ merger_fun := MergerFun } = Opts = maps :merge (defaults (), Opts0 ),
263
295
TTL = case TTL0 of
264
296
infinity -> infinity ;
265
297
{milliseconds , S } -> S ;
@@ -268,31 +300,33 @@ assert_parameters(Opts) when is_map(Opts) ->
268
300
{hours , H } -> timer :hours (H );
269
301
T when is_integer (T ) -> timer :minutes (T )
270
302
end ,
303
+ true = is_integer (N ) andalso N > 0 ,
304
+ true = (EntriesLimit =:= infinity ) orelse (is_integer (EntriesLimit ) andalso EntriesLimit > 0 ),
271
305
true = (TTL =:= infinity ) orelse (is_integer (TTL ) andalso N > 0 ),
272
- Strategy = maps :get (strategy , Opts , fifo ),
273
306
true = (Strategy =:= fifo ) orelse (Strategy =:= lru ),
274
- MergerFun = maps :get (merger_fun , Opts , fun segmented_cache_callbacks :default_merger_fun /2 ),
275
307
true = is_function (MergerFun , 2 ),
276
- Scope = maps :get (scope , Opts , pg ),
277
308
true = (undefined =/= whereis (Scope )),
278
- {Scope , N , TTL , Strategy , MergerFun }.
309
+ Opts #{ttl := TTL }.
310
+
311
+ defaults () ->
312
+ #{scope => pg ,
313
+ strategy => fifo ,
314
+ entries_limit => infinity ,
315
+ segment_num => 3 ,
316
+ ttl => {hours , 8 },
317
+ merger_fun => fun segmented_cache_callbacks :default_merger_fun /2 }.
279
318
280
- -ifdef (OTP_RELEASE ).
281
- -if (? OTP_RELEASE >= 25 ).
282
- ets_settings () ->
283
- [set , public ,
284
- {read_concurrency , true },
285
- {write_concurrency , auto },
286
- {decentralized_counters , true }].
287
- -elif (? OTP_RELEASE >= 21 ).
288
- ets_settings () ->
289
- [set , public ,
290
- {read_concurrency , true },
291
- {write_concurrency , true },
292
- {decentralized_counters , true }].
293
- -endif .
319
+ -if (? OTP_RELEASE >= 25 ).
320
+ ets_settings (#{entries_limit := infinity }) ->
321
+ [set , public ,
322
+ {read_concurrency , true },
323
+ {write_concurrency , auto }];
324
+ ets_settings (#{entries_limit := _ }) ->
325
+ [set , public ,
326
+ {read_concurrency , true },
327
+ {write_concurrency , true }].
294
328
-else .
295
- ets_settings () ->
329
+ ets_settings (_Opts ) ->
296
330
[set , public ,
297
331
{read_concurrency , true },
298
332
{write_concurrency , true },
0 commit comments