@@ -234,9 +234,17 @@ get(Ref, Key, TryNum) ->
234
234
E when is_record (E , bitcask_entry ) ->
235
235
case E # bitcask_entry .tstamp < expiry_time (State # bc_state .opts ) of
236
236
true ->
237
- % % Expired entry; remove from keydir and free up memory
238
- ok = bitcask_nifs :keydir_remove (State # bc_state .keydir , Key ),
239
- not_found ;
237
+ % % Expired entry; remove from keydir
238
+ case bitcask_nifs :keydir_remove (State # bc_state .keydir , Key ,
239
+ E # bitcask_entry .tstamp ,
240
+ E # bitcask_entry .file_id ,
241
+ E # bitcask_entry .offset ) of
242
+ ok ->
243
+ not_found ;
244
+ already_exists ->
245
+ % Updated since last read, try again.
246
+ get (Ref , Key , TryNum - 1 )
247
+ end ;
240
248
false ->
241
249
% % HACK: Use a fully-qualified call to get_filestate/2 so that
242
250
% % we can intercept calls w/ Pulse tests.
@@ -1151,7 +1159,7 @@ get_opt(Key, Opts) ->
1151
1159
%% Store State under the key Ref in the calling process's dictionary.
%% Returns whatever erlang:put/2 returns, i.e. the value previously stored
%% under Ref, or 'undefined' if there was none.
put_state(Ref, State) ->
    erlang:put(Ref, State).
1153
1161
1154
%% Identity key transform: hands the key back untouched.
%% Only binaries are accepted; any other term fails with function_clause.
kt_id(<<_/binary>> = Key) ->
    Key.
1156
1164
1157
1165
scan_key_files ([], _KeyDir , Acc , _CloseFile , _KT ) ->
@@ -1241,6 +1249,7 @@ init_keydir(Dirname, WaitTime, ReadWriteModeP, KT) ->
1241
1249
1242
1250
case ScanResult of
1243
1251
{error , _ } ->
1252
+ ok = bitcask_nifs :keydir_release (KeyDir ),
1244
1253
ScanResult ;
1245
1254
_ ->
1246
1255
% % Now that we loaded all the data, mark the keydir as ready
@@ -1326,19 +1335,9 @@ get_filestate(FileId, Dirname, ReadFiles, Mode) ->
1326
1335
1327
1336
1328
1337
%% List the data files found in Dirname, skipping the file currently being
%% written and the file currently being merged.
%%
%% bitcask_fileops:data_file_tstamps/1 yields {Tstamp, Filename} pairs; they
%% are sorted (ascending term order, so by timestamp first) and only the
%% filenames are returned.
list_data_files(Dirname, WritingFile, MergingFile) ->
    Sorted = lists:sort(bitcask_fileops:data_file_tstamps(Dirname)),
    [F || {_Tstamp, F} <- Sorted,
          F /= WritingFile,
          F /= MergingFile].
1342
1341
1343
1342
merge_files (# mstate { input_files = [] } = State ) ->
1344
1343
State ;
@@ -1659,7 +1658,16 @@ readable_and_setuid_files(Dirname) ->
1659
1658
% % Filter out files with setuid bit set: they've been marked for
1660
1659
% % deletion by an earlier *successful* merge.
1661
1660
Fs = [F || F <- list_data_files (Dirname , WritingFile , MergingFile )],
1662
- lists :partition (fun (F ) -> not has_pending_delete_bit (F ) end , Fs ).
1661
+
1662
+ WritingFile2 = bitcask_lockops :read_activefile (write , Dirname ),
1663
+ MergingFile2 = bitcask_lockops :read_activefile (merge , Dirname ),
1664
+ case {WritingFile2 , MergingFile2 } of
1665
+ {WritingFile , MergingFile } ->
1666
+ lists :partition (fun (F ) -> not has_pending_delete_bit (F ) end , Fs );
1667
+ _ ->
1668
+ % Changed while fetching file list, retry
1669
+ readable_and_setuid_files (Dirname )
1670
+ end .
1663
1671
1664
1672
% % Internal put - have validated that the file is opened for write
1665
1673
% % and looked up the state at this point
@@ -1720,15 +1728,21 @@ do_put(Key, Value, #bc_state{write_file = WriteFile} = State,
1720
1728
State3 = wrap_write_file (State2 ),
1721
1729
do_put (Key , Value , State3 , Retries - 1 , already_exists );
1722
1730
1723
- # bitcask_entry {file_id = OldFileId ,offset = OldOffset }
1724
- when OldFileId =< WriteFileId ->
1725
- PrevTombstone = <<? TOMBSTONE2_STR , OldFileId :32 >>,
1726
- {ok , WriteFile1 , _ , _ } =
1727
- bitcask_fileops :write (WriteFile0 , Key , PrevTombstone ,
1728
- Tstamp ),
1729
- State3 = State2 # bc_state {write_file = WriteFile1 },
1731
+ # bitcask_entry {file_id = OldFileId ,offset = OldOffset } ->
1732
+ State3 =
1733
+ case OldFileId < WriteFileId of
1734
+ true ->
1735
+ PrevTomb = <<? TOMBSTONE2_STR , OldFileId :32 >>,
1736
+ {ok , WriteFile1 , _ , _ } =
1737
+ bitcask_fileops :write (WriteFile0 , Key ,
1738
+ PrevTomb , Tstamp ),
1739
+ State2 # bc_state {write_file = WriteFile1 };
1740
+ false ->
1741
+ State2
1742
+ end ,
1730
1743
write_and_keydir_put (State3 , Key , Value , Tstamp , Retries ,
1731
1744
bitcask_time :tstamp (), OldFileId , OldOffset );
1745
+
1732
1746
_ ->
1733
1747
State3 = State2 # bc_state {write_file = WriteFile0 },
1734
1748
write_and_keydir_put (State3 , Key , Value , Tstamp , Retries ,
@@ -1908,9 +1922,12 @@ expiry_merge([], _LiveKeyDir, _KT, Acc) ->
1908
1922
Acc ;
1909
1923
expiry_merge ([File | Files ], LiveKeyDir , KT , Acc0 ) ->
1910
1924
FileId = bitcask_fileops :file_tstamp (File ),
1911
- Fun = fun (K , Tstamp , {Offset , _TotalSz }, Acc ) ->
1912
- bitcask_nifs :keydir_remove (LiveKeyDir , KT (K ), Tstamp , FileId , Offset ),
1913
- Acc
1925
+ Fun = fun ({tombstone , _ }, _ , _ , Acc ) ->
1926
+ Acc ;
1927
+ (K , Tstamp , {Offset , _TotalSz }, Acc ) ->
1928
+ bitcask_nifs :keydir_remove (LiveKeyDir , KT (K ), Tstamp , FileId ,
1929
+ Offset ),
1930
+ Acc
1914
1931
end ,
1915
1932
case bitcask_fileops :fold_keys (File , Fun , ok , default ) of
1916
1933
{error , Reason } ->
@@ -1965,11 +1982,13 @@ init_dataset(Dirname, Opts, KVs) ->
1965
1982
os :cmd (? FMT (" rm -rf ~s " , [Dirname ])),
1966
1983
1967
1984
B = bitcask :open (Dirname , [read_write ] ++ Opts ),
1968
- lists :foldl (fun ({K , V }, _ ) ->
1969
- ok = bitcask :put (B , K , V )
1970
- end , undefined , KVs ),
1985
+ put_kvs (B , KVs ),
1971
1986
B .
1972
1987
1988
%% Test helper: store every {Key, Value} pair of KVs into the open cask B,
%% in list order. Each put must return 'ok' or the helper crashes with
%% badmatch. Returns 'ok' after the last put, or 'undefined' for an empty list.
put_kvs(B, KVs) ->
    StoreOne = fun({Key, Val}, _Prev) ->
                       ok = bitcask:put(B, Key, Val)
               end,
    lists:foldl(StoreOne, undefined, KVs).
1973
1992
1974
1993
default_dataset () ->
1975
1994
[{<<" k" >>, <<" v" >>},
@@ -2029,6 +2048,44 @@ list_data_files_test2() ->
2029
2048
% % Now use the list_data_files to scan the dir
2030
2049
ExpFiles = list_data_files (" /tmp/bc.test.list" , undefined , undefined ).
2031
2050
2051
%% Test that readable_files will not return the currently active
%% write or merge file by mistake if they change in between fetching them
%% and listing the files in the directory.
%%
%% bitcask_lockops:read_activefile/2 is mocked so that the first call for
%% each kind reports one file (4 for merge, 5 for write), creates file N+2 on
%% disk, and reports that newer file on every later call. Files 1..5 are
%% therefore the only ones expected back.
list_data_files_race_test() ->
    Dir = "/tmp/bc.test.list.race",
    Fname = fun(N) ->
                    filename:join(Dir, integer_to_list(N) ++ ".bitcask.data")
            end,
    WriteFile = fun(N) ->
                        ok = file:write_file(Fname(N), <<>>)
                end,
    WriteFiles = fun(S, E) ->
                         [WriteFile(N) || N <- lists:seq(S, E)]
                 end,
    os:cmd("rm -rf " ++ Dir ++ "; mkdir -p " ++ Dir),
    WriteFiles(1, 5),
    %% Faking 4 as merge file, 5 as write file,
    %% then switching to 6 as merge, 7 as write
    KindN = fun(merge) -> 4; (write) -> 5 end,
    meck:new(bitcask_lockops, [passthrough]),
    ReadFiles =
        try
            meck:expect(bitcask_lockops, read_activefile,
                        fun(Kind, _) ->
                                case get({fake_activefile, Kind}) of
                                    undefined ->
                                        N = KindN(Kind),
                                        %% Next time return file + 2
                                        WriteFile(N + 2),
                                        put({fake_activefile, Kind},
                                            Fname(N + 2)),
                                        Fname(N);
                                    File ->
                                        File
                                end
                        end),
            lists:usort(bitcask:readable_files(Dir))
        after
            %% Always drop the mock, even when readable_files/1 crashes, so
            %% later tests see the real bitcask_lockops again.
            meck:unload(),
            %% The fake state lives in the process dictionary; clear it so it
            %% cannot leak into other tests run by the same process.
            erlang:erase({fake_activefile, merge}),
            erlang:erase({fake_activefile, write})
        end,
    ?assertEqual([Fname(N) || N <- lists:seq(1, 5)],
                 ReadFiles).
2088
+
2032
2089
%% EUnit generator: run fold_test2/0 with a 60 second timeout.
fold_test_() ->
    TestFun = fun fold_test2/0,
    {timeout, 60, TestFun}.
2034
2091
@@ -2865,6 +2922,25 @@ corrupt_file(Path, Offset, Data) ->
2865
2922
ok = file :write (FH , Data ),
2866
2923
file :close (FH ).
2867
2924
2925
%% Verify that if the cached efile port goes away, we can recover
%% and not get stuck opening casks.
%%
%% A cask is written and closed, the port stored under 'bitcask_efile_port'
%% in the process dictionary is closed by hand, and the cask is opened again.
efile_error_test() ->
    CaskDir = "/tmp/bc.efile.error",
    Cask = bitcask:open(CaskDir, [read_write]),
    ok = bitcask:put(Cask, <<"k">>, <<"v">>),
    ok = bitcask:close(Cask),
    EfilePort = get(bitcask_efile_port),
    %% If this fails, we stopped using the efile port trick to list
    %% dir contents, so remove this test
    ?assert(is_port(EfilePort)),
    true = erlang:port_close(EfilePort),
    case bitcask:open(CaskDir) of
        Reopened when is_reference(Reopened) ->
            ok = bitcask:close(Reopened);
        {error, _} = OpenErr ->
            %% Deliberately failing assertion so the error term shows up in
            %% the test report.
            ?assertEqual(ok, OpenErr)
    end.
2943
+
2868
2944
% % About leak_t0():
2869
2945
% %
2870
2946
% % If bitcask leaks file descriptors for the 'touch'ed files, output is:
@@ -3210,6 +3286,26 @@ update_tstamp_stats_test2() ->
3210
3286
bitcask_time :test__clear_fudge ()
3211
3287
end .
3212
3288
3289
%% EUnit fixture: opening a cask must return {error, _} when
%% bitcask_fileops:data_file_tstamps/1 fails, and a later open (with the
%% real bitcask_fileops restored) must succeed.
scan_err_test_() ->
    Setup = fun() ->
                    meck:new(bitcask_fileops, [passthrough]),
                    ok
            end,
    Cleanup = fun(_) ->
                      meck:unload()
              end,
    Scenario =
        fun() ->
                Dir = "/tmp/bc.scan.err",
                %% Force the directory scan to fail.
                meck:expect(bitcask_fileops, data_file_tstamps,
                            fun(_) -> {error, because} end),
                ?assertMatch({error, _}, bitcask:open(Dir)),
                %% Restore the real module and check that open works again.
                meck:unload(bitcask_fileops),
                B = bitcask:open(Dir),
                ?assertMatch({true, B}, {is_reference(B), B}),
                ok = bitcask:close(B)
        end,
    {setup, Setup, Cleanup, [Scenario]}.
3308
+
3213
3309
%% EUnit generator: run total_byte_stats_test2/0 with a 60 second timeout.
total_byte_stats_test_() ->
    TestFun = fun total_byte_stats_test2/0,
    {timeout, 60, TestFun}.
3215
3311
@@ -3281,6 +3377,31 @@ merge_batch_test2() ->
3281
3377
bitcask :close (B )
3282
3378
end .
3283
3379
3380
%% EUnit generator: run merge_expired_test2/0 with a 120 second timeout.
merge_expired_test_() ->
    TestFun = fun merge_expired_test2/0,
    {timeout, 120, TestFun}.
3382
+
3383
%% Merge the first four data files away as if they were completely expired
%% and check that only the keys written afterwards are still visible.
%%
%% Keys 1..3 are written, key 1 is deleted, then keys 4..10 are written.
%% After the merge both list_keys and fold must report exactly keys 4..10.
merge_expired_test2() ->
    Dir = "/tmp/bc.merge.expired.files",
    LastKey = 10,
    MkKey = fun(N) -> <<N:8/integer>> end,
    MkKVs = fun(From, To) ->
                    [{MkKey(N), <<"v">>} || N <- lists:seq(From, To)]
            end,
    B = init_dataset(Dir, [{max_file_size, 1}], MkKVs(1, 3)),
    ok = bitcask:delete(B, MkKey(1)),
    put_kvs(B, MkKVs(4, LastKey)),
    %% Merge away the first 4 files as if they were completely expired,
    ExpiredFiles = [Dir ++ "/" ++ integer_to_list(N) ++ ".bitcask.data"
                    || N <- lists:seq(1, 4)],
    ?assertEqual(ok, bitcask:merge(Dir, [], {ExpiredFiles, ExpiredFiles})),
    Expected = [MkKey(N) || N <- lists:seq(4, LastKey)],
    Listed = lists:sort(bitcask:list_keys(B)),
    CollectKey = fun(K, _V, Acc) -> [K | Acc] end,
    Folded = lists:sort(bitcask:fold(B, CollectKey, [])),
    %% Close before asserting so the cask is released on failure as well.
    bitcask:close(B),
    ?assertEqual(Expected, Listed),
    ?assertEqual(Expected, Folded).
3404
+
3284
3405
%% EUnit generator: run max_merge_size_test2/0 with a 120 second timeout.
max_merge_size_test_() ->
    TestFun = fun max_merge_size_test2/0,
    {timeout, 120, TestFun}.
3286
3407
@@ -3361,6 +3482,31 @@ legacy_tombstones_test2() ->
3361
3482
bitcask :close (B ),
3362
3483
? assertEqual ([Last ], AllFiles3 ).
3363
3484
3485
%% Check how many tombstones end up on disk when one key is overwritten.
%%
%% The key is written ten times in one session, the cask is closed and
%% re-opened, and the key is written once more. Scanning every readable
%% data file afterwards must find exactly one tombstone value.
update_tombstones_test() ->
    Dir = "/tmp/bc.update.tombstones",
    Key = <<"k">>,
    Opts = [read_write, {max_file_size, 50000000}],
    Data = [{Key, integer_to_binary(N)} || N <- lists:seq(1, 10)],
    ok = bitcask:close(init_dataset(Dir, Opts, Data)),
    %% Re-open to guarantee opening a second file.
    %% An update on the new file requires a tombstone.
    B2 = bitcask:open(Dir, Opts),
    ok = bitcask:put(B2, Key, <<"last_val">>),
    ok = bitcask:close(B2),
    OpenFd = fun(File) ->
                     {ok, Fd} = bitcask_fileops:open_file(File),
                     Fd
             end,
    Fds = lists:map(OpenFd, bitcask:readable_files(Dir)),
    %% Fold callback: bump the counter for every tombstone value seen.
    CountTombs = fun(_K, V, _Tstamp, _Pos, Acc) ->
                         case bitcask:is_tombstone(V) of
                             true -> Acc + 1;
                             false -> Acc
                         end
                 end,
    ?assertEqual(1, bitcask:subfold(CountTombs, Fds, 0)).
3509
+
3364
3510
make_merge_file (Dir , Seed , Probability ) ->
3365
3511
random :seed (Seed ),
3366
3512
case filelib :is_dir (Dir ) of
0 commit comments