diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 824d5c1b5ec4..9ddc9709a202 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -2919,83 +2919,75 @@ expire_msgs(RaCmdTs, Result, State, Effects) -> %% because the latter can be much slower than the former. case msg_is_expired(RaCmdTs, peek_next_msg(State)) of true -> - expire(RaCmdTs, State, Effects); + expire_batch(RaCmdTs, State, Effects); false -> {Result, State, Effects} end. -expire_shallow(Ts, #?STATE{cfg = #cfg{dead_letter_handler = DLH}, - returns = Returns0, - messages = Messages0, - delayed = Delayed0, - dlx = DlxState0, - reclaimable_bytes = ReclaimableBytes, - messages_total = Tot, - msg_bytes_enqueue = MsgBytesEnqueue} = State0) -> - +expire_shallow(Ts, #?STATE{returns = Returns0, + delayed = Delayed0} = State0) -> %% Promote ready delayed messages to returns queue {ReadyMsgs, Delayed} = take_ready_delayed(Ts, Delayed0), - Returns1 = lists:foldl(fun (Msg, Acc) -> lqueue:in(Msg, Acc) end, - Returns0, ReadyMsgs), - - {Expired0, Returns} = case lqueue:peek(Returns1) of - empty -> - {[], Returns1}; - {value, Returned} -> - case msg_is_expired(Ts, Returned) of - true -> - {[Returned], lqueue:drop(Returns1)}; - false -> - {[], Returns1} - end - end, + Returns = lists:foldl(fun (Msg, Acc) -> lqueue:in(Msg, Acc) end, + Returns0, ReadyMsgs), + State = State0#?STATE{returns = Returns, delayed = Delayed}, + {_, State1, DlxEffects} = expire_batch(Ts, State, []), + {State1, DlxEffects}. + +%% Batch-collect all expired messages from returns and messages queues, +%% then process them in a single discard_or_dead_letter call. +expire_batch(Ts, #?STATE{cfg = #cfg{dead_letter_handler = DLH}, + returns = Returns0, + messages = Messages0, + dlx = DlxState0, + reclaimable_bytes = ReclaimableBytes, + messages_total = Tot, + msg_bytes_enqueue = MsgBytesEnqueue} = State0, + Effects) -> + {ExpiredReturns, Returns} = take_expired_returns(Ts, Returns0), - {Expired, Messages} = rabbit_fifo_pq:take_while( - fun (Msg) -> msg_is_expired(Ts, Msg) end, - Messages0), + {ExpiredMsgs, Messages} = rabbit_fifo_pq:take_while( + fun (Msg) -> msg_is_expired(Ts, Msg) end, + Messages0), - ExpMsgs = Expired0 ++ Expired, + ExpMsgs = ExpiredReturns ++ ExpiredMsgs, {DlxState, RetainedBytes, DlxEffects} = discard_or_dead_letter(ExpMsgs, expired, DLH, DlxState0), - NumExpired = length(ExpMsgs), - - %% calculate total sizes - Size = lists:foldl(fun (Msg, Acc) -> - Header = get_msg_header(Msg), - Acc + get_header(size, Header) - end, 0, ExpMsgs), + {NumExpired, Size} = count_and_size(ExpMsgs), DiscardedSize = Size + (NumExpired * ?ENQ_OVERHEAD_B) - RetainedBytes, State = State0#?STATE{dlx = DlxState, returns = Returns, messages = Messages, - delayed = Delayed, messages_total = Tot - NumExpired, reclaimable_bytes = ReclaimableBytes + DiscardedSize, msg_bytes_enqueue = MsgBytesEnqueue - Size}, - {State, DlxEffects}. - -expire(RaCmdTs, State0, Effects) -> - {Msg, - #?STATE{cfg = #cfg{dead_letter_handler = DLH}, - dlx = DlxState0, - messages_total = Tot, - reclaimable_bytes = ReclaimableBytes, - msg_bytes_enqueue = MsgBytesEnqueue - } = State1} = - take_next_msg(State0), - {DlxState, RetainedBytes, DlxEffects} = - discard_or_dead_letter([Msg], expired, DLH, DlxState0), - Header = get_msg_header(Msg), - Size = get_header(size, Header), - DiscardedSize = Size + ?ENQ_OVERHEAD_B - RetainedBytes, - State = State1#?STATE{dlx = DlxState, - messages_total = Tot - 1, - reclaimable_bytes = ReclaimableBytes + DiscardedSize, - msg_bytes_enqueue = MsgBytesEnqueue - Size}, - expire_msgs(RaCmdTs, true, State, Effects ++ DlxEffects). + {true, State, Effects ++ DlxEffects}. + +take_expired_returns(Ts, Returns) -> + take_expired_returns(Ts, Returns, []). + +take_expired_returns(Ts, Returns0, Acc) -> + case lqueue:peek(Returns0) of + {value, Msg} -> + case msg_is_expired(Ts, Msg) of + true -> + take_expired_returns(Ts, lqueue:drop(Returns0), + [Msg | Acc]); + false -> + {lists:reverse(Acc), Returns0} + end; + empty -> + {lists:reverse(Acc), Returns0} + end. + +count_and_size(Msgs) -> + lists:foldl(fun (Msg, {N, Acc}) -> + Header = get_msg_header(Msg), + {N + 1, Acc + get_header(size, Header)} + end, {0, 0}, Msgs). timer_effect(#?STATE{messages_total = 0, delayed = #delayed{next = undefined}}, Effects) ->