diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 6c96eae7404..2914a5b7bae 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -281,7 +281,9 @@ apply_(#{reply_mode := {notify, _Corr, EnqPid}} = Meta, apply_(Meta, #?ENQ_V2{seq = Seq, msg = RawMsg, size = Size}, State00) -> %% untracked apply_enqueue(Meta, undefined, Seq, RawMsg, Size, State00); -apply_(_Meta, #register_enqueuer{pid = Pid}, +apply_(#{index := RaftIdx, + system_time := Ts}, + #register_enqueuer{pid = Pid}, #?STATE{enqueuers = Enqueuers0, cfg = #cfg{overflow_strategy = Overflow}} = State0) -> State = case maps:is_key(Pid, Enqueuers0) of @@ -289,7 +291,9 @@ apply_(_Meta, #register_enqueuer{pid = Pid}, %% if the enqueuer exits just echo the overflow state State0; false -> - State0#?STATE{enqueuers = Enqueuers0#{Pid => #enqueuer{}}} + NewEnq = #enqueuer{created = {RaftIdx, Ts}, + enqueued_bytes = 0}, + State0#?STATE{enqueuers = Enqueuers0#{Pid => NewEnq}} end, Res = case is_over_limit(State) of true when Overflow == reject_publish -> @@ -993,7 +997,8 @@ v7_to_v8_consumer(Con, Timeout) -> delivery_count = element(#consumer.delivery_count, Con) }. -convert_v7_to_v8(#{system_time := Ts} = _Meta, StateV7) -> +convert_v7_to_v8(#{index := RaftIdx, + system_time := Ts}, StateV7) -> %% the structure is intact for now Cons0 = element(#?STATE.consumers, StateV7), Waiting0 = element(#?STATE.waiting_consumers, StateV7), @@ -1017,11 +1022,18 @@ convert_v7_to_v8(#{system_time := Ts} = _Meta, StateV7) -> end, Pq0, No), Dlx0 = element(#?STATE.dlx, StateV7), Dlx = Dlx0#?DLX{unused = ?NIL}, + Enqs0 = element(#?STATE.enqueuers, StateV7), + Enqs = maps:map( + fun (_Pid, E) -> + E#enqueuer{created = {RaftIdx, Ts}, + enqueued_bytes = 0} + end, Enqs0), StateV8 = StateV7, StateV8#?STATE{cfg = Cfg#cfg{consumer_disconnected_timeout = 60_000, delayed_retry = disabled}, reclaimable_bytes = 0, messages = Pq, + enqueuers = Enqs, consumers = Cons, waiting_consumers = Waiting, next_consumer_timeout = Timeout, @@ -1214,6 +1226,13 @@ overview(#?STATE{consumers = Cons, {?TUPLE(LastTs, _), _} = gb_trees:largest(Tree), {gb_trees:size(Tree), NextTs, LastTs} end, + {EnqsByNode, EnqBytesByNode} = + maps:fold(fun (Pid, #enqueuer{enqueued_bytes = Bytes}, {CountAcc, BytesAcc}) -> + Node = node(Pid), + {CountAcc#{Node => maps:get(Node, CountAcc, 0) + 1}, + BytesAcc#{Node => maps:get(Node, BytesAcc, 0) + Bytes}} + end, {#{}, #{}}, Enqs), + Overview = #{type => ?STATE, config => Conf, num_consumers => map_size(Cons), @@ -1231,7 +1250,9 @@ overview(#?STATE{consumers = Cons, reclaimable_bytes_count => ReclaimableBytes, smallest_raft_index => smallest_raft_index(State), num_active_priorities => NumActivePriorities, - messages_by_priority => Detail + messages_by_priority => Detail, + enqueuers_by_node => EnqsByNode, + enqueue_bytes_by_node => EnqBytesByNode }, DlxOverview = dlx_overview(DlxState), maps:merge(maps:merge(Overview, DlxOverview), SacOverview). @@ -1279,7 +1300,11 @@ which_module(8) -> ?MODULE. gc = #aux_gc{} :: #aux_gc{}, tick_pid :: undefined | pid(), cache = #{} :: map(), - last_checkpoint :: tuple() | #snapshot{} + last_checkpoint :: tuple() | #snapshot{}, + enq_node_rates = #{} :: #{node() => + {non_neg_integer(), ra_li:state()}}, + leader_ticks = 0 :: non_neg_integer(), + was_leader = false :: boolean() }). init_aux(Name) when is_atom(Name) -> @@ -1372,7 +1397,8 @@ handle_aux(_RaftState, cast, eval, #?STATE{reclaimable_bytes = ReclaimableBytes} = ra_aux:machine_state(RaAux), {Check, Effects} = do_snapshot(EffMacVer, Ts, Check0, RaAux, ReclaimableBytes, false), - {no_reply, Aux0#?AUX{last_checkpoint = Check}, RaAux, Effects}; + {no_reply, Aux0#?AUX{last_checkpoint = Check, + was_leader = false}, RaAux, Effects}; handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds, consumer_key = Key} = Ret, Corr, Pid}, Aux0, RaAux0) -> @@ -1408,7 +1434,27 @@ handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds, {no_reply, Aux0, RaAux0, [{append, Ret, {notify, Corr, Pid}}]} end; handle_aux(leader, _, {handle_tick, [QName, Overview0, Nodes]}, - #?AUX{tick_pid = Pid} = Aux, RaAux) -> + #?AUX{tick_pid = Pid, + was_leader = WasLeader, + leader_ticks = Ticks0, + enq_node_rates = EnqNodeRates0} = Aux, RaAux) -> + {EnqNodeRates1, Ticks} = + case WasLeader of + true -> + {EnqNodeRates0, Ticks0 + 1}; + false -> + {#{}, 1} + end, + EnqBytesByNode = maps:get(enqueue_bytes_by_node, Overview0, #{}), + EnqNodeRates = + maps:fold( + fun (Node, Bytes, Acc) -> + {Prev, Li0} = maps:get(Node, EnqNodeRates1, + {Bytes, ra_li:new(900_000)}), + Delta = max(0, Bytes - Prev), + Li = ra_li:update(Delta, Li0), + Acc#{Node => {Bytes, Li}} + end, #{}, EnqBytesByNode), Overview = Overview0#{members_info => ra_aux:members_info(RaAux)}, NewPid = case process_is_alive(Pid) of @@ -1422,7 +1468,10 @@ handle_aux(leader, _, {handle_tick, [QName, Overview0, Nodes]}, end, %% TODO: check consumer timeouts - {no_reply, Aux#?AUX{tick_pid = NewPid}, RaAux, []}; + {no_reply, Aux#?AUX{tick_pid = NewPid, + was_leader = true, + leader_ticks = Ticks, + enq_node_rates = EnqNodeRates}, RaAux, []}; handle_aux(_, _, {get_checked_out, ConsumerKey, MsgIds}, Aux0, RaAux0) -> #?STATE{cfg = #cfg{}, consumers = Consumers} = ra_aux:machine_state(RaAux0), @@ -1492,6 +1541,16 @@ handle_aux(_RaState, {call, _From}, {peek, Pos}, Aux0, Err -> {reply, Err, Aux0, RaAux0} end; +handle_aux(_RaState, {call, _From}, enqueue_node_rates, + #?AUX{leader_ticks = Ticks} = Aux, RaAux) + when Ticks < 2 -> + {reply, {error, not_ready}, Aux, RaAux}; +handle_aux(_RaState, {call, _From}, enqueue_node_rates, + #?AUX{enq_node_rates = EnqNodeRates} = Aux, RaAux) -> + Rates = maps:map(fun (_Node, {_LastBytes, Li}) -> + ra_li:rate(Li) + end, EnqNodeRates), + {reply, {ok, Rates}, Aux, RaAux}; handle_aux(_, _, garbage_collection, Aux, RaAux) -> {no_reply, force_eval_gc(RaAux, Aux), RaAux}; handle_aux(_RaState, _, force_checkpoint, @@ -2090,7 +2149,8 @@ maybe_enqueue(RaftIdx, Ts, From, MsgSeqNo, RawMsg, Size = MetaSize + BodySize, case maps:get(From, Enqueuers0, undefined) of undefined -> - State1 = State0#?STATE{enqueuers = Enqueuers0#{From => #enqueuer{}}}, + NewEnq = #enqueuer{created = {RaftIdx, Ts}}, + State1 = State0#?STATE{enqueuers = Enqueuers0#{From => NewEnq}}, {Res, State, Effects} = maybe_enqueue(RaftIdx, Ts, From, MsgSeqNo, RawMsg, MsgSize, Effects0, State1), @@ -2103,7 +2163,8 @@ maybe_enqueue(RaftIdx, Ts, From, MsgSeqNo, RawMsg, Header0 = maybe_set_msg_ttl(RawMsg, Ts, Size, State0), Header = maybe_set_msg_delivery_count(RawMsg, Header0), Msg = make_msg(RaftIdx, Header), - Enq = Enq0#enqueuer{next_seqno = MsgSeqNo + 1}, + Enq = Enq0#enqueuer{next_seqno = MsgSeqNo + 1, + enqueued_bytes = Enq0#enqueuer.enqueued_bytes + Size}, MsgCache = case can_immediately_deliver(State0) of true -> {RaftIdx, RawMsg}; diff --git a/deps/rabbit/src/rabbit_fifo.hrl b/deps/rabbit/src/rabbit_fifo.hrl index d466ad717ab..3357590f415 100644 --- a/deps/rabbit/src/rabbit_fifo.hrl +++ b/deps/rabbit/src/rabbit_fifo.hrl @@ -207,13 +207,13 @@ -record(enqueuer, {next_seqno = 1 :: msg_seqno(), - unused = ?NIL, + created :: {ra:index(), milliseconds()}, status = up :: up | suspected_down, %% it is useful to have a record of when this was blocked %% so that we can retry sending the block effect if %% the publisher did not receive the initial one blocked :: option(ra:index()), - unused_1 = ?NIL, + enqueued_bytes = 0 :: non_neg_integer(), unused_2 = ?NIL }).