Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 71 additions & 10 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -281,15 +281,19 @@ apply_(#{reply_mode := {notify, _Corr, EnqPid}} = Meta,
apply_(Meta, #?ENQ_V2{seq = Seq, msg = RawMsg, size = Size}, State00) ->
%% untracked
apply_enqueue(Meta, undefined, Seq, RawMsg, Size, State00);
apply_(_Meta, #register_enqueuer{pid = Pid},
apply_(#{index := RaftIdx,
system_time := Ts},
#register_enqueuer{pid = Pid},
#?STATE{enqueuers = Enqueuers0,
cfg = #cfg{overflow_strategy = Overflow}} = State0) ->
State = case maps:is_key(Pid, Enqueuers0) of
true ->
%% if the enqueuer already exists just echo the overflow state
State0;
false ->
State0#?STATE{enqueuers = Enqueuers0#{Pid => #enqueuer{}}}
NewEnq = #enqueuer{created = {RaftIdx, Ts},
enqueued_bytes = 0},
State0#?STATE{enqueuers = Enqueuers0#{Pid => NewEnq}}
end,
Res = case is_over_limit(State) of
true when Overflow == reject_publish ->
Expand Down Expand Up @@ -993,7 +997,8 @@ v7_to_v8_consumer(Con, Timeout) ->
delivery_count = element(#consumer.delivery_count, Con)
}.

convert_v7_to_v8(#{system_time := Ts} = _Meta, StateV7) ->
convert_v7_to_v8(#{index := RaftIdx,
system_time := Ts}, StateV7) ->
%% the structure is intact for now
Cons0 = element(#?STATE.consumers, StateV7),
Waiting0 = element(#?STATE.waiting_consumers, StateV7),
Expand All @@ -1017,11 +1022,18 @@ convert_v7_to_v8(#{system_time := Ts} = _Meta, StateV7) ->
end, Pq0, No),
Dlx0 = element(#?STATE.dlx, StateV7),
Dlx = Dlx0#?DLX{unused = ?NIL},
Enqs0 = element(#?STATE.enqueuers, StateV7),
Enqs = maps:map(
fun (_Pid, E) ->
E#enqueuer{created = {RaftIdx, Ts},
enqueued_bytes = 0}
end, Enqs0),
StateV8 = StateV7,
StateV8#?STATE{cfg = Cfg#cfg{consumer_disconnected_timeout = 60_000,
delayed_retry = disabled},
reclaimable_bytes = 0,
messages = Pq,
enqueuers = Enqs,
consumers = Cons,
waiting_consumers = Waiting,
next_consumer_timeout = Timeout,
Expand Down Expand Up @@ -1214,6 +1226,13 @@ overview(#?STATE{consumers = Cons,
{?TUPLE(LastTs, _), _} = gb_trees:largest(Tree),
{gb_trees:size(Tree), NextTs, LastTs}
end,
{EnqsByNode, EnqBytesByNode} =
maps:fold(fun (Pid, #enqueuer{enqueued_bytes = Bytes}, {CountAcc, BytesAcc}) ->
Node = node(Pid),
{CountAcc#{Node => maps:get(Node, CountAcc, 0) + 1},
BytesAcc#{Node => maps:get(Node, BytesAcc, 0) + Bytes}}
end, {#{}, #{}}, Enqs),

Overview = #{type => ?STATE,
config => Conf,
num_consumers => map_size(Cons),
Expand All @@ -1231,7 +1250,9 @@ overview(#?STATE{consumers = Cons,
reclaimable_bytes_count => ReclaimableBytes,
smallest_raft_index => smallest_raft_index(State),
num_active_priorities => NumActivePriorities,
messages_by_priority => Detail
messages_by_priority => Detail,
enqueuers_by_node => EnqsByNode,
enqueue_bytes_by_node => EnqBytesByNode
},
DlxOverview = dlx_overview(DlxState),
maps:merge(maps:merge(Overview, DlxOverview), SacOverview).
Expand Down Expand Up @@ -1279,7 +1300,11 @@ which_module(8) -> ?MODULE.
gc = #aux_gc{} :: #aux_gc{},
tick_pid :: undefined | pid(),
cache = #{} :: map(),
last_checkpoint :: tuple() | #snapshot{}
last_checkpoint :: tuple() | #snapshot{},
enq_node_rates = #{} :: #{node() =>
{non_neg_integer(), ra_li:state()}},
leader_ticks = 0 :: non_neg_integer(),
was_leader = false :: boolean()
}).

init_aux(Name) when is_atom(Name) ->
Expand Down Expand Up @@ -1372,7 +1397,8 @@ handle_aux(_RaftState, cast, eval,
#?STATE{reclaimable_bytes = ReclaimableBytes} = ra_aux:machine_state(RaAux),
{Check, Effects} = do_snapshot(EffMacVer, Ts, Check0, RaAux,
ReclaimableBytes, false),
{no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects};
{no_reply, Aux0#?AUX{last_checkpoint = Check,
was_leader = false}, RaAux, Effects};
handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds,
consumer_key = Key} = Ret, Corr, Pid},
Aux0, RaAux0) ->
Expand Down Expand Up @@ -1408,7 +1434,27 @@ handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds,
{no_reply, Aux0, RaAux0, [{append, Ret, {notify, Corr, Pid}}]}
end;
handle_aux(leader, _, {handle_tick, [QName, Overview0, Nodes]},
#?AUX{tick_pid = Pid} = Aux, RaAux) ->
#?AUX{tick_pid = Pid,
was_leader = WasLeader,
leader_ticks = Ticks0,
enq_node_rates = EnqNodeRates0} = Aux, RaAux) ->
{EnqNodeRates1, Ticks} =
case WasLeader of
true ->
{EnqNodeRates0, Ticks0 + 1};
false ->
{#{}, 1}
end,
EnqBytesByNode = maps:get(enqueue_bytes_by_node, Overview0, #{}),
EnqNodeRates =
maps:fold(
fun (Node, Bytes, Acc) ->
{Prev, Li0} = maps:get(Node, EnqNodeRates1,
{Bytes, ra_li:new(900_000)}),
Delta = max(0, Bytes - Prev),
Li = ra_li:update(Delta, Li0),
Acc#{Node => {Bytes, Li}}
end, #{}, EnqBytesByNode),
Overview = Overview0#{members_info => ra_aux:members_info(RaAux)},
NewPid =
case process_is_alive(Pid) of
Expand All @@ -1422,7 +1468,10 @@ handle_aux(leader, _, {handle_tick, [QName, Overview0, Nodes]},
end,

%% TODO: check consumer timeouts
{no_reply, Aux#?AUX{tick_pid = NewPid}, RaAux, []};
{no_reply, Aux#?AUX{tick_pid = NewPid,
was_leader = true,
leader_ticks = Ticks,
enq_node_rates = EnqNodeRates}, RaAux, []};
handle_aux(_, _, {get_checked_out, ConsumerKey, MsgIds}, Aux0, RaAux0) ->
#?STATE{cfg = #cfg{},
consumers = Consumers} = ra_aux:machine_state(RaAux0),
Expand Down Expand Up @@ -1492,6 +1541,16 @@ handle_aux(_RaState, {call, _From}, {peek, Pos}, Aux0,
Err ->
{reply, Err, Aux0, RaAux0}
end;
%% Aux call `enqueue_node_rates`.
%%
%% Replies `{error, not_ready}' while the `leader_ticks' counter held in the
%% aux state is below 2 (presumably because a rate needs more than one
%% sample to be meaningful -- confirm against the handle_tick clause).
%% Otherwise replies `{ok, Rates}' where `Rates' has the same keys as
%% `enq_node_rates' and each value is `ra_li:rate/1' applied to the `ra_li'
%% state stored for that key. Neither the aux state nor `RaAux' is modified.
handle_aux(_RaState, {call, _From}, enqueue_node_rates,
           #?AUX{leader_ticks = LeaderTicks,
                 enq_node_rates = NodeRates} = Aux, RaAux) ->
    Reply = case LeaderTicks < 2 of
                true ->
                    {error, not_ready};
                false ->
                    %% drop the last seen byte counter, keep only the rate
                    ToRate = fun (_Node, {_PrevBytes, Li}) ->
                                     ra_li:rate(Li)
                             end,
                    {ok, maps:map(ToRate, NodeRates)}
            end,
    {reply, Reply, Aux, RaAux};
handle_aux(_, _, garbage_collection, Aux, RaAux) ->
{no_reply, force_eval_gc(RaAux, Aux), RaAux};
handle_aux(_RaState, _, force_checkpoint,
Expand Down Expand Up @@ -2090,7 +2149,8 @@ maybe_enqueue(RaftIdx, Ts, From, MsgSeqNo, RawMsg,
Size = MetaSize + BodySize,
case maps:get(From, Enqueuers0, undefined) of
undefined ->
State1 = State0#?STATE{enqueuers = Enqueuers0#{From => #enqueuer{}}},
NewEnq = #enqueuer{created = {RaftIdx, Ts}},
State1 = State0#?STATE{enqueuers = Enqueuers0#{From => NewEnq}},
{Res, State, Effects} = maybe_enqueue(RaftIdx, Ts, From, MsgSeqNo,
RawMsg, MsgSize, Effects0,
State1),
Expand All @@ -2103,7 +2163,8 @@ maybe_enqueue(RaftIdx, Ts, From, MsgSeqNo, RawMsg,
Header0 = maybe_set_msg_ttl(RawMsg, Ts, Size, State0),
Header = maybe_set_msg_delivery_count(RawMsg, Header0),
Msg = make_msg(RaftIdx, Header),
Enq = Enq0#enqueuer{next_seqno = MsgSeqNo + 1},
Enq = Enq0#enqueuer{next_seqno = MsgSeqNo + 1,
enqueued_bytes = Enq0#enqueuer.enqueued_bytes + Size},
MsgCache = case can_immediately_deliver(State0) of
true ->
{RaftIdx, RawMsg};
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/src/rabbit_fifo.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,13 @@

-record(enqueuer,
{next_seqno = 1 :: msg_seqno(),
unused = ?NIL,
created :: {ra:index(), milliseconds()},
status = up :: up | suspected_down,
%% it is useful to have a record of when this was blocked
%% so that we can retry sending the block effect if
%% the publisher did not receive the initial one
blocked :: option(ra:index()),
unused_1 = ?NIL,
enqueued_bytes = 0 :: non_neg_integer(),
unused_2 = ?NIL
}).

Expand Down
Loading