Commit cdade7d
Author: Peter Tihanyi
Commit message: Fix repair scenarios where no record on term server
Parent: 3fe5373

3 files changed, 44 insertions(+), 11 deletions(-)

src/pes_registrar.erl (17 additions, 4 deletions)
@@ -200,6 +200,14 @@ handle_event(state_timeout, heartbeat, monitoring, #state{replies = Replies,
       %?TRACE("cannot_renew timeout ~p", [Answer], State#state.id),
       {stop, {cannot_renew_registration, {timeout, Answer}}, State}
   end;
+handle_event(info, #promise_reply{result = {nack, {Server, not_found}}} = Reply, monitoring,
+             #state{id = Id, term = Term, pid = Pid, last_timestamp = Now} = State) ->
+  % we are in monitoring phase so we can do repair when term not found on server
+  NewResult = case pes_promise:await(repair(Server, Id, not_found, Term, {Pid, self(), Now})) of
+                ack -> ack;
+                _ -> nack
+              end,
+  handle_event(info, Reply#promise_reply{result = NewResult}, monitoring, State);
 handle_event(info, #promise_reply{result = {nack, {Server, OldTerm}}} = Reply, monitoring,
              #state{id = Id, term = Term, pid = Pid, last_timestamp = Now} = State) ->
   % we are in monitoring phase so we can do repair because we surely have the majority,
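The new clause turns a per-server "no record" nack into a repair attempt and then re-dispatches the rewritten #promise_reply{} to itself, so the existing monitoring clauses handle the final ack/nack. A minimal sketch of that collapse step, assuming only pes_promise:await/1 from this commit (the helper name is hypothetical):

    %% Hypothetical helper, not part of the commit: anything other than ack
    %% from the awaited repair promise is folded into nack before the reply
    %% is fed back through handle_event/4.
    collapse_repair_result(RepairPromise) ->
        case pes_promise:await(RepairPromise) of
            ack -> ack;
            _NackOrTimeout -> nack
        end.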
@@ -244,6 +252,10 @@ handle_event(_EventType, _EventContext, handoff, _State) ->
   % queue all the stuff until handoff is not ready
   {keep_state_and_data, [postpone]};
 
+% transfer is only allowed in monitoring or registered state
+handle_event({call, From}, {update, _}, StateName, _State)
+  when StateName =/= monitoring andalso StateName =/= registered ->
+  {keep_state_and_data, [{reply, From, {error, not_in_proper_state}}]};
 % we need to update the guarded pid
 % @TODO unfortunately if the registration not succeed that the old reg could not be restored
 handle_event({call, From}, {update, NewPid}, _StateName, State) when node(NewPid) =:= node() ->
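With this guard in place, a caller that races a handoff gets a clean error tuple instead of being postponed indefinitely. A hedged sketch of the caller's side (Registrar is the guard gen_statem's pid; both reply shapes appear in this file):

    %% Sketch only: how a client of the registrar might handle the new
    %% {error, not_in_proper_state} reply alongside the normal one.
    move_registration(Registrar, NewPid) ->
        case gen_statem:call(Registrar, {update, NewPid}) of
            registered -> ok;                     % transfer completed
            {error, not_in_proper_state} -> busy  % guard was mid-transition
        end.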
@@ -252,8 +264,7 @@ handle_event({call, From}, {update, NewPid}, _StateName, State) when node(NewPid
   % If it goes down and the pid is not matched in the state basically we just ignores it.
   erlang:monitor(process, NewPid),
   {next_state, commit, State#state{pid = NewPid, caller = From}};
-handle_event({call, From}, {update, NewPid}, StateName,
-             #state{id = Id, term = Term} = State) ->
+handle_event({call, From}, {update, NewPid}, StateName, #state{id = Id} = State) ->
   % things gets complicated we need too transfer the guard process to the target node
   Now = pes_time:now(),
   Nodes = pes_cluster:nodes(),
@@ -263,9 +274,8 @@
   ),
   TargetNode = node(NewPid),
   {ok, NewGuard} = rpc:call(TargetNode, gen_statem, start, [?MODULE, {handoff, NewState}, []]),
-  CurrentTerm = encapsulate_term(Term),
   NewValue = {NewPid, NewGuard, Now},
-  Promises = [repair(Server, Id, CurrentTerm, NewState#state.term, NewValue) || Server <- Nodes],
+  Promises = [force_repair(Server, Id, NewState#state.term, NewValue) || Server <- Nodes],
   lists:foreach(fun(Promise) -> pes_promise:await(Promise, ?DEFAULT_TIMEOUT) end, Promises),
   ok = gen_statem:call(NewGuard, {handoff_ready, StateName}),
   gen_statem:reply(From, registered),
@@ -437,6 +447,9 @@ commit(Node, Id, Term, Value) ->
 repair(Node, Id, OldTerm, NewTerm, Value) ->
   pes_server_sup:repair(Node, Id, OldTerm, encapsulate_term(NewTerm), Value).
 
+force_repair(Node, Id, NewTerm, Value) ->
+  pes_server_sup:force_repair(Node, Id, encapsulate_term(NewTerm), Value).
+
 encapsulate_term(Term) ->
   {Term, self()}.
 
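Taken together, the forced path added here delegates through three modules before it reaches the server loop; a trace of the chain (all names are from this commit, arguments elided):

    %% pes_registrar:force_repair/4
    %%   -> pes_server_sup:force_repair(Node, Id, {Term, self()}, Value)
    %%     -> pes_server:force_repair({hash(Id), Node}, Id, NewTerm, Value)
    %%       -> async(Target, {repair, Id, NewTerm, Value})  % 4-tuple command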

src/pes_server.erl (18 additions, 5 deletions)
@@ -21,7 +21,7 @@
 -compile({no_auto_import, [register/2, is_process_alive/1, send/2]}).
 
 %% API
--export([prepare/3, commit/4, read/2, repair/5]).
+-export([prepare/3, commit/4, read/2, repair/5, force_repair/4]).
 
 -export([start_link/1, init/1, loop/1]).
 -export([system_continue/3, system_terminate/4, system_get_state/1, system_code_change/4]).
@@ -64,11 +64,16 @@ read({Server, Node}, Id) when Node =:= node() ->
 read(Node, Id) ->
   async(Node, {read, Id}).
 
--spec repair(target(), id(), consensus_term_proposal(), consensus_term_proposal(), value()) ->
+-spec repair(target(), id(), consensus_term_proposal() | not_found, consensus_term_proposal(), value()) ->
   pes_promise:promise().
 repair(Node, Id, CurrentTerm, NewTerm, Value) ->
   async(Node, {repair, Id, CurrentTerm, NewTerm, Value}).
 
+-spec force_repair(target(), id(), consensus_term_proposal(), value()) ->
+  pes_promise:promise().
+force_repair(Node, Id, NewTerm, Value) ->
+  async(Node, {repair, Id, NewTerm, Value}).
+
 -spec async(target(), term()) -> pes_promise:promise().
 async({Server, Node}, Command) ->
   case pes_cluster:is_node_alive(Node) of
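repair/5 and force_repair/4 both funnel into async/2 and reuse the repair tag; the server tells them apart purely by tuple arity, which keeps the wire protocol at a single command name. An illustrative dispatch sketch (hypothetical function, real tuple shapes):

    %% Illustrative only: the two command shapes this file now emits.
    classify({repair, _Id, _CurrentTerm, _NewTerm, _Value}) -> guarded; % checks stored term first
    classify({repair, _Id, _NewTerm, _Value})               -> forced.  % overwrites unconditionally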
@@ -143,9 +148,17 @@ handle_command({commit, Id, {Term, Server}, Value}, #state{data_storage_ref = DS
 % Commit can reply with nack and the actual term, and server data.
 % To ensure in the mean time no other registration attempt were made,
 % we need to send back those values.
-handle_command({repair, Id, _Term, {NewTermId, _} = NewTerm, Value},
-               #state{data_storage_ref = DSR,
-                      term_storage_ref = TSR}) ->
+handle_command({repair, Id, Term, NewTerm, Value}, #state{term_storage_ref = TSR} = State) ->
+  case ets:lookup(TSR, Id) of
+    [] -> % if the db is empty than we are good to rewrite stuff
+      handle_command({repair, Id, NewTerm, Value}, State);
+    [{_Id, StoredTerm}] when StoredTerm =:= Term ->
+      handle_command({repair, Id, NewTerm, Value}, State);
+    _ ->
+      nack
+  end;
+handle_command({repair, Id, {NewTermId, _} = NewTerm, Value},
+               #state{data_storage_ref = DSR, term_storage_ref = TSR}) ->
   pes_stat:count([server, repair]),
   true = ets:insert(TSR, {Id, NewTerm}),
   true = ets:insert(DSR, {Id, {NewTermId, Value}, pes_time:now()}),
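The guarded clause is in effect a compare-and-swap on the term table: an empty slot or an exact match on the caller's expected term falls through to the unconditional 4-tuple clause, anything else nacks. A self-contained sketch of that decision against a plain ets set (names are illustrative, not the module's API):

    -module(repair_cas_sketch).
    -export([demo/0]).

    %% Mirrors the decision in handle_command({repair, Id, Term, NewTerm, Value}, ...):
    %% write when the slot is empty or the stored term equals the expected one.
    maybe_repair(Tab, Id, ExpectedTerm, NewTerm) ->
        case ets:lookup(Tab, Id) of
            [] ->                                    % no record yet: free to write
                true = ets:insert(Tab, {Id, NewTerm}),
                ack;
            [{_, Stored}] when Stored =:= ExpectedTerm ->
                true = ets:insert(Tab, {Id, NewTerm}),
                ack;
            _ ->
                nack                                 % a different term owns the slot
        end.

    demo() ->
        Tab = ets:new(terms, [set]),
        ack  = maybe_repair(Tab, key, not_found, term_1), % empty table: accepted
        nack = maybe_repair(Tab, key, not_found, term_2), % stale expectation: refused
        ack  = maybe_repair(Tab, key, term_1, term_2),    % expectation matches: accepted
        ok.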

src/pes_server_sup.erl (9 additions, 2 deletions)
@@ -10,7 +10,7 @@
 
 % @TODO different nodes can have different pes_server shard count, and it does not work!!! :/
 
--export([start_link/1, prepare/3, commit/4, read/2, repair/5]).
+-export([start_link/1, prepare/3, commit/4, read/2, repair/5, force_repair/4]).
 
 -export([servers/0]).
 
@@ -35,11 +35,18 @@ read(Node, Id) ->
 -spec repair(node(), Id, Term, NewTerm, term()) ->
   pes_promise:promise() when
     Id :: pes_server:id(),
-    Term :: pes_server:consensus_term_proposal(),
+    Term :: pes_server:consensus_term_proposal() | not_found,
     NewTerm :: {pes_server:consensus_term(), pid()}.
 repair(Node, Id, OldTerm, NewTerm, Value) ->
   pes_server:repair({hash(Id), Node}, Id, OldTerm, NewTerm, Value).
 
+-spec force_repair(node(), Id, NewTerm, term()) ->
+  pes_promise:promise() when
+    Id :: pes_server:id(),
+    NewTerm :: {pes_server:consensus_term(), pid()}.
+force_repair(Node, Id, NewTerm, Value) ->
+  pes_server:force_repair({hash(Id), Node}, Id, NewTerm, Value).
+
 -spec start_link(pos_integer()) -> {ok, pid()}.
 start_link(ServerCount) ->
   supervisor:start_link({local, ?SERVER}, ?MODULE, ServerCount).
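A hedged end-to-end sketch of the new supervisor entry point: push one forced overwrite to a node and block on the promise, the way pes_registrar fans this out during handoff (the 5000 ms timeout is made up; module and function names are from this commit):

    %% Sketch only: one forced repair against one node, awaited synchronously.
    push_record(Node, Id, TermId, Value) ->
        Promise = pes_server_sup:force_repair(Node, Id, {TermId, self()}, Value),
        pes_promise:await(Promise, 5000).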
