@@ -32,14 +32,21 @@ stop() ->
3232 application :stop (kinetic ).
3333
3434start (Opts ) when is_list (Opts ) ->
35+ start_pool (),
3536 kinetic_sup :start_link (Opts ).
3637
3738-spec start (normal | {takeover , node ()} | {failover , node ()}, any ()) -> {ok , pid ()}.
3839start (_ , Opts ) ->
40+ start_pool (),
3941 kinetic_sup :start_link (Opts ).
4042
4143-spec stop (any ()) -> ok .
4244stop (_ ) ->
45+ lists :foreach (fun (Region ) ->
46+ PoolName = kinetic_utils :pool_name (Region ),
47+ ok = ehttpc_sup :stop_pool (PoolName )
48+ end ,
49+ kinetic_utils :regions ()),
4350 ok .
4451
4552% Public API
@@ -252,9 +259,7 @@ execute(Operation, Payload, Opts) ->
252259 # kinetic_arguments {aws_credentials = AwsCreds ,
253260 region = Region ,
254261 date = Date ,
255- url = Url ,
256262 host = Host ,
257- lhttpc_opts = LHttpcOpts ,
258263 timeout = Timeout } =
259264 kinetic_config :merge_args (Args , Opts ),
260265 case kinetic_utils :encode ({Payload }) of
@@ -267,23 +272,26 @@ execute(Operation, Payload, Opts) ->
267272 #{" content-type" => " application/x-amz-json-1.1" ,
268273 " connection" => " keep-alive" },
269274 Headers =
270- awsv4 :headers (AwsCreds ,
271- #{service => " kinesis" ,
272- target_api => Target ,
273- method => " POST" ,
274- region => Region ,
275- host => Host ,
276- signed_headers => SignedHeaders ,
277- aws_date => Date },
278- iolist_to_binary (Body )),
279-
280- case lhttpc :request (Url , post , Headers , Body , Timeout , LHttpcOpts ) of
281- {ok , {{200 , _ }, _ResponseHeaders , ResponseBody }} ->
275+ [{Key , iolist_to_binary (Value )}
276+ || {Key , Value }
277+ <- awsv4 :headers (AwsCreds ,
278+ #{service => " kinesis" ,
279+ target_api => Target ,
280+ method => " POST" ,
281+ region => Region ,
282+ host => Host ,
283+ signed_headers => SignedHeaders ,
284+ aws_date => Date },
285+ iolist_to_binary (Body ))],
286+ PoolName = kinetic_utils :pool_name (Region ),
287+ Worker = ehttpc_pool :pick_worker (PoolName ),
288+ case ehttpc :request (Worker , post , {" /" , Headers , Body }, Timeout ) of
289+ {ok , 200 , _ , ResponseBody } ->
282290 {ok , kinetic_utils :decode (ResponseBody )};
283- {ok , {{ Code , _ }, ResponseHeaders , ResponseBody } } ->
284- {error , {Code , ResponseHeaders , ResponseBody }};
285- {error , E } ->
286- {error , E }
291+ {ok , Code , RespHeaders , ResponseBody } ->
292+ {error , {Code , RespHeaders , ResponseBody }};
293+ {error , Error } ->
294+ {error , Error }
287295 end
288296 end
289297 end .
@@ -299,3 +307,17 @@ record_status(Record) ->
299307get_value (Key , TupleList ) ->
300308 {Key , Value } = lists :keyfind (Key , 1 , TupleList ),
301309 Value .
310+
311+ start_pool () ->
312+ PoolSize = application :get_env (? MODULE , pool_size , 100 ),
313+ GunOpts = application :get_env (? MODULE , gun_opts , []),
314+ lists :foreach (fun (Region ) ->
315+ PoolName = kinetic_utils :pool_name (Region ),
316+ Endpoint = kinetic_utils :endpoint (Region ),
317+ ehttpc_sup :start_pool (PoolName ,
318+ [{host , Endpoint },
319+ {port , 443 },
320+ {pool_size , PoolSize },
321+ {gun_opts , GunOpts }])
322+ end ,
323+ kinetic_utils :regions ()).
0 commit comments