Skip to content

Commit 6d81744

Browse files
authored
Merge pull request #101 from rabbitmq/init-first-offset-atomic
Ensure a writer|replica start initialises first offset
2 parents 209cebf + 0a08043 commit 6d81744

File tree

4 files changed

+81
-12
lines changed

4 files changed

+81
-12
lines changed

src/osiris.erl

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525
start_writer/1,
2626
start_replica/2,
2727
delete_cluster/1,
28-
configure_logger/1]).
28+
configure_logger/1,
29+
get_stats/1]).
2930

3031
%% holds static or rarely changing fields
3132
-record(cfg, {}).
@@ -273,3 +274,12 @@ start_replicas(Config, [Node | Nodes], ReplicaPids) ->
273274
configure_logger(Module) ->
274275
persistent_term:put('$osiris_logger', Module).
275276

277+
-spec get_stats(pid()) -> #{committed_chunk_id => integer(),
278+
first_chunk_id => integer()}.
279+
get_stats(Pid)
280+
when node(Pid) =:= node() ->
281+
{ok, #{offset_ref := ORef}} = gen:call(Pid, '$gen_call', get_reader_context),
282+
#{committed_chunk_id => atomics:get(ORef, 1),
283+
first_chunk_id => atomics:get(ORef, 2)};
284+
get_stats(Pid) when is_pid(Pid) ->
285+
erpc:call(node(Pid), ?MODULE, ?FUNCTION_NAME, [Pid]).

src/osiris_log.erl

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,7 @@ init(#{dir := Dir,
486486
_ ->
487487
0
488488
end,
489+
FirstOffsetFun(NextOffset - 1),
489490
open_new_segment(#?MODULE{cfg = Cfg,
490491
mode =
491492
#write{type = WriterType,
@@ -517,6 +518,7 @@ init(#{dir := Dir,
517518
counters:put(Cnt, ?C_FIRST_TIMESTAMP, FstTs),
518519
counters:put(Cnt, ?C_OFFSET, LastChId + LastNum - 1),
519520
counters:put(Cnt, ?C_SEGMENTS, NumSegments),
521+
FirstOffsetFun(FstChId),
520522
?DEBUG("~s:~s/~b: ~s next offset ~b first offset ~b",
521523
[?MODULE,
522524
?FUNCTION_NAME,
@@ -551,6 +553,7 @@ init(#{dir := Dir,
551553
%% here too?
552554
{ok, _} = file:position(SegFd, eof),
553555
{ok, _} = file:position(IdxFd, eof),
556+
FirstOffsetFun(-1),
554557
#?MODULE{cfg = Cfg,
555558
mode =
556559
#write{type = WriterType,
@@ -2112,7 +2115,7 @@ write_chunk(Chunk,
21122115
Timestamp,
21132116
Epoch,
21142117
NumRecords,
2115-
#?MODULE{cfg = #cfg{counter = CntRef},
2118+
#?MODULE{cfg = #cfg{counter = CntRef} = Cfg,
21162119
fd = Fd,
21172120
index_fd = IdxFd,
21182121
mode =
@@ -2144,6 +2147,7 @@ write_chunk(Chunk,
21442147
%% update counters
21452148
counters:put(CntRef, ?C_OFFSET, NextOffset - 1),
21462149
counters:add(CntRef, ?C_CHUNKS, 1),
2150+
maybe_set_first_offset(Next, Cfg),
21472151
State#?MODULE{mode =
21482152
Write#write{tail_info =
21492153
{NextOffset,
@@ -2152,6 +2156,12 @@ write_chunk(Chunk,
21522156
SegSizeChunks + 1}}}
21532157
end.
21542158

2159+
2160+
maybe_set_first_offset(0, #cfg{first_offset_fun = Fun}) ->
2161+
Fun(0);
2162+
maybe_set_first_offset(_, _Cfg) ->
2163+
ok.
2164+
21552165
max_segment_size_reached(
21562166
#?MODULE{mode = #write{segment_size = {CurrentSizeBytes,
21572167
CurrentSizeChunks}},

src/osiris_writer.erl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,12 +161,13 @@ handle_continue(#{name := Name,
161161
process_flag(trap_exit, true),
162162
process_flag(message_queue_data, off_heap),
163163
ORef = atomics:new(2, [{signed, true}]),
164+
atomics:put(ORef, 2, -1),
164165
CntName = {?MODULE, ExtRef},
165166
Log = osiris_log:init(Config#{dir => Dir,
166167
first_offset_fun =>
167-
fun (Fst) ->
168-
atomics:put(ORef, 2, Fst)
169-
end,
168+
fun (Fst) ->
169+
atomics:put(ORef, 2, Fst)
170+
end,
170171
counter_spec =>
171172
{CntName, ?ADD_COUNTER_FIELDS}}),
172173
Trk = osiris_log:recover_tracking(Log),

test/osiris_SUITE.erl

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -621,7 +621,9 @@ cluster_restart(Config) ->
621621
epoch => 1,
622622
replica_nodes => Replicas,
623623
leader_node => LeaderNode},
624-
{ok, #{leader_pid := Leader} = Conf} = osiris:start_cluster(Conf0),
624+
{ok, #{leader_pid := Leader,
625+
replica_pids := [Replica1, Replica2]} = Conf} =
626+
osiris:start_cluster(Conf0),
625627
WriterId = <<"wid1">>,
626628
ok = osiris:write(Leader, WriterId, 42, <<"before-restart">>),
627629
receive
@@ -632,12 +634,36 @@ cluster_restart(Config) ->
632634
exit(osiris_written_timeout)
633635
end,
634636

637+
timer:sleep(1),
638+
RangesBeforeRecovery =
639+
[begin
640+
#{committed_chunk_id := E,
641+
first_chunk_id := S} = osiris:get_stats(P),
642+
{S, E}
643+
end || P <- [Leader, Replica1, Replica2]],
644+
645+
?assertEqual([{0,0}, {0,0}, {0,0}], RangesBeforeRecovery),
646+
635647
osiris:stop_cluster(Conf),
636-
{ok, #{leader_pid := Leader1}} =
648+
{ok, #{leader_pid := Leader1,
649+
replica_pids := [ReplicaPid1, ReplicaPid2]}} =
637650
osiris:start_cluster(Conf0#{epoch => 2}),
638651
%% give leader some time to discover the committed offset
639652
timer:sleep(1000),
640-
ok = validate_log(Leader1, [{0, <<"before-restart">>}]),
653+
654+
%% get all log ranges after recovery for each member and validate they are all
655+
%% the same
656+
RangesAfterRecovery =
657+
[begin
658+
#{committed_chunk_id := E,
659+
first_chunk_id := S} = osiris:get_stats(P),
660+
{S, E}
661+
end || P <- [Leader1, ReplicaPid1, ReplicaPid2]],
662+
?assertEqual([{0,0}, {0,0}, {0,0}], RangesAfterRecovery),
663+
ct:pal("~p ~p", [RangesBeforeRecovery, RangesAfterRecovery]),
664+
ok = validate_log_offset_reader(Leader1, [{0, <<"before-restart">>}]),
665+
ok = validate_log_offset_reader(ReplicaPid1, [{0, <<"before-restart">>}]),
666+
ok = validate_log_offset_reader(ReplicaPid2, [{0, <<"before-restart">>}]),
641667

642668
ok = osiris:write(Leader1, WriterId, 43, <<"after-restart">>),
643669
receive
@@ -648,9 +674,11 @@ cluster_restart(Config) ->
648674
exit(osiris_written_timeout)
649675
end,
650676

651-
ok =
652-
validate_log(Leader1,
653-
[{0, <<"before-restart">>}, {1, <<"after-restart">>}]),
677+
ExpectedLog = [{0, <<"before-restart">>}, {1, <<"after-restart">>}],
678+
679+
ok = validate_log_offset_reader(Leader1, ExpectedLog),
680+
ok = validate_log_offset_reader(ReplicaPid1, ExpectedLog),
681+
ok = validate_log_offset_reader(ReplicaPid2, ExpectedLog),
654682
[stop_peer(Ref) || {Ref, _} <- PeerStates],
655683
ok.
656684

@@ -1835,7 +1863,8 @@ search_paths() ->
18351863
validate_log(Leader, Exp) when is_pid(Leader) ->
18361864
case node(Leader) == node() of
18371865
true ->
1838-
{ok, Log0} = osiris_writer:init_data_reader(Leader, {0, empty}, #{counter_spec => {'test', []}}),
1866+
{ok, Log0} = osiris_writer:init_data_reader(Leader, {0, empty},
1867+
#{counter_spec => {'test', []}}),
18391868
validate_log(Log0, Exp);
18401869
false ->
18411870
ok = erpc:call(node(Leader), ?MODULE, ?FUNCTION_NAME, [Leader, Exp])
@@ -1851,6 +1880,25 @@ validate_log(Log0, Expected) ->
18511880
validate_log(Log, Expected -- Entries)
18521881
end.
18531882

1883+
validate_log_offset_reader(Leader, Exp) when is_pid(Leader) ->
1884+
case node(Leader) == node() of
1885+
true ->
1886+
{ok, Log0} = osiris:init_reader(Leader, first, {'test', []}),
1887+
validate_log_offset_reader(Log0, Exp);
1888+
false ->
1889+
ok = erpc:call(node(Leader), ?MODULE, ?FUNCTION_NAME, [Leader, Exp])
1890+
end;
1891+
validate_log_offset_reader(Log, []) ->
1892+
ok = osiris_log:close(Log),
1893+
ok;
1894+
validate_log_offset_reader(Log0, Expected) ->
1895+
case osiris_log:read_chunk_parsed(Log0) of
1896+
{end_of_stream, _} ->
1897+
ct:fail("validate log failed, rem: ~p", [Expected]);
1898+
{Entries, Log} ->
1899+
validate_log_offset_reader(Log, Expected -- Entries)
1900+
end.
1901+
18541902
print_counters() ->
18551903
[begin
18561904
ct:pal("~w counters ~p",

0 commit comments

Comments
 (0)