Skip to content

Commit

Permalink
reject write at leader if conflict
Browse files Browse the repository at this point in the history
This should prevent spurious intra-cluster conflicts most of the
time. It is not true consistency, however. spurious conflicts are
still possible whenever the nodes in the cluster disagree on the
current live set of other nodes.
  • Loading branch information
rnewson committed Jan 6, 2025
1 parent afcbadf commit 2f629ef
Showing 1 changed file with 71 additions and 34 deletions.
105 changes: 71 additions & 34 deletions src/fabric/src/fabric_doc_update.erl
Original file line number Diff line number Diff line change
Expand Up @@ -96,40 +96,47 @@ handle_message(attachment_chunk_received, _Worker, #acc{} = Acc0) ->
{ok, Acc0};
handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
#acc{
waiting_count = WaitingCount,
len_docs = DocCount,
w = W,
grouped_docs = GroupedDocs,
reply = DocReplyDict0
} = Acc0,
{value, {_, Docs}, NewGrpDocs} = lists:keytake(Worker, 1, GroupedDocs),
Acc1 = start_followers(Worker, Acc0),
DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0),
case {WaitingCount, dict:size(DocReplyDict)} of
{1, _} ->
% last message has arrived, we need to conclude things
{Health, W, Reply} = dict:fold(
fun force_reply/3,
{ok, W, []},
DocReplyDict
),
{stop, {Health, Reply}};
{_, DocCount} ->
% we've got at least one reply for each document, let's take a look
case dict:fold(fun maybe_reply/3, {stop, W, []}, DocReplyDict) of
continue ->
{ok, Acc1#acc{
waiting_count = WaitingCount - 1,
grouped_docs = NewGrpDocs,
reply = DocReplyDict
}};
{stop, W, FinalReplies} ->
{stop, {ok, FinalReplies}}
end;
_ ->
{ok, Acc1#acc{
waiting_count = WaitingCount - 1, grouped_docs = NewGrpDocs, reply = DocReplyDict
}}
{value, {_, Docs}, NewGrpDocs0} = lists:keytake(Worker, 1, GroupedDocs),
DocReplyDict = append_update_replies(Docs, Replies, W, DocReplyDict0),
Acc1 = Acc0#acc{grouped_docs = NewGrpDocs0, reply = DocReplyDict},
Acc2 = remove_conflicts(Docs, Replies, Acc1),
NewGrpDocs = Acc2#acc.grouped_docs,
case skip_message(Acc2) of
{stop, Msg} ->
{stop, Msg};
{ok, Acc3} ->
Acc4 = start_followers(Worker, Acc3),
case {Acc4#acc.waiting_count, dict:size(DocReplyDict)} of
{1, _} ->
% last message has arrived, we need to conclude things
{Health, W, Reply} = dict:fold(
fun force_reply/3,
{ok, W, []},
DocReplyDict
),
{stop, {Health, Reply}};
{_, DocCount} ->
% we've got at least one reply for each document, let's take a look
case dict:fold(fun maybe_reply/3, {stop, W, []}, DocReplyDict) of
continue ->
{ok, Acc4#acc{
waiting_count = Acc4#acc.waiting_count - 1,
grouped_docs = NewGrpDocs
}};
{stop, W, FinalReplies} ->
{stop, {ok, FinalReplies}}
end;
_ ->
{ok, Acc4#acc{
waiting_count = Acc4#acc.waiting_count - 1,
grouped_docs = NewGrpDocs
}}
end
end;
handle_message({missing_stub, Stub}, _, _) ->
throw({missing_stub, Stub});
Expand Down Expand Up @@ -367,13 +374,43 @@ start_worker(#shard{ref = Ref} = Worker, Docs, #acc{} = Acc0) when is_reference(
start_worker(#shard{ref = undefined}, _Docs, #acc{}) ->
ok.

append_update_replies([], [], DocReplyDict) ->
append_update_replies([], [], _W, DocReplyDict) ->
DocReplyDict;
append_update_replies([Doc | Rest], [], Dict0) ->
append_update_replies([Doc | Rest], [], W, Dict0) ->
% icky, if replicated_changes only errors show up in result
append_update_replies(Rest, [], dict:append(Doc, noreply, Dict0));
append_update_replies([Doc | Rest1], [Reply | Rest2], Dict0) ->
append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).
append_update_replies(Rest, [], W, dict:append(Doc, noreply, Dict0));
append_update_replies([Doc | Rest1], [conflict | Rest2], W, Dict0) ->
%% fake conflict replies from followers as we won't ask them
append_update_replies(
Rest1, Rest2, W, dict:append_list(Doc, lists:duplicate(W, conflict), Dict0)
);
append_update_replies([Doc | Rest1], [Reply | Rest2], W, Dict0) ->
append_update_replies(Rest1, Rest2, W, dict:append(Doc, Reply, Dict0)).

%% leader found a conflict, remove that doc from the other (follower) workers,
%% removing the worker entirely if no docs remain.
remove_conflicts([], [], #acc{} = Acc0) ->
Acc0;
remove_conflicts([Doc | DocRest], [conflict | ReplyRest], #acc{} = Acc0) ->
#acc{grouped_docs = GroupedDocs0} = Acc0,
GroupedDocs1 = lists:foldl(
fun({Worker, Docs}, FoldAcc) ->
case lists:delete(Doc, Docs) of
[] ->
FoldAcc;
Rest ->
[{Worker, Rest} | FoldAcc]
end
end,
[],
GroupedDocs0
),
Acc1 = Acc0#acc{waiting_count = length(GroupedDocs1), grouped_docs = GroupedDocs1},
remove_conflicts(DocRest, ReplyRest, Acc1);
remove_conflicts([_Doc | DocRest], [_Reply | ReplyRest], #acc{} = Acc0) ->
remove_conflicts(DocRest, ReplyRest, Acc0);
remove_conflicts([_Doc | DocRest], [], Acc0) ->
remove_conflicts(DocRest, [], Acc0).

skip_message(#acc{waiting_count = 0, w = W, reply = DocReplyDict}) ->
{Health, W, Reply} = dict:fold(fun force_reply/3, {ok, W, []}, DocReplyDict),
Expand Down

0 comments on commit 2f629ef

Please sign in to comment.