
Commit cfba0e9

Optional io device to open files
1 parent d2e2a3f commit cfba0e9

File tree

1 file changed: +61, -36 lines changed


src/osiris_log.erl

Lines changed: 61 additions & 36 deletions
@@ -105,6 +105,8 @@
 
 -define(SKIP_SEARCH_JUMP, 2048).
 
+-define(DEFAULT_IO_MODULE, file).
+
 %% Specification of the Log format.
 %%
 %% Notes:
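
The new ?DEFAULT_IO_MODULE macro is the fallback used by the open_idx/2 and open_seg/2 helpers introduced further down: the module that actually opens index and segment files is looked up in the osiris application environment under io_idx_module and io_segment_module, and defaults to the standard file module. A minimal sys.config sketch of how a deployment might plug in an alternative module (my_io_mod is a hypothetical name, not part of this commit):

%% sys.config (sketch; my_io_mod is a hypothetical module, not part of this commit)
[{osiris,
  [{io_idx_module, my_io_mod},       %% consulted by open_idx/2 when opening .index files
   {io_segment_module, my_io_mod}]}  %% consulted by open_seg/2 (file_size/1 defaults to prim_file)
].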
@@ -577,14 +579,14 @@ init(#{dir := Dir,
 ?DEBUG_(Name, " next offset ~b first offset ~b",
 [element(1, TailInfo),
 FstChId]),
-{ok, SegFd} = open(Filename, ?FILE_OPTS_WRITE),
+{ok, SegFd} = open_seg(Filename, ?FILE_OPTS_WRITE),
 {ok, Size} = file:position(SegFd, Size),
 %% maybe_fix_corrupted_files has truncated the index to the last
 %% record pointing
 %% at a valid chunk we can now truncate the segment to size in
 %% case there is trailing data
 ok = file:truncate(SegFd),
-{ok, IdxFd} = open(IdxFilename, ?FILE_OPTS_WRITE),
+{ok, IdxFd} = open_idx(IdxFilename, ?FILE_OPTS_WRITE),
 {ok, IdxEof} = file:position(IdxFd, eof),
 NumChunks = (IdxEof - ?IDX_HEADER_SIZE) div ?INDEX_RECORD_SIZE_B,
 #?MODULE{cfg = Cfg,
@@ -600,8 +602,8 @@ init(#{dir := Dir,
 index = IdxFilename,
 last = undefined}, _} ->
 %% the empty log case
-{ok, SegFd} = open(Filename, ?FILE_OPTS_WRITE),
-{ok, IdxFd} = open(IdxFilename, ?FILE_OPTS_WRITE),
+{ok, SegFd} = open_seg(Filename, ?FILE_OPTS_WRITE),
+{ok, IdxFd} = open_idx(IdxFilename, ?FILE_OPTS_WRITE),
 {ok, _} = file:position(SegFd, ?LOG_HEADER_SIZE),
 counters:put(Cnt, ?C_SEGMENTS, 1),
 %% the segment could potentially have trailing data here so we'll
@@ -641,7 +643,7 @@ maybe_fix_corrupted_files([IdxFile]) ->
 true ->
 % the only index doesn't contain a single valid record
 % make sure it has a valid header
-{ok, IdxFd} = file:open(IdxFile, ?FILE_OPTS_WRITE),
+{ok, IdxFd} = open_idx(IdxFile, ?FILE_OPTS_WRITE),
 ok = file:write(IdxFd, ?IDX_HEADER),
 ok = file:close(IdxFd);
 false ->
@@ -651,7 +653,7 @@ maybe_fix_corrupted_files([IdxFile]) ->
 true ->
 % the only segment doesn't contain a single valid chunk
 % make sure it has a valid header
-{ok, SegFd} = file:open(SegFile, ?FILE_OPTS_WRITE),
+{ok, SegFd} = open_seg(SegFile, ?FILE_OPTS_WRITE),
 ok = file:write(SegFd, ?LOG_HEADER),
 ok = file:close(SegFd);
 false ->
@@ -698,7 +700,7 @@ truncate_invalid_idx_records(IdxFile, SegSize) ->
 % add an option to perform a full segment scan and reconstruct
 % the index for the valid chunks.
 SegFile = segment_from_index_file(IdxFile),
-{ok, IdxFd} = open(IdxFile, [raw, binary, write, read]),
+{ok, IdxFd} = open_idx(IdxFile, [raw, binary, write, read]),
 {ok, Pos} = position_at_idx_record_boundary(IdxFd, eof),
 ok = skip_invalid_idx_records(IdxFd, SegFile, SegSize, Pos),
 ok = file:truncate(IdxFd),
@@ -716,7 +718,7 @@ skip_invalid_idx_records(IdxFd, SegFile, SegSize, Pos) ->
 {ok, ?IDX_MATCH(_, _, ChunkPos)} ->
 % a non-zero index record
 case ChunkPos < SegSize andalso
-is_valid_chunk_on_disk(SegFile, ChunkPos) of
+is_valid_chunk_on_disk(SegFile, ChunkPos) of
 true ->
 ok;
 false ->
@@ -956,9 +958,9 @@ truncate_to(Name, RemoteRange, [{E, ChId} | NextEOs], IdxFiles) ->
 %% the Chunk id was found and has the right epoch
 %% lets truncate to this point
 %% FilePos could be eof here which means the next offset
-{ok, Fd} = file:open(File, [read, write, binary, raw]),
+{ok, Fd} = open_seg(File, [read, write, binary, raw]),
 _ = file:advise(Fd, 0, 0, random),
-{ok, IdxFd} = file:open(IdxFile, [read, write, binary, raw]),
+{ok, IdxFd} = open_idx(IdxFile, [read, write, binary, raw]),
 
 NextPos = next_chunk_pos(Fd, Pos),
 {ok, _} = file:position(Fd, NextPos),
@@ -1047,7 +1049,7 @@ init_data_reader_at(ChunkId, FilePos, File,
 #{dir := Dir, name := Name,
 shared := Shared,
 readers_counter_fun := CountersFun} = Config) ->
-case file:open(File, [raw, binary, read]) of
+case open_seg(File, [raw, binary, read]) of
 {ok, Fd} ->
 Cnt = make_counter(Config),
 counters:put(Cnt, ?C_OFFSET, ChunkId - 1),
@@ -1157,7 +1159,7 @@ init_offset_reader0({timestamp, Ts}, #{} = Conf) ->
 SegmentFile = segment_from_index_file(IdxFile),
 open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf);
 {first_in, IdxFile} ->
-{ok, Fd} = file:open(IdxFile, [raw, binary, read]),
+{ok, Fd} = open_seg(IdxFile, [raw, binary, read]),
 {ok, ?IDX_MATCH(ChunkId, _, FilePos)} = first_idx_record(Fd),
 SegmentFile = segment_from_index_file(IdxFile),
 open_offset_reader_at(SegmentFile, ChunkId, FilePos, Conf);
@@ -1268,7 +1270,7 @@ open_offset_reader_at(SegmentFile, NextChunkId, FilePos,
 readers_counter_fun := ReaderCounterFun,
 options := Options} =
 Conf) ->
-{ok, Fd} = open(SegmentFile, [raw, binary, read]),
+{ok, Fd} = open_seg(SegmentFile, [raw, binary, read]),
 Cnt = make_counter(Conf),
 ReaderCounterFun(1),
 FilterMatcher = case Options of
@@ -1308,7 +1310,7 @@ last_user_chunk_id0(_, []) ->
 not_found;
 last_user_chunk_id0(Name, [IdxFile | Rest]) ->
 %% Do not read-ahead since we read the index file backwards chunk by chunk.
-{ok, IdxFd} = open(IdxFile, [read, raw, binary]),
+{ok, IdxFd} = open_idx(IdxFile, [read, raw, binary]),
 {ok, EofPos} = position_at_idx_record_boundary(IdxFd, eof),
 Last = last_user_chunk_id_in_index(EofPos - ?INDEX_RECORD_SIZE_B, IdxFd),
 _ = file:close(IdxFd),
@@ -1607,7 +1609,7 @@ iter_all_records({X, I}, Acc0) ->
 is_valid_chunk_on_disk(SegFile, Pos) ->
 %% read a chunk from a specified location in the segment
 %% then checks the CRC
-case open(SegFile, [read, raw, binary]) of
+case open_seg(SegFile, [read, raw, binary]) of
 {ok, SegFd} ->
 IsValid = case file:pread(SegFd, Pos, ?HEADER_SIZE_B) of
 {ok,
@@ -1897,7 +1899,7 @@ last_idx_record(IdxFd) ->
 nth_last_idx_record(IdxFd, 1).
 
 nth_last_idx_record(IdxFile, N) when ?IS_STRING(IdxFile) ->
-{ok, IdxFd} = open(IdxFile, [read, raw, binary]),
+{ok, IdxFd} = open_idx(IdxFile, [read, raw, binary]),
 IdxRecord = nth_last_idx_record(IdxFd, N),
 _ = file:close(IdxFd),
 IdxRecord;
@@ -1910,7 +1912,7 @@ nth_last_idx_record(IdxFd, N) ->
 end.
 
 last_valid_idx_record(IdxFile) ->
-{ok, IdxFd} = open(IdxFile, [read, raw, binary]),
+{ok, IdxFd} = open_idx(IdxFile, [read, raw, binary]),
 case position_at_idx_record_boundary(IdxFd, eof) of
 {ok, Pos} ->
 SegFile = segment_from_index_file(IdxFile),
@@ -1955,7 +1957,7 @@ position_at_idx_record_boundary(IdxFd, At) ->
 end.
 
 build_segment_info(SegFile, LastChunkPos, IdxFile) ->
-{ok, Fd} = open(SegFile, [read, binary, raw]),
+{ok, Fd} = open_seg(SegFile, [read, binary, raw]),
 %% we don't want to read blocks into page cache we are unlikely to need
 _ = file:advise(Fd, 0, 0, random),
 case file:pread(Fd, ?LOG_HEADER_SIZE, ?HEADER_SIZE_B) of
@@ -2183,8 +2185,13 @@ eval_max_bytes([IdxFile | Rest], Limit, Acc) ->
 Acc
 end.
 
+
+
+
 file_size(Path) ->
-case prim_file:read_file_info(Path) of
+IOMod = application:get_env(osiris, io_segment_module,
+prim_file),
+case IOMod:read_file_info(Path) of
 {ok, #file_info{size = Size}} ->
 Size;
 {error, enoent} ->
@@ -2215,7 +2222,7 @@ last_epoch_chunk_ids0([], undefined) ->
 %% the empty stream
 [];
 last_epoch_chunk_ids0([IdxFile | _] = Files, undefined) ->
-{ok, Fd} = open(IdxFile, [read, raw, binary]),
+{ok, Fd} = open_idx(IdxFile, [read, raw, binary]),
 case first_idx_record(Fd) of
 {ok, ?IDX_MATCH(FstChId, FstEpoch, _)} ->
 ok = file:close(Fd),
@@ -2229,7 +2236,7 @@ last_epoch_chunk_ids0([IdxFile | Rem], {PrevE, _PrevChId, EOs} = Acc0) ->
 case last_valid_idx_record(IdxFile) of
 {ok, ?IDX_MATCH(_LstChId, LstEpoch, _)}
 when LstEpoch > PrevE ->
-{ok, Fd} = open(IdxFile, [read, raw, binary]),
+{ok, Fd} = open_idx(IdxFile, [read, raw, binary]),
 Acc = idx_skip_search(Fd, ?IDX_HEADER_SIZE,
 fun leo_search_fun/3,
 Acc0),
@@ -2450,6 +2457,13 @@ max_segment_size_reached(
 
 sendfile(_Transport, _Fd, _Sock, _Pos, 0) ->
 ok;
+sendfile(tcp = _Transport, Fd, Sock, Pos, ToSend) when is_pid(Fd) ->
+case file:pread(Fd, Pos, ToSend) of
+{ok, Data} ->
+gen_tcp:send(Sock, Data);
+{error, _} = Err ->
+Err
+end;
 sendfile(tcp = Transport, Fd, Sock, Pos, ToSend) ->
 case file:sendfile(Fd, Sock, Pos, ToSend, []) of
 {ok, 0} ->
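
The added clause covers file handles that are pids rather than raw OS file descriptors. file:sendfile/5 requires a file opened with the raw option; a pluggable IO module will typically hand back a process-backed io-device instead, so for those the chunk is read with file:pread/3 and written to the socket with gen_tcp:send/2. A small standalone sketch of the distinction the is_pid/1 guard relies on (not taken from the commit; "seg.dat" and Sock are placeholders):

%% Sketch: raw handles are records, io-devices are pids.
{ok, RawFd} = file:open("seg.dat", [raw, binary, read]),
false = is_pid(RawFd),                    %% raw handle, usable with file:sendfile/5
{ok, IoDev} = file:open("seg.dat", [binary, read]),
true = is_pid(IoDev),                     %% process-backed io-device, sendfile cannot use it
{ok, Data} = file:pread(IoDev, 0, 1024),  %% so read the bytes explicitly...
ok = gen_tcp:send(Sock, Data).            %% ...and push them down the socket (Sock: a connected tcp socket)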
@@ -2474,7 +2488,7 @@ send(ssl, Sock, Data) ->
 ssl:send(Sock, Data).
 
 last_timestamp_in_index_file(IdxFile) ->
-case file:open(IdxFile, [raw, binary, read]) of
+case open_idx(IdxFile, [raw, binary, read]) of
 {ok, IdxFd} ->
 case last_idx_record(IdxFd) of
 {ok, <<_O:64/unsigned,
@@ -2493,7 +2507,7 @@ last_timestamp_in_index_file(IdxFile) ->
 end.
 
 first_timestamp_from_index_files([IdxFile | _]) ->
-case file:open(IdxFile, [raw, binary, read]) of
+case open_idx(IdxFile, [raw, binary, read]) of
 {ok, IdxFd} ->
 case first_idx_record(IdxFd) of
 {ok, <<_FstO:64/unsigned,
@@ -2525,14 +2539,14 @@ chunk_id_range_from_idx_files(Files) ->
 end.
 
 chunk_id_range_from_idx_files(FstIdxFile, LstIdxFile) ->
-{ok, LstFd} = open(LstIdxFile, [read, raw, binary]),
+{ok, LstFd} = open_idx(LstIdxFile, [read, raw, binary]),
 case position_at_idx_record_boundary(LstFd, eof) of
 {ok, Pos} ->
 case file:pread(LstFd, Pos - ?INDEX_RECORD_SIZE_B,
 ?INDEX_RECORD_SIZE_B) of
 {ok, ?IDX_MATCH(LstChId, _, _)} ->
 ok = file:close(LstFd),
-{ok, FstFd} = open(FstIdxFile, [read, raw, binary]),
+{ok, FstFd} = open_idx(FstIdxFile, [read, raw, binary]),
 case file:pread(FstFd, ?IDX_HEADER_SIZE,
 ?INDEX_RECORD_SIZE_B) of
 {ok, ?IDX_MATCH(FstChId, _, _)} ->
@@ -2650,11 +2664,11 @@ open_new_segment(#?MODULE{cfg = #cfg{name = Name,
 IdxFilename = make_file_name(NextOffset, "index"),
 ?DEBUG_(Name, "~ts", [Filename]),
 {ok, IdxFd} =
-file:open(
+open_idx(
 filename:join(Dir, IdxFilename), ?FILE_OPTS_WRITE),
 ok = file:write(IdxFd, ?IDX_HEADER),
 {ok, Fd} =
-file:open(
+open_seg(
 filename:join(Dir, Filename), ?FILE_OPTS_WRITE),
 ok = file:write(Fd, ?LOG_HEADER),
 %% we always move to the end of the file
@@ -2669,7 +2683,7 @@ open_new_segment(#?MODULE{cfg = #cfg{name = Name,
 mode = Write#write{segment_size = {?LOG_HEADER_SIZE, 0}}}.
 
 open_index_read(File) ->
-{ok, Fd} = open(File, [read, raw, binary, read_ahead]),
+{ok, Fd} = open_idx(File, [read, raw, binary, read_ahead]),
 %% We can't use the assertion that index header is correct because of a
 %% race condition between opening the file and writing the header
 %% It seems to happen when retention policies are applied
@@ -2687,7 +2701,7 @@ offset_idx_scan(Name, Offset, #seg_info{index = IndexFile} = SegmentInfo) ->
 true ->
 offset_out_of_range;
 false ->
-{ok, IdxFd} = open(IndexFile,
+{ok, IdxFd} = open_idx(IndexFile,
 [read, raw, binary]),
 _ = file:advise(IdxFd, 0, 0, random),
 {Offset, SearchResult} =
@@ -2720,8 +2734,21 @@ throw_missing({error, enoent}) ->
 throw_missing(Any) ->
 Any.
 
-open(File, Options) ->
-throw_missing(file:open(File, Options)).
+open_idx(File, Options) ->
+IOMod = application:get_env(osiris, io_idx_module,
+?DEFAULT_IO_MODULE),
+open(IOMod, File, Options).
+open_seg(File, Options) ->
+IOMod = application:get_env(osiris, io_segment_module,
+?DEFAULT_IO_MODULE),
+open_seg(IOMod, File, Options).
+open_seg(IOMod, File, Options) ->
+open(IOMod, File, Options).
+
+%% open(File, Options) ->
+%% open(file, File, Options).
+open(Mod, File, Options) ->
+throw_missing(Mod:open(File, Options)).
 
 chunk_location_for_timestamp(Idx, Ts) ->
 %% TODO: optimise using skip search approach
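
open_idx/2 and open_seg/2 resolve the IO module from the application environment on every call and funnel into open/3, which simply invokes Mod:open/2 wrapped in throw_missing/1. Within this diff the configured module only needs to provide open/2 — plus read_file_info/1 for the segment module, which file_size/1 now uses — and to return handles the rest of osiris_log can keep driving through the regular file functions (pread, position, truncate, close). A minimal, hypothetical pass-through module, shown purely as a sketch of the shape such a plug-in might take:

-module(my_io_mod).  %% hypothetical module name, not shipped with this commit
-export([open/2, read_file_info/1]).

%% Delegate to the stock modules; a real implementation could add
%% instrumentation, throttling or an alternative storage backend here.
open(File, Options) ->
    file:open(File, Options).            %% must return {ok, Fd} | {error, Reason}

read_file_info(Path) ->
    prim_file:read_file_info(Path).      %% must return {ok, #file_info{}} | {error, Reason}

Because the environment is read at call time, application:set_env(osiris, io_segment_module, my_io_mod) takes effect on the next open_seg/2 call without reopening existing handles.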
@@ -2796,7 +2823,7 @@ recover_tracking(#?MODULE{cfg = #cfg{directory = Dir,
 %% we need to open a new file handle here as we cannot use the one that is
 %% being used for appending to the segment as pread _may_ move the file
 %% position on some systems (such as windows)
-{ok, Fd} = open(filename:join(Dir, File), [read, raw, binary]),
+{ok, Fd} = open_seg(filename:join(Dir, File), [read, raw, binary]),
 _ = file:advise(Fd, 0, 0, random),
 %% TODO: if the first chunk in the segment isn't a tracking snapshot and
 %% there are prior segments we could scan at least two segments increasing
@@ -2885,7 +2912,6 @@ read_header0(#?MODULE{cfg = #cfg{directory = Dir,
 counters:put(CntRef, ?C_OFFSET, NextChId0 + NumRecords),
 counters:add(CntRef, ?C_CHUNKS, 1),
 NextPos = Pos + ?HEADER_SIZE_B + FilterSize + DataSize + TrailerSize,
-
 ChunkFilter = case MaybeFilter of
 <<F:FilterSize/binary, _/binary>> ->
 %% filter is of default size or 0
@@ -2946,7 +2972,7 @@ read_header0(#?MODULE{cfg = #cfg{directory = Dir,
 %% log but would cause an infinite loop if it does
 {end_of_stream, State};
 false ->
-case file:open(filename:join(Dir, SegFile),
+case open_seg(filename:join(Dir, SegFile),
 [raw, binary, read]) of
 {ok, Fd2} ->
 ok = file:close(Fd),
@@ -3019,7 +3045,7 @@ index_file_first_offset(IdxFile) when is_binary(IdxFile) ->
 binary_to_integer(filename:basename(IdxFile, <<".index">>)).
 
 first_last_timestamps(IdxFile) ->
-case file:open(IdxFile, [raw, read, binary]) of
+case open_idx(IdxFile, [raw, read, binary]) of
 {ok, Fd} ->
 _ = file:advise(Fd, 0, 0, random),
 case first_idx_record(Fd) of
@@ -3107,7 +3133,6 @@ close_fd(Fd) ->
 _ = file:close(Fd),
 ok.
 
-
 dump_init(File) ->
 {ok, Fd} = file:open(File, [raw, binary, read]),
 {ok, <<"OSIL", _V:4/binary>> } = file:read(Fd, ?LOG_HEADER_SIZE),

0 commit comments
