Skip to content

Commit c78cfc0

Browse files
committed
Add osiris_log_manifest behaviour for hooks into log writers
`osiris_log_manifest` adds a few callbacks which are executed during initialization of writers and at events like rolling segments and writing chunks.
1 parent f662b43 commit c78cfc0

File tree

2 files changed

+150
-28
lines changed

2 files changed

+150
-28
lines changed

src/osiris_log.erl

Lines changed: 108 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,13 @@
5858
generate_log/4,
5959
parse_header/2]).
6060

61+
-behaviour(osiris_log_manifest).
62+
%% osiris_log_manifest implementations
63+
-export([init_manifest/2,
64+
handle_event/2,
65+
close_manifest/1,
66+
delete/1]).
67+
6168
-export([dump_init/1,
6269
dump_init_idx/1,
6370
dump_chunk/1,
@@ -362,6 +369,7 @@
362369
-type offset() :: osiris:offset().
363370
-type epoch() :: osiris:epoch().
364371
-type range() :: empty | {From :: offset(), To :: offset()}.
372+
-type epoch_offsets() :: [{epoch(), offset()}].
365373
-type counter_spec() :: {Tag :: term(), Fields :: seshat:fields_spec()}.
366374
-type chunk_type() ::
367375
?CHNK_USER |
@@ -422,7 +430,8 @@
422430
%% that will be included in snapshots written to new segments
423431
readers_counter_fun = fun(_) -> ok end :: function(),
424432
shared :: atomics:atomics_ref(),
425-
filter_size = ?DEFAULT_FILTER_SIZE :: osiris_bloom:filter_size()
433+
filter_size = ?DEFAULT_FILTER_SIZE :: osiris_bloom:filter_size(),
434+
manifest_module = ?MODULE :: module()
426435
}).
427436
-record(ra,
428437
{size = ?HEADER_SIZE_B + ?DEFAULT_FILTER_SIZE :: non_neg_integer(),
@@ -444,7 +453,8 @@
444453
{type = writer :: writer | acceptor,
445454
segment_size = {?LOG_HEADER_SIZE, 0} :: {non_neg_integer(), non_neg_integer()},
446455
current_epoch :: non_neg_integer(),
447-
tail_info = {0, empty} :: osiris:tail_info()
456+
tail_info = {0, empty} :: osiris:tail_info(),
457+
manifest :: osiris_log_manifest:state()
448458
}).
449459
-record(?MODULE,
450460
{cfg :: #cfg{},
@@ -456,7 +466,7 @@
456466
%% record chunk_info does not map exactly to an index record (field 'num' differs)
457467
-record(chunk_info,
458468
{id :: offset(),
459-
timestamp :: non_neg_integer(),
469+
timestamp :: osiris:timestamp(),
460470
epoch :: epoch(),
461471
num :: non_neg_integer(),
462472
type :: chunk_type(),
@@ -475,8 +485,10 @@
475485
-opaque state() :: #?MODULE{}.
476486

477487
-export_type([state/0,
488+
chunk_type/0,
478489
chunk_iterator/0,
479490
range/0,
491+
epoch_offsets/0,
480492
config/0,
481493
counter_spec/0,
482494
transport/0,
@@ -499,20 +511,11 @@ init(Config) ->
499511
-spec init(config(), writer | acceptor) -> state().
500512
init(#{dir := Dir,
501513
name := Name,
502-
epoch := Epoch} = Config,
514+
epoch := Epoch} = Config0,
503515
WriterType) ->
504516
%% scan directory for segments if in write mode
505-
MaxSizeBytes = maps:get(max_segment_size_bytes, Config,
506-
?DEFAULT_MAX_SEGMENT_SIZE_B),
507-
MaxSizeChunks = application:get_env(osiris, max_segment_size_chunks,
508-
?DEFAULT_MAX_SEGMENT_SIZE_C),
509-
Retention = maps:get(retention, Config, []),
510-
FilterSize = maps:get(filter_size, Config, ?DEFAULT_FILTER_SIZE),
511517
?INFO("Stream: ~ts will use ~ts for osiris log data directory",
512518
[Name, Dir]),
513-
?DEBUG_(Name, "max_segment_size_bytes: ~b,
514-
max_segment_size_chunks ~b, retention ~w, filter size ~b",
515-
[MaxSizeBytes, MaxSizeChunks, Retention, FilterSize]),
516519
ok = filelib:ensure_dir(Dir),
517520
case file:make_dir(Dir) of
518521
ok ->
@@ -522,7 +525,25 @@ init(#{dir := Dir,
522525
Err ->
523526
throw(Err)
524527
end,
528+
ok = maybe_fix_corrupted_files(Config0),
529+
530+
ManifestMod = manifest_module(),
531+
{Config, Manifest0} = case Config0 of
532+
#{manifest := M} ->
533+
{Config0, M};
534+
_ ->
535+
ManifestMod:init_manifest(Config0, writer)
536+
end,
525537

538+
MaxSizeBytes = maps:get(max_segment_size_bytes, Config,
539+
?DEFAULT_MAX_SEGMENT_SIZE_B),
540+
MaxSizeChunks = application:get_env(osiris, max_segment_size_chunks,
541+
?DEFAULT_MAX_SEGMENT_SIZE_C),
542+
Retention = maps:get(retention, Config, []),
543+
FilterSize = maps:get(filter_size, Config, ?DEFAULT_FILTER_SIZE),
544+
?DEBUG_(Name, "max_segment_size_bytes: ~b,
545+
max_segment_size_chunks ~b, retention ~w, filter size ~b",
546+
[MaxSizeBytes, MaxSizeChunks, Retention, FilterSize]),
526547
Cnt = make_counter(Config),
527548
%% initialise offset counter to -1 as 0 is the first offset in the log and
528549
%% it hasn't necessarily been written yet, for an empty log the first offset
@@ -544,8 +565,8 @@ init(#{dir := Dir,
544565
counter = Cnt,
545566
counter_id = counter_id(Config),
546567
shared = Shared,
547-
filter_size = FilterSize},
548-
ok = maybe_fix_corrupted_files(Config),
568+
filter_size = FilterSize,
569+
manifest_module = ManifestMod},
549570
DefaultNextOffset = case Config of
550571
#{initial_offset := IO}
551572
when WriterType == acceptor ->
@@ -562,6 +583,7 @@ init(#{dir := Dir,
562583
#write{type = WriterType,
563584
tail_info = {DefaultNextOffset,
564585
empty},
586+
manifest = Manifest0,
565587
current_epoch = Epoch}});
566588
{NumSegments,
567589
#seg_info{first = #chunk_info{id = FstChId,
@@ -603,11 +625,14 @@ init(#{dir := Dir,
603625
{ok, IdxFd} = open(IdxFilename, ?FILE_OPTS_WRITE),
604626
{ok, IdxEof} = file:position(IdxFd, eof),
605627
NumChunks = (IdxEof - ?IDX_HEADER_SIZE) div ?INDEX_RECORD_SIZE_B,
628+
Event = {segment_opened, undefined, filename:basename(Filename)},
629+
Manifest = ManifestMod:handle_event(Event, Manifest0),
606630
#?MODULE{cfg = Cfg,
607631
mode =
608632
#write{type = WriterType,
609633
tail_info = TailInfo,
610634
segment_size = {Size, NumChunks},
635+
manifest = Manifest,
611636
current_epoch = Epoch},
612637
current_file = filename:basename(Filename),
613638
fd = SegFd,
@@ -627,10 +652,13 @@ init(#{dir := Dir,
627652
{ok, _} = file:position(IdxFd, ?IDX_HEADER_SIZE),
628653
osiris_log_shared:set_first_chunk_id(Shared, DefaultNextOffset - 1),
629654
osiris_log_shared:set_last_chunk_id(Shared, DefaultNextOffset - 1),
655+
Event = {segment_opened, undefined, filename:basename(Filename)},
656+
Manifest = ManifestMod:handle_event(Event, Manifest0),
630657
#?MODULE{cfg = Cfg,
631658
mode =
632659
#write{type = WriterType,
633660
tail_info = {DefaultNextOffset, empty},
661+
manifest = Manifest,
634662
current_epoch = Epoch},
635663
current_file = filename:basename(Filename),
636664
fd = SegFd,
@@ -867,7 +895,7 @@ evaluate_tracking_snapshot(#?MODULE{mode = #write{type = writer}} = State0, Trk0
867895
-spec init_acceptor(range(), list(), config()) ->
868896
state().
869897
init_acceptor(Range, EpochOffsets0,
870-
#{name := Name, dir := Dir} = Conf) ->
898+
#{name := Name, dir := Dir} = Conf0) ->
871899
%% truncate to first common last epoch offset
872900
%% * if the last local chunk offset has the same epoch but is lower
873901
%% than the last chunk offset then just attach at next offset.
@@ -879,6 +907,8 @@ init_acceptor(Range, EpochOffsets0,
879907
lists:reverse(
880908
lists:sort(EpochOffsets0)),
881909

910+
{Conf, Manifest} = (manifest_module()):init_manifest(Conf0, acceptor),
911+
882912
%% then truncate to
883913
IdxFiles = sorted_index_files(Dir),
884914
?DEBUG_(Name, "from epoch offsets: ~w range ~w", [EpochOffsets, Range]),
@@ -889,7 +919,8 @@ init_acceptor(Range, EpochOffsets0,
889919
{O, _} -> O
890920
end,
891921
init(Conf#{initial_offset => InitOffset,
892-
index_files => RemIdxFiles}, acceptor).
922+
index_files => RemIdxFiles,
923+
manifest => Manifest}, acceptor).
893924

894925
chunk_id_index_scan(IdxFile, ChunkId)
895926
when ?IS_STRING(IdxFile) ->
@@ -1817,11 +1848,19 @@ needs_handling(_, _, _) ->
18171848

18181849
-spec close(state()) -> ok.
18191850
close(#?MODULE{cfg = #cfg{counter_id = CntId,
1851+
manifest_module = ManifestMod,
18201852
readers_counter_fun = Fun},
18211853
fd = SegFd,
1822-
index_fd = IdxFd}) ->
1854+
index_fd = IdxFd,
1855+
mode = Mode}) ->
18231856
close_fd(IdxFd),
18241857
close_fd(SegFd),
1858+
case Mode of
1859+
#write{manifest = Manifest} ->
1860+
ok = ManifestMod:close_manifest(Manifest);
1861+
_ ->
1862+
ok
1863+
end,
18251864
Fun(-1),
18261865
case CntId of
18271866
undefined ->
@@ -1830,14 +1869,17 @@ close(#?MODULE{cfg = #cfg{counter_id = CntId,
18301869
osiris_counters:delete(CntId)
18311870
end.
18321871

1833-
delete_directory(#{name := Name,
1834-
dir := _} = Config) ->
1872+
delete_directory(Config) ->
1873+
(manifest_module()):delete(Config).
1874+
1875+
delete(#{name := Name,
1876+
dir := _} = Config) ->
18351877
Dir = directory(Config),
18361878
?DEBUG_(Name, " deleting directory ~ts", [Dir]),
18371879
delete_dir(Dir);
1838-
delete_directory(#{name := Name}) ->
1839-
delete_directory(Name);
1840-
delete_directory(Name) when ?IS_STRING(Name) ->
1880+
delete(#{name := Name}) ->
1881+
delete(Name);
1882+
delete(Name) when ?IS_STRING(Name) ->
18411883
Dir = directory(Name),
18421884
?DEBUG_(Name, " deleting directory ~ts", [Dir]),
18431885
delete_dir(Dir).
@@ -2118,7 +2160,7 @@ build_segment_info(SegFile, LastChunkPos, IdxFile) ->
21182160
end.
21192161

21202162
-spec overview(file:filename_all()) ->
2121-
{range(), [{epoch(), offset()}]}.
2163+
{range(), epoch_offsets()}.
21222164
overview(Dir) ->
21232165
Files = list_dir(Dir),
21242166
%% index files with matching segment
@@ -2182,11 +2224,19 @@ format_status(#?MODULE{cfg = #cfg{directory = Dir,
21822224
-spec update_retention([retention_spec()], state()) -> state().
21832225
update_retention(Retention,
21842226
#?MODULE{cfg = #cfg{name = Name,
2227+
manifest_module = ?MODULE,
21852228
retention = Retention0} = Cfg} = State0)
21862229
when is_list(Retention) ->
21872230
?DEBUG_(Name, " from: ~w to ~w", [Retention0, Retention]),
21882231
State = State0#?MODULE{cfg = Cfg#cfg{retention = Retention}},
2189-
trigger_retention_eval(State).
2232+
trigger_retention_eval(State);
2233+
update_retention(Retention,
2234+
#?MODULE{cfg = #cfg{manifest_module = ManifestMod},
2235+
mode = #write{manifest = Manifest0} = Write0} =
2236+
State0) ->
2237+
Event = {retention_updated, Retention},
2238+
Manifest = ManifestMod:handle_event(Event, Manifest0),
2239+
State0#?MODULE{mode = Write0#write{manifest = Manifest}}.
21902240

21912241
-spec evaluate_retention(file:filename_all(), [retention_spec()]) ->
21922242
{range(), FirstTimestamp :: osiris:timestamp(),
@@ -2482,11 +2532,13 @@ write_chunk(Chunk,
24822532
Epoch,
24832533
NumRecords,
24842534
#?MODULE{cfg = #cfg{counter = CntRef,
2535+
manifest_module = ManifestMod,
24852536
shared = Shared} = Cfg,
24862537
fd = Fd,
24872538
index_fd = IdxFd,
24882539
mode =
24892540
#write{segment_size = {SegSizeBytes, SegSizeChunks},
2541+
manifest = Manifest0,
24902542
tail_info = {Next, _}} =
24912543
Write} =
24922544
State) ->
@@ -2516,9 +2568,19 @@ write_chunk(Chunk,
25162568
counters:put(CntRef, ?C_OFFSET, NextOffset - 1),
25172569
counters:add(CntRef, ?C_CHUNKS, 1),
25182570
maybe_set_first_offset(Next, Cfg),
2571+
ChunkInfo = #{id => Next,
2572+
timestamp => Timestamp,
2573+
epoch => Epoch,
2574+
num => NumRecords,
2575+
type => ChType,
2576+
size => Size,
2577+
pos => Cur},
2578+
Event = {chunk_written, ChunkInfo, Chunk},
2579+
Manifest = ManifestMod:handle_event(Event, Manifest0),
25192580
State#?MODULE{mode =
25202581
Write#write{tail_info = {NextOffset,
25212582
{Epoch, Next, Timestamp}},
2583+
manifest = Manifest,
25222584
segment_size = {SegSizeBytes + Size,
25232585
SegSizeChunks + 1}}}
25242586
end.
@@ -2727,10 +2789,13 @@ make_file_name(N, Suff) ->
27272789

27282790
open_new_segment(#?MODULE{cfg = #cfg{name = Name,
27292791
directory = Dir,
2730-
counter = Cnt},
2792+
counter = Cnt,
2793+
manifest_module = ManifestMod},
27312794
fd = OldFd,
27322795
index_fd = OldIdxFd,
2796+
current_file = OldFilename,
27332797
mode = #write{type = _WriterType,
2798+
manifest = Manifest0,
27342799
tail_info = {NextOffset, _}} = Write} =
27352800
State0) ->
27362801
_ = close_fd(OldFd),
@@ -2751,11 +2816,15 @@ open_new_segment(#?MODULE{cfg = #cfg{name = Name,
27512816
{ok, _} = file:position(IdxFd, eof),
27522817
counters:add(Cnt, ?C_SEGMENTS, 1),
27532818

2819+
Event = {segment_opened, OldFilename, Filename},
2820+
Manifest = ManifestMod:handle_event(Event, Manifest0),
2821+
27542822
State0#?MODULE{current_file = Filename,
27552823
fd = Fd,
2756-
%% reset segment_size counter
27572824
index_fd = IdxFd,
2758-
mode = Write#write{segment_size = {?LOG_HEADER_SIZE, 0}}}.
2825+
mode = Write#write{manifest = Manifest,
2826+
%% reset segment_size counter
2827+
segment_size = {?LOG_HEADER_SIZE, 0}}}.
27592828

27602829
open_index_read(File) ->
27612830
{ok, Fd} = open(File, [read, raw, binary, read_ahead]),
@@ -3442,6 +3511,9 @@ ra(#{options := #{read_ahead := Limit}}) when is_integer(Limit) ->
34423511
ra(_) ->
34433512
#ra{}.
34443513

3514+
manifest_module() ->
3515+
application:get_env(osiris, log_manifest, ?MODULE).
3516+
34453517
generate_log(Msg, MsgsPerChunk, NumMessages, Directory) ->
34463518
Name = filename:basename(Directory),
34473519

@@ -3474,6 +3546,14 @@ write_in_chunks(ToWrite, MsgsPerChunk, Msg, W0) when ToWrite > 0 ->
34743546
write_in_chunks(_, _, _, W) ->
34753547
W.
34763548

3549+
%% Default implementation of osiris_log_manifest:
3550+
init_manifest(Config, _WriterType) ->
3551+
{Config, undefined}.
3552+
handle_event(_Event, undefined) ->
3553+
undefined.
3554+
close_manifest(undefined) ->
3555+
ok.
3556+
34773557
-ifdef(TEST).
34783558
-include_lib("eunit/include/eunit.hrl").
34793559

src/osiris_log_manifest.erl

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries.
6+
%%
7+
8+
-module(osiris_log_manifest).
9+
10+
-type state() :: term().
11+
12+
-type chunk_info() ::
13+
#{id := osiris:offset(),
14+
timestamp := osiris:timestamp(),
15+
epoch := osiris:epoch(),
16+
num := non_neg_integer(),
17+
type := osiris_log:chunk_type(),
18+
%% size of data + filter + trailer
19+
size := non_neg_integer(),
20+
pos := integer()}.
21+
22+
-type event() :: {segment_opened,
23+
OldSegment :: file:filename_all() | undefined,
24+
NewSegment :: file:filename_all()} |
25+
{chunk_written, chunk_info(), iodata()} |
26+
{retention_updated, [osiris:retention_spec()]}.
27+
28+
-export_type([state/0,
29+
chunk_info/0,
30+
event/0]).
31+
32+
-callback overview(Dir :: file:filename_all()) ->
33+
{osiris_log:range(), osiris_log:epoch_offsets()}.
34+
35+
-callback init_manifest(osiris_log:config(), writer | acceptor) ->
36+
{osiris_log:config(), state()}.
37+
38+
-callback handle_event(event(), state()) -> state().
39+
40+
-callback close_manifest(state()) -> ok.
41+
42+
-callback delete(osiris_log:config()) -> ok.

0 commit comments

Comments
 (0)