60
60
cookie_db , % cookie_db()
61
61
session_db , % ets() - Entry: #session{}
62
62
profile_name , % atom()
63
- options = # options {}
63
+ options = # options {},
64
+ awaiting % queue() - Entry: #request{}
64
65
}).
65
66
66
67
-define (DELAY , 500 ).
@@ -438,7 +439,8 @@ do_init(ProfileName, CookiesDir) ->
438
439
State = # state {handler_db = HandlerDbName ,
439
440
cookie_db = CookieDb ,
440
441
session_db = SessionDbName ,
441
- profile_name = ProfileName },
442
+ profile_name = ProfileName ,
443
+ awaiting = queue :new ()},
442
444
{ok , State }.
443
445
444
446
@@ -555,7 +557,8 @@ handle_cast({set_options, Options}, State = #state{options = OldOptions}) ->
555
557
port = get_port (Options , OldOptions ),
556
558
verbose = get_verbose (Options , OldOptions ),
557
559
socket_opts = get_socket_opts (Options , OldOptions ),
558
- unix_socket = get_unix_socket_opts (Options , OldOptions )
560
+ unix_socket = get_unix_socket_opts (Options , OldOptions ),
561
+ max_handlers_open = get_max_handlers_open (Options , OldOptions )
559
562
},
560
563
case {OldOptions # options .verbose , NewOptions # options .verbose } of
561
564
{Same , Same } ->
@@ -579,7 +582,8 @@ handle_cast({store_cookies, _},
579
582
handle_cast ({store_cookies , {Cookies , _ }}, State ) ->
580
583
ok = do_store_cookies (Cookies , State ),
581
584
{noreply , State };
582
-
585
+ handle_cast ({await , Request }, # state {awaiting = Awaiting } = State ) ->
586
+ {noreply , State # state {awaiting = queue :in (Request , Awaiting )}};
583
587
handle_cast (Msg , # state {profile_name = ProfileName } = State ) ->
584
588
error_report (ProfileName ,
585
589
" received unknown message"
@@ -592,11 +596,13 @@ handle_cast(Msg, #state{profile_name = ProfileName} = State) ->
592
596
% % {stop, Reason, State} (terminate/2 is called)
593
597
% % Description: Handling all non call/cast messages
594
598
% %---------------------------------------------------------
599
+ % %
595
600
handle_info ({'EXIT' , _ , _ }, State ) ->
596
601
% % Handled in DOWN
597
602
{noreply , State };
598
- handle_info ({'DOWN' , _ , _ , Pid , _ }, State ) ->
599
- ets :match_delete (State # state .handler_db , {'_' , Pid , '_' }),
603
+ handle_info ({'DOWN' , _ , _ , Pid , _ }, State0 ) ->
604
+ ets :match_delete (State0 # state .handler_db , {'_' , Pid , '_' }),
605
+ State = start_enqueued_request (State0 ),
600
606
{noreply , State };
601
607
handle_info (Info , State ) ->
602
608
Report = io_lib :format (" Unknown message in "
@@ -753,18 +759,19 @@ handle_request(Request0 = #request{socket_opts = SocketOpts},
753
759
(Request # request .headers )# http_request_h {connection
754
760
= " close" },
755
761
% % Reset socket_opts to avoid setopts failure.
756
- start_handler (Request # request {headers = Headers , socket_opts = []}, State ),
762
+ Msg = start_handler (Request # request {headers = Headers , socket_opts = []}, State ),
757
763
% % Do not change the state
758
- {reply , { ok , Request # request . id } , State0 };
764
+ {reply , Msg , State0 };
759
765
760
766
handle_request (Request , State = # state {options = Options }) ->
761
767
NewRequest = handle_cookies (generate_request_id (Request ), State ),
762
768
SessionType = session_type (Options ),
763
- case select_session (Request # request .method ,
769
+ Message = case select_session (Request # request .method ,
764
770
Request # request .address ,
765
771
Request # request .scheme , SessionType , State ) of
766
772
{ok , HandlerPid } ->
767
- pipeline_or_keep_alive (NewRequest , HandlerPid , State );
773
+ pipeline_or_keep_alive (NewRequest , HandlerPid , State ),
774
+ {ok , NewRequest # request .id };
768
775
no_connection ->
769
776
start_handler (NewRequest , State );
770
777
{no_session , OpenSessions } when OpenSessions
@@ -778,7 +785,7 @@ handle_request(Request, State = #state{options = Options}) ->
778
785
= " close" },
779
786
start_handler (NewRequest # request {headers = NewHeaders }, State )
780
787
end ,
781
- {reply , { ok , NewRequest # request . id } , State }.
788
+ {reply , Message , State }.
782
789
783
790
784
791
% % Convert Request options to State options
@@ -790,26 +797,53 @@ convert_options([{ip, Value}|T], Options) ->
790
797
convert_options (T , Options # options {ip = Value });
791
798
convert_options ([{port , Value }|T ], Options ) ->
792
799
convert_options (T , Options # options {port = Value });
800
+ convert_options ([{max_handlers_open , Value }|T ], Options ) ->
801
+ convert_options (T , Options # options {max_handlers_open = Value });
793
802
convert_options ([Option |T ], Options = # options {socket_opts = SocketOpts }) ->
794
803
convert_options (T , Options # options {socket_opts = SocketOpts ++ [Option ]}).
795
804
796
- start_handler (# request {id = Id ,
797
- from = From } = Request ,
798
- # state {profile_name = ProfileName ,
799
- handler_db = HandlerDb ,
800
- options = Options }) ->
801
- {ok , Pid } =
802
- case is_inets_manager () of
803
- true ->
804
- httpc_handler_sup :start_child ([whereis (httpc_handler_sup ),
805
- Request , Options , ProfileName ]);
806
- false ->
807
- httpc_handler :start_link (self (), Request , Options , ProfileName )
808
- end ,
809
- HandlerInfo = {Id , Pid , From },
810
- ets :insert (HandlerDb , HandlerInfo ),
811
- erlang :monitor (process , Pid ).
805
+ start_handler (# request {} = Request ,
806
+ # state {handler_db = HandlerDb ,
807
+ options = Options } = State ) ->
808
+ MaxHandlersOpen = Options # options .max_handlers_open ,
809
+ case ets :info (HandlerDb , size ) >= MaxHandlersOpen - 1
810
+ andalso MaxHandlersOpen =/= - 1 of
811
+ true -> wait_for_handler_to_end_then_start (Request );
812
+ false -> do_start_handler (Request , State )
813
+ end .
812
814
815
+ do_start_handler (# request {id = Id ,
816
+ from = From } = Request ,
817
+ # state {profile_name = ProfileName ,
818
+ handler_db = HandlerDb ,
819
+ options = Options }) ->
820
+ {ok , Pid } =
821
+ case is_inets_manager () of
822
+ true ->
823
+ httpc_handler_sup :start_child ([whereis (httpc_handler_sup ),
824
+ Request , Options , ProfileName ]);
825
+ false ->
826
+ httpc_handler :start_link (self (), Request , Options , ProfileName )
827
+ end ,
828
+ HandlerInfo = {Id , Pid , From },
829
+ ets :insert (HandlerDb , HandlerInfo ),
830
+ erlang :monitor (process , Pid ),
831
+ {ok , Id }.
832
+
833
+ wait_for_handler_to_end_then_start (# request {id = Id } = Request ) ->
834
+ gen_server :cast (self (), {await , Request }),
835
+ {ok , {await , Id }}.
836
+
837
+ start_enqueued_request (# state {awaiting = Awaiting } = State0 )->
838
+ case queue :out (Awaiting ) of
839
+ {{value , Request }, NewAwaiting } ->
840
+ State = State0 # state {awaiting = NewAwaiting },
841
+ {ok , Id } = do_start_handler (Request , State # state {awaiting = NewAwaiting }),
842
+ Request # request .from ! {started , Id },
843
+ State ;
844
+ {empty , _ } ->
845
+ State0
846
+ end .
813
847
814
848
select_session (Method , HostPort , Scheme , SessionType ,
815
849
# state {options = # options {max_pipeline_length = MaxPipe ,
@@ -1001,7 +1035,9 @@ get_option(port, #options{port = Port}) ->
1001
1035
get_option (socket_opts , # options {socket_opts = SocketOpts }) ->
1002
1036
SocketOpts ;
1003
1037
get_option (unix_socket , # options {unix_socket = UnixSocket }) ->
1004
- UnixSocket .
1038
+ UnixSocket ;
1039
+ get_option (max_handlers_open , # options {max_handlers_open = MaxHandlersOpen }) ->
1040
+ MaxHandlersOpen .
1005
1041
1006
1042
1007
1043
get_proxy (Opts , # options {proxy = Default }) ->
@@ -1046,6 +1082,9 @@ get_socket_opts(Opts, #options{socket_opts = Default}) ->
1046
1082
get_unix_socket_opts (Opts , # options {unix_socket = Default }) ->
1047
1083
proplists :get_value (unix_socket , Opts , Default ).
1048
1084
1085
+ get_max_handlers_open (Opts , # options {max_handlers_open = Default }) ->
1086
+ proplists :get_value (max_handlers_open , Opts , Default ).
1087
+
1049
1088
handle_verbose (debug ) ->
1050
1089
dbg :p (self (), [call ]),
1051
1090
dbg :tp (? MODULE , [{'_' , [], [{return_trace }]}]);
0 commit comments