#314 adds new reply mode options for `ra:process_command/3` that control which member of the Ra cluster replies with `gen_statem:reply/2` to a call:
- `local` - reply from a node local to the calling process
- `{member, Member}` - reply from the given member

Both options fall back to the leader if the given member is not in the cluster membership. These are useful for blocking a caller until a command has been handled by the given member.
These options should be added to `ra:pipeline_command/4` as well, to control which member of the cluster sends notifications. This would involve some refactoring of how notifications are stored within the cluster state (in `ra_server_proc`'s `#state.pending_notifys` field) and of the shape of the `notify` variant of `ra_server:effect()`:
```erlang
{notify, #{pid() => [term()]}} |
```
Currently, notifications are grouped by pid:
```erlang
add_reply(_, Reply, {notify, Corr, Pid},
          Effects, Notifys) ->
    % notify are casts and thus have to include their own pid()
    % reply with the supplied correlation so that the sending can do their
    % own bookkeeping
    CorrData = {Corr, Reply},
    case Notifys of
        #{Pid := T} ->
            {Effects, Notifys#{Pid => [CorrData | T]}};
        _ ->
            {Effects, Notifys#{Pid => [CorrData]}}
    end;
```
and replied to in batches:
```erlang
send_applied_notifications(#state{pending_notifys = PendingNots} = State,
                           Nots0) when map_size(PendingNots) > 0 ->
    Nots = ra_lib:maps_merge_with(fun(_K, V1, V2) ->
                                          V1 ++ V2
                                  end, PendingNots, Nots0),
    send_applied_notifications(State#state{pending_notifys = #{}}, Nots);
send_applied_notifications(#state{} = State, Nots) ->
    Id = id(State),
    %% any notifications that could not be sent
    %% will be kept and retried
    RemNots = maps:filter(
                fun(Who, Correlations0) ->
                        %% correlations are build up in reverse order so we need
                        %% to reverse before sending
                        Correlations = lists:reverse(Correlations0),
                        ok =/= send_ra_event(Who, Correlations, Id,
                                             applied, State)
                end, Nots),
    case map_size(RemNots) of
        0 ->
            State;
        _ ->
            State#state{pending_notifys = RemNots}
    end.
```
This will need to be refactored so that notifications are grouped by pid and reply-from.
#314 adds new reply mode options for `ra:process_command/3` that control which member of the Ra cluster replies with `gen_statem:reply/2` to a call:
- `local` - reply from a node local to the calling process
- `{member, Member}` - reply from the given member

Both options fall back to the leader if the given member is not in the cluster membership. These are useful for blocking a caller until a command has been handled by the given member.
These options should be added to `ra:pipeline_command/4` as well, to control which member of the cluster sends notifications. This would involve some refactoring of how notifications are stored within the cluster state (in `ra_server_proc`'s `#state.pending_notifys` field) and of the shape of the `notify` variant of `ra_server:effect()`:
ra/src/ra_server.erl
Line 153 in 09cc606
Currently, notifications are grouped by pid:
ra/src/ra_server.erl
Lines 2310 to 2321 in 09cc606
and replied to in batches:
ra/src/ra_server_proc.erl
Lines 1708 to 1731 in 09cc606
This will need to be refactored so that notifications are grouped by pid and reply-from.