From 58690983676f0f1c01877baad08a57e4bbac5d4c Mon Sep 17 00:00:00 2001
From: Robert Newson
Date: Sun, 29 Dec 2024 00:59:28 +0000
Subject: [PATCH] reject write at leader if conflict

This should prevent spurious intra-cluster conflicts most of the time.

It is not true consistency, however: spurious conflicts are still
possible whenever the nodes in the cluster disagree on the current live
set of other nodes.
---
 src/fabric/src/fabric_doc_update.erl | 186 ++++++++++++++++++++-------
 1 file changed, 142 insertions(+), 44 deletions(-)

diff --git a/src/fabric/src/fabric_doc_update.erl b/src/fabric/src/fabric_doc_update.erl
index 1f5755de09e..a9a7269d82e 100644
--- a/src/fabric/src/fabric_doc_update.erl
+++ b/src/fabric/src/fabric_doc_update.erl
@@ -22,7 +22,10 @@
     doc_count,
     w,
     grouped_docs,
-    reply
+    reply,
+    update_options,
+    leaders = [],
+    started = []
 }).
 
 go(_, [], _) ->
@@ -33,10 +36,8 @@ go(DbName, AllDocs0, Opts) ->
     validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, Opts)),
     Options = lists:delete(all_or_nothing, Opts),
     GroupedDocs = lists:map(
-        fun({#shard{name = Name, node = Node} = Shard, Docs}) ->
-            Docs1 = untag_docs(Docs),
-            Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs1, Options]}),
-            {Shard#shard{ref = Ref}, Docs}
+        fun({#shard{} = Shard, Docs}) ->
+            {Shard#shard{ref = make_ref()}, Docs}
         end,
         group_docs_by_shard(DbName, AllDocs)
     ),
@@ -44,6 +45,7 @@ go(DbName, AllDocs0, Opts) ->
     RexiMon = fabric_util:create_monitors(Workers),
     W = couch_util:get_value(w, Options, integer_to_list(mem3:quorum(DbName))),
     Acc0 = #acc{
+        update_options = Options,
         waiting_count = length(Workers),
         doc_count = length(AllDocs),
         w = list_to_integer(W),
@@ -51,7 +53,8 @@ go(DbName, AllDocs0, Opts) ->
         reply = dict:new()
     },
     Timeout = fabric_util:request_timeout(),
-    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc0, infinity, Timeout) of
+    Acc1 = start_leaders(Acc0),
+    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc1, infinity, Timeout) of
         {ok, {Health, Results}} when
             Health =:= ok; Health =:= accepted; Health =:= error
         ->
@@ -72,61 +75,78 @@ go(DbName, AllDocs0, Opts) ->
         rexi_monitor:stop(RexiMon)
     end.
 
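For intuition, a minimal standalone sketch of the selection rule the new code applies per shard range: the copy on the lowest node is the leader and is cast immediately, while the remaining copies are followers that are only started once their leader has answered (or its node has gone away). The `{shard, Node, Range}` tuple shape and the module name are invented for illustration; only the min-node rule mirrors `is_leader/2` below.

    -module(leader_sketch).
    -export([leaders/1]).

    %% Keep, per shard range, only the worker on the lowest node.
    leaders(Workers) ->
        [
            W
         || {shard, Node, Range} = W <- Workers,
            Node =:= lists:min([N || {shard, N, R} <- Workers, R =:= Range])
        ].

For example, leaders([{shard, n2, r1}, {shard, n1, r1}, {shard, n3, r2}]) keeps the n1 copy of r1 and the n3 copy of r2; the n2 copy of r1 stays unstarted until the r1 leader replies.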
 handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Worker, #acc{} = Acc0) ->
     #acc{grouped_docs = GroupedDocs} = Acc0,
+    %% rexi_utils:recv delivers rexi_DOWN with no worker, so find every
+    %% leader lost with the downed node and start its followers.
+    DeadLeaders = [
+        W
+     || {#shard{node = N} = W, _} <- GroupedDocs,
+        N =:= NodeRef,
+        lists:member(W#shard.ref, Acc0#acc.leaders)
+    ],
     NewGrpDocs = [X || {#shard{node = N}, _} = X <- GroupedDocs, N =/= NodeRef],
-    skip_message(Acc0#acc{waiting_count = length(NewGrpDocs), grouped_docs = NewGrpDocs});
+    Acc1 = Acc0#acc{waiting_count = length(NewGrpDocs), grouped_docs = NewGrpDocs},
+    Acc2 = lists:foldl(fun start_followers/2, Acc1, DeadLeaders),
+    skip_message(Acc2);
 handle_message({rexi_EXIT, _}, Worker, #acc{} = Acc0) ->
     #acc{waiting_count = WC, grouped_docs = GrpDocs} = Acc0,
     NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
-    skip_message(Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs});
+    Acc1 = Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs},
+    Acc2 = start_followers(Worker, Acc1),
+    skip_message(Acc2);
 handle_message({error, all_dbs_active}, Worker, #acc{} = Acc0) ->
     % treat it like rexi_EXIT, the hope at least one copy will return successfully
     #acc{waiting_count = WC, grouped_docs = GrpDocs} = Acc0,
     NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
-    skip_message(Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs});
+    Acc1 = Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs},
+    Acc2 = start_followers(Worker, Acc1),
+    skip_message(Acc2);
 handle_message(internal_server_error, Worker, #acc{} = Acc0) ->
     % happens when we fail to load validation functions in an RPC worker
     #acc{waiting_count = WC, grouped_docs = GrpDocs} = Acc0,
     NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
-    skip_message(Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs});
+    Acc1 = Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs},
+    Acc2 = start_followers(Worker, Acc1),
+    skip_message(Acc2);
 handle_message(attachment_chunk_received, _Worker, #acc{} = Acc0) ->
     {ok, Acc0};
 handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
     #acc{
-        waiting_count = WaitingCount,
         doc_count = DocCount,
         w = W,
         grouped_docs = GroupedDocs,
         reply = DocReplyDict0
     } = Acc0,
-    {value, {_, Docs}, NewGrpDocs} = lists:keytake(Worker, 1, GroupedDocs),
-    DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0),
-    case {WaitingCount, dict:size(DocReplyDict)} of
-        {1, _} ->
-            % last message has arrived, we need to conclude things
-            {Health, W, Reply} = dict:fold(
-                fun force_reply/3,
-                {ok, W, []},
-                DocReplyDict
-            ),
-            {stop, {Health, Reply}};
-        {_, DocCount} ->
-            % we've got at least one reply for each document, let's take a look
-            case dict:fold(fun maybe_reply/3, {stop, W, []}, DocReplyDict) of
-                continue ->
-                    {ok, Acc0#acc{
-                        waiting_count = WaitingCount - 1,
-                        grouped_docs = NewGrpDocs,
-                        reply = DocReplyDict
-                    }};
-                {stop, W, FinalReplies} ->
-                    {stop, {ok, FinalReplies}}
-            end;
-        _ ->
-            {ok, Acc0#acc{
-                waiting_count = WaitingCount - 1, grouped_docs = NewGrpDocs, reply = DocReplyDict
-            }}
+    {value, {_, Docs}, NewGrpDocs0} = lists:keytake(Worker, 1, GroupedDocs),
+    IsLeader = lists:member(Worker#shard.ref, Acc0#acc.leaders),
+    DocReplyDict = append_update_replies(Docs, Replies, W, IsLeader, DocReplyDict0),
+    Acc1 = Acc0#acc{grouped_docs = NewGrpDocs0, reply = DocReplyDict},
+    Acc2 = remove_conflicts(Docs, Replies, Acc1),
+    NewGrpDocs = Acc2#acc.grouped_docs,
+    case skip_message(Acc2) of
+        {stop, Msg} ->
+            {stop, Msg};
+        {ok, Acc3} ->
+            Acc4 = start_followers(Worker, Acc3),
+            case {Acc4#acc.waiting_count, dict:size(DocReplyDict)} of
+                {1, _} ->
+                    % last message has arrived, we need to conclude things
+                    {Health, W, Reply} = dict:fold(
+                        fun force_reply/3,
+                        {ok, W, []},
+                        DocReplyDict
+                    ),
+                    {stop, {Health, Reply}};
+                {_, DocCount} ->
+                    % we've got at least one reply for each document, let's take a look
+                    case dict:fold(fun maybe_reply/3, {stop, W, []}, DocReplyDict) of
+                        continue ->
+                            {ok, Acc4#acc{
+                                waiting_count = Acc4#acc.waiting_count - 1,
+                                grouped_docs = NewGrpDocs
+                            }};
+                        {stop, W, FinalReplies} ->
+                            {stop, {ok, FinalReplies}}
+                    end;
+                _ ->
+                    {ok, Acc4#acc{
+                        waiting_count = Acc4#acc.waiting_count - 1,
+                        grouped_docs = NewGrpDocs
+                    }}
+            end
     end;
 handle_message({missing_stub, Stub}, _, _) ->
     throw({missing_stub, Stub});
@@ -318,13 +338,91 @@ group_docs_by_shard(DbName, Docs) ->
         )
     ).
 
-append_update_replies([], [], DocReplyDict) ->
+%% use 'lowest' node that hosts this shard range as leader
+is_leader(Worker, Workers) ->
+    Worker#shard.node ==
+        lists:min([W#shard.node || W <- Workers, W#shard.range == Worker#shard.range]).
+
+start_leaders(#acc{} = Acc0) ->
+    #acc{grouped_docs = GroupedDocs} = Acc0,
+    {Workers, _} = lists:unzip(GroupedDocs),
+    LeaderRefs = lists:foldl(
+        fun({Worker, Docs}, RefAcc) ->
+            case is_leader(Worker, Workers) of
+                true ->
+                    start_worker(Worker, Docs, Acc0),
+                    [Worker#shard.ref | RefAcc];
+                false ->
+                    RefAcc
+            end
+        end,
+        [],
+        GroupedDocs
+    ),
+    Acc0#acc{leaders = LeaderRefs, started = LeaderRefs}.
+
+start_followers(#shard{} = Leader, #acc{} = Acc0) ->
+    Followers = [
+        {Worker, Docs}
+     || {Worker, Docs} <- Acc0#acc.grouped_docs,
+        Worker#shard.range == Leader#shard.range,
+        not lists:member(Worker#shard.ref, Acc0#acc.started)
+    ],
+    lists:foreach(
+        fun({Worker, Docs}) ->
+            start_worker(Worker, Docs, Acc0)
+        end,
+        Followers
+    ),
+    Started = [Ref || {#shard{ref = Ref}, _Docs} <- Followers],
+    Acc0#acc{started = lists:append([Started, Acc0#acc.started])}.
+
+start_worker(#shard{ref = Ref} = Worker, Docs, #acc{} = Acc0) when is_reference(Ref) ->
+    #shard{name = Name, node = Node} = Worker,
+    #acc{update_options = UpdateOptions} = Acc0,
+    rexi:cast_ref(Ref, Node, {fabric_rpc, update_docs, [Name, untag_docs(Docs), UpdateOptions]}),
+    ok;
+start_worker(#shard{ref = undefined}, _Docs, #acc{}) ->
+    % for unit tests below.
+    ok.
+
+append_update_replies([], [], _W, _IsLeader, DocReplyDict) ->
     DocReplyDict;
-append_update_replies([Doc | Rest], [], Dict0) ->
+append_update_replies([Doc | Rest], [], W, IsLeader, Dict0) ->
     % icky, if replicated_changes only errors show up in result
-    append_update_replies(Rest, [], dict:append(Doc, noreply, Dict0));
-append_update_replies([Doc | Rest1], [Reply | Rest2], Dict0) ->
-    append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).
+    append_update_replies(Rest, [], W, IsLeader, dict:append(Doc, noreply, Dict0));
+append_update_replies([Doc | Rest1], [conflict | Rest2], W, true, Dict0) ->
+    %% fake conflict replies from followers as we won't ask them
+    append_update_replies(
+        Rest1, Rest2, W, true, dict:append_list(Doc, lists:duplicate(W, conflict), Dict0)
+    );
+append_update_replies([Doc | Rest1], [Reply | Rest2], W, IsLeader, Dict0) ->
+    append_update_replies(Rest1, Rest2, W, IsLeader, dict:append(Doc, Reply, Dict0)).
+
+%% leader found a conflict, remove that doc from the other (follower) workers,
+%% removing the worker entirely if no docs remain.
+remove_conflicts([], [], #acc{} = Acc0) ->
+    Acc0;
+remove_conflicts([Doc | DocRest], [conflict | ReplyRest], #acc{} = Acc0) ->
+    #acc{grouped_docs = GroupedDocs0, waiting_count = WaitingCount0} = Acc0,
+    {GroupedDocs1, WaitingCount1} = lists:foldl(
+        fun({Worker, Docs}, {GrpAcc, WCAcc}) ->
+            case lists:delete(Doc, Docs) of
+                [] ->
+                    %% nothing left for this worker to do, drop it and
+                    %% stop waiting for its reply
+                    {GrpAcc, WCAcc - 1};
+                Rest ->
+                    {[{Worker, Rest} | GrpAcc], WCAcc}
+            end
+        end,
+        {[], WaitingCount0},
+        GroupedDocs0
+    ),
+    Acc1 = Acc0#acc{grouped_docs = GroupedDocs1, waiting_count = WaitingCount1},
+    remove_conflicts(DocRest, ReplyRest, Acc1);
+remove_conflicts([_Doc | DocRest], [_Reply | ReplyRest], #acc{} = Acc0) ->
+    remove_conflicts(DocRest, ReplyRest, Acc0);
+remove_conflicts([_Doc | DocRest], [], #acc{} = Acc0) ->
+    remove_conflicts(DocRest, [], Acc0).
 
 skip_message(#acc{waiting_count = 0, w = W, reply = DocReplyDict}) ->
     {Health, W, Reply} = dict:fold(fun force_reply/3, {ok, W, []}, DocReplyDict),
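To see why append_update_replies/5 duplicates a leader's conflict W times: the faked follower replies let the per-document reply list reach the write quorum on conflict immediately, so maybe_reply/3 can conclude a conflicted write after hearing from the leader alone, without waiting on followers that will never be asked. A standalone illustration; the module name, W value and doc id are invented for the example.

    -module(conflict_quorum_sketch).
    -export([demo/0]).

    %% A leader conflict is recorded as W conflicts at once, so the
    %% per-doc reply list reaches quorum without any follower replies.
    demo() ->
        W = 2,
        Doc = <<"doc1">>,
        Dict = dict:append_list(Doc, lists:duplicate(W, conflict), dict:new()),
        {ok, Replies} = dict:find(Doc, Dict),
        %% quorum of conflicts reached from a single (leader) response
        length([R || R <- Replies, R =:= conflict]) >= W.

demo() returns true with a single recorded response, which is the behaviour the leader-first flow relies on to reject the write early.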