4141 ]).
4242
4343-define (SOFT_LIMIT , 32 ).
44- -define (TIMER_TIME , 10000 ).
4544-define (COMMAND_TIMEOUT , 30000 ).
4645-define (UNLIMITED_PREFETCH_COUNT , 2000 ). % % something large for ra
4746% % controls the timer for closing cached segments
7473 pending = #{} :: #{seq () =>
7574 {term (), rabbit_fifo :command ()}},
7675 consumers = #{} :: #{rabbit_types :ctag () => # consumer {}},
77- timer_state :: term (),
7876 cached_segments :: undefined |
7977 {undefined | reference (),
8078 LastSeenMs :: milliseconds (),
@@ -647,8 +645,8 @@ handle_ra_event(QName, Leader, {applied, Seqs},
647645 % channel is interacting with)
648646 % but the fact the queue has just applied suggests
649647 % it's ok to cancel here anyway
650- State3 = cancel_timer ( State2 # state {slow = false ,
651- unsent_commands = #{}}) ,
648+ State3 = State2 # state {slow = false ,
649+ unsent_commands = #{}},
652650 % build up a list of commands to issue
653651 Commands = maps :fold (
654652 fun (Cid , {Settled , Returns , Discards }, Acc ) ->
@@ -699,26 +697,11 @@ handle_ra_event(QName, _From, {rejected, {not_leader, Leader, _Seq}},
699697 [rabbit_misc :rs (QName ), ? MODULE , OldLeader ,
700698 Leader , maps :size (Pending )]),
701699 State = resend_all_pending (State0 # state {leader = Leader }),
702- {ok , cancel_timer ( State ) , []};
700+ {ok , State , []};
703701handle_ra_event (_QName , _From ,
704702 {rejected , {not_leader , _UndefinedMaybe , _Seq }}, State0 ) ->
705703 % TODO: how should these be handled? re-sent on timer or try random
706704 {ok , State0 , []};
707- handle_ra_event (QName , _ , timeout , # state {cfg = # cfg {servers = Servers },
708- leader = OldLeader ,
709- pending = Pending } = State0 ) ->
710- case find_leader (Servers ) of
711- undefined ->
712- % % still no leader, set the timer again
713- {ok , set_timer (QName , State0 ), []};
714- Leader ->
715- ? LOG_DEBUG (" ~ts : ~s Pending applied Timeout ~w to ~w , "
716- " resending ~b pending commands" ,
717- [rabbit_misc :rs (QName ), ? MODULE , OldLeader ,
718- Leader , maps :size (Pending )]),
719- State = resend_all_pending (State0 # state {leader = Leader }),
720- {ok , State , []}
721- end ;
722705handle_ra_event (QName , Leader , close_cached_segments ,
723706 # state {cached_segments = CachedSegments } = State ) ->
724707 {ok ,
@@ -1021,24 +1004,6 @@ add_command(Cid, return, MsgIds, Acc) ->
10211004add_command (Cid , discard , MsgIds , Acc ) ->
10221005 [rabbit_fifo :make_discard (Cid , MsgIds ) | Acc ].
10231006
1024- set_timer (QName , # state {leader = Leader0 ,
1025- cfg = # cfg {servers = [Server | _ ]}} = State ) ->
1026- Leader = case Leader0 of
1027- undefined -> Server ;
1028- _ ->
1029- Leader0
1030- end ,
1031- Ref = erlang :send_after (? TIMER_TIME , self (),
1032- {'$gen_cast' ,
1033- {queue_event , QName , {Leader , timeout }}}),
1034- State # state {timer_state = Ref }.
1035-
1036- cancel_timer (# state {timer_state = undefined } = State ) ->
1037- State ;
1038- cancel_timer (# state {timer_state = Ref } = State ) ->
1039- _ = erlang :cancel_timer (Ref , [{async , true }, {info , false }]),
1040- State # state {timer_state = undefined }.
1041-
10421007find_local_or_leader (# state {leader = Leader ,
10431008 cfg = # cfg {servers = Servers }}) ->
10441009 case find_local (Servers ) of
@@ -1055,16 +1020,6 @@ find_local([_ | Rem]) ->
10551020find_local ([]) ->
10561021 undefined .
10571022
1058-
1059- find_leader ([]) ->
1060- undefined ;
1061- find_leader ([Server | Servers ]) ->
1062- case ra :members (Server , 500 ) of
1063- {ok , _ , Leader } -> Leader ;
1064- _ ->
1065- find_leader (Servers )
1066- end .
1067-
10681023qref ({Ref , _ }) -> Ref ;
10691024qref (Ref ) -> Ref .
10701025
0 commit comments