Skip to content

Commit

Permalink
start followers on any error
Browse files Browse the repository at this point in the history
ensure we start followers if leader node is down (rexi_DOWN) or
is in maintenance mode (rexi_EXIT).
  • Loading branch information
rnewson committed Jan 6, 2025
1 parent 2f629ef commit e0807f5
Showing 1 changed file with 13 additions and 5 deletions.
18 changes: 13 additions & 5 deletions src/fabric/src/fabric_doc_update.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,24 +74,32 @@ go(DbName, AllDocs0, Opts) ->
rexi_monitor:stop(RexiMon)
end.

handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Worker, #acc{} = Acc0) ->
handle_message({rexi_DOWN, _, {_, NodeRef}, _}, Worker, #acc{} = Acc0) ->
#acc{grouped_docs = GroupedDocs} = Acc0,
NewGrpDocs = [X || {#shard{node = N}, _} = X <- GroupedDocs, N =/= NodeRef],
skip_message(Acc0#acc{waiting_count = length(NewGrpDocs), grouped_docs = NewGrpDocs});
Acc1 = Acc0#acc{waiting_count = length(NewGrpDocs), grouped_docs = NewGrpDocs},
Acc2 = start_followers(Worker, Acc1),
skip_message(Acc2);
handle_message({rexi_EXIT, _}, Worker, #acc{} = Acc0) ->
#acc{waiting_count = WC, grouped_docs = GrpDocs} = Acc0,
NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
skip_message(Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs});
Acc1 = Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs},
Acc2 = start_followers(Worker, Acc1),
skip_message(Acc2);
handle_message({error, all_dbs_active}, Worker, #acc{} = Acc0) ->
% treat it like rexi_EXIT, the hope at least one copy will return successfully
#acc{waiting_count = WC, grouped_docs = GrpDocs} = Acc0,
NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
skip_message(Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs});
Acc1 = Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs},
Acc2 = start_followers(Worker, Acc1),
skip_message(Acc2);
handle_message(internal_server_error, Worker, #acc{} = Acc0) ->
% happens when we fail to load validation functions in an RPC worker
#acc{waiting_count = WC, grouped_docs = GrpDocs} = Acc0,
NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
skip_message(Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs});
Acc1 = Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs},
Acc2 = start_followers(Worker, Acc1),
skip_message(Acc2);
handle_message(attachment_chunk_received, _Worker, #acc{} = Acc0) ->
{ok, Acc0};
handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
Expand Down

0 comments on commit e0807f5

Please sign in to comment.